001/******************************************************************************* 002 * Copyright (c) 2016 Pablo Pavon Mariņo. 003 * All rights reserved. This program and the accompanying materials 004 * are made available under the terms of the GNU Lesser Public License v2.1 005 * which accompanies this distribution, and is available at 006 * http://www.gnu.org/licenses/lgpl.html 007 ******************************************************************************/ 008package com.net2plan.examples.ocnbook.onlineSim; 009 010 011 012 013import java.io.File; 014import java.io.FileOutputStream; 015import java.io.PrintStream; 016import java.util.Arrays; 017import java.util.HashMap; 018import java.util.LinkedList; 019import java.util.List; 020import java.util.Map; 021import java.util.Random; 022 023import cern.colt.matrix.tdouble.DoubleMatrix1D; 024import cern.colt.matrix.tdouble.DoubleMatrix2D; 025import cern.colt.matrix.tint.IntFactory3D; 026import cern.colt.matrix.tint.IntMatrix3D; 027import cern.jet.math.tdouble.DoubleFunctions; 028import cern.jet.random.tdouble.Poisson; 029 030import com.jom.OptimizationProblem; 031import com.net2plan.interfaces.networkDesign.Demand; 032import com.net2plan.interfaces.networkDesign.Link; 033import com.net2plan.interfaces.networkDesign.Net2PlanException; 034import com.net2plan.interfaces.networkDesign.NetPlan; 035import com.net2plan.interfaces.networkDesign.Node; 036import com.net2plan.interfaces.networkDesign.Route; 037import com.net2plan.interfaces.simulation.IEventProcessor; 038import com.net2plan.interfaces.simulation.SimEvent; 039import com.net2plan.utils.Constants.RoutingType; 040import com.net2plan.utils.DoubleUtils; 041import com.net2plan.utils.InputParameter; 042import com.net2plan.utils.IntUtils; 043import com.net2plan.utils.Pair; 044import com.net2plan.utils.TimeTrace; 045import com.net2plan.utils.Triple; 046 047/** 048 * This module implements a distributed dual-decomposition-based gradient algorithm, for a coordinated adjustment of the traffic to inject by each demand (congestion control), and the routing (backpressure based) of this traffic in the network, to maximize the network utility enforcing a fair allocation of the resources. 049 * 050 * Ths event processor is adapted to permit observing the algorithm performances under user-defined conditions, 051 * including asynchronous distributed executions, where signaling can be affected by losses and/or delays, and/or measurement errors. 052 * The time evolution of different metrics can be stored in output files, for later processing. 053 * As an example, see the <a href="../../../../../../graphGeneratorFiles/fig_sec11_4_congControlAndBackpressureRouting_dualDecomp.m">{@code fig_sec11_4_congControlAndBackpressureRouting_dualDecomp.m}</a> MATLAB file used for generating the graph/s of the case study in the 054 * <a href="http://eu.wiley.com/WileyCDA/WileyTitle/productCd-1119013356.html">book</a> using this algorithm. 055 * 056 * To simulate a network with this module, use the {@code Online_evGen_doNothing} generator. 057 * 058 * @net2plan.keywords Bandwidth assignment (BA), Flow assignment (FA), Backpressure routing, Distributed algorithm, Dual decomposition 059 * @net2plan.ocnbooksections Section 11.4 060 * @net2plan.inputParameters 061 * @author Pablo Pavon-Marino 062 */ 063public class Online_evProc_congControlAndBackpressureRoutingDualDecomp extends IEventProcessor 064{ 065 private static PrintStream getNulFile () { try { return new PrintStream (new FileOutputStream ("NUL") , false); } catch (Exception e) {e.printStackTrace(); throw new RuntimeException ("Not NUL file"); } } 066 private PrintStream log = getNulFile (); //System.err;//new PrintStream (new FileOutputStream ("NUL") , true); 067 068 /******************** MAC ****************/ 069 static final int CC_UPDATE_WAKEUPTOUPDATE = 402; 070 071 private InputParameter cc_update_isSynchronous = new InputParameter ("cc_update_isSynchronous", false , "true if all the distributed agents involved wake up synchronousely to update its state"); 072 private InputParameter cc_update_averageInterUpdateTime = new InputParameter ("cc_update_averageInterUpdateTime", 10.0 , "Average time between two updates of an agent" , 0 , false , Double.MAX_VALUE , true); 073 private InputParameter cc_update_maxFluctuationInterUpdateTime = new InputParameter ("cc_update_maxFluctuationInterUpdateTime", 5.0 , "Max fluctuation in time in the update interval of an agent, in absolute time values. The update intervals are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 074 075 private InputParameter cc_simulation_maxNumberOfUpdateIntervals = new InputParameter ("cc_simulation_maxNumberOfUpdateIntervals", 1000.0 , "Maximum number of update intervals in average per agent" , 0 , false , Double.MAX_VALUE , true); 076 private InputParameter cc_control_minHd = new InputParameter ("cc_control_minHd", 0.1 , "Minimum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true); 077 private InputParameter cc_control_maxHd = new InputParameter ("cc_control_maxHd", 25 , "Maximum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true); 078 private InputParameter cc_control_fairnessFactor = new InputParameter ("cc_control_fairnessFactor_1", 2.0 , "Fairness factor in utility function of congestion control for demands" , 0 , true , Double.MAX_VALUE , true); 079 080 081 private InputParameter simulation_randomSeed = new InputParameter ("simulation_randomSeed", (long) 1 , "Seed of the random number generator"); 082 private InputParameter simulation_outFileNameRoot = new InputParameter ("simulation_outFileNameRoot", "crossLayerCongControlBackpressureRoutingDualDecomp" , "Root of the file name to be used in the output files. If blank, no output"); 083 084 private InputParameter signaling_isSynchronous = new InputParameter ("signaling_isSynchronous", false , "true if all the distributed agents involved wake up synchronously to send the signaling messages"); 085 private InputParameter signaling_averageInterMessageTime = new InputParameter ("signaling_averageInterMessageTime", 1.0 , "Average time between two signaling messages sent by an agent" , 0 , false , Double.MAX_VALUE , true); 086 private InputParameter signaling_maxFluctuationInterMessageTime = new InputParameter ("signaling_maxFluctuationInterMessageTime", 0.5 , "Max fluctuation in time between two signaling messages sent by an agent" , 0 , true , Double.MAX_VALUE , true); 087 private InputParameter signaling_averageDelay = new InputParameter ("signaling_averageDelay", 0.0 , "Average time between signaling message transmission by an agent and its reception by other or others" , 0 , true , Double.MAX_VALUE , true); 088 private InputParameter signaling_maxFluctuationInDelay = new InputParameter ("signaling_maxFluctuationInDelay", 0.0 , "Max fluctuation in time in the signaling delay, in absolute time values. The signaling delays are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true); 089 private InputParameter signaling_signalingLossProbability = new InputParameter ("signaling_signalingLossProbability", 0.05 , "Probability that a signaling message transmitted is lost (not received by other or others involved agents)" , 0 , true , Double.MAX_VALUE , true); 090 091 private InputParameter routing_fixedPacketDurationAndSchedulingInterval = new InputParameter ("routing_fixedPacketDurationAndSchedulingInterval", 1.0 , "Fixed slot size (synchronized all links)" , 0 , false , Double.MAX_VALUE , true); 092 private InputParameter routing_numTrafficUnitsOfOnePacket = new InputParameter ("routing_numTrafficUnitsOfOnePacket", 1.0 , "One packet in one slot time is this traffic" , 0 , false , Double.MAX_VALUE , true); 093 private InputParameter routing_statNumSchedSlotBetweenN2PRecomputing = new InputParameter ("routing_statNumSchedSlotBetweenN2PRecomputing", (int) 100 , "Number of scheduling slots between two N2P update and storing a stat trace" , 1 , Integer.MAX_VALUE); 094 private InputParameter routing_maxNumberOfN2PStatisticsIntervals = new InputParameter ("routing_maxNumberOfN2PStatisticsIntervals", (int) 15 , "Simulation length in number of scheduling intervals" , 1 , Integer.MAX_VALUE); 095 096 private InputParameter routing_gradient_gammaStep = new InputParameter ("routing_gradient_gammaStep", 0.000005 , "Gamma step in the gradient algorithm" , 0 , false , Double.MAX_VALUE , true); 097 098 /* Optimum solution */ 099 private Pair<NetPlan,double[][]> optNetPlan; 100 101 /******************* ROUTING ****/ 102 103 private Random rng; 104 int N,E,D; 105 106 static final int SIGNALING_WAKEUPTOSENDMESSAGE = 300; // A NODE SIGNAL QUEUES TO NEIGHBOR NODES 107 static final int SIGNALING_RECEIVEDMESSAGE = 301; // A NODE RECEIVED SIGNAL QUEUES TO NEIGHBOR NODES 108 static final int UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS = 302; // ALL NODES SCHEDULE ONE PACKET PER LINK 109 static final int UPDATE_STATISTICTRACES = 303; // STAT UPDATE OF N2P 110 111 112 private NetPlan currentNetPlan; 113 114 private Map<List<Link>,Route> stat_mapSeqLinks2RouteId; 115 private Map<Route,Integer> stat_mapRouteId2CarriedPacketsLastInterval; 116 private int [] stat_accumNumGeneratedPackets_d; 117 private int [] stat_accumNumReceivedPackets_d; 118 private int [][] stat_accumNumQeueusPackets_nd; 119 120// private TimeTrace traceOf_objFunction; 121// private TimeTrace traceOf_ye; 122 private TimeTrace stat_traceOf_hd; 123 private TimeTrace stat_traceOf_xp; // traces are taken from netPlan, when it is updated (stat_n2pRecomputingInterval) 124 private TimeTrace stat_traceOf_objFunction; 125 private TimeTrace stat_traceOf_ye; 126 private TimeTrace stat_traceOf_queueSizes; 127 128 129 private IntMatrix3D ctlMostUpdatedQndKnownByNodeN1_n1nd; // sparse for memory efficiency 130 private int [][] ctlNumPacketsQueue_nd; 131 private Map<Pair<Node,Demand>,LinkedList<PacketInfo>> ctlQueue_nd; 132 133 @Override 134 public String getDescription() 135 { 136 return "This module implements a distributed dual-decomposition-based gradient algorithm, for a coordinated adjustment of the traffic to inject by each demand (congestion control), and the routing (backpressure based) of this traffic in the network, to maximize the network utility enforcing a fair allocation of the resources."; 137 } 138 139 @Override 140 public List<Triple<String, String, String>> getParameters() 141 { 142 /* Returns the parameter information for all the InputParameter objects defined in this object (uses Java reflection) */ 143 return InputParameter.getInformationAllInputParameterFieldsOfObject(this); 144 } 145 146 @Override 147 public void initialize(NetPlan currentNetPlan, Map<String, String> algorithmParameters, Map<String, String> simulationParameters, Map<String, String> net2planParameters) 148 { 149 /* Initialize all InputParameter objects defined in this object (this uses Java reflection) */ 150 InputParameter.initializeAllInputParameterFieldsOfObject(this, algorithmParameters); 151 152 this.currentNetPlan = currentNetPlan; 153 if (currentNetPlan.getNumberOfLayers() != 1) throw new Net2PlanException ("This algorithm works in single layer networks"); 154 155 /* Update the indexes. The set of nodes, links and demands cannot change during the algorithm */ 156 this.N = this.currentNetPlan.getNumberOfNodes(); 157 this.E = this.currentNetPlan.getNumberOfLinks(); 158 this.D = this.currentNetPlan.getNumberOfDemands(); 159 this.rng = new Random (simulation_randomSeed.getLong()); 160 if ((E == 0) || (D == 0)) throw new Net2PlanException ("The input design should have links and demands"); 161 162 /* Sets the initial routing, with all the traffic balanced equally in all the paths of the demand */ 163 this.stat_mapSeqLinks2RouteId = new HashMap<List<Link>,Route> (); 164 this.stat_mapRouteId2CarriedPacketsLastInterval = new HashMap<Route,Integer> (); 165 166 /* Computes the optimum NetPlan */ 167 this.currentNetPlan.removeAllUnicastRoutingInformation(); 168 this.currentNetPlan.setRoutingType(RoutingType.SOURCE_ROUTING); 169 170 this.currentNetPlan.removeAllRoutes(); 171 this.optNetPlan = computeOptimumSolution (); 172 /* Add the routes in the optimum solution to the netPlan, with zero traffic. For having teh same IDs later */ 173 this.currentNetPlan.copyFrom(optNetPlan.getFirst()); 174 for (Route r: currentNetPlan.getRoutes()) 175 { 176 r.setCarriedTraffic(0.0 , 0.0); 177 this.stat_mapSeqLinks2RouteId.put(r.getSeqLinksRealPath(),r); 178 this.stat_mapRouteId2CarriedPacketsLastInterval.put(r, 0); 179 } 180 181 182 this.ctlQueue_nd = new HashMap<Pair<Node,Demand>,LinkedList<PacketInfo>> (); 183 for (Node n: this.currentNetPlan.getNodes()) 184 for (Demand d: this.currentNetPlan.getDemands()) 185 { ctlQueue_nd.put(Pair.of(n, d), new LinkedList<PacketInfo> ()); } 186 187 this.ctlMostUpdatedQndKnownByNodeN1_n1nd = IntFactory3D.sparse.make(N, N, D); 188 this.ctlNumPacketsQueue_nd = new int [N][D]; 189 190 this.stat_accumNumGeneratedPackets_d = new int [D]; 191 this.stat_accumNumReceivedPackets_d = new int [D]; 192 this.stat_accumNumQeueusPackets_nd= new int [N][D]; 193 194 /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */ 195 this.scheduleEvent(new SimEvent (routing_statNumSchedSlotBetweenN2PRecomputing.getInt() * routing_fixedPacketDurationAndSchedulingInterval.getDouble(), SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_STATISTICTRACES , -1)); 196 this.scheduleEvent(new SimEvent (routing_fixedPacketDurationAndSchedulingInterval.getDouble() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS , -1)); 197 for (Node n : currentNetPlan.getNodes()) 198 { 199 final double signalingTime = signaling_isSynchronous.getBoolean()? signaling_averageInterMessageTime.getDouble() : Math.max(0 , signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 200 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , n)); 201 } 202 203 /* Intialize the traces */ 204 this.stat_traceOf_xp = new TimeTrace (); 205 this.stat_traceOf_ye = new TimeTrace (); 206 this.stat_traceOf_queueSizes = new TimeTrace (); 207 this.stat_traceOf_objFunction = new TimeTrace (); 208 this.stat_traceOf_hd = new TimeTrace (); 209 this.stat_traceOf_xp.add(0.0 , netPlanRouteCarriedTrafficMap (this.currentNetPlan)); 210 this.stat_traceOf_ye.add(0.0, currentNetPlan.getVectorLinkTotalCarriedTraffic()); 211 this.stat_traceOf_queueSizes.add(0.0, copyOf(this.ctlNumPacketsQueue_nd)); 212 this.stat_traceOf_objFunction.add(0.0, computeObjectiveFunctionFromNetPlan(this.currentNetPlan)); 213 this.stat_traceOf_hd.add(0.0, currentNetPlan.getVectorDemandOfferedTraffic()); 214 215 /* CONG CONTROL */ 216 /* Set the initial prices in the links: 1.0 */ 217 /* Initialize the information each demand knows of the prices of all the links */ 218 /* Compute the traffic each demand injects, set it in net2plan, and tell routing layer there was an update in this */ 219 for (Demand d : currentNetPlan.getDemands()) 220 d.setOfferedTraffic(this.computeHdFromPrices(d)); 221 222 /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */ 223 for (Demand d : currentNetPlan.getDemands()) 224 { 225 final double updateTime = (cc_update_isSynchronous.getBoolean())? cc_update_averageInterUpdateTime.getDouble() : Math.max(0 , cc_update_averageInterUpdateTime.getDouble() + cc_update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 226 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , CC_UPDATE_WAKEUPTOUPDATE , d)); 227 } 228 } 229 230 @Override 231 public void processEvent(NetPlan currentNetPlan, SimEvent event) 232 { 233 final double time = event.getEventTime(); 234 switch (event.getEventType()) 235 { 236 case UPDATE_STATISTICTRACES: 237 { 238 for (Route r : this.currentNetPlan.getRoutes()) 239 { 240 final int numPacketsArrivedDestinationLastInterval = stat_mapRouteId2CarriedPacketsLastInterval.get(r); 241 final double trafficArrivedDestinationLastInterval = numPacketsArrivedDestinationLastInterval * routing_numTrafficUnitsOfOnePacket.getDouble() / (routing_fixedPacketDurationAndSchedulingInterval.getDouble() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt ()); 242 r.setCarriedTraffic(trafficArrivedDestinationLastInterval , trafficArrivedDestinationLastInterval); 243 /* Set zero in the counter of packets */ 244 stat_mapRouteId2CarriedPacketsLastInterval.put(r,0); 245 } 246 247 /* Update the traces */ 248 this.stat_traceOf_xp.add(time , netPlanRouteCarriedTrafficMap (this.currentNetPlan)); 249 this.stat_traceOf_ye.add(time, currentNetPlan.getVectorLinkTotalCarriedTraffic()); 250 this.stat_traceOf_objFunction.add(time, computeObjectiveFunctionFromNetPlan(this.currentNetPlan)); 251 final double scaleFactorAccumNumQueuePacketsToAverageQueuedTraffic = this.routing_numTrafficUnitsOfOnePacket.getDouble() / this.routing_statNumSchedSlotBetweenN2PRecomputing.getInt (); 252 /* We store the average queue sizes in traffic units */ 253 this.stat_traceOf_queueSizes.add(time, scaledCopyOf(this.stat_accumNumQeueusPackets_nd , scaleFactorAccumNumQueuePacketsToAverageQueuedTraffic)); 254 this.stat_accumNumQeueusPackets_nd = new int [N][D]; // reset the count 255 this.scheduleEvent(new SimEvent (time + routing_fixedPacketDurationAndSchedulingInterval.getDouble() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_STATISTICTRACES , -1)); 256 break; 257 } 258 259 case SIGNALING_RECEIVEDMESSAGE: // A node receives from an out neighbor the q_nd for any demand 260 { 261 final Triple<Node,Node,int []> signalInfo = (Triple<Node,Node,int []>) event.getEventObject(); 262 final Node nMe = signalInfo.getFirst(); 263 final int index_nIdMe = nMe.getIndex (); 264 final Node nNeighbor = signalInfo.getSecond(); 265 final int index_nNeighbor = nNeighbor.getIndex (); 266 final int [] receivedInfo_q_d = signalInfo.getThird(); 267 for (int index_d = 0 ; index_d < D ; index_d ++) 268 ctlMostUpdatedQndKnownByNodeN1_n1nd.set(index_nIdMe, index_nNeighbor, index_d, receivedInfo_q_d[index_d]); 269 break; 270 271 } 272 273 case SIGNALING_WAKEUPTOSENDMESSAGE: // A node broadcasts signaling info to its 1 hop neighbors 274 { 275 final Node nMe = (Node) event.getEventObject(); 276 final int index_nMe = nMe.getIndex (); 277 278 log.println("t=" + time + ": ROUTING_NODEWAKEUPTO_TRANSMITSIGNAL node " + nMe); 279 280 /* Create the info I will signal */ 281 int [] infoToSignal_q_d = Arrays.copyOf(ctlNumPacketsQueue_nd [index_nMe], D); 282 283 /* Send the events of the signaling information messages to incoming neighbors */ 284 for (Node n : nMe.getInNeighbors()) 285 { 286 if (rng.nextDouble() >= this.signaling_signalingLossProbability.getDouble()) // the signaling may be lost => lost to all nodes 287 { 288 final double signalingReceptionTime = time + Math.max(0 , signaling_averageDelay.getDouble() + signaling_maxFluctuationInDelay.getDouble() * (rng.nextDouble() - 0.5)); 289 this.scheduleEvent(new SimEvent (signalingReceptionTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_RECEIVEDMESSAGE , Triple.of(n , nMe , infoToSignal_q_d))); 290 } 291 } 292 293 /* Re-schedule when to wake up again */ 294 final double signalingTime = signaling_isSynchronous.getBoolean()? time + signaling_averageInterMessageTime.getDouble() : Math.max(time , time + signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5)); 295 this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , nMe)); 296 break; 297 } 298 299 case UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS: // a node updates its p_n, p_e values, using most updated info available 300 { 301 log.println("t=" + time + ": UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS"); 302 303 /* First, create traffic of the demands. A constant amount */ 304 for (Demand d : this.currentNetPlan.getDemands()) 305 { 306 final double avgNumPackets_d = d.getOfferedTraffic() / routing_numTrafficUnitsOfOnePacket.getDouble(); 307 final int numPackets_d = Poisson.staticNextInt(avgNumPackets_d); 308 final Node a_d = d.getIngressNode(); 309 LinkedList<PacketInfo> q = this.ctlQueue_nd.get(Pair.of(a_d, d)); 310 for (int cont = 0 ; cont < numPackets_d ; cont ++) 311 q.add (new PacketInfo (d , time)); 312 this.ctlNumPacketsQueue_nd [a_d.getIndex ()][d.getIndex ()] += numPackets_d; 313 this.stat_accumNumGeneratedPackets_d [d.getIndex ()] += numPackets_d; 314 } 315 316 /* Forwarding decisions for each link. Each link can transmit as many packets as its capacity divided by trafic per packet factor (floor), during this scheduling */ 317 int [][] numPacketsToForward_ed = new int [E][D]; 318 int [][] numPacketsToForward_nd = new int [N][D]; 319 for (int index_e = 0 ; index_e < E ; index_e ++) 320 { 321 final Link e = currentNetPlan.getLink (index_e); 322 final int maxNumPacketsToTransmit = (int) Math.floor(e.getCapacity() / this.routing_numTrafficUnitsOfOnePacket.getDouble()); 323 final Node a_e = e.getOriginNode(); 324 final Node b_e = e.getDestinationNode(); 325 final int index_ae = a_e.getIndex (); 326 final int index_be = b_e.getIndex (); 327 for (int cont = 0 ; cont < maxNumPacketsToTransmit ; cont ++) 328 { 329 /* Take the queue with higher */ 330 double highestBackpressure = -1; 331 int indexDemandToTransmitPacket = -1; 332 for (int index_d = 0 ; index_d < D ; index_d ++) 333 { 334 final Demand d = currentNetPlan.getDemand (index_d); 335 final int thisQueueSize = ctlNumPacketsQueue_nd [index_ae][index_d]; 336 if (thisQueueSize != ctlQueue_nd.get(Pair.of(a_e, d)).size ()) throw new RuntimeException ("Bad"); 337 final int neighborQueueSize = ctlMostUpdatedQndKnownByNodeN1_n1nd.get(index_ae,index_be,index_d); 338 final double backpressure = routing_gradient_gammaStep.getDouble() * (thisQueueSize - numPacketsToForward_nd[index_ae][index_d] - neighborQueueSize); 339 if (backpressure > highestBackpressure) { indexDemandToTransmitPacket = index_d; highestBackpressure = backpressure; } 340 } 341 if (highestBackpressure <= 0) break; // no more queues will be empty, not enough backpressures 342 343 numPacketsToForward_ed[index_e][indexDemandToTransmitPacket] ++; 344 numPacketsToForward_nd[index_ae][indexDemandToTransmitPacket] ++; 345 } 346 } 347 348 /* Apply the forwarding decisions */ 349 for (int index_e = 0 ; index_e < E ; index_e ++) 350 { 351 final Link e = currentNetPlan.getLink (index_e); 352 final Node a_e = e.getOriginNode(); 353 final Node b_e = e.getDestinationNode(); 354 final int index_ae = a_e.getIndex (); 355 final int index_be = b_e.getIndex (); 356 for (int index_d = 0 ; index_d < D ; index_d ++) 357 { 358 final Demand d = currentNetPlan.getDemand (index_d); 359 final int numPackets_ed = numPacketsToForward_ed [index_e][index_d]; 360 361 for (int cont = 0 ; cont < numPackets_ed ; cont ++) 362 { 363 //System.out.println("Pop e: " + e + ", d: " + d + ", numpack quque: " + this.ctlQueue_nd.get(Pair.of(a_e,d)).size() ); 364 365 PacketInfo p = this.ctlQueue_nd.get(Pair.of(a_e,d)).pop(); 366 ctlNumPacketsQueue_nd [index_ae][index_d] --; 367 if (this.ctlQueue_nd.get(Pair.of(a_e,d)).size() != ctlNumPacketsQueue_nd [index_ae][index_d]) throw new RuntimeException ("Bad"); 368 p.seqLinks.add(e); // update the seq of links 369 if (b_e == d.getEgressNode()) 370 { 371 /* Packet arrived to destination: update the statistics */ 372 Route route = stat_mapSeqLinks2RouteId.get(p.seqLinks); 373 374 /* If the route does not exist, create it */ 375 if (route == null) 376 { 377 route = this.currentNetPlan.addRoute(d, 0.0 , 0.0 , p.seqLinks, null); 378 stat_mapSeqLinks2RouteId.put(p.seqLinks , route); 379 stat_mapRouteId2CarriedPacketsLastInterval.put(route, 1); 380 } 381 else 382 { 383 stat_mapRouteId2CarriedPacketsLastInterval.put(route, stat_mapRouteId2CarriedPacketsLastInterval.get(route) + 1); 384 this.stat_accumNumReceivedPackets_d [index_d] ++; 385 } 386 } 387 else 388 { 389 /* Not the end node => put it in the next queue */ 390 this.ctlQueue_nd.get(Pair.of(b_e,d)).addLast(p); 391 ctlNumPacketsQueue_nd [index_be][index_d] ++; 392 } 393 } 394 } 395 396 } 397 398 /* Update the statistics of accum number of packets in a queue */ 399 for (int index_n = 0 ; index_n < N ; index_n ++) 400 for (int index_d = 0 ; index_d < D ; index_d ++) 401 stat_accumNumQeueusPackets_nd [index_n][index_d] += this.ctlNumPacketsQueue_nd [index_n][index_d]; 402 403 this.scheduleEvent(new SimEvent (time + routing_fixedPacketDurationAndSchedulingInterval.getDouble() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS , -1)); 404 405 if (time > this.routing_maxNumberOfN2PStatisticsIntervals.getInt() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt() * this.routing_fixedPacketDurationAndSchedulingInterval.getDouble()) { this.endSimulation (); } 406 407 break; 408 } 409 410 case CC_UPDATE_WAKEUPTOUPDATE: // a node updates its p_n, p_e values, using most updated info available 411 { 412 final Demand dMe = (Demand) event.getEventObject(); 413 final double new_hd = computeHdFromPrices (dMe); 414 dMe.setOfferedTraffic(new_hd); 415 416 final double updateTime = cc_update_isSynchronous.getBoolean()? time + cc_update_averageInterUpdateTime.getDouble() : Math.max(time , time + cc_update_averageInterUpdateTime.getDouble() + cc_update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5)); 417 this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , CC_UPDATE_WAKEUPTOUPDATE, dMe)); 418 419 this.stat_traceOf_hd.add(time, currentNetPlan.getVectorDemandOfferedTraffic()); 420 421 if (time > this.cc_simulation_maxNumberOfUpdateIntervals.getDouble() * this.cc_update_averageInterUpdateTime.getDouble()) { this.endSimulation (); } 422 423 break; 424 } 425 426 default: throw new RuntimeException ("Unexpected received event"); 427 } 428 429 430 } 431 432 public String finish (StringBuilder st , double simTime) 433 { 434 System.out.println("stat_accumNumGeneratedPackets_d: " + Arrays.toString(stat_accumNumGeneratedPackets_d)); 435 System.out.println("stat_accumNumReceivedPackets_d: " + Arrays.toString(stat_accumNumReceivedPackets_d)); 436 System.out.println("transmittedNotReceived: " + Arrays.toString(IntUtils.substract(stat_accumNumGeneratedPackets_d, stat_accumNumReceivedPackets_d))); 437 438 /* If no output file, return */ 439 if (simulation_outFileNameRoot.getString().equals("")) return null; 440 441 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_objFunc.txt"), new double [] { computeObjectiveFunctionFromNetPlan (optNetPlan.getFirst()) } ); 442 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_xp.txt"), optNetPlan.getFirst().getVectorRouteCarriedTraffic()); 443 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_hd.txt"), optNetPlan.getFirst().getVectorDemandOfferedTraffic()); 444 TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_qnd.txt"), DoubleUtils.mult(optNetPlan.getSecond(), 1/routing_gradient_gammaStep.getDouble())); 445 this.stat_traceOf_queueSizes.printToFile(new File (simulation_outFileNameRoot.getString() + "_qnd.txt")); 446 this.stat_traceOf_objFunction.printToFile(new File (simulation_outFileNameRoot.getString() + "_objFunc.txt")); 447 this.stat_traceOf_ye.printToFile(new File (simulation_outFileNameRoot.getString() + "_ye.txt")); 448 for (Pair<Double,Object> sample : this.stat_traceOf_xp.getList()) 449 { 450 Map<Long,Double> x_p = (Map<Long,Double>) sample.getSecond (); 451 for (long r: this.currentNetPlan.getRouteIds()) 452 if (!x_p.containsKey(r)) x_p.put(r , 0.0); 453 } 454 this.stat_traceOf_xp.printToFile(new File (simulation_outFileNameRoot.getString() + "_xp.txt")); 455 this.stat_traceOf_hd.printToFile(new File (simulation_outFileNameRoot.getString() + "_hd.txt")); 456 457 458 return null; 459 } 460 461 private Pair<NetPlan,double[][]> computeOptimumSolution () 462 { 463 /* Modify the map so that it is the pojection where all elements sum h_d, and are non-negative */ 464 final int D = this.currentNetPlan.getNumberOfDemands(); 465 final int E = this.currentNetPlan.getNumberOfLinks(); 466 final DoubleMatrix1D u_e = this.currentNetPlan.getVectorLinkCapacity(); 467 468 469 OptimizationProblem op = new OptimizationProblem(); 470 op.addDecisionVariable("x_de", false , new int[] { D , E }, 0 , u_e.getMaxLocation() [0]); 471 op.addDecisionVariable("h_d", false, new int[] {1, D}, cc_control_minHd.getDouble() , cc_control_maxHd.getDouble()); 472 473 op.setInputParameter("u_e", u_e , "row"); 474 op.setInputParameter("alpha", this.cc_control_fairnessFactor.getDouble()); 475 476 /* Set some input parameters */ 477 /* Sets the objective function */ 478 if (cc_control_fairnessFactor.getDouble() == 1) 479 op.setObjectiveFunction("maximize", "sum(ln(h_d))"); 480 else if (cc_control_fairnessFactor.getDouble() == 0) 481 op.setObjectiveFunction("maximize", "sum(h_d)"); 482 else 483 op.setObjectiveFunction("maximize", "(1-alpha) * sum(h_d ^ (1-alpha))"); 484 485 op.setInputParameter("A_ne", currentNetPlan.getMatrixNodeLinkIncidence()); 486 op.setInputParameter("A_nd", currentNetPlan.getMatrixNodeDemandIncidence()); 487 op.addConstraint("A_ne * (x_de') >= A_nd * diag(h_d)" , "flowConservationConstraints"); // the flow-conservation constraints (NxD constraints) 488 op.addConstraint("sum(x_de,1) <= u_e"); // the capacity constraints (E constraints) 489 490 /* Call the solver to solve the problem */ 491 op.solve("ipopt"); 492 493 /* If an optimal solution was not found, quit */ 494 if (!op.solutionIsOptimal ()) throw new RuntimeException ("An optimal solution was not found"); 495 496 /* Retrieve the optimum solutions. Convert the bps into Erlangs */ 497 final DoubleMatrix1D h_d_array = op.getPrimalSolution("h_d").view1D (); 498 final DoubleMatrix2D x_de_array = op.getPrimalSolution("x_de").view2D (); 499 final double [][] q_nd_array = (double [][]) op.getMultipliersOfConstraint("flowConservationConstraints").assign(DoubleFunctions.abs).toArray(); 500 501 /* Convert the x_de variables into a set of routes for each demand */ 502 NetPlan np = this.currentNetPlan.copy(); 503 np.removeAllUnicastRoutingInformation(); 504 np.setVectorDemandOfferedTraffic(h_d_array); 505 np.setRoutingFromDemandLinkCarriedTraffic(x_de_array , false , false); 506 507 /* Check solution: all traffic is carried, no link oversubscribed */ 508 for (Demand d : np.getDemands()) 509 { 510 final double thisDemand_hd = d.getOfferedTraffic(); 511 if (Math.abs (d.getCarriedTraffic() - d.getOfferedTraffic()) > 1e-3) throw new RuntimeException ("Bad"); 512 if (thisDemand_hd < cc_control_minHd.getDouble() - 1E-3) throw new RuntimeException ("Bad"); 513 if (thisDemand_hd > cc_control_maxHd.getDouble() + 1E-3) throw new RuntimeException ("Bad"); 514 } 515 if (np.getVectorLinkUtilizationIncludingProtectionSegments().getMaxLocation() [0] > 1.001) throw new RuntimeException ("Bad"); 516 517 return Pair.of(np,q_nd_array); 518 } 519 520 521 private class PacketInfo 522 { 523 final Demand d; 524 final double packetCreationTime; 525 LinkedList<Link> seqLinks; 526 PacketInfo (Demand d , double creationTime) { this.d = d; this.packetCreationTime = creationTime; this.seqLinks = new LinkedList<Link> (); } 527 } 528 private int [][] copyOf (int [][] a) { int [][] res = new int [a.length][]; for (int cont = 0 ; cont < a.length ; cont ++) res [cont] = Arrays.copyOf(a[cont], a[cont].length); return res; } 529 private double [][] copyOf (double [][] a) { double [][] res = new double [a.length][]; for (int cont = 0 ; cont < a.length ; cont ++) res [cont] = Arrays.copyOf(a[cont], a[cont].length); return res; } 530 private double [][] scaledCopyOf (int [][] a , double mult) { double [][] res = new double [a.length][a[0].length]; for (int cont = 0 ; cont < a.length ; cont ++) for (int cont2 = 0; cont2 < a[cont].length ; cont2 ++) res [cont][cont2] = a[cont][cont2] * mult; return res; } 531 532 533 private double computeObjectiveFunctionFromNetPlan (NetPlan np) 534 { 535 double objFunc = 0; 536 for (Demand d : np.getDemands()) 537 { 538 final double h_d = d.getOfferedTraffic(); 539 objFunc += (this.cc_control_fairnessFactor.getDouble() == 1)? Math.log(h_d) : Math.pow(h_d, 1-this.cc_control_fairnessFactor.getDouble()) / (1-this.cc_control_fairnessFactor.getDouble()); 540 } 541 return objFunc; 542 } 543 544 private double computeHdFromPrices (Demand d) 545 { 546 /* compute the demand price as weighted sum in the routes of route prices */ 547 final int index_ad = d.getIngressNode().getIndex (); 548 final int index_d = d.getIndex (); 549 final double demandInitialNodeQueueSize = this.routing_numTrafficUnitsOfOnePacket.getDouble() * ((double) this.ctlNumPacketsQueue_nd [index_ad][index_d]) * this.routing_gradient_gammaStep.getDouble(); 550 /* compute the new h_d */ 551 final double new_hd = Math.max(this.cc_control_minHd.getDouble() , Math.min(this.cc_control_maxHd.getDouble(), Math.pow(demandInitialNodeQueueSize, -1/this.cc_control_fairnessFactor.getDouble()))); 552 553 return new_hd; 554 } 555 556 private Map<Long,Double> netPlanRouteCarriedTrafficMap (NetPlan np) { Map<Long,Double> res = new HashMap<Long,Double> (); for (Route r : np.getRoutes ()) res.put (r.getId () , r.getCarriedTraffic()); return res; } 557 558}