import logging
import re
from functools import reduce
from SHACL2SPARQLpy.ShapeParser import ShapeParser
from shaclapi.reduction.Reduction import Reduction
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Matches strings that start with an "http://" or "https://" scheme; used to
# tell full IRIs apart from other (e.g. prefixed) path notations.
re_https = re.compile(r"https?://")
# Note on the internal call structure of ShapeParser:
# parseShapesFromDir calls, for each shape, parseConstraints (which in turn calls parseConstraint) and shapeReferences; afterwards computeReducedEdges is called to find the involvedShapeIDs.
class ReducedShapeParser(ShapeParser):
    """
    ShapeParser that reduces the parsed shape schema with respect to a query.

    While parsing, constraints that touch a target shape but whose path does
    not occur among the query predicates are dropped (see parseConstraint).
    After parsing, the shape network is handed to a Reduction object, which
    prunes it and replaces the target queries (see parseShapesFromDir).
    """

    def __init__(self, query, graph_traversal, config):
        """
        Store the query, traversal setting and config and derive the target shapes.

        query: the query the reduction is based on; may be None, in which case
            no constraints are removed and no target query is replaced.
        graph_traversal: stored as-is; not used inside this class.
        config: configuration object; this class reads target_shape,
            prune_shape_network, remove_constraints, replace_target_query,
            merge_old_target_query, query_extension_per_target_shape and
            start_with_target_shape from it.
        """
        super().__init__()
        self.query = query

        # A target shape that is not given as a dict is filed under the 'UNDEF' key.
        if isinstance(config.target_shape, dict):
            self.targetShapes = config.target_shape
        else:
            self.targetShapes = {'UNDEF': [config.target_shape]}

        # Flat list of all target shapes without None entries. A comprehension is
        # used instead of reduce() without an initializer, which raised a TypeError
        # for an empty mapping.
        self.targetShapeList = [
            shape
            for shape_group in self.targetShapes.values()
            for shape in shape_group
            if shape is not None
        ]

        # Identifier of the shape whose constraints are currently being parsed.
        self.currentShape = None
        # Maps a shape identifier to the paths of the constraints removed from it.
        self.removed_constraints = {}
        # Not populated inside this class; presumably filled from outside
        # (e.g. by Reduction) -- TODO confirm.
        self.involvedShapeIDs = []
        self.graph_traversal = graph_traversal
        self.config = config

    def parseShapesFromDir(self, path, shapeFormat, useSelectiveQueries, maxSplitSize, ORDERBYinQueries):
        """
        Parses shapes from a directory. However, shapes are only relevant if they occur in the query or are
        reachable from shapes occurring in the query. The remaining shapes can be removed.

        All arguments are passed unchanged to the parent implementation.

        Returns a tuple (shapes, node_order, targetShapeList). node_order is the result of
        Reduction.node_order if config.start_with_target_shape is set, otherwise None.
        """
        all_shapes = super().parseShapesFromDir(path, shapeFormat, useSelectiveQueries, maxSplitSize, ORDERBYinQueries)
        reducer = Reduction(self)

        # Step 1: Prune not reachable shapes.
        # The reduction is always computed, even if its result is not used, so that
        # the call order towards the Reduction object stays independent of the config.
        reduced_shapes = reducer.reduce_shape_network(all_shapes, self.targetShapeList)
        if self.config.prune_shape_network:
            shapes = reduced_shapes
        else:
            shapes = all_shapes
            logger.warning('Shape Network is not pruned!')

        # Lazy %-formatting: the dict is only rendered if DEBUG is enabled.
        logger.debug('Removed Constraints:%s', self.removed_constraints)

        # Step 2: Replace appropriate target queries.
        # This needs a query, the config switch, and target shapes given as a dict
        # (a plain target shape ends up under the 'UNDEF' key, see __init__).
        if self.query is not None and self.config.replace_target_query and 'UNDEF' not in self.targetShapes:
            reducer.replace_target_query(
                shapes,
                self.query,
                self.targetShapes,
                self.targetShapeList,
                self.config.merge_old_target_query,
                self.config.query_extension_per_target_shape,
            )
        else:
            logger.warning('Using Shape Schema WITHOUT replaced target query!')

        if self.config.start_with_target_shape:
            return shapes, reducer.node_order(self.targetShapeList), self.targetShapeList
        return shapes, None, self.targetShapeList

    def replace_target_query(self, shape, query):
        """Set the target query of shape to its prefix string followed by query."""
        shape.targetQuery = shape.prefix_string + query

    def shape_get_id(self, shape):
        """Return the identifier of shape as reported by its getId method."""
        return shape.getId()

    def parseConstraints(self, shapeName, array, targetDef, constraintsId):
        """
        Parse the constraints of one shape and drop the removed ones.

        The current shape is derived from constraintsId by cutting off its last three
        characters (presumably a constraint suffix -- TODO confirm). Constraints for which
        parseConstraint returned an empty (falsy) result are filtered out of the returned list.
        """
        self.currentShape = constraintsId[:-3]
        self.removed_constraints[self.currentShape] = []
        parsed = super().parseConstraints(shapeName, array, targetDef, constraintsId)
        return [constraint for constraint in parsed if constraint]

    def parse_constraints_ttl(self, array, target_def, constraints_id):
        """
        Turtle variant of parseConstraints.

        The current shape is constraints_id without its last three characters, wrapped in
        angle brackets. Falsy results of the parent implementation are filtered out.
        """
        self.currentShape = '<' + constraints_id[:-3] + '>'
        self.removed_constraints[self.currentShape] = []
        parsed = super().parse_constraints_ttl(array, target_def, constraints_id)
        return [constraint for constraint in parsed if constraint]

    def parseConstraint(self, varGenerator, obj, id, targetDef):
        """
        Constraints are only relevant if:
            - subject and object do both NOT belong to the targetShape OR
            - subject or object belong to the targetShape AND the predicate is part of the query
            (-> inverted paths can be treated equally to normal paths)
        Other constraints are not relevant and result in an empty list.

        Relevant constraints are parsed by the parent implementation. The original path of
        a removed constraint is recorded in self.removed_constraints for the current shape.
        """
        touches_target_shape = (
            self.query is not None
            and self.config.remove_constraints
            and (self.currentShape in self.targetShapeList
                 or obj.get('shape') in self.targetShapeList)
        )
        if not touches_target_shape:
            return super().parseConstraint(varGenerator, obj, id, targetDef)

        path = obj['path']
        is_inverse_path = str(path).startswith('^')
        if is_inverse_path:
            path = path[1:]

        if re_https.match(path):
            # Full IRI: bring it into the '<...>' notation (with the inverse marker
            # restored) and compare against the predicates with replaced prefixes.
            path = '<' + path + '>'
            if is_inverse_path:
                path = '^' + path
            query_predicates = self.query.get_predicates(replace_prefixes=True, ignore_inv=False)
        else:
            # NOTE(review): for non-IRI paths the '^' marker stripped above is not
            # restored before the comparison -- confirm that this is intended.
            query_predicates = self.query.get_predicates(replace_prefixes=False, ignore_inv=False)

        if path in query_predicates:
            return super().parseConstraint(varGenerator, obj, id, targetDef)

        # Remember the path exactly as given in the schema; shapeReferences compares against it.
        self.removed_constraints[self.currentShape].append(obj.get('path'))
        return []

    def shapeReferences(self, constraints):
        """
        Constraints and references are parsed independently based on the input SHACL shape schema.
        Constraints that are removed in parseConstraint() should not appear in the references.
        self.removed_constraints keeps track of the removed constraints.

        Returns a dict mapping each referenced shape to the path of a constraint referencing it.
        If several remaining constraints reference the same shape, only the path of the last
        one is kept. If nothing was recorded for the current shape, no constraint counts as removed.
        """
        removed_paths = self.removed_constraints.get(self.currentShape, ())
        return {
            c.get('shape'): c.get('path')
            for c in constraints
            if c.get('shape') and c.get('path') not in removed_paths
        }

    def computeReducedEdges(self, shapes):
        """
        Computes the edges in the network.
        Returns unidirectional dependencies of the shapes in the network.

        The result is a tuple (dependencies, reverse_dependencies), both keyed by shape id.
        dependencies maps a shape id to its shape references (or to an empty list if it has none).
        reverse_dependencies is never filled here: every shape id maps to an empty list.
        """
        dependencies = {s.getId(): [] for s in shapes}
        reverse_dependencies = {s.getId(): [] for s in shapes}
        for s in shapes:
            refs = s.getShapeRefs()
            if refs:
                dependencies[s.getId()] = refs
        return dependencies, reverse_dependencies