Add documentation for collection naming strategy and incremental vector synchronization. Implement background task for syncing data from PostgreSQL to Qdrant, ensuring zero duplicate embeddings and crash resilience. Update API routes to trigger synchronization process.
master
7 changed files with 536 additions and 16 deletions
- docs/COLLECTION_NAMING.md (120)
- docs/INCREMENTAL_VECTOR_SYNC.md (204)
- docs/index.md (7)
- out.md (18)
- src/api/routes.py (26)
- src/knowledge/sync_rag.py (108)
- src/utils/collection_name.py (67)
@@ -0,0 +1,120 @@ docs/COLLECTION_NAMING.md

# Collection Naming Strategy

## Why This Module Exists

When working with vector databases like Qdrant, **the name of your collection is not cosmetic — it is a data integrity contract.** Mixing incompatible vectors in the same collection silently corrupts your search results with no error or warning.

This module (`collection_name.py`) generates deterministic, self-documenting collection names that encode every critical parameter. If any parameter changes, a new name is produced, which means a new collection — preventing data corruption automatically.

---

## The Golden Rule of Vector Collections

> **You MUST create a new collection whenever the mathematical comparison between two vectors becomes invalid.**

If you compare a vector from **Model A** with a vector from **Model B**, the result is **meaningless garbage**. The numbers look real, cosine similarity returns a value, but it means nothing. There is no error. There is no crash. Your app just silently returns wrong answers.

Therefore, incompatible vectors **cannot live in the same collection.**
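This failure mode is easy to demonstrate with a toy sketch (the vectors below are made-up values, not real embeddings): cosine similarity happily returns a number for any two same-length vectors, no matter which model produced them.

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Plain cosine similarity — it never checks where the vectors came from."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Toy stand-ins for "the same sentence" embedded by two different models.
# The spaces are unrelated, so the score is a real-looking number that
# carries no meaning — and nothing raises an error.
vec_model_a = [0.12, -0.80, 0.33, 0.41]
vec_model_b = [0.95, 0.05, -0.60, 0.22]

print(round(cosine(vec_model_a, vec_model_b), 3))  # → -0.031, plausible-looking but meaningless
```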
---

## The 4 Factors That Force a New Collection

| # | Factor | Why It Forces a New Collection | Priority |
|---|--------|--------------------------------|----------|
| 1 | **Model** | `jina-v3` vectors are numerically unrelated to `openai-v3` vectors. They live in completely different mathematical spaces. | :red_circle: Critical |
| 2 | **Dimensions** | You cannot insert a 1024-dim vector into a collection built for 768-dim. Qdrant will reject it outright. | :red_circle: Critical |
| 3 | **Distance Metric** | A collection using `Cosine` ranks results differently than one using `Dot Product`. Mixing them invalidates your ranking logic. | :orange_circle: High |
| 4 | **Chunking Strategy** | If Batch A was chunked by paragraphs and Batch B by sentences, search results become biased and inconsistent. Apples vs. oranges. | :yellow_circle: Medium |

---

## Naming Format

```
[PROJECT]_[MODEL]_[DIMENSIONS]d_[CHUNK_SIZE]t_[TYPE]
```

### Example Breakdown

```
imam_reza_jina-v3_1024d_512t_hybrid
│         │       │     │    │
│         │       │     │    └── hybrid = sparse + dense vectors enabled
│         │       │     └── 512t = 512 tokens per chunk
│         │       └── 1024d = vector dimensionality
│         └── jina-v3 = embedding model
└── imam_reza = project / domain name
```

| Segment | Purpose |
|---------|---------|
| `imam_reza` | **Project/Domain** — keeps this data isolated from other apps or datasets. |
| `jina-v3` | **Model** — the embedding brain. Changing models = new collection, always. |
| `1024d` | **Dimensions** — tells you (and Qdrant) the vector size at a glance. |
| `512t` | **Chunk size** — "512 tokens per chunk". If you later experiment with 128-token chunks, a separate collection lets you compare performance fairly. |
| `hybrid` | **Search type** — indicates sparse vectors are enabled alongside dense vectors. |

---

## How It Works

`get_collection_name()` pulls values from two sources:

1. **`config/embeddings.yaml`** — the active embedding model, its `id`, and `dimensions`.
2. **Environment variables** (`.env`) — `BASE_COLLECTION_NAME`, `EMBEDDER_DIMENSIONS`, `CHUNK_SIZE`, `IS_HYBRID`.

Every parameter can also be overridden explicitly via function arguments.

### Resolution Priority (per parameter)

| Parameter | 1st (highest) | 2nd | 3rd (fallback) |
|-----------|---------------|-----|----------------|
| `project_name` | Explicit arg | `BASE_COLLECTION_NAME` env var | `"default_project"` |
| `model_name` | Explicit arg | `embeddings.yaml` → `default` key | — |
| `dimensions` | Explicit arg | `embeddings.yaml` → model config | `EMBEDDER_DIMENSIONS` env var |
| `chunk_size` | Explicit arg | `CHUNK_SIZE` env var | `500` |
| `is_hybrid` | Explicit arg | `IS_HYBRID` env var | `true` |

### Model Lookup

The `model_name` parameter is flexible — it accepts either:

- A **config key** from `embeddings.yaml` (e.g., `"jina_AI"`)
- A **model id** (e.g., `"jina-embeddings-v4"`)

---

## Usage

```python
from src.utils.collection_name import get_collection_name

# All defaults from config + env
name = get_collection_name()
# → "dovodi_collection_jina-embeddings-v4_1024d_500t_hybrid"

# Override chunk size
name = get_collection_name(chunk_size=128)
# → "dovodi_collection_jina-embeddings-v4_1024d_128t_hybrid"

# Dense-only collection
name = get_collection_name(is_hybrid=False)
# → "dovodi_collection_jina-embeddings-v4_1024d_500t_dense"

# Different model from embeddings.yaml
name = get_collection_name(model_name="openai_small")
# → "dovodi_collection_text-embedding-3-small_1536d_500t_hybrid"
```

---

## What Happens If You Don't Do This

| Scenario | What Goes Wrong |
|----------|-----------------|
| You switch from `jina-v3` to `openai-v3` but keep the same collection | Old jina vectors and new openai vectors get compared. Search returns nonsense. **No error is raised.** |
| You change dimensions from 1024 to 768 | Qdrant crashes on insert — dimension mismatch. At least this one is loud. |
| You change chunk size from 512 to 128 | Short chunks get unfairly boosted in similarity scores against long chunks. Search quality degrades silently. |
| You switch from Cosine to Dot Product | Ranking logic is inverted for unnormalized vectors. Top results become bottom results. |

The naming convention makes these mistakes **structurally impossible** — if any parameter changes, a new collection name is generated, and the old data is never touched.
@ -0,0 +1,204 @@ |
|||||
|
# Incremental Vector Synchronization |
||||
|
|
||||
|
**PostgreSQL → Qdrant | Database-Driven Incremental Embedding Pipeline** |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## Overview |
||||
|
|
||||
|
This feature introduces a fully automated, admin-triggered pipeline that synchronizes textual data from a relational database (PostgreSQL / Django ORM) to a vector database (Qdrant). The system guarantees **zero duplicate embeddings**, **crash resilience**, and seamless **A/B testing** across different embedding models — all without manual file uploads. |
||||
|
|
||||
|
| Property | Value | |
||||
|
| ----------------- | -------------------------------------------- | |
||||
|
| **Source** | PostgreSQL (Django Models) | |
||||
|
| **Target** | Qdrant Vector Database | |
||||
|
| **State Tracking**| JSONB Arrays (`embedded_in` column) | |
||||
|
| **Deduplication** | Deterministic Hash ID via MD5 | |
||||
|
| **Trigger** | Admin UI → FastAPI Background Task | |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## Architecture |
||||
|
|
||||
|
``` |
||||
|
┌──────────────────────┐ HTTP POST ┌──────────────────────────┐ |
||||
|
│ Django Admin UI │ ────────────────────▸ │ FastAPI Agent │ |
||||
|
│ (EmbeddingSession) │ │ /api/sync-knowledge │ |
||||
|
└──────────────────────┘ └────────────┬─────────────┘ |
||||
|
▲ │ |
||||
|
│ progress updates BackgroundTask |
||||
|
│ (status, %, items) │ |
||||
|
│ ▼ |
||||
|
┌────────┴─────────────┐ ┌──────────────────────────┐ |
||||
|
│ PostgreSQL │ ◂────── read/update ──▸ │ run_global_embedding_ │ |
||||
|
│ embedded_in (JSONB) │ │ sync(session_id) │ |
||||
|
└──────────────────────┘ └────────────┬─────────────┘ |
||||
|
│ |
||||
|
embed + upsert |
||||
|
│ |
||||
|
▼ |
||||
|
┌──────────────────────────┐ |
||||
|
│ Qdrant Vector DB │ |
||||
|
│ (hybrid search collection)│ |
||||
|
└──────────────────────────┘ |
||||
|
``` |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## How It Works |
||||
|
|
||||
|
### 1. State Tracking via JSONB |
||||
|
|
||||
|
Every syncable Django model (e.g. `Article`, `Hadith`) includes an `embedded_in` column: |
||||
|
|
||||
|
```python |
||||
|
# Django model field |
||||
|
embedded_in = models.JSONField(default=list) |
||||
|
# Example value: ["dovodi_jina-embeddings-v4_hybrid", "dovodi_text-embedding-3-small_hybrid"] |
||||
|
``` |
||||
|
|
||||
|
**Why an array instead of a boolean?** A simple `is_embedded = True` flag cannot distinguish *which* model the row was embedded into. By storing the exact collection name, the system natively supports multiple embedding models simultaneously. Switching the active model in `config/embeddings.yaml` from `jina_AI` to `openai_small` causes every row to be detected as "missing" for the new collection — triggering a full, automatic re-sync. |
||||
|
|
||||
|
**Self-healing on update:** When an admin edits the text of a record in Django admin, the overridden `save()` method resets `embedded_in` to `[]`, guaranteeing the updated content is picked up in the next sync cycle. |
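The actual reset lives in a Django `save()` override, which is not part of this diff; a framework-free sketch of the behaviour (class and method names here are illustrative, not the real model) looks like this:

```python
class SyncableRecord:
    """Minimal stand-in for a Django model with an `embedded_in` state array."""

    def __init__(self, content: str):
        self.content = content
        self.embedded_in: list[str] = []  # collections this row is already embedded in

    def update_content(self, new_content: str) -> None:
        # Mirrors the save() override: any text change clears the state array,
        # so the row is detected as "missing" in every collection on the next sync.
        if new_content != self.content:
            self.content = new_content
            self.embedded_in = []

row = SyncableRecord("old text")
row.embedded_in = ["dovodi_jina-embeddings-v4_hybrid"]
row.update_content("edited text")
print(row.embedded_in)  # → []
```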
### 2. The Trigger Mechanism (Django → FastAPI)

A dedicated `EmbeddingSession` model in Django tracks the lifecycle of each sync operation:

| Field | Purpose |
|-------------------|---------------------------------------------------|
| `status` | `PENDING` / `PROCESSING` / `COMPLETED` / `FAILED` |
| `total_items` | Total rows needing embedding |
| `processed_items` | Rows embedded so far |
| `progress` | Percentage (0–100) |
| `error_message` | Captured exception on failure |

When the admin saves a new session, Django sends a **non-blocking** `HTTP POST` to the FastAPI agent:

```
POST /api/sync-knowledge
Body: { "session_id": 42 }
```

The FastAPI route (`src/api/routes.py`) receives this and delegates to a `BackgroundTask`:

```python
@router.post("/api/sync-knowledge")
async def sync_knowledge(request: SyncRequest, background_tasks: BackgroundTasks):
    background_tasks.add_task(run_global_embedding_sync, request.session_id)
    return {"status": "started", "session_id": request.session_id}
```

The Django admin UI renders a **live Tailwind progress bar** powered by `processed_items / total_items`.

### 3. The Smart Background Worker

The core logic lives in `src/knowledge/sync_rag.py` → `run_global_embedding_sync(session_id)`:

**Step A — Resolve the active model:**

```python
embed_factory = EmbeddingFactory()               # reads config/embeddings.yaml
embedder = embed_factory.get_embedder()          # returns the default embedder
vector_db = get_qdrant_store(embedder=embedder)
active_collection_name = vector_db.collection
```

The collection name is dynamically composed as `{BASE_COLLECTION_NAME}_{model_id}_hybrid` (e.g. `dovodi_jina-embeddings-v4_hybrid`).

**Step B — Query only missing rows:**

Using PostgreSQL's native JSONB containment operator (`@>`), the system fetches *only* the rows whose `embedded_in` array does **not** contain the active collection name:

```sql
SELECT * FROM article_article
WHERE NOT (CAST(embedded_in AS jsonb) @> CAST('["dovodi_jina-embeddings-v4_hybrid"]' AS jsonb))
```

This is the key to incremental sync — already-embedded rows are never touched.
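For a single-element array on the right-hand side, `@>` reduces to array membership. A plain-Python model of the filter (toy rows, not real schema objects) makes the semantics concrete:

```python
ACTIVE = "dovodi_jina-embeddings-v4_hybrid"

# Toy rows standing in for article_article records
rows = [
    {"id": 1, "embedded_in": [ACTIVE]},                                  # already synced
    {"id": 2, "embedded_in": []},                                        # never embedded
    {"id": 3, "embedded_in": ["dovodi_text-embedding-3-small_hybrid"]},  # other model only
]

# Equivalent of: WHERE NOT (embedded_in @> '["<ACTIVE>"]')
pending = [r for r in rows if ACTIVE not in r["embedded_in"]]
print([r["id"] for r in pending])  # → [2, 3]
```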
**Step C — Process, embed, upsert:**

For each pending row, the worker:

1. Formats the record into a continuous string (`TITLE: ... \n CONTENT: ...`)
2. Generates a **deterministic Qdrant ID** (see below)
3. Wraps it in an Agno `Document` and calls `vector_db.upsert()`
4. Updates the row's `embedded_in` array in PostgreSQL
5. Reports progress to the `EmbeddingSession` every 5 items

### 4. Deterministic Deduplication

To prevent duplicate vectors when resuming a crashed sync or re-embedding updated text, auto-generated UUIDs are **not** used. Instead, the system computes a deterministic ID:

```python
hash_input = f"{data_type}_{row.id}_{active_collection_name}"  # e.g. "ARTICLE_42_dovodi_jina-embeddings-v4_hybrid"
hash_id = hashlib.md5(hash_input.encode()).hexdigest()
qdrant_id = str(uuid.UUID(hash_id))  # valid UUID built from the 32-hex-digit MD5 digest
```

This ID is passed as `content_hash` to `vector_db.upsert()`, which causes Qdrant to **overwrite** any existing point with the same ID rather than creating a duplicate.
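The snippet above can be distilled into a runnable helper to show why re-runs are safe: the same (type, row id, collection) triple always maps to the same UUID, so a crashed sync that re-processes a row targets its existing point instead of creating a new one.

```python
import hashlib
import uuid

def deterministic_qdrant_id(data_type: str, row_id: int, collection: str) -> str:
    """Same inputs always produce the same Qdrant point ID."""
    hash_id = hashlib.md5(f"{data_type}_{row_id}_{collection}".encode()).hexdigest()
    return str(uuid.UUID(hash_id))  # 32 hex digits — exactly what uuid.UUID accepts

a = deterministic_qdrant_id("ARTICLE", 42, "dovodi_jina-embeddings-v4_hybrid")
b = deterministic_qdrant_id("ARTICLE", 42, "dovodi_jina-embeddings-v4_hybrid")
print(a == b)  # → True: re-embedding row 42 overwrites the same point
```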
### 5. Closing the Loop

After a successful upsert, the worker appends the active collection name to the row's `embedded_in` array:

```sql
UPDATE article_article
SET embedded_in = CAST(embedded_in AS jsonb) || CAST('["dovodi_jina-embeddings-v4_hybrid"]' AS jsonb)
WHERE id = 42
```

Once all tables are processed, the session status is set to `COMPLETED`. If any exception occurs, the status is set to `FAILED` and the error message is captured.

---

## File Reference

| File | Responsibility |
| --- | --- |
| `src/knowledge/sync_rag.py` | Core sync logic — queries missing rows, embeds, upserts, updates state |
| `src/api/routes.py` | FastAPI endpoint `/api/sync-knowledge` that triggers the background task |
| `src/main.py` | Application factory — includes the API router |
| `src/knowledge/embedding_factory.py` | Reads `config/embeddings.yaml` and instantiates the correct embedder |
| `src/knowledge/vector_store.py` | Builds the Qdrant client with dynamic collection naming |
| `config/embeddings.yaml` | Declares available embedding models and the active default |

---

## Configuration

The active embedding model is controlled by `config/embeddings.yaml`:

```yaml
embeddings:
  default: jina_AI   # switch to "openai_small" to trigger a full re-sync

  models:
    openai_small:
      provider: "openai"
      id: "text-embedding-3-small"
      dimensions: 1536
      api_key: ${OPENAI_API_KEY}

    jina_AI:
      provider: "jinaai"
      id: "jina-embeddings-v4"
      dimensions: 1024
      api_key: ${JINA_API_KEY}
```
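The `${OPENAI_API_KEY}` and `${JINA_API_KEY}` placeholders are expanded from environment variables when the file is loaded; `_load_embeddings_config()` in `src/utils/collection_name.py` does this with a plain string replace, which can be sketched as:

```python
import os

def substitute_env(content: str) -> str:
    # Replace every ${VAR} placeholder whose VAR is set in the environment
    for key, val in os.environ.items():
        content = content.replace(f"${{{key}}}", val)
    return content

os.environ["JINA_API_KEY"] = "demo-key"  # illustrative value
print(substitute_env("api_key: ${JINA_API_KEY}"))  # → api_key: demo-key
```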
Changing `default` from one model to another is all that is needed — the sync worker will detect that no rows contain the new collection name and re-embed everything.

---

## Why This Design Is Production-Ready

| Property | Detail |
| --- | --- |
| **Zero data duplication** | Deterministic MD5 hashing ensures the same row always maps to the same Qdrant point ID. Re-running the sync never creates duplicates. |
| **Seamless model switching** | Changing the embedding model in YAML causes the system to treat all rows as "missing" for the new collection. No manual migration needed. |
| **Crash resilience** | State is committed row-by-row to PostgreSQL. If the server crashes at 99%, restarting picks up the remaining 1%. |
| **Cost efficiency** | Only un-embedded rows hit the AI API. Already-processed data is never re-sent, saving API costs. |
| **Non-blocking UI** | The Django admin fires an async HTTP call and renders live progress — the UI never freezes. |
| **Multi-table support** | The `tables_to_sync` list can be extended with new Django models without touching the core algorithm. |
@@ -0,0 +1,7 @@ docs/index.md

# Welcome to the Dovoodi Agent Docs

This documentation covers the architecture, setup, and maintenance of the Dovoodi Agent microservice.

### Quick Links

* If you are looking for how data syncs from Django to Qdrant, see the [Incremental Vector Sync](INCREMENTAL_VECTOR_SYNC.md) guide.
* For prompt-engineering details, check the [Dynamic System Prompt](DYNAMIC_SYSTEM_PROMPT.md).
out.md (18) — file diff suppressed because it is too large.
@@ -1,18 +1,14 @@ src/api/routes.py

```diff
-from typing import Optional, Any, Dict, List
-from fastapi import APIRouter, HTTPException
-from pydantic import BaseModel, Field
-from src.agents.islamic_scholar_agent import IslamicScholarAgent
-from src.models.factory import ModelFactory
-from src.knowledge.embedding_factory import EmbeddingFactory
-from src.knowledge.rag_pipeline import create_knowledge_base
-from src.utils.load_settings import get_active_agent_config
-from langfuse.decorators import observe, langfuse_context
-import json
-from fastapi.responses import StreamingResponse
+from fastapi import APIRouter, BackgroundTasks
+from pydantic import BaseModel
+from src.knowledge.sync_rag import run_global_embedding_sync

+router = APIRouter()
+
+class SyncRequest(BaseModel):
+    session_id: int

-# @router.get("/health")
-# async def health_check():
-#     """Health check endpoint"""
-#     return {"status": "healthy", "agent": "Islamic Scholar Agent"}

+@router.post("/api/sync-knowledge")
+async def sync_knowledge(request: SyncRequest, background_tasks: BackgroundTasks):
+    # Pass the session ID to the background worker
+    background_tasks.add_task(run_global_embedding_sync, request.session_id)
+    return {"status": "started", "session_id": request.session_id}
```
@@ -0,0 +1,108 @@ src/knowledge/sync_rag.py

```python
# src/knowledge/sync_rag.py (FastAPI Agent)
import json
import hashlib
import uuid
from sqlalchemy import text
from src.utils.load_settings import engine
from src.knowledge.embedding_factory import EmbeddingFactory
from src.knowledge.vector_store import get_qdrant_store
from agno.knowledge.document import Document


def run_global_embedding_sync(session_id: int):
    """Background task to selectively embed ALL missing data across tables."""

    # 1. Get the ACTIVE embedder from the agent's own config
    embed_factory = EmbeddingFactory()
    embedder = embed_factory.get_embedder()  # uses the default from embeddings.yaml
    vector_db = get_qdrant_store(embedder=embedder)

    active_collection_name = vector_db.collection

    print(f"🚀 Starting Global Sync for collection: {active_collection_name}")

    # Tables to process and their data-type prefix.
    # Change the app prefix to match your actual Django app names.
    tables_to_sync = [
        # {"table": "hadith_hadith", "type": "HADITH"},
        {"table": "article_article", "type": "ARTICLE"},
    ]

    with engine.connect() as conn:
        try:
            # 2. Count TOTAL pending items across all tables
            total_pending = 0
            model_json_str = json.dumps([active_collection_name])

            for t in tables_to_sync:
                count_query = text(f"""
                    SELECT COUNT(*) FROM {t['table']}
                    WHERE NOT (CAST(embedded_in AS jsonb) @> CAST(:model_json AS jsonb))
                """)
                count = conn.execute(count_query, {"model_json": model_json_str}).scalar()
                total_pending += count or 0

            # Update session to PROCESSING
            conn.execute(
                text("UPDATE agent_embeddingsession SET status='PROCESSING', total_items=:t WHERE id=:id"),
                {"t": total_pending, "id": session_id},
            )
            conn.commit()

            if total_pending == 0:
                conn.execute(
                    text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"),
                    {"id": session_id},
                )
                conn.commit()
                return

            # 3. Process each table
            processed = 0

            for t in tables_to_sync:
                table_name = t["table"]
                data_type = t["type"]

                # Fetch pending rows for this specific table
                query = text(f"""
                    SELECT * FROM {table_name}
                    WHERE NOT (CAST(embedded_in AS jsonb) @> CAST(:model_json AS jsonb))
                """)
                pending_rows = conn.execute(query, {"model_json": model_json_str}).fetchall()

                for row in pending_rows:
                    # Build dynamic content based on the table type
                    if data_type == "HADITH":
                        content = f"HADITH TYPE: HADITH\nTITLE: {row.title}\nARABIC: {row.arabic_text}\nTRANSLATION: {row.translation}\nSOURCE: {row.source_info}"
                    elif data_type == "ARTICLE":
                        content = f"ARTICLE TYPE: ARTICLE\nTITLE: {row.title}\nDESCRIPTION: {row.description}\nCONTENT: {row.content}"
                    else:
                        continue  # unknown type — skip rather than crash the whole sync

                    # Generate deterministic Qdrant ID (prefix + DB ID + collection)
                    hash_id = hashlib.md5(f"{data_type}_{row.id}_{active_collection_name}".encode()).hexdigest()
                    qdrant_id = str(uuid.UUID(hash_id))

                    # Wrap the text in an Agno Document and upsert into Qdrant
                    doc = Document(id=qdrant_id, content=content)
                    vector_db.upsert(content_hash=qdrant_id, documents=[doc])

                    # Append the collection name to the row's JSONB state array
                    update_query = text(f"""
                        UPDATE {table_name}
                        SET embedded_in = CAST(embedded_in AS jsonb) || CAST(:model_json AS jsonb)
                        WHERE id = :id
                    """)
                    conn.execute(update_query, {"model_json": model_json_str, "id": row.id})
                    conn.commit()  # commit row-by-row so state survives a crash

                    processed += 1

                    # Update progress every 5 items
                    if processed % 5 == 0 or processed == total_pending:
                        pct = int((processed / total_pending) * 100)
                        conn.execute(
                            text("UPDATE agent_embeddingsession SET progress=:p, processed_items=:proc WHERE id=:id"),
                            {"p": pct, "proc": processed, "id": session_id},
                        )
                        conn.commit()

            # 4. Mark completed
            conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED' WHERE id=:id"), {"id": session_id})
            conn.commit()

        except Exception as e:
            conn.execute(
                text("UPDATE agent_embeddingsession SET status='FAILED', error_message=:err WHERE id=:id"),
                {"err": str(e), "id": session_id},
            )
            conn.commit()
```
@@ -0,0 +1,67 @@ src/utils/collection_name.py

```python
import os
import yaml
from pathlib import Path
from typing import Optional


def _load_embeddings_config() -> dict:
    project_root = Path(__file__).resolve().parent.parent.parent
    config_path = project_root / "config" / "embeddings.yaml"

    with open(config_path) as f:
        content = f.read()
    # Expand ${VAR} placeholders from the environment before parsing
    for key, val in os.environ.items():
        content = content.replace(f"${{{key}}}", val)
    return yaml.safe_load(content)


def _get_active_model_config(config: dict, model_name: Optional[str] = None) -> dict:
    embeddings = config["embeddings"]
    name = model_name or embeddings["default"]
    models = embeddings["models"]

    # Direct match on config key (e.g. "jina_AI")
    if name in models:
        return {**models[name], "name": name}

    # Fallback: match on the model's id field (e.g. "jina-embeddings-v4")
    for key, cfg in models.items():
        if cfg.get("id") == name:
            return {**cfg, "name": key}

    raise ValueError(f"Embedding model '{name}' not found in embeddings.yaml (searched by key and id)")


def get_collection_name(
    *,
    project_name: Optional[str] = None,
    model_name: Optional[str] = None,
    dimensions: Optional[int] = None,
    chunk_size: Optional[int] = None,
    is_hybrid: Optional[bool] = None,
) -> str:
    """Build a descriptive, deterministic collection name.

    Reads from env vars and embeddings.yaml, but every parameter
    can be overridden explicitly.

    Pattern: {project}_{model}_{dim}d_{chunk}t_{type}
    Example: dovodi_collection_jina-embeddings-v4_1024d_500t_hybrid
    """
    config = _load_embeddings_config()
    model_cfg = _get_active_model_config(config, model_name)

    project = project_name or os.getenv("BASE_COLLECTION_NAME", "default_project")
    model_id = model_cfg["id"]
    dim = dimensions or model_cfg.get("dimensions") or int(os.getenv("EMBEDDER_DIMENSIONS", "384"))
    chunk = chunk_size or int(os.getenv("CHUNK_SIZE", "500"))
    hybrid = is_hybrid if is_hybrid is not None else os.getenv("IS_HYBRID", "true").lower() == "true"

    clean_model = model_id.split("/")[-1]
    type_suffix = "hybrid" if hybrid else "dense"

    return f"{project}_{clean_model}_{dim}d_{chunk}t_{type_suffix}"


if __name__ == "__main__":
    print(get_collection_name())
```