Skip to content

Enhanced Ingestion with Named Graph Restrictions #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 65 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
096e809
updated shared.py to add new helper function convert_ttl_to_named_graph
tekrajchhetri Jan 30, 2025
816e1d9
Updated RabbitMQ listener to handle, i.e., consume, the published mes…
tekrajchhetri Jan 30, 2025
2f109c7
new functions added to attach provenance information about ingestion …
tekrajchhetri Jan 31, 2025
fedb7b7
checks added to ensure required parameters are present
tekrajchhetri Jan 31, 2025
ad7f206
namespace updated from test example.org to brainkb.org
tekrajchhetri Jan 31, 2025
3a9413c
update the test connection method to handle empty result for success
tekrajchhetri Jan 31, 2025
08f9ca2
endpoint that handles insertion of data to graphdatabase
tekrajchhetri Jan 31, 2025
bc8d263
new endpoint added
tekrajchhetri Jan 31, 2025
2ec9f6f
removed /query/insert-jsonld endpoint
tekrajchhetri Jan 31, 2025
b27e37f
updated pydantic schema.
tekrajchhetri Jan 31, 2025
c645a04
code update to end-to-end raw json-ld data ingestion.
tekrajchhetri Jan 31, 2025
ec5fe1f
query service updated to fetch the registered named graphs
tekrajchhetri Feb 18, 2025
d322e35
shared.py updated to add the metadata about the named graph
tekrajchhetri Feb 18, 2025
c66d071
NamedGraphSchema added
tekrajchhetri Feb 18, 2025
047b3f7
endpoint renamed and new endpoint to register the named graph added.
tekrajchhetri Feb 18, 2025
136ea9d
fixed issue - unable to upload multiple json-ld/turtle files
tekrajchhetri Feb 19, 2025
f261d7e
logger added
tekrajchhetri Feb 19, 2025
98f033a
updated configuration.py to read QUERY_SERVICE_BASE_URL for ingestion.
tekrajchhetri Feb 21, 2025
d380973
updated pydantic schema for named graphs
tekrajchhetri Feb 21, 2025
331300d
new function added to check if the targeted named graphs exists, i.e.…
tekrajchhetri Feb 21, 2025
6edb19d
code updated restrict ingestion of ttl/jsonld (raw+file) to registere…
tekrajchhetri Feb 21, 2025
ac30ae2
updated listener to ack the delivery on successful ingestion to db
tekrajchhetri Mar 3, 2025
5d94afb
init ml service
tekrajchhetri Apr 2, 2025
a5d58f5
Update .gitignore
tekrajchhetri Apr 3, 2025
3276a8f
updated requirements to fix dependency conflict issue
tekrajchhetri Apr 3, 2025
9efbe07
pydantic models for agents, tasks..
tekrajchhetri Apr 3, 2025
fa40552
crew_memory added
tekrajchhetri Apr 3, 2025
3204684
new endpoints added
tekrajchhetri Apr 3, 2025
3f51ceb
added parse_yaml_or_json to handle the input
tekrajchhetri Apr 3, 2025
57b856b
removed StructSenseInput
tekrajchhetri Apr 3, 2025
fab8a60
api endpoint added
tekrajchhetri Apr 3, 2025
a8e1a58
concurrency features added.
tekrajchhetri Apr 3, 2025
8204b7a
WEAVIATE configuration added.
tekrajchhetri Apr 4, 2025
056549a
updated docker & docker compose configuration
tekrajchhetri Apr 4, 2025
78282ef
updated grobid env variables
tekrajchhetri Apr 4, 2025
55bc6eb
structsense integrated and now the extracted NER is stored in mongodb
tekrajchhetri Apr 8, 2025
a3ed7c2
updated requirements for deprecating warning
tekrajchhetri Apr 8, 2025
f1f2403
removed direct saving the agent output because we first want user fee…
tekrajchhetri Apr 9, 2025
6b86e0b
Update configure_logging.py
tekrajchhetri Apr 9, 2025
21b2be8
Update configure_logging.py
tekrajchhetri Apr 9, 2025
efb2def
removed / from the end of endpoint name
tekrajchhetri Apr 9, 2025
acb9311
save data to mongodb
tekrajchhetri Apr 15, 2025
acc27fc
updated structsense + disabled API human feedback
tekrajchhetri May 9, 2025
ff104b5
CORSMiddleware added
tekrajchhetri May 9, 2025
4ec473f
Update requirements.txt
tekrajchhetri Jun 11, 2025
438e5fc
JWT security added
tekrajchhetri Jun 11, 2025
1a0a92f
Update jwt_auth.py
tekrajchhetri Jun 11, 2025
162726c
removed /
tekrajchhetri Jun 11, 2025
f3c1ade
rdflib PyLD added
tekrajchhetri Jun 11, 2025
e49d26c
removed / from register-named-graph endpoint to maintain consistency
tekrajchhetri Jun 12, 2025
ab412ae
updated listener to handle error and reconnect automatically
tekrajchhetri Jun 16, 2025
0624dcf
Create .gitignore
tekrajchhetri Jun 16, 2025
49f9d6f
example .env
tekrajchhetri Jun 16, 2025
7dab3a0
screenshots
tekrajchhetri Jun 16, 2025
efd76ac
custom configuration for rabbitmq for large frame size
tekrajchhetri Jun 16, 2025
af0be6c
prometheus configuration
tekrajchhetri Jun 16, 2025
ea91562
updated docker compose to include the prometheus + grafana
tekrajchhetri Jun 16, 2025
8f94d8f
created readme with detail instructions on deploying about the rabbit…
tekrajchhetri Jun 16, 2025
cb62036
Update readme.md
tekrajchhetri Jun 17, 2025
dbfe977
removed prov:wasInformedBy attached to the original triple
tekrajchhetri Jun 17, 2025
780cb6b
new endpoint added for kg file upload + json based ingestion
tekrajchhetri Jun 21, 2025
1e18f77
endpoint renamed
tekrajchhetri Jun 22, 2025
b6f837e
updated code to support json file format for resource (e.g., bbqs) ra…
tekrajchhetri Jun 22, 2025
481c79e
updated to use named-graph iri
tekrajchhetri Jun 22, 2025
4560602
Namespace issue fixed
tekrajchhetri Jun 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ingestion_service/producer/core/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ def load_environment(env_name="env"):
"RABBITMQ_PASSWORD": os.getenv("RABBITMQ_PASSWORD"),
"RABBITMQ_URL": os.getenv("RABBITMQ_URL", "localhost"),
"RABBITMQ_PORT": os.getenv("RABBITMQ_PORT", 5672),
"RABBITMQ_VHOST": os.getenv("RABBITMQ_VHOST","/")
"RABBITMQ_VHOST": os.getenv("RABBITMQ_VHOST","/"),

