001package jmri.jmrix.dcc4pc; 002 003import java.io.DataInputStream; 004import java.lang.reflect.InvocationTargetException; 005import java.util.Calendar; 006 007import jmri.jmrix.AbstractMRListener; 008import jmri.jmrix.AbstractMRMessage; 009import jmri.jmrix.AbstractMRReply; 010import jmri.jmrix.AbstractMRTrafficController; 011import jmri.jmrix.dcc4pc.serialdriver.SerialDriverAdapter; 012 013import org.slf4j.Logger; 014import org.slf4j.LoggerFactory; 015 016/** 017 * Converts Stream-based I/O to/from DCC4PC messages. The "Dcc4PcInterface" side 018 * sends/receives message objects. 019 * <p> 020 * The connection to a Dcc4PcPortController is via a pair of *Streams, which 021 * then carry sequences of characters for transmission. Note that this 022 * processing is handled in an independent thread. 023 * <p> 024 * This handles the state transitions, based on the necessary state in each 025 * message. 026 * 027 * @author Bob Jacobsen Copyright (C) 2001 028 */ 029public class Dcc4PcTrafficController extends AbstractMRTrafficController implements Dcc4PcInterface { 030 031 /** 032 * Create a new DccPcTrafficController instance. 033 */ 034 public Dcc4PcTrafficController() { 035 super(); 036 if (log.isDebugEnabled()) { 037 log.debug("creating a new Dcc4PcTrafficController object"); 038 } 039 this.setAllowUnexpectedReply(false); 040 } 041 042 public void setAdapterMemo(Dcc4PcSystemConnectionMemo memo) { 043 adaptermemo = memo; 044 } 045 046 Dcc4PcSystemConnectionMemo adaptermemo; 047 048 @Override 049 public synchronized void addDcc4PcListener(Dcc4PcListener l) { 050 this.addListener(l); 051 } 052 053 @Override 054 public synchronized void removeDcc4PcListener(Dcc4PcListener l) { 055 this.removeListener(l); 056 } 057 058 public static final int RETRIEVINGDATA = 100; 059 060 /** 061 * Forward a Dcc4PcMessage to all registered Dcc4PcInterface listeners. 062 */ 063 @Override 064 protected void forwardMessage(AbstractMRListener client, AbstractMRMessage m) { 065 ((Dcc4PcListener) client).message((Dcc4PcMessage) m); 066 } 067 068 /** 069 * Forward a Dcc4PcReply to all registered Dcc4PcInterface listeners. 070 */ 071 @Override 072 protected void forwardReply(AbstractMRListener client, AbstractMRReply r) { 073 ((Dcc4PcListener) client).reply((Dcc4PcReply) r); 074 } 075 076 @Override 077 protected AbstractMRMessage pollMessage() { 078 return null; 079 } 080 081 @Override 082 protected AbstractMRListener pollReplyHandler() { 083 return null; 084 } 085 086 /** 087 * Forward a preformatted message to the actual interface. 088 */ 089 @Override 090 public void sendDcc4PcMessage(Dcc4PcMessage m, Dcc4PcListener reply) { 091 sendMessage(m, reply); 092 } 093 094 protected boolean unsolicitedSensorMessageSeen = false; 095 096 //Dcc4Pc doesn't support this function. 097 @Override 098 protected AbstractMRMessage enterProgMode() { 099 return Dcc4PcMessage.getProgMode(); 100 } 101 102 //Dcc4Pc doesn't support this function! 103 @Override 104 protected AbstractMRMessage enterNormalMode() { 105 return Dcc4PcMessage.getExitProgMode(); 106 } 107 108 @Override 109 protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) { 110 } 111 112 Dcc4PcMessage mLastMessage; //Last message requested with a reply listener ie from external methods 113 Dcc4PcMessage mLastSentMessage; //Last message actually sent from within the code, ie getResponse. 114 115 @Override 116 synchronized protected void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) { 117 if (log.isDebugEnabled()) { 118 log.debug("forwardToPort message: [{}]", m); 119 } 120 if (port == null) { 121 return; 122 } 123 // remember who sent this 124 mLastSender = reply; 125 mLastMessage = (Dcc4PcMessage) m; 126 127 // forward the message to the registered recipients, 128 // which includes the communications monitor, except the sender. 129 // Schedule notification via the Swing event queue to ensure order 130 if (!mLastMessage.isGetResponse()) { 131 //Do not forward on the get response packets, saves filling up the monitors with chaff 132 Runnable r = new XmtNotifier(m, mLastSender, this); 133 javax.swing.SwingUtilities.invokeLater(r); 134 } 135 forwardToPort(m); 136 137 } 138 139 //this forward to port is also used internally for repeating commands. 140 private void forwardToPort(AbstractMRMessage m) { 141 mLastSentMessage = (Dcc4PcMessage) m; 142 // stream to port in single write, as that's needed by serial 143 byte msg[] = new byte[lengthOfByteStream(m)]; 144 145 // add data content 146 int len = m.getNumDataElements(); 147 for (int i = 0; i < len; i++) { 148 msg[i] = (byte) m.getElement(i); 149 } 150 151 try { 152 if (ostream != null) { 153 if (log.isDebugEnabled()) { 154 StringBuilder f = new StringBuilder(); 155 for (int i = 0; i < msg.length; i++) { 156 f.append(Integer.toHexString(0xFF & msg[i])); 157 f.append(" "); 158 } 159 log.debug("formatted message: {}", f); 160 } 161 while (m.getRetries() >= 0) { 162 if (portReadyToSend(controller)) { 163 port.setDTR(true); 164 ostream.write(msg); 165 try { 166 Thread.sleep(20); 167 } catch (InterruptedException ex) { 168 Thread.currentThread().interrupt(); 169 } catch (Exception ex) { 170 log.warn("sendMessage: Exception: {}", ex.toString()); 171 } 172 ostream.flush(); 173 port.setDTR(false); 174 break; 175 } else if (m.getRetries() >= 0) { 176 log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries()); 177 m.setRetries(m.getRetries() - 1); 178 try { 179 synchronized (xmtRunnable) { 180 xmtRunnable.wait(m.getTimeout()); 181 } 182 } catch (InterruptedException e) { 183 Thread.currentThread().interrupt(); // retain if needed later 184 log.error("retry wait interrupted"); 185 } 186 } else { 187 log.warn("sendMessage: port not ready for data sending: {}", java.util.Arrays.toString(msg)); 188 } 189 } 190 } else { 191 // ostream is null 192 // no stream connected 193 connectionWarn(); 194 } 195 } catch (java.io.IOException | RuntimeException e) { 196 // TODO Currently there's no port recovery if an exception occurs 197 // must restart JMRI to clear xmtException. 198 xmtException = true; 199 portWarn(e); 200 } 201 } 202 jmri.jmrix.purejavacomm.SerialPort port; 203 204 @Override 205 public void connectPort(jmri.jmrix.AbstractPortController p) { 206 207 super.connectPort(p); 208 port = ((SerialDriverAdapter) controller).getSerialPort(); 209 210 } 211 212 @Override 213 protected AbstractMRReply newReply() { 214 Dcc4PcReply reply = new Dcc4PcReply(); 215 return reply; 216 } 217 218 // for now, receive always OK 219 @Override 220 protected boolean canReceive() { 221 return true; 222 } 223 224 @Override 225 protected boolean endOfMessage(AbstractMRReply msg) { 226 if (port.isDSR()) { 227 return false; 228 } 229 try { 230 if (controller.getInputStream().available() > 0) { 231 if (port.isRI()) { 232 log.debug("??? Ringing true ???"); 233 } 234 return false; 235 } 236 237 //log.debug("No more input available " + port.isDSR()); 238 if (port.isRI()) { 239 log.debug("??? Ringing true ???"); 240 } 241 return true; 242 } catch (java.io.IOException ex) { 243 log.error("IO Exception{}", ex.toString()); 244 } 245 return !port.isDSR(); 246 } 247 248 @Override 249 protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) { 250 if(l != null){ 251 ((Dcc4PcListener) l).handleTimeout((Dcc4PcMessage) msg); 252 } 253 super.handleTimeout(msg, l); 254 } 255 256 Dcc4PcReply lastIncomplete; 257 boolean waitingForMore = false; 258 boolean loading = false; 259 260 final int GETMOREDATA = 0x01; 261 262 /** 263 * Handle each reply when complete. 264 * <p> 265 * (This is public for testing purposes) Runs in the "Receive" thread. 266 * 267 */ 268 @Override 269 public void handleOneIncomingReply() throws java.io.IOException { 270 // we sit in this until the message is complete, relying on 271 // threading to let other stuff happen 272 273 // Create message off the right concrete class 274 AbstractMRReply msg = newReply(); 275 276 // message exists, now fill it 277 loadChars(msg, istream); 278 if (mLastSentMessage != null) { 279 ((Dcc4PcReply)msg).setOriginalRequest(mLastMessage); 280 //log.debug(mLastMessage.getElement(0)); 281 if (mLastSentMessage.isForChildBoard()) { 282 if (log.isDebugEnabled()) { 283 log.debug("This is a message for a child board {}", ((Dcc4PcReply) msg).toHexString()); 284 log.debug("Originate {}", mLastMessage.toString()); 285 } 286 if ((mLastSentMessage.getNumDataElements() - 1) == msg.getElement(1)) { 287 log.debug("message lengths match"); 288 waitingForMore = true; 289 try { 290 Thread.sleep(10); 291 } catch (InterruptedException ex) { 292 log.debug("InterruptedException", ex); 293 } 294 //log.debug("We do not forward the response to the listener as it has not been formed"); 295 lastIncomplete = null; 296 forwardToPort(Dcc4PcMessage.getResponse()); 297 298 return; 299 } else { 300 if (log.isDebugEnabled()) { 301 log.debug("Not all of the command was sent, we need to figure out a way to resend the bits"); 302 log.debug("Original Message length {}", mLastSentMessage.getNumDataElements()); 303 log.debug("What CID has procced in size {}", (byte) msg.getElement(1)); 304 log.debug("Reply is in error {}", ((Dcc4PcReply) msg).toHexString()); 305 } 306 } 307 } else if (mLastSentMessage.getElement(0) == 0x0C) { 308 if (log.isDebugEnabled()) { 309 log.debug("last message was a get response {}", ((Dcc4PcReply) msg).toHexString()); 310 } 311 if (msg.getElement(0) == Dcc4PcReply.SUCCESS) { 312 ((Dcc4PcReply) msg).strip(); 313 if (lastIncomplete != null) { 314 //log.debug("Need to add the new reply to this message"); 315 //log.debug("existing : " + lastIncomplete.toHexString()); 316 317 //Append this message to the last incomplete message 318 if (msg.getNumDataElements() != 0) { 319 int iOrig = lastIncomplete.getNumDataElements(); 320 int iNew = 0; 321 while (iNew < msg.getNumDataElements()) { 322 lastIncomplete.setElement(iOrig, msg.getElement(iNew)); 323 iOrig++; 324 iNew++; 325 } 326 } 327 //set the last incomplete message as the one to return 328 log.debug("Reply set as lastIncomplete"); 329 msg = lastIncomplete; 330 } 331 ((Dcc4PcReply) msg).setError(false); 332 ((Dcc4PcReply)msg).setOriginalRequest(mLastMessage); 333 lastIncomplete = null; 334 waitingForMore = false; 335 mLastMessage = null; 336 mLastSentMessage = null; 337 } else if (msg.getElement(0) == Dcc4PcReply.INCOMPLETE) { 338 waitingForMore = true; 339 ((Dcc4PcReply) msg).strip(); 340 if (lastIncomplete != null) { 341 //Append this message to the last incomplete message 342 if (msg.getNumDataElements() != 0) { 343 int iOrig = lastIncomplete.getNumDataElements(); 344 int iNew = 0; 345 while (iNew < msg.getNumDataElements()) { 346 lastIncomplete.setElement(iOrig, msg.getElement(iNew)); 347 iOrig++; 348 iNew++; 349 } 350 } 351 352 } else if (msg.getNumDataElements() > 1) { 353 lastIncomplete = (Dcc4PcReply) msg; 354 } 355 //We do not forward the response to the listener as it has not been formed 356 forwardToPort(Dcc4PcMessage.getResponse()); 357 358 return; 359 360 } else { 361 log.debug("Reply is an error mesage"); 362 ((Dcc4PcReply) msg).setError(true); 363 mLastMessage.setRetries(mLastMessage.getRetries() - 1); 364 if (mLastMessage.getRetries() >= 0) { 365 synchronized (xmtRunnable) { 366 mCurrentState = AUTORETRYSTATE; 367 replyInDispatch = false; 368 xmtRunnable.notify(); 369 } 370 return; 371 } 372 } 373 } 374 } else { 375 log.debug("Last message sent was null {}", ((Dcc4PcReply) msg).toHexString()); 376 } 377 378 // message is complete, dispatch it !! 379 replyInDispatch = true; 380 if (log.isDebugEnabled()) { 381 log.debug("dispatch reply of length {} contains {} state {}", msg.getNumDataElements(), msg.toString(), mCurrentState); 382 } 383 // forward the message to the registered recipients, 384 // which includes the communications monitor 385 // return a notification via the Swing event queue to ensure proper thread 386 Runnable r = newRcvNotifier(msg, mLastSender, this); 387 try { 388 javax.swing.SwingUtilities.invokeAndWait(r); 389 } catch (InterruptedException | InvocationTargetException e) { 390 log.error("Unexpected exception in invokeAndWait:", e); 391 } 392 393 if (log.isDebugEnabled()) { 394 log.debug("dispatch thread invoked"); 395 } 396 if (!msg.isUnsolicited()) { 397 // effect on transmit: 398 switch (mCurrentState) { 399 case WAITMSGREPLYSTATE: { 400 // check to see if the response was an error message we want 401 // to automatically handle by re-queueing the last sent 402 // message, otherwise go on to the next message 403 if (msg.isRetransmittableErrorMsg()) { 404 if (log.isDebugEnabled()) { 405 log.debug("Automatic Recovery from Error Message: {}", msg.toString()); 406 } 407 synchronized (xmtRunnable) { 408 mCurrentState = AUTORETRYSTATE; 409 replyInDispatch = false; 410 xmtRunnable.notify(); 411 } 412 } else { 413 // update state, and notify to continue 414 synchronized (xmtRunnable) { 415 mCurrentState = NOTIFIEDSTATE; 416 replyInDispatch = false; 417 xmtRunnable.notify(); 418 } 419 } 420 break; 421 } 422 case WAITREPLYINPROGMODESTATE: { 423 // entering programming mode 424 mCurrentMode = PROGRAMINGMODE; 425 replyInDispatch = false; 426 427 // check to see if we need to delay to allow decoders to become 428 // responsive 429 int warmUpDelay = enterProgModeDelayTime(); 430 if (warmUpDelay != 0) { 431 try { 432 synchronized (xmtRunnable) { 433 xmtRunnable.wait(warmUpDelay); 434 } 435 } catch (InterruptedException e) { 436 Thread.currentThread().interrupt(); // retain if needed later 437 } 438 } 439 // update state, and notify to continue 440 synchronized (xmtRunnable) { 441 mCurrentState = OKSENDMSGSTATE; 442 xmtRunnable.notify(); 443 } 444 break; 445 } 446 case WAITREPLYINNORMMODESTATE: { 447 // entering normal mode 448 mCurrentMode = NORMALMODE; 449 replyInDispatch = false; 450 // update state, and notify to continue 451 synchronized (xmtRunnable) { 452 mCurrentState = OKSENDMSGSTATE; 453 xmtRunnable.notify(); 454 } 455 break; 456 } 457 default: { 458 replyInDispatch = false; 459 unexpectedReplyStateError(mCurrentState,msg.toString()); 460 } 461 } 462 // Unsolicited message 463 } else { 464 if (log.isDebugEnabled()) { 465 log.debug("Unsolicited Message Received {}", msg.toString()); 466 } 467 replyInDispatch = false; 468 } 469 } 470 471 boolean normalFlushReceiveChars = false; 472 473 //Need a way to detect that the dsr has gone low. 474 @Override 475 protected void loadChars(AbstractMRReply msg, DataInputStream istream) 476 throws java.io.IOException { 477 int i; 478 readingData = false; 479 MAINGET: 480 { 481 for (i = 0; i < msg.maxSize(); i++) { 482 boolean waiting = true; 483 while (waiting) { 484 if (controller.getInputStream().available() > 0) { 485 readingData = true; 486 byte char1 = readByteProtected(istream); 487 waiting = false; 488 489 //potentially add in a flush here that is generated by the transmit after a command has been sent, but this is not an error type flush.l 490 // if there was a timeout, flush any char received and start over 491 if (flushReceiveChars) { 492 lastIncomplete = null; 493 waitingForMore = false; 494 mLastMessage = null; 495 mLastSentMessage = null; 496 readingData = false; 497 log.warn("timeout flushes receive buffer: {}", ((Dcc4PcReply) msg).toHexString()); 498 msg.flush(); 499 i = 0; // restart 500 flushReceiveChars = false; 501 waiting = true; 502 } else { 503 if (canReceive()) { 504 if (log.isDebugEnabled()) { 505 log.debug("Set data {}, {}", i, char1 & 0xff); 506 } 507 msg.setElement(i, char1); 508 waiting = false; 509 if (port.isRI()) { 510 log.debug("Ring high error"); 511 ((Dcc4PcReply) msg).setError(true); 512 break MAINGET; 513 } 514 if (endOfMessage(msg)) { 515 break MAINGET; 516 } 517 } else { 518 i--; // flush char 519 log.error("unsolicited character received: {}", Integer.toHexString(char1)); 520 } 521 } 522 } else if (!port.isDSR()) { 523 if (i == 0) { 524 waiting = true; 525 } else { 526 log.debug("We have data so will break"); 527 waiting = false; 528 break MAINGET; 529 } 530 } else { 531 //As we have no data to process we will set the readingData flag false; 532 readingData = false; 533 } 534 } 535 } 536 } 537 } 538 539 boolean readingData = false; 540 541 @Override 542 protected void transmitWait(int waitTime, int state, String InterruptMessage) { 543 // wait() can have spurious wakeup! 544 // so we protect by making sure the entire timeout time is used 545 long currentTime = Calendar.getInstance().getTimeInMillis(); 546 long endTime = currentTime + waitTime; 547 while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) { 548 long wait = endTime - currentTime; 549 try { 550 synchronized (xmtRunnable) { 551 // Do not wait if the current state has changed since we 552 // last set it. 553 554 if (mCurrentState != state) { 555 return; 556 } 557 xmtRunnable.wait(wait); // rcvr normally ends this w state change 558 //If we are in the process of reading the data then do not time out. 559 if (readingData) { 560 endTime = endTime + 10; 561 } 562 //if we have received a packet and a seperate message has been sent to retrieve 563 //the reply we will add more time to our wait process. 564 if (waitingForMore) { 565 waitingForMore = false; 566 //if we are in the process of retrieving data, then we shall increase the endTime by 200ms. 567 endTime = endTime + 200; 568 } 569 570 } 571 } catch (InterruptedException e) { 572 Thread.currentThread().interrupt(); // retain if needed later 573 log.error("{} from {}", InterruptMessage, e.getMessage()); 574 } 575 } 576 log.debug("TIMEOUT in transmitWait, mCurrentState:{} {} port dsr {} wait time {}", mCurrentState, state, port.isDSR(), waitTime); 577 } 578 579 private final static Logger log = LoggerFactory.getLogger(Dcc4PcTrafficController.class); 580}