package jmri.jmrix.can.adapters.gridconnect;

import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import jmri.jmrix.ConnectionStatus;
import jmri.jmrix.can.TrafficController;

/**
 * Implements SerialPortAdapter for the GridConnect protocol.
 *
 * @author Bob Jacobsen Copyright (C) 2001, 2002
 * @author Andrew Crosland Copyright (C) 2008
 * @author Balazs Racz Copyright (C) 2017
 */
public class GcSerialDriverAdapter extends GcPortController {

    protected FlowControl flowControl = FlowControl.NONE; // disabled to start

    /**
     * Creates a new CAN GridConnect Network Driver Adapter.
     */
    public GcSerialDriverAdapter() {
        super(new jmri.jmrix.can.CanSystemConnectionMemo());
        applyCommonDefaults();
    }

    /**
     * Creates a new CAN GridConnect Network Driver Adapter.
     * <p>
     * Allows for default systemPrefix other than "M".
     * @param prefix System Prefix.
     */
    public GcSerialDriverAdapter(String prefix) {
        super(new jmri.jmrix.can.CanSystemConnectionMemo(prefix));
        applyCommonDefaults();
    }

    /**
     * Creates a new CAN GridConnect Network Driver Adapter.
     * <p>
     * Allows for default systemPrefix other than "M".
     * @param prefix System Prefix.
     * @param flow   flow control setting that {@link #localSetFlowControl()}
     *               applies to the port when it is opened
     */
    public GcSerialDriverAdapter(String prefix, FlowControl flow) {
        super(new jmri.jmrix.can.CanSystemConnectionMemo(prefix));
        applyCommonDefaults();
        flowControl = flow;
    }

    /**
     * Shared constructor set-up: registers the "Protocol" option, sets the
     * manufacturer name and enables connection recovery.
     * <p>
     * Private so that it is safe to call from the constructors.
     */
    private void applyCommonDefaults() {
        option1Name = "Protocol"; // NOI18N
        options.put(option1Name, new Option(Bundle.getMessage("ConnectionProtocol"),
                jmri.jmrix.can.ConfigurationManager.getSystemOptions()));
        this.manufacturerName = jmri.jmrix.merg.MergConnectionTypeList.MERG;
        allowConnectionRecovery = true;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String openPort(String portName, String appName) {

        // get and open the primary port
        currentSerialPort = activatePort(portName, log);
        if (currentSerialPort == null) {
            log.error("failed to connect CAN to {}", portName);
            return Bundle.getMessage("SerialPortNotFound", portName);
        }
        log.info("Connecting CAN to {} {}", portName, currentSerialPort);

        // try to set it for communication via SerialDriver
        // find the baud rate value, configure comm options
        int baud = currentBaudNumber(mBaudRate);
        setBaudRate(currentSerialPort, baud);
        configureLeads(currentSerialPort, true, true);
        localSetFlowControl();

        // get and save stream
        serialStream = currentSerialPort.getInputStream();
        // this is referenced in several other methods,
        // so can't easily be removed.

        // report status
        reportPortStatus(log, portName);

        opened = true;

        return null; // indicates OK return
    }

    /**
     * Locally set up the flow control; kept as a separate method to allow
     * subclasses to override it.
     */
    protected void localSetFlowControl() {
        setFlowControl(currentSerialPort, flowControl);
    }

    /**
     * Set up all of the other objects to operate with a CAN RS adapter
     * connected to this port.
     * {@inheritDoc}
     */
    @Override
    public void configure() {
        // Register the CAN traffic controller being used for this connection
        TrafficController tc = makeGcTrafficController();
        this.getSystemConnectionMemo().setTrafficController(tc);

        // Now connect to the traffic controller
        log.debug("Connecting port");
        tc.connectPort(this);

        this.getSystemConnectionMemo().setProtocol(getOptionState(option1Name));

        // do central protocol-specific configuration
        this.getSystemConnectionMemo().configureManagers();
    }

    /**
     * {@inheritDoc}
     * Reconnects to Traffic Controller.
     * Updates connection status.
     */
    @Override
    protected void resetupConnection() {
        if (!getSystemConnectionMemo().getTrafficController().status()) {
            getSystemConnectionMemo().getTrafficController().connectPort(this);
            boolean up = getSystemConnectionMemo().getTrafficController().status() && status();
            ConnectionStatus.instance().setConnectionState(getUserName(), getCurrentPortName(),
                    up ? ConnectionStatus.CONNECTION_UP : ConnectionStatus.CONNECTION_DOWN);
        }
    }

    /**
     * {@inheritDoc}
     *
     * Closes serial streams.
     * <p>
     * Each stream is closed independently, so a failure closing one of them
     * does not prevent the other stream or the serial port from being closed.
     */
    @Override
    protected void closeConnection() {
        log.info("Closing connection {}.", getCurrentPortName());
        closeStreamLogged(serialStream);
        serialStream = null;
        closeStreamLogged(bufferedStream);
        bufferedStream = null;
        if (currentSerialPort != null) {
            currentSerialPort.closePort();
        }
        currentSerialPort = null;
    }

    /**
     * Close a stream if it is non-null, logging (rather than propagating) any
     * IOException so the caller can continue releasing other resources.
     *
     * @param stream the stream to close, may be null.
     */
    private void closeStreamLogged(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            log.error("unable to close {}", this.currentSerialPort, e);
        }
    }

