Source code for piel.experimental.devices.DPO73304.extract

import pandas as pd
import logging

from piel.units import match_unit_abbreviation, prefix2int
from piel.types.experimental import (
    PropagationDelayMeasurement,
    PropagationDelayMeasurementCollection,
    PropagationDelayMeasurementDataCollection,
    PropagationDelayMeasurementData,
    OscilloscopeMeasurement,
    OscilloscopeMeasurementDataCollection,
    OscilloscopeMeasurementData,
)
from piel.types import (
    TimeSignalData,
    MultiTimeSignalData,
    PathTypes,
    ScalarMetric,
    ScalarMetricCollection,
)
from piel.file_system import return_path
from .types import ParsedColumnInfo


logger = logging.getLogger(__name__)


[docs] def extract_measurement_to_dataframe(file: PathTypes) -> pd.DataFrame: """ Extracts the measurement files from a csv file and returns it as a pandas dataframe. Parameters ---------- file : PathTypes The path to the csv file. Returns ------- pd.DataFrame The measurement files as a pandas dataframe. """ # TODO write here functionality to validate the file exists and it is a csv file in a particular structure compatible with a measurement. # TODO sort out actual measurement information dataframe = pd.read_csv( file, names=tuple( [ "value", "mean", "min", "max", "standard_deviation", "count", "name1", "name2", "name3", ] ), ) try: dataframe["count"] = prefix2int(dataframe["count"].values[0]) except ValueError: logger.debug(f"Converting count measurement failed at dataframe: {dataframe}") # Merge the name columns into a single column called 'name' dataframe["name"] = dataframe[["name1", "name2", "name3"]].apply( lambda x: " ".join(x.dropna().astype(str)), axis=1 ) # Drop the original name columns dataframe = dataframe.drop(columns=["name1", "name2", "name3"]) # Convert all spaces to underscores in the 'name' column dataframe["name"] = dataframe["name"].str.replace(" ", "_") dataframe["name"] = dataframe["name"].str.replace("(", "_") dataframe["name"] = dataframe["name"].str.replace(")", "_") # Handle duplicate names by adding a prefix dataframe["name"] = dataframe["name"].apply(lambda x: x.lower()) name_counts = dataframe["name"].value_counts() duplicates = name_counts[name_counts > 1].index for dup in duplicates: duplicate_indices = dataframe[dataframe["name"] == dup].index for i, idx in enumerate(duplicate_indices, 1): dataframe.at[idx, "name"] = f"{dup}_{i}" return dataframe
[docs] def extract_waveform_to_dataframe(file: PathTypes) -> pd.DataFrame: """ Extracts the waveform files from a csv file and returns it as a pandas dataframe. Parameters ---------- file : PathTypes The path to the csv file. Returns ------- pd.DataFrame The waveform files as a pandas dataframe. """ # TODO write here functionality to validate the file exists and it is a csv file in a particular structure return pd.read_csv(file, header=0, names=["time_s", "voltage_V"], usecols=[3, 4])
[docs] def extract_to_data_time_signal( file: PathTypes, ) -> TimeSignalData: """ Extracts the waveform files from a csv file and returns it as a DataTimeSignal that can be used to analyse the signal with other methods. Parameters ---------- file : PathTypes The path to the csv file. Returns ------- TimeSignalData The waveform files as a DataTimeSignal. """ logger.debug(f"Extracting waveform from file: {file}") dataframe = extract_waveform_to_dataframe(file) data_time_signal = TimeSignalData( time_s=dataframe.time_s.values, data=dataframe.voltage_V.values, data_name="voltage_V", ) return data_time_signal
[docs] def extract_propagation_delay_data_from_measurement( propagation_delay_measurement: PropagationDelayMeasurement, ) -> PropagationDelayMeasurementData: propagation_delay_measurement_i = propagation_delay_measurement data_i = dict() data_i["name"] = propagation_delay_measurement.name if hasattr(propagation_delay_measurement_i, "measurements_file"): try: file = propagation_delay_measurement_i.measurements_file file = return_path(file) if not file.exists(): # Try appending to parent directory if file does not exist file = ( propagation_delay_measurement_i.parent_directory / propagation_delay_measurement_i.measurements_file ) data_i["measurements"] = extract_to_signal_measurement(file) except Exception as e: file = propagation_delay_measurement_i.measurements_file logger.debug( f"Failed to extract propagation delay measurement from measurement file: {file}, exception {e}" ) if hasattr(propagation_delay_measurement_i, "reference_waveform_file"): try: file = propagation_delay_measurement_i.reference_waveform_file file = return_path(file) if not file.exists(): # Try appending to parent directory if file does not exist file = ( propagation_delay_measurement_i.parent_directory / propagation_delay_measurement_i.reference_waveform_file ) data_i["reference_waveform"] = extract_to_data_time_signal(file) except Exception as e: file = propagation_delay_measurement_i.reference_waveform_file logger.debug( f"Failed to extract reference waveform from reference_waveform_file file: {file}, exception {e}" ) if hasattr(propagation_delay_measurement_i, "dut_waveform_file"): try: file = propagation_delay_measurement_i.dut_waveform_file file = return_path(file) if not file.exists(): # Try appending to parent directory if file does not exist file = ( propagation_delay_measurement_i.parent_directory / propagation_delay_measurement_i.dut_waveform_file ) data_i["dut_waveform"] = extract_to_data_time_signal(file) except Exception as e: file = propagation_delay_measurement_i.dut_waveform_file logger.debug( f"Failed to extract dut waveform from dut_waveform_file file: {file}, exception {e}" ) return PropagationDelayMeasurementData(**data_i)
[docs] def extract_propagation_delay_measurement_sweep_data( propagation_delay_measurement_sweep: PropagationDelayMeasurementCollection, ) -> PropagationDelayMeasurementDataCollection: """ This function is used to extract the relevant measurement files amd relate them to the sweep parameter. Because this function extracts multi-index files then we use xarray to analyze this files more clearly. It aims to extract all the files in the sweep file collection. """ measurement_sweep_data = list() for ( propagation_delay_measurement_i ) in propagation_delay_measurement_sweep.collection: measurement_data_i = extract_propagation_delay_data_from_measurement( propagation_delay_measurement_i ) measurement_sweep_data.append(measurement_data_i) measurement_data_collection = PropagationDelayMeasurementDataCollection( name=propagation_delay_measurement_sweep.name, collection=measurement_sweep_data, ) return measurement_data_collection
[docs] def extract_oscilloscope_data_from_measurement( oscilloscope_measurement: OscilloscopeMeasurement, ) -> OscilloscopeMeasurementData: logger.debug( f"Extracting oscilloscope data from measurement: {oscilloscope_measurement}" ) data_i = dict() data_i["name"] = oscilloscope_measurement.name # Extracting measurements from the measurements file if it exists if hasattr(oscilloscope_measurement, "measurements_file"): file = oscilloscope_measurement.measurements_file file = return_path(file) if not file.exists(): # Try appending to parent directory if file does not exist file = oscilloscope_measurement.parent_directory / file if file.exists(): data_i["measurements"] = extract_to_signal_measurement(file) else: raise FileNotFoundError(f"Measurements file {file} does not exist.") else: data_i["measurements"] = None # Or some default value if needed # Extracting data from waveform files waveform_data_list = [] for waveform_file in oscilloscope_measurement.waveform_file_list: file = return_path(waveform_file) if not file.exists(): # Try appending to parent directory if file does not exist file = oscilloscope_measurement.parent_directory / waveform_file if file.exists(): waveform_data = extract_to_data_time_signal(file) waveform_data_list.append(waveform_data) else: raise FileNotFoundError(f"Waveform file {file} does not exist.") data_i["waveform_list"] = waveform_data_list return OscilloscopeMeasurementData(**data_i)
[docs] def extract_oscilloscope_measurement_data_collection( oscilloscope_measurement_data: OscilloscopeMeasurementData, ) -> OscilloscopeMeasurementDataCollection: # TODO write this. """ This function is used to extract the relevant measurement files amd relate them to the sweep parameter. Because this function extracts multi-index files then we use xarray to analyze this files more clearly. It aims to extract all the files in the sweep file collection. """ measurement_data_collection_raw = list() for oscilloscope_measurement_i in oscilloscope_measurement_data.collection: measurement_data_i = extract_oscilloscope_data_from_measurement( oscilloscope_measurement_i ) measurement_data_collection_raw.append(measurement_data_i) measurement_data_collection = OscilloscopeMeasurementDataCollection( name=oscilloscope_measurement_data.name, collection=measurement_data_collection_raw, ) return measurement_data_collection
[docs] def extract_to_signal_measurement(file: PathTypes, **kwargs) -> ScalarMetricCollection: """ Extracts the measurement files from a csv file and returns it as a SignalMeasurement that can be used to analyse the signal. Parameters ---------- file : PathTypes Returns ------- SignalMetricsMeasurementCollection : dict[str, SignalMetricsData] """ logger.debug(f"Extracting signal measurement from file: {file}") dataframe = extract_measurement_to_dataframe(file) metrics_list = list() for i, row in dataframe.iterrows(): row = row.copy() metrics_information = ParsedColumnInfo() try: metrics_information = parse_column_name(row["name"]) except Exception as e: logger.debug(f"Failed to parse column {i} index with error: %s", e) pass row["raw_name"] = row["name"] # del row["name"] metrics_i = ScalarMetric( # name=metrics_information.analysis_type, unit=metrics_information.unit, attrs={"raw_name": row["raw_name"]}, **row, ) metrics_list.append(metrics_i) return ScalarMetricCollection(metrics=metrics_list, **kwargs)
[docs] def combine_channel_data( channel_file: list[PathTypes], ) -> MultiTimeSignalData: """ Extracts the waveform files from a list of csv files and returns it as a MultiTimeSignalData that can be used to analyse the signals together. Parameters ---------- channel_file : list[PathTypes] The list of paths to the csv files. Returns ------- MultiTimeSignalData The waveform files as a MultiTimeSignalData. """ multi_channel_data_time_signals = list() for file in channel_file: data_time_signal_i = extract_to_data_time_signal(file) multi_channel_data_time_signals.append(data_time_signal_i) return multi_channel_data_time_signals
[docs] def parse_column_name(name: str) -> ParsedColumnInfo: """ Parses a column name to extract the analysis type, unit, and channel information. Expected column name format: <analysis_type>_<channels>__<unit>[_<index>] Examples: 'delay_ch1_ch2__s_1' -> analysis_type='delay', channels='ch1_ch2', unit='seconds', index=1 'pk-pk_ch2__v' -> analysis_type='peak_to_peak', channels='ch2', unit='V' 'neg._duty_cyc_ch2__%' -> analysis_type='negative_duty_cycle', channels='ch2', unit='percent' 'amplitude_ch2__v' -> analysis_type='amplitude', channels='ch2', unit='V' Parameters: name (str): The column name to parse. Returns: ParsedColumnInfo: An object containing the extracted information. Raises: ValueError: If the column name does not match the expected pattern or contains an unknown analysis type. """ import re # Mapping from analysis type in column name to standardized analysis types analysis_type_map = { "mean": "mean", "pk-pk": "peak_to_peak", "peak_to_peak": "peak_to_peak", "delay": "delay", "rise_time": "rise_time", "fall_time": "fall_time", "amplitude": "amplitude", "neg._duty_cyc": "negative_duty_cycle", "negative_duty_cycle": "negative_duty_cycle", # Add more mappings as necessary } # Regular expression pattern to parse the column name # Pattern breakdown: # ^(?P<analysis_type>[a-zA-Z\._\-]+) : Starts with analysis_type (letters, dots, underscores, hyphens) # _(?P<channels>ch\d+(?:_ch\d+)*) : Followed by channels like ch1, ch1_ch2, etc. # __(?P<unit>[%a-zA-Z]+) : Double underscore followed by unit (including %) # (?:_(?P<index>\d+))?$ : Optional single underscore and index number at the end pattern = r"^(?P<analysis_type>[a-zA-Z\._\-]+)_(?P<channels>ch\d+(?:_ch\d+)*)__(?P<unit>[%a-zA-Z]+)(?:_(?P<index>\d+))?$" match = re.match(pattern, name) if not match: raise ValueError(f"Column name '{name}' does not match the expected pattern.") analysis_type_key = match.group("analysis_type") analysis_type = analysis_type_map.get(analysis_type_key.lower()) if analysis_type is None: raise ValueError( f"Unknown analysis type '{analysis_type_key}' in column name '{name}'." ) channels = match.group("channels") unit = match.group("unit") unit = match_unit_abbreviation(unit_str=unit) index_str = match.group("index") index = int(index_str) if index_str is not None else None return ParsedColumnInfo( analysis_type=analysis_type, unit=unit, channels=channels, index=index )