001package jmri.jmrix.dccpp.dccppovertcp; 002 003import java.io.BufferedReader; 004import java.io.IOException; 005import java.io.InputStreamReader; 006import java.io.OutputStream; 007import java.net.Socket; 008import java.util.LinkedList; 009import jmri.InstanceManager; 010import jmri.jmrix.dccpp.DCCppListener; 011import jmri.jmrix.dccpp.DCCppMessage; 012import jmri.jmrix.dccpp.DCCppReply; 013import jmri.jmrix.dccpp.DCCppSystemConnectionMemo; 014import org.slf4j.Logger; 015import org.slf4j.LoggerFactory; 016 017import javax.annotation.concurrent.GuardedBy; 018 019/** 020 * Implementation of the DCCppOverTcp LbServer Server Protocol. 021 * 022 * @author Alex Shepherd Copyright (C) 2006 023 * @author Mark Underwood Copyright (C) 2015 024 */ 025public final class ClientRxHandler extends Thread implements DCCppListener { 026 027 Socket clientSocket; 028 BufferedReader inStream; 029 OutputStream outStream; 030 @GuardedBy ("replyQueue") 031 final LinkedList<DCCppReply> replyQueue = new LinkedList<>(); // Init before Rx and Tx 032 033 Thread txThread; 034 String inString; 035 String remoteAddress; 036 DCCppMessage lastSentMessage; 037 private static final String oldSendPrefix = "SEND"; // lack of space is correct for legacy code 038 private static final String oldReceivePrefix = "RECEIVE "; // presence of space is correct for legacy code 039 private static final String sendPrefix = "<"; 040 private static final String oldServerVersionString = "VERSION JMRI Server "; // CAREFUL: Changing this could break backward compatibility 041 private static final String newServerVersionString = "VERSION DCC++ Server "; 042 boolean useOldPrefix = false; 043 044 public ClientRxHandler(String newRemoteAddress, Socket newSocket) { 045 clientSocket = newSocket; 046 setDaemon(true); 047 setPriority(Thread.MAX_PRIORITY); 048 remoteAddress = newRemoteAddress; 049 setName("ClientRxHandler:" + remoteAddress); 050 lastSentMessage = null; 051 start(); 052 } 053 054 @Override 055 public void run() { 056 057 DCCppSystemConnectionMemo memo = InstanceManager.getDefault(DCCppSystemConnectionMemo.class); 058 059 try { 060 txThread = new Thread(new ClientTxHandler(this)); 061 txThread.setDaemon(true); 062 txThread.setPriority(Thread.MAX_PRIORITY); 063 txThread.setName("ClientTxHandler:" + remoteAddress); 064 065 inStream = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 066 outStream = clientSocket.getOutputStream(); 067 068 memo.getDCCppTrafficController().addDCCppListener(~0, this); 069 070 txThread.start(); 071 072 while (!isInterrupted()) { 073 inString = inStream.readLine(); 074 if (inString == null) { 075 log.debug("ClientRxHandler: Remote Connection Closed"); 076 interrupt(); 077 } else { 078 log.debug("ClientRxHandler: Received: {}", inString); 079 080 // Check for the old server version string. If present, 081 // append the old-style prefixes to transmissions. 082 // Not sure this ever happens. Only the client sends 083 // the version string. 084 if (inString.startsWith(oldServerVersionString)) { 085 useOldPrefix = true; 086 } 087 // Legacy support: If the old prefix is there, delete it. 088 // Also, set the flag so we will start sending old-style 089 // prefixes. 090 if (inString.startsWith(oldSendPrefix)) { 091 useOldPrefix = true; 092 final int trim = oldSendPrefix.length(); 093 inString = inString.substring(trim); 094 log.debug("Adapted String: {}", inString); 095 } 096 // Check for the opening bracket 097 if (!inString.startsWith(sendPrefix)) { 098 log.debug("Invalid packet format: {}", inString); 099 continue; 100 } 101 102 DCCppMessage msg = new DCCppMessage(inString.substring(inString.indexOf('<') + 1, 103 inString.lastIndexOf('>'))); 104 105 memo.getDCCppTrafficController().sendDCCppMessage(msg, null); 106 // Keep the message we just sent so we can ACK it when we hear 107 // the echo from the LocoBuffer 108 lastSentMessage = msg; 109 } 110 } 111 } catch (IOException ex) { 112 log.debug("ClientRxHandler: IO Exception: ", ex); 113 } 114 115 memo.getDCCppTrafficController().removeDCCppListener(~0, this); 116 txThread.interrupt(); 117 118 txThread = null; 119 inStream = null; 120 outStream = null; 121 synchronized (replyQueue) { 122 replyQueue.clear(); 123 } 124 125 try { 126 clientSocket.close(); 127 } catch (IOException ex1) { 128 log.trace("Exception while closing client socket",ex1); 129 } 130 131 InstanceManager.getDefault(Server.class).removeClient(this); 132 log.info("ClientRxHandler: Exiting"); 133 } 134 135 public void close() { 136 try { 137 clientSocket.close(); 138 } catch (IOException ex1) { 139 log.error("close, which closing clientSocket", ex1); 140 } 141 } 142 143 class ClientTxHandler implements Runnable { 144 145 DCCppReply msg; 146 StringBuilder outBuf; 147 Thread parentThread; 148 149 ClientTxHandler(Thread creator) { 150 parentThread = creator; 151 } 152 153 @Override 154 public void run() { 155 156 try { 157 outBuf = new StringBuilder(newServerVersionString); 158 outBuf.append(jmri.Version.name()).append("\r\n"); 159 outStream.write(outBuf.toString().getBytes()); 160 161 while (!isInterrupted()) { 162 msg = null; 163 164 synchronized (replyQueue) { 165 if (replyQueue.isEmpty()) { 166 replyQueue.wait(); 167 } 168 169 if (!replyQueue.isEmpty()) { 170 msg = replyQueue.removeFirst(); 171 log.debug("Prepping to send message: {}", msg); 172 } 173 } 174 175 if (msg != null) { 176 outBuf.setLength(0); 177 if (useOldPrefix) { 178 outBuf.append(oldReceivePrefix); 179 } 180 outBuf.append("<"); 181 outBuf.append(msg.toString()); 182 outBuf.append(">"); 183 log.debug("ClientTxHandler: Send: {}", outBuf); 184 outBuf.append("\r\n"); 185 outStream.write(outBuf.toString().getBytes()); 186 outStream.flush(); 187 } 188 } 189 } catch (IOException ex) { 190 log.error("ClientTxHandler: IO Exception"); 191 } catch (InterruptedException ex) { 192 Thread.currentThread().interrupt(); // retain if needed later 193 log.debug("ClientTxHandler: Interrupted Exception"); 194 } 195 // Interrupt the Parent to let it know we are exiting for some reason 196 parentThread.interrupt(); 197 198 parentThread = null; 199 msg = null; 200 outBuf = null; 201 log.info("ClientTxHandler: Exiting"); 202 } 203 } 204 205 @Override 206 public void message(DCCppMessage msg) { 207 // no need to handle outgoing messages 208 } 209 210 @Override 211 public void message(DCCppReply msg) { 212 synchronized (replyQueue) { 213 replyQueue.add(msg); 214 replyQueue.notifyAll(); 215 } 216 log.debug("Message added to queue: {}", msg); 217 } 218 219 @Override 220 public void notifyTimeout(DCCppMessage m) { 221 // ToDo : handle timeouts 222 } 223 224 private static final Logger log = LoggerFactory.getLogger(ClientRxHandler.class); 225 226}