Source code for piel.analysis.signals.time.core.metrics

import numpy as np
from piel.types import (
    MultiTimeSignalData,
    ScalarMetric,
    EdgeTransitionAnalysisTypes,
    ScalarMetricCollection,
)
from piel.types.units import V
from piel.analysis.metrics import aggregate_scalar_metrics_collection


[docs] def extract_mean_metrics_list( multi_data_time_signal: MultiTimeSignalData, **kwargs ) -> ScalarMetricCollection: """ Extracts scalar metrics from a collection of rising edge signals. Standard deviation is not calculated as this just computes individual metrics list. Args: multi_data_time_signal (List[TimeSignalData]): A list of rising edge signals. Returns: ScalarMetricCollection: A collection of ScalarMetric instances containing the extracted metrics. """ if not multi_data_time_signal: raise ValueError("The multi_signal list is empty.") if kwargs.get("unit") is None: kwargs["unit"] = V metrics_list = [] for signal in multi_data_time_signal: if not signal.data: raise ValueError(f"Signal '{signal.data_name}' has an empty data array.") data_array = np.array(signal.data) mean_val = float(np.mean(data_array)) min_val = float(np.min(data_array)) max_val = float(np.max(data_array)) std_dev = None count = None # Assuming 'value' is the mean; adjust if different meaning is intended scalar_metric = ScalarMetric( value=mean_val, mean=mean_val, min=min_val, max=max_val, standard_deviation=std_dev, count=count, ) metrics_list.append(scalar_metric) return ScalarMetricCollection(metrics=metrics_list, **kwargs)
[docs] def extract_peak_to_peak_metrics_list( multi_data_time_signal: MultiTimeSignalData, metric_kwargs_list: list[dict] = None, **kwargs, ) -> ScalarMetricCollection: """ Extracts peak-to-peak metrics from a collection of signals. The peak-to-peak value is defined as the difference between the maximum and minimum values of the signal. Args: multi_data_time_signal (MultiTimeSignalData): A collection of time signals to analyze. Returns: ScalarMetricCollection: A collection of ScalarMetric instances containing the peak-to-peak values for each signal. Raises: ValueError: If the input list is empty or any signal has an empty data array. """ if not multi_data_time_signal: raise ValueError("The multi_data_time_signal list is empty.") if metric_kwargs_list is None: metric_kwargs_list = [dict().copy() for _ in range(len(multi_data_time_signal))] if kwargs.get("unit") is None: kwargs["unit"] = V metrics_list = [] i = 0 for signal in multi_data_time_signal: if not signal.data: raise ValueError(f"Signal '{signal.data_name}' has an empty data array.") data_array = np.array(signal.data) min_val = float(np.min(data_array)) max_val = float(np.max(data_array)) peak_to_peak = max_val - min_val scalar_metric = ScalarMetric( value=peak_to_peak, # Using peak-to-peak as the primary value mean=peak_to_peak, # Mean is not applicable for peak-to-peak min=peak_to_peak, # Min is already represented in peak-to-peak max=peak_to_peak, # Max is already represented in peak-to-peak standard_deviation=None, # Not applicable count=None, # Not applicable unit=kwargs.get("unit"), **metric_kwargs_list[i], ) metrics_list.append(scalar_metric) i += 1 return ScalarMetricCollection(metrics=metrics_list, **kwargs)
[docs] def extract_multi_time_signal_statistical_metrics( multi_data_time_signal: MultiTimeSignalData, analysis_type: EdgeTransitionAnalysisTypes = "peak_to_peak", **kwargs, ) -> ScalarMetric: """ Extracts scalar metrics from a collection of rising edge signals. Args: multi_data_time_signal (List[TimeSignalData]): A list of rising edge signals. analysis_type (piel.types.EdgeTransitionAnalysisTypes): The type of analysis to perform. Returns: ScalarMetric: Aggregated ScalarMetrics instance containing the extracted metrics. """ if analysis_type == "mean": metrics_list = extract_mean_metrics_list(multi_data_time_signal, **kwargs) elif analysis_type == "peak_to_peak": metrics_list = extract_peak_to_peak_metrics_list( multi_data_time_signal, **kwargs ) else: raise TypeError( f"Undefined analysis type. Current options are: {str(EdgeTransitionAnalysisTypes)}. Feel free to contribute to this." ) aggregate_metrics = aggregate_scalar_metrics_collection(metrics_list) return aggregate_metrics
[docs] def extract_statistical_metrics_collection( multi_data_time_signal: MultiTimeSignalData, analysis_types: list[EdgeTransitionAnalysisTypes], **kwargs, ) -> ScalarMetricCollection: """ Extracts a collection of scalar metrics from a collection of rising edge signals based on multiple analysis types. Args: multi_data_time_signal (MultiTimeSignalData): A collection of rising edge signals. analysis_types (list[EdgeTransitionAnalysisTypes], optional): The types of analyses to perform. Defaults to ["peak_to_peak"]. Returns: ScalarMetricCollection: A collection of aggregated ScalarMetric instances for each analysis type. """ if not isinstance(analysis_types, list): raise TypeError( f"analysis_types must be a list of EdgeTransitionAnalysisTypes: {EdgeTransitionAnalysisTypes}." ) metrics_list = list() for analysis in analysis_types: aggregated_metrics = extract_multi_time_signal_statistical_metrics( multi_data_time_signal, analysis_type=analysis ) metrics_list.append(aggregated_metrics) return ScalarMetricCollection(metrics=metrics_list, **kwargs)