# Source code for stardog.connection

"""Connect to Stardog databases.
"""

import contextlib

from . import content_types as content_types
from . import exceptions as exceptions
from .http import client
from .utils import strtobool
import urllib


class Connection:
    """Database Connection.

    This is the entry point for all user-related operations on a
    Stardog database
    """

    def __init__(
        self,
        database,
        endpoint=None,
        username=None,
        password=None,
        auth=None,
        session=None,
    ):
        """Initializes a connection to a Stardog database.

        Args:
          database (str): Name of the database
          endpoint (str): Url of the server endpoint.
            Defaults to `http://localhost:5820`
          username (str, optional): Username to use in the connection
          password (str, optional): Password to use in the connection
          auth (requests.auth.AuthBase, optional): requests Authentication
            object. Defaults to `None`
          session (requests.session.Session, optional): requests Session
            object. Defaults to `None`

        Examples:
          >>> conn = Connection('db', endpoint='http://localhost:9999',
                                username='admin', password='admin')
        """
        self.client = client.Client(
            endpoint, database, username, password, auth=auth, session=session
        )
        # Id of the currently open transaction, or None when there is none.
        self.transaction = None

    def docs(self):
        """Makes a document storage object.

        Returns:
          Docs: A Docs object
        """
        return Docs(self.client)

    def icv(self):
        """Makes an integrity constraint validation object.

        Returns:
          ICV: An ICV object
        """
        return ICV(self)

    def graphql(self):
        """Makes a GraphQL object.

        Returns:
          GraphQL: A GraphQL object
        """
        return GraphQL(self)

    # TODO
    # def list(self):
    #     """
    #     List all open transactions on a database
    #     https://stardog-union.github.io/http-docs/#operation/listTransactions
    #     :return:
    #     """

    # TODO: Begin a transaction that takes a specific txid (the docs specify
    # /begin and /begin/{txid}, I am not sure if this supports the latter)
    # Maybe this already supports it, or we might need to create another method

    def begin(self, **kwargs):
        """Begins a transaction.

        Args:
          reasoning (bool, optional): Enable reasoning for all queries
            inside the transaction. If the transaction does not have
            reasoning enabled, queries within will not be able to use
            reasoning.

        Returns:
          str: Transaction ID

        Raises:
          stardog.exceptions.TransactionException
            If already in a transaction
        """
        self._assert_not_in_transaction()
        # All keyword arguments are forwarded verbatim as query parameters.
        r = self.client.post("/transaction/begin", params=kwargs)
        self.transaction = r.text
        return self.transaction

    def rollback(self):
        """Rolls back the current transaction.

        Raises:
          stardog.exceptions.TransactionException
            If currently not in a transaction
        """
        self._assert_in_transaction()
        self.client.post("/transaction/rollback/{}".format(self.transaction))
        self.transaction = None

    def commit(self):
        """Commits the current transaction.

        Raises:
          stardog.exceptions.TransactionException
            If currently not in a transaction
        """
        self._assert_in_transaction()
        self.client.post("/transaction/commit/{}".format(self.transaction))
        self.transaction = None

    def add(self, content, graph_uri=None, server_side=False):
        """Adds data to the database.

        Args:
          content (Content, str): Data to add to a graph.
          graph_uri (str, optional): Named graph into which to add the data.
          server_side (bool): Whether the file to load lives in the remote
            server.

        Raises:
          stardog.exceptions.TransactionException
            If not currently in a transaction

        Examples:
          Loads example.ttl from the current directory
            >>> conn.add(File('example.ttl'), graph_uri='urn:graph')

          Loads /tmp/example.ttl existing in the remote stardog server,
          and loads it in the default graph.
            >>> conn.add(File('/tmp/example.ttl'), server_side=True)
        """
        self._assert_in_transaction()

        url = "/{}/add".format(self.transaction)
        args = {"params": {"graph-uri": graph_uri}}

        if server_side:
            # Only the file name is sent; the content itself is not read here.
            args["headers"] = {
                "Content-Type": "application/json",
            }
            args["json"] = {
                "filename": content.fname,
            }
            self.client.post(url, **args)
            return

        with content.data() as data:
            args["headers"] = {
                "Content-Type": content.content_type,
                "Content-Encoding": content.content_encoding,
            }
            args["data"] = data
            self.client.post(url, **args)

    def remove(self, content, graph_uri=None):
        """Removes data from the database.

        Args:
          content (Content): Data to add
          graph_uri (str, optional): Named graph from which to remove the data

        Raises:
          stardog.exceptions.TransactionException
            If currently not in a transaction

        Examples:
          >>> conn.remove(File('example.ttl'), graph_uri='urn:graph')
        """
        self._assert_in_transaction()

        with content.data() as data:
            self.client.post(
                "/{}/remove".format(self.transaction),
                params={"graph-uri": graph_uri},
                headers={
                    "Content-Type": content.content_type,
                    "Content-Encoding": content.content_encoding,
                },
                data=data,
            )

    def clear(self, graph_uri=None):
        """Removes all data from the database or specific named graph.

        Args:
          graph_uri (str, optional): Named graph from which to remove data

        Raises:
          stardog.exceptions.TransactionException
            If currently not in a transaction

        Examples:
          clear a specific named graph
            >>> conn.clear('urn:graph')

          clear the whole database
            >>> conn.clear()
        """
        self._assert_in_transaction()
        self.client.post(
            "/{}/clear".format(self.transaction), params={"graph-uri": graph_uri}
        )

    def size(self, exact=False):
        """Database size.

        Args:
          exact (bool, optional): Calculate the size exactly. Defaults to False

        Returns:
          int: The number of elements in database
        """
        r = self.client.get("/size", params={"exact": exact})
        return int(r.text)

    # TODO: This could be part of dbms-admin (hence moved into admin.py) or here.
    # def model(self):
    #     """
    #     Generate the reasoning model used by this database in various formats
    #     https://stardog-union.github.io/http-docs/#operation/generateModel
    #     :return:
    #     """

    def export(
        self,
        content_type=content_types.TURTLE,
        stream=False,
        chunk_size=10240,
        graph_uri=None,
    ):
        """Exports the contents of the database.

        Args:
          content_type (str): RDF content type. Defaults to 'text/turtle'
          stream (bool): Chunk results? Defaults to False
          chunk_size (int): Number of bytes to read per chunk when streaming.
            Defaults to 10240
          graph_uri (str, optional): Named graph to export

        Returns:
          str: If stream = False

        Returns:
          gen: If stream = True

        Examples:
          no streaming
            >>> contents = conn.export()

          streaming
            >>> with conn.export(stream=True) as stream:
                  contents = ''.join(stream)
        """

        def _export():
            # The generator stays suspended inside the ``with`` so the
            # response remains open while a streaming caller consumes it.
            with self.client.get(
                "/export",
                headers={"Accept": content_type},
                params={"graph-uri": graph_uri},
                stream=stream,
            ) as r:
                yield r.iter_content(chunk_size=chunk_size) if stream else r.content

        db = _export()
        if stream:
            return _nextcontext(db)

        # Non-streaming: take the single value, then close the generator
        # explicitly so the response is released right away instead of
        # whenever the generator happens to be garbage-collected.
        with contextlib.closing(db):
            return next(db)

    def explain(self, query, base_uri=None):
        """Explains the evaluation of a SPARQL query.

        Args:
          query (str): SPARQL query
          base_uri (str, optional): Base URI for the parsing of the query

        Returns:
          str: Query explanation
        """
        params = {"query": query, "baseURI": base_uri}
        r = self.client.post(
            "/explain",
            data=params,
        )
        return r.text

    def __query(self, query, method, content_type=None, **kwargs):
        """Posts a SPARQL request and returns the decoded response.

        The request goes to ``/<txid>/<method>`` when a transaction is open
        and to ``/<method>`` otherwise.

        Args:
          query (str): SPARQL query
          method (str): Endpoint name appended to the URL
          content_type (str, optional): Value sent in the ``Accept`` header
          **kwargs: ``base_uri``, ``limit``, ``offset``, ``timeout``,
            ``reasoning`` and ``bindings``; any other key is ignored

        Returns:
          The parsed JSON body when ``content_type`` is
          ``content_types.SPARQL_JSON``, otherwise the raw response content
        """
        txId = self.transaction
        params = {
            "query": query,
            "baseURI": kwargs.get("base_uri"),
            "limit": kwargs.get("limit"),
            "offset": kwargs.get("offset"),
            "timeout": kwargs.get("timeout"),
            "reasoning": kwargs.get("reasoning"),
        }

        # query bindings; ``or {}`` also covers an explicit ``bindings=None``,
        # which would otherwise fail on ``.items()``
        bindings = kwargs.get("bindings") or {}
        for name, value in bindings.items():
            params["${}".format(name)] = value

        url = "/{}/{}".format(txId, method) if txId else "/{}".format(method)

        r = self.client.post(
            url,
            data=params,
            headers={"Accept": content_type},
        )

        return r.json() if content_type == content_types.SPARQL_JSON else r.content

    def select(self, query, content_type=content_types.SPARQL_JSON, **kwargs):
        """Executes a SPARQL select query.

        Args:
          query (str): SPARQL query
          base_uri (str, optional): Base URI for the parsing of the query
          limit (int, optional): Maximum number of results to return
          offset (int, optional): Offset into the result set
          timeout (int, optional): Number of ms after which the query should
            timeout. 0 or less implies no timeout
          reasoning (bool, optional): Enable reasoning for the query
          bindings (dict, optional): Map between query variables and their
            values
          content_type (str, optional): Content type for results.
            Defaults to 'application/sparql-results+json'

        Returns:
          dict: If content_type='application/sparql-results+json'

        Returns:
          str: Other content types

        Examples:
          >>> conn.select('select * {?s ?p ?o}',
                          offset=100, limit=100, reasoning=True)

          bindings
            >>> conn.select('select * {?s ?p ?o}', bindings={'o': '<urn:a>'})
        """
        return self.__query(query, "query", content_type=content_type, **kwargs)

    def graph(self, query, content_type=content_types.TURTLE, **kwargs):
        """Executes a SPARQL graph query.

        Args:
          query (str): SPARQL query
          base_uri (str, optional): Base URI for the parsing of the query
          limit (int, optional): Maximum number of results to return
          offset (int, optional): Offset into the result set
          timeout (int, optional): Number of ms after which the query should
            timeout. 0 or less implies no timeout
          reasoning (bool, optional): Enable reasoning for the query
          bindings (dict, optional): Map between query variables and their
            values
          content_type (str): Content type for results.
            Defaults to 'text/turtle'

        Returns:
          str: Results in format given by content_type

        Examples:
          >>> conn.graph('construct {?s ?p ?o} where {?s ?p ?o}',
                         offset=100, limit=100, reasoning=True)

          bindings
            >>> conn.graph('construct {?s ?p ?o} where {?s ?p ?o}',
                           bindings={'o': '<urn:a>'})
        """
        return self.__query(query, "query", content_type, **kwargs)

    def paths(self, query, content_type=content_types.SPARQL_JSON, **kwargs):
        """Executes a SPARQL paths query.

        Args:
          query (str): SPARQL query
          base_uri (str, optional): Base URI for the parsing of the query
          limit (int, optional): Maximum number of results to return
          offset (int, optional): Offset into the result set
          timeout (int, optional): Number of ms after which the query should
            timeout. 0 or less implies no timeout
          reasoning (bool, optional): Enable reasoning for the query
          bindings (dict, optional): Map between query variables and their
            values
          content_type (str): Content type for results.
            Defaults to 'application/sparql-results+json'

        Returns:
          dict: if content_type='application/sparql-results+json'.

        Returns:
          str: other content types.

        Examples:
          >>> conn.paths('paths start ?x = :subj end ?y = :obj via ?p',
                         reasoning=True)
        """
        return self.__query(query, "query", content_type, **kwargs)

    def ask(self, query, **kwargs):
        """Executes a SPARQL ask query.

        Args:
          query (str): SPARQL query
          base_uri (str, optional): Base URI for the parsing of the query
          limit (int, optional): Maximum number of results to return
          offset (int, optional): Offset into the result set
          timeout (int, optional): Number of ms after which the query should
            timeout. 0 or less implies no timeout
          reasoning (bool, optional): Enable reasoning for the query
          bindings (dict, optional): Map between query variables and their
            values

        Returns:
          bool: Result of ask query

        Examples:
          >>> conn.ask('ask {:subj :pred :obj}', reasoning=True)
        """
        r = self.__query(query, "query", content_types.BOOLEAN, **kwargs)
        return strtobool(r.decode())

    def update(self, query, **kwargs):
        """Executes a SPARQL update query.

        Args:
          query (str): SPARQL query
          base_uri (str, optional): Base URI for the parsing of the query
          limit (int, optional): Maximum number of results to return
          offset (int, optional): Offset into the result set
          timeout (int, optional): Number of ms after which the query should
            timeout. 0 or less implies no timeout
          reasoning (bool, optional): Enable reasoning for the query
          bindings (dict, optional): Map between query variables and their
            values

        Examples:
          >>> conn.update('delete where {?s ?p ?o}')
        """
        self.__query(query, "update", None, **kwargs)

    def is_consistent(self, graph_uri=None):
        """Checks if the database or named graph is consistent wrt its schema.

        Args:
          graph_uri (str, optional): Named graph from which to check
            consistency

        Returns:
          bool: Database consistency state
        """
        r = self.client.get(
            "/reasoning/consistency",
            params={"graph-uri": graph_uri},
        )
        return strtobool(r.text)

    def explain_inference(self, content):
        """Explains the given inference results.

        Args:
          content (Content): Data from which to provide explanations

        Returns:
          dict: Explanation results

        Examples:
          >>> conn.explain_inference(File('inferences.ttl'))
        """
        txId = self.transaction
        # Use the transaction-scoped endpoint when a transaction is open.
        url = "/reasoning/{}/explain".format(txId) if txId else "/reasoning/explain"

        with content.data() as data:
            r = self.client.post(
                url,
                data=data,
                headers={
                    "Content-Type": content.content_type,
                    "Content-Encoding": content.content_encoding,
                },
            )
            return r.json()["proofs"]

    def explain_inconsistency(self, graph_uri=None):
        """Explains why the database or a named graph is inconsistent.

        Args:
          graph_uri (str, optional): Named graph for which to explain
            inconsistency

        Returns:
          dict: Explanation results
        """
        txId = self.transaction
        url = (
            "/reasoning/{}/explain/inconsistency".format(txId)
            if txId
            else "/reasoning/explain/inconsistency"
        )

        r = self.client.get(
            url,
            params={"graph-uri": graph_uri},
        )

        return r.json()["proofs"]

    def _assert_not_in_transaction(self):
        """Raises TransactionException when a transaction is already open."""
        if self.transaction:
            raise exceptions.TransactionException("Already in a transaction")

    def _assert_in_transaction(self):
        """Raises TransactionException when no transaction is open."""
        if not self.transaction:
            raise exceptions.TransactionException("Not in a transaction")

    def close(self):
        """Close the underlying HTTP connection."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
class Docs:
    """BITES: Document Storage.

    See Also:
      https://www.stardog.com/docs/#_unstructured_data
    """

    def __init__(self, client):
        """Initializes a Docs.

        Use :meth:`stardog.connection.Connection.docs`
        instead of constructing manually.
        """
        self.client = client

    def size(self):
        """Calculates document store size.

        Returns:
          int: Number of documents in the store
        """
        r = self.client.get("/docs/size")
        return int(r.text)

    def add(self, name, content):
        """Adds a document to the store.

        Args:
          name (str): Name of the document
          content (Content): Contents of the document

        Examples:
          >>> docs.add('example', File('example.pdf'))
        """
        with content.data() as data:
            self.client.post("/docs", files={"upload": (name, data)})

    def clear(self):
        """Removes all documents from the store."""
        self.client.delete("/docs")

    # TODO
    # def reindex(self):
    #     """
    #     Reindex document store
    #     https://stardog-union.github.io/http-docs/#operation/reindex
    #     :return:
    #     """

    def get(self, name, stream=False, chunk_size=10240):
        """Gets a document from the store.

        Args:
          name (str): Name of the document
          stream (bool): If document should come in chunks or as a whole.
            Defaults to False
          chunk_size (int): Number of bytes to read per chunk when streaming.
            Defaults to 10240

        Returns:
          str: If stream=False

        Returns:
          gen: If stream=True

        Examples:
          no streaming
            >>> contents = docs.get('example')

          streaming
            >>> with docs.get('example', stream=True) as stream:
                  contents = ''.join(stream)
        """

        def _get():
            # The generator stays suspended inside the ``with`` so the
            # response remains open while a streaming caller consumes it.
            with self.client.get("/docs/{}".format(name), stream=stream) as r:
                yield r.iter_content(chunk_size=chunk_size) if stream else r.content

        doc = _get()
        if stream:
            return _nextcontext(doc)

        # Non-streaming: take the single value, then close the generator
        # explicitly so the response is released right away instead of
        # whenever the generator happens to be garbage-collected.
        with contextlib.closing(doc):
            return next(doc)

    def delete(self, name):
        """Deletes a document from the store.

        Args:
          name (str): Name of the document
        """
        self.client.delete("/docs/{}".format(name))
