Browse Source
Add new SQL file for wiki content retrieval and refactor API routes to implement a new background task for syncing Wiki data to Qdrant. Remove deprecated global embedding sync functionality and introduce a dedicated sync process for Wiki content, enhancing the overall data synchronization strategy.
master
Add new SQL file for wiki content retrieval and refactor API routes to implement a new background task for syncing Wiki data to Qdrant. Remove deprecated global embedding sync functionality and introduce a dedicated sync process for Wiki content, enhancing the overall data synchronization strategy.
master
4 changed files with 188 additions and 114 deletions
-
2imam-reza.session.sql
-
15src/api/routes.py
-
108src/knowledge/sync_rag.py
-
177src/knowledge/sync_wiki.py
@ -0,0 +1,2 @@ |
|||||
|
-- Ad-hoc inspection query: dump all wiki content rows for language_id 69
-- (presumably the language targeted by the sync pipeline — confirm the id
-- against the languages table before relying on it).
select * from wiki_wikicontent
where language_id = 69;
||||
@ -1,14 +1,17 @@ |
|||||
from fastapi import APIRouter, BackgroundTasks |
from fastapi import APIRouter, BackgroundTasks |
||||
from pydantic import BaseModel |
from pydantic import BaseModel |
||||
from src.knowledge.sync_rag import run_global_embedding_sync |
|
||||
|
from src.knowledge.sync_wiki import run_wiki_embedding_sync |
||||
|
|
||||
# Shared router; the sync endpoints below are registered on it and it is
# mounted by the FastAPI application elsewhere.
router = APIRouter()
|
|
||||
class SyncRequest(BaseModel):
    """Request body for the sync endpoints.

    Carries the id of the session row the background worker uses to
    report progress and final status.
    """
    # Session identifier forwarded to the background sync task.
    session_id: int
|
|
||||
@router.post("/api/sync-knowledge")
async def sync_knowledge(request: SyncRequest, background_tasks: BackgroundTasks):
    """Schedule the global embedding sync as a background task.

    Responds immediately; the heavy lifting happens in
    ``run_global_embedding_sync`` after the response is sent.
    """
    session_id = request.session_id
    # Hand the session ID to the background worker so it can report progress.
    background_tasks.add_task(run_global_embedding_sync, session_id)
    return {"status": "started", "session_id": session_id}
||||
|
@router.post("/api/sync-wiki-knowledge")
async def sync_wiki_knowledge(request: SyncRequest, background_tasks: BackgroundTasks):
    """
    Triggers the background task to fetch Wiki HTML, convert to MD via Jina,
    embed in Qdrant, and update the database.
    """
    session_id = request.session_id
    background_tasks.add_task(run_wiki_embedding_sync, session_id)
    return {
        "status": "started",
        "session_id": session_id,
        "type": "wiki_sync",
    }