#query service
"QUERY_SERVICE_BASE_URL": os.getenv("QUERY_SERVICE_BASE_URL", "localhost:8010")
}


Expand Down
1 change: 1 addition & 0 deletions ingestion_service/producer/core/pydantic_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Config:

class InputJSONSLdchema(BaseModel):
user: str
graph: str
kg_data: Dict[Any, Any]


Expand Down
160 changes: 97 additions & 63 deletions ingestion_service/producer/core/routers/api_endpoints_input.py

Large diffs are not rendered by default.

60 changes: 59 additions & 1 deletion ingestion_service/producer/core/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from rdflib import Graph
import requests
import logging

from core.configuration import load_environment
logger = logging.getLogger(__name__)

# Helper function to resolve issues during the conversion from JSON-LD to Turtle representation.
Expand Down Expand Up @@ -121,3 +121,61 @@ def is_valid_jsonld(jsonld_str):
except ValueError:
return False

def check_url_for_slash(url:str):
if not url.endswith("/"):
return url + "/"
return url

def check_if_url_wellformed(url:str):
"We want to ensure that the name graph IRI is wellformed, i.e., starts with http or https, not www"
if url is None:
return False
else:
return True if url.startswith("http://") or url.startswith("https://") else False


import requests


def named_graph_exists(named_graph_iri: str) -> dict:
"""
Checks whether a named graph exists in the registered named graphs list.

Args:
named_graph_iri (str): The IRI of the named graph to check.

Returns:
dict: A dictionary indicating success or failure with a relevant message.
"""

