Source code for shaclapi.multiprocessing.PipeAdapter

from multiprocessing import Pipe
from queue import Empty


class ConnectionAdapter:
    """Give one end of a pipe connection the ``put``/``get`` interface of a queue.

    An adapter is either a sender (may only ``put``) or a receiver (may only
    ``get``). The string ``'EOF'`` acts as the end-of-stream marker: sending
    or receiving it closes the wrapped connection and marks the adapter as
    closed.
    """

    def __init__(self, connection, is_sender) -> None:
        """Wrap *connection*; *is_sender* selects the sending or receiving role."""
        self.is_sender = is_sender
        self.connection = connection
        self.closed = False

    def put(self, item):
        """Send *item* through the connection.

        Sending ``'EOF'`` closes the connection afterwards. A repeated
        ``'EOF'`` on an already closed adapter is silently ignored.

        Raises:
            Exception: if this adapter is a receiver or is already closed.
        """
        if item == 'EOF' and self.closed:
            # The end-of-stream marker may be sent repeatedly without harm.
            return
        if not self.is_sender or self.closed:
            raise Exception('Receiver is not allowed to send or Connection is closed!')

        self.connection.send(item)
        if item == 'EOF':
            self.connection.close()
            self.closed = True

    def get(self, block=True, timeout=None):
        """Receive the next item from the connection.

        Follows the ``block``/``timeout`` contract of ``Queue.get``:

        * ``block=True, timeout=None`` waits until an item arrives.
        * ``block=True, timeout=t`` waits at most ``t`` seconds.
        * ``block=False`` never waits; ``timeout`` is ignored.

        Receiving ``'EOF'`` closes the connection; the marker is still
        returned to the caller.

        Raises:
            Empty: if no item became available within the allowed wait.
            Exception: if this adapter is a sender or is already closed.
        """
        if self.is_sender or self.closed:
            raise Exception('Sender is not allowed to receive or Connection is closed!')

        # A non-blocking get must return immediately, so poll with a zero
        # timeout regardless of the timeout argument.
        wait = timeout if block else 0
        if wait is not None and not self.connection.poll(wait):
            raise Empty

        result = self.connection.recv()
        if result == 'EOF':
            self.connection.close()
            self.closed = True
        return result
class PipeAdapter:
    """Pair up the two ends of a new ``Pipe`` as queue-like endpoints.

    ``sender`` wraps the first connection returned by ``Pipe()`` in the
    sending role; ``receiver`` wraps the second one in the receiving role.
    """

    def __init__(self) -> None:
        sending_end, receiving_end = Pipe()
        self.sender = ConnectionAdapter(sending_end, is_sender=True)
        self.receiver = ConnectionAdapter(receiving_end, is_sender=False)
class QueueAdapter:
    """Expose a single queue under both the ``sender`` and ``receiver`` names.

    *context* must provide a ``Queue()`` factory (presumably a
    multiprocessing context; verify against the caller).
    """

    def __init__(self, context):
        # Both attributes alias the same queue object created by the context.
        self.sender = self.receiver = context.Queue()