import asyncio from functools import partial from concurrent.futures import ThreadPoolExecutor # Créer un executor global pour toutes les opérations fichier _executor = ThreadPoolExecutor() class AsyncFile: """Un wrapper pour les opérations de fichier asynchrones basé sur run_in_executor.""" def __init__(self, file_path, mode='r', *args, **kwargs): self.file_path = file_path self.mode = mode self.args = args self.kwargs = kwargs self.file = None self._loop = asyncio.get_running_loop() async def __aenter__(self): # Ouvrir le fichier de façon asynchrone open_func = partial(open, self.file_path, self.mode, *self.args, **self.kwargs) self.file = await self._loop.run_in_executor(_executor, open_func) return self async def __aexit__(self, exc_type, exc_val, exc_tb): # Fermer le fichier de façon asynchrone if self.file: await self._loop.run_in_executor(_executor, self.file.close) async def write(self, data): """Écrire des données dans le fichier de façon asynchrone.""" if not self.file: raise ValueError("Le fichier n'est pas ouvert") await self._loop.run_in_executor(_executor, self.file.write, data) async def read(self, size=-1): """Lire des données depuis le fichier de façon asynchrone.""" if not self.file: raise ValueError("Le fichier n'est pas ouvert") return await self._loop.run_in_executor(_executor, self.file.read, size) async def seek(self, offset, whence=0): """Déplacer le curseur dans le fichier de façon asynchrone.""" if not self.file: raise ValueError("Le fichier n'est pas ouvert") return await self._loop.run_in_executor(_executor, self.file.seek, offset, whence) async def tell(self): """Obtenir la position actuelle dans le fichier de façon asynchrone.""" if not self.file: raise ValueError("Le fichier n'est pas ouvert") return await self._loop.run_in_executor(_executor, self.file.tell) async def flush(self): """Forcer l'écriture des données en mémoire tampon sur le disque.""" if not self.file: raise ValueError("Le fichier n'est pas ouvert") await self._loop.run_in_executor(_executor, self.file.flush) # Fonction helper pour simplifier l'utilisation (comme aiofiles.open) async def async_open(file_path, mode='r', *args, **kwargs): """Ouvre un fichier en mode asynchrone, similaire à aiofiles.open.""" return AsyncFile(file_path, mode, *args, **kwargs)