disable encryption inter process
This commit is contained in:
116
main.py
116
main.py
@@ -1,4 +1,4 @@
|
||||
from PySide6.QtCore import QStandardPaths, QDataStream, QByteArray, QIODevice, Signal, Qt, QTimer, QCryptographicHash
|
||||
from PySide6.QtCore import QDataStream, QIODevice, Signal, QTimer
|
||||
from PySide6.QtGui import QPalette, QColor
|
||||
from PySide6.QtNetwork import QLocalServer, QLocalSocket
|
||||
from PySide6.QtWidgets import QApplication
|
||||
@@ -7,14 +7,8 @@ import qasync
|
||||
import sys
|
||||
import asyncio
|
||||
import os
|
||||
import platform
|
||||
import argparse
|
||||
import hashlib
|
||||
import random
|
||||
import string
|
||||
import base64
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from src.logs import configure_logging
|
||||
from windows.main_window import MainWindow
|
||||
@@ -63,9 +57,6 @@ class SingleApplication(QApplication):
|
||||
self.app_id = app_id
|
||||
self.logger.debug(f"ID de l'application: {app_id}")
|
||||
|
||||
self.shared_key = hashlib.sha256(app_id.encode()).hexdigest()[:16]
|
||||
self.logger.debug(f"Clé partagée générée: {self.shared_key}")
|
||||
|
||||
self.server = None
|
||||
self.is_primary_instance = self.try_connect_to_primary()
|
||||
|
||||
@@ -77,7 +68,8 @@ class SingleApplication(QApplication):
|
||||
self.logger.debug("Signal newConnection connecté")
|
||||
|
||||
if not self.server.listen(self.app_id):
|
||||
self.logger.warning(f"Échec de l'écoute sur {self.app_id}, tentative de suppression du serveur existant")
|
||||
self.logger.warning(
|
||||
f"Échec de l'écoute sur {self.app_id}, tentative de suppression du serveur existant")
|
||||
# En cas d'erreur (serveur déjà existant mais zombie), on le supprime et on réessaie
|
||||
QLocalServer.removeServer(self.app_id)
|
||||
if self.server.listen(self.app_id):
|
||||
@@ -90,78 +82,6 @@ class SingleApplication(QApplication):
|
||||
self.logger.info("Instance secondaire détectée, fermeture de l'application")
|
||||
QTimer.singleShot(0, self.quit)
|
||||
|
||||
def encrypt_data(self, data_str):
|
||||
"""
|
||||
Méthode simple pour brouiller les données
|
||||
|
||||
Args:
|
||||
data_str (str): Données à chiffrer
|
||||
|
||||
Returns:
|
||||
str: Données chiffrées en base64
|
||||
"""
|
||||
self.logger.debug(f"Chiffrement des données (longueur: {len(data_str)})")
|
||||
|
||||
# Générer une "nonce" aléatoire pour éviter que les mêmes données produisent le même résultat
|
||||
nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(8))
|
||||
self.logger.debug(f"Nonce générée: {nonce}")
|
||||
|
||||
# Combiner la nonce, la clé et les données
|
||||
combined = nonce + self.shared_key + data_str
|
||||
self.logger.debug("Données combinées avec nonce et clé partagée")
|
||||
|
||||
# Utiliser SHA-256 pour obtenir un hash
|
||||
hash_obj = QCryptographicHash(QCryptographicHash.Algorithm.Sha256)
|
||||
hash_obj.addData(combined.encode())
|
||||
signature = hash_obj.result().toHex().data().decode()[:16]
|
||||
self.logger.debug(f"Signature générée: {signature}")
|
||||
|
||||
# Encoder le tout en base64
|
||||
encoded = base64.b64encode((nonce + signature + data_str).encode()).decode()
|
||||
self.logger.debug(f"Données encodées en base64 (longueur: {len(encoded)})")
|
||||
return encoded
|
||||
|
||||
def decrypt_data(self, encoded_str):
|
||||
"""
|
||||
Déchiffre les données et vérifie leur intégrité
|
||||
|
||||
Args:
|
||||
encoded_str (str): Données chiffrées en base64
|
||||
|
||||
Returns:
|
||||
str ou None: Données déchiffrées ou None en cas d'erreur
|
||||
"""
|
||||
self.logger.debug(f"Déchiffrement des données (longueur: {len(encoded_str)})")
|
||||
|
||||
try:
|
||||
# Décoder de base64
|
||||
decoded = base64.b64decode(encoded_str.encode()).decode()
|
||||
self.logger.debug("Données décodées de base64")
|
||||
|
||||
# Extraire nonce, signature et données
|
||||
nonce = decoded[:8]
|
||||
signature = decoded[8:24]
|
||||
data_str = decoded[24:]
|
||||
self.logger.debug(f"Nonce extraite: {nonce}, signature: {signature}")
|
||||
|
||||
# Vérifier la signature
|
||||
combined = nonce + self.shared_key + data_str
|
||||
hash_obj = QCryptographicHash(QCryptographicHash.Algorithm.Sha256)
|
||||
hash_obj.addData(combined.encode())
|
||||
expected_signature = hash_obj.result().toHex().data().decode()[:16]
|
||||
self.logger.debug(f"Signature attendue: {expected_signature}")
|
||||
|
||||
if signature != expected_signature:
|
||||
self.logger.warning("Signature invalide, données potentiellement corrompues ou falsifiées")
|
||||
return None
|
||||
|
||||
self.logger.debug(f"Données déchiffrées avec succès (longueur: {len(data_str)})")
|
||||
return data_str
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Erreur lors du déchiffrement: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
def try_connect_to_primary(self):
|
||||
"""
|
||||
Essaie de se connecter à l'instance primaire de l'application
|
||||
@@ -180,12 +100,13 @@ class SingleApplication(QApplication):
|
||||
args = sys.argv[1:] if len(sys.argv) > 1 else []
|
||||
self.logger.debug(f"Arguments à transmettre: {args}")
|
||||
|
||||
encrypt_args = self.encrypt_data(";".join(args))
|
||||
self.logger.debug("Arguments chiffrés pour transmission")
|
||||
# Envoi des arguments sans chiffrement
|
||||
args_str = ";".join(args)
|
||||
self.logger.debug("Arguments à envoyer: " + args_str)
|
||||
|
||||
# Envoyer les arguments à l'instance primaire
|
||||
stream = QDataStream(socket)
|
||||
stream.writeQString(encrypt_args)
|
||||
stream.writeQString(args_str)
|
||||
socket.flush()
|
||||
self.logger.debug("Données envoyées à l'instance primaire")
|
||||
|
||||
@@ -216,22 +137,17 @@ class SingleApplication(QApplication):
|
||||
self.logger.debug("Données disponibles pour lecture")
|
||||
stream = QDataStream(socket)
|
||||
|
||||
encrypted_args = stream.readQString()
|
||||
self.logger.debug(f"Arguments chiffrés reçus (longueur: {len(encrypted_args)})")
|
||||
# Lecture des arguments sans déchiffrement
|
||||
args_str = stream.readQString()
|
||||
self.logger.debug(f"Arguments reçus: {args_str}")
|
||||
|
||||
args_str = self.decrypt_data(encrypted_args)
|
||||
if args_str:
|
||||
self.logger.debug(f"Arguments déchiffrés: {args_str}")
|
||||
|
||||
# Émettre un signal pour informer l'application des fichiers à ouvrir
|
||||
args = args_str.split(";") if args_str else []
|
||||
if args:
|
||||
self.logger.info(f"Émission du signal files_received avec {len(args)} arguments")
|
||||
self.files_received.emit(args)
|
||||
else:
|
||||
self.logger.debug("Aucun argument à traiter")
|
||||
# Émettre un signal pour informer l'application des fichiers à ouvrir
|
||||
args = args_str.split(";") if args_str else []
|
||||
if args:
|
||||
self.logger.info(f"Émission du signal files_received avec {len(args)} arguments")
|
||||
self.files_received.emit(args)
|
||||
else:
|
||||
self.logger.warning("Échec du déchiffrement des arguments")
|
||||
self.logger.debug("Aucun argument à traiter")
|
||||
else:
|
||||
self.logger.warning("Délai d'attente dépassé pour la lecture des données")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user