001package jmri.jmrix;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.io.DataInputStream;
005import java.io.IOException;
006import java.io.OutputStream;
007import java.util.*;
008
009import javax.annotation.Nonnull;
010import javax.swing.SwingUtilities;
011
012import jmri.InstanceManager;
013import jmri.ShutDownManager;
014
015/**
016 * Abstract base for TrafficControllers in a Message/Reply protocol.
017 * <p>
018 * Two threads are used for the actual communication. The "Transmit" thread
019 * handles pushing characters to the port, and also changing the mode. The
020 * "Receive" thread converts characters from the input stream into replies.
021 * <p>
022 * The constructor registers a shutdown task to
023 * trigger the necessary cleanup code
024 * <p>
025 * The internal state machine handles changes of mode, automatic retry of
026 * certain messages, time outs, and sending poll messages when otherwise idle.
027 * <p>
028 * "Mode" refers to the state of the command station communications. "Normal"
029 * and "Programming" are the two modes, used if the command station requires
030 * messages to go back and forth between them. <br>
031 *
032 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram">
033 *
034 * <p>
035 * The key methods for the basic operation are:
036 * <ul>
037 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)}
038 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data
039 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs
040 *      information on e.g. the last message sent, that can be stored in member variables
041 *      by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}.
042 *  <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects
043 * </ul>
044 * <p>
045 * If your command station requires messages to go in and out of
046 * "programming mode", those should be provided by
047 * {@link #enterProgMode()} and {@link #enterNormalMode()}.
048 * <p>
049 * If you want to poll for information when the line is otherwise idle,
050 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}.
051 *
052 * @author Bob Jacobsen Copyright (C) 2003
053 * @author Paul Bender Copyright (C) 2004-2010
054 */
055
056/*
057@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png
058
059    [*] --> IDLESTATE
060    IDLESTATE --> NOTIFIEDSTATE : sendMessage()
061    NOTIFIEDSTATE --> IDLESTATE : queue empty
062
063    NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message
064
065    WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE
066    WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE
067
068    WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply()
069
070    WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE
071    WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE
072    OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message
073
074    IDLESTATE --> POLLSTATE : transmitLoop()\nno work
075    POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it
076    POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send
077
078    WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply
079    AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message
080
081NOTIFIEDSTATE : Transmit thread wakes up and processes
082POLLSTATE : Transient while deciding to send poll
083OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change
084AUTORETRYSTATE : Transient while deciding to resend auto-retry message
085WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply
086WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply
087WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout
088
089Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry
090Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change.
091
092@enduml
093 */
094
095public abstract class AbstractMRTrafficController {
096
097    private final Runnable shutDownTask = this::terminate; // retain for possible removal.
098
099    /**
100     * Create a new unnamed MRTrafficController.
101     */
102    public AbstractMRTrafficController() {
103        log.debug("Creating AbstractMRTrafficController instance");
104        mCurrentMode = NORMALMODE;
105        mCurrentState = IDLESTATE;
106        allowUnexpectedReply = false;
107
108
109        // We use a shutdown task here to make sure the connection is left
110        // in a clean state prior to exiting.  This is required on systems
111        // which have a service mode to ensure we don't leave the system
112        // in an unusable state. Once the shutdown task executes, the connection
113        // must be considered permanently closed.
114
115        InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask);
116    }
117
118    private boolean synchronizeRx = true;
119
120    protected void setSynchronizeRx(boolean val) {
121        synchronizeRx = val;
122    }
123
124    protected boolean getSynchronizeRx() {
125        return synchronizeRx;
126    }
127
128    // The methods to implement the abstract Interface
129
130    protected final Vector<AbstractMRListener> cmdListeners = new Vector<>();
131
132    /**
133     * Add a Listener to the Listener list.
134     * @param l The Listener to be added, not null.
135     */
136    protected synchronized void addListener(AbstractMRListener l) {
137        // add only if not already registered
138        if (l == null) {
139            throw new NullPointerException();
140        }
141        if (!cmdListeners.contains(l)) {
142            cmdListeners.addElement(l);
143        }
144    }
145
146    /**
147     * Add a Listener to start of the Listener list.
148     * Intended for use only by system Consoles which may prefer notification
149     * before other objects have processed a Message and sent a Reply.
150     * @param l The Listener to be added, not null.
151     */
152    protected synchronized void addConsoleListener(@Nonnull AbstractMRListener l){
153        // add only if not already registered
154        if (!cmdListeners.contains(l)) {
155            cmdListeners.insertElementAt(l, 0);
156        }
157    }
158
159    /**
160     * Remove a Listener from the Listener list.
161     * The Listener will receive no further notifications.
162     * @param l The Listener to be removed.
163     */
164    protected synchronized void removeListener(AbstractMRListener l) {
165        if (cmdListeners.contains(l)) {
166            cmdListeners.removeElement(l);
167        }
168    }
169
170    /**
171     * Forward a Message to registered listeners.
172     *
173     * @param m     Message to be forwarded intact
174     * @param notMe One (optional) listener to be skipped, usually because it's
175     *              the originating object.
176     */
177    @SuppressWarnings("unchecked")
178    protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) {
179        // make a copy of the listener vector to synchronized not needed for transmit
180        Vector<AbstractMRListener> v;
181        synchronized (this) {
182            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
183            v = (Vector<AbstractMRListener>) cmdListeners.clone();
184        }
185        // forward to all listeners
186        int cnt = v.size();
187        for (int i = 0; i < cnt; i++) {
188            AbstractMRListener client = v.elementAt(i);
189            if (notMe != client) {
190                log.debug("notify message, client: {}", client);
191                try {
192                    forwardMessage(client, m);
193                } catch (RuntimeException e) {
194                    log.warn("notify: During message dispatch to {}", client, e);
195                }
196            }
197        }
198    }
199
200    /**
201     * Implement this to forward a specific message type to a protocol-specific
202     * listener interface.
203     * This puts the casting into the concrete class.
204     * @param client abstract listener.
205     * @param m message to forward.
206     */
207    protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m);
208
209    /**
210     * Invoked if it's appropriate to do low-priority polling of the command
211     * station, this should return the next message to send, or null if the
212     * TrafficController should just sleep.
213     * @return Formatted poll message
214     */
215    protected abstract AbstractMRMessage pollMessage();
216
217    protected abstract AbstractMRListener pollReplyHandler();
218
219    protected AbstractMRListener mLastSender = null;
220
221    protected volatile int mCurrentMode;
222    public static final int NORMALMODE = 1;
223    public static final int PROGRAMINGMODE = 4;
224
225    /**
226     * Set the system to programming mode.
227     * @see #enterNormalMode()
228     *
229     * @return any message that needs to be returned to the Command Station
230     * to change modes. If no message is needed, returns null.
231     */
232    protected abstract AbstractMRMessage enterProgMode();
233
234    /**
235     * Sets the system to normal mode during programming while in IDLESTATE.
236     * If {@link #programmerIdle()} returns true, enterNormalMode() is
237     * called after a timeout.
238     * @see #enterProgMode()
239     *
240     * @return any message that needs to be returned to the Command Station
241     * to change modes. If no message is needed, returns null.
242     */
243    protected abstract AbstractMRMessage enterNormalMode();
244
245    /**
246     * Check if the programmer is idle.
247     * Override in the system specific code if necessary (see notes for
248     * {@link #enterNormalMode()}.
249     *
250     * @return true if not busy programming
251     */
252    protected boolean programmerIdle() {
253        return true;
254    }
255
256    /**
257     * Get the delay (wait time) after enabling the programming track.
258     * Override in subclass to add a longer delay.
259     *
260     * @return 0 as default delay
261     */
262    protected int enterProgModeDelayTime() {
263        return 0;
264    }
265
266    protected volatile int mCurrentState;
267    public static final int IDLESTATE = 10;        // nothing happened
268    public static final int NOTIFIEDSTATE = 15;    // xmt notified, will next wake
269    public static final int WAITMSGREPLYSTATE = 25;  // xmt has sent, await reply to message
270    public static final int WAITREPLYINPROGMODESTATE = 30;  // xmt has done mode change, await reply
271    public static final int WAITREPLYINNORMMODESTATE = 35;  // xmt has done mode change, await reply
272    public static final int OKSENDMSGSTATE = 40;        // mode change reply here, send original msg
273    public static final int AUTORETRYSTATE = 45;        // received message where automatic recovery may occur with a retransmission, re-send original msg
274    public static final int POLLSTATE = 50;   // Send program mode or poll message
275
276    protected boolean allowUnexpectedReply;
277
278    /**
279     * Set whether the command station may send messages without a request
280     * sent to it.
281     *
282     * @param expected true to allow messages without a prior request
283     */
284    protected void setAllowUnexpectedReply(boolean expected) {
285        allowUnexpectedReply = expected;
286    }
287
288    /**
289     * Forward a "Reply" from layout to registered listeners.
290     *
291     * @param r    Reply to be forwarded intact
292     * @param dest One (optional) listener to be skipped, usually because it's
293     *             the originating object.
294     */
295    @SuppressWarnings("unchecked")
296    protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) {
297        // make a copy of the listener vector to synchronized (not needed for transmit?)
298        Vector<AbstractMRListener> v;
299        synchronized (this) {
300            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
301            v = (Vector<AbstractMRListener>) cmdListeners.clone();
302        }
303        // forward to all listeners
304        int cnt = v.size();
305        for (int i = 0; i < cnt; i++) {
306            AbstractMRListener client = v.elementAt(i);
307            log.debug("notify reply, client: {}", client);
308            try {
309                //skip dest for now, we'll send the message to there last.
310                if (dest != client) {
311                    forwardReply(client, r);
312                }
313            } catch (RuntimeException e) {
314                log.warn("notify: During reply dispatch to {}", client, e);
315            }
316        }
317
318        // forward to the last listener who sent a message
319        // this is done _second_ so monitoring can have already stored the reply
320        // before a response is sent
321        if (dest != null) {
322            log.debug("notify reply, dest: {}", dest);
323            forwardReply(dest, r);
324        }
325    }
326
327    protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m);
328
329    /**
330     * Messages to be transmitted.
331     */
332    protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>();
333    protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>();
334
335    /**
336     * Forward message to the port. Messages are queued and then the
337     * transmission thread is notified.
338     * @see #forwardToPort(AbstractMRMessage, AbstractMRListener)
339     *
340     * @param m the message to send
341     * @param reply the Listener sending the message, often provided as 'this'
342     */
343    protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) {
344        msgQueue.addLast(m);
345        listenerQueue.addLast(reply);
346        synchronized (xmtRunnable) {
347            if (mCurrentState == IDLESTATE) {
348                mCurrentState = NOTIFIEDSTATE;
349                xmtRunnable.notify();
350            }
351        }
352        if (m != null) {
353            log.debug("just notified transmit thread with message {}", m);
354        }
355    }
356
357    /**
358     * Permanent loop for the transmit thread.
359     */
360    protected void transmitLoop() {
361        log.debug("transmitLoop starts in {}", this);
362
363        // loop forever
364        while (!connectionError && !threadStopRequest) {
365            AbstractMRMessage m = null;
366            AbstractMRListener l = null;
367            // check for something to do
368            synchronized (this) {
369                if (!msgQueue.isEmpty()) {
370                    // yes, something to do
371                    m = msgQueue.getFirst();
372                    msgQueue.removeFirst();
373                    l = listenerQueue.getFirst();
374                    listenerQueue.removeFirst();
375                    mCurrentState = WAITMSGREPLYSTATE;
376                    log.debug("transmit loop has something to do: {}", m);
377                }  // release lock here to proceed in parallel
378            }
379            // if a message has been extracted, process it
380            if (m != null) {
381                // check for need to change mode
382                log.debug("Start msg, state = {}", mCurrentMode);
383                if (m.getNeededMode() != mCurrentMode) {
384                    AbstractMRMessage modeMsg;
385                    if (m.getNeededMode() == PROGRAMINGMODE) {
386                        // change state to programming mode and send message
387                        modeMsg = enterProgMode();
388                        if (modeMsg != null) {
389                            mCurrentState = WAITREPLYINPROGMODESTATE;
390                            log.debug("Enter Programming Mode");
391                            forwardToPort(modeMsg, null);
392                            // wait for reply
393                            transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted");
394                        }
395                    } else {
396                        // change state to normal and send message
397                        modeMsg = enterNormalMode();
398                        if (modeMsg != null) {
399                            mCurrentState = WAITREPLYINNORMMODESTATE;
400                            log.debug("Enter Normal Mode");
401                            forwardToPort(modeMsg, null);
402                            // wait for reply
403                            transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted");
404                        }
405                    }
406                    if (modeMsg != null) {
407                        checkReplyInDispatch();
408                        if (mCurrentState != OKSENDMSGSTATE) {
409                            handleTimeout(modeMsg, l);
410                        }
411                        mCurrentState = WAITMSGREPLYSTATE;
412                    } else {
413                        // no mode message required, but the message
414                        // needs a different mode
415                        log.debug("Setting mode to: {}", m.getNeededMode());
416                        mCurrentMode = m.getNeededMode();
417                    }
418                }
419                forwardToPort(m, l);
420                // reply expected?
421                if (m.replyExpected()) {
422                    log.debug("reply expected is true for message {}",m);
423                    // wait for a reply, or eventually timeout
424                    transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted");
425                    checkReplyInDispatch();
426                    if (mCurrentState == WAITMSGREPLYSTATE) {
427                        handleTimeout(m, l);
428                    } else if (mCurrentState == AUTORETRYSTATE) {
429                        log.info("Message added back to queue: {}", m);
430                        msgQueue.addFirst(m);
431                        listenerQueue.addFirst(l);
432                        synchronized (xmtRunnable) {
433                            mCurrentState = IDLESTATE;
434                        }
435                    } else {
436                        resetTimeout(m);
437                    }
438                } // just continue to the next message from here
439            } else {
440                // nothing to do
441                if (mCurrentState != IDLESTATE) {
442                    log.debug("Setting IDLESTATE");
443                    log.debug("Current Mode {}", mCurrentMode);
444                    mCurrentState = IDLESTATE;
445                }
446                // wait for something to send
447                if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) {
448                    try {
449                        long startTime = Calendar.getInstance().getTimeInMillis();
450                        synchronized (xmtRunnable) {
451                            xmtRunnable.wait(mWaitBeforePoll);
452                        }
453                        long endTime = Calendar.getInstance().getTimeInMillis();
454                        waitTimePoll = waitTimePoll + endTime - startTime;
455                    } catch (InterruptedException e) {
456                        Thread.currentThread().interrupt(); // retain if needed later
457                        // end of transmit loop
458                        break;
459                    }
460                }
461                // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee
462                // the change of mCurrentState to one of the waiting for reply states.  Therefore we need to synchronize.
463                synchronized (this) {
464                    if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) {
465                        log.error("left timeout in unexpected state: {}", mCurrentState);
466                    }
467                    if (mCurrentState == IDLESTATE) {
468                        mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE
469                    }
470                }
471                // went around with nothing to do; leave programming state if in it
472                if (mCurrentMode == PROGRAMINGMODE) {
473                    log.debug("Timeout - in service mode");
474                }
475                if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) {
476                    log.debug("timeout causes leaving programming mode");
477                    mCurrentState = WAITREPLYINNORMMODESTATE;
478                    AbstractMRMessage msg = enterNormalMode();
479                    // if the enterNormalMode() message is null, we
480                    // don't want to try to send it to the port.
481                    if (msg != null) {
482                        forwardToPort(msg, null);
483                        // wait for reply
484                        transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode");
485                        checkReplyInDispatch();
486                        // exit program mode timeout?
487                        if (mCurrentState == WAITREPLYINNORMMODESTATE) {
488                            // entering normal mode via timeout
489                            handleTimeout(msg, l);
490                            mCurrentMode = NORMALMODE;
491                        }
492                        // and go around again
493                    }
494                } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) {
495                    // We may need to poll
496                    AbstractMRMessage msg = pollMessage();
497                    if (msg != null) {
498                        // yes, send that
499                        log.debug("Sending poll, wait time {}", waitTimePoll);
500                        mCurrentState = WAITMSGREPLYSTATE;
501                        forwardToPort(msg, pollReplyHandler());
502                        // wait for reply
503                        log.debug("Still waiting for reply");
504                        transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply");
505                        checkReplyInDispatch();
506                        // and go around again
507                        if (mCurrentState == WAITMSGREPLYSTATE) {
508                            handleTimeout(msg, l);
509                        } else {
510                            resetTimeout(msg);
511                        }
512                    }
513                    waitTimePoll = 0;
514                }
515                // no messages, so back to idle
516                if (mCurrentState == POLLSTATE) {
517                    mCurrentState = IDLESTATE;
518                }
519            }
520        }
521    }   // end of transmit loop; go around again
522
523    protected void transmitWait(int waitTime, int state, String interruptMessage) {
524        // wait() can have spurious wakeup!
525        // so we protect by making sure the entire timeout time is used
526        long currentTime = Calendar.getInstance().getTimeInMillis();
527        long endTime = currentTime + waitTime;
528        while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) {
529            long wait = endTime - currentTime;
530            try {
531                synchronized (xmtRunnable) {
532                    // Do not wait if the current state has changed since we
533                    // last set it.
534                    if (mCurrentState != state) {
535                        return;
536                    }
537                    xmtRunnable.wait(wait); // rcvr normally ends this w state change
538                }
539            } catch (InterruptedException e) {
540                Thread.currentThread().interrupt(); // retain if needed later
541                String[] packages = this.getClass().getName().split("\\.");
542                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
543                        +(packages.length>=1 ? packages[packages.length-1] :"");
544                if (!threadStopRequest) {
545                    log.error("{} in transmitWait(..) of {}", interruptMessage, name);
546                } else {
547                    log.debug("during shutdown, {}  in transmitWait(..) of {}", interruptMessage, name);
548                }
549            }
550        }
551        log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState);
552    }
553
554    // Dispatch control and timer
555    protected boolean replyInDispatch = false;          // true when reply has been received but dispatch not completed
556    private int maxDispatchTime = 0;
557    private int warningMessageTime = DISPATCH_WARNING_TIME;
558    private static final int DISPATCH_WAIT_INTERVAL = 100;
559    private static final int DISPATCH_WARNING_TIME = 12000; // report warning when max dispatch time exceeded
560    private static final int WARN_NEXT_TIME = 1000;         // report every second
561
562    private void checkReplyInDispatch() {
563        int loopCount = 0;
564        while (replyInDispatch) {
565            try {
566                synchronized (xmtRunnable) {
567                    xmtRunnable.wait(DISPATCH_WAIT_INTERVAL);
568                }
569            } catch (InterruptedException e) {
570                Thread.currentThread().interrupt(); // retain if needed later
571                if (threadStopRequest) return; // don't log an error if closing.
572                String[] packages = this.getClass().getName().split("\\.");
573                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
574                        +(packages.length>=1 ? packages[packages.length-1] :"");
575                log.error("transmitLoop interrupted in class {}", name);
576            }
577            loopCount++;
578            int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL;
579            if (currentDispatchTime > maxDispatchTime) {
580                maxDispatchTime = currentDispatchTime;
581                if (currentDispatchTime >= warningMessageTime) {
582                    warningMessageTime = warningMessageTime + WARN_NEXT_TIME;
583                    log.debug("Max dispatch time is now {}", currentDispatchTime);
584                }
585            }
586        }
587    }
588
589    /**
590     *  Determine if the interface is down.
591     *
592     *  @return timeoutFlag
593     */
594    public boolean hasTimeouts() {
595        return timeoutFlag;
596    }
597
598    protected boolean timeoutFlag = false;
599    protected int timeouts = 0;
600    protected boolean flushReceiveChars = false;
601
602    protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) {
603        //log.debug("Timeout mCurrentState: {}", mCurrentState);
604        warnOnTimeout(msg, l);
605        timeouts++;
606        timeoutFlag = true;
607        flushReceiveChars = true;
608    }
609
610    protected void warnOnTimeout(AbstractMRMessage msg, AbstractMRListener l) {
611        String[] packages = this.getClass().getName().split("\\.");
612        String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
613                +(packages.length>=1 ? packages[packages.length-1] :"");
614
615        log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name);
616    }
617    
618    protected void resetTimeout(AbstractMRMessage msg) {
619        if (timeouts > 0) {
620            log.debug("Reset timeout after {} timeouts", timeouts);
621        }
622        timeouts = 0;
623        timeoutFlag = false;
624    }
625
626    /**
627     * Add header to the outgoing byte stream.
628     *
629     * @param msg the output byte stream
630     * @param m Message results
631     * @return next location in the stream to fill
632     */
633    protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) {
634        return 0;
635    }
636
637    protected int mWaitBeforePoll = 100;
638    protected long waitTimePoll = 0;
639
640    /**
641     * Add trailer to the outgoing byte stream.
642     *
643     * @param msg    the output byte stream
644     * @param offset the first byte not yet used
645     * @param m   output message to extend
646     */
647    protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) {
648        if (!m.isBinary()) {
649            msg[offset] = 0x0d;
650        }
651    }
652
653    /**
654     * Determine how many bytes the entire message will take, including
655     * space for header and trailer.
656     *
657     * @param m the message to be sent
658     * @return number of bytes
659     */
660    protected int lengthOfByteStream(AbstractMRMessage m) {
661        int len = m.getNumDataElements();
662        int cr = 0;
663        if (!m.isBinary()) {
664            cr = 1;  // space for return char
665        }
666        return len + cr;
667    }
668
669    protected boolean xmtException = false;
670
671    /**
672     * Actually transmit the next message to the port.
673     * @see #sendMessage(AbstractMRMessage, AbstractMRListener)
674     *
675     * @param m the message to send
676     * @param reply the Listener sending the message, often provided as 'this'
677     */
678    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"},
679            justification = "Two locks needed for synchronization here, this is OK")
680    protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
681        log.debug("forwardToPort message: [{}]", m);
682        // remember who sent this
683        mLastSender = reply;
684
685        // forward the message to the registered recipients,
686        // which includes the communications monitor, except the sender.
687        // Schedule notification via the Swing event queue to ensure order
688        log.trace("about to start XmtNotifier for {} last: {}", m, mLastSender, new Exception("traceback"));
689        Runnable r = new XmtNotifier(m, mLastSender, this);
690        SwingUtilities.invokeLater(r);
691
692        // stream to port in single write, as that's needed by serial
693        int byteLength = lengthOfByteStream(m);
694        byte[]  msg= new byte[byteLength];
695        log.debug("copying message, length = {}", byteLength);
696        // add header
697        int offset = addHeaderToOutput(msg, m);
698
699        // add data content
700        int len = m.getNumDataElements();
701        log.debug("copying data to message, length = {}", len);
702        if (len > byteLength) { // happens somehow
703            log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len);
704        }
705        for (int i = 0; (i < len && i < byteLength); i++) {
706            msg[i + offset] = (byte) m.getElement(i);
707        }
708        // add trailer
709        addTrailerToOutput(msg, len + offset, m);
710        // and stream the bytes
711        try {
712            if (ostream != null) {
713                if (log.isDebugEnabled()) {
714                    StringBuilder f = new StringBuilder();
715                    for (int i = 0; i < msg.length; i++) {
716                        f.append(String.format("%02X ",0xFF & msg[i]));
717                    }
718                    log.debug("formatted message: {}", f.toString() );
719                }
720                while (m.getRetries() >= 0) {
721                    if (portReadyToSend(controller)) {
722                        ostream.write(msg);
723                        ostream.flush();
724                        log.debug("written, msg timeout: {} mSec", m.getTimeout());
725                        break;
726                    } else if (m.getRetries() >= 0) {
727                        log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries());
728                        m.setRetries(m.getRetries() - 1);
729                        try {
730                            synchronized (xmtRunnable) {
731                                xmtRunnable.wait(m.getTimeout());
732                            }
733                        } catch (InterruptedException e) {
734                            Thread.currentThread().interrupt(); // retain if needed later
735                            log.error("retry wait interrupted");
736                        }
737                    } else {
738                        log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg));
739                    }
740                }
741            } else {  // ostream is null
742                // no stream connected
743                connectionWarn();
744            }
745        } catch (IOException | RuntimeException e) {
746            // TODO Currently there's no port recovery if an exception occurs
747            // must restart JMRI to clear xmtException.
748            xmtException = true;
749            portWarn(e);
750        }
751    }
752
753    protected void connectionWarn() {
754        log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception());
755    }
756
757    protected void portWarn(Exception e) {
758        log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e);
759    }
760
761    protected boolean connectionError = false;
762
763    protected void portWarnTCP(Exception e) {
764        log.warn("Exception java net: ", e);
765        connectionError = true;
766    }
767    // methods to connect/disconnect to a source of data in an AbstractPortController
768
769    public AbstractPortController controller = null;
770
771    public boolean status() {
772        return (ostream != null && istream != null);
773    }
774
775    protected volatile Thread xmtThread = null;
776    protected volatile Thread rcvThread = null;
777
778    protected volatile Runnable xmtRunnable = null;
779
780    /**
781     * Make connection to an existing PortController object.
782     *
783     * @param p the PortController
784     */
785    public void connectPort(AbstractPortController p) {
786        rcvException = false;
787        connectionError = false;
788        xmtException = false;
789        threadStopRequest = false;
790        try {
791            istream = p.getInputStream();
792            ostream = p.getOutputStream();
793            if (controller != null) {
794                log.warn("connectPort: connect called while connected");
795            } else {
796                log.debug("connectPort invoked");
797            }
798            controller = p;
799            // and start threads
800            xmtThread = jmri.util.ThreadingUtil.newThread(
801                xmtRunnable = new Runnable() {
802                    @Override
803                    public void run() {
804                        try {
805                            transmitLoop();
806                        } catch (Throwable e) {
807                            if (!threadStopRequest) {
808                                log.error("Transmit thread terminated prematurely by: {}", e, e);
809                            }
810                            // see http://docs.oracle.com/javase/7/docs/api/java/lang/ThreadDeath.html
811                            // ThreadDeath must be thrown per Java API Javadocs
812                            // 
813                            // The type ThreadDeath has been deprecated since version 20 and marked for removal
814                            // and the warning cannot be suppressed in Java 21. But external libraries might
815                            // throw the exception outside of JMRI control. So check the name of the exception
816                            // instead of using "instanceof".
817                            if ("java.lang.ThreadDeath".equals(e.getClass().getName())) {
818                                throw e;
819                            }
820                        }
821                }
822            });
823
824            String[] packages = this.getClass().getName().split("\\.");
825            xmtThread.setName(
826                (packages.length>=2 ? packages[packages.length-2]+"." :"")
827                +(packages.length>=1 ? packages[packages.length-1] :"")
828                +" Transmit thread");
829
830            xmtThread.setDaemon(true);
831            xmtThread.setPriority(Thread.MAX_PRIORITY-1);      //bump up the priority
832            xmtThread.start();
833
834            rcvThread = jmri.util.ThreadingUtil.newThread(
835                new Runnable() {
836                    @Override
837                    public void run() {
838                        receiveLoop();
839                    }
840                });
841            rcvThread.setName(
842                (packages.length>=2 ? packages[packages.length-2]+"." :"")
843                +(packages.length>=1 ? packages[packages.length-1] :"")
844                +" Receive thread");
845
846            rcvThread.setPriority(Thread.MAX_PRIORITY);      //bump up the priority
847            rcvThread.setDaemon(true);
848            rcvThread.start();
849
850        } catch (RuntimeException e) {
851            log.error("Failed to start up communications. Error was: ", e);
852            log.debug("Full trace:", e);
853        }
854    }
855
856    /**
857     * Get the port name for this connection from the TrafficController.
858     *
859     * @return the name of the port
860     */
861    public String getPortName() {
862        return controller.getCurrentPortName();
863    }
864
865    /**
866     * Break connection to existing PortController object. Once broken, attempts
867     * to send via "message" member will fail.
868     *
869     * @param p the PortController
870     */
871    public void disconnectPort(AbstractPortController p) {
872        istream = null;
873        ostream = null;
874        if (controller != p) {
875            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
876        }
877        controller = null;
878        threadStopRequest = true;
879    }
880
881    /**
882     * Check if PortController object can be sent to.
883     *
884     * @param p the PortController
885     * @return true if ready, false otherwise May throw an Exception.
886     */
887    public boolean portReadyToSend(AbstractPortController p) {
888        if (p != null && !xmtException && !rcvException) {
889            return true;
890        } else {
891            return false;
892        }
893    }
894
895    // data members to hold the streams
896    protected DataInputStream istream = null;
897    protected OutputStream ostream = null;
898
899    protected boolean rcvException = false;
900
901    protected int maxRcvExceptionCount = 100;
902
903    /**
904     * Handle incoming characters. This is a permanent loop, looking for input
905     * messages in character form on the stream connected to the PortController
906     * via {@link #connectPort(AbstractPortController)}.
907     * <p>
908     * Each turn of the loop is the receipt of a single message.
909     */
910    public void receiveLoop() {
911        log.debug("receiveLoop starts in {}", this);
912        int errorCount = 0;
913        while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception
914            try {
915                handleOneIncomingReply();
916                errorCount = 0;
917            } catch (java.io.InterruptedIOException e) {
918                // related to InterruptedException, catch first
919                break;
920            } catch (IOException e) {
921                rcvException = true;
922                reportReceiveLoopException(e);
923                break;
924            } catch (RuntimeException e1) {
925                log.error("Exception in receive loop: {}", e1.toString(), e1);
926                errorCount++;
927                if (errorCount == maxRcvExceptionCount) {
928                    rcvException = true;
929                    reportReceiveLoopException(e1);
930                }
931            }
932        }
933        if (!threadStopRequest) { // if e.g. unexpected end
934            ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN);
935            log.debug("Exit from rcv loop in {}", this.getClass());
936            log.info("Exiting receive loop");
937            recovery(); // see if you can restart
938        }
939    }
940
941    /**
942     * Disconnect and reset the current PortController.
943     * Invoked at abnormal ending of receiveLoop.
944     */
945    protected final void recovery() {
946        AbstractPortController adapter = controller;
947        disconnectPort(controller);
948        adapter.recover();
949    }
950
951    /**
952     * Report an error on the receive loop. Separated so tests can suppress, even
953     * though message is asynchronous.
954     * @param e Exception encountered at lower level to trigger error, or null
955     */
956    protected void reportReceiveLoopException(Exception e) {
957        log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e);
958        jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN);
959        if (controller instanceof AbstractNetworkPortController) {
960            portWarnTCP(e);
961        }
962    }
963
964    protected abstract AbstractMRReply newReply();
965
966    protected abstract boolean endOfMessage(AbstractMRReply r);
967
968    /**
969     * Dummy routine, to be filled by protocols that have to skip some
970     * start-of-message characters.
971     * @param istream input source
972     * @throws IOException from underlying operations
973     */
974    protected void waitForStartOfReply(DataInputStream istream) throws IOException {
975    }
976
977    /**
978     * Read a single byte, protecting against various timeouts, etc.
979     * <p>
980     * When a port is set to have a receive timeout, some will return
981     * zero bytes, an EOFException or a InterruptedIOException at the end of the timeout. 
982     * In that case, the read()
983     * should be repeated to get the next real character.
984     *
985     * @param istream stream to read
986     * @return the byte read
987     * @throws java.io.IOException if unable to read
988     */
989    protected byte readByteProtected(DataInputStream istream) throws IOException {
990        if (istream == null) {
991            throw new IOException("Input Stream NULL when reading");
992        }
993        while (true) { // loop will repeat until character found
994            int nchars;
995            // The istream should be configured so that the following
996            // read(..) call only blocks for a short time, e.g. 100msec, if no
997            // data is available.  It's OK if it 
998            // throws e.g. java.io.InterruptedIOException
999            // in that case, as the calling loop should just go around
1000            // and request input again.  This semi-blocking behavior will
1001            // let the terminateThreads() method end this thread cleanly.
1002            nchars = istream.read(rcvBuffer, 0, 1);
1003            if (nchars == -1) {
1004                // No more bytes can be read from the channel
1005                throw new IOException("Connection not terminated normally");
1006            }
1007            if (nchars > 0) {
1008                return rcvBuffer[0];
1009            }
1010        }
1011    }
1012
1013    // Defined this way to reduce new object creation
1014    private byte[] rcvBuffer = new byte[1];
1015
1016    /**
1017     * Get characters from the input source, and file a message.
1018     * <p>
1019     * Returns only when the message is complete.
1020     * <p>
1021     * Only used in the Receive thread.
1022     * <p>
1023     * Handles timeouts on read by ignoring zero-length reads.
1024     *
1025     * @param msg     message to fill
1026     * @param istream character source.
1027     * @throws IOException when presented by the input source.
1028     */
1029    protected void loadChars(AbstractMRReply msg, DataInputStream istream)
1030            throws IOException {
1031        int i;
1032        for (i = 0; i < msg.maxSize(); i++) {
1033            byte char1 = readByteProtected(istream);
1034            log.trace("char: {} i: {}",(char1&0xFF),i);
1035            // if there was a timeout, flush any char received and start over
1036            if (flushReceiveChars) {
1037                log.warn("timeout flushes receive buffer: {}", msg);
1038                msg.flush();
1039                i = 0;  // restart
1040                flushReceiveChars = false;
1041            }
1042            if (canReceive()) {
1043                msg.setElement(i, char1);
1044                if (endOfMessage(msg)) {
1045                    break;
1046                }
1047            } else {
1048                i--; // flush char
1049                log.error("unsolicited character received: {}", Integer.toHexString(char1));
1050            }
1051        }
1052    }
1053
1054    /**
1055     * Override in the system specific code if necessary
1056     *
1057     * @return true if it is okay to buffer receive characters into a reply
1058     *         message. When false, discard char received
1059     */
1060    protected boolean canReceive() {
1061        return true;
1062    }
1063
1064    private int retransmitCount = 0;
1065
1066    /**
1067     * Executes a reply distribution action on the appropriate thread for JMRI.
1068     * @param r a runnable typically encapsulating a MRReply and the iteration code needed to
1069     *          send it to all the listeners.
1070     */
1071    protected void distributeReply(Runnable r) {
1072        try {
1073            if (synchronizeRx) {
1074                SwingUtilities.invokeAndWait(r);
1075            } else {
1076                SwingUtilities.invokeLater(r);
1077            }
1078        } catch (InterruptedException ie) {
1079            if (threadStopRequest) return;
1080            log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString());
1081        } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) {
1082            log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString());
1083            return;
1084        }
1085        log.debug("dispatch thread invoked");
1086    }
1087
1088    /**
1089     * Handle each reply when complete.
1090     * <p>
1091     * (This is public for testing purposes) Runs in the "Receive" thread.
1092     *
1093     * @throws java.io.IOException on error.
1094     */
1095    public void handleOneIncomingReply() throws IOException {
1096        // we sit in this until the message is complete, relying on
1097        // threading to let other stuff happen
1098
1099        // Create message off the right concrete class
1100        AbstractMRReply msg = newReply();
1101
1102        // wait for start if needed
1103        waitForStartOfReply(istream);
1104
1105        // message exists, now fill it
1106        loadChars(msg, istream);
1107
1108        if (threadStopRequest) return;
1109
1110        // message is complete, dispatch it !!
1111        replyInDispatch = true;
1112        log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState);
1113
1114        // forward the message to the registered recipients,
1115        // which includes the communications monitor
1116        // return a notification via the Swing event queue to ensure proper thread
1117        Runnable r = new RcvNotifier(msg, mLastSender, this);
1118        distributeReply(r);
1119
1120        if (!msg.isUnsolicited()) {
1121            // effect on transmit:
1122            switch (mCurrentState) {
1123                case WAITMSGREPLYSTATE: {
1124                    // check to see if the response was an error message we want
1125                    // to automatically handle by re-queueing the last sent
1126                    // message, otherwise go on to the next message
1127                    if (msg.isRetransmittableErrorMsg()) {
1128                        log.error("Automatic Recovery from Error Message: {}.  Retransmitted {} times.", msg, retransmitCount);
1129                        synchronized (xmtRunnable) {
1130                            mCurrentState = AUTORETRYSTATE;
1131                            if (retransmitCount > 0) {
1132                                try {
1133                                    xmtRunnable.wait(retransmitCount * 100L);
1134                                } catch (InterruptedException e) {
1135                                    Thread.currentThread().interrupt(); // retain if needed later
1136                                }
1137                            }
1138                            replyInDispatch = false;
1139                            xmtRunnable.notify();
1140                            retransmitCount++;
1141                        }
1142                    } else {
1143                        // update state, and notify to continue
1144                        synchronized (xmtRunnable) {
1145                            mCurrentState = NOTIFIEDSTATE;
1146                            replyInDispatch = false;
1147                            xmtRunnable.notify();
1148                            retransmitCount = 0;
1149                        }
1150                    }
1151                    break;
1152                }
1153                case WAITREPLYINPROGMODESTATE: {
1154                    // entering programming mode
1155                    mCurrentMode = PROGRAMINGMODE;
1156                    replyInDispatch = false;
1157
1158                    // check to see if we need to delay to allow decoders to become
1159                    // responsive
1160                    int warmUpDelay = enterProgModeDelayTime();
1161                    if (warmUpDelay != 0) {
1162                        try {
1163                            synchronized (xmtRunnable) {
1164                                xmtRunnable.wait(warmUpDelay);
1165                            }
1166                        } catch (InterruptedException e) {
1167                            Thread.currentThread().interrupt(); // retain if needed later
1168                        }
1169                    }
1170                    // update state, and notify to continue
1171                    synchronized (xmtRunnable) {
1172                        mCurrentState = OKSENDMSGSTATE;
1173                        xmtRunnable.notify();
1174                    }
1175                    break;
1176                }
1177                case WAITREPLYINNORMMODESTATE: {
1178                    // entering normal mode
1179                    mCurrentMode = NORMALMODE;
1180                    replyInDispatch = false;
1181                    // update state, and notify to continue
1182                    synchronized (xmtRunnable) {
1183                        mCurrentState = OKSENDMSGSTATE;
1184                        xmtRunnable.notify();
1185                    }
1186                    break;
1187                }
1188                default: {
1189                    replyInDispatch = false;
1190                    if (allowUnexpectedReply) {
1191                        log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg);
1192                        synchronized (xmtRunnable) {
1193                            // The transmit thread sometimes gets stuck
1194                            // when unexpected replies are received.  Notify
1195                            // it to clear the block without a timeout.
1196                            // (do not change the current state)
1197                            //if(mCurrentState!=IDLESTATE)
1198                            xmtRunnable.notify();
1199                        }
1200                    } else {
1201                        unexpectedReplyStateError(mCurrentState, msg.toString());
1202                    }
1203                }
1204            }
1205            // Unsolicited message
1206        } else {
1207            log.debug("Unsolicited Message Received {}", msg);
1208
1209            replyInDispatch = false;
1210        }
1211    }
1212
1213    /**
1214     * Log an error message for a message received in an unexpected state.
1215     * @param State message state.
1216     * @param msgString message string.
1217     */
1218    protected void unexpectedReplyStateError(int State, String msgString) {
1219       String[] packages = this.getClass().getName().split("\\.");
1220       String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
1221                     +(packages.length>=1 ? packages[packages.length-1] :"");
1222       log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name);
1223    }
1224
1225    /**
1226     * for testing purposes, let us be able to find out
1227     * what the last sender was.
1228     * @return last sender, mLastSender.
1229     */
1230    public AbstractMRListener getLastSender() {
1231        return mLastSender;
1232    }
1233
1234    protected void terminate() {
1235        log.debug("Cleanup Starts");
1236        if (ostream == null) {
1237            return;    // no connection established
1238        }
1239        AbstractMRMessage modeMsg = enterNormalMode();
1240        if (modeMsg != null) {
1241            modeMsg.setRetries(100); // set the number of retries
1242            // high, just in case the interface
1243            // is busy when we try to send
1244            forwardToPort(modeMsg, null);
1245            // wait for reply
1246            try {
1247                if (xmtRunnable != null) {
1248                    synchronized (xmtRunnable) {
1249                        xmtRunnable.wait(modeMsg.getTimeout());
1250                    }
1251                }
1252            } catch (InterruptedException e) {
1253                Thread.currentThread().interrupt(); // retain if needed later
1254                log.error("transmit interrupted");
1255            }
1256        }
1257    }
1258
1259    /**
1260     * Internal class to remember the Reply object and destination listener with
1261     * a reply is received.
1262     */
1263    protected static class RcvNotifier implements Runnable {
1264
1265        AbstractMRReply mMsg;
1266        AbstractMRListener mDest;
1267        AbstractMRTrafficController mTc;
1268
1269        public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
1270                AbstractMRTrafficController pTc) {
1271            mMsg = pMsg;
1272            mDest = pDest;
1273            mTc = pTc;
1274        }
1275
1276        @Override
1277        public void run() {
1278            log.debug("Delayed rcv notify starts");
1279            mTc.notifyReply(mMsg, mDest);
1280        }
1281    } // end RcvNotifier
1282
1283    // allow creation of object outside package
1284    protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
1285            AbstractMRTrafficController pTc) {
1286        return new RcvNotifier(pMsg, pDest, pTc);
1287    }
1288
1289    /**
1290     * Internal class to remember the Message object and destination listener
1291     * when a message is queued for notification.
1292     */
1293    protected static class XmtNotifier implements Runnable {
1294
1295        AbstractMRMessage mMsg;
1296        AbstractMRListener mDest;
1297        AbstractMRTrafficController mTc;
1298
1299        public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest,
1300                AbstractMRTrafficController pTc) {
1301            mMsg = pMsg;
1302            mDest = pDest;
1303            mTc = pTc;
1304        }
1305
1306        @Override
1307        public void run() {
1308            log.debug("Delayed xmt notify starts");
1309            mTc.notifyMessage(mMsg, mDest);
1310        }
1311    }  // end XmtNotifier
1312
1313    /**
1314     * Terminate the receive and transmit threads.
1315     * <p>
1316     * This is intended to be used only by testing subclasses.
1317     */
1318    public void terminateThreads() {
1319        threadStopRequest = true;
1320        if (xmtThread != null) {
1321            xmtThread.interrupt();
1322            try {
1323                xmtThread.join(150);
1324            } catch (InterruptedException ie){
1325                // interrupted during cleanup.
1326            }
1327        }
1328
1329        if (rcvThread != null) {
1330            rcvThread.interrupt();
1331            try {
1332                rcvThread.join(150);
1333            } catch (InterruptedException ie){
1334                // interrupted during cleanup.
1335            }
1336        }
1337        // we also need to remove the shutdown task.
1338        InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask);
1339    }
1340
1341    /**
1342     * Flag that threads should terminate as soon as they can.
1343     */
1344    protected volatile boolean threadStopRequest = false;
1345
1346    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class);
1347
1348}