Source code for shaclapi.multiprocessing.PipeAdapter
from multiprocessing import Pipe
from queue import Empty
class ConnectionAdapter:
    """
    Wrap one end of a connection object so it can be used like a Queue.

    The sending end offers :meth:`put`, the receiving end offers :meth:`get`.
    The string ``'EOF'`` is treated as an end-of-stream marker: once it has
    been sent (sender side) or received (receiver side), the wrapped
    connection is closed and ``closed`` is set to ``True``.
    """

    def __init__(self, connection, is_sender: bool) -> None:
        """
        :param connection: object providing ``send``, ``recv``, ``poll`` and
            ``close`` (e.g. one end returned by ``multiprocessing.Pipe``)
        :param is_sender: ``True`` if this adapter is the writing end,
            ``False`` if it is the reading end
        """
        self.is_sender = is_sender
        self.connection = connection
        self.closed = False

    def put(self, item):
        """
        Send ``item`` through the connection.

        Sending ``'EOF'`` closes the connection afterwards. A repeated
        ``'EOF'`` on an already closed adapter is silently ignored.

        :param item: object to send
        :raises Exception: if this adapter is the receiving end or the
            connection has already been closed
        """
        # A duplicate end-of-stream marker is harmless; do not treat it as an error.
        if item == 'EOF' and self.closed:
            return
        if not self.is_sender or self.closed:
            raise Exception('Receiver is not allowed to send or Connection is closed!')

        self.connection.send(item)
        if item == 'EOF':
            self.connection.close()
            self.closed = True

    def get(self, block=True, timeout=None):
        """
        Receive the next item from the connection.

        Mirrors ``queue.Queue.get``: with ``block=True`` and no timeout the
        call waits until an item arrives; with ``block=True`` and a timeout it
        waits at most ``timeout`` seconds; with ``block=False`` it only returns
        an item that is immediately available and ``timeout`` is ignored.

        Receiving ``'EOF'`` closes the connection; the marker is still returned.

        :param block: whether to wait for an item
        :param timeout: maximum number of seconds to wait when blocking
        :return: the received item
        :raises queue.Empty: if no item became available in time
        :raises Exception: if this adapter is the sending end or the
            connection has already been closed
        """
        if self.is_sender or self.closed:
            raise Exception('Sender is not allowed to receive or Connection is closed!')

        # Non-blocking reads must return immediately, so the timeout is
        # ignored in that case (same contract as queue.Queue.get).
        wait = timeout if block else 0

        # wait is None only for an unbounded blocking read; recv() itself
        # blocks then, so no poll is needed.
        if wait is not None and not self.connection.poll(wait):
            raise Empty

        result = self.connection.recv()
        if result == 'EOF':
            self.connection.close()
            self.closed = True
        return result
class PipeAdapter:
    """
    Bundle both ends of a freshly created ``Pipe`` as Queue-like adapters.

    ``sender`` wraps the first connection and may only ``put``;
    ``receiver`` wraps the second connection and may only ``get``.
    """

    def __init__(self) -> None:
        send_end, recv_end = Pipe()
        self.sender = ConnectionAdapter(send_end, True)
        self.receiver = ConnectionAdapter(recv_end, False)
class QueueAdapter:
    """
    Expose a single queue under the same ``sender`` / ``receiver`` attribute
    names that ``PipeAdapter`` provides.

    Both attributes refer to the same queue object.
    """

    def __init__(self, context) -> None:
        """
        :param context: object whose ``Queue()`` method creates the queue
            (presumably a multiprocessing context; verify against callers)
        """
        shared_queue = context.Queue()
        self.sender = shared_queue
        self.receiver = shared_queue