001package jmri.jmrix.loconet.loconetovertcp; 002 003import java.io.BufferedReader; 004import java.io.IOException; 005import java.io.InputStreamReader; 006import java.io.OutputStream; 007import java.net.Socket; 008import java.util.LinkedList; 009import java.util.StringTokenizer; 010import jmri.jmrix.loconet.LnTrafficController; 011import jmri.jmrix.loconet.LocoNetListener; 012import jmri.jmrix.loconet.LocoNetMessage; 013import org.slf4j.Logger; 014import org.slf4j.LoggerFactory; 015 016/** 017 * Implementation of the LocoNetOverTcp LbServer Server Protocol. 018 * 019 * @author Alex Shepherd Copyright (C) 2006 020 */ 021public final class ClientRxHandler extends Thread implements LocoNetListener { 022 023 Socket clientSocket; 024 BufferedReader inStream; 025 OutputStream outStream; 026 final LinkedList<LocoNetMessage> msgQueue = new LinkedList<>(); 027 volatile Thread txThread; 028 String inString; 029 String remoteAddress; 030 LocoNetMessage lastSentMessage; 031 LnTrafficController tc; 032 033 public ClientRxHandler(String newRemoteAddress, Socket newSocket, LnTrafficController _tc) { 034 tc = _tc; 035 clientSocket = newSocket; 036 setDaemon(true); 037 setPriority(Thread.MAX_PRIORITY); 038 remoteAddress = newRemoteAddress; 039 setName("ClientRxHandler:" + remoteAddress); 040 lastSentMessage = null; 041 start(); 042 } 043 044 @Override 045 public void run() { 046 047 try { 048 inStream = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 049 outStream = clientSocket.getOutputStream(); 050 051 tc.addLocoNetListener(~0, this); 052 053 txThread = new Thread(new ClientTxHandler(this)); 054 txThread.setDaemon(true); 055 txThread.setPriority(Thread.MAX_PRIORITY); 056 txThread.setName("ClientTxHandler: " + remoteAddress); 057 txThread.start(); 058 059 while (!isInterrupted()) { 060 inString = inStream.readLine(); 061 if (inString == null) { 062 log.debug("ClientRxHandler: Remote Connection Closed"); 063 interrupt(); 064 } else { 065 log.debug("ClientRxHandler: Received: {}", inString); 066 067 StringTokenizer st = new StringTokenizer(inString); 068 if (st.nextToken().equals("SEND")) { 069 LocoNetMessage msg = null; 070 int opCode = Integer.parseInt(st.nextToken(), 16); 071 int byte2 = Integer.parseInt(st.nextToken(), 16); 072 073 // Decide length 074 switch ((opCode & 0x60) >> 5) { 075 case 0: // 2 byte message 076 077 msg = new LocoNetMessage(2); 078 break; 079 080 case 1: // 4 byte message 081 082 msg = new LocoNetMessage(4); 083 break; 084 085 case 2: // 6 byte message 086 087 msg = new LocoNetMessage(6); 088 break; 089 090 case 3: // N byte message 091 092 if (byte2 < 2) { 093 log.error("ClientRxHandler: LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); 094 } 095 msg = new LocoNetMessage(byte2); 096 break; 097 default: 098 log.warn("Unhandled msg length: {}", (opCode & 0x60) >> 5); 099 break; 100 } 101 if (msg == null) { // IDE may flag, spotbugs warns it can be null at this point, so keep this check 102 log.error("msg is null!"); 103 return; 104 } 105 // message exists, now fill it 106 msg.setOpCode(opCode); 107 msg.setElement(1, byte2); 108 int len = msg.getNumDataElements(); 109 //log.debug("len: "+len); 110 111 for (int i = 2; i < len; i++) { 112 int b = Integer.parseInt(st.nextToken(), 16); 113 msg.setElement(i, b); 114 } 115 116 tc.sendLocoNetMessage(msg); 117 // Keep the message we just sent so we can ACK it when we hear 118 // the echo from the LocoBuffer 119 lastSentMessage = msg; 120 } 121 } 122 } 123 } catch (IOException ex) { 124 log.debug("ClientRxHandler: IO Exception: ", ex); 125 } 126 tc.removeLocoNetListener(~0, this); 127 if (txThread != null) txThread.interrupt(); 128 129 txThread = null; 130 inStream = null; 131 outStream = null; 132 msgQueue.clear(); 133 134 try { 135 clientSocket.close(); 136 } catch (IOException ignore) { 137 } 138 139 LnTcpServer.getDefault().removeClient(this); // NPE here: 140 // log.info("ClientRxHandler: Exiting"); 141 } 142 143 public void close() { 144 try { 145 clientSocket.close(); 146 } catch (IOException ex1) { 147 log.error("close, which closing clientSocket", ex1); 148 } 149 } 150 151 class ClientTxHandler implements Runnable { 152 153 LocoNetMessage msg; 154 StringBuffer outBuf; 155 Thread parentThread; 156 157 ClientTxHandler(Thread creator) { 158 parentThread = creator; 159 } 160 161 @Override 162 public void run() { 163 164 try { 165 outBuf = new StringBuffer("VERSION JMRI Server "); 166 outBuf.append(jmri.Version.name()).append("\r\n"); 167 outStream.write(outBuf.toString().getBytes()); 168 169 while (!isInterrupted()) { 170 msg = null; 171 172 synchronized (msgQueue) { 173 if (msgQueue.isEmpty()) { 174 msgQueue.wait(); 175 } 176 177 if (!msgQueue.isEmpty()) { 178 msg = msgQueue.removeFirst(); 179 } 180 } 181 182 if (msg != null) { 183 outBuf.setLength(0); 184 outBuf.append("RECEIVE "); 185 outBuf.append(msg.toString()); 186 log.debug("ClientTxHandler: Send: {}", outBuf.toString()); 187 outBuf.append("\r\n"); 188 // See if we are waiting for an echo of a sent message 189 // and if it is append the Ack to the client 190 if ((lastSentMessage != null) && lastSentMessage.equals(msg)) { 191 lastSentMessage = null; 192 outBuf.append("SENT OK\r\n"); 193 } 194 outStream.write(outBuf.toString().getBytes()); 195 outStream.flush(); 196 } 197 } 198 } catch (IOException ex) { 199 log.error("ClientTxHandler: IO Exception"); 200 } catch (InterruptedException ex) { 201 Thread.currentThread().interrupt(); // retain if needed later 202 log.debug("ClientTxHandler: Interrupted Exception"); 203 } 204 // Interrupt the Parent to let it know we are exiting for some reason 205 parentThread.interrupt(); 206 207 parentThread = null; 208 msg = null; 209 outBuf = null; 210 //log.info("ClientTxHandler: Exiting"); 211 } 212 } 213 214 @Override 215 public void message(LocoNetMessage msg) { 216 synchronized (msgQueue) { 217 msgQueue.add(msg); 218 msgQueue.notify(); 219 } 220 } 221 222 /** 223 * Kill this thread, usually for testing purposes 224 */ 225 void dispose() { 226 try { 227 this.interrupt(); 228 this.join(); 229 } catch (InterruptedException ex) { 230 log.warn("dispose() interrupted"); 231 } 232 } 233 234 private final static Logger log = LoggerFactory.getLogger(ClientRxHandler.class); 235 236}