# اتصال Langfuse و سیستم Tracing ## مقدمه در پروژه‌های مبتنی بر LLM، **مشاهده‌پذیری (Observability)** اهمیت بالایی دارد. بدون آن نمی‌دانیم: - هر درخواست چقدر توکن مصرف کرده و هزینه واقعی چقدر بوده - چه داده‌ای از RAG به مدل رسیده - کدام کاربر و Session چه سوالی پرسیده - کیفیت پاسخ‌ها در طول زمان چگونه بوده **Langfuse** یک پلتفرم Observability مخصوص LLM است که امکان ثبت trace، محاسبه هزینه و امتیازدهی به پاسخ‌ها را فراهم می‌کند. ### مشکل اصلی وقتی از API های مدل‌های زبان به صورت **Streaming** استفاده می‌کنیم، بسیاری از API ها (مثل DeepSeek از طریق OpenRouter) مقدار `usage` (تعداد توکن مصرفی) را **صفر** برمی‌گردانند. این یعنی Langfuse هیچ اطلاعات هزینه‌ای ثبت نمی‌کند. همچنین، prompt واقعی که به مدل ارسال می‌شود فقط سوال کوتاه کاربر نیست. بلکه شامل **System Prompt + RAG Context + سوال کاربر** است. بدون ثبت این اطلاعات، تصویر واقعی هزینه و عملکرد سیستم قابل مشاهده نیست. ### راه‌حل: TracingAgent کلاس `TracingAgent` به عنوان یک **"Smart Interceptor"** عمل می‌کند. این کلاس بین API Route و منطق اصلی AI قرار می‌گیرد و سه مشکل اساسی را حل می‌کند: 1. **Observability**: تزریق `user_id` و `session_id` به Langfuse 2. **Accuracy**: رفع باگ "0 Token Usage" با شمارش دستی توکن‌ها توسط `tiktoken` 3. **Completeness**: ثبت prompt واقعی RAG (نه فقط سوال کوتاه کاربر) برای محاسبه هزینه واقعی ## معماری ### جایگاه TracingAgent در سیستم ```mermaid graph TD A["درخواست کاربر (API)"] --> B["TracingAgent.arun()"] subgraph "TracingAgent - Smart Interceptor" B --> C["ثبت user_id / session_id"] C --> D["Agent اصلی اجرا می‌شود"] D --> E["pre_hooks: guardrails + config + RAG"] E --> F["مدل زبان (LLM)"] F --> G["پاسخ Streaming به کاربر"] G --> H["شمارش توکن‌ها با tiktoken"] H --> I["ارسال داده‌ها به Langfuse"] I --> J["امتیازدهی خودکار"] end K["rag_prompt_var (ContextVar)"] -.->|انتقال RAG prompt| H ``` ### جریان داده ``` درخواست API │ ▼ ┌─────────────────────────────────────┐ │ TracingAgent.arun() │ │ ───────────────────────────────── │ │ 1. ذخیره user_id, session_id │ │ 2. ذخیره input_message │ │ 3. ذخیره system_prompt │ │ 4. ریست rag_prompt_var │ └───────────────┬─────────────────────┘ │ ▼ ┌─────────────────────────────────────┐ │ _stream_wrapper() [@observe] │ │ ───────────────────────────────── │ │ • تگ‌گذاری trace با user/session │ │ • اجرای super().arun (Agent اصلی) │ │ • جمع‌آوری chunk ها │ │ • تشخیص RunCompleted │ │ • yield هر chunk به کاربر │ └───────────────┬─────────────────────┘ │ ▼ ┌─────────────────────────────────────┐ │ محاسبه و ارسال به Langfuse │ │ ───────────────────────────────── │ │ • خواندن rag_prompt_var │ │ • شمارش توکن input + output │ │ • ارسال usage به Langfuse │ │ • امتیازدهی (scoring) │ └─────────────────────────────────────┘ ``` ## پیاده‌سازی ### فایل‌های مرتبط | فایل | مسئولیت | |------|---------| | `src/agents/tracing_agent.py` | کلاس TracingAgent - interceptor اصلی | | `src/agents/base_agent.py` | ساخت Agent با TracingAgent به جای Agent معمولی | | `src/utils/shared_context.py` | متغیر `rag_prompt_var` برای انتقال RAG prompt بین لایه‌ها | | `src/utils/search_knowledge.py` | ذخیره prompt نهایی RAG در `rag_prompt_var` | | `src/models/factory.py` | ساخت مدل زبان از فایل تنظیمات | ### بخش ۱: ابزارهای پایه و مقداردهی اولیه فایل: `src/agents/tracing_agent.py` ```python import tiktoken from langfuse import Langfuse from langfuse.decorators import observe, langfuse_context from src.utils.shared_context import rag_prompt_var langfuse_client = Langfuse() ``` | ابزار | نقش | |-------|-----| | `tiktoken` | **شمارنده توکن**: چون API در حالت streaming مقدار usage را صفر برمی‌گرداند، خودمان توکن‌ها را می‌شماریم | | `rag_prompt_var` | **پل ارتباطی**: جستجوی RAG در عمق hook ها اتفاق می‌افتد، اما باید توکن‌هایش را در TracingAgent (لایه بالاتر) بشماریم. این `ContextVar` داده را بین این دو لایه منتقل می‌کند | | `langfuse_client` | **کلاینت اصلی**: `langfuse_context` (دکوراتور) از scoring پشتیبانی نمی‌کند، بنابراین یک client کامل برای ارسال score ها ساخته می‌شود | ### بخش ۲: شمارنده توکن ```python def _count_tokens(self, text: str, model: str = "gpt-4o") -> int: try: encoding = tiktoken.encoding_for_model(model) except KeyError: encoding = tiktoken.get_encoding("cl100k_base") return len(encoding.encode(text)) ``` **Fallback**: اگر مدل ناشناخته باشد (مثلاً `deepseek-v3`)، از `cl100k_base` استفاده می‌شود. این tokenizer استاندارد GPT-4 است و به عنوان تخمین صنعتی برای اکثر مدل‌ها مناسب است. ### بخش ۳: نقطه ورود Polymorphic - متد `arun` ```python # بدون async! def arun(self, *args, **kwargs): ``` **چرا `def` معمولی و نه `async def`؟** روتر Agno خیلی سخت‌گیر است: - اگر `stream=True` باشد، یک **Generator** (چیزی که بتوان رویش loop زد) انتظار دارد - اگر `stream=False` باشد، یک **Coroutine** (چیزی که بتوان `await` کرد) انتظار دارد با تعریف `arun` به صورت `def` معمولی (نه `async def`)، می‌توانیم **داخل تابع تصمیم بگیریم** کدام نوع را برگردانیم. اگر `async def` بود، Python آن را خودکار به coroutine تبدیل می‌کرد و در حالت streaming با خطا مواجه می‌شدیم. ### بخش ۴: پیش‌پردازش ورودی ```python input_message = "" if "message" in kwargs: input_message = kwargs["message"] elif args: input_message = args[0] system_prompt = "\n".join(self.instructions) if self.instructions else "" rag_prompt_var.set("") # ریست ``` | عملیات | توضیح | |--------|-------| | **Input Capture** | پیام کاربر فوراً ذخیره می‌شود تا برای logging آماده باشد | | **System Prompt** | دستورالعمل‌های Agent (Adab، Culture و ...) برای محاسبه هزینه استخراج می‌شوند | | **Reset** | `rag_prompt_var` پاک می‌شود تا داده درخواست قبلی اشتباهاً استفاده نشود | ### بخش ۵: Streaming Wrapper - هسته اصلی این بخش کل فرایند streaming را درون یک **Langfuse Observation** قرار می‌دهد. #### الف - تگ‌گذاری Trace ```python @observe(as_type="generation", name="Islamic Scholar Stream") async def _stream_wrapper(): if user_id: langfuse_context.update_current_trace(user_id=str(user_id)) if session_id: langfuse_context.update_current_trace(session_id=str(session_id)) ``` بدون این مرحله، trace ها در Langfuse **ناشناس** می‌مانند. با تگ‌گذاری، می‌توان مصرف هر کاربر و هر session را جداگانه دید. #### ب - حلقه De-Duplication ```python async for chunk in super(TracingAgent, self).arun(*args, **kwargs): event_type = getattr(chunk, "event", "") content = getattr(chunk, "content", "") or "" if event_type == "RunCompleted": final_event_content = content # متن نهایی تمیز elif isinstance(content, str) and content: full_content += content # تکه‌های کوچک جمع می‌شوند yield chunk ``` **مشکل**: Agno پاسخ را به صورت تکه‌های کوچک stream می‌کند (مثلاً `"سلا"`, `"م "`, `"علی"`, `"کم"`) و سپس یک event نهایی `RunCompleted` ارسال می‌کند که متن کامل و تمیز را دارد (`"سلام علیکم"`). **راه‌حل**: همه chunk ها را به کاربر `yield` می‌کنیم (تجربه real-time حفظ شود)، اما برای ثبت در Langfuse از `final_event_content` استفاده می‌کنیم چون تمیزتر است و از تکرار متن در log جلوگیری می‌کند. ### بخش ۶: محاسبه توکن و ارسال به Langfuse ```python # 1. خواندن RAG prompt از ContextVar actual_rag_prompt = rag_prompt_var.get() # 2. بازسازی prompt واقعی if actual_rag_prompt: full_input_text = f"{system_prompt}\n{actual_rag_prompt}" else: full_input_text = f"{system_prompt}\n{input_message}" # 3. محاسبه توکن‌ها input_count = self._count_tokens(full_input_text) output_count = self._count_tokens(final_output) total_count = input_count + output_count # 4. ارسال به Langfuse update_payload = { "output": final_output, "input": actual_rag_prompt if actual_rag_prompt else input_message, "usage": { "input": input_count, "output": output_count, "total": total_count }, "model": "deepseek-ai/deepseek-v3.1" } langfuse_context.update_current_observation(**update_payload) ``` #### مکانیزم `rag_prompt_var` - پل ارتباطی بین لایه‌ها این یکی از مهم‌ترین بخش‌های طراحی است. مسئله اینجاست: - **RAG prompt** در `build_rag_prompt()` (داخل `rag_injection_hook`، در عمق Agent) ساخته می‌شود - **شمارش توکن** باید در `TracingAgent` (لایه بیرونی، بالاتر از Agent) انجام شود این دو لایه مستقیماً به هم دسترسی ندارند. `ContextVar` یک متغیر **thread-safe و async-safe** است که داده را بین لایه‌های مختلف یک request منتقل می‌کند. فایل: `src/utils/shared_context.py` ```python from contextvars import ContextVar rag_prompt_var = ContextVar("rag_prompt_var", default="") ``` فایل: `src/utils/search_knowledge.py` (انتهای تابع `build_rag_prompt`) ```python # ذخیره prompt نهایی در ContextVar rag_prompt_var.set(final_prompt) return final_prompt ``` #### چرا `ContextVar` و نه یک متغیر global ساده؟ | | متغیر global | ContextVar | |--|-------------|------------| | **thread-safe** | خیر - در درخواست‌های همزمان داده قاطی می‌شود | بله - هر request مقدار مستقل خود را دارد | | **async-safe** | خیر | بله - با `asyncio` سازگار است | | **مناسب وب‌سرور** | خیر | بله | #### محاسبه هزینه واقعی ``` هزینه واقعی = tokens(System Prompt + RAG Context + سوال کاربر) + tokens(پاسخ مدل) ``` بدون TracingAgent، Langfuse فقط سوال کوتاه کاربر (مثلاً "حکم روزه مسافر چیست؟") را می‌بیند. اما prompt واقعی شامل System Prompt (دستورالعمل‌های Adab، Culture و ...)، context بازیابی شده از RAG (ممکن است هزاران توکن باشد) و سوال کاربر است. ### بخش ۷: امتیازدهی خودکار (Scoring) ```python current_trace_id = langfuse_context.get_current_trace_id() if current_trace_id: langfuse_client.score( trace_id=current_trace_id, name="completeness", value=1.0 if len(final_output) > 50 else 0.0, comment="Auto-scored by TracingAgent" ) ``` چون داخل wrapper هستیم، Trace ID در دسترس است. از `langfuse_client` (نه `langfuse_context`) برای ارسال score استفاده می‌شود. **منطق فعلی**: اگر طول پاسخ بیشتر از 50 کاراکتر باشد، امتیاز 1.0 (کامل) و در غیر این صورت 0.0 (ناقص). این منطق ساده قابل جایگزینی با بررسی‌های پیچیده‌تر است (مثلاً بررسی regex، وجود کلمات کلیدی، یا حتی ارزیابی توسط یک مدل دیگر). ## نحوه اتصال TracingAgent به سیستم ### ثبت در Agent فایل: `src/agents/base_agent.py` ```python from src.agents.tracing_agent import TracingAgent class IslamicScholarAgent: def __init__(self, model, knowledge_base, custom_instructions=None, db_url=None): # ... self.agent = TracingAgent( # ← به جای Agent معمولی name="Islamic Scholar Agent", model=model, instructions=self.custom_instructions, pre_hooks=[ PromptInjectionGuardrail(), InputLimitGuardrail(), sync_config_hook, rag_injection_hook, ], # ... ) ``` **نکته کلیدی**: `TracingAgent` جایگزین `Agent` شده. تمام عملکرد Agent حفظ می‌شود، فقط لایه tracing روی آن اضافه شده. ### زنجیره اجرا ``` API Route │ POST /agents/islamic-scholar-agent/runs ▼ TracingAgent.arun() │ ذخیره metadata + ریست rag_prompt_var ▼ _stream_wrapper() [@observe → Langfuse trace] │ تگ‌گذاری user_id / session_id ▼ super().arun() → Agent اصلی Agno │ ├── pre_hooks: │ ├── PromptInjectionGuardrail │ ├── InputLimitGuardrail │ ├── sync_config_hook (پرامپت از دیتابیس) │ └── rag_injection_hook → build_rag_prompt() │ └── rag_prompt_var.set(final_prompt) ← ذخیره در ContextVar │ ├── LLM Call (streaming) │ └── chunks → yield به کاربر │ ▼ پس از اتمام stream: ├── rag_prompt_var.get() ← خواندن RAG prompt ├── tiktoken: شمارش توکن input + output ├── langfuse_context.update_current_observation(usage=...) └── langfuse_client.score(completeness=...) ``` ## تنظیمات محیطی | متغیر | توضیح | الزامی | |--------|--------|--------| | `LANGFUSE_PUBLIC_KEY` | کلید عمومی Langfuse | بله | | `LANGFUSE_SECRET_KEY` | کلید خصوصی Langfuse | بله | | `LANGFUSE_HOST` | آدرس سرور Langfuse | بله | Langfuse client به صورت خودکار این متغیرها را از محیط می‌خواند. ## داده‌هایی که در Langfuse ثبت می‌شوند ### هر Trace شامل: | فیلد | مقدار | منبع | |------|-------|------| | `user_id` | شناسه کاربر | از API request | | `session_id` | شناسه نشست | از API request | | `input` | prompt واقعی (RAG + سوال) | از `rag_prompt_var` | | `output` | پاسخ نهایی مدل | از `RunCompleted` event | | `usage.input` | تعداد توکن ورودی | محاسبه با `tiktoken` | | `usage.output` | تعداد توکن خروجی | محاسبه با `tiktoken` | | `usage.total` | مجموع توکن‌ها | `input + output` | | `model` | نام مدل | تنظیم دستی | ### هر Score شامل: | فیلد | مقدار | |------|-------| | `name` | `completeness` | | `value` | `1.0` (اگر طول > 50) یا `0.0` | | `comment` | `Auto-scored by TracingAgent` | ## خلاصه مشکلات حل‌شده | مشکل | بدون TracingAgent | با TracingAgent | |------|-------------------|-----------------| | Token usage در streaming | صفر (0) | محاسبه دقیق با tiktoken | | هویت کاربر در trace | ناشناس | user_id + session_id ثبت می‌شود | | ورودی واقعی مدل | فقط سوال کوتاه کاربر | System Prompt + RAG Context + سوال | | هزینه واقعی | نامشخص | محاسبه بر اساس توکن واقعی | | کیفیت پاسخ | غیرقابل سنجش | امتیازدهی خودکار | | داده‌های تکراری در log | stream chunks + final event | فقط متن تمیز نهایی | ## نتیجه‌گیری `TracingAgent` به عنوان یک لایه شفاف بین API و Agent عمل می‌کند و بدون تغییر در رفتار اصلی Agent، سه مشکل حیاتی را حل می‌کند: 1. **Observability**: هر درخواست با هویت کاربر و session در Langfuse ثبت می‌شود 2. **Accuracy**: باگ "0 Token Usage" در streaming با شمارش دستی توسط `tiktoken` رفع می‌شود 3. **Completeness**: با استفاده از `rag_prompt_var` (ContextVar)، prompt واقعی RAG (که ممکن است هزاران توکن باشد) ثبت می‌شود و هزینه واقعی هر درخواست قابل محاسبه است این طراحی قابل گسترش است: می‌توان منطق scoring را پیچیده‌تر کرد، مدل‌های بیشتری را پشتیبانی کرد، یا metric های سفارشی دیگری به Langfuse اضافه کرد.