299 lines
11 KiB
Python
299 lines
11 KiB
Python
from unittest.mock import MagicMock, patch
|
|
|
|
from django.conf import settings
|
|
from django.test import TestCase
|
|
from django.urls import reverse
|
|
from rest_framework import status
|
|
from rest_framework.test import APIClient, APITestCase
|
|
|
|
from user.models import User
|
|
|
|
from .models import File, SharedUser, Torrent
|
|
from .utils import Transmission, torrent_proceed
|
|
|
|
|
|
class TorrentModelTestCase(TestCase):
|
|
def setUp(self):
|
|
self.user = User.objects.create_user(
|
|
username="testuser", email="test@example.com", password="testpassword"
|
|
)
|
|
self.torrent = Torrent.objects.create(
|
|
id="abc123",
|
|
name="Test Torrent",
|
|
user=self.user,
|
|
size=1000,
|
|
transmission_data={},
|
|
)
|
|
self.shared_user = User.objects.create_user(
|
|
username="shareduser", email="shared@example.com", password="sharedpassword"
|
|
)
|
|
self.file = File.objects.create(
|
|
torrent=self.torrent, rel_name="test_file.txt", size=100
|
|
)
|
|
|
|
def test_len_files(self):
|
|
"""Test the len_files property returns the correct count of files"""
|
|
self.assertEqual(self.torrent.len_files, 1)
|
|
|
|
# Add another file and test again
|
|
File.objects.create(torrent=self.torrent, rel_name="another_file.txt", size=200)
|
|
|
|
# Clear cached_property
|
|
if hasattr(self.torrent, "_len_files"):
|
|
delattr(self.torrent, "_len_files")
|
|
|
|
self.assertEqual(self.torrent.len_files, 2)
|
|
|
|
def test_related_users(self):
|
|
"""Test the related_users property returns the correct list of users"""
|
|
# Initially only the owner
|
|
self.assertEqual(self.torrent.related_users, [self.user.id])
|
|
|
|
# Add a shared user
|
|
self.torrent.shared_users.add(self.shared_user)
|
|
|
|
# Clear cached_property
|
|
if hasattr(self.torrent, "_related_users"):
|
|
delattr(self.torrent, "_related_users")
|
|
|
|
# Should include both users now
|
|
self.assertIn(self.user.id, self.torrent.related_users)
|
|
self.assertIn(self.shared_user.id, self.torrent.related_users)
|
|
|
|
|
|
class FileModelTestCase(TestCase):
|
|
def setUp(self):
|
|
self.user = User.objects.create_user(
|
|
username="testuser", email="test@example.com", password="testpassword"
|
|
)
|
|
self.torrent = Torrent.objects.create(
|
|
id="abc123",
|
|
name="Test Torrent",
|
|
user=self.user,
|
|
size=1000,
|
|
transmission_data={},
|
|
)
|
|
self.file = File.objects.create(
|
|
torrent=self.torrent, rel_name="test/path/file.mp4", size=100
|
|
)
|
|
|
|
def test_pathname(self):
|
|
"""Test the pathname property returns the correct path"""
|
|
self.assertEqual(str(self.file.pathname), "test/path/file.mp4")
|
|
|
|
def test_filename(self):
|
|
"""Test the filename property returns the correct filename"""
|
|
self.assertEqual(self.file.filename, "file.mp4")
|
|
|
|
def test_abs_pathname(self):
|
|
"""Test the abs_pathname property returns the correct absolute path"""
|
|
expected_path = settings.DOWNLOAD_BASE_DIR / self.file.pathname
|
|
self.assertEqual(self.file.abs_pathname, expected_path)
|
|
|
|
def test_is_video(self):
|
|
"""Test the is_video property correctly identifies video files"""
|
|
self.assertTrue(self.file.is_video) # mp4 should be identified as video
|
|
|
|
# Test non-video file
|
|
non_video_file = File.objects.create(
|
|
torrent=self.torrent, rel_name="test/path/document.pdf", size=50
|
|
)
|
|
self.assertFalse(non_video_file.is_video)
|
|
|
|
|
|
class SharedUserModelTestCase(TestCase):
|
|
def setUp(self):
|
|
self.owner = User.objects.create_user(
|
|
username="owner", email="owner@example.com", password="ownerpassword"
|
|
)
|
|
self.shared_user = User.objects.create_user(
|
|
username="shareduser", email="shared@example.com", password="sharedpassword"
|
|
)
|
|
self.torrent = Torrent.objects.create(
|
|
id="abc123",
|
|
name="Test Torrent",
|
|
user=self.owner,
|
|
size=1000,
|
|
transmission_data={},
|
|
)
|
|
|
|
def test_shared_user_creation(self):
|
|
"""Test creating a shared user relationship"""
|
|
shared = SharedUser.objects.create(user=self.shared_user, torrent=self.torrent)
|
|
self.assertEqual(shared.user, self.shared_user)
|
|
self.assertEqual(shared.torrent, self.torrent)
|
|
|
|
# Verify the relationship is reflected in the torrent's shared_users
|
|
self.assertIn(self.shared_user, self.torrent.shared_users.all())
|
|
|
|
|
|
class TorrentViewSetTestCase(APITestCase):
|
|
def setUp(self):
|
|
self.user = User.objects.create_user(
|
|
username="testuser", email="test@example.com", password="testpassword"
|
|
)
|
|
self.client = APIClient()
|
|
self.client.force_authenticate(user=self.user)
|
|
|
|
self.torrent = Torrent.objects.create(
|
|
id="abc123",
|
|
name="Test Torrent",
|
|
user=self.user,
|
|
size=1000,
|
|
transmission_data={},
|
|
)
|
|
|
|
self.file = File.objects.create(
|
|
torrent=self.torrent, rel_name="test_file.txt", size=100
|
|
)
|
|
|
|
def test_list_torrents(self):
|
|
"""Test listing torrents"""
|
|
url = reverse("torrent-list")
|
|
response = self.client.get(url)
|
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
|
self.assertEqual(len(response.data), 1)
|
|
self.assertEqual(response.data[0]["id"], self.torrent.id)
|
|
|
|
def test_retrieve_torrent(self):
|
|
"""Test retrieving a specific torrent"""
|
|
url = reverse("torrent-detail", args=[self.torrent.id])
|
|
response = self.client.get(url)
|
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
|
self.assertEqual(response.data["id"], self.torrent.id)
|
|
self.assertEqual(response.data["name"], "Test Torrent")
|
|
|
|
@patch("torrent.views.torrent_share")
|
|
def test_share_torrent(self, mock_torrent_share):
|
|
"""Test sharing a torrent with another user"""
|
|
mock_torrent_share.return_value = True
|
|
|
|
shared_user = User.objects.create_user(
|
|
username="shareduser", email="shared@example.com", password="sharedpassword"
|
|
)
|
|
|
|
url = reverse("torrent-share", args=[self.torrent.id])
|
|
response = self.client.post(url, {"user_id": shared_user.id})
|
|
|
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
|
self.assertTrue(response.data["success"])
|
|
mock_torrent_share.assert_called_once()
|
|
|
|
|
|
class FileViewSetTestCase(APITestCase):
|
|
def setUp(self):
|
|
self.user = User.objects.create_user(
|
|
username="testuser", email="test@example.com", password="testpassword"
|
|
)
|
|
self.client = APIClient()
|
|
self.client.force_authenticate(user=self.user)
|
|
|
|
self.torrent = Torrent.objects.create(
|
|
id="abc123",
|
|
name="Test Torrent",
|
|
user=self.user,
|
|
size=1000,
|
|
transmission_data={},
|
|
)
|
|
|
|
self.file = File.objects.create(
|
|
torrent=self.torrent, rel_name="test_file.txt", size=100
|
|
)
|
|
|
|
def test_list_files(self):
|
|
"""Test listing files"""
|
|
url = reverse("file-list")
|
|
response = self.client.get(url)
|
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
|
|
|
def test_retrieve_file(self):
|
|
"""Test retrieving a specific file"""
|
|
url = reverse("file-detail", args=[self.file.id])
|
|
response = self.client.get(url)
|
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
|
self.assertEqual(response.data["id"], str(self.file.id))
|
|
self.assertEqual(response.data["rel_name"], "test_file.txt")
|
|
|
|
|
|
class TransmissionUtilsTestCase(TestCase):
|
|
@patch("torrent.utils.Client")
|
|
def test_transmission_init(self, mock_client):
|
|
"""Test Transmission class initialization"""
|
|
transmission = Transmission()
|
|
mock_client.assert_called_once_with(**settings.TRANSMISSION)
|
|
|
|
@patch("torrent.utils.Client")
|
|
def test_add_torrent(self, mock_client):
|
|
"""Test adding a torrent"""
|
|
mock_instance = mock_client.return_value
|
|
mock_instance.add_torrent.return_value = MagicMock()
|
|
|
|
transmission = Transmission()
|
|
file_obj = MagicMock()
|
|
result = transmission.add_torrent(file_obj)
|
|
|
|
mock_instance.add_torrent.assert_called_once_with(file_obj)
|
|
self.assertEqual(result, mock_instance.add_torrent.return_value)
|
|
|
|
@patch("torrent.utils.Client")
|
|
def test_get_data(self, mock_client):
|
|
"""Test getting torrent data"""
|
|
mock_instance = mock_client.return_value
|
|
mock_torrent = MagicMock()
|
|
mock_torrent.progress = 50
|
|
mock_torrent.fields = {"name": "Test", "size": 1000}
|
|
mock_instance.get_torrent.return_value = mock_torrent
|
|
|
|
transmission = Transmission()
|
|
result = transmission.get_data("hash123")
|
|
|
|
mock_instance.get_torrent.assert_called_once_with(
|
|
"hash123", transmission.trpc_args
|
|
)
|
|
self.assertEqual(result["progress"], 50)
|
|
self.assertEqual(result["name"], "Test")
|
|
self.assertEqual(result["size"], 1000)
|
|
|
|
|
|
class TorrentProceedTestCase(TestCase):
|
|
def setUp(self):
|
|
self.user = User.objects.create_user(
|
|
username="testuser",
|
|
email="test@example.com",
|
|
password="testpassword",
|
|
max_size=10000,
|
|
)
|
|
|
|
@patch("torrent.utils.transmission_handler")
|
|
def test_torrent_proceed_size_exceed(self, mock_transmission):
|
|
"""Test torrent_proceed when user size is exceeded"""
|
|
# Set user's used size to exceed max_size
|
|
self.user.max_size = 100
|
|
Torrent.objects.create(
|
|
id="abc123",
|
|
name="Test Torrent",
|
|
user=self.user,
|
|
size=200, # Exceeds max_size
|
|
transmission_data={},
|
|
)
|
|
|
|
file_obj = MagicMock()
|
|
result = torrent_proceed(self.user, file_obj)
|
|
|
|
self.assertEqual(result["status"], "error")
|
|
self.assertEqual(result["message"], "Size exceed")
|
|
mock_transmission.add_torrent.assert_not_called()
|
|
|
|
@patch("torrent.utils.transmission_handler")
|
|
def test_torrent_proceed_transmission_error(self, mock_transmission):
|
|
"""Test torrent_proceed when transmission raises an error"""
|
|
from transmission_rpc.error import TransmissionError
|
|
|
|
mock_transmission.add_torrent.side_effect = TransmissionError("Test error")
|
|
|
|
file_obj = MagicMock()
|
|
result = torrent_proceed(self.user, file_obj)
|
|
|
|
self.assertEqual(result["status"], "error")
|
|
self.assertEqual(result["message"], "Transmission Error")
|