Source code for jinete.models.objectives

from __future__ import (
    annotations,
)

import logging
from abc import (
    ABC,
)
from functools import (
    reduce,
)
from operator import (
    add,
)
from typing import (
    TYPE_CHECKING,
)

from .constants import (
    OptimizationDirection,
)
from .planned_trips import (
    PlannedTrip,
)
from .plannings import (
    Planning,
)
from .results import (
    Result,
)
from .routes import (
    Route,
)
from .stops import (
    Stop,
)

if TYPE_CHECKING:
    from typing import (
        Optional,
        TypeVar,
        Tuple,
    )

    Optimizable = TypeVar("Optimizable", Result, Planning, Route, Stop, PlannedTrip, Tuple[float, ...])

logger = logging.getLogger(__name__)


[docs]class Objective(ABC): direction: OptimizationDirection
[docs] def __init__(self, name: str, dimension_count: int): self.name = name self.direction = OptimizationDirection.MAXIMIZATION self.dimension_count = dimension_count
[docs] def best(self, *args: Optional[Optimizable]) -> Optimizable: return self.direction((arg for arg in args if arg is not None), key=self.optimization_function, default=None,)
[docs] def optimization_function(self, value: Optimizable) -> Tuple[float, ...]: if isinstance(value, Result): result = self._result_optimization_function(value) elif isinstance(value, Planning): result = self._planning_optimization_function(value) elif isinstance(value, Route): result = self._route_optimization_function(value) elif isinstance(value, Stop): result = self._stop_optimization_function(value) elif isinstance(value, PlannedTrip): result = self._planned_trip_optimization_function(value) else: result = value logger.debug(f'Computed optimization function value and obtained "{result}" from "{value}".') return result
def _result_optimization_function(self, result: Result) -> Tuple[float, ...]: return self._planning_optimization_function(result.planning) def _planning_optimization_function(self, planning: Planning) -> Tuple[float, ...]: return reduce( lambda a, b: tuple(map(add, a, b)), (self._route_optimization_function(route) for route in planning.routes), tuple(0 for _ in range(self.dimension_count)), ) def _route_optimization_function(self, route: Route) -> Tuple[float, ...]: return reduce( lambda a, b: tuple(map(add, a, b)), (self._stop_optimization_function(stop) for stop in route.stops), tuple(0 for _ in range(self.dimension_count)), ) def _stop_optimization_function(self, stop: Stop) -> Tuple[float, ...]: return reduce( lambda a, b: tuple(map(add, a, b)), (self._planned_trip_optimization_function(planned_trip) for planned_trip in stop.pickup_planned_trips), tuple(0 for _ in range(self.dimension_count)), ) def _planned_trip_optimization_function(self, planned_trip: PlannedTrip) -> Tuple[float, ...]: raise NotImplementedError
[docs]class DialARideObjective(Objective):
[docs] def __init__(self): super().__init__( name="Dial-a-Ride", dimension_count=2, )
def _route_optimization_function(self, route: Route) -> Tuple[float, ...]: return len(tuple(route.trips)), -route.distance def _planned_trip_optimization_function(self, planned_trip: PlannedTrip) -> Tuple[float, ...]: scoring = 0.0 current = planned_trip.delivery while current != planned_trip.pickup and current.previous is not None: scoring -= current.distance current = current.previous return 1, scoring
[docs]class TaxiSharingObjective(Objective):
[docs] def __init__(self): super().__init__( name="Taxi-Sharing", dimension_count=1, )
def _planned_trip_optimization_function(self, planned_trip: PlannedTrip) -> Tuple[float, ...]: if planned_trip.capacity == 0: return (0.0,) return (planned_trip.duration,)
[docs]class HashCodeObjective(Objective):
[docs] def __init__(self): super().__init__( name="HashCode-2018", dimension_count=1, )
def _planned_trip_optimization_function(self, planned_trip: PlannedTrip) -> Tuple[float, ...]: if planned_trip.capacity == 0: return (0.0,) trip = planned_trip.trip scoring = trip.distance if trip.origin_earliest == planned_trip.pickup_time: scoring += trip.on_time_bonus return (scoring,)