001/******************************************************************************* 002 * Copyright (c) 2017 Pablo Pavon Marino and others. 003 * All rights reserved. This program and the accompanying materials 004 * are made available under the terms of the 2-clause BSD License 005 * which accompanies this distribution, and is available at 006 * https://opensource.org/licenses/BSD-2-Clause 007 * 008 * Contributors: 009 * Pablo Pavon Marino and others - initial API and implementation 010 *******************************************************************************/ 011package com.net2plan.examples.ocnbook.onlineSim; 012 013 014import cern.colt.matrix.tdouble.DoubleFactory1D; 015import cern.colt.matrix.tdouble.DoubleFactory2D; 016import cern.colt.matrix.tdouble.DoubleMatrix1D; 017import cern.colt.matrix.tdouble.DoubleMatrix2D; 018import cern.jet.math.tdouble.DoubleFunctions; 019import com.jom.OptimizationProblem; 020import com.net2plan.interfaces.networkDesign.*; 021import com.net2plan.interfaces.simulation.IEventProcessor; 022import com.net2plan.interfaces.simulation.SimEvent; 023import com.net2plan.libraries.NetworkPerformanceMetrics; 024import com.net2plan.utils.Constants.RoutingType; 025import com.net2plan.utils.InputParameter; 026import com.net2plan.utils.Pair; 027import com.net2plan.utils.TimeTrace; 028import com.net2plan.utils.Triple; 029 030import java.io.File; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034import java.util.Set; 035 036/** 037 * This module implements a distributed dual-gradient based algorithm, for adapting the demand injected traffic (congestion control) in the network, to maximize the network utility enforcing a fair allocation of the resources. 038 * 039 * Ths event processor is adapted to permit observing the algorithm performances under user-defined conditions, 040 * including asynchronous distributed executions, where signaling can be affected by losses and/or delays, and/or measurement errors. 041 * The time evolution of different metrics can be stored in output files, for later processing. 042 * As an example, see the <a href="../../../../../../graphGeneratorFiles/fig_sec10_4_congestionControlDual.m">{@code fig_sec10_4_congestionControlDual.m}</a> MATLAB file used for generating the graph/s of the case study in the 043 * <a href="http://eu.wiley.com/WileyCDA/WileyTitle/productCd-1119013356.html">book</a> using this algorithm. 044 * 045 * To simulate a network with this module, use the {@code Online_evGen_doNothing} generator. 046 * 047 * @net2plan.keywords Bandwidth assignment (BA), Distributed algorithm, Dual gradient algorithm 048 * @net2plan.ocnbooksections Section 10.4 049 * @net2plan.inputParameters 050 * @author Pablo Pavon-Marino 051 */ 052@SuppressWarnings("unchecked") 053public class Online_evProc_congestionControlDual extends IEventProcessor 054{ 055 private Random rng; 056 057 private InputParameter signaling_isSynchronous = new InputParameter ("signaling_isSynchronous", false , "true if all the distributed agents involved wake up synchronously to send the signaling messages"); 058 private InputParameter signaling_averageInterMessageTime = new InputParameter ("signaling_averageInterMessageTime", 1.0 , "Average time between two signaling messages sent by an agent" , 0 , false , Double.MAX_VALUE , true); 059 private InputParameter signaling_maxFluctuationInterMessageTime = new InputParameter ("signaling_maxFluctuationInterMessageTime", 0.5 , "Max fluctuation in time between two signaling messages sent by an agent" , 0 , true , Double.MAX_VALUE , true); 060 private InputParameter signaling_averageDelay = new InputParameter ("signaling_averageDelay", 0.0 , "Average time between signaling message transmission by an agent and its reception by other or others" , 0 , true , Double.MAX_VALUE , true); 061 private InputParameter signaling_maxFluctuationInDelay = new InputParameter ("signaling_maxFluctuationInDelay", 0.0 , "Max fluctuation in time in the signaling delay, in absolute time values. The signaling delays are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 062 private InputParameter signaling_signalingLossProbability = new InputParameter ("signaling_signalingLossProbability", 0.05 , "Probability that a signaling message transmitted is lost (not received by other or others involved agents)" , 0 , true , Double.MAX_VALUE , true); 063 private InputParameter update_isSynchronous = new InputParameter ("update_isSynchronous", false , "true if all the distributed agents involved wake up synchronousely to update its state"); 064 private InputParameter update_averageInterUpdateTime = new InputParameter ("update_averageInterUpdateTime", 1.0 , "Average time between two updates of an agent" , 0 , false , Double.MAX_VALUE , true); 065 private InputParameter update_maxFluctuationInterUpdateTime = new InputParameter ("update_maxFluctuationInterUpdateTime", 0.5 , "Max fluctuation in time in the update interval of an agent, in absolute time values. The update intervals are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 066 private InputParameter gradient_gammaStep = new InputParameter ("gradient_gammaStep", 0.0001 , "Gamma step in the gradient algorithm" , 0 , false , Double.MAX_VALUE , true); 067 private InputParameter gradient_maxGradientAbsoluteNoise = new InputParameter ("gradient_maxGradientAbsoluteNoise", 0.0 , "Max value of the added noise to the gradient coordinate in absolute values" , 0 , true , Double.MAX_VALUE , true); 068 069 private InputParameter simulation_maxNumberOfUpdateIntervals = new InputParameter ("simulation_maxNumberOfUpdateIntervals", 700.0 , "Maximum number of update intervals in average per agent" , 0 , false , Double.MAX_VALUE , true); 070 private InputParameter simulation_randomSeed = new InputParameter ("simulation_randomSeed", (long) 1 , "Seed of the random number generator"); 071 private InputParameter simulation_outFileNameRoot = new InputParameter ("simulation_outFileNameRoot", "congestionControlDual" , "Root of the file name to be used in the output files. If blank, no output"); 072 073 private InputParameter control_minHd = new InputParameter ("control_minHd", 0.1 , "Minimum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true); 074 private InputParameter control_maxHd = new InputParameter ("control_maxHd", 1.0E6 , "Maximum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true); 075 private InputParameter control_fairnessFactor = new InputParameter ("control_fairnessFactor", 2.0 , "Fairness factor in utility function of congestion control" , 0 , true , Double.MAX_VALUE , true); 076 private InputParameter control_initialLinkPrices = new InputParameter ("control_initialLinkPrices", 1 , "Initial value of the link weights" , 0 , true , Double.MAX_VALUE , true); 077 078 private static final int SIGNALING_WAKEUPTOSENDMESSAGE = 400; 079 private static final int SIGNALING_RECEIVEDMESSAGE = 401; 080 private static final int UPDATE_WAKEUPTOUPDATE = 402; 081 082 private NetPlan currentNetPlan; 083 private int N,E,D; 084 private DoubleMatrix1D congControl_price_e; 085 private DoubleMatrix2D control_mostUpdatedLinkPriceKnownByDemand_de; 086 087 private TimeTrace stat_traceOf_hd; 088 private TimeTrace stat_traceOf_objFunction; 089 private TimeTrace stat_traceOf_pie; 090 private TimeTrace stat_traceOf_ye; 091 092 @Override 093 public String getDescription() 094 { 095 return "This module implements a distributed dual-gradient based algorithm, for adapting the demand injected traffic (congestion control) in the network, to maximize the network utility enforcing a fair allocation of the resources."; 096 } 097 098 @Override 099 public List<Triple<String, String, String>> getParameters() 100 { 101 /* Returns the parameter information for all the InputParameter objects defined in this object (uses Java reflection) */ 102 return InputParameter.getInformationAllInputParameterFieldsOfObject(this); 103 } 104 105 @Override 106 public void initialize(NetPlan currentNetPlan, Map<String, String> algorithmParameters, Map<String, String> simulationParameters, Map<String, String> net2planParameters) 107 { 108 /* Initialize all InputParameter objects defined in this object (this uses Java reflection) */ 109 InputParameter.initializeAllInputParameterFieldsOfObject(this, algorithmParameters); 110 111 this.currentNetPlan = currentNetPlan; 112 if (currentNetPlan.getNumberOfLayers() != 1) throw new Net2PlanException ("This algorithm works in single layer networks"); 113 114 /* Remove all routes, and create one with the shortest path in km for each demand */ 115 currentNetPlan.removeAllUnicastRoutingInformation(); 116 currentNetPlan.setRoutingTypeAllDemands(RoutingType.SOURCE_ROUTING); 117 this.currentNetPlan.addRoutesFromCandidatePathList(currentNetPlan.computeUnicastCandidatePathList(currentNetPlan.getVectorLinkLengthInKm() , 1, -1, -1, -1, -1, -1, -1 , null)); 118 119 for (Route r : currentNetPlan.getRoutes ()) r.setCarriedTraffic(r.getDemand().getOfferedTraffic() , r.getDemand().getOfferedTraffic()); 120 121 this.rng = new Random (simulation_randomSeed.getLong()); 122 this.currentNetPlan = currentNetPlan; 123 this.D = currentNetPlan.getNumberOfDemands (); 124 this.E = currentNetPlan.getNumberOfLinks (); 125 this.N = currentNetPlan.getNumberOfNodes (); 126 127 /* Set the initial prices in the links: 1.0 */ 128 this.congControl_price_e = DoubleFactory1D.dense.make (E , control_initialLinkPrices.getDouble()); 129 130 /* Initialize the information each demand knows of the prices of all the links */ 131 this.control_mostUpdatedLinkPriceKnownByDemand_de = DoubleFactory2D.dense.make (D,E,control_initialLinkPrices.getDouble()); 132 133 /* Compute the traffic each demand injects, update the routes keeping the fraction */ 134 for (Demand d : currentNetPlan.getDemands()) 135 { 136 final double new_hd = this.computeHdFromPrices(d); 137 if (Double.isNaN(new_hd)) throw new RuntimeException ("Bad"); 138 final double old_hd = d.getOfferedTraffic(); 139 d.setOfferedTraffic(new_hd); 140 final Set<Route> routes = d.getRoutes(); 141 final double increasingFactor = (old_hd == 0)? Double.MAX_VALUE : new_hd/old_hd; 142 for (Route r : routes) 143 { 144 final double old_hr = r.getCarriedTraffic(); 145 final double new_hr = (old_hd == 0)? new_hd / routes.size() : old_hr * increasingFactor; 146 if (Double.isNaN(old_hr)) throw new RuntimeException ("Bad"); 147 if (Double.isNaN(new_hr)) throw new RuntimeException ("Bad"); 148 r.setCarriedTraffic(new_hr , new_hr); 149 } 150 if (Math.abs(d.getOfferedTraffic() - d.getCarriedTraffic()) > 1E-3) throw new RuntimeException ("Bad"); 151 } 152 153 /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */ 154 for (Link e : currentNetPlan.getLinks()) 155 { 156 final double signalingTime = (signaling_isSynchronous.getBoolean())? signaling_averageInterMessageTime.getDouble() : Math.max(0 , signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 157 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , e)); 158 } 159 for (Demand d : currentNetPlan.getDemands()) 160 { 161 final double updateTime = (update_isSynchronous.getBoolean())? update_averageInterUpdateTime.getDouble() : Math.max(0 , update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 162 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE , d)); 163 } 164 165 /* Intialize the traces */ 166 this.stat_traceOf_hd = new TimeTrace (); 167 this.stat_traceOf_pie = new TimeTrace (); 168 this.stat_traceOf_ye = new TimeTrace (); 169 this.stat_traceOf_objFunction = new TimeTrace (); 170 171 this.stat_traceOf_hd.add(0.0, this.currentNetPlan.getVectorDemandOfferedTraffic()); 172 this.stat_traceOf_pie.add(0.0, this.congControl_price_e.copy ()); 173 this.stat_traceOf_ye.add(0.0, this.currentNetPlan.getVectorLinkCarriedTraffic()); 174 this.stat_traceOf_objFunction.add(0.0, NetworkPerformanceMetrics.alphaUtility(currentNetPlan.getVectorDemandOfferedTraffic() , control_fairnessFactor.getDouble())); 175 176 } 177 178 @Override 179 public void processEvent(NetPlan currentNetPlan, SimEvent event) 180 { 181 final double t = event.getEventTime(); 182 switch (event.getEventType()) 183 { 184 case SIGNALING_RECEIVEDMESSAGE: // A node receives from an out neighbor the q_nt for any destination 185 { 186 final Pair<Demand,Pair<Link,Double>> signalInfo = (Pair<Demand,Pair<Link,Double>>) event.getEventObject(); 187 final Demand d = signalInfo.getFirst(); 188 final Pair<Link,Double> receivedInfo_price_e = signalInfo.getSecond(); 189 this.control_mostUpdatedLinkPriceKnownByDemand_de.set(d.getIndex() , receivedInfo_price_e.getFirst().getIndex () ,receivedInfo_price_e.getSecond()); 190 break; 191 } 192 193 case SIGNALING_WAKEUPTOSENDMESSAGE: 194 { 195 final Link eMe = (Link) event.getEventObject(); 196 197 /* Update the new price with the gradient approach */ 198 final double u_e = eMe.getCapacity(); 199 final double y_e = eMe.getCarriedTraffic(); 200 final double old_pie = this.congControl_price_e.get(eMe.getIndex()); 201 final double new_pie = Math.max(0, old_pie - this.gradient_gammaStep.getDouble() * (u_e - y_e) + 2*gradient_maxGradientAbsoluteNoise.getDouble()*(rng.nextDouble()-0.5)); 202 this.congControl_price_e.set(eMe.getIndex(), new_pie); 203 204 /* Create the info I will signal */ 205 Pair<Link,Double> infoToSignal = Pair.of(eMe , new_pie); 206 207 /* Send the events of the signaling information messages to all the nodes */ 208 for (Route route : eMe.getTraversingRoutes()) 209 { 210 if (rng.nextDouble() < this.signaling_signalingLossProbability.getDouble()) continue; // the signaling may be lost => lost to all demands 211 final Demand d = route.getDemand(); 212 final double signalingReceptionTime = t + Math.max(0 , signaling_averageDelay.getDouble() + signaling_maxFluctuationInDelay.getDouble() * (rng.nextDouble() - 0.5)); 213 this.scheduleEvent(new SimEvent (signalingReceptionTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_RECEIVEDMESSAGE , Pair.of(d , infoToSignal))); 214 } 215 216 /* Re-schedule when to wake up again */ 217 final double signalingTime = signaling_isSynchronous.getBoolean()? t + signaling_averageInterMessageTime.getDouble() : Math.max(t , t + signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 218 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , eMe)); 219 break; 220 } 221 222 case UPDATE_WAKEUPTOUPDATE: // a node updates its p_n, p_e values, using most updated info available 223 { 224 final Demand dMe = (Demand) event.getEventObject(); 225 226 /* compute the new h_d and apply it */ 227 final double new_hd = computeHdFromPrices (dMe); 228 final double old_hd = dMe.getCarriedTraffic(); 229 dMe.setOfferedTraffic(new_hd); 230 final Set<Route> routes = dMe.getRoutes(); 231 final double increasingFactor = (old_hd == 0)? Double.MAX_VALUE : new_hd/old_hd; 232 for (Route r : routes) 233 { 234 final double old_hr = r.getCarriedTraffic(); 235 final double new_hr = (old_hd == 0)? new_hd / routes.size() : old_hr * increasingFactor; 236 r.setCarriedTraffic(new_hr , new_hr); 237 } 238// if (Math.abs(currentNetPlan.getDemandOfferedTraffic(dIdMe) - currentNetPlan.getDemandCarriedTraffic(dIdMe)) > 1E-3) throw new RuntimeException ("Bad"); 239 240 final double updateTime = update_isSynchronous.getBoolean()? t + update_averageInterUpdateTime.getDouble() : Math.max(t , t + update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 241 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE, dMe)); 242 243 this.stat_traceOf_hd.add(t, this.currentNetPlan.getVectorDemandOfferedTraffic()); 244 this.stat_traceOf_pie.add(t, this.congControl_price_e.copy ()); 245 this.stat_traceOf_ye.add(t, this.currentNetPlan.getVectorLinkCarriedTraffic()); 246 this.stat_traceOf_objFunction.add(t, NetworkPerformanceMetrics.alphaUtility(currentNetPlan.getVectorDemandOfferedTraffic() , control_fairnessFactor.getDouble())); 247 248 if (t > this.simulation_maxNumberOfUpdateIntervals.getDouble() * this.update_averageInterUpdateTime.getDouble()) { this.endSimulation (); } 249 250 break; 251 } 252 253 254 default: throw new RuntimeException ("Unexpected received event"); 255 } 256 257 258 } 259 260 public String finish (StringBuilder st , double simTime) 261 { 262 if (simulation_outFileNameRoot.getString().equals("")) return null; 263 stat_traceOf_hd.printToFile(new File (simulation_outFileNameRoot.getString() + "_hd.txt")); 264 stat_traceOf_pie.printToFile(new File (simulation_outFileNameRoot.getString() + "_pie.txt")); 265 stat_traceOf_ye.printToFile(new File (simulation_outFileNameRoot.getString() + "_ye.txt")); 266 stat_traceOf_objFunction.printToFile(new File (simulation_outFileNameRoot.getString() + "_objFunc.txt")); 267 Triple<DoubleMatrix1D,DoubleMatrix1D,Double> pair = computeOptimumSolution (); 268 DoubleMatrix1D h_d_opt = pair.getFirst(); 269 DoubleMatrix1D pi_e = pair.getSecond(); 270 double optCost = pair.getThird(); 271 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_objFunc.txt"), optCost); 272 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_hd.txt"), h_d_opt); 273 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_pie.txt"), pi_e); 274 return null; 275 } 276 277 private double computeHdFromPrices (Demand d) 278 { 279 DoubleMatrix1D infoIKnow_price_e = this.control_mostUpdatedLinkPriceKnownByDemand_de.viewRow (d.getIndex ()); 280 281 /* compute the demand price as weighted sum in the routes of route prices */ 282 double demandWeightedSumLinkPrices = 0; 283 double demandCarriedTraffic = 0; 284 for (Route r : d.getRoutes ()) 285 { 286 final double h_r = r.getCarriedTraffic(); 287 demandCarriedTraffic += h_r; 288 for (Link e : r.getSeqLinks()) 289 demandWeightedSumLinkPrices += h_r * infoIKnow_price_e.get(e.getIndex ()); 290 } 291 demandWeightedSumLinkPrices /= demandCarriedTraffic; 292 293 /* compute the new h_d */ 294 final double new_hd = Math.max(this.control_minHd.getDouble() , Math.min(this.control_maxHd.getDouble(), Math.pow(demandWeightedSumLinkPrices, -1/this.control_fairnessFactor.getDouble()))); 295 return new_hd; 296 } 297 298 private Triple<DoubleMatrix1D,DoubleMatrix1D,Double> computeOptimumSolution () 299 { 300 /* Modify the map so that it is the pojection where all elements sum h_d, and are non-negative */ 301 final int D = this.currentNetPlan.getNumberOfDemands(); 302 303 OptimizationProblem op = new OptimizationProblem(); 304 305 /* Add the decision variables to the problem */ 306 op.addDecisionVariable("h_d", false, new int[] {1, D}, control_minHd.getDouble(), control_maxHd.getDouble()); 307 308 /* Set some input parameters */ 309 op.setInputParameter("u_e", currentNetPlan.getVectorLinkCapacity() , "row"); 310 op.setInputParameter("alpha", control_fairnessFactor.getDouble()); 311 op.setInputParameter("R_de", currentNetPlan.getMatrixDemand2LinkAssignment()); 312 313 /* Sets the objective function */ 314 if (control_fairnessFactor.getDouble() == 1) 315 op.setObjectiveFunction("maximize", "sum(ln(h_d))"); 316 else if (control_fairnessFactor.getDouble() == 0) 317 op.setObjectiveFunction("maximize", "sum(h_d)"); 318 else 319 op.setObjectiveFunction("maximize", "(1-alpha) * sum(h_d ^ (1-alpha))"); 320 321 op.addConstraint("h_d * R_de <= u_e" , "pi_e"); // the capacity constraints (E constraints) 322 323 /* Call the solver to solve the problem */ 324 op.solve("ipopt"); 325 326 /* If an optimal solution was not found, quit */ 327 if (!op.solutionIsOptimal()) throw new Net2PlanException("An optimal solution was not found"); 328 329 /* Retrieve the optimum solutions */ 330 DoubleMatrix1D h_d = op.getPrimalSolution("h_d").view1D (); 331 DoubleMatrix1D pi_e = op.getMultipliersOfConstraint("pi_e").assign(DoubleFunctions.abs).view1D (); 332 333 return Triple.of(h_d,pi_e,NetworkPerformanceMetrics.alphaUtility(currentNetPlan.getVectorDemandOfferedTraffic() , control_fairnessFactor.getDouble())); 334 } 335 336}