Source code for piel.flows.digital_electro_optic

from collections import OrderedDict
import numpy as np
from typing import Optional, Callable
from piel.types import (
    BitPhaseMap,
    BitsType,
    PhaseMapType,
    OpticalStateTransitionCollection,
    TruthTable,
    TruthTableLogicType,
)
from piel.conversion import (
    convert_tuple_to_string,
    convert_to_bits,
)
import logging

logger = logging.getLogger(__name__)


[docs] def add_truth_table_bit_to_phase_data( truth_table: TruthTable, bit_phase_map: BitPhaseMap, bit_phase_column_name: str = None, ) -> TruthTable: """ This function converts the bit column of a dataframe into a phase tuple using the phase_bit_dataframe. The phase_bit_dataframe is a dataframe that maps the phase to the bit. The phase_series_name is the name of the phase series in the phase_bit_dataframe. The bit_series_name is the name of the bit series in the phase_bit_dataframe. The bit_column_name is the name of the bit column in the dataframe. The function returns a tuple of phases that correspond to the bit column of the dataframe. Args: truth_table (pd.DataFrame): The dataframe that contains the bit column. bit_phase_map (BitPhaseMap): The dataframe that maps the phase to the bit. bit_phase_column_name (str): The name of the bit column in the dataframe. Returns: truth_table (TruthTable): a TruthTable object with the phase columns added. """ if bit_phase_column_name is None: bit_phase_column_name = "bits" phase_list = [] # Iterate through the dataframe's phase tuples column for bit_phase_i in getattr( truth_table, bit_phase_column_name ).values(): # TODO update on truth table declaration # Convert the bitstrings into a tule of phases phase_i = find_nearest_phase_for_bit(bit_phase_i, bit_phase_map) # Add the bits to the final list phase_list.extend(phase_i) phase_tuple = tuple(phase_list) # Transpose the phase_tuple using zip(*phase_tuple) transposed_phase_tuple = list(zip(*phase_tuple)) phase_array_length = len(phase_tuple[0]) for phase_iterable_id_i in range(phase_array_length): # Initialise lists # TODO auto select which one to implement ordered_dict_i = OrderedDict( (i, value) for i, value in enumerate(transposed_phase_tuple[phase_iterable_id_i]) ) setattr(truth_table, f"phase_{phase_iterable_id_i}", ordered_dict_i) return truth_table
[docs] def add_truth_table_phase_to_bit_data( truth_table: TruthTable, bit_phase_map: BitPhaseMap, phase_column_name: str = None, rounding_function: Optional[Callable] = None, ) -> TruthTable: """ This function converts the phase column of a dataframe into a bit tuple using the phase_bit_dataframe. The phase_bit_dataframe is a dataframe that maps the phase to the bit. The phase_series_name is the name of the phase series in the phase_bit_dataframe. The bit_series_name is the name of the bit series in the phase_bit_dataframe. The phase_column_name is the name of the phase column in the dataframe. The function returns a tuple of bits that correspond to the phase column of the dataframe. Args: truth_table (pd.DataFrame): The dataframe that contains the phase column. bit_phase_map (BitPhaseMap): The dataframe that maps the phase to the bit. rounding_function (Optional[Callable]): The rounding function that is used to round the phase to the nearest phase in the phase_bit_dataframe. Returns: tuple: A tuple of bits that correspond to the phase column of the dataframe. """ if phase_column_name is None: phase_column_name = "phase" bit_list = [] # Iterate through the dataframe's phase tuples column for phase_tuple in getattr( truth_table, phase_column_name ): # TODO update on truth table declaration if not isinstance(phase_tuple, BitPhaseMap): phase_tuple = tuple([phase_tuple]) # Convert the tuple of phases into bitstrings using convert_phase_array_to_bit_array bits = convert_phase_to_bit_iterable( phase_tuple, bit_phase_map, rounding_function ) # Add the bits to the final list bit_list.extend(tuple([bits])) bits_tuple = tuple(bit_list) # Transpose the bits_tuple using zip(*bits_tuple) transposed_bits_tuple = list(zip(*bits_tuple)) phase_bit_array_length = len(bits_tuple[0]) for phase_iterable_id_i in range(phase_bit_array_length): # Initialise lists # TODO auto select which one to implement # TODO adsuming given ordering ordered_dict_i = OrderedDict( (i, value) for i, value in enumerate(transposed_bits_tuple[phase_iterable_id_i]) ) setattr(truth_table, f"bit_phase_{phase_iterable_id_i}", ordered_dict_i) return truth_table
[docs] def convert_optical_transitions_to_truth_table( optical_state_transitions: OpticalStateTransitionCollection, bit_phase_map=BitPhaseMap, logic: TruthTableLogicType = "implementation", ) -> TruthTable: ports_list = optical_state_transitions.keys_list output_ports_list = list() if logic == "implementation": transitions_dataframe = optical_state_transitions.target_output_dataframe elif logic == "full": transitions_dataframe = optical_state_transitions.transition_dataframe else: raise ValueError(f"Invalid logic type: {logic}") logger.debug(transitions_dataframe["phase"]) phase_bit_array_length = len(transitions_dataframe["phase"].iloc[0]) truth_table_raw = dict() # Check if all input and output connection are in the dataframe for port_i in ports_list: truth_table_raw[port_i] = transitions_dataframe.loc[:, port_i].values if port_i == "phase": continue if not isinstance(transitions_dataframe.loc[0, port_i], tuple): print(transitions_dataframe.loc[0, port_i]) continue truth_table_raw[f"{port_i}_str"] = transitions_dataframe.loc[:, port_i].apply( convert_tuple_to_string ) truth_table_raw[f"{port_i}_str"] = truth_table_raw[f"{port_i}_str"].apply( lambda x: "".join(str(x)) ) truth_table_raw[f"{port_i}_str"] = truth_table_raw[ f"{port_i}_str" ].values.tolist() for phase_iterable_id_i in range(phase_bit_array_length): # Initialise lists truth_table_raw[f"bit_phase_{phase_iterable_id_i}"] = list() output_ports_list.append(f"bit_phase_{phase_iterable_id_i}") for transition_id_i in range(len(transitions_dataframe)): bit_phase = convert_phase_to_bit_iterable( phase=transitions_dataframe["phase"].iloc[transition_id_i], bit_phase_map=bit_phase_map, ) for phase_iterable_id_i in range(phase_bit_array_length): truth_table_raw[f"bit_phase_{phase_iterable_id_i}"].append( bit_phase[phase_iterable_id_i] ) input_ports = ["input_fock_state_str"] output_ports = output_ports_list truth_table_filtered_dictionary = filter_and_correct_truth_table( truth_table_dictionary=truth_table_raw, input_ports=input_ports, output_ports=output_ports, ) return TruthTable( input_ports=input_ports, output_ports=output_ports, **truth_table_filtered_dictionary, )
[docs] def convert_phase_to_bit_iterable( phase: PhaseMapType, bit_phase_map: BitPhaseMap, rounding_function: Optional[Callable] = None, ) -> tuple: """ This function converts a phase array or tuple iterable, into the corresponding mapping of their bitstring required within a particular bit-phase mapping. A ``phase_array`` iterable is provided, and each phase is mapped to a particular bitstring based on the ``phase_bit_dataframe``. A tuple is composed of strings that represent the bitstrings of the phases provided. Args: phase(Iterable): Iterable of phases to map to bitstrings. bit_phase_map(BitPhaseMap): Dataframe containing the phase-bits mapping. rounding_function(Callable): Rounding function to apply to the target phase. Returns: bit_array(tuple): Tuple of bitstrings corresponding to the phases. """ # Determine the maximum length of the bitstrings in the dataframe # Assumes last bit phase mapping is the largest one max_bit_length = len(bit_phase_map.bits[-1]) bit_array = [] for phase_i in phase: # Apply rounding function if provided if rounding_function: phase_i = rounding_function(phase_i) # Check if phase is in the dataframe matched_rows = bit_phase_map.dataframe.loc[ bit_phase_map.dataframe["phase"] == phase_i, "bits" ] # If exact phase is not found, use the nearest phase bit representation if matched_rows.empty: bitstring, _ = find_nearest_bit_for_phase( phase_i, bit_phase_map, rounding_function ) else: bitstring = matched_rows.iloc[0] # Pad the bitstring to the maximum length full_length_bitstring = bitstring.zfill(max_bit_length) bit_array.append(full_length_bitstring) return tuple(bit_array)
[docs] def find_nearest_bit_for_phase( target_phase: float, bit_phase_map: BitPhaseMap, rounding_function: Optional[Callable] = None, ) -> tuple: """ This is a mapping function between a provided target phase that might be more analogous, with the closest bit-value in a `bit-phase` ideal relationship. The error between the target phase and the applied phase is limited to the discretisation error of the phase mapping. Args: target_phase(float): Target phase to map to. bit_phase_map(pd.DataFrame): Dataframe containing the phase-bits mapping. rounding_function(Callable): Rounding function to apply to the target phase. Returns: bitstring(str): Bitstring corresponding to the nearest phase. """ # TODO interim pydantic-dataframe migration bit_phase_map = bit_phase_map.dataframe # Apply rounding function if provided if rounding_function: target_phase = rounding_function(target_phase) # Find the nearest phase from the dataframe phases = bit_phase_map["phase"].values nearest_phase = phases[ np.argmin(np.abs(phases - target_phase)) ] # TODO implement rounding function here. # Get the corresponding bitstring for the nearest phase bitstring = bit_phase_map.loc[bit_phase_map["phase"] == nearest_phase, "bits"].iloc[ 0 ] return bitstring, nearest_phase
[docs] def find_nearest_phase_for_bit( bits: BitsType, phase_map: BitPhaseMap, ) -> PhaseMapType: """ Maps a bitstring to the nearest phase(s) in a phase-bit mapping dataframe. Args: bits (AbstractBitsType): Bitstring to map to a phase. phase_map (BitPhaseMap): Dataframe containing the phase-bits mapping. Returns: Tuple[str, ...]: Tuple of phases or an empty tuple if no match is found. """ bits = convert_to_bits(bits) try: # Find all phases corresponding to the given bitstring matching_phases = phase_map.dataframe.loc[ phase_map.dataframe["bits"] == bits, "phase" ].values # Check if any phases were found if len(matching_phases) == 0: print(f"No phases found for bits: {bits}") return tuple() # Convert to a tuple phase_array = tuple([matching_phases]) return phase_array except Exception as e: print(f"An error occurred: {e}") return tuple()
[docs] def filter_and_correct_truth_table( truth_table_dictionary: dict, input_ports: list, output_ports: list ): """ Ensures each unique value of the specified input connection maps to a unique set of values of the output connection. If conflicts are found (i.e., the same input maps to different outputs), it retains the first unique mapping. It returns a corrected truth table dictionary with only the unique mappings. Args: input_ports (list of str): List of input port names. output_ports (list of str): List of output port names. truth_table_dictionary (dict): Dictionary containing input and output files. Keys are port names, and values are lists of values. Returns: dict: A corrected truth table dictionary with unique mappings. Raises: ValueError: If input_ports or output_ports are not found in truth_table_dictionary. """ # Ensure the specified connection are in the truth table dictionary for port in input_ports + output_ports: if port not in truth_table_dictionary: raise ValueError(f"Port '{port}' not found in truth_table_dictionary.") # Initialize a dictionary to keep track of mappings mapping_dict = {} # Initialize lists to store the corrected inputs and outputs corrected_inputs = {input_port: [] for input_port in input_ports} corrected_outputs = {output_port: [] for output_port in output_ports} # Get the length of the files lists num_entries = len(truth_table_dictionary[input_ports[0]]) for i in range(num_entries): # Extract the current input value input_value = tuple( truth_table_dictionary[input_port][i] for input_port in input_ports ) # Extract the current set of output values output_values = tuple( truth_table_dictionary[output_port][i] for output_port in output_ports ) if input_value not in mapping_dict: # Store the unique mapping mapping_dict[input_value] = output_values # Append to the corrected lists for input_port, value in zip(input_ports, input_value): corrected_inputs[input_port].append(value) for output_port, value in zip(output_ports, output_values): corrected_outputs[output_port].append(value) # Combine the corrected inputs and outputs into a single dictionary corrected_truth_table = {**corrected_inputs, **corrected_outputs} return corrected_truth_table