Source code for stardog.connection

"""Connect to Stardog databases.
"""

import contextlib
import distutils.util

from . import content_types as content_types
from . import exceptions as exceptions
from .http import connection as http_connection


[docs]class Connection(object): """Database Connection. This is the entry point for all user-related operations on a Stardog database """
[docs] def __init__(self, database, endpoint=None, username=None, password=None): """Initializes a connection to a Stardog database. Args: database (str): Name of the database endpoint (str): Url of the server endpoint. Defaults to `http://localhost:5820` username (str, optional): Username to use in the connection password (str, optional): Password to use in the connection Examples: >>> conn = Connection('db', endpoint='http://localhost:9999', username='admin', password='admin') """ self.conn = http_connection.Connection(database, endpoint, username, password) self.transaction = None
[docs] def docs(self): """Makes a document storage object. Returns: Docs: A Docs object """ return Docs(self)
[docs] def icv(self): """Makes an integrity constraint validation object. Returns: ICV: An ICV object """ return ICV(self)
[docs] def versioning(self): """Make a versioning object. Returns: VCS: A VCS object. """ return VCS(self)
[docs] def graphql(self): """Makes a GraphQL object. Returns: GraphQL: A GraphQL object """ return GraphQL(self)
[docs] def begin(self, **kwargs): """Begins a transaction. Args: reasoning (bool, optional): Enable reasoning for all queries inside the transaction. If the transaction does not have reasoning enabled, queries within will not be able to use reasoning. Returns: str: Transaction ID Raises: stardog.exceptions.TransactionException If already in a transaction """ self._assert_not_in_transaction() self.transaction = self.conn.begin(**kwargs) return self.transaction
[docs] def rollback(self): """Rolls back the current transaction. Raises: stardog.exceptions.TransactionException If currently not in a transaction """ self._assert_in_transaction() self.conn.rollback(self.transaction) self.transaction = None
[docs] def commit(self): """Commits the current transaction. Raises: stardog.exceptions.TransactionException If currently not in a transaction """ self._assert_in_transaction() self.conn.commit(self.transaction) self.transaction = None
[docs] def add(self, content, graph_uri=None): """Adds data to the database. Args: content (Content): Data to add graph_uri (str, optional): Named graph into which to add the data Raises: stardog.exceptions.TransactionException If not currently in a transaction Examples: >>> conn.add(File('example.ttl'), graph_uri='urn:graph') """ self._assert_in_transaction() with content.data() as data: self.conn.add(self.transaction, data, content.content_type, content.content_encoding, graph_uri)
[docs] def remove(self, content, graph_uri=None): """Removes data from the database. Args: content (Content): Data to add graph_uri (str, optional): Named graph from which to remove the data Raises: stardog.exceptions.TransactionException If currently not in a transaction Examples: >>> conn.remove(File('example.ttl'), graph_uri='urn:graph') """ self._assert_in_transaction() with content.data() as data: self.conn.remove(self.transaction, data, content.content_type, content.content_encoding, graph_uri)
[docs] def clear(self, graph_uri=None): """Removes all data from the database or specific named graph. Args: graph_uri (str, optional): Named graph from which to remove data Raises: stardog.exceptions.TransactionException If currently not in a transaction Examples: clear a specific named graph >>> conn.clear('urn:graph') clear the whole database >>> conn.clear() """ self._assert_in_transaction() self.conn.clear(self.transaction, graph_uri)
[docs] def size(self): """Database size. Returns: int: The number of elements in database """ return self.conn.size()
[docs] def export(self, content_type=content_types.TURTLE, stream=False, chunk_size=10240): """Exports the contents of the database. Args: content_type (str): RDF content type. Defaults to 'text/turtle' stream (bool): Chunk results? Defaults to False chunk_size (int): Number of bytes to read per chunk when streaming. Defaults to 10240 Returns: str: If stream = False Returns: gen: If stream = True Examples: no streaming >>> contents = conn.export() streaming >>> with conn.export(stream=True) as stream: contents = ''.join(stream) """ db = self.conn.export(content_type, stream, chunk_size) return _nextcontext(db) if stream else next(db)
[docs] def explain(self, query, base_uri=None): """Explains the evaluation of a SPARQL query. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query Returns: str: Query explanation """ return self.conn.explain(query, base_uri)
[docs] def select(self, query, content_type=content_types.SPARQL_JSON, **kwargs): """Executes a SPARQL select query. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query limit (int, optional): Maximum number of results to return offset (int, optional): Offset into the result set timeout (int, optional): Number of ms after which the query should timeout. 0 or less implies no timeout reasoning (bool, optional): Enable reasoning for the query bindings (dict, optional): Map between query variables and their values content_type (str, optional): Content type for results. Defaults to 'application/sparql-results+json' Returns: dict: If content_type='application/sparql-results+json' Returns: str: Other content types Examples: >>> conn.select('select * {?s ?p ?o}', offset=100, limit=100, reasoning=True) bindings >>> conn.select('select * {?s ?p ?o}', bindings={'o': '<urn:a>'}) """ return self.conn.query( query, self.transaction, content_type=content_type, **kwargs)
[docs] def graph(self, query, content_type=content_types.TURTLE, **kwargs): """Executes a SPARQL graph query. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query limit (int, optional): Maximum number of results to return offset (int, optional): Offset into the result set timeout (int, optional): Number of ms after which the query should timeout. 0 or less implies no timeout reasoning (bool, optional): Enable reasoning for the query bindings (dict, optional): Map between query variables and their values content_type (str): Content type for results. Defaults to 'text/turtle' Returns: str: Results in format given by content_type Examples: >>> conn.graph('construct {?s ?p ?o} where {?s ?p ?o}', offset=100, limit=100, reasoning=True) bindings >>> conn.graph('construct {?s ?p ?o} where {?s ?p ?o}', bindings={'o': '<urn:a>'}) """ return self.conn.query(query, self.transaction, content_type, **kwargs)
[docs] def paths(self, query, content_type=content_types.SPARQL_JSON, **kwargs): """Executes a SPARQL paths query. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query limit (int, optional): Maximum number of results to return offset (int, optional): Offset into the result set timeout (int, optional): Number of ms after which the query should timeout. 0 or less implies no timeout reasoning (bool, optional): Enable reasoning for the query bindings (dict, optional): Map between query variables and their values content_type (str): Content type for results. Defaults to 'application/sparql-results+json' Returns: dict: if content_type='application/sparql-results+json'. Returns: str: other content types. Examples: >>> conn.paths('paths start ?x = :subj end ?y = :obj via ?p', reasoning=True) """ return self.conn.query(query, self.transaction, content_type, **kwargs)
[docs] def ask(self, query, **kwargs): """Executes a SPARQL ask query. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query limit (int, optional): Maximum number of results to return offset (int, optional): Offset into the result set timeout (int, optional): Number of ms after which the query should timeout. 0 or less implies no timeout reasoning (bool, optional): Enable reasoning for the query bindings (dict, optional): Map between query variables and their values Returns: bool: Result of ask query Examples: >>> conn.ask('ask {:subj :pred :obj}', reasoning=True) """ r = self.conn.query(query, self.transaction, content_types.BOOLEAN, **kwargs) return bool(distutils.util.strtobool(r.decode()))
[docs] def update(self, query, **kwargs): """Executes a SPARQL update query. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query limit (int, optional): Maximum number of results to return offset (int, optional): Offset into the result set timeout (int, optional): Number of ms after which the query should timeout. 0 or less implies no timeout reasoning (bool, optional): Enable reasoning for the query bindings (dict, optional): Map between query variables and their values Examples: >>> conn.update('delete where {?s ?p ?o}') """ self.conn.update(query, self.transaction, **kwargs)
[docs] def is_consistent(self, graph_uri=None): """Checks if the database or named graph is consistent wrt its schema. Args: graph_uri (str, optional): Named graph from which to check consistency Returns: bool: Database consistency state """ return self.conn.is_consistent(graph_uri)
[docs] def explain_inference(self, content): """Explains the given inference results. Args: content (Content): Data from which to provide explanations Returns: dict: Explanation results Examples: >>> conn.explain_inference(File('inferences.ttl')) """ with content.data() as data: return self.conn.explain_inference(data, content.content_type, content.content_encoding, self.transaction)
[docs] def explain_inconsistency(self, graph_uri=None): """Explains why the database or a named graph is inconsistent. Args: graph_uri (str, optional): Named graph for which to explain inconsistency Returns: dict: Explanation results """ return self.conn.explain_inconsistency(self.transaction, self.graph_uri)
def _assert_not_in_transaction(self): if self.transaction: raise exceptions.TransactionException('Already in a transaction') def _assert_in_transaction(self): if not self.transaction: raise exceptions.TransactionException('Not in a transaction') def __enter__(self): return self def __exit__(self, *args): self.conn.__exit__(*args)
[docs]class Docs(object): """BITES: Document Storage. See Also: https://www.stardog.com/docs/#_unstructured_data """
[docs] def __init__(self, conn): """Initializes a Docs. Use :meth:`stardog.connection.Connection.docs` instead of constructing manually. """ self.docs = conn.conn.docs()
[docs] def size(self): """Calculates document store size. Returns: int: Number of documents in the store """ return self.docs.size()
[docs] def add(self, name, content): """Adds a document to the store. Args: name (str): Name of the document content (Content): Contents of the document Examples: >>> docs.add('example', File('example.pdf')) """ with content.data() as data: self.docs.add(name, data)
[docs] def clear(self): """Removes all documents from the store. """ self.docs.clear()
[docs] def get(self, name, stream=False, chunk_size=10240): """Gets a document from the store. Args: name (str): Name of the document stream (bool): If document should come in chunks or as a whole. Defaults to False chunk_size (int): Number of bytes to read per chunk when streaming. Defaults to 10240 Returns: str: If stream=False Returns: gen: If stream=True Examples: no streaming >>> contents = docs.get('example') streaming >>> with docs.get('example', stream=True) as stream: contents = ''.join(stream) """ doc = self.docs.get(name, stream, chunk_size) return _nextcontext(doc) if stream else next(doc)
[docs] def delete(self, name): """Deletes a document from the store. Args: name (str): Name of the document """ self.docs.delete(name)
[docs]class ICV(object): """Integrity Constraint Validation. See Also: https://www.stardog.com/docs/#_validating_constraints """
[docs] def __init__(self, conn): """Initializes an ICV. Use :meth:`stardog.connection.Connection.icv` instead of constructing manually. """ self.conn = conn self.icv = conn.conn.icv()
[docs] def add(self, content): """Adds integrity constraints to the database. Args: content (Content): Data to add Examples: >>> icv.add(File('constraints.ttl')) """ with content.data() as data: self.icv.add(data, content.content_type, content.content_encoding)
[docs] def remove(self, content): """Removes integrity constraints from the database. Args: content (Content): Data to remove Examples: >>> icv.remove(File('constraints.ttl')) """ with content.data() as data: self.icv.remove(data, content.content_type, content.content_encoding)
[docs] def clear(self): """Removes all integrity constraints from the database. """ self.icv.clear()
[docs] def is_valid(self, content, graph_uri=None): """Checks if given integrity constraints are valid. Args: content (Content): Data to check for validity graph_uri (str, optional): Named graph to check for validity Returns: bool: Integrity constraint validity Examples: >>> icv.is_valid(File('constraints.ttl'), graph_uri='urn:graph') """ with content.data() as data: return self.icv.is_valid(data, content.content_type, content.content_encoding, self.conn.transaction, graph_uri)
[docs] def explain_violations(self, content, graph_uri=None): """Explains violations of the given integrity constraints. Args: content (Content): Data to check for violations graph_uri (str, optional): Named graph from which to check for validations Returns: dict: Integrity constraint violations Examples: >>> icv.explain_violations(File('constraints.ttl'), graph_uri='urn:graph') """ with content.data() as data: return self.icv.explain_violations( data, content.content_type, content.content_encoding, self.conn.transaction, graph_uri)
[docs] def convert(self, content, graph_uri=None): """Converts given integrity constraints to a SPARQL query. Args: content (Content): Integrity constraints graph_uri (str, optional): Named graph from which to apply constraints Returns: str: SPARQL query Examples: >>> icv.convert(File('constraints.ttl'), graph_uri='urn:graph') """ with content.data() as data: return self.icv.convert(data, content.content_type, content.content_encoding, graph_uri)
[docs]class VCS(object): """Versioning See Also: https://www.stardog.com/docs/#_versioning """
[docs] def __init__(self, conn): """Initializes a VCS. Use :meth:`stardog.connection.Connection.versioning` instead of constructing manually. """ self.vcs = conn.conn.versioning() self.conn = conn
[docs] def select(self, query, content_type=content_types.SPARQL_JSON, **kwargs): """Executes a SPARQL select query over versioning history. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query limit (int, optional): Maximum number of results to return offset (int, optional): Offset into the result set timeout (int, optional): Number of ms after which the query should timeout. 0 or less implies no timeout reasoning (bool, optional): Enable reasoning for the query bindings (dict, optional): Map between query variables and their values content_type (str): Content type for results. Defaults to 'application/sparql-results+json' Returns: dict: If content_type='application/sparql-results+json' Returns: str: Other content types Examples: >>> vcs.select('select distinct ?v {?v a vcs:Version}', offset=100, limit=100) bindings >>> vcs.select( 'select distinct ?v {?v a ?o}', bindings={'o': '<tag:stardog:api:versioning:Version>'}) """ return self.vcs.query(query, content_type, **kwargs)
[docs] def graph(self, query, content_type=content_types.TURTLE, **kwargs): """Executes a SPARQL graph query over versioning history. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query limit (int, optional): Maximum number of results to return offset (int, optional): Offset into the result set timeout (int, optional): Number of ms after which the query should timeout. 0 or less implies no timeout reasoning (bool, optional): Enable reasoning for the query bindings (dict, optional): Map between query variables and their values content_type (str): Content type for results. Defaults to 'text/turtle' Returns: str: Results in format given by content_type Examples: >>> vcs.graph('construct {?s ?p ?o} where {?s ?p ?o}', offset=100, limit=100) """ return self.vcs.query(query, content_type, **kwargs)
[docs] def paths(self, query, content_type=content_types.SPARQL_JSON, **kwargs): """Executes a SPARQL paths query over versioning history. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query limit (int, optional): Maximum number of results to return offset (int, optional): Offset into the result set timeout (int, optional): Number of ms after which the query should timeout. 0 or less implies no timeout reasoning (bool, optional): Enable reasoning for the query bindings (dict, optional): Map between query variables and their values content_type (str): Content type for results. Defaults to 'application/sparql-results+json' Returns: dict: If content_type='application/sparql-results+json' Returns: str: Other content types Examples: >>> vcs.paths('paths start ?x = vcs:user:admin end' ' ?y = <http://www.w3.org/ns/prov#Person> via ?p')) """ return self.vcs.query(query, content_type, **kwargs)
[docs] def ask(self, query, **kwargs): """Executes a SPARQL ask query over versioning history. Args: query (str): SPARQL query base_uri (str, optional): Base URI for the parsing of the query limit (int, optional): Maximum number of results to return offset (int, optional): Offset into the result set timeout (int, optional): Number of ms after which the query should timeout. 0 or less implies no timeout reasoning (bool, optional): Enable reasoning for the query bindings (dict, optional): Map between query variables and their values Returns: bool: Result of ask query Examples: >>> vcs.ask('ask {?v a vcs:Version}') """ r = self.vcs.query(query, content_types.BOOLEAN, **kwargs) return bool(distutils.util.strtobool(r.decode()))
[docs] def commit(self, message): """Commits the current transaction into versioning. Args: message (str): Commit message Raises: stardog.exceptions.TransactionException If currently not in a transaction """ self.conn._assert_in_transaction() self.vcs.commit(self.conn.transaction, message)
[docs] def create_tag(self, revision, name): """Creates a new versioning tag. Args: revision (str): Revision ID name (str): Tag name Examples: >>> vcs.create_tag('02cf7ed8e511bb4d62421565b42fffcaf00f5012', 'v1.1') """ self.vcs.create_tag(revision, name)
[docs] def delete_tag(self, name): """Deletes a versioning tag. Args: name (str): Tag name Examples: >>> vcs.delete_tag('v1.1') """ self.vcs.delete_tag(name)
[docs] def revert(self, to_revision, from_revision, message): """Reverts the database to a specific revision. Args: to_revision (str): Begin revision ID from_revision (str): End revision ID message (str): Revert message Examples: >>> vcs.revert('02cf7ed8e511bb4d62421565b42fffcaf00f5012', '3c48bed3c1f9cfb056b4b0a755961a54d814f496', 'Log message') """ self.vcs.revert(to_revision, from_revision, message)
[docs]class GraphQL(object): """GraphQL See Also: https://www.stardog.com/docs/#_graphql_queries """
[docs] def __init__(self, conn): """Initializes a GraphQL. Use :meth:`stardog.connection.Connection.graphql` instead of constructing manually. """ self.gql = conn.conn.graphql() self.conn = conn
[docs] def query(self, query, variables=None): """Executes a GraphQL query. Args: query (str): GraphQL query variables (dict, optional): GraphQL variables. Keys: '@reasoning' (bool) to enable reasoning, '@schema' (str) to define schemas Returns: dict: Query results Examples: with schema and reasoning >>> gql.query('{ Person {name} }', variables={'@reasoning': True, '@schema': 'people'}) with named variables >>> gql.query( 'query getPerson($id: Integer) { Person(id: $id) {name} }', variables={'id': 1000}) """ return self.gql.query(query, variables)
[docs] def schemas(self): """Retrieves all available schemas. Returns: dict: All schemas """ return self.gql.schemas()
[docs] def clear_schemas(self): """Deletes all schemas. """ self.gql.clear_schemas()
[docs] def add_schema(self, name, content): """Adds a schema to the database. Args: name (str): Name of the schema content (Content): Schema data Examples: >>> gql.add_schema('people', content=File('people.graphql')) """ with content.data() as data: self.gql.add_schema(name, data)
[docs] def schema(self, name): """Gets schema information. Args: name (str): Name of the schema Returns: dict: GraphQL schema """ return self.gql.schema(name)
[docs] def remove_schema(self, name): """Removes a schema from the database. Args: name (str): Name of the schema """ return self.gql.remove_schema(name)
@contextlib.contextmanager def _nextcontext(r): yield next(r)