001package jmri.jmrix.loconet; 002 003import java.io.DataInputStream; 004import java.io.OutputStream; 005import java.util.concurrent.LinkedTransferQueue; 006import org.slf4j.Logger; 007import org.slf4j.LoggerFactory; 008 009/** 010 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface" 011 * side sends/receives LocoNetMessage objects. The connection to a 012 * LnPortController is via a pair of *Streams, which then carry sequences of 013 * characters for transmission. 014 * <p> 015 * Messages come to this via the main GUI thread, and are forwarded back to 016 * listeners in that same thread. Reception and transmission are handled in 017 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal 018 * classes defined here. The thread priorities are: 019 * <ul> 020 * <li> RcvHandler - at highest available priority 021 * <li> XmtHandler - down one, which is assumed to be above the GUI 022 * <li> (everything else) 023 * </ul> 024 * Some of the message formats used in this class are Copyright Digitrax, Inc. 025 * and used with permission as part of the JMRI project. That permission does 026 * not extend to uses in other software products. If you wish to use this code, 027 * algorithm or these message formats outside of JMRI, please contact Digitrax 028 * Inc for separate permission. 029 * 030 * @author Bob Jacobsen Copyright (C) 2001, 2018 031 * @author B. Milhaupt Copyright (C) 2020 032 */ 033public class LnPacketizer extends LnTrafficController { 034 035 /** 036 * True if the external hardware is not echoing messages, so we must. 037 */ 038 protected boolean echo = false; // true = echo messages here, instead of in hardware 039 040 public LnPacketizer(LocoNetSystemConnectionMemo m) { 041 // set the memo to point here 042 memo = m; 043 m.setLnTrafficController(this); 044 } 045 046 // The methods to implement the LocoNetInterface 047 048 /** 049 * {@inheritDoc} 050 */ 051 @Override 052 public boolean status() { 053 boolean returnVal = ( ostream != null && istream != null 054 && xmtThread != null && xmtThread.isAlive() && xmtHandler != null 055 && rcvThread != null && rcvThread.isAlive() && rcvHandler != null 056 ); 057 return returnVal; 058 } 059 060 /** 061 * Synchronized list used as a transmit queue. 062 */ 063 protected LinkedTransferQueue<byte[]> xmtList = new LinkedTransferQueue<>(); 064 065 /** 066 * XmtHandler (a local class) object to implement the transmit thread. 067 * <p> 068 * We create this object in startThreads() as each packetizer uses different handlers. 069 * So long as the object is created before using it to sync it works. 070 * 071 */ 072 protected Runnable xmtHandler = null; 073 074 /** 075 * RcvHandler (a local class) object to implement the receive thread 076 */ 077 protected Runnable rcvHandler; 078 079 /** 080 * Forward a preformatted LocoNetMessage to the actual interface. 081 * <p> 082 * Checksum is computed and overwritten here, then the message is converted 083 * to a byte array and queued for transmission. 084 * 085 * @param m Message to send; will be updated with CRC 086 */ 087 @Override 088 public void sendLocoNetMessage(LocoNetMessage m) { 089 090 // update statistics 091 transmittedMsgCount++; 092 093 // set the error correcting code byte(s) before transmittal 094 m.setParity(); 095 096 // stream to port in single write, as that's needed by serial 097 int len = m.getNumDataElements(); 098 byte msg[] = new byte[len]; 099 for (int i = 0; i < len; i++) { 100 msg[i] = (byte) m.getElement(i); 101 } 102 103 log.debug("queue LocoNet packet: {}", m); 104 // We need to queue the request and wake the xmit thread in an atomic operation 105 // But the thread might not be running, in which case the request is just 106 // queued up. 107 try { 108 xmtList.add(msg); 109 } catch (RuntimeException e) { 110 log.warn("passing to xmit: unexpected exception: ", e); 111 } 112 } 113 114 /** 115 * Implement abstract method to signal if there's a backlog of information 116 * waiting to be sent. 117 * 118 * @return true if busy, false if nothing waiting to send 119 */ 120 @Override 121 public boolean isXmtBusy() { 122 if (controller == null) { 123 return false; 124 } 125 126 return (!controller.okToSend()); 127 } 128 129 // methods to connect/disconnect to a source of data in a LnPortController 130 131 protected LnPortController controller = null; 132 133 /** 134 * Make connection to an existing LnPortController object. 135 * 136 * @param p Port controller for connected. Save this for a later disconnect 137 * call 138 */ 139 public void connectPort(LnPortController p) { 140 istream = p.getInputStream(); 141 ostream = p.getOutputStream(); 142 if (controller != null) { 143 log.warn("connectPort: connect called while connected"); 144 } 145 controller = p; 146 } 147 148 /** 149 * Break connection to an existing LnPortController object. Once broken, 150 * attempts to send via "message" member will fail. 151 * 152 * @param p previously connected port 153 */ 154 public void disconnectPort(LnPortController p) { 155 istream = null; 156 ostream = null; 157 if (controller != p) { 158 log.warn("disconnectPort: disconnect called from non-connected LnPortController"); 159 } 160 controller = null; 161 } 162 163 // data members to hold the streams. These are public so the inner classes defined here 164 // can access them with a Java 1.1 compiler 165 public DataInputStream istream = null; 166 public OutputStream ostream = null; 167 168 /** 169 * Read a single byte, protecting against various timeouts, etc. 170 * <p> 171 * When a port is set to have a receive timeout (via the 172 * enableReceiveTimeout() method), some will return zero bytes or an 173 * EOFException at the end of the timeout. In that case, the read should be 174 * repeated to get the next real character. 175 * 176 * @param istream stream to read from 177 * @return buffer of received data 178 * @throws java.io.IOException failure during stream read 179 * 180 */ 181 protected byte readByteProtected(DataInputStream istream) throws java.io.IOException { 182 while (true) { // loop will repeat until character found 183 int nchars; 184 // The istream should be configured so that the following 185 // read(..) call only blocks for a short time, e.g. 100msec, if no 186 // data is available. It's OK if it 187 // throws e.g. java.io.InterruptedIOException 188 // in that case, as the calling loop should just go around 189 // and request input again. This semi-blocking behavior will 190 // let the terminateThreads() method end this thread cleanly. 191 nchars = istream.read(rcvBuffer, 0, 1); 192 if (nchars > 0) { 193 return rcvBuffer[0]; 194 } 195 } 196 } 197 // Defined this way to reduce new object creation 198 private final byte[] rcvBuffer = new byte[1]; 199 200 /** 201 * Captive class to handle incoming characters. This is a permanent loop, 202 * looking for input messages in character form on the stream connected to 203 * the LnPortController via <code>connectPort</code>. 204 */ 205 protected class RcvHandler implements Runnable { 206 207 /** 208 * Remember the LnPacketizer object 209 */ 210 LnTrafficController trafficController; 211 212 public RcvHandler(LnTrafficController lt) { 213 trafficController = lt; 214 } 215 216 /** 217 * Handle incoming characters. This is a permanent loop, looking for 218 * input messages in character form on the stream connected to the 219 * LnPortController via <code>connectPort</code>. Terminates with the 220 * input stream breaking out of the try block. 221 */ 222 @Override 223 public void run() { 224 225 int opCode; 226 while (!threadStopRequest && ! Thread.interrupted() ) { // loop until asked to stop 227 try { 228 // start by looking for command - skip if bit not set 229 while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check 230 if (log.isTraceEnabled()) { // avoid building string 231 log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N 232 } 233 } 234 // here opCode is OK. Create output message 235 if (log.isTraceEnabled()) { // avoid building string 236 log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N 237 } 238 LocoNetMessage msg = null; 239 while (msg == null) { 240 try { 241 // Capture 2nd byte, always present 242 int byte2 = readByteProtected(istream) & 0xFF; 243 if (log.isTraceEnabled()) { // avoid building string 244 log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N 245 } // Decide length 246 int len = 2; 247 switch ((opCode & 0x60) >> 5) { 248 case 0: 249 /* 2 byte message */ 250 251 len = 2; 252 break; 253 254 case 1: 255 /* 4 byte message */ 256 257 len = 4; 258 break; 259 260 case 2: 261 /* 6 byte message */ 262 263 len = 6; 264 break; 265 266 case 3: 267 /* N byte message */ 268 269 if (byte2 < 2) { 270 log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N 271 } 272 len = byte2; 273 break; 274 default: 275 log.warn("Unhandled code: {}", (opCode & 0x60) >> 5); 276 break; 277 } 278 msg = new LocoNetMessage(len); 279 // message exists, now fill it 280 msg.setOpCode(opCode); 281 msg.setElement(1, byte2); 282 log.trace("len: {}", len); // NOI18N 283 for (int i = 2; i < len; i++) { 284 // check for message-blocking error 285 int b = readByteProtected(istream) & 0xFF; 286 if (log.isTraceEnabled()) { 287 log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N 288 } 289 if ((b & 0x80) != 0) { 290 log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N 291 opCode = b; 292 throw new LocoNetMessageException(); 293 } 294 msg.setElement(i, b); 295 } 296 } catch (LocoNetMessageException e) { 297 // retry by destroying the existing message 298 // opCode is set for the newly-started packet 299 msg = null; 300 } 301 } 302 // check parity 303 if (!msg.checkParity()) { 304 log.warn("Ignore LocoNet packet with bad checksum: {}", msg); 305 throw new LocoNetMessageException(); 306 } 307 // message is complete, dispatch it !! 308 { 309 log.debug("queue message for notification: {}", msg); 310 311 jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController)); 312 } 313 314 // done with this one 315 } catch (LocoNetMessageException e) { 316 // just let it ride for now 317 log.warn("run: unexpected LocoNetMessageException", e); // NOI18N 318 continue; 319 } catch (java.io.EOFException | java.io.InterruptedIOException e) { 320 // posted from idle port when enableReceiveTimeout used 321 // Normal condition, go around the loop again 322 continue; 323 } catch (java.io.IOException e) { 324 // fired when write-end of HexFile reaches end 325 log.debug("IOException, should only happen with HexFile", e); // NOI18N 326 log.info("End of file"); // NOI18N 327 disconnectPort(controller); 328 return; 329 } catch (RuntimeException e) { 330 // normally, we don't catch RuntimeException, but in this 331 // permanently running loop it seems wise. 332 log.warn("run: unexpected Exception", e); // NOI18N 333 continue; 334 } 335 } // end of permanent loop 336 } 337 } 338 339 /** 340 * Captive class to notify of one message. 341 */ 342 private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction { 343 344 public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) { 345 thisMsg = msg; 346 thisTc = trafficController; 347 } 348 LocoNetMessage thisMsg; 349 LnTrafficController thisTc; 350 351 /** 352 * {@inheritDoc} 353 */ 354 @Override 355 public void run() { 356 thisTc.notify(thisMsg); 357 } 358 } 359 360 /** 361 * Captive class to handle transmission. 362 */ 363 class XmtHandler implements Runnable { 364 365 /** 366 * Loops forever, looking for message to send and processing them. 367 */ 368 @Override 369 public void run() { 370 371 while (!threadStopRequest) { // loop until asked to stop 372 // any input? 373 try { 374 // get content; blocks until present 375 log.trace("check for input"); // NOI18N 376 377 byte msg[] = xmtList.take(); 378 379 // input - now send 380 try { 381 if (ostream != null) { 382 if (log.isDebugEnabled()) { // avoid work if not needed 383 if (isXmtBusy()) log.debug("LocoNet port not ready to receive"); // NOI18N 384 log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N 385 } 386 ostream.write(msg); 387 ostream.flush(); 388 if (log.isTraceEnabled()) { // avoid String building if not needed 389 log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N 390 } 391 messageTransmitted(msg); 392 } else { 393 // no stream connected 394 log.warn("sendLocoNetMessage: no connection established"); // NOI18N 395 } 396 } catch (java.io.IOException e) { 397 log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N 398 } 399 } catch (InterruptedException ie) { 400 return; // ending the thread 401 } catch (RuntimeException rt) { 402 log.error("Exception on take() call", rt); 403 } 404 } 405 } 406 } 407 408 /** 409 * When a message is finally transmitted, forward it to listeners if echoing 410 * is needed. 411 * 412 * @param msg message sent 413 */ 414 protected void messageTransmitted(byte[] msg) { 415 log.debug("message transmitted (echo {})", echo); 416 if (!echo) { 417 return; 418 } 419 // message is queued for transmit, echo it when needed 420 // return a notification via the queue to ensure end 421 javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg))); 422 } 423 424 static class Echo implements Runnable { 425 426 Echo(LnPacketizer t, LocoNetMessage m) { 427 myTc = t; 428 msgForLater = m; 429 } 430 LocoNetMessage msgForLater; 431 LnPacketizer myTc; 432 433 /** 434 * {@inheritDoc} 435 */ 436 @Override 437 public void run() { 438 myTc.notify(msgForLater); 439 } 440 } 441 442 /** 443 * Invoked at startup to start the threads needed here. 444 */ 445 public void startThreads() { 446 int priority = Thread.currentThread().getPriority(); 447 log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N 448 priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY); 449 450 // start the RcvHandler in a thread of its own 451 if (rcvHandler == null) { 452 rcvHandler = new RcvHandler(this); 453 } 454 rcvThread = jmri.util.ThreadingUtil.newThread(rcvHandler, "LocoNet receive handler"); // NOI18N 455 rcvThread.setDaemon(true); 456 rcvThread.setPriority(Thread.MAX_PRIORITY); 457 rcvThread.start(); 458 459 if (xmtHandler == null) { 460 xmtHandler = new XmtHandler(); 461 } 462 // make sure that the xmt priority is no lower than the current priority 463 int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY); 464 // start the XmtHandler in a thread of its own 465 if (xmtThread == null) { 466 xmtThread = jmri.util.ThreadingUtil.newThread(xmtHandler, "LocoNet transmit handler"); // NOI18N 467 } 468 log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N 469 xmtThread.setDaemon(true); 470 xmtThread.setPriority(Thread.MAX_PRIORITY - 1); 471 xmtThread.start(); 472 473 log.info("lnPacketizer Started"); 474 } 475 476 protected Thread rcvThread; 477 protected Thread xmtThread; 478 479 /** 480 * {@inheritDoc} 481 */ 482 // The join(150) is using a timeout because some receive threads 483 // (and maybe some day transmit threads) use calls that block 484 // even when interrupted. We wait 150 msec and proceed. 485 // Threads that do that are responsible for ending cleanly 486 // when the blocked call eventually returns. 487 @Override 488 public void dispose() { 489 if (xmtThread != null) { 490 xmtThread.interrupt(); 491 try { 492 xmtThread.join(150); 493 } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);} 494 } 495 if (rcvThread != null) { 496 rcvThread.interrupt(); 497 try { 498 rcvThread.join(150); 499 } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);} 500 } 501 super.dispose(); 502 } 503 504 /** 505 * Terminate the receive and transmit threads. 506 * <p> 507 * This is intended to be used only by testing subclasses. 508 */ 509 // The join(150) is using a timeout because some receive threads 510 // (and maybe some day transmit threads) use calls that block 511 // even when interrupted. We wait 150 msec and proceed. 512 // Threads that do that are responsible for ending cleanly 513 // when the blocked call eventually returns. 514 public void terminateThreads() { 515 threadStopRequest = true; 516 if (xmtThread != null) { 517 xmtThread.interrupt(); 518 try { 519 xmtThread.join(150); 520 } catch (InterruptedException ie){ 521 // interrupted during cleanup. 522 } 523 } 524 525 if (rcvThread != null) { 526 rcvThread.interrupt(); 527 try { 528 rcvThread.join(150); 529 } catch (InterruptedException ie){ 530 // interrupted during cleanup. 531 } 532 } 533 } 534 535 /** 536 * Flag that threads should terminate as soon as they can. 537 */ 538 protected volatile boolean threadStopRequest = false; 539 540 private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class); 541 542}