Source code for shaclapi.multiprocessing.functions

import logging
import time
from enum import IntEnum
from functools import reduce

from rdflib import Namespace, URIRef

from shaclapi.multiprocessing.Xgoptional.Xgoptional import Xgoptional
from shaclapi.reduction import prepare_validation
from shaclapi.triple import TripleE

[docs]class ValReport(IntEnum): SHAPE = 0 IS_VALID = 1 REASON = 2
logger = logging.getLogger(__name__)
[docs]def mp_post_processing(joined_result_queue, output_queue, timestamp_queue, variables, target_shape, target_var, collect_all_results = False): """ The post-processing collects all the bindings belonging to a SPARQL result mapping. Example: Joined Result Queue (input): {'instance': '', 'validation': ('ShapeA', True, 'unbound'), 'var': 'x', 'id': 0},... {'instance': 'literal', 'validation': None, 'var': 'lit', 'id': 0} <-- without validation result, produced by the xgoptional. Output: [{'instance': '', 'validation': ('ShapeA', True, 'unbound'), 'var': 'x'}, {'var': 'lit', 'instance': 'literal'}], ... """ if 'UNDEF' in target_shape and not collect_all_results: collect_all_results = True logger.warning('Running in blocking mode as the target variable could not be identified!') table = {} finished_set = set() item = joined_result_queue.get() while item != 'EOF': item_id = item['id'] del item['id'] if item_id in finished_set: logger.debug('Received a mapping from an already finished result {}'.format(item)) item = joined_result_queue.get() continue # Initialize Hashtable Entry if necessary if item_id not in table: table[item_id] = {'result': [], 'need': variables.copy()} # TODO: Deal with multiple targets for one variable try: if collect_all_results: # Collect all results table[item_id]['result'].append(item) else: # If only the required validation result per assignment are collected, the validation result can be # - None (produced by XGoptional) # - a binding not occuring in the target_shape mapping # - a binding with a validation result matching the target_shape binding_var = '?' + item['var'] if item['validation'] is None or (binding_var not in target_shape.keys() or item['validation'][0] in target_shape[binding_var]): table[item_id]['need'].remove('?' + item['var']) table[item_id]['result'].append(item) logger.debug(f'New Mapping matching target shape: {item}') else: table[item_id]['result'].append(item) logger.debug(f'New Mapping not matching target shape: {item}') except ValueError: logger.debug('Received a duplicate mapping from xgoptional {} --> {}'.format(item, table[item_id])) item = joined_result_queue.get() continue # If the Hashtable Entry is complete put it into the output queue; remove it from the Hashtable and add the id to the finished list if len(table[item_id]['need']) == 0 and not collect_all_results: final_result_item = table[item_id] del table[item_id] finished_set.add(item_id) output_queue.put({'result': final_result_item['result']}) timestamp_queue.put({'timestamp': time.time()}) logger.debug('Finished Result {}'.format(final_result_item['result'])) item = joined_result_queue.get() if collect_all_results: for mapping in table.values(): output_queue.put({'result': mapping['result']}) timestamp_queue.put({'timestamp': time.time()}) logger.debug('Finished Result {}'.format(mapping['result']))
[docs]def mp_validate(out_queue, config, query, result_transmitter): """ Function to be executed with Runner to run the validation process of the backend. """ schema = prepare_validation(config, query, result_transmitter) _ = schema.validate(config.start_with_target_shape) # Validate Schema --> validation results will be put into the out_queue during validation
[docs]def mp_xjoin(left, right, out_queue, config): """ Function to be executed with Runner to join the instances of the left with the right queue. """ join_instance = Xgoptional(['var', 'instance', 'id'], ['instance', 'validation'], config.memory_size) join_instance.execute(left, right, out_queue)
[docs]def mp_output_completion(input_queue, output_queue, query, target_shape, is_test_output=False): target_shape_list = reduce(lambda a, b: a + b, target_shape.values()) t_path = Namespace('//travshacl_path#') query.namespace_manager.bind('ts', t_path) t_path_valid = t_path['satisfiesShape'].n3(query.namespace_manager) t_path_invalid = t_path['violatesShape'].n3(query.namespace_manager) query_triples = query.get_triples(replace_prefixes=False) test_output = {'validTargets': set(), 'invalidTargets': set(), 'advancedValid': set(), 'advancedInvalid': set()} result = input_queue.get() while result != 'EOF': logger.debug('Result:' + str(result)) query_result = result['result'] if not is_test_output: # Create Bindings binding = {} filtered_bindings = {} for b in query_result: try: instance = URIRef(b['instance']).n3(query.namespace_manager) except: instance = b['instance'] binding['?' + b['var']] = instance if '?' + b['var'] in query.PV: filtered_bindings['?' + b['var']] = instance triples = [ (binding[t[TripleE.SUBJECT]], t[TripleE.PREDICATE], binding.get(t[TripleE.OBJECT]) or t[TripleE.OBJECT]) for t in query_triples if t[TripleE.SUBJECT] in binding ] report_triples = [ ( URIRef(b['instance']).n3(query.namespace_manager), (t_path_valid if b['validation'][1] else t_path_invalid), b['validation'][0] ) for b in query_result if 'validation' in b and b['validation'] ] logger.debug('Report Triples:' + str(report_triples)) output_queue.put((filtered_bindings, triples, report_triples)) else: for binding in query_result: if 'validation' in binding: if binding['validation']: if binding['validation'][ValReport.SHAPE] in target_shape_list: if binding['validation'][ValReport.IS_VALID]: test_output['validTargets'].add((binding['instance'], binding['validation'][ValReport.REASON])) else: test_output['invalidTargets'].add((binding['instance'], binding['validation'][ValReport.REASON])) else: if binding['validation'][ValReport.IS_VALID]: test_output['advancedValid'].add((binding['instance'], binding['validation'][ValReport.REASON])) else: test_output['advancedInvalid'].add((binding['instance'], binding['validation'][ValReport.REASON])) result = input_queue.get() if is_test_output: test_output['validTargets'] = list(test_output['validTargets']) test_output['invalidTargets'] = list(test_output['invalidTargets']) test_output['advancedValid'] = list(test_output['advancedValid']) test_output['advancedInvalid'] = list(test_output['advancedInvalid']) output_queue.put(test_output)