init
This commit is contained in:
100
app/torrent/consumers.py
Normal file
100
app/torrent/consumers.py
Normal file
@@ -0,0 +1,100 @@
|
||||
from django.db.models import Q
|
||||
from django.db.models.functions import Coalesce
|
||||
|
||||
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
||||
from typing import Optional, Union
|
||||
import asyncio
|
||||
|
||||
from user.models import User
|
||||
from .models import Torrent
|
||||
|
||||
|
||||
class TorrentEventConsumer(AsyncJsonWebsocketConsumer):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.channel_groups = set()
|
||||
self.user: Optional[User] = None
|
||||
self.follow_user: Optional[User] = None
|
||||
|
||||
async def connect(self):
|
||||
self.user = self.scope['user']
|
||||
if not self.user.is_authenticated:
|
||||
await self.close()
|
||||
return
|
||||
|
||||
self.follow_user = self.user
|
||||
await self.channel_layer.group_add(f"user_{self.follow_user.id}", self.channel_name)
|
||||
|
||||
# user_id = int(self.scope['url_route']["kwargs"]["user_id"])
|
||||
# if user_id == self.user.id:
|
||||
# self.follow_user = self.user
|
||||
# else:
|
||||
# if await self.change_follow_user(user_id) is None:
|
||||
# await self.close()
|
||||
# return
|
||||
|
||||
await self.channel_layer.group_add("torrent", self.channel_name)
|
||||
await self.accept()
|
||||
|
||||
async def disconnect(self, code):
|
||||
await self.channel_layer.group_discard("torrent", self.channel_name)
|
||||
|
||||
async def dispatch(self, message):
|
||||
print("dispatch ws :", message)
|
||||
return await super().dispatch(message)
|
||||
|
||||
async def receive_json(self, content, **kwargs):
|
||||
if "context" not in content:
|
||||
return
|
||||
match content["context"]:
|
||||
case "change_follow_user":
|
||||
await self.change_follow_user(content["user_id"])
|
||||
case _:
|
||||
print("call websocket not supported", content)
|
||||
|
||||
async def change_follow_user(self, user_id):
|
||||
await self.channel_layer.group_discard(f"user_{self.follow_user.id}", self.channel_name)
|
||||
if user_id == self.user.id:
|
||||
self.follow_user = self.user
|
||||
await self.channel_layer.group_add(f"user_{self.follow_user.id}", self.channel_name)
|
||||
return self.follow_user
|
||||
elif await self.user.friends.filter(id=user_id).aexists():
|
||||
self.follow_user = await User.objects.filter(id=user_id).aget()
|
||||
await self.channel_layer.group_add(f"user_{self.follow_user.id}", self.channel_name)
|
||||
return self.follow_user
|
||||
else:
|
||||
return None
|
||||
|
||||
async def transmission_data_updated(self, datas):
|
||||
torrent_stats = datas["data"]
|
||||
qs = (Torrent.objects
|
||||
.filter(Q(user_id=self.follow_user.id) | Q(shared_users=self.follow_user.id))
|
||||
.values_list("id", flat=True).distinct())
|
||||
torrent_ids = [i async for i in qs]
|
||||
|
||||
for hash_string, data in torrent_stats.items():
|
||||
if hash_string in torrent_ids:
|
||||
await self.send_json({
|
||||
"context": "transmission_data_updated",
|
||||
"data": data
|
||||
})
|
||||
|
||||
async def add_torrent(self, data):
|
||||
await self.send_json({
|
||||
"context": "add_torrent",
|
||||
"torrent_id": data["data"]
|
||||
})
|
||||
|
||||
async def remove_torrent(self, data):
|
||||
await self.send_json({
|
||||
"context": "remove_torrent",
|
||||
"torrent_id": data["data"]
|
||||
})
|
||||
|
||||
async def update_torrent(self, data):
|
||||
await self.send_json({
|
||||
"context": "update_torrent",
|
||||
"torrent_id": data["data"]["torrent_id"],
|
||||
"updated_fields": data["data"]["updated_fields"]
|
||||
})
|
||||
Reference in New Issue
Block a user