@ -1,108 +0,0 @@ |
|||||
# src/knowledge/rag_pipeline.py (FastAPI Agent) |
|
||||
import json |
|
||||
import hashlib |
|
||||
import uuid |
|
||||
from sqlalchemy import text |
|
||||
from src.utils.load_settings import engine |
|
||||
from src.knowledge.embedding_factory import EmbeddingFactory |
|
||||
from src.knowledge.vector_store import get_qdrant_store |
|
||||
from agno.knowledge.document import Document |
|
||||
def run_global_embedding_sync(session_id: int):
    """Background task to selectively embed ALL missing data across tables.

    For every configured table, finds rows whose ``embedded_in`` JSON array
    does not yet contain the active Qdrant collection name, embeds them,
    and appends the collection name so they are not re-processed.  Progress
    and status are written to the ``agent_embeddingsession`` row *session_id*.
    """

    # 1. Get the ACTIVE embedder from your own Agent config!
    embed_factory = EmbeddingFactory()
    embedder = embed_factory.get_embedder()  # Uses default from yaml
    vector_db = get_qdrant_store(embedder=embedder)

    active_collection_name = vector_db.collection

    print(f"🚀 Starting Global Sync for model: {active_collection_name}")

    # List of tables to process and their data type prefix
    # Change 'myapp' to your actual Django app name!
    tables_to_sync = [
        # {"table": "hadith_hadith", "type": "HADITH"},
        {"table": "article_article", "type": "ARTICLE"}
    ]

    with engine.connect() as conn:
        try:
            # 2. Count TOTAL pending items across all tables
            total_pending = 0
            # A row counts as "done" once the active collection name appears
            # in its embedded_in JSON array.
            model_json_str = json.dumps([active_collection_name])

            for t in tables_to_sync:
                count_query = text(f"""
                    SELECT COUNT(*) FROM {t['table']}
                    WHERE NOT (CAST(embedded_in AS jsonb) @> CAST(:model_json AS jsonb))
                """)
                count = conn.execute(count_query, {"model_json": model_json_str}).scalar()
                total_pending += count

            # Update session to PROCESSING
            conn.execute(text("UPDATE agent_embeddingsession SET status='PROCESSING', total_items=:t WHERE id=:id"),
                         {"t": total_pending, "id": session_id})
            conn.commit()

            if total_pending == 0:
                conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id})
                conn.commit()
                return

            # 3. Process each table
            processed = 0

            for t in tables_to_sync:
                table_name = t['table']
                data_type = t['type']

                # Fetch pending rows for this specific table
                query = text(f"""
                    SELECT * FROM {table_name}
                    WHERE NOT (CAST(embedded_in AS jsonb) @> CAST(:model_json AS jsonb))
                """)
                pending_rows = conn.execute(query, {"model_json": model_json_str}).fetchall()

                for row in pending_rows:
                    # Build dynamic content based on the table type
                    # NOTE(review): if a table config ever carries a type other
                    # than HADITH/ARTICLE, `content` is never assigned and the
                    # hash line below would raise NameError — add a guard if
                    # more types are introduced.
                    if data_type == "HADITH":
                        content = f"HADITH TYPE: HADITH\nTITLE: {row.title}\nARABIC: {row.arabic_text}\nTRANSLATION: {row.translation}\nSOURCE: {row.source_info}"
                    elif data_type == "ARTICLE":
                        content = f"ARTICLE TYPE: ARTICLE\nTITLE: {row.title}\nDESCRIPTION: {row.description}\nCONTENT: {row.content}"

                    # Generate deterministic Qdrant ID (Prefix + DB ID + Model)
                    hash_id = hashlib.md5(f"{data_type}_{row.id}_{active_collection_name}".encode()).hexdigest()
                    qdrant_id = str(uuid.UUID(hash_id))

                    # Insert into Qdrant
                    # 🟢 THE FIX: Wrap the text in an Agno Document and use upsert()
                    doc = Document(id=qdrant_id, content=content)
                    vector_db.upsert(content_hash=qdrant_id, documents=[doc])

                    # 🟢 Update the JSON array in PostgreSQL
                    update_query = text(f"""
                        UPDATE {table_name}
                        SET embedded_in = CAST(embedded_in AS jsonb) || CAST(:model_json AS jsonb)
                        WHERE id = :id
                    """)
                    conn.execute(update_query, {"model_json": model_json_str, "id": row.id})
                    conn.commit()  # Commit row-by-row or batch to ensure state is saved

                    processed += 1

                    # Update progress every 5 items
                    if processed % 5 == 0 or processed == total_pending:
                        pct = int((processed / total_pending) * 100)
                        conn.execute(text("UPDATE agent_embeddingsession SET progress=:p, processed_items=:proc WHERE id=:id"),
                                     {"p": pct, "proc": processed, "id": session_id})
                        conn.commit()

            # 4. Mark Completed
            conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED' WHERE id=:id"), {"id": session_id})
            conn.commit()

        except Exception as e:
            # NOTE(review): after a failed statement the connection may be in a
            # broken transaction; a rollback before this UPDATE is likely
            # required — confirm against SQLAlchemy transaction semantics.
            conn.execute(text("UPDATE agent_embeddingsession SET status='FAILED', error_message=:err WHERE id=:id"),
                         {"err": str(e), "id": session_id})
            conn.commit()
