3 Commits
0e8bba1a76 ... f9dd62f241
9 changed files with 611 additions and 17 deletions

- 120  docs/COLLECTION_NAMING.md
- 204  docs/INCREMENTAL_VECTOR_SYNC.md
- 7    docs/index.md
- 2    imam-reza.session.sql
- 20   out.md
- 29   src/api/routes.py
- 177  src/knowledge/sync_wiki.py
- 2    src/models/factory.py
- 67   src/utils/collection_name.py
@@ -0,0 +1,120 @@

# Collection Naming Strategy

## Why This Module Exists

When working with vector databases like Qdrant, **the name of your collection is not cosmetic — it is a data integrity contract.** Mixing incompatible vectors in the same collection silently corrupts your search results with no error or warning.

This module (`collection_name.py`) generates deterministic, self-documenting collection names that encode every critical parameter. If any parameter changes, a new name is produced, which means a new collection — preventing data corruption automatically.

---

## The Golden Rule of Vector Collections

> **You MUST create a new collection whenever the mathematical comparison between two vectors becomes invalid.**

If you compare a vector from **Model A** with a vector from **Model B**, the result is **meaningless garbage**. The numbers look real, cosine similarity returns a value, but it means nothing. There is no error. There is no crash. Your app just silently returns wrong answers.

Therefore, incompatible vectors **cannot live in the same collection.**
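A quick way to build intuition (an illustrative sketch, not project code): cosine similarity between vectors drawn from two unrelated embedding spaces still returns a perfectly plausible-looking number.

```python
import math
import random

def cosine(a, b):
    # standard cosine similarity
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

random.seed(0)
# Pretend these came from two different embedding models:
# the spaces are unrelated, so the score carries no meaning.
vec_model_a = [random.gauss(0, 1) for _ in range(768)]
vec_model_b = [random.gauss(0, 1) for _ in range(768)]

score = cosine(vec_model_a, vec_model_b)
# A real-looking float in [-1, 1] comes back: no error, no warning.
```

The failure mode is exactly this silence, which is why the collection name has to carry the model identity.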
---

## The 4 Factors That Force a New Collection

| # | Factor | Why It Forces a New Collection | Priority |
|---|--------|-------------------------------|----------|
| 1 | **Model** | `jina-v3` vectors are numerically unrelated to `openai-v3` vectors. They live in completely different mathematical spaces. | :red_circle: Critical |
| 2 | **Dimensions** | You cannot insert a 1024-dim vector into a collection built for 768-dim. Qdrant will reject it outright. | :red_circle: Critical |
| 3 | **Distance Metric** | A collection using `Cosine` ranks results differently than one using `Dot Product`. Mixing them invalidates your ranking logic. | :orange_circle: High |
| 4 | **Chunking Strategy** | If Batch A was chunked by paragraphs and Batch B by sentences, search results become biased and inconsistent. Apples vs. oranges. | :yellow_circle: Medium |

---
## Naming Format

```
[PROJECT]_[MODEL]_[DIMENSIONS]d_[CHUNK_SIZE]t_[TYPE]
```

### Example Breakdown

```
imam_reza_jina-v3_1024d_512t_hybrid
│         │       │     │    │
│         │       │     │    └── hybrid = sparse + dense vectors enabled
│         │       │     └── 512t = 512 tokens per chunk
│         │       └── 1024d = vector dimensionality
│         └── jina-v3 = embedding model
└── imam_reza = project / domain name
```

| Segment | Purpose |
|---------|---------|
| `imam_reza` | **Project/Domain** — keeps this data isolated from other apps or datasets. |
| `jina-v3` | **Model** — the embedding brain. Changing models = new collection, always. |
| `1024d` | **Dimensions** — tells you (and Qdrant) the vector size at a glance. |
| `512t` | **Chunk size** — "512 tokens per chunk". If you later experiment with 128-token chunks, a separate collection lets you compare performance fairly. |
| `hybrid` | **Search type** — indicates sparse vectors are enabled alongside dense vectors. |

---
## How It Works

`get_collection_name()` pulls values from two sources:

1. **`config/embeddings.yaml`** — the active embedding model, its `id`, and `dimensions`.
2. **Environment variables** (`.env`) — `BASE_COLLECTION_NAME`, `EMBEDDER_DIMENSIONS`, `CHUNK_SIZE`, `IS_HYBRID`.

Every parameter can also be overridden explicitly via function arguments.

### Resolution Priority (per parameter)

| Parameter | 1st (highest) | 2nd | 3rd (fallback) |
|-----------|---------------|-----|----------------|
| `project_name` | Explicit arg | `BASE_COLLECTION_NAME` env var | `"default_project"` |
| `model_name` | Explicit arg | `embeddings.yaml` → `default` key | — |
| `dimensions` | Explicit arg | `embeddings.yaml` → model config | `EMBEDDER_DIMENSIONS` env var |
| `chunk_size` | Explicit arg | `CHUNK_SIZE` env var | `500` |
| `is_hybrid` | Explicit arg | `IS_HYBRID` env var | `true` |

### Model Lookup

The `model_name` parameter is flexible — it accepts either:

- A **config key** from `embeddings.yaml` (e.g., `"jina_AI"`)
- A **model id** (e.g., `"jina-embeddings-v4"`)
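The two-step lookup can be sketched as follows (a simplified, standalone version of the module's logic, with an inline toy config):

```python
def resolve_model(models: dict, name: str) -> dict:
    # 1) Direct match on the config key (e.g. "jina_AI")
    if name in models:
        return {**models[name], "name": name}
    # 2) Fallback: match on the model's "id" field (e.g. "jina-embeddings-v4")
    for key, cfg in models.items():
        if cfg.get("id") == name:
            return {**cfg, "name": key}
    raise ValueError(f"Embedding model '{name}' not found")

models = {
    "jina_AI": {"id": "jina-embeddings-v4", "dimensions": 1024},
    "openai_small": {"id": "text-embedding-3-small", "dimensions": 1536},
}
by_key = resolve_model(models, "jina_AI")       # matched on config key
by_id = resolve_model(models, "jina-embeddings-v4")  # matched on id field
```

Either spelling resolves to the same model config, so callers never need to care which form they hold.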
---

## Usage

```python
from src.utils.collection_name import get_collection_name

# All defaults from config + env
name = get_collection_name()
# → "dovodi_collection_jina-embeddings-v4_1024d_500t_hybrid"

# Override chunk size
name = get_collection_name(chunk_size=128)
# → "dovodi_collection_jina-embeddings-v4_1024d_128t_hybrid"

# Dense-only collection
name = get_collection_name(is_hybrid=False)
# → "dovodi_collection_jina-embeddings-v4_1024d_500t_dense"

# Different model from embeddings.yaml
name = get_collection_name(model_name="openai_small")
# → "dovodi_collection_text-embedding-3-small_1536d_500t_hybrid"
```

---
## What Happens If You Don't Do This

| Scenario | What Goes Wrong |
|----------|-----------------|
| You switch from `jina-v3` to `openai-v3` but keep the same collection | Old jina vectors and new openai vectors get compared. Search returns nonsense. **No error is raised.** |
| You change dimensions from 1024 to 768 | Qdrant crashes on insert — dimension mismatch. At least this one is loud. |
| You change chunk size from 512 to 128 | Short chunks get unfairly boosted in similarity scores against long chunks. Search quality degrades silently. |
| You switch from Cosine to Dot Product | Ranking logic is inverted for unnormalized vectors. Top results become bottom results. |

The naming convention makes these mistakes **structurally impossible** — if any parameter changes, a new collection name is generated, and the old data is never touched.
@@ -0,0 +1,204 @@

# Incremental Vector Synchronization

**PostgreSQL → Qdrant | Database-Driven Incremental Embedding Pipeline**

---

## Overview

This feature introduces a fully automated, admin-triggered pipeline that synchronizes textual data from a relational database (PostgreSQL / Django ORM) to a vector database (Qdrant). The system guarantees **zero duplicate embeddings**, **crash resilience**, and seamless **A/B testing** across different embedding models — all without manual file uploads.

| Property | Value |
| ----------------- | -------------------------------------------- |
| **Source** | PostgreSQL (Django Models) |
| **Target** | Qdrant Vector Database |
| **State Tracking** | JSONB Arrays (`embedded_in` column) |
| **Deduplication** | Deterministic Hash ID via MD5 |
| **Trigger** | Admin UI → FastAPI Background Task |

---

## Architecture

```
┌──────────────────────┐      HTTP POST        ┌──────────────────────────┐
│   Django Admin UI    │ ────────────────────▸ │      FastAPI Agent       │
│  (EmbeddingSession)  │                       │   /api/sync-knowledge    │
└──────────────────────┘                       └────────────┬─────────────┘
        ▲                                                   │
        │ progress updates                           BackgroundTask
        │ (status, %, items)                                │
        │                                                   ▼
┌───────┴──────────────┐                       ┌──────────────────────────┐
│     PostgreSQL       │ ◂─── read/update ───▸ │  run_global_embedding_   │
│  embedded_in (JSONB) │                       │    sync(session_id)      │
└──────────────────────┘                       └────────────┬─────────────┘
                                                            │
                                                     embed + upsert
                                                            │
                                                            ▼
                                               ┌──────────────────────────┐
                                               │     Qdrant Vector DB     │
                                               │(hybrid search collection)│
                                               └──────────────────────────┘
```

---
## How It Works

### 1. State Tracking via JSONB

Every syncable Django model (e.g. `Article`, `Hadith`) includes an `embedded_in` column:

```python
# Django model field
embedded_in = models.JSONField(default=list)
# Example value: ["dovodi_jina-embeddings-v4_hybrid", "dovodi_text-embedding-3-small_hybrid"]
```

**Why an array instead of a boolean?** A simple `is_embedded = True` flag cannot distinguish *which* model the row was embedded into. By storing the exact collection name, the system natively supports multiple embedding models simultaneously. Switching the active model in `config/embeddings.yaml` from `jina_AI` to `openai_small` causes every row to be detected as "missing" for the new collection — triggering a full, automatic re-sync.

**Self-healing on update:** When an admin edits the text of a record in Django admin, the overridden `save()` method resets `embedded_in` to `[]`, guaranteeing the updated content is picked up in the next sync cycle.
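The pattern behind that `save()` override can be sketched framework-agnostically (a simplification for illustration; the real method lives on the Django model and persists to PostgreSQL):

```python
class SyncableRecord:
    """Minimal stand-in for a Django model with an embedded_in array."""

    def __init__(self, content):
        self.content = content
        self.embedded_in = []

    def save(self, new_content=None):
        # If the text changed, invalidate all embeddings so the
        # next sync cycle re-embeds this record into every collection.
        if new_content is not None and new_content != self.content:
            self.content = new_content
            self.embedded_in = []

row = SyncableRecord("original text")
row.embedded_in = ["dovodi_jina-embeddings-v4_hybrid"]
row.save(new_content="edited text")
# embedded_in is now [] and the row is "missing" for every collection again
```

Because "embedded" state is derived from the array rather than a flag, invalidation is just emptying the array.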
### 2. The Trigger Mechanism (Django → FastAPI)

A dedicated `EmbeddingSession` model in Django tracks the lifecycle of each sync operation:

| Field | Purpose |
| ----------------- | ------------------------------------------ |
| `status` | `PENDING` / `PROCESSING` / `COMPLETED` / `FAILED` |
| `total_items` | Total rows needing embedding |
| `processed_items` | Rows embedded so far |
| `progress` | Percentage (0–100) |
| `error_message` | Captured exception on failure |

When the admin saves a new session, Django sends a **non-blocking** `HTTP POST` to the FastAPI agent:

```
POST /api/sync-knowledge
Body: { "session_id": 42 }
```
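The Django-side trigger might look roughly like this (a hedged sketch: the endpoint path comes from these docs, but `agent_url` and the injected `post` callable are illustrative, not the project's actual code):

```python
def trigger_sync(session_id, post, agent_url="http://localhost:8000"):
    """Fire-and-forget POST to the FastAPI agent.

    `post` is injected (e.g. requests.post) so the call stays testable;
    a short timeout keeps the admin save from blocking on the agent.
    """
    try:
        return post(
            f"{agent_url}/api/sync-knowledge",
            json={"session_id": session_id},
            timeout=2,
        )
    except Exception:
        # A failed trigger should never break the admin save itself.
        return None

# Fake transport for demonstration:
calls = []
trigger_sync(42, post=lambda url, **kw: calls.append((url, kw["json"])))
```

Swallowing the exception is deliberate here: the session row still exists in `PENDING`, so a missed trigger can be retried without losing state.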
The FastAPI route (`src/api/routes.py`) receives this and delegates to a `BackgroundTask`:

```python
@router.post("/api/sync-knowledge")
async def sync_knowledge(request: SyncRequest, background_tasks: BackgroundTasks):
    background_tasks.add_task(run_global_embedding_sync, request.session_id)
    return {"status": "started", "session_id": request.session_id}
```

The Django admin UI renders a **live Tailwind progress bar** powered by `processed_items / total_items`.

### 3. The Smart Background Worker

The core logic lives in `src/knowledge/sync_rag.py` → `run_global_embedding_sync(session_id)`:

**Step A — Resolve the active model:**

```python
embed_factory = EmbeddingFactory()               # reads config/embeddings.yaml
embedder = embed_factory.get_embedder()          # returns the default embedder
vector_db = get_qdrant_store(embedder=embedder)
active_collection_name = vector_db.collection
```

The collection name is dynamically composed as `{BASE_COLLECTION_NAME}_{model_id}_hybrid` (e.g. `dovodi_jina-embeddings-v4_hybrid`).

**Step B — Query only missing rows:**

Using PostgreSQL's native JSONB containment operator (`@>`), the system fetches *only* the rows whose `embedded_in` array does **not** contain the active collection name:

```sql
SELECT * FROM article_article
WHERE NOT (CAST(embedded_in AS jsonb) @> CAST('["dovodi_jina-embeddings-v4_hybrid"]' AS jsonb))
```

This is the key to incremental sync — already-embedded rows are never touched.
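For intuition, the JSONB containment filter is equivalent to this Python predicate (illustrative only; in production the filtering happens inside PostgreSQL, before any rows are fetched):

```python
def needs_embedding(embedded_in, active_collection):
    # Mirrors: NOT (COALESCE(embedded_in, '[]') @> '["<active_collection>"]')
    return active_collection not in (embedded_in or [])

active = "dovodi_jina-embeddings-v4_hybrid"
rows = [
    {"id": 1, "embedded_in": [active]},   # already embedded -> skipped
    {"id": 2, "embedded_in": []},          # never embedded -> pending
    {"id": 3, "embedded_in": None},        # NULL column, like COALESCE(..., '[]')
]
pending = [r["id"] for r in rows if needs_embedding(r["embedded_in"], active)]
# → [2, 3]
```

Switching the active model changes `active`, which makes every row pending again for the new collection without touching the old one.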
**Step C — Process, embed, upsert:**

For each pending row, the worker:

1. Formats the record into a continuous string (`TITLE: ... \n CONTENT: ...`)
2. Generates a **deterministic Qdrant ID** (see below)
3. Wraps it in an Agno `Document` and calls `vector_db.upsert()`
4. Updates the row's `embedded_in` array in PostgreSQL
5. Reports progress to the `EmbeddingSession` every 5 items

### 4. Deterministic Deduplication

To prevent duplicate vectors when resuming a crashed sync or re-embedding updated text, auto-generated UUIDs are **not used**. Instead, the system computes a deterministic ID:

```python
hash_input = f"{data_type}_{row.id}_{active_collection_name}"  # e.g. "ARTICLE_42_dovodi_jina-embeddings-v4_hybrid"
hash_id = hashlib.md5(hash_input.encode()).hexdigest()
qdrant_id = str(uuid.UUID(hash_id))  # valid UUID from MD5
```

This ID is passed as `content_hash` to `vector_db.upsert()`, which causes Qdrant to **overwrite** any existing point with the same ID rather than creating a duplicate.
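Determinism is easy to verify: the same (type, row, collection) triple always yields the same UUID, so a re-run overwrites instead of duplicating. A self-contained check of the scheme above:

```python
import hashlib
import uuid

def qdrant_point_id(data_type, row_id, collection):
    # Same recipe as the pipeline: MD5 of a stable key, reshaped as a UUID
    hash_input = f"{data_type}_{row_id}_{collection}"
    return str(uuid.UUID(hashlib.md5(hash_input.encode()).hexdigest()))

first = qdrant_point_id("ARTICLE", 42, "dovodi_jina-embeddings-v4_hybrid")
again = qdrant_point_id("ARTICLE", 42, "dovodi_jina-embeddings-v4_hybrid")
other = qdrant_point_id("ARTICLE", 43, "dovodi_jina-embeddings-v4_hybrid")
# first == again (stable across runs); first != other (distinct rows)
```

Note the collection name is part of the key, so the same row gets a *different* point ID in each collection, and per-collection overwrites never collide.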
### 5. Closing the Loop

After a successful upsert, the worker appends the active collection name to the row's `embedded_in` array:

```sql
UPDATE article_article
SET embedded_in = CAST(embedded_in AS jsonb) || CAST('["dovodi_jina-embeddings-v4_hybrid"]' AS jsonb)
WHERE id = 42
```
Once all tables are processed, the session status is set to `COMPLETED`. If any exception occurs, the status is set to `FAILED` with the error message captured.

---

## File Reference

| File | Responsibility |
| --- | --- |
| `src/knowledge/sync_rag.py` | Core sync logic — queries missing rows, embeds, upserts, updates state |
| `src/api/routes.py` | FastAPI endpoint `/api/sync-knowledge` that triggers the background task |
| `src/main.py` | Application factory — includes the API router |
| `src/knowledge/embedding_factory.py` | Reads `config/embeddings.yaml` and instantiates the correct embedder |
| `src/knowledge/vector_store.py` | Builds the Qdrant client with dynamic collection naming |
| `config/embeddings.yaml` | Declares available embedding models and the active default |

---
## Configuration

The active embedding model is controlled by `config/embeddings.yaml`:

```yaml
embeddings:
  default: jina_AI   # switch to "openai_small" to trigger full re-sync

  models:
    openai_small:
      provider: "openai"
      id: "text-embedding-3-small"
      dimensions: 1536
      api_key: ${OPENAI_API_KEY}

    jina_AI:
      provider: "jinaai"
      id: "jina-embeddings-v4"
      dimensions: 1024
      api_key: ${JINA_API_KEY}
```

Changing `default` from one model to another is all that is needed — the sync worker will detect that no rows contain the new collection name and re-embed everything.
---

## Why This Design is Production-Ready

| Property | Detail |
| --- | --- |
| **Zero data duplication** | Deterministic MD5 hashing ensures the same row always maps to the same Qdrant point ID. Re-running sync never creates duplicates. |
| **Seamless model switching** | Changing the embedding model in YAML causes the system to treat all rows as "missing" for the new collection. No manual migration needed. |
| **Crash resilience** | State is committed row-by-row to PostgreSQL. If the server crashes at 99%, restarting picks up the remaining 1%. |
| **Cost efficiency** | Only un-embedded rows hit the AI API. Already-processed data is never re-sent, saving API costs. |
| **Non-blocking UI** | The Django admin fires an async HTTP call and renders live progress — the UI never freezes. |
| **Multi-table support** | The `tables_to_sync` list can be extended with new Django models without touching the core algorithm. |
@@ -0,0 +1,7 @@

# Welcome to the Dovoodi Agent Docs

This documentation covers the architecture, setup, and maintenance of the Dovoodi Agent microservice.

### Quick Links

* If you are looking for how the data syncs from Django to Qdrant, see the [Incremental Vector Sync](INCREMENTAL_VECTOR_SYNC.md) guide.
* For prompt engineering details, check the [Dynamic System Prompt](DYNAMIC_SYSTEM_PROMPT.md).
@@ -0,0 +1,2 @@
select * from wiki_wikicontent
where language_id = 69;
20 out.md (file diff suppressed because it is too large)
@@ -1,18 +1,17 @@
from typing import Optional, Any, Dict, List
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from src.agents.islamic_scholar_agent import IslamicScholarAgent
from src.models.factory import ModelFactory
from src.knowledge.embedding_factory import EmbeddingFactory
from src.knowledge.rag_pipeline import create_knowledge_base
from src.utils.load_settings import get_active_agent_config
from langfuse.decorators import observe, langfuse_context
import json
from fastapi.responses import StreamingResponse

from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel
from src.knowledge.sync_wiki import run_wiki_embedding_sync


router = APIRouter()


class SyncRequest(BaseModel):
    session_id: int


# @router.get("/health")
# async def health_check():
#     """Health check endpoint"""
#     return {"status": "healthy", "agent": "Islamic Scholar Agent"}


@router.post("/api/sync-wiki-knowledge")
async def sync_wiki_knowledge(request: SyncRequest, background_tasks: BackgroundTasks):
    """
    Triggers the background task to fetch Wiki HTML, convert to MD via Jina,
    embed in Qdrant, and update the database.
    """
    background_tasks.add_task(run_wiki_embedding_sync, request.session_id)
    return {"status": "started", "session_id": request.session_id, "type": "wiki_sync"}
@@ -0,0 +1,177 @@
import json
import hashlib
import uuid
import requests
import os
from sqlalchemy import text
from dotenv import load_dotenv

from src.utils.load_settings import engine
from src.knowledge.embedding_factory import EmbeddingFactory
from src.knowledge.vector_store import get_qdrant_store
from agno.knowledge.document import Document

load_dotenv()
JINA_API_KEY = os.getenv("JINA_API_KEY")


def get_text_from_json(json_data, target_lang='fa'):
    """Helper to safely extract exactly the target text from Django JSONField arrays."""
    if not json_data or not isinstance(json_data, list):
        return "Unknown"

    for entry in json_data:
        if isinstance(entry, dict) and entry.get('language_code') == target_lang:
            return entry.get('text', 'Unknown')

    if len(json_data) > 0 and isinstance(json_data[0], dict):
        return json_data[0].get('text', 'Unknown')

    return "Unknown"


def convert_html_to_md_jina(html_content: str, row_id: int) -> str:
    """Helper to call Jina AI and convert HTML to clean Markdown."""
    headers = {
        "Authorization": f"Bearer {JINA_API_KEY}",
        "Accept": "application/json"
    }
    files = {
        'file': (f'document_{row_id}.html', html_content, 'text/html')
    }
    try:
        response = requests.post("https://r.jina.ai/", headers=headers, files=files, timeout=30)
        response.raise_for_status()
        jina_data = response.json().get('data', {})
        return jina_data.get('content', '')
    except Exception as e:
        print(f"⚠️ [Jina Error] ID {row_id}: {e}")
        return ""


def run_wiki_embedding_sync(session_id: int):
    """Background task to sync Wiki content -> Jina -> Qdrant -> Postgres"""

    # 1. Initialize Vector DB
    embed_factory = EmbeddingFactory()
    embedder = embed_factory.get_embedder()
    vector_db = get_qdrant_store(embedder=embedder)
    active_collection_name = vector_db.collection

    model_json_str = json.dumps([active_collection_name])
    print(f"🚀 Starting Wiki Sync Pipeline for model: {active_collection_name}")

    with engine.connect() as conn:
        try:
            # 2. Count pending Wiki items
            count_query = text("""
                SELECT COUNT(*) FROM wiki_wikicontent
                WHERE NOT (CAST(COALESCE(embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb))
            """)
            total_pending = conn.execute(count_query, {"model_json": model_json_str}).scalar()

            # Update session to PROCESSING
            conn.execute(text("UPDATE agent_embeddingsession SET status='PROCESSING', total_items=:t WHERE id=:id"),
                         {"t": total_pending, "id": session_id})
            conn.commit()

            if total_pending == 0:
                print("✅ No new Wiki entries to process.")
                conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id})
                conn.commit()
                return

            # 3. Fetch relational data for pending items
            print(f"📥 Fetching {total_pending} pending Wiki entries from Database...")
            fetch_query = text("""
                SELECT
                    wc.id as content_id,
                    wc.wiki_id,
                    wc.language_id as lang_code,
                    wc.content as html_content,
                    w.title as wiki_titles,
                    a.id as author_id,
                    a.name as author_names,
                    c.id as category_id,
                    c.name as cat_names
                FROM wiki_wikicontent wc
                JOIN wiki_wiki w ON wc.wiki_id = w.id
                LEFT JOIN wiki_author a ON w.author_id = a.id
                JOIN wiki_wikicategory c ON w.category_id = c.id
                WHERE NOT (CAST(COALESCE(wc.embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb))
            """)

            pending_rows = conn.execute(fetch_query, {"model_json": model_json_str}).fetchall()

            processed = 0

            # 4. Process each row sequentially
            for row in pending_rows:
                content_id = row.content_id

                # A. Extract Translations
                wiki_title = get_text_from_json(row.wiki_titles, 'fa')
                author_name = get_text_from_json(row.author_names, 'fa')
                category_name = get_text_from_json(row.cat_names, 'fa')

                # B. Convert HTML to MD via Jina API
                clean_text = convert_html_to_md_jina(row.html_content, content_id)

                if not clean_text:
                    print(f"⏭️ Skipping ID {content_id} due to empty Jina response.")
                    continue

                # C. Build the Rich Agno Document
                narrative_text = (
                    f"CATEGORY: {category_name}\n"
                    f"WIKI TITLE: {wiki_title}\n"
                    f"AUTHOR: {author_name}\n"
                    f"CONTENT:\n{clean_text}"
                )

                payload = {
                    "source_type": "WIKI",
                    "content_id": content_id,
                    "wiki_id": row.wiki_id,
                    "wiki_title": wiki_title,
                    "category_id": row.category_id,
                    "category_name": category_name,
                    "author_id": row.author_id,
                    "author_name": author_name,
                    "language": row.lang_code
                }

                hash_id = hashlib.md5(f"WIKI_{content_id}_{active_collection_name}".encode()).hexdigest()
                qdrant_id = str(uuid.UUID(hash_id))

                doc = Document(id=qdrant_id, content=narrative_text, meta_data=payload)

                # D. Upsert to Qdrant
                vector_db.upsert(content_hash=qdrant_id, documents=[doc])

                # E. Update PostgreSQL `embedded_in` column for this row
                update_query = text("""
                    UPDATE wiki_wikicontent
                    SET embedded_in = CAST(COALESCE(embedded_in, '[]') AS jsonb) || CAST(:model_json AS jsonb)
                    WHERE id = :id
                """)
                conn.execute(update_query, {"model_json": model_json_str, "id": content_id})
                conn.commit()

                # F. Update overall progress
                processed += 1
                if processed % 5 == 0 or processed == total_pending:
                    pct = int((processed / total_pending) * 100)
                    conn.execute(text("UPDATE agent_embeddingsession SET progress=:p, processed_items=:proc WHERE id=:id"),
                                 {"p": pct, "proc": processed, "id": session_id})
                    conn.commit()
                    print(f"🔄 Progress: {pct}% ({processed}/{total_pending})")

            # 5. Mark Session as Completed
            conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id})
            conn.commit()
            print("🎉 Wiki Embedding Sync Completed Successfully!")

        except Exception as e:
            print(f"❌ Fatal Error in Wiki Sync: {str(e)}")
            conn.execute(text("UPDATE agent_embeddingsession SET status='FAILED', error_message=:err WHERE id=:id"),
                         {"err": str(e), "id": session_id})
            conn.commit()
@@ -0,0 +1,67 @@
import os
import yaml
from pathlib import Path
from typing import Optional


def _load_embeddings_config() -> dict:
    project_root = Path(__file__).resolve().parent.parent.parent
    config_path = project_root / "config" / "embeddings.yaml"

    with open(config_path) as f:
        content = f.read()
    for key, val in os.environ.items():
        content = content.replace(f"${{{key}}}", val)
    return yaml.safe_load(content)


def _get_active_model_config(config: dict, model_name: Optional[str] = None) -> dict:
    embeddings = config["embeddings"]
    name = model_name or embeddings["default"]
    models = embeddings["models"]

    # Direct match on config key (e.g. "jina_AI")
    if name in models:
        return {**models[name], "name": name}

    # Fallback: match on the model's id field (e.g. "jina-embeddings-v4")
    for key, cfg in models.items():
        if cfg.get("id") == name:
            return {**cfg, "name": key}

    raise ValueError(f"Embedding model '{name}' not found in embeddings.yaml (searched by key and id)")


def get_collection_name(
    *,
    project_name: Optional[str] = None,
    model_name: Optional[str] = None,
    dimensions: Optional[int] = None,
    chunk_size: Optional[int] = None,
    is_hybrid: Optional[bool] = None,
) -> str:
    """Build a descriptive, deterministic collection name.

    Reads from env vars and embeddings.yaml, but every parameter
    can be overridden explicitly.

    Pattern: {project}_{model}_{dim}d_{chunk}t_{type}
    Example: dovodi_collection_jina-embeddings-v4_1024d_500t_hybrid
    """
    config = _load_embeddings_config()
    model_cfg = _get_active_model_config(config, model_name)

    project = project_name or os.getenv("BASE_COLLECTION_NAME", "default_project")
    model_id = model_cfg["id"]
    dim = dimensions or model_cfg.get("dimensions") or int(os.getenv("EMBEDDER_DIMENSIONS", "384"))
    chunk = chunk_size or int(os.getenv("CHUNK_SIZE", "500"))
    hybrid = is_hybrid if is_hybrid is not None else os.getenv("IS_HYBRID", "true").lower() == "true"

    clean_model = model_id.split("/")[-1]
    type_suffix = "hybrid" if hybrid else "dense"

    return f"{project}_{clean_model}_{dim}d_{chunk}t_{type_suffix}"


if __name__ == "__main__":
    print(get_collection_name())