Source code for jinete.algorithms.heuristics.insertion.iterators.ranking

from __future__ import (
    annotations,
)

import itertools as it
import logging
from random import (
    Random,
)
from typing import (
    TYPE_CHECKING,
)

from .abc import (
    InsertionIterator,
)

if TYPE_CHECKING:
    from typing import (
        Dict,
        List,
    )
    from .....models import PlannedTrip, Route, Vehicle

logger = logging.getLogger(__name__)


[docs]class RankingInsertionIterator(InsertionIterator): ranking: Dict[Vehicle, List[Route]]
[docs] def __init__(self, neighborhood_max_size: int = 24, randomized_size: int = 1, seed: int = 56, *args, **kwargs): super().__init__(*args, **kwargs) if neighborhood_max_size is None: neighborhood_max_size = len(self._trips) self.neighborhood_max_size = min(neighborhood_max_size, len(self._trips)) self.randomized_size = randomized_size self.random = Random(seed) self.ranking = dict() self._initialize_ranking()
def _initialize_ranking(self) -> None: logger.info("Initializing ranking...") for route in self._attractive_routes: self.ranking[route.vehicle] = self._create_sub_ranking(route) logger.info(f"Added sub ranking! Currently {len(self.ranking)}.") def _mark_planned_trip_as_done(self, planned_trip: PlannedTrip) -> None: super()._mark_planned_trip_as_done(planned_trip) self._update_ranking(planned_trip) def _update_ranking(self, planned_trip: PlannedTrip) -> None: logger.debug(f'Updating all rankings due to planned trip with "{planned_trip.trip_identifier}" trip...') for route in self._attractive_routes: self._update_vehicle_ranking(planned_trip, route.vehicle) def _update_vehicle_ranking(self, planned_trip: PlannedTrip, vehicle: Vehicle): logger.debug(f'Updating ranking for vehicle with "{vehicle.identifier}" identifier...') if vehicle is planned_trip.vehicle or vehicle not in self.ranking: self.ranking[vehicle] = self._create_sub_ranking(self.routes_container[vehicle]) else: self.ranking[vehicle] = [route for route in self.ranking[vehicle] if planned_trip.trip not in route.trips] def _create_sub_ranking(self, route: Route) -> List[Route]: logger.debug(f'Creating sub_ranking for vehicle "{route.vehicle_identifier}"...') pending_trips = it.islice(self.pending_trips, self.neighborhood_max_size) raw_sub_ranking = self._strategy.compute(route, pending_trips) self._criterion.sorted(raw_sub_ranking, inplace=True) return raw_sub_ranking def __next__(self) -> Route: if not any(self.ranking): raise StopIteration candidates = list() for sub_ranking in self.ranking.values(): if not any(sub_ranking): continue for current in sub_ranking: if ( len(candidates) == self.randomized_size and self._criterion.best(candidates[-1], current) == candidates[-1] ): break if self.randomized_size <= len(candidates): candidates.pop() candidates.append(current) self._criterion.sorted(candidates, inplace=True) if not any(candidates): raise StopIteration best = self.random.choice(candidates) return best