Source code for ores.scoring_systems.process_pool

import logging
from concurrent import futures as cfutures

from ..errors import TimeoutError
from .scoring_system import ScoringSystem

logger = logging.getLogger(__name__)


[docs]class ProcessPool(ScoringSystem): def __init__(self, *args, workers=None, **kwargs): super().__init__(*args, **kwargs) self.workers = int(workers) if workers is not None else None def _process_missing_scores(self, request, missing_model_set_revs, root_caches, inprogress_results=None): rev_scores = {} errors = {} futures = {} with cfutures.ProcessPoolExecutor(max_workers=self.workers) as executor: for missing_models, rev_ids in missing_model_set_revs.items(): for rev_id in rev_ids: if rev_id not in root_caches: continue root_cache = root_caches[rev_id] logger.debug("Submitting _process_score_map for {0}" .format(request.format(rev_id, missing_models))) future = executor.submit( self._process_score_map, request, rev_id, missing_models, root_cache) futures[rev_id] = future for rev_id, future in futures.items(): try: rev_scores[rev_id] = future.result(timeout=self.timeout) except cfutures.TimeoutError: errors[rev_id] = TimeoutError( "Timed out after {0} seconds.".format(self.timeout)) except Exception as error: errors[rev_id] = error return rev_scores, errors
[docs] @classmethod def from_config(cls, config, name, section_key="scoring_systems"): logger.info("Loading ProcessPool '{0}' from config.".format(name)) section = config[section_key][name] kwargs = cls._kwargs_from_config( config, name, section_key=section_key) workers = section.get('workers') return cls(workers=workers, **kwargs)