You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
244 lines
10 KiB
244 lines
10 KiB
"""
|
|
Integration tests for Qdrant vector database connection
|
|
"""
|
|
import pytest
|
|
import os
|
|
from unittest.mock import patch, Mock
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.http.exceptions import UnexpectedResponse
|
|
import sys
|
|
from pathlib import Path
|
|
from dotenv import load_dotenv  # used below to load a local .env file before the tests read os.environ
|
|
load_dotenv()
|
|
|
|
# -----------------------------------------------------------------------------
# PROJECT ROOT DISCOVERY
# Walk upwards from this test file until a directory containing 'src' is found,
# so the project imports below resolve whether the tests are run from the
# project root or from the tests/ folder.
# -----------------------------------------------------------------------------
current_file = Path(__file__).resolve()

# Nearest ancestor directory that holds a 'src' folder; None when the search
# reaches the filesystem root without finding one.
root_path = next(
    (ancestor for ancestor in current_file.parents if (ancestor / 'src').exists()),
    None,
)
if root_path is None:
    raise FileNotFoundError("Could not find project root containing 'src' folder")

# Put the project root at the front of the import path.
sys.path.insert(0, str(root_path))
print(f"🔧 Added project root to path: {root_path}")
# -----------------------------------------------------------------------------
|
|
|
|
# Import the modules we need to test
|
|
from src.knowledge.vector_store import get_qdrant_store
|
|
from src.knowledge.embedding_factory import EmbeddingFactory
|
|
|
|
|
|
class TestQdrantConnection:
    """Tests for ``get_qdrant_store``.

    Tests marked ``unit`` either patch ``src.knowledge.vector_store.Qdrant``
    or only exercise argument validation. Tests marked ``integration`` call
    the real client and skip themselves depending on whether ``QDRANT_URL``
    is set in the environment.
    """

    @staticmethod
    def _collection_names(client):
        """Return the names of all collections reported by ``client``."""
        return [col.name for col in client.get_collections().collections]

    @pytest.fixture
    def mock_embedder(self):
        """Return a ``Mock`` embedder whose ``id`` is ``"test_embedder"``."""
        embedder = Mock()
        embedder.id = "test_embedder"
        return embedder

    @pytest.fixture
    def real_embedder(self):
        """Return the ``"jina_AI"`` embedder built by ``EmbeddingFactory``."""
        return EmbeddingFactory().get_embedder("jina_AI")

    @pytest.mark.unit
    def test_qdrant_connection_mock_success(self, mock_embedder):
        """Check the arguments ``get_qdrant_store`` forwards to ``Qdrant``."""
        test_env = {
            "BASE_COLLECTION_NAME": "test_collection",
            "QDRANT_URL": "http://localhost:6333",
            "QDRANT_API_KEY": "test_key",
        }

        with patch.dict(os.environ, test_env):
            with patch('src.knowledge.vector_store.Qdrant') as mock_qdrant_class:
                mock_qdrant_instance = Mock()
                mock_qdrant_instance.client = Mock()
                mock_qdrant_class.return_value = mock_qdrant_instance

                vector_store = get_qdrant_store(
                    collection_name="test_collection",
                    url="http://localhost:6333",
                    embedder=mock_embedder,
                )

                # The collection passed on is "<collection_name>_<embedder.id>"
                # and the API key comes from the patched environment.
                mock_qdrant_class.assert_called_once_with(
                    collection="test_collection_test_embedder",
                    url="http://localhost:6333",
                    embedder=mock_embedder,
                    timeout=10.0,
                    api_key="test_key",
                )

                assert vector_store is not None

    @pytest.mark.unit
    def test_qdrant_connection_missing_embedder(self):
        """Check that calling without an embedder raises ``ValueError``."""
        with pytest.raises(ValueError, match="You must provide an 'embedder' instance"):
            get_qdrant_store()

    @pytest.mark.unit
    def test_qdrant_connection_missing_env_vars(self, mock_embedder):
        """Check that explicit arguments work when the env vars are absent."""
        env_vars_to_remove = ("BASE_COLLECTION_NAME", "QDRANT_URL", "QDRANT_API_KEY")

        # patch.dict snapshots os.environ and restores it on exit, so the
        # variables popped here come back after the test. Values cannot be set
        # to None because os.environ only accepts strings.
        with patch.dict(os.environ):
            for var in env_vars_to_remove:
                os.environ.pop(var, None)

            with patch('src.knowledge.vector_store.Qdrant') as mock_qdrant_class:
                mock_qdrant_class.return_value = Mock()

                get_qdrant_store(
                    collection_name="explicit_collection",
                    url="http://explicit:6333",
                    embedder=mock_embedder,
                )

                mock_qdrant_class.assert_called_once_with(
                    collection="explicit_collection_test_embedder",
                    url="http://explicit:6333",
                    embedder=mock_embedder,
                    timeout=10.0,
                    api_key=None,  # No API key provided
                )

    @pytest.mark.integration
    def test_qdrant_real_connection_success(self, real_embedder):
        """Connect using the environment configuration and list collections.

        Skipped when ``QDRANT_URL`` is not set. Any exception from the client
        propagates unchanged so pytest reports the original traceback.
        """
        if not os.getenv("QDRANT_URL"):
            pytest.skip("QDRANT_URL not set - skipping real connection test")

        vector_store = get_qdrant_store(
            collection_name="test_connection",
            embedder=real_embedder,
        )

        assert vector_store is not None
        assert hasattr(vector_store, 'client')
        assert vector_store.client is not None

        # A simple request that verifies the connection is usable.
        collections = vector_store.client.get_collections()
        assert hasattr(collections, 'collections')

        found = collections.collections
        print(f"📊 Found {len(found)} collections in Qdrant:")
        for i, col in enumerate(found, 1):
            print(f" {i}. {col.name}")
        print(f" Total collections: {len(found)}")

    @pytest.mark.integration
    def test_qdrant_real_connection_failure(self):
        """Check that an unreachable URL produces a connection-style error.

        Skipped when ``QDRANT_URL`` is set. The test passes when the raised
        exception is an ``UnexpectedResponse`` or its message contains one of
        the known connection-failure keywords.
        """
        if os.getenv("QDRANT_URL"):
            pytest.skip("QDRANT_URL is set - cannot test connection failure")

        invalid_url = "http://invalid.qdrant.url:6333"

        # Build the embedder outside the guarded region: an error raised here
        # is a setup problem and must not be mistaken for the expected
        # connection failure.
        embedder = EmbeddingFactory().get_embedder("jina_AI")

        try:
            vector_store = get_qdrant_store(
                collection_name="test_connection",
                url=invalid_url,
                embedder=embedder,
            )
            # In case constructing the store did not raise, perform an
            # operation that requires a connection.
            vector_store.client.get_collections()
        except Exception as exc:
            error = exc
        else:
            pytest.fail("Expected connection to fail with invalid URL, but it succeeded")

        error_str = str(error).lower()
        connection_markers = (
            "failed",
            "refused",
            "timeout",
            "getaddrinfo",  # DNS resolution failure
            "connection",
        )
        has_connection_error = isinstance(error, UnexpectedResponse) or any(
            marker in error_str for marker in connection_markers
        )
        assert has_connection_error, f"Expected connection error but got: {error}"

    @pytest.mark.integration
    def test_qdrant_collection_operations(self, real_embedder):
        """Delete a leftover test collection, if any, and verify it is gone.

        Skipped when ``QDRANT_URL`` is not set. Any exception from the client
        propagates unchanged so pytest reports the original traceback.
        """
        if not os.getenv("QDRANT_URL"):
            pytest.skip("QDRANT_URL not set - skipping collection operations test")

        vector_store = get_qdrant_store(
            collection_name="test_operations",
            embedder=real_embedder,
        )
        client = vector_store.client

        # Same "<collection_name>_<embedder.id>" form the unit tests assert on.
        collection_name = f"test_operations_{real_embedder.id}"

        collection_names = self._collection_names(client)
        print(f"📋 Current collections in database ({len(collection_names)} total):")
        for i, name in enumerate(collection_names, 1):
            print(f" {i}. {name}")

        if collection_name in collection_names:
            print(f"🧹 Cleaning up existing test collection: {collection_name}")
            client.delete_collection(collection_name)
            print(f"✅ Deleted collection: {collection_name}")
        else:
            print(f"ℹ️ Test collection {collection_name} does not exist (this is normal)")

        # Query again so the check reflects the state after any deletion.
        assert collection_name not in self._collection_names(client)
        print(f"✅ Verified collection {collection_name} is not in database")
|