You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

151 lines
6.1 KiB

import hashlib
import hmac
import json
import logging
from typing import Any, Dict, Optional
from urllib.parse import urljoin

import requests
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured


class PlugNMeetError(Exception):
    """Error raised when a PlugNMeet request fails or is rejected.

    Attributes:
        status_code: HTTP status code associated with the failure, or
            ``None`` when no status was supplied.
        response_data: Decoded response payload supplied by the raiser;
            always a dict (empty when nothing, or a falsy value, was given).
    """

    def __init__(
        self,
        message: str,
        *,
        status_code: Optional[int] = None,
        response_data: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(message)
        # Normalise a missing/empty payload to a dict so callers can always
        # use ``.get()`` on it without a None check.
        self.response_data = response_data if response_data else {}
        self.status_code = status_code
logger = logging.getLogger(__name__)


class PlugNMeetClient:
    """HTTP client for the signed JSON API of a PlugNMeet server.

    Every API call is a POST whose JSON body is signed with HMAC-SHA256
    keyed by the API secret. The hex digest travels in the
    ``HASH-SIGNATURE`` header next to the ``API-KEY`` header. Transport
    failures, HTTP error statuses, undecodable responses and responses
    whose ``status`` field is ``False`` are all raised as ``PlugNMeetError``.
    """

    def __init__(
        self,
        *,
        base_url: Optional[str] = None,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        timeout: Optional[float] = None,
    ):
        """Configure the client from arguments, falling back to Django settings.

        Args:
            base_url: Server base URL; falls back to ``PLUGNMEET_SERVER_URL``.
                Trailing slashes are stripped.
            api_key: API key; falls back to ``PLUGNMEET_API_KEY``.
            api_secret: API secret; falls back to ``PLUGNMEET_API_SECRET``.
            timeout: Timeout passed to ``requests`` for API calls; falls back
                to ``PLUGNMEET_TIMEOUT`` and then to 10.0.

        A falsy argument (``None``, ``""``, ``0``) counts as "not given".

        Raises:
            ImproperlyConfigured: If the base URL, API key or API secret is
                still empty after the fallback.
        """
        self.base_url = (base_url or getattr(settings, "PLUGNMEET_SERVER_URL", "")).rstrip("/")
        self.api_key = api_key or getattr(settings, "PLUGNMEET_API_KEY", "")
        self.api_secret = api_secret or getattr(settings, "PLUGNMEET_API_SECRET", "")
        self.timeout = timeout or getattr(settings, "PLUGNMEET_TIMEOUT", 10.0)
        if not self.base_url or not self.api_key or not self.api_secret:
            raise ImproperlyConfigured("PlugNMeet integration settings are incomplete.")

    def create_room(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` (keys converted to camelCase) to ``/auth/room/create``.

        Returns the decoded JSON response; raises ``PlugNMeetError`` on failure.
        """
        # Logged at debug level instead of printed, so the payload does not
        # end up on stdout in production.
        logger.debug("[PlugNMeet] Creating room with payload: %s", payload)
        # The original author notes that the PlugNMeet protocol expects
        # camelCase keys throughout the payload.
        prepared = self._camelize_dict(payload)
        return self._post("/auth/room/create", prepared)

    def get_join_token(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` (keys converted to camelCase) to ``/auth/room/getJoinToken``.

        Returns the decoded JSON response; raises ``PlugNMeetError`` on failure.
        """
        prepared = self._camelize_dict(payload)
        return self._post("/auth/room/getJoinToken", prepared)

    def is_room_active(self, room_id: str) -> Dict[str, Any]:
        """POST ``{"roomId": room_id}`` to ``/auth/room/isRoomActive``.

        Returns the decoded JSON response; raises ``PlugNMeetError`` on failure.
        """
        return self._post("/auth/room/isRoomActive", {"roomId": room_id})

    def get_recording_info(self, record_id: str) -> Dict[str, Any]:
        """Get detailed information about a recording.

        POSTs ``{"recordId": record_id}`` to ``/auth/recording/recordingInfo``.
        """
        return self._post("/auth/recording/recordingInfo", {"recordId": record_id})

    def get_recording_download_token(self, record_id: str) -> Dict[str, Any]:
        """Get a temporary download token for a recording.

        POSTs ``{"recordId": record_id}`` to ``/auth/recording/getDownloadToken``.
        """
        return self._post("/auth/recording/getDownloadToken", {"recordId": record_id})

    def download_file(
        self,
        download_path: str,
        save_to: str,
        *,
        timeout: float = 300.0,
        chunk_size: int = 8192,
    ) -> bool:
        """Stream a file from the PlugNMeet server to a local file.

        Args:
            download_path: The download path (e.g.,
                '/download/recording/token_xxx'); joined onto the base URL.
            save_to: Local file path to save the downloaded file.
            timeout: ``requests`` timeout in seconds. The default of five
                minutes allows for large files.
            chunk_size: Number of bytes requested per streamed chunk.

        Returns:
            True once the whole response body has been written. Failures are
            raised, never reported as False.

        Raises:
            PlugNMeetError: If the request fails or the server answers with
                an HTTP error status.
            OSError: If the local file cannot be opened or written
                (propagated unchanged).
        """
        url = urljoin(f"{self.base_url}/", download_path.lstrip("/"))
        logger.info("[PlugNMeet] Downloading file from %s", url)
        try:
            # With stream=True the connection stays checked out until the
            # response is closed; the context manager closes it on every
            # exit path, including errors.
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()
                with open(save_to, "wb") as f:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        if chunk:
                            f.write(chunk)
        except requests.RequestException as exc:
            logger.error("[PlugNMeet] Failed to download file - error=%s", exc)
            raise PlugNMeetError(f"Failed to download file: {exc}") from exc
        logger.info("[PlugNMeet] File downloaded successfully to %s", save_to)
        return True

    def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Send a signed JSON POST to ``path`` and return the decoded response.

        Raises:
            PlugNMeetError: If the request cannot be sent, the HTTP status is
                400 or above, the body is not decodable JSON (or is JSON
                ``null``), or the decoded object has ``status`` set to False.
        """
        url = urljoin(f"{self.base_url}/", path.lstrip("/"))
        # Serialise once: the signature is computed over this exact string,
        # and the same string (UTF-8 encoded) is what gets sent.
        body = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
        headers = {
            "Content-Type": "application/json",
            "API-KEY": self.api_key,
            "HASH-SIGNATURE": self._build_signature(body),
        }
        logger.debug("[PlugNMeet] POST %s - Body: %s", path, body[:500])
        try:
            response = requests.post(
                url,
                data=body.encode("utf-8"),
                headers=headers,
                timeout=self.timeout,
            )
        except requests.RequestException as exc:
            raise PlugNMeetError("Failed to reach PlugNMeet server.") from exc

        data = self._safe_json(response)
        if response.status_code >= 400:
            raise PlugNMeetError(
                "PlugNMeet server returned an error.",
                status_code=response.status_code,
                # Only a JSON object is kept, so response_data is always a dict.
                response_data=data if isinstance(data, dict) else None,
            )
        if data is None:
            raise PlugNMeetError("PlugNMeet server returned an invalid response format.")
        # A 2xx response can still report an application-level failure.
        if isinstance(data, dict) and data.get("status") is False:
            error_message = data.get("msg") or data.get("message") or "PlugNMeet operation failed."
            raise PlugNMeetError(
                error_message,
                status_code=response.status_code,
                response_data=data,
            )
        return data

    @staticmethod
    def _snake_to_camel(key: Any) -> Any:
        """Convert a ``snake_case`` string to ``camelCase``.

        The first underscore-separated segment is kept as is; each later
        segment goes through ``str.capitalize`` (first character upper-cased,
        the rest lower-cased). Keys without underscores are returned
        unchanged. Non-string keys are returned untouched rather than raising.
        """
        if not isinstance(key, str):
            return key
        head, *tail = key.split("_")
        return head + "".join(part.capitalize() for part in tail)

    def _camelize_dict(self, obj: Any) -> Any:
        """Recursively camelCase every dict key inside ``obj``.

        Dicts are rebuilt with converted keys. Lists and tuples are rebuilt
        as lists with each element converted; JSON serialises both as
        arrays. Any other value is returned unchanged.
        """
        if isinstance(obj, dict):
            return {self._snake_to_camel(k): self._camelize_dict(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [self._camelize_dict(v) for v in obj]
        return obj

    def _build_signature(self, body: str) -> str:
        """Return the hex HMAC-SHA256 of ``body`` keyed with the API secret."""
        digest = hmac.new(self.api_secret.encode("utf-8"), body.encode("utf-8"), hashlib.sha256)
        return digest.hexdigest()

    @staticmethod
    def _safe_json(response: "requests.Response") -> Any:
        """Return ``response.json()``, or ``None`` if decoding raises ValueError."""
        try:
            return response.json()
        except ValueError:
            return None
# Names exported by ``from <module> import *``: the client and the exception
# type it raises.
__all__ = ["PlugNMeetClient", "PlugNMeetError"]