Source code for piel.analysis.signals.time.core.remove
from piel.types import TimeSignalData
import numpy as np
[docs]
def remove_before_first_rising_edge(
waveform: TimeSignalData,
lower_threshold_ratio: float = 0.1,
upper_threshold_ratio: float = 0.9,
) -> TimeSignalData:
"""
Removes all data points before the first rising edge in the waveform.
A rising edge is defined as the point where the signal transitions from below the lower
threshold to above the upper threshold.
Parameters:
waveform (TimeSignalData): The input waveform data.
lower_threshold_ratio (float): Lower threshold as a ratio of the amplitude range.
upper_threshold_ratio (float): Upper threshold as a ratio of the amplitude range.
Returns:
TimeSignalData: A new waveform with data points before the first rising edge removed.
Raises:
ValueError: If no rising edge is found in the waveform.
"""
# Convert time and data to numpy arrays for efficient computation
time = np.array(waveform.time_s)
data = np.array(waveform.data)
# Validate input lengths
if len(time) != len(data):
raise ValueError("Time and data arrays must have the same length.")
# Calculate amplitude range
data_min = np.min(data)
data_max = np.max(data)
amplitude_range = data_max - data_min
if amplitude_range == 0:
raise ValueError("Signal has zero amplitude range; cannot detect rising edge.")
# Define thresholds based on the amplitude range
lower_threshold = data_min + amplitude_range * lower_threshold_ratio
upper_threshold = data_min + amplitude_range * upper_threshold_ratio
# Identify indices where signal crosses the lower threshold
below_lower = data < lower_threshold
above_lower = data >= lower_threshold
# Detect transitions from below_lower to above_lower
rising_cross_lower = np.where(below_lower[:-1] & above_lower[1:])[0] + 1
# Iterate through potential rising edges to find the first that crosses the upper threshold
rising_edge_idx = None
for idx in rising_cross_lower:
if data[idx] >= upper_threshold:
rising_edge_idx = idx
break
# Alternatively, find the exact crossing point using interpolation
crossing_indices = np.where(data[idx:] >= upper_threshold)[0]
if crossing_indices.size > 0:
rising_edge_idx = idx + crossing_indices[0]
break
if rising_edge_idx is None:
raise ValueError("No rising edge found that crosses the specified thresholds.")
# Slice the time and data arrays from the rising edge onwards
sliced_time = time[rising_edge_idx:]
sliced_data = data[rising_edge_idx:]
# Optionally, reset the time so that the rising edge starts at zero
sliced_time = sliced_time - sliced_time[0]
# Create a new TimeSignalData instance with the sliced data
trimmed_signal = TimeSignalData(
time_s=sliced_time.tolist(),
data=sliced_data.tolist(),
data_name=waveform.data_name,
)
return trimmed_signal