001/******************************************************************************* 002 * Copyright (c) 2017 Pablo Pavon Marino and others. 003 * All rights reserved. This program and the accompanying materials 004 * are made available under the terms of the 2-clause BSD License 005 * which accompanies this distribution, and is available at 006 * https://opensource.org/licenses/BSD-2-Clause 007 * 008 * Contributors: 009 * Pablo Pavon Marino and others - initial API and implementation 010 *******************************************************************************/ 011package com.net2plan.examples.ocnbook.onlineSim; 012 013 014import cern.colt.matrix.tdouble.DoubleMatrix1D; 015import cern.colt.matrix.tdouble.DoubleMatrix2D; 016import cern.colt.matrix.tint.IntFactory3D; 017import cern.colt.matrix.tint.IntMatrix3D; 018import cern.jet.math.tdouble.DoubleFunctions; 019import cern.jet.random.tdouble.Poisson; 020import com.jom.OptimizationProblem; 021import com.net2plan.interfaces.networkDesign.*; 022import com.net2plan.interfaces.simulation.IEventProcessor; 023import com.net2plan.interfaces.simulation.SimEvent; 024import com.net2plan.utils.Constants.RoutingType; 025import com.net2plan.utils.*; 026 027import java.io.File; 028import java.io.FileOutputStream; 029import java.io.PrintStream; 030import java.util.*; 031 032/** 033 * This module implements a distributed dual-decomposition-based gradient algorithm, for a coordinated adjustment of the traffic to inject by each demand (congestion control), and the routing (backpressure based) of this traffic in the network, to maximize the network utility enforcing a fair allocation of the resources. 034 * 035 * Ths event processor is adapted to permit observing the algorithm performances under user-defined conditions, 036 * including asynchronous distributed executions, where signaling can be affected by losses and/or delays, and/or measurement errors. 037 * The time evolution of different metrics can be stored in output files, for later processing. 038 * As an example, see the <a href="../../../../../../graphGeneratorFiles/fig_sec11_4_congControlAndBackpressureRouting_dualDecomp.m">{@code fig_sec11_4_congControlAndBackpressureRouting_dualDecomp.m}</a> MATLAB file used for generating the graph/s of the case study in the 039 * <a href="http://eu.wiley.com/WileyCDA/WileyTitle/productCd-1119013356.html">book</a> using this algorithm. 040 * 041 * To simulate a network with this module, use the {@code Online_evGen_doNothing} generator. 042 * 043 * @net2plan.keywords Bandwidth assignment (BA), Flow assignment (FA), Backpressure routing, Distributed algorithm, Dual decomposition 044 * @net2plan.ocnbooksections Section 11.4 045 * @net2plan.inputParameters 046 * @author Pablo Pavon-Marino 047 */ 048@SuppressWarnings("unchecked") 049public class Online_evProc_congControlAndBackpressureRoutingDualDecomp extends IEventProcessor 050{ 051 private static PrintStream getNulFile () { try { return new PrintStream (new FileOutputStream ("NUL") , false); } catch (Exception e) {e.printStackTrace(); throw new RuntimeException ("Not NUL file"); } } 052 private PrintStream log = getNulFile (); //System.err;//new PrintStream (new FileOutputStream ("NUL") , true); 053 054 /******************** MAC ****************/ 055 static final int CC_UPDATE_WAKEUPTOUPDATE = 402; 056 057 private InputParameter cc_update_isSynchronous = new InputParameter ("cc_update_isSynchronous", false , "true if all the distributed agents involved wake up synchronousely to update its state"); 058 private InputParameter cc_update_averageInterUpdateTime = new InputParameter ("cc_update_averageInterUpdateTime", 10.0 , "Average time between two updates of an agent" , 0 , false , Double.MAX_VALUE , true); 059 private InputParameter cc_update_maxFluctuationInterUpdateTime = new InputParameter ("cc_update_maxFluctuationInterUpdateTime", 5.0 , "Max fluctuation in time in the update interval of an agent, in absolute time values. The update intervals are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 060 061 private InputParameter cc_simulation_maxNumberOfUpdateIntervals = new InputParameter ("cc_simulation_maxNumberOfUpdateIntervals", 1000.0 , "Maximum number of update intervals in average per agent" , 0 , false , Double.MAX_VALUE , true); 062 private InputParameter cc_control_minHd = new InputParameter ("cc_control_minHd", 0.1 , "Minimum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true); 063 private InputParameter cc_control_maxHd = new InputParameter ("cc_control_maxHd", 25 , "Maximum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true); 064 private InputParameter cc_control_fairnessFactor = new InputParameter ("cc_control_fairnessFactor_1", 2.0 , "Fairness factor in utility function of congestion control for demands" , 0 , true , Double.MAX_VALUE , true); 065 066 067 private InputParameter simulation_randomSeed = new InputParameter ("simulation_randomSeed", (long) 1 , "Seed of the random number generator"); 068 private InputParameter simulation_outFileNameRoot = new InputParameter ("simulation_outFileNameRoot", "crossLayerCongControlBackpressureRoutingDualDecomp" , "Root of the file name to be used in the output files. If blank, no output"); 069 070 private InputParameter signaling_isSynchronous = new InputParameter ("signaling_isSynchronous", false , "true if all the distributed agents involved wake up synchronously to send the signaling messages"); 071 private InputParameter signaling_averageInterMessageTime = new InputParameter ("signaling_averageInterMessageTime", 1.0 , "Average time between two signaling messages sent by an agent" , 0 , false , Double.MAX_VALUE , true); 072 private InputParameter signaling_maxFluctuationInterMessageTime = new InputParameter ("signaling_maxFluctuationInterMessageTime", 0.5 , "Max fluctuation in time between two signaling messages sent by an agent" , 0 , true , Double.MAX_VALUE , true); 073 private InputParameter signaling_averageDelay = new InputParameter ("signaling_averageDelay", 0.0 , "Average time between signaling message transmission by an agent and its reception by other or others" , 0 , true , Double.MAX_VALUE , true); 074 private InputParameter signaling_maxFluctuationInDelay = new InputParameter ("signaling_maxFluctuationInDelay", 0.0 , "Max fluctuation in time in the signaling delay, in absolute time values. The signaling delays are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 075 private InputParameter signaling_signalingLossProbability = new InputParameter ("signaling_signalingLossProbability", 0.05 , "Probability that a signaling message transmitted is lost (not received by other or others involved agents)" , 0 , true , Double.MAX_VALUE , true); 076 077 private InputParameter routing_fixedPacketDurationAndSchedulingInterval = new InputParameter ("routing_fixedPacketDurationAndSchedulingInterval", 1.0 , "Fixed slot size (synchronized all links)" , 0 , false , Double.MAX_VALUE , true); 078 private InputParameter routing_numTrafficUnitsOfOnePacket = new InputParameter ("routing_numTrafficUnitsOfOnePacket", 1.0 , "One packet in one slot time is this traffic" , 0 , false , Double.MAX_VALUE , true); 079 private InputParameter routing_statNumSchedSlotBetweenN2PRecomputing = new InputParameter ("routing_statNumSchedSlotBetweenN2PRecomputing", (int) 100 , "Number of scheduling slots between two N2P update and storing a stat trace" , 1 , Integer.MAX_VALUE); 080 private InputParameter routing_maxNumberOfN2PStatisticsIntervals = new InputParameter ("routing_maxNumberOfN2PStatisticsIntervals", (int) 15 , "Simulation length in number of scheduling intervals" , 1 , Integer.MAX_VALUE); 081 082 private InputParameter routing_gradient_gammaStep = new InputParameter ("routing_gradient_gammaStep", 0.000005 , "Gamma step in the gradient algorithm" , 0 , false , Double.MAX_VALUE , true); 083 084 /* Optimum solution */ 085 private Pair<NetPlan,double[][]> optNetPlan; 086 087 /******************* ROUTING ****/ 088 089 private Random rng; 090 int N,E,D; 091 092 static final int SIGNALING_WAKEUPTOSENDMESSAGE = 300; // A NODE SIGNAL QUEUES TO NEIGHBOR NODES 093 static final int SIGNALING_RECEIVEDMESSAGE = 301; // A NODE RECEIVED SIGNAL QUEUES TO NEIGHBOR NODES 094 static final int UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS = 302; // ALL NODES SCHEDULE ONE PACKET PER LINK 095 static final int UPDATE_STATISTICTRACES = 303; // STAT UPDATE OF N2P 096 097 098 private NetPlan currentNetPlan; 099 100 private Map<List<Link>,Route> stat_mapSeqLinks2RouteId; 101 private Map<Route,Integer> stat_mapRouteId2CarriedPacketsLastInterval; 102 private int [] stat_accumNumGeneratedPackets_d; 103 private int [] stat_accumNumReceivedPackets_d; 104 private int [][] stat_accumNumQeueusPackets_nd; 105 106// private TimeTrace traceOf_objFunction; 107// private TimeTrace traceOf_ye; 108 private TimeTrace stat_traceOf_hd; 109 private TimeTrace stat_traceOf_xp; // traces are taken from netPlan, when it is updated (stat_n2pRecomputingInterval) 110 private TimeTrace stat_traceOf_objFunction; 111 private TimeTrace stat_traceOf_ye; 112 private TimeTrace stat_traceOf_queueSizes; 113 114 115 private IntMatrix3D ctlMostUpdatedQndKnownByNodeN1_n1nd; // sparse for memory efficiency 116 private int [][] ctlNumPacketsQueue_nd; 117 private Map<Pair<Node,Demand>,LinkedList<PacketInfo>> ctlQueue_nd; 118 119 @Override 120 public String getDescription() 121 { 122 return "This module implements a distributed dual-decomposition-based gradient algorithm, for a coordinated adjustment of the traffic to inject by each demand (congestion control), and the routing (backpressure based) of this traffic in the network, to maximize the network utility enforcing a fair allocation of the resources."; 123 } 124 125 @Override 126 public List<Triple<String, String, String>> getParameters() 127 { 128 /* Returns the parameter information for all the InputParameter objects defined in this object (uses Java reflection) */ 129 return InputParameter.getInformationAllInputParameterFieldsOfObject(this); 130 } 131 132 @Override 133 public void initialize(NetPlan currentNetPlan, Map<String, String> algorithmParameters, Map<String, String> simulationParameters, Map<String, String> net2planParameters) 134 { 135 /* Initialize all InputParameter objects defined in this object (this uses Java reflection) */ 136 InputParameter.initializeAllInputParameterFieldsOfObject(this, algorithmParameters); 137 138 this.currentNetPlan = currentNetPlan; 139 if (currentNetPlan.getNumberOfLayers() != 1) throw new Net2PlanException ("This algorithm works in single layer networks"); 140 141 /* Update the indexes. The set of nodes, links and demands cannot change during the algorithm */ 142 this.N = this.currentNetPlan.getNumberOfNodes(); 143 this.E = this.currentNetPlan.getNumberOfLinks(); 144 this.D = this.currentNetPlan.getNumberOfDemands(); 145 this.rng = new Random (simulation_randomSeed.getLong()); 146 if ((E == 0) || (D == 0)) throw new Net2PlanException ("The input design should have links and demands"); 147 148 /* Sets the initial routing, with all the traffic balanced equally in all the paths of the demand */ 149 this.stat_mapSeqLinks2RouteId = new HashMap<List<Link>,Route> (); 150 this.stat_mapRouteId2CarriedPacketsLastInterval = new HashMap<Route,Integer> (); 151 152 /* Computes the optimum NetPlan */ 153 this.currentNetPlan.removeAllUnicastRoutingInformation(); 154 this.currentNetPlan.setRoutingTypeAllDemands(RoutingType.SOURCE_ROUTING); 155 156 this.currentNetPlan.removeAllRoutes(); 157 this.optNetPlan = computeOptimumSolution (); 158 /* Add the routes in the optimum solution to the netPlan, with zero traffic. For having teh same IDs later */ 159 this.currentNetPlan.copyFrom(optNetPlan.getFirst()); 160 for (Route r: currentNetPlan.getRoutes()) 161 { 162 r.setCarriedTraffic(0.0 , 0.0); 163 this.stat_mapSeqLinks2RouteId.put(r.getSeqLinks(),r); 164 this.stat_mapRouteId2CarriedPacketsLastInterval.put(r, 0); 165 } 166 167 168 this.ctlQueue_nd = new HashMap<Pair<Node,Demand>,LinkedList<PacketInfo>> (); 169 for (Node n: this.currentNetPlan.getNodes()) 170 for (Demand d: this.currentNetPlan.getDemands()) 171 { ctlQueue_nd.put(Pair.of(n, d), new LinkedList<PacketInfo> ()); } 172 173 this.ctlMostUpdatedQndKnownByNodeN1_n1nd = IntFactory3D.sparse.make(N, N, D); 174 this.ctlNumPacketsQueue_nd = new int [N][D]; 175 176 this.stat_accumNumGeneratedPackets_d = new int [D]; 177 this.stat_accumNumReceivedPackets_d = new int [D]; 178 this.stat_accumNumQeueusPackets_nd= new int [N][D]; 179 180 /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */ 181 this.scheduleEvent(new SimEvent (routing_statNumSchedSlotBetweenN2PRecomputing.getInt() * routing_fixedPacketDurationAndSchedulingInterval.getDouble(), SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_STATISTICTRACES , -1)); 182 this.scheduleEvent(new SimEvent (routing_fixedPacketDurationAndSchedulingInterval.getDouble() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS , -1)); 183 for (Node n : currentNetPlan.getNodes()) 184 { 185 final double signalingTime = signaling_isSynchronous.getBoolean()? signaling_averageInterMessageTime.getDouble() : Math.max(0 , signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 186 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , n)); 187 } 188 189 /* Intialize the traces */ 190 this.stat_traceOf_xp = new TimeTrace (); 191 this.stat_traceOf_ye = new TimeTrace (); 192 this.stat_traceOf_queueSizes = new TimeTrace (); 193 this.stat_traceOf_objFunction = new TimeTrace (); 194 this.stat_traceOf_hd = new TimeTrace (); 195 this.stat_traceOf_xp.add(0.0 , netPlanRouteCarriedTrafficMap (this.currentNetPlan)); 196 this.stat_traceOf_ye.add(0.0, currentNetPlan.getVectorLinkCarriedTraffic()); 197 this.stat_traceOf_queueSizes.add(0.0, copyOf(this.ctlNumPacketsQueue_nd)); 198 this.stat_traceOf_objFunction.add(0.0, computeObjectiveFunctionFromNetPlan(this.currentNetPlan)); 199 this.stat_traceOf_hd.add(0.0, currentNetPlan.getVectorDemandOfferedTraffic()); 200 201 /* CONG CONTROL */ 202 /* Set the initial prices in the links: 1.0 */ 203 /* Initialize the information each demand knows of the prices of all the links */ 204 /* Compute the traffic each demand injects, set it in net2plan, and tell routing layer there was an update in this */ 205 for (Demand d : currentNetPlan.getDemands()) 206 d.setOfferedTraffic(this.computeHdFromPrices(d)); 207 208 /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */ 209 for (Demand d : currentNetPlan.getDemands()) 210 { 211 final double updateTime = (cc_update_isSynchronous.getBoolean())? cc_update_averageInterUpdateTime.getDouble() : Math.max(0 , cc_update_averageInterUpdateTime.getDouble() + cc_update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 212 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , CC_UPDATE_WAKEUPTOUPDATE , d)); 213 } 214 } 215 216 @Override 217 public void processEvent(NetPlan currentNetPlan, SimEvent event) 218 { 219 final double time = event.getEventTime(); 220 switch (event.getEventType()) 221 { 222 case UPDATE_STATISTICTRACES: 223 { 224 for (Route r : this.currentNetPlan.getRoutes()) 225 { 226 final int numPacketsArrivedDestinationLastInterval = stat_mapRouteId2CarriedPacketsLastInterval.get(r); 227 final double trafficArrivedDestinationLastInterval = numPacketsArrivedDestinationLastInterval * routing_numTrafficUnitsOfOnePacket.getDouble() / (routing_fixedPacketDurationAndSchedulingInterval.getDouble() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt ()); 228 r.setCarriedTraffic(trafficArrivedDestinationLastInterval , trafficArrivedDestinationLastInterval); 229 /* Set zero in the counter of packets */ 230 stat_mapRouteId2CarriedPacketsLastInterval.put(r,0); 231 } 232 233 /* Update the traces */ 234 this.stat_traceOf_xp.add(time , netPlanRouteCarriedTrafficMap (this.currentNetPlan)); 235 this.stat_traceOf_ye.add(time, currentNetPlan.getVectorLinkCarriedTraffic()); 236 this.stat_traceOf_objFunction.add(time, computeObjectiveFunctionFromNetPlan(this.currentNetPlan)); 237 final double scaleFactorAccumNumQueuePacketsToAverageQueuedTraffic = this.routing_numTrafficUnitsOfOnePacket.getDouble() / this.routing_statNumSchedSlotBetweenN2PRecomputing.getInt (); 238 /* We store the average queue sizes in traffic units */ 239 this.stat_traceOf_queueSizes.add(time, scaledCopyOf(this.stat_accumNumQeueusPackets_nd , scaleFactorAccumNumQueuePacketsToAverageQueuedTraffic)); 240 this.stat_accumNumQeueusPackets_nd = new int [N][D]; // reset the count 241 this.scheduleEvent(new SimEvent (time + routing_fixedPacketDurationAndSchedulingInterval.getDouble() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_STATISTICTRACES , -1)); 242 break; 243 } 244 245 case SIGNALING_RECEIVEDMESSAGE: // A node receives from an out neighbor the q_nd for any demand 246 { 247 final Triple<Node,Node,int []> signalInfo = (Triple<Node,Node,int []>) event.getEventObject(); 248 final Node nMe = signalInfo.getFirst(); 249 final int index_nIdMe = nMe.getIndex (); 250 final Node nNeighbor = signalInfo.getSecond(); 251 final int index_nNeighbor = nNeighbor.getIndex (); 252 final int [] receivedInfo_q_d = signalInfo.getThird(); 253 for (int index_d = 0 ; index_d < D ; index_d ++) 254 ctlMostUpdatedQndKnownByNodeN1_n1nd.set(index_nIdMe, index_nNeighbor, index_d, receivedInfo_q_d[index_d]); 255 break; 256 257 } 258 259 case SIGNALING_WAKEUPTOSENDMESSAGE: // A node broadcasts signaling info to its 1 hop neighbors 260 { 261 final Node nMe = (Node) event.getEventObject(); 262 final int index_nMe = nMe.getIndex (); 263 264 log.println("t=" + time + ": ROUTING_NODEWAKEUPTO_TRANSMITSIGNAL node " + nMe); 265 266 /* Create the info I will signal */ 267 int [] infoToSignal_q_d = Arrays.copyOf(ctlNumPacketsQueue_nd [index_nMe], D); 268 269 /* Send the events of the signaling information messages to incoming neighbors */ 270 for (Node n : nMe.getInNeighbors()) 271 { 272 if (rng.nextDouble() >= this.signaling_signalingLossProbability.getDouble()) // the signaling may be lost => lost to all nodes 273 { 274 final double signalingReceptionTime = time + Math.max(0 , signaling_averageDelay.getDouble() + signaling_maxFluctuationInDelay.getDouble() * (rng.nextDouble() - 0.5)); 275 this.scheduleEvent(new SimEvent (signalingReceptionTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_RECEIVEDMESSAGE , Triple.of(n , nMe , infoToSignal_q_d))); 276 } 277 } 278 279 /* Re-schedule when to wake up again */ 280 final double signalingTime = signaling_isSynchronous.getBoolean()? time + signaling_averageInterMessageTime.getDouble() : Math.max(time , time + signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 281 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , nMe)); 282 break; 283 } 284 285 case UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS: // a node updates its p_n, p_e values, using most updated info available 286 { 287 log.println("t=" + time + ": UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS"); 288 289 /* First, create traffic of the demands. A constant amount */ 290 for (Demand d : this.currentNetPlan.getDemands()) 291 { 292 final double avgNumPackets_d = d.getOfferedTraffic() / routing_numTrafficUnitsOfOnePacket.getDouble(); 293 final int numPackets_d = Poisson.staticNextInt(avgNumPackets_d); 294 final Node a_d = d.getIngressNode(); 295 LinkedList<PacketInfo> q = this.ctlQueue_nd.get(Pair.of(a_d, d)); 296 for (int cont = 0 ; cont < numPackets_d ; cont ++) 297 q.add (new PacketInfo (d , time)); 298 this.ctlNumPacketsQueue_nd [a_d.getIndex ()][d.getIndex ()] += numPackets_d; 299 this.stat_accumNumGeneratedPackets_d [d.getIndex ()] += numPackets_d; 300 } 301 302 /* Forwarding decisions for each link. Each link can transmit as many packets as its capacity divided by trafic per packet factor (floor), during this scheduling */ 303 int [][] numPacketsToForward_ed = new int [E][D]; 304 int [][] numPacketsToForward_nd = new int [N][D]; 305 for (int index_e = 0 ; index_e < E ; index_e ++) 306 { 307 final Link e = currentNetPlan.getLink (index_e); 308 final int maxNumPacketsToTransmit = (int) Math.floor(e.getCapacity() / this.routing_numTrafficUnitsOfOnePacket.getDouble()); 309 final Node a_e = e.getOriginNode(); 310 final Node b_e = e.getDestinationNode(); 311 final int index_ae = a_e.getIndex (); 312 final int index_be = b_e.getIndex (); 313 for (int cont = 0 ; cont < maxNumPacketsToTransmit ; cont ++) 314 { 315 /* Take the queue with higher */ 316 double highestBackpressure = -1; 317 int indexDemandToTransmitPacket = -1; 318 for (int index_d = 0 ; index_d < D ; index_d ++) 319 { 320 final Demand d = currentNetPlan.getDemand (index_d); 321 final int thisQueueSize = ctlNumPacketsQueue_nd [index_ae][index_d]; 322 if (thisQueueSize != ctlQueue_nd.get(Pair.of(a_e, d)).size ()) throw new RuntimeException ("Bad"); 323 final int neighborQueueSize = ctlMostUpdatedQndKnownByNodeN1_n1nd.get(index_ae,index_be,index_d); 324 final double backpressure = routing_gradient_gammaStep.getDouble() * (thisQueueSize - numPacketsToForward_nd[index_ae][index_d] - neighborQueueSize); 325 if (backpressure > highestBackpressure) { indexDemandToTransmitPacket = index_d; highestBackpressure = backpressure; } 326 } 327 if (highestBackpressure <= 0) break; // no more queues will be empty, not enough backpressures 328 329 numPacketsToForward_ed[index_e][indexDemandToTransmitPacket] ++; 330 numPacketsToForward_nd[index_ae][indexDemandToTransmitPacket] ++; 331 } 332 } 333 334 /* Apply the forwarding decisions */ 335 for (int index_e = 0 ; index_e < E ; index_e ++) 336 { 337 final Link e = currentNetPlan.getLink (index_e); 338 final Node a_e = e.getOriginNode(); 339 final Node b_e = e.getDestinationNode(); 340 final int index_ae = a_e.getIndex (); 341 final int index_be = b_e.getIndex (); 342 for (int index_d = 0 ; index_d < D ; index_d ++) 343 { 344 final Demand d = currentNetPlan.getDemand (index_d); 345 final int numPackets_ed = numPacketsToForward_ed [index_e][index_d]; 346 347 for (int cont = 0 ; cont < numPackets_ed ; cont ++) 348 { 349 //System.out.println("Pop e: " + e + ", d: " + d + ", numpack quque: " + this.ctlQueue_nd.get(Pair.of(a_e,d)).size() ); 350 351 PacketInfo p = this.ctlQueue_nd.get(Pair.of(a_e,d)).pop(); 352 ctlNumPacketsQueue_nd [index_ae][index_d] --; 353 if (this.ctlQueue_nd.get(Pair.of(a_e,d)).size() != ctlNumPacketsQueue_nd [index_ae][index_d]) throw new RuntimeException ("Bad"); 354 p.seqLinks.add(e); // update the seq of links 355 if (b_e == d.getEgressNode()) 356 { 357 /* Packet arrived to destination: update the statistics */ 358 Route route = stat_mapSeqLinks2RouteId.get(p.seqLinks); 359 360 /* If the route does not exist, create it */ 361 if (route == null) 362 { 363 route = this.currentNetPlan.addRoute(d, 0.0 , 0.0 , p.seqLinks, null); 364 stat_mapSeqLinks2RouteId.put(p.seqLinks , route); 365 stat_mapRouteId2CarriedPacketsLastInterval.put(route, 1); 366 } 367 else 368 { 369 stat_mapRouteId2CarriedPacketsLastInterval.put(route, stat_mapRouteId2CarriedPacketsLastInterval.get(route) + 1); 370 this.stat_accumNumReceivedPackets_d [index_d] ++; 371 } 372 } 373 else 374 { 375 /* Not the end node => put it in the next queue */ 376 this.ctlQueue_nd.get(Pair.of(b_e,d)).addLast(p); 377 ctlNumPacketsQueue_nd [index_be][index_d] ++; 378 } 379 } 380 } 381 382 } 383 384 /* Update the statistics of accum number of packets in a queue */ 385 for (int index_n = 0 ; index_n < N ; index_n ++) 386 for (int index_d = 0 ; index_d < D ; index_d ++) 387 stat_accumNumQeueusPackets_nd [index_n][index_d] += this.ctlNumPacketsQueue_nd [index_n][index_d]; 388 389 this.scheduleEvent(new SimEvent (time + routing_fixedPacketDurationAndSchedulingInterval.getDouble() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS , -1)); 390 391 if (time > this.routing_maxNumberOfN2PStatisticsIntervals.getInt() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt() * this.routing_fixedPacketDurationAndSchedulingInterval.getDouble()) { this.endSimulation (); } 392 393 break; 394 } 395 396 case CC_UPDATE_WAKEUPTOUPDATE: // a node updates its p_n, p_e values, using most updated info available 397 { 398 final Demand dMe = (Demand) event.getEventObject(); 399 final double new_hd = computeHdFromPrices (dMe); 400 dMe.setOfferedTraffic(new_hd); 401 402 final double updateTime = cc_update_isSynchronous.getBoolean()? time + cc_update_averageInterUpdateTime.getDouble() : Math.max(time , time + cc_update_averageInterUpdateTime.getDouble() + cc_update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 403 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , CC_UPDATE_WAKEUPTOUPDATE, dMe)); 404 405 this.stat_traceOf_hd.add(time, currentNetPlan.getVectorDemandOfferedTraffic()); 406 407 if (time > this.cc_simulation_maxNumberOfUpdateIntervals.getDouble() * this.cc_update_averageInterUpdateTime.getDouble()) { this.endSimulation (); } 408 409 break; 410 } 411 412 default: throw new RuntimeException ("Unexpected received event"); 413 } 414 415 416 } 417 418 public String finish (StringBuilder st , double simTime) 419 { 420 System.out.println("stat_accumNumGeneratedPackets_d: " + Arrays.toString(stat_accumNumGeneratedPackets_d)); 421 System.out.println("stat_accumNumReceivedPackets_d: " + Arrays.toString(stat_accumNumReceivedPackets_d)); 422 System.out.println("transmittedNotReceived: " + Arrays.toString(IntUtils.substract(stat_accumNumGeneratedPackets_d, stat_accumNumReceivedPackets_d))); 423 424 /* If no output file, return */ 425 if (simulation_outFileNameRoot.getString().equals("")) return null; 426 427 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_objFunc.txt"), new double [] { computeObjectiveFunctionFromNetPlan (optNetPlan.getFirst()) } ); 428 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_xp.txt"), optNetPlan.getFirst().getVectorRouteCarriedTraffic()); 429 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_hd.txt"), optNetPlan.getFirst().getVectorDemandOfferedTraffic()); 430 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_qnd.txt"), DoubleUtils.mult(optNetPlan.getSecond(), 1/routing_gradient_gammaStep.getDouble())); 431 this.stat_traceOf_queueSizes.printToFile(new File (simulation_outFileNameRoot.getString() + "_qnd.txt")); 432 this.stat_traceOf_objFunction.printToFile(new File (simulation_outFileNameRoot.getString() + "_objFunc.txt")); 433 this.stat_traceOf_ye.printToFile(new File (simulation_outFileNameRoot.getString() + "_ye.txt")); 434 for (Pair<Double,Object> sample : this.stat_traceOf_xp.getList()) 435 { 436 Map<Long,Double> x_p = (Map<Long,Double>) sample.getSecond (); 437 for (long r: this.currentNetPlan.getRouteIds()) 438 if (!x_p.containsKey(r)) x_p.put(r , 0.0); 439 } 440 this.stat_traceOf_xp.printToFile(new File (simulation_outFileNameRoot.getString() + "_xp.txt")); 441 this.stat_traceOf_hd.printToFile(new File (simulation_outFileNameRoot.getString() + "_hd.txt")); 442 443 444 return null; 445 } 446 447 private Pair<NetPlan,double[][]> computeOptimumSolution () 448 { 449 /* Modify the map so that it is the pojection where all elements sum h_d, and are non-negative */ 450 final int D = this.currentNetPlan.getNumberOfDemands(); 451 final int E = this.currentNetPlan.getNumberOfLinks(); 452 final DoubleMatrix1D u_e = this.currentNetPlan.getVectorLinkCapacity(); 453 454 455 OptimizationProblem op = new OptimizationProblem(); 456 op.addDecisionVariable("x_de", false , new int[] { D , E }, 0 , u_e.getMaxLocation() [0]); 457 op.addDecisionVariable("h_d", false, new int[] {1, D}, cc_control_minHd.getDouble() , cc_control_maxHd.getDouble()); 458 459 op.setInputParameter("u_e", u_e , "row"); 460 op.setInputParameter("alpha", this.cc_control_fairnessFactor.getDouble()); 461 462 /* Set some input parameters */ 463 /* Sets the objective function */ 464 if (cc_control_fairnessFactor.getDouble() == 1) 465 op.setObjectiveFunction("maximize", "sum(ln(h_d))"); 466 else if (cc_control_fairnessFactor.getDouble() == 0) 467 op.setObjectiveFunction("maximize", "sum(h_d)"); 468 else 469 op.setObjectiveFunction("maximize", "(1-alpha) * sum(h_d ^ (1-alpha))"); 470 471 op.setInputParameter("A_ne", currentNetPlan.getMatrixNodeLinkIncidence()); 472 op.setInputParameter("A_nd", currentNetPlan.getMatrixNodeDemandIncidence()); 473 op.addConstraint("A_ne * (x_de') >= A_nd * diag(h_d)" , "flowConservationConstraints"); // the flow-conservation constraints (NxD constraints) 474 op.addConstraint("sum(x_de,1) <= u_e"); // the capacity constraints (E constraints) 475 476 /* Call the solver to solve the problem */ 477 op.solve("ipopt"); 478 479 /* If an optimal solution was not found, quit */ 480 if (!op.solutionIsOptimal ()) throw new RuntimeException ("An optimal solution was not found"); 481 482 /* Retrieve the optimum solutions. Convert the bps into Erlangs */ 483 final DoubleMatrix1D h_d_array = op.getPrimalSolution("h_d").view1D (); 484 final DoubleMatrix2D x_de_array = op.getPrimalSolution("x_de").view2D (); 485 final double [][] q_nd_array = (double [][]) op.getMultipliersOfConstraint("flowConservationConstraints").assign(DoubleFunctions.abs).toArray(); 486 487 /* Convert the x_de variables into a set of routes for each demand */ 488 NetPlan np = this.currentNetPlan.copy(); 489 np.removeAllUnicastRoutingInformation(); 490 np.setVectorDemandOfferedTraffic(h_d_array); 491 np.setRoutingFromDemandLinkCarriedTraffic(x_de_array , false , false , null); 492 493 /* Check solution: all traffic is carried, no link oversubscribed */ 494 for (Demand d : np.getDemands()) 495 { 496 final double thisDemand_hd = d.getOfferedTraffic(); 497 if (Math.abs (d.getCarriedTraffic() - d.getOfferedTraffic()) > 1e-3) throw new RuntimeException ("Bad"); 498 if (thisDemand_hd < cc_control_minHd.getDouble() - 1E-3) throw new RuntimeException ("Bad"); 499 if (thisDemand_hd > cc_control_maxHd.getDouble() + 1E-3) throw new RuntimeException ("Bad"); 500 } 501 if (np.getVectorLinkUtilization().getMaxLocation() [0] > 1.001) throw new RuntimeException ("Bad"); 502 503 return Pair.of(np,q_nd_array); 504 } 505 506 507 private class PacketInfo 508 { 509 final Demand d; 510 final double packetCreationTime; 511 LinkedList<Link> seqLinks; 512 PacketInfo (Demand d , double creationTime) { this.d = d; this.packetCreationTime = creationTime; this.seqLinks = new LinkedList<Link> (); } 513 } 514 private int [][] copyOf (int [][] a) { int [][] res = new int [a.length][]; for (int cont = 0 ; cont < a.length ; cont ++) res [cont] = Arrays.copyOf(a[cont], a[cont].length); return res; } 515 private double [][] copyOf (double [][] a) { double [][] res = new double [a.length][]; for (int cont = 0 ; cont < a.length ; cont ++) res [cont] = Arrays.copyOf(a[cont], a[cont].length); return res; } 516 private double [][] scaledCopyOf (int [][] a , double mult) { double [][] res = new double [a.length][a[0].length]; for (int cont = 0 ; cont < a.length ; cont ++) for (int cont2 = 0; cont2 < a[cont].length ; cont2 ++) res [cont][cont2] = a[cont][cont2] * mult; return res; } 517 518 519 private double computeObjectiveFunctionFromNetPlan (NetPlan np) 520 { 521 double objFunc = 0; 522 for (Demand d : np.getDemands()) 523 { 524 final double h_d = d.getOfferedTraffic(); 525 objFunc += (this.cc_control_fairnessFactor.getDouble() == 1)? Math.log(h_d) : Math.pow(h_d, 1-this.cc_control_fairnessFactor.getDouble()) / (1-this.cc_control_fairnessFactor.getDouble()); 526 } 527 return objFunc; 528 } 529 530 private double computeHdFromPrices (Demand d) 531 { 532 /* compute the demand price as weighted sum in the routes of route prices */ 533 final int index_ad = d.getIngressNode().getIndex (); 534 final int index_d = d.getIndex (); 535 final double demandInitialNodeQueueSize = this.routing_numTrafficUnitsOfOnePacket.getDouble() * ((double) this.ctlNumPacketsQueue_nd [index_ad][index_d]) * this.routing_gradient_gammaStep.getDouble(); 536 /* compute the new h_d */ 537 final double new_hd = Math.max(this.cc_control_minHd.getDouble() , Math.min(this.cc_control_maxHd.getDouble(), Math.pow(demandInitialNodeQueueSize, -1/this.cc_control_fairnessFactor.getDouble()))); 538 539 return new_hd; 540 } 541 542 private Map<Long,Double> netPlanRouteCarriedTrafficMap (NetPlan np) { Map<Long,Double> res = new HashMap<Long,Double> (); for (Route r : np.getRoutes ()) res.put (r.getId () , r.getCarriedTraffic()); return res; } 543 544}