3 Commits

Changed files:

  1. docs/COLLECTION_NAMING.md (120 lines)
  2. docs/INCREMENTAL_VECTOR_SYNC.md (204 lines)
  3. docs/index.md (7 lines)
  4. imam-reza.session.sql (2 lines)
  5. out.md (18 lines)
  6. src/api/routes.py (29 lines)
  7. src/knowledge/sync_wiki.py (177 lines)
  8. src/models/factory.py (2 lines)
  9. src/utils/collection_name.py (67 lines)

docs/COLLECTION_NAMING.md (120 lines)

@@ -0,0 +1,120 @@
# Collection Naming Strategy
## Why This Module Exists
When working with vector databases like Qdrant, **the name of your collection is not cosmetic — it is a data integrity contract.** Mixing incompatible vectors in the same collection silently corrupts your search results with no error or warning.
This module (`collection_name.py`) generates deterministic, self-documenting collection names that encode every critical parameter. If any parameter changes, a new name is produced, which means a new collection — preventing data corruption automatically.
---
## The Golden Rule of Vector Collections
> **You MUST create a new collection whenever the mathematical comparison between two vectors becomes invalid.**
If you compare a vector from **Model A** with a vector from **Model B**, the result is **meaningless garbage**. The numbers look real, cosine similarity returns a value, but it means nothing. There is no error. There is no crash. Your app just silently returns wrong answers.
Therefore, incompatible vectors **cannot live in the same collection.**
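The failure is invisible by construction. A quick illustration in pure Python (random vectors as stand-ins for embeddings from two different models; `cosine` is a plain helper written for this doc, not a library call):

```python
import math
import random

def cosine(a, b):
    """Plain cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

random.seed(0)
# Stand-ins for vectors produced by two unrelated models. The comparison
# executes fine and returns a well-formed score in [-1, 1], but because the
# vectors come from different mathematical spaces, the number is meaningless.
vec_model_a = [random.gauss(0, 1) for _ in range(8)]
vec_model_b = [random.gauss(0, 1) for _ in range(8)]
score = cosine(vec_model_a, vec_model_b)
print(-1.0 <= score <= 1.0)  # → True: a plausible-looking score with no meaning behind it
```

The score is indistinguishable from a legitimate result, which is exactly why the collection name must encode the model.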
---
## The 4 Factors That Force a New Collection
| # | Factor | Why It Forces a New Collection | Priority |
|---|--------|-------------------------------|----------|
| 1 | **Model** | `jina-v3` vectors are numerically unrelated to `openai-v3` vectors. They live in completely different mathematical spaces. | :red_circle: Critical |
| 2 | **Dimensions** | You cannot insert a 1024-dim vector into a collection built for 768-dim. Qdrant will reject it outright. | :red_circle: Critical |
| 3 | **Distance Metric** | A collection using `Cosine` ranks results differently than one using `Dot Product`. Mixing them invalidates your ranking logic. | :orange_circle: High |
| 4 | **Chunking Strategy** | If Batch A was chunked by paragraphs and Batch B by sentences, search results become biased and inconsistent. Apples vs. oranges. | :yellow_circle: Medium |
---
## Naming Format
```
[PROJECT]_[MODEL]_[DIMENSIONS]d_[CHUNK_SIZE]t_[TYPE]
```
### Example Breakdown
```
imam_reza_jina-v3_1024d_512t_hybrid
│         │       │     │    │
│         │       │     │    └── hybrid = sparse + dense vectors enabled
│         │       │     └── 512t = 512 tokens per chunk
│         │       └── 1024d = vector dimensionality
│         └── jina-v3 = embedding model
└── imam_reza = project / domain name
```
| Segment | Purpose |
|---------|---------|
| `imam_reza` | **Project/Domain** — keeps this data isolated from other apps or datasets. |
| `jina-v3` | **Model** — the embedding brain. Changing models = new collection, always. |
| `1024d` | **Dimensions** — tells you (and Qdrant) the vector size at a glance. |
| `512t` | **Chunk size** — "512 tokens per chunk". If you later experiment with 128-token chunks, a separate collection lets you compare performance fairly. |
| `hybrid` | **Search type** — indicates sparse vectors are enabled alongside dense vectors. |
---
## How It Works
`get_collection_name()` pulls values from two sources:
1. **`config/embeddings.yaml`** — the active embedding model, its `id`, and `dimensions`.
2. **Environment variables** (`.env`) — `BASE_COLLECTION_NAME`, `EMBEDDER_DIMENSIONS`, `CHUNK_SIZE`, `IS_HYBRID`.
Every parameter can also be overridden explicitly via function arguments.
### Resolution Priority (per parameter)
| Parameter | 1st (highest) | 2nd | 3rd (fallback) |
|-----------|---------------|-----|-----------------|
| `project_name` | Explicit arg | `BASE_COLLECTION_NAME` env var | `"default_project"` |
| `model_name` | Explicit arg | `embeddings.yaml` → `default` key | — |
| `dimensions` | Explicit arg | `embeddings.yaml` → model config | `EMBEDDER_DIMENSIONS` env var |
| `chunk_size` | Explicit arg | `CHUNK_SIZE` env var | `500` |
| `is_hybrid` | Explicit arg | `IS_HYBRID` env var | `true` |
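The chain above can be sketched in a few lines; `resolve` is a hypothetical helper written for this doc, not a function exported by the module:

```python
import os

def resolve(explicit, env_key, fallback):
    """Resolution order from the table: explicit arg, then env var, then fallback."""
    if explicit is not None:
        return explicit
    env_val = os.getenv(env_key)
    return env_val if env_val is not None else fallback

os.environ["CHUNK_SIZE"] = "512"
print(resolve(None, "CHUNK_SIZE", "500"))  # → 512 (env var beats the fallback)
print(resolve(128, "CHUNK_SIZE", "500"))   # → 128 (explicit arg beats everything)
```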
### Model Lookup
The `model_name` parameter is flexible — it accepts either:
- A **config key** from `embeddings.yaml` (e.g., `"jina_AI"`)
- A **model id** (e.g., `"jina-embeddings-v4"`)
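A toy sketch of that two-step lookup (the dict literal is illustrative; the real data comes from `embeddings.yaml`):

```python
def lookup_model(models: dict, name: str) -> dict:
    """Resolve a model by config key first, then by its nested `id` field."""
    if name in models:
        return models[name]
    for cfg in models.values():
        if cfg.get("id") == name:
            return cfg
    raise ValueError(f"Embedding model '{name}' not found")

models = {"jina_AI": {"id": "jina-embeddings-v4", "dimensions": 1024}}
# Both spellings resolve to the same config entry:
print(lookup_model(models, "jina_AI") is lookup_model(models, "jina-embeddings-v4"))  # → True
```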
---
## Usage
```python
from src.utils.collection_name import get_collection_name
# All defaults from config + env
name = get_collection_name()
# → "dovodi_collection_jina-embeddings-v4_1024d_500t_hybrid"
# Override chunk size
name = get_collection_name(chunk_size=128)
# → "dovodi_collection_jina-embeddings-v4_1024d_128t_hybrid"
# Dense-only collection
name = get_collection_name(is_hybrid=False)
# → "dovodi_collection_jina-embeddings-v4_1024d_500t_dense"
# Different model from embeddings.yaml
name = get_collection_name(model_name="openai_small")
# → "dovodi_collection_text-embedding-3-small_1536d_500t_hybrid"
```
---
## What Happens If You Don't Do This
| Scenario | What Goes Wrong |
|----------|----------------|
| You switch from `jina-v3` to `openai-v3` but keep the same collection | Old jina vectors and new openai vectors get compared. Search returns nonsense. **No error is raised.** |
| You change dimensions from 1024 to 768 | Qdrant rejects the insert with a dimension-mismatch error. At least this failure is loud. |
| You change chunk size from 512 to 128 | Short chunks get unfairly boosted in similarity scores against long chunks. Search quality degrades silently. |
| You switch from Cosine to Dot Product | Ranking logic is inverted for unnormalized vectors. Top results become bottom results. |
The naming convention makes these mistakes **structurally impossible** — if any parameter changes, a new collection name is generated, and the old data is never touched.

docs/INCREMENTAL_VECTOR_SYNC.md (204 lines)

@@ -0,0 +1,204 @@
# Incremental Vector Synchronization
**PostgreSQL → Qdrant | Database-Driven Incremental Embedding Pipeline**
---
## Overview
This feature introduces a fully automated, admin-triggered pipeline that synchronizes textual data from a relational database (PostgreSQL / Django ORM) to a vector database (Qdrant). The system guarantees **zero duplicate embeddings**, **crash resilience**, and seamless **A/B testing** across different embedding models — all without manual file uploads.
| Property | Value |
| ----------------- | -------------------------------------------- |
| **Source** | PostgreSQL (Django Models) |
| **Target** | Qdrant Vector Database |
| **State Tracking**| JSONB Arrays (`embedded_in` column) |
| **Deduplication** | Deterministic Hash ID via MD5 |
| **Trigger** | Admin UI → FastAPI Background Task |
---
## Architecture
```
┌──────────────────────┐       HTTP POST       ┌──────────────────────────┐
│   Django Admin UI    │ ────────────────────▸ │      FastAPI Agent       │
│  (EmbeddingSession)  │                       │   /api/sync-knowledge    │
└──────────────────────┘                       └────────────┬─────────────┘
          ▲                                                 │
          │ progress updates                         BackgroundTask
          │ (status, %, items)                              │
          │                                                 ▼
┌─────────┴────────────┐                       ┌──────────────────────────┐
│      PostgreSQL      │ ◂────── read/update ──▸ run_global_embedding_    │
│ embedded_in (JSONB)  │                       │    sync(session_id)      │
└──────────────────────┘                       └────────────┬─────────────┘
                                                            │ embed + upsert
                                                            ▼
                                               ┌──────────────────────────┐
                                               │    Qdrant Vector DB      │
                                               │(hybrid search collection)│
                                               └──────────────────────────┘
```
---
## How It Works
### 1. State Tracking via JSONB
Every syncable Django model (e.g. `Article`, `Hadith`) includes an `embedded_in` column:
```python
# Django model field
embedded_in = models.JSONField(default=list)
# Example value: ["dovodi_jina-embeddings-v4_hybrid", "dovodi_text-embedding-3-small_hybrid"]
```
**Why an array instead of a boolean?** A simple `is_embedded = True` flag cannot distinguish *which* model the row was embedded into. By storing the exact collection name, the system natively supports multiple embedding models simultaneously. Switching the active model in `config/embeddings.yaml` from `jina_AI` to `openai_small` causes every row to be detected as "missing" for the new collection — triggering a full, automatic re-sync.
**Self-healing on update:** When an admin edits the text of a record in Django admin, the overridden `save()` method resets `embedded_in` to `[]`, guaranteeing the updated content is picked up in the next sync cycle.
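The reset-on-edit rule can be sketched in plain Python (a stand-in dataclass for illustration; the real code overrides the Django model's `save()`):

```python
from dataclasses import dataclass, field

@dataclass
class SyncableRow:
    """Stand-in for a Django model with an `embedded_in` JSONB column."""
    content: str
    embedded_in: list = field(default_factory=list)

    def update_content(self, new_content: str) -> None:
        # Self-healing: a real text change invalidates existing embeddings,
        # so the next sync cycle re-embeds this row into every collection.
        if new_content != self.content:
            self.content = new_content
            self.embedded_in = []

row = SyncableRow(content="old text", embedded_in=["dovodi_jina-embeddings-v4_hybrid"])
row.update_content("new text")
print(row.embedded_in)  # → []  (the row is now "missing" from every collection)
```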
### 2. The Trigger Mechanism (Django → FastAPI)
A dedicated `EmbeddingSession` model in Django tracks the lifecycle of each sync operation:
| Field | Purpose |
| ----------------- | ------------------------------------------ |
| `status` | `PENDING` / `PROCESSING` / `COMPLETED` / `FAILED` |
| `total_items` | Total rows needing embedding |
| `processed_items` | Rows embedded so far |
| `progress` | Percentage (0–100) |
| `error_message` | Captured exception on failure |
When the admin saves a new session, Django sends a **non-blocking** `HTTP POST` to the FastAPI agent:
```
POST /api/sync-knowledge
Body: { "session_id": 42 }
```
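On the Django side, the trigger can be as small as the sketch below (stdlib-only; the URL, timeout value, and function name are assumptions made for this doc, not the project's actual code):

```python
import json
import urllib.request

def trigger_sync(session_id: int,
                 agent_url: str = "http://localhost:8000/api/sync-knowledge") -> None:
    """Fire-and-forget POST: a short timeout keeps the admin save() from blocking."""
    body = json.dumps({"session_id": session_id}).encode("utf-8")
    req = urllib.request.Request(
        agent_url, data=body, headers={"Content-Type": "application/json"}
    )
    try:
        urllib.request.urlopen(req, timeout=2)
    except Exception:
        # Deliberately swallowed: the admin save must succeed even if the
        # FastAPI agent is down; the session simply stays PENDING.
        pass
```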
The FastAPI route (`src/api/routes.py`) receives this and delegates to a `BackgroundTask`:
```python
@router.post("/api/sync-knowledge")
async def sync_knowledge(request: SyncRequest, background_tasks: BackgroundTasks):
    background_tasks.add_task(run_global_embedding_sync, request.session_id)
    return {"status": "started", "session_id": request.session_id}
```
The Django admin UI renders a **live Tailwind progress bar** powered by `processed_items / total_items`.
### 3. The Smart Background Worker
The core logic lives in `src/knowledge/sync_rag.py` → `run_global_embedding_sync(session_id)`:
**Step A — Resolve the active model:**
```python
embed_factory = EmbeddingFactory() # reads config/embeddings.yaml
embedder = embed_factory.get_embedder() # returns the default embedder
vector_db = get_qdrant_store(embedder=embedder)
active_collection_name = vector_db.collection
```
The collection name is dynamically composed as `{BASE_COLLECTION_NAME}_{model_id}_hybrid` (e.g. `dovodi_jina-embeddings-v4_hybrid`).
**Step B — Query only missing rows:**
Using PostgreSQL's native JSONB containment operator (`@>`), the system fetches *only* the rows whose `embedded_in` array does **not** contain the active collection name:
```sql
SELECT * FROM article_article
WHERE NOT (CAST(embedded_in AS jsonb) @> CAST('["dovodi_jina-embeddings-v4_hybrid"]' AS jsonb))
```
This is the key to incremental sync — already-embedded rows are never touched.
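The predicate is easy to mirror in plain Python, which also makes explicit that both empty arrays and `NULL` columns count as pending (`needs_embedding` is a doc-only helper):

```python
def needs_embedding(embedded_in, active_collection: str) -> bool:
    """Python mirror of: NOT (embedded_in @> '["<active_collection>"]')."""
    return active_collection not in (embedded_in or [])

rows = [
    {"id": 1, "embedded_in": ["dovodi_jina-embeddings-v4_hybrid"]},  # already done
    {"id": 2, "embedded_in": []},                                    # never embedded
    {"id": 3, "embedded_in": None},                                  # legacy NULL
]
pending = [r["id"] for r in rows
           if needs_embedding(r["embedded_in"], "dovodi_jina-embeddings-v4_hybrid")]
print(pending)  # → [2, 3]
```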
**Step C — Process, embed, upsert:**
For each pending row, the worker:
1. Formats the record into a continuous string (`TITLE: ... \n CONTENT: ...`)
2. Generates a **deterministic Qdrant ID** (see below)
3. Wraps it in an Agno `Document` and calls `vector_db.upsert()`
4. Updates the row's `embedded_in` array in PostgreSQL
5. Reports progress to the `EmbeddingSession` every 5 items
### 4. Deterministic Deduplication
To prevent duplicate vectors when resuming a crashed sync or re-embedding updated text, auto-generated UUIDs are **not used**. Instead, the system computes a deterministic ID:
```python
hash_input = f"{data_type}_{row.id}_{active_collection_name}" # e.g. "ARTICLE_42_dovodi_jina-embeddings-v4_hybrid"
hash_id = hashlib.md5(hash_input.encode()).hexdigest()
qdrant_id = str(uuid.UUID(hash_id)) # valid UUID from MD5
```
This ID is passed as `content_hash` to `vector_db.upsert()`, which causes Qdrant to **overwrite** any existing point with the same ID rather than creating a duplicate.
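Determinism is easy to verify: the same inputs always produce the same UUID, so a re-run lands on the same Qdrant point (`deterministic_qdrant_id` is a doc-only wrapper around the recipe above):

```python
import hashlib
import uuid

def deterministic_qdrant_id(data_type: str, row_id: int, collection: str) -> str:
    """MD5 of a stable key string, reinterpreted as a valid UUID."""
    digest = hashlib.md5(f"{data_type}_{row_id}_{collection}".encode("utf-8")).hexdigest()
    return str(uuid.UUID(digest))

a = deterministic_qdrant_id("ARTICLE", 42, "dovodi_jina-embeddings-v4_hybrid")
b = deterministic_qdrant_id("ARTICLE", 42, "dovodi_jina-embeddings-v4_hybrid")
print(a == b)  # → True: re-running a sync overwrites instead of duplicating
```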
### 5. Closing the Loop
After a successful upsert, the worker appends the active collection name to the row's `embedded_in` array:
```sql
UPDATE article_article
SET embedded_in = CAST(embedded_in AS jsonb) || CAST('["dovodi_jina-embeddings-v4_hybrid"]' AS jsonb)
WHERE id = 42
```
Once all tables are processed, the session status is set to `COMPLETED`. If any exception occurs, the status is set to `FAILED` with the error message captured.
---
## File Reference
| File | Responsibility |
| --- | --- |
| `src/knowledge/sync_rag.py` | Core sync logic — queries missing rows, embeds, upserts, updates state |
| `src/api/routes.py` | FastAPI endpoint `/api/sync-knowledge` that triggers the background task |
| `src/main.py` | Application factory — includes the API router |
| `src/knowledge/embedding_factory.py` | Reads `config/embeddings.yaml` and instantiates the correct embedder |
| `src/knowledge/vector_store.py` | Builds the Qdrant client with dynamic collection naming |
| `config/embeddings.yaml` | Declares available embedding models and the active default |
---
## Configuration
The active embedding model is controlled by `config/embeddings.yaml`:
```yaml
embeddings:
  default: jina_AI  # switch to "openai_small" to trigger full re-sync
  models:
    openai_small:
      provider: "openai"
      id: "text-embedding-3-small"
      dimensions: 1536
      api_key: ${OPENAI_API_KEY}
    jina_AI:
      provider: "jinaai"
      id: "jina-embeddings-v4"
      dimensions: 1024
      api_key: ${JINA_API_KEY}
```
Changing `default` from one model to another is all that is needed — the sync worker will detect that no rows contain the new collection name and re-embed everything.
---
## Why This Design is Production-Ready
| Property | Detail |
| --- | --- |
| **Zero data duplication** | Deterministic MD5 hashing ensures the same row always maps to the same Qdrant point ID. Re-running sync never creates duplicates. |
| **Seamless model switching** | Changing the embedding model in YAML causes the system to treat all rows as "missing" for the new collection. No manual migration needed. |
| **Crash resilience** | State is committed row-by-row to PostgreSQL. If the server crashes at 99%, restarting picks up the remaining 1%. |
| **Cost efficiency** | Only un-embedded rows hit the AI API. Already-processed data is never re-sent, saving API costs. |
| **Non-blocking UI** | The Django admin fires an async HTTP call and renders live progress — the UI never freezes. |
| **Multi-table support** | The `tables_to_sync` list can be extended with new Django models without touching the core algorithm. |

docs/index.md (7 lines)

@@ -0,0 +1,7 @@
# Welcome to the Dovoodi Agent Docs
This documentation covers the architecture, setup, and maintenance of the Dovoodi Agent microservice.
### Quick Links
* If you are looking for how the data syncs from Django to Qdrant, see the [Incremental Vector Sync](INCREMENTAL_VECTOR_SYNC.md) guide.
* For prompt engineering details, check the [Dynamic System Prompt](DYNAMIC_SYSTEM_PROMPT.md).

imam-reza.session.sql (2 lines)

@@ -0,0 +1,2 @@
select * from wiki_wikicontent
where language_id = 69;

out.md (18 lines)

File diff suppressed because it is too large

src/api/routes.py (29 lines)

@@ -1,18 +1,17 @@
from typing import Optional, Any, Dict, List
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from src.agents.islamic_scholar_agent import IslamicScholarAgent
from src.models.factory import ModelFactory
from src.knowledge.embedding_factory import EmbeddingFactory
from src.knowledge.rag_pipeline import create_knowledge_base
from src.utils.load_settings import get_active_agent_config
from langfuse.decorators import observe, langfuse_context
import json
from fastapi.responses import StreamingResponse
from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel
from src.knowledge.sync_wiki import run_wiki_embedding_sync
router = APIRouter()

class SyncRequest(BaseModel):
    session_id: int

# @router.get("/health")
# async def health_check():
#     """Health check endpoint"""
#     return {"status": "healthy", "agent": "Islamic Scholar Agent"}

@router.post("/api/sync-wiki-knowledge")
async def sync_wiki_knowledge(request: SyncRequest, background_tasks: BackgroundTasks):
    """
    Triggers the background task to fetch Wiki HTML, convert to MD via Jina,
    embed in Qdrant, and update the database.
    """
    background_tasks.add_task(run_wiki_embedding_sync, request.session_id)
    return {"status": "started", "session_id": request.session_id, "type": "wiki_sync"}

src/knowledge/sync_wiki.py (177 lines)

@@ -0,0 +1,177 @@
import json
import hashlib
import uuid
import requests
import os
from sqlalchemy import text
from dotenv import load_dotenv
from src.utils.load_settings import engine
from src.knowledge.embedding_factory import EmbeddingFactory
from src.knowledge.vector_store import get_qdrant_store
from agno.knowledge.document import Document

load_dotenv()
JINA_API_KEY = os.getenv("JINA_API_KEY")

def get_text_from_json(json_data, target_lang='fa'):
    """Helper to safely extract exactly the target text from Django JSONField arrays."""
    if not json_data or not isinstance(json_data, list):
        return "Unknown"
    for entry in json_data:
        if isinstance(entry, dict) and entry.get('language_code') == target_lang:
            return entry.get('text', 'Unknown')
    if len(json_data) > 0 and isinstance(json_data[0], dict):
        return json_data[0].get('text', 'Unknown')
    return "Unknown"

def convert_html_to_md_jina(html_content: str, row_id: int) -> str:
    """Helper to call Jina AI and convert HTML to clean Markdown."""
    headers = {
        "Authorization": f"Bearer {JINA_API_KEY}",
        "Accept": "application/json"
    }
    files = {
        'file': (f'document_{row_id}.html', html_content, 'text/html')
    }
    try:
        response = requests.post("https://r.jina.ai/", headers=headers, files=files, timeout=30)
        response.raise_for_status()
        jina_data = response.json().get('data', {})
        return jina_data.get('content', '')
    except Exception as e:
        print(f"⚠️ [Jina Error] ID {row_id}: {e}")
        return ""

def run_wiki_embedding_sync(session_id: int):
    """Background task to sync Wiki content -> Jina -> Qdrant -> Postgres"""
    # 1. Initialize Vector DB
    embed_factory = EmbeddingFactory()
    embedder = embed_factory.get_embedder()
    vector_db = get_qdrant_store(embedder=embedder)
    active_collection_name = vector_db.collection
    model_json_str = json.dumps([active_collection_name])
    print(f"🚀 Starting Wiki Sync Pipeline for model: {active_collection_name}")

    with engine.connect() as conn:
        try:
            # 2. Count pending Wiki items
            count_query = text("""
                SELECT COUNT(*) FROM wiki_wikicontent
                WHERE NOT (CAST(COALESCE(embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb))
            """)
            total_pending = conn.execute(count_query, {"model_json": model_json_str}).scalar()

            # Update session to PROCESSING
            conn.execute(text("UPDATE agent_embeddingsession SET status='PROCESSING', total_items=:t WHERE id=:id"),
                         {"t": total_pending, "id": session_id})
            conn.commit()

            if total_pending == 0:
                print("✅ No new Wiki entries to process.")
                conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id})
                conn.commit()
                return

            # 3. Fetch relational data for pending items
            print(f"📥 Fetching {total_pending} pending Wiki entries from Database...")
            fetch_query = text("""
                SELECT
                    wc.id as content_id,
                    wc.wiki_id,
                    wc.language_id as lang_code,
                    wc.content as html_content,
                    w.title as wiki_titles,
                    a.id as author_id,
                    a.name as author_names,
                    c.id as category_id,
                    c.name as cat_names
                FROM wiki_wikicontent wc
                JOIN wiki_wiki w ON wc.wiki_id = w.id
                LEFT JOIN wiki_author a ON w.author_id = a.id
                JOIN wiki_wikicategory c ON w.category_id = c.id
                WHERE NOT (CAST(COALESCE(wc.embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb))
            """)
            pending_rows = conn.execute(fetch_query, {"model_json": model_json_str}).fetchall()
            processed = 0

            # 4. Process each row sequentially
            for row in pending_rows:
                content_id = row.content_id

                # A. Extract Translations
                wiki_title = get_text_from_json(row.wiki_titles, 'fa')
                author_name = get_text_from_json(row.author_names, 'fa')
                category_name = get_text_from_json(row.cat_names, 'fa')

                # B. Convert HTML to MD via Jina API
                clean_text = convert_html_to_md_jina(row.html_content, content_id)
                if not clean_text:
                    print(f"⏭️ Skipping ID {content_id} due to empty Jina response.")
                    continue

                # C. Build the Rich Agno Document
                narrative_text = (
                    f"CATEGORY: {category_name}\n"
                    f"WIKI TITLE: {wiki_title}\n"
                    f"AUTHOR: {author_name}\n"
                    f"CONTENT:\n{clean_text}"
                )
                payload = {
                    "source_type": "WIKI",
                    "content_id": content_id,
                    "wiki_id": row.wiki_id,
                    "wiki_title": wiki_title,
                    "category_id": row.category_id,
                    "category_name": category_name,
                    "author_id": row.author_id,
                    "author_name": author_name,
                    "language": row.lang_code
                }
                hash_id = hashlib.md5(f"WIKI_{content_id}_{active_collection_name}".encode()).hexdigest()
                qdrant_id = str(uuid.UUID(hash_id))
                doc = Document(id=qdrant_id, content=narrative_text, meta_data=payload)

                # D. Upsert to Qdrant
                vector_db.upsert(content_hash=qdrant_id, documents=[doc])

                # E. Update PostgreSQL `embedded_in` column for this row
                update_query = text("""
                    UPDATE wiki_wikicontent
                    SET embedded_in = CAST(COALESCE(embedded_in, '[]') AS jsonb) || CAST(:model_json AS jsonb)
                    WHERE id = :id
                """)
                conn.execute(update_query, {"model_json": model_json_str, "id": content_id})
                conn.commit()

                # F. Update overall progress
                processed += 1
                if processed % 5 == 0 or processed == total_pending:
                    pct = int((processed / total_pending) * 100)
                    conn.execute(text("UPDATE agent_embeddingsession SET progress=:p, processed_items=:proc WHERE id=:id"),
                                 {"p": pct, "proc": processed, "id": session_id})
                    conn.commit()
                    print(f"🔄 Progress: {pct}% ({processed}/{total_pending})")

            # 5. Mark Session as Completed
            conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id})
            conn.commit()
            print("🎉 Wiki Embedding Sync Completed Successfully!")
        except Exception as e:
            print(f"❌ Fatal Error in Wiki Sync: {str(e)}")
            conn.execute(text("UPDATE agent_embeddingsession SET status='FAILED', error_message=:err WHERE id=:id"),
                         {"err": str(e), "id": session_id})
            conn.commit()

src/models/factory.py (2 lines)

@@ -58,7 +58,7 @@ class ModelFactory:
         # 3. Instantiate the correct Class
         if provider_name == 'openai_like':
-            return OpenRouter(
+            return OpenAILike(
                 id=model_config_data['id'],
                 api_key=api_key,
                 base_url=base_url,

src/utils/collection_name.py (67 lines)

@@ -0,0 +1,67 @@
import os
import yaml
from pathlib import Path
from typing import Optional

def _load_embeddings_config() -> dict:
    project_root = Path(__file__).resolve().parent.parent.parent
    config_path = project_root / "config" / "embeddings.yaml"
    with open(config_path) as f:
        content = f.read()
    for key, val in os.environ.items():
        content = content.replace(f"${{{key}}}", val)
    return yaml.safe_load(content)

def _get_active_model_config(config: dict, model_name: Optional[str] = None) -> dict:
    embeddings = config["embeddings"]
    name = model_name or embeddings["default"]
    models = embeddings["models"]
    # Direct match on config key (e.g. "jina_AI")
    if name in models:
        return {**models[name], "name": name}
    # Fallback: match on the model's id field (e.g. "jina-embeddings-v4")
    for key, cfg in models.items():
        if cfg.get("id") == name:
            return {**cfg, "name": key}
    raise ValueError(f"Embedding model '{name}' not found in embeddings.yaml (searched by key and id)")

def get_collection_name(
    *,
    project_name: Optional[str] = None,
    model_name: Optional[str] = None,
    dimensions: Optional[int] = None,
    chunk_size: Optional[int] = None,
    is_hybrid: Optional[bool] = None,
) -> str:
    """Build a descriptive, deterministic collection name.

    Reads from env vars and embeddings.yaml, but every parameter
    can be overridden explicitly.

    Pattern: {project}_{model}_{dim}d_{chunk}t_{type}
    Example: dovodi_collection_jina-embeddings-v4_1024d_500t_hybrid
    """
    config = _load_embeddings_config()
    model_cfg = _get_active_model_config(config, model_name)
    project = project_name or os.getenv("BASE_COLLECTION_NAME", "default_project")
    model_id = model_cfg["id"]
    dim = dimensions or model_cfg.get("dimensions") or int(os.getenv("EMBEDDER_DIMENSIONS", "384"))
    chunk = chunk_size or int(os.getenv("CHUNK_SIZE", "500"))
    hybrid = is_hybrid if is_hybrid is not None else os.getenv("IS_HYBRID", "true").lower() == "true"
    clean_model = model_id.split("/")[-1]
    type_suffix = "hybrid" if hybrid else "dense"
    return f"{project}_{clean_model}_{dim}d_{chunk}t_{type_suffix}"

if __name__ == "__main__":
    print(get_collection_name())