init
This commit is contained in:
@@ -1,3 +1,328 @@
|
||||
from django.test import TestCase
|
||||
from django.urls import reverse
|
||||
from rest_framework.test import APITestCase, APIClient
|
||||
from rest_framework import status
|
||||
from django.conf import settings
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
# Create your tests here.
|
||||
from .models import Torrent, SharedUser, File
|
||||
from user.models import User
|
||||
from .views import TorrentViewSet, FileViewSet
|
||||
from .utils import Transmission, torrent_proceed, torrent_share
|
||||
|
||||
|
||||
class TorrentModelTestCase(TestCase):
|
||||
def setUp(self):
|
||||
self.user = User.objects.create_user(
|
||||
username='testuser',
|
||||
email='test@example.com',
|
||||
password='testpassword'
|
||||
)
|
||||
self.torrent = Torrent.objects.create(
|
||||
id='abc123',
|
||||
name='Test Torrent',
|
||||
user=self.user,
|
||||
size=1000,
|
||||
transmission_data={}
|
||||
)
|
||||
self.shared_user = User.objects.create_user(
|
||||
username='shareduser',
|
||||
email='shared@example.com',
|
||||
password='sharedpassword'
|
||||
)
|
||||
self.file = File.objects.create(
|
||||
torrent=self.torrent,
|
||||
rel_name='test_file.txt',
|
||||
size=100
|
||||
)
|
||||
|
||||
def test_len_files(self):
|
||||
"""Test the len_files property returns the correct count of files"""
|
||||
self.assertEqual(self.torrent.len_files, 1)
|
||||
|
||||
# Add another file and test again
|
||||
File.objects.create(
|
||||
torrent=self.torrent,
|
||||
rel_name='another_file.txt',
|
||||
size=200
|
||||
)
|
||||
|
||||
# Clear cached_property
|
||||
if hasattr(self.torrent, '_len_files'):
|
||||
delattr(self.torrent, '_len_files')
|
||||
|
||||
self.assertEqual(self.torrent.len_files, 2)
|
||||
|
||||
def test_related_users(self):
|
||||
"""Test the related_users property returns the correct list of users"""
|
||||
# Initially only the owner
|
||||
self.assertEqual(self.torrent.related_users, [self.user.id])
|
||||
|
||||
# Add a shared user
|
||||
self.torrent.shared_users.add(self.shared_user)
|
||||
|
||||
# Clear cached_property
|
||||
if hasattr(self.torrent, '_related_users'):
|
||||
delattr(self.torrent, '_related_users')
|
||||
|
||||
# Should include both users now
|
||||
self.assertIn(self.user.id, self.torrent.related_users)
|
||||
self.assertIn(self.shared_user.id, self.torrent.related_users)
|
||||
|
||||
|
||||
class FileModelTestCase(TestCase):
|
||||
def setUp(self):
|
||||
self.user = User.objects.create_user(
|
||||
username='testuser',
|
||||
email='test@example.com',
|
||||
password='testpassword'
|
||||
)
|
||||
self.torrent = Torrent.objects.create(
|
||||
id='abc123',
|
||||
name='Test Torrent',
|
||||
user=self.user,
|
||||
size=1000,
|
||||
transmission_data={}
|
||||
)
|
||||
self.file = File.objects.create(
|
||||
torrent=self.torrent,
|
||||
rel_name='test/path/file.mp4',
|
||||
size=100
|
||||
)
|
||||
|
||||
def test_pathname(self):
|
||||
"""Test the pathname property returns the correct path"""
|
||||
self.assertEqual(str(self.file.pathname), 'test/path/file.mp4')
|
||||
|
||||
def test_filename(self):
|
||||
"""Test the filename property returns the correct filename"""
|
||||
self.assertEqual(self.file.filename, 'file.mp4')
|
||||
|
||||
def test_abs_pathname(self):
|
||||
"""Test the abs_pathname property returns the correct absolute path"""
|
||||
expected_path = settings.DOWNLOAD_BASE_DIR / self.file.pathname
|
||||
self.assertEqual(self.file.abs_pathname, expected_path)
|
||||
|
||||
def test_is_video(self):
|
||||
"""Test the is_video property correctly identifies video files"""
|
||||
self.assertTrue(self.file.is_video) # mp4 should be identified as video
|
||||
|
||||
# Test non-video file
|
||||
non_video_file = File.objects.create(
|
||||
torrent=self.torrent,
|
||||
rel_name='test/path/document.pdf',
|
||||
size=50
|
||||
)
|
||||
self.assertFalse(non_video_file.is_video)
|
||||
|
||||
|
||||
class SharedUserModelTestCase(TestCase):
|
||||
def setUp(self):
|
||||
self.owner = User.objects.create_user(
|
||||
username='owner',
|
||||
email='owner@example.com',
|
||||
password='ownerpassword'
|
||||
)
|
||||
self.shared_user = User.objects.create_user(
|
||||
username='shareduser',
|
||||
email='shared@example.com',
|
||||
password='sharedpassword'
|
||||
)
|
||||
self.torrent = Torrent.objects.create(
|
||||
id='abc123',
|
||||
name='Test Torrent',
|
||||
user=self.owner,
|
||||
size=1000,
|
||||
transmission_data={}
|
||||
)
|
||||
|
||||
def test_shared_user_creation(self):
|
||||
"""Test creating a shared user relationship"""
|
||||
shared = SharedUser.objects.create(
|
||||
user=self.shared_user,
|
||||
torrent=self.torrent
|
||||
)
|
||||
self.assertEqual(shared.user, self.shared_user)
|
||||
self.assertEqual(shared.torrent, self.torrent)
|
||||
|
||||
# Verify the relationship is reflected in the torrent's shared_users
|
||||
self.assertIn(self.shared_user, self.torrent.shared_users.all())
|
||||
|
||||
|
||||
class TorrentViewSetTestCase(APITestCase):
|
||||
def setUp(self):
|
||||
self.user = User.objects.create_user(
|
||||
username='testuser',
|
||||
email='test@example.com',
|
||||
password='testpassword'
|
||||
)
|
||||
self.client = APIClient()
|
||||
self.client.force_authenticate(user=self.user)
|
||||
|
||||
self.torrent = Torrent.objects.create(
|
||||
id='abc123',
|
||||
name='Test Torrent',
|
||||
user=self.user,
|
||||
size=1000,
|
||||
transmission_data={}
|
||||
)
|
||||
|
||||
self.file = File.objects.create(
|
||||
torrent=self.torrent,
|
||||
rel_name='test_file.txt',
|
||||
size=100
|
||||
)
|
||||
|
||||
def test_list_torrents(self):
|
||||
"""Test listing torrents"""
|
||||
url = reverse('torrent-list')
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(len(response.data), 1)
|
||||
self.assertEqual(response.data[0]['id'], self.torrent.id)
|
||||
|
||||
def test_retrieve_torrent(self):
|
||||
"""Test retrieving a specific torrent"""
|
||||
url = reverse('torrent-detail', args=[self.torrent.id])
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data['id'], self.torrent.id)
|
||||
self.assertEqual(response.data['name'], 'Test Torrent')
|
||||
|
||||
@patch('torrent.views.torrent_share')
|
||||
def test_share_torrent(self, mock_torrent_share):
|
||||
"""Test sharing a torrent with another user"""
|
||||
mock_torrent_share.return_value = True
|
||||
|
||||
shared_user = User.objects.create_user(
|
||||
username='shareduser',
|
||||
email='shared@example.com',
|
||||
password='sharedpassword'
|
||||
)
|
||||
|
||||
url = reverse('torrent-share', args=[self.torrent.id])
|
||||
response = self.client.post(url, {'user_id': shared_user.id})
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertTrue(response.data['success'])
|
||||
mock_torrent_share.assert_called_once()
|
||||
|
||||
|
||||
class FileViewSetTestCase(APITestCase):
|
||||
def setUp(self):
|
||||
self.user = User.objects.create_user(
|
||||
username='testuser',
|
||||
email='test@example.com',
|
||||
password='testpassword'
|
||||
)
|
||||
self.client = APIClient()
|
||||
self.client.force_authenticate(user=self.user)
|
||||
|
||||
self.torrent = Torrent.objects.create(
|
||||
id='abc123',
|
||||
name='Test Torrent',
|
||||
user=self.user,
|
||||
size=1000,
|
||||
transmission_data={}
|
||||
)
|
||||
|
||||
self.file = File.objects.create(
|
||||
torrent=self.torrent,
|
||||
rel_name='test_file.txt',
|
||||
size=100
|
||||
)
|
||||
|
||||
def test_list_files(self):
|
||||
"""Test listing files"""
|
||||
url = reverse('file-list')
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_retrieve_file(self):
|
||||
"""Test retrieving a specific file"""
|
||||
url = reverse('file-detail', args=[self.file.id])
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data['id'], str(self.file.id))
|
||||
self.assertEqual(response.data['rel_name'], 'test_file.txt')
|
||||
|
||||
|
||||
class TransmissionUtilsTestCase(TestCase):
|
||||
@patch('torrent.utils.Client')
|
||||
def test_transmission_init(self, mock_client):
|
||||
"""Test Transmission class initialization"""
|
||||
transmission = Transmission()
|
||||
mock_client.assert_called_once_with(**settings.TRANSMISSION)
|
||||
|
||||
@patch('torrent.utils.Client')
|
||||
def test_add_torrent(self, mock_client):
|
||||
"""Test adding a torrent"""
|
||||
mock_instance = mock_client.return_value
|
||||
mock_instance.add_torrent.return_value = MagicMock()
|
||||
|
||||
transmission = Transmission()
|
||||
file_obj = MagicMock()
|
||||
result = transmission.add_torrent(file_obj)
|
||||
|
||||
mock_instance.add_torrent.assert_called_once_with(file_obj)
|
||||
self.assertEqual(result, mock_instance.add_torrent.return_value)
|
||||
|
||||
@patch('torrent.utils.Client')
|
||||
def test_get_data(self, mock_client):
|
||||
"""Test getting torrent data"""
|
||||
mock_instance = mock_client.return_value
|
||||
mock_torrent = MagicMock()
|
||||
mock_torrent.progress = 50
|
||||
mock_torrent.fields = {'name': 'Test', 'size': 1000}
|
||||
mock_instance.get_torrent.return_value = mock_torrent
|
||||
|
||||
transmission = Transmission()
|
||||
result = transmission.get_data('hash123')
|
||||
|
||||
mock_instance.get_torrent.assert_called_once_with('hash123', transmission.trpc_args)
|
||||
self.assertEqual(result['progress'], 50)
|
||||
self.assertEqual(result['name'], 'Test')
|
||||
self.assertEqual(result['size'], 1000)
|
||||
|
||||
|
||||
class TorrentProceedTestCase(TestCase):
|
||||
def setUp(self):
|
||||
self.user = User.objects.create_user(
|
||||
username='testuser',
|
||||
email='test@example.com',
|
||||
password='testpassword',
|
||||
max_size=10000
|
||||
)
|
||||
|
||||
@patch('torrent.utils.transmission_handler')
|
||||
def test_torrent_proceed_size_exceed(self, mock_transmission):
|
||||
"""Test torrent_proceed when user size is exceeded"""
|
||||
# Set user's used size to exceed max_size
|
||||
self.user.max_size = 100
|
||||
Torrent.objects.create(
|
||||
id='abc123',
|
||||
name='Test Torrent',
|
||||
user=self.user,
|
||||
size=200, # Exceeds max_size
|
||||
transmission_data={}
|
||||
)
|
||||
|
||||
file_obj = MagicMock()
|
||||
result = torrent_proceed(self.user, file_obj)
|
||||
|
||||
self.assertEqual(result['status'], 'error')
|
||||
self.assertEqual(result['message'], 'Size exceed')
|
||||
mock_transmission.add_torrent.assert_not_called()
|
||||
|
||||
@patch('torrent.utils.transmission_handler')
|
||||
def test_torrent_proceed_transmission_error(self, mock_transmission):
|
||||
"""Test torrent_proceed when transmission raises an error"""
|
||||
from transmission_rpc.error import TransmissionError
|
||||
|
||||
mock_transmission.add_torrent.side_effect = TransmissionError('Test error')
|
||||
|
||||
file_obj = MagicMock()
|
||||
result = torrent_proceed(self.user, file_obj)
|
||||
|
||||
self.assertEqual(result['status'], 'error')
|
||||
self.assertEqual(result['message'], 'Transmission Error')
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
from django.conf import settings
|
||||
|
||||
import traceback
|
||||
import base64
|
||||
import io
|
||||
|
||||
from transmission_rpc import Client
|
||||
from transmission_rpc.error import TransmissionError
|
||||
|
||||
@@ -18,8 +21,15 @@ class Transmission:
|
||||
def __init__(self):
|
||||
self.client = Client(**settings.TRANSMISSION)
|
||||
|
||||
def add_torrent(self, file):
|
||||
return self.client.add_torrent(file)
|
||||
def add_torrent(self, file, file_mode="file_object"):
|
||||
match file_mode:
|
||||
case "file_object":
|
||||
return self.client.add_torrent(file)
|
||||
case "base64":
|
||||
file_content = base64.b64decode(file)
|
||||
file_obj = io.BytesIO(file_content)
|
||||
return self.client.add_torrent(file_obj)
|
||||
return None
|
||||
|
||||
def get_data(self, hash_string):
|
||||
data = self.client.get_torrent(hash_string, self.trpc_args)
|
||||
@@ -45,7 +55,7 @@ class Transmission:
|
||||
transmission_handler = Transmission()
|
||||
|
||||
|
||||
def torrent_proceed(user, file):
|
||||
def torrent_proceed(user, file, file_mode="file_object"):
|
||||
r = {
|
||||
"torrent": None,
|
||||
"status": "error",
|
||||
@@ -58,7 +68,7 @@ def torrent_proceed(user, file):
|
||||
return r
|
||||
|
||||
try:
|
||||
torrent_uploaded = transmission_handler.add_torrent(file)
|
||||
torrent_uploaded = transmission_handler.add_torrent(file, file_mode=file_mode)
|
||||
except TransmissionError:
|
||||
print(traceback.format_exc())
|
||||
r["message"] = "Transmission Error"
|
||||
@@ -74,7 +84,7 @@ def torrent_proceed(user, file):
|
||||
if torrent.user == user:
|
||||
r["message"] = "Already exist"
|
||||
return r
|
||||
elif torrent.shared_users.filter(user=user).exists():
|
||||
elif torrent.shared_users.filter(id=user.id).exists():
|
||||
r["message"] = "Already shared"
|
||||
return r
|
||||
else:
|
||||
|
||||
@@ -2,7 +2,7 @@ from django.shortcuts import redirect
|
||||
from django.urls import reverse
|
||||
from django.views.generic import TemplateView
|
||||
from django.contrib.auth.mixins import LoginRequiredMixin
|
||||
from django.db.models import Q, Count, OuterRef
|
||||
from django.db.models import Q, Count, OuterRef, Sum
|
||||
from django.db.models.functions import Coalesce
|
||||
from django.http import HttpResponse, Http404, StreamingHttpResponse
|
||||
|
||||
@@ -152,7 +152,9 @@ class TorrentViewSet(mixins.CreateModelMixin,
|
||||
|
||||
def create(self, request, *args, **kwargs):
|
||||
file = request.data["file"]
|
||||
r = torrent_proceed(self.request.user, file)
|
||||
file_mode = request.data.get("file_mode", "file_object")
|
||||
r = torrent_proceed(self.request.user, file, file_mode)
|
||||
|
||||
if r["torrent"]:
|
||||
r["torrent"] = self.get_serializer_class()(r["torrent"]).data
|
||||
return Response(r)
|
||||
|
||||
Reference in New Issue
Block a user