class ICV:
    """Integrity Constraint Validation.

    See Also:
      https://www.stardog.com/docs/#_validating_constraints
    """

    def __init__(self, conn):
        """Initializes an ICV.

        Use :meth:`stardog.connection.Connection.icv`
        instead of constructing manually.
        """
        self.conn = conn
        self.client = conn.client

    def add(self, content):
        """
        Deprecated: Connection.add() should be preferred. icv.add() will be
        removed in the next major version.

        Adds integrity constraints to the database.

        Args:
          content (Content): Data to add

        Examples:
          >>> icv.add(File('constraints.ttl'))
        """
        with content.data() as data:
            self.client.post(
                "/icv/add",
                data=data,
                headers={
                    "Content-Type": content.content_type,
                    "Content-Encoding": content.content_encoding,
                },
            )

    def remove(self, content):
        """
        Deprecated: Connection.remove() should be preferred. icv.remove() will
        be removed in the next major version.

        Removes integrity constraints from the database.

        Args:
          content (Content): Data to remove

        Examples:
          >>> icv.remove(File('constraints.ttl'))
        """
        with content.data() as data:
            self.client.post(
                "/icv/remove",
                data=data,
                headers={
                    "Content-Type": content.content_type,
                    "Content-Encoding": content.content_encoding,
                },
            )

    def clear(self):
        """Removes all integrity constraints from the database."""
        self.client.post("/icv/clear")

    def list(self):
        """List all integrity constraints from the database."""
        r = self.client.get("/icv")
        return r.text

    def is_valid(self, content, graph_uri=None):
        """Checks if given integrity constraints are valid.

        Args:
          content (Content): Data to check for validity
          graph_uri (str, optional): Named graph to check for validity

        Returns:
          bool: Integrity constraint validity

        Examples:
          >>> icv.is_valid(File('constraints.ttl'), graph_uri='urn:graph')
        """
        transaction = self.conn.transaction
        # The transaction-scoped path needs the same leading slash as every
        # other endpoint path in this module; it was previously missing.
        url = "/icv/{}/validate".format(transaction) if transaction else "/icv/validate"

        with content.data() as data:
            r = self.client.post(
                url,
                data=data,
                headers={
                    "Content-Type": content.content_type,
                    "Content-Encoding": content.content_encoding,
                },
                params={"graph-uri": graph_uri},
            )

        return strtobool(r.text)

    def explain_violations(self, content, graph_uri=None):
        """
        Deprecated: icv.report() should be preferred.
        icv.explain_violations() will be removed in the next major version.

        Explains violations of the given integrity constraints.

        Args:
          content (Content): Data to check for violations
          graph_uri (str, optional): Named graph from which to check for
            validations

        Returns:
          dict: Integrity constraint violations

        Examples:
          >>> icv.explain_violations(File('constraints.ttl'),
                                     graph_uri='urn:graph')
        """
        transaction = self.conn.transaction
        url = (
            "/icv/{}/violations".format(transaction)
            if transaction
            else "/icv/violations"
        )

        with content.data() as data:
            r = self.client.post(
                url,
                data=data,
                headers={
                    "Content-Type": content.content_type,
                    "Content-Encoding": content.content_encoding,
                },
                params={"graph-uri": graph_uri},
            )

        return self.client._multipart(r)

    def convert(self, content, graph_uri=None):
        """
        Deprecated: icv.convert() was meant as a debugging tool, and will be
        removed in the next major version.

        Converts given integrity constraints to a SPARQL query.

        Args:
          content (Content): Integrity constraints
          graph_uri (str, optional): Named graph from which to apply
            constraints

        Returns:
          str: SPARQL query

        Examples:
          >>> icv.convert(File('constraints.ttl'), graph_uri='urn:graph')
        """
        with content.data() as data:
            r = self.client.post(
                "/icv/convert",
                data=data,
                headers={
                    "Content-Type": content.content_type,
                    "Content-Encoding": content.content_encoding,
                },
                params={"graph-uri": graph_uri},
            )

        return r.text

    def report(self, **kwargs):
        """
        Produces a SHACL validation report.

        Args (dict):
          shapes (str, optional): SHACL shapes to validate
          shacl.shape.graphs (str, optional): SHACL shape graphs to validate
          nodes (str, optional): SHACL focus node(s) to validate
          countLimit (str, optional): Maximum number of violations to report
          shacl.targetClass.simple (boolean, optional): If true,
            sh:targetClass will be evaluated based on rdf:type triples only,
            without following rdfs:subClassOf relations
          shacl.violation.limit.shape (str, optional): number of violation
            limits per SHACL shapes
          graph-uri (str, optional): Named Graph
          reasoning (boolean, optional): Enable Reasoning

        Returns:
          str: SHACL validation report

        Raises:
          Exception: If a keyword argument is not one of the names above

        Examples:
          >>> icv.report()
        """
        # Imported here so the submodule is guaranteed to be loaded; a bare
        # ``import urllib`` does not import ``urllib.parse`` by itself.
        from urllib.parse import urlencode

        accepted_args = [
            "shapes",
            "shacl.shape.graphs",
            "nodes",
            "countLimit",
            "shacl.targetClass.simple",
            "shacl.violation.limit.shape",
            "graph-uri",
            "reasoning",
        ]
        for arg in kwargs:
            if arg not in accepted_args:
                raise Exception("Parameter not recognized")

        # Added after validation, so callers cannot pass it themselves.
        kwargs["prettify"] = True
        params = urlencode(kwargs)
        url = f"/icv/report?{params}"

        r = self.client.post(url)
        return r.text
