001package jmri.jmrix.loconet;
002
003import java.io.DataInputStream;
004import java.io.EOFException;
005import java.io.OutputStream;
006import java.util.concurrent.LinkedTransferQueue;
007import org.slf4j.Logger;
008import org.slf4j.LoggerFactory;
009
010/**
011 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface"
012 * side sends/receives LocoNetMessage objects. The connection to a
013 * LnPortController is via a pair of *Streams, which then carry sequences of
014 * characters for transmission.
015 * <p>
016 * Messages come to this via the main GUI thread, and are forwarded back to
017 * listeners in that same thread. Reception and transmission are handled in
018 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal
019 * classes defined here. The thread priorities are:
020 * <ul>
021 *   <li> RcvHandler - at highest available priority
022 *   <li> XmtHandler - down one, which is assumed to be above the GUI
023 *   <li> (everything else)
024 * </ul>
025 * Some of the message formats used in this class are Copyright Digitrax, Inc.
026 * and used with permission as part of the JMRI project. That permission does
027 * not extend to uses in other software products. If you wish to use this code,
028 * algorithm or these message formats outside of JMRI, please contact Digitrax
029 * Inc for separate permission.
030 *
031 * @author Bob Jacobsen Copyright (C) 2001, 2018
032 * @author B. Milhaupt  Copyright (C) 2020
033 */
034public class LnPacketizer extends LnTrafficController {
035
    /**
     * True if the external hardware is not echoing messages, so we must.
     * <p>
     * Consulted by {@code messageTransmitted(byte[])}: while this is false
     * (the default) a transmitted message is not forwarded back to listeners
     * from this class.
     */
    protected boolean echo = false;  // true = echo messages here, instead of in hardware
040
041    public LnPacketizer(LocoNetSystemConnectionMemo m) {
042        // set the memo to point here
043        memo = m;
044        m.setLnTrafficController(this);
045    }
046
047    // The methods to implement the LocoNetInterface
048
049    /**
050     * {@inheritDoc}
051     */
052    @Override
053    public boolean status() {
054        boolean returnVal = ( ostream != null && istream != null
055                && xmtThread != null && xmtThread.isAlive() && xmtHandler != null
056                && rcvThread != null && rcvThread.isAlive() && rcvHandler != null
057                );
058        return returnVal;
059    }
060
    /**
     * Transmit queue. {@code sendLocoNetMessage(..)} adds each outgoing
     * message here as a byte array; the transmit thread (see XmtHandler)
     * removes entries with a blocking {@code take()} and writes them to the
     * output stream.
     */
    protected LinkedTransferQueue<byte[]> xmtList = new LinkedTransferQueue<>();

    /**
     * XmtHandler (a local class) object to implement the transmit thread.
     * <p>
     * We create this object in startThreads() as each packetizer uses different handlers.
     * So long as the object is created before using it to sync it works.
     * <p>
     * startThreads() only creates a default XmtHandler when this is still
     * null at that point.
     */
    protected Runnable xmtHandler = null;

    /**
     * RcvHandler (a local class) object to implement the receive thread.
     * <p>
     * startThreads() only creates a default RcvHandler when this is still
     * null at that point.
     */
    protected Runnable rcvHandler;
079
080    /**
081     * Forward a preformatted LocoNetMessage to the actual interface.
082     * <p>
083     * Checksum is computed and overwritten here, then the message is converted
084     * to a byte array and queued for transmission.
085     *
086     * @param m Message to send; will be updated with CRC
087     */
088    @Override
089    public void sendLocoNetMessage(LocoNetMessage m) {
090
091        // update statistics
092        transmittedMsgCount++;
093
094        // set the error correcting code byte(s) before transmittal
095        m.setParity();
096
097        // stream to port in single write, as that's needed by serial
098        int len = m.getNumDataElements();
099        byte msg[] = new byte[len];
100        for (int i = 0; i < len; i++) {
101            msg[i] = (byte) m.getElement(i);
102        }
103
104        log.debug("queue LocoNet packet: {}", m);
105        // We need to queue the request and wake the xmit thread in an atomic operation
106        // But the thread might not be running, in which case the request is just
107        // queued up.
108        try {
109            xmtList.add(msg);
110        } catch (RuntimeException e) {
111            log.warn("passing to xmit: unexpected exception: ", e);
112        }
113    }
114
115    /**
116     * Implement abstract method to signal if there's a backlog of information
117     * waiting to be sent.
118     *
119     * @return true if busy, false if nothing waiting to send
120     */
121    @Override
122    public boolean isXmtBusy() {
123        if (controller == null) {
124            return false;
125        }
126
127        return (!controller.okToSend());
128    }
129
    // methods to connect/disconnect to a source of data in a LnPortController

    // Port controller currently connected; set by connectPort(..), cleared by
    // disconnectPort(..), and null while no port is connected.
    protected LnPortController controller = null;
133
134    /**
135     * Make connection to an existing LnPortController object.
136     *
137     * @param p Port controller for connected. Save this for a later disconnect
138     *          call
139     */
140    public void connectPort(LnPortController p) {
141        istream = p.getInputStream();
142        ostream = p.getOutputStream();
143        if (controller != null) {
144            log.warn("connectPort: connect called while connected");
145        }
146        controller = p;
147    }
148
149    /**
150     * Break connection to an existing LnPortController object. Once broken,
151     * attempts to send via "message" member will fail.
152     *
153     * @param p previously connected port
154     */
155    public void disconnectPort(LnPortController p) {
156        istream = null;
157        ostream = null;
158        if (controller != p) {
159            log.warn("disconnectPort: disconnect called from non-connected LnPortController");
160        }
161        controller = null;
162    }
163
    // data members to hold the streams. These are public so the inner classes defined here
    // can access them with a Java 1.1 compiler.
    // Both are assigned in connectPort(..) and reset to null in disconnectPort(..).
    public DataInputStream istream = null;
    public OutputStream ostream = null;
168
169    /**
170     * Read a single byte, protecting against various timeouts, etc.
171     * <p>
172     * When a port is set to have a receive timeout (via the
173     * enableReceiveTimeout() method), some will return zero bytes or an
174     * EOFException at the end of the timeout. In that case, the read should be
175     * repeated to get the next real character.
176     *
177     * @param istream stream to read from
178     * @return buffer of received data
179     * @throws java.io.IOException failure during stream read
180     *
181     */
182    protected byte readByteProtected(DataInputStream istream) throws java.io.IOException {
183        while (true) { // loop will repeat until character found
184            int nchars;
185            // The istream should be configured so that the following
186            // read(..) call only blocks for a short time, e.g. 100msec, if no
187            // data is available.  It's OK if it
188            // throws e.g. java.io.InterruptedIOException
189            // in that case, as the calling loop should just go around
190            // and request input again.  This semi-blocking behavior will
191            // let the terminateThreads() method end this thread cleanly.
192            nchars = istream.read(rcvBuffer, 0, 1);
193            if (nchars < 0) {
194                throw new EOFException(String.format("Stream read returned %d, indicating end-of-file", nchars));
195            }
196            if (nchars > 0) {
197                return rcvBuffer[0];
198            }
199        }
200    }
201    // Defined this way to reduce new object creation
202    private final byte[] rcvBuffer = new byte[1];
203
204    /**
205     * Captive class to handle incoming characters. This is a permanent loop,
206     * looking for input messages in character form on the stream connected to
207     * the LnPortController via <code>connectPort</code>.
208     */
209    protected class RcvHandler implements Runnable {
210
211        /**
212         * Remember the LnPacketizer object
213         */
214        LnTrafficController trafficController;
215
216        public RcvHandler(LnTrafficController lt) {
217            trafficController = lt;
218        }
219
220        /**
221         * Handle incoming characters. This is a permanent loop, looking for
222         * input messages in character form on the stream connected to the
223         * LnPortController via <code>connectPort</code>. Terminates with the
224         * input stream breaking out of the try block.
225         */
226        @Override
227        public void run() {
228
229            int opCode;
230            while (!threadStopRequest && ! Thread.interrupted() ) {   // loop until asked to stop
231                try {
232                    // start by looking for command -  skip if bit not set
233                    while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check
234                        if (log.isTraceEnabled()) { // avoid building string
235                            log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N
236                        }
237                    }
238                    // here opCode is OK. Create output message
239                    if (log.isTraceEnabled()) { // avoid building string
240                        log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N
241                    }
242                    LocoNetMessage msg = null;
243                    while (msg == null) {
244                        try {
245                            // Capture 2nd byte, always present
246                            int byte2 = readByteProtected(istream) & 0xFF;
247                            if (log.isTraceEnabled()) { // avoid building string
248                                log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N
249                            }                            // Decide length
250                            int len = 2;
251                            switch ((opCode & 0x60) >> 5) {
252                                case 0:
253                                    /* 2 byte message */
254
255                                    len = 2;
256                                    break;
257
258                                case 1:
259                                    /* 4 byte message */
260
261                                    len = 4;
262                                    break;
263
264                                case 2:
265                                    /* 6 byte message */
266
267                                    len = 6;
268                                    break;
269
270                                case 3:
271                                    /* N byte message */
272
273                                    if (byte2 < 2) {
274                                        log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N
275                                    }
276                                    len = byte2;
277                                    break;
278                                default:
279                                    log.warn("Unhandled code: {}", (opCode & 0x60) >> 5);
280                                    break;
281                            }
282                            msg = new LocoNetMessage(len);
283                            // message exists, now fill it
284                            msg.setOpCode(opCode);
285                            msg.setElement(1, byte2);
286                            log.trace("len: {}", len); // NOI18N
287                            for (int i = 2; i < len; i++) {
288                                // check for message-blocking error
289                                int b = readByteProtected(istream) & 0xFF;
290                                if (log.isTraceEnabled()) {
291                                    log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N
292                                }
293                                if ((b & 0x80) != 0) {
294                                    log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N
295                                    opCode = b;
296                                    throw new LocoNetMessageException();
297                                }
298                                msg.setElement(i, b);
299                            }
300                        } catch (LocoNetMessageException e) {
301                            // retry by destroying the existing message
302                            // opCode is set for the newly-started packet
303                            msg = null;
304                        }
305                    }
306                    // check parity
307                    if (!msg.checkParity()) {
308                        log.warn("Ignore LocoNet packet with bad checksum: {}", msg);
309                        throw new LocoNetMessageException();
310                    }
311                    // message is complete, dispatch it !!
312                    {
313                        log.debug("queue message for notification: {}", msg);
314
315                        jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController));
316                    }
317
318                    // done with this one
319                } catch (LocoNetMessageException e) {
320                    // just let it ride for now
321                    log.warn("run: unexpected LocoNetMessageException", e); // NOI18N
322                    continue;
323                } catch (java.io.InterruptedIOException e) {
324                    // posted from idle port when enableReceiveTimeout used
325                    // Normal condition, go around the loop again
326                    continue;
327                } catch (java.io.IOException e) {
328                    // fired when read detects end-of-file
329                    log.info("End of file", e); // NOI18N
330                    dispose();
331                    disconnectPort(controller);
332                    return;
333                } catch (RuntimeException e) {
334                    // normally, we don't catch RuntimeException, but in this
335                    // permanently running loop it seems wise.
336                    log.warn("run: unexpected Exception", e); // NOI18N
337                    continue;
338                }
339            } // end of permanent loop
340        }
341    }
342
343    /**
344     * Captive class to notify of one message.
345     */
346    private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction {
347
348        public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) {
349            thisMsg = msg;
350            thisTc = trafficController;
351        }
352        LocoNetMessage thisMsg;
353        LnTrafficController thisTc;
354
355        /**
356         * {@inheritDoc}
357         */
358        @Override
359        public void run() {
360            thisTc.notify(thisMsg);
361        }
362    }
363
364    /**
365     * Captive class to handle transmission.
366     */
367    class XmtHandler implements Runnable {
368
369        /**
370         * Loops forever, looking for message to send and processing them.
371         */
372        @Override
373        public void run() {
374
375            while (!threadStopRequest) {   // loop until asked to stop
376                // any input?
377                try {
378                    // get content; blocks until present
379                    log.trace("check for input"); // NOI18N
380
381                    byte msg[] = xmtList.take();
382
383                    // input - now send
384                    try {
385                        if (ostream != null) {
386                            if (log.isDebugEnabled()) { // avoid work if not needed
387                                if (isXmtBusy()) log.debug("LocoNet port not ready to receive"); // NOI18N
388                                log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
389                            }
390                            ostream.write(msg);
391                            ostream.flush();
392                            if (log.isTraceEnabled()) { // avoid String building if not needed
393                                log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
394                            }
395                            messageTransmitted(msg);
396                        } else {
397                            // no stream connected
398                            log.warn("sendLocoNetMessage: no connection established"); // NOI18N
399                        }
400                    } catch (java.io.IOException e) {
401                        log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N
402                    }
403                } catch (InterruptedException ie) {
404                    return; // ending the thread
405                } catch (RuntimeException rt) {
406                    log.error("Exception on take() call", rt);
407                }
408            }
409        }
410    }
411
412    /**
413     * When a message is finally transmitted, forward it to listeners if echoing
414     * is needed.
415     *
416     * @param msg message sent
417     */
418    protected void messageTransmitted(byte[] msg) {
419        log.debug("message transmitted (echo {})", echo);
420        if (!echo) {
421            return;
422        }
423        // message is queued for transmit, echo it when needed
424        // return a notification via the queue to ensure end
425        javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg)));
426    }
427
428    static class Echo implements Runnable {
429
430        Echo(LnPacketizer t, LocoNetMessage m) {
431            myTc = t;
432            msgForLater = m;
433        }
434        LocoNetMessage msgForLater;
435        LnPacketizer myTc;
436
437        /**
438         * {@inheritDoc}
439         */
440        @Override
441        public void run() {
442            myTc.notify(msgForLater);
443        }
444    }
445
446    /**
447     * Invoked at startup to start the threads needed here.
448     */
449    public void startThreads() {
450        int priority = Thread.currentThread().getPriority();
451        log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N
452                priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY);
453
454        // start the RcvHandler in a thread of its own
455        if (rcvHandler == null) {
456            rcvHandler = new RcvHandler(this);
457        }
458        rcvThread = jmri.util.ThreadingUtil.newThread(rcvHandler, "LocoNet receive handler"); // NOI18N
459        rcvThread.setDaemon(true);
460        rcvThread.setPriority(Thread.MAX_PRIORITY);
461        rcvThread.start();
462
463        if (xmtHandler == null) {
464            xmtHandler = new XmtHandler();
465        }
466        // make sure that the xmt priority is no lower than the current priority
467        int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY);
468        // start the XmtHandler in a thread of its own
469        if (xmtThread == null) {
470            xmtThread = jmri.util.ThreadingUtil.newThread(xmtHandler, "LocoNet transmit handler"); // NOI18N
471        }
472        log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N
473        xmtThread.setDaemon(true);
474        xmtThread.setPriority(Thread.MAX_PRIORITY - 1);
475        xmtThread.start();
476
477        log.info("lnPacketizer Started");
478    }
479
    // Worker threads, started in startThreads() and interrupted/joined in
    // dispose() and terminateThreads(). startThreads() always creates a new
    // rcvThread but only creates xmtThread when it is still null.
    protected Thread rcvThread;
    protected Thread xmtThread;
