Source code for shaclapi.reduction.travshacl.ReducedShapeParser

import logging
import re
from functools import reduce

import rdflib
from TravSHACL.core.ShapeParser import ShapeParser

from shaclapi.reduction.Reduction import Reduction

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Pattern for an "http://" or "https://" prefix. It is applied with .match()
# in ReducedShapeParser.parse_constraint to recognise constraint paths that
# are written as full IRIs.
re_https = re.compile("https?://")


# Note the internal structure of ShapeParser:
# parse_shapes_from_dir calls, for each shape, parse_constraints (which in turn calls parse_constraint) and shape_references. Afterwards we call computeReducedEdges to find the involvedShapeIDs.
class ReducedShapeParser(ShapeParser):
    """ShapeParser that drops shapes and constraints irrelevant to a query.

    Attributes set in ``__init__``:
        query: the query object passed in (may be ``None``).
        targetShapes: dict mapping a key to a list of target shape ids;
            a non-dict ``config.target_shape`` is stored under ``'UNDEF'``.
        targetShapeList: flat list of all non-``None`` target shape ids.
        currentShape: id of the shape whose constraints are being parsed.
        removed_constraints: dict mapping a shape id to the list of
            constraint paths removed from that shape.
        involvedShapesPerTarget: initialised to an empty dict.
        graph_traversal, config: stored as passed in.
    """

    def __init__(self, query, graph_traversal, config):
        super().__init__()
        self.query = query

        target_shape = config.target_shape
        if isinstance(target_shape, dict):
            self.targetShapes = target_shape
        else:
            self.targetShapes = {'UNDEF': [target_shape]}

        # Flatten all target shape lists. A nested comprehension (rather than
        # reduce() without an initial value) also works for an empty dict.
        self.targetShapeList = [
            shape
            for shape_group in self.targetShapes.values()
            for shape in shape_group
            if shape is not None
        ]

        self.currentShape = None
        self.removed_constraints = {}
        self.involvedShapesPerTarget = {}
        self.graph_traversal = graph_traversal
        self.config = config

    def parse_shapes(self, path, shapeFormat, useSelectiveQueries, maxSplitSize, ORDERBYinQueries):
        """
        Parses shapes from a directory or RDFlib graph.
        However, shapes are only relevant if they occur in the query or are
        reachable from shapes occurring in the query.
        The remaining shapes can be removed.

        :param path: an ``rdflib.Graph`` (parsed via ``parse_ttl``) or
            anything else (handed to ``parse_shapes_from_dir``)
        :param shapeFormat: only forwarded to ``parse_shapes_from_dir``
        :param useSelectiveQueries: forwarded to the parent parser
        :param maxSplitSize: forwarded to the parent parser
        :param ORDERBYinQueries: forwarded to the parent parser
        :return: tuple ``(shapes, node_order, targetShapeList)``;
            ``node_order`` is ``None`` unless
            ``config.start_with_target_shape`` is set
        """
        if isinstance(path, rdflib.Graph):
            all_shapes = super().parse_ttl(
                path, useSelectiveQueries, maxSplitSize, ORDERBYinQueries)
        else:
            all_shapes = super().parse_shapes_from_dir(
                path, shapeFormat, useSelectiveQueries, maxSplitSize, ORDERBYinQueries)

        reducer = Reduction(self)

        # Step 1: Prune not reachable shapes.
        # The reduction is always computed, even when its result is not used.
        # NOTE(review): presumably it prepares state the reducer needs later
        # (e.g. for node_order) - confirm before making this call conditional.
        reduced_shapes = reducer.reduce_shape_network(all_shapes, self.targetShapeList)
        if self.config.prune_shape_network:
            shapes = reduced_shapes
        else:
            shapes = all_shapes
            logger.warning('Shape Network is not pruned!')

        # Lazy %-formatting: the dict is only rendered if DEBUG is enabled.
        logger.debug('Removed Constraints:%s', self.removed_constraints)

        # Step 2: Replace appropriate target queries
        if (self.query is not None
                and self.config.replace_target_query
                and 'UNDEF' not in self.targetShapes):
            reducer.replace_target_query(
                shapes,
                self.query,
                self.targetShapes,
                self.targetShapeList,
                self.config.merge_old_target_query,
                self.config.query_extension_per_target_shape,
            )
        else:
            logger.warning('Using Shape Schema WITHOUT replaced target query!')

        if self.config.start_with_target_shape:
            return shapes, reducer.node_order(self.targetShapeList), self.targetShapeList
        return shapes, None, self.targetShapeList

    def replace_target_query(self, shape, query):
        """Set ``query`` as the target query of ``shape``.

        The prefixed variant is built from ``shape.get_prefix_string()``.
        Afterwards the shape's name-mangled private method
        ``__compute_target_queries`` is invoked.
        """
        shape.targetQuery = shape.get_prefix_string() + query
        shape.targetQueryNoPref = query
        # Access to a private (name-mangled) method of the Shape class.
        shape._Shape__compute_target_queries()

    def shape_get_id(self, shape):
        """Return ``shape.get_id()``."""
        return shape.get_id()

    def parse_constraints(self, array, targetDef, constraintsId):
        """Parse the constraints of one shape and drop the empty results.

        Sets ``self.currentShape`` and resets its list of removed constraints
        before delegating to the parent parser.
        """
        # The shape id is the constraints id without its last three
        # characters (presumably a fixed suffix - TODO confirm).
        self.currentShape = constraintsId[:-3]
        self.removed_constraints[self.currentShape] = []
        parsed = super().parse_constraints(array, targetDef, constraintsId)
        return [constraint for constraint in parsed if constraint]

    def parse_constraints_ttl(self, array, target_def, constraints_id):
        """Like ``parse_constraints`` but delegating to ``parse_constraints_ttl``.

        Here the current shape id is additionally wrapped in ``<...>``.
        """
        self.currentShape = '<' + constraints_id[:-3] + '>'
        self.removed_constraints[self.currentShape] = []
        parsed = super().parse_constraints_ttl(array, target_def, constraints_id)
        return [constraint for constraint in parsed if constraint]

    def parse_constraint(self, varGenerator, obj, id, targetDef, options=None):
        """
        Constraints are only relevant if:
            - subject and object do both NOT belong to the targetShape
            OR
            - subject or object belong to the targetShape AND the predicate
              is part of the query
              (-> inverted paths can be treated equally to normal paths)
        Other constraints are not relevant and result in an empty list.

        Removed constraints are recorded (by their original ``path`` value)
        in ``self.removed_constraints`` for the current shape.
        """
        if (self.query is not None
                and self.config.remove_constraints
                and (self.currentShape in self.targetShapeList
                     or obj.get('shape') in self.targetShapeList)):
            path = obj.get('path')

            # Split off a leading '^' (inverse path marker).
            is_inverse_path = path is not None and str(path).startswith('^')
            if is_inverse_path:
                path = str(path)[1:]

            if path is not None and re_https.match(path):
                # Full IRI: wrap in <...> and restore the inverse marker.
                path = '<' + path + '>'
                if is_inverse_path:
                    path = '^' + path
                query_predicates = self.query.get_predicates(
                    replace_prefixes=True, ignore_inv=False)
            else:
                # NOTE(review): for paths that are not full IRIs a stripped
                # '^' is not re-added before the membership test below -
                # confirm that this is intended.
                query_predicates = self.query.get_predicates(
                    replace_prefixes=False, ignore_inv=False)

            if path is not None and path not in query_predicates:
                # Predicate does not occur in the query: drop the constraint.
                # setdefault guards against a call made before
                # parse_constraints* has initialised the entry.
                self.removed_constraints.setdefault(
                    self.currentShape, []).append(obj.get('path'))
                return []

            if path is None:
                # Constraint without a path: nothing to keep without options,
                # and a single option is returned unchanged.
                if not options:
                    return []
                if len(options) == 1:
                    return options
            return super().parse_constraint(varGenerator, obj, id, targetDef, options)

        return super().parse_constraint(varGenerator, obj, id, targetDef, options)

    def shape_references(self, constraints):
        """
        Constraints and references are parsed independently based on the
        input SHACL shape schema. Constraints that are removed in
        parse_constraint() should not appear in the references.
        self.removed_constraints keeps track of the removed constraints.

        shape_references is used to get the references in self.currentShape
        to other shapes. It returns a dict mapping each referenced shape to
        ONE path of a constraint referencing that shape; if several
        constraints reference the same shape, the path of the last one wins.
        """
        # Looked up once; an empty list is used if the current shape has no
        # entry yet instead of raising a KeyError.
        removed = self.removed_constraints.get(self.currentShape, [])
        return {
            constraint.get('shape'): constraint.get('path')
            for constraint in constraints
            if constraint.get('shape') and constraint.get('path') not in removed
        }

    def computeReducedEdges(self, shapes):
        """
        Computes the edges in the network.

        :return: tuple ``(dependencies, reverse_dependencies)``, both dicts
            keyed by shape id. ``dependencies`` holds the result of
            ``get_shape_refs()`` for each shape that has references and an
            empty list otherwise. ``reverse_dependencies`` currently maps
            every shape id to an empty list (no reversed edges are added).
        """
        dependencies = {shape.get_id(): [] for shape in shapes}
        reverse_dependencies = {shape.get_id(): [] for shape in shapes}
        for shape in shapes:
            refs = shape.get_shape_refs()
            if refs:
                dependencies[shape.get_id()] = refs
        return dependencies, reverse_dependencies