Source code for piel.connectivity.core

import logging
from typing import Optional, Callable
from piel.types import (
    Port,
    Connection,
    PhysicalConnection,
    ConnectionTypes,
    ComponentTypes,
    TimeMetric,
    PhysicalComponent,
)


logger = logging.getLogger(__name__)


__all__ = [
    "create_all_connections",
    "create_component_connections",
    "create_sequential_component_path",
    "create_connection_list_from_ports_lists",
]


[docs] def create_all_connections( ports: list[Port], connection_factory: Optional[Callable[[list[Port]], Connection]] = None, connection_type_output: Optional[ConnectionTypes] = Connection, ) -> list[ConnectionTypes]: """ This function receives a list of connection and creates the connections between them as two-port relationships. It returns a list of connections. More than two connection can be provided, and it will create all the possible connections. Parameters ---------- ports : list[Port] The connection list to create connections. connection_factory : Optional[Callable[[list[Port]], Connection]], optional A function that creates a connection object from a list of connection. The function should receive a list of connection and return a connection object. If not provided, a default connection factory will be used. The default connection factory creates a tuple of connection as a connection object. The default is None. connection_type_output : Optional[type[Connection]], optional The type of connection object to return. If not provided, the default connection factory will be used. The default is None Returns ------- list[Connection] A list of connections that were created. """ def default_connection_factory(ports: list[Port, Port]) -> ConnectionTypes: """ This function creates a connection object from a list of connection, by default it creates a tuple of connection as a connection object. Should be two connection. """ raw_connection = tuple(ports) if type(connection_type_output) is type(Connection): connection = Connection(ports=raw_connection) elif type(connection_type_output) is type(PhysicalConnection): connection = PhysicalConnection(connections=[raw_connection]) else: raise TypeError( f"Expected a Connection or PhysicalConnection type, got {type(connection_type_output)} instead." ) return connection connection_factory = connection_factory or default_connection_factory # Ensure that connection is a list if not isinstance(ports, list): # Raise descriptive error raise TypeError(f"Expected a list of connection, got {type(ports)} instead.") connections = [] for i, port1 in enumerate(ports): for port2 in ports[i + 1 :]: # Verify port2 is a port if not isinstance(port2, Port): # Raise descriptive error raise TypeError(f"Expected a port, got {type(port2)} instead.") else: connections.append(connection_factory([port1, port2])) return connections
[docs] def create_connection_list_from_ports_lists( port_connection_list: list[list[Port]], ) -> list[ConnectionTypes]: """ When a list of a list of connection is provided, we construct all the required connections accordingly. TODO more docs. """ connection_list = list() for raw_connection_i in port_connection_list: connection_list_i = create_all_connections(raw_connection_i) connection_list.extend(connection_list_i) return connection_list
[docs] def create_component_connections( components: list[ComponentTypes], connection_reference_str_list: list[str] | list[list[str]], ) -> list[ConnectionTypes]: """ The way this function works is by composing the connection namespaces from the names of the components, and a given connection dot notation which corresponds to that component. Notes ----- The dot notation would be in the format ``"component_1.port1"``. Hence, the input to a connection would be ``["component1.port1", "component2.port1"]`` and this function would compile into generating the corresponding connection. This is by splitting the component name and port name accordingly and then programmatically acquiring the corresponding `Port` reference and creating the `Connection` from this. Parameters ---------- components : list[ComponentTypes] The components to create connections from. connection_reference_str_list : list[str] | list[list[str]] The list of strings that represent the connections to create. Returns ------- list[ConnectionTypes] The list of connections created from the components. """ # Verify the list has at least an element inside it if len(connection_reference_str_list) == 0: raise ValueError( "The list of connection references must have at least one connection." ) # Ensure connection_reference_str_list is a list of lists if isinstance(connection_reference_str_list[0], str): connection_reference_str_list = [connection_reference_str_list] connection_list = [] for connection_reference in connection_reference_str_list: # Ensure each connection reference is a list of two strings # TODO extend to more than two strings if not isinstance(connection_reference, list) or len(connection_reference) != 2: raise ValueError("Each connection reference must be a list of two strings.") # Split the connection reference strings into component and port names component1_name, port1_name = connection_reference[0].split(".") component2_name, port2_name = connection_reference[1].split(".") # Initialize connection port1 = None port2 = None # Get the port references for component in components: if component.name == component1_name: port1 = component.get_port(port1_name) if component.name == component2_name: port2 = component.get_port(port2_name) # Check if the connection were found if port1 is None or port2 is None: raise ValueError( f"Could not find the connection for the connection {connection_reference}" ) # Create the connection connection = Connection(ports=(port1, port2)) connection_list.append(connection) return connection_list
[docs] def create_sequential_component_path( components: list[ComponentTypes], name: str = "", **kwargs ) -> ComponentTypes: """ This function takes in a list of components and creates a sequential path connectivity of components with all the connection defined in each component. By default, the connectivity will be implemented with the first two connection of the components. There is a clear input and output on each component. The timing metric calculations is provided by the timing model of each connection of the component, if there is none defined it will assume a default zero time connectivity between the relevant connection. For the output component collection, it will output the timing of the network as a whole based on the defined subcomponents. This will create an output component with all the subcomponents, TODO more than two connection, and the list of connection Creates a sequential path connectivity of components with all the connection defined in each component. Parameters: ----------- components : List[ComponentTypes] A list of components to be connected sequentially. Returns: -------- ComponentTypes A new component that encapsulates the sequential path of input components. """ if len(components) < 2: raise ValueError( "At least two components are required to create a sequential path." ) connections = [] total_time_value = 0 # = TimeMetric(name=name, attrs={}, value=0, mean=0, min=0, max=0, standard_deviation=0) for i in range(len(components) - 1): current_component = components[i] next_component = components[i + 1] # Assume the first port is output and the second is input if len(current_component.connection) < 1 or len(next_component.connection) < 1: raise ValueError( f"Component {current_component.name} or {next_component.name} doesn't have enough connection." ) output_port = current_component.connection[1] input_port = next_component.connection[0] # Create connection with timing information connection_time = ( output_port.time if hasattr(output_port, "time") else TimeMetric(value=0) ) connection_i = Connection( ports=[output_port, input_port], time=connection_time, name=f"{current_component.name}_to_{next_component.name}", ) physical_connection = PhysicalConnection(connections=[connection_i]) connections.append(physical_connection) # Update total time total_time_value += connection_time.value # TODO total_time.mean += connection_time.mean # TODO total_time.min += connection_time.min # TODO total_time.max += connection_time.max # Assuming standard deviation is not simply additive total_time = TimeMetric(value=total_time_value) # TODO implement full network timing analysis top_level_ports = [components[0].connection[0], components[-1].connection[-1]] # TODO best define top level connection top_level_connection = Connection(ports=top_level_ports, time=total_time) top_level_physical_connection = PhysicalConnection( connections=[top_level_connection] ) # Define abstract path. Note that this is not a physical connection, just that there is a connection path between the connection. # TODO this may have to be redefined connections.append(top_level_physical_connection) ports = [ components[0].connection[0], components[-1].connection[-1], ] logger.debug(f"Sequential Component connections: {connections}") logger.debug(f"Sequential Component components: {components}") logger.debug(f"Sequential Component connection: {ports}") # Create a new component that encapsulates this path path_component = PhysicalComponent( ports=ports, # Input of first, output of last components=components, connections=connections, ) return path_component
[docs] def get_port_index_from_name(port: Port, starting_index: int | None = None) -> int: """ Extracts the numerical index from a port identifier and adjusts based on starting index. If port numbering starts at 0, adds 1. If starts at 1 or is None, leaves as is. Parameters: - port (int or str): The port identifier. - starting_index (int, optional): The starting index (0 or 1). Defaults to None. Returns: - int: The adjusted numerical index of the port. Raises: - ValueError: If starting_index is not 0, 1, or None. - ValueError: If the port string does not contain a numerical index. - TypeError: If the port is neither int nor str. """ import re if isinstance(port, int): port_index = port elif isinstance(port, str): match = re.search(r"\d+", port) if match: port_index = int(match.group()) else: raise ValueError(f"Cannot extract port number from string '{port}'") else: raise TypeError(f"Unsupported port type: {type(port)}. Must be int or str.") # Adjust based on starting index if starting_index == 0: return port_index + 1 elif starting_index == 1: return port_index elif starting_index is None: return port_index else: raise ValueError( f"Unsupported starting index: {starting_index}. Must be 0 or 1." )