001/******************************************************************************* 002 * Copyright (c) 2017 Pablo Pavon Marino and others. 003 * All rights reserved. This program and the accompanying materials 004 * are made available under the terms of the 2-clause BSD License 005 * which accompanies this distribution, and is available at 006 * https://opensource.org/licenses/BSD-2-Clause 007 * 008 * Contributors: 009 * Pablo Pavon Marino and others - initial API and implementation 010 *******************************************************************************/ 011package com.net2plan.examples.ocnbook.onlineSim; 012 013 014import cern.colt.matrix.tdouble.DoubleFactory1D; 015import cern.colt.matrix.tdouble.DoubleFactory2D; 016import cern.colt.matrix.tdouble.DoubleMatrix1D; 017import cern.colt.matrix.tdouble.DoubleMatrix2D; 018import com.jom.OptimizationProblem; 019import com.net2plan.interfaces.networkDesign.*; 020import com.net2plan.interfaces.simulation.IEventProcessor; 021import com.net2plan.interfaces.simulation.SimEvent; 022import com.net2plan.utils.Constants.RoutingType; 023import com.net2plan.utils.*; 024 025import java.io.File; 026import java.io.FileOutputStream; 027import java.io.PrintStream; 028import java.util.*; 029 030/** 031 * This module implements a distributed primal-gradient based algorithm, for iteratively adapting the network routing. 032 * 033 * Ths event processor is adapted to permit observing the algorithm performances under user-defined conditions, 034 * including asynchronous distributed executions, where signaling can be affected by losses and/or delays, and/or measurement errors. 035 * The time evolution of different metrics can be stored in output files, for later processing. 036 * As an example, see the <a href="../../../../../../graphGeneratorFiles/fig_sec9_3_adaptiveRoutingPrimal.m">{@code fig_sec9_3_adaptiveRoutingPrimal.m}</a> MATLAB file used for generating the graph/s of the case study in the 037 * <a href="http://eu.wiley.com/WileyCDA/WileyTitle/productCd-1119013356.html">book</a> using this algorithm. 038 * 039 * To simulate a network with this module, use the {@code Online_evGen_doNothing} generator. 040 * 041 * @net2plan.keywords Flow assignment (FA), Distributed algorithm, Primal gradient algorithm 042 * @net2plan.ocnbooksections Section 9.3 043 * @net2plan.inputParameters 044 * @author Pablo Pavon-Marino 045 */ 046@SuppressWarnings("unchecked") 047public class Online_evProc_adaptiveRoutingPrimal extends IEventProcessor 048{ 049 private InputParameter signaling_isSynchronous = new InputParameter ("signaling_isSynchronous", false , "true if all the distributed agents involved wake up synchronously to send the signaling messages"); 050 private InputParameter signaling_averageInterMessageTime = new InputParameter ("signaling_averageInterMessageTime", 1.0 , "Average time between two signaling messages sent by an agent" , 0 , false , Double.MAX_VALUE , true); 051 private InputParameter signaling_maxFluctuationInterMessageTime = new InputParameter ("signaling_maxFluctuationInterMessageTime", 0.5 , "Max fluctuation in time between two signaling messages sent by an agent" , 0 , true , Double.MAX_VALUE , true); 052 private InputParameter signaling_averageDelay = new InputParameter ("signaling_averageDelay", 0.0 , "Average time between signaling message transmission by an agent and its reception by other or others" , 0 , true , Double.MAX_VALUE , true); 053 private InputParameter signaling_maxFluctuationInDelay = new InputParameter ("signaling_maxFluctuationInDelay", 0.0 , "Max fluctuation in time in the signaling delay, in absolute time values. The signaling delays are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 054 private InputParameter signaling_signalingLossProbability = new InputParameter ("signaling_signalingLossProbability", 0.05 , "Probability that a signaling message transmitted is lost (not received by other or others involved agents)" , 0 , true , Double.MAX_VALUE , true); 055 private InputParameter update_isSynchronous = new InputParameter ("update_isSynchronous", false , "true if all the distributed agents involved wake up synchronousely to update its state"); 056 private InputParameter update_averageInterUpdateTime = new InputParameter ("update_averageInterUpdateTime", 1.0 , "Average time between two updates of an agent" , 0 , false , Double.MAX_VALUE , true); 057 private InputParameter update_maxFluctuationInterUpdateTime = new InputParameter ("update_maxFluctuationInterUpdateTime", 0.5 , "Max fluctuation in time in the update interval of an agent, in absolute time values. The update intervals are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 058 private InputParameter gradient_maxGradientAbsoluteNoise = new InputParameter ("gradient_maxGradientAbsoluteNoise", 0.0 , "Max value of the added noise to the gradient coordinate in absolute values" , 0 , true , Double.MAX_VALUE , true); 059 private InputParameter gradient_gammaStep = new InputParameter ("gradient_gammaStep", 1.0 , "Gamma step in the gradient algorithm" , 0 , false , Double.MAX_VALUE , true); 060 private InputParameter gradient_heavyBallBetaParameter = new InputParameter ("gradient_heavyBallBetaParameter", 0.0 , "Beta parameter of heavy ball, between 0 and 1. Value 0 means no heavy ball" , 0 , true , 1.0 , true); 061 private InputParameter gradient_maxGradientCoordinateChange = new InputParameter ("gradient_maxGradientCoordinateChange", 1000.0 , "Maximum change in an iteration of a gradient coordinate" , 0 , false , Double.MAX_VALUE , true); 062 private InputParameter simulation_maxNumberOfUpdateIntervals = new InputParameter ("simulation_maxNumberOfUpdateIntervals", 100.0 , "Maximum number of update intervals in average per agent" , 0 , false , Double.MAX_VALUE , true); 063 private InputParameter simulation_randomSeed = new InputParameter ("simulation_randomSeed", (long) 1 , "Seed of the random number generator"); 064 private InputParameter simulation_outFileNameRoot = new InputParameter ("simulation_outFileNameRoot", "adaptiveRoutingPrimal" , "Root of the file name to be used in the output files. If blank, no output"); 065 private InputParameter control_maxNumberOfPathsPerDemand = new InputParameter ("control_maxNumberOfPathsPerDemand", (int) 5 , "Number of acceptable paths per demand. Take the k-shortest paths in number of hops" , 1 , Integer.MAX_VALUE); 066 067 private static PrintStream getNulFile () { try { return new PrintStream (new FileOutputStream ("NUL") , false); } catch (Exception e) {e.printStackTrace(); throw new RuntimeException ("Not NUL file"); } } 068 private PrintStream log = getNulFile (); //System.err;//new PrintStream (new FileOutputStream ("NUL") , true); 069 070 private Random rng; 071 072 private static final int SIGNALING_WAKEUPTOSENDMESSAGE = 300; 073 private static final int SIGNALING_RECEIVEDMESSAGE = 301; 074 private static final int UPDATE_WAKEUPTOUPDATE = 302; 075 076 private NetPlan currentNetPlan; 077 078 private DoubleMatrix1D control_price_e; 079 private DoubleMatrix2D control_mostUpdated_price_e_knownByNode_n; 080 private DoubleMatrix1D control_previousXp; 081 private int [][] control_routeIndexes_d; 082 private int E , R , N , D; 083 084// private Map<Node,DoubleMatrix1D> routing_mostUpdated_price_e_knownByNode_n; 085// private Map<Long,Set<Long>> routing_outgoingDemands_n; 086// private Map<Long,Double> routing_previousXp; 087 088 private TimeTrace stat_traceOf_xp; 089 private TimeTrace stat_traceOf_objFunction; 090 091 092 @Override 093 public String getDescription() 094 { 095 return "This module implements a distributed primal-gradient based algorithm, for iteratively adapting the network routing."; 096 } 097 098 @Override 099 public List<Triple<String, String, String>> getParameters() 100 { 101 /* Returns the parameter information for all the InputParameter objects defined in this object (uses Java reflection) */ 102 return InputParameter.getInformationAllInputParameterFieldsOfObject(this); 103 } 104 105 @Override 106 public void initialize(NetPlan currentNetPlan, Map<String, String> algorithmParameters, Map<String, String> simulationParameters, Map<String, String> net2planParameters) 107 { 108 /* Initialize all InputParameter objects defined in this object (this uses Java reflection) */ 109 InputParameter.initializeAllInputParameterFieldsOfObject(this, algorithmParameters); 110 111 if (currentNetPlan.getNumberOfLayers() != 1) throw new Net2PlanException ("This algorithm works in single layer networks"); 112 113 this.rng = new Random (simulation_randomSeed.getLong()); 114 this.currentNetPlan = currentNetPlan; 115 this.E = currentNetPlan.getNumberOfLinks(); 116 this.N = currentNetPlan.getNumberOfNodes(); 117 this.D = currentNetPlan.getNumberOfDemands(); 118 119 /* Check all nodes have output links */ 120 for (Node n : currentNetPlan.getNodes()) if (n.getOutgoingLinks().isEmpty()) throw new Net2PlanException ("All nodes should have output links"); 121 122// /* Compute the CPL */ 123// this.cpl = new CandidatePathList (this.currentNetPlan , "K" , "" + routing_maxNumberOfPathsPerDemand); 124 125 /* Sets the initial routing, with all the traffic balanced equally in all the paths of the demand */ 126 currentNetPlan.removeAllUnicastRoutingInformation(); 127 currentNetPlan.setRoutingTypeAllDemands(RoutingType.SOURCE_ROUTING); 128 this.currentNetPlan.addRoutesFromCandidatePathList(currentNetPlan.computeUnicastCandidatePathList(null , control_maxNumberOfPathsPerDemand.getInt(), -1, -1, -1, -1, -1, -1 , null)); 129 130 this.R = currentNetPlan.getNumberOfRoutes (); 131 this.control_previousXp = DoubleFactory1D.dense.make (currentNetPlan.getNumberOfRoutes()); 132 this.control_routeIndexes_d = new int [D][]; 133 for (Demand d : this.currentNetPlan.getDemands()) 134 { 135 final double h_d = d.getOfferedTraffic(); 136 final Set<Route> routes = d.getRoutes(); 137 if (routes.isEmpty()) throw new Net2PlanException ("A demand has no paths assigned: not connected topology"); 138 control_routeIndexes_d [d.getIndex ()] = new int [routes.size ()]; 139 int counter_r = 0; 140 for (Route r : routes) 141 { 142 control_routeIndexes_d [d.getIndex ()][counter_r ++] = r.getIndex (); 143 r.setCarriedTraffic(h_d / routes.size() , h_d / routes.size()); 144 control_previousXp.set(r.getIndex(), h_d / routes.size()); 145 log.println("Initial route: " + r + " demand: " +d + ", h_d: " + h_d + ", h_r = " + r.getCarriedTraffic() + ", seqLinks: "+ r.getSeqLinks()); 146 } 147 } 148 149 /* Set the initial prices in the nodes */ 150 this.control_price_e = DoubleFactory1D.dense.make (E); 151 for (Link e : currentNetPlan.getLinks ()) this.control_price_e.set (e.getIndex() , computeLinkPriceFromNetPlan (e)); 152 153 /* Initialize the information each demand knows of the prices of all the links */ 154 this.control_mostUpdated_price_e_knownByNode_n = DoubleFactory2D.dense.make (N , E); 155 for (int n = 0; n < N ; n ++) control_mostUpdated_price_e_knownByNode_n.viewRow (n).assign (control_price_e); 156 157 /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */ 158 for (Node n : currentNetPlan.getNodes()) 159 { 160 final double signalingTime = (signaling_isSynchronous.getBoolean())? signaling_averageInterMessageTime.getDouble() : Math.max(0 , signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 161 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , n)); 162 final double updateTime = (update_isSynchronous.getBoolean())? update_averageInterUpdateTime.getDouble() : Math.max(0 , update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 163 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE , n)); 164 } 165 166 /* Intialize the traces */ 167 this.stat_traceOf_xp = new TimeTrace(); 168 this.stat_traceOf_objFunction = new TimeTrace (); 169 this.stat_traceOf_xp.add(0.0, this.currentNetPlan.getVectorRouteCarriedTraffic()); 170 this.stat_traceOf_objFunction.add(0.0, computeObjectiveFucntionFromNetPlan()); 171 172 /* */ 173 log.println("Initial rouetrs in this.currentNetPlan.getRouteCarriedTrafficMap(): " + this.currentNetPlan.getVectorRouteCarriedTraffic()); 174 } 175 176 @Override 177 public void processEvent(NetPlan currentNetPlan, SimEvent event) 178 { 179 final double t = event.getEventTime(); 180 switch (event.getEventType()) 181 { 182 case SIGNALING_RECEIVEDMESSAGE: // A node receives from an out neighbor the q_nt for any destination 183 { 184 final Pair<Node,Map<Link,Double>> signalInfo = (Pair<Node,Map<Link,Double>>) event.getEventObject(); 185 final Node nMe = signalInfo.getFirst(); 186 final Map<Link,Double> receivedInfo_price_e = signalInfo.getSecond(); 187 for (Link e : receivedInfo_price_e.keySet()) 188 this.control_mostUpdated_price_e_knownByNode_n.set (nMe.getIndex () , e.getIndex() , receivedInfo_price_e.get(e)); 189 log.println("t=" + t + ": SIGNALING_RECEIVEDMESSAGE node " + nMe); 190 break; 191 } 192 193 case SIGNALING_WAKEUPTOSENDMESSAGE: // A node broadcasts signaling info to its 1 hop neighbors 194 { 195 final Node nMe = (Node) event.getEventObject(); 196 log.println("t=" + t + ": ROUTING_NODEWAKEUPTO_TRANSMITSIGNAL node " + nMe); 197 198 /* Create the info I will signal */ 199 Map<Link,Double> infoToSignal_price_e = new HashMap<Link,Double> (); 200 for (Link e : nMe.getOutgoingLinks()) 201 infoToSignal_price_e.put (e , this.computeLinkPriceFromNetPlan(e)); 202 203 /* Send the events of the signaling information messages to all the nodes */ 204 if (rng.nextDouble() >= this.signaling_signalingLossProbability.getDouble()) // the signaling may be lost => lost to all nodes 205 for (Node n : currentNetPlan.getNodes ()) 206 { 207 final double signalingReceptionTime = t + Math.max(0 , signaling_averageDelay.getDouble() + signaling_maxFluctuationInDelay.getDouble() * (rng.nextDouble() - 0.5)); 208 this.scheduleEvent(new SimEvent (signalingReceptionTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_RECEIVEDMESSAGE , Pair.of(n , infoToSignal_price_e))); 209 } 210 211 /* Re-schedule when to wake up again */ 212 final double signalingTime = signaling_isSynchronous.getBoolean()? t + signaling_averageInterMessageTime.getDouble() : Math.max(t , t + signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 213 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , nMe)); 214 break; 215 } 216 217 case UPDATE_WAKEUPTOUPDATE: // a node updates its p_n, p_e values, using most updated info available 218 { 219 final Node nMe = (Node) event.getEventObject(); 220 log.println("t=" + t + ": ROUTING_NODEWAKEUPTO_RECOMPUTE node " + nMe); 221 //log.println("p_e values Before = " + mac_p_e); 222 223 //if (1 ==1) break; 224 DoubleMatrix1D x_p = this.currentNetPlan.getVectorRouteCarriedTraffic(); 225 DoubleMatrix1D infoIKnow_price_e = this.control_mostUpdated_price_e_knownByNode_n.viewRow (nMe.getIndex ()); 226 for (Demand d : nMe.getOutgoingDemands()) 227 { 228 final double h_d = d.getOfferedTraffic(); 229 DoubleMatrix1D new_x_p = DoubleFactory1D.dense.make (R); 230 for (int rIndex : control_routeIndexes_d [d.getIndex ()]) 231 { 232 final Route r = currentNetPlan.getRoute (rIndex); 233 final double old_xp = x_p.get(rIndex); 234 double accumPrices = 0; 235 for (Link e : r.getSeqLinks()) accumPrices += infoIKnow_price_e.get(e.getIndex()); 236 accumPrices += 2*gradient_maxGradientAbsoluteNoise.getDouble()*(this.rng.nextDouble()-0.5); 237 new_x_p.set (r.getIndex () , old_xp - this.gradient_gammaStep.getDouble() * accumPrices + this.gradient_heavyBallBetaParameter.getDouble() * (r.getCarriedTraffic() - this.control_previousXp.get(r.getIndex ()) )); 238 } 239 240 GradientProjectionUtils.euclideanProjection_sumEquality (new_x_p , control_routeIndexes_d [d.getIndex ()] , h_d); 241 242 if (gradient_maxGradientCoordinateChange.getDouble() > 0) 243 GradientProjectionUtils.scaleDown_maxAbsoluteCoordinateChange (x_p , new_x_p , control_routeIndexes_d [d.getIndex ()] , gradient_maxGradientCoordinateChange.getDouble()); 244 245 for (int rIndex : control_routeIndexes_d [d.getIndex ()]) 246 { 247 final double newCarriedTraffic = new_x_p.get (rIndex); 248 final Route r = currentNetPlan.getRoute (rIndex); 249 control_previousXp.set (rIndex , r.getCarriedTraffic()); 250 r.setCarriedTraffic(newCarriedTraffic , newCarriedTraffic); 251 } 252 } 253 254 final double updateTime = update_isSynchronous.getBoolean()? t + update_averageInterUpdateTime.getDouble() : Math.max(t , t + update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 255 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE, nMe)); 256 257 this.stat_traceOf_xp.add(t, currentNetPlan.getVectorRouteCarriedTraffic()); 258 this.stat_traceOf_objFunction.add(t, computeObjectiveFucntionFromNetPlan()); 259 260 if (t > this.simulation_maxNumberOfUpdateIntervals.getDouble() * this.update_averageInterUpdateTime.getDouble()) { this.endSimulation (); } 261 262 break; 263 } 264 265 266 default: throw new RuntimeException ("Unexpected received event"); 267 } 268 269 270 } 271 272 public String finish (StringBuilder st , double simTime) 273 { 274 if (simulation_outFileNameRoot.getString().equals("")) return null; 275 stat_traceOf_xp.printToFile(new File (simulation_outFileNameRoot.getString() + "_xp.txt")); 276 stat_traceOf_objFunction.printToFile(new File (simulation_outFileNameRoot.getString() + "_objFunc.txt")); 277 Pair<DoubleMatrix1D,Double> optPair = computeOptimumSolution (); 278 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_xp.txt"), optPair.getFirst()); 279 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_objFunc.txt"), optPair.getSecond()); 280 return null; 281 } 282 283 private double computeLinkPriceFromNetPlan (Link e) 284 { 285 final double y_e = e.getCarriedTraffic(); 286 final double u_e = e.getCapacity(); 287 if (u_e == 0) throw new RuntimeException ("Zero capacity in a link means"); 288 final double gradient = (y_e / u_e > 0.99)? 1/Math.pow(u_e * 0.01,2) : 1/Math.pow(u_e - y_e,2); 289 return gradient; 290 } 291 292 293 294 private double computeObjectiveFucntionFromNetPlan () 295 { 296 double objFunc = 0; 297 for (Link e : this.currentNetPlan.getLinks()) 298 { 299 final double y_e = e.getCarriedTraffic(); 300 final double u_e = e.getCapacity(); 301 if (y_e / u_e > 0.99) 302 objFunc += 1/(0.01*u_e) + 1/Math.pow(u_e * 0.01,2) * (y_e - 0.99*u_e); 303 else 304 objFunc += 1/(u_e - y_e); 305 } 306 return objFunc; 307 } 308 309 private Pair<DoubleMatrix1D,Double> computeOptimumSolution () 310 { 311 /* Modify the map so that it is the pojection where all elements sum h_d, and are non-negative */ 312 final int R = this.currentNetPlan.getNumberOfRoutes(); 313 final int E = this.currentNetPlan.getNumberOfLinks(); 314 final DoubleMatrix1D h_d = this.currentNetPlan.getVectorDemandOfferedTraffic(); 315 final DoubleMatrix1D u_e = this.currentNetPlan.getVectorLinkCapacity(); 316 final DoubleMatrix1D h_p = this.currentNetPlan.getVectorRouteOfferedTrafficOfAssociatedDemand(); 317 318 OptimizationProblem op = new OptimizationProblem(); 319 op.addDecisionVariable("x_p", false , new int[] { 1 , R }, DoubleUtils.zeros(R) , h_p.toArray()); 320 op.addDecisionVariable("y_e", false , new int[] { 1 , E }, DoubleUtils.zeros(E) , u_e.toArray()); 321 op.setInputParameter("h_d", h_d , "row"); 322 op.setInputParameter("h_p", h_p , "row"); 323 op.setInputParameter("u_e", u_e , "row"); 324 325 /* Set some input parameters */ 326 op.setObjectiveFunction("minimize", "sum (1./(u_e - y_e))"); 327 328 /* VECTORIAL FORM OF THE CONSTRAINTS */ 329 op.setInputParameter("A_dp", currentNetPlan.getMatrixDemand2RouteAssignment()); 330 op.setInputParameter("A_ep", currentNetPlan.getMatrixLink2RouteAssignment()); 331 op.addConstraint("A_dp * x_p' == h_d'"); // for each demand, the 100% of the traffic is carried (summing the associated paths) 332 op.addConstraint("A_ep * x_p' == y_e'"); // set the traffic in each link equal to y_e 333 334 /* Call the solver to solve the problem */ 335 op.solve("ipopt"); 336 337 /* If an optimal solution was not found, quit */ 338 if (!op.solutionIsOptimal ()) throw new RuntimeException ("An optimal solution was not found"); 339 340 /* Retrieve the optimum solutions. Convert the bps into Erlangs */ 341 final double optCost = op.getOptimalCost(); 342 DoubleMatrix1D x_p = op.getPrimalSolution("x_p").view1D(); 343// Map<Long,Double> x_p_map = new HashMap<Long,Double> (); 344// for (long p : cpl.getPathIds()) x_p_map.put(p, x_p_array [id2IndexPathMap.get(p)]); 345 346 /* Check the solution */ 347 for (Link e : this.currentNetPlan.getLinks()) 348 { 349 double y_e = 0; for (Route r : e.getTraversingRoutes()) y_e += x_p.get (r.getIndex()); 350 if (y_e > e.getCapacity() + 1E-3) throw new RuntimeException ("Bad"); 351 } 352 for (Demand d : this.currentNetPlan.getDemands()) 353 { 354 double trafDemand = 0; for (Route r : d.getRoutes()) trafDemand += x_p.get(r.getIndex ()); 355 if (Math.abs(trafDemand - d.getOfferedTraffic()) > 1E-3) throw new RuntimeException ("Bad"); 356 } 357 358 return Pair.of(x_p,optCost); 359 } 360 361 362 363}