Source code for shaclapi.multiprocessing.Xgoptional.Xgoptional

"""
Created on Jul 10, 2011

Implements the Xgoptional operator.
The intermediate results are represented as queues.

@author: Maribel Acosta Deibe
"""
import json
import logging
import signal
from multiprocessing import Queue
from os import remove
from queue import Empty
from tempfile import NamedTemporaryFile
from time import time

from .OperatorStructures import Record, RJTTail, FileDescriptor

logger = logging.getLogger(__name__)


class Xgoptional():

    def __init__(self, vars_left, vars_right, memory_size):
        self.left_table = dict()
        self.right_table = dict()
        self.qresults = Queue()
        # Stores results that have not yet found a join partner,
        # so that they can be added as OPTIONAL results in stage 3.
        self.bag = set()
        self.vars_left = set(vars_left)
        self.vars_right = set(vars_right)
        self.vars = list(self.vars_left & self.vars_right)

        # Second stage settings
        self.secondStagesTS = []
        self.lastSecondStageTS = float('-inf')
        self.timeoutSecondStage = 100000000
        self.sourcesBlocked = False

        # Main memory settings
        self.memorySize = memory_size  # represents the main memory size (# tuples)
        self.fileDescriptor_left = {}
        self.fileDescriptor_right = {}
        self.memory_left = 0
        self.memory_right = 0

        self.leftcount = 0
        self.rightcount = 0
    def add_to_bag(self, item):
        try:
            json_enc = json.dumps(item, sort_keys=True)
            self.bag.add(json_enc)
        except (KeyError, ValueError):
            pass
        except Exception as e:
            raise e
    def remove_from_bag(self, item):
        try:
            json_enc = json.dumps(item, sort_keys=True)
            self.bag.remove(json_enc)
        except (KeyError, ValueError):
            pass
        except Exception as e:
            raise e
    def iterate_bag(self):
        for item in self.bag:
            yield json.loads(item)
    def instantiate(self, d):
        newvars_left = self.vars_left - set(d.keys())
        newvars_right = self.vars_right - set(d.keys())
        return Xgoptional(newvars_left, newvars_right, self.memorySize)
    def instantiateFilter(self, instantiated_vars, filter_str):
        newvars_left = self.vars_left - set(instantiated_vars)
        newvars_right = self.vars_right - set(instantiated_vars)
        return Xgoptional(newvars_left, newvars_right, self.memorySize)
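    # Overview of the adaptive processing implemented by the methods below:
    #  - stage1 runs while at least one source is still sending data: each incoming
    #    mapping is probed against one RJT table and inserted into the other one.
    #  - stage2 is triggered via SIGALRM when both sources are blocked: mappings in
    #    main memory are probed against partitions flushed to secondary memory.
    #  - stage3 runs once both sources sent 'EOF': the remaining main-memory and
    #    secondary-memory partitions are probed against each other, and left mappings
    #    that never found a join partner are emitted with the right-only variables
    #    set to None (the OPTIONAL semantics).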
    def execute(self, left, right, out, processqueue=Queue()):
        # Executes the Xgoptional.
        self.left = left
        self.right = right
        self.qresults = out

        # Initialize tuples.
        tuple1 = None
        tuple2 = None

        # Create alarm to go to stage 2.
        signal.signal(signal.SIGALRM, self.stage2)

        # Get the tuples from the queues.
        while not(tuple1 == 'EOF') or not(tuple2 == 'EOF'):
            # Try to get and process tuple from left queue.
            if not(tuple1 == 'EOF'):
                try:
                    tuple1 = self.left.get(block=False)
                    if not(tuple1 == 'EOF'):
                        self.add_to_bag(tuple1)
                    self.leftcount += 1
                    signal.alarm(self.timeoutSecondStage)
                    self.stage1(tuple1, self.left_table, self.right_table, self.vars_right)
                    self.memory_right += 1
                    # print('bag after stage 1:', self.bag)
                except Empty:
                    # Empty: in tuple1 = self.left.get(False), when the queue is empty.
                    pass
                except TypeError as te:
                    # TypeError: in resource = resource + tuple[var], when the tuple is 'EOF'.
                    logger.warning('TypeError: in resource = resource + tuple[var] ' + str(tuple1) + ' ' + str(te))
                    pass
                except IOError:
                    # IOError: when a tuple is received, but the alarm is fired.
                    self.sourcesBlocked = False
                    pass

            # Try to get and process tuple from right queue.
            if not(tuple2 == 'EOF'):
                try:
                    tuple2 = self.right.get(block=False)
                    self.rightcount += 1
                    signal.alarm(self.timeoutSecondStage)
                    self.stage1(tuple2, self.right_table, self.left_table, self.vars_left)
                    self.memory_left += 1
                except Empty:
                    # Empty: in tuple2 = self.right.get(False), when the queue is empty.
                    pass
                except TypeError as te:
                    # TypeError: in resource = resource + tuple[var], when the tuple is 'EOF'.
                    logger.warning('TypeError: in resource = resource + tuple[var] ' + str(tuple2) + ' ' + str(te))
                    pass
                except IOError:
                    # IOError: when a tuple is received, but the alarm is fired.
                    self.sourcesBlocked = False
                    pass

            # Flush to secondary memory if main memory is full.
            if len(self.left_table) + len(self.right_table) >= self.memorySize:
                self.flushRJT()

        # Turn off alarm to stage 2.
        signal.alarm(0)

        # Perform the last probes.
        self.stage3()
    def stage1(self, tuple, tuple_rjttable, other_rjttable, vars):
        # Stage 1: While one of the sources is sending data.
        if tuple != 'EOF':
            # Get the resource associated to the tuple.
            resource = ''
            for var in self.vars:
                val = tuple[var]
                if '^^<' in val:
                    val = val[:val.find('^^<')]
                resource = resource + str(val)

            # Probe the tuple against its RJT table.
            probeTS = self.probe(tuple, resource, tuple_rjttable, vars)

            # Create the record.
            record = Record(tuple, probeTS, time(), float('inf'))

            # Insert the record in the other RJT table.
            # TODO: use RJTTail. Check ProbeTS
            if resource in other_rjttable:
                other_rjttable.get(resource).updateRecords(record)
                other_rjttable.get(resource).setRJTProbeTS(probeTS)
            else:
                tail = RJTTail(record, probeTS)
                other_rjttable[resource] = tail
    def stage2(self, signum, frame):
        # Stage 2: When both sources become blocked.
        self.sourcesBlocked = True

        # Get common resources.
        resources1 = set(self.left_table.keys()) & set(self.fileDescriptor_right.keys())
        resources2 = set(self.right_table.keys()) & set(self.fileDescriptor_left.keys())

        # Iterate while there are common resources and both sources are blocked.
        while (resources1 or resources2) and self.sourcesBlocked:
            if resources1:
                resource = resources1.pop()
                rjts1 = self.left_table[resource].records
                for rjt1 in rjts1:
                    probed = self.probeFile(rjt1, self.fileDescriptor_right, resource, 2)
                    if probed:
                        rjt1.probeTS = time()
            elif resources2:
                resource = resources2.pop()
                rjts1 = self.right_table[resource].records
                for rjt1 in rjts1:
                    probed = self.probeFile(rjt1, self.fileDescriptor_left, resource, 2)
                    if probed:
                        rjt1.probeTS = time()

        # End of second stage.
        self.lastSecondStageTS = time()
        self.secondStagesTS.append(self.lastSecondStageTS)
    def stage3(self):
        # Stage 3: When both sources sent all the data.
        # This is the optional part: produce tuples that have not matched already.

        # RJTs in main (left) memory are probed against RJTs in secondary (right) memory.
        common_resources = set(self.left_table.keys()) & set(self.fileDescriptor_right.keys())
        for resource in common_resources:
            rjts1 = self.left_table[resource].records
            for rjt1 in rjts1:
                self.probeFile(rjt1, self.fileDescriptor_right, resource, 3)

        # RJTs in main (right) memory are probed against RJTs in secondary (left) memory.
        common_resources = set(self.right_table.keys()) & set(self.fileDescriptor_left.keys())
        for resource in common_resources:
            rjts1 = self.right_table[resource].records
            for rjt1 in rjts1:
                self.probeFile(rjt1, self.fileDescriptor_left, resource, 3)

        # RJTs in secondary memory are probed to produce new results.
        common_resources = set(self.fileDescriptor_left.keys()) & set(self.fileDescriptor_right.keys())
        for resource in common_resources:
            file1 = open(self.fileDescriptor_right[resource].file.name)
            rjts1 = file1.readlines()
            for rjt1 in rjts1:
                (tuple1, probeTS1, insertTS1, flushTS1) = rjt1.split('|')
                self.probeFile(Record(eval(tuple1), float(probeTS1), float(insertTS1), float(flushTS1)),
                               self.fileDescriptor_left, resource, 3)
            file1.close()

        for resource in common_resources:
            file1 = open(self.fileDescriptor_left[resource].file.name)
            rjts1 = file1.readlines()
            for rjt1 in rjts1:
                (tuple1, probeTS1, insertTS1, flushTS1) = rjt1.split('|')
                self.probeFile(Record(eval(tuple1), float(probeTS1), float(insertTS1), float(flushTS1)),
                               self.fileDescriptor_right, resource, 3)
            file1.close()

        # Delete files from secondary memory.
        for resource in self.fileDescriptor_left:
            remove(self.fileDescriptor_left[resource].file.name)
        for resource in self.fileDescriptor_right:
            remove(self.fileDescriptor_right[resource].file.name)

        # Produce the OPTIONAL results: left tuples that never found a join partner
        # are emitted with the right-only variables bound to None.
        for tuple in self.iterate_bag():
            vars_to_add = self.vars_right.difference(set(tuple.keys()))
            for var in vars_to_add:
                tuple.update({var: None})
            self.qresults.put(tuple)

        # Put EOF in queue and exit.
        self.qresults.put('EOF')
        return
    def probe(self, tuple, resource, rjttable, vars):
        probeTS = time()
        # If the resource is in the table, produce results.
        try:
            if resource in rjttable:
                rjttable.get(resource).setRJTProbeTS(probeTS)
                list_records = rjttable[resource].records

                # Delete the probed tuple from the bag.
                self.remove_from_bag(tuple)

                for record in list_records:
                    if isinstance(record.tuple, dict):
                        res = record.tuple.copy()
                        res.update(tuple)
                        self.qresults.put(res)
                        # Delete the matched tuple from the bag.
                        self.remove_from_bag(record.tuple)
        except Exception:
            pass

        return probeTS
    def probeFile(self, rjt1, filedescriptor2, resource, stage):
        # Probe an RJT against its corresponding partition in secondary memory.
        file2 = open(filedescriptor2[resource].file.name, 'r')
        rjts2 = file2.readlines()
        st = ''
        probed = False

        for rjt2 in rjts2:
            (tuple2, probeTS2, insertTS2, flushTS2) = rjt2.split('|')
            probedStage1 = False
            probedStage2 = False

            # Checking Property 2: Probed in stage 2.
            for ss in self.secondStagesTS:
                if float(flushTS2) < rjt1.insertTS < ss < rjt1.flushTS:
                    probedStage2 = True
                    break

            # Checking Property 1: Probed in stage 1.
            if rjt1.probeTS < float(flushTS2):
                probedStage1 = True

            # Produce result if it has not been produced.
            if not probedStage1 and not probedStage2:
                res = rjt1.tuple.copy()
                tuple2_evaluated = eval(tuple2)
                res.update(tuple2_evaluated)
                self.qresults.put(res)
                # Delete the matched tuples from the bag.
                self.remove_from_bag(rjt1.tuple)
                self.remove_from_bag(tuple2_evaluated)
                probed = True

            # Update probeTS of tuple2.
            stprobeTS = '%.40r' % (time())
            st = st + tuple2 + '|' + stprobeTS + '|' + insertTS2 + '|' + flushTS2

        file2.close()

        # Update file2 if in stage 2.
        if (stage == 2) and probed:
            file2 = open(filedescriptor2[resource].file.name, 'w')
            file2.write(st)
            file2.close()

        return probed
    def flushRJT(self):
        # Flush an RJT to secondary memory.
        # Choose a victim from each partition (table).
        (resource_to_flush1, tail_to_flush1, least_ts1) = self.getVictim(self.left_table)
        (resource_to_flush2, tail_to_flush2, least_ts2) = self.getVictim(self.right_table)

        # Flush resource from left table.
        if least_ts1 <= least_ts2:
            file_descriptor = self.fileDescriptor_left
            table = self.left_table
            resource_to_flush = resource_to_flush1
            tail_to_flush = tail_to_flush1

        # Flush resource from right table.
        if least_ts2 < least_ts1:
            file_descriptor = self.fileDescriptor_right
            table = self.right_table
            resource_to_flush = resource_to_flush2
            tail_to_flush = tail_to_flush2

        # Create flush timestamp.
        flushTS = time()

        # Update the file descriptor.
        if resource_to_flush in file_descriptor:
            lentail = file_descriptor[resource_to_flush].size
            file = open(file_descriptor[resource_to_flush].file.name, 'a')
            file_descriptor.update({resource_to_flush:
                                    FileDescriptor(file, len(tail_to_flush.records) + lentail, flushTS)})
        else:
            file = NamedTemporaryFile(mode='w+', suffix='.rjt', prefix='', delete=False)
            file_descriptor.update({resource_to_flush:
                                    FileDescriptor(file, len(tail_to_flush.records), flushTS)})

        # Flush the tail to the file.
        for record in tail_to_flush.records:
            sttuple = str(record.tuple)
            stprobeTS = '%.40r' % (record.probeTS)
            stinsertTS = '%.40r' % (record.insertTS)
            stflushTS = '%.40r' % (flushTS)
            file.write(sttuple + '|')
            file.write(stprobeTS + '|')
            file.write(stinsertTS + '|')
            file.write(stflushTS + '\n')
        file.close()

        # Delete the resource from main memory.
        del table[resource_to_flush]
    def getVictim(self, table):
        # Selects a victim from a partition in main memory to flush.
        resource_to_flush = ''
        tail_to_flush = RJTTail([], 0)
        least_ts = float('inf')

        for resource, tail in table.items():
            resource_ts = tail.rjtProbeTS
            if (resource_ts < least_ts) or (resource_ts == least_ts and len(tail.records) > len(tail_to_flush.records)):
                resource_to_flush = resource
                tail_to_flush = tail
                least_ts = resource_ts

        # print('Victim chosen:', resource_to_flush, 'TS:', least_ts, 'LEN:', len(tail_to_flush.records))
        return (resource_to_flush, tail_to_flush, least_ts)
    def getLargestRJTs(self, i):
        # Selects the i-th largest RJT stored in secondary memory.
        sizes1 = set(map(FileDescriptor.getSize, self.fileDescriptor_left.values()))
        sizes2 = set(map(FileDescriptor.getSize, self.fileDescriptor_right.values()))
        sizes1 = list(sizes1)
        sizes2 = list(sizes2)
        sizes1.sort()
        sizes2.sort()

        if sizes1 and sizes2:
            if sizes1[len(sizes1) - 1] > sizes2[len(sizes2) - 1]:
                file_descriptor = self.fileDescriptor_left
                max_len = sizes1[len(sizes1) - (i + 1)]
                table = self.right_table
            else:
                file_descriptor = self.fileDescriptor_right
                max_len = sizes2[len(sizes2) - (i + 1)]
                table = self.left_table
        elif sizes1:
            file_descriptor = self.fileDescriptor_left
            max_len = sizes1[len(sizes1) - (i + 1)]
            table = self.right_table
        else:
            file_descriptor = self.fileDescriptor_right
            max_len = sizes2[len(sizes2) - (i + 1)]
            table = self.left_table

        largestRJTs = {}
        for resource, fd in file_descriptor.items():
            if fd.size == max_len:
                largestRJTs[resource] = fd

        return (largestRJTs, table)
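
# Minimal usage sketch: a hypothetical example of wiring the operator to two input
# queues and one output queue. The variables ('x', 'y', 'z'), the bindings and the
# memory size below are illustrative assumptions only, not part of shaclapi.
if __name__ == '__main__':
    left = Queue()
    right = Queue()
    out = Queue()

    # Join on 'x'; 'y' comes from the left side, 'z' optionally from the right.
    op = Xgoptional(['x', 'y'], ['x', 'z'], memory_size=100)

    left.put({'x': 'a1', 'y': 'b1'})   # finds a right-side partner
    left.put({'x': 'a2', 'y': 'b2'})   # no partner: emitted with 'z' set to None
    right.put({'x': 'a1', 'z': 'c1'})
    left.put('EOF')
    right.put('EOF')

    # execute() installs a SIGALRM handler, so this sketch assumes a Unix-like
    # system; inside a query engine the operator is typically run in its own process.
    op.execute(left, right, out)

    result = out.get()
    while result != 'EOF':
        print(result)
        result = out.get()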