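# NOTE(review): the text below appears to be residue from the repository web page this file was copied from; it is not part of the module.
# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
#
# 564 lines
# 23 KiB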

import json
import hmac
import hashlib
import logging
from typing import Dict, Any
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.utils import timezone
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import AllowAny
from apps.course.models import CourseLiveSession, LiveSessionUser, Course, Participant
from apps.account.models import User
from utils.exceptions import AppAPIException
logger = logging.getLogger(__name__)
@method_decorator(csrf_exempt, name='dispatch')
class PlugNMeetWebhookAPIView(APIView):
    """
    Webhook endpoint that receives events from a PlugNMeet server.

    Requests are not authenticated through the usual permission classes
    (``AllowAny``); instead every request must carry a ``Hash-Token`` header
    that matches the HMAC-SHA256 of the raw request body (see
    ``_verify_webhook_signature``).

    Events handled:
    - room_finished: close the live session and all of its user sessions
    - participant_joined: create or reactivate a LiveSessionUser entry
    - participant_left: mark the LiveSessionUser as offline/exited
    - end_recording: download the recording and store it as a
      LiveSessionRecording (with a thumbnail for video files when ffmpeg
      is available)

    Any other event is acknowledged with a 200 response and ignored.
    """

    permission_classes = [AllowAny]

    def post(self, request, *args, **kwargs):
        """
        Validate the webhook request and dispatch it to the matching handler.

        Returns:
            200 with ``{'status': 'ok', ...}`` for handled or ignored events,
            500 with ``{'status': 'error', 'message': ...}`` when a handler
            raises.

        Raises:
            AppAPIException: 403 on an invalid signature, 400 on an
                undecodable body or a missing ``event`` field.
        """
        logger.info("[PlugNMeet Webhook] Received webhook request")

        # Verify webhook signature before touching the payload
        if not self._verify_webhook_signature(request):
            logger.warning("[PlugNMeet Webhook] Invalid signature")
            raise AppAPIException(
                {'message': 'Invalid webhook signature'},
                status_code=status.HTTP_403_FORBIDDEN
            )

        try:
            payload = json.loads(request.body.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error(f"[PlugNMeet Webhook] Invalid JSON payload - error={str(e)}")
            raise AppAPIException(
                {'message': 'Invalid JSON payload'},
                status_code=status.HTTP_400_BAD_REQUEST
            )

        event = payload.get('event')
        if not event:
            logger.warning("[PlugNMeet Webhook] Missing event field")
            raise AppAPIException(
                {'message': 'Missing event field'},
                status_code=status.HTTP_400_BAD_REQUEST
            )

        logger.info(f"[PlugNMeet Webhook] Processing event={event}")

        # Route to appropriate handler
        handler_map = {
            'room_finished': self._handle_room_finished,
            'participant_joined': self._handle_participant_joined,
            'participant_left': self._handle_participant_left,
            'end_recording': self._handle_end_recording,
        }
        handler = handler_map.get(event)
        if not handler:
            # Unknown events are acknowledged so the sender does not see an error.
            logger.info(f"[PlugNMeet Webhook] Unhandled event={event}, ignoring")
            return Response({'status': 'ok', 'message': f'Event {event} ignored'})

        try:
            result = handler(payload)
            logger.info(f"[PlugNMeet Webhook] Event processed successfully - event={event}")
            return Response({'status': 'ok', **result})
        except Exception as e:
            logger.error(
                f"[PlugNMeet Webhook] Error processing event={event} - error={str(e)}",
                exc_info=True
            )
            return Response(
                {'status': 'error', 'message': str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

    def _verify_webhook_signature(self, request) -> bool:
        """
        Verify the webhook signature using HMAC-SHA256.

        The ``Hash-Token`` header must equal the hex HMAC-SHA256 of the raw
        request body, keyed with ``settings.PLUGNMEET_API_SECRET``.

        Returns:
            True when the header matches, False when it is missing or wrong.

        Raises:
            ImproperlyConfigured: if ``PLUGNMEET_API_SECRET`` is not set.
        """
        hash_token = request.headers.get('Hash-Token')
        if not hash_token:
            logger.warning("[PlugNMeet Webhook] Missing Hash-Token header")
            return False

        # Get API secret from settings
        api_secret = getattr(settings, 'PLUGNMEET_API_SECRET', None)
        if not api_secret:
            logger.error("[PlugNMeet Webhook] PLUGNMEET_API_SECRET not configured")
            raise ImproperlyConfigured("PLUGNMEET_API_SECRET is not configured")

        # Calculate expected signature
        expected_signature = hmac.new(
            api_secret.encode('utf-8'),
            request.body,
            hashlib.sha256
        ).hexdigest()

        # Constant-time comparison on bytes: compare_digest() raises TypeError
        # for str arguments containing non-ASCII characters, and the header
        # value is attacker-controlled.
        is_valid = hmac.compare_digest(
            hash_token.encode('utf-8'),
            expected_signature.encode('utf-8')
        )
        if not is_valid:
            # Only the received value is logged; the expected signature is
            # secret-derived and must not leak into logs.
            logger.warning(
                f"[PlugNMeet Webhook] Signature mismatch - got={hash_token[:10]}..."
            )
        return is_valid

    @staticmethod
    def _parse_joined_at(raw_timestamp):
        """
        Convert a ``joinedAt`` value into a timezone-aware UTC datetime.

        Accepts a number or a numeric string (interpreted as seconds since
        the epoch). Falls back to the current time when the value is missing,
        zero, or cannot be converted.
        """
        from datetime import datetime, timezone as dt_timezone

        if not raw_timestamp:
            return timezone.now()
        try:
            seconds = float(raw_timestamp)
            if not seconds:
                return timezone.now()
            return datetime.fromtimestamp(seconds, tz=dt_timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            logger.warning(
                f"[PlugNMeet Webhook] participant_joined: Invalid joinedAt={raw_timestamp!r}, using current time"
            )
            return timezone.now()

    def _handle_room_finished(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle room_finished event: close the live session and all user sessions.

        Payload structure:
        {
            "event": "room_finished",
            "room": {
                "sid": "room-123456",
                "identity": "course-slug-20240101120000",
                "name": "Algebra class",
                "duration": 3600
            }
        }

        Returns:
            A dict with a ``message`` and, on success, ``session_id`` and
            ``users_disconnected``.
        """
        room_data = payload.get('room', {})
        room_id = room_data.get('identity')
        if not room_id:
            logger.warning("[PlugNMeet Webhook] room_finished: Missing room identity")
            return {'message': 'Missing room identity'}

        logger.info(f"[PlugNMeet Webhook] room_finished - room_id={room_id}")

        try:
            session = CourseLiveSession.objects.get(room_id=room_id, ended_at__isnull=True)
        except CourseLiveSession.DoesNotExist:
            logger.warning(
                f"[PlugNMeet Webhook] room_finished: Session not found or already ended - room_id={room_id}"
            )
            return {'message': 'Session not found or already ended'}

        # Close the session
        now = timezone.now()
        session.ended_at = now
        session.save(update_fields=['ended_at', 'updated_at'])
        logger.info(f"[PlugNMeet Webhook] Session closed - session_id={session.id} room_id={room_id}")

        # Close all active user sessions with the same timestamp as the session
        updated_count = LiveSessionUser.objects.filter(
            session=session,
            is_online=True,
            exited_at__isnull=True
        ).update(
            is_online=False,
            exited_at=now,
            updated_at=now
        )
        logger.info(
            f"[PlugNMeet Webhook] User sessions closed - session_id={session.id} count={updated_count}"
        )
        return {
            'message': 'Room finished',
            'session_id': session.id,
            'users_disconnected': updated_count
        }

    def _handle_participant_joined(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle participant_joined event: create or reactivate a LiveSessionUser entry.

        Payload structure:
        {
            "event": "participant_joined",
            "room": {
                "sid": "room-123456",
                "identity": "course-slug-20240101120000"
            },
            "participant": {
                "sid": "participant-user-27",
                "identity": "27",
                "name": "Sample student",
                "metadata": "{\"is_admin\": false}",
                "joinedAt": 1697497300
            }
        }

        Returns:
            A dict with a ``message``; on success also ``session_user_id`` and
            ``created`` (True for a new entry, False for a reactivated one).
        """
        room_data = payload.get('room', {})
        participant_data = payload.get('participant', {})
        room_id = room_data.get('identity')
        user_identity = participant_data.get('identity')
        joined_at_timestamp = participant_data.get('joinedAt')

        if not room_id or not user_identity:
            logger.warning("[PlugNMeet Webhook] participant_joined: Missing room_id or user_identity")
            return {'message': 'Missing required fields'}

        logger.info(
            f"[PlugNMeet Webhook] participant_joined - room_id={room_id} user_identity={user_identity}"
        )

        # Get session
        try:
            session = CourseLiveSession.objects.get(room_id=room_id, ended_at__isnull=True)
        except CourseLiveSession.DoesNotExist:
            logger.warning(
                f"[PlugNMeet Webhook] participant_joined: Active session not found - room_id={room_id}"
            )
            return {'message': 'Active session not found'}

        # Get user (the participant identity is the user's primary key)
        try:
            user = User.objects.get(id=int(user_identity))
        except (User.DoesNotExist, ValueError):
            logger.warning(
                f"[PlugNMeet Webhook] participant_joined: User not found - user_identity={user_identity}"
            )
            return {'message': 'User not found'}

        # Determine user role
        is_admin = user.can_manage_course(session.course)
        role = 'moderator' if is_admin else 'participant'

        # Parse joined_at timestamp (falls back to now when absent/invalid)
        entered_at = self._parse_joined_at(joined_at_timestamp)

        # Check if user has an existing session entry that was marked as offline.
        # If so, reactivate it instead of creating a new one.
        existing_session = LiveSessionUser.objects.filter(
            session=session,
            user=user,
            is_online=False
        ).order_by('-entered_at').first()
        if existing_session:
            existing_session.is_online = True
            existing_session.exited_at = None
            existing_session.save(update_fields=['is_online', 'exited_at', 'updated_at'])
            logger.info(
                f"[PlugNMeet Webhook] User rejoined (reactivated) - session_user_id={existing_session.id} user_id={user.id}"
            )
            return {
                'message': 'Participant rejoined',
                'session_user_id': existing_session.id,
                'created': False
            }

        # Create new LiveSessionUser entry
        try:
            session_user = LiveSessionUser.objects.create(
                session=session,
                user=user,
                role=role,
                entered_at=entered_at,
                is_online=True,
            )
            logger.info(
                f"[PlugNMeet Webhook] User joined - session_user_id={session_user.id} user_id={user.id} role={role}"
            )
        except Exception as e:
            logger.error(f"[PlugNMeet Webhook] Failed to create session user - error={str(e)}")
            return {'message': f'Failed to create session user: {str(e)}'}

        return {
            'message': 'Participant joined',
            'session_user_id': session_user.id,
            # A new row was just created on this path.
            'created': True
        }

    def _handle_participant_left(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle participant_left event: mark the LiveSessionUser as offline/exited.

        Payload structure:
        {
            "event": "participant_left",
            "room": {
                "sid": "room-123456",
                "identity": "course-slug-20240101120000"
            },
            "participant": {
                "sid": "participant-user-27",
                "identity": "27",
                "state": "DISCONNECTED",
                "joinedAt": 1697497300,
                "duration": 1800
            }
        }

        Returns:
            A dict with a ``message`` and, when the session and user were
            found, ``updated`` (whether any online entry was closed).
        """
        room_data = payload.get('room', {})
        participant_data = payload.get('participant', {})
        room_id = room_data.get('identity')
        user_identity = participant_data.get('identity')

        if not room_id or not user_identity:
            logger.warning("[PlugNMeet Webhook] participant_left: Missing room_id or user_identity")
            return {'message': 'Missing required fields'}

        logger.info(
            f"[PlugNMeet Webhook] participant_left - room_id={room_id} user_identity={user_identity}"
        )

        # Get session (ended sessions are accepted too, unlike participant_joined)
        try:
            session = CourseLiveSession.objects.get(room_id=room_id)
        except CourseLiveSession.DoesNotExist:
            logger.warning(f"[PlugNMeet Webhook] participant_left: Session not found - room_id={room_id}")
            return {'message': 'Session not found'}

        # Get user
        try:
            user = User.objects.get(id=int(user_identity))
        except (User.DoesNotExist, ValueError):
            logger.warning(
                f"[PlugNMeet Webhook] participant_left: User not found - user_identity={user_identity}"
            )
            return {'message': 'User not found'}

        # Update LiveSessionUser
        now = timezone.now()
        updated_count = LiveSessionUser.objects.filter(
            session=session,
            user=user,
            is_online=True,
            exited_at__isnull=True
        ).update(
            is_online=False,
            exited_at=now,
            updated_at=now
        )
        if updated_count > 0:
            logger.info(f"[PlugNMeet Webhook] User left - session_id={session.id} user_id={user.id}")
        else:
            logger.warning(
                f"[PlugNMeet Webhook] participant_left: No active session found for user - session_id={session.id} user_id={user.id}"
            )
        return {
            'message': 'Participant left',
            'updated': updated_count > 0
        }

    def _handle_end_recording(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle end_recording event: download and save the recording file.

        Payload structure:
        {
            "event": "end_recording",
            "room": {
                "sid": "room-123456",
                "identity": "course-slug-20240101120000"
            },
            "recording_info": {
                "recordingId": "rec-123456",
                "roomId": "course-slug-20240101120000",
                "recordingType": "COMPOSITE",
                "fileName": "algebra-1402-20231016.mp4",
                "duration": 3600,
                "status": "FINISHED"
            }
        }

        Returns:
            A dict with a ``message``; on success also ``recording_id``,
            ``file_name``, ``file_size`` and ``thumbnail_generated``. API and
            unexpected errors are logged and reported in ``message`` rather
            than raised.
        """
        from datetime import timedelta
        from django.core.files import File
        from apps.course.services.plugnmeet import PlugNMeetClient, PlugNMeetError
        from apps.course.models import LiveSessionRecording
        import os
        import tempfile

        room_data = payload.get('room', {})
        recording_info = payload.get('recording_info', {})
        room_id = room_data.get('identity')
        recording_id = recording_info.get('recordingId')

        # The file name comes from the payload: default it when absent/null
        # and drop any directory components before handing it to storage.
        raw_file_name = recording_info.get('fileName') or 'recording.mp4'
        file_name = os.path.basename(str(raw_file_name)) or 'recording.mp4'

        # Duration may arrive as a number or a numeric string.
        try:
            duration = float(recording_info.get('duration') or 0)
        except (TypeError, ValueError):
            duration = 0

        if not room_id or not recording_id:
            logger.warning("[PlugNMeet Webhook] end_recording: Missing room_id or recording_id")
            return {'message': 'Missing required fields'}

        logger.info(
            f"[PlugNMeet Webhook] end_recording - room_id={room_id} recording_id={recording_id}"
        )

        # Get session
        try:
            session = CourseLiveSession.objects.get(room_id=room_id)
        except CourseLiveSession.DoesNotExist:
            logger.warning(f"[PlugNMeet Webhook] end_recording: Session not found - room_id={room_id}")
            return {'message': 'Session not found'}

        try:
            client = PlugNMeetClient()

            # Step 1: Get recording info
            logger.info(f"[PlugNMeet Webhook] Fetching recording info - recording_id={recording_id}")
            recording_data = client.get_recording_info(recording_id)
            if not recording_data.get('status'):
                logger.error(
                    f"[PlugNMeet Webhook] Failed to get recording info - recording_id={recording_id}"
                )
                return {'message': 'Failed to get recording info', 'error': recording_data.get('msg')}

            # Step 2: Get download token
            logger.info(f"[PlugNMeet Webhook] Getting download token - recording_id={recording_id}")
            token_response = client.get_recording_download_token(recording_id)
            if not token_response.get('status'):
                logger.error(
                    f"[PlugNMeet Webhook] Failed to get download token - recording_id={recording_id}"
                )
                return {'message': 'Failed to get download token', 'error': token_response.get('msg')}

            download_token = token_response.get('token')
            if not download_token:
                logger.error(
                    f"[PlugNMeet Webhook] No download token in response - recording_id={recording_id}"
                )
                return {'message': 'No download token received'}

            # Step 3: Download file
            download_path = f"/download/recording/{download_token}"

            # Reserve a temporary file path; the file is removed in ``finally``.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file:
                tmp_file_path = tmp_file.name

            try:
                logger.info(
                    f"[PlugNMeet Webhook] Downloading recording file - recording_id={recording_id}"
                )
                client.download_file(download_path, tmp_file_path)

                # Get file size
                file_size = os.path.getsize(tmp_file_path)
                logger.info(f"[PlugNMeet Webhook] File downloaded - size={file_size} bytes")

                # Determine recording type (video or audio) from the extension
                is_video = file_name.lower().endswith(('.mp4', '.webm', '.mkv'))
                recording_type = 'video' if is_video else 'voice'

                # Create LiveSessionRecording entry
                recording = LiveSessionRecording.objects.create(
                    session=session,
                    title=f"{session.subject} - Recording",
                    file_time=timedelta(seconds=duration) if duration > 0 else None,
                    recording_type=recording_type,
                )

                # Stream the file into the FileField instead of loading the
                # whole recording into memory first.
                with open(tmp_file_path, 'rb') as downloaded:
                    recording.file.save(file_name, File(downloaded), save=True)
                logger.info(
                    f"[PlugNMeet Webhook] Recording saved - recording_id={recording.id} file={file_name}"
                )

                # Generate thumbnail for video recordings (if ffmpeg is available).
                # Failure here is non-fatal: the recording is already saved.
                thumbnail_generated = False
                if is_video and file_size > 0:
                    try:
                        thumbnail_generated = self._generate_video_thumbnail(tmp_file_path, recording)
                        if thumbnail_generated:
                            logger.info(
                                f"[PlugNMeet Webhook] Thumbnail generated - recording_id={recording.id}"
                            )
                    except Exception as e:
                        logger.warning(
                            f"[PlugNMeet Webhook] Thumbnail generation skipped - error={str(e)}"
                        )

                return {
                    'message': 'Recording downloaded and saved successfully',
                    'recording_id': recording.id,
                    'file_name': file_name,
                    'file_size': file_size,
                    'thumbnail_generated': thumbnail_generated
                }
            finally:
                # Clean up temporary file
                if os.path.exists(tmp_file_path):
                    os.unlink(tmp_file_path)
                    logger.debug(
                        f"[PlugNMeet Webhook] Temporary file deleted - path={tmp_file_path}"
                    )
        except PlugNMeetError as e:
            logger.error(
                f"[PlugNMeet Webhook] PlugNMeet API error - recording_id={recording_id} error={str(e)}"
            )
            return {'message': f'PlugNMeet API error: {str(e)}'}
        except Exception as e:
            logger.error(
                f"[PlugNMeet Webhook] Unexpected error - recording_id={recording_id} error={str(e)}",
                exc_info=True
            )
            return {'message': f'Unexpected error: {str(e)}'}

    def _generate_video_thumbnail(self, video_path: str, recording: 'LiveSessionRecording') -> bool:
        """
        Generate a thumbnail from a video file using ffmpeg.

        Args:
            video_path: Path to the video file
            recording: LiveSessionRecording instance whose ``thumbnail`` field
                receives the generated image

        Returns:
            True if the thumbnail was generated and saved, False otherwise
            (ffmpeg missing, timed out, failed, or produced no output).
        """
        from django.core.files.base import ContentFile
        import subprocess
        import tempfile
        import os

        # Set before the try block so ``finally`` can always test it.
        thumbnail_path = None
        try:
            # Reserve a temporary file path for the thumbnail
            with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmp_thumb:
                thumbnail_path = tmp_thumb.name

            # Extract frame at 1 second using ffmpeg
            # -ss 1: seek to 1 second
            # -i: input file
            # -frames:v 1: extract 1 frame
            # -q:v 2: quality (2 is high quality)
            cmd = [
                'ffmpeg',
                '-ss', '1',
                '-i', video_path,
                '-frames:v', '1',
                '-q:v', '2',
                '-vf', 'scale=640:-1',  # scale to width 640, maintain aspect ratio
                '-y',  # overwrite output file
                thumbnail_path
            ]
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=30
            )
            if result.returncode != 0:
                logger.warning(f"[PlugNMeet Webhook] ffmpeg failed - return_code={result.returncode}")
                return False

            # Check if thumbnail was created
            if not os.path.exists(thumbnail_path) or os.path.getsize(thumbnail_path) == 0:
                logger.warning("[PlugNMeet Webhook] Thumbnail file not created or empty")
                return False

            # Save thumbnail to recording
            with open(thumbnail_path, 'rb') as f:
                thumbnail_content = f.read()
            thumbnail_filename = f"thumb_{recording.id}.jpg"
            recording.thumbnail.save(thumbnail_filename, ContentFile(thumbnail_content), save=True)
            return True
        except subprocess.TimeoutExpired:
            logger.error("[PlugNMeet Webhook] ffmpeg timeout during thumbnail generation")
            return False
        except FileNotFoundError:
            logger.error("[PlugNMeet Webhook] ffmpeg not found - please install ffmpeg")
            return False
        except Exception as e:
            logger.error(
                f"[PlugNMeet Webhook] Thumbnail generation error - error={str(e)}",
                exc_info=True
            )
            return False
        finally:
            # Best-effort cleanup of the temporary thumbnail on every path
            if thumbnail_path and os.path.exists(thumbnail_path):
                try:
                    os.unlink(thumbnail_path)
                except OSError as cleanup_error:
                    logger.debug(
                        f"[PlugNMeet Webhook] Could not delete temporary thumbnail - path={thumbnail_path} error={cleanup_error}"
                    )