Source code for piel.analysis.signals.time.core.transition
import numpy as np
from piel.types import TimeSignalData, MultiTimeSignalData
[docs]
def extract_rising_edges(
    signal: TimeSignalData,
    lower_threshold_ratio: float = 0.1,
    upper_threshold_ratio: float = 0.9,
) -> MultiTimeSignalData:
    """
    Extract the rising edges of a signal.

    A rising edge starts at the last sample strictly below the lower threshold
    before the signal crosses it, and ends at the first subsequent sample at or
    above the upper threshold. If the signal falls back below the lower
    threshold before reaching the upper one, the candidate edge is discarded.

    Both thresholds are computed relative to the observed range of the data:
    ``min(data) + ratio * (max(data) - min(data))``.

    Args:
        signal (TimeSignalData): The input signal; ``time_s``, ``data`` and
            ``data_name`` are read from it.
        lower_threshold_ratio (float): Lower threshold as a fraction of the
            signal amplitude (default 0.1).
        upper_threshold_ratio (float): Upper threshold as a fraction of the
            signal amplitude (default 0.9).

    Returns:
        MultiTimeSignalData: A list of TimeSignalData instances, one per
        detected rising edge, named ``<data_name>_rising_edge_<k>`` with ``k``
        starting at 1. The list is empty when no edge is found or when the
        signal has fewer than two samples.

    Raises:
        ValueError: If ``time_s`` and ``data`` differ in length.
    """
    time = np.asarray(signal.time_s)
    data = np.asarray(signal.data)

    if len(time) != len(data):
        raise ValueError("time_s and data must be of the same length.")

    rising_edges: MultiTimeSignalData = []

    # A transition needs at least two samples; this also avoids calling
    # np.min / np.max on an empty array, which would raise.
    if len(data) < 2:
        return rising_edges

    # Absolute thresholds derived from the observed amplitude range.
    data_min = np.min(data)
    amplitude = np.max(data) - data_min
    lower_threshold = data_min + lower_threshold_ratio * amplitude
    upper_threshold = data_min + upper_threshold_ratio * amplitude

    # Index of the sample preceding the lower-threshold crossing, or None
    # while no candidate edge is being tracked.
    start_idx = None

    for i in range(1, len(data)):
        if start_idx is None:
            crossed_lower = (
                data[i - 1] < lower_threshold and data[i] >= lower_threshold
            )
            if not crossed_lower:
                continue
            start_idx = i - 1
            # Deliberately fall through: a steep edge may cross the upper
            # threshold on this very sample, so it must be tested now rather
            # than on the next iteration.

        if data[i] >= upper_threshold:
            # Edge complete: keep the samples from start_idx to i inclusive.
            edge_slice = slice(start_idx, i + 1)
            rising_edges.append(
                TimeSignalData(
                    time_s=time[edge_slice].tolist(),
                    data=data[edge_slice].tolist(),
                    data_name=f"{signal.data_name}_rising_edge_{len(rising_edges) + 1}",
                )
            )
            start_idx = None
        elif data[i] < lower_threshold:
            # Dropped back below the lower threshold before reaching the
            # upper one: not a rising edge, discard the candidate.
            start_idx = None

    return rising_edges