Files
oxpanel25/app/torrent/utils.py
T
2026-04-11 21:51:30 +02:00

179 lines
6.0 KiB
Python

import os
from django.conf import settings
import traceback
import base64
import io
from transmission_rpc import Client
from transmission_rpc.error import TransmissionError
# from app.utils import send_sync_channel_message
from .models import Torrent, File
from user.models import User
class Transmission:
trpc_args = [
"id", "percentDone", "uploadRatio", "rateUpload", "rateDownload", "hashString", "status", "sizeWhenDone",
"leftUntilDone", "name", "eta", "totalSize", "uploadedEver", "peersGettingFromUs", "peersSendingToUs",
"tracker", "trackerStats", "activityDate"
]
def __init__(self):
self.client = Client(**settings.TRANSMISSION)
self.update_vpn_port()
def update_vpn_port(self):
"""Lit le port forwarded par Gluetun et met à jour Transmission si nécessaire"""
port_file = "/tmp/gluetun/forwarded_port"
if os.path.exists(port_file):
try:
with open(port_file) as f:
vpn_port = int(f.read().strip())
# Récupère le port actuel configuré dans Transmission
current_settings = self.client.get_session()
if current_settings.peer_port != vpn_port:
self.client.set_session(peer_port=vpn_port)
# Optionnel: loguer le changement
print(f"Transmission peer-port updated to {vpn_port}")
except Exception as e:
print(f"Error updating Transmission port: {e}")
def add_torrent(self, file, file_mode="file_object"):
match file_mode:
case "file_object":
return self.client.add_torrent(file)
case "base64":
file_content = base64.b64decode(file)
file_obj = io.BytesIO(file_content)
return self.client.add_torrent(file_obj)
return None
def get_data(self, hash_string):
data = self.client.get_torrent(hash_string, self.trpc_args)
return {
"progress": data.progress,
"status_str": data.status,
**data.fields
}
def get_all_data(self, hash_strings=None):
return {
data.hashString: {"progress": data.progress, "status_str": data.status, **data.fields}
for data in self.client.get_torrents(hash_strings, self.trpc_args)
}
def get_files(self, hash_string):
return self.client.get_torrent(hash_string).get_files()
def delete(self, hash_string):
return self.client.remove_torrent(hash_string, delete_data=True)
def get_diagnostics(self):
"""
Retourne des informations détaillées sur l'état de Transmission et du VPN.
Utile pour le debug via le shell Django.
"""
session = self.client.get_session()
# 1. Vérification du port VPN (Lecture du fichier Gluetun)
port_file = "/tmp/gluetun/forwarded_port"
vpn_port = None
if os.path.exists(port_file):
with open(port_file, "r") as f:
vpn_port = f.read().strip()
# 2. Test de connectivité du port (via l'API Transmission)
# Ceci demande à Transmission de vérifier si son port est ouvert sur internet
port_is_open = self.client.port_test()
return {
"transmission_version": session.version,
"config_peer_port": session.peer_port,
"gluetun_forwarded_port": vpn_port,
"port_test_success": port_is_open,
"download_dir": session.download_dir,
"rpc_version": session.rpc_version,
"peer_port_random_on_start": session.peer_port_random_on_start,
}
transmission_handler = Transmission()
def torrent_proceed(user, file, file_mode="file_object"):
r = {
"torrent": None,
"status": "error",
"message": "Unexpected error"
}
user: User
if user.size_used > user.max_size:
r["message"] = "Size exceed"
return r
try:
torrent_uploaded = transmission_handler.add_torrent(file, file_mode=file_mode)
except TransmissionError:
print(traceback.format_exc())
r["message"] = "Transmission Error"
return r
except:
print(traceback.format_exc())
return r
else:
r["status"] = "warn"
qs = Torrent.objects.filter(pk=torrent_uploaded.hashString)
if qs.exists():
torrent = qs.get()
if torrent.user == user:
r["message"] = "Already exist"
return r
elif torrent.shared_users.filter(id=user.id).exists():
r["message"] = "Already shared"
return r
else:
torrent.shared_users.add(user)
r["status"] = "success"
r["message"] = "Torrent downloaded by an other user, added to your list"
return r
else:
data = transmission_handler.get_data(torrent_uploaded.hashString)
torrent = Torrent.objects.create(
id=data["hashString"],
name=data["name"],
user=user,
size=data["totalSize"],
transmission_data=data
)
File.objects.bulk_create([
File(
torrent=torrent,
rel_name=file.name,
size=file.size,
)
for file in transmission_handler.get_files(torrent.id)
])
r["torrent"] = torrent
r["status"] = "success"
r["message"] = "Torrent added"
return r
def torrent_share(torrent, current_user, target_user_id):
from .models import Torrent, SharedUser
torrent: Torrent
if (torrent.user_id != target_user_id and
any([torrent.user == current_user, torrent.shared_users.filter(id=current_user.id)]) and
not SharedUser.objects.filter(torrent_id=torrent.id, user_id=target_user_id).exists()):
torrent.shared_users.add(target_user_id)
return True
return False