package jmri.jmrix.loconet.uhlenbrock;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Calendar;
import java.util.concurrent.ConcurrentLinkedQueue;
import jmri.jmrix.loconet.LnPacketizer;
import jmri.jmrix.loconet.LocoNetMessage;
import jmri.jmrix.loconet.LocoNetMessageException;
import jmri.jmrix.loconet.LocoNetSystemConnectionMemo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface"
 * side sends/receives LocoNetMessage objects. The connection to a
 * LnPortController is via a pair of *Streams, which then carry sequences of
 * characters for transmission.
 * <p>
 * Messages come to this via the main GUI thread, and are forwarded back to
 * listeners in that same thread. Reception and transmission are handled in
 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal
 * classes defined here. The thread priorities are:
 * <ul>
 * <li> RcvHandler - at highest available priority
 * <li> XmtHandler - down one, which is assumed to be above the GUI
 * <li> (everything else)
 * </ul>
 * <p>
 * After writing a message, the transmit thread waits (up to
 * {@link #defaultWaitTimer} milliseconds) until the receive thread sees an
 * incoming message equal to the one just sent, and only then sends the next
 * queued message.
 * <p>
 * Some of the message formats used in this class are Copyright Digitrax, Inc.
 * and used with permission as part of the JMRI project. That permission does
 * not extend to uses in other software products. If you wish to use this code,
 * algorithm or these message formats outside of JMRI, please contact Digitrax
 * Inc for separate permission.
 *
 * @author Bob Jacobsen Copyright (C) 2001, 2010
 */
public class UhlenbrockPacketizer extends LnPacketizer {

    /**
     * Create a packetizer bound to the given system connection.
     *
     * @param m the LocoNet system connection memo passed on to the superclass
     */
    @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
            justification = "Only used during system initialization")
    public UhlenbrockPacketizer(LocoNetSystemConnectionMemo m) {
        super(m);
        log.debug("UhlenbrockPacketizer instantiated");
    }

    /** Transmit state: the receiver has seen the reply, transmitter will wake next. */
    public static final int NOTIFIEDSTATE = 15; // xmt notified, will next wake
    /** Transmit state: a message has been sent and its reply is awaited. */
    public static final int WAITMSGREPLYSTATE = 25; // xmt has sent, await reply to message

    /** Maximum time, in milliseconds, to wait for the reply to a sent message. */
    static int defaultWaitTimer = 10000;

    /**
     * Forward a preformatted LocoNetMessage to the actual interface.
     *
     * Checksum is computed and overwritten here, then the message is converted
     * to a byte array and queued for transmission.
     *
     * @param m Message to send; will be updated with CRC
     */
    @Override
    public void sendLocoNetMessage(LocoNetMessage m) {
        log.debug("add to queue message {}", m);
        // update statistics
        transmittedMsgCount++;

        // set the error correcting code byte(s) before transmittal
        m.setParity();

        // stream to port in single write, as that's needed by serial
        int len = m.getNumDataElements();
        byte[] msg = new byte[len];
        for (int i = 0; i < len; i++) {
            msg[i] = (byte) m.getElement(i);
        }

        log.debug("queue LocoNet packet: {}", m);
        // queue the request
        try {
            xmtLocoNetList.add(m); // done first to make sure it's there before xmtList has an element
            xmtList.add(msg);
        } catch (RuntimeException e) {
            log.warn("passing to xmit: unexpected exception: ", e);
        }
    }

    /**
     * Queue of the LocoNetMessage objects awaiting transmission, kept in step
     * with the byte-array queue {@code xmtList}.
     * <p>
     * This is public to allow access from the internal class(es) when compiling
     * with Java 1.1
     */
    public ConcurrentLinkedQueue<LocoNetMessage> xmtLocoNetList = new ConcurrentLinkedQueue<>();

    /**
     * Record that the reply to the last transmitted message has arrived and
     * wake the transmit thread if it is blocked in
     * {@link #transmitWait(int, int)}.
     * <p>
     * The state change and the notification are done while holding the same
     * monitor that transmitWait uses, so the wake-up cannot be missed between
     * its state check and its call to wait().
     */
    private void signalReplyReceived() {
        final Object lock = xmtHandler;
        if (lock == null) {
            // no transmit handler to wake; just record the state
            mCurrentState = NOTIFIEDSTATE;
            return;
        }
        synchronized (lock) {
            mCurrentState = NOTIFIEDSTATE;
            lock.notifyAll();
        }
    }

    /**
     * Captive class to handle incoming characters. This is a permanent loop,
     * looking for input messages in character form on the stream connected to
     * the LnPortController via <code>connectPort</code>.
     */
    class RcvHandler implements Runnable {

        /**
         * Remember the LnPacketizer object.
         */
        LnPacketizer trafficController;

        public RcvHandler(LnPacketizer lt) {
            trafficController = lt;
        }

        /**
         * Read bytes from the input stream, assemble them into LocoNet
         * messages, verify the checksum and hand each complete message to the
         * listeners via the Swing event queue.
         */
        @Override
        public void run() {

            int opCode;
            while (true) {   // loop permanently, program close will exit
                try {
                    // start by looking for command - skip if bit not set
                    int inbyte = readByteProtected(istream) & 0xFF;
                    while (((opCode = (inbyte)) & 0x80) == 0) {
                        log.debug("Skipping: {}", Integer.toHexString(opCode));
                        inbyte = readByteProtected(istream) & 0xFF;
                    }
                    // here opCode is OK. Create output message
                    log.debug("Start message with opcode: {}", Integer.toHexString(opCode));
                    LocoNetMessage msg = null;
                    while (msg == null) {
                        try {
                            // Capture 2nd byte, always present
                            int byte2 = readByteProtected(istream) & 0xFF;
                            if ((byte2 & 0x80) != 0) {
                                log.warn("LocoNet message with opCode: {} ended early. Byte2 is also an opcode: {}", Integer.toHexString(opCode), Integer.toHexString(byte2));
                                opCode = byte2;
                                throw new LocoNetMessageException();
                            }

                            // Decide length from bits 5 and 6 of the opcode
                            switch ((opCode & 0x60) >> 5) {
                                case 0:
                                    /* 2 byte message */
                                    msg = new LocoNetMessage(2);
                                    break;

                                case 1:
                                    /* 4 byte message */
                                    msg = new LocoNetMessage(4);
                                    break;

                                case 2:
                                    /* 6 byte message */
                                    msg = new LocoNetMessage(6);
                                    break;

                                case 3:
                                    /* N byte message, length carried in byte 2 */
                                    if (byte2 < 2) {
                                        log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode));
                                    }
                                    msg = new LocoNetMessage(byte2);
                                    break;
                                default: // can't happen with this code, but just in case...
                                    throw new LocoNetMessageException("decode failure " + byte2);
                            }
                            // message exists, now fill it
                            msg.setOpCode(opCode);
                            msg.setElement(1, byte2);
                            int len = msg.getNumDataElements();
                            for (int i = 2; i < len; i++) {
                                // check for message-blocking error
                                int b = readByteProtected(istream) & 0xFF;
                                if ((b & 0x80) != 0) {
                                    log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b));
                                    opCode = b;
                                    throw new LocoNetMessageException();
                                }
                                msg.setElement(i, b);
                            }
                        } catch (LocoNetMessageException e) {
                            // retry by going around again
                            // opCode is set for the newly-started packet
                            msg = null;
                            continue;
                        }
                    }
                    // check parity
                    if (!msg.checkParity()) {
                        log.warn("Ignore LocoNet packet with bad checksum: {}", msg);
                        throw new LocoNetMessageException();
                    }

                    if (msg.equals(lastMessage)) {
                        log.debug("We have our returned message and can send back out our next instruction");
                        // change state AND wake the transmit thread; without the
                        // notification it would sleep for the whole timeout
                        signalReplyReceived();
                    }

                    // message is complete, dispatch it !!
                    log.debug("queue message for notification");
                    final LocoNetMessage thisMsg = msg;
                    final LnPacketizer thisTc = trafficController;
                    // return a notification via the queue to ensure end
                    javax.swing.SwingUtilities.invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            thisTc.notify(thisMsg);
                        }
                    });

                    // done with this one
                } catch (LocoNetMessageException e) {
                    // just let it ride for now
                    log.warn("run: unexpected LocoNetMessageException: ", e);
                } catch (java.io.EOFException | java.io.InterruptedIOException e) {
                    // posted from idle port when enableReceiveTimeout used
                    // Normal condition, go around the loop again
                    continue;
                } catch (java.io.IOException e) {
                    // fired when write-end of HexFile reaches end
                    log.debug("IOException, should only happen with HexFile", e);
                    log.debug("End of file");
                    disconnectPort(controller);
                    return;
                } catch (RuntimeException e) {
                    // normally, we don't catch RuntimeException, but in this
                    // permanently running loop it seems wise.
                    log.warn("run: unexpected Exception", e); // NOI18N
                    continue;
                }
            } // end of permanent loop
        }
    }

    /**
     * The message most recently handed to the output stream, or null while the
     * transmit thread is idle. Written by the transmit thread and read by the
     * receive thread, hence volatile.
     */
    volatile LocoNetMessage lastMessage;

    /**
     * Captive class to handle transmission
     */
    class XmtHandler implements Runnable {

        /**
         * Take queued messages one at a time, write each to the output stream
         * and wait for its reply (or a timeout) before sending the next.
         */
        @Override
        public void run() {

            while (true) {   // loop permanently
                // any input?
                try {
                    // get content; blocks until present
                    log.debug("check for input");
                    lastMessage = null;
                    byte[] msg = xmtList.take();
                    lastMessage = xmtLocoNetList.remove(); // done second to make sure xmtList had an element

                    // input - now send
                    try {
                        if (ostream != null) {
                            if (!controller.okToSend()) {
                                log.debug("LocoNet port not ready to receive");
                            }
                            log.debug("start write to stream");
                            while (!controller.okToSend()) {
                                Thread.yield();
                            }
                            // Arm the wait state before writing: if the reply is
                            // received before write/flush return, the receiver's
                            // state change must not be overwritten afterwards.
                            mCurrentState = WAITMSGREPLYSTATE;
                            ostream.write(msg);
                            ostream.flush();
                            log.debug("end write to stream");
                            messageTransmitted(msg);
                            transmitWait(defaultWaitTimer, WAITMSGREPLYSTATE);
                        } else {
                            // no stream connected
                            log.warn("sendLocoNetMessage: no connection established");
                        }
                    } catch (java.io.IOException e) {
                        log.warn("sendLocoNetMessage: IOException: {}", e.toString());
                    }
                } catch (InterruptedException ie) {
                    return; // ending the thread
                }
            }
        }
    }

    /**
     * Block the calling thread until {@link #mCurrentState} differs from
     * {@code state}, the timeout expires, or the thread is interrupted.
     * <p>
     * On interruption the interrupt flag is restored and the method returns
     * immediately; looping on would make every further wait() throw at once
     * and spin until the timeout.
     *
     * @param waitTime maximum time to wait, in milliseconds
     * @param state    the state value that means "still waiting"
     */
    protected void transmitWait(int waitTime, int state) {
        // wait() can have spurious wakeup!
        // so we protect by making sure the entire timeout time is used
        final long endTime = System.currentTimeMillis() + waitTime;
        long remaining;
        while ((remaining = endTime - System.currentTimeMillis()) > 0) {
            try {
                synchronized (xmtHandler) {
                    // Do not wait if the current state has changed since we
                    // last set it.
                    if (mCurrentState != state) {
                        return;
                    }
                    xmtHandler.wait(remaining); // rcvr normally ends this w state change
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // retain if needed later
                log.error("transmitLoop interrupted");
                return;
            }
        }
        log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState);
    }

    /** Current transmit state; shared between the transmit and receive threads. */
    volatile protected int mCurrentState;

    /**
     * Invoked at startup to start the threads needed here.
     */
    @Override
    public void startThreads() {
        int priority = Thread.currentThread().getPriority();
        log.debug("startThreads current priority = {} max available = {} default = {} min available = {}",
                priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY);

        // make sure that the xmt priority is no lower than the current priority
        int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY);
        // start the XmtHandler in a thread of its own
        if (xmtHandler == null) {
            xmtHandler = new XmtHandler();
        }
        xmtThread = new Thread(xmtHandler, "LocoNet Uhlenbrock transmit handler");
        log.debug("Xmt thread starts at priority {}", xmtpriority);
        xmtThread.setDaemon(true);
        // apply the priority that was computed and logged above
        xmtThread.setPriority(xmtpriority);
        xmtThread.start();

        // start the RcvHandler in a thread of its own
        if (rcvHandler == null) {
            rcvHandler = new RcvHandler(this);
        }
        rcvThread = new Thread(rcvHandler, "LocoNet Uhlenbrock receive handler");
        rcvThread.setDaemon(true);
        rcvThread.setPriority(Thread.MAX_PRIORITY);
        rcvThread.start();

    }

    private final static Logger log = LoggerFactory.getLogger(UhlenbrockPacketizer.class);

}