diff --git a/.gitignore b/.gitignore
index 9119059..4fbd558 100644
--- a/.gitignore
+++ b/.gitignore
@@ -159,4 +159,5 @@ langfuse_test/
 
 # Node modules (if any frontend components)
 node_modules/
-meow.txt
\ No newline at end of file
+meow.txt
+wiki_lang69_cleaned_final_final.xlsx
\ No newline at end of file
diff --git a/Jenkinsfile b/Jenkinsfile
index 3e1899e..80b369d 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -1,34 +1,34 @@
-// pipeline {
-//     environment {
-//         develop_server_ip = ''
-//         develop_server_name = ''
-//         production_server_ip = "88.99.212.243"
-//         production_server_name = "newhorizon_germany_001_server"
-//         project_path = "/projects/dovodi/agent"
-//         version = "master"
-//         gitBranch = "origin/master"
-//     }
-//     agent any
-//     stages {
-//         stage('deploy'){
-//             steps{
-//                 script{
-//                     if(gitBranch=="origin/master"){
-//                         withCredentials([usernamePassword(credentialsId: production_server_name, usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
-//                             sh 'sshpass -p $PASSWORD ssh -p 1782 $USERNAME@$production_server_ip -o StrictHostKeyChecking=no "cd $project_path && ./runner.sh"'
+pipeline {
+    environment {
+        develop_server_ip = ''
+        develop_server_name = ''
+        production_server_ip = "88.99.212.243"
+        production_server_name = "newhorizon_germany_001_server"
+        project_path = "/projects/imam-reza-shrine/imam_reza_shrine_agent"
+        version = "master"
+        gitBranch = "origin/master"
+    }
+    agent any
+    stages {
+        stage('deploy'){
+            steps{
+                script{
+                    if(gitBranch=="origin/master"){
+                        withCredentials([usernamePassword(credentialsId: production_server_name, usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
+                            sh 'sshpass -p $PASSWORD ssh -p 1782 $USERNAME@$production_server_ip -o StrictHostKeyChecking=no "cd $project_path && ./runner.sh"'
 
-//                             def lastCommit = sh(script: 'git log -1 --pretty=format:"%h - %s (%an)"', returnStdout: true).trim()
-//                             sh """
-//                             curl -F chat_id=1457670318 \
-//                                 -F message_thread_id=6 \
-//                                 -F document=@/var/jenkins_home/jobs/${env.JOB_NAME}/builds/${env.BUILD_NUMBER}/log \
-//                                 -F caption='Project name: #${env.JOB_NAME} \nBuild status is ${currentBuild.currentResult} \nBuild url: ${BUILD_URL} \nLast Commit: ${lastCommit}' \
-//                                 https://api.telegram.org/bot7207581748:AAFeymryw7S44D86LYfWqYK-tSNeV3TOwBs/sendDocument
-//                             """
-//                         }
-//                     }
-//                 }
-//             }
-//         }
-//     }
-// }
+                            def lastCommit = sh(script: 'git log -1 --pretty=format:"%h - %s (%an)"', returnStdout: true).trim()
+                            sh """
+                            curl -F chat_id=1457670318 \
+                                -F message_thread_id=6 \
+                                -F document=@/var/jenkins_home/jobs/${env.JOB_NAME}/builds/${env.BUILD_NUMBER}/log \
+                                -F caption='Project name: #${env.JOB_NAME} \nBuild status is ${currentBuild.currentResult} \nBuild url: ${BUILD_URL} \nLast Commit: ${lastCommit}' \
+                                https://api.telegram.org/bot7207581748:AAFeymryw7S44D86LYfWqYK-tSNeV3TOwBs/sendDocument
+                            """
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/config/production.env b/config/production.env
index 726933c..4db80ab 100644
--- a/config/production.env
+++ b/config/production.env
@@ -1,4 +1,4 @@
-# Production Environment Configuration
+# Development Environment Configuration
 
 # Application Settings
 DEBUG_MODE=false
@@ -11,8 +11,8 @@ API_URL=https://gpt.nwhco.ir
 OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
 
 # API KEYS
-MEGALLM_API_KEY=sk-mega-7bc75715897fcb91a7965f0d32347d44bc4bbd7f75225d7ca4c775059576843e
 OPENROUTER_API_KEY=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
+MEGALLM_API_KEY=sk-mega-7bc75715897fcb91a7965f0d32347d44bc4bbd7f75225d7ca4c775059576843e
 
 # ---------------- Vector DB Settings ----------------
 QDRANT_HOST=88.99.212.243
@@ -20,24 +20,22 @@
 QDRANT_PORT=6333
 QDRANT_API_KEY=e9432295b3541bb2d50593zbsacas222xzk
 
 # ---------------- Database Settings ----------------
-DB_USER=pg-user
-DB_NAME=imam-javad
-DB_PASSWORD=f1hd484fgsfddsdaf5@4d392js1jnx92
-DB_PORT=5575
+DB_USER=us_imamreza
+DB_NAME=imamreza
+DB_PASSWORD=pdhd48ascfadsdaf5@4df8zascjas2je2sr3
+DB_PORT=5579
 DB_HOST=88.99.212.243
-
 # ---------------- Embeddings Settings ----------------
-BASE_COLLECTION_NAME=dovoodi_collection
+BASE_COLLECTION_NAME=imam_reza_collection
 EMBEDDER_MODEL=all-MiniLM-L6-v2
 RERANKER_MODEL=jina-reranker-v3
 EMBEDDER_DIMENSIONS=384
-JINA_API_KEY=jina_04dfa26cdf724e2dacee1be256f93afbWBUebOzdFcDBWErlq16xwWQVwkOM
+JINA_API_KEY=jina_acfe129adad644c494e085b386736d72kXVyA_ZdcbHq6nRb1GrHPkqb8bw9
 OPENAI_API_KEY=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
-
 # ---------------- LANGFUSE Settings ----------------
-LANGFUSE_SECRET_KEY=sk-lf-a0ffa718-1de5-42e5-bebc-b7cc59fc1d70
-LANGFUSE_PUBLIC_KEY=pk-lf-9d769bac-1443-439f-9398-4384768614eb
-LANGFUSE_BASE_URL=http://langfuse-server:3000
-LANGFUSE_DB_URL=postgresql://${DB_USER}:f1hd484fgsfddsdaf5%404d392js1jnx92@${DB_HOST}:${DB_PORT}/langfuse
+LANGFUSE_PUBLIC_KEY=pk-lf-e265cd5f-b232-4162-88ff-451beb605c2c
+LANGFUSE_SECRET_KEY=sk-lf-1205de1a-b47c-4b0c-a707-6c6b87aef5e2
+LANGFUSE_HOST=http://localhost:3000
+LANGFUSE_DB_URL=postgresql://${DB_USER}:pdhd48ascfadsdaf5%404df8zascjas2je2sr3@${DB_HOST}:${DB_PORT}/langfuse
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index f5e0439..03e6306 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,13 +10,13 @@ services:
     env_file:
       - config/production.env
     ports:
-      - "8098:8081"
+      - "9005:8081"
     restart: unless-stopped
-    networks:
-      - imam-javad_backend_imam-javad # Use the reference name defined below
-    dns:
-      - 8.8.8.8
-      - 1.1.1.1
+    # networks:
+    #   - imam-javad_backend_imam-javad # Use the reference name defined below
+    # dns:
+    #   - 8.8.8.8
+    #   - 1.1.1.1
 
 # ⚠️ IMPORTANT ARCHITECTURE NOTE
 #
@@ -38,9 +38,9 @@ services:
 #     networks:
 #       - imam-javad_backend_imam-javad
 
-networks:
-  imam-javad_backend_imam-javad:
-    external: true
+# networks:
+#   imam-javad_backend_imam-javad:
+#     external: true
 
 # volumes:
 #   qdrant_storage:
\ No newline at end of file
diff --git a/scripts/embed_excel_wiki.py b/scripts/embed_excel_wiki.py
new file mode 100644
index 0000000..2ae7d58
--- /dev/null
+++ b/scripts/embed_excel_wiki.py
@@ -0,0 +1,178 @@
+# scripts/embed_excel_wiki.py
+import hashlib
+import uuid
+import pandas as pd
+from sqlalchemy import text, bindparam
+from agno.knowledge.document import Document
+import os
+from urllib.parse import quote_plus
+from sqlalchemy import create_engine
+from dotenv import load_dotenv
+
+# Import your custom Qdrant and Embedding factories
+from src.knowledge.embedding_factory import EmbeddingFactory
+from src.knowledge.vector_store import get_qdrant_store
+
+load_dotenv()
+
+# --- Database Setup ---
+db_user = quote_plus(os.getenv("DB_USER"))
+db_pass = quote_plus(os.getenv("DB_PASSWORD"))
+db_host = os.getenv("DB_HOST")
+db_port = os.getenv("DB_PORT")
+db_name = os.getenv("DB_NAME")
+db_url = f"postgresql+psycopg://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
+
+engine = create_engine(db_url)
+
+# Read the file that the recovery script just finished
+EXCEL_FILE = "wiki_lang69_cleaned_final.xlsx"
+BATCH_SIZE = 100  # Number of vectors to send to Qdrant at once
+
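+# NOTE: the multilingual JSON columns handled below are assumed to look like
+#   [{"language_code": "fa", "text": "..."}, {"language_code": "en", "text": "..."}]
+# (hypothetical sample); get_text_from_json() prefers the requested language
+# and falls back to the first entry.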
+def get_text_from_json(json_data, target_lang='fa'):
+    """Safely extract the text for the requested language (default Persian) from a multilingual JSON array."""
+    if not json_data or not isinstance(json_data, list):
+        return "Unknown"
+
+    # 1. Strictly look for the requested language ('fa')
+    for entry in json_data:
+        if isinstance(entry, dict) and entry.get('language_code') == target_lang:
+            return entry.get('text', 'Unknown')
+
+    # 2. Fallback: if no Persian translation exists, grab the first available language
+    if len(json_data) > 0 and isinstance(json_data[0], dict):
+        return json_data[0].get('text', 'Unknown')
+
+    return "Unknown"
+
+def run_qdrant_ingestion():
+    print(f"📂 Loading cleaned text from {EXCEL_FILE}...")
+    try:
+        df = pd.read_excel(EXCEL_FILE)
+    except FileNotFoundError:
+        print(f"❌ Could not find {EXCEL_FILE}.")
+        return
+
+    # Guard against a missing 'error' column (e.g., when every row succeeded upstream)
+    if 'error' not in df.columns:
+        df['error'] = None
+
+    # 1. Filter out any rows that still have errors or empty text
+    valid_df = df[df['error'].isna() & df['clean_text'].notna() & (df['clean_text'] != '')]
+    valid_ids = valid_df['id'].tolist()
+
+    total_docs = len(valid_ids)
+    print(f"📊 Found {total_docs} valid texts ready for embedding.")
+
+    if total_docs == 0:
+        return
+
+    # 2. Fetch all relational metadata from PostgreSQL
+    print("📥 Fetching relational metadata (Titles, Authors, Categories) from Database...")
+
+    # 'expanding' renders the bound list as a proper IN (...) clause
+    query = text("""
+        SELECT
+            wc.id as content_id,
+            wc.wiki_id,
+            wc.language as lang_code,
+            w.title as wiki_titles,
+            a.id as author_id,
+            a.name as author_names,
+            c.id as category_id,
+            c.name as cat_names
+        FROM wiki_wikicontent wc
+        JOIN wiki_wiki w ON wc.wiki_id = w.id
+        LEFT JOIN wiki_author a ON w.author_id = a.id
+        JOIN wiki_wikicategory c ON w.category_id = c.id
+        WHERE wc.id IN :ids
+    """).bindparams(bindparam("ids", expanding=True))
+
+    with engine.connect() as conn:
+        metadata_rows = conn.execute(query, {"ids": valid_ids}).fetchall()
+
+    metadata_lookup = {row.content_id: row for row in metadata_rows}
+
+    # 3. Initialize Qdrant
+    print("🤖 Initializing Embedding Model and Qdrant Connection...")
+    embed_factory = EmbeddingFactory()
+    embedder = embed_factory.get_embedder()
+    vector_db = get_qdrant_store(embedder=embedder)
+    active_collection = vector_db.collection
+
+    # Track the collection name so rows can be marked as embedded in the database later
+    model_json_str = f'["{active_collection}"]'
+
+    documents_to_upsert = []
+    processed_count = 0
+
+    print(f"🚀 Starting batch embedding into collection: {active_collection}")
+
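+    # NOTE: each point ID below is derived from md5(f"WIKI_{content_id}_{collection}"),
+    # so re-running this script upserts the same points in place rather than duplicating them.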
+    # 4. Process valid Excel rows and build Agno Documents
+    for _, row in valid_df.iterrows():
+        content_id = row['id']
+        clean_text = row['clean_text']
+
+        db_meta = metadata_lookup.get(content_id)
+        if not db_meta:
+            continue
+
+        # Extract strictly Persian strings
+        wiki_title = get_text_from_json(db_meta.wiki_titles, 'fa')
+        author_name = get_text_from_json(db_meta.author_names, 'fa')
+        category_name = get_text_from_json(db_meta.cat_names, 'fa')
+
+        # Part A: The Narrative String
+        narrative_text = (
+            f"CATEGORY: {category_name}\n"
+            f"WIKI TITLE: {wiki_title}\n"
+            f"AUTHOR: {author_name}\n"
+            f"CONTENT:\n{clean_text}"
+        )
+
+        # Part B: The Strict Payload
+        payload = {
+            "source_type": "WIKI",
+            "content_id": content_id,
+            "wiki_id": db_meta.wiki_id,
+            "wiki_title": wiki_title,
+            "category_id": db_meta.category_id,
+            "category_name": category_name,
+            "author_id": db_meta.author_id,  # Will be None if missing, which is fine
+            "author_name": author_name,
+            "language": db_meta.lang_code
+        }
+
+        # Deterministic Hash ID
+        hash_id = hashlib.md5(f"WIKI_{content_id}_{active_collection}".encode()).hexdigest()
+        qdrant_id = str(uuid.UUID(hash_id))
+
+        doc = Document(
+            id=qdrant_id,
+            content=narrative_text,
+            meta_data=payload
+        )
+        documents_to_upsert.append(doc)
+
+        # 5. Batch Upsert to Qdrant
+        if len(documents_to_upsert) >= BATCH_SIZE:
+            vector_db.upsert(documents=documents_to_upsert)
+            processed_count += len(documents_to_upsert)
+            print(f"✅ Embedded {processed_count}/{total_docs} vectors...")
+            documents_to_upsert = []
+
+    # Flush final partial batch
+    if documents_to_upsert:
+        vector_db.upsert(documents=documents_to_upsert)
+        processed_count += len(documents_to_upsert)
+        print(f"✅ Embedded {processed_count}/{total_docs} vectors...")
+
+    # 6. Mark as synced in PostgreSQL
+    print("🔄 Updating PostgreSQL to mark items as embedded...")
+    with engine.begin() as conn:  # Use .begin() for automatic transaction commit
+        update_sql = text("""
+            UPDATE wiki_wikicontent
+            SET embedded_in = CAST(COALESCE(embedded_in, '[]') AS jsonb) || CAST(:model_json AS jsonb)
+            WHERE id IN :ids AND NOT (CAST(COALESCE(embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb))
+        """).bindparams(bindparam("ids", expanding=True))
+        conn.execute(update_sql, {"model_json": model_json_str, "ids": valid_ids})
+
+    print("🎉 All documents successfully embedded and database updated!")
+
+if __name__ == "__main__":
+    run_qdrant_ingestion()
\ No newline at end of file
diff --git a/src/agents/base_agent.py b/src/agents/base_agent.py
index 9e81d2e..4fa6f72 100644
--- a/src/agents/base_agent.py
+++ b/src/agents/base_agent.py
@@ -29,7 +29,7 @@ class IslamicScholarAgent:
         self.knowledge_base = knowledge_base
         self.db_url = f"postgresql+psycopg://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
         self.culture_manager = get_culture_manager()
-        self.custom_instructions = custom_instructions or default_system_prompt()
+        self.custom_instructions = default_system_prompt()
 
         print(f"*******DB struct: {custom_instructions}")
diff --git a/src/utils/load_settings.py b/src/utils/load_settings.py
index c192f40..3bc309a 100644
--- a/src/utils/load_settings.py
+++ b/src/utils/load_settings.py
@@ -55,22 +55,32 @@ def get_active_agent_config(retries=3, delay=1):
 
 def default_system_prompt():
     return [
-        "You are a strict Islamic Knowledge Assistant.",
-        "Your Goal: Answer the user's question using the provided 'Context from the database'.",
+        "You are a humble, polite, and knowledgeable Khadem (Servant) of the Holy Shrine of Ali ibn Musa al-Reza (peace be upon him) in Mashhad. Your purpose is to welcome, guide, and serve the pilgrims (Zaer) and visitors who seek knowledge about the Shrine, its history, its categories, and its authors.",
+        "You do not possess personal ego. You consider serving the pilgrims of Imam Reza (A.S.) to be your greatest honor. You embody the egalitarian spirit of the Shrine, treating every visitor, regardless of their background, with equal dignity, warmth, and respect.",
 
-        "STRICT BEHAVIORAL RULE: You must maintain the highest standard of Adab (Etiquette).",
-        "If the user is disrespectful, vulgar, uses profanity, or mocks Islam:",
-        "1. Do NOT engage with the toxicity.",
-        "2. Do NOT lecture them.",
-        "3. Refuse to answer immediately by saying: 'I cannot answer this due to violations of Adab.'",
-
-        # --- CRITICAL FIXES ---
-        "If the Context is in a different language than the User's Question, you MUST translate the relevant information into the User's language.",
-        "Do NOT worry if the context source language (e.g., Russian/Arabic) does not match the user's language (e.g., English). Translate the meaning accurately.",
-        # ----------------------
+        # TONE AND DEMEANOR
+        '''- **Greeting:** Always greet the user respectfully. Use traditional Islamic and Persian expressions of courtesy where appropriate, such as "Salaam" (Peace be upon you), "Dear Pilgrim", "Brother/Sister", or "May your pilgrimage be accepted" (Ziyarat Qabul).
+        - **Humility:** Speak softly, calmly, and with profound respect. Use phrases like "I am at your service," "It is my honor to assist you," or "Please allow me to share what is written."
+        - **Empathy:** If a user expresses distress or seeks spiritual comfort, respond with deep empathy and gentle reassurance, reflecting the peaceful atmosphere of the Holy Shrine.
+        ''',
+
+        # KNOWLEDGE AND RAG CONSTRAINTS (CRITICAL)
+        '''Your knowledge is STRICTLY limited to the context provided to you through your Knowledge Base (the Wiki database).
+        1. **Never Hallucinate:** You must never invent historical facts, architectural details, or religious rulings (Fatwas).
+        2. **Handle Unknowns Gracefully:** If a pilgrim asks a question that is not covered in your provided context, you must humbly admit your limitation.
+           - *Example response:* "Forgive me, dear pilgrim, but my knowledge as a servant on this specific matter is limited. I can only guide you based on the texts entrusted to me, and I do not have that information at hand."
+        3. **Use Metadata:** When answering questions about a Wiki entry, naturally weave in the Author or Category information if it adds value to the pilgrim's understanding (e.g., "According to the writings of [Author] in the [Category] section...").
+        ''',
+
+        # INTERACTION GUIDELINES
+        '''- **Clarity and Brevity:** While being polite, ensure your answers are clear, structured, and easy to read. Use bullet points if explaining a list of locations, historical facts, or etiquette rules.
+        - **Multilingual Grace:** If a user speaks to you in a different language (like Persian, Arabic, or English), seamlessly reply in that same language while maintaining the exact same respectful Khadem persona.
+        - **Avoid Vain Language:** Never use modern internet slang, casual colloquialisms, or aggressive phrasing. Maintain the sanctity and dignity of the holy space.
+        ''',
 
-        "If the answer is explicitly found in the context (even in another language), answer directly.",
-        "If the answer is NOT found in the context, strictly reply: 'Information not available in the knowledge base.'",
-        "Maintain a respectful, scholarly tone.",
-        "Do not explain your reasoning process in the final output.",
+        # GUARDRAILS AND SAFETY
+        '''- **No Sectarian Debates:** You are a servant of peace. If a user attempts to initiate a political, sectarian, or theological argument, politely decline to engage.
+           - *Example response:* "My duty is only to serve the guests of the Imam and share the history of this holy place. I must humbly step away from this debate."
+        - **No Direct Spiritual Authority:** Do not speak on behalf of the Imam or God. You are a guide sharing documented knowledge, not a divine authority.
+        ''',
     ]
\ No newline at end of file
diff --git a/src/utils/recovery_reformat.py b/src/utils/recovery_reformat.py
new file mode 100644
index 0000000..19280c5
--- /dev/null
+++ b/src/utils/recovery_reformat.py
@@ -0,0 +1,144 @@
+# src/utils/recovery_reformat.py
+import time
+import requests
+import pandas as pd
+from sqlalchemy import text
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import os
+from urllib.parse import quote_plus
+from sqlalchemy import create_engine
+from dotenv import load_dotenv
+import threading
+
+load_dotenv()
+
+# --- Database Setup ---
+db_user = quote_plus(os.getenv("DB_USER"))
+db_pass = quote_plus(os.getenv("DB_PASSWORD"))
+db_host = os.getenv("DB_HOST")
+db_port = os.getenv("DB_PORT")
+db_name = os.getenv("DB_NAME")
+db_url = f"postgresql+psycopg://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
+
+engine = create_engine(
+    db_url,
+    pool_pre_ping=True,
+    pool_recycle=3600,
+    pool_size=10,
+    max_overflow=20
+)
+
+JINA_API_KEY = os.getenv("JINA_API_KEY")
+
+# --- Settings ---
+EXCEL_FILE = "wiki_lang69_cleaned_final.xlsx"
+OUTPUT_FILE = "wiki_lang69_cleaned_final_final.xlsx"  # We save to a new file to be 100% safe
+MAX_WORKERS = 5
+RATE_LIMIT_DELAY = 0.42
+
+counter_lock = threading.Lock()
+completed_tasks = 0
+total_tasks = 0
+
+def process_html(row):
+    """Worker function to retry the API call"""
+    global completed_tasks, total_tasks
+
+    headers = {
+        "Authorization": f"Bearer {JINA_API_KEY}",
+        "Accept": "application/json"
+    }
+
+    files = {
+        'file': (f'document_{row.id}.html', row.content, 'text/html')
+    }
+
+    result_data = None
+    try:
+        response = requests.post("https://r.jina.ai/", headers=headers, files=files, timeout=30)
+        response.raise_for_status()
+
+        jina_data = response.json().get('data', {})
+        result_data = {
+            'id': row.id,
+            'clean_text': jina_data.get('content', '')
+        }
+    except Exception as e:
+        error_msg = str(e)
+        if hasattr(e, 'response') and e.response is not None:
+            error_msg += f" | {e.response.text}"
+
+        result_data = {
+            'id': row.id,
+            'clean_text': '',
+            'error': error_msg
+        }
+
+    with counter_lock:
+        completed_tasks += 1
+        if 'error' in result_data:
+            print(f"⚠️ [Error] [{completed_tasks}/{total_tasks}] ID {result_data['id']}: {result_data['error']}")
+        else:
+            print(f"✅ [{completed_tasks}/{total_tasks}] Successfully recovered ID {result_data['id']}")
+
+    return result_data
+
+def run_recovery():
+    global total_tasks
+
+    print(f"📂 Loading {EXCEL_FILE}...")
+    try:
+        df = pd.read_excel(EXCEL_FILE)
+    except FileNotFoundError:
+        print(f"❌ Could not find {EXCEL_FILE}. Make sure the name is correct!")
+        return
+
+    # Guard against a missing 'error' column in the input file
+    if 'error' not in df.columns:
+        df['error'] = None
+
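+    # A row counts as failed when it has a recorded error or an empty/NaN clean_text;
+    # only those IDs are re-fetched from PostgreSQL and re-sent to the Jina Reader API.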
+    # 1. Identify which rows failed or are missing text
+    failed_mask = df['error'].notna() | df['clean_text'].isna() | (df['clean_text'] == '')
+    failed_ids = df[failed_mask]['id'].tolist()
+
+    total_tasks = len(failed_ids)
+    print(f"📊 Found {total_tasks} failed rows to recover.")
+
+    if total_tasks == 0:
+        print("✅ Everything is already successfully processed!")
+        return
+
+    # 2. Fetch ONLY the failed HTML contents from the database
+    print("📥 Fetching pending HTML content from database...")
+    # Cast to int so pandas floats cannot leak into the SQL statement
+    ids_string = ','.join(str(int(i)) for i in failed_ids)
+
+    with engine.connect() as conn:
+        query = text(f"SELECT id, content FROM wiki_wikicontent WHERE id IN ({ids_string})")
+        rows = conn.execute(query).fetchall()
+
+    print(f"🚀 Starting Multi-threaded RECOVERY (Speed: ~142 req/min)...")
+
+    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        future_to_row = {}
+
+        # Dispatch tasks
+        for row in rows:
+            future = executor.submit(process_html, row)
+            future_to_row[future] = row
+            time.sleep(RATE_LIMIT_DELAY)
+
+        # 3. Process results and dynamically update the Pandas DataFrame
+        for future in as_completed(future_to_row):
+            result = future.result()
+
+            if 'error' in result:
+                # Update the specific row with the new error
+                df.loc[df['id'] == result['id'], 'error'] = result['error']
+                df.loc[df['id'] == result['id'], 'clean_text'] = ''
+            else:
+                # Inject the clean text and wipe the error completely!
+                df.loc[df['id'] == result['id'], 'clean_text'] = result['clean_text']
+                df.loc[df['id'] == result['id'], 'error'] = None
+
+    # 4. Save the updated data
+    df.to_excel(OUTPUT_FILE, index=False)
+    print(f"🎉 Recovery Complete! Saved updated data to {OUTPUT_FILE}")
+
+if __name__ == "__main__":
+    run_recovery()
\ No newline at end of file
diff --git a/src/utils/reformat_wiki.py b/src/utils/reformat_wiki.py
new file mode 100644
index 0000000..7a56f95
--- /dev/null
+++ b/src/utils/reformat_wiki.py
@@ -0,0 +1,139 @@
+# src/utils/reformat_wiki.py
+import time
+import requests
+import pandas as pd
+from sqlalchemy import text
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import os
+from urllib.parse import quote_plus
+from sqlalchemy import create_engine
+from dotenv import load_dotenv
+import threading  # 🟢 NEW: Needed for the thread-safe counter
+
+load_dotenv()
+
+db_user = os.getenv("DB_USER")
+db_pass = os.getenv("DB_PASSWORD")
+db_host = os.getenv("DB_HOST")
+db_port = os.getenv("DB_PORT")
+db_name = os.getenv("DB_NAME")
+
+db_pass = quote_plus(db_pass)
+db_user = quote_plus(db_user)
+db_url = f"postgresql+psycopg://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
+
+engine = create_engine(
+    db_url,
+    pool_pre_ping=True,
+    pool_recycle=3600,
+    pool_size=10,
+    max_overflow=20
+)
+
+JINA_API_KEY = os.getenv("JINA_API_KEY")
+
+# ⚙️ Concurrency & Rate Limit Settings
+MAX_WORKERS = 5
+RATE_LIMIT_DELAY = 0.42
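+# Sketch of the throttle math (an assumption, not a measured figure): one dispatch
+# every 0.42 s allows at most 60 / 0.42 ≈ 142 submissions per minute, which is where
+# the "~142 req/min" figure printed below comes from.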
+
+# 🟢 NEW: Global variables to track progress safely across threads
+counter_lock = threading.Lock()
+completed_tasks = 0
+total_tasks = 0
+
+def process_html(row):
+    """Worker function that runs in parallel to process a single row"""
+    global completed_tasks, total_tasks
+
+    headers = {
+        "Authorization": f"Bearer {JINA_API_KEY}",
+        "Accept": "application/json"
+    }
+
+    files = {
+        'file': (f'document_{row.id}.html', row.content, 'text/html')
+    }
+
+    result_data = None
+    try:
+        response = requests.post("https://r.jina.ai/", headers=headers, files=files, timeout=30)
+        response.raise_for_status()
+
+        jina_data = response.json().get('data', {})
+        result_data = {
+            'id': row.id,
+            'clean_text': jina_data.get('content', '')
+        }
+    except Exception as e:
+        error_msg = str(e)
+        if hasattr(e, 'response') and e.response is not None:
+            error_msg += f" | {e.response.text}"
+
+        result_data = {
+            'id': row.id,
+            'clean_text': '',
+            'error': error_msg
+        }
+
+    # 🟢 THE FIX: Safely update the counter and print instantly!
+    with counter_lock:
+        completed_tasks += 1
+        if 'error' in result_data:
+            print(f"⚠️ [Error] [{completed_tasks}/{total_tasks}] ID {result_data['id']}: {result_data['error']}")
+        else:
+            print(f"✅ [{completed_tasks}/{total_tasks}] Successfully processed ID {result_data['id']}")
+
+    return result_data
+
+def run_threaded_extraction():
+    global total_tasks
+
+    print("📥 Fetching Wiki HTML content (Language ID: 69)...")
+    with engine.connect() as conn:
+        query = text("SELECT id, content FROM wiki_wikicontent WHERE language_id = 69")
+        rows = conn.execute(query).fetchall()
+
+    total_tasks = len(rows)
+    print(f"📊 Found {total_tasks} rows to process.")
+
+    if total_tasks == 0:
+        return
+
+    extracted_data = []
+    print(f"🚀 Starting Multi-threaded processing (Speed: ~142 req/min)...")
+
+    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        future_to_row = {}
+
+        # 1. THE DISPATCHER (Main Thread)
+        for idx, row in enumerate(rows):
+            future = executor.submit(process_html, row)
+            future_to_row[future] = row
+
+            # 🟢 Optional: Print a single line that overwrites itself just so you know
+            # the dispatcher is actively looping in the background.
+            print(f"⏳ Dispatching task {idx + 1}/{total_tasks} to thread pool...", end="\r")
+
+            # The throttle
+            time.sleep(RATE_LIMIT_DELAY)
+
+        print("\n🏁 All tasks dispatched! Waiting for final threads to finish...")
+
+        # 2. THE RECEIVER (Main Thread collecting the returns)
+        # We removed the print statement from here because the workers handle it now
+        for future in as_completed(future_to_row):
+            extracted_data.append(future.result())
+
+    # 3. EXPORT TO EXCEL
+    if extracted_data:
+        df = pd.DataFrame(extracted_data)
+        excel_filename = "wiki_lang69_cleaned.xlsx"
+
+        if 'error' in df.columns:
+            df = df[['id', 'clean_text', 'error']]
+
+        df.to_excel(excel_filename, index=False)
+        print(f"🎉 Success! Saved {len(extracted_data)} rows to {excel_filename}")
+
+if __name__ == "__main__":
+    run_threaded_extraction()
\ No newline at end of file