@ -0,0 +1,177 @@ |
|||||
|
import json |
||||
|
import hashlib |
||||
|
import uuid |
||||
|
import requests |
||||
|
import os |
||||
|
from sqlalchemy import text |
||||
|
from dotenv import load_dotenv |
||||
|
|
||||
|
from src.utils.load_settings import engine |
||||
|
from src.knowledge.embedding_factory import EmbeddingFactory |
||||
|
from src.knowledge.vector_store import get_qdrant_store |
||||
|
from agno.knowledge.document import Document |
||||
|
|
||||
|
# Load the .env file at import time so the Jina credentials are available
# to the module-level constant below.
load_dotenv()
# Bearer token for the Jina Reader API (used by convert_html_to_md_jina);
# None if the variable is unset — requests would then send "Bearer None".
JINA_API_KEY = os.getenv("JINA_API_KEY")
|
|
||||
|
def get_text_from_json(json_data, target_lang='fa'):
    """Pull the text for *target_lang* out of a Django JSONField translation list.

    *json_data* is expected to be a list of dicts shaped like
    ``{"language_code": ..., "text": ...}``.  Falls back to the first
    entry's text when the requested language is absent, and to the
    literal string ``"Unknown"`` when nothing usable is found.
    """
    # Guard: anything that is not a non-empty list yields the sentinel.
    if not isinstance(json_data, list) or not json_data:
        return "Unknown"

    # Prefer an exact language match.
    match = next(
        (item for item in json_data
         if isinstance(item, dict) and item.get('language_code') == target_lang),
        None,
    )
    if match is not None:
        return match.get('text', 'Unknown')

    # No match: fall back to the first entry, provided it is a dict.
    first = json_data[0]
    if isinstance(first, dict):
        return first.get('text', 'Unknown')

    return "Unknown"
|
|
||||
|
def convert_html_to_md_jina(html_content: str, row_id: int) -> str:
    """Convert raw wiki HTML to Markdown through the Jina Reader API.

    The HTML is uploaded as a file to ``https://r.jina.ai/`` and the
    Markdown is read from the JSON response's ``data.content`` field.
    Any failure (network error, non-2xx status, malformed JSON) is
    logged and turned into an empty string so callers can skip the row.
    """
    request_headers = {
        "Authorization": f"Bearer {JINA_API_KEY}",
        "Accept": "application/json"
    }
    upload = {
        'file': (f'document_{row_id}.html', html_content, 'text/html')
    }
    try:
        resp = requests.post(
            "https://r.jina.ai/",
            headers=request_headers,
            files=upload,
            timeout=30,
        )
        resp.raise_for_status()
        payload = resp.json().get('data', {})
        return payload.get('content', '')
    except Exception as e:
        # Best-effort conversion: report and let the caller skip this row.
        print(f"⚠️ [Jina Error] ID {row_id}: {e}")
        return ""
