From 88787266c4b01afb67abfc77a0cfdedb3da194bf Mon Sep 17 00:00:00 2001 From: mohsentaba Date: Sun, 22 Feb 2026 12:03:11 +0330 Subject: [PATCH] Add new SQL file for wiki content retrieval and refactor API routes to implement a new background task for syncing Wiki data to Qdrant. Remove deprecated global embedding sync functionality and introduce a dedicated sync process for Wiki content, enhancing the overall data synchronization strategy. --- imam-reza.session.sql | 2 + src/api/routes.py | 15 ++-- src/knowledge/sync_rag.py | 108 ---------------------- src/knowledge/sync_wiki.py | 177 +++++++++++++++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 114 deletions(-) create mode 100644 imam-reza.session.sql delete mode 100644 src/knowledge/sync_rag.py create mode 100644 src/knowledge/sync_wiki.py diff --git a/imam-reza.session.sql b/imam-reza.session.sql new file mode 100644 index 0000000..dd72e45 --- /dev/null +++ b/imam-reza.session.sql @@ -0,0 +1,2 @@ +select * from wiki_wikicontent +where language_id = 69; \ No newline at end of file diff --git a/src/api/routes.py b/src/api/routes.py index 75d3ee5..0d75f86 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -1,14 +1,17 @@ from fastapi import APIRouter, BackgroundTasks from pydantic import BaseModel -from src.knowledge.sync_rag import run_global_embedding_sync +from src.knowledge.sync_wiki import run_wiki_embedding_sync router = APIRouter() class SyncRequest(BaseModel): session_id: int -@router.post("/api/sync-knowledge") -async def sync_knowledge(request: SyncRequest, background_tasks: BackgroundTasks): - # Pass the session ID to the background worker - background_tasks.add_task(run_global_embedding_sync, request.session_id) - return {"status": "started", "session_id": request.session_id} \ No newline at end of file +@router.post("/api/sync-wiki-knowledge") +async def sync_wiki_knowledge(request: SyncRequest, background_tasks: BackgroundTasks): + """ + Triggers the background task to fetch Wiki HTML, convert to MD via Jina, + embed in Qdrant, and update the database. + """ + background_tasks.add_task(run_wiki_embedding_sync, request.session_id) + return {"status": "started", "session_id": request.session_id, "type": "wiki_sync"} diff --git a/src/knowledge/sync_rag.py b/src/knowledge/sync_rag.py deleted file mode 100644 index ea7afc9..0000000 --- a/src/knowledge/sync_rag.py +++ /dev/null @@ -1,108 +0,0 @@ -# src/knowledge/rag_pipeline.py (FastAPI Agent) -import json -import hashlib -import uuid -from sqlalchemy import text -from src.utils.load_settings import engine -from src.knowledge.embedding_factory import EmbeddingFactory -from src.knowledge.vector_store import get_qdrant_store -from agno.knowledge.document import Document -def run_global_embedding_sync(session_id: int): - """Background task to selectively embed ALL missing data across tables""" - - # 1. Get the ACTIVE embedder from your own Agent config! - embed_factory = EmbeddingFactory() - embedder = embed_factory.get_embedder() # Uses default from yaml - vector_db = get_qdrant_store(embedder=embedder) - - active_collection_name = vector_db.collection - - print(f"🚀 Starting Global Sync for model: {active_collection_name}") - - # List of tables to process and their data type prefix - # Change 'myapp' to your actual Django app name! - tables_to_sync = [ - # {"table": "hadith_hadith", "type": "HADITH"}, - {"table": "article_article", "type": "ARTICLE"} - ] - - with engine.connect() as conn: - try: - # 2. Count TOTAL pending items across all tables - total_pending = 0 - model_json_str = json.dumps([active_collection_name]) - - for t in tables_to_sync: - count_query = text(f""" - SELECT COUNT(*) FROM {t['table']} - WHERE NOT (CAST(embedded_in AS jsonb) @> CAST(:model_json AS jsonb)) - """) - count = conn.execute(count_query, {"model_json": model_json_str}).scalar() - total_pending += count - - # Update session to PROCESSING - conn.execute(text("UPDATE agent_embeddingsession SET status='PROCESSING', total_items=:t WHERE id=:id"), - {"t": total_pending, "id": session_id}) - conn.commit() - - if total_pending == 0: - conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id}) - conn.commit() - return - - # 3. Process each table - processed = 0 - - for t in tables_to_sync: - table_name = t['table'] - data_type = t['type'] - - # Fetch pending rows for this specific table - query = text(f""" - SELECT * FROM {table_name} - WHERE NOT (CAST(embedded_in AS jsonb) @> CAST(:model_json AS jsonb)) - """) - pending_rows = conn.execute(query, {"model_json": model_json_str}).fetchall() - - for row in pending_rows: - # Build dynamic content based on the table type - if data_type == "HADITH": - content = f"HADITH TYPE: HADITH\nTITLE: {row.title}\nARABIC: {row.arabic_text}\nTRANSLATION: {row.translation}\nSOURCE: {row.source_info}" - elif data_type == "ARTICLE": - content = f"ARTICLE TYPE: ARTICLE\nTITLE: {row.title}\nDESCRIPTION: {row.description}\nCONTENT: {row.content}" - - # Generate deterministic Qdrant ID (Prefix + DB ID + Model) - hash_id = hashlib.md5(f"{data_type}_{row.id}_{active_collection_name}".encode()).hexdigest() - qdrant_id = str(uuid.UUID(hash_id)) - - # Insert into Qdrant - # 🟢 THE FIX: Wrap the text in an Agno Document and use upsert() - doc = Document(id=qdrant_id, content=content) - vector_db.upsert(content_hash=qdrant_id, documents=[doc]) - - # 🟢 Update the JSON array in PostgreSQL - update_query = text(f""" - UPDATE {table_name} - SET embedded_in = CAST(embedded_in AS jsonb) || CAST(:model_json AS jsonb) - WHERE id = :id - """) - conn.execute(update_query, {"model_json": model_json_str, "id": row.id}) - conn.commit() # Commit row-by-row or batch to ensure state is saved - - processed += 1 - - # Update progress every 5 items - if processed % 5 == 0 or processed == total_pending: - pct = int((processed / total_pending) * 100) - conn.execute(text("UPDATE agent_embeddingsession SET progress=:p, processed_items=:proc WHERE id=:id"), - {"p": pct, "proc": processed, "id": session_id}) - conn.commit() - - # 4. Mark Completed - conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED' WHERE id=:id"), {"id": session_id}) - conn.commit() - - except Exception as e: - conn.execute(text("UPDATE agent_embeddingsession SET status='FAILED', error_message=:err WHERE id=:id"), - {"err": str(e), "id": session_id}) - conn.commit() \ No newline at end of file diff --git a/src/knowledge/sync_wiki.py b/src/knowledge/sync_wiki.py new file mode 100644 index 0000000..658235d --- /dev/null +++ b/src/knowledge/sync_wiki.py @@ -0,0 +1,177 @@ +import json +import hashlib +import uuid +import requests +import os +from sqlalchemy import text +from dotenv import load_dotenv + +from src.utils.load_settings import engine +from src.knowledge.embedding_factory import EmbeddingFactory +from src.knowledge.vector_store import get_qdrant_store +from agno.knowledge.document import Document + +load_dotenv() +JINA_API_KEY = os.getenv("JINA_API_KEY") + +def get_text_from_json(json_data, target_lang='fa'): + """Helper to safely extract exactly the target text from Django JSONField arrays.""" + if not json_data or not isinstance(json_data, list): + return "Unknown" + + for entry in json_data: + if isinstance(entry, dict) and entry.get('language_code') == target_lang: + return entry.get('text', 'Unknown') + + if len(json_data) > 0 and isinstance(json_data[0], dict): + return json_data[0].get('text', 'Unknown') + + return "Unknown" + +def convert_html_to_md_jina(html_content: str, row_id: int) -> str: + """Helper to call Jina AI and convert HTML to clean Markdown.""" + headers = { + "Authorization": f"Bearer {JINA_API_KEY}", + "Accept": "application/json" + } + files = { + 'file': (f'document_{row_id}.html', html_content, 'text/html') + } + try: + response = requests.post("https://r.jina.ai/", headers=headers, files=files, timeout=30) + response.raise_for_status() + jina_data = response.json().get('data', {}) + return jina_data.get('content', '') + except Exception as e: + print(f"⚠️ [Jina Error] ID {row_id}: {e}") + return "" + + +def run_wiki_embedding_sync(session_id: int): + """Background task to sync Wiki content -> Jina -> Qdrant -> Postgres""" + + # 1. Initialize Vector DB + embed_factory = EmbeddingFactory() + embedder = embed_factory.get_embedder() + vector_db = get_qdrant_store(embedder=embedder) + active_collection_name = vector_db.collection + + model_json_str = json.dumps([active_collection_name]) + print(f"🚀 Starting Wiki Sync Pipeline for model: {active_collection_name}") + + with engine.connect() as conn: + try: + # 2. Count pending Wiki items + count_query = text(""" + SELECT COUNT(*) FROM wiki_wikicontent + WHERE NOT (CAST(COALESCE(embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb)) + """) + total_pending = conn.execute(count_query, {"model_json": model_json_str}).scalar() + + # Update session to PROCESSING + conn.execute(text("UPDATE agent_embeddingsession SET status='PROCESSING', total_items=:t WHERE id=:id"), + {"t": total_pending, "id": session_id}) + conn.commit() + + if total_pending == 0: + print("✅ No new Wiki entries to process.") + conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id}) + conn.commit() + return + + # 3. Fetch relational data for pending items + print(f"📥 Fetching {total_pending} pending Wiki entries from Database...") + fetch_query = text(""" + SELECT + wc.id as content_id, + wc.wiki_id, + wc.language_id as lang_code, + wc.content as html_content, + w.title as wiki_titles, + a.id as author_id, + a.name as author_names, + c.id as category_id, + c.name as cat_names + FROM wiki_wikicontent wc + JOIN wiki_wiki w ON wc.wiki_id = w.id + LEFT JOIN wiki_author a ON w.author_id = a.id + JOIN wiki_wikicategory c ON w.category_id = c.id + WHERE NOT (CAST(COALESCE(wc.embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb)) + """) + + pending_rows = conn.execute(fetch_query, {"model_json": model_json_str}).fetchall() + + processed = 0 + + # 4. Process each row sequentially + for row in pending_rows: + content_id = row.content_id + + # A. Extract Translations + wiki_title = get_text_from_json(row.wiki_titles, 'fa') + author_name = get_text_from_json(row.author_names, 'fa') + category_name = get_text_from_json(row.cat_names, 'fa') + + # B. Convert HTML to MD via Jina API + clean_text = convert_html_to_md_jina(row.html_content, content_id) + + if not clean_text: + print(f"⏭️ Skipping ID {content_id} due to empty Jina response.") + continue + + # C. Build the Rich Agno Document + narrative_text = ( + f"CATEGORY: {category_name}\n" + f"WIKI TITLE: {wiki_title}\n" + f"AUTHOR: {author_name}\n" + f"CONTENT:\n{clean_text}" + ) + + payload = { + "source_type": "WIKI", + "content_id": content_id, + "wiki_id": row.wiki_id, + "wiki_title": wiki_title, + "category_id": row.category_id, + "category_name": category_name, + "author_id": row.author_id, + "author_name": author_name, + "language": row.lang_code + } + + hash_id = hashlib.md5(f"WIKI_{content_id}_{active_collection_name}".encode()).hexdigest() + qdrant_id = str(uuid.UUID(hash_id)) + + doc = Document(id=qdrant_id, content=narrative_text, meta_data=payload) + + # D. Upsert to Qdrant + vector_db.upsert(content_hash=qdrant_id, documents=[doc]) + + # E. Update PostgreSQL `embedded_in` column for this row + update_query = text(""" + UPDATE wiki_wikicontent + SET embedded_in = CAST(COALESCE(embedded_in, '[]') AS jsonb) || CAST(:model_json AS jsonb) + WHERE id = :id + """) + conn.execute(update_query, {"model_json": model_json_str, "id": content_id}) + conn.commit() + + # F. Update overall progress + processed += 1 + if processed % 5 == 0 or processed == total_pending: + pct = int((processed / total_pending) * 100) + conn.execute(text("UPDATE agent_embeddingsession SET progress=:p, processed_items=:proc WHERE id=:id"), + {"p": pct, "proc": processed, "id": session_id}) + conn.commit() + print(f"🔄 Progress: {pct}% ({processed}/{total_pending})") + + # 5. Mark Session as Completed + conn.execute(text("UPDATE agent_embeddingsession SET status='COMPLETED', progress=100 WHERE id=:id"), {"id": session_id}) + conn.commit() + print("🎉 Wiki Embedding Sync Completed Successfully!") + + except Exception as e: + print(f"❌ Fatal Error in Wiki Sync: {str(e)}") + conn.execute(text("UPDATE agent_embeddingsession SET status='FAILED', error_message=:err WHERE id=:id"), + {"err": str(e), "id": session_id}) + conn.commit() \ No newline at end of file