Source code for piel.file_system

import glob
import json
import os
import sys
import pathlib
import shutil
import stat
import subprocess
import types
from typing import Optional
from piel.types import PathTypes, ProjectType, PielBaseModel

__all__ = [
    "check_path_exists",
    "check_example_design",
    "copy_source_folder",
    "copy_example_design",
    "create_new_directory",
    "create_piel_home_directory",
    "delete_path",
    "delete_path_list_in_directory",
    "get_files_recursively_in_directory",
    "get_top_level_script_directory",
    "get_id_map_directory_dictionary",
    "list_prefix_match_directories",
    "permit_directory_all",
    "permit_script_execution",
    "read_json",
    "rename_file",
    "rename_files_in_directory",
    "replace_string_in_file",
    "replace_string_in_directory_files",
    "return_path",
    "rp",
    "run_script",
    "write_file",
    "write_model_to_json",
]


[docs] def check_path_exists( path: PathTypes, raise_errors: bool = False, ) -> bool: """ Checks if a directory exists. Args: path(PathTypes): Input path. Returns: directory_exists(bool): True if directory exists. """ directory_exists = False path = return_path(path) if path.exists(): directory_exists = True else: if raise_errors: raise ValueError("Path: " + str(path) + " does not exist.") return directory_exists
[docs] def check_example_design( design_name: str = "simple_design", designs_directory: PathTypes | None = None, ) -> bool: """ We copy the example simple_design from docs to the `/foss/designs` in the `iic-osic-tools` environment. Args: design_name(str): Name of the design to check. designs_directory(PathTypes): Directory that contains the DESIGNS environment flag. # TODO Returns: None """ if designs_directory is None: designs_directory = pathlib.Path(os.environ["DESIGNS"]) design_folder = ( designs_directory / design_name ) # TODO verify this copying operation return design_folder.exists()
[docs] def copy_source_folder( source_directory: PathTypes, target_directory: PathTypes, delete: bool = None, ) -> None: """ Copies the files from a source_directory to a target_directory Args: source_directory(PathTypes): Source directory. target_directory(PathTypes): Target directory. delete(bool): Delete target directory. Default: False. Returns: None """ source_directory = return_path(source_directory) target_directory = return_path(target_directory) if source_directory == target_directory: print( Warning( f"source_directory: {source_directory} and target_directory: {target_directory} cannot be the same." ) ) return if delete is True: shutil.rmtree(target_directory) else: if target_directory.exists(): answer = input("Confirm deletion of: " + str(target_directory.resolve())) if answer.upper() in ["Y", "YES"]: shutil.rmtree(target_directory) elif answer.upper() in ["N", "NO"]: print( "Copying files now from: " + str(source_directory.resolve()) + " to " + str(target_directory.resolve()) ) shutil.copytree( source_directory, target_directory, symlinks=False, ignore=None, copy_function=shutil.copy2, ignore_dangling_symlinks=False, dirs_exist_ok=False, )
[docs] def copy_example_design( project_source: ProjectType = "piel", example_name: str = "simple_design", target_directory: PathTypes = None, target_project_name: Optional[str] = None, **kwargs, ) -> None: """ We copy the example simple_design from docs to the `/foss/designs` in the `iic-osic-tools` environment. Args: project_source(str): Source of the project. example_name(str): Name of the example design. target_directory(PathTypes): Target directory. target_project_name(str): Name of the target project. Returns: None """ if project_source == "piel": example_design_folder = ( os.environ["PIEL_PACKAGE_DIRECTORY"] + "/docs/examples/designs/" + example_name ) elif project_source == "openlane": import openlane example_design_folder = ( pathlib.Path(openlane.__file__).parent.resolve() / example_name ) design_folder = os.environ["DESIGNS"] + "/" + example_name else: raise ValueError("project_source must be either 'piel' or 'openlane'.") if target_directory is not None: target_directory = return_path(target_directory) if target_project_name is not None: design_folder = target_directory / target_project_name else: design_folder = target_directory else: # Copy default openlane example design_folder = os.environ["DESIGNS"] + "/" + example_name copy_source_folder( source_directory=example_design_folder, target_directory=design_folder, **kwargs ) if target_project_name is not None: rename_files_in_directory( target_directory=design_folder, match_string=example_name, renamed_string=target_project_name, ) replace_string_in_directory_files( target_directory=design_folder, match_string=example_name, replace_string=target_project_name, )
def convert_list_to_path_list( input_list: list[PathTypes], ) -> list[pathlib.Path]: """ Converts a list of strings or pathlib.Path to a list of pathlib.Path. Args: input_list(list[PathTypes]): Input list. Returns: output_list(list[pathlib.Path]): Output list. """ output_list = [] for item in input_list: item = return_path(item) output_list.append(item) return output_list
[docs] def create_new_directory( directory_path: str | pathlib.Path, overwrite: bool = False, ) -> bool: """ Creates a new directory. If the parents of the target_directory do not exist, they will be created too. Args: overwrite: Overwrite directory if it already exists. directory_path(str | pathlib.Path): Input path. Returns: None """ directory_path = return_path(directory_path) if directory_path.exists(): if overwrite: delete_path(directory_path) else: return False # Check permissions of the parent to be able to create the directory parent_directory = directory_path.parent parent_directory_permissions = oct(parent_directory.stat().st_mode) # If permissions are not read, write and execute for all, we change them if parent_directory_permissions != "0o777": permit_directory_all(parent_directory) # Create the directory directory_path.mkdir(parents=True) return True
[docs] def create_piel_home_directory() -> None: """ Creates the piel home directory. Returns: None """ # TODO implement check so it does not overwrite. piel_home_directory = pathlib.Path.home() / ".piel" create_new_directory(piel_home_directory)
[docs] def delete_path(path: str | pathlib.Path) -> None: """ Deletes a path. Args: path(str | pathlib.Path): Input path. Returns: None """ path = return_path(path) if path.exists(): if path.is_dir(): shutil.rmtree(path) elif path.is_file(): path.unlink()
[docs] def delete_path_list_in_directory( directory_path: PathTypes, path_list: list, ignore_confirmation: bool = False, validate_individual: bool = False, ) -> None: """ Deletes a list of files in a directory. Usage: ```python delete_path_list_in_directory( directory_path=directory_path, path_list=path_list, ignore_confirmation=True ) ``` Args: directory_path(PathTypes): Input path. path_list(list): List of files. ignore_confirmation(bool): Ignore confirmation. Default: False. validate_individual(bool): Validate individual files. Default: False. Returns: None """ directory_path = return_path(directory_path) path_list = convert_list_to_path_list(path_list) if validate_individual: if ignore_confirmation: for path in path_list: if path.exists(): delete_path(path) else: for path in path_list: if path.exists(): answer = input("Confirm deletion of: " + str(path)) if answer.upper() in ["Y", "YES"]: delete_path(path) elif answer.upper() in ["N", "NO"]: print("Skipping deletion of: " + str(path)) else: if ignore_confirmation: for path in path_list: if path.exists(): delete_path(path) else: answer = input("Confirm deletion of: " + str(path_list)) if answer.upper() in ["Y", "YES"]: for path in path_list: if path.exists(): delete_path(path) elif answer.upper() in ["N", "NO"]: print("Skipping deletion of: " + str(path_list))
[docs] def get_files_recursively_in_directory( path: PathTypes, extension: str = "*", ): """ Returns a list of files in a directory. Usage: get_files_recursively_in_directory('path/to/directory', 'extension') Args: path(PathTypes): Input path. extension(str): File extension. Returns: file_list(list): List of files. """ path = return_path(path) file_list = [] for x in os.walk(str(path.resolve())): for file_path in glob.glob(os.path.join(x[0], f"*.{extension}")): file_list.append(file_path) return file_list
[docs] def get_id_map_directory_dictionary(path_list: list[PathTypes], target_prefix: str): """ Returns a dictionary of ids to directories. Usage: get_id_to_directory_dictionary(path_list, target_prefix) Args: path_list(list[PathTypes]): List of paths. target_prefix(str): Target prefix. Returns: id_dict(dict): Dictionary of ids to directories. """ id_dict = {} for path in path_list: basename = os.path.basename(path) # Check if the basename starts with the provided prefix if basename.startswith(target_prefix): # Extract the id after the prefix id_str = basename[len(target_prefix) :] # Convert the id string into an integer and use it as a key for the dictionary id_dict[int(id_str)] = path return id_dict
[docs] def get_top_level_script_directory() -> pathlib.Path: """ Attempts to return the top-level script directory when this file is run, compatible with various execution environments like Jupyter Lab, pytest, PDM, etc. TODO run full verification. Returns: top_level_script_directory(pathlib.Path): Top level script directory. """ # For Jupyter notebooks and IPython environments if "ipykernel" in sys.modules or "IPython" in sys.modules: try: from IPython.core.getipython import get_ipython # IPython's get_ipython function provides access to the IPython interactive environment ipython = get_ipython() if ipython and hasattr(ipython, "starting_dir"): return pathlib.Path(ipython.starting_dir).resolve() except Exception as e: # Log or print the error as needed print(f"Could not determine the notebook directory due to: {e}") # For pytest, PDM, and similar environments where sys.argv might be manipulated # or __main__.__file__ is not set as expected. if "pytest" in sys.modules or "_pytest" in sys.modules or "pdm" in sys.modules: return pathlib.Path.cwd() # For standard script executions and other environments # This checks if __main__ module has __file__ attribute and uses it main_module = sys.modules.get("__main__", None) if main_module and hasattr(main_module, "__file__"): main_file = main_module.__file__ return pathlib.Path(main_file).resolve().parent # As a general fallback, use the current working directory return pathlib.Path.cwd()
[docs] def list_prefix_match_directories( output_directory: PathTypes, target_prefix: str, ): """ Returns a list of directories that match a prefix. Usage: list_prefix_match_directories('path/to/directory', 'prefix') Args: output_directory(PathTypes): Output directory. target_prefix(str): Target prefix. Returns: matching_dirs(list): List of directories. """ output_directory = return_path(output_directory) # Use os.path.join to ensure the path is constructed correctly # irrespective of the operating system search_path = os.path.join(output_directory, target_prefix + "*") # Use glob to get all matching directories matching_directories = [d for d in glob.glob(search_path) if os.path.isdir(d)] return matching_directories
[docs] def permit_script_execution(script_path: PathTypes) -> None: """ Permits the execution of a script. Usage: permit_script_execution('path/to/script') Args: script_path(PathTypes): Script path. Returns: None """ script = return_path(script_path) script.chmod(script.stat().st_mode | stat.S_IEXEC)
[docs] def permit_directory_all(directory_path: PathTypes) -> None: """ Permits a directory to be read, written and executed. Use with care as it can be a source for security issues. Usage: permit_directory_all('path/to/directory') Args: directory_path(PathTypes): Input path. Returns: None """ directory_path = return_path(directory_path) try: directory_path.chmod(0o777) except PermissionError: print( UserWarning( "Could not change permissions of directory: " + str(directory_path.resolve()) + " to 777. Your Python executable might not have the required permissions. Restructure your project directory so Python does not have to change permissions." ) )
[docs] def read_json(path: PathTypes) -> dict: """ Reads a JSON file. Usage: read_json('path/to/file.json') Args: path(PathTypes): Input path. Returns: json_data(dict): JSON files. """ path = return_path(path) with open(path, "r") as json_file: json_data = json.load(json_file) return json_data
[docs] def rename_file( match_file_path: PathTypes, renamed_file_path: PathTypes, ) -> None: """ Renames a file. Usage: rename_file('path/to/match_file', 'path/to/renamed_file') Args: match_file_path(PathTypes): Input path. renamed_file_path(PathTypes): Input path. Returns: None """ match_file_path = return_path(match_file_path) renamed_file_path = return_path(renamed_file_path) match_file_path.rename(renamed_file_path)
[docs] def rename_files_in_directory( target_directory: PathTypes, match_string: str, renamed_string: str, ) -> None: """ Renames all files in a directory. Usage: rename_files_in_directory('path/to/directory', 'match_string', 'renamed_string') Args: target_directory(PathTypes): Input path. match_string(str): String to match. renamed_string(str): String to replace. Returns: None """ target_directory = return_path(target_directory) for path in target_directory.iterdir(): if path.is_file(): new_filename = path.name.replace(match_string, renamed_string) new_path = path.with_name(new_filename) rename_file(path, new_path)
[docs] def replace_string_in_file( file_path: PathTypes, match_string: str, replace_string: str, ): """ Replaces a string in a file. Usage: replace_string_in_file('path/to/file', 'match_string', 'replace_string') Args: file_path(PathTypes): Input path. match_string(str): String to match. replace_string(str): String to replace. Returns: None """ file_path = return_path(file_path) try: with open(file_path, "r", encoding="utf-8") as file: content = file.read() content = content.replace(match_string, replace_string) with file_path.open("w") as file_write: file_write.write(content) except (UnicodeDecodeError, OSError): pass
[docs] def replace_string_in_directory_files( target_directory: PathTypes, match_string: str, replace_string: str, ): """ Replaces a string in all files in a directory. Usage: replace_string_in_directory_files('path/to/directory', 'match_string', 'replace_string') Args: target_directory(PathTypes): Input path. match_string(str): String to match. replace_string(str): String to replace. Returns: None """ target_directory = return_path(target_directory) for path in target_directory.rglob("*"): if path.is_file(): replace_string_in_file(path, match_string, replace_string)
[docs] def return_path( input_path: PathTypes, as_piel_module: bool = False, ) -> pathlib.Path: """ Returns a pathlib.Path to be able to perform operations accordingly internally. This allows us to maintain compatibility between POSIX and Windows systems. When the `as_piel_module` flag is enabled, it will analyse whether the input path can be treated as a piel module, and treat the returned path as a module would be treated. This comes useful when analysing files generated in this particular structure accordingly. Usage: return_path('path/to/file') Args: input_path(str): Input path. Returns: pathlib.Path: Pathlib path. """ def treat_as_module(input_path_i: pathlib.Path): """ This function is useful after the path has been converted accordingly. It will analyse whether the path can be treated as a module, and return the path to the module accordingly. If it cannot be treated as a piel module, then it will return the original path. Args: input_path_i(pathlib.Path): Input path. Returns: pathlib.Path: Pathlib path. """ def verify_install_file(install_file_path: pathlib.Path): if install_file_path.exists(): if (input_path_i / directory_name).exists(): return input_path_i / directory_name else: return input_path_i else: raise ValueError( "input_path: " + str(input_path_i) + " cannot be treated as a piel module." ) directory_name = input_path_i.name try: setup_py_path = input_path_i / "setup.py" module_directory = verify_install_file(setup_py_path) except ValueError: input_path_parent_setup_py_path = input_path_i.parent / "setup.py" module_directory = verify_install_file(input_path_parent_setup_py_path) return module_directory if isinstance(input_path, str): output_path = pathlib.Path(input_path) if as_piel_module: output_path = treat_as_module(output_path) elif isinstance(input_path, pathlib.Path): output_path = input_path if as_piel_module: output_path = treat_as_module(output_path) elif isinstance(input_path, types.ModuleType): try: output_path = pathlib.Path(input_path.__file__) / ".." except Exception: # TODO FIX this hacked af output_path_raw = pathlib.Path(input_path.__path__[0]) output_directory_name = output_path_raw.name output_path = output_path_raw / output_directory_name pass elif isinstance(input_path, os.PathLike): output_path = pathlib.Path(input_path) if as_piel_module: output_path = treat_as_module(output_path) else: raise ValueError( "input_path: " + str(input_path) + " is of type: " + str(type(input_path)) ) output_path = output_path.resolve() return output_path
rp = return_path
[docs] def run_script( script_path: PathTypes, program: str = None, ) -> None: """ Runs a script on the filesystem `script_path`. By default this is a bash script. Args: script_path(PathTypes): Script path. Returns: None """ script = return_path(script_path) try: if program is None: subprocess.run(str(script.resolve()), check=True, capture_output=True) else: subprocess.run( [program, str(script.resolve())], check=True, capture_output=True ) except subprocess.CalledProcessError as e: print(f"Error running script {script}:") print(f"Command: {e.cmd}") print(f"Return code: {e.returncode}") print(f"Output: {e.stdout}") print(f"Error output: {e.stderr}") raise e # Re-raise the exception after logging the details
[docs] def write_file( directory_path: PathTypes, file_text: str, file_name: str, append: bool = False, ) -> bool: """ Writes a file to a directory. Appends to the file if it exists and append is True. Args: directory_path(PathTypes): Directory path. file_text(str): File text. file_name(str): File name. append(bool): If True, appends to the file if it exists. Defaults to False. Returns: bool: True if successful. """ directory_path = return_path(directory_path) directory_exists = check_path_exists(directory_path) if not directory_exists: try: create_new_directory(directory_path) except PermissionError: print( UserWarning( "Could not create directory: " + str(directory_path.resolve()) + ". Your Python executable might not have the required permissions. Restructure your project " "directory so Python does not have to change permissions." ) ) return False mode = ( "a" if append else "w" ) # Use append mode if append is True, otherwise use write mode with open(str(directory_path / file_name), mode) as file: file.write(file_text) return True
[docs] def write_model_to_json( model: PielBaseModel, file_path: PathTypes, ): """ Writes a pydantic model to a JSON file. """ file_path = return_path(file_path) with open(file_path, "w") as file: json.dump(model.model_dump(), file, indent=4) return file_path