Source code for shaclapi.reduction.Reduction

import logging
from functools import reduce

from shaclapi.query import Query

logger = logging.getLogger(__name__)


[docs]class Reduction: def __init__(self, parser): self.parser = parser self.involvedShapesPerTarget = {}
[docs] def reduce_shape_network(self, shapes, target_shape_list): involvedShapes = set() for target_shape in target_shape_list: shapeIds = list(self.parser.graph_traversal.traverse_graph( *self.parser.computeReducedEdges(shapes), target_shape, True)) self.involvedShapesPerTarget[target_shape] = shapeIds involvedShapes = involvedShapes.union(shapeIds) logger.debug('Involved Shapes:' + str(self.involvedShapesPerTarget)) shapes = [s for s in shapes if self.parser.shape_get_id(s) in involvedShapes] return shapes
[docs] def replace_target_query(self, shapes, query, target_shapes, target_shape_list, merge_old_target_query, query_extension_per_target_shape): logger.info('Using Shape Schema WITH replaced target query!') if query_extension_per_target_shape is None: query_extension_per_target_shape = {} # Build target shape to variable mapping target_shapes_to_var = {} for var in target_shapes.keys(): for target_shape in target_shapes[var]: target_shapes_to_var[target_shape] = var # TODO: What is with a target shape occurring more then once? for s in shapes: s_id = self.parser.shape_get_id(s) if s_id in target_shape_list: # If there isn't a shape based on the target shape, reduce the target definition if len(target_shape_list) == 1 or s_id not in reduce(lambda a, b: a+b, [self.involvedShapesPerTarget[targetShape] for targetShape in target_shape_list if targetShape != s_id]): # The Shape already has a target query logger.debug(f'Reducing target definition of {s_id}') logger.debug('Original Query:\n' + query.query_string) if s.targetQuery and merge_old_target_query: logger.debug('Old TargetDef: \n' + s.targetQuery) oldTargetQuery = Query(s.targetQuery) targetQuery = query.intersect(target_shapes_to_var[s_id], oldTargetQuery) else: if '?x' not in query.query_string: new_query_string = query.query_string.replace(query.target_var, '?x') targetQuery = Query(new_query_string).as_target_query('?x') else: targetQuery = query.query_string def rreplace(s, old, new, occurrence): li = s.rsplit(old, occurrence) return new.join(li) if s_id in query_extension_per_target_shape: targetQuery = rreplace(targetQuery, '}', f'{query_extension_per_target_shape[s_id]}}}', 1) logger.debug(f'Extended targetQuery with query extension specified!') self.parser.replace_target_query(s, targetQuery) logger.debug('New TargetDef:\n' + targetQuery)
[docs] def node_order(self, target_shape_list): node_order = target_shape_list for target_shape in target_shape_list: node_order = node_order + list(self.involvedShapesPerTarget[target_shape]) unique_node_order = [] unique_nodes = set() for node in node_order: if node not in unique_nodes: unique_node_order.append(node) unique_nodes.add(node) logger.debug('Node Order estimated by the shaclapi: ' + str(unique_node_order)) return unique_node_order