Source code for shaclapi.multiprocessing.Xgjoin.Xgjoin

"""
Created on Jul 10, 2011
Implements the Xgjoin operator.
The intermediate results are represented as a queue.
@author: Maribel Acosta Deibe
"""

import logging
import signal
from multiprocessing import Queue
from os import remove
from queue import Empty
from tempfile import NamedTemporaryFile
from time import time

from .OperatorStructures import Record, RJTTail, FileDescriptor

logger = logging.getLogger(__name__)


[docs]class Xgjoin(): def __init__(self, vars, memory_size): self.left_table = dict() self.right_table = dict() self.qresults = Queue() self.vars = vars # Second stage settings self.secondStagesTS = [] self.lastSecondStageTS = float('-inf') self.timeoutSecondStage = 100000000 self.sourcesBlocked = False # Main memory settings self.memorySize = memory_size # represents the main memory size (# tuples) self.fileDescriptor_left = {} self.fileDescriptor_right = {} self.memory_left = 0 self.memory_right = 0 self.leftcount = 0 self.rightcount = 0
[docs] def instantiate(self, d): newvars = self.vars - set(d.keys()) return Xgjoin(newvars, self.memorySize)
[docs] def instantiateFilter(self, instantiated_vars, filter_str): newvars = self.vars - set(instantiated_vars) return Xgjoin(newvars, self.memorySize)
[docs] def execute(self, left, right, out, processqueue=Queue()): # Executes the Xgjoin. self.left = left self.right = right self.qresults = out # Initialize tuples. tuple1 = None tuple2 = None # Create alarm to go to stage 2. signal.signal(signal.SIGALRM, self.stage2) # Get the tuples from the queues. while tuple1 != 'EOF' or tuple2 != 'EOF': # Try to get and process tuple from left queue. if tuple1 != 'EOF': try: tuple1 = self.left.get(block=False) # print('tuple1', tuple1) self.leftcount += 1 signal.alarm(self.timeoutSecondStage) self.stage1(tuple1, self.left_table, self.right_table) self.memory_right += 1 except Empty: # Empty: in tuple1 = self.left.get(False), when the queue is empty. pass except TypeError as te: logger.warning('TypeError: in resource = resource + tuple[var]' + str(tuple) + str(te)) # TypeError: in resource = resource + tuple[var], when the tuple is 'EOF'. pass except IOError: # IOError: when a tuple is received, but the alarm is fired. self.sourcesBlocked = False pass # Try to get and process tuple from right queue. if tuple2 != 'EOF': try: tuple2 = self.right.get(block=False) # print('tuple2', tuple2) self.rightcount += 1 signal.alarm(self.timeoutSecondStage) self.stage1(tuple2, self.right_table, self.left_table) self.memory_left += 1 except Empty: # Empty: in tuple2 = self.right.get(False), when the queue is empty. pass except TypeError as te: logger.warning('TypeError: in resource = resource + tuple[var]' + str(tuple) + str(te)) # TypeError: in resource = resource + tuple[var], when the tuple is 'EOF'. pass except IOError: # IOError: when a tuple is received, but the alarm is fired. self.sourcesBlocked = False pass if len(self.left_table) + len(self.right_table) >= self.memorySize: self.flushRJT() # Turn off alarm to stage 2. signal.alarm(0) # Perform the last probes. self.stage3() return
[docs] def stage1(self, tuple, tuple_rjttable, other_rjttable): # print('Stage 1: While one of the sources is sending data.') if tuple != 'EOF': # Get the resource associated to the tuples. resource = '' # print(tuple) for var in self.vars: if var in tuple: val = tuple[var] if '^^<' in val: val = val[:val.find('^^<')] resource = resource + str(val) # Probe the tuple against its RJT table. probeTS = self.probe(tuple, resource, tuple_rjttable) # Create the records. record = Record(tuple, probeTS, time(), float('inf')) # Insert the record in the other RJT table. if resource in other_rjttable: other_rjttable.get(resource).updateRecords(record) other_rjttable.get(resource).setRJTProbeTS(probeTS) else: tail = RJTTail(record, probeTS) other_rjttable[resource] = tail
[docs] def stage2(self, signum, frame): # print('Stage 2: When both sources become blocked.') self.sourcesBlocked = True # Get common resources. resources1 = set(self.left_table.keys()) & set(self.fileDescriptor_right.keys()) resources2 = set(self.right_table.keys()) & set(self.fileDescriptor_left.keys()) # Iterate while there are common resources and both sources are blocked. while (resources1 or resources2) and self.sourcesBlocked: if resources1: resource = resources1.pop() rjts1 = self.left_table[resource].records for rjt1 in rjts1: probed = self.probeFile(rjt1, self.fileDescriptor_right, resource, 2) if probed: rjt1.probeTS = time() elif resources2: resource = resources2.pop() rjts1 = self.right_table[resource].records for rjt1 in rjts1: probed = self.probeFile(rjt1, self.fileDescriptor_left, resource, 2) if probed: rjt1.probeTS = time() # End of second stage. self.lastSecondStageTS = time() self.secondStagesTS.append(self.lastSecondStageTS)
[docs] def stage3(self): # print('Stage 3: When both sources sent all the data.') # RJTs in main (left) memory are probed against RJTs in secondary (right) memory. common_resources = set(self.left_table.keys()) & set(self.fileDescriptor_right.keys()) for resource in common_resources: rjts1 = self.left_table[resource].records for rjt1 in rjts1: self.probeFile(rjt1, self.fileDescriptor_right, resource, 3) # RJTs in main (right) memory are probed against RJTs in secondary (left) memory. common_resources = set(self.right_table.keys()) & set(self.fileDescriptor_left.keys()) for resource in common_resources: rjts1 = self.right_table[resource].records for rjt1 in rjts1: self.probeFile(rjt1, self.fileDescriptor_left, resource, 3) # RJTs in secondary memory are probed to produce new results. common_resources = set(self.fileDescriptor_left.keys()) & set(self.fileDescriptor_right.keys()) for resource in common_resources: file1 = open(self.fileDescriptor_right[resource].file.name) rjts1 = file1.readlines() for rjt1 in rjts1: (tuple1, probeTS1, insertTS1, flushTS1) = rjt1.split('|') self.probeFile(Record(eval(tuple1), float(probeTS1), float(insertTS1), float(flushTS1)), self.fileDescriptor_left, resource, 3) file1.close() for resource in common_resources: file1 = open(self.fileDescriptor_left[resource].file.name) rjts1 = file1.readlines() for rjt1 in rjts1: (tuple1, probeTS1, insertTS1, flushTS1) = rjt1.split('|') self.probeFile(Record(eval(tuple1), float(probeTS1), float(insertTS1), float(flushTS1)), self.fileDescriptor_right, resource, 3) file1.close() # Delete files from secondary memory. for resource in self.fileDescriptor_left: remove(self.fileDescriptor_left[resource].file.name) for resource in self.fileDescriptor_right: remove(self.fileDescriptor_right[resource].file.name)
[docs] def probe(self, tuple, resource, rjttable): # Probe a tuple against its corresponding table. probeTS = time() # If the resource is in table, produce results. if resource in rjttable: rjttable.get(resource).setRJTProbeTS(probeTS) list_records = rjttable[resource].records for record in list_records: res = {} res.update(record.tuple) #res = record.tuple.copy() res.update(tuple) self.qresults.put(res) #print hex(id(self)), 'res:', res return probeTS
[docs] def probeFile(self, rjt1, filedescriptor2, resource, stage): # Probe an RJT against its corresponding partition in secondary memory. file2 = open(filedescriptor2[resource].file.name, 'r') rjts2 = file2.readlines() st = '' probed = False for rjt2 in rjts2: (tuple2, probeTS2, insertTS2, flushTS2) = rjt2.split('|') probedStage1 = False probedStage2 = False # Checking Property 2: Probed in stage 2. for ss in self.secondStagesTS: if float(flushTS2) < rjt1.insertTS < ss < rjt1.flushTS: probedStage2 = True break # Checking Property 1: Probed in stage 1. if rjt1.probeTS < float(flushTS2): probedStage1 = True # Produce result if it has not been produced. if not probedStage1 and not probedStage2: res = rjt1.tuple.copy() res.update(eval(tuple2)) self.qresults.put(res) probed = True # Update probeTS of tuple2. stprobeTS = '%.40r' % (time()) st = st + tuple2 + '|' + stprobeTS + '|' + insertTS2 + '|' + flushTS2 file2.close() # Update file2 if in stage 2. if (stage == 2) and probed: file2 = open(filedescriptor2[resource].file.name, 'w') file2.write(st) file2.close() return probed
[docs] def flushRJT(self): # Flush an RJT to secondary memory. # Choose a victim from each partition (table). (resource_to_flush1, tail_to_flush1, least_ts1) = self.getVictim(self.left_table) (resource_to_flush2, tail_to_flush2, least_ts2) = self.getVictim(self.right_table) # Flush resource from left table. if least_ts1 <= least_ts2: file_descriptor = self.fileDescriptor_left table = self.left_table resource_to_flush = resource_to_flush1 tail_to_flush = tail_to_flush1 # Flush resource from right table. if least_ts2 < least_ts1: file_descriptor = self.fileDescriptor_right table = self.right_table resource_to_flush = resource_to_flush2 tail_to_flush = tail_to_flush2 # Create flush timestamp. flushTS = time() # Update file descriptor if resource_to_flush in file_descriptor: lentail = file_descriptor[resource_to_flush].size file = open(file_descriptor[resource_to_flush].file.name, 'a') file_descriptor.update({resource_to_flush: FileDescriptor(file, len(tail_to_flush.records) + lentail, flushTS)}) else: file = NamedTemporaryFile(suffix='.rjt', prefix='', delete=False) file_descriptor.update({resource_to_flush: FileDescriptor(file, len(tail_to_flush.records), flushTS)}) # Flush tail in file. for record in tail_to_flush.records: sttuple = str(record.tuple) stprobeTS = '%.40r' % (record.probeTS) stinsertTS = '%.40r' % (record.insertTS) stflushTS = '%.40r' % (flushTS) file.write(sttuple + '|') file.write(stprobeTS + '|') file.write(stinsertTS + '|') file.write(stflushTS + '\n') file.close() # Delete resource from main memory. del table[resource_to_flush]
[docs] def getVictim(self, table): # Selects a victim from a partition in main memory to flush. resource_to_flush = '' tail_to_flush = RJTTail([], 0) least_ts = float('inf') for resource, tail in table.items(): resource_ts = tail.rjtProbeTS if (resource_ts < least_ts) or (resource_ts == least_ts and len(tail.records) > len(tail_to_flush.records)): resource_to_flush = resource tail_to_flush = tail least_ts = resource_ts # print('Victim chosen:', resource_to_flush, 'TS:', least_ts, 'LEN:', len(tail_to_flush.records)) return (resource_to_flush, tail_to_flush, least_ts)
[docs] def getLargestRJTs(self, i): # Selects the i-th largest RJT stored in secondary memory. sizes1 = set(map(FileDescriptor.getSize, self.fileDescriptor_left.values())) sizes2 = set(map(FileDescriptor.getSize, self.fileDescriptor_right.values())) sizes1 = list(sizes1) sizes2 = list(sizes2) sizes1.sort() sizes2.sort() if sizes1 and sizes2: if sizes1[len(sizes1) - 1] > sizes2[len(sizes2) - 1]: file_descriptor = self.fileDescriptor_left max_len = sizes1[len(sizes1)-(i+1)] table = self.right_table else: file_descriptor = self.fileDescriptor_right max_len = sizes2[len(sizes2)-(i+1)] table = self.left_table elif sizes1: file_descriptor = self.fileDescriptor_left max_len = sizes1[len(sizes1)-(i+1)] table = self.right_table else: file_descriptor = self.fileDescriptor_right max_len = sizes2[len(sizes2)-(i+1)] table = self.left_table largestRJTs = {} for resource, fd in file_descriptor.items(): if fd.size == max_len: largestRJTs[resource] = fd return (largestRJTs, table)