    /**
     * Make a new GC Traffic Controller.
     * @return new GridConnect Traffic Controller.
     */
    protected GcTrafficController makeGcTrafficController() {
        return new GcTrafficController();
    }

    /**
     * Helper class wrapping the input serial port's InputStream.
     * <p>
     * It starts a helper thread at high priority that reads the input serial
     * port as fast as it can, buffering all incoming data in memory in a queue.
     * <p>
     * The queue is unbounded and readers will get the data from the queue.
     * <p>
     * The read methods are synchronized and the hand-over between the helper
     * thread and the readers goes through a {@link BlockingQueue}.
     */
    private static class AsyncBufferInputStream extends FilterInputStream {

        // After more than this many consecutive read attempts resulting in an exception we
        // queue the last exception for the reader and terminate the read thread.
        private static final int MAX_IO_ERRORS_TO_ABORT = 10;

        private final String portName;
        // Queue holding the buffered data.
        private final BlockingQueue<BufferEntry> readAhead = new LinkedBlockingQueue<>();
        // Set to false by close(); volatile because it is written by the closing thread
        // and polled by the read thread.
        private volatile boolean active;
        // The last entry we got from the queue if there are still bytes we need to return from it.
        private BufferEntry head = null;
        // Offset of next live byte in head.
        private int headOfs = 0;
        // How many of the last consecutive read attempts have resulted in an exception.
        // Only touched by the read thread.
        private int errorCount = 0;

        /**
         * Create new AsyncBufferInputStream and start its read thread.
         * @param inputStream Input Stream.
         * @param portName Port Name.
         */
        AsyncBufferInputStream(InputStream inputStream, String portName) {
            super(inputStream);
            this.portName = portName;
            active = true;
            Thread rt = jmri.util.ThreadingUtil.newThread(this::readThreadBody);
            rt.setName("GcSerialPort InputBufferThread " + portName);
            rt.setDaemon(true);
            rt.setPriority(Thread.MAX_PRIORITY);
            rt.start();
        }

        /**
         * Helper function that tries to perform a read from the underlying port
         * with a given maximum length.
         *
         * @param maxLen how many bytes to request from the port. Setting this to 1
         *               will apparently block the thread if there are zero bytes
         *               available.
         * @return a block of data read, or null if fatal IO errors make
         *         further use of this port impossible. In the latter case the
         *         last exception has already been queued for the reader.
         */
        private BufferEntry tryRead(int maxLen) {
            BufferEntry tail = new BufferEntry();
            try {
                // read what's available, up to maxLen, but always at least 1
                int len = Math.max(1, Math.min(maxLen, in.available()));
                tail.data = new byte[len];
                tail.len = in.read(tail.data, 0, len);
                errorCount = 0;
            } catch (IOException e) {
                tail.e = e;
                if (++errorCount > MAX_IO_ERRORS_TO_ABORT) {
                    log.error("Closing read thread due to too many IO errors", e);
                    // Hand the final exception to the reader before giving up.
                    readAhead.add(tail);
                    return null;
                } else {
                    log.debug("Error reading serial port {}", portName, e);
                }
            }
            return tail;
        }

