001package jmri.jmrix; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.io.DataInputStream; 005import java.io.IOException; 006import java.io.OutputStream; 007import java.util.*; 008 009import javax.annotation.Nonnull; 010import javax.swing.SwingUtilities; 011 012import jmri.InstanceManager; 013import jmri.ShutDownManager; 014 015/** 016 * Abstract base for TrafficControllers in a Message/Reply protocol. 017 * <p> 018 * Two threads are used for the actual communication. The "Transmit" thread 019 * handles pushing characters to the port, and also changing the mode. The 020 * "Receive" thread converts characters from the input stream into replies. 021 * <p> 022 * The constructor registers a shutdown task to 023 * trigger the necessary cleanup code 024 * <p> 025 * The internal state machine handles changes of mode, automatic retry of 026 * certain messages, time outs, and sending poll messages when otherwise idle. 027 * <p> 028 * "Mode" refers to the state of the command station communications. "Normal" 029 * and "Programming" are the two modes, used if the command station requires 030 * messages to go back and forth between them. <br> 031 * 032 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram"> 033 * 034 * <p> 035 * The key methods for the basic operation are: 036 * <ul> 037 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)} 038 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data 039 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs 040 * information on e.g. the last message sent, that can be stored in member variables 041 * by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}. 042 * <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects 043 * </ul> 044 * <p> 045 * If your command station requires messages to go in and out of 046 * "programming mode", those should be provided by 047 * {@link #enterProgMode()} and {@link #enterNormalMode()}. 048 * <p> 049 * If you want to poll for information when the line is otherwise idle, 050 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}. 051 * 052 * @author Bob Jacobsen Copyright (C) 2003 053 * @author Paul Bender Copyright (C) 2004-2010 054 */ 055 056/* 057@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png 058 059 [*] --> IDLESTATE 060 IDLESTATE --> NOTIFIEDSTATE : sendMessage() 061 NOTIFIEDSTATE --> IDLESTATE : queue empty 062 063 NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message 064 065 WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE 066 WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE 067 068 WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply() 069 070 WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE 071 WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE 072 OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message 073 074 IDLESTATE --> POLLSTATE : transmitLoop()\nno work 075 POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it 076 POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send 077 078 WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply 079 AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message 080 081NOTIFIEDSTATE : Transmit thread wakes up and processes 082POLLSTATE : Transient while deciding to send poll 083OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change 084AUTORETRYSTATE : Transient while deciding to resend auto-retry message 085WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply 086WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply 087WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout 088 089Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry 090Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change. 091 092@enduml 093 */ 094 095public abstract class AbstractMRTrafficController { 096 097 private final Runnable shutDownTask = this::terminate; // retain for possible removal. 098 099 /** 100 * Create a new unnamed MRTrafficController. 101 */ 102 public AbstractMRTrafficController() { 103 log.debug("Creating AbstractMRTrafficController instance"); 104 mCurrentMode = NORMALMODE; 105 mCurrentState = IDLESTATE; 106 allowUnexpectedReply = false; 107 108 109 // We use a shutdown task here to make sure the connection is left 110 // in a clean state prior to exiting. This is required on systems 111 // which have a service mode to ensure we don't leave the system 112 // in an unusable state. Once the shutdown task executes, the connection 113 // must be considered permanently closed. 114 115 InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask); 116 } 117 118 private boolean synchronizeRx = true; 119 120 protected void setSynchronizeRx(boolean val) { 121 synchronizeRx = val; 122 } 123 124 protected boolean getSynchronizeRx() { 125 return synchronizeRx; 126 } 127 128 // The methods to implement the abstract Interface 129 130 protected final Vector<AbstractMRListener> cmdListeners = new Vector<>(); 131 132 /** 133 * Add a Listener to the Listener list. 134 * @param l The Listener to be added, not null. 135 */ 136 protected synchronized void addListener(AbstractMRListener l) { 137 // add only if not already registered 138 if (l == null) { 139 throw new NullPointerException(); 140 } 141 if (!cmdListeners.contains(l)) { 142 cmdListeners.addElement(l); 143 } 144 } 145 146 /** 147 * Add a Listener to start of the Listener list. 148 * Intended for use only by system Consoles which may prefer notification 149 * before other objects have processed a Message and sent a Reply. 150 * @param l The Listener to be added, not null. 151 */ 152 protected synchronized void addConsoleListener(@Nonnull AbstractMRListener l){ 153 // add only if not already registered 154 if (!cmdListeners.contains(l)) { 155 cmdListeners.insertElementAt(l, 0); 156 } 157 } 158 159 /** 160 * Remove a Listener from the Listener list. 161 * The Listener will receive no further notifications. 162 * @param l The Listener to be removed. 163 */ 164 protected synchronized void removeListener(AbstractMRListener l) { 165 if (cmdListeners.contains(l)) { 166 cmdListeners.removeElement(l); 167 } 168 } 169 170 /** 171 * Forward a Message to registered listeners. 172 * 173 * @param m Message to be forwarded intact 174 * @param notMe One (optional) listener to be skipped, usually because it's 175 * the originating object. 176 */ 177 @SuppressWarnings("unchecked") 178 protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) { 179 // make a copy of the listener vector to synchronized not needed for transmit 180 Vector<AbstractMRListener> v; 181 synchronized (this) { 182 // FIXME: unnecessary synchronized; the Vector IS already thread-safe. 183 v = (Vector<AbstractMRListener>) cmdListeners.clone(); 184 } 185 // forward to all listeners 186 int cnt = v.size(); 187 for (int i = 0; i < cnt; i++) { 188 AbstractMRListener client = v.elementAt(i); 189 if (notMe != client) { 190 log.debug("notify message, client: {}", client); 191 try { 192 forwardMessage(client, m); 193 } catch (RuntimeException e) { 194 log.warn("notify: During message dispatch to {}", client, e); 195 } 196 } 197 } 198 } 199 200 /** 201 * Implement this to forward a specific message type to a protocol-specific 202 * listener interface. 203 * This puts the casting into the concrete class. 204 * @param client abstract listener. 205 * @param m message to forward. 206 */ 207 protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m); 208 209 /** 210 * Invoked if it's appropriate to do low-priority polling of the command 211 * station, this should return the next message to send, or null if the 212 * TrafficController should just sleep. 213 * @return Formatted poll message 214 */ 215 protected abstract AbstractMRMessage pollMessage(); 216 217 protected abstract AbstractMRListener pollReplyHandler(); 218 219 protected AbstractMRListener mLastSender = null; 220 221 protected volatile int mCurrentMode; 222 public static final int NORMALMODE = 1; 223 public static final int PROGRAMINGMODE = 4; 224 225 /** 226 * Set the system to programming mode. 227 * @see #enterNormalMode() 228 * 229 * @return any message that needs to be returned to the Command Station 230 * to change modes. If no message is needed, returns null. 231 */ 232 protected abstract AbstractMRMessage enterProgMode(); 233 234 /** 235 * Sets the system to normal mode during programming while in IDLESTATE. 236 * If {@link #programmerIdle()} returns true, enterNormalMode() is 237 * called after a timeout. 238 * @see #enterProgMode() 239 * 240 * @return any message that needs to be returned to the Command Station 241 * to change modes. If no message is needed, returns null. 242 */ 243 protected abstract AbstractMRMessage enterNormalMode(); 244 245 /** 246 * Check if the programmer is idle. 247 * Override in the system specific code if necessary (see notes for 248 * {@link #enterNormalMode()}. 249 * 250 * @return true if not busy programming 251 */ 252 protected boolean programmerIdle() { 253 return true; 254 } 255 256 /** 257 * Get the delay (wait time) after enabling the programming track. 258 * Override in subclass to add a longer delay. 259 * 260 * @return 0 as default delay 261 */ 262 protected int enterProgModeDelayTime() { 263 return 0; 264 } 265 266 protected volatile int mCurrentState; 267 public static final int IDLESTATE = 10; // nothing happened 268 public static final int NOTIFIEDSTATE = 15; // xmt notified, will next wake 269 public static final int WAITMSGREPLYSTATE = 25; // xmt has sent, await reply to message 270 public static final int WAITREPLYINPROGMODESTATE = 30; // xmt has done mode change, await reply 271 public static final int WAITREPLYINNORMMODESTATE = 35; // xmt has done mode change, await reply 272 public static final int OKSENDMSGSTATE = 40; // mode change reply here, send original msg 273 public static final int AUTORETRYSTATE = 45; // received message where automatic recovery may occur with a retransmission, re-send original msg 274 public static final int POLLSTATE = 50; // Send program mode or poll message 275 276 protected boolean allowUnexpectedReply; 277 278 /** 279 * Set whether the command station may send messages without a request 280 * sent to it. 281 * 282 * @param expected true to allow messages without a prior request 283 */ 284 protected void setAllowUnexpectedReply(boolean expected) { 285 allowUnexpectedReply = expected; 286 } 287 288 /** 289 * Forward a "Reply" from layout to registered listeners. 290 * 291 * @param r Reply to be forwarded intact 292 * @param dest One (optional) listener to be skipped, usually because it's 293 * the originating object. 294 */ 295 @SuppressWarnings("unchecked") 296 protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) { 297 // make a copy of the listener vector to synchronized (not needed for transmit?) 298 Vector<AbstractMRListener> v; 299 synchronized (this) { 300 // FIXME: unnecessary synchronized; the Vector IS already thread-safe. 301 v = (Vector<AbstractMRListener>) cmdListeners.clone(); 302 } 303 // forward to all listeners 304 int cnt = v.size(); 305 for (int i = 0; i < cnt; i++) { 306 AbstractMRListener client = v.elementAt(i); 307 log.debug("notify reply, client: {}", client); 308 try { 309 //skip dest for now, we'll send the message to there last. 310 if (dest != client) { 311 forwardReply(client, r); 312 } 313 } catch (RuntimeException e) { 314 log.warn("notify: During reply dispatch to {}", client, e); 315 } 316 } 317 318 // forward to the last listener who sent a message 319 // this is done _second_ so monitoring can have already stored the reply 320 // before a response is sent 321 if (dest != null) { 322 log.debug("notify reply, dest: {}", dest); 323 forwardReply(dest, r); 324 } 325 } 326 327 protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m); 328 329 /** 330 * Messages to be transmitted. 331 */ 332 protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>(); 333 protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>(); 334 335 /** 336 * Forward message to the port. Messages are queued and then the 337 * transmission thread is notified. 338 * @see #forwardToPort(AbstractMRMessage, AbstractMRListener) 339 * 340 * @param m the message to send 341 * @param reply the Listener sending the message, often provided as 'this' 342 */ 343 protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) { 344 msgQueue.addLast(m); 345 listenerQueue.addLast(reply); 346 synchronized (xmtRunnable) { 347 if (mCurrentState == IDLESTATE) { 348 mCurrentState = NOTIFIEDSTATE; 349 xmtRunnable.notify(); 350 } 351 } 352 if (m != null) { 353 log.debug("just notified transmit thread with message {}", m); 354 } 355 } 356 357 /** 358 * Permanent loop for the transmit thread. 359 */ 360 protected void transmitLoop() { 361 log.debug("transmitLoop starts in {}", this); 362 363 // loop forever 364 while (!connectionError && !threadStopRequest) { 365 AbstractMRMessage m = null; 366 AbstractMRListener l = null; 367 // check for something to do 368 synchronized (this) { 369 if (!msgQueue.isEmpty()) { 370 // yes, something to do 371 m = msgQueue.getFirst(); 372 msgQueue.removeFirst(); 373 l = listenerQueue.getFirst(); 374 listenerQueue.removeFirst(); 375 mCurrentState = WAITMSGREPLYSTATE; 376 log.debug("transmit loop has something to do: {}", m); 377 } // release lock here to proceed in parallel 378 } 379 // if a message has been extracted, process it 380 if (m != null) { 381 // check for need to change mode 382 log.debug("Start msg, state = {}", mCurrentMode); 383 if (m.getNeededMode() != mCurrentMode) { 384 AbstractMRMessage modeMsg; 385 if (m.getNeededMode() == PROGRAMINGMODE) { 386 // change state to programming mode and send message 387 modeMsg = enterProgMode(); 388 if (modeMsg != null) { 389 mCurrentState = WAITREPLYINPROGMODESTATE; 390 log.debug("Enter Programming Mode"); 391 forwardToPort(modeMsg, null); 392 // wait for reply 393 transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted"); 394 } 395 } else { 396 // change state to normal and send message 397 modeMsg = enterNormalMode(); 398 if (modeMsg != null) { 399 mCurrentState = WAITREPLYINNORMMODESTATE; 400 log.debug("Enter Normal Mode"); 401 forwardToPort(modeMsg, null); 402 // wait for reply 403 transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted"); 404 } 405 } 406 if (modeMsg != null) { 407 checkReplyInDispatch(); 408 if (mCurrentState != OKSENDMSGSTATE) { 409 handleTimeout(modeMsg, l); 410 } 411 mCurrentState = WAITMSGREPLYSTATE; 412 } else { 413 // no mode message required, but the message 414 // needs a different mode 415 log.debug("Setting mode to: {}", m.getNeededMode()); 416 mCurrentMode = m.getNeededMode(); 417 } 418 } 419 forwardToPort(m, l); 420 // reply expected? 421 if (m.replyExpected()) { 422 log.debug("reply expected is true for message {}",m); 423 // wait for a reply, or eventually timeout 424 transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted"); 425 checkReplyInDispatch(); 426 if (mCurrentState == WAITMSGREPLYSTATE) { 427 handleTimeout(m, l); 428 } else if (mCurrentState == AUTORETRYSTATE) { 429 log.info("Message added back to queue: {}", m); 430 msgQueue.addFirst(m); 431 listenerQueue.addFirst(l); 432 synchronized (xmtRunnable) { 433 mCurrentState = IDLESTATE; 434 } 435 } else { 436 resetTimeout(m); 437 } 438 } // just continue to the next message from here 439 } else { 440 // nothing to do 441 if (mCurrentState != IDLESTATE) { 442 log.debug("Setting IDLESTATE"); 443 log.debug("Current Mode {}", mCurrentMode); 444 mCurrentState = IDLESTATE; 445 } 446 // wait for something to send 447 if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) { 448 try { 449 long startTime = Calendar.getInstance().getTimeInMillis(); 450 synchronized (xmtRunnable) { 451 xmtRunnable.wait(mWaitBeforePoll); 452 } 453 long endTime = Calendar.getInstance().getTimeInMillis(); 454 waitTimePoll = waitTimePoll + endTime - startTime; 455 } catch (InterruptedException e) { 456 Thread.currentThread().interrupt(); // retain if needed later 457 // end of transmit loop 458 break; 459 } 460 } 461 // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee 462 // the change of mCurrentState to one of the waiting for reply states. Therefore we need to synchronize. 463 synchronized (this) { 464 if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) { 465 log.error("left timeout in unexpected state: {}", mCurrentState); 466 } 467 if (mCurrentState == IDLESTATE) { 468 mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE 469 } 470 } 471 // went around with nothing to do; leave programming state if in it 472 if (mCurrentMode == PROGRAMINGMODE) { 473 log.debug("Timeout - in service mode"); 474 } 475 if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) { 476 log.debug("timeout causes leaving programming mode"); 477 mCurrentState = WAITREPLYINNORMMODESTATE; 478 AbstractMRMessage msg = enterNormalMode(); 479 // if the enterNormalMode() message is null, we 480 // don't want to try to send it to the port. 481 if (msg != null) { 482 forwardToPort(msg, null); 483 // wait for reply 484 transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode"); 485 checkReplyInDispatch(); 486 // exit program mode timeout? 487 if (mCurrentState == WAITREPLYINNORMMODESTATE) { 488 // entering normal mode via timeout 489 handleTimeout(msg, l); 490 mCurrentMode = NORMALMODE; 491 } 492 // and go around again 493 } 494 } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) { 495 // We may need to poll 496 AbstractMRMessage msg = pollMessage(); 497 if (msg != null) { 498 // yes, send that 499 log.debug("Sending poll, wait time {}", waitTimePoll); 500 mCurrentState = WAITMSGREPLYSTATE; 501 forwardToPort(msg, pollReplyHandler()); 502 // wait for reply 503 log.debug("Still waiting for reply"); 504 transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply"); 505 checkReplyInDispatch(); 506 // and go around again 507 if (mCurrentState == WAITMSGREPLYSTATE) { 508 handleTimeout(msg, l); 509 } else { 510 resetTimeout(msg); 511 } 512 } 513 waitTimePoll = 0; 514 } 515 // no messages, so back to idle 516 if (mCurrentState == POLLSTATE) { 517 mCurrentState = IDLESTATE; 518 } 519 } 520 } 521 } // end of transmit loop; go around again 522 523 protected void transmitWait(int waitTime, int state, String interruptMessage) { 524 // wait() can have spurious wakeup! 525 // so we protect by making sure the entire timeout time is used 526 long currentTime = Calendar.getInstance().getTimeInMillis(); 527 long endTime = currentTime + waitTime; 528 while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) { 529 long wait = endTime - currentTime; 530 try { 531 synchronized (xmtRunnable) { 532 // Do not wait if the current state has changed since we 533 // last set it. 534 if (mCurrentState != state) { 535 return; 536 } 537 xmtRunnable.wait(wait); // rcvr normally ends this w state change 538 } 539 } catch (InterruptedException e) { 540 Thread.currentThread().interrupt(); // retain if needed later 541 String[] packages = this.getClass().getName().split("\\."); 542 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 543 +(packages.length>=1 ? packages[packages.length-1] :""); 544 if (!threadStopRequest) { 545 log.error("{} in transmitWait(..) of {}", interruptMessage, name); 546 } else { 547 log.debug("during shutdown, {} in transmitWait(..) of {}", interruptMessage, name); 548 } 549 } 550 } 551 log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState); 552 } 553 554 // Dispatch control and timer 555 protected boolean replyInDispatch = false; // true when reply has been received but dispatch not completed 556 private int maxDispatchTime = 0; 557 private int warningMessageTime = DISPATCH_WARNING_TIME; 558 private static final int DISPATCH_WAIT_INTERVAL = 100; 559 private static final int DISPATCH_WARNING_TIME = 12000; // report warning when max dispatch time exceeded 560 private static final int WARN_NEXT_TIME = 1000; // report every second 561 562 private void checkReplyInDispatch() { 563 int loopCount = 0; 564 while (replyInDispatch) { 565 try { 566 synchronized (xmtRunnable) { 567 xmtRunnable.wait(DISPATCH_WAIT_INTERVAL); 568 } 569 } catch (InterruptedException e) { 570 Thread.currentThread().interrupt(); // retain if needed later 571 if (threadStopRequest) return; // don't log an error if closing. 572 String[] packages = this.getClass().getName().split("\\."); 573 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 574 +(packages.length>=1 ? packages[packages.length-1] :""); 575 log.error("transmitLoop interrupted in class {}", name); 576 } 577 loopCount++; 578 int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL; 579 if (currentDispatchTime > maxDispatchTime) { 580 maxDispatchTime = currentDispatchTime; 581 if (currentDispatchTime >= warningMessageTime) { 582 warningMessageTime = warningMessageTime + WARN_NEXT_TIME; 583 log.debug("Max dispatch time is now {}", currentDispatchTime); 584 } 585 } 586 } 587 } 588 589 /** 590 * Determine if the interface is down. 591 * 592 * @return timeoutFlag 593 */ 594 public boolean hasTimeouts() { 595 return timeoutFlag; 596 } 597 598 protected boolean timeoutFlag = false; 599 protected int timeouts = 0; 600 protected boolean flushReceiveChars = false; 601 602 protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) { 603 //log.debug("Timeout mCurrentState: {}", mCurrentState); 604 warnOnTimeout(msg, l); 605 timeouts++; 606 timeoutFlag = true; 607 flushReceiveChars = true; 608 } 609 610 protected void warnOnTimeout(AbstractMRMessage msg, AbstractMRListener l) { 611 String[] packages = this.getClass().getName().split("\\."); 612 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 613 +(packages.length>=1 ? packages[packages.length-1] :""); 614 615 log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name); 616 } 617 618 protected void resetTimeout(AbstractMRMessage msg) { 619 if (timeouts > 0) { 620 log.debug("Reset timeout after {} timeouts", timeouts); 621 } 622 timeouts = 0; 623 timeoutFlag = false; 624 } 625 626 /** 627 * Add header to the outgoing byte stream. 628 * 629 * @param msg the output byte stream 630 * @param m Message results 631 * @return next location in the stream to fill 632 */ 633 protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) { 634 return 0; 635 } 636 637 protected int mWaitBeforePoll = 100; 638 protected long waitTimePoll = 0; 639 640 /** 641 * Add trailer to the outgoing byte stream. 642 * 643 * @param msg the output byte stream 644 * @param offset the first byte not yet used 645 * @param m output message to extend 646 */ 647 protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) { 648 if (!m.isBinary()) { 649 msg[offset] = 0x0d; 650 } 651 } 652 653 /** 654 * Determine how many bytes the entire message will take, including 655 * space for header and trailer. 656 * 657 * @param m the message to be sent 658 * @return number of bytes 659 */ 660 protected int lengthOfByteStream(AbstractMRMessage m) { 661 int len = m.getNumDataElements(); 662 int cr = 0; 663 if (!m.isBinary()) { 664 cr = 1; // space for return char 665 } 666 return len + cr; 667 } 668 669 protected boolean xmtException = false; 670 671 /** 672 * Actually transmit the next message to the port. 673 * @see #sendMessage(AbstractMRMessage, AbstractMRListener) 674 * 675 * @param m the message to send 676 * @param reply the Listener sending the message, often provided as 'this' 677 */ 678 @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"}, 679 justification = "Two locks needed for synchronization here, this is OK") 680 protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) { 681 log.debug("forwardToPort message: [{}]", m); 682 // remember who sent this 683 mLastSender = reply; 684 685 // forward the message to the registered recipients, 686 // which includes the communications monitor, except the sender. 687 // Schedule notification via the Swing event queue to ensure order 688 log.trace("about to start XmtNotifier for {} last: {}", m, mLastSender, new Exception("traceback")); 689 Runnable r = new XmtNotifier(m, mLastSender, this); 690 SwingUtilities.invokeLater(r); 691 692 // stream to port in single write, as that's needed by serial 693 int byteLength = lengthOfByteStream(m); 694 byte[] msg= new byte[byteLength]; 695 log.debug("copying message, length = {}", byteLength); 696 // add header 697 int offset = addHeaderToOutput(msg, m); 698 699 // add data content 700 int len = m.getNumDataElements(); 701 log.debug("copying data to message, length = {}", len); 702 if (len > byteLength) { // happens somehow 703 log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len); 704 } 705 for (int i = 0; (i < len && i < byteLength); i++) { 706 msg[i + offset] = (byte) m.getElement(i); 707 } 708 // add trailer 709 addTrailerToOutput(msg, len + offset, m); 710 // and stream the bytes 711 try { 712 if (ostream != null) { 713 if (log.isDebugEnabled()) { 714 StringBuilder f = new StringBuilder(); 715 for (int i = 0; i < msg.length; i++) { 716 f.append(String.format("%02X ",0xFF & msg[i])); 717 } 718 log.debug("formatted message: {}", f.toString() ); 719 } 720 while (m.getRetries() >= 0) { 721 if (portReadyToSend(controller)) { 722 ostream.write(msg); 723 ostream.flush(); 724 log.debug("written, msg timeout: {} mSec", m.getTimeout()); 725 break; 726 } else if (m.getRetries() >= 0) { 727 log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries()); 728 m.setRetries(m.getRetries() - 1); 729 try { 730 synchronized (xmtRunnable) { 731 xmtRunnable.wait(m.getTimeout()); 732 } 733 } catch (InterruptedException e) { 734 Thread.currentThread().interrupt(); // retain if needed later 735 log.error("retry wait interrupted"); 736 } 737 } else { 738 log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg)); 739 } 740 } 741 } else { // ostream is null 742 // no stream connected 743 connectionWarn(); 744 } 745 } catch (IOException | RuntimeException e) { 746 // TODO Currently there's no port recovery if an exception occurs 747 // must restart JMRI to clear xmtException. 748 xmtException = true; 749 portWarn(e); 750 } 751 } 752 753 protected void connectionWarn() { 754 log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception()); 755 } 756 757 protected void portWarn(Exception e) { 758 log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e); 759 } 760 761 protected boolean connectionError = false; 762 763 protected void portWarnTCP(Exception e) { 764 log.warn("Exception java net: ", e); 765 connectionError = true; 766 } 767 // methods to connect/disconnect to a source of data in an AbstractPortController 768 769 public AbstractPortController controller = null; 770 771 public boolean status() { 772 return (ostream != null && istream != null); 773 } 774 775 protected volatile Thread xmtThread = null; 776 protected volatile Thread rcvThread = null; 777 778 protected volatile Runnable xmtRunnable = null; 779 780 /** 781 * Make connection to an existing PortController object. 782 * 783 * @param p the PortController 784 */ 785 public void connectPort(AbstractPortController p) { 786 rcvException = false; 787 connectionError = false; 788 xmtException = false; 789 threadStopRequest = false; 790 try { 791 istream = p.getInputStream(); 792 ostream = p.getOutputStream(); 793 if (controller != null) { 794 log.warn("connectPort: connect called while connected"); 795 } else { 796 log.debug("connectPort invoked"); 797 } 798 controller = p; 799 // and start threads 800 xmtThread = jmri.util.ThreadingUtil.newThread( 801 xmtRunnable = new Runnable() { 802 @Override 803 public void run() { 804 try { 805 transmitLoop(); 806 } catch (Throwable e) { 807 if (!threadStopRequest) { 808 log.error("Transmit thread terminated prematurely by: {}", e, e); 809 } 810 // see http://docs.oracle.com/javase/7/docs/api/java/lang/ThreadDeath.html 811 // ThreadDeath must be thrown per Java API Javadocs 812 // 813 // The type ThreadDeath has been deprecated since version 20 and marked for removal 814 // and the warning cannot be suppressed in Java 21. But external libraries might 815 // throw the exception outside of JMRI control. So check the name of the exception 816 // instead of using "instanceof". 817 if ("java.lang.ThreadDeath".equals(e.getClass().getName())) { 818 throw e; 819 } 820 } 821 } 822 }); 823 824 String[] packages = this.getClass().getName().split("\\."); 825 xmtThread.setName( 826 (packages.length>=2 ? packages[packages.length-2]+"." :"") 827 +(packages.length>=1 ? packages[packages.length-1] :"") 828 +" Transmit thread"); 829 830 xmtThread.setDaemon(true); 831 xmtThread.setPriority(Thread.MAX_PRIORITY-1); //bump up the priority 832 xmtThread.start(); 833 834 rcvThread = jmri.util.ThreadingUtil.newThread( 835 new Runnable() { 836 @Override 837 public void run() { 838 receiveLoop(); 839 } 840 }); 841 rcvThread.setName( 842 (packages.length>=2 ? packages[packages.length-2]+"." :"") 843 +(packages.length>=1 ? packages[packages.length-1] :"") 844 +" Receive thread"); 845 846 rcvThread.setPriority(Thread.MAX_PRIORITY); //bump up the priority 847 rcvThread.setDaemon(true); 848 rcvThread.start(); 849 850 } catch (RuntimeException e) { 851 log.error("Failed to start up communications. Error was: ", e); 852 log.debug("Full trace:", e); 853 } 854 } 855 856 /** 857 * Get the port name for this connection from the TrafficController. 858 * 859 * @return the name of the port 860 */ 861 public String getPortName() { 862 return controller.getCurrentPortName(); 863 } 864 865 /** 866 * Break connection to existing PortController object. Once broken, attempts 867 * to send via "message" member will fail. 868 * 869 * @param p the PortController 870 */ 871 public void disconnectPort(AbstractPortController p) { 872 istream = null; 873 ostream = null; 874 if (controller != p) { 875 log.warn("disconnectPort: disconnect called from non-connected AbstractPortController"); 876 } 877 controller = null; 878 threadStopRequest = true; 879 } 880 881 /** 882 * Check if PortController object can be sent to. 883 * 884 * @param p the PortController 885 * @return true if ready, false otherwise May throw an Exception. 886 */ 887 public boolean portReadyToSend(AbstractPortController p) { 888 if (p != null && !xmtException && !rcvException) { 889 return true; 890 } else { 891 return false; 892 } 893 } 894 895 // data members to hold the streams 896 protected DataInputStream istream = null; 897 protected OutputStream ostream = null; 898 899 protected boolean rcvException = false; 900 901 protected int maxRcvExceptionCount = 100; 902 903 /** 904 * Handle incoming characters. This is a permanent loop, looking for input 905 * messages in character form on the stream connected to the PortController 906 * via {@link #connectPort(AbstractPortController)}. 907 * <p> 908 * Each turn of the loop is the receipt of a single message. 909 */ 910 public void receiveLoop() { 911 log.debug("receiveLoop starts in {}", this); 912 int errorCount = 0; 913 while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception 914 try { 915 handleOneIncomingReply(); 916 errorCount = 0; 917 } catch (java.io.InterruptedIOException e) { 918 // related to InterruptedException, catch first 919 break; 920 } catch (IOException e) { 921 rcvException = true; 922 reportReceiveLoopException(e); 923 break; 924 } catch (RuntimeException e1) { 925 log.error("Exception in receive loop: {}", e1.toString(), e1); 926 errorCount++; 927 if (errorCount == maxRcvExceptionCount) { 928 rcvException = true; 929 reportReceiveLoopException(e1); 930 } 931 } 932 } 933 if (!threadStopRequest) { // if e.g. unexpected end 934 ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN); 935 log.debug("Exit from rcv loop in {}", this.getClass()); 936 log.info("Exiting receive loop"); 937 recovery(); // see if you can restart 938 } 939 } 940 941 /** 942 * Disconnect and reset the current PortController. 943 * Invoked at abnormal ending of receiveLoop. 944 */ 945 protected final void recovery() { 946 AbstractPortController adapter = controller; 947 disconnectPort(controller); 948 adapter.recover(); 949 } 950 951 /** 952 * Report an error on the receive loop. Separated so tests can suppress, even 953 * though message is asynchronous. 954 * @param e Exception encountered at lower level to trigger error, or null 955 */ 956 protected void reportReceiveLoopException(Exception e) { 957 log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e); 958 jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN); 959 if (controller instanceof AbstractNetworkPortController) { 960 portWarnTCP(e); 961 } 962 } 963 964 protected abstract AbstractMRReply newReply(); 965 966 protected abstract boolean endOfMessage(AbstractMRReply r); 967 968 /** 969 * Dummy routine, to be filled by protocols that have to skip some 970 * start-of-message characters. 971 * @param istream input source 972 * @throws IOException from underlying operations 973 */ 974 protected void waitForStartOfReply(DataInputStream istream) throws IOException { 975 } 976 977 /** 978 * Read a single byte, protecting against various timeouts, etc. 979 * <p> 980 * When a port is set to have a receive timeout, some will return 981 * zero bytes, an EOFException or a InterruptedIOException at the end of the timeout. 982 * In that case, the read() 983 * should be repeated to get the next real character. 984 * 985 * @param istream stream to read 986 * @return the byte read 987 * @throws java.io.IOException if unable to read 988 */ 989 protected byte readByteProtected(DataInputStream istream) throws IOException { 990 if (istream == null) { 991 throw new IOException("Input Stream NULL when reading"); 992 } 993 while (true) { // loop will repeat until character found 994 int nchars; 995 // The istream should be configured so that the following 996 // read(..) call only blocks for a short time, e.g. 100msec, if no 997 // data is available. It's OK if it 998 // throws e.g. java.io.InterruptedIOException 999 // in that case, as the calling loop should just go around 1000 // and request input again. This semi-blocking behavior will 1001 // let the terminateThreads() method end this thread cleanly. 1002 nchars = istream.read(rcvBuffer, 0, 1); 1003 if (nchars == -1) { 1004 // No more bytes can be read from the channel 1005 throw new IOException("Connection not terminated normally"); 1006 } 1007 if (nchars > 0) { 1008 return rcvBuffer[0]; 1009 } 1010 } 1011 } 1012 1013 // Defined this way to reduce new object creation 1014 private byte[] rcvBuffer = new byte[1]; 1015 1016 /** 1017 * Get characters from the input source, and file a message. 1018 * <p> 1019 * Returns only when the message is complete. 1020 * <p> 1021 * Only used in the Receive thread. 1022 * <p> 1023 * Handles timeouts on read by ignoring zero-length reads. 1024 * 1025 * @param msg message to fill 1026 * @param istream character source. 1027 * @throws IOException when presented by the input source. 1028 */ 1029 protected void loadChars(AbstractMRReply msg, DataInputStream istream) 1030 throws IOException { 1031 int i; 1032 for (i = 0; i < msg.maxSize(); i++) { 1033 byte char1 = readByteProtected(istream); 1034 log.trace("char: {} i: {}",(char1&0xFF),i); 1035 // if there was a timeout, flush any char received and start over 1036 if (flushReceiveChars) { 1037 log.warn("timeout flushes receive buffer: {}", msg); 1038 msg.flush(); 1039 i = 0; // restart 1040 flushReceiveChars = false; 1041 } 1042 if (canReceive()) { 1043 msg.setElement(i, char1); 1044 if (endOfMessage(msg)) { 1045 break; 1046 } 1047 } else { 1048 i--; // flush char 1049 log.error("unsolicited character received: {}", Integer.toHexString(char1)); 1050 } 1051 } 1052 } 1053 1054 /** 1055 * Override in the system specific code if necessary 1056 * 1057 * @return true if it is okay to buffer receive characters into a reply 1058 * message. When false, discard char received 1059 */ 1060 protected boolean canReceive() { 1061 return true; 1062 } 1063 1064 private int retransmitCount = 0; 1065 1066 /** 1067 * Executes a reply distribution action on the appropriate thread for JMRI. 1068 * @param r a runnable typically encapsulating a MRReply and the iteration code needed to 1069 * send it to all the listeners. 1070 */ 1071 protected void distributeReply(Runnable r) { 1072 try { 1073 if (synchronizeRx) { 1074 SwingUtilities.invokeAndWait(r); 1075 } else { 1076 SwingUtilities.invokeLater(r); 1077 } 1078 } catch (InterruptedException ie) { 1079 if (threadStopRequest) return; 1080 log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString()); 1081 } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) { 1082 log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString()); 1083 return; 1084 } 1085 log.debug("dispatch thread invoked"); 1086 } 1087 1088 /** 1089 * Handle each reply when complete. 1090 * <p> 1091 * (This is public for testing purposes) Runs in the "Receive" thread. 1092 * 1093 * @throws java.io.IOException on error. 1094 */ 1095 public void handleOneIncomingReply() throws IOException { 1096 // we sit in this until the message is complete, relying on 1097 // threading to let other stuff happen 1098 1099 // Create message off the right concrete class 1100 AbstractMRReply msg = newReply(); 1101 1102 // wait for start if needed 1103 waitForStartOfReply(istream); 1104 1105 // message exists, now fill it 1106 loadChars(msg, istream); 1107 1108 if (threadStopRequest) return; 1109 1110 // message is complete, dispatch it !! 1111 replyInDispatch = true; 1112 log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState); 1113 1114 // forward the message to the registered recipients, 1115 // which includes the communications monitor 1116 // return a notification via the Swing event queue to ensure proper thread 1117 Runnable r = new RcvNotifier(msg, mLastSender, this); 1118 distributeReply(r); 1119 1120 if (!msg.isUnsolicited()) { 1121 // effect on transmit: 1122 switch (mCurrentState) { 1123 case WAITMSGREPLYSTATE: { 1124 // check to see if the response was an error message we want 1125 // to automatically handle by re-queueing the last sent 1126 // message, otherwise go on to the next message 1127 if (msg.isRetransmittableErrorMsg()) { 1128 log.error("Automatic Recovery from Error Message: {}. Retransmitted {} times.", msg, retransmitCount); 1129 synchronized (xmtRunnable) { 1130 mCurrentState = AUTORETRYSTATE; 1131 if (retransmitCount > 0) { 1132 try { 1133 xmtRunnable.wait(retransmitCount * 100L); 1134 } catch (InterruptedException e) { 1135 Thread.currentThread().interrupt(); // retain if needed later 1136 } 1137 } 1138 replyInDispatch = false; 1139 xmtRunnable.notify(); 1140 retransmitCount++; 1141 } 1142 } else { 1143 // update state, and notify to continue 1144 synchronized (xmtRunnable) { 1145 mCurrentState = NOTIFIEDSTATE; 1146 replyInDispatch = false; 1147 xmtRunnable.notify(); 1148 retransmitCount = 0; 1149 } 1150 } 1151 break; 1152 } 1153 case WAITREPLYINPROGMODESTATE: { 1154 // entering programming mode 1155 mCurrentMode = PROGRAMINGMODE; 1156 replyInDispatch = false; 1157 1158 // check to see if we need to delay to allow decoders to become 1159 // responsive 1160 int warmUpDelay = enterProgModeDelayTime(); 1161 if (warmUpDelay != 0) { 1162 try { 1163 synchronized (xmtRunnable) { 1164 xmtRunnable.wait(warmUpDelay); 1165 } 1166 } catch (InterruptedException e) { 1167 Thread.currentThread().interrupt(); // retain if needed later 1168 } 1169 } 1170 // update state, and notify to continue 1171 synchronized (xmtRunnable) { 1172 mCurrentState = OKSENDMSGSTATE; 1173 xmtRunnable.notify(); 1174 } 1175 break; 1176 } 1177 case WAITREPLYINNORMMODESTATE: { 1178 // entering normal mode 1179 mCurrentMode = NORMALMODE; 1180 replyInDispatch = false; 1181 // update state, and notify to continue 1182 synchronized (xmtRunnable) { 1183 mCurrentState = OKSENDMSGSTATE; 1184 xmtRunnable.notify(); 1185 } 1186 break; 1187 } 1188 default: { 1189 replyInDispatch = false; 1190 if (allowUnexpectedReply) { 1191 log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg); 1192 synchronized (xmtRunnable) { 1193 // The transmit thread sometimes gets stuck 1194 // when unexpected replies are received. Notify 1195 // it to clear the block without a timeout. 1196 // (do not change the current state) 1197 //if(mCurrentState!=IDLESTATE) 1198 xmtRunnable.notify(); 1199 } 1200 } else { 1201 unexpectedReplyStateError(mCurrentState, msg.toString()); 1202 } 1203 } 1204 } 1205 // Unsolicited message 1206 } else { 1207 log.debug("Unsolicited Message Received {}", msg); 1208 1209 replyInDispatch = false; 1210 } 1211 } 1212 1213 /** 1214 * Log an error message for a message received in an unexpected state. 1215 * @param State message state. 1216 * @param msgString message string. 1217 */ 1218 protected void unexpectedReplyStateError(int State, String msgString) { 1219 String[] packages = this.getClass().getName().split("\\."); 1220 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 1221 +(packages.length>=1 ? packages[packages.length-1] :""); 1222 log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name); 1223 } 1224 1225 /** 1226 * for testing purposes, let us be able to find out 1227 * what the last sender was. 1228 * @return last sender, mLastSender. 1229 */ 1230 public AbstractMRListener getLastSender() { 1231 return mLastSender; 1232 } 1233 1234 protected void terminate() { 1235 log.debug("Cleanup Starts"); 1236 if (ostream == null) { 1237 return; // no connection established 1238 } 1239 AbstractMRMessage modeMsg = enterNormalMode(); 1240 if (modeMsg != null) { 1241 modeMsg.setRetries(100); // set the number of retries 1242 // high, just in case the interface 1243 // is busy when we try to send 1244 forwardToPort(modeMsg, null); 1245 // wait for reply 1246 try { 1247 if (xmtRunnable != null) { 1248 synchronized (xmtRunnable) { 1249 xmtRunnable.wait(modeMsg.getTimeout()); 1250 } 1251 } 1252 } catch (InterruptedException e) { 1253 Thread.currentThread().interrupt(); // retain if needed later 1254 log.error("transmit interrupted"); 1255 } 1256 } 1257 } 1258 1259 /** 1260 * Internal class to remember the Reply object and destination listener with 1261 * a reply is received. 1262 */ 1263 protected static class RcvNotifier implements Runnable { 1264 1265 AbstractMRReply mMsg; 1266 AbstractMRListener mDest; 1267 AbstractMRTrafficController mTc; 1268 1269 public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest, 1270 AbstractMRTrafficController pTc) { 1271 mMsg = pMsg; 1272 mDest = pDest; 1273 mTc = pTc; 1274 } 1275 1276 @Override 1277 public void run() { 1278 log.debug("Delayed rcv notify starts"); 1279 mTc.notifyReply(mMsg, mDest); 1280 } 1281 } // end RcvNotifier 1282 1283 // allow creation of object outside package 1284 protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest, 1285 AbstractMRTrafficController pTc) { 1286 return new RcvNotifier(pMsg, pDest, pTc); 1287 } 1288 1289 /** 1290 * Internal class to remember the Message object and destination listener 1291 * when a message is queued for notification. 1292 */ 1293 protected static class XmtNotifier implements Runnable { 1294 1295 AbstractMRMessage mMsg; 1296 AbstractMRListener mDest; 1297 AbstractMRTrafficController mTc; 1298 1299 public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest, 1300 AbstractMRTrafficController pTc) { 1301 mMsg = pMsg; 1302 mDest = pDest; 1303 mTc = pTc; 1304 } 1305 1306 @Override 1307 public void run() { 1308 log.debug("Delayed xmt notify starts"); 1309 mTc.notifyMessage(mMsg, mDest); 1310 } 1311 } // end XmtNotifier 1312 1313 /** 1314 * Terminate the receive and transmit threads. 1315 * <p> 1316 * This is intended to be used only by testing subclasses. 1317 */ 1318 public void terminateThreads() { 1319 threadStopRequest = true; 1320 if (xmtThread != null) { 1321 xmtThread.interrupt(); 1322 try { 1323 xmtThread.join(150); 1324 } catch (InterruptedException ie){ 1325 // interrupted during cleanup. 1326 } 1327 } 1328 1329 if (rcvThread != null) { 1330 rcvThread.interrupt(); 1331 try { 1332 rcvThread.join(150); 1333 } catch (InterruptedException ie){ 1334 // interrupted during cleanup. 1335 } 1336 } 1337 // we also need to remove the shutdown task. 1338 InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask); 1339 } 1340 1341 /** 1342 * Flag that threads should terminate as soon as they can. 1343 */ 1344 protected volatile boolean threadStopRequest = false; 1345 1346 private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class); 1347 1348}