"""ores.scoring_systems.scoring_system: base class for ORES scoring systems."""

import logging
import time

import revscoring.errors
import stopit

from .. import errors
from ..errors import (MissingContext, MissingModels, TimeoutError,
                      TooManyRequestsError)
from ..lock_manager import IpRangeList, PoolCounter
from ..metrics_collectors import Null
from ..score_caches import Empty
from ..score_response import ScoreResponse
from ..scoring_context import ScoringContext
from ..util import timeout

logger = logging.getLogger(__name__)


class ScoringSystem(dict):
    """
    Base class for ORES scoring systems.

    A scoring system is itself a ``dict`` mapping context names to scoring
    contexts (built with ``ScoringContext.map_from_config`` when loaded from
    config).  :meth:`score` drives the request pipeline: model-info lookup,
    cache/in-progress lookup, root datasource extraction, score generation
    and cache storage.  Subclasses must implement
    :meth:`_process_missing_scores`, which decides how the CPU-heavy scoring
    work is executed.

    :Parameters:
        context_map : `dict`
            Mapping of context name -> scoring context
        score_cache : score cache or `None`
            Used to look up and store scores.  Defaults to ``Empty()``.
        metrics_collector : metrics collector or `None`
            Receives timing/event metrics.  Defaults to ``Null()``.
        timeout : `float` or `None`
            Seconds passed to the ``timeout`` helper when generating a score
            map, and to ``lock_manager.lock()``.
        lock_manager : lock manager or `None`
            When set, per-IP locking is applied in :meth:`score`.
        connections_per_ip : `int`
            First per-IP limit passed to ``lock_manager.lock()``.
        connections_per_ip_hard : `int`
            Second ("hard") per-IP limit passed to ``lock_manager.lock()``.
        whitelisted_ips : `list` or `None`
            IPs/ranges that are never locked.  Defaults to an empty list.
    """

    def __init__(self, context_map, score_cache=None, metrics_collector=None,
                 timeout=None, lock_manager=None, connections_per_ip=4,
                 connections_per_ip_hard=7, whitelisted_ips=None):
        super().__init__()
        self.update(context_map)
        self.score_cache = score_cache or Empty()
        self.metrics_collector = metrics_collector or Null()
        self.timeout = timeout
        self.connections_per_ip = connections_per_ip
        self.connections_per_ip_hard = connections_per_ip_hard
        self.lock_manager = lock_manager
        # ``None`` default (instead of a shared mutable ``[]``) keeps
        # instances from accidentally sharing one list object.
        if whitelisted_ips is None:
            whitelisted_ips = []
        self.whitelisted_ranges = IpRangeList(whitelisted_ips)

    def check_context_models(self, request):
        """
        Make sure the request's context and all of its models are known.

        :Raises:
            MissingContext
                if ``request.context_name`` is not in this system
            MissingModels
                if any of ``request.model_names`` is not in the context
        """
        if request.context_name not in self:
            raise MissingContext(request.context_name)
        available_models = self[request.context_name].keys()
        missing_models = request.model_names - available_models
        if missing_models:
            raise MissingModels(request.context_name, missing_models)

    def score(self, request):
        """
        Score a request and return a
        :class:`~ores.score_response.ScoreResponse`.

        Per-IP locking is only attempted when the request carries an IP, is
        not a precache request, a lock manager is configured and the IP is
        not whitelisted.  An acquired lock is released even if scoring
        raises.

        :Raises:
            MissingContext, MissingModels
                from :meth:`check_context_models`
            ScoreProcessorOverloaded
                re-raised after being recorded in the metrics collector
        """
        self.check_context_models(request)
        locked = None
        start = time.time()
        if request.ip and (not request.precache) and \
                self.lock_manager is not None and \
                not self.whitelisted_ranges.matches(request.ip):
            locked = self._lock_ip(request.ip)
            self.metrics_collector.lock_acquired(
                'poolcounter', duration=time.time() - start)

        logger.debug("Scoring {0}".format(request))
        try:
            response = self._score(request)
        except errors.ScoreProcessorOverloaded:
            self.metrics_collector.score_processor_overloaded(request)
            raise
        finally:
            if request.ip and locked:
                self._release_ip(request.ip)

        duration = time.time() - start
        if not request.precache:
            self.metrics_collector.scores_request(request, duration)
        else:
            self.metrics_collector.precache_request(request, duration)
        return response

    def _score(self, request):
        """Run the scoring pipeline for an already-validated request."""
        context = self[request.context_name]
        response = ScoreResponse(context, request)

        # 0. Get model info (if applicable)
        if request.model_info:
            self._build_model_info(request, response)

        # 1. Lookup cached and inprogress scores (Fast IO)
        if not request.include_features:
            # Can't use cached score if we want feature values since those
            # will need to be regenerated
            self._lookup_cached_scores(request, response)
            inprogress_results = self._lookup_inprogress_results(
                request, response)
        else:
            inprogress_results = {}

        # 2. Get missing rev_model sets
        missing_model_set_revs = self._filter_missing_model_set_revs(
            request, response, inprogress_results=inprogress_results)

        # 2.5 Register inprogress work
        self._register_model_set_revs_to_process(
            request, missing_model_set_revs)

        # 3. Extract base datasources for missing models (Slow IO)
        start = time.time()
        root_caches, extraction_errors = self._extract_root_caches(
            request, missing_model_set_revs)
        self.metrics_collector.datasources_extracted(
            request,
            sum(len(ids) for ids in missing_model_set_revs.values()),
            time.time() - start)

        # 3.5. Record extraction errors
        self._record_errors(request, response, extraction_errors,
                            "Feature extraction error")

        # 4. Generate scores (Heavy CPU)
        missing_scores, scoring_errors = self._process_missing_scores(
            request, missing_model_set_revs, root_caches,
            inprogress_results=inprogress_results)

        for rev_id, score_map in missing_scores.items():
            for model_name, model_score in score_map.items():
                response.add_score(rev_id, model_name, model_score['score'])
                if request.include_features:
                    response.add_features(
                        rev_id, model_name, model_score['features'])
            # Store scores in cache
            self._cache_scores(request, rev_id, score_map)

        # 4.5 Record scoring errors
        self._record_errors(request, response, scoring_errors,
                            "Scoring error")

        return response

    def _record_errors(self, request, response, errors_by_rev, description):
        """
        Attach each per-revision error to the response for every requested
        model, log it, and count it in the metrics collector.

        ``MissingResource`` errors are recorded but not logged: they are
        expected and generally not a problem, and logging them would produce
        too many log lines (see https://phabricator.wikimedia.org/T299137).

        :Parameters:
            errors_by_rev : `dict`
                Mapping of rev_id -> exception
            description : `str`
                Prefix for the log message, e.g. "Feature extraction error"
        """
        for rev_id, error in errors_by_rev.items():
            should_log = not isinstance(
                error, revscoring.errors.MissingResource)
            for model in request.model_names:
                if should_log:
                    logger.error(
                        "{} for model {} and revision {} due to: {}"
                        .format(description, model, rev_id, error))
                response.add_error(rev_id, model, error)
                self.metrics_collector.score_errored(request, model)

    def _build_model_info(self, request, response):
        """Add formatted model info for every requested model."""
        context = self[request.context_name]
        for model_name in request.model_names:
            response.add_model_info(
                model_name,
                context.format_model_info(model_name, request.model_info))

    def _extract_root_caches(self, request, missing_model_set_revs):
        """
        Extract root dependency caches for each (model set, rev_ids) group.

        :Returns:
            A pair ``(root_caches, extraction_errors)`` merged across all
            model sets.
        """
        context = self[request.context_name]
        root_caches = {}
        # Named so it doesn't shadow the module-level ``errors`` import.
        extraction_errors = {}
        for model_set, rev_ids in missing_model_set_revs.items():
            ms_root_caches, ms_errors = context.extract_root_dependency_caches(
                model_set, rev_ids, injection_caches=request.injection_caches)
            root_caches.update(ms_root_caches)
            extraction_errors.update(ms_errors)

        return root_caches, extraction_errors

    def _process_score_map(self, request, rev_id, model_names, root_cache):
        """
        Generate scores for ``model_names`` from ``root_cache``, bounded by
        ``self.timeout`` seconds.

        :Raises:
            TimeoutError
                if scoring times out (recorded via ``score_timed_out``)
        """
        context = self[request.context_name]

        start = time.time()
        # Runs a timeout function so that we don't get stuck here
        try:
            score_map = timeout(
                context.process_model_scores, model_names, root_cache,
                include_features=request.include_features,
                seconds=self.timeout)
        except revscoring.errors.CaughtDependencyError as e:
            if isinstance(e.exception, stopit.TimeoutException):
                duration = time.time() - start
                self.metrics_collector.score_timed_out(request, duration)
                raise TimeoutError("Timed out after {0} seconds."
                                   .format(round(duration))) from e
            raise
        except TimeoutError:
            duration = time.time() - start
            self.metrics_collector.score_timed_out(request, duration)
            raise

        duration = time.time() - start
        # Report the models actually processed here, which may be a subset
        # of the request's models (e.g. when some were cached).
        logger.debug("Score generated for {0}:{1} in {2} seconds"
                     .format(request.context_name,
                             set(model_names),
                             round(duration, 3)))
        self.metrics_collector.score_processed(request, duration)

        return score_map

    def _process_missing_scores(self, request, missing_model_set_revs,
                                root_caches, inprogress_results=None):
        """
        Generate the missing scores.  Must be implemented by subclasses.

        Called by :meth:`_score` with ``inprogress_results`` as a keyword.

        :Returns:
            A pair ``(scores, errors)``: ``scores`` maps rev_id ->
            {model_name: score_doc} where each score_doc has a ``'score'``
            key (and ``'features'`` when features were requested); ``errors``
            maps rev_id -> exception.
        """
        raise NotImplementedError()

    def _filter_missing_model_set_revs(self, request, response,
                                       inprogress_results=None):
        """
        Group revisions by the (non-empty) frozenset of models still missing
        for them.

        :Returns:
            `dict` of frozenset(model_names) -> list of rev_ids
        """
        missing_model_set_rev_pairs = self._filter_missing_model_pairs(
            request, response, inprogress_results)

        missing_model_set_revs = {}
        for model_set, rev_id in missing_model_set_rev_pairs:
            if not model_set:
                continue
            missing_model_set_revs.setdefault(model_set, []).append(rev_id)

        return missing_model_set_revs

    def _filter_missing_model_pairs(self, request, response,
                                    inprogress_results):
        """
        Yield ``(frozenset(missing_models), rev_id)`` for each requested
        revision, excluding models already scored in ``response`` or
        present in ``inprogress_results`` (which may be ``None``).
        """
        if inprogress_results is None:
            inprogress_results = {}
        for rev_id in request.rev_ids:
            already_available = (
                set(response.scores.get(rev_id, {}).keys()) |
                set(inprogress_results.get(rev_id, {}).keys()))
            missing_models = request.model_names - already_available
            yield frozenset(missing_models), rev_id

    def _register_model_set_revs_to_process(self, request,
                                            missing_model_set_revs):
        """Hook for subclasses to register in-progress work; no-op here."""
        return None

    def _cache_scores(self, request, rev_id, score_map):
        """Store every score in ``score_map`` for ``rev_id`` in the cache."""
        for model_name, score_doc in score_map.items():
            self._cache_score(request, rev_id, model_name, score_doc)

    def _cache_score(self, request, rev_id, model_name, score_doc):
        """Store ``score_doc['score']`` keyed by context/model/rev/version."""
        version = self[request.context_name].model_version(model_name)
        injection_cache = request.injection_caches.get(rev_id)
        self.score_cache.store(
            score_doc['score'], request.context_name, model_name, rev_id,
            version=version, injection_cache=injection_cache)

    def _lookup_inprogress_results(self, request, response):
        """Hook for subclasses to report in-progress work; none here."""
        return {}

    def _lookup_cached_scores(self, request, response):
        """Add every cached score for the request to ``response``."""
        for rev_id in request.rev_ids:
            for model_name in request.model_names:
                try:
                    rev_score = self._lookup_cached_score(
                        request, rev_id, model_name)
                except KeyError:
                    # Cache miss: the score will be generated later.
                    continue
                response.add_score(rev_id, model_name, rev_score)

    def _lookup_cached_score(self, request, rev_id, model_name):
        """
        Look up one cached score, recording a cache hit or miss.

        :Raises:
            KeyError
                when the score is not cached
        """
        version = self[request.context_name].model_version(model_name)
        injection_cache = request.injection_caches.get(rev_id)

        try:
            score = self.score_cache.lookup(
                request.context_name, model_name, rev_id,
                version=version, injection_cache=injection_cache)
        except KeyError:
            self.metrics_collector.score_cache_miss(request, model_name)
            raise

        logger.debug("Found cached score for {0}"
                     .format(request.format(rev_id, model_name)))
        self.metrics_collector.score_cache_hit(request, model_name)
        return score

    def _lock_ip(self, ip):
        """
        Acquire a per-IP lock from the lock manager.

        :Returns:
            The lock manager's result, or ``False`` if the lock manager
            failed with an unexpected error (the request then proceeds
            unlocked).
        :Raises:
            TimeoutError, TooManyRequestsError
                propagated from the lock manager
        """
        try:
            return self.lock_manager.lock(
                ip, self.connections_per_ip, self.connections_per_ip_hard,
                self.timeout)
        except (TimeoutError, TooManyRequestsError):
            raise
        except Exception:
            # Lock manager can't lock, let's do nothing (best effort), but
            # keep the traceback so the failure is diagnosable.
            logger.warning('Can not lock in lock manager', exc_info=True)
            return False

    def _release_ip(self, ip):
        """Release a per-IP lock; failures are logged, never raised."""
        try:
            self.lock_manager.release(ip)
        except Exception:
            # Lock manager can't release, let's do nothing (best effort).
            logger.warning('Can not release locks in lock manager',
                           exc_info=True)

    @classmethod
    def _kwargs_from_config(cls, config, name, section_key="scoring_systems",
                            ScoringContextClass=ScoringContext):
        """
        Build the keyword arguments for ``cls(**kwargs)`` from
        ``config[section_key][name]``.
        """
        from ..metrics_collectors import MetricsCollector
        from ..score_caches import ScoreCache
        section = config[section_key][name]

        context_map = ScoringContextClass.map_from_config(
            config, section['scoring_contexts'])

        if 'score_cache' in section:
            score_cache = ScoreCache.from_config(
                config, section['score_cache'])
        else:
            score_cache = None

        if 'lock_manager' in section:
            pool_counter = PoolCounter.from_config(
                config, section['lock_manager'])
        else:
            pool_counter = None

        if 'metrics_collector' in section:
            metrics_collector = MetricsCollector.from_config(
                config, section['metrics_collector'])
        else:
            metrics_collector = None

        timeout_seconds = section.get('timeout')
        connections_per_ip = section.get('connections_per_ip', 4)
        # Read the hard limit from its own key; previously this re-read
        # 'connections_per_ip', making the hard limit equal the soft one.
        connections_per_ip_hard = section.get('connections_per_ip_hard', 7)
        whitelisted_ips = section.get('whitelisted_ips', [])
        return {'context_map': context_map,
                'score_cache': score_cache,
                'metrics_collector': metrics_collector,
                'timeout': timeout_seconds,
                'connections_per_ip': connections_per_ip,
                'connections_per_ip_hard': connections_per_ip_hard,
                'whitelisted_ips': whitelisted_ips,
                'lock_manager': pool_counter}

    @classmethod
    def from_config(cls, config, name, section_key="scoring_systems"):
        """
        Load a scoring system from ``config[section_key][name]``.

        The section must contain either ``module`` (an importable object
        returned as-is) or ``class`` (whose ``from_config`` is called with
        the same ``section_key``).

        :Raises:
            ImportError
                if ``yamlconf`` is not installed
            RuntimeError
                if the section has neither ``module`` nor ``class``
        """
        try:
            import yamlconf
        except ImportError as e:
            raise ImportError("Could not find yamlconf. This package is " +
                              "required when using yaml config files.") from e
        logger.info("Loading ScoreProcessor '{0}' from config.".format(name))
        section = config[section_key][name]
        if 'module' in section:
            return yamlconf.import_module(section['module'])
        elif 'class' in section:
            Class = yamlconf.import_module(section['class'])
            # Forward section_key so non-default sections resolve correctly.
            return Class.from_config(config, name, section_key=section_key)
        else:
            raise RuntimeError("No module or class to load.")