This commit is contained in:
2025-04-17 13:56:26 +02:00
parent cee644ffd3
commit 16520043a9
8 changed files with 207 additions and 106 deletions

79
main.py
View File

@@ -1,6 +1,4 @@
from PySide6.QtCore import QStandardPaths, QDataStream, QByteArray, QIODevice, Signal, Qt
from PySide6.QtCore import QStandardPaths, QDataStream, QByteArray, QIODevice, Signal, Qt, QTimer, QCryptographicHash
from PySide6.QtNetwork import QLocalServer, QLocalSocket
from PySide6.QtWidgets import QApplication
@@ -10,6 +8,10 @@ import asyncio
import os
import platform
import argparse
import hashlib
import random
import string
import base64
from pathlib import Path
from src.logs import configure_logging
@@ -23,6 +25,7 @@ class SingleApplication(QApplication):
def __init__(self, app_id, args):
super().__init__(args)
self.app_id = app_id
self.shared_key = hashlib.sha256(app_id.encode()).hexdigest()[:16]
self.server = None
self.is_primary_instance = self.try_connect_to_primary()
@@ -34,6 +37,52 @@ class SingleApplication(QApplication):
# En cas d'erreur (serveur déjà existant mais zombie), on le supprime et on réessaie
QLocalServer.removeServer(self.app_id)
self.server.listen(self.app_id)
else:
QTimer.singleShot(0, self.quit)
def encrypt_data(self, data_str):
"""Méthode simple pour brouiller les données"""
# Générer une "nonce" aléatoire pour éviter que les mêmes données produisent le même résultat
nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(8))
# Combiner la nonce, la clé et les données
combined = nonce + self.shared_key + data_str
# Utiliser SHA-256 pour obtenir un hash
hash_obj = QCryptographicHash(QCryptographicHash.Algorithm.Sha256)
hash_obj.addData(combined.encode())
signature = hash_obj.result().toHex().data().decode()[:16]
# Encoder le tout en base64
encoded = base64.b64encode((nonce + signature + data_str).encode()).decode()
return encoded
def decrypt_data(self, encoded_str):
"""Déchiffre les données et vérifie leur intégrité"""
try:
# Décoder de base64
decoded = base64.b64decode(encoded_str.encode()).decode()
# Extraire nonce, signature et données
nonce = decoded[:8]
signature = decoded[8:24]
data_str = decoded[24:]
# Vérifier la signature
combined = nonce + self.shared_key + data_str
hash_obj = QCryptographicHash(QCryptographicHash.Algorithm.Sha256)
hash_obj.addData(combined.encode())
expected_signature = hash_obj.result().toHex().data().decode()[:16]
if signature != expected_signature:
print("Signature invalide, données potentiellement corrompues ou falsifiées")
return None
return data_str
except Exception as e:
print(f"Erreur lors du déchiffrement: {e}")
return None
def try_connect_to_primary(self):
"""Essaie de se connecter à l'instance primaire de l'application"""
@@ -43,26 +92,35 @@ class SingleApplication(QApplication):
if socket.waitForConnected(500):
# Récupérer les arguments pour les envoyer à l'instance primaire
args = sys.argv[1:] if len(sys.argv) > 1 else []
encrypt_args = self.encrypt_data(";".join(args))
# Envoyer les arguments à l'instance primaire
stream = QDataStream(socket)
stream.writeQString(";".join(args))
stream.writeQString(encrypt_args)
socket.flush()
socket.waitForBytesWritten(1000)
socket.disconnectFromServer()
QTimer.singleShot(0, self.quit)
return False # Ce n'est pas l'instance primaire
return True # C'est l'instance primaire
def handle_new_connection(self):
"""Gère une nouvelle connexion d'une instance secondaire"""
socket = self.server.nextPendingConnection()
if socket.waitForReadyRead(1000):
if socket.waitForReadyRead(2000):
stream = QDataStream(socket)
args_str = stream.readQString()
args = args_str.split(";") if args_str else []
encrypted_args = stream.readQString()
args_str = self.decrypt_data(encrypted_args)
# Émettre un signal pour informer l'application des fichiers à ouvrir
if args:
self.files_received.emit(args)
if args_str:
args = args_str.split(";") if args_str else []
if args:
self.files_received.emit(args)
socket.disconnectFromServer()
@@ -84,6 +142,9 @@ if __name__ == "__main__":
app_id = "OxAPP25"
app = SingleApplication(app_id, sys.argv)
if not app.is_primary_instance:
sys.exit(0)
event_loop = qasync.QEventLoop(app)
asyncio.set_event_loop(event_loop)