query_service_url = load_environment().get("QUERY_SERVICE_BASE_URL", "")
endpoint = f"{check_url_for_slash(query_service_url)}query/registered-named-graphs"

# Validate the named graph IRI
print(check_if_url_wellformed(named_graph_iri))
if not check_if_url_wellformed(named_graph_iri):
return {
"status": "error",
"message": "The graph IRI is not well-formed. It should start with 'http' or 'https'."
}

try:
response = requests.get(endpoint)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requests is a synchronous call -- is this function eventually being called in an upstream async call?

response.raise_for_status() # Raise an error for bad responses (4xx, 5xx)

registered_graphs = response.json()
formatted_iri= check_url_for_slash(named_graph_iri)
if formatted_iri in registered_graphs:
return {
"status": True,
"formatted_iri": formatted_iri
}
return {
"status": False,
"message": f"The graph is not registered. Available graphs: {list(registered_graphs.keys())}"
}
except requests.exceptions.RequestException as e:
return {
"status": "error",
"message": f"Error connecting to query service: {str(e)}"
}
Comment on lines +178 to +181

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have an HTTP status code indicating it is an error returned as well?

4 changes: 3 additions & 1 deletion ingestion_service/producer/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ psycopg==3.1.18
python-jose==3.3.0
python-multipart==0.0.9
passlib[bcrypt]==1.7.4
asyncpg==0.29.0
asyncpg==0.29.0
rdflib
PyLD
5 changes: 4 additions & 1 deletion ingestion_service/worker/core/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ def load_environment(env_name="env"):
"RABBITMQ_URL": os.getenv("RABBITMQ_URL", "localhost"),
"RABBITMQ_PORT": os.getenv("RABBITMQ_PORT", 5672),
"RABBITMQ_VHOST": os.getenv("RABBITMQ_VHOST","/"),
"INGEST_URL": os.getenv("INGEST_URL")
"INGEST_URL": os.getenv("INGEST_URL"),
"JWT_LOGIN_EMAIL": os.getenv("JWT_LOGIN_EMAIL"),
"JWT_LOGIN_PASSWORD": os.getenv("JWT_LOGIN_PASSWORD"),
"JWT_BEARER_TOKEN_URL": os.getenv("JWT_BEARER_TOKEN_URL")
}


11 changes: 7 additions & 4 deletions ingestion_service/worker/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
from core.rabbit_mq_listener import start_consuming
from core.configure_logging import configure_logging
from core.routers.worker import router as index_router
import logging

logger = logging.getLogger(__name__)

async def background_task():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also be apart of the @app.on_event("startup")?

print("waiting for messages...")
asyncio.create_task(start_consuming())
logger.info("#### waiting for messages... ####")
loop = asyncio.get_event_loop()
# This will run start_consuming in a separate thread
await loop.run_in_executor(None, start_consuming)

app = FastAPI()
logger = logging.getLogger(__name__)
app.add_middleware(CorrelationIdMiddleware)


Expand All @@ -27,7 +30,7 @@ async def background_task():
async def startup_event():
asyncio.create_task(background_task())
configure_logging()
logger.info("Starting FastAPI")
logger.info("#### Starting FastAPI... ####")


# log all HTTP exception when raised
Expand Down
Loading