# Source code for shaclapi.multiprocessing.contactSource
__author__ = 'Gabriela Montoya, Kemele M. Endris, Julian Gercke' # modified version uses requests instead of urllib
import logging
import requests
logger = logging.getLogger(__name__)
def contactSource(queue, endpoint, query, limit=-1):
    """
    Run a SPARQL query against an endpoint and put the flattened answers into ``queue``.

    Every result row is split into one queue item per variable, which makes the output
    joinable with validation results. All items that stem from the same row share one ID.

    Example:
        Result row:
            {var1: instance1, var2: instance2, var3: instance3}
        Items put into the queue:
            {'var': var1, 'instance': instance1, 'id': ROW_ID},
            {'var': var2, 'instance': instance2, 'id': ROW_ID},
            {'var': var3, 'instance': instance3, 'id': ROW_ID}

    No end-of-stream marker is put into the queue; that is left to the caller.

    :param queue: object offering ``put(item)``; receives the items shown above
    :param endpoint: URL of the SPARQL endpoint; has to contain ``http://`` or ``https://``
    :param query: SPARQL query string
    :param limit: ``-1`` (default) sends the query once and unmodified; any other value is
        used as page size and the query is sent repeatedly with ``LIMIT``/``OFFSET`` appended
    :return: value of the ``boolean`` field of the last response, ``None`` if there was none
    :raises Exception: if the endpoint URL is not valid or a request fails
    """
    if not isinstance(endpoint, str):
        raise Exception('Not a valid endpoint url: {}'.format(endpoint))

    # Strip the scheme; what remains is '<host>[:<port>][/<path>]'.
    for scheme in ('http://', 'https://'):
        _, found, remainder = endpoint.partition(scheme)
        if found:
            break
    else:
        raise Exception('Not a valid endpoint url: {}'.format(endpoint))

    server, _, path = remainder.partition('/')
    host_port = server.split(':')
    # NOTE(review): the port stays a string when it is given explicitly and defaults to 80
    # even for https; contactSourceAux receives it but sends the request to the full URL.
    port = 80 if len(host_port) == 1 else host_port[1]

    if limit == -1:
        b, _ = contactSourceAux(endpoint, server, path, port, query, queue)
        return b

    # Contact the endpoint incrementally, retrieving partial result sets by combining
    # the SPARQL sequence modifiers LIMIT and OFFSET.
    b = None
    offset = 0
    while True:
        paged_query = '{} LIMIT {} OFFSET {}'.format(query, limit, offset)
        # The offset doubles as the first row ID so that IDs stay unique across pages.
        b, cardinality = contactSourceAux(endpoint, server, path, port, paged_query, queue, offset)
        # A short page means the result set is exhausted. The explicit check for an empty
        # page keeps the loop from spinning forever when limit is 0.
        if cardinality == 0 or cardinality < limit:
            break
        offset += limit
    return b
def contactSourceAux(referer, server, path, port, query, queue, first_id=0):
    """
    Send one SPARQL query via HTTP GET and put the flattened bindings into ``queue``.

    For every variable of every result row, an item
    ``{'var': <variable>, 'instance': <value>, 'id': <row id>}`` is put into the queue.
    Values of bindings of type ``typed-literal`` get ``^^<datatype>`` appended, bindings
    carrying ``xml:lang`` get ``@<language tag>`` appended. Row IDs start at ``first_id``
    and increase by one per row.

    :param referer: full URL of the endpoint; this is where the request is sent to
    :param server: host part of the endpoint; only used in the warning message
    :param path: not used by this function
    :param port: not used by this function
    :param query: SPARQL query string to send
    :param queue: object offering ``put(item)``; receives the items described above
    :param first_id: ID assigned to the first result row of this response
    :return: tuple ``(b, reslist)``; ``b`` is the value of the ``boolean`` field of the
        response (``None`` if absent), ``reslist`` is the number of result rows processed
    :raises Exception: if the request fails or the response cannot be processed
    """
    # Setting variables to return.
    b = None
    reslist = 0

    # Only affects the host name shown in the warning below; the request goes to referer.
    if '0.0.0.0' in server:
        server = server.replace('0.0.0.0', 'localhost')

    js = 'application/sparql-results+json'
    params = {'query': query, 'format': js}
    headers = {
        'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                       'AppleWebKit/537.36 (KHTML, like Gecko) '
                       'Chrome/70.0.3538.77 Safari/537.36'),
        'Accept': js,
    }

    try:
        r = requests.get(referer, params=params, headers=headers)
        # Decode the body as JSON. Never eval() a remote response: that executes arbitrary
        # code sent by the server, and patching 'true'/'false' in the raw text beforehand
        # also corrupts literal values that merely contain those words.
        res = r.json()
        if isinstance(res, dict):
            b = res.get('boolean', None)
            if 'results' in res:
                for row_id, row in enumerate(res['results']['bindings'], start=first_id):
                    for key, props in row.items():
                        # Handle typed-literals and language tags
                        suffix = ''
                        if props['type'] == 'typed-literal':
                            suffix = '^^<' + props['datatype'] + '>'
                        elif 'xml:lang' in props:
                            suffix = '@' + props['xml:lang']
                        # JSON decoding only yields str, so no bytes handling is required.
                        # Replacing the value of an existing key is safe while iterating.
                        row[key] = props['value'] + suffix
                        queue.put({'var': key, 'instance': row[key], 'id': row_id})
                    logger.debug({'query_result': row, 'id': row_id})
                    reslist += 1
        else:
            # The content type has to be read from the response object, not the decoded body.
            logger.warning('the source ' + str(server) + ' answered in ' + str(r.headers.get('content-type')) +
                           ' format, instead of the JSON format required, then that answer will be ignored')
    except Exception as e:
        raise Exception('Exception while sending request to ', referer, 'msg:', e) from e
    return b, reslist