Skip to content

Connector now uses a requests Session, can have a pool #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .coverage
Binary file not shown.
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ venv/
_build/
build/
dist/
neo4j_connector.egg-info/
neo4j_connector.egg-info/
.pytest_cache/
.mypy_cache/
data/
.vscode/
Pipfile
Pipfile.lock
21 changes: 21 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]
black = "*"
mypy = "*"
pytest = "*"
pytest-asyncio = "*"
pytest-cov = "*"

[packages]
httpx = "*"
pytest-asyncio = "*"

[requires]
python_version = "3.7"

[pipenv]
allow_prereleases = true
445 changes: 445 additions & 0 deletions Pipfile.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions bin/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Run the test suite under pipenv with coverage measured for the neo4j package.
#   --cov-report=term-missing  list the line numbers that are not covered
#   --durations=0              report the duration of every test
#   -s                         do not capture stdout/stderr
# Every continuation backslash is preceded by a space so that adjacent options
# can never be glued into one token, regardless of how the next line is indented.
pipenv run python -m pytest --cov=neo4j \
    --cov-report=term-missing \
    --durations=0 \
    -s
96 changes: 39 additions & 57 deletions neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,28 @@
library.
"""

import requests
import sys
from typing import List, Tuple
from collections import namedtuple
from typing import List, Optional, Tuple
from urllib.parse import urlparse

import requests


class Statement(dict):
"""Class that helps transform a cypher query plus optional parameters into the dictionary structure that Neo4j
expects. The values can easily be accessed as shown in the last code example.

Args:
cypher (str): the Cypher statement
parameters (dict): [optional] parameters that are merged into the statement at the server-side. Parameters help
with speeding up queries because the execution plan for identical Cypher statements is cached.

Example code:

>>> # create simple statement
>>> statement = Statement("MATCH () RETURN COUNT(*) AS node_count")

>>> # create parametrized statement
>>> statement = Statement("MATCH (n:node {uuid: {uuid}}) RETURN n", {'uuid': '123abc'})

>>> # create multiple parametrized statements
>>> statements = [Statement("MATCH (n:node {uuid: {uuid}}) RETURN n", {'uuid': uuid}) for uuid in ['123abc', '456def']]

>>> # print individual Statement values
>>> statement = Statement("MATCH (n:node {uuid: {uuid}}) RETURN n", {'uuid': '123abc'})
>>> print("Cypher statement: {}".format(statement['statement']))
Expand All @@ -38,104 +34,105 @@ class Statement(dict):
def __init__(self, cypher: str, parameters: dict = None):
super().__init__(statement=cypher)
if parameters:
self['parameters'] = parameters
self["parameters"] = parameters


class Connector:
"""Class that abstracts communication with neo4j into up-front setup and then executes one or more
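:class:`Statement`. The connector sends every request through a single :class:`requests.Session`, so HTTP
connections are kept open and re-used between calls; call ``connector.session.close()`` to release them when the
connector is no longer needed.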

Args:
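host (str): scheme, hostname and port of the Neo4j server (e.g. ``http://localhost:7474``); the transaction
endpoint path (``/db/data/transaction/commit``) is appended to it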
credentials (tuple[str, str]): the credentials that are used to authenticate the requests
verbose_errors (bool): if set to True the :class:`Connector` prints :class:`Neo4jErrors` messages and codes to
the standard error output in a bit nicer format than the stack trace.

pool (Optional[int]): size of the underlying HTTP connection pool; when omitted, the default pool of
:mod:`requests` is used. Useful for multithreaded applications.

Example code:

>>> # default connector
>>> connector = Connector()

>>> # localhost connector, custom credentials
>>> connector = Connector(credentials=('username', 'password'))

>>> # custom connector
>>> connector = Connector('http://mydomain:7474', ('username', 'password'))
"""

# default endpoint of localhost
default_host = 'http://localhost:7474'
default_path = '/db/data/transaction/commit'
# default host and endpoint
default_host = "http://localhost:7474"
default_path = "/db/data/transaction/commit"

# default credentials
default_credentials = ('neo4j', 'neo4j')

def __init__(self, host: str = default_host, credentials: Tuple[str, str] = default_credentials,
verbose_errors=False):
default_credentials = ("neo4j", "neo4j")

def __init__(
self,
host: str = default_host,
credentials: Tuple[str, str] = default_credentials,
pool: Optional[int] = None,
verbose_errors: bool = False,
):
self.endpoint = host + self.default_path
self.credentials = credentials
self.verbose_errors = verbose_errors
self.session = requests.Session()
if pool:
self.session.mount(
prefix=f"{urlparse(host).scheme}://",
adapter=requests.adapters.HTTPAdapter(
pool_connections=pool, pool_maxsize=pool
),
)

self.session.auth = credentials

def run(self, cypher: str, parameters: dict = None):
"""
Method that runs a single statement against Neo4j in a single transaction. This method builds the
:class:`Statement` object for the user.

Args:
cypher (str): the Cypher statement
parameters (dict): [optional] parameters that are merged into the statement at the server-side. Parameters
help with speeding up queries because the execution plan for identical Cypher statements is cached.

Returns:
list[dict]: a list of dictionaries, one dictionary for each row in the result. The keys in the dictionary
are defined in the Cypher statement

Raises:
Neo4jErrors

Example code:

>>> # retrieve all nodes' properties
>>> all_nodes = [row['n'] for row in connector.run("MATCH (n) RETURN n")]

>>> # single row result
>>> node_count = connector.run("MATCH () RETURN COUNT(*) AS node_count")[0]['node_count']

>>> # get a single node's properties with a statement + parameter
>>> # in this case we're assuming: CONSTRAINT ON (node:node) ASSERT node.uuid IS UNIQUE
>>> single_node_properties_by_uuid = connector.run("MATCH (n:node {uuid: {uuid}}) RETURN n", {'uuid': '123abc'})[0]['n']
"""
response = self.post([Statement(cypher, parameters)])
return self._clean_results(response)[0]

def run_multiple(self, statements: List[Statement], batch_size: int = None) -> List[List[dict]]:
def run_multiple(
self, statements: List[Statement], batch_size: int = None
) -> List[List[dict]]:
"""
Method that runs multiple :class:`Statement`\ s against Neo4j in a single transaction or several batches.

Args:
statements (list[Statement]): the statements to execute
batch_size (int): [optional] number of statements to send to Neo4j per batch. In case the batch_size is
omitted (i.e. None) then all statements are sent as a single batch. This parameter can help make large
jobs manageable for Neo4j (e.g. not running out of memory).

Returns:
list[list[dict]]: a list of statement results, each containing a list of dictionaries, one dictionary for
each row in the result. The keys in the dictionary are defined in the Cypher statement. The statement
results have the same order as the corresponding :class:`Statement`\ s

Raises:
Neo4jErrors

Example code:

>>> cypher = "MATCH (n:node {uuid: {uuid}}) RETURN n"
>>> statements = [Statement(cypher, {'uuid': uuid}) for uuid in ['123abc', '456def']]
>>> statements_responses = connector.run_multiple(statements)
>>> for statement_responses in statements_responses:
>>> for row in statement_responses:
>>> print(row)

>>> # we can use batches if we're likely to overwhelm neo4j by sending everything in a single request
>>> # note that this has no effect on the returned data structure
>>> cypher = "MATCH (n:node {uuid: {uuid}}) RETURN n"
Expand All @@ -144,7 +141,6 @@ def run_multiple(self, statements: List[Statement], batch_size: int = None) -> L
>>> for statement_responses in statements_responses:
>>> for row in statement_responses:
>>> print(row)

>>> # we can easily re-use some information from the statement in the next example
>>> cypher = "MATCH (language {name: {name}})-->(word:word) RETURN word"
>>> statements = [Statement(cypher, {'name': lang}) for lang in ['en', 'nl']]
Expand All @@ -167,26 +163,21 @@ def post(self, statements: List[Statement]):
<https://neo4j.com/docs/http-api/3.5/http-api-actions/begin-and-commit-a-transaction-in-one-request/>`_.
This specifically includes the metadata per row and has a separate entry for the result names and the actual
values.

Args:
statements (list[Statement]): the statements that are POST-ed to Neo4j

Returns:
dict: the parsed Neo4j HTTP API response

Raises:
Neo4jErrors

Example code:

>>> cypher = "MATCH (n:node {uuid: {uuid}}) RETURN n"
>>> statements = [Statement(cypher, {'uuid': uuid}) for uuid in ['123abc', '456def']]
>>> statements_responses = connector.post(statements)
>>> for result in statements_responses['results']:
>>> for datum in result['data']:
>>> print(datum['row'][0]) #n is the first item in the row
"""
response = requests.post(self.endpoint, json={'statements': statements}, auth=self.credentials)
response = self.session.post(self.endpoint, json={"statements": statements})
json_response = response.json()

self._check_for_errors(json_response)
Expand All @@ -205,10 +196,10 @@ def make_batches(statements: List[Statement], batch_size: int = None) -> List:
raise ValueError("batchsize should be >= 1")

for start_idx in range(0, len(statements), batch_size):
yield statements[start_idx:start_idx + batch_size]
yield statements[start_idx : start_idx + batch_size]

def _check_for_errors(self, json_response):
errors = json_response.get('errors')
errors = json_response.get("errors")
if errors:
neo4j_errors = Neo4jErrors(errors)
if self.verbose_errors:
Expand All @@ -220,23 +211,17 @@ def _check_for_errors(self, json_response):
@staticmethod
def _clean_results(response):
return [
[
dict(zip(result['columns'], datum['row']))
for datum in result['data']
]
for result in response['results']
[dict(zip(result["columns"], datum["row"])) for datum in result["data"]]
for result in response["results"]
]


class Neo4jErrors(Exception):
"""Exception that is raised when Neo4j responds to a request with one or more error message. Iterate over this
object to get the individual :class:`Neo4jError` objects

Args:
errors (list(dict)): A list of dictionaries that contain the 'code' and 'message' properties

Example code:

>>> try:
>>> connector.run(...)
>>> except Neo4jErrors as neo4j_errors:
Expand All @@ -246,23 +231,20 @@ class Neo4jErrors(Exception):
"""

def __init__(self, errors: List[dict]):
self.errors = [Neo4jError(error['code'], error['message']) for error in errors]
self.errors = [Neo4jError(error["code"], error["message"]) for error in errors]

def __iter__(self):
return iter(self.errors)


# wrapped the namedtuple in a class so it gets documented properly
class Neo4jError(namedtuple('Neo4jError', ['code', 'message'])):
class Neo4jError(namedtuple("Neo4jError", ["code", "message"])):
"""namedtuple that contains the code and message of a Neo4j error

Args:
code (str): Error status code as defined in https://neo4j.com/docs/status-codes/3.5/
message (str): Descriptive message. For Cypher syntax errors this will contain a separate line (delimited by \\\\n)
that contains the '^' character to point to the problem

Example code:

>>> print(neo4j_error.code, file=sys.stderr)
>>> print(neo4j_error.message, file=sys.stderr)
"""
24 changes: 12 additions & 12 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
from setuptools import setup

with open('README.rst') as readme_file:
with open("README.rst") as readme_file:
long_description = readme_file.read()

with open('requirements.txt') as requirements_file:
with open("requirements.txt") as requirements_file:
install_requires = requirements_file.read()

setup(
name='neo4j-connector',
version='1.1.0',
description='Connector with single-request transactions for Neo4j 3.0 and above',
name="neo4j-connector",
version="1.1.0",
description="Connector with single-request transactions for Neo4j 3.0 and above",
long_description=long_description,
long_description_content_type="text/x-rst",
author='Jelle Jan Bankert (Textkernel BV)',
author_email='bankert@textkernel.com',
url='https://github.com/textkernel/neo4j-connector',
license='MIT',
packages=['neo4j'],
author="Jelle Jan Bankert (Textkernel BV)",
author_email="bankert@textkernel.com",
url="https://github.com/textkernel/neo4j-connector",
license="MIT",
packages=["neo4j"],
install_requires=install_requires,
test_suite='tests',
test_suite="tests",
classifiers=[
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
Expand All @@ -30,5 +30,5 @@
"Topic :: Database",
"Topic :: Software Development",
"Topic :: Utilities",
]
],
)
Loading