package jmri.jmrix.loconet;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.OutputStream;
import java.util.concurrent.LinkedTransferQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface"
 * side sends/receives LocoNetMessage objects. The connection to a
 * LnPortController is via a pair of *Streams, which then carry sequences of
 * characters for transmission.
 * <p>
 * Messages come to this via the main GUI thread, and are forwarded back to
 * listeners in that same thread. Reception and transmission are handled in
 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal
 * classes defined here. The thread priorities are:
 * <ul>
 * <li> RcvHandler - at highest available priority
 * <li> XmtHandler - down one, which is assumed to be above the GUI
 * <li> (everything else)
 * </ul>
 * Some of the message formats used in this class are Copyright Digitrax, Inc.
 * and used with permission as part of the JMRI project. That permission does
 * not extend to uses in other software products. If you wish to use this code,
 * algorithm or these message formats outside of JMRI, please contact Digitrax
 * Inc for separate permission.
 *
 * @author Bob Jacobsen Copyright (C) 2001, 2018
 * @author B. Milhaupt Copyright (C) 2020
 */
public class LnPacketizer extends LnTrafficController {

    /**
     * True if the external hardware is not echoing messages, so we must.
     */
    protected boolean echo = false; // true = echo messages here, instead of in hardware

    /**
     * Create a packetizer and register it as the traffic controller of the
     * given connection memo.
     *
     * @param m the system connection memo this packetizer belongs to
     */
    public LnPacketizer(LocoNetSystemConnectionMemo m) {
        // set the memo to point here
        memo = m;
        m.setLnTrafficController(this);
    }

    // The methods to implement the LocoNetInterface

    /**
     * {@inheritDoc}
     * <p>
     * This implementation reports true only when both streams are present,
     * both handler objects exist, and both worker threads exist and are alive.
     */
    @Override
    public boolean status() {
        return ostream != null && istream != null
                && xmtThread != null && xmtThread.isAlive() && xmtHandler != null
                && rcvThread != null && rcvThread.isAlive() && rcvHandler != null;
    }

    /**
     * Thread-safe queue of raw message bytes waiting to be transmitted. Filled
     * by {@link #sendLocoNetMessage(LocoNetMessage)} and drained by the
     * transmit handler.
     */
    protected LinkedTransferQueue<byte[]> xmtList = new LinkedTransferQueue<>();

    /**
     * XmtHandler (a local class) object to implement the transmit thread.
     * <p>
     * We create this object in startThreads() as each packetizer uses different handlers.
     * So long as the object is created before using it to sync it works.
     */
    protected Runnable xmtHandler = null;

    /**
     * RcvHandler (a local class) object to implement the receive thread.
     */
    protected Runnable rcvHandler;

    /**
     * Forward a preformatted LocoNetMessage to the actual interface.
     * <p>
     * The message's error-check byte(s) are set here via
     * {@code setParity()}, then the message is converted to a byte array and
     * queued for transmission.
     *
     * @param m Message to send; will be updated with its error-check byte(s)
     */
    @Override
    public void sendLocoNetMessage(LocoNetMessage m) {

        // update statistics
        transmittedMsgCount++;

        // set the error correcting code byte(s) before transmittal
        m.setParity();

        // stream to port in single write, as that's needed by serial
        int len = m.getNumDataElements();
        byte[] msg = new byte[len];
        for (int i = 0; i < len; i++) {
            msg[i] = (byte) m.getElement(i);
        }

        log.debug("queue LocoNet packet: {}", m);
        // Queueing is all that is needed to wake the transmit thread, which
        // blocks in take(). If that thread is not running, the request simply
        // stays queued.
        try {
            xmtList.add(msg);
        } catch (RuntimeException e) {
            log.warn("passing to xmit: unexpected exception: ", e);
        }
    }

    /**
     * Implement abstract method to signal if there's a backlog of information
     * waiting to be sent.
     *
     * @return true if busy, false if nothing waiting to send (or if no port
     *         controller is connected)
     */
    @Override
    public boolean isXmtBusy() {
        // Read the field once so a concurrent disconnectPort() cannot null it
        // between the check and the call.
        LnPortController current = controller;
        return current != null && !current.okToSend();
    }

    // methods to connect/disconnect to a source of data in a LnPortController

    /**
     * The currently connected port controller, or null when not connected.
     */
    protected LnPortController controller = null;

