001package jmri.jmrix.mrc; 002 003import java.io.DataInputStream; 004import java.io.OutputStream; 005import java.util.Arrays; 006import java.util.Calendar; 007import java.util.Date; 008import java.util.Iterator; 009import java.util.LinkedList; 010import org.slf4j.Logger; 011import org.slf4j.LoggerFactory; 012 013/** 014 * Converts Stream-based I/O to/from Mrc messages. The "MrcInterface" side 015 * sends/receives MrcMessage objects. The connection to a MrcPortController is 016 * via a pair of *Streams, which then carry sequences of characters for 017 * transmission. 018 * <p> 019 * This is based upon the Packetizer used for LocoNet Connections due to its 020 * speed and efficiency to handle messages. This also takes some code from the 021 * AbstractMRTrafficController, when dealing with handling replies to messages 022 * sent. 023 * 024 * The MRC Command Station sends out a poll message to each handset which then 025 * has approximately 20ms to initially respond with a request. Otherwise the 026 * Command Station will poll the next handset. 027 * 028 * <p> 029 * Messages come to this via the main GUI thread, and are forwarded back to 030 * listeners in that same thread. Reception and transmission are handled in 031 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal 032 * classes defined here. The thread priorities are: 033 * <ul> 034 * <li> RcvHandler - at highest available priority 035 * <li> XmtHandler - down one, which is assumed to be above the GUI 036 * <li> (everything else) 037 * </ul> 038 * <p> 039 * Some of the message formats used in this class are Copyright MRC, Inc. and 040 * used with permission as part of the JMRI project. That permission does not 041 * extend to uses in other software products. If you wish to use this code, 042 * algorithm or these message formats outside of JMRI, please contact Mrc Inc 043 * for separate permission. 044 * 045 * @author Bob Jacobsen Copyright (C) 2001 046 * @author Kevin Dickerson Copyright (C) 2014 047 * @author Ken Cameron Copyright (C) 2014 048 * 049 */ 050public class MrcPacketizer extends MrcTrafficController { 051 052 /** 053 * true if the external hardware is not echoing messages, so we must 054 */ 055 protected boolean echo = true; // echo messages here, instead of in hardware 056 057 public MrcPacketizer() { 058 } 059 060 // The methods to implement the MrcInterface 061 @Override 062 public boolean status() { 063 return (ostream != null && istream != null); 064 } 065 066 /** 067 * Synchronized list used as a transmit queue. 068 * <p> 069 * This is public to allow access from the internal class(es) when compiling 070 * with Java 1.1 071 */ 072 public LinkedList<MrcMessage> xmtList = new LinkedList<MrcMessage>(); 073 074 /** 075 * XmtHandler (a local class) object to implement the transmit thread 076 */ 077 protected Runnable xmtHandler; 078 079 /** 080 * RcvHandler (a local class) object to implement the receive thread 081 */ 082 protected Runnable rcvHandler; 083 084 /** 085 * Forward a preformatted MrcMessage to the actual interface. 086 * 087 * The message is converted to a byte array and queue for transmission 088 * 089 * @param m Message to send; 090 */ 091 @Override 092 public void sendMrcMessage(MrcMessage m) { 093 // update statistics 094 transmittedMsgCount++; 095 096 //Convert the message to a byte stream, to save doing this when the message 097 //is picked out 098 m.setByteStream(); 099 100 if (log.isDebugEnabled()) { // avoid String building if not needed 101 log.debug("queue Mrc packet: {}", m.toString()); 102 } 103 // in an atomic operation, queue the request and wake the xmit thread 104 try { 105 synchronized (xmtHandler) { 106 xmtList.addLast(m); 107 if (log.isDebugEnabled()) { // avoid String building if not needed 108 log.debug("xmt list size {}", xmtList.size()); // NOI18N 109 Iterator<MrcMessage> iterator = xmtList.iterator(); 110 while (iterator.hasNext()) { 111 log.debug(" entry: {}",iterator.next().toString()); 112 } 113 } 114 } 115 } catch (RuntimeException e) { 116 log.warn("passing to xmit: unexpected exception", e); // NOI18N 117 } 118 } 119 120 /** 121 * Implement abstract method to signal if there's a backlog of information 122 * waiting to be sent. 123 * 124 * @return true if busy, false if nothing waiting to send 125 */ 126 @Override 127 public boolean isXmtBusy() { 128 if (controller == null) { 129 return false; 130 } 131 132 return (!controller.okToSend()); 133 } 134 135 // methods to connect/disconnect to a source of data in a MrcPortController 136 // This is public to allow access from the internal class(es) when compiling with Java 1.1 137 public MrcPortController controller = null; 138 139 /** 140 * Make connection to existing MrcPortController object. 141 * 142 * @param p Port controller for connected. Save this for a later disconnect 143 * call 144 */ 145 public void connectPort(MrcPortController p) { 146 istream = p.getInputStream(); 147 ostream = p.getOutputStream(); 148 if (controller != null) { 149 log.warn("connectPort: connect called while connected"); // NOI18N 150 } 151 controller = p; 152 } 153 154 /** 155 * Break connection to existing MrcPortController object. Once broken, 156 * attempts to send via "message" member will fail. 157 * 158 * @param p previously connected port 159 */ 160 public void disconnectPort(MrcPortController p) { 161 istream = null; 162 ostream = null; 163 if (controller != p) { 164 log.warn("disconnectPort: disconnect called from non-connected MrcPortController"); // NOI18N 165 } 166 controller = null; 167 } 168 169 // data members to hold the streams. These are public so the inner classes defined here 170 // can access whem with a Java 1.1 compiler 171 public DataInputStream istream = null; 172 public OutputStream ostream = null; 173 174 //We keep a copy of the lengths here to save on time on each request later. 175 final private static int THROTTLEPACKETLENGTH = MrcPackets.getThrottlePacketLength(); 176 final private static int FUNCTIONGROUPLENGTH = MrcPackets.getFunctionPacketLength(); 177 final private static int READCVLENGTH = MrcPackets.getReadCVPacketLength(); 178 final private static int readCVReplyLength = MrcPackets.getReadCVPacketReplyLength(); 179 final private static int readDecoderAddressLength = MrcPackets.getReadDecoderAddressLength(); 180 final private static int WRITECVPROGLENGTH = MrcPackets.getWriteCVPROGPacketLength(); 181 final private static int WRITECVPOMLENGTH = MrcPackets.getWriteCVPOMPacketLength(); 182 final private static int SETCLOCKRATIOLENGTH = MrcPackets.getSetClockRatioPacketLength(); 183 final private static int SETCLOCKTIMELENGTH = MrcPackets.getSetClockTimePacketLength(); 184 final private static int setClockAMPMLength = MrcPackets.getSetClockAmPmPacketLength(); 185 final private static int powerOnLength = MrcPackets.getPowerOnPacketLength(); 186 final private static int powerOffLength = MrcPackets.getPowerOffPacketLength(); 187 188 final private static int addToConsistLength = MrcPackets.getClearConsistPacketLength(); 189 final private static int clearConsistLength = MrcPackets.getClearConsistPacketLength(); 190 final private static int routeControlLength = MrcPackets.getRouteControlPacketLength(); 191 final private static int clearRouteLength = MrcPackets.getClearRoutePacketLength(); 192 final private static int addToRouteLength = MrcPackets.getAddToRoutePacketLength(); 193 final private static int accessoryLength = MrcPackets.getAccessoryPacketLength(); 194 195 /** 196 * Read a single byte, protecting against various timeouts, etc. 197 * <p> 198 * When a port is set to have a receive timeout (via the 199 * enableReceiveTimeout() method), some will return zero bytes or an 200 * EOFException at the end of the timeout. In that case, the read should be 201 * repeated to get the next real character. 202 * @param istream data input stream from layout 203 * @return byte stream from interface 204 * @throws java.io.IOException from read errors 205 * 206 */ 207 protected byte readByteProtected(DataInputStream istream) throws java.io.IOException { 208 while (true) { // loop will repeat until character found 209 int nchars; 210 nchars = istream.read(rcvBuffer, 0, 1); 211 if (nchars > 0) { 212 return rcvBuffer[0]; 213 } 214 } 215 } 216 // Defined this way to reduce new object creation 217 private byte[] rcvBuffer = new byte[1]; 218 //boolean xmtWindow = false; 219 220 /** 221 * Captive class to handle incoming characters. This is a permanent loop, 222 * looking for input messages in character form on the stream connected to 223 * the MrcPortController via <code>connectPort</code>. 224 */ 225 class RcvHandler implements Runnable { 226 227 /** 228 * Remember the MrcPacketizer object 229 */ 230 MrcPacketizer trafficController; 231 232 public RcvHandler(MrcPacketizer lt) { 233 trafficController = lt; 234 } 235 236 @Override 237 public void run() { 238 int firstByte; 239 int secondByte; 240 int thirdByte; 241 while (true) { // loop permanently, program close will exit 242 try { 243 firstByte = readByteProtected(istream) & 0xFF; 244 secondByte = readByteProtected(istream) & 0xFF; 245 thirdByte = readByteProtected(istream) & 0xFF; 246 // start by looking for command - skip if bit not set or byte 1 & 3 don't match. 247 while (secondByte != 0x00 && secondByte != 0x01 || firstByte != thirdByte) { 248 if (firstByte == 0x00 && secondByte == 0x01) { 249 //Only a clock message has the first & thirdbyte different 250 break; 251 } 252 log.debug("Skipping: {} {} {}", Integer.toHexString(firstByte), Integer.toHexString(secondByte), Integer.toHexString(thirdByte)); 253 firstByte = secondByte; 254 secondByte = thirdByte; 255 thirdByte = readByteProtected(istream) & 0xFF; 256 } 257 final Date time = new Date(); 258 log.trace(" (RcvHandler) Start message with message: {} {}", Integer.toHexString(firstByte), Integer.toHexString(secondByte)); 259 MrcMessage msg = null; 260 boolean pollForUs = false; 261 262 if (secondByte == 0x01) { 263 264 msg = new MrcMessage(6); 265 msg.setMessageClass(MrcInterface.POLL); 266 //msg.setPollMessage(); 267 if (firstByte == cabAddress) { 268 pollForUs = true; 269 } else if (mCurrentState == WAITFORCMDRECEIVED) { 270 log.debug("Missed our poll slot"); 271 synchronized (transmitLock) { 272 mCurrentState = MISSEDPOLL; 273 transmitLock.notify(); 274 } 275 } 276 if (firstByte == 0x00) { 277 msg.setMessageClass(MrcInterface.CLOCK + MrcInterface.POLL); 278 } 279 } else { 280 switch (firstByte) { 281 case 0:/* 2 No Data Poll */ 282 283 msg = new MrcMessage(4); 284 msg.setMessageClass(MrcInterface.POLL); 285 break; 286 case MrcPackets.THROTTLEPACKETCMD: 287 msg = new MrcMessage(THROTTLEPACKETLENGTH); 288 msg.setMessageClass(MrcInterface.THROTTLEINFO); 289 break; 290 //$FALL-THROUGH$ 291 case MrcPackets.FUNCTIONGROUP1PACKETCMD: 292 case MrcPackets.FUNCTIONGROUP2PACKETCMD: 293 case MrcPackets.FUNCTIONGROUP3PACKETCMD: 294 case MrcPackets.FUNCTIONGROUP4PACKETCMD: 295 case MrcPackets.FUNCTIONGROUP5PACKETCMD: 296 case MrcPackets.FUNCTIONGROUP6PACKETCMD: 297 msg = new MrcMessage(FUNCTIONGROUPLENGTH); 298 msg.setMessageClass(MrcInterface.THROTTLEINFO); 299 break; 300 case MrcPackets.READCVCMD: 301 msg = new MrcMessage(READCVLENGTH); 302 msg.setMessageClass(MrcInterface.PROGRAMMING); 303 log.debug("Read CV Cmd"); 304 break; 305 case MrcPackets.READDECODERADDRESSCMD: 306 msg = new MrcMessage(readDecoderAddressLength); 307 msg.setMessageClass(MrcInterface.PROGRAMMING); 308 break; 309 case MrcPackets.WRITECVPROGCMD: 310 msg = new MrcMessage(WRITECVPROGLENGTH); 311 msg.setMessageClass(MrcInterface.PROGRAMMING); 312 break; 313 case MrcPackets.WRITECVPOMCMD: 314 msg = new MrcMessage(WRITECVPOMLENGTH); 315 msg.setMessageClass(MrcInterface.PROGRAMMING); 316 break; 317 case MrcPackets.SETCLOCKRATIOCMD: 318 msg = new MrcMessage(SETCLOCKRATIOLENGTH); 319 msg.setMessageClass(MrcInterface.CLOCK); 320 break; 321 case MrcPackets.SETCLOCKTIMECMD: 322 msg = new MrcMessage(SETCLOCKTIMELENGTH); 323 msg.setMessageClass(MrcInterface.CLOCK); 324 break; 325 case MrcPackets.SETCLOCKAMPMCMD: 326 msg = new MrcMessage(setClockAMPMLength); 327 msg.setMessageClass(MrcInterface.CLOCK); 328 break; 329 case MrcPackets.READCVHEADERREPLYCODE: 330 msg = new MrcMessage(readCVReplyLength); 331 msg.setMessageClass(MrcInterface.PROGRAMMING); 332 synchronized (transmitLock) { 333 mCurrentState = IDLESTATE; 334 transmitLock.notify(); 335 } 336 log.debug("CV read reply"); 337 break; 338 case MrcPackets.PROGCMDSENTCODE: 339 log.debug("Gd Prog Cmd Sent"); 340 synchronized (transmitLock) { 341 mCurrentState = IDLESTATE; 342 transmitLock.notify(); 343 } 344 msg = new MrcMessage(4); 345 msg.setMessageClass(MrcInterface.PROGRAMMING); 346 break; 347 case MrcPackets.POWERONCMD: 348 mCurrentState = IDLESTATE; 349 msg = new MrcMessage(powerOnLength); 350 msg.setMessageClass(MrcInterface.POWER); 351 break; 352 case MrcPackets.POWEROFFCMD: 353 mCurrentState = IDLESTATE; 354 msg = new MrcMessage(powerOffLength); 355 msg.setMessageClass(MrcInterface.POWER); 356 break; 357 case MrcPackets.ADDTOCONSISTPACKETCMD: 358 mCurrentState = IDLESTATE; 359 msg = new MrcMessage(addToConsistLength); 360 msg.setMessageClass(MrcInterface.THROTTLEINFO); 361 break; 362 case MrcPackets.CLEARCONSISTPACKETCMD: 363 mCurrentState = IDLESTATE; 364 msg = new MrcMessage(clearConsistLength); 365 msg.setMessageClass(MrcInterface.THROTTLEINFO); 366 break; 367 case MrcPackets.ROUTECONTROLPACKETCMD: 368 mCurrentState = IDLESTATE; 369 msg = new MrcMessage(routeControlLength); 370 msg.setMessageClass(MrcInterface.TURNOUTS); 371 break; 372 case MrcPackets.CLEARROUTEPACKETCMD: 373 mCurrentState = IDLESTATE; 374 msg = new MrcMessage(clearRouteLength); 375 msg.setMessageClass(MrcInterface.TURNOUTS); 376 break; 377 case MrcPackets.ADDTOROUTEPACKETCMD: 378 mCurrentState = IDLESTATE; 379 msg = new MrcMessage(addToRouteLength); 380 msg.setMessageClass(MrcInterface.TURNOUTS); 381 break; 382 case MrcPackets.ACCESSORYPACKETCMD: 383 mCurrentState = IDLESTATE; 384 msg = new MrcMessage(accessoryLength); 385 msg.setMessageClass(MrcInterface.TURNOUTS); 386 break; 387 case MrcPackets.LOCODBLCONTROLCODE: 388 synchronized (transmitLock) { 389 mCurrentState = DOUBLELOCOCONTROL; 390 transmitLock.notify(); 391 } 392 msg = new MrcMessage(4); 393 msg.setMessageClass(MrcInterface.THROTTLEINFO); 394 break; 395 case MrcPackets.LOCOSOLECONTROLCODE: 396 mCurrentState = IDLESTATE; 397 msg = new MrcMessage(4); 398 msg.setMessageClass(MrcInterface.THROTTLEINFO); 399 synchronized (transmitLock) { 400 mCurrentState = IDLESTATE; 401 transmitLock.notify(); 402 } 403 break; 404 case MrcPackets.GOODCMDRECEIVEDCODE: //Possibly shouldn't change the state, as we wait for further confirmation. 405 if (mCurrentState == CONFIRMATIONONLY) { 406 synchronized (transmitLock) { 407 mCurrentState = IDLESTATE; 408 transmitLock.notify(); 409 } 410 } 411 msg = new MrcMessage(4); 412 break; 413 case MrcPackets.BADCMDRECEIVEDCODE: 414 mCurrentState = BADCOMMAND; 415 msg = new MrcMessage(4); 416 break; 417 default: 418 msg = new MrcMessage(4); //Unknown 419 log.debug("UNKNOWN {}", Integer.toHexString(firstByte)); // NOI18N 420 } 421 } 422 423 msg.setElement(0, firstByte); 424 msg.setElement(1, secondByte); 425 msg.setElement(2, thirdByte); 426 // message exists, now fill it 427 int len = msg.getNumDataElements(); 428 log.trace("len: {}", len); 429 for (int i = 3; i < len; i++) { 430 // check for message-blocking error 431 int b = readByteProtected(istream) & 0xFF; 432 msg.setElement(i, b); 433 log.trace("char {} is: {}", i, Integer.toHexString(b)); 434 } 435 /*Slight trade off with this we may see any transmitted message go out prior to the 436 poll message being passed to the monitor. */ 437 if (pollForUs) { 438 synchronized (xmtHandler) { 439 xmtHandler.notify(); //This will notify the xmt to send a message, even if it is only "no Data" reply 440 } 441 } 442 443 if ((msg.getMessageClass() & MrcInterface.POLL) != MrcInterface.POLL && msg.getNumDataElements() > 6) { 444 if (!msg.validCheckSum()) { 445 log.warn("Ignore Mrc packet with bad checksum: {}", msg); // NOI18N 446 throw new MrcMessageException(); 447 } else { 448 for (int i = 1; i < msg.getNumDataElements(); i += 2) { 449 if (msg.getElement(i) != 0x00) { 450 log.warn("Ignore Mrc packet with bad bit: {}", msg); // NOI18N 451 throw new MrcMessageException(); 452 } 453 } 454 } 455 } 456 // message is complete, dispatch it !! 457 { 458 log.trace("queue message for notification: {}", msg); 459 final MrcMessage thisMsg = msg; 460 final MrcPacketizer thisTc = trafficController; 461 // return a notification via the queue to ensure end 462 Runnable r = new Runnable() { 463 MrcMessage msgForLater = thisMsg; 464 MrcPacketizer myTc = thisTc; 465 466 @Override 467 public void run() { 468 myTc.notifyRcv(time, msgForLater); 469 } 470 }; 471 javax.swing.SwingUtilities.invokeLater(r); 472 } 473 // done with this one 474 } catch (MrcMessageException e) { 475 // just let it ride for now 476 log.warn("run: unexpected MrcMessageException", e); // NOI18N 477 } catch (java.io.EOFException e) { 478 // posted from idle port when enableReceiveTimeout used 479 log.trace("EOFException, is Mrc serial I/O using timeouts?"); 480 } catch (java.io.IOException e) { 481 // fired when write-end of HexFile reaches end 482 log.debug("IOException, should only happen with HexFile", e); 483 disconnectPort(controller); 484 return; 485 } // normally, we don't catch RuntimeException, but in this 486 // permanently running loop it seems wise. 487 catch (RuntimeException e) { 488 log.warn("Unknown Exception", e); // NOI18N 489 } 490 } // end of permanent loop 491 } 492 } 493 494 final static int IDLESTATE = 0x00; 495 final static int WAITFORCMDRECEIVED = 0x01; 496 final static int DOUBLELOCOCONTROL = 0x02; 497 final static int MISSEDPOLL = 0x04; 498 final static int BADCOMMAND = 0x08; 499 final static int CONFIRMATIONONLY = 0x10; 500 int mCurrentState = IDLESTATE; 501 502 int consecutiveMissedPolls = 0; 503 504 final MrcMessage noData = MrcMessage.setNoData(); 505 final byte noDataMsg[] = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00}; 506 507 /** 508 * Captive class to handle transmission 509 */ 510 class XmtHandler implements Runnable { 511 512 @Override 513 public void run() { 514 byte msg[]; 515 MrcMessage m; 516 int x = 0; 517 int state = WAITFORCMDRECEIVED; 518 while (true) { // loop permanently 519 m = noData; 520 msg = noDataMsg; 521 log.trace("check for input"); 522 synchronized (this) { 523 log.trace("start wait"); 524 //log.info("wait until we have been polled"); 525 new jmri.util.WaitHandler(this); // handle synchronization, spurious wake, interruption 526 log.trace("end wait"); 527 528 if (xmtList.size() != 0) { 529 m = xmtList.removeFirst(); 530 msg = m.getByteStream(); 531 log.debug("xmt list size after get {}", xmtList.size()); 532 log.debug("Message to send on {}", m); 533 } 534 } 535 try { 536 if (m.getMessageClass() != MrcInterface.POLL) { 537 mCurrentState = WAITFORCMDRECEIVED; 538 /* We set the current state before transmitting the message otherwise 539 the reply to the message may be received before the state is set 540 and the message will timeout and be retransmitted */ 541 if (!m.isReplyExpected()) { 542 mCurrentState = CONFIRMATIONONLY; 543 } 544 state = mCurrentState; 545 } 546 ostream.write(msg); 547 ostream.flush(); 548 messageTransmitted(m); 549 if (m.getMessageClass() != MrcInterface.POLL) { 550 if (log.isTraceEnabled()) { // avoid String building if not needed 551 log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); 552 log.trace("wait : {} : {}", m.getTimeout(), x); 553 } 554 transmitWait(m.getTimeout(), state, "transmitLoop interrupted", x); // NOI18N 555 x++; 556 } else { 557 mCurrentState = IDLESTATE; 558 } 559 560 if (mCurrentState == WAITFORCMDRECEIVED || mCurrentState == CONFIRMATIONONLY) { 561 log.debug("Timed out"); 562 if (m.getRetries() >= 0) { 563 m.setRetries(m.getRetries() - 1); 564 synchronized (this) { 565 xmtList.addFirst(m); 566 } 567 } else { 568 messageFailed(m); 569 } 570 mCurrentState = IDLESTATE; 571 consecutiveMissedPolls = 0; 572 } else if (mCurrentState == MISSEDPOLL && m.getRetries() >= 0) { 573 consecutiveMissedPolls++; 574 log.debug("Missed add to front"); 575 if (consecutiveMissedPolls < 5) { 576 synchronized (this) { 577 xmtList.addFirst(m); 578 mCurrentState = IDLESTATE; 579 if (log.isDebugEnabled()) { // avoid String building if not needed 580 log.debug("xmt list size {}", xmtList.size()); 581 Iterator<MrcMessage> iterator = xmtList.iterator(); 582 while (iterator.hasNext()) { 583 log.debug("message: {}", iterator.next().toString()); 584 } 585 } 586 } 587 } else { 588 log.warn("Message missed {} polls for message {}", consecutiveMissedPolls, m); // NOI18N 589 consecutiveMissedPolls = 0; 590 } 591 } else if (mCurrentState == DOUBLELOCOCONTROL && m.getRetries() >= 0) { 592 if (log.isDebugEnabled()) { // avoid String building if not needed 593 log.debug("Auto Retry send message added back to queue: {}", Arrays.toString(msg)); 594 } 595 m.setRetries(m.getRetries() - 1); 596 synchronized (this) { 597 xmtList.addFirst(m); 598 mCurrentState = IDLESTATE; 599 } 600 consecutiveMissedPolls = 0; 601 } else if (mCurrentState == BADCOMMAND) { 602 log.debug("Bad command sent"); 603 messageFailed(m); 604 mCurrentState = IDLESTATE; 605 consecutiveMissedPolls = 0; 606 } 607 } catch (java.io.IOException e) { 608 log.warn("sendMrcMessage: IOException", e); // NOI18N 609 } 610 } 611 } 612 } 613 614 static final Object transmitLock = new Object(); 615 616 @edu.umd.cs.findbugs.annotations.SuppressFBWarnings( value = "SLF4J_FORMAT_SHOULD_BE_CONST", 617 justification = "passing InterruptMessage unchanged") 618 protected void transmitWait(int waitTime, int state, String InterruptMessage, int x) { 619 // wait() can have spurious wakeup! 620 // so we protect by making sure the entire timeout time is used 621 long currentTime = Calendar.getInstance().getTimeInMillis(); 622 long endTime = currentTime + waitTime; 623 while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) { 624 long wait = endTime - currentTime; 625 try { 626 synchronized (transmitLock) { 627 // Do not wait if the current state has changed since we 628 // last set it. 629 if (mCurrentState != state) { 630 return; 631 } 632 transmitLock.wait(wait); // rcvr normally ends this w state change 633 } 634 } catch (InterruptedException e) { 635 Thread.currentThread().interrupt(); // retain if needed later 636 log.error(InterruptMessage); 637 } 638 } 639 log.debug("Timeout in transmitWait {}, mCurrentState: {} after {}", x, mCurrentState, waitTime); 640 } 641 642 protected void messageFailed(MrcMessage m) { 643 log.debug("message transmitted"); 644 if (m.getSource() == null) { 645 return; 646 } 647 // message is queued for transmit, echo it when needed 648 // return a notification via the queue to ensure end 649 javax.swing.SwingUtilities.invokeLater(new Failed(new Date(), m)); 650 651 } 652 653 static class Failed implements Runnable { 654 655 Failed(Date _timestamp, MrcMessage m) { 656 msgForLater = m; 657 timestamp = _timestamp; 658 } 659 MrcMessage msgForLater; 660 Date timestamp; 661 662 @Override 663 public void run() { 664 msgForLater.getSource().notifyFailedXmit(timestamp, msgForLater); 665 } 666 } 667 668 /** 669 * When a message is finally transmitted, forward it to listeners if echoing 670 * is needed. 671 * 672 * @param msg message to tag a transmitted message 673 */ 674 protected void messageTransmitted(MrcMessage msg) { 675 //if (debug) log.debug("message transmitted"); 676 if (!echo) { 677 return; 678 } 679 // message is queued for transmit, echo it when needed 680 // return a notification via the queue to ensure end 681 javax.swing.SwingUtilities.invokeLater(new Echo(this, new Date(), msg)); 682 } 683 684 static class Echo implements Runnable { 685 686 Echo(MrcPacketizer t, Date _timestamp, MrcMessage m) { 687 myTc = t; 688 msgForLater = m; 689 timestamp = _timestamp; 690 } 691 MrcMessage msgForLater; 692 MrcPacketizer myTc; 693 Date timestamp; 694 695 @Override 696 public void run() { 697 myTc.notifyXmit(timestamp, msgForLater); 698 } 699 } 700 701 /** 702 * Invoked at startup to start the threads needed here. 703 */ 704 public void startThreads() { 705 int priority = Thread.currentThread().getPriority(); 706 log.debug("startThreads current priority = {} max available = " + Thread.MAX_PRIORITY + " default = " + Thread.NORM_PRIORITY + " min available = " + Thread.MIN_PRIORITY, priority); // NOI18N 707 708 // make sure that the xmt priority is no lower than the current priority 709 int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY : Thread.MAX_PRIORITY - 1); 710 // start the XmtHandler in a thread of its own 711 if (xmtHandler == null) { 712 xmtHandler = new XmtHandler(); 713 } 714 Thread xmtThread = new Thread(xmtHandler, "Mrc transmit handler"); // NOI18N 715 log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N 716 xmtThread.setDaemon(true); 717 xmtThread.setPriority(Thread.MAX_PRIORITY - 1); 718 xmtThread.start(); 719 720 // start the RcvHandler in a thread of its own 721 if (rcvHandler == null) { 722 rcvHandler = new RcvHandler(this); 723 } 724 Thread rcvThread = new Thread(rcvHandler, "Mrc receive handler " + Thread.MAX_PRIORITY); // NOI18N 725 rcvThread.setDaemon(true); 726 rcvThread.setPriority(Thread.MAX_PRIORITY); 727 rcvThread.start(); 728 729 } 730 731 private final static Logger log = LoggerFactory.getLogger(MrcPacketizer.class); 732}