Source code for jinete.algorithms.exacts.models.three_index

"""Three index linear model class definitions."""

from __future__ import (
    annotations,
)

import logging
from collections import (
    defaultdict,
)
from itertools import (
    product,
)
from typing import (
    TYPE_CHECKING,
)

import pulp as lp
from cached_property import (
    cached_property,
)
from jinete.models import (
    PlannedTrip,
    Route,
    Stop,
)

from .abc import (
    LinearModel,
)

if TYPE_CHECKING:
    from typing import (
        List,
        Set,
        Tuple,
        Iterable,
        Optional,
        Dict,
    )
    from jinete.models import (
        Trip,
        Vehicle,
        Position,
    )

logger = logging.getLogger(__name__)

BIG = 10000


[docs]class ThreeIndexLinearModel(LinearModel): """Three index model class implementation. This implementation is based on the Cordeau-Laporte paper about the dial-a-ride problem. """
[docs] def __init__(self, solver: lp.LpSolver = None, *args, **kwargs): """Construct a new object instance. :param solver: The solver to be used during the optimization. :param args: Additional positional arguments. :param kwargs: Additional named arguments. """ super().__init__(*args, **kwargs) self.solver = solver
@cached_property def _problem(self) -> lp.LpProblem: problem = lp.LpProblem("3-idx_dial-a-ride", lp.LpMinimize) problem.objective = self._objective problem.extend(self._constraints) return problem @cached_property def _objective(self) -> Optional[lp.LpConstraintVar]: return lp.lpSum( self._x[k][i][j] * self._costs[i][j] for k, i, j in product(self._routes_indexer, self._positions_indexer, self._positions_indexer) ) @cached_property def _constraints(self) -> List[lp.LpConstraint]: constraints: List[lp.LpConstraint] = sum( [ self._uniqueness_constraints, self._connectivity_constraints, self._time_constraints, self._feasibility_constraints, ], [], ) logger.info(f'Built "{len(constraints)}" constraints.') return constraints @cached_property def _x(self) -> List[List[List[lp.LpVariable]]]: x = list() for k in self._routes_indexer: x_k = list() for i in self._positions_indexer: x_ki = list() for j in self._positions_indexer: x_kij = lp.LpVariable(f"x_{k}_{i}_{j}", cat=lp.LpBinary) x_ki.append(x_kij) x_k.append(x_ki) x.append(x_k) return x @cached_property def _u(self) -> List[List[lp.LpVariable]]: u = list() for k in self._routes_indexer: u_k = list() for i in self._positions_indexer: u_ki = lp.LpVariable(f"u_{k}_{i}", lowBound=0.0) u_k.append(u_ki) u.append(u_k) return u @cached_property def _w(self) -> List[List[lp.LpVariable]]: w = list() for k in self._routes_indexer: w_k = list() for i in self._positions_indexer: w_ki = lp.LpVariable(f"w_{k}_{i}", lowBound=0.0) w_k.append(w_ki) w.append(w_k) return w @cached_property def _vehicles(self) -> Tuple[Vehicle, ...]: return tuple(self.fleet.vehicles) @cached_property def _trips(self) -> Tuple[Trip, ...]: return tuple(self.job.trips) @cached_property def _positions(self) -> Tuple[Position, ...]: origins = tuple(trip.origin_position for trip in self._trips) destinations = tuple(trip.destination_position for trip in self._trips) positions = ( (self._vehicles[0].origin_position,) + origins + destinations + (self._vehicles[0].destination_position,) ) return positions @cached_property def _costs(self): costs = list() for origin in self._positions: origin_costs = list() for destination in self._positions: if origin == destination: cost = BIG else: cost = origin.distance_to(destination) origin_costs.append(cost) costs.append(origin_costs) return costs @property def _pickups_indexer(self) -> Iterable[int]: return range(1, self._n + 1) @property def _nodes_indexer(self) -> Iterable[int]: return range(1, self._n * 2 + 1) @property def _routes_indexer(self) -> Iterable[int]: return range(len(self._vehicles)) @property def _positions_indexer(self) -> Iterable[int]: return range(len(self._positions)) @property def _n(self) -> int: return len(self._trips) def _trip_by_position_idx(self, idx: int) -> Optional[Trip]: if idx in (0, len(self._positions) - 1): return None return self._trips[(idx % self._n) - 1] def _idx_by_position(self, position: Position): return self._positions.index(position) def _time_window_by_position_idx(self, idx: int) -> Tuple[float, float]: position = self._positions[idx] trip = self._trip_by_position_idx(idx) if trip is None: earliest, latest = 0, 1440 elif position == trip.origin_position: earliest, latest = trip.origin_earliest, trip.origin_latest elif position == trip.destination_position: earliest, latest = trip.destination_earliest, trip.destination_latest else: raise Exception("There was a problem related with earliest, latest indices.") return earliest, latest def _capacity_by_position_idx(self, idx: int) -> float: trip = self._trip_by_position_idx(idx) if trip is None: return 0 capacity = trip.capacity if not idx < len(self._positions) / 2: capacity *= -1 return capacity def _load_time_by_position_idx(self, idx: int) -> float: trip = self._trip_by_position_idx(idx) if trip is None: return 0 return trip.origin_duration def _timeout_by_position_idx(self, idx: int) -> float: trip = self._trip_by_position_idx(idx) if trip is None: return 0 return trip.timeout @cached_property def _uniqueness_constraints(self) -> List[lp.LpConstraint]: constraints = list() for i in self._pickups_indexer: lhs = lp.lpSum(self._x[k][i][j] for j, k in product(self._positions_indexer, self._routes_indexer)) constraints.append(lhs == 1) for k in self._routes_indexer: lhs = lp.lpSum(self._x[k][i][j] for j in self._positions_indexer) - lp.lpSum( self._x[k][self._n + i][j] for j in self._positions_indexer ) constraints.append(lhs == 0) return constraints @cached_property def _connectivity_constraints(self) -> List[lp.LpConstraint]: constraints = list() for k in self._routes_indexer: lhs = lp.lpSum(self._x[k][0][j] for j in self._positions_indexer) constraints.append(lhs == 1) for i in self._nodes_indexer: lhs = lp.lpSum(self._x[k][j][i] for j in self._positions_indexer) - lp.lpSum( self._x[k][i][j] for j in self._positions_indexer ) constraints.append(lhs == 0) lhs = lp.lpSum(self._x[k][i][-1] for i in self._positions_indexer) constraints.append(lhs == 1) return constraints @cached_property def _time_constraints(self) -> List[lp.LpConstraint]: constraints = list() for k in self._routes_indexer: for i, j in product(self._positions_indexer, self._positions_indexer): load_time_i = self._load_time_by_position_idx(i) travel_time = self._positions[i].time_to(self._positions[j]) earliest_i, latest_i = self._time_window_by_position_idx(i) earliest_j, latest_j = self._time_window_by_position_idx(j) cons = max(0, latest_i + load_time_i + travel_time - earliest_j) constraints.append( self._u[k][j] >= self._u[k][i] + load_time_i + travel_time - cons * (1 - self._x[k][i][j]), ) capacity_i = self._capacity_by_position_idx(i) capacity_j = self._capacity_by_position_idx(j) cons = self._vehicles[k].capacity + min(0.0, capacity_i) constraints.append(self._w[k][j] >= self._w[k][i] + capacity_j - cons * (1 - self._x[k][i][j]),) return constraints @cached_property def _feasibility_constraints(self) -> List[lp.LpConstraint]: constraints = list() for k in self._routes_indexer: constraint = self._u[k][-1] - self._u[k][0] <= self._vehicles[k].timeout constraints.append(constraint) for i in self._positions_indexer: earliest, latest = self._time_window_by_position_idx(i) constraints.extend([earliest <= self._u[k][i], self._u[k][i] <= latest]) for i in self._pickups_indexer: load_time = self._load_time_by_position_idx(i) timeout = self._timeout_by_position_idx(i) travel_time = self._positions[i].time_to(self._positions[i + self._n]) constraints.extend( [ travel_time <= self._u[k][i + self._n] - (self._u[k][i] + load_time), self._u[k][i + self._n] - (self._u[k][i] + load_time) <= timeout, ] ) for i in self._positions_indexer: capacity = self._capacity_by_position_idx(i) constraints.extend( [ max(0.0, capacity) <= self._w[k][i], self._w[k][i] <= self._vehicles[k].capacity + min(0.0, capacity), ] ) return constraints
[docs] def solve(self) -> Set[Route]: """Perform a optimization based on the linear model. :return A set of optimized routes. """ logger.info("Starting to solve...") self._problem.solve(self.solver) self._validate() logger.info(f'Obtained "{lp.value(self._objective)}" reaching "{lp.LpStatus[self._problem.status]}".') return self._solution_to_routes()
def _print_solution(self): print("X:") for k in self._routes_indexer: print(f"Vehicle {k}-th.") print(f' {" ".join(map(lambda num: f"{num:02d}", self._positions_indexer))}') for i in self._positions_indexer: print(f"{i:02d}", end=" ") for j in self._positions_indexer: print(f"{int(self._x[k][i][j].varValue):2d}", end=" ") print() print() print("U:") for k in self._routes_indexer: print(f"Vehicle {k}-th.") for i in self._positions_indexer: print(f"{self._u[k][i].varValue:4.01f}", end=" ") print() print("W:") for k in self._routes_indexer: print(f"Vehicle {k}-th.") for i in self._positions_indexer: print(f"{self._w[k][i].varValue:4.01f}", end=" ") print() def _validate(self): for k in self._routes_indexer: for i in self._positions_indexer: logger.info(f'Obtained "u[{k}][{i}]={self._u[k][i].varValue}".') assert self._u[k][i].varValue >= 0.0 logger.info(f'Obtained "w[{k}][{i}]={self._w[k][i].varValue}".') assert self._w[k][i].varValue >= 0.0 for j in self._positions_indexer: logger.info(f'Obtained "x[{k}][{i}][{j}]={self._x[k][i][j].varValue}".') assert min(abs(self._x[k][i][j].varValue), abs(self._x[k][i][j].varValue - 1)) <= 0.05 def _solution_to_routes(self): logger.info("Casting solution to a set of routes...") routes = set() for k in self._routes_indexer: vehicle = self._vehicles[k] route = Route(vehicle) positions = self._solution_to_positions(k) stops = self._positions_to_stops(route, positions) trips = self._positions_to_trips(positions) self._build_planned_trips(vehicle, trips, stops) self._adjust_waiting_times(stops, k) for stop in stops: route.insert_stop(stop) routes.add(route) return routes def _positions_to_trips(self, positions) -> List[Trip]: trips: List[Trip] = list() for position in positions: trip = next((trip for trip in self._trips if trip.origin_position == position), None) if trip is None: continue if trip in trips: continue trips.append(trip) return trips def _build_planned_trips(self, vehicle: Vehicle, trips: List[Trip], stops: List[Stop]) -> List[PlannedTrip]: stop_mapper = self._stop_to_stop_mapper(stops) planned_trips = list() for trip in trips: pickup = stop_mapper[trip.origin_position].pop(0) delivery = stop_mapper[trip.destination_position].pop(0) planned_trip = PlannedTrip(vehicle, trip, pickup, delivery) planned_trips.append(planned_trip) return planned_trips def _solution_to_positions(self, k: int) -> List[Position]: ordered_trip_indexes = [idx for idx in sorted(range(len(self._u[k])), key=lambda x: self._u[k][x].varValue)] positions = list() for i, j in product(ordered_trip_indexes, self._positions_indexer): if not int(self._x[k][i][j].varValue) == 1: continue positions.append(self._positions[i]) return positions @staticmethod def _positions_to_stops(route, positions) -> List[Stop]: stops = [route.first_stop] for position in positions: if position == stops[-1].position: continue pickup = Stop(route.vehicle, position, stops[-1]) stops.append(pickup) return stops @staticmethod def _stop_to_stop_mapper(stops: List[Stop]) -> Dict[Position, List[Stop]]: mapper: Dict[Position, List[Stop]] = defaultdict(list) for stop in stops: mapper[stop.position].append(stop) return mapper def _adjust_waiting_times(self, stops: List[Stop], k: int): starting_times = tuple(float(u_k.varValue) for u_k in self._u[k]) for stop in stops: stop.flush() stop.starting_time = starting_times[self._idx_by_position(stop.position)]