001package jmri.jmrix.ieee802154.xbee; 002 003import com.digi.xbee.api.RemoteXBeeDevice; 004import com.digi.xbee.api.exceptions.TimeoutException; 005import com.digi.xbee.api.exceptions.XBeeException; 006import java.io.DataInputStream; 007import java.io.DataOutputStream; 008import java.io.PipedInputStream; 009import java.io.PipedOutputStream; 010import java.util.ArrayList; 011import jmri.jmrix.AbstractPortController; 012import jmri.util.ImmediatePipedOutputStream; 013import org.slf4j.Logger; 014import org.slf4j.LoggerFactory; 015 016/* 017 * This class provides an interface between the XBee messages that are sent 018 * to and from the serial port connected to an XBee device. 019 * Remote devices may be sending messages in transparent mode. 020 * 021 * Some of this code is derived from the XNetSimulator. 022 * 023 * @Author Paul Bender Copyright (C) 2014 024 */ 025final public class XBeeIOStream extends AbstractPortController { 026 027 private DataOutputStream pout = null; // for output to other classes 028 private DataInputStream pin = null; // for input from other classes 029 // internal ends of the pipes 030 private DataOutputStream outpipe = null; // data from xbee written here 031 // ends up in pin. 032 private DataInputStream inpipe = null; // data read from this pipe is 033 // sent to the XBee. 034 private Thread sourceThread; // thread writing to the remote xbee 035 private Thread sinkThread; // thread reading from the remote xbee 036 037 private final RemoteXBeeDevice remoteXBee; 038 private final XBeeTrafficController xtc; 039 040 public XBeeIOStream(XBeeNode node, XBeeTrafficController tc) { 041 super(tc.getAdapterMemo()); 042 remoteXBee = node.getXBee(); 043 try { 044 PipedOutputStream tempPipeI = new ImmediatePipedOutputStream(); 045 pout = new DataOutputStream(tempPipeI); 046 inpipe = new DataInputStream(new PipedInputStream(tempPipeI)); 047 PipedOutputStream tempPipeO = new ImmediatePipedOutputStream(); 048 outpipe = new DataOutputStream(tempPipeO); 049 pin = new DataInputStream(new PipedInputStream(tempPipeO)); 050 } catch (java.io.IOException e) { 051 log.error("init (pipe): Exception: {}", e.toString()); 052 } 053 054 xtc = tc; 055 } 056 057 // routines defined as abstract in AbstractPortController 058 @Override 059 public DataInputStream getInputStream() { 060 if (pin == null) { 061 log.error("getInputStream called before load(), stream not available"); 062 } 063 return pin; 064 } 065 066 @Override 067 public DataOutputStream getOutputStream() { 068 if (pout == null) { 069 log.error("getOutputStream called before load(), stream not available"); 070 } 071 return pout; 072 } 073 074 @Override 075 public void connect() { 076 } 077 078 @Override 079 public void configure() { 080 // start the transmit thread 081 sourceThread = new Thread(new TransmitThread(remoteXBee, xtc, inpipe)); 082 sourceThread.setName("xbee.XBeeIOStream Transmit thread"); 083 sourceThread.start(); 084 085 // start the receive thread 086 sinkThread = new Thread(new ReceiveThread(remoteXBee, xtc, outpipe)); 087 sinkThread.setName("xbee.XBeeIOStream Receive thread"); 088 sinkThread.start(); 089 090 } 091 092 @Override 093 public boolean status() { 094 return (pout != null && pin != null); 095 } 096 097 @Override 098 public String getCurrentPortName() { 099 return "NONE"; 100 } 101 102 @Override 103 public boolean getDisabled() { 104 return false; 105 } 106 107 @Override 108 public void setDisabled(boolean disabled) { 109 } 110 111 @Override 112 public void dispose() { 113 if (sourceThread != null) { 114 sourceThread.interrupt(); 115 sourceThread = null; 116 } 117 if (sinkThread != null) { 118 sinkThread.interrupt(); 119 sinkThread = null; 120 } 121 try { 122 pin.close(); 123 pout.close(); 124 outpipe.close(); 125 inpipe.close(); 126 } catch (java.io.IOException ioe) { 127 log.debug("IO Exception closing port during dispose call"); 128 } 129 super.dispose(); 130 } 131 132 @Override 133 public void recover() { 134 } 135 136 static private class TransmitThread implements Runnable { 137 138 private RemoteXBeeDevice node = null; 139 private XBeeTrafficController xtc = null; 140 private DataInputStream pipe = null; 141 142 public TransmitThread(RemoteXBeeDevice n, XBeeTrafficController tc, DataInputStream input) { 143 node = n; 144 xtc = tc; 145 pipe = input; 146 } 147 148 @Override 149 public void run() { // start a new thread 150 // this thread has one task. It repeatedly reads from the input pipe 151 // and sends data to the XBee. 152 log.debug("XBee Transmit Thread Started"); 153 for (;;) { 154 // the data we send is required to be a byte array. 155 // The maximum number of values we 156 // can collect is 100. 157 ArrayList<Byte> data = new ArrayList<>(); 158 try { 159 do { 160 log.debug("Attempting byte read"); 161 byte b = pipe.readByte(); 162 log.debug("Read Byte: {}", b); 163 data.add(data.size(), b); 164 } while (data.size() < 100 && pipe.available() > 0); 165 } catch (java.io.IOException e) { 166 log.error("IOException reading serial data from pipe before sending to XBee"); 167 } 168 byte[] dataArray = new byte[data.size()]; 169 int i = 0; 170 for (Byte n : data) { 171 dataArray[i++] = n; 172 } 173 if (log.isDebugEnabled()) { 174 log.debug("XBee Thread received message {}", jmri.util.StringUtil.hexStringFromBytes(dataArray)); 175 } 176 try { 177 xtc.getXBee().sendData(node, dataArray); 178 } catch (TimeoutException te) { 179 log.error("Timeout sending stream data to node {}.", node); 180 } catch (XBeeException xbe) { 181 log.error("Exception sending stream data to node {}.", node); 182 } catch (NullPointerException npe) { 183 if (Thread.interrupted()) { 184 return; 185 } 186 } 187 } 188 } 189 190 } 191 192 static private class ReceiveThread implements Runnable { 193 194 private final RemoteXBeeDevice node; 195 private final XBeeTrafficController xtc; 196 private final DataOutputStream pipe; 197 198 public ReceiveThread(RemoteXBeeDevice n, XBeeTrafficController tc, DataOutputStream output) { 199 node = n; 200 xtc = tc; 201 pipe = output; 202 } 203 204 @Override 205 public void run() { // start a new thread 206 // this thread has one task. It repeatedly reads from the XBee 207 // and writes data to the output pipe 208 if (log.isDebugEnabled()) { 209 log.debug("XBee Receive Thread Started"); 210 } 211 for (;;) { 212 try { 213 com.digi.xbee.api.models.XBeeMessage message = xtc.getXBee().readDataFrom(node, 100); 214 if (message != null) { 215 byte[] data = message.getData(); 216 log.debug("Received {}", data); 217 for (byte datum : data) { 218 pipe.write(datum); 219 } 220 } 221 } catch (java.io.IOException ioe) { 222 log.error("IOException writing serial data from XBee to pipe"); 223 } catch (java.lang.NullPointerException npe) { 224 log.error("NullPointerException writing serial data from XBee to pipe"); 225 // this is fatal, return 226 return; 227 } 228 } 229 } 230 231 } 232 233 private final static Logger log = LoggerFactory.getLogger(XBeeIOStream.class); 234 235}