001/*******************************************************************************
002 * Copyright (c) 2017 Pablo Pavon Marino and others.
003 * All rights reserved. This program and the accompanying materials
004 * are made available under the terms of the 2-clause BSD License 
005 * which accompanies this distribution, and is available at
006 * https://opensource.org/licenses/BSD-2-Clause
007 *
008 * Contributors:
009 *     Pablo Pavon Marino and others - initial API and implementation
010 *******************************************************************************/
011package com.net2plan.examples.ocnbook.onlineSim;
012
013
014import cern.colt.matrix.tdouble.DoubleFactory1D;
015import cern.colt.matrix.tdouble.DoubleFactory2D;
016import cern.colt.matrix.tdouble.DoubleMatrix1D;
017import cern.colt.matrix.tdouble.DoubleMatrix2D;
018import com.jom.OptimizationProblem;
019import com.net2plan.interfaces.networkDesign.*;
020import com.net2plan.interfaces.simulation.IEventProcessor;
021import com.net2plan.interfaces.simulation.SimEvent;
022import com.net2plan.utils.Constants.RoutingType;
023import com.net2plan.utils.*;
024
025import java.io.File;
026import java.io.FileOutputStream;
027import java.io.PrintStream;
028import java.util.*;
029
030/** 
031 * This module implements a distributed primal-gradient based algorithm, for iteratively adapting the network routing.
032 *
033 * Ths event processor is adapted to permit observing the algorithm performances under user-defined conditions, 
034 * including asynchronous distributed executions, where signaling can be affected by losses and/or delays, and/or measurement errors. 
035 * The time evolution of different metrics can be stored in output files, for later processing. 
036 * As an example, see the <a href="../../../../../../graphGeneratorFiles/fig_sec9_3_adaptiveRoutingPrimal.m">{@code fig_sec9_3_adaptiveRoutingPrimal.m}</a> MATLAB file used for generating the graph/s of the case study in the 
037 * <a href="http://eu.wiley.com/WileyCDA/WileyTitle/productCd-1119013356.html">book</a> using this algorithm.
038 * 
039 * To simulate a network with this module, use the {@code Online_evGen_doNothing} generator.
040 * 
041 * @net2plan.keywords Flow assignment (FA), Distributed algorithm, Primal gradient algorithm
042 * @net2plan.ocnbooksections Section 9.3
043 * @net2plan.inputParameters 
044 * @author Pablo Pavon-Marino
045 */
046@SuppressWarnings("unchecked")
047public class Online_evProc_adaptiveRoutingPrimal extends IEventProcessor
048{
049        private InputParameter signaling_isSynchronous = new InputParameter ("signaling_isSynchronous", false , "true if all the distributed agents involved wake up synchronously to send the signaling messages");
050        private InputParameter signaling_averageInterMessageTime = new InputParameter ("signaling_averageInterMessageTime", 1.0 , "Average time between two signaling messages sent by an agent" , 0 , false , Double.MAX_VALUE , true);
051        private InputParameter signaling_maxFluctuationInterMessageTime = new InputParameter ("signaling_maxFluctuationInterMessageTime", 0.5 , "Max fluctuation in time between two signaling messages sent by an agent" , 0 , true , Double.MAX_VALUE , true);
052        private InputParameter signaling_averageDelay = new InputParameter ("signaling_averageDelay", 0.0 , "Average time between signaling message transmission by an agent and its reception by other or others" , 0 , true , Double.MAX_VALUE , true);
053        private InputParameter signaling_maxFluctuationInDelay = new InputParameter ("signaling_maxFluctuationInDelay", 0.0 , "Max fluctuation in time in the signaling delay, in absolute time values. The signaling delays are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true);
054        private InputParameter signaling_signalingLossProbability = new InputParameter ("signaling_signalingLossProbability", 0.05 , "Probability that a signaling message transmitted is lost (not received by other or others involved agents)" , 0 , true , Double.MAX_VALUE , true);
055        private InputParameter update_isSynchronous = new InputParameter ("update_isSynchronous", false , "true if all the distributed agents involved wake up synchronousely to update its state");
056        private InputParameter update_averageInterUpdateTime = new InputParameter ("update_averageInterUpdateTime", 1.0 , "Average time between two updates of an agent" , 0 , false , Double.MAX_VALUE , true);
057        private InputParameter update_maxFluctuationInterUpdateTime = new InputParameter ("update_maxFluctuationInterUpdateTime", 0.5 , "Max fluctuation in time in the update interval of an agent, in absolute time values. The update intervals are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true);
058        private InputParameter gradient_maxGradientAbsoluteNoise = new InputParameter ("gradient_maxGradientAbsoluteNoise", 0.0 , "Max value of the added noise to the gradient coordinate in absolute values" , 0 , true , Double.MAX_VALUE , true);
059        private InputParameter gradient_gammaStep = new InputParameter ("gradient_gammaStep", 1.0 , "Gamma step in the gradient algorithm" , 0 , false , Double.MAX_VALUE , true);
060        private InputParameter gradient_heavyBallBetaParameter = new InputParameter ("gradient_heavyBallBetaParameter", 0.0 , "Beta parameter of heavy ball, between 0 and 1. Value 0 means no heavy ball" , 0 , true , 1.0 , true);
061        private InputParameter gradient_maxGradientCoordinateChange = new InputParameter ("gradient_maxGradientCoordinateChange", 1000.0 , "Maximum change in an iteration of a gradient coordinate" , 0 , false , Double.MAX_VALUE , true);
062        private InputParameter simulation_maxNumberOfUpdateIntervals = new InputParameter ("simulation_maxNumberOfUpdateIntervals", 100.0 , "Maximum number of update intervals in average per agent" , 0 , false , Double.MAX_VALUE , true);
063        private InputParameter simulation_randomSeed = new InputParameter ("simulation_randomSeed", (long) 1 , "Seed of the random number generator");
064        private InputParameter simulation_outFileNameRoot = new InputParameter ("simulation_outFileNameRoot", "adaptiveRoutingPrimal" , "Root of the file name to be used in the output files. If blank, no output");
065        private InputParameter control_maxNumberOfPathsPerDemand = new InputParameter ("control_maxNumberOfPathsPerDemand", (int) 5 , "Number of acceptable paths per demand. Take the k-shortest paths in number of hops" , 1 , Integer.MAX_VALUE);
066
067        private static PrintStream getNulFile () { try { return new PrintStream (new FileOutputStream ("NUL") , false); } catch (Exception e) {e.printStackTrace(); throw new RuntimeException ("Not NUL file"); }   } 
068        private PrintStream log = getNulFile (); //System.err;//new PrintStream (new FileOutputStream ("NUL") , true);
069
070        private Random rng;
071
072        private static final int SIGNALING_WAKEUPTOSENDMESSAGE = 300;
073        private static final int SIGNALING_RECEIVEDMESSAGE = 301;
074        private static final int UPDATE_WAKEUPTOUPDATE = 302;
075
076        private NetPlan currentNetPlan;
077
078        private DoubleMatrix1D control_price_e;
079        private DoubleMatrix2D control_mostUpdated_price_e_knownByNode_n;
080        private DoubleMatrix1D control_previousXp;
081        private int [][] control_routeIndexes_d;
082        private int E , R , N , D;
083        
084//      private Map<Node,DoubleMatrix1D> routing_mostUpdated_price_e_knownByNode_n; 
085//      private Map<Long,Set<Long>> routing_outgoingDemands_n;
086//      private Map<Long,Double> routing_previousXp;
087        
088        private TimeTrace stat_traceOf_xp;
089        private TimeTrace stat_traceOf_objFunction;
090        
091        
092        @Override
093        public String getDescription()
094        {
095                return "This module implements a distributed primal-gradient based algorithm, for iteratively adapting the network routing.";
096        }
097
098        @Override
099        public List<Triple<String, String, String>> getParameters()
100        {
101                /* Returns the parameter information for all the InputParameter objects defined in this object (uses Java reflection) */
102                return InputParameter.getInformationAllInputParameterFieldsOfObject(this);
103        }
104
105        @Override
106        public void initialize(NetPlan currentNetPlan, Map<String, String> algorithmParameters, Map<String, String> simulationParameters, Map<String, String> net2planParameters)
107        {
108                /* Initialize all InputParameter objects defined in this object (this uses Java reflection) */
109                InputParameter.initializeAllInputParameterFieldsOfObject(this, algorithmParameters);
110
111                if (currentNetPlan.getNumberOfLayers() != 1) throw new Net2PlanException ("This algorithm works in single layer networks");
112
113                this.rng = new Random (simulation_randomSeed.getLong());
114                this.currentNetPlan = currentNetPlan;
115                this.E = currentNetPlan.getNumberOfLinks();
116                this.N = currentNetPlan.getNumberOfNodes();
117                this.D = currentNetPlan.getNumberOfDemands();
118                
119                /* Check all nodes have output links */
120                for (Node n : currentNetPlan.getNodes()) if (n.getOutgoingLinks().isEmpty()) throw new Net2PlanException ("All nodes should have output links");
121
122//              /* Compute the CPL */
123//              this.cpl = new CandidatePathList (this.currentNetPlan , "K" , "" + routing_maxNumberOfPathsPerDemand);
124                
125                /* Sets the initial routing, with all the traffic balanced equally in all the paths of the demand */
126                currentNetPlan.removeAllUnicastRoutingInformation();
127                currentNetPlan.setRoutingTypeAllDemands(RoutingType.SOURCE_ROUTING);
128                this.currentNetPlan.addRoutesFromCandidatePathList(currentNetPlan.computeUnicastCandidatePathList(null , control_maxNumberOfPathsPerDemand.getInt(), -1, -1, -1, -1, -1, -1 , null));
129
130                this.R = currentNetPlan.getNumberOfRoutes ();
131                this.control_previousXp = DoubleFactory1D.dense.make (currentNetPlan.getNumberOfRoutes());
132                this.control_routeIndexes_d = new int [D][];
133                for (Demand d : this.currentNetPlan.getDemands())
134                {
135                        final double h_d = d.getOfferedTraffic();
136                        final Set<Route> routes = d.getRoutes();
137                        if (routes.isEmpty()) throw new Net2PlanException ("A demand has no paths assigned: not connected topology");
138                        control_routeIndexes_d [d.getIndex ()] = new int [routes.size ()]; 
139                        int counter_r = 0;
140                        for (Route r : routes)
141                        {
142                                control_routeIndexes_d [d.getIndex ()][counter_r ++] = r.getIndex ();
143                                r.setCarriedTraffic(h_d / routes.size() , h_d / routes.size());
144                                control_previousXp.set(r.getIndex(), h_d / routes.size());
145                                log.println("Initial route: " + r + " demand: " +d + ", h_d: " + h_d + ", h_r = " + r.getCarriedTraffic() + ", seqLinks: "+ r.getSeqLinks());
146                        }
147                }
148
149                /* Set the initial prices in the nodes */
150                this.control_price_e = DoubleFactory1D.dense.make (E);
151                for (Link e : currentNetPlan.getLinks ()) this.control_price_e.set (e.getIndex() , computeLinkPriceFromNetPlan (e));
152                
153                /* Initialize the information each demand knows of the prices of all the links */
154                this.control_mostUpdated_price_e_knownByNode_n = DoubleFactory2D.dense.make (N , E);
155                for (int n = 0; n < N ; n ++) control_mostUpdated_price_e_knownByNode_n.viewRow (n).assign (control_price_e);
156                
157                /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */
158                for (Node n : currentNetPlan.getNodes())
159                {
160                        final double signalingTime = (signaling_isSynchronous.getBoolean())? signaling_averageInterMessageTime.getDouble() : Math.max(0 , signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5));
161                        this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , n));
162                        final double updateTime = (update_isSynchronous.getBoolean())? update_averageInterUpdateTime.getDouble() : Math.max(0 , update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5));
163                        this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE , n));
164                }
165
166                /* Intialize the traces */
167                this.stat_traceOf_xp = new TimeTrace();
168                this.stat_traceOf_objFunction = new TimeTrace (); 
169                this.stat_traceOf_xp.add(0.0, this.currentNetPlan.getVectorRouteCarriedTraffic());
170                this.stat_traceOf_objFunction.add(0.0, computeObjectiveFucntionFromNetPlan());
171
172                /* */
173                log.println("Initial rouetrs in this.currentNetPlan.getRouteCarriedTrafficMap(): " + this.currentNetPlan.getVectorRouteCarriedTraffic());
174        }
175
176        @Override
177        public void processEvent(NetPlan currentNetPlan, SimEvent event)
178        {
179                final double t = event.getEventTime();
180                switch (event.getEventType())
181                {
182                case SIGNALING_RECEIVEDMESSAGE: // A node receives from an out neighbor the q_nt for any destination
183                {
184                        final Pair<Node,Map<Link,Double>> signalInfo = (Pair<Node,Map<Link,Double>>) event.getEventObject();
185                        final Node nMe = signalInfo.getFirst();
186                        final Map<Link,Double> receivedInfo_price_e = signalInfo.getSecond();
187                        for (Link e : receivedInfo_price_e.keySet())
188                                this.control_mostUpdated_price_e_knownByNode_n.set (nMe.getIndex () , e.getIndex() , receivedInfo_price_e.get(e));
189                        log.println("t=" + t + ": SIGNALING_RECEIVEDMESSAGE node " + nMe);
190                        break;
191                }
192                
193                case SIGNALING_WAKEUPTOSENDMESSAGE: // A node broadcasts signaling info to its 1 hop neighbors
194                {
195                        final Node nMe = (Node) event.getEventObject();
196                        log.println("t=" + t + ": ROUTING_NODEWAKEUPTO_TRANSMITSIGNAL node " + nMe);
197
198                        /* Create the info I will signal */
199                        Map<Link,Double> infoToSignal_price_e = new HashMap<Link,Double> ();
200                        for (Link e : nMe.getOutgoingLinks())
201                                infoToSignal_price_e.put (e , this.computeLinkPriceFromNetPlan(e));
202
203                        /* Send the events of the signaling information messages to all the nodes */
204                        if (rng.nextDouble() >= this.signaling_signalingLossProbability.getDouble()) // the signaling may be lost => lost to all nodes
205                                for (Node n : currentNetPlan.getNodes ())
206                                {
207                                        final double signalingReceptionTime = t + Math.max(0 , signaling_averageDelay.getDouble() + signaling_maxFluctuationInDelay.getDouble() * (rng.nextDouble() - 0.5));
208                                        this.scheduleEvent(new SimEvent (signalingReceptionTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_RECEIVEDMESSAGE , Pair.of(n , infoToSignal_price_e)));
209                                }
210                        
211                        /* Re-schedule when to wake up again */
212                        final double signalingTime = signaling_isSynchronous.getBoolean()? t + signaling_averageInterMessageTime.getDouble() : Math.max(t , t + signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5));
213                        this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , nMe));
214                        break;
215                }
216
217                case UPDATE_WAKEUPTOUPDATE: // a node updates its p_n, p_e values, using most updated info available
218                {
219                        final Node nMe = (Node) event.getEventObject(); 
220                        log.println("t=" + t + ": ROUTING_NODEWAKEUPTO_RECOMPUTE node " + nMe);
221                        //log.println("p_e values Before = " + mac_p_e);
222                        
223                        //if (1 ==1) break;
224                        DoubleMatrix1D x_p = this.currentNetPlan.getVectorRouteCarriedTraffic();
225                        DoubleMatrix1D infoIKnow_price_e = this.control_mostUpdated_price_e_knownByNode_n.viewRow (nMe.getIndex ());
226                        for (Demand d : nMe.getOutgoingDemands())
227                        {
228                                final double h_d = d.getOfferedTraffic();
229                                DoubleMatrix1D new_x_p = DoubleFactory1D.dense.make (R);
230                                for (int rIndex : control_routeIndexes_d [d.getIndex ()])
231                                {
232                                        final Route r = currentNetPlan.getRoute (rIndex);
233                                        final double old_xp = x_p.get(rIndex);
234                                        double accumPrices = 0;
235                                        for (Link e : r.getSeqLinks()) accumPrices += infoIKnow_price_e.get(e.getIndex());
236                                        accumPrices += 2*gradient_maxGradientAbsoluteNoise.getDouble()*(this.rng.nextDouble()-0.5);
237                                        new_x_p.set (r.getIndex () , old_xp - this.gradient_gammaStep.getDouble() * accumPrices + this.gradient_heavyBallBetaParameter.getDouble() * (r.getCarriedTraffic() - this.control_previousXp.get(r.getIndex ())  ));
238                                }
239
240                                GradientProjectionUtils.euclideanProjection_sumEquality (new_x_p , control_routeIndexes_d [d.getIndex ()] , h_d);
241
242                                if (gradient_maxGradientCoordinateChange.getDouble() > 0)
243                                        GradientProjectionUtils.scaleDown_maxAbsoluteCoordinateChange (x_p , new_x_p , control_routeIndexes_d [d.getIndex ()] ,  gradient_maxGradientCoordinateChange.getDouble());
244
245                                for (int rIndex : control_routeIndexes_d [d.getIndex ()])
246                                {
247                                        final double newCarriedTraffic = new_x_p.get (rIndex);
248                                        final Route r = currentNetPlan.getRoute (rIndex);
249                                        control_previousXp.set (rIndex , r.getCarriedTraffic());
250                                        r.setCarriedTraffic(newCarriedTraffic , newCarriedTraffic);
251                                }
252                        }
253
254                        final double updateTime = update_isSynchronous.getBoolean()? t + update_averageInterUpdateTime.getDouble() : Math.max(t , t + update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5));
255                        this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE,  nMe));
256
257                        this.stat_traceOf_xp.add(t, currentNetPlan.getVectorRouteCarriedTraffic());
258                        this.stat_traceOf_objFunction.add(t, computeObjectiveFucntionFromNetPlan());
259
260                        if (t > this.simulation_maxNumberOfUpdateIntervals.getDouble() * this.update_averageInterUpdateTime.getDouble()) { this.endSimulation (); }
261                        
262                        break;
263                }
264                        
265
266                default: throw new RuntimeException ("Unexpected received event");
267                }
268                
269                
270        }
271
272        public String finish (StringBuilder st , double simTime)
273        {
274                if (simulation_outFileNameRoot.getString().equals("")) return null;
275                stat_traceOf_xp.printToFile(new File (simulation_outFileNameRoot.getString() + "_xp.txt"));
276                stat_traceOf_objFunction.printToFile(new File (simulation_outFileNameRoot.getString() + "_objFunc.txt"));
277                Pair<DoubleMatrix1D,Double> optPair = computeOptimumSolution ();
278                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_xp.txt"), optPair.getFirst());
279                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_objFunc.txt"), optPair.getSecond());
280                return null;
281        }
282        
283        private double computeLinkPriceFromNetPlan (Link e)
284        {
285                final double y_e = e.getCarriedTraffic();
286                final double u_e = e.getCapacity();
287                if (u_e == 0) throw new RuntimeException ("Zero capacity in a link means");
288                final double gradient = (y_e / u_e > 0.99)? 1/Math.pow(u_e * 0.01,2) : 1/Math.pow(u_e - y_e,2); 
289                return gradient;
290        }
291
292        
293        
294        private double computeObjectiveFucntionFromNetPlan ()
295        {
296                double objFunc = 0;
297                for (Link e : this.currentNetPlan.getLinks())
298                {
299                        final double y_e = e.getCarriedTraffic();
300                        final double u_e = e.getCapacity();
301                        if (y_e / u_e > 0.99)
302                                objFunc += 1/(0.01*u_e) + 1/Math.pow(u_e * 0.01,2) * (y_e - 0.99*u_e);
303                        else
304                                objFunc += 1/(u_e - y_e);
305                }
306                return objFunc;
307        }
308        
309        private Pair<DoubleMatrix1D,Double> computeOptimumSolution ()
310        {
311                /* Modify the map so that it is the pojection where all elements sum h_d, and are non-negative */
312                final int R = this.currentNetPlan.getNumberOfRoutes();
313                final int E = this.currentNetPlan.getNumberOfLinks();
314                final DoubleMatrix1D h_d = this.currentNetPlan.getVectorDemandOfferedTraffic();
315                final DoubleMatrix1D u_e = this.currentNetPlan.getVectorLinkCapacity();
316                final DoubleMatrix1D h_p = this.currentNetPlan.getVectorRouteOfferedTrafficOfAssociatedDemand();
317                                
318                OptimizationProblem op = new OptimizationProblem();
319                op.addDecisionVariable("x_p", false , new int[] { 1 , R }, DoubleUtils.zeros(R) , h_p.toArray());
320                op.addDecisionVariable("y_e", false , new int[] { 1 , E }, DoubleUtils.zeros(E) , u_e.toArray()); 
321                op.setInputParameter("h_d", h_d , "row");
322                op.setInputParameter("h_p", h_p , "row");
323                op.setInputParameter("u_e", u_e , "row");
324                
325                /* Set some input parameters */
326                op.setObjectiveFunction("minimize", "sum (1./(u_e - y_e))");
327
328                /* VECTORIAL FORM OF THE CONSTRAINTS  */
329                op.setInputParameter("A_dp", currentNetPlan.getMatrixDemand2RouteAssignment());
330                op.setInputParameter("A_ep", currentNetPlan.getMatrixLink2RouteAssignment());
331                op.addConstraint("A_dp * x_p' == h_d'"); // for each demand, the 100% of the traffic is carried (summing the associated paths)
332                op.addConstraint("A_ep * x_p' == y_e'"); // set the traffic in each link equal to y_e
333                
334                /* Call the solver to solve the problem */
335                op.solve("ipopt");
336
337                /* If an optimal solution was not found, quit */
338                if (!op.solutionIsOptimal ()) throw new RuntimeException ("An optimal solution was not found");
339        
340                /* Retrieve the optimum solutions. Convert the bps into Erlangs */
341                final double optCost = op.getOptimalCost();
342                DoubleMatrix1D x_p = op.getPrimalSolution("x_p").view1D();
343//              Map<Long,Double> x_p_map = new HashMap<Long,Double> ();
344//              for (long p : cpl.getPathIds()) x_p_map.put(p, x_p_array [id2IndexPathMap.get(p)]); 
345
346                /* Check the solution */
347                for (Link e : this.currentNetPlan.getLinks())
348                {
349                        double y_e = 0; for (Route r : e.getTraversingRoutes()) y_e += x_p.get (r.getIndex());
350                        if (y_e > e.getCapacity() + 1E-3) throw new RuntimeException ("Bad");
351                }
352                for (Demand d : this.currentNetPlan.getDemands())
353                {
354                        double trafDemand = 0; for (Route r : d.getRoutes()) trafDemand += x_p.get(r.getIndex ());
355                        if (Math.abs(trafDemand - d.getOfferedTraffic()) > 1E-3) throw new RuntimeException ("Bad");
356                }
357                
358                return Pair.of(x_p,optCost);
359        }
360        
361
362        
363}