Source code for shaclapi.multiprocessing.runner

import atexit
import logging
import multiprocessing as mp
import time

from shaclapi.multiprocessing.PipeAdapter import PipeAdapter, QueueAdapter
from shaclapi.query import Query

logger = logging.getLogger(__name__)


class Runner:
    """
    Manage one long-lived worker process that repeatedly executes a function.

    A runner is associated with a multiprocessing.Process which is started
    with ``start_process`` and runs until ``stop_process`` is called.
    Additionally, a runner has a function ``f`` assigned, which is executed
    each time ``new_task`` is called. The worker loop (``mp_function``) calls
    ``f`` with the following positional arguments, in this order:

    - FIRST: the unpacked ``in_queues``
    - SECOND: the unpacked ``out_queues`` (``number_of_out_queues`` of them,
      as specified in the constructor and created by ``get_new_out_queues``)
    - FINALLY: the unpacked ``task_description``, i.e. the variable number of
      parameters needed for the task (passed to ``new_task``)
    """

    # Upper bound (seconds) for reaping the worker after it was terminated,
    # so that a misbehaving child cannot block shutdown forever.
    _JOIN_TIMEOUT = 5

    def __init__(self, function, number_of_out_queues=1):
        """
        Create a runner for ``function``; no process is started yet.

        :param function: callable executed by the worker for every task; its
            ``__name__`` is used as process name and in log messages.
        :param number_of_out_queues: how many output queues
            ``get_new_out_queues`` creates per call.
        """
        self.context = mp.get_context('spawn')
        self.manager = mp.Manager()
        self.function = function
        self.number_of_out_queues = number_of_out_queues
        self.process = None
        self.task_queue = None
        self.process_running = False

    def start_process(self):
        """Start the worker process and register ``stop_process`` to run at interpreter exit."""
        self.task_queue = self.context.Queue()
        # NOTE(review): the task queue comes from the 'spawn' context while the
        # process is created with the platform's default start method - confirm
        # that this mix is intended before changing it.
        self.process = mp.Process(
            target=mp_function,
            args=(self.task_queue, self.function),
            name=self.function.__name__,
        )
        self.process.start()
        self.process_running = True
        logger.info('Process %s started!', self.function.__name__)
        atexit.register(self.stop_process)

    def stop_process(self):
        """
        Terminate the worker process, if one is running.

        Calling this without a running process is a no-op.
        """
        if not (self.process and self.process_running):
            return

        atexit.unregister(self.stop_process)
        self.task_queue.close()
        self.process.terminate()
        # Reap the terminated child; without joining it would stay around as
        # an unreaped (zombie) process until the parent exits.
        self.process.join(self._JOIN_TIMEOUT)
        self.process = None
        self.process_running = False
        logger.info('Process %s stopped!', self.function.__name__)

    def get_new_queue(self):
        """Return a new queue created by this runner's manager."""
        return self.manager.Queue()

    def get_new_out_queues(self, use_pipes):
        """
        Create the output channels for one task.

        :param use_pipes: if truthy, every channel is a ``PipeAdapter``;
            otherwise a ``QueueAdapter`` built on this runner's manager.
        :return: tuple with ``number_of_out_queues`` adapters.
        """
        return tuple(
            PipeAdapter() if use_pipes else QueueAdapter(self.manager)
            for _ in range(self.number_of_out_queues)
        )

    def new_task(self, in_queues, out_queues, task_description, runner_stats_out_queue, wait_for_finish=False):
        """
        Hand a new task over to the worker process.

        :param in_queues: iterable of inputs, unpacked as the first arguments
            of the runner's function.
        :param out_queues: iterable of outputs, unpacked after ``in_queues``;
            the worker puts ``'EOF'`` on each of them when the task ends.
        :param task_description: iterable of the remaining arguments of the
            runner's function.
        :param runner_stats_out_queue: queue on which the worker reports
            timing and exception information.
        :param wait_for_finish: if true, block until the worker signals that
            the task has finished.
        :return: the worker's completion message when ``wait_for_finish`` is
            true, otherwise ``None``.
        :raises RuntimeError: if no worker process is running.
        """
        if not (self.process and self.process_running):
            raise RuntimeError('Start processes before using /multiprocessing')

        if not wait_for_finish:
            self.task_queue.put((in_queues, out_queues, runner_stats_out_queue, task_description, None))
            return None

        task_finished_recv, task_finished_send = self.context.Pipe()
        try:
            self.task_queue.put((in_queues, out_queues, runner_stats_out_queue, task_description, task_finished_send))
            return task_finished_recv.recv()
        finally:
            # Close both ends even if queuing or receiving fails, so that the
            # pipe's file descriptors are not leaked.
            task_finished_send.close()
            task_finished_recv.close()
def mp_function(task_in_queue, function):
    """
    Worker loop executed inside the runner's process.

    Tasks are read from ``task_in_queue`` until the value ``'EOF'`` is
    received. Every task is a 5-tuple ``(in_queues, out_queues,
    runner_stats_out_queue, task_description, task_finished_send)`` and is
    executed as ``function(*in_queues, *out_queues, *task_description)``.

    For each task, regardless of whether ``function`` raised:

    - ``'EOF'`` is put on every queue in ``out_queues``,
    - a ``{'topic': <function name>, 'time': (start, end)}`` record is put on
      ``runner_stats_out_queue``,
    - ``'Done'`` is sent through ``task_finished_send`` if one was supplied.

    An exception raised by ``function`` is logged and additionally reported
    on ``runner_stats_out_queue`` as ``{'topic': 'Exception', 'location':
    <function name>}``. A ``KeyboardInterrupt`` ends the loop silently.

    :param task_in_queue: queue delivering the task tuples (or ``'EOF'``).
    :param function: callable to execute for every task.
    """
    # Presumably a warm-up so that the first real task does not pay one-time
    # query preparation costs (inferred from the original variable name) -
    # the prepared query itself is not used afterwards.
    warm_up_query = Query.prepare_query('PREFIX test1:<http://example.org/testGraph1#>\nSELECT DISTINCT ?x WHERE {\n?x a test1:classE.\n?x test1:has ?lit.\n}')
    warm_up_query.namespace_manager.namespaces()

    try:
        # iter() with a sentinel keeps fetching tasks until 'EOF' arrives.
        for task in iter(task_in_queue.get, 'EOF'):
            in_queues, out_queues, runner_stats_out_queue, task_description, task_finished_send = task

            # Now one can use logging as normal
            worker_name = function.__name__
            logger.info(worker_name + ' received task!')

            started_at = time.time()
            try:
                function(*in_queues, *out_queues, *task_description)
            except Exception as error:
                runner_stats_out_queue.put({'topic': 'Exception', 'location': worker_name})
                logger.exception(error)
            finally:
                # Writing EOF here allows global error handling
                for out_queue in out_queues:
                    out_queue.put('EOF')
            finished_at = time.time()

            runner_stats_out_queue.put({'topic': worker_name, 'time': (started_at, finished_at)})
            logger.info(worker_name + ' finished task; waiting for next one!')

            if task_finished_send:
                task_finished_send.send('Done')
    except KeyboardInterrupt:
        pass