001package jmri.jmrix.ieee802154.xbee;
002
003import com.digi.xbee.api.RemoteXBeeDevice;
004import com.digi.xbee.api.exceptions.TimeoutException;
005import com.digi.xbee.api.exceptions.XBeeException;
006import java.io.DataInputStream;
007import java.io.DataOutputStream;
008import java.io.PipedInputStream;
009import java.io.PipedOutputStream;
010import java.util.ArrayList;
011import jmri.jmrix.AbstractPortController;
012import jmri.util.ImmediatePipedOutputStream;
013import org.slf4j.Logger;
014import org.slf4j.LoggerFactory;
015
/**
 * Provides a stream interface to a remote XBee device: bytes written to the
 * output stream are sent to the remote node, and data received from the
 * remote node can be read from the input stream. The traffic passes through
 * the XBee device connected to the serial port.
 * Remote devices may be sending messages in transparent mode.
 * <p>
 * Some of this code is derived from the XNetSimulator.
 *
 * @author Paul Bender Copyright (C) 2014
 */
025final public class XBeeIOStream extends AbstractPortController {
026
027    private DataOutputStream pout = null; // for output to other classes
028    private DataInputStream pin = null; // for input from other classes
029    // internal ends of the pipes
030    private DataOutputStream outpipe = null;  // data from xbee written here
031    // ends up in pin.
032    private DataInputStream inpipe = null; // data read from this pipe is
033    // sent to the XBee.
034    private Thread sourceThread;  // thread writing to the remote xbee
035    private Thread sinkThread;  // thread reading from the remote xbee
036
037    private final RemoteXBeeDevice remoteXBee;
038    private final XBeeTrafficController xtc;
039
040    public XBeeIOStream(XBeeNode node, XBeeTrafficController tc) {
041        super(tc.getAdapterMemo());
042        remoteXBee = node.getXBee();
043        try {
044            PipedOutputStream tempPipeI = new ImmediatePipedOutputStream();
045            pout = new DataOutputStream(tempPipeI);
046            inpipe = new DataInputStream(new PipedInputStream(tempPipeI));
047            PipedOutputStream tempPipeO = new ImmediatePipedOutputStream();
048            outpipe = new DataOutputStream(tempPipeO);
049            pin = new DataInputStream(new PipedInputStream(tempPipeO));
050        } catch (java.io.IOException e) {
051            log.error("init (pipe): Exception: {}", e.toString());
052        }
053
054        xtc = tc;
055    }
056
057    // routines defined as abstract in AbstractPortController
058    @Override
059    public DataInputStream getInputStream() {
060        if (pin == null) {
061            log.error("getInputStream called before load(), stream not available");
062        }
063        return pin;
064    }
065
066    @Override
067    public DataOutputStream getOutputStream() {
068        if (pout == null) {
069            log.error("getOutputStream called before load(), stream not available");
070        }
071        return pout;
072    }
073
074    @Override
075    public void connect() {
076    }
077
078    @Override
079    public void configure() {
080        // start the transmit thread
081        sourceThread = new Thread(new TransmitThread(remoteXBee, xtc, inpipe));
082        sourceThread.setName("xbee.XBeeIOStream Transmit thread");
083        sourceThread.start();
084
085        // start the receive thread
086        sinkThread = new Thread(new ReceiveThread(remoteXBee, xtc, outpipe));
087        sinkThread.setName("xbee.XBeeIOStream Receive thread");
088        sinkThread.start();
089
090    }
091
092    @Override
093    public boolean status() {
094        return (pout != null && pin != null);
095    }
096
097    @Override
098    public String getCurrentPortName() {
099        return "NONE";
100    }
101
102    @Override
103    public boolean getDisabled() {
104        return false;
105    }
106
107    @Override
108    public void setDisabled(boolean disabled) {
109    }
110
111    @Override
112    public void dispose() {
113        if (sourceThread != null) {
114            sourceThread.interrupt();
115            sourceThread = null;
116        }
117        if (sinkThread != null) {
118            sinkThread.interrupt();
119            sinkThread = null;
120        }
121        try {
122            pin.close();
123            pout.close();
124            outpipe.close();
125            inpipe.close();
126        } catch (java.io.IOException ioe) {
127            log.debug("IO Exception closing port during dispose call");
128        }
129        super.dispose();
130    }
131
132    @Override
133    public void recover() {
134    }
135
136    static private class TransmitThread implements Runnable {
137
138        private RemoteXBeeDevice node = null;
139        private XBeeTrafficController xtc = null;
140        private DataInputStream pipe = null;
141
142        public TransmitThread(RemoteXBeeDevice n, XBeeTrafficController tc, DataInputStream input) {
143            node = n;
144            xtc = tc;
145            pipe = input;
146        }
147
148        @Override
149        public void run() { // start a new thread
150            // this thread has one task.  It repeatedly reads from the input pipe
151            // and sends data to the XBee.
152            log.debug("XBee Transmit Thread Started");
153            for (;;) {
154                // the data we send is required to be a byte array.
155                // The maximum number of values we
156                // can collect is 100.
157                ArrayList<Byte> data = new ArrayList<>();
158                try {
159                    do {
160                        log.debug("Attempting byte read");
161                        byte b = pipe.readByte();
162                        log.debug("Read Byte: {}", b);
163                        data.add(data.size(), b);
164                    } while (data.size() < 100 && pipe.available() > 0);
165                } catch (java.io.IOException e) {
166                    log.error("IOException reading serial data from pipe before sending to XBee");
167                }
168                byte[] dataArray = new byte[data.size()];
169                int i = 0;
170                for (Byte n : data) {
171                    dataArray[i++] = n;
172                }
173                if (log.isDebugEnabled()) {
174                    log.debug("XBee Thread received message {}", jmri.util.StringUtil.hexStringFromBytes(dataArray));
175                }
176                try {
177                    xtc.getXBee().sendData(node, dataArray);
178                } catch (TimeoutException te) {
179                    log.error("Timeout sending stream data to node {}.", node);
180                } catch (XBeeException xbe) {
181                    log.error("Exception sending stream data to node {}.", node);
182                } catch (NullPointerException npe) {
183                    if (Thread.interrupted()) {
184                        return;
185                    }
186                }
187            }
188        }
189
190    }
191
192    static private class ReceiveThread implements Runnable {
193
194        private final RemoteXBeeDevice node;
195        private final XBeeTrafficController xtc;
196        private final DataOutputStream pipe;
197
198        public ReceiveThread(RemoteXBeeDevice n, XBeeTrafficController tc, DataOutputStream output) {
199            node = n;
200            xtc = tc;
201            pipe = output;
202        }
203
204        @Override
205        public void run() { // start a new thread
206            // this thread has one task.  It repeatedly reads from the XBee
207            // and writes data to the output pipe
208            if (log.isDebugEnabled()) {
209                log.debug("XBee Receive Thread Started");
210            }
211            for (;;) {
212                try {
213                    com.digi.xbee.api.models.XBeeMessage message = xtc.getXBee().readDataFrom(node, 100);
214                    if (message != null) {
215                        byte[] data = message.getData();
216                        log.debug("Received {}", data);
217                        for (byte datum : data) {
218                            pipe.write(datum);
219                        }
220                    }
221                } catch (java.io.IOException ioe) {
222                    log.error("IOException writing serial data from XBee to pipe");
223                } catch (java.lang.NullPointerException npe) {
224                    log.error("NullPointerException writing serial data from XBee to pipe");
225                    // this is fatal, return
226                    return;
227                }
228            }
229        }
230
231    }
232
233    private final static Logger log = LoggerFactory.getLogger(XBeeIOStream.class);
234
235}