001/*******************************************************************************
002 * Copyright (c) 2017 Pablo Pavon Marino and others.
003 * All rights reserved. This program and the accompanying materials
004 * are made available under the terms of the 2-clause BSD License 
005 * which accompanies this distribution, and is available at
006 * https://opensource.org/licenses/BSD-2-Clause
007 *
008 * Contributors:
009 *     Pablo Pavon Marino and others - initial API and implementation
010 *******************************************************************************/
011package com.net2plan.examples.ocnbook.onlineSim;
012
013
014import cern.colt.matrix.tdouble.DoubleMatrix1D;
015import cern.colt.matrix.tdouble.DoubleMatrix2D;
016import cern.colt.matrix.tint.IntFactory3D;
017import cern.colt.matrix.tint.IntMatrix3D;
018import cern.jet.math.tdouble.DoubleFunctions;
019import cern.jet.random.tdouble.Poisson;
020import com.jom.OptimizationProblem;
021import com.net2plan.interfaces.networkDesign.*;
022import com.net2plan.interfaces.simulation.IEventProcessor;
023import com.net2plan.interfaces.simulation.SimEvent;
024import com.net2plan.utils.Constants.RoutingType;
025import com.net2plan.utils.*;
026
027import java.io.File;
028import java.io.FileOutputStream;
029import java.io.PrintStream;
030import java.util.*;
031
032/** 
033 * This module implements a distributed dual-decomposition-based gradient algorithm, for a coordinated adjustment of the traffic to inject by each demand (congestion control), and the routing (backpressure based) of this traffic in the network, to maximize the network utility enforcing a fair allocation of the resources.
034 *
035 * Ths event processor is adapted to permit observing the algorithm performances under user-defined conditions, 
036 * including asynchronous distributed executions, where signaling can be affected by losses and/or delays, and/or measurement errors. 
037 * The time evolution of different metrics can be stored in output files, for later processing. 
038 * As an example, see the <a href="../../../../../../graphGeneratorFiles/fig_sec11_4_congControlAndBackpressureRouting_dualDecomp.m">{@code fig_sec11_4_congControlAndBackpressureRouting_dualDecomp.m}</a> MATLAB file used for generating the graph/s of the case study in the 
039 * <a href="http://eu.wiley.com/WileyCDA/WileyTitle/productCd-1119013356.html">book</a> using this algorithm.
040 * 
041 * To simulate a network with this module, use the {@code Online_evGen_doNothing} generator.
042 * 
043 * @net2plan.keywords Bandwidth assignment (BA), Flow assignment (FA), Backpressure routing, Distributed algorithm, Dual decomposition 
044 * @net2plan.ocnbooksections Section 11.4
045 * @net2plan.inputParameters 
046 * @author Pablo Pavon-Marino
047 */
048@SuppressWarnings("unchecked")
049public class Online_evProc_congControlAndBackpressureRoutingDualDecomp extends IEventProcessor
050{
051        private static PrintStream getNulFile () { try { return new PrintStream (new FileOutputStream ("NUL") , false); } catch (Exception e) {e.printStackTrace(); throw new RuntimeException ("Not NUL file"); }   } 
052        private PrintStream log = getNulFile (); //System.err;//new PrintStream (new FileOutputStream ("NUL") , true);
053
054        /******************** MAC ****************/
055        static final int CC_UPDATE_WAKEUPTOUPDATE = 402;
056
057        private InputParameter cc_update_isSynchronous = new InputParameter ("cc_update_isSynchronous", false , "true if all the distributed agents involved wake up synchronousely to update its state");
058        private InputParameter cc_update_averageInterUpdateTime = new InputParameter ("cc_update_averageInterUpdateTime", 10.0 , "Average time between two updates of an agent" , 0 , false , Double.MAX_VALUE , true);
059        private InputParameter cc_update_maxFluctuationInterUpdateTime = new InputParameter ("cc_update_maxFluctuationInterUpdateTime", 5.0 , "Max fluctuation in time in the update interval of an agent, in absolute time values. The update intervals are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true);
060
061        private InputParameter cc_simulation_maxNumberOfUpdateIntervals = new InputParameter ("cc_simulation_maxNumberOfUpdateIntervals", 1000.0 , "Maximum number of update intervals in average per agent" , 0 , false , Double.MAX_VALUE , true);
062        private InputParameter cc_control_minHd = new InputParameter ("cc_control_minHd", 0.1 , "Minimum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true);
063        private InputParameter cc_control_maxHd = new InputParameter ("cc_control_maxHd", 25 , "Maximum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true);
064        private InputParameter cc_control_fairnessFactor = new InputParameter ("cc_control_fairnessFactor_1", 2.0 , "Fairness factor in utility function of congestion control for demands" , 0 , true , Double.MAX_VALUE , true);
065        
066
067        private InputParameter simulation_randomSeed = new InputParameter ("simulation_randomSeed", (long) 1 , "Seed of the random number generator");
068        private InputParameter simulation_outFileNameRoot = new InputParameter ("simulation_outFileNameRoot", "crossLayerCongControlBackpressureRoutingDualDecomp" , "Root of the file name to be used in the output files. If blank, no output");
069
070        private InputParameter signaling_isSynchronous = new InputParameter ("signaling_isSynchronous", false , "true if all the distributed agents involved wake up synchronously to send the signaling messages");
071        private InputParameter signaling_averageInterMessageTime = new InputParameter ("signaling_averageInterMessageTime", 1.0 , "Average time between two signaling messages sent by an agent" , 0 , false , Double.MAX_VALUE , true);
072        private InputParameter signaling_maxFluctuationInterMessageTime = new InputParameter ("signaling_maxFluctuationInterMessageTime", 0.5 , "Max fluctuation in time between two signaling messages sent by an agent" , 0 , true , Double.MAX_VALUE , true);
073        private InputParameter signaling_averageDelay = new InputParameter ("signaling_averageDelay", 0.0 , "Average time between signaling message transmission by an agent and its reception by other or others" , 0 , true , Double.MAX_VALUE , true);
074        private InputParameter signaling_maxFluctuationInDelay = new InputParameter ("signaling_maxFluctuationInDelay", 0.0 , "Max fluctuation in time in the signaling delay, in absolute time values. The signaling delays are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true);
075        private InputParameter signaling_signalingLossProbability = new InputParameter ("signaling_signalingLossProbability", 0.05 , "Probability that a signaling message transmitted is lost (not received by other or others involved agents)" , 0 , true , Double.MAX_VALUE , true);
076
077        private InputParameter routing_fixedPacketDurationAndSchedulingInterval = new InputParameter ("routing_fixedPacketDurationAndSchedulingInterval", 1.0 , "Fixed slot size (synchronized all links)" , 0 , false , Double.MAX_VALUE , true);
078        private InputParameter routing_numTrafficUnitsOfOnePacket = new InputParameter ("routing_numTrafficUnitsOfOnePacket", 1.0 , "One packet in one slot time is this traffic" , 0 , false , Double.MAX_VALUE , true);
079        private InputParameter routing_statNumSchedSlotBetweenN2PRecomputing = new InputParameter ("routing_statNumSchedSlotBetweenN2PRecomputing", (int) 100 , "Number of scheduling slots between two N2P update and storing a stat trace" , 1 , Integer.MAX_VALUE);
080        private InputParameter routing_maxNumberOfN2PStatisticsIntervals = new InputParameter ("routing_maxNumberOfN2PStatisticsIntervals", (int) 15 , "Simulation length in number of scheduling intervals" , 1 , Integer.MAX_VALUE);
081
082        private InputParameter routing_gradient_gammaStep = new InputParameter ("routing_gradient_gammaStep", 0.000005 , "Gamma step in the gradient algorithm" , 0 , false , Double.MAX_VALUE , true);
083        
084        /* Optimum solution */
085        private Pair<NetPlan,double[][]> optNetPlan;
086        
087        /******************* ROUTING ****/
088        
089        private Random rng;
090        int N,E,D;
091        
092        static final int SIGNALING_WAKEUPTOSENDMESSAGE = 300; // A NODE SIGNAL QUEUES TO NEIGHBOR NODES
093        static final int SIGNALING_RECEIVEDMESSAGE = 301;  // A NODE RECEIVED SIGNAL QUEUES TO NEIGHBOR NODES
094        static final int UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS = 302; // ALL NODES SCHEDULE ONE PACKET PER LINK
095        static final int UPDATE_STATISTICTRACES = 303; // STAT UPDATE OF N2P
096
097
098        private NetPlan currentNetPlan;
099
100        private Map<List<Link>,Route> stat_mapSeqLinks2RouteId;
101        private Map<Route,Integer> stat_mapRouteId2CarriedPacketsLastInterval;
102        private int [] stat_accumNumGeneratedPackets_d;
103        private int [] stat_accumNumReceivedPackets_d;
104        private int [][] stat_accumNumQeueusPackets_nd;
105        
106//      private TimeTrace traceOf_objFunction;
107//      private TimeTrace traceOf_ye;
108        private TimeTrace stat_traceOf_hd;
109        private TimeTrace stat_traceOf_xp; // traces are taken from netPlan, when it is updated (stat_n2pRecomputingInterval)
110        private TimeTrace stat_traceOf_objFunction;
111        private TimeTrace stat_traceOf_ye;
112        private TimeTrace stat_traceOf_queueSizes;
113
114        
115        private IntMatrix3D ctlMostUpdatedQndKnownByNodeN1_n1nd; // sparse for memory efficiency 
116        private int [][] ctlNumPacketsQueue_nd;
117        private Map<Pair<Node,Demand>,LinkedList<PacketInfo>> ctlQueue_nd; 
118
119        @Override
120        public String getDescription()
121        {
122                return "This module implements a distributed dual-decomposition-based gradient algorithm, for a coordinated adjustment of the traffic to inject by each demand (congestion control), and the routing (backpressure based) of this traffic in the network, to maximize the network utility enforcing a fair allocation of the resources.";
123        }
124
125        @Override
126        public List<Triple<String, String, String>> getParameters()
127        {
128                /* Returns the parameter information for all the InputParameter objects defined in this object (uses Java reflection) */
129                return InputParameter.getInformationAllInputParameterFieldsOfObject(this);
130        }
131
132        @Override
133        public void initialize(NetPlan currentNetPlan, Map<String, String> algorithmParameters, Map<String, String> simulationParameters, Map<String, String> net2planParameters)
134        {
135                /* Initialize all InputParameter objects defined in this object (this uses Java reflection) */
136                InputParameter.initializeAllInputParameterFieldsOfObject(this, algorithmParameters);
137
138                this.currentNetPlan = currentNetPlan;
139                if (currentNetPlan.getNumberOfLayers() != 1) throw new Net2PlanException ("This algorithm works in single layer networks");
140
141                /* Update the indexes. The set of nodes, links and demands cannot change during the algorithm */
142                this.N = this.currentNetPlan.getNumberOfNodes();
143                this.E = this.currentNetPlan.getNumberOfLinks();
144                this.D = this.currentNetPlan.getNumberOfDemands();
145                this.rng = new Random (simulation_randomSeed.getLong());
146                if ((E == 0) || (D == 0)) throw new Net2PlanException ("The input design should have links and demands");
147                
148                /* Sets the initial routing, with all the traffic balanced equally in all the paths of the demand */
149                this.stat_mapSeqLinks2RouteId = new HashMap<List<Link>,Route> ();
150                this.stat_mapRouteId2CarriedPacketsLastInterval = new HashMap<Route,Integer> ();
151
152                /* Computes the optimum NetPlan */
153                this.currentNetPlan.removeAllUnicastRoutingInformation();
154                this.currentNetPlan.setRoutingTypeAllDemands(RoutingType.SOURCE_ROUTING);
155
156                this.currentNetPlan.removeAllRoutes();
157                this.optNetPlan = computeOptimumSolution ();
158                /* Add the routes in the optimum solution to the netPlan, with zero traffic. For having teh same IDs later */
159                this.currentNetPlan.copyFrom(optNetPlan.getFirst());
160                for (Route r: currentNetPlan.getRoutes())
161                {
162                        r.setCarriedTraffic(0.0 , 0.0);
163                        this.stat_mapSeqLinks2RouteId.put(r.getSeqLinks(),r);
164                        this.stat_mapRouteId2CarriedPacketsLastInterval.put(r, 0);
165                }
166                
167
168                this.ctlQueue_nd = new HashMap<Pair<Node,Demand>,LinkedList<PacketInfo>> ();
169                for (Node n: this.currentNetPlan.getNodes())
170                        for (Demand d: this.currentNetPlan.getDemands())
171                                { ctlQueue_nd.put(Pair.of(n, d), new LinkedList<PacketInfo> ());  }
172
173                this.ctlMostUpdatedQndKnownByNodeN1_n1nd = IntFactory3D.sparse.make(N, N, D); 
174                this.ctlNumPacketsQueue_nd = new int [N][D];
175                
176                this.stat_accumNumGeneratedPackets_d = new int [D];
177                this.stat_accumNumReceivedPackets_d = new int [D];
178                this.stat_accumNumQeueusPackets_nd= new int [N][D]; 
179                
180                /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */
181                this.scheduleEvent(new SimEvent (routing_statNumSchedSlotBetweenN2PRecomputing.getInt() * routing_fixedPacketDurationAndSchedulingInterval.getDouble(), SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_STATISTICTRACES , -1));
182                this.scheduleEvent(new SimEvent (routing_fixedPacketDurationAndSchedulingInterval.getDouble() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS , -1));
183                for (Node n : currentNetPlan.getNodes())
184                {
185                        final double signalingTime = signaling_isSynchronous.getBoolean()? signaling_averageInterMessageTime.getDouble() : Math.max(0 , signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5));
186                        this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , n));
187                }
188
189                /* Intialize the traces */
190                this.stat_traceOf_xp = new TimeTrace ();
191                this.stat_traceOf_ye = new TimeTrace ();
192                this.stat_traceOf_queueSizes = new TimeTrace ();
193                this.stat_traceOf_objFunction = new TimeTrace (); 
194                this.stat_traceOf_hd = new TimeTrace ();
195                this.stat_traceOf_xp.add(0.0 , netPlanRouteCarriedTrafficMap (this.currentNetPlan));
196                this.stat_traceOf_ye.add(0.0, currentNetPlan.getVectorLinkCarriedTraffic());
197                this.stat_traceOf_queueSizes.add(0.0, copyOf(this.ctlNumPacketsQueue_nd));
198                this.stat_traceOf_objFunction.add(0.0, computeObjectiveFunctionFromNetPlan(this.currentNetPlan));
199                this.stat_traceOf_hd.add(0.0, currentNetPlan.getVectorDemandOfferedTraffic());
200                
201                /* CONG CONTROL */
202                /* Set the initial prices in the links: 1.0 */
203                /* Initialize the information each demand knows of the prices of all the links */
204                /* Compute the traffic each demand injects, set it in net2plan, and tell routing layer there was an update in this */
205                for (Demand d : currentNetPlan.getDemands())
206                        d.setOfferedTraffic(this.computeHdFromPrices(d));
207                
208                /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */
209                for (Demand d : currentNetPlan.getDemands())
210                {
211                        final double updateTime = (cc_update_isSynchronous.getBoolean())? cc_update_averageInterUpdateTime.getDouble() : Math.max(0 , cc_update_averageInterUpdateTime.getDouble() + cc_update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5));
212                        this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , CC_UPDATE_WAKEUPTOUPDATE , d));
213                }
214        }
215
216        @Override
217        public void processEvent(NetPlan currentNetPlan, SimEvent event)
218        {
219                final double time = event.getEventTime();
220                switch (event.getEventType())
221                {
222                case UPDATE_STATISTICTRACES:
223                {
224                        for (Route r : this.currentNetPlan.getRoutes())
225                        {
226                                final int numPacketsArrivedDestinationLastInterval = stat_mapRouteId2CarriedPacketsLastInterval.get(r);
227                                final double trafficArrivedDestinationLastInterval = numPacketsArrivedDestinationLastInterval * routing_numTrafficUnitsOfOnePacket.getDouble() / (routing_fixedPacketDurationAndSchedulingInterval.getDouble() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt ());
228                                r.setCarriedTraffic(trafficArrivedDestinationLastInterval , trafficArrivedDestinationLastInterval);
229                                /* Set zero in the counter of packets */
230                                stat_mapRouteId2CarriedPacketsLastInterval.put(r,0);
231                        }
232                        
233                        /* Update the traces */
234                        this.stat_traceOf_xp.add(time , netPlanRouteCarriedTrafficMap (this.currentNetPlan));
235                        this.stat_traceOf_ye.add(time, currentNetPlan.getVectorLinkCarriedTraffic());
236                        this.stat_traceOf_objFunction.add(time, computeObjectiveFunctionFromNetPlan(this.currentNetPlan));
237                        final double scaleFactorAccumNumQueuePacketsToAverageQueuedTraffic = this.routing_numTrafficUnitsOfOnePacket.getDouble() / this.routing_statNumSchedSlotBetweenN2PRecomputing.getInt ();
238                        /* We store the average queue sizes in traffic units */
239                        this.stat_traceOf_queueSizes.add(time, scaledCopyOf(this.stat_accumNumQeueusPackets_nd , scaleFactorAccumNumQueuePacketsToAverageQueuedTraffic));
240                        this.stat_accumNumQeueusPackets_nd = new int [N][D]; // reset the count
241                        this.scheduleEvent(new SimEvent (time + routing_fixedPacketDurationAndSchedulingInterval.getDouble() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_STATISTICTRACES , -1));
242                        break;
243                }
244                
245                case SIGNALING_RECEIVEDMESSAGE: // A node receives from an out neighbor the q_nd for any demand
246                {
247                        final Triple<Node,Node,int []> signalInfo = (Triple<Node,Node,int []>) event.getEventObject();
248                        final Node nMe = signalInfo.getFirst();
249                        final int index_nIdMe = nMe.getIndex ();
250                        final Node nNeighbor = signalInfo.getSecond();
251                        final int index_nNeighbor = nNeighbor.getIndex ();
252                        final int [] receivedInfo_q_d = signalInfo.getThird();
253                        for (int index_d = 0 ; index_d < D ; index_d ++)
254                                ctlMostUpdatedQndKnownByNodeN1_n1nd.set(index_nIdMe, index_nNeighbor, index_d, receivedInfo_q_d[index_d]);
255                        break;
256
257                }
258                
259                case SIGNALING_WAKEUPTOSENDMESSAGE: // A node broadcasts signaling info to its 1 hop neighbors
260                {
261                        final Node nMe = (Node) event.getEventObject();
262                        final int index_nMe = nMe.getIndex ();
263                        
264                        log.println("t=" + time + ": ROUTING_NODEWAKEUPTO_TRANSMITSIGNAL node " + nMe);
265
266                        /* Create the info I will signal */
267                        int [] infoToSignal_q_d = Arrays.copyOf(ctlNumPacketsQueue_nd [index_nMe], D);
268
269                        /* Send the events of the signaling information messages to incoming neighbors */
270                        for (Node n : nMe.getInNeighbors())
271                        {
272                                if (rng.nextDouble() >= this.signaling_signalingLossProbability.getDouble()) // the signaling may be lost => lost to all nodes
273                                {
274                                        final double signalingReceptionTime = time + Math.max(0 , signaling_averageDelay.getDouble() + signaling_maxFluctuationInDelay.getDouble() * (rng.nextDouble() - 0.5));
275                                        this.scheduleEvent(new SimEvent (signalingReceptionTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_RECEIVEDMESSAGE , Triple.of(n , nMe , infoToSignal_q_d)));
276                                }
277                        }
278                        
279                        /* Re-schedule when to wake up again */ 
280                        final double signalingTime = signaling_isSynchronous.getBoolean()? time + signaling_averageInterMessageTime.getDouble() : Math.max(time , time + signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5));
281                        this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , nMe));
282                        break;
283                }
284
285                case UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS: // a node updates its p_n, p_e values, using most updated info available
286                {
287                        log.println("t=" + time + ": UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS");
288
289                        /* First, create traffic of the demands. A constant amount */
290                        for (Demand d : this.currentNetPlan.getDemands())
291                        {
292                                final double avgNumPackets_d = d.getOfferedTraffic() / routing_numTrafficUnitsOfOnePacket.getDouble();
293                                final int numPackets_d = Poisson.staticNextInt(avgNumPackets_d);
294                                final Node a_d = d.getIngressNode();
295                                LinkedList<PacketInfo> q = this.ctlQueue_nd.get(Pair.of(a_d, d));
296                                for (int cont = 0 ; cont < numPackets_d ; cont ++)
297                                        q.add (new PacketInfo (d , time));
298                                this.ctlNumPacketsQueue_nd [a_d.getIndex ()][d.getIndex ()] += numPackets_d;
299                                this.stat_accumNumGeneratedPackets_d [d.getIndex ()] += numPackets_d;
300                        }
301
302                        /* Forwarding decisions for each link. Each link can transmit as many packets as its capacity divided by trafic per packet factor (floor), during this scheduling */
303                        int [][] numPacketsToForward_ed = new int [E][D];
304                        int [][] numPacketsToForward_nd = new int [N][D];
305                        for (int index_e = 0 ; index_e  < E ; index_e ++)
306                        {
307                                final Link e = currentNetPlan.getLink (index_e);
308                                final int maxNumPacketsToTransmit = (int) Math.floor(e.getCapacity() / this.routing_numTrafficUnitsOfOnePacket.getDouble());
309                                final Node a_e = e.getOriginNode();
310                                final Node b_e = e.getDestinationNode();
311                                final int index_ae = a_e.getIndex ();
312                                final int index_be = b_e.getIndex ();
313                                for (int cont = 0 ; cont < maxNumPacketsToTransmit ; cont ++)
314                                {
315                                        /* Take the queue with higher */
316                                        double highestBackpressure = -1;
317                                        int indexDemandToTransmitPacket = -1;
318                                        for (int index_d = 0 ; index_d < D ; index_d ++)
319                                        {
320                                                final Demand d = currentNetPlan.getDemand (index_d);
321                                                final int thisQueueSize = ctlNumPacketsQueue_nd [index_ae][index_d];
322                                                if (thisQueueSize != ctlQueue_nd.get(Pair.of(a_e, d)).size ()) throw new RuntimeException ("Bad"); 
323                                                final int neighborQueueSize = ctlMostUpdatedQndKnownByNodeN1_n1nd.get(index_ae,index_be,index_d);
324                                                final double backpressure = routing_gradient_gammaStep.getDouble() * (thisQueueSize - numPacketsToForward_nd[index_ae][index_d] - neighborQueueSize);
325                                                if (backpressure > highestBackpressure) { indexDemandToTransmitPacket = index_d; highestBackpressure = backpressure; } 
326                                        }
327                                        if (highestBackpressure <= 0) break; // no more queues will be empty, not enough backpressures
328                                        
329                                        numPacketsToForward_ed[index_e][indexDemandToTransmitPacket] ++;
330                                        numPacketsToForward_nd[index_ae][indexDemandToTransmitPacket] ++;
331                                }
332                        }
333
334                        /* Apply the forwarding decisions */
335                        for (int index_e = 0 ; index_e  < E ; index_e ++)
336                        {
337                                final Link e = currentNetPlan.getLink (index_e);
338                                final Node a_e = e.getOriginNode();
339                                final Node b_e = e.getDestinationNode();
340                                final int index_ae = a_e.getIndex ();
341                                final int index_be = b_e.getIndex ();
342                                for (int index_d = 0 ; index_d < D ; index_d ++)
343                                {
344                                        final Demand d = currentNetPlan.getDemand (index_d);
345                                        final int numPackets_ed = numPacketsToForward_ed [index_e][index_d];
346
347                                        for (int cont = 0 ; cont < numPackets_ed ; cont ++)
348                                        {
349                                                //System.out.println("Pop e: " + e + ", d: " + d + ", numpack quque: " + this.ctlQueue_nd.get(Pair.of(a_e,d)).size() );
350
351                                                PacketInfo p = this.ctlQueue_nd.get(Pair.of(a_e,d)).pop();
352                                                ctlNumPacketsQueue_nd [index_ae][index_d] --;
353                                                if (this.ctlQueue_nd.get(Pair.of(a_e,d)).size() != ctlNumPacketsQueue_nd [index_ae][index_d]) throw new RuntimeException ("Bad");
354                                                p.seqLinks.add(e); // update the seq of links
355                                                if (b_e == d.getEgressNode())
356                                                {
357                                                        /* Packet arrived to destination: update the statistics */
358                                                        Route route = stat_mapSeqLinks2RouteId.get(p.seqLinks);
359                                                        
360                                                        /* If the route does not exist, create it */
361                                                        if (route == null) 
362                                                        {
363                                                                route = this.currentNetPlan.addRoute(d, 0.0 , 0.0 , p.seqLinks, null);
364                                                                stat_mapSeqLinks2RouteId.put(p.seqLinks , route);
365                                                                stat_mapRouteId2CarriedPacketsLastInterval.put(route, 1);
366                                                        }
367                                                        else
368                                                        {
369                                                                stat_mapRouteId2CarriedPacketsLastInterval.put(route, stat_mapRouteId2CarriedPacketsLastInterval.get(route) + 1);
370                                                                this.stat_accumNumReceivedPackets_d [index_d] ++;
371                                                        }
372                                                }
373                                                else
374                                                {
375                                                        /* Not the end node => put it in the next queue */
376                                                        this.ctlQueue_nd.get(Pair.of(b_e,d)).addLast(p);
377                                                        ctlNumPacketsQueue_nd [index_be][index_d] ++;
378                                                }
379                                        }
380                                }
381                                
382                        }
383                        
384                        /* Update the statistics of accum number of packets in a queue */
385                        for (int index_n = 0 ; index_n < N ; index_n ++)
386                                for (int index_d = 0 ; index_d < D ; index_d ++)
387                                        stat_accumNumQeueusPackets_nd [index_n][index_d] += this.ctlNumPacketsQueue_nd [index_n][index_d];
388                        
389                        this.scheduleEvent(new SimEvent (time + routing_fixedPacketDurationAndSchedulingInterval.getDouble() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS , -1));
390                        
391                        if (time > this.routing_maxNumberOfN2PStatisticsIntervals.getInt() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt() * this.routing_fixedPacketDurationAndSchedulingInterval.getDouble()) { this.endSimulation (); }
392
393                        break;
394                }
395                        
396                case CC_UPDATE_WAKEUPTOUPDATE: // a node updates its p_n, p_e values, using most updated info available
397                {
398                        final Demand dMe = (Demand) event.getEventObject();
399                        final double new_hd = computeHdFromPrices (dMe);
400                        dMe.setOfferedTraffic(new_hd);
401                        
402                        final double updateTime = cc_update_isSynchronous.getBoolean()? time + cc_update_averageInterUpdateTime.getDouble() : Math.max(time , time + cc_update_averageInterUpdateTime.getDouble() + cc_update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5));
403                        this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , CC_UPDATE_WAKEUPTOUPDATE,  dMe));
404
405                        this.stat_traceOf_hd.add(time, currentNetPlan.getVectorDemandOfferedTraffic());
406
407                        if (time > this.cc_simulation_maxNumberOfUpdateIntervals.getDouble() * this.cc_update_averageInterUpdateTime.getDouble()) { this.endSimulation (); }
408                        
409                        break;
410                }
411
412                default: throw new RuntimeException ("Unexpected received event");
413                }
414                
415                
416        }
417
418        public String finish (StringBuilder st , double simTime)
419        {
420                System.out.println("stat_accumNumGeneratedPackets_d: " + Arrays.toString(stat_accumNumGeneratedPackets_d));
421                System.out.println("stat_accumNumReceivedPackets_d: " + Arrays.toString(stat_accumNumReceivedPackets_d));
422                System.out.println("transmittedNotReceived: " + Arrays.toString(IntUtils.substract(stat_accumNumGeneratedPackets_d, stat_accumNumReceivedPackets_d)));
423
424                /* If no output file, return */
425                if (simulation_outFileNameRoot.getString().equals("")) return null;
426                
427                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_objFunc.txt"), new double [] { computeObjectiveFunctionFromNetPlan (optNetPlan.getFirst()) } );
428                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_xp.txt"), optNetPlan.getFirst().getVectorRouteCarriedTraffic());
429                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_hd.txt"), optNetPlan.getFirst().getVectorDemandOfferedTraffic());
430                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_qnd.txt"), DoubleUtils.mult(optNetPlan.getSecond(), 1/routing_gradient_gammaStep.getDouble()));
431                this.stat_traceOf_queueSizes.printToFile(new File (simulation_outFileNameRoot.getString() + "_qnd.txt"));
432                this.stat_traceOf_objFunction.printToFile(new File (simulation_outFileNameRoot.getString() + "_objFunc.txt"));
433                this.stat_traceOf_ye.printToFile(new File (simulation_outFileNameRoot.getString() + "_ye.txt"));
434                for (Pair<Double,Object> sample : this.stat_traceOf_xp.getList())
435                {
436                        Map<Long,Double> x_p = (Map<Long,Double>) sample.getSecond ();
437                        for (long r: this.currentNetPlan.getRouteIds())
438                                if (!x_p.containsKey(r)) x_p.put(r , 0.0); 
439                }
440                this.stat_traceOf_xp.printToFile(new File (simulation_outFileNameRoot.getString() + "_xp.txt"));
441                this.stat_traceOf_hd.printToFile(new File (simulation_outFileNameRoot.getString() + "_hd.txt"));
442                
443                
444                return null;
445        }
446        
447        private Pair<NetPlan,double[][]> computeOptimumSolution ()
448        {
449                /* Modify the map so that it is the pojection where all elements sum h_d, and are non-negative */
450                final int D = this.currentNetPlan.getNumberOfDemands();
451                final int E = this.currentNetPlan.getNumberOfLinks();
452                final DoubleMatrix1D u_e = this.currentNetPlan.getVectorLinkCapacity();
453                                
454                
455                OptimizationProblem op = new OptimizationProblem();
456                op.addDecisionVariable("x_de", false , new int[] { D , E }, 0 , u_e.getMaxLocation() [0]);
457                op.addDecisionVariable("h_d", false, new int[] {1, D}, cc_control_minHd.getDouble() , cc_control_maxHd.getDouble());
458
459                op.setInputParameter("u_e", u_e , "row");
460                op.setInputParameter("alpha", this.cc_control_fairnessFactor.getDouble());
461
462                /* Set some input parameters */
463                /* Sets the objective function */
464                if (cc_control_fairnessFactor.getDouble() == 1)
465                    op.setObjectiveFunction("maximize", "sum(ln(h_d))");
466                else if (cc_control_fairnessFactor.getDouble() == 0)
467                    op.setObjectiveFunction("maximize", "sum(h_d)");
468                else
469                    op.setObjectiveFunction("maximize", "(1-alpha) * sum(h_d ^ (1-alpha))");
470
471                op.setInputParameter("A_ne", currentNetPlan.getMatrixNodeLinkIncidence());
472                op.setInputParameter("A_nd", currentNetPlan.getMatrixNodeDemandIncidence());
473                op.addConstraint("A_ne * (x_de') >= A_nd * diag(h_d)" , "flowConservationConstraints"); // the flow-conservation constraints (NxD constraints)
474                op.addConstraint("sum(x_de,1) <= u_e"); // the capacity constraints (E constraints)
475                
476                /* Call the solver to solve the problem */
477                op.solve("ipopt");
478
479                /* If an optimal solution was not found, quit */
480                if (!op.solutionIsOptimal ()) throw new RuntimeException ("An optimal solution was not found");
481        
482                /* Retrieve the optimum solutions. Convert the bps into Erlangs */
483                final DoubleMatrix1D h_d_array = op.getPrimalSolution("h_d").view1D ();
484                final DoubleMatrix2D x_de_array = op.getPrimalSolution("x_de").view2D ();
485                final double [][] q_nd_array = (double [][]) op.getMultipliersOfConstraint("flowConservationConstraints").assign(DoubleFunctions.abs).toArray();
486
487                /* Convert the x_de variables into a set of routes for each demand  */
488                NetPlan np = this.currentNetPlan.copy();
489                np.removeAllUnicastRoutingInformation();
490                np.setVectorDemandOfferedTraffic(h_d_array);
491                np.setRoutingFromDemandLinkCarriedTraffic(x_de_array , false , false , null);
492
493                /* Check solution: all traffic is carried, no link oversubscribed */
494                for (Demand d : np.getDemands())
495                {
496                        final double thisDemand_hd = d.getOfferedTraffic();
497                        if (Math.abs (d.getCarriedTraffic() - d.getOfferedTraffic()) > 1e-3) throw new RuntimeException ("Bad");
498                        if (thisDemand_hd < cc_control_minHd.getDouble() - 1E-3) throw new RuntimeException ("Bad");
499                        if (thisDemand_hd > cc_control_maxHd.getDouble() + 1E-3) throw new RuntimeException ("Bad");
500                }
501                if (np.getVectorLinkUtilization().getMaxLocation() [0] > 1.001) throw new RuntimeException ("Bad");
502
503                return Pair.of(np,q_nd_array);
504        }
505        
506        
507        private class PacketInfo
508        {
509                final Demand d;
510                final double packetCreationTime; 
511                LinkedList<Link> seqLinks;
512                PacketInfo (Demand d , double creationTime) { this.d = d; this.packetCreationTime = creationTime; this.seqLinks = new LinkedList<Link> (); }
513        }
514        private int [][] copyOf (int [][] a) { int [][] res = new int [a.length][]; for (int cont = 0 ; cont < a.length ; cont ++) res [cont] = Arrays.copyOf(a[cont], a[cont].length); return res; }
515        private double [][] copyOf (double [][] a) { double [][] res = new double [a.length][]; for (int cont = 0 ; cont < a.length ; cont ++) res [cont] = Arrays.copyOf(a[cont], a[cont].length); return res; }
516        private double [][] scaledCopyOf (int [][] a , double mult) { double [][] res = new double [a.length][a[0].length]; for (int cont = 0 ; cont < a.length ; cont ++) for (int cont2 = 0; cont2 < a[cont].length ; cont2 ++) res [cont][cont2] = a[cont][cont2] * mult; return res; }
517        
518        
519        private double computeObjectiveFunctionFromNetPlan (NetPlan np)
520        {
521                double objFunc = 0;
522                for (Demand d : np.getDemands())
523                {
524                        final double h_d = d.getOfferedTraffic();
525                        objFunc += (this.cc_control_fairnessFactor.getDouble() == 1)? Math.log(h_d) : Math.pow(h_d, 1-this.cc_control_fairnessFactor.getDouble()) / (1-this.cc_control_fairnessFactor.getDouble());
526                }
527                return objFunc;
528        }
529        
530        private double computeHdFromPrices (Demand d)
531        {
532                /* compute the demand price as weighted sum in the routes of route prices  */
533                final int index_ad = d.getIngressNode().getIndex ();
534                final int index_d = d.getIndex ();
535                final double demandInitialNodeQueueSize = this.routing_numTrafficUnitsOfOnePacket.getDouble() * ((double) this.ctlNumPacketsQueue_nd [index_ad][index_d]) * this.routing_gradient_gammaStep.getDouble();
536                /* compute the new h_d */
537                final double new_hd = Math.max(this.cc_control_minHd.getDouble() , Math.min(this.cc_control_maxHd.getDouble(), Math.pow(demandInitialNodeQueueSize, -1/this.cc_control_fairnessFactor.getDouble())));
538
539                return new_hd;
540        }
541
542        private Map<Long,Double> netPlanRouteCarriedTrafficMap (NetPlan np) { Map<Long,Double> res = new HashMap<Long,Double> (); for (Route r : np.getRoutes ()) res.put (r.getId () , r.getCarriedTraffic()); return res; }
543
544}