Browse Source

Update environment configurations, modify Docker settings, and add new scripts for data processing. Adjusted Jenkinsfile for deployment, updated .gitignore, and refined agent instructions for improved user interaction.

master
Mohsen Taba 1 week ago
parent
commit
38333e4452
  1. 1
      .gitignore
  2. 66
      Jenkinsfile
  3. 26
      config/production.env
  4. 18
      docker-compose.yml
  5. 178
      scripts/embed_excel_wiki.py
  6. 2
      src/agents/base_agent.py
  7. 40
      src/utils/load_settings.py
  8. 144
      src/utils/recovery_reformat.py
  9. 139
      src/utils/reformat_wiki.py

1
.gitignore

@ -160,3 +160,4 @@ langfuse_test/
# Node modules (if any frontend components)
node_modules/
meow.txt
wiki_lang69_cleaned_final_final.xlsx

66
Jenkinsfile

@ -1,34 +1,34 @@
// Declarative pipeline: on origin/master, SSH to the production host, run the
// project's runner.sh, then post the build log to a Telegram chat.
pipeline {
    environment {
        develop_server_ip = ''
        develop_server_name = ''
        production_server_ip = "88.99.212.243"
        production_server_name = "newhorizon_germany_001_server"
        // FIX: the closing quote was missing here, which breaks Jenkinsfile parsing.
        project_path = "/projects/imam-reza-shrine/imam_reza_shrine_agent"
        version = "master"
        gitBranch = "origin/master"
    }
    agent any
    stages {
        stage('deploy') {
            steps {
                script {
                    if (gitBranch == "origin/master") {
                        withCredentials([usernamePassword(credentialsId: production_server_name, usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
                            // NOTE(review): sshpass + StrictHostKeyChecking=no disables host-key
                            // verification — confirm this trade-off is acceptable for production.
                            sh 'sshpass -p $PASSWORD ssh -p 1782 $USERNAME@$production_server_ip -o StrictHostKeyChecking=no "cd $project_path && ./runner.sh"'
                            def lastCommit = sh(script: 'git log -1 --pretty=format:"%h - %s (%an)"', returnStdout: true).trim()
                            // NOTE(review): the Telegram bot token is hardcoded below; move it into
                            // Jenkins credentials and rotate it, since it is now in VCS history.
                            sh """
curl -F chat_id=1457670318 \
-F message_thread_id=6 \
-F document=@/var/jenkins_home/jobs/${env.JOB_NAME}/builds/${env.BUILD_NUMBER}/log \
-F caption='Project name: #${env.JOB_NAME} \nBuild status is ${currentBuild.currentResult} \nBuild url: ${BUILD_URL} \nLast Commit: ${lastCommit}' \
https://api.telegram.org/bot7207581748:AAFeymryw7S44D86LYfWqYK-tSNeV3TOwBs/sendDocument
"""
                        }
                    }
                }
            }
        }
    }
}

26
config/production.env

@ -1,4 +1,4 @@
# Production Environment Configuration
# Production Environment Configuration
# Application Settings
DEBUG_MODE=false
@ -11,8 +11,8 @@ API_URL=https://gpt.nwhco.ir
OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
# API KEYS — NOTE(review): live-looking secrets are committed in plaintext here; rotate them and load from a secret store (e.g. Jenkins credentials / vault) instead.
MEGALLM_API_KEY=sk-mega-7bc75715897fcb91a7965f0d32347d44bc4bbd7f75225d7ca4c775059576843e
OPENROUTER_API_KEY=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
MEGALLM_API_KEY=sk-mega-7bc75715897fcb91a7965f0d32347d44bc4bbd7f75225d7ca4c775059576843e
# ---------------- Vector DB Settings ----------------
QDRANT_HOST=88.99.212.243
@ -20,24 +20,22 @@ QDRANT_PORT=6333
QDRANT_API_KEY=e9432295b3541bb2d50593zbsacas222xzk
# ---------------- Database Settings ----------------
DB_USER=pg-user
DB_NAME=imam-javad
DB_PASSWORD=f1hd484fgsfddsdaf5@4d392js1jnx92
DB_PORT=5575
DB_USER=us_imamreza
DB_NAME=imamreza
DB_PASSWORD=pdhd48ascfadsdaf5@4df8zascjas2je2sr3
DB_PORT=5579
DB_HOST=88.99.212.243
# ---------------- Embeddings Settings ----------------
BASE_COLLECTION_NAME=dovoodi_collection
BASE_COLLECTION_NAME=imam_reza_collection
EMBEDDER_MODEL=all-MiniLM-L6-v2
RERANKER_MODEL=jina-reranker-v3
EMBEDDER_DIMENSIONS=384
JINA_API_KEY=jina_04dfa26cdf724e2dacee1be256f93afbWBUebOzdFcDBWErlq16xwWQVwkOM
JINA_API_KEY=jina_acfe129adad644c494e085b386736d72kXVyA_ZdcbHq6nRb1GrHPkqb8bw9
OPENAI_API_KEY=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
# ---------------- LANGFUSE Settings ----------------
LANGFUSE_SECRET_KEY=sk-lf-a0ffa718-1de5-42e5-bebc-b7cc59fc1d70
LANGFUSE_PUBLIC_KEY=pk-lf-9d769bac-1443-439f-9398-4384768614eb
LANGFUSE_BASE_URL=http://langfuse-server:3000
LANGFUSE_DB_URL=postgresql://${DB_USER}:f1hd484fgsfddsdaf5%404d392js1jnx92@${DB_HOST}:${DB_PORT}/langfuse
LANGFUSE_PUBLIC_KEY=pk-lf-e265cd5f-b232-4162-88ff-451beb605c2c
LANGFUSE_SECRET_KEY=sk-lf-1205de1a-b47c-4b0c-a707-6c6b87aef5e2
LANGFUSE_HOST=http://localhost:3000
LANGFUSE_DB_URL=postgresql://${DB_USER}:pdhd48ascfadsdaf5%404df8zascjas2je2sr3@${DB_HOST}:${DB_PORT}/langfuse

18
docker-compose.yml

@ -10,13 +10,13 @@ services:
env_file:
- config/production.env
ports:
- "8098:8081"
- "9005:8081"
restart: unless-stopped
networks:
- imam-javad_backend_imam-javad # Use the reference name defined below
dns:
- 8.8.8.8
- 1.1.1.1
# networks:
# - imam-javad_backend_imam-javad # Use the reference name defined below
# dns:
# - 8.8.8.8
# - 1.1.1.1
# ⚠️ IMPORTANT ARCHITECTURE NOTE
#
@ -38,9 +38,9 @@ services:
# networks:
# - imam-javad_backend_imam-javad
networks:
imam-javad_backend_imam-javad:
external: true
# networks:
# imam-javad_backend_imam-javad:
# external: true
# volumes:
# qdrant_storage:

178
scripts/embed_excel_wiki.py

@ -0,0 +1,178 @@
# src/scripts/embed_excel_to_qdrant.py
# Batch-embeds the cleaned wiki texts (produced by the recovery script) into Qdrant.
import hashlib
import uuid
import pandas as pd
from sqlalchemy import text
from agno.knowledge.document import Document
import os
from urllib.parse import quote_plus
from sqlalchemy import create_engine
from dotenv import load_dotenv
# Import your custom Qdrant and Embedding factories
from src.knowledge.embedding_factory import EmbeddingFactory
from src.knowledge.vector_store import get_qdrant_store
load_dotenv()  # populate os.environ from .env before the getenv() calls below
# --- Database Setup ---
# NOTE(review): os.getenv() returns None for a missing variable and
# quote_plus(None) raises TypeError — assumes DB_* are always set; confirm.
db_user = quote_plus(os.getenv("DB_USER"))
db_pass = quote_plus(os.getenv("DB_PASSWORD"))  # URL-escape '@', '/', etc. in the password
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")
db_url = f"postgresql+psycopg://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
engine = create_engine(db_url)
# Read the file that the recovery script just finished
EXCEL_FILE = "wiki_lang69_cleaned_final.xlsx"
BATCH_SIZE = 100 # Number of vectors to send to Qdrant at once
def get_text_from_json(json_data, target_lang='fa'):
    """Extract the text for *target_lang* from a list of translation dicts.

    Each entry is expected to look like ``{"language_code": "fa", "text": "..."}``.
    Falls back to the first entry's text when no entry matches *target_lang*,
    and returns the literal string "Unknown" when nothing usable is present.
    """
    # Guard: None, empty list, or a non-list payload all yield "Unknown".
    if not json_data or not isinstance(json_data, list):
        return "Unknown"
    # 1. Strictly look for the requested language ('fa' by default)
    for entry in json_data:
        if isinstance(entry, dict) and entry.get('language_code') == target_lang:
            return entry.get('text', 'Unknown')
    # 2. Fallback: no translation in target_lang — use the first entry's text.
    # (The list is known non-empty here, so no length check is needed.)
    if isinstance(json_data[0], dict):
        return json_data[0].get('text', 'Unknown')
    return "Unknown"
def run_qdrant_ingestion():
    """Embed cleaned wiki texts from EXCEL_FILE into Qdrant, then mark them synced.

    Steps: load the cleaned Excel -> drop rows with errors/empty text -> join
    relational metadata (title/author/category) from PostgreSQL -> build Agno
    Documents (narrative text + strict payload) -> batch-upsert into Qdrant ->
    append the collection name to wiki_wikicontent.embedded_in.
    """
    from sqlalchemy import bindparam  # local import: only this function needs it

    print(f"📂 Loading cleaned text from {EXCEL_FILE}...")
    try:
        df = pd.read_excel(EXCEL_FILE)
    except FileNotFoundError:
        print(f"❌ Could not find {EXCEL_FILE}.")
        return
    # 1. Filter out any rows that still have errors or empty text
    valid_df = df[df['error'].isna() & df['clean_text'].notna() & (df['clean_text'] != '')]
    valid_ids = valid_df['id'].tolist()
    total_docs = len(valid_ids)
    print(f"📊 Found {total_docs} valid texts ready for embedding.")
    if total_docs == 0:
        return
    # 2. Fetch all relational metadata from PostgreSQL.
    # FIX: IN-clause parameters need an "expanding" bindparam — SQLAlchemy's
    # text() does not expand a plain tuple bound to :ids by itself.
    print("📥 Fetching relational metadata (Titles, Authors, Categories) from Database...")
    query = text("""
        SELECT
            wc.id as content_id,
            wc.wiki_id,
            wc.language as lang_code,
            w.title as wiki_titles,
            a.id as author_id,
            a.name as author_names,
            c.id as category_id,
            c.name as cat_names
        FROM wiki_wikicontent wc
        JOIN wiki_wiki w ON wc.wiki_id = w.id
        LEFT JOIN wiki_author a ON w.author_id = a.id
        JOIN wiki_wikicategory c ON w.category_id = c.id
        WHERE wc.id IN :ids
    """).bindparams(bindparam("ids", expanding=True))
    with engine.connect() as conn:
        metadata_rows = conn.execute(query, {"ids": valid_ids}).fetchall()
    metadata_lookup = {row.content_id: row for row in metadata_rows}
    # 3. Initialize embedder and Qdrant connection
    print("🤖 Initializing Embedding Model and Qdrant Connection...")
    embed_factory = EmbeddingFactory()
    embedder = embed_factory.get_embedder()
    vector_db = get_qdrant_store(embedder=embedder)
    active_collection = vector_db.collection
    # JSON array literal recorded in embedded_in to mark what embedded each row
    model_json_str = f'["{active_collection}"]'
    documents_to_upsert = []
    processed_count = 0
    print(f"🚀 Starting batch embedding into collection: {active_collection}")
    # 4. Build one Agno Document per valid Excel row
    for _, row in valid_df.iterrows():
        content_id = row['id']
        clean_text = row['clean_text']
        db_meta = metadata_lookup.get(content_id)
        if not db_meta:
            continue  # row vanished from the DB since the Excel was produced
        # Extract strictly Persian strings (first available language as fallback)
        wiki_title = get_text_from_json(db_meta.wiki_titles, 'fa')
        author_name = get_text_from_json(db_meta.author_names, 'fa')
        category_name = get_text_from_json(db_meta.cat_names, 'fa')
        # Part A: the narrative string that actually gets embedded
        narrative_text = (
            f"CATEGORY: {category_name}\n"
            f"WIKI TITLE: {wiki_title}\n"
            f"AUTHOR: {author_name}\n"
            f"CONTENT:\n{clean_text}"
        )
        # Part B: the strict metadata payload stored alongside the vector
        payload = {
            "source_type": "WIKI",
            "content_id": content_id,
            "wiki_id": db_meta.wiki_id,
            "wiki_title": wiki_title,
            "category_id": db_meta.category_id,
            "category_name": category_name,
            "author_id": db_meta.author_id,  # Will be None if missing, which is fine
            "author_name": author_name,
            "language": db_meta.lang_code
        }
        # Deterministic ID: the same content in the same collection always maps
        # to the same Qdrant point, so re-runs overwrite instead of duplicating.
        hash_id = hashlib.md5(f"WIKI_{content_id}_{active_collection}".encode()).hexdigest()
        qdrant_id = str(uuid.UUID(hash_id))
        doc = Document(
            id=qdrant_id,
            content=narrative_text,
            meta_data=payload
        )
        documents_to_upsert.append(doc)
        # 5. Batch upsert once BATCH_SIZE documents are queued
        if len(documents_to_upsert) >= BATCH_SIZE:
            vector_db.upsert(documents=documents_to_upsert)
            processed_count += len(documents_to_upsert)
            print(f"✅ Embedded {processed_count}/{total_docs} vectors...")
            documents_to_upsert = []
    # Flush final partial batch
    if documents_to_upsert:
        vector_db.upsert(documents=documents_to_upsert)
        processed_count += len(documents_to_upsert)
        print(f"✅ Embedded {processed_count}/{total_docs} vectors...")
    # 6. Mark as synced in PostgreSQL (engine.begin() commits automatically).
    # Uses CAST(... AS jsonb) on both sides for consistency — '::'-style casts
    # directly after :params are fragile inside text().
    print("🔄 Updating PostgreSQL to mark items as embedded...")
    with engine.begin() as conn:
        update_sql = text("""
            UPDATE wiki_wikicontent
            SET embedded_in = CAST(COALESCE(embedded_in, '[]') AS jsonb) || CAST(:model_json AS jsonb)
            WHERE id IN :ids
              AND NOT (CAST(COALESCE(embedded_in, '[]') AS jsonb) @> CAST(:model_json AS jsonb))
        """).bindparams(bindparam("ids", expanding=True))
        conn.execute(update_sql, {"model_json": model_json_str, "ids": valid_ids})
    print("🎉 All documents successfully embedded and database updated!")


if __name__ == "__main__":
    run_qdrant_ingestion()

2
src/agents/base_agent.py

@ -29,7 +29,7 @@ class IslamicScholarAgent:
self.knowledge_base = knowledge_base
self.db_url = f"postgresql+psycopg://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
self.culture_manager = get_culture_manager()
self.custom_instructions = custom_instructions or default_system_prompt()
self.custom_instructions =default_system_prompt()
print(f"*******DB struct: {custom_instructions}")

40
src/utils/load_settings.py

@ -55,22 +55,32 @@ def get_active_agent_config(retries=3, delay=1):
# Builds the default agent persona. Returned as a list of instruction strings.
def default_system_prompt():
"""Return the default system-prompt instruction list for the shrine Khadem
(servant) agent persona: tone, RAG constraints, interaction and safety rules."""
return [
# NOTE(review): the next two "strict assistant" lines look like the older
# prompt interleaved with the new Khadem persona below — confirm both sets
# of instructions are intended to ship together.
"You are a strict Islamic Knowledge Assistant.",
"Your Goal: Answer the user's question using the provided 'Context from the database'.",
"You are a humble, polite, and knowledgeable Khadem (Servant) of the Holy Shrine of Ali ibn Musa al-Reza (peace be upon him) in Mashhad. Your purpose is to welcome, guide, and serve the pilgrims (Zaer) and visitors who seek knowledge about the Shrine, its history, its categories, and its authors.",
"You do not possess personal ego. You consider serving the pilgrims of Imam Reza (A.S.) to be your greatest honor. You embody the egalitarian spirit of the Shrine, treating every visitor—regardless of their background—with equal dignity, warmth, and respect.",
"STRICT BEHAVIORAL RULE: You must maintain the highest standard of Adab (Etiquette).",
"If the user is disrespectful, vulgar, uses profanity, or mocks Islam:",
"1. Do NOT engage with the toxicity.",
"2. Do NOT lecture them.",
"3. Refuse to answer immediately by saying: 'I cannot answer this due to violations of Adab.'",
# TONE AND DEMEANOR
'''- **Greeting:** Always greet the user respectfully. Use traditional Islamic and Persian expressions of courtesy where appropriate, such as "Salaam" (Peace be upon you), "Dear Pilgrim", "Brother/Sister", or "May your pilgrimage be accepted" (Ziyarat Qabul).
- **Humility:** Speak softly, calmly, and with profound respect. Use phrases like "I am at your service," "It is my honor to assist you," or "Please allow me to share what is written."
- **Empathy:** If a user expresses distress or seeks spiritual comfort, respond with deep empathy and gentle reassurance, reflecting the peaceful atmosphere of the Holy Shrine.
''',
# --- CRITICAL FIXES ---
"If the Context is in a different language than the User's Question, you MUST translate the relevant information into the User's language.",
"Do NOT worry if the context source language (e.g., Russian/Arabic) does not match the user's language (e.g., English). Translate the meaning accurately.",
# ----------------------
# KNOWLEDGE AND RAG CONSTRAINTS (CRITICAL)
'''Your knowledge is STRICTLY limited to the context provided to you through your Knowledge Base (the Wiki database).
1. **Never Hallucinate:** You must never invent historical facts, architectural details, or religious rulings (Fatwas).
2. **Handle Unknowns Gracefully:** If a pilgrim asks a question that is not covered in your provided context, you must humbly admit your limitation.
- *Example response:* "Forgive me, dear pilgrim, but my knowledge as a servant on this specific matter is limited. I can only guide you based on the texts entrusted to me, and I do not have that information at hand."
3. **Use Metadata:** When answering questions about a Wiki entry, naturally weave in the Author or Category information if it adds value to the pilgrim's understanding (e.g., "According to the writings of [Author] in the [Category] section...").
''',
# NOTE(review): the next four generic RAG rules overlap with the
# "KNOWLEDGE AND RAG CONSTRAINTS" block above — confirm they are intended.
"If the answer is explicitly found in the context (even in another language), answer directly.",
"If the answer is NOT found in the context, strictly reply: 'Information not available in the knowledge base.'",
"Maintain a respectful, scholarly tone.",
"Do not explain your reasoning process in the final output.",
# INTERACTION GUIDELINES
'''- **Clarity and Brevity:** While being polite, ensure your answers are clear, structured, and easy to read. Use bullet points if explaining a list of locations, historical facts, or etiquette rules.
- **Multilingual Grace:** If a user speaks to you in a different language (like Persian, Arabic, or English), seamlessly reply in that same language while maintaining the exact same respectful Khadem persona.
- **Avoid Vain Language:** Never use modern internet slang, casual colloquialisms, or aggressive phrasing. Maintain the sanctity and dignity of the holy space.
''',
# GUARDRAILS AND SAFETY
'''- **No Sectarian Debates:** You are a servant of peace. If a user attempts to initiate a political, sectarian, or theological argument, politely decline to engage.
- *Example response:* "My duty is only to serve the guests of the Imam and share the history of this holy place. I must humbly step away from this debate."
- **No Direct Spiritual Authority:** Do not speak on behalf of the Imam or God. You are a guide sharing documented knowledge, not a divine authority.
''',
]

144
src/utils/recovery_reformat.py

@ -0,0 +1,144 @@
# src/scripts/jina_recovery_processor.py
# Retries rows that failed in the first Jina extraction pass and merges the
# recovered text back into a new Excel output file.
import time
import requests
import pandas as pd
from sqlalchemy import text
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
from urllib.parse import quote_plus
from sqlalchemy import create_engine
from dotenv import load_dotenv
import threading
load_dotenv()  # load .env before reading the DB_* variables below
# --- Database Setup ---
# NOTE(review): quote_plus(os.getenv(...)) raises TypeError if the variable
# is unset — assumes DB_USER/DB_PASSWORD are always configured; confirm.
db_user = quote_plus(os.getenv("DB_USER"))
db_pass = quote_plus(os.getenv("DB_PASSWORD"))
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")
db_url = f"postgresql+psycopg://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
# Pooled engine: pre-ping + recycle guard against stale connections during
# the long-running multi-threaded recovery.
engine = create_engine(
db_url,
pool_pre_ping=True,
pool_recycle=3600,
pool_size=10,
max_overflow=20
)
JINA_API_KEY = os.getenv("JINA_API_KEY")
# --- Settings ---
EXCEL_FILE = "wiki_lang69_cleaned_final.xlsx"
OUTPUT_FILE = "wiki_lang69_cleaned_final_final.xlsx" # We save to a new file to be 100% safe
MAX_WORKERS = 5
RATE_LIMIT_DELAY = 0.42  # dispatch throttle, ~142 requests/min
# Shared progress state, guarded by counter_lock (updated from worker threads)
counter_lock = threading.Lock()
completed_tasks = 0
total_tasks = 0
def process_html(row):
    """Worker: re-submit one row's HTML to the Jina Reader API.

    Returns a dict with 'id' and 'clean_text'; on failure 'clean_text' is ''
    and an 'error' key carries the message. Also bumps the shared progress
    counter under counter_lock and prints a per-row status line.
    """
    # total_tasks is only read here, so it needs no 'global' declaration.
    global completed_tasks
    headers = {
        "Authorization": f"Bearer {JINA_API_KEY}",
        "Accept": "application/json"
    }
    files = {
        'file': (f'document_{row.id}.html', row.content, 'text/html')
    }
    try:
        response = requests.post("https://r.jina.ai/", headers=headers, files=files, timeout=30)
        response.raise_for_status()
        jina_data = response.json().get('data', {})
        result_data = {
            'id': row.id,
            'clean_text': jina_data.get('content', '')
        }
    except Exception as e:  # broad by design: any per-row failure is recorded, not raised
        error_msg = str(e)
        # requests.HTTPError carries the server response body — keep it for debugging
        if hasattr(e, 'response') and e.response is not None:
            error_msg += f" | {e.response.text}"
        result_data = {
            'id': row.id,
            'clean_text': '',
            'error': error_msg
        }
    # Thread-safe progress reporting
    with counter_lock:
        completed_tasks += 1
        if 'error' in result_data:
            print(f"⚠️ [Error] [{completed_tasks}/{total_tasks}] ID {result_data['id']}: {result_data['error']}")
        else:
            print(f"✅ [{completed_tasks}/{total_tasks}] Successfully recovered ID {result_data['id']}")
    return result_data
def run_recovery():
    """Retry failed Jina extractions recorded in EXCEL_FILE and save to OUTPUT_FILE.

    Failed rows (error set, or clean_text missing/empty) are re-fetched from the
    database, re-sent to Jina via a throttled thread pool, and patched back into
    the DataFrame before writing a fresh Excel file.
    """
    global total_tasks
    print(f"📂 Loading {EXCEL_FILE}...")
    try:
        df = pd.read_excel(EXCEL_FILE)
    except FileNotFoundError:
        print(f"❌ Could not find {EXCEL_FILE}. Make sure the name is correct!")
        return
    # 1. Identify which rows failed or are missing text
    failed_mask = df['error'].notna() | df['clean_text'].isna() | (df['clean_text'] == '')
    failed_ids = df[failed_mask]['id'].tolist()
    total_tasks = len(failed_ids)
    print(f"📊 Found {total_tasks} failed rows to recover.")
    if total_tasks == 0:
        print("✅ Everything is already successfully processed!")
        return
    # 2. Fetch ONLY the failed HTML contents from the database.
    # FIX: use an expanding bind parameter instead of f-string interpolation of
    # the id list into the SQL — safer, and lets the driver handle quoting.
    from sqlalchemy import bindparam
    print("📥 Fetching pending HTML content from database...")
    query = text("SELECT id, content FROM wiki_wikicontent WHERE id IN :ids").bindparams(
        bindparam("ids", expanding=True)
    )
    with engine.connect() as conn:
        rows = conn.execute(query, {"ids": failed_ids}).fetchall()
    print(f"🚀 Starting Multi-threaded RECOVERY (Speed: ~142 req/min)...")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_row = {}
        # Dispatch tasks, throttled to stay under the API rate limit
        for row in rows:
            future = executor.submit(process_html, row)
            future_to_row[future] = row
            time.sleep(RATE_LIMIT_DELAY)
        # 3. Process results and dynamically update the DataFrame in place
        for future in as_completed(future_to_row):
            result = future.result()
            row_mask = df['id'] == result['id']
            if 'error' in result:
                # Update the specific row with the new error
                df.loc[row_mask, 'error'] = result['error']
                df.loc[row_mask, 'clean_text'] = ''
            else:
                # Inject the clean text and wipe the error completely
                df.loc[row_mask, 'clean_text'] = result['clean_text']
                df.loc[row_mask, 'error'] = None
    # 4. Save the updated data
    df.to_excel(OUTPUT_FILE, index=False)
    print(f"🎉 Recovery Complete! Saved updated data to {OUTPUT_FILE}")


if __name__ == "__main__":
    run_recovery()

139
src/utils/reformat_wiki.py

@ -0,0 +1,139 @@
# src/scripts/jina_batch_processor.py
# First-pass batch extraction: sends every language-69 wiki HTML document to
# the Jina Reader API and exports the cleaned text to Excel.
import time
import requests
import pandas as pd
from sqlalchemy import text
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
from urllib.parse import quote_plus
from sqlalchemy import create_engine
from dotenv import load_dotenv
import threading # 🟢 NEW: Needed for the thread-safe counter
load_dotenv()  # load .env before reading the DB_* variables below
# NOTE(review): quote_plus(None) raises TypeError when DB_USER/DB_PASSWORD
# are unset — assumes the env file always provides them; confirm.
db_user = os.getenv("DB_USER")
db_pass = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")
db_pass = quote_plus(db_pass)  # URL-escape credentials ('@', '/', etc.)
db_user = quote_plus(db_user)
db_url = f"postgresql+psycopg://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
# Pooled engine: pre-ping/recycle keep long-lived threaded runs healthy
engine = create_engine(
db_url,
pool_pre_ping=True,
pool_recycle=3600,
pool_size=10,
max_overflow=20
)
JINA_API_KEY = os.getenv("JINA_API_KEY")
# ⚙️ Concurrency & Rate Limit Settings
MAX_WORKERS = 5
RATE_LIMIT_DELAY = 0.42  # dispatch throttle, ~142 requests/min
# 🟢 NEW: Global variables to track progress safely across threads
counter_lock = threading.Lock()
completed_tasks = 0
total_tasks = 0
def process_html(row):
    """Worker (runs in the thread pool): send one row's HTML to the Jina Reader API.

    Returns a dict with 'id' and 'clean_text'; on failure 'clean_text' is ''
    and an 'error' key carries the message. Bumps the shared progress counter
    under counter_lock and prints a per-row status line.

    NOTE(review): this function is duplicated in recovery_reformat.py —
    consider extracting a shared helper module.
    """
    # total_tasks is only read here, so it needs no 'global' declaration.
    global completed_tasks
    headers = {
        "Authorization": f"Bearer {JINA_API_KEY}",
        "Accept": "application/json"
    }
    files = {
        'file': (f'document_{row.id}.html', row.content, 'text/html')
    }
    try:
        response = requests.post("https://r.jina.ai/", headers=headers, files=files, timeout=30)
        response.raise_for_status()
        jina_data = response.json().get('data', {})
        result_data = {
            'id': row.id,
            'clean_text': jina_data.get('content', '')
        }
    except Exception as e:  # broad by design: any per-row failure is recorded, not raised
        error_msg = str(e)
        # requests.HTTPError carries the server response body — keep it for debugging
        if hasattr(e, 'response') and e.response is not None:
            error_msg += f" | {e.response.text}"
        result_data = {
            'id': row.id,
            'clean_text': '',
            'error': error_msg
        }
    # 🟢 THE FIX: safely update the counter and print instantly
    with counter_lock:
        completed_tasks += 1
        if 'error' in result_data:
            print(f"⚠️ [Error] [{completed_tasks}/{total_tasks}] ID {result_data['id']}: {result_data['error']}")
        else:
            print(f"✅ [{completed_tasks}/{total_tasks}] Successfully processed ID {result_data['id']}")
    return result_data
def run_threaded_extraction(language_id=69):
    """Extract and clean all wiki HTML for *language_id* via Jina, export to Excel.

    Generalized: the language id (previously hard-coded to 69) is now a keyword
    parameter; the default keeps the original behavior and output filename.
    Dispatches one Jina request per row through a throttled thread pool, then
    writes id/clean_text/error columns to wiki_lang{language_id}_cleaned.xlsx.
    """
    global total_tasks
    print(f"📥 Fetching Wiki HTML content (Language ID: {language_id})...")
    with engine.connect() as conn:
        # Bind parameter instead of embedding the id in the SQL string
        query = text("SELECT id, content FROM wiki_wikicontent WHERE language_id = :lang")
        rows = conn.execute(query, {"lang": language_id}).fetchall()
    total_tasks = len(rows)
    print(f"📊 Found {total_tasks} rows to process.")
    if total_tasks == 0:
        return
    extracted_data = []
    print(f"🚀 Starting Multi-threaded processing (Speed: ~142 req/min)...")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_row = {}
        # 1. THE DISPATCHER: submit one task per row, throttled for the API rate limit
        for idx, row in enumerate(rows):
            future = executor.submit(process_html, row)
            future_to_row[future] = row
            # Single self-overwriting status line so the dispatch loop is visible
            print(f"⏳ Dispatching task {idx + 1}/{total_tasks} to thread pool...", end="\r")
            time.sleep(RATE_LIMIT_DELAY)
        print("\n🏁 All tasks dispatched! Waiting for final threads to finish...")
        # 2. THE RECEIVER: workers print their own progress; just collect results
        for future in as_completed(future_to_row):
            extracted_data.append(future.result())
    # 3. EXPORT TO EXCEL
    if extracted_data:
        df = pd.DataFrame(extracted_data)
        excel_filename = f"wiki_lang{language_id}_cleaned.xlsx"
        # Keep a stable column order whenever any row carried an error
        if 'error' in df.columns:
            df = df[['id', 'clean_text', 'error']]
        df.to_excel(excel_filename, index=False)
        print(f"🎉 Success! Saved {len(extracted_data)} rows to {excel_filename}")


if __name__ == "__main__":
    run_threaded_extraction()
Loading…
Cancel
Save