482
483    /**
484     * {@inheritDoc}
485     */
486    // The join(150) is using a timeout because some receive threads
487    // (and maybe some day transmit threads) use calls that block
488    // even when interrupted.  We wait 150 msec and proceed.
489    // Threads that do that are responsible for ending cleanly
490    // when the blocked call eventually returns.
491    @Override
492    public void dispose() {
493        threadStopRequest = true;
494        if (xmtThread != null) {
495            xmtThread.interrupt();
496            try {
497                xmtThread.join(150);
498            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
499        }
500        if (rcvThread != null) {
501            rcvThread.interrupt();
502            try {
503                rcvThread.join(150);
504            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
505        }
506        super.dispose();
507    }
508
509    /**
510     * Terminate the receive and transmit threads.
511     * <p>
512     * This is intended to be used only by testing subclasses.
513     */
514    // The join(150) is using a timeout because some receive threads
515    // (and maybe some day transmit threads) use calls that block
516    // even when interrupted.  We wait 150 msec and proceed.
517    // Threads that do that are responsible for ending cleanly
518    // when the blocked call eventually returns.
519    public void terminateThreads() {
520        threadStopRequest = true;
521        if (xmtThread != null) {
522            xmtThread.interrupt();
523            try {
524                xmtThread.join(150);
525            } catch (InterruptedException ie){
526                // interrupted during cleanup.
527            }
528        }
529
530        if (rcvThread != null) {
531            rcvThread.interrupt();
532            try {
533                rcvThread.join(150);
534            } catch (InterruptedException ie){
535                // interrupted during cleanup.
536            }
537        }
538    }
539
    /**
     * Flag that threads should terminate as soon as they can.
     * <p>
     * Set to true by dispose() and terminateThreads(); checked at the top of
     * the RcvHandler and XmtHandler loops. Nothing in this class resets it to
     * false.
     */
    protected volatile boolean threadStopRequest = false;

    // shared SLF4J logger for this class and its nested handler classes
    private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class);
546
547}