working version #1

Open · wants to merge 1 commit into base: main
5 changes: 4 additions & 1 deletion .gitignore
@@ -8,7 +8,7 @@ wheels/

# Virtual environments
.venv/

venv/
.env

.dockerignore
@@ -22,3 +22,6 @@ src/models/
*.log.*
*.rdb
*.db

codexify.sqlite-shm
codexify.sqlite-wal
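
Note: `codexify.sqlite-shm` and `codexify.sqlite-wal` are the shared-memory and write-ahead-log sidecar files SQLite creates next to the main database when WAL journaling is enabled, so ignoring them alongside the other database artifacts is consistent. A minimal sketch of how they come to exist, assuming the application's database file is named `codexify.sqlite`:

```python
import sqlite3

# With WAL journaling enabled, SQLite keeps codexify.sqlite-shm and
# codexify.sqlite-wal next to the main file while the database is in use.
conn = sqlite3.connect("codexify.sqlite")
conn.execute("PRAGMA journal_mode=WAL")
conn.close()
```
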
40 changes: 0 additions & 40 deletions docker-compose.yml

This file was deleted.

1 change: 1 addition & 0 deletions requirements.txt
@@ -31,3 +31,4 @@ uvicorn
uvloop
zstandard
rq==1.10.1
greenlet
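
Note: `greenlet` is the runtime dependency behind SQLAlchemy's asyncio extension (the `create_async_engine` / `AsyncSessionLocal` machinery imported from `db.py` elsewhere in this PR); if it is missing, the async engine raises at runtime even though SQLAlchemy itself imports fine, so pinning it explicitly is sensible. A minimal sketch of the code path that needs it; the `aiosqlite` URL is an illustrative assumption, the real DSN lives in `db.py`:

```python
from sqlalchemy.ext.asyncio import create_async_engine

# SQLAlchemy's asyncio support bridges into its synchronous internals
# via greenlet context switches, so greenlet must be importable here.
engine = create_async_engine("sqlite+aiosqlite:///codexify.sqlite")
```
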
20 changes: 14 additions & 6 deletions src/functions.py
@@ -412,11 +412,16 @@ async def get_texts_for_model_and_embedding_pooling_method(llm_model_name: str,
)
return texts_by_model_and_embedding_pooling_method

async def get_or_compute_embedding(request: EmbeddingRequest, req: Request = None, client_ip: str = None, document_file_hash: str = None, use_verbose: bool = True) -> dict:
request_time = datetime.utcnow() # Capture request time as datetime object
ip_address = (
client_ip or (req.client.host if req else "localhost")
) # If client_ip is provided, use it; otherwise, try to get from req; if not available, default to "localhost"
async def get_or_compute_embedding(
request: EmbeddingRequest,
req: Request = None,
client_ip: str = None,
document_file_hash: str = None,
use_verbose: bool = True,
db_writer = None
) -> dict:
request_time = datetime.utcnow()
ip_address = client_ip or (req.client.host if req else None)
if use_verbose:
logger.info(f"Received request for embedding for '{request.text}' using model '{request.llm_model_name}' and embedding pooling method '{request.embedding_pooling_method}' from IP address '{ip_address}'")
text_embedding_instance = await get_embedding_from_db(
@@ -469,7 +474,10 @@ async def get_or_compute_embedding(request: EmbeddingRequest, req: Request = Non
if word_length_of_input_text > 0:
if use_verbose:
logger.info(f"Embedding calculated for '{request.text}' using model '{request.llm_model_name}' and embedding pooling method '{request.embedding_pooling_method}' in {total_time:,.2f} seconds, or an average of {total_time/word_length_of_input_text :.2f} seconds per word. Now saving to database...")
await db_writer.enqueue_write([embedding_instance]) # Enqueue the write operation using the db_writer instance directly
# Use the passed db_writer if available
if db_writer:
await db_writer.enqueue_write([embedding_instance])

return {"text_embedding_dict": embedding_instance.as_dict()}

async def calculate_sentence_embeddings_list(llama, texts: list, embedding_pooling_method: str) -> list:
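Note: threading `db_writer` through as an explicit parameter lets worker processes inject their own write queue instead of relying on module-level state, and the `None` default keeps existing call sites working (they simply skip the enqueue). A hedged sketch of a call site, assuming a writer object exposing the async `enqueue_write` method used above; the model and pooling names are illustrative:

```python
embedding_request = EmbeddingRequest(
    text="example text",
    llm_model_name="nomic-embed-text-v1.5.Q6_K",   # illustrative model name
    embedding_pooling_method="mean",                # illustrative pooling method
)
# my_db_writer is a hypothetical writer with an async enqueue_write method.
result = await get_or_compute_embedding(embedding_request, db_writer=my_db_writer)
embedding_json = result["text_embedding_dict"]["embedding_json"]
```
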
224 changes: 155 additions & 69 deletions src/main.py
@@ -1,4 +1,3 @@

from db import AsyncSessionLocal, create_async_engine, create_tables
from utils import build_faiss_indexes, configure_redis_optimally
from models import DocumentEmbedding, Document, TextEmbedding, DocumentContentResponse, DocumentPydantic, SemanticDataTypeResponse, AllSemanticDataTypesResponse
@@ -498,77 +497,164 @@ async def simple_semantic_search(request: SemanticSearchRequest, req: Request, t
logger.error(traceback.format_exc()) # Print the traceback
raise HTTPException(status_code=500, detail="Internal Server Error")

@app.post("/semantic-search/advanced")
async def advanced_semantic_search(request: AdvancedSemanticSearchRequest, req: Request, token: str = None) -> AdvancedSemanticSearchResponse:

global faiss_indexes, associated_texts_by_model_and_pooling_method
request_time = datetime.utcnow()
request.query_text = prepare_string_for_embedding(request.query_text)
unique_id = f"advanced_semantic_search_{request.query_text}_{request.llm_model_name}_{request.embedding_pooling_method}_{request.similarity_filter_percentage}_{request.number_of_most_similar_strings_to_return}"


faiss_indexes, associated_texts_by_model_and_pooling_method = await build_faiss_indexes(force_rebuild=True)
@app.post("/semantic-search/advanced", response_model=Dict[str, Any])
async def advanced_semantic_search(
request: AdvancedSemanticSearchRequest,
token: str = None
) -> Dict[str, Any]:
"""
Queue an advanced semantic search request for processing by a worker.
"""
try:
faiss_index = faiss_indexes[(request.llm_model_name, request.embedding_pooling_method)]
except KeyError:
raise HTTPException(status_code=400, detail=f"No FAISS index found for model: {request.llm_model_name} and pooling method: {request.embedding_pooling_method}")
llm_model_name = request.llm_model_name
embedding_pooling_method = request.embedding_pooling_method
num_results_before_corpus_filter = request.number_of_most_similar_strings_to_return*25
logger.info(f"Received request to find most similar strings for query text: `{request.query_text}` using model: {llm_model_name}")
# Create a unique job ID
timestamp = datetime.now().isoformat()
unique_id = hashlib.md5(f"{request.query_text}_{request.llm_model_name}_{timestamp}".encode()).hexdigest()

logger.info(f"Processing advanced semantic search request for query: {request.query_text}")

# Check existing jobs using the global redis_manager
try:
document_scans_queue = redis_manager.get_queue('document_scans')
existing_jobs = document_scans_queue.get_job_ids()
for job_id in existing_jobs:
try:
job = Job.fetch(job_id, connection=redis_manager.redis_sync)
if (job and job.args and
job.args[6] == request.query_text and # query_text is the 7th argument
job.args[1] == request.llm_model_name and
job.args[2] == request.embedding_pooling_method and
job.get_status() != 'failed'):
return {
"status": "already_queued",
"message": f"Search already in progress. Job ID: {job_id}",
"job_id": job_id
}
except Exception as fetch_err:
logger.warning(f"Error checking existing job {job_id}: {str(fetch_err)}")
continue

except Exception as e:
logger.warning(f"Error checking existing jobs: {str(e)}")

# Enqueue the new task using the global redis_manager
try:
job = document_scans_queue.enqueue(
'worker.scan_document_task',
args=(
None, # document_hash is no longer needed
request.llm_model_name,
request.embedding_pooling_method,
request.corpus_identifier_string,
request.json_format,
request.send_back_json_or_zip_file,
request.query_text,
request.similarity_filter_percentage,
request.number_of_most_similar_strings_to_return,
request.result_sorting_metric
),
job_id=unique_id,
result_ttl=86400,
failure_ttl=86400,
timeout='1h'
)

if not job:
raise Exception("Job creation failed")

logger.info(f"Job enqueued successfully. Job ID: {unique_id}")

# Verify the job was enqueued
verification_attempts = 3
for attempt in range(verification_attempts):
try:
enqueued_job = Job.fetch(unique_id, connection=redis_manager.redis_sync)
if enqueued_job:
return {
"status": "queued",
"message": f"Search queued successfully. Job ID: {unique_id}",
"job_id": unique_id
}
except Exception as fetch_err:
if attempt == verification_attempts - 1:
raise
logger.warning(f"Verification attempt {attempt + 1} failed: {str(fetch_err)}")
await asyncio.sleep(0.5) # Wait briefly before retrying

raise Exception("Job verification failed after multiple attempts")

except Exception as e:
logger.error(f"Failed to enqueue job: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to queue semantic search: {str(e)}"
)

except Exception as e:
error_msg = f"Failed to process semantic search request: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=error_msg)
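
Note: because the task is enqueued by dotted path (`'worker.scan_document_task'`) with a ten-element positional args tuple, the worker function must accept those arguments in exactly this order; the duplicate-job check above also depends on it (`args[6]` must be the query text). `worker.py` is not part of this diff, so the following signature is an inference, not the actual code:

```python
def scan_document_task(
    document_hash,                               # passed as None above
    llm_model_name,
    embedding_pooling_method,
    corpus_identifier_string,
    json_format,
    send_back_json_or_zip_file,
    query_text,                                  # args[6], checked for duplicates
    similarity_filter_percentage,
    number_of_most_similar_strings_to_return,
    result_sorting_metric,
):
    ...
```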

@app.get("/semantic-search/status/{job_id}", response_model=Dict[str, Any])
async def get_search_status(job_id: str) -> Dict[str, Any]:
"""
Check the status of a semantic search job
"""
try:
logger.info(f"Computing embedding for input text: {request.query_text}")
embedding_request = EmbeddingRequest(text=request.query_text, llm_model_name=llm_model_name, embedding_pooling_method=embedding_pooling_method)
embedding_response = await get_or_compute_embedding(embedding_request, req)
embedding_json = embedding_response["text_embedding_dict"]["embedding_json"]
embedding_vector = json.loads(embedding_json)
input_embedding = np.array(embedding_vector).astype('float32').reshape(1, -1)
faiss.normalize_L2(input_embedding)
logger.info(f"Computed embedding for input text: {request.query_text}")
final_results = []
faiss_index = faiss_indexes[(llm_model_name, embedding_pooling_method)]
if faiss_index is None:
raise HTTPException(status_code=400, detail=f"No FAISS index found for model: {llm_model_name} and pooling method: {embedding_pooling_method}")
num_results = max([1, int((1 - request.similarity_filter_percentage) * len(associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method]))])
num_results_before_corpus_filter = min(num_results_before_corpus_filter, len(associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method]))
similarities, indices = faiss_index.search(input_embedding, num_results_before_corpus_filter)
filtered_indices = indices[0]
filtered_similarities = similarities[0]
similarity_results = []
associated_texts = associated_texts_by_model_and_pooling_method[llm_model_name][embedding_pooling_method]
list_of_corpus_identifier_strings = await get_list_of_corpus_identifiers_from_list_of_embedding_texts(associated_texts, llm_model_name, embedding_pooling_method)
for idx, similarity in zip(filtered_indices, filtered_similarities):
if idx < len(associated_texts) and list_of_corpus_identifier_strings[idx] == request.corpus_identifier_string:
associated_text = associated_texts[idx]
similarity_results.append((similarity, associated_text))
similarity_results = sorted(similarity_results, key=lambda x: x[0], reverse=True)[:num_results]
for _, associated_text in similarity_results:
embedding_request = EmbeddingRequest(text=associated_text, llm_model_name=llm_model_name, embedding_pooling_method=embedding_pooling_method)
embedding_response = await get_or_compute_embedding(request=embedding_request, req=req, use_verbose=False)
embedding_json = embedding_response["text_embedding_dict"]["embedding_json"]
embedding_vector = json.loads(embedding_json)
comparison__embedding = np.array(embedding_vector).astype('float32').reshape(1, -1)
params = {
"vector_1": input_embedding.tolist()[0],
"vector_2": comparison__embedding.tolist()[0],
"similarity_measure": "all"
}
similarity_stats_str = fvs.py_compute_vector_similarity_stats(json.dumps(params))
similarity_stats_json = json.loads(similarity_stats_str)
final_results.append({
"search_result_text": associated_text,
"similarity_to_query_text": similarity_stats_json
job = Job.fetch(job_id, connection=redis_manager.redis_sync)

status_mapping = {
'queued': {'status': 'pending', 'message': 'Search queued'},
'started': {'status': 'pending', 'message': 'Search in progress'},
'finished': {'status': 'completed', 'result': job.result},
'failed': {'status': 'failed', 'error': str(job.exc_info)},
'stopped': {'status': 'stopped', 'message': 'Search stopped'},
'deferred': {'status': 'pending', 'message': 'Search deferred'}
}

job_status = job.get_status()
return status_mapping.get(job_status, {
'status': 'unknown',
'message': f'Unknown job status: {job_status}'
})

except Exception as e:
error_msg = f"Error fetching job status: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
return {
"status": "error",
"message": error_msg
}
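
Note: together with the queueing endpoint above, this gives clients a submit-then-poll workflow: POST the search, keep the returned `job_id`, and poll this status route until it reports a terminal state. A rough client sketch, assuming the server runs on port 8089 as configured at the bottom of `main.py`; the model and pooling values are illustrative:

```python
import time
import requests

BASE = "http://localhost:8089"

resp = requests.post(f"{BASE}/semantic-search/advanced", json={
    "query_text": "example query",
    "llm_model_name": "nomic-embed-text-v1.5.Q6_K",
    "embedding_pooling_method": "mean",
    "corpus_identifier_string": "my_corpus",
})
job_id = resp.json()["job_id"]

# Poll until the worker reaches a terminal state.
while True:
    status = requests.get(f"{BASE}/semantic-search/status/{job_id}").json()
    if status["status"] in ("completed", "failed", "stopped", "error"):
        break
    time.sleep(1)
print(status)
```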

@app.get("/semantic-search/jobs", response_model=Dict[str, Any])
async def list_search_jobs() -> Dict[str, Any]:
"""
List all semantic search jobs and their statuses
"""
try:
jobs = []
job_ids = Queue('document_scans', connection=redis_manager.redis_sync).get_job_ids()

for job_id in job_ids:
job = Job.fetch(job_id, connection=redis_manager.redis_sync)
jobs.append({
'job_id': job_id,
'status': job.get_status(),
'created_at': job.created_at.isoformat() if job.created_at else None,
'query_text': job.args[6] if job.args else None # query_text is the 7th argument
})
num_to_return = request.number_of_most_similar_strings_to_return if request.number_of_most_similar_strings_to_return is not None else len(final_results)
results = sorted(final_results, key=lambda x: x["similarity_to_query_text"][request.result_sorting_metric], reverse=True)[:num_to_return]
response_time = datetime.utcnow()
total_time = (response_time - request_time).total_seconds()
logger.info(f"Finished advanced search in {total_time} seconds. Found {len(results)} results.")
return {"query_text": request.query_text, "corpus_identifier_string": request.corpus_identifier_string, "embedding_pooling_method": request.embedding_pooling_method, "results": results}
return {
"status": "success",
"jobs": jobs
}

except Exception as e:
logger.error(f"An error occurred while processing the request: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail="Internal Server Error")
error_msg = f"Error listing jobs: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=error_msg)

@app.post("/semantic-types/")
async def create_new_semantic_data_type(request: SemanticDataTypeEmbeddingRequest, req: Request = None, token: str = None, client_ip: str = None, document_file_hash: str = None) -> SemanticDataTypeEmbeddingResponse:
@@ -959,4 +1045,4 @@ async def delete_documents(

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8089)
uvicorn.run(app, host="0.0.0.0", port=8089)
2 changes: 1 addition & 1 deletion src/model_urls.json
@@ -1 +1 @@
["https://huggingface.co/nomic-ai/nomic-embed-text-v1.5-GGUF/resolve/main/nomic-embed-text-v1.5.Q6_K.gguf", "https://huggingface.co/NousResearch/Hermes-3-Llama-3.1-8B-GGUF/resolve/main/Hermes-3-Llama-3.1-8B.Q4_K_M.gguf", "https://huggingface.co/vonjack/bge-m3-gguf/resolve/main/bge-m3-q8_0.gguf", "https://huggingface.co/xtuner/llava-llama-3-8b-v1_1-gguf/resolve/main/llava-llama-3-8b-v1_1-int4.gguf"]
["https://huggingface.co/nomic-ai/nomic-embed-text-v1.5-GGUF/resolve/main/nomic-embed-text-v1.5.Q6_K.gguf"]
3 changes: 3 additions & 0 deletions src/models.py
@@ -331,6 +331,9 @@ class AdvancedSemanticSearchRequest(BaseModel):
similarity_filter_percentage: float = 0.01
number_of_most_similar_strings_to_return: int = 10
result_sorting_metric: str = "hoeffding_d"
json_format: str = "records"
send_back_json_or_zip_file: str = "json"

@field_validator('result_sorting_metric')
def validate_similarity_measure(cls, value):
valid_measures = ["spearman_rho", "kendall_tau", "approximate_distance_correlation", "jensen_shannon_dependency_measure", "hoeffding_d"]
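
Note: the two new fields give `AdvancedSemanticSearchRequest` defaults for the document-scan arguments the worker now expects, so older clients that omit them still validate. An example payload exercising the full model; the model and pooling values are illustrative:

```python
payload = {
    "query_text": "example query",
    "llm_model_name": "nomic-embed-text-v1.5.Q6_K",
    "embedding_pooling_method": "mean",
    "similarity_filter_percentage": 0.01,
    "number_of_most_similar_strings_to_return": 10,
    "result_sorting_metric": "hoeffding_d",
    "json_format": "records",                # new field
    "send_back_json_or_zip_file": "json",    # new field
}
```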