# Production Readiness Review - Islamic Scholar Agent
## Executive Summary
This project is a RAG (Retrieval-Augmented Generation) chatbot for answering Islamic questions, built with the Agno Framework, FastAPI, and vector databases. Although it works at a basic level, it lacks the standards required for a production environment and looks more like a practice/experimental project.
**Overall production readiness score: 3/10**
## 🔴 Critical Issues
### 1. Disorganized, Unprofessional Project Structure
**Current problem:**
agent/
├── app/ # Everything in one folder!
│ ├── app.py # Production code
│ ├── test_*.py # Test files mixed with production
│ ├── scholar_agent.py # Agent implementation
│ ├── connection_test.py # Test file
│ ├── README_connection_test.md # Documentation mixed in
│ └── ...
├── hadiths_data.xlsx # Data files in root
├── dovodi_articles.xlsx # Data files in root
└── requirements.txt
**Suggested structure for production:**
agent/
├── src/ # Production source code
│ ├── agents/ # Agent implementations
│ │ ├── __init__.py
│ │ ├── base_agent.py
│ │ └── islamic_scholar_agent.py
│ ├── knowledge/ # Knowledge base & RAG pipeline
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ ├── vector_store.py
│ │ └── rag_pipeline.py
│ ├── models/ # LLM integrations
│ │ ├── __init__.py
│ │ ├── base_model.py
│ │ ├── openrouter.py
│ │ └── openai.py
│ ├── api/ # FastAPI routes
│ │ ├── __init__.py
│ │ ├── routes.py
│ │ └── dependencies.py
│ ├── core/ # Core configurations
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── settings.py
│ │ └── logging.py
│ ├── utils/ # Utility functions
│ │ ├── __init__.py
│ │ └── helpers.py
│ └── main.py # Application entry point
├── data/ # Data files
│ ├── raw/ # Original data files
│ │ ├── hadiths_data.xlsx
│ │ └── dovodi_articles.xlsx
│ ├── processed/ # Processed/cleaned data
│ └── embeddings/ # Pre-computed embeddings (optional)
├── scripts/ # Utility scripts
│ ├── ingest_data.py # Data ingestion pipeline
│ ├── setup_vectordb.py # Vector DB initialization
│ └── health_check.py # Health check script
├── tests/ # All test files
│ ├── unit/
│ │ ├── test_agent.py
│ │ ├── test_rag.py
│ │ └── test_models.py
│ ├── integration/
│ │ ├── test_api.py
│ │ └── test_pipeline.py
│ └── conftest.py # Pytest configuration
├── docs/ # Documentation
│ ├── README.md # Main documentation
│ ├── API.md # API documentation
│ ├── DEPLOYMENT.md # Deployment guide
│ └── ARCHITECTURE.md # Architecture overview
├── config/ # Configuration files
│ ├── development.env
│ ├── production.env
│ └── models.yaml # LLM model configurations
├── .github/ # GitHub workflows (if using GitHub)
│ └── workflows/
│ └── ci.yml
├── docker/ # Docker files
│ ├── Dockerfile.dev
│ ├── Dockerfile.prod
│ └── docker-compose.dev.yml
├── .env.example # Example environment file
├── .gitignore
├── requirements.txt # Base requirements
├── requirements-dev.txt # Development requirements
├── pytest.ini # Pytest configuration
├── docker-compose.yml # Production compose
└── README.md # Project overview
**Impact:** ⭐⭐⭐⭐⭐ (very critical)
**Fix effort:** 2-3 days for a full restructuring
### 2. No Abstraction Layer for LLM Models
**Problem:**
# In app.py - hardcoded
agent = Agent(
model=OpenRouter(id="deepseek/deepseek-r1-0528:free"), # ❌ Hardcoded
...
)
# In scholar_agent.py - a different model
scholar = Agent(
model=OpenAIChat(id="gpt-4o"), # ❌ Another model, no consistency at all
...
)
# In scholar_rag.py - yet another model
agent = Agent(
model=OpenRouter(id="xiaomi/mimo-v2-flash:free"), # ❌ A third model!
...
)
**Suggested solution:**
**File: `config/models.yaml`**
models:
default: deepseek_r1
providers:
openrouter:
api_key: ${OPENROUTER_API_KEY}
base_url: https://openrouter.ai/api/v1
models:
deepseek_r1:
id: "deepseek/deepseek-r1-0528:free"
temperature: 0.7
max_tokens: 4096
supports_streaming: true
supports_tools: false
mimo_v2:
id: "xiaomi/mimo-v2-flash:free"
temperature: 0.7
max_tokens: 2048
supports_streaming: true
openai:
api_key: ${OPENAI_API_KEY}
base_url: https://api.openai.com/v1
models:
gpt4:
id: "gpt-4o"
temperature: 0.7
max_tokens: 4096
supports_streaming: true
supports_tools: true
gpt4_mini:
id: "gpt-4o-mini"
temperature: 0.7
max_tokens: 4096
**File: `src/models/base_model.py`**
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from pydantic import BaseModel
class LLMConfig(BaseModel):
"""Configuration for LLM models"""
id: str
temperature: float = 0.7
max_tokens: int = 4096
supports_streaming: bool = True
supports_tools: bool = False
extra_params: Dict[str, Any] = {}
class BaseLLMProvider(ABC):
"""Abstract base class for LLM providers"""
def __init__(self, api_key: str, base_url: Optional[str] = None):
self.api_key = api_key
self.base_url = base_url
@abstractmethod
def get_model(self, config: LLMConfig):
"""Return configured model instance"""
pass
**File: `src/models/factory.py`**
from typing import Optional
import yaml
from pathlib import Path
from agno.models.openrouter import OpenRouter
from agno.models.openai import OpenAIChat
from .base_model import LLMConfig, BaseLLMProvider
class ModelFactory:
"""Factory for creating LLM instances from configuration"""
def __init__(self, config_path: str = "config/models.yaml"):
with open(config_path) as f:
self.config = yaml.safe_load(f)
self._providers = {}
def get_model(self, model_name: Optional[str] = None):
"""
Get model instance by name.
Args:
model_name: Name of the model (e.g., 'deepseek_r1', 'gpt4')
If None, uses default from config
Returns:
Configured model instance
Example:
>>> factory = ModelFactory()
>>> model = factory.get_model('deepseek_r1')
>>> # Switch to GPT-4 with single line change:
>>> model = factory.get_model('gpt4')
"""
if model_name is None:
model_name = self.config['models']['default']
# Find which provider has this model
for provider_name, provider_config in self.config['models']['providers'].items():
if model_name in provider_config['models']:
model_config = provider_config['models'][model_name]
if provider_name == 'openrouter':
return OpenRouter(
id=model_config['id'],
api_key=provider_config['api_key'],
temperature=model_config.get('temperature', 0.7),
max_tokens=model_config.get('max_tokens', 4096),
)
elif provider_name == 'openai':
return OpenAIChat(
id=model_config['id'],
api_key=provider_config['api_key'],
temperature=model_config.get('temperature', 0.7),
max_tokens=model_config.get('max_tokens', 4096),
)
raise ValueError(f"Model '{model_name}' not found in configuration")
def list_available_models(self):
"""List all available models"""
models = []
for provider_name, provider_config in self.config['models']['providers'].items():
for model_name in provider_config['models'].keys():
models.append({
'name': model_name,
'provider': provider_name,
'id': provider_config['models'][model_name]['id']
})
return models
# Usage in application:
# model = ModelFactory().get_model() # Uses default
# model = ModelFactory().get_model('gpt4') # Uses GPT-4
**Usage in code:**
# Before (❌ Bad):
from agno.models.openrouter import OpenRouter
agent = Agent(
model=OpenRouter(id="deepseek/deepseek-r1-0528:free"),
...
)
# After (✅ Good):
from src.models.factory import ModelFactory
model_factory = ModelFactory()
agent = Agent(
model=model_factory.get_model(), # Uses default from config
# OR
model=model_factory.get_model('gpt4'), # Easy switching!
...
)
**Benefits:**
- ✅ Switch models with one line of code
- ✅ Centralized configuration management
- ✅ Simple A/B testing
- ✅ Easy API key management
- ✅ New providers can be added without code changes
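For example, simple A/B testing becomes a matter of choosing the model name per request. A sketch (the hash-bucketing scheme is illustrative; the model names follow the YAML above):

```python
import hashlib

def pick_model_name(user_id: str, experiment_share: float = 0.1) -> str:
    """Deterministically route a share of users to the experimental model."""
    # Hash the user ID into a stable bucket in [0.0, 1.0)
    bucket = (int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 1000) / 1000
    return "gpt4" if bucket < experiment_share else "deepseek_r1"

# model = ModelFactory().get_model(pick_model_name(user_id))
```

Because the bucket is derived from the user ID, each user consistently sees the same model across requests.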
**Impact:** ⭐⭐⭐⭐⭐ (critical)
**Fix effort:** 1 day
### 3. Disorganized, Unreliable Data Ingestion Pipeline
**Existing problems:**
# In ingest_excel.py - ❌ Problems:
def ingest_hadiths(file_path: str):
df = pd.read_excel(file_path) # ❌ No validation
count = 0
for _, row in df.iterrows(): # ❌ No error handling per row
content = f"HADITH TYPE: HADITH\n..." # ❌ Hardcoded format
knowledge_base.add_content(text_content=content) # ❌ No retry logic
count += 1
print(f"✅ {count} Hadiths") # ❌ Only print, no logging
# ❌ Hardcoded paths
qdrant_url = "http://localhost:6333"
# ❌ No progress tracking
# ❌ No duplicate detection
# ❌ No data validation
# ❌ Can't resume if failed midway
**Suggested solution:**
**File: `src/core/config.py`**
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
"""Application settings loaded from environment"""
# Database
DATABASE_URL: str
# Vector Database
VECTOR_DB_TYPE: str = "qdrant" # or "pgvector"
QDRANT_URL: str = "http://qdrant:6333"
QDRANT_COLLECTION: str = "islamic_knowledge"
PGVECTOR_TABLE: str = "islamic_knowledge"
# Embeddings
EMBEDDING_MODEL: str = "all-MiniLM-L6-v2"
EMBEDDING_BATCH_SIZE: int = 100
EMBEDDING_DIMENSIONS: int = 384
# LLM
LLM_MODEL_NAME: str = "deepseek_r1"
# Ingestion
DATA_DIR: str = "data/raw"
PROCESSED_DATA_DIR: str = "data/processed"
BATCH_SIZE: int = 50
MAX_RETRIES: int = 3
# Monitoring
LANGFUSE_PUBLIC_KEY: Optional[str] = None
LANGFUSE_SECRET_KEY: Optional[str] = None
LANGFUSE_HOST: str = "https://cloud.langfuse.com"
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()
**File: `scripts/ingest_data.py`**
import logging
from pathlib import Path
from typing import List, Dict, Any
import pandas as pd
from tqdm import tqdm
from pydantic import BaseModel, field_validator
from src.core.config import settings
from src.knowledge.vector_store import get_vector_store
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/ingestion.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class HadithRecord(BaseModel):
"""Validated Hadith record"""
title: str
arabic_text: str
translation: str
source_info: str
@field_validator('title', 'arabic_text', 'translation')
@classmethod
def not_empty(cls, v):
if not v or not v.strip():
raise ValueError('Field cannot be empty')
return v.strip()
class ArticleRecord(BaseModel):
"""Validated Article record"""
title: str
author: str
content: str
url: str
@field_validator('title', 'content')
@classmethod
def not_empty(cls, v):
if not v or not v.strip():
raise ValueError('Field cannot be empty')
return v.strip()
class DataIngestionPipeline:
"""Production-ready data ingestion pipeline"""
def __init__(self):
self.vector_store = get_vector_store()
self.processed_ids = self._load_processed_ids()
self.stats = {
'total': 0,
'success': 0,
'failed': 0,
'skipped': 0,
'errors': []
}
def _load_processed_ids(self) -> set:
"""Load IDs of already processed documents (for resumability)"""
processed_file = Path(settings.PROCESSED_DATA_DIR) / "processed_ids.txt"
if processed_file.exists():
with open(processed_file) as f:
return set(line.strip() for line in f)
return set()
def _save_processed_id(self, doc_id: str):
"""Save processed document ID"""
processed_file = Path(settings.PROCESSED_DATA_DIR) / "processed_ids.txt"
processed_file.parent.mkdir(parents=True, exist_ok=True)
with open(processed_file, 'a') as f:
f.write(f"{doc_id}\n")
def ingest_hadiths(self, file_path: str):
"""
Ingest Hadiths with validation, error handling, and progress tracking
Features:
- ✅ Data validation with Pydantic
- ✅ Row-level error handling
- ✅ Progress tracking
- ✅ Duplicate detection
- ✅ Resumable (tracks processed IDs)
- ✅ Comprehensive logging
- ✅ Batch processing for efficiency
"""
logger.info(f"Starting Hadith ingestion from {file_path}")
try:
df = pd.read_excel(file_path)
logger.info(f"Loaded {len(df)} records from Excel")
except Exception as e:
logger.error(f"Failed to load Excel file: {e}")
raise
# Validate required columns
required_cols = {'Title', 'Arabic Text', 'Translation', 'Source Info'}
missing_cols = required_cols - set(df.columns)
if missing_cols:
raise ValueError(f"Missing required columns: {missing_cols}")
batch = []
for idx, row in tqdm(df.iterrows(), total=len(df), desc="Processing Hadiths"):
self.stats['total'] += 1
# Generate unique ID
doc_id = f"hadith_{idx}"
# Skip if already processed
if doc_id in self.processed_ids:
logger.debug(f"Skipping already processed: {doc_id}")
self.stats['skipped'] += 1
continue
try:
# Validate data
hadith = HadithRecord(
title=row.get('Title', ''),
arabic_text=row.get('Arabic Text', ''),
translation=row.get('Translation', ''),
source_info=row.get('Source Info', '')
)
# Format content
content = (
f"TYPE: HADITH\n"
f"TITLE: {hadith.title}\n"
f"ARABIC: {hadith.arabic_text}\n"
f"TRANSLATION: {hadith.translation}\n"
f"SOURCE: {hadith.source_info}"
)
# Add to batch
batch.append({
'id': doc_id,
'content': content,
'metadata': {
'type': 'hadith',
'title': hadith.title,
'source': hadith.source_info
}
})
# Process batch when full
if len(batch) >= settings.BATCH_SIZE:
self._process_batch(batch)
batch = []
except Exception as e:
logger.error(f"Error processing row {idx}: {e}")
self.stats['failed'] += 1
self.stats['errors'].append({
'row': idx,
'error': str(e)
})
continue
# Process remaining batch
if batch:
self._process_batch(batch)
logger.info(f"Hadith ingestion completed: {self.stats}")
def _process_batch(self, batch: List[Dict[str, Any]]):
"""Process a batch of documents with retry logic"""
for attempt in range(settings.MAX_RETRIES):
try:
# Add to vector store
for doc in batch:
self.vector_store.add_content(
text_content=doc['content'],
metadata=doc['metadata']
)
self._save_processed_id(doc['id'])
self.processed_ids.add(doc['id'])
self.stats['success'] += 1
logger.info(f"Successfully processed batch of {len(batch)} documents")
break
except Exception as e:
if attempt < settings.MAX_RETRIES - 1:
logger.warning(f"Batch processing failed (attempt {attempt + 1}): {e}")
continue
else:
logger.error(f"Batch processing failed after {settings.MAX_RETRIES} attempts: {e}")
for doc in batch:
self.stats['failed'] += 1
self.stats['errors'].append({
'doc_id': doc['id'],
'error': str(e)
})
def generate_report(self) -> str:
"""Generate ingestion summary report"""
report = f"""
╔══════════════════════════════════════════╗
║ Data Ingestion Summary Report ║
╚══════════════════════════════════════════╝
Total Records: {self.stats['total']}
✅ Successfully Processed: {self.stats['success']}
⏭️ Skipped (Already Processed): {self.stats['skipped']}
❌ Failed: {self.stats['failed']}
Success Rate: {(self.stats['success'] / self.stats['total'] * 100 if self.stats['total'] else 0.0):.2f}%
"""
if self.stats['errors']:
report += "\n❌ Errors:\n"
for error in self.stats['errors'][:10]: # Show first 10
report += f" - {error}\n"
if len(self.stats['errors']) > 10:
report += f" ... and {len(self.stats['errors']) - 10} more errors\n"
return report
# CLI interface
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Ingest data into vector database')
parser.add_argument('--hadiths', type=str, help='Path to hadiths Excel file')
parser.add_argument('--articles', type=str, help='Path to articles Excel file')
parser.add_argument('--reset', action='store_true', help='Reset processed IDs (re-ingest all)')
args = parser.parse_args()
# Reset if requested
if args.reset:
processed_file = Path(settings.PROCESSED_DATA_DIR) / "processed_ids.txt"
if processed_file.exists():
processed_file.unlink()
logger.info("Reset processed IDs")
pipeline = DataIngestionPipeline()
if args.hadiths:
pipeline.ingest_hadiths(args.hadiths)
# Similar implementation for articles...
print(pipeline.generate_report())
**Usage:**
# Basic ingestion
python scripts/ingest_data.py --hadiths data/raw/hadiths_data.xlsx
# Reset and re-ingest all
python scripts/ingest_data.py --hadiths data/raw/hadiths_data.xlsx --reset
# Ingest both
python scripts/ingest_data.py \
--hadiths data/raw/hadiths_data.xlsx \
--articles data/raw/dovodi_articles.xlsx
**Impact:** ⭐⭐⭐⭐⭐ (critical)
**Fix effort:** 2-3 days
### 4. No Observability or Monitoring (Langfuse)
**Problem:**
- No logging framework
- No tracing
- Performance cannot be tracked
- Debugging problems is very hard
- User feedback cannot be collected
**Solution: integrate Langfuse**
**File: `src/core/monitoring.py`**
from typing import Optional, Dict, Any
from functools import wraps
import time
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from src.core.config import settings
import logging
logger = logging.getLogger(__name__)
# Initialize Langfuse
langfuse = None
if settings.LANGFUSE_PUBLIC_KEY and settings.LANGFUSE_SECRET_KEY:
langfuse = Langfuse(
public_key=settings.LANGFUSE_PUBLIC_KEY,
secret_key=settings.LANGFUSE_SECRET_KEY,
host=settings.LANGFUSE_HOST
)
logger.info("✅ Langfuse monitoring enabled")
else:
logger.warning("⚠️ Langfuse not configured - monitoring disabled")
def trace_agent_run(func):
"""Decorator to trace agent runs with Langfuse"""
@wraps(func)
def wrapper(*args, **kwargs):
if not langfuse:
return func(*args, **kwargs)
# Create trace
trace = langfuse.trace(
name=f"agent_run_{func.__name__}",
metadata={
"function": func.__name__,
"timestamp": time.time()
}
)
try:
start_time = time.time()
result = func(*args, **kwargs)
duration = time.time() - start_time
# Log success
trace.update(
output=str(result)[:1000], # Limit output size
metadata={
"duration_seconds": duration,
"status": "success"
}
)
return result
except Exception as e:
# Log error
trace.update(
metadata={
"status": "error",
"error": str(e)
}
)
raise
return wrapper
class RAGMonitor:
"""Monitor RAG pipeline with Langfuse"""
@staticmethod
@observe(name="rag_search")
def track_search(query: str, results: list, duration: float):
"""Track vector search operation"""
langfuse_context.update_current_observation(
input=query,
output={
"num_results": len(results),
"results": [r.content[:100] for r in results] # Preview only
},
metadata={
"duration_seconds": duration,
"search_type": "vector_similarity"
}
)
@staticmethod
@observe(name="rag_generation")
def track_generation(
prompt: str,
response: str,
model: str,
duration: float,
tokens_used: Optional[int] = None
):
"""Track LLM generation"""
langfuse_context.update_current_observation(
input=prompt[:500], # Limit size
output=response,
model=model,
metadata={
"duration_seconds": duration,
"tokens_used": tokens_used
}
)
@staticmethod
@observe(name="rag_pipeline")
def track_full_pipeline(
user_question: str,
context: str,
final_answer: str,
metadata: Dict[str, Any]
):
"""Track complete RAG pipeline"""
langfuse_context.update_current_observation(
input=user_question,
output=final_answer,
metadata={
**metadata,
"context_length": len(context),
"answer_length": len(final_answer)
}
)
# Usage in RAG pipeline:
@observe(name="build_rag_prompt")
def build_rag_prompt(user_question: str) -> str:
"""RAG pipeline with Langfuse tracking"""
import time
# Search phase
search_start = time.time()
relevant_docs = knowledge_base.search(query=user_question, max_results=3)
search_duration = time.time() - search_start
RAGMonitor.track_search(
query=user_question,
results=relevant_docs,
duration=search_duration
)
# Build context
context_str = "\n\n".join([doc.content for doc in relevant_docs])
# Build final prompt
final_prompt = f"Context:\n{context_str}\n\nQuestion: {user_question}"
return final_prompt
**Usage in the Agent:**
from src.core.monitoring import trace_agent_run, RAGMonitor, observe
class IslamicScholarAgent(Agent):
@observe(name="agent_run")
def run(self, message, **kwargs):
"""Override run with monitoring"""
rag_prompt = build_rag_prompt(message)
# Track generation
import time
gen_start = time.time()
result = super().run(rag_prompt, **kwargs)
gen_duration = time.time() - gen_start
RAGMonitor.track_generation(
prompt=rag_prompt,
response=result.content,
model=self.model.id,
duration=gen_duration
)
return result
**The Langfuse dashboard provides:**
- 📊 Number of queries per day/week/month
- ⏱️ Average response time
- 💰 Token usage and costs
- 🔍 Answer quality (via user feedback)
- 🐛 Error tracking
- 📈 Performance trends
**Impact:** ⭐⭐⭐⭐⭐ (critical for production)
**Fix effort:** 1-2 days
## 🟡 Major Issues
### 5. Duplicated, Non-Standard Code
**Examples of duplicated code:**
# In scholar_rag_pipelined.py, lines 1-73 and 74-146 are exactly identical!
# The whole file was copy-pasted twice
# Also:
# - app.py and app_local.py - similar code with minor differences
# - ingest_knowledge.py and ingest_excel.py - similar logic
# - several test files with no consistency between them
**Solution:**
- Delete the duplicated files
- Use inheritance and composition
- Create base classes
- Apply the DRY principle
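As an illustration of the base-class approach, the shared loop from the two ingestion scripts can live in one place while subclasses only define the row format (the class and method names here are hypothetical, and `store` stands in for the real `knowledge_base.add_content` call):

```python
from abc import ABC, abstractmethod
from typing import Iterable

class BaseIngester(ABC):
    """Shared ingestion flow; retries/logging would be added here once, not per script."""

    def ingest(self, rows: Iterable[dict]) -> int:
        count = 0
        for row in rows:
            self.store(self.format_row(row))
            count += 1
        return count

    @abstractmethod
    def format_row(self, row: dict) -> str:
        """Each source only defines how a row becomes text."""

    def store(self, content: str) -> None:
        # Stand-in for knowledge_base.add_content(text_content=content)
        pass

class HadithIngester(BaseIngester):
    def format_row(self, row: dict) -> str:
        return f"TYPE: HADITH\nTITLE: {row['title']}"

class ArticleIngester(BaseIngester):
    def format_row(self, row: dict) -> str:
        return f"TYPE: ARTICLE\nTITLE: {row['title']}"
```

Any cross-cutting change (retry logic, progress tracking) then lands in `BaseIngester.ingest` exactly once.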
**Impact:** ⭐⭐⭐
**Fix effort:** 1 day
### 6. Weak Configuration Management
**Problems:**
# Hardcoded in multiple places:
db_url = "postgresql+psycopg://ai:ai@localhost:5532/ai" # ❌
qdrant_url = "http://localhost:6333" # ❌
qdrant_url = "http://127.0.0.1:6333" # ❌ A different value!
qdrant_url = os.getenv("QDRANT_URL", "http://qdrant:6333") # ❌ A third value!
**Solution:**
- Use Pydantic Settings
- Separate .env files for each environment
- Validation and type checking
- Document all environment variables
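A minimal stdlib sketch of the single-source-of-truth idea (the full solution uses pydantic-settings, as in the `Settings` class of section 3; the names here are illustrative):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class VectorDBSettings:
    """One place that reads and validates the QDRANT_* values."""
    qdrant_url: str
    collection: str

    @classmethod
    def from_env(cls, env=os.environ) -> "VectorDBSettings":
        url = env.get("QDRANT_URL", "http://qdrant:6333")
        if not url.startswith(("http://", "https://")):
            raise ValueError(f"QDRANT_URL must be an http(s) URL, got {url!r}")
        return cls(qdrant_url=url,
                   collection=env.get("QDRANT_COLLECTION", "islamic_knowledge"))
```

With one loader like this, the three conflicting `qdrant_url` values above cannot reappear.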
**Impact:** ⭐⭐⭐⭐
**Fix effort:** 1 day
### 7. No Proper Test Framework
**Current problem:**
# test files are just scripts:
if __name__ == "__main__":
test_agent_rag() # ❌ No assertions
# ❌ No test discovery
# ❌ No fixtures
# ❌ No coverage reports
**Solution: use pytest**
**File: `tests/conftest.py`**
import pytest
from src.core.config import settings
from src.models.factory import ModelFactory
from src.knowledge.vector_store import get_vector_store
@pytest.fixture
def model_factory():
"""Provide model factory for tests"""
return ModelFactory(config_path="config/models.test.yaml")
@pytest.fixture
def vector_store():
"""Provide test vector store"""
# Use in-memory or test database
return get_vector_store(collection_name="test_collection")
@pytest.fixture
def sample_hadith():
"""Sample hadith for testing"""
return {
'title': 'Test Hadith',
'arabic_text': 'نص عربي',
'translation': 'English translation',
'source_info': 'Test Source'
}
**File: `tests/unit/test_rag.py`**
import pytest
from src.knowledge.rag_pipeline import build_rag_prompt
def test_build_rag_prompt_with_results(vector_store, sample_hadith):
"""Test RAG prompt building when results are found"""
# Setup
vector_store.add_content(sample_hadith)
# Execute
prompt = build_rag_prompt("What is this hadith about?")
# Assert
assert "Context:" in prompt
assert sample_hadith['translation'] in prompt
assert "Question:" in prompt
def test_build_rag_prompt_no_results(vector_store):
"""Test RAG prompt building when no results found"""
prompt = build_rag_prompt("Something not in database")
assert "No information found" in prompt
@pytest.mark.parametrize("query,expected_in_response", [
("prayer", "salah"),
("fasting", "ramadan"),
("charity", "zakat"),
])
def test_rag_responses(query, expected_in_response, agent):
"""Test various query types"""
response = agent.run(query)
assert expected_in_response.lower() in response.content.lower()
**File: `pytest.ini`**
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--tb=short
--strict-markers
--cov=src
--cov-report=html
--cov-report=term-missing
markers =
unit: Unit tests
integration: Integration tests
slow: Slow tests
**Running the tests:**
# Run all tests
pytest
# Run with coverage
pytest --cov=src --cov-report=html
# Run specific test file
pytest tests/unit/test_rag.py
# Run tests matching pattern
pytest -k "test_rag"
# Run only unit tests
pytest -m unit
**Impact:** ⭐⭐⭐⭐
**Fix effort:** 2 days
### 8. Docker Configuration Is Not Production-Ready
**Problems:**
# Dockerfile - Current issues:
FROM python:3.9 # ❌ Not pinned version, security risk
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt # ❌ No layer caching optimization
COPY app/ ./app/ # ❌ Copies everything including tests
EXPOSE 7777
CMD ["python", "app/test_agentos_simple.py"] # ❌❌ Running TEST file in production!
**Solution:**
**File: `docker/Dockerfile.prod`**
# Multi-stage build for production
FROM python:3.11-slim as builder
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
WORKDIR /build
# Install dependencies in separate layer for caching
COPY requirements.txt .
RUN pip install --user -r requirements.txt
# Production stage
FROM python:3.11-slim
# Create non-root user for security
RUN useradd -m -u 1000 appuser && \
mkdir -p /app /app/logs /app/data && \
chown -R appuser:appuser /app
# Copy dependencies from builder
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local
# Set working directory
WORKDIR /app
# Copy only production code (no tests)
COPY --chown=appuser:appuser src/ ./src/
COPY --chown=appuser:appuser config/ ./config/
COPY --chown=appuser:appuser scripts/health_check.py ./scripts/
# Switch to non-root user
USER appuser
# Add local bin to PATH
ENV PATH=/home/appuser/.local/bin:$PATH
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD python scripts/health_check.py || exit 1
# Expose port
EXPOSE 7777
# Run application
CMD ["python", "-m", "uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "7777"]
**File: `docker/Dockerfile.dev`**
FROM python:3.11-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
WORKDIR /app
# Install dev dependencies
COPY requirements.txt requirements-dev.txt ./
RUN pip install -r requirements.txt -r requirements-dev.txt
# Copy all code (including tests) for development
COPY . .
# Development with hot reload
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "7777", "--reload"]
**File: `scripts/health_check.py`**
#!/usr/bin/env python3
"""Health check script for Docker"""
import sys
import requests
try:
response = requests.get("http://localhost:7777/health", timeout=5)
if response.status_code == 200:
sys.exit(0)
else:
sys.exit(1)
except Exception as e:
print(f"Health check failed: {e}")
sys.exit(1)
**Impact:** ⭐⭐⭐⭐
**Fix effort:** 1 day
### 9. No Proper CI/CD Pipeline
**Current problem:**
// Jenkinsfile - it only deploys:
sh 'git pull origin master' # ❌ No tests
sh 'docker compose up -d --build' # ❌ No validation
// ❌ No rollback strategy
// ❌ No smoke tests after deployment
**Solution:**
**File: `.github/workflows/ci.yml`** (if using GitHub)
name: CI/CD Pipeline
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: test
POSTGRES_DB: testdb
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
qdrant:
image: qdrant/qdrant:latest
ports:
- 6333:6333
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Cache dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run linting
run: |
black --check src/
flake8 src/
mypy src/
- name: Run tests
env:
DATABASE_URL: postgresql://postgres:test@localhost/testdb
QDRANT_URL: http://localhost:6333
run: |
pytest tests/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
build:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v3
- name: Build Docker image
run: |
docker build -f docker/Dockerfile.prod -t islamic-scholar-agent:${{ github.sha }} .
- name: Run security scan
uses: aquasecurity/trivy-action@master
with:
image-ref: islamic-scholar-agent:${{ github.sha }}
format: 'sarif'
output: 'trivy-results.sarif'
deploy:
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- name: Deploy to production
run: |
# Your deployment logic here
echo "Deploying..."
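The deploy job above ends in a placeholder; a post-deploy smoke test could poll the health endpoint before the pipeline declares success (a sketch; the URL, retry count, and delay are assumptions):

```python
import time
import urllib.request

def wait_for_healthy(url: str, retries: int = 10, delay: float = 3.0,
                     opener=urllib.request.urlopen) -> bool:
    """Poll a health endpoint until it answers 200 or the retries run out."""
    for _ in range(retries):
        try:
            with opener(url, timeout=5) as resp:
                if resp.status == 200:
                    return True
        except Exception:
            pass  # service not up yet; wait and retry
        time.sleep(delay)
    return False

# if not wait_for_healthy("http://localhost:7777/health"):
#     raise SystemExit("Smoke test failed - trigger a rollback")
```

Exiting non-zero here lets the CI job fail and trigger whatever rollback step follows.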
**Impact:** ⭐⭐⭐⭐
**Fix effort:** 2 days
---
## 🟢 Minor Issues
### 10. No Proper Logging
**Solution:**
**File: `src/core/logging.py`**
```python
import logging
import sys
from pathlib import Path
from logging.handlers import RotatingFileHandler
from src.core.config import settings
def setup_logging():
"""Configure application logging"""
# Create logs directory
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
# Root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Format
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# File handler with rotation
file_handler = RotatingFileHandler(
log_dir / "app.log",
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Error file handler
error_handler = RotatingFileHandler(
log_dir / "error.log",
maxBytes=10*1024*1024,
backupCount=5
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
return logger
```
**Impact:** ⭐⭐⭐
**Fix effort:** 4 hours
### 11. No Proper Documentation
**Required files:**
- README.md - project overview
- docs/ARCHITECTURE.md - system architecture
- docs/API.md - API documentation
- docs/DEPLOYMENT.md - deployment guide
- docs/DEVELOPMENT.md - development guide
- docs/TROUBLESHOOTING.md - common problems and fixes
**Impact:** ⭐⭐⭐
**Fix effort:** 2 days
### 12. No Environment Variable Documentation
**File: `.env.example`**
# Database Configuration
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
# Vector Database - Choose one
VECTOR_DB_TYPE=qdrant # or "pgvector"
# Qdrant Configuration (if using Qdrant)
QDRANT_URL=http://localhost:6333
QDRANT_COLLECTION=islamic_knowledge
# PgVector Configuration (if using PgVector)
PGVECTOR_TABLE=islamic_knowledge
# LLM Configuration
LLM_MODEL_NAME=deepseek_r1 # See config/models.yaml for options
OPENROUTER_API_KEY=your_openrouter_key_here
OPENAI_API_KEY=your_openai_key_here # Optional
# Embedding Configuration
EMBEDDING_MODEL=all-MiniLM-L6-v2
EMBEDDING_BATCH_SIZE=100
EMBEDDING_DIMENSIONS=384
# Monitoring (Optional but recommended for production)
LANGFUSE_PUBLIC_KEY=your_public_key
LANGFUSE_SECRET_KEY=your_secret_key
LANGFUSE_HOST=https://cloud.langfuse.com
# Application
PORT=7777
LOG_LEVEL=INFO
ENVIRONMENT=production # or "development", "staging"
# Data Ingestion
DATA_DIR=data/raw
BATCH_SIZE=50
MAX_RETRIES=3
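To complement the example file, a small startup check can fail fast when a required variable is missing rather than erroring at first use (a sketch; which variables are truly required depends on the deployment):

```python
import os

# Subset of .env.example treated as mandatory; extend for your deployment
REQUIRED_VARS = ["DATABASE_URL", "OPENROUTER_API_KEY", "QDRANT_URL"]

def check_required_env(env=os.environ) -> None:
    """Raise at startup if any mandatory variable is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
```

Calling this in the application entry point turns a confusing mid-request failure into a clear error at boot.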
**Impact:** ⭐⭐
**Fix effort:** 2 hours
## 📊 Priority Summary
**Priority 1 (urgent - week 1):**
- ✅ Restructure the project
- ✅ Create the LLM abstraction layer (Model Factory)
- ✅ Set up Langfuse monitoring
- ✅ Fix the Dockerfile and docker-compose
**Priority 2 (important - week 2):**
- ✅ Rewrite the data ingestion pipeline
- ✅ Set up pytest and write tests
- ✅ Fix the CI/CD pipeline
- ✅ Configuration management with Pydantic
**Priority 3 (improvements - week 3):**
- ✅ Logging framework
- ✅ Complete documentation
- ✅ Security improvements
- ✅ Performance optimization
## 🎯 Production Readiness Checklist
### Infrastructure
- [ ] Folder structure fixed
- [ ] Production-ready Dockerfile
- [ ] docker-compose.yml optimized
- [ ] Health checks enabled
- [ ] Resource limits configured
### Code Quality
- [ ] No duplicated code
- [ ] All test files removed/relocated
- [ ] Consistent code style (black, flake8)
- [ ] Type hints added (mypy)
- [ ] Docstrings on all functions
### Configuration
- [ ] All configs come from environment variables
- [ ] .env.example is up to date
- [ ] Model configs in a separate YAML file
- [ ] Secrets in environment variables (not hardcoded)
### Testing
- [ ] pytest set up
- [ ] Unit tests written (coverage > 80%)
- [ ] Integration tests in place
- [ ] CI/CD runs the tests
### Monitoring
- [ ] Langfuse integrated
- [ ] Logging framework enabled
- [ ] Error tracking set up
- [ ] Performance metrics collected
### Data Pipeline
- [ ] Data validation implemented
- [ ] Per-row error handling
- [ ] Progress tracking in place
- [ ] Resumable pipeline (tracks processed IDs)
- [ ] Comprehensive logging
### Security
- [ ] No secrets in code
- [ ] Non-root user in Docker
- [ ] Security scanning in CI/CD
- [ ] Input validation everywhere
- [ ] Rate limiting (if public API)
### Deployment
- [ ] CI/CD pipeline runs tests
- [ ] Rollback strategy in place
- [ ] Smoke tests after deploy
- [ ] Monitoring alerts configured
- [ ] Backup strategy
## 💡 Additional Recommendations
### 1. Rate Limiting for the API
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/agui")
@limiter.limit("10/minute") # 10 requests per minute per IP
async def agui_endpoint(request: Request):
    ...
### 2. Caching for Embeddings
from functools import lru_cache
@lru_cache(maxsize=1000)
def get_embedding(text: str):
"""Cache embeddings to avoid recomputation"""
return embedder.encode(text)
### 3. Async Processing for Better Performance
import asyncio
from typing import List

async def process_single_doc(doc: str) -> str:
    """Placeholder for the real per-document work (embedding, ingestion, ...)."""
    return doc

async def process_documents_async(documents: List[str]):
    """Process multiple documents concurrently"""
    tasks = [process_single_doc(doc) for doc in documents]
    return await asyncio.gather(*tasks)
### 4. Database Connection Pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_pre_ping=True, # Verify connections before use
)
## 🚨 Conclusion
This project currently falls short of production standards and needs substantial rework. The major problems are:
- **Disorganized structure** - files are organized unprofessionally
- **Hardcoded configs** - no flexibility for change
- **No monitoring** - impossible to debug in production
- **Unreliable pipeline** - data ingestion is fragile
- **No testing** - high risk of bugs in production
That said, the core RAG architecture is sound, and with 3-4 weeks of work it can be turned into a production-ready system.