        /**
         * Implementation of the buffering thread.
         */
        private void readThreadBody() {
            while (active) {
                // Try to read one byte to block the thread.
                BufferEntry tail = tryRead(1);
                if (tail == null) {
                    return;
                }
                // NOTE: in order to reuse this class in a generic context, we need to add support
                // for the underlying input stream persistently returning EOF. That does not
                // happen on a serial port.
                if (tail.len <= 0 && tail.e == null) {
                    continue;
                }
                readAhead.add(tail);
                // Read as many bytes as we have in large increments. Reading 128 bytes is a good
                // compromise between throughput (4 gridconnect packets per kernel IO) but not
                // wasting a lot of memory if less data actually shows up.
                // Checking 'active' here lets close() stop the thread promptly.
                while (active) {
                    tail = tryRead(128);
                    if (tail == null) {
                        return;
                    }
                    if (tail.len <= 0 && tail.e == null) {
                        break;
                    }
                    readAhead.add(tail);
                }
            }
        }

        /**
         * We queue objects of this class between the read thread and the actual
         * read() methods.
         */
        private static class BufferEntry {

            // data payload
            byte[] data;
            // how many bytes of data are filled in
            int len = 0;
            // an exception was caught reading the input stream
            IOException e = null;
        }

        /**
         * {@inheritDoc}
         * <p>
         * Implemented on top of {@link #read(byte[], int, int)}.
         */
        @Override
        public int read() throws IOException {
            byte[] single = new byte[1];
            int n;
            do {
                n = read(single, 0, 1);
            } while (n == 0);
            return (n < 0) ? -1 : (single[0] & 0xFF);
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public int read(byte[] bytes) throws IOException {
            return read(bytes, 0, bytes.length);
        }

        /**
         * Copy buffered data into the caller's array, blocking until the read
         * thread has queued something when no data is pending.
         *
         * @param bytes destination array.
         * @param skip  offset in {@code bytes} at which the data is stored.
         * @param len   maximum number of bytes to copy.
         * @return the number of bytes copied, 0 if {@code len} is 0, or -1 if
         *         the dequeued entry reports end of stream.
         * @throws IOException an exception queued by the read thread, or an
         *                     {@link InterruptedIOException} if the calling
         *                     thread is interrupted while waiting for data.
         */
        @Override
        public synchronized int read(byte[] bytes, int skip, int len) throws IOException {
            if (len < 0) {
                throw new IndexOutOfBoundsException("len=" + len);
            }
            if (len == 0) {
                return 0;
            }
            if (head == null || headOfs >= head.len) {
                BufferEntry next;
                try {
                    next = readAhead.take();
                } catch (InterruptedException ie) {
                    // Do not fall through: 'head' is either null or already consumed.
                    Thread.currentThread().interrupt();
                    InterruptedIOException iioe = new InterruptedIOException(
                            "Interrupted while waiting for data from " + portName);
                    iioe.initCause(ie);
                    throw iioe;
                }
                head = next;
                headOfs = 0;
                if (next.e != null) {
                    throw next.e;
                }
                if (next.len < 0) {
                    return -1;
                }
            }
            int cp = Math.min(head.len - headOfs, len);
            System.arraycopy(head.data, headOfs, bytes, skip, cp);
            headOfs += cp;
            return cp;
        }

        /**
         * Ask the read thread to stop and close the underlying stream.
         */
        @Override
        public void close() throws IOException {
            active = false;
            super.close();
        }
    }

    /**
     * Base class methods for the PortController interface.
     * {@inheritDoc}
     */
    @Override
    public DataInputStream getInputStream() {
        if (!opened) {
            log.error("getInputStream called before load(), stream not available");
            return null;
        }
        synchronized (this) {
            // Create the buffering wrapper (and its read thread) on first use.
            if (bufferedStream == null) {
                bufferedStream = new AsyncBufferInputStream(serialStream, currentSerialPort.toString());
            }
            return new DataInputStream(bufferedStream);
        }
    }

    /**
     * {@inheritDoc}
     *
     * @return array of localized valid baud rates
     */
    @Override
    public String[] validBaudRates() {
        return new String[]{Bundle.getMessage("Baud57600"),
            Bundle.getMessage("Baud115200"), Bundle.getMessage("Baud230400"),
            Bundle.getMessage("Baud250000"), Bundle.getMessage("Baud333333"),
            Bundle.getMessage("Baud460800")};
    }

    /**
     * Get an array of valid baud rates.
     *
     * @return valid baud rates
     */
    @Override
    public int[] validBaudNumbers() {
        return new int[]{57600, 115200, 230400, 250000, 333333, 460800};
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public int defaultBaudIndex() {
        return 0;
    }

    // private control members
    InputStream serialStream = null;
    // Stream wrapper that buffers the input bytes.
    private InputStream bufferedStream = null;

    private final static org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(GcSerialDriverAdapter.class);

}