Incremental Vector Synchronization
PostgreSQL → Qdrant | Database-Driven Incremental Embedding Pipeline
Overview
This feature introduces a fully automated, admin-triggered pipeline that synchronizes textual data from a relational database (PostgreSQL / Django ORM) to a vector database (Qdrant). The system guarantees zero duplicate embeddings, crash resilience, and seamless A/B testing across different embedding models — all without manual file uploads.
| Property | Value |
|---|---|
| Source | PostgreSQL (Django Models) |
| Target | Qdrant Vector Database |
| State Tracking | JSONB Arrays (embedded_in column) |
| Deduplication | Deterministic Hash ID via MD5 |
| Trigger | Admin UI → FastAPI Background Task |
Architecture
┌──────────────────────┐        HTTP POST        ┌──────────────────────────┐
│   Django Admin UI    │ ──────────────────────▸ │      FastAPI Agent       │
│  (EmbeddingSession)  │                         │   /api/sync-knowledge    │
└──────────────────────┘                         └────────────┬─────────────┘
         ▲                                                    │
         │ progress updates                            BackgroundTask
         │ (status, %, items)                                 │
         │                                                    ▼
┌────────┴─────────────┐                         ┌──────────────────────────┐
│      PostgreSQL      │ ◂──── read/update ────▸ │  run_global_embedding_   │
│ embedded_in (JSONB)  │                         │     sync(session_id)     │
└──────────────────────┘                         └────────────┬─────────────┘
                                                              │
                                                       embed + upsert
                                                              │
                                                              ▼
                                                 ┌──────────────────────────┐
                                                 │     Qdrant Vector DB     │
                                                 │(hybrid search collection)│
                                                 └──────────────────────────┘
How It Works
1. State Tracking via JSONB
Every syncable Django model (e.g. Article, Hadith) includes an embedded_in column:
# Django model field
embedded_in = models.JSONField(default=list)
# Example value: ["dovodi_jina-embeddings-v4_hybrid", "dovodi_text-embedding-3-small_hybrid"]
Why an array instead of a boolean? A simple is_embedded = True flag cannot record which embedding model, and therefore which collection, a row was embedded into. By storing the exact collection name, the system natively supports multiple embedding models simultaneously. Switching the active model in config/embeddings.yaml from jina_AI to openai_small causes every row to be detected as "missing" for the new collection, which triggers a full, automatic re-sync.
Self-healing on update: When an admin edits the text of a record in Django admin, the overridden save() method resets embedded_in to [], guaranteeing the updated content is picked up in the next sync cycle.
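The reset-on-edit behaviour can be illustrated without Django. The sketch below is a hypothetical stand-in, not the project's model: the class name ArticleRecord, the title and content fields, and the comparison against a saved snapshot are all assumptions. In particular, the source does not say whether the real save() compares old and new text before resetting.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ArticleRecord:
    """Hypothetical stand-in for a syncable Django model."""
    title: str
    content: str
    embedded_in: list = field(default_factory=list)
    # Snapshot of the text at the last save (assumption: only title
    # and content feed the embedding).
    _saved_text: Optional[tuple] = field(default=None, repr=False)

    def save(self) -> None:
        current = (self.title, self.content)
        # Reset the sync state only when the embedded text changed,
        # so that recording a finished embedding does not undo itself.
        if self._saved_text is not None and current != self._saved_text:
            self.embedded_in = []
        self._saved_text = current
```

After a reset the row no longer lists any collection, so it would be re-embedded for every model it had previously been synced to, not only the active one.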
2. The Trigger Mechanism (Django → FastAPI)
A dedicated EmbeddingSession model in Django tracks the lifecycle of each sync operation:
| Field | Purpose |
|---|---|
| status | PENDING / PROCESSING / COMPLETED / FAILED |
| total_items | Total rows needing embedding |
| processed_items | Rows embedded so far |
| progress | Percentage (0–100) |
| error_message | Captured exception on failure |
When the admin saves a new session, Django sends a non-blocking HTTP POST to the FastAPI agent:
POST /api/sync-knowledge
Body: { "session_id": 42 }
The FastAPI route (src/api/routes.py) receives this and delegates to a BackgroundTask:
@router.post("/api/sync-knowledge")
async def sync_knowledge(request: SyncRequest, background_tasks: BackgroundTasks):
    background_tasks.add_task(run_global_embedding_sync, request.session_id)
    return {"status": "started", "session_id": request.session_id}
The Django admin UI renders a live Tailwind progress bar powered by processed_items / total_items.
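As a rough illustration, the percentage behind the progress bar can be derived from the two counters. The function name progress_percent is hypothetical, and the rounding and the handling of an empty session are assumptions rather than the project's actual behaviour.

```python
def progress_percent(processed_items: int, total_items: int) -> int:
    """Return progress as an integer between 0 and 100."""
    if total_items <= 0:
        # Nothing to embed: treat the session as complete (assumption).
        return 100
    # Integer arithmetic avoids floating-point rounding surprises.
    percent = processed_items * 100 // total_items
    # Clamp in case the counters drift out of range.
    return max(0, min(100, percent))
```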
3. The Smart Background Worker
The core logic lives in src/knowledge/sync_rag.py → run_global_embedding_sync(session_id):
Step A — Resolve the active model:
embed_factory = EmbeddingFactory() # reads config/embeddings.yaml
embedder = embed_factory.get_embedder() # returns the default embedder
vector_db = get_qdrant_store(embedder=embedder)
active_collection_name = vector_db.collection
The collection name is dynamically composed as {BASE_COLLECTION_NAME}_{model_id}_hybrid (e.g. dovodi_jina-embeddings-v4_hybrid).
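The naming rule is simple enough to express as a single helper. This is a sketch of the stated pattern only; the function name collection_name is hypothetical and the real composition happens somewhere inside the vector store setup.

```python
def collection_name(base: str, model_id: str) -> str:
    """Compose {BASE_COLLECTION_NAME}_{model_id}_hybrid."""
    return f"{base}_{model_id}_hybrid"


# Both names that appear as embedded_in values in this document:
jina = collection_name("dovodi", "jina-embeddings-v4")
openai_small = collection_name("dovodi", "text-embedding-3-small")
```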
Step B — Query only missing rows:
Using PostgreSQL's native JSONB containment operator (@>), the system fetches only the rows whose embedded_in array does not contain the active collection name:
SELECT * FROM article_article
WHERE NOT (CAST(embedded_in AS jsonb) @> CAST('["dovodi_jina-embeddings-v4_hybrid"]' AS jsonb))
This is the key to incremental sync — already-embedded rows are never touched.
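When the collection name comes from configuration, it is safer to pass it as a bound parameter than to interpolate it into the SQL text. The sketch below assumes a DB-API driver that uses %s placeholders (psycopg2, for example); the helper name and the table whitelist are hypothetical.

```python
import json

# Hypothetical whitelist: table names cannot be bound as parameters,
# so they are validated before being placed in the statement.
ALLOWED_TABLES = {"article_article"}


def build_missing_rows_query(table: str, collection: str):
    """Return (sql, params) selecting rows not yet embedded in `collection`."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f"unexpected table: {table}")
    sql = (
        f"SELECT * FROM {table} "
        "WHERE NOT (CAST(embedded_in AS jsonb) @> CAST(%s AS jsonb))"
    )
    # The parameter is a JSON array holding the single collection name.
    params = (json.dumps([collection]),)
    return sql, params
```

One caveat: a row whose embedded_in is NULL would not match this filter, because NOT applied to NULL is still NULL. The default=list on the model field is what keeps such rows from appearing.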
Step C — Process, embed, upsert:
For each pending row, the worker:
- Formats the record into a continuous string (TITLE: ... \n CONTENT: ...)
- Generates a deterministic Qdrant ID (see below)
- Wraps it in an Agno Document and calls vector_db.upsert()
- Updates the row's embedded_in array in PostgreSQL
- Reports progress to the EmbeddingSession every 5 items
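The steps above can be sketched with in-memory stand-ins. Everything here is illustrative: rows are plain dicts, upsert and report are injected callables standing in for the Qdrant store and the EmbeddingSession update, the row id is used in place of the deterministic Qdrant ID, and the exact text template is an assumption.

```python
def format_record(row: dict) -> str:
    # Template assumed from the "TITLE: ... \n CONTENT: ..." description.
    return f"TITLE: {row['title']}\nCONTENT: {row['content']}"


def sync_rows(rows, collection, upsert, report, every=5):
    """Embed rows missing `collection`; return how many were processed."""
    processed = 0
    for row in rows:
        if collection in row["embedded_in"]:
            continue  # already embedded for this collection
        upsert(row["id"], format_record(row))
        # Mark the row only after the upsert succeeded.
        row["embedded_in"].append(collection)
        processed += 1
        if processed % every == 0:
            report(processed)
    # Final update if the last batch was not a multiple of `every`.
    if processed % every != 0:
        report(processed)
    return processed
```

Marking the row after the upsert, rather than before, means a crash between the two steps leads to a repeated upsert on the next run instead of a silently skipped row.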
4. Deterministic Deduplication
To prevent duplicate vectors when resuming a crashed sync or re-embedding updated text, auto-generated UUIDs are not used. Instead, the system computes a deterministic ID:
hash_input = f"{data_type}_{row.id}_{active_collection_name}" # e.g. "ARTICLE_42_dovodi_jina-embeddings-v4_hybrid"
hash_id = hashlib.md5(hash_input.encode()).hexdigest()
qdrant_id = str(uuid.UUID(hash_id)) # valid UUID from MD5
This ID is passed as content_hash to vector_db.upsert(), which causes Qdrant to overwrite any existing point with the same ID rather than creating a duplicate.
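Wrapped in a function, the same computation is easy to check for determinism. The function name deterministic_point_id is hypothetical; the body follows the three lines shown above.

```python
import hashlib
import uuid


def deterministic_point_id(data_type: str, row_id: int, collection: str) -> str:
    """Derive a stable UUID string from the row identity and collection."""
    hash_input = f"{data_type}_{row_id}_{collection}"
    # An MD5 hex digest is 32 hex characters, which uuid.UUID accepts.
    digest = hashlib.md5(hash_input.encode()).hexdigest()
    return str(uuid.UUID(digest))


first = deterministic_point_id("ARTICLE", 42, "dovodi_jina-embeddings-v4_hybrid")
again = deterministic_point_id("ARTICLE", 42, "dovodi_jina-embeddings-v4_hybrid")
other = deterministic_point_id("ARTICLE", 42, "dovodi_text-embedding-3-small_hybrid")
```

Here first and again are identical, while other differs because the collection name is part of the hash input. MD5 serves only as a stable fingerprint in this scheme, not as a security measure.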
5. Closing the Loop
After a successful upsert, the worker appends the active collection name to the row's embedded_in array:
UPDATE article_article
SET embedded_in = CAST(embedded_in AS jsonb) || CAST('["dovodi_jina-embeddings-v4_hybrid"]' AS jsonb)
WHERE id = 42
Once all tables are processed, the session status is set to COMPLETED. If any exception occurs, the status is set to FAILED with the error message captured.
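The status transitions can be sketched as a thin wrapper around the sync work. SessionState and run_with_status are hypothetical names; the real worker updates the EmbeddingSession row in PostgreSQL rather than an in-memory object.

```python
from dataclasses import dataclass


@dataclass
class SessionState:
    """In-memory stand-in for the EmbeddingSession row."""
    status: str = "PENDING"
    error_message: str = ""


def run_with_status(session: SessionState, work) -> None:
    """Run `work()` and record the outcome on the session."""
    session.status = "PROCESSING"
    try:
        work()
    except Exception as exc:
        # Capture the failure so the admin UI can display it.
        session.status = "FAILED"
        session.error_message = str(exc)
    else:
        session.status = "COMPLETED"
```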
File Reference
| File | Responsibility |
|---|---|
| src/knowledge/sync_rag.py | Core sync logic: queries missing rows, embeds, upserts, updates state |
| src/api/routes.py | FastAPI endpoint /api/sync-knowledge that triggers the background task |
| src/main.py | Application factory that includes the API router |
| src/knowledge/embedding_factory.py | Reads config/embeddings.yaml and instantiates the correct embedder |
| src/knowledge/vector_store.py | Builds the Qdrant client with dynamic collection naming |
| config/embeddings.yaml | Declares available embedding models and the active default |
Configuration
The active embedding model is controlled by config/embeddings.yaml:
embeddings:
  default: jina_AI # switch to "openai_small" to trigger full re-sync
  models:
    openai_small:
      provider: "openai"
      id: "text-embedding-3-small"
      dimensions: 1536
      api_key: ${OPENAI_API_KEY}
    jina_AI:
      provider: "jinaai"
      id: "jina-embeddings-v4"
      dimensions: 1024
      api_key: ${JINA_API_KEY}
Changing default from one model to another is all that is needed — the sync worker will detect that no rows contain the new collection name and re-embed everything.
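To show how the default entry selects a model, here is a sketch that works on the already-parsed configuration as a plain dict. How EmbeddingFactory actually loads the YAML and expands ${...} placeholders is not described in this document; the function name resolve_default_model and the use of string.Template are assumptions.

```python
import os
import string


def resolve_default_model(config: dict, env=os.environ) -> dict:
    """Return the settings of the model named by embeddings.default."""
    section = config["embeddings"]
    name = section["default"]
    # Copy so the parsed configuration itself is left untouched.
    model = dict(section["models"][name])
    # Expand ${VAR} from the environment; a missing variable raises KeyError.
    model["api_key"] = string.Template(model["api_key"]).substitute(env)
    return model


config = {
    "embeddings": {
        "default": "jina_AI",
        "models": {
            "jina_AI": {
                "provider": "jinaai",
                "id": "jina-embeddings-v4",
                "dimensions": 1024,
                "api_key": "${JINA_API_KEY}",
            },
        },
    }
}
model = resolve_default_model(config, env={"JINA_API_KEY": "example-key"})
```

Failing loudly on a missing environment variable is a deliberate choice in this sketch: a sync that starts with an empty API key would otherwise fail later, on the first embedding call.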
Why This Design is Production-Ready
| Property | Detail |
|---|---|
| Zero data duplication | Deterministic MD5 hashing ensures the same row always maps to the same Qdrant point ID. Re-running sync never creates duplicates. |
| Seamless model switching | Changing the embedding model in YAML causes the system to treat all rows as "missing" for the new collection. No manual migration needed. |
| Crash resilience | State is committed row by row to PostgreSQL. If the server crashes at 99%, the next sync run only has to process the remaining 1%. |
| Cost efficiency | Only un-embedded rows hit the AI API. Already-processed data is never re-sent, saving API costs. |
| Non-blocking UI | The Django admin fires an async HTTP call and renders live progress — the UI never freezes. |
| Multi-table support | The tables_to_sync list can be extended with new Django models without touching the core algorithm. |