    /**
     * Make connection to an existing LnPortController object.
     * <p>
     * A warning is logged if a controller was already connected; the new
     * controller and its streams replace the previous ones regardless.
     *
     * @param p Port controller for connected. Save this for a later disconnect
     *          call
     */
    public void connectPort(LnPortController p) {
        istream = p.getInputStream();
        ostream = p.getOutputStream();
        if (controller != null) {
            log.warn("connectPort: connect called while connected");
        }
        controller = p;
    }

    /**
     * Break connection to an existing LnPortController object. Once broken,
     * attempts to send via "message" member will fail.
     * <p>
     * A warning is logged if {@code p} is not the connected controller; the
     * streams and controller reference are cleared regardless.
     *
     * @param p previously connected port
     */
    public void disconnectPort(LnPortController p) {
        istream = null;
        ostream = null;
        if (controller != p) {
            log.warn("disconnectPort: disconnect called from non-connected LnPortController");
        }
        controller = null;
    }

    // data members to hold the streams. These are public so the inner classes defined here
    // can access them with a Java 1.1 compiler
    public DataInputStream istream = null;
    public OutputStream ostream = null;

    /**
     * Read a single byte, protecting against various timeouts, etc.
     * <p>
     * When a port is set to have a receive timeout (via the
     * enableReceiveTimeout() method), some will return zero bytes or an
     * EOFException at the end of the timeout. In that case, the read should be
     * repeated to get the next real character.
     *
     * @param istream stream to read from
     * @return the single byte that was read
     * @throws java.io.IOException failure during stream read; an
     *                             {@link EOFException} is thrown when the
     *                             stream reports end-of-file
     */
    protected byte readByteProtected(DataInputStream istream) throws java.io.IOException {
        while (true) { // loop will repeat until character found
            // The istream should be configured so that the following
            // read(..) call only blocks for a short time, e.g. 100msec, if no
            // data is available. It's OK if it
            // throws e.g. java.io.InterruptedIOException
            // in that case, as the calling loop should just go around
            // and request input again. This semi-blocking behavior will
            // let the terminateThreads() method end this thread cleanly.
            int nchars = istream.read(rcvBuffer, 0, 1);
            if (nchars < 0) {
                throw new EOFException(String.format("Stream read returned %d, indicating end-of-file", nchars));
            }
            if (nchars > 0) {
                return rcvBuffer[0];
            }
            // nchars == 0: nothing arrived before the timeout, try again
        }
    }

    // Defined this way to reduce new object creation
    private final byte[] rcvBuffer = new byte[1];

    /**
     * Captive class to handle incoming characters. This is a permanent loop,
     * looking for input messages in character form on the stream connected to
     * the LnPortController via <code>connectPort</code>.
     */
    protected class RcvHandler implements Runnable {

        /**
         * Remember the LnPacketizer object
         */
        LnTrafficController trafficController;

        public RcvHandler(LnTrafficController lt) {
            trafficController = lt;
        }

