implementar multithreading na copia de ficheiros

This commit is contained in:
2025-05-02 15:17:17 +01:00
parent ddb8905358
commit ee26905bcd

View File

@ -8,15 +8,61 @@ from modules.organizer import (
from pathlib import Path from pathlib import Path
import shutil import shutil
from tqdm import tqdm from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from modules.logger import logger from modules.logger import logger
def copiar_ficheiro_individual(
ficheiro: str,
base_destino: str,
categorizar_data: bool = False,
usar_criacao: bool = False,
formato_data: str = "%Y-%m-%d",
) -> None:
"""copiar um ficheiro para o destino indicado,
com base nos parametros passados
argumentos:
ficheiro (str) -> caminho (como str) do ficheiro a ser copiado
base_destino (str) -> directório de destino para onde será copiado o ficheiro
categorizar_data (bool) -> flag para activar categorização por data (defeito: False)
usar_criacao (bool) -> usar data de criação (defeito: False usar data de modificacao)
formato_data (str) -> formato de data para nome dos subdirectórios (defeito: '%d-%m-%Y')
retorna:
None
"""
try:
ficheiro_path = Path(ficheiro)
if not ficheiro_path.exists():
logger.warning("Ficheiro '%s' não encontrado. A ignorar.", ficheiro)
return
if categorizar_data:
data_ficheiro = obter_data_ficheiro(ficheiro, usar_criacao=usar_criacao)
pasta_destino = criar_pasta_destino(
str(base_destino),
categorizar_por_tipo(ficheiro),
data_ficheiro,
formato_data,
)
else:
pasta_destino = criar_pasta_destino(
str(base_destino), categorizar_por_tipo(ficheiro)
)
destino_ficheiro = pasta_destino / ficheiro_path.name
shutil.copy2(ficheiro_path, destino_ficheiro)
except Exception as e:
logger.error("Erro a copiar '%s': %s", Path(ficheiro).name, e)
def copiar_ficheiros_para_destino( def copiar_ficheiros_para_destino(
lista_ficheiros: list[str], lista_ficheiros: list[str],
base_destino: str, base_destino: str,
categorizar_data: bool = False, categorizar_data: bool = False,
usar_criacao: bool = False, usar_criacao: bool = False,
formato_data: str = "%d-%m-%Y", formato_data: str = "%Y-%m-%d",
usar_threads: bool = True,
max_workers: int = 8,
) -> None: ) -> None:
""" """
Copiar cada ficheiro na lista para o directório base, seguindo as regras de categorização Copiar cada ficheiro na lista para o directório base, seguindo as regras de categorização
@ -31,42 +77,31 @@ def copiar_ficheiros_para_destino(
retorna: retorna:
None None
""" """
destino_path = Path(base_destino) if usar_threads:
with tqdm( with ThreadPoolExecutor(max_workers=max_workers) as executor:
total=len(lista_ficheiros), desc="A copiar ficheiros ", unit="ficheiro " futures = [
) as barra_progresso: executor.submit(
for ficheiro in lista_ficheiros: copiar_ficheiro_individual,
ficheiro_path = Path(ficheiro) ficheiro,
try: base_destino,
if not ficheiro_path.exists(): categorizar_data,
logger.warning("Ficheiro '%s' não encontrado. A ignorar.", ficheiro) usar_criacao,
barra_progresso.update(1)
continue
if categorizar_data:
data_ficheiro = obter_data_ficheiro(
ficheiro, usar_criacao=usar_criacao
)
pasta_destino = criar_pasta_destino(
str(destino_path),
categorizar_por_tipo(ficheiro),
data_ficheiro,
formato_data, formato_data,
) )
else: for ficheiro in lista_ficheiros
pasta_destino = criar_pasta_destino( ]
str(destino_path),
categorizar_por_tipo(ficheiro),
)
destino_ficheiro = pasta_destino / ficheiro_path.name for _ in tqdm(
shutil.copy2(ficheiro_path, destino_ficheiro) as_completed(futures),
logger.info( total=len(futures),
"Ficheiro '%s' copiado com sucesso para '%s'", desc="A copiar ficheiros ",
ficheiro, unit="ficheiro ",
destino_ficheiro, ):
pass
else:
for ficheiro in tqdm(
lista_ficheiros, desc="A copiar ficheiros ", unit="ficheiro "
):
copiar_ficheiro_individual(
ficheiro, base_destino, categorizar_data, usar_criacao, formato_data
) )
except Exception as e:
logger.error("Erro a copiar '%s': %s", Path(ficheiro).name, e)
finally:
barra_progresso.update(1)