Source code for shaclapi.statsCalculation

import csv
import os
import time


class StatsCalculation:
    """Gathers timing statistics for one test run of a given approach.

    Collects the start/end timestamps of the individual processing steps
    (source query, network validation, join, post-processing), the
    timestamps of the first and last produced result, and can persist the
    collected numbers as CSV trace and stats files.
    """

    def __init__(self, test_identifier, approach_name):
        self.test_name = test_identifier
        self.approach_name = approach_name
        # Timestamps of the first / last result leaving the pipeline.
        self.first_result_timestamp = None
        self.last_result_timestamp = None
        # (start, end) times of the individual processing steps.
        self.validation_started_time = None
        self.validation_finished_time = None
        self.query_started_time = None
        self.query_finished_time = None
        self.join_started_time = None
        self.join_finished_time = None
        self.post_processing_started_time = None
        self.post_processing_finished_time = None
        self.first_validation_result_time = None
        self.number_of_results = 'Not Calculated'
        self.global_start_time = None
        self.global_end_time = None
        self.task_start_time = None

    def globalCalculationStart(self):
        """Mark the start time of the overall run."""
        self.global_start_time = time.time()

    def globalCalculationFinished(self):
        """Mark the end time of the overall run."""
        self.global_end_time = time.time()

    def taskCalculationStart(self):
        """Mark the start time of the current task."""
        self.task_start_time = time.time()

    def receive_and_write_trace(self, trace_file, timestamp_queue):
        """Consume result timestamps, count them and optionally write a trace CSV.

        Reads ``{'timestamp': ...}`` items from ``timestamp_queue`` (information
        from the post-processing step) until the sentinel ``'EOF'`` arrives.
        Assigns the timestamps of the first and the last result, counts the
        number of results and - if ``trace_file`` is not ``None`` - appends one
        row per result to the trace CSV at that path.
        """
        f = writer = None
        try:
            if trace_file is not None:
                f, writer = self._open_csv(trace_file, ['test', 'approach', 'answer', 'time'])
            received_results = 0
            result = timestamp_queue.get()
            while result != 'EOF':
                received_results += 1
                if writer is not None:
                    writer.writerow({
                        'test': self.test_name,
                        'approach': self.approach_name,
                        'answer': received_results,
                        # Time is reported relative to the global start.
                        'time': result['timestamp'] - self.global_start_time
                    })
                self.last_result_timestamp = result['timestamp']
                if self.first_result_timestamp is None:
                    self.first_result_timestamp = result['timestamp']
                result = timestamp_queue.get()
            self.number_of_results = received_results
        finally:
            # Close the trace file even if the queue raises mid-stream.
            if f is not None:
                f.close()

    @staticmethod
    def _open_csv(file, fields):
        """Open ``file`` for CSV writing, appending if it already exists.

        Returns the open file object and a ``csv.DictWriter`` over ``fields``;
        a header row is written only when the file is newly created.
        """
        mode = 'a' if os.path.isfile(file) else 'w'
        # newline='' is required by the csv module to avoid doubled
        # line endings on platforms with \r\n newlines.
        f = open(file, mode, newline='')
        writer = csv.DictWriter(f, fields)
        if mode == 'w':
            writer.writeheader()
        return f, writer

    def receive_global_stats(self, stats_out_queue, using_output_completion_runner=False):
        """Receive start and stop times of the different steps.

        Blocks on ``stats_out_queue`` until every expected topic has reported
        its ``(start, end)`` times (plus the time of the first validation
        result). Raises ``Exception`` if a worker reported an exception or an
        unknown topic arrives.
        """
        needed_stats = {
            'mp_validate': False,
            'contactSource': False,
            'mp_xjoin': False,
            'mp_post_processing': False,
            'first_validation_result': False
        }
        if using_output_completion_runner:
            needed_stats['mp_output_completion'] = False

        while not all(needed_stats.values()):
            statistic = stats_out_queue.get()
            topic = statistic['topic']
            if topic == 'Exception':
                raise Exception('An Exception occurred in ' + statistic['location'])
            if topic == 'mp_validate':
                self.validation_started_time, self.validation_finished_time = statistic['time']
            elif topic == 'contactSource':
                self.query_started_time, self.query_finished_time = statistic['time']
            elif topic == 'mp_xjoin':
                self.join_started_time, self.join_finished_time = statistic['time']
            elif topic == 'mp_post_processing':
                self.post_processing_started_time, self.post_processing_finished_time = statistic['time']
            elif topic == 'first_validation_result':
                self.first_validation_result_time = statistic['time']
            elif topic == 'mp_output_completion':
                _, self.global_end_time = statistic['time']
            else:
                raise Exception('received statistic with unknown topic: {}'.format(topic))
            needed_stats[topic] = True

    def write_matrix_and_stats_files(self, matrix_file, stats_file):
        """Write the collected per-step durations to ``stats_file`` (CSV).

        ``matrix_file`` is accepted for backward compatibility but currently
        ignored (matrix writing is disabled). Durations whose start or end
        time was never recorded are written as ``'NaN'``.
        """
        total_execution_time = self.global_end_time - self.global_start_time

        def _duration(start, end):
            # 'NaN' mirrors the historical placeholder for missing timings.
            return end - start if start is not None and end is not None else 'NaN'

        query_time = _duration(self.query_started_time, self.query_finished_time)
        network_validation_time = _duration(self.validation_started_time, self.validation_finished_time)

        if self.join_started_time is not None and self.join_finished_time is not None:
            # Using the maximum of these timestamps because the later one
            # better describes the 'real' start of the join. Computed inside
            # the guard so a missing join start no longer crashes max().
            if self.first_validation_result_time:
                approximated_join_start = max(self.join_started_time, self.first_validation_result_time)
            else:
                approximated_join_start = self.join_started_time
            join_time = self.join_finished_time - approximated_join_start
        else:
            join_time = 'NaN'

        stats_entry = {
            'test': self.test_name,
            'approach': self.approach_name,
            'total_execution_time': total_execution_time,
            'query_time': query_time,
            'network_validation_time': network_validation_time,
            'join_time': join_time
        }

        if stats_file is not None:
            f, writer = self._open_csv(stats_file, ['test', 'approach', 'total_execution_time',
                                                    'query_time', 'network_validation_time', 'join_time'])
            try:
                writer.writerow(stats_entry)
            finally:
                f.close()