from__future__importannotationsfrompathlibimportPathimportaiofilesfromchainlit.data.storage_clients.baseimportBaseStorageClientdef_normalize_subdir(value:str)->str:"""Normalize a relative storage subdirectory string."""cleaned=(valueor"").replace("\\","/")parts=[partforpartincleaned.split("/")ifpartandpart!="."]return"/".join(parts)def_join_url(root_path:str,path:str)->str:"""Join a root URL path prefix with a child path."""root=(root_pathor"").rstrip("/")tail="/"+path.lstrip("/")returnf"{root}{tail}"ifrootelsetail
# NOTE(review): only the class header and docstring are visible at this point
# in the file; `_build_storage_provider` constructs this class with
# `public_root`, `subdir` and `root_path` keyword arguments, so a matching
# constructor is expected — confirm the class members were not lost.
class LocalPublicStorageClient(BaseStorageClient):
    """Chainlit storage client that persists artifacts under `/public`."""
def _resolve_public_root(
    *,
    configured_public_dir: str,
    app_root: Path,
    package_public_root: Path,
    is_router_only_import: bool,
) -> Path:
    """Resolve effective Chainlit public root and seed packaged assets if needed.

    Args:
        configured_public_dir: Configured public directory. ``~`` is
            expanded; a relative value is resolved against ``app_root``.
        app_root: Base directory used to anchor a relative
            ``configured_public_dir``.
        package_public_root: Directory holding the packaged public assets.
        is_router_only_import: When true, nothing is created or copied; the
            packaged root is returned if it is a directory, otherwise the
            configured path.

    Returns:
        The configured directory (created if necessary and seeded with any
        missing top-level packaged files), or ``package_public_root`` when
        running router-only or when the configured directory cannot be
        created and the packaged root exists.
    """
    configured = Path(configured_public_dir).expanduser()
    if not configured.is_absolute():
        configured = (app_root / configured).resolve()

    # Router-only imports must not touch the filesystem.
    if is_router_only_import:
        if package_public_root.is_dir():
            return package_public_root
        return configured

    try:
        configured.mkdir(parents=True, exist_ok=True)
    except OSError:
        # The configured location cannot be created: fall back to the
        # packaged assets when available.
        if package_public_root.is_dir():
            return package_public_root
        return configured

    if package_public_root.is_dir():
        for asset in package_public_root.iterdir():
            target = configured / asset.name
            # Never overwrite something already present in the public dir.
            if target.exists():
                continue
            # Only regular top-level files are seeded; sub-directories are
            # skipped.
            if asset.is_file():
                try:
                    target.write_bytes(asset.read_bytes())
                except OSError:
                    # Seeding is best-effort; an uncopyable asset is skipped.
                    continue
    return configured


def _build_storage_provider(
    *, subdir: str, public_root: Path, root_path: str
) -> BaseStorageClient:
    """Instantiate the local public storage provider for Chainlit.

    Args:
        subdir: Storage subdirectory forwarded to the client.
        public_root: Public root directory forwarded to the client.
        root_path: URL root path prefix forwarded to the client.

    Returns:
        A ``LocalPublicStorageClient`` built from the given arguments.
    """
    return LocalPublicStorageClient(
        public_root=public_root,
        subdir=subdir,
        root_path=root_path,
    )