|
|
||||
|
|
||||
|
def run_wiki_embedding_sync(session_id: int):
    """Background task to sync Wiki content -> Jina -> Qdrant -> Postgres.

    Pipeline per pending ``wiki_wikicontent`` row:
      1. join wiki/author/category metadata,
      2. convert the stored HTML to Markdown via the Jina Reader API,
      3. upsert a rich Document into the active Qdrant collection,
      4. append the collection name to the row's ``embedded_in`` JSON array
         so the row is not re-processed on the next run.

    Progress and final status are reported through the
    ``agent_embeddingsession`` row identified by *session_id*.
    """
    # 1. Initialize the vector store with the embedder the agent config selects.
    embed_factory = EmbeddingFactory()
    embedder = embed_factory.get_embedder()
    vector_db = get_qdrant_store(embedder=embedder)
    active_collection_name = vector_db.collection

    # A row counts as "done" once this collection name appears in embedded_in.
    model_json_str = json.dumps([active_collection_name])
    print(f"🚀 Starting Wiki Sync Pipeline for model: {active_collection_name}")

    with engine.connect() as conn:
        try:
            # 2. Count pending Wiki items (COALESCE guards NULL embedded_in).
            count_query = text("""
                SELECT COUNT(*) FROM wiki_wikicontent
                WHERE NOT (CAST(COALESCE(embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb))
            """)
            total_pending = conn.execute(count_query, {"model_json": model_json_str}).scalar()

            # Mark the session as running before any long work starts.
            conn.execute(text("UPDATE agent_embeddingsession SET status='PROCESSING', total_items=:t WHERE id=:id"),
                         {"t": total_pending, "id": session_id})
            conn.commit()

            if total_pending == 0:
                print("✅ No new Wiki entries to process.")
                conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id})
                conn.commit()
                return

            # 3. Fetch relational data for pending items.
            print(f"📥 Fetching {total_pending} pending Wiki entries from Database...")
            fetch_query = text("""
                SELECT
                    wc.id as content_id,
                    wc.wiki_id,
                    wc.language_id as lang_code,
                    wc.content as html_content,
                    w.title as wiki_titles,
                    a.id as author_id,
                    a.name as author_names,
                    c.id as category_id,
                    c.name as cat_names
                FROM wiki_wikicontent wc
                JOIN wiki_wiki w ON wc.wiki_id = w.id
                LEFT JOIN wiki_author a ON w.author_id = a.id
                JOIN wiki_wikicategory c ON w.category_id = c.id
                WHERE NOT (CAST(COALESCE(wc.embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb))
            """)
            pending_rows = conn.execute(fetch_query, {"model_json": model_json_str}).fetchall()

            processed = 0

            # 4. Process each row sequentially.
            for row in pending_rows:
                content_id = row.content_id

                # A. Extract translations from the JSONField arrays.
                wiki_title = get_text_from_json(row.wiki_titles, 'fa')
                author_name = get_text_from_json(row.author_names, 'fa')
                category_name = get_text_from_json(row.cat_names, 'fa')

                # B. Convert HTML to MD via Jina API.
                # FIX: guard against NULL/empty content so we don't burn an
                # API call (and an error log) on rows with nothing to convert.
                clean_text = ""
                if row.html_content:
                    clean_text = convert_html_to_md_jina(row.html_content, content_id)

                if not clean_text:
                    print(f"⏭️ Skipping ID {content_id} due to empty Jina response.")
                    continue

                # C. Build the rich Agno Document: narrative text for embedding
                # plus structured metadata for filtering at query time.
                narrative_text = (
                    f"CATEGORY: {category_name}\n"
                    f"WIKI TITLE: {wiki_title}\n"
                    f"AUTHOR: {author_name}\n"
                    f"CONTENT:\n{clean_text}"
                )
                payload = {
                    "source_type": "WIKI",
                    "content_id": content_id,
                    "wiki_id": row.wiki_id,
                    "wiki_title": wiki_title,
                    "category_id": row.category_id,
                    "category_name": category_name,
                    "author_id": row.author_id,
                    "author_name": author_name,
                    "language": row.lang_code
                }

                # Deterministic Qdrant ID so re-runs overwrite instead of
                # duplicating points (md5 digest reinterpreted as a UUID).
                hash_id = hashlib.md5(f"WIKI_{content_id}_{active_collection_name}".encode()).hexdigest()
                qdrant_id = str(uuid.UUID(hash_id))

                doc = Document(id=qdrant_id, content=narrative_text, meta_data=payload)

                # D. Upsert to Qdrant.
                vector_db.upsert(content_hash=qdrant_id, documents=[doc])

                # E. Record the embedding in PostgreSQL so this row is done.
                update_query = text("""
                    UPDATE wiki_wikicontent
                    SET embedded_in = CAST(COALESCE(embedded_in, '[]') AS jsonb) || CAST(:model_json AS jsonb)
                    WHERE id = :id
                """)
                conn.execute(update_query, {"model_json": model_json_str, "id": content_id})
                conn.commit()  # commit per row so progress survives a crash

                # F. Update overall progress every 5 items (and on the last).
                processed += 1
                if processed % 5 == 0 or processed == total_pending:
                    pct = int((processed / total_pending) * 100)
                    conn.execute(text("UPDATE agent_embeddingsession SET progress=:p, processed_items=:proc WHERE id=:id"),
                                 {"p": pct, "proc": processed, "id": session_id})
                    conn.commit()
                    print(f"🔄 Progress: {pct}% ({processed}/{total_pending})")

            # 5. Mark Session as Completed.
            conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id})
            conn.commit()
            print("🎉 Wiki Embedding Sync Completed Successfully!")

        except Exception as e:
            print(f"❌ Fatal Error in Wiki Sync: {str(e)}")
            # FIX: a failed statement leaves the SQLAlchemy connection in a
            # broken transaction; without this rollback the status UPDATE
            # below raises PendingRollbackError and the session row is never
            # marked FAILED.
            conn.rollback()
            conn.execute(text("UPDATE agent_embeddingsession SET status='FAILED', error_message=:err WHERE id=:id"),
                         {"err": str(e), "id": session_id})
            conn.commit()
Write
Preview
Loading…
Cancel
Save
Reference in new issue