19 KiB
اتصال Langfuse و سیستم Tracing
مقدمه
در پروژههای مبتنی بر LLM، مشاهدهپذیری (Observability) اهمیت بالایی دارد. بدون آن نمیدانیم:
- هر درخواست چقدر توکن مصرف کرده و هزینه واقعی چقدر بوده
- چه دادهای از RAG به مدل رسیده
- کدام کاربر و Session چه سوالی پرسیده
- کیفیت پاسخها در طول زمان چگونه بوده
Langfuse یک پلتفرم Observability مخصوص LLM است که امکان ثبت trace، محاسبه هزینه و امتیازدهی به پاسخها را فراهم میکند.
مشکل اصلی
وقتی از API های مدلهای زبان به صورت Streaming استفاده میکنیم، بسیاری از API ها (مثل DeepSeek از طریق OpenRouter) مقدار usage (تعداد توکن مصرفی) را صفر برمیگردانند. این یعنی Langfuse هیچ اطلاعات هزینهای ثبت نمیکند.
همچنین، prompt واقعی که به مدل ارسال میشود فقط سوال کوتاه کاربر نیست. بلکه شامل System Prompt + RAG Context + سوال کاربر است. بدون ثبت این اطلاعات، تصویر واقعی هزینه و عملکرد سیستم قابل مشاهده نیست.
راهحل: TracingAgent
کلاس TracingAgent به عنوان یک "Smart Interceptor" عمل میکند. این کلاس بین API Route و منطق اصلی AI قرار میگیرد و سه مشکل اساسی را حل میکند:
- Observability: تزریق
user_idوsession_idبه Langfuse - Accuracy: رفع باگ "0 Token Usage" با شمارش دستی توکنها توسط
tiktoken - Completeness: ثبت prompt واقعی RAG (نه فقط سوال کوتاه کاربر) برای محاسبه هزینه واقعی
معماری
جایگاه TracingAgent در سیستم
graph TD
A["درخواست کاربر (API)"] --> B["TracingAgent.arun()"]
subgraph "TracingAgent - Smart Interceptor"
B --> C["ثبت user_id / session_id"]
C --> D["Agent اصلی اجرا میشود"]
D --> E["pre_hooks: guardrails + config + RAG"]
E --> F["مدل زبان (LLM)"]
F --> G["پاسخ Streaming به کاربر"]
G --> H["شمارش توکنها با tiktoken"]
H --> I["ارسال دادهها به Langfuse"]
I --> J["امتیازدهی خودکار"]
end
K["rag_prompt_var (ContextVar)"] -.->|انتقال RAG prompt| H
جریان داده
درخواست API
│
▼
┌─────────────────────────────────────┐
│ TracingAgent.arun() │
│ ───────────────────────────────── │
│ 1. ذخیره user_id, session_id │
│ 2. ذخیره input_message │
│ 3. ذخیره system_prompt │
│ 4. ریست rag_prompt_var │
└───────────────┬─────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ _stream_wrapper() [@observe] │
│ ───────────────────────────────── │
│ • تگگذاری trace با user/session │
│ • اجرای super().arun (Agent اصلی) │
│ • جمعآوری chunk ها │
│ • تشخیص RunCompleted │
│ • yield هر chunk به کاربر │
└───────────────┬─────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ محاسبه و ارسال به Langfuse │
│ ───────────────────────────────── │
│ • خواندن rag_prompt_var │
│ • شمارش توکن input + output │
│ • ارسال usage به Langfuse │
│ • امتیازدهی (scoring) │
└─────────────────────────────────────┘
پیادهسازی
فایلهای مرتبط
| فایل | مسئولیت |
|---|---|
src/agents/tracing_agent.py |
کلاس TracingAgent - interceptor اصلی |
src/agents/base_agent.py |
ساخت Agent با TracingAgent به جای Agent معمولی |
src/utils/shared_context.py |
متغیر rag_prompt_var برای انتقال RAG prompt بین لایهها |
src/utils/search_knowledge.py |
ذخیره prompt نهایی RAG در rag_prompt_var |
src/models/factory.py |
ساخت مدل زبان از فایل تنظیمات |
بخش ۱: ابزارهای پایه و مقداردهی اولیه
فایل: src/agents/tracing_agent.py
import tiktoken
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from src.utils.shared_context import rag_prompt_var
langfuse_client = Langfuse()
| ابزار | نقش |
|---|---|
tiktoken |
شمارنده توکن: چون API در حالت streaming مقدار usage را صفر برمیگرداند، خودمان توکنها را میشماریم |
rag_prompt_var |
پل ارتباطی: جستجوی RAG در عمق hook ها اتفاق میافتد، اما باید توکنهایش را در TracingAgent (لایه بالاتر) بشماریم. این ContextVar داده را بین این دو لایه منتقل میکند |
langfuse_client |
کلاینت اصلی: langfuse_context (دکوراتور) از scoring پشتیبانی نمیکند، بنابراین یک client کامل برای ارسال score ها ساخته میشود |
بخش ۲: شمارنده توکن
def _count_tokens(self, text: str, model: str = "gpt-4o") -> int:
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
Fallback: اگر مدل ناشناخته باشد (مثلاً deepseek-v3)، از cl100k_base استفاده میشود. این tokenizer استاندارد GPT-4 است و به عنوان تخمین صنعتی برای اکثر مدلها مناسب است.
بخش ۳: نقطه ورود Polymorphic - متد arun
# بدون async!
def arun(self, *args, **kwargs):
چرا def معمولی و نه async def؟
روتر Agno خیلی سختگیر است:
- اگر
stream=Trueباشد، یک Generator (چیزی که بتوان رویش loop زد) انتظار دارد - اگر
stream=Falseباشد، یک Coroutine (چیزی که بتوانawaitکرد) انتظار دارد
با تعریف arun به صورت def معمولی (نه async def)، میتوانیم داخل تابع تصمیم بگیریم کدام نوع را برگردانیم. اگر async def بود، Python آن را خودکار به coroutine تبدیل میکرد و در حالت streaming با خطا مواجه میشدیم.
بخش ۴: پیشپردازش ورودی
input_message = ""
if "message" in kwargs: input_message = kwargs["message"]
elif args: input_message = args[0]
system_prompt = "\n".join(self.instructions) if self.instructions else ""
rag_prompt_var.set("") # ریست
| عملیات | توضیح |
|---|---|
| Input Capture | پیام کاربر فوراً ذخیره میشود تا برای logging آماده باشد |
| System Prompt | دستورالعملهای Agent (Adab، Culture و ...) برای محاسبه هزینه استخراج میشوند |
| Reset | rag_prompt_var پاک میشود تا داده درخواست قبلی اشتباهاً استفاده نشود |
بخش ۵: Streaming Wrapper - هسته اصلی
این بخش کل فرایند streaming را درون یک Langfuse Observation قرار میدهد.
الف - تگگذاری Trace
@observe(as_type="generation", name="Islamic Scholar Stream")
async def _stream_wrapper():
if user_id: langfuse_context.update_current_trace(user_id=str(user_id))
if session_id: langfuse_context.update_current_trace(session_id=str(session_id))
بدون این مرحله، trace ها در Langfuse ناشناس میمانند. با تگگذاری، میتوان مصرف هر کاربر و هر session را جداگانه دید.
ب - حلقه De-Duplication
async for chunk in super(TracingAgent, self).arun(*args, **kwargs):
event_type = getattr(chunk, "event", "")
content = getattr(chunk, "content", "") or ""
if event_type == "RunCompleted":
final_event_content = content # متن نهایی تمیز
elif isinstance(content, str) and content:
full_content += content # تکههای کوچک جمع میشوند
yield chunk
مشکل: Agno پاسخ را به صورت تکههای کوچک stream میکند (مثلاً "سلا", "م ", "علی", "کم") و سپس یک event نهایی RunCompleted ارسال میکند که متن کامل و تمیز را دارد ("سلام علیکم").
راهحل: همه chunk ها را به کاربر yield میکنیم (تجربه real-time حفظ شود)، اما برای ثبت در Langfuse از final_event_content استفاده میکنیم چون تمیزتر است و از تکرار متن در log جلوگیری میکند.
بخش ۶: محاسبه توکن و ارسال به Langfuse
# 1. خواندن RAG prompt از ContextVar
actual_rag_prompt = rag_prompt_var.get()
# 2. بازسازی prompt واقعی
if actual_rag_prompt:
full_input_text = f"{system_prompt}\n{actual_rag_prompt}"
else:
full_input_text = f"{system_prompt}\n{input_message}"
# 3. محاسبه توکنها
input_count = self._count_tokens(full_input_text)
output_count = self._count_tokens(final_output)
total_count = input_count + output_count
# 4. ارسال به Langfuse
update_payload = {
"output": final_output,
"input": actual_rag_prompt if actual_rag_prompt else input_message,
"usage": {
"input": input_count,
"output": output_count,
"total": total_count
},
"model": "deepseek-ai/deepseek-v3.1"
}
langfuse_context.update_current_observation(**update_payload)
مکانیزم rag_prompt_var - پل ارتباطی بین لایهها
این یکی از مهمترین بخشهای طراحی است. مسئله اینجاست:
- RAG prompt در
build_rag_prompt()(داخلrag_injection_hook، در عمق Agent) ساخته میشود - شمارش توکن باید در
TracingAgent(لایه بیرونی، بالاتر از Agent) انجام شود
این دو لایه مستقیماً به هم دسترسی ندارند. ContextVar یک متغیر thread-safe و async-safe است که داده را بین لایههای مختلف یک request منتقل میکند.
فایل: src/utils/shared_context.py
from contextvars import ContextVar
rag_prompt_var = ContextVar("rag_prompt_var", default="")
فایل: src/utils/search_knowledge.py (انتهای تابع build_rag_prompt)
# ذخیره prompt نهایی در ContextVar
rag_prompt_var.set(final_prompt)
return final_prompt
چرا ContextVar و نه یک متغیر global ساده؟
| متغیر global | ContextVar | |
|---|---|---|
| thread-safe | خیر - در درخواستهای همزمان داده قاطی میشود | بله - هر request مقدار مستقل خود را دارد |
| async-safe | خیر | بله - با asyncio سازگار است |
| مناسب وبسرور | خیر | بله |
محاسبه هزینه واقعی
هزینه واقعی = tokens(System Prompt + RAG Context + سوال کاربر) + tokens(پاسخ مدل)
بدون TracingAgent، Langfuse فقط سوال کوتاه کاربر (مثلاً "حکم روزه مسافر چیست؟") را میبیند. اما prompt واقعی شامل System Prompt (دستورالعملهای Adab، Culture و ...)، context بازیابی شده از RAG (ممکن است هزاران توکن باشد) و سوال کاربر است.
بخش ۷: امتیازدهی خودکار (Scoring)
current_trace_id = langfuse_context.get_current_trace_id()
if current_trace_id:
langfuse_client.score(
trace_id=current_trace_id,
name="completeness",
value=1.0 if len(final_output) > 50 else 0.0,
comment="Auto-scored by TracingAgent"
)
چون داخل wrapper هستیم، Trace ID در دسترس است. از langfuse_client (نه langfuse_context) برای ارسال score استفاده میشود.
منطق فعلی: اگر طول پاسخ بیشتر از 50 کاراکتر باشد، امتیاز 1.0 (کامل) و در غیر این صورت 0.0 (ناقص). این منطق ساده قابل جایگزینی با بررسیهای پیچیدهتر است (مثلاً بررسی regex، وجود کلمات کلیدی، یا حتی ارزیابی توسط یک مدل دیگر).
نحوه اتصال TracingAgent به سیستم
ثبت در Agent
فایل: src/agents/base_agent.py
from src.agents.tracing_agent import TracingAgent
class IslamicScholarAgent:
def __init__(self, model, knowledge_base, custom_instructions=None, db_url=None):
# ...
self.agent = TracingAgent( # ← به جای Agent معمولی
name="Islamic Scholar Agent",
model=model,
instructions=self.custom_instructions,
pre_hooks=[
PromptInjectionGuardrail(),
InputLimitGuardrail(),
sync_config_hook,
rag_injection_hook,
],
# ...
)
نکته کلیدی: TracingAgent جایگزین Agent شده. تمام عملکرد Agent حفظ میشود، فقط لایه tracing روی آن اضافه شده.
زنجیره اجرا
API Route
│ POST /agents/islamic-scholar-agent/runs
▼
TracingAgent.arun()
│ ذخیره metadata + ریست rag_prompt_var
▼
_stream_wrapper() [@observe → Langfuse trace]
│ تگگذاری user_id / session_id
▼
super().arun() → Agent اصلی Agno
│
├── pre_hooks:
│ ├── PromptInjectionGuardrail
│ ├── InputLimitGuardrail
│ ├── sync_config_hook (پرامپت از دیتابیس)
│ └── rag_injection_hook → build_rag_prompt()
│ └── rag_prompt_var.set(final_prompt) ← ذخیره در ContextVar
│
├── LLM Call (streaming)
│ └── chunks → yield به کاربر
│
▼
پس از اتمام stream:
├── rag_prompt_var.get() ← خواندن RAG prompt
├── tiktoken: شمارش توکن input + output
├── langfuse_context.update_current_observation(usage=...)
└── langfuse_client.score(completeness=...)
تنظیمات محیطی
| متغیر | توضیح | الزامی |
|---|---|---|
LANGFUSE_PUBLIC_KEY |
کلید عمومی Langfuse | بله |
LANGFUSE_SECRET_KEY |
کلید خصوصی Langfuse | بله |
LANGFUSE_HOST |
آدرس سرور Langfuse | بله |
Langfuse client به صورت خودکار این متغیرها را از محیط میخواند.
دادههایی که در Langfuse ثبت میشوند
هر Trace شامل:
| فیلد | مقدار | منبع |
|---|---|---|
user_id |
شناسه کاربر | از API request |
session_id |
شناسه نشست | از API request |
input |
prompt واقعی (RAG + سوال) | از rag_prompt_var |
output |
پاسخ نهایی مدل | از RunCompleted event |
usage.input |
تعداد توکن ورودی | محاسبه با tiktoken |
usage.output |
تعداد توکن خروجی | محاسبه با tiktoken |
usage.total |
مجموع توکنها | input + output |
model |
نام مدل | تنظیم دستی |
هر Score شامل:
| فیلد | مقدار |
|---|---|
name |
completeness |
value |
1.0 (اگر طول > 50) یا 0.0 |
comment |
Auto-scored by TracingAgent |
خلاصه مشکلات حلشده
| مشکل | بدون TracingAgent | با TracingAgent |
|---|---|---|
| Token usage در streaming | صفر (0) | محاسبه دقیق با tiktoken |
| هویت کاربر در trace | ناشناس | user_id + session_id ثبت میشود |
| ورودی واقعی مدل | فقط سوال کوتاه کاربر | System Prompt + RAG Context + سوال |
| هزینه واقعی | نامشخص | محاسبه بر اساس توکن واقعی |
| کیفیت پاسخ | غیرقابل سنجش | امتیازدهی خودکار |
| دادههای تکراری در log | stream chunks + final event | فقط متن تمیز نهایی |
نتیجهگیری
TracingAgent به عنوان یک لایه شفاف بین API و Agent عمل میکند و بدون تغییر در رفتار اصلی Agent، سه مشکل حیاتی را حل میکند:
- Observability: هر درخواست با هویت کاربر و session در Langfuse ثبت میشود
- Accuracy: باگ "0 Token Usage" در streaming با شمارش دستی توسط
tiktokenرفع میشود - Completeness: با استفاده از
rag_prompt_var(ContextVar)، prompt واقعی RAG (که ممکن است هزاران توکن باشد) ثبت میشود و هزینه واقعی هر درخواست قابل محاسبه است
این طراحی قابل گسترش است: میتوان منطق scoring را پیچیدهتر کرد، مدلهای بیشتری را پشتیبانی کرد، یا metric های سفارشی دیگری به Langfuse اضافه کرد.