class GraphQL:
    """GraphQL

    See Also:
      https://www.stardog.com/docs/#_graphql_queries
    """

    def __init__(self, conn):
        """Initializes a GraphQL.

        Use :meth:`stardog.connection.Connection.graphql`
        instead of constructing manually.
        """
        self.conn = conn
        self.client = conn.client

    def query(self, query, variables=None):
        """Executes a GraphQL query.

        Args:
          query (str): GraphQL query
          variables (dict, optional): GraphQL variables.
            Keys: '@reasoning' (bool) to enable reasoning,
            '@schema' (str) to define schemas

        Returns:
          dict: Query results

        Raises:
          stardog.exceptions.StardogException
            If the response body has no 'data' key

        Examples:
          with schema and reasoning
            >>> gql.query('{ Person {name} }',
                          variables={'@reasoning': True, '@schema': 'people'})

          with named variables
            >>> gql.query(
                  'query getPerson($id: Integer) { Person(id: $id) {name} }',
                  variables={'id': 1000})
        """
        payload = {"query": query, "variables": variables or {}}
        result = self.client.post("/graphql", json=payload).json()

        # graphql endpoint returns valid response with errors
        if "data" not in result:
            raise exceptions.StardogException(result)
        return result["data"]

    def schemas(self):
        """Retrieves all available schemas.

        Returns:
          dict: All schemas
        """
        body = self.client.get("/graphql/schemas").json()
        return body["schemas"]

    def clear_schemas(self):
        """Deletes all schemas."""
        self.client.delete("/graphql/schemas")

    def add_schema(self, name, content):
        """Adds a schema to the database.

        Args:
          name (str): Name of the schema
          content (Content): Schema data

        Examples:
          >>> gql.add_schema('people', content=File('people.graphql'))
        """
        target = f"/graphql/schemas/{name}"
        with content.data() as payload:
            self.client.put(target, data=payload)

    def schema(self, name):
        """Gets schema information.

        Args:
          name (str): Name of the schema

        Returns:
          str: GraphQL schema, as the text of the response
        """
        return self.client.get(f"/graphql/schemas/{name}").text

    def remove_schema(self, name):
        """Removes a schema from the database.

        Args:
          name (str): Name of the schema
        """
        self.client.delete(f"/graphql/schemas/{name}")
@contextlib.contextmanager def _nextcontext(r): yield next(r)