001package jmri.jmrix.dccpp.dccppovertcp; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.io.BufferedReader; 005import java.io.InputStreamReader; 006import java.util.LinkedList; 007import java.util.NoSuchElementException; 008import jmri.jmrix.dccpp.DCCppCommandStation; 009import jmri.jmrix.dccpp.DCCppListener; 010import jmri.jmrix.dccpp.DCCppMessage; 011import jmri.jmrix.dccpp.DCCppNetworkPortController; 012import jmri.jmrix.dccpp.DCCppPacketizer; 013import jmri.jmrix.dccpp.DCCppReply; 014import org.slf4j.Logger; 015import org.slf4j.LoggerFactory; 016 017import javax.annotation.concurrent.GuardedBy; 018 019/** 020 * Converts Stream-based I/O to/from DCC++ messages. The "DCCppInterface" side 021 * sends/receives DCCppMessage objects. The connection to a 022 * DCCppPortnetworkController is via a pair of *Streams, which then carry 023 * sequences of characters for transmission. 024 * <p> 025 * Messages come to this via the main GUI thread, and are forwarded back to 026 * listeners in that same thread. Reception and transmission are handled in 027 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal 028 * classes defined here. The thread priorities are: 029 * <ul> 030 * <li> RcvHandler - at highest available priority 031 * <li> XmtHandler - down one, which is assumed to be above the GUI 032 * <li> (everything else) 033 * </ul> 034 * 035 * @author Bob Jacobsen Copyright (C) 2001 036 * @author Alex Shepherd Copyright (C) 2003, 2006 037 * @author Mark Underwood Copyright (C) 2015 038 * 039 * Based on jmri.jmrix.loconet.loconetovertcp.LnOverTcpPacketizer 040 * 041 */ 042// TODO: Consider ditching the LocoNet-inherited "RECEIVE" and "SEND" prefixes 043// and just rely on the already-present "<" and ">" to mark start and end 044// of frame. This would pretty much make DCCppOverTCP redundant with the 045// Network Port interface to the Base Station (that is, the "host" JMRI 046// application would look just like a Network Base Station to the "client" JMRI 047// application). 048// 049// However, at minimum, this would break backward compatibility for the interface, 050// so there is that to consider. Probably best to do this sooner than later, 051// to minimize that impact. 052// 053public class DCCppOverTcpPacketizer extends DCCppPacketizer { 054 055 static final String OLD_RECEIVE_PREFIX = "RECEIVE "; 056 static final String OLD_SEND_PREFIX = "SEND"; 057 static final String RECEIVE_PREFIX = "<"; 058 static final String SEND_PREFIX = ""; // Making this an empty string on purpose. 059 static final String OLD_SERVER_VERSION_STRING = "VERSION JMRI Server "; // CAREFUL: Changing this could break backward compatibility 060 static final String NEW_SERVER_VERSION_STRING = "VERSION DCC++ Server "; 061 062 boolean useOldPrefix = false; 063 064 protected BufferedReader istreamReader = null; 065 066 /** 067 * XmtHandler (a local class) object to implement the transmit thread 068 */ 069 @GuardedBy ("xmtHandler") 070 final protected Runnable xmtHandler; 071 072 /** 073 * RcvHandler (a local class) object to implement the receive thread 074 */ 075 protected Runnable rcvHandler; 076 077 /** 078 * Synchronized list used as a transmit queue. 079 */ 080 @GuardedBy ("xmtHandler") 081 protected LinkedList<DCCppMessage> xmtList = new LinkedList<>(); 082 083 @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", justification = "Only used during system initialization") 084 public DCCppOverTcpPacketizer(DCCppCommandStation cs) { 085 super(cs); // Don't need the command station (?) 086 087 xmtHandler = new XmtHandler(); 088 rcvHandler = new RcvHandler(this); 089 log.debug("DCCppOverTcpPacketizer created."); 090 } 091 092 public DCCppNetworkPortController networkController = null; 093 094 public boolean isXmtBusy() { 095 return networkController != null; 096 } 097 098 /** 099 * Make connection to existing DCCppNetworkPortController object. 100 * 101 * @param p Port networkController for connected. Save this for a later 102 * disconnect call 103 */ 104 public void connectPort(DCCppNetworkPortController p) { 105 istream = p.getInputStream(); 106 istreamReader = new BufferedReader(new InputStreamReader(istream)); 107 ostream = p.getOutputStream(); 108 if (networkController != null) { 109 log.warn("connectPort: connect called while connected"); 110 } 111 networkController = p; 112 } 113 114 /** 115 * Break connection to existing DCCppNetworkPortController object. Once broken, 116 * attempts to send via "message" member will fail. 117 * 118 * @param p previously connected port 119 */ 120 public void disconnectPort(DCCppNetworkPortController p) { 121 istream = null; 122 ostream = null; 123 if (networkController != p) { 124 log.warn("disconnectPort: disconnect called from non-connected DCCppNetworkPortController"); 125 } 126 networkController = null; 127 } 128 129 /** 130 * Forward a preformatted DCCppMessage to the actual interface. 131 * 132 * Checksum is computed and overwritten here, then the message is converted 133 * to a byte array and queue for transmission 134 * 135 * @param m Message to send; will be updated with CRC 136 */ 137 @Override 138 public void sendDCCppMessage(DCCppMessage m, DCCppListener reply) { 139 // update statistics 140 //transmittedMsgCount++; 141 142 log.debug("queue DCCpp packet: {}", m); 143 // in an atomic operation, queue the request and wake the xmit thread 144 try { 145 synchronized (xmtHandler) { 146 xmtList.addLast(m); 147 xmtHandler.notifyAll(); 148 } 149 } catch (Exception e) { 150 log.warn("passing to xmit: unexpected exception: ", e); 151 } 152 } 153 154 /** 155 * Invoked at startup to start the threads needed here. 156 */ 157 public void startThreads() { 158 int priority = Thread.currentThread().getPriority(); 159 log.debug("startThreads current priority = {} max available {} default = {} min available = {}", 160 priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY); 161 162 // make sure that the xmt priority is no lower than the current priority 163 int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY); 164 // start the XmtHandler in a thread of its own 165 Thread xmtThread; 166 synchronized (xmtHandler) { // never null at this point 167 xmtThread = new Thread(xmtHandler, "DCC++ transmit handler"); 168 } 169 log.debug("Xmt thread starts at priority {}", xmtpriority); 170 xmtThread.setDaemon(true); 171 xmtThread.setPriority(Thread.MAX_PRIORITY - 1); 172 xmtThread.start(); 173 174 // start the RcvHandler in a thread of its own 175 if (rcvHandler == null) { 176 rcvHandler = new RcvHandler(this); 177 } 178 Thread rcvThread = new Thread(rcvHandler, "DCC++ receive handler"); 179 rcvThread.setDaemon(true); 180 rcvThread.setPriority(Thread.MAX_PRIORITY); 181 rcvThread.start(); 182 } 183 184 /** 185 * Captive class to handle incoming characters. This is a permanent loop, 186 * looking for input messages in character form on the stream connected to 187 * the DCCppNetworkPortController via <code>connectPort</code>. 188 */ 189 class RcvHandler implements Runnable { 190 191 /** 192 * nothing to do here 193 * 194 * @param lt the DCCppOverTcpPacketizer trafficController to run over 195 */ 196 public RcvHandler(DCCppOverTcpPacketizer lt) { 197 } 198 199 // readline is deprecated, but there are no problems 200 // with multi-byte characters here. 201 @Override 202 public void run() { 203 204 String rxLine; 205 while (true) { // loop permanently, program close will exit 206 try { 207 // start by looking for a complete line 208 209 if (istreamReader == null) { 210 log.error("istreamReader not initialized!"); 211 return; 212 } 213 rxLine = istreamReader.readLine(); // Note: This uses BufferedReader for safer data handling 214 if (rxLine == null) { 215 log.warn("run: input stream returned null, exiting loop"); 216 return; 217 } 218 219 log.debug("Received: {}", rxLine); 220 221 // Legacy support. If this message is the old JMRI version 222 // handshake, flag us as in "old mode" 223 if (rxLine.startsWith(OLD_SERVER_VERSION_STRING)) { 224 useOldPrefix = true; 225 } 226 227 // Legacy support. If the old receive prefix is present 228 // remove it. 229 if (rxLine.startsWith(OLD_RECEIVE_PREFIX)) { 230 final int trim = OLD_RECEIVE_PREFIX.length(); 231 rxLine = rxLine.substring(trim); 232 } 233 234 if (!rxLine.startsWith(RECEIVE_PREFIX)) { 235 // Not a valid Tcp packet 236 log.debug("Wrong Prefix: {}", rxLine); 237 continue; 238 } 239 240 // Strip the prefix off. 241 //final int trim = RECEIVE_PREFIX.length(); 242 //rxLine = rxLine.substring(trim); 243 244 int firstidx = rxLine.indexOf("<"); 245 int lastidx = rxLine.lastIndexOf(">"); 246 log.debug("String {} Index1 {} Index 2{}", rxLine, firstidx, lastidx); 247 248 // BUG FIX: Incoming DCCppOverTCP messages are already formatted for DCC++ and don't 249 // need to be parsed. Indeed, trying to parse them will screw them up. 250 // So instead, we de-deprecated the string constructor so that we can 251 // directly create a DCCppReply from the incoming string without translation/parsing. 252 253 // Note: the substring call below also strips off the "< >" 254 DCCppReply msg = DCCppReply.parseDCCppReply(rxLine.substring(rxLine.indexOf("<") + 1, 255 rxLine.lastIndexOf(">"))); 256 257 if (!msg.isValidReplyFormat()) { 258 log.warn("Invalid Reply Format: {}", msg.toString()); 259 continue; 260 } 261 // message is complete, dispatch it !! 262 log.debug("queue reply for notification"); 263 264 final DCCppReply thisMsg = msg; 265 //final DCCppPacketizer thisTc = trafficController; 266 // return a notification via the queue to ensure end 267 Runnable r = new Runnable() { 268 final DCCppReply msgForLater = thisMsg; 269 270 @Override 271 public void run() { 272 notifyReply(msgForLater, null); 273 } 274 }; 275 javax.swing.SwingUtilities.invokeLater(r); 276 // done with this one 277 //} catch (DCCppMessageException e) { 278 // just let it ride for now 279 // log.warn("run: unexpected DCCppMessageException: ", e); 280 } catch (java.io.EOFException e) { 281 // posted from idle port when enableReceiveTimeout used 282 log.debug("EOFException, is DCC++ serial I/O using timeouts?"); 283 } catch (java.io.IOException e) { 284 // fired when write-end of HexFile reaches end 285 log.debug("IOException, should only happen with HexFile: ", e); 286 log.info("End of file"); 287 // disconnectPort(networkController); 288 return; 289 } // normally, we don't catch the unnamed Exception, but in this 290 // permanently running loop it seems wise. 291 catch (Exception e) { 292 log.warn("run: unexpected Exception: ", e); 293 } 294 } // end of permanent loop 295 } 296 } 297 298 /** 299 * Captive class to handle transmission. 300 */ 301 class XmtHandler implements Runnable { 302 303 @Override 304 public void run() { 305 306 while (!threadStopRequest) { // loop until asked to stop 307 // any input? 308 try { 309 // get content; failure is a NoSuchElementException 310 log.debug("check for input"); 311 DCCppMessage msg; 312 synchronized (xmtHandler) { 313 msg = xmtList.removeFirst(); 314 } 315 316 // input - now send 317 try { 318 if (ostream != null) { 319 //Commented out as the original LnPortnetworkController always returned true. 320 //if (!networkController.okToSend()) log.warn(DCCpp port not ready to receive"); // TCP, not RS232, so message is a real warning 321 log.debug("start write to network stream"); 322 StringBuilder packet = new StringBuilder(msg.length() + SEND_PREFIX.length() + 2); 323 if (useOldPrefix) { 324 packet.append(OLD_SEND_PREFIX); 325 } 326 packet.append("<").append(msg.toString()).append(">"); 327 if (log.isDebugEnabled()) { // avoid building a String when not needed 328 log.debug("Write to LbServer: {}", packet.toString()); 329 } 330 packet.append("\r\n"); 331 ostream.write(packet.toString().getBytes()); 332 ostream.flush(); 333 log.debug("end write to stream"); 334 } else { 335 // no stream connected 336 log.warn("sendDCCppMessage: no connection established"); 337 } 338 } catch (java.io.IOException e) { 339 log.warn("sendDCCppMessage: IOException: {}", e.toString()); 340 } 341 } catch (NoSuchElementException e) { 342 // message queue was empty, wait for input 343 log.debug("start wait"); 344 345 new jmri.util.WaitHandler(this); // handle synchronization, spurious wake, interruption 346 347 log.debug("end wait"); 348 } 349 } 350 } 351 } 352 353 /** 354 * Terminate the receive and transmit threads. 355 * <p> 356 * This is intended to be used only by testing subclasses. 357 */ 358 @Override 359 public void terminateThreads() { 360 threadStopRequest = true; 361 if (xmtThread != null) { 362 xmtThread.interrupt(); 363 try { 364 xmtThread.join(); 365 } catch (InterruptedException ie){ 366 // interrupted during cleanup. 367 } 368 } 369 370 if (rcvThread != null) { 371 rcvThread.interrupt(); 372 try { 373 rcvThread.join(); 374 } catch (InterruptedException ie){ 375 // interrupted during cleanup. 376 } 377 } 378 } 379 380 private final static Logger log = LoggerFactory.getLogger(DCCppOverTcpPacketizer.class); 381 382}