001/******************************************************************************* 002 * Copyright (c) 2016 Pablo Pavon Mariņo. 003 * All rights reserved. This program and the accompanying materials 004 * are made available under the terms of the GNU Lesser Public License v2.1 005 * which accompanies this distribution, and is available at 006 * http://www.gnu.org/licenses/lgpl.html 007 ******************************************************************************/ 008package com.net2plan.examples.ocnbook.onlineSim; 009 010 011 012 013import java.io.File; 014import java.io.FileOutputStream; 015import java.io.PrintStream; 016import java.util.List; 017import java.util.Map; 018import java.util.Random; 019import java.util.Set; 020 021import cern.colt.matrix.tdouble.DoubleFactory1D; 022import cern.colt.matrix.tdouble.DoubleFactory2D; 023import cern.colt.matrix.tdouble.DoubleMatrix1D; 024import cern.colt.matrix.tdouble.DoubleMatrix2D; 025import cern.jet.math.tdouble.DoubleFunctions; 026 027import com.jom.OptimizationProblem; 028import com.net2plan.interfaces.networkDesign.Demand; 029import com.net2plan.interfaces.networkDesign.Link; 030import com.net2plan.interfaces.networkDesign.Net2PlanException; 031import com.net2plan.interfaces.networkDesign.NetPlan; 032import com.net2plan.interfaces.networkDesign.Route; 033import com.net2plan.interfaces.simulation.IEventProcessor; 034import com.net2plan.interfaces.simulation.SimEvent; 035import com.net2plan.libraries.NetworkPerformanceMetrics; 036import com.net2plan.utils.Constants.RoutingType; 037import com.net2plan.utils.InputParameter; 038import com.net2plan.utils.Pair; 039import com.net2plan.utils.TimeTrace; 040import com.net2plan.utils.Triple; 041 042/** 043 * This module implements a distributed dual-gradient based algorithm, for adapting the demand injected traffic (congestion control) in the network, to maximize the network utility enforcing a fair allocation of the resources. 044 * 045 * Ths event processor is adapted to permit observing the algorithm performances under user-defined conditions, 046 * including asynchronous distributed executions, where signaling can be affected by losses and/or delays, and/or measurement errors. 047 * The time evolution of different metrics can be stored in output files, for later processing. 048 * As an example, see the <a href="../../../../../../graphGeneratorFiles/fig_sec10_4_congestionControlDual.m">{@code fig_sec10_4_congestionControlDual.m}</a> MATLAB file used for generating the graph/s of the case study in the 049 * <a href="http://eu.wiley.com/WileyCDA/WileyTitle/productCd-1119013356.html">book</a> using this algorithm. 050 * 051 * To simulate a network with this module, use the {@code Online_evGen_doNothing} generator. 052 * 053 * @net2plan.keywords Bandwidth assignment (BA), Distributed algorithm, Dual gradient algorithm 054 * @net2plan.ocnbooksections Section 10.4 055 * @net2plan.inputParameters 056 * @author Pablo Pavon-Marino 057 */ 058public class Online_evProc_congestionControlDual extends IEventProcessor 059{ 060 private Random rng; 061 062 private InputParameter signaling_isSynchronous = new InputParameter ("signaling_isSynchronous", false , "true if all the distributed agents involved wake up synchronously to send the signaling messages"); 063 private InputParameter signaling_averageInterMessageTime = new InputParameter ("signaling_averageInterMessageTime", 1.0 , "Average time between two signaling messages sent by an agent" , 0 , false , Double.MAX_VALUE , true); 064 private InputParameter signaling_maxFluctuationInterMessageTime = new InputParameter ("signaling_maxFluctuationInterMessageTime", 0.5 , "Max fluctuation in time between two signaling messages sent by an agent" , 0 , true , Double.MAX_VALUE , true); 065 private InputParameter signaling_averageDelay = new InputParameter ("signaling_averageDelay", 0.0 , "Average time between signaling message transmission by an agent and its reception by other or others" , 0 , true , Double.MAX_VALUE , true); 066 private InputParameter signaling_maxFluctuationInDelay = new InputParameter ("signaling_maxFluctuationInDelay", 0.0 , "Max fluctuation in time in the signaling delay, in absolute time values. The signaling delays are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 067 private InputParameter signaling_signalingLossProbability = new InputParameter ("signaling_signalingLossProbability", 0.05 , "Probability that a signaling message transmitted is lost (not received by other or others involved agents)" , 0 , true , Double.MAX_VALUE , true); 068 private InputParameter update_isSynchronous = new InputParameter ("update_isSynchronous", false , "true if all the distributed agents involved wake up synchronousely to update its state"); 069 private InputParameter update_averageInterUpdateTime = new InputParameter ("update_averageInterUpdateTime", 1.0 , "Average time between two updates of an agent" , 0 , false , Double.MAX_VALUE , true); 070 private InputParameter update_maxFluctuationInterUpdateTime = new InputParameter ("update_maxFluctuationInterUpdateTime", 0.5 , "Max fluctuation in time in the update interval of an agent, in absolute time values. The update intervals are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 071 private InputParameter gradient_gammaStep = new InputParameter ("gradient_gammaStep", 0.0001 , "Gamma step in the gradient algorithm" , 0 , false , Double.MAX_VALUE , true); 072 private InputParameter gradient_maxGradientAbsoluteNoise = new InputParameter ("gradient_maxGradientAbsoluteNoise", 0.0 , "Max value of the added noise to the gradient coordinate in absolute values" , 0 , true , Double.MAX_VALUE , true); 073 074 private InputParameter simulation_maxNumberOfUpdateIntervals = new InputParameter ("simulation_maxNumberOfUpdateIntervals", 700.0 , "Maximum number of update intervals in average per agent" , 0 , false , Double.MAX_VALUE , true); 075 private InputParameter simulation_randomSeed = new InputParameter ("simulation_randomSeed", (long) 1 , "Seed of the random number generator"); 076 private InputParameter simulation_outFileNameRoot = new InputParameter ("simulation_outFileNameRoot", "congestionControlDual" , "Root of the file name to be used in the output files. If blank, no output"); 077 078 private InputParameter control_minHd = new InputParameter ("control_minHd", 0.1 , "Minimum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true); 079 private InputParameter control_maxHd = new InputParameter ("control_maxHd", 1.0E6 , "Maximum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true); 080 private InputParameter control_fairnessFactor = new InputParameter ("control_fairnessFactor", 2.0 , "Fairness factor in utility function of congestion control" , 0 , true , Double.MAX_VALUE , true); 081 private InputParameter control_initialLinkPrices = new InputParameter ("control_initialLinkPrices", 1 , "Initial value of the link weights" , 0 , true , Double.MAX_VALUE , true); 082 083 private static final int SIGNALING_WAKEUPTOSENDMESSAGE = 400; 084 private static final int SIGNALING_RECEIVEDMESSAGE = 401; 085 private static final int UPDATE_WAKEUPTOUPDATE = 402; 086 087 private NetPlan currentNetPlan; 088 private int N,E,D; 089 private DoubleMatrix1D congControl_price_e; 090 private DoubleMatrix2D control_mostUpdatedLinkPriceKnownByDemand_de; 091 092 private TimeTrace stat_traceOf_hd; 093 private TimeTrace stat_traceOf_objFunction; 094 private TimeTrace stat_traceOf_pie; 095 private TimeTrace stat_traceOf_ye; 096 097 @Override 098 public String getDescription() 099 { 100 return "This module implements a distributed dual-gradient based algorithm, for adapting the demand injected traffic (congestion control) in the network, to maximize the network utility enforcing a fair allocation of the resources."; 101 } 102 103 @Override 104 public List<Triple<String, String, String>> getParameters() 105 { 106 /* Returns the parameter information for all the InputParameter objects defined in this object (uses Java reflection) */ 107 return InputParameter.getInformationAllInputParameterFieldsOfObject(this); 108 } 109 110 @Override 111 public void initialize(NetPlan currentNetPlan, Map<String, String> algorithmParameters, Map<String, String> simulationParameters, Map<String, String> net2planParameters) 112 { 113 /* Initialize all InputParameter objects defined in this object (this uses Java reflection) */ 114 InputParameter.initializeAllInputParameterFieldsOfObject(this, algorithmParameters); 115 116 this.currentNetPlan = currentNetPlan; 117 if (currentNetPlan.getNumberOfLayers() != 1) throw new Net2PlanException ("This algorithm works in single layer networks"); 118 119 /* Remove all routes, and create one with the shortest path in km for each demand */ 120 currentNetPlan.removeAllUnicastRoutingInformation(); 121 currentNetPlan.setRoutingType(RoutingType.SOURCE_ROUTING); 122 currentNetPlan.addRoutesFromCandidatePathList(currentNetPlan.getVectorLinkLengthInKm().toArray() , "K" , "1"); 123 for (Route r : currentNetPlan.getRoutes ()) r.setCarriedTraffic(r.getDemand().getOfferedTraffic() , r.getDemand().getOfferedTraffic()); 124 125 this.rng = new Random (simulation_randomSeed.getLong()); 126 this.currentNetPlan = currentNetPlan; 127 this.D = currentNetPlan.getNumberOfDemands (); 128 this.E = currentNetPlan.getNumberOfLinks (); 129 this.N = currentNetPlan.getNumberOfNodes (); 130 131 /* Set the initial prices in the links: 1.0 */ 132 this.congControl_price_e = DoubleFactory1D.dense.make (E , control_initialLinkPrices.getDouble()); 133 134 /* Initialize the information each demand knows of the prices of all the links */ 135 this.control_mostUpdatedLinkPriceKnownByDemand_de = DoubleFactory2D.dense.make (D,E,control_initialLinkPrices.getDouble()); 136 137 /* Compute the traffic each demand injects, update the routes keeping the fraction */ 138 for (Demand d : currentNetPlan.getDemands()) 139 { 140 final double new_hd = this.computeHdFromPrices(d); 141 if (Double.isNaN(new_hd)) throw new RuntimeException ("Bad"); 142 final double old_hd = d.getOfferedTraffic(); 143 d.setOfferedTraffic(new_hd); 144 final Set<Route> routes = d.getRoutes(); 145 final double increasingFactor = (old_hd == 0)? Double.MAX_VALUE : new_hd/old_hd; 146 for (Route r : routes) 147 { 148 final double old_hr = r.getCarriedTraffic(); 149 final double new_hr = (old_hd == 0)? new_hd / routes.size() : old_hr * increasingFactor; 150 if (Double.isNaN(old_hr)) throw new RuntimeException ("Bad"); 151 if (Double.isNaN(new_hr)) throw new RuntimeException ("Bad"); 152 r.setCarriedTraffic(new_hr , new_hr); 153 } 154 if (Math.abs(d.getOfferedTraffic() - d.getCarriedTraffic()) > 1E-3) throw new RuntimeException ("Bad"); 155 } 156 157 /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */ 158 for (Link e : currentNetPlan.getLinks()) 159 { 160 final double signalingTime = (signaling_isSynchronous.getBoolean())? signaling_averageInterMessageTime.getDouble() : Math.max(0 , signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 161 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , e)); 162 } 163 for (Demand d : currentNetPlan.getDemands()) 164 { 165 final double updateTime = (update_isSynchronous.getBoolean())? update_averageInterUpdateTime.getDouble() : Math.max(0 , update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 166 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE , d)); 167 } 168 169 /* Intialize the traces */ 170 this.stat_traceOf_hd = new TimeTrace (); 171 this.stat_traceOf_pie = new TimeTrace (); 172 this.stat_traceOf_ye = new TimeTrace (); 173 this.stat_traceOf_objFunction = new TimeTrace (); 174 175 this.stat_traceOf_hd.add(0.0, this.currentNetPlan.getVectorDemandOfferedTraffic()); 176 this.stat_traceOf_pie.add(0.0, this.congControl_price_e.copy ()); 177 this.stat_traceOf_ye.add(0.0, this.currentNetPlan.getVectorLinkTotalCarriedTraffic()); 178 this.stat_traceOf_objFunction.add(0.0, NetworkPerformanceMetrics.alphaUtility(currentNetPlan.getVectorDemandOfferedTraffic() , control_fairnessFactor.getDouble())); 179 180 } 181 182 @Override 183 public void processEvent(NetPlan currentNetPlan, SimEvent event) 184 { 185 final double t = event.getEventTime(); 186 switch (event.getEventType()) 187 { 188 case SIGNALING_RECEIVEDMESSAGE: // A node receives from an out neighbor the q_nt for any destination 189 { 190 final Pair<Demand,Pair<Link,Double>> signalInfo = (Pair<Demand,Pair<Link,Double>>) event.getEventObject(); 191 final Demand d = signalInfo.getFirst(); 192 final Pair<Link,Double> receivedInfo_price_e = signalInfo.getSecond(); 193 this.control_mostUpdatedLinkPriceKnownByDemand_de.set(d.getIndex() , receivedInfo_price_e.getFirst().getIndex () ,receivedInfo_price_e.getSecond()); 194 break; 195 } 196 197 case SIGNALING_WAKEUPTOSENDMESSAGE: 198 { 199 final Link eMe = (Link) event.getEventObject(); 200 201 /* Update the new price with the gradient approach */ 202 final double u_e = eMe.getCapacity(); 203 final double y_e = eMe.getCarriedTrafficIncludingProtectionSegments(); 204 final double old_pie = this.congControl_price_e.get(eMe.getIndex()); 205 final double new_pie = Math.max(0, old_pie - this.gradient_gammaStep.getDouble() * (u_e - y_e) + 2*gradient_maxGradientAbsoluteNoise.getDouble()*(rng.nextDouble()-0.5)); 206 this.congControl_price_e.set(eMe.getIndex(), new_pie); 207 208 /* Create the info I will signal */ 209 Pair<Link,Double> infoToSignal = Pair.of(eMe , new_pie); 210 211 /* Send the events of the signaling information messages to all the nodes */ 212 for (Route route : eMe.getTraversingRoutes()) 213 { 214 if (rng.nextDouble() < this.signaling_signalingLossProbability.getDouble()) continue; // the signaling may be lost => lost to all demands 215 final Demand d = route.getDemand(); 216 final double signalingReceptionTime = t + Math.max(0 , signaling_averageDelay.getDouble() + signaling_maxFluctuationInDelay.getDouble() * (rng.nextDouble() - 0.5)); 217 this.scheduleEvent(new SimEvent (signalingReceptionTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_RECEIVEDMESSAGE , Pair.of(d , infoToSignal))); 218 } 219 220 /* Re-schedule when to wake up again */ 221 final double signalingTime = signaling_isSynchronous.getBoolean()? t + signaling_averageInterMessageTime.getDouble() : Math.max(t , t + signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 222 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , eMe)); 223 break; 224 } 225 226 case UPDATE_WAKEUPTOUPDATE: // a node updates its p_n, p_e values, using most updated info available 227 { 228 final Demand dMe = (Demand) event.getEventObject(); 229 230 /* compute the new h_d and apply it */ 231 final double new_hd = computeHdFromPrices (dMe); 232 final double old_hd = dMe.getCarriedTraffic(); 233 dMe.setOfferedTraffic(new_hd); 234 final Set<Route> routes = dMe.getRoutes(); 235 final double increasingFactor = (old_hd == 0)? Double.MAX_VALUE : new_hd/old_hd; 236 for (Route r : routes) 237 { 238 final double old_hr = r.getCarriedTraffic(); 239 final double new_hr = (old_hd == 0)? new_hd / routes.size() : old_hr * increasingFactor; 240 r.setCarriedTraffic(new_hr , new_hr); 241 } 242// if (Math.abs(currentNetPlan.getDemandOfferedTraffic(dIdMe) - currentNetPlan.getDemandCarriedTraffic(dIdMe)) > 1E-3) throw new RuntimeException ("Bad"); 243 244 final double updateTime = update_isSynchronous.getBoolean()? t + update_averageInterUpdateTime.getDouble() : Math.max(t , t + update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 245 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE, dMe)); 246 247 this.stat_traceOf_hd.add(t, this.currentNetPlan.getVectorDemandOfferedTraffic()); 248 this.stat_traceOf_pie.add(t, this.congControl_price_e.copy ()); 249 this.stat_traceOf_ye.add(t, this.currentNetPlan.getVectorLinkTotalCarriedTraffic()); 250 this.stat_traceOf_objFunction.add(t, NetworkPerformanceMetrics.alphaUtility(currentNetPlan.getVectorDemandOfferedTraffic() , control_fairnessFactor.getDouble())); 251 252 if (t > this.simulation_maxNumberOfUpdateIntervals.getDouble() * this.update_averageInterUpdateTime.getDouble()) { this.endSimulation (); } 253 254 break; 255 } 256 257 258 default: throw new RuntimeException ("Unexpected received event"); 259 } 260 261 262 } 263 264 public String finish (StringBuilder st , double simTime) 265 { 266 if (simulation_outFileNameRoot.getString().equals("")) return null; 267 stat_traceOf_hd.printToFile(new File (simulation_outFileNameRoot.getString() + "_hd.txt")); 268 stat_traceOf_pie.printToFile(new File (simulation_outFileNameRoot.getString() + "_pie.txt")); 269 stat_traceOf_ye.printToFile(new File (simulation_outFileNameRoot.getString() + "_ye.txt")); 270 stat_traceOf_objFunction.printToFile(new File (simulation_outFileNameRoot.getString() + "_objFunc.txt")); 271 Triple<DoubleMatrix1D,DoubleMatrix1D,Double> pair = computeOptimumSolution (); 272 DoubleMatrix1D h_d_opt = pair.getFirst(); 273 DoubleMatrix1D pi_e = pair.getSecond(); 274 double optCost = pair.getThird(); 275 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_objFunc.txt"), optCost); 276 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_hd.txt"), h_d_opt); 277 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_pie.txt"), pi_e); 278 return null; 279 } 280 281 private double computeHdFromPrices (Demand d) 282 { 283 DoubleMatrix1D infoIKnow_price_e = this.control_mostUpdatedLinkPriceKnownByDemand_de.viewRow (d.getIndex ()); 284 285 /* compute the demand price as weighted sum in the routes of route prices */ 286 double demandWeightedSumLinkPrices = 0; 287 double demandCarriedTraffic = 0; 288 for (Route r : d.getRoutes ()) 289 { 290 final double h_r = r.getCarriedTraffic(); 291 demandCarriedTraffic += h_r; 292 for (Link e : r.getSeqLinksRealPath()) 293 demandWeightedSumLinkPrices += h_r * infoIKnow_price_e.get(e.getIndex ()); 294 } 295 demandWeightedSumLinkPrices /= demandCarriedTraffic; 296 297 /* compute the new h_d */ 298 final double new_hd = Math.max(this.control_minHd.getDouble() , Math.min(this.control_maxHd.getDouble(), Math.pow(demandWeightedSumLinkPrices, -1/this.control_fairnessFactor.getDouble()))); 299 return new_hd; 300 } 301 302 private Triple<DoubleMatrix1D,DoubleMatrix1D,Double> computeOptimumSolution () 303 { 304 /* Modify the map so that it is the pojection where all elements sum h_d, and are non-negative */ 305 final int D = this.currentNetPlan.getNumberOfDemands(); 306 307 OptimizationProblem op = new OptimizationProblem(); 308 309 /* Add the decision variables to the problem */ 310 op.addDecisionVariable("h_d", false, new int[] {1, D}, control_minHd.getDouble(), control_maxHd.getDouble()); 311 312 /* Set some input parameters */ 313 op.setInputParameter("u_e", currentNetPlan.getVectorLinkCapacity() , "row"); 314 op.setInputParameter("alpha", control_fairnessFactor.getDouble()); 315 op.setInputParameter("R_de", currentNetPlan.getMatrixDemand2LinkAssignment()); 316 317 /* Sets the objective function */ 318 if (control_fairnessFactor.getDouble() == 1) 319 op.setObjectiveFunction("maximize", "sum(ln(h_d))"); 320 else if (control_fairnessFactor.getDouble() == 0) 321 op.setObjectiveFunction("maximize", "sum(h_d)"); 322 else 323 op.setObjectiveFunction("maximize", "(1-alpha) * sum(h_d ^ (1-alpha))"); 324 325 op.addConstraint("h_d * R_de <= u_e" , "pi_e"); // the capacity constraints (E constraints) 326 327 /* Call the solver to solve the problem */ 328 op.solve("ipopt"); 329 330 /* If an optimal solution was not found, quit */ 331 if (!op.solutionIsOptimal()) throw new Net2PlanException("An optimal solution was not found"); 332 333 /* Retrieve the optimum solutions */ 334 DoubleMatrix1D h_d = op.getPrimalSolution("h_d").view1D (); 335 DoubleMatrix1D pi_e = op.getMultipliersOfConstraint("pi_e").assign(DoubleFunctions.abs).view1D (); 336 337 return Triple.of(h_d,pi_e,NetworkPerformanceMetrics.alphaUtility(currentNetPlan.getVectorDemandOfferedTraffic() , control_fairnessFactor.getDouble())); 338 } 339 340}