# Source code for ores.api
"""
This module provides a :class:`ores.api.Session` class that can maintain a
client connection to an instance of ORES and efficiently generate scores.
Batching and parallelism are set by constructor arguments.

.. autoclass:: ores.api.Session
    :members:
"""
import logging
import time
import traceback
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
import requests
import requests.adapters
from more_itertools import chunked
logger = logging.getLogger(__name__)
class Session:
    """
    Constructs a session with an ORES API and provides facilities for scoring
    revisions in batch and parallel.

    :Parameters:
        host : str
            The host of ORES to connect to (precede with "http" or "https")
        user_agent : str
            A User-Agent header to send with every request.  When omitted,
            ``DEFAULT_USERAGENT`` is sent and a warning is logged.
        session : object
            An existing session whose ``get()`` is used to send requests
            (e.g. a ``requests.Session``).  When provided it is used as-is
            and no retrying adapter is mounted on it.
        retries : int
            The maximum number of retries for basic HTTP errors before giving
            up.  Only applied to the session this class creates itself.
        batch_size : int
            The number of scores to batch per request.
        parallel_requests : int
            The maximum number of requests to make in parallel
    """
    DEFAULT_USERAGENT = "ores.api default user-agent"

    def __init__(self, host, user_agent=None, session=None,
                 retries=5, batch_size=50, parallel_requests=4):
        self.host = str(host)
        # Record the retry budget unconditionally so the attribute exists
        # even when the caller supplies their own session.
        self.retries = int(retries)

        if session is not None:
            self._session = session
        else:
            self._session = requests.Session()
            self._session.mount(
                self.host,
                requests.adapters.HTTPAdapter(max_retries=self.retries))

        self.batch_size = int(batch_size)
        self.workers = int(parallel_requests)

        self.headers = {}
        if user_agent is None:
            logger.warning("Sending requests with default User-Agent.  " +
                           "Set 'user_agent' on oresapi.Session to " +
                           "quiet this message.")
            self.headers['User-Agent'] = self.DEFAULT_USERAGENT
        else:
            self.headers['User-Agent'] = user_agent

    def score(self, context, models, revids):
        """
        Generate scores for models applied to a sequence of revisions.

        :Parameters:
            context : str
                The name of the context -- usually the database name of a wiki
            models : `iterable` | str
                The names of the models to apply.  A single model name may be
                passed as a plain string.
            revids : `iterable` | int
                A sequence of revision IDs to score.  A single revision ID
                may be passed as a plain int.

        :Returns:
            A generator of score documents, one per revision ID, in the
            order the revision IDs were given.  When a batch request fails
            with a `RuntimeError`, every revision in that batch yields a
            ``{model: {"error": ...}}`` document instead.

        :Raises:
            `ValueError` or `TypeError` (immediately, before any request is
            made) if a revision ID cannot be converted to `int`.
        """
        # Materialize the model names: they are re-read for every batch and
        # for every error document, so a one-shot iterator would be exhausted
        # after the first batch.  A bare string is one name, not a sequence
        # of single-character names.
        if isinstance(models, str):
            model_names = [models]
        else:
            model_names = list(models)

        if isinstance(revids, int):
            rev_ids = [revids]
        else:
            rev_ids = [int(rid) for rid in revids]

        return self._score(context, model_names, rev_ids)

    def _score(self, context, models, rev_ids):
        """
        Fan the revision IDs out over a thread pool in batches of
        ``self.batch_size`` and yield the score documents in input order.
        """
        logger.debug("Starting up thread pool with %s workers", self.workers)
        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            # Pairs of (future, batch) kept in submission order so results
            # are yielded in the same order as the requested revision IDs.
            pending = []
            for rev_id_batch in chunked(rev_ids, self.batch_size):
                logger.debug("Starting batch of %s revids",
                             len(rev_id_batch))
                future = executor.submit(
                    self._score_request, context, rev_id_batch, models)
                pending.append((future, rev_id_batch))

            # Block on each future in turn.
            for future, rev_id_batch in pending:
                # Only the request itself is guarded; yielding happens
                # outside the `try` so that an exception thrown into this
                # generator by the consumer is never mistaken for a failed
                # scoring job.
                try:
                    scores = future.result()
                except RuntimeError as e:
                    logger.warning(
                        "An ORES scoring job failed with the following error:")
                    logger.warning(traceback.format_exc())
                    message = e.args[0] if e.args else str(e)
                    # One error document per revision keeps the output
                    # aligned with the input revision IDs.
                    for _ in rev_id_batch:
                        yield {m: {"error": message} for m in models}
                else:
                    yield from scores

    def _score_request(self, context, rev_ids, models):
        """
        Request scores for one batch of revision IDs and return them as a
        list ordered like `rev_ids`.

        :Raises:
            `RuntimeError` if the response body is not JSON or if the
            response document contains an ``error`` key.
        """
        url = self.host + "/v3/scores/{0}/".format(urllib.parse.quote(context))
        # NOTE(review): `params` values are percent-encoded again when the
        # session builds the URL (requests does this), so a model name with
        # reserved characters would be encoded twice -- confirm whether the
        # explicit quote() here is intended.
        params = {'revids': "|".join(str(rid) for rid in rev_ids),
                  'models': "|".join(urllib.parse.quote(model)
                                     for model in models)}

        logger.debug("Sending score request for %s revisions", len(rev_ids))
        # Monotonic clock: the elapsed time must not be skewed by wall-clock
        # adjustments.
        start = time.monotonic()
        response = self._session.get(url, params=params,
                                     headers=self.headers,
                                     verify=True, stream=True)
        try:
            doc = response.json()
        except ValueError as e:
            raise RuntimeError(
                "Non-json response: " + response.text[:100]) from e

        logger.debug("Score request completed for "
                     "%s revisions completed in %s seconds",
                     len(rev_ids), round(time.monotonic() - start, 3))

        if 'error' in doc:
            # TODO: custom class
            raise RuntimeError(doc['error'])
        if 'warnings' in doc:
            for warning_doc in doc['warnings']:
                logger.warning(warning_doc)

        return [doc[context]['scores'][str(rev_id)]
                for rev_id in rev_ids]