001package jmri.jmrix.mrc;
002
003import java.io.DataInputStream;
004import java.io.OutputStream;
005import java.util.Arrays;
006import java.util.Calendar;
007import java.util.Date;
008import java.util.Iterator;
009import java.util.LinkedList;
010import org.slf4j.Logger;
011import org.slf4j.LoggerFactory;
012
013/**
014 * Converts Stream-based I/O to/from Mrc messages. The "MrcInterface" side
015 * sends/receives MrcMessage objects. The connection to a MrcPortController is
016 * via a pair of *Streams, which then carry sequences of characters for
017 * transmission.
018 * <p>
019 * This is based upon the Packetizer used for LocoNet Connections due to its
020 * speed and efficiency to handle messages. This also takes some code from the
021 * AbstractMRTrafficController, when dealing with handling replies to messages
022 * sent.
023 *
024 * The MRC Command Station sends out a poll message to each handset which then
025 * has approximately 20ms to initially respond with a request. Otherwise the
026 * Command Station will poll the next handset.
027 *
028 * <p>
029 * Messages come to this via the main GUI thread, and are forwarded back to
030 * listeners in that same thread. Reception and transmission are handled in
031 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal
032 * classes defined here. The thread priorities are:
033 * <ul>
034 * <li> RcvHandler - at highest available priority
035 * <li> XmtHandler - down one, which is assumed to be above the GUI
036 * <li> (everything else)
037 * </ul>
038 * <p>
039 * Some of the message formats used in this class are Copyright MRC, Inc. and
040 * used with permission as part of the JMRI project. That permission does not
041 * extend to uses in other software products. If you wish to use this code,
042 * algorithm or these message formats outside of JMRI, please contact Mrc Inc
043 * for separate permission.
044 *
045 * @author Bob Jacobsen Copyright (C) 2001
046 * @author Kevin Dickerson Copyright (C) 2014
047 * @author Ken Cameron Copyright (C) 2014
048 *
049 */
050public class MrcPacketizer extends MrcTrafficController {
051
052    /**
053     * true if the external hardware is not echoing messages, so we must
054     */
055    protected boolean echo = true;  // echo messages here, instead of in hardware
056
057    public MrcPacketizer() {
058    }
059
060    // The methods to implement the MrcInterface
061    @Override
062    public boolean status() {
063        return (ostream != null && istream != null);
064    }
065
066    /**
067     * Synchronized list used as a transmit queue.
068     * <p>
069     * This is public to allow access from the internal class(es) when compiling
070     * with Java 1.1
071     */
072    public LinkedList<MrcMessage> xmtList = new LinkedList<MrcMessage>();
073
074    /**
075     * XmtHandler (a local class) object to implement the transmit thread
076     */
077    protected Runnable xmtHandler;
078
079    /**
080     * RcvHandler (a local class) object to implement the receive thread
081     */
082    protected Runnable rcvHandler;
083
084    /**
085     * Forward a preformatted MrcMessage to the actual interface.
086     *
087     * The message is converted to a byte array and queue for transmission
088     *
089     * @param m Message to send;
090     */
091    @Override
092    public void sendMrcMessage(MrcMessage m) {
093        // update statistics
094        transmittedMsgCount++;
095
096        //Convert the message to a byte stream, to save doing this when the message
097        //is picked out
098        m.setByteStream();
099
100        if (log.isDebugEnabled()) { // avoid String building if not needed
101            log.debug("queue Mrc packet: {}", m.toString());
102        }
103        // in an atomic operation, queue the request and wake the xmit thread
104        try {
105            synchronized (xmtHandler) {
106                xmtList.addLast(m);
107                if (log.isDebugEnabled()) { // avoid String building if not needed
108                    log.debug("xmt list size {}", xmtList.size()); // NOI18N
109                    Iterator<MrcMessage> iterator = xmtList.iterator();
110                    while (iterator.hasNext()) {
111                        log.debug("  entry: {}",iterator.next().toString());
112                    }
113                }
114            }
115        } catch (RuntimeException e) {
116            log.warn("passing to xmit: unexpected exception", e); // NOI18N
117        }
118    }
119
120    /**
121     * Implement abstract method to signal if there's a backlog of information
122     * waiting to be sent.
123     *
124     * @return true if busy, false if nothing waiting to send
125     */
126    @Override
127    public boolean isXmtBusy() {
128        if (controller == null) {
129            return false;
130        }
131
132        return (!controller.okToSend());
133    }
134
135    // methods to connect/disconnect to a source of data in a MrcPortController
136    // This is public to allow access from the internal class(es) when compiling with Java 1.1
137    public MrcPortController controller = null;
138
139    /**
140     * Make connection to existing MrcPortController object.
141     *
142     * @param p Port controller for connected. Save this for a later disconnect
143     *          call
144     */
145    public void connectPort(MrcPortController p) {
146        istream = p.getInputStream();
147        ostream = p.getOutputStream();
148        if (controller != null) {
149            log.warn("connectPort: connect called while connected"); // NOI18N
150        }
151        controller = p;
152    }
153
154    /**
155     * Break connection to existing MrcPortController object. Once broken,
156     * attempts to send via "message" member will fail.
157     *
158     * @param p previously connected port
159     */
160    public void disconnectPort(MrcPortController p) {
161        istream = null;
162        ostream = null;
163        if (controller != p) {
164            log.warn("disconnectPort: disconnect called from non-connected MrcPortController"); // NOI18N
165        }
166        controller = null;
167    }
168
169    // data members to hold the streams. These are public so the inner classes defined here
170    // can access whem with a Java 1.1 compiler
171    public DataInputStream istream = null;
172    public OutputStream ostream = null;
173
174    //We keep a copy of the lengths here to save on time on each request later.
175    final private static int THROTTLEPACKETLENGTH = MrcPackets.getThrottlePacketLength();
176    final private static int FUNCTIONGROUPLENGTH = MrcPackets.getFunctionPacketLength();
177    final private static int READCVLENGTH = MrcPackets.getReadCVPacketLength();
178    final private static int readCVReplyLength = MrcPackets.getReadCVPacketReplyLength();
179    final private static int readDecoderAddressLength = MrcPackets.getReadDecoderAddressLength();
180    final private static int WRITECVPROGLENGTH = MrcPackets.getWriteCVPROGPacketLength();
181    final private static int WRITECVPOMLENGTH = MrcPackets.getWriteCVPOMPacketLength();
182    final private static int SETCLOCKRATIOLENGTH = MrcPackets.getSetClockRatioPacketLength();
183    final private static int SETCLOCKTIMELENGTH = MrcPackets.getSetClockTimePacketLength();
184    final private static int setClockAMPMLength = MrcPackets.getSetClockAmPmPacketLength();
185    final private static int powerOnLength = MrcPackets.getPowerOnPacketLength();
186    final private static int powerOffLength = MrcPackets.getPowerOffPacketLength();
187
188    final private static int addToConsistLength = MrcPackets.getClearConsistPacketLength();
189    final private static int clearConsistLength = MrcPackets.getClearConsistPacketLength();
190    final private static int routeControlLength = MrcPackets.getRouteControlPacketLength();
191    final private static int clearRouteLength = MrcPackets.getClearRoutePacketLength();
192    final private static int addToRouteLength = MrcPackets.getAddToRoutePacketLength();
193    final private static int accessoryLength = MrcPackets.getAccessoryPacketLength();
194
195    /**
196     * Read a single byte, protecting against various timeouts, etc.
197     * <p>
198     * When a port is set to have a receive timeout (via the
199     * enableReceiveTimeout() method), some will return zero bytes or an
200     * EOFException at the end of the timeout. In that case, the read should be
201     * repeated to get the next real character.
202     * @param istream data input stream from layout
203     * @return byte stream from interface
204     * @throws java.io.IOException from read errors
205     *
206     */
207    protected byte readByteProtected(DataInputStream istream) throws java.io.IOException {
208        while (true) { // loop will repeat until character found
209            int nchars;
210            nchars = istream.read(rcvBuffer, 0, 1);
211            if (nchars > 0) {
212                return rcvBuffer[0];
213            }
214        }
215    }
216    // Defined this way to reduce new object creation
217    private byte[] rcvBuffer = new byte[1];
218    //boolean xmtWindow = false;
219
220    /**
221     * Captive class to handle incoming characters. This is a permanent loop,
222     * looking for input messages in character form on the stream connected to
223     * the MrcPortController via <code>connectPort</code>.
224     */
225    class RcvHandler implements Runnable {
226
227        /**
228         * Remember the MrcPacketizer object
229         */
230        MrcPacketizer trafficController;
231
232        public RcvHandler(MrcPacketizer lt) {
233            trafficController = lt;
234        }
235
236        @Override
237        public void run() {
238            int firstByte;
239            int secondByte;
240            int thirdByte;
241            while (true) {   // loop permanently, program close will exit
242                try {
243                    firstByte = readByteProtected(istream) & 0xFF;
244                    secondByte = readByteProtected(istream) & 0xFF;
245                    thirdByte = readByteProtected(istream) & 0xFF;
246                    // start by looking for command -  skip if bit not set or byte 1 & 3 don't match.
247                    while (secondByte != 0x00 && secondByte != 0x01 || firstByte != thirdByte) {
248                        if (firstByte == 0x00 && secondByte == 0x01) {
249                            //Only a clock message has the first & thirdbyte different
250                            break;
251                        }
252                        log.debug("Skipping: {} {} {}", Integer.toHexString(firstByte), Integer.toHexString(secondByte), Integer.toHexString(thirdByte));
253                        firstByte = secondByte;
254                        secondByte = thirdByte;
255                        thirdByte = readByteProtected(istream) & 0xFF;
256                    }
257                    final Date time = new Date();
258                    log.trace(" (RcvHandler) Start message with message: {} {}", Integer.toHexString(firstByte), Integer.toHexString(secondByte));
259                    MrcMessage msg = null;
260                    boolean pollForUs = false;
261
262                    if (secondByte == 0x01) {
263
264                        msg = new MrcMessage(6);
265                        msg.setMessageClass(MrcInterface.POLL);
266                        //msg.setPollMessage();
267                        if (firstByte == cabAddress) {
268                            pollForUs = true;
269                        } else if (mCurrentState == WAITFORCMDRECEIVED) {
270                            log.debug("Missed our poll slot");
271                            synchronized (transmitLock) {
272                                mCurrentState = MISSEDPOLL;
273                                transmitLock.notify();
274                            }
275                        }
276                        if (firstByte == 0x00) {
277                            msg.setMessageClass(MrcInterface.CLOCK + MrcInterface.POLL);
278                        }
279                    } else {
280                        switch (firstByte) {
281                            case 0:/* 2 No Data Poll */
282
283                                msg = new MrcMessage(4);
284                                msg.setMessageClass(MrcInterface.POLL);
285                                break;
286                            case MrcPackets.THROTTLEPACKETCMD:
287                                msg = new MrcMessage(THROTTLEPACKETLENGTH);
288                                msg.setMessageClass(MrcInterface.THROTTLEINFO);
289                                break;
290                            //$FALL-THROUGH$
291                            case MrcPackets.FUNCTIONGROUP1PACKETCMD:
292                            case MrcPackets.FUNCTIONGROUP2PACKETCMD:
293                            case MrcPackets.FUNCTIONGROUP3PACKETCMD:
294                            case MrcPackets.FUNCTIONGROUP4PACKETCMD:
295                            case MrcPackets.FUNCTIONGROUP5PACKETCMD:
296                            case MrcPackets.FUNCTIONGROUP6PACKETCMD:
297                                msg = new MrcMessage(FUNCTIONGROUPLENGTH);
298                                msg.setMessageClass(MrcInterface.THROTTLEINFO);
299                                break;
300                            case MrcPackets.READCVCMD:
301                                msg = new MrcMessage(READCVLENGTH);
302                                msg.setMessageClass(MrcInterface.PROGRAMMING);
303                                log.debug("Read CV Cmd");
304                                break;
305                            case MrcPackets.READDECODERADDRESSCMD:
306                                msg = new MrcMessage(readDecoderAddressLength);
307                                msg.setMessageClass(MrcInterface.PROGRAMMING);
308                                break;
309                            case MrcPackets.WRITECVPROGCMD:
310                                msg = new MrcMessage(WRITECVPROGLENGTH);
311                                msg.setMessageClass(MrcInterface.PROGRAMMING);
312                                break;
313                            case MrcPackets.WRITECVPOMCMD:
314                                msg = new MrcMessage(WRITECVPOMLENGTH);
315                                msg.setMessageClass(MrcInterface.PROGRAMMING);
316                                break;
317                            case MrcPackets.SETCLOCKRATIOCMD:
318                                msg = new MrcMessage(SETCLOCKRATIOLENGTH);
319                                msg.setMessageClass(MrcInterface.CLOCK);
320                                break;
321                            case MrcPackets.SETCLOCKTIMECMD:
322                                msg = new MrcMessage(SETCLOCKTIMELENGTH);
323                                msg.setMessageClass(MrcInterface.CLOCK);
324                                break;
325                            case MrcPackets.SETCLOCKAMPMCMD:
326                                msg = new MrcMessage(setClockAMPMLength);
327                                msg.setMessageClass(MrcInterface.CLOCK);
328                                break;
329                            case MrcPackets.READCVHEADERREPLYCODE:
330                                msg = new MrcMessage(readCVReplyLength);
331                                msg.setMessageClass(MrcInterface.PROGRAMMING);
332                                synchronized (transmitLock) {
333                                    mCurrentState = IDLESTATE;
334                                    transmitLock.notify();
335                                }
336                                log.debug("CV read reply");
337                                break;
338                            case MrcPackets.PROGCMDSENTCODE:
339                                log.debug("Gd Prog Cmd Sent");
340                                synchronized (transmitLock) {
341                                    mCurrentState = IDLESTATE;
342                                    transmitLock.notify();
343                                }
344                                msg = new MrcMessage(4);
345                                msg.setMessageClass(MrcInterface.PROGRAMMING);
346                                break;
347                            case MrcPackets.POWERONCMD:
348                                mCurrentState = IDLESTATE;
349                                msg = new MrcMessage(powerOnLength);
350                                msg.setMessageClass(MrcInterface.POWER);
351                                break;
352                            case MrcPackets.POWEROFFCMD:
353                                mCurrentState = IDLESTATE;
354                                msg = new MrcMessage(powerOffLength);
355                                msg.setMessageClass(MrcInterface.POWER);
356                                break;
357                            case MrcPackets.ADDTOCONSISTPACKETCMD:
358                                mCurrentState = IDLESTATE;
359                                msg = new MrcMessage(addToConsistLength);
360                                msg.setMessageClass(MrcInterface.THROTTLEINFO);
361                                break;
362                            case MrcPackets.CLEARCONSISTPACKETCMD:
363                                mCurrentState = IDLESTATE;
364                                msg = new MrcMessage(clearConsistLength);
365                                msg.setMessageClass(MrcInterface.THROTTLEINFO);
366                                break;
367                            case MrcPackets.ROUTECONTROLPACKETCMD:
368                                mCurrentState = IDLESTATE;
369                                msg = new MrcMessage(routeControlLength);
370                                msg.setMessageClass(MrcInterface.TURNOUTS);
371                                break;
372                            case MrcPackets.CLEARROUTEPACKETCMD:
373                                mCurrentState = IDLESTATE;
374                                msg = new MrcMessage(clearRouteLength);
375                                msg.setMessageClass(MrcInterface.TURNOUTS);
376                                break;
377                            case MrcPackets.ADDTOROUTEPACKETCMD:
378                                mCurrentState = IDLESTATE;
379                                msg = new MrcMessage(addToRouteLength);
380                                msg.setMessageClass(MrcInterface.TURNOUTS);
381                                break;
382                            case MrcPackets.ACCESSORYPACKETCMD:
383                                mCurrentState = IDLESTATE;
384                                msg = new MrcMessage(accessoryLength);
385                                msg.setMessageClass(MrcInterface.TURNOUTS);
386                                break;
387                            case MrcPackets.LOCODBLCONTROLCODE:
388                                synchronized (transmitLock) {
389                                    mCurrentState = DOUBLELOCOCONTROL;
390                                    transmitLock.notify();
391                                }
392                                msg = new MrcMessage(4);
393                                msg.setMessageClass(MrcInterface.THROTTLEINFO);
394                                break;
395                            case MrcPackets.LOCOSOLECONTROLCODE:
396                                mCurrentState = IDLESTATE;
397                                msg = new MrcMessage(4);
398                                msg.setMessageClass(MrcInterface.THROTTLEINFO);
399                                synchronized (transmitLock) {
400                                    mCurrentState = IDLESTATE;
401                                    transmitLock.notify();
402                                }
403                                break;
404                            case MrcPackets.GOODCMDRECEIVEDCODE:      //Possibly shouldn't change the state, as we wait for further confirmation.
405                                if (mCurrentState == CONFIRMATIONONLY) {
406                                    synchronized (transmitLock) {
407                                        mCurrentState = IDLESTATE;
408                                        transmitLock.notify();
409                                    }
410                                }
411                                msg = new MrcMessage(4);
412                                break;
413                            case MrcPackets.BADCMDRECEIVEDCODE:
414                                mCurrentState = BADCOMMAND;
415                                msg = new MrcMessage(4);
416                                break;
417                            default:
418                                msg = new MrcMessage(4); //Unknown
419                                log.debug("UNKNOWN {}", Integer.toHexString(firstByte)); // NOI18N
420                        }
421                    }
422
423                    msg.setElement(0, firstByte);
424                    msg.setElement(1, secondByte);
425                    msg.setElement(2, thirdByte);
426                    // message exists, now fill it
427                    int len = msg.getNumDataElements();
428                    log.trace("len: {}", len);
429                    for (int i = 3; i < len; i++) {
430                        // check for message-blocking error
431                        int b = readByteProtected(istream) & 0xFF;
432                        msg.setElement(i, b);
433                        log.trace("char {} is: {}", i, Integer.toHexString(b));
434                    }
435                    /*Slight trade off with this we may see any transmitted message go out prior to the
436                     poll message being passed to the monitor. */
437                    if (pollForUs) {
438                        synchronized (xmtHandler) {
439                            xmtHandler.notify(); //This will notify the xmt to send a message, even if it is only "no Data" reply
440                        }
441                    }
442
443                    if ((msg.getMessageClass() & MrcInterface.POLL) != MrcInterface.POLL && msg.getNumDataElements() > 6) {
444                        if (!msg.validCheckSum()) {
445                            log.warn("Ignore Mrc packet with bad checksum: {}", msg); // NOI18N
446                            throw new MrcMessageException();
447                        } else {
448                            for (int i = 1; i < msg.getNumDataElements(); i += 2) {
449                                if (msg.getElement(i) != 0x00) {
450                                    log.warn("Ignore Mrc packet with bad bit: {}", msg); // NOI18N
451                                    throw new MrcMessageException();
452                                }
453                            }
454                        }
455                    }
456                    // message is complete, dispatch it !!
457                    {
458                        log.trace("queue message for notification: {}", msg);
459                        final MrcMessage thisMsg = msg;
460                        final MrcPacketizer thisTc = trafficController;
461                        // return a notification via the queue to ensure end
462                        Runnable r = new Runnable() {
463                            MrcMessage msgForLater = thisMsg;
464                            MrcPacketizer myTc = thisTc;
465
466                            @Override
467                            public void run() {
468                                myTc.notifyRcv(time, msgForLater);
469                            }
470                        };
471                        javax.swing.SwingUtilities.invokeLater(r);
472                    }
473                    // done with this one
474                } catch (MrcMessageException e) {
475                    // just let it ride for now
476                    log.warn("run: unexpected MrcMessageException", e); // NOI18N
477                } catch (java.io.EOFException e) {
478                    // posted from idle port when enableReceiveTimeout used
479                    log.trace("EOFException, is Mrc serial I/O using timeouts?");
480                } catch (java.io.IOException e) {
481                    // fired when write-end of HexFile reaches end
482                    log.debug("IOException, should only happen with HexFile", e);
483                    disconnectPort(controller);
484                    return;
485                } // normally, we don't catch RuntimeException, but in this
486                // permanently running loop it seems wise.
487                catch (RuntimeException e) {
488                    log.warn("Unknown Exception", e);  // NOI18N
489                }
490            } // end of permanent loop
491        }
492    }
493
494    final static int IDLESTATE = 0x00;
495    final static int WAITFORCMDRECEIVED = 0x01;
496    final static int DOUBLELOCOCONTROL = 0x02;
497    final static int MISSEDPOLL = 0x04;
498    final static int BADCOMMAND = 0x08;
499    final static int CONFIRMATIONONLY = 0x10;
500    int mCurrentState = IDLESTATE;
501
502    int consecutiveMissedPolls = 0;
503
504    final MrcMessage noData = MrcMessage.setNoData();
505    final byte noDataMsg[] = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00};
506
507    /**
508     * Captive class to handle transmission
509     */
510    class XmtHandler implements Runnable {
511
512        @Override
513        public void run() {
514            byte msg[];
515            MrcMessage m;
516            int x = 0;
517            int state = WAITFORCMDRECEIVED;
518            while (true) {   // loop permanently
519                m = noData;
520                msg = noDataMsg;
521                log.trace("check for input");
522                synchronized (this) {
523                    log.trace("start wait");
524                    //log.info("wait until we have been polled");
525                    new jmri.util.WaitHandler(this);  // handle synchronization, spurious wake, interruption
526                    log.trace("end wait");
527
528                    if (xmtList.size() != 0) {
529                        m = xmtList.removeFirst();
530                        msg = m.getByteStream();
531                        log.debug("xmt list size after get {}", xmtList.size());
532                        log.debug("Message to send on {}", m);
533                    }
534                }
535                try {
536                    if (m.getMessageClass() != MrcInterface.POLL) {
537                        mCurrentState = WAITFORCMDRECEIVED;
538                        /* We set the current state before transmitting the message otherwise
539                         the reply to the message may be received before the state is set
540                         and the message will timeout and be retransmitted */
541                        if (!m.isReplyExpected()) {
542                            mCurrentState = CONFIRMATIONONLY;
543                        }
544                        state = mCurrentState;
545                    }
546                    ostream.write(msg);
547                    ostream.flush();
548                    messageTransmitted(m);
549                    if (m.getMessageClass() != MrcInterface.POLL) {
550                        if (log.isTraceEnabled()) { // avoid String building if not needed
551                            log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg));
552                            log.trace("wait : {} : {}", m.getTimeout(), x);
553                        }
554                        transmitWait(m.getTimeout(), state, "transmitLoop interrupted", x); // NOI18N
555                        x++;
556                    } else {
557                        mCurrentState = IDLESTATE;
558                    }
559
560                    if (mCurrentState == WAITFORCMDRECEIVED || mCurrentState == CONFIRMATIONONLY) {
561                        log.debug("Timed out");
562                        if (m.getRetries() >= 0) {
563                            m.setRetries(m.getRetries() - 1);
564                            synchronized (this) {
565                                xmtList.addFirst(m);
566                            }
567                        } else {
568                            messageFailed(m);
569                        }
570                        mCurrentState = IDLESTATE;
571                        consecutiveMissedPolls = 0;
572                    } else if (mCurrentState == MISSEDPOLL && m.getRetries() >= 0) {
573                        consecutiveMissedPolls++;
574                        log.debug("Missed add to front");
575                        if (consecutiveMissedPolls < 5) {
576                            synchronized (this) {
577                                xmtList.addFirst(m);
578                                mCurrentState = IDLESTATE;
579                                if (log.isDebugEnabled()) { // avoid String building if not needed
580                                    log.debug("xmt list size {}", xmtList.size());
581                                    Iterator<MrcMessage> iterator = xmtList.iterator();
582                                    while (iterator.hasNext()) {
583                                        log.debug("message: {}", iterator.next().toString());
584                                    }
585                                }
586                            }
587                        } else {
588                            log.warn("Message missed {} polls for message {}", consecutiveMissedPolls, m); // NOI18N
589                            consecutiveMissedPolls = 0;
590                        }
591                    } else if (mCurrentState == DOUBLELOCOCONTROL && m.getRetries() >= 0) {
592                        if (log.isDebugEnabled()) { // avoid String building if not needed
593                            log.debug("Auto Retry send message added back to queue: {}", Arrays.toString(msg));
594                        }
595                        m.setRetries(m.getRetries() - 1);
596                        synchronized (this) {
597                            xmtList.addFirst(m);
598                            mCurrentState = IDLESTATE;
599                        }
600                        consecutiveMissedPolls = 0;
601                    } else if (mCurrentState == BADCOMMAND) {
602                        log.debug("Bad command sent");
603                        messageFailed(m);
604                        mCurrentState = IDLESTATE;
605                        consecutiveMissedPolls = 0;
606                    }
607                } catch (java.io.IOException e) {
608                    log.warn("sendMrcMessage: IOException", e); // NOI18N
609                }
610            }
611        }
612    }
613
614    static final Object transmitLock = new Object();
615
616    @edu.umd.cs.findbugs.annotations.SuppressFBWarnings( value = "SLF4J_FORMAT_SHOULD_BE_CONST",
617        justification = "passing InterruptMessage unchanged")
618    protected void transmitWait(int waitTime, int state, String InterruptMessage, int x) {
619        // wait() can have spurious wakeup!
620        // so we protect by making sure the entire timeout time is used
621        long currentTime = Calendar.getInstance().getTimeInMillis();
622        long endTime = currentTime + waitTime;
623        while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) {
624            long wait = endTime - currentTime;
625            try {
626                synchronized (transmitLock) {
627                    // Do not wait if the current state has changed since we
628                    // last set it.
629                    if (mCurrentState != state) {
630                        return;
631                    }
632                    transmitLock.wait(wait); // rcvr normally ends this w state change
633                }
634            } catch (InterruptedException e) {
635                Thread.currentThread().interrupt(); // retain if needed later
636                log.error(InterruptMessage);
637            }
638        }
639        log.debug("Timeout in transmitWait {}, mCurrentState: {} after {}", x, mCurrentState, waitTime);
640    }
641
642    protected void messageFailed(MrcMessage m) {
643        log.debug("message transmitted");
644        if (m.getSource() == null) {
645            return;
646        }
647        // message is queued for transmit, echo it when needed
648        // return a notification via the queue to ensure end
649        javax.swing.SwingUtilities.invokeLater(new Failed(new Date(), m));
650
651    }
652
653    static class Failed implements Runnable {
654
655        Failed(Date _timestamp, MrcMessage m) {
656            msgForLater = m;
657            timestamp = _timestamp;
658        }
659        MrcMessage msgForLater;
660        Date timestamp;
661
662        @Override
663        public void run() {
664            msgForLater.getSource().notifyFailedXmit(timestamp, msgForLater);
665        }
666    }
667
668    /**
669     * When a message is finally transmitted, forward it to listeners if echoing
670     * is needed.
671     *
672     * @param msg message to tag a transmitted message
673     */
674    protected void messageTransmitted(MrcMessage msg) {
675        //if (debug) log.debug("message transmitted");
676        if (!echo) {
677            return;
678        }
679        // message is queued for transmit, echo it when needed
680        // return a notification via the queue to ensure end
681        javax.swing.SwingUtilities.invokeLater(new Echo(this, new Date(), msg));
682    }
683
684    static class Echo implements Runnable {
685
686        Echo(MrcPacketizer t, Date _timestamp, MrcMessage m) {
687            myTc = t;
688            msgForLater = m;
689            timestamp = _timestamp;
690        }
691        MrcMessage msgForLater;
692        MrcPacketizer myTc;
693        Date timestamp;
694
695        @Override
696        public void run() {
697            myTc.notifyXmit(timestamp, msgForLater);
698        }
699    }
700
701    /**
702     * Invoked at startup to start the threads needed here.
703     */
704    public void startThreads() {
705        int priority = Thread.currentThread().getPriority();
706        log.debug("startThreads current priority = {} max available = " + Thread.MAX_PRIORITY + " default = " + Thread.NORM_PRIORITY + " min available = " + Thread.MIN_PRIORITY, priority); // NOI18N
707
708        // make sure that the xmt priority is no lower than the current priority
709        int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY : Thread.MAX_PRIORITY - 1);
710        // start the XmtHandler in a thread of its own
711        if (xmtHandler == null) {
712            xmtHandler = new XmtHandler();
713        }
714        Thread xmtThread = new Thread(xmtHandler, "Mrc transmit handler"); // NOI18N
715        log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N
716        xmtThread.setDaemon(true);
717        xmtThread.setPriority(Thread.MAX_PRIORITY - 1);
718        xmtThread.start();
719
720        // start the RcvHandler in a thread of its own
721        if (rcvHandler == null) {
722            rcvHandler = new RcvHandler(this);
723        }
724        Thread rcvThread = new Thread(rcvHandler, "Mrc receive handler " + Thread.MAX_PRIORITY); // NOI18N
725        rcvThread.setDaemon(true);
726        rcvThread.setPriority(Thread.MAX_PRIORITY);
727        rcvThread.start();
728
729    }
730
731    private final static Logger log = LoggerFactory.getLogger(MrcPacketizer.class);
732}