001package jmri.jmrix.roco.z21; 002 003import java.io.DataInputStream; 004import java.io.DataOutputStream; 005import java.io.IOException; 006import java.io.PipedInputStream; 007import java.io.PipedOutputStream; 008import jmri.jmrix.lenz.XNetListener; 009import jmri.jmrix.lenz.XNetMessage; 010import jmri.jmrix.lenz.XNetReply; 011import jmri.util.ImmediatePipedOutputStream; 012import org.slf4j.Logger; 013import org.slf4j.LoggerFactory; 014 015/** 016 * Interface between z21 messages and an XpressNet stream. 017 * <p> 018 * Parts of this code are derived from the 019 * jmri.jmrix.lenz.xnetsimulator.XNetSimulatorAdapter class. 020 * 021 * @author Paul Bender Copyright (C) 2014 022 */ 023public class Z21XPressNetTunnel implements Z21Listener, XNetListener, Runnable { 024 025 jmri.jmrix.lenz.XNetStreamPortController xsc = null; 026 private DataOutputStream pout = null; // for output to other classes 027 private DataInputStream pin = null; // for input from other classes 028 // internal ends of the pipes 029 private DataOutputStream outpipe = null; // feed pin 030 private DataInputStream inpipe = null; // feed pout 031 private Z21SystemConnectionMemo _memo; 032 private final Thread sourceThread; 033 private volatile boolean stopThread = false; 034 035 /** 036 * Build a new XpressNet tunnel. 037 * @param memo system connection. 038 */ 039 @edu.umd.cs.findbugs.annotations.SuppressFBWarnings(value="SC_START_IN_CTOR", justification="done at end, waits for data") 040 public Z21XPressNetTunnel(Z21SystemConnectionMemo memo) { 041 // save the SystemConnectionMemo. 042 _memo = memo; 043 044 // configure input and output pipes to use for 045 // the communication with the XpressNet implementation. 046 try { 047 PipedOutputStream tempPipeI = new ImmediatePipedOutputStream(); 048 pout = new DataOutputStream(tempPipeI); 049 inpipe = new DataInputStream(new PipedInputStream(tempPipeI)); 050 PipedOutputStream tempPipeO = new ImmediatePipedOutputStream(); 051 outpipe = new DataOutputStream(tempPipeO); 052 pin = new DataInputStream(new PipedInputStream(tempPipeO)); 053 } catch (java.io.IOException e) { 054 log.error("init (pipe): Exception: {}", e.toString()); 055 sourceThread = null; 056 return; 057 } 058 059 // start a thread to read from the input pipe. 060 sourceThread = new Thread(this); 061 sourceThread.setName("z21.Z21XpressNetTunnel sourceThread"); 062 sourceThread.setDaemon(true); 063 sourceThread.start(); 064 065 // Then use those pipes as the input and output pipes for 066 // a new XNetStreamPortController object. 067 setStreamPortController(new Z21XNetStreamPortController(pin, pout, "None")); 068 069 // register as a Z21Listener, so we can receive replies 070 _memo.getTrafficController().addz21Listener(this); 071 072 // start the XpressNet configuration. 073 xsc.configure(); 074 } 075 076 @Override 077 public void run() { // start a new thread 078 // this thread has one task. It repeatedly reads from the input pipe 079 // and writes modified data to the output pipe. This is the heart 080 // of the command station simulation. 081 log.debug("Simulator Thread Started"); 082 while (!stopThread) { 083 Z21XNetMessage m = readMessage(); 084 if(m != null) { 085 // don't forward a null message. 086 message(m); 087 } 088 } 089 } 090 091 /** 092 * Read one incoming message from the buffer and set 093 * outputBufferEmpty to true. 094 */ 095 private Z21XNetMessage readMessage() { 096 Z21XNetMessage msg = null; 097 try { 098 msg = loadChars(); 099 } catch (java.io.IOException e) { 100 // should do something meaningful here. 101 102 } 103 return (msg); 104 } 105 106 /** 107 * Get characters from the input source, and file a message. 108 * <p> 109 * Returns only when the message is complete. 110 * <p> 111 * Only used in the Receive thread. 112 * 113 * @return filled message 114 * @throws IOException when presented by the input source. 115 */ 116 private Z21XNetMessage loadChars() throws java.io.IOException { 117 int i; 118 byte char1; 119 char1 = readByteProtected(inpipe); 120 int len = (char1 & 0x0f) + 2; // opCode+Nbytes+ECC 121 // The z21 protocol has a special case for 122 // LAN_X_GET_TURNOUT_INFO, which advertises as having 123 // 3 payload bytes, but really only has two. 124 if((char1&0xff)==Z21Constants.LAN_X_GET_TURNOUT_INFO) 125 { 126 len=4; 127 } 128 Z21XNetMessage msg = new Z21XNetMessage(len); 129 msg.setElement(0, char1 & 0xFF); 130 for (i = 1; i < len; i++) { 131 char1 = readByteProtected(inpipe); 132 msg.setElement(i, char1 & 0xFF); 133 } 134 return msg; 135 } 136 137 /** 138 * Read a single byte, protecting against various timeouts, etc. 139 * <p> 140 * When a port is set to have a receive timeout (via the 141 * enableReceiveTimeout() method), some will return zero bytes or an 142 * EOFException at the end of the timeout. In that case, the read should be 143 * repeated to get the next real character. 144 */ 145 private byte readByteProtected(DataInputStream istream) throws java.io.IOException { 146 byte[] rcvBuffer = new byte[1]; 147 while (true) { // loop will repeat until character found 148 int nchars; 149 nchars = istream.read(rcvBuffer, 0, 1); 150 if (nchars > 0) { 151 return rcvBuffer[0]; 152 } 153 } 154 } 155 156 // Z21Listener interface methods. 157 158 /** 159 * Member function that will be invoked by a z21Interface implementation to 160 * forward a z21 message from the layout. 161 * 162 * @param msg The received z21 message. Note that this same object may be 163 * presented to multiple users. It should not be modified here. 164 */ 165 @Override 166 public void reply(Z21Reply msg) { 167 // This funcction forwards the payload of an XpressNet message 168 // tunneled in a z21 message and forwards it to the XpressNet 169 // implementation's input stream. 170 if (msg.isXPressNetTunnelMessage()) { 171 Z21XNetReply reply = msg.getXNetReply(); 172 log.debug("Z21 Reply {} forwarded to XpressNet implementation as {}", 173 msg, reply); 174 for (int i = 0; i < reply.getNumDataElements(); i++) { 175 try { 176 outpipe.writeByte(reply.getElement(i)); 177 } catch (java.io.IOException ioe) { 178 log.error("Error writing XpressNet Reply to XpressNet input stream."); 179 } 180 } 181 } 182 } 183 184 /** 185 * Member function that will be invoked by a z21Interface implementation to 186 * forward a z21 message sent to the layout. Normally, this function will do 187 * nothing. 188 * 189 * @param msg The received z21 message. Note that this same object may be 190 * presented to multiple users. It should not be modified here. 191 */ 192 @Override 193 public void message(Z21Message msg) { 194 // this function does nothing. 195 } 196 197 // XNetListener Interface methods. 198 199 /** 200 * Member function that will be invoked by a XNetInterface implementation to 201 * forward a XNet message from the layout. 202 * 203 * @param msg The received XNet message. Note that this same object may be 204 * presented to multiple users. It should not be modified here. 205 */ 206 @Override 207 public void message(XNetReply msg) { 208 // we don't do anything with replies. 209 } 210 211 /** 212 * Member function that will be invoked by a XNetInterface implementation to 213 * forward a XNet message sent to the layout. Normally, this function will 214 * do nothing. 215 * 216 * @param msg The received XNet message. Note that this same object may be 217 * presented to multiple users. It should not be modified here. 218 */ 219 @Override 220 public void message(XNetMessage msg) { 221 // when an XpressNet message shows up here, package it in a Z21Message 222 Z21Message message = new Z21Message(msg); 223 log.debug("XpressNet Message {} forwarded to z21 Interface as {}", 224 msg, message); 225 // and send the z21 message to the interface 226 _memo.getTrafficController().sendz21Message(message, this); 227 } 228 229 /** 230 * Member function invoked by an XNetInterface implementation to notify a 231 * sender that an outgoing message timed out and was dropped from the queue. 232 */ 233 @Override 234 public void notifyTimeout(XNetMessage msg) { 235 // we don't do anything with timeouts. 236 } 237 238 /** 239 * Package protected method to retrieve the stream port controller 240 * associated with this tunnel. 241 * @return controller in use 242 */ 243 jmri.jmrix.lenz.XNetStreamPortController getStreamPortController() { 244 return xsc; 245 } 246 247 /** 248 * Package protected method to set the stream port controller 249 * associated with this tunnel. 250 * @param x controller to retain 251 */ 252 void setStreamPortController(jmri.jmrix.lenz.XNetStreamPortController x){ 253 xsc = x; 254 255 // configure the XpressNet connections properties. 256 xsc.getSystemConnectionMemo().setSystemPrefix("X"); 257 xsc.getSystemConnectionMemo().setUserName(_memo.getUserName() + "XpressNet"); 258 259 // register a connection config object for this stream port. 260 //jmri.InstanceManager.getDefault(jmri.ConfigureManager.class).registerPref(new Z21XNetConnectionConfig(xsc)); 261 //jmri.InstanceManager.getDefault(jmri.jmrix.ConnectionConfigManager.class).add(new Z21XNetConnectionConfig(xsc)); 262 } 263 264 @SuppressWarnings("deprecation") // Thread.stop 265 public void dispose(){ 266 if (xsc != null) { 267 xsc.dispose(); 268 } 269 try { 270 inpipe.close(); 271 } catch (IOException ex) { 272 // Ignore IO error 273 } 274 275 if (sourceThread != null) { 276 stopThread = true; 277 sourceThread.interrupt(); 278 try { 279 sourceThread.join(); 280 } catch (InterruptedException e) { 281 // Do nothing 282 } 283 } 284 } 285 286 private final static Logger log = LoggerFactory.getLogger(Z21XPressNetTunnel.class); 287 288}