        /**
         * Handle incoming characters. This is a permanent loop, looking for
         * input messages in character form on the stream connected to the
         * LnPortController via <code>connectPort</code>. Terminates with the
         * input stream breaking out of the try block.
         * <p>
         * Each pass: skip bytes until one with the high bit set (an opcode),
         * derive the message length from opcode bits 5-6 (or from the second
         * byte for variable-length messages), read the remaining bytes, verify
         * the checksum, and hand the message off for listener notification.
         */
        @Override
        public void run() {

            int opCode;
            while (!threadStopRequest && !Thread.interrupted()) { // loop until asked to stop
                try {
                    // start by looking for command - skip if bit not set
                    while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check
                        if (log.isTraceEnabled()) { // avoid building string
                            log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N
                        }
                    }
                    // here opCode is OK. Create output message
                    if (log.isTraceEnabled()) { // avoid building string
                        log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N
                    }
                    LocoNetMessage msg = null;
                    while (msg == null) {
                        try {
                            // Capture 2nd byte, always present
                            int byte2 = readByteProtected(istream) & 0xFF;
                            if (log.isTraceEnabled()) { // avoid building string
                                log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N
                            }
                            // Decide length from bits 5 and 6 of the opcode
                            int len = 2;
                            switch ((opCode & 0x60) >> 5) {
                                case 0:
                                    /* 2 byte message */
                                    len = 2;
                                    break;

                                case 1:
                                    /* 4 byte message */
                                    len = 4;
                                    break;

                                case 2:
                                    /* 6 byte message */
                                    len = 6;
                                    break;

                                case 3:
                                    /* N byte message: length is carried in the second byte */
                                    if (byte2 < 2) {
                                        // NOTE(review): processing continues with the invalid
                                        // length; presumably the message construction below then
                                        // fails and is handled by the RuntimeException catch -
                                        // confirm against LocoNetMessage.
                                        log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N
                                    }
                                    len = byte2;
                                    break;
                                default:
                                    log.warn("Unhandled code: {}", (opCode & 0x60) >> 5);
                                    break;
                            }
                            msg = new LocoNetMessage(len);
                            // message exists, now fill it
                            msg.setOpCode(opCode);
                            msg.setElement(1, byte2);
                            log.trace("len: {}", len); // NOI18N
                            for (int i = 2; i < len; i++) {
                                // check for message-blocking error
                                int b = readByteProtected(istream) & 0xFF;
                                if (log.isTraceEnabled()) {
                                    log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N
                                }
                                if ((b & 0x80) != 0) {
                                    // a byte with the high bit set starts a new message:
                                    // abandon the current one and restart with this opcode
                                    log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N
                                    opCode = b;
                                    throw new LocoNetMessageException();
                                }
                                msg.setElement(i, b);
                            }
                        } catch (LocoNetMessageException e) {
                            // retry by destroying the existing message
                            // opCode is set for the newly-started packet
                            msg = null;
                        }
                    }
                    // check parity
                    if (!msg.checkParity()) {
                        log.warn("Ignore LocoNet packet with bad checksum: {}", msg);
                        throw new LocoNetMessageException();
                    }
                    // message is complete, dispatch it !!
                    log.debug("queue message for notification: {}", msg);

                    jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController));

                    // done with this one
                } catch (LocoNetMessageException e) {
                    // just let it ride for now
                    log.warn("run: unexpected LocoNetMessageException", e); // NOI18N
                    continue;
                } catch (java.io.InterruptedIOException e) {
                    // posted from idle port when enableReceiveTimeout used
                    // Normal condition, go around the loop again
                    continue;
                } catch (java.io.IOException e) {
                    // fired when read detects end-of-file
                    log.info("End of file", e); // NOI18N
                    dispose();
                    disconnectPort(controller);
                    return;
                } catch (RuntimeException e) {
                    // normally, we don't catch RuntimeException, but in this
                    // permanently running loop it seems wise.
                    log.warn("run: unexpected Exception", e); // NOI18N
                    continue;
                }
            } // end of permanent loop
        }
    }

    /**
     * Captive class to notify of one message.
     */
    private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction {

        public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) {
            thisMsg = msg;
            thisTc = trafficController;
        }
        LocoNetMessage thisMsg;
        LnTrafficController thisTc;

        /**
         * {@inheritDoc}
         */
        @Override
        public void run() {
            thisTc.notify(thisMsg);
        }
    }

    /**
     * Captive class to handle transmission.
     */
    class XmtHandler implements Runnable {

        /**
         * Loops until a stop is requested or the thread is interrupted,
         * taking queued messages and writing each to the output stream.
         */
        @Override
        public void run() {

            while (!threadStopRequest) { // loop until asked to stop
                // any input?
                try {
                    // get content; blocks until present
                    log.trace("check for input"); // NOI18N

                    byte[] msg = xmtList.take();

                    // input - now send
                    try {
                        if (ostream != null) {
                            if (log.isDebugEnabled()) { // avoid work if not needed
                                if (isXmtBusy()) {
                                    log.debug("LocoNet port not ready to receive"); // NOI18N
                                }
                                log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
                            }
                            ostream.write(msg);
                            ostream.flush();
                            if (log.isTraceEnabled()) { // avoid String building if not needed
                                log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
                            }
                            messageTransmitted(msg);
                        } else {
                            // no stream connected
                            log.warn("sendLocoNetMessage: no connection established"); // NOI18N
                        }
                    } catch (java.io.IOException e) {
                        log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N
                    }
                } catch (InterruptedException ie) {
                    return; // ending the thread
                } catch (RuntimeException rt) {
                    log.error("Exception on take() call", rt);
                }
            }
        }
    }

    /**
     * When a message is finally transmitted, forward it to listeners if echoing
     * is needed.
     *
     * @param msg message sent
     */
    protected void messageTransmitted(byte[] msg) {
        log.debug("message transmitted (echo {})", echo);
        if (!echo) {
            return;
        }
        // message is queued for transmit, echo it when needed
        // return a notification via the queue to ensure end
        javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg)));
    }

    /**
     * Deferred task that notifies a packetizer's listeners of one echoed
     * message.
     */
    static class Echo implements Runnable {

        Echo(LnPacketizer t, LocoNetMessage m) {
            myTc = t;
            msgForLater = m;
        }
        LocoNetMessage msgForLater;
        LnPacketizer myTc;

        /**
         * {@inheritDoc}
         */
        @Override
        public void run() {
            myTc.notify(msgForLater);
        }
    }

    /**
     * Invoked at startup to start the threads needed here.
     * <p>
     * The receive thread runs at {@link Thread#MAX_PRIORITY}. The transmit
     * thread runs one step below that, unless the calling thread is already at
     * or above that level, in which case it runs at
     * {@link Thread#MAX_PRIORITY}. Both are daemon threads. Handlers are
     * created here only if a subclass has not already supplied them.
     */
    public void startThreads() {
        int priority = Thread.currentThread().getPriority();
        log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N
                priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY);

        // start the RcvHandler in a thread of its own
        if (rcvHandler == null) {
            rcvHandler = new RcvHandler(this);
        }
        rcvThread = jmri.util.ThreadingUtil.newThread(rcvHandler, "LocoNet receive handler"); // NOI18N
        rcvThread.setDaemon(true);
        rcvThread.setPriority(Thread.MAX_PRIORITY);
        rcvThread.start();

        if (xmtHandler == null) {
            xmtHandler = new XmtHandler();
        }
        // make sure that the xmt priority is no lower than the current priority
        int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY);
        // start the XmtHandler in a thread of its own
        if (xmtThread == null) {
            xmtThread = jmri.util.ThreadingUtil.newThread(xmtHandler, "LocoNet transmit handler"); // NOI18N
        }
        log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N
        xmtThread.setDaemon(true);
        // apply the computed priority so the thread matches what was just logged
        xmtThread.setPriority(xmtpriority);
        xmtThread.start();

        log.info("lnPacketizer Started");
    }

    protected Thread rcvThread;
    protected Thread xmtThread;

    /**
     * {@inheritDoc}
     * <p>
     * Requests both worker threads to stop, interrupts each and waits briefly
     * for it to finish, then delegates to the superclass.
     */
    // The join(150) is using a timeout because some receive threads
    // (and maybe some day transmit threads) use calls that block
    // even when interrupted. We wait 150 msec and proceed.
    // Threads that do that are responsible for ending cleanly
    // when the blocked call eventually returns.
    @Override
    public void dispose() {
        threadStopRequest = true;
        if (xmtThread != null) {
            xmtThread.interrupt();
            try {
                xmtThread.join(150);
            } catch (InterruptedException e) {
                log.warn("unexpected InterruptedException", e);
            }
        }
        if (rcvThread != null) {
            rcvThread.interrupt();
            try {
                rcvThread.join(150);
            } catch (InterruptedException e) {
                log.warn("unexpected InterruptedException", e);
            }
        }
        super.dispose();
    }

    /**
     * Terminate the receive and transmit threads.
     * <p>
     * This is intended to be used only by testing subclasses.
     */
    // The join(150) is using a timeout because some receive threads
    // (and maybe some day transmit threads) use calls that block
    // even when interrupted. We wait 150 msec and proceed.
    // Threads that do that are responsible for ending cleanly
    // when the blocked call eventually returns.
    public void terminateThreads() {
        threadStopRequest = true;
        if (xmtThread != null) {
            xmtThread.interrupt();
            try {
                xmtThread.join(150);
            } catch (InterruptedException ignored) {
                // interrupted during cleanup; deliberately ignored so the
                // receive thread below is still asked to stop
            }
        }

        if (rcvThread != null) {
            rcvThread.interrupt();
            try {
                rcvThread.join(150);
            } catch (InterruptedException ignored) {
                // interrupted during cleanup; nothing further to do
            }
        }
    }

    /**
     * Flag that threads should terminate as soon as they can.
     */
    protected volatile boolean threadStopRequest = false;

    private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class);

}