package jmri.jmrit.logixng.util;

import java.awt.event.ActionEvent;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import javax.swing.Timer;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;

import jmri.util.*;

import jmri.util.ThreadingUtil.ThreadAction;

/**
 * Utilities for handling JMRI's LogixNG threading conventions.
 * <p>
 * For background, see
 * <a href="http://jmri.org/help/en/html/doc/Technical/Threads.shtml">http://jmri.org/help/en/html/doc/Technical/Threads.shtml</a>
 * <p>
 * This is the ThreadingUtil class for LogixNG.
 * <p>
 * Each instance owns one daemon worker thread that takes events from a
 * bounded queue and runs them one at a time. The registry of instances
 * (by ID and by name) is guarded by the {@code LogixNG_Thread.class} monitor.
 *
 * @author Bob Jacobsen      Copyright 2015
 * @author Daniel Bergqvist  Copyright 2020
 */
@ThreadSafe
public class LogixNG_Thread {

    public static final int DEFAULT_LOGIXNG_THREAD = 0;
    public static final int DEFAULT_LOGIXNG_DEBUG_THREAD = 1;

    // Registry of all known threads. Guarded by LogixNG_Thread.class.
    private static final Map<Integer, LogixNG_Thread> _threads = new HashMap<>();
    private static final Map<String, LogixNG_Thread> _threadNames = new HashMap<>();
    private static int _highestThreadID = -1;

    private final int _threadID;
    // Written under the class lock, read without it by getThreadName().
    private volatile String _name;
    private volatile boolean _stopThread = false;
    private volatile boolean _threadIsStopped = false;

    private final Thread _logixNGThread;
    // Read and written from arbitrary threads without a lock.
    private volatile boolean _threadInUse = false;
    private final BlockingQueue<ThreadEvent> _eventQueue;


    /**
     * Create and start a new LogixNG thread with the next free thread ID.
     *
     * @param name the unique name of the new thread
     * @return the new thread
     * @throws IllegalArgumentException if the name is already taken
     */
    public static LogixNG_Thread createNewThread(@Nonnull String name) {
        return createNewThread(-1, name);
    }

    /**
     * Create and start a new LogixNG thread.
     * <p>
     * If a thread with the same ID and the same name already exists, a
     * warning is logged and the existing thread is returned.
     *
     * @param threadID the ID of the new thread, or -1 to use the next ID
     *                 after the highest one handed out so far
     * @param name     the unique name of the new thread
     * @return the new thread, or the existing one with the same ID and name
     * @throws IllegalArgumentException if the ID is taken by a thread with
     *                                  another name, or if the name is taken
     */
    public static LogixNG_Thread createNewThread(int threadID, @Nonnull String name) {
        synchronized (LogixNG_Thread.class) {
            int id = (threadID == -1) ? _highestThreadID + 1 : threadID;

            LogixNG_Thread existing = _threads.get(id);
            if (existing != null) {
                if (existing._name.equals(name)) {
                    log.warn("Thread ID {} with name {} already exists", id, name);
                    return existing;
                }
                throw new IllegalArgumentException(String.format("Thread ID %d already exists", id));
            }

            if (_threadNames.containsKey(name)) {
                throw new IllegalArgumentException(String.format("Thread name %s already exists", name));
            }

            // Commit the ID counter only once the request has been accepted,
            // so that a rejected request does not consume a thread ID.
            if (id > _highestThreadID) {
                _highestThreadID = id;
            }

            LogixNG_Thread thread = new LogixNG_Thread(id, name);
            _threads.put(id, thread);
            _threadNames.put(name, thread);
            thread._logixNGThread.start();

            return thread;
        }
    }

    /**
     * Check whether a name is free to use for a new thread.
     *
     * @param name the name to check
     * @return true if no registered thread has this name
     */
    public static boolean validateNewThreadName(@Nonnull String name) {
        synchronized (LogixNG_Thread.class) {
            return !_threadNames.containsKey(name);
        }
    }

    /**
     * Get the thread with the given ID.
     * <p>
     * The two default threads ({@link #DEFAULT_LOGIXNG_THREAD} and
     * {@link #DEFAULT_LOGIXNG_DEBUG_THREAD}) are created on first request.
     *
     * @param threadID the ID of the thread
     * @return the thread
     * @throws IllegalArgumentException if the ID is unknown and is not one
     *                                  of the default thread IDs
     */
    public static LogixNG_Thread getThread(int threadID) {
        synchronized (LogixNG_Thread.class) {
            LogixNG_Thread thread = _threads.get(threadID);
            if (thread == null) {
                switch (threadID) {
                    case DEFAULT_LOGIXNG_THREAD:
                        thread = createNewThread(DEFAULT_LOGIXNG_THREAD, Bundle.getMessage("LogixNG_Thread"));
                        break;
                    case DEFAULT_LOGIXNG_DEBUG_THREAD:
                        thread = createNewThread(DEFAULT_LOGIXNG_DEBUG_THREAD, Bundle.getMessage("LogixNG_DebugThread"));
                        break;
                    default:
                        throw new IllegalArgumentException(String.format("Thread ID %d does not exists", threadID));
                }
            }
            return thread;
        }
    }

    /**
     * Get the ID of the thread with the given name.
     *
     * @param name the name of the thread
     * @return the thread ID
     * @throws IllegalArgumentException if no registered thread has this name
     */
    public static int getThreadID(String name) {
        synchronized (LogixNG_Thread.class) {
            // _threadNames is kept in step with _threads by every method
            // that registers, renames or removes a thread.
            LogixNG_Thread thread = _threadNames.get(name);
            if (thread == null) {
                throw new IllegalArgumentException(String.format("Thread name \"%s\" does not exists", name));
            }
            return thread._threadID;
        }
    }

    /**
     * Remove a thread from the registry.
     * <p>
     * Note that this only unregisters the thread; the underlying worker
     * thread is not stopped by this method.
     *
     * @param thread the thread to remove
     * @throws IllegalArgumentException if the thread is not registered, if
     *                                  another thread is registered under its
     *                                  ID, or if the thread is marked in use
     */
    public static void deleteThread(LogixNG_Thread thread) {
        synchronized (LogixNG_Thread.class) {
            LogixNG_Thread aThread = _threads.get(thread._threadID);

            if (aThread == null) throw new IllegalArgumentException("Thread does not exists");
            if (aThread != thread) throw new IllegalArgumentException("Thread ID does not match");
            if (aThread._threadInUse) throw new IllegalArgumentException("Thread is in use");

            _threads.remove(thread._threadID);
            _threadNames.remove(thread._name);
        }
    }

    /**
     * Get all registered threads.
     *
     * @return an unmodifiable snapshot of the registered threads; it is
     *         not affected by threads created or removed afterwards
     */
    public static Collection<LogixNG_Thread> getThreads() {
        synchronized (LogixNG_Thread.class) {
            // Copy under the lock: a live view of the HashMap could not be
            // iterated safely while another thread changes the registry.
            return Collections.unmodifiableCollection(new ArrayList<>(_threads.values()));
        }
    }

    private LogixNG_Thread(int threadID, String name) {
        _threadID = threadID;
        _name = name;
        _eventQueue = new ArrayBlockingQueue<>(1024);
        _logixNGThread = new Thread(this::processEvents, "JMRI LogixNGThread");
        _logixNGThread.setDaemon(true);
    }

    /**
     * Main loop of the worker thread: take events from the queue and run
     * them until a stop has been requested.
     */
    private void processEvents() {
        while (!_stopThread) {
            try {
                ThreadEvent event = _eventQueue.take();
                if (event._lock != null) {
                    synchronized (event._lock) {
                        try {
                            if (!_stopThread) runAction(event._threadAction);
                        } finally {
                            // Always release the waiting caller, even if
                            // the action was skipped or failed.
                            event._done = true;
                            event._lock.notify();
                        }
                    }
                } else {
                    runAction(event._threadAction);
                }
            } catch (InterruptedException ex) {
                if (_stopThread) {
                    Thread.currentThread().interrupt();
                } else {
                    // Not a stop request. Leave the interrupt flag cleared,
                    // otherwise every following take() would throw at once
                    // and this loop would spin without doing any work.
                    log.debug("LogixNG thread \"{}\" interrupted without a stop request", _name);
                }
            }
        }
        _threadIsStopped = true;
    }

    /**
     * Run one action on the worker thread. A runtime exception thrown by
     * the action is logged so that it does not terminate the worker thread.
     */
    private void runAction(ThreadAction action) {
        try {
            action.run();
        } catch (RuntimeException ex) {
            log.error("Exception in action run on LogixNG thread \"{}\"", _name, ex);
        }
    }

    /**
     * Get the Java thread that runs the events of this LogixNG thread.
     *
     * @return the worker thread
     */
    public Thread getThread() {
        return _logixNGThread;
    }

    /**
     * Get the ID of this thread.
     *
     * @return the thread ID
     */
    public int getThreadId() {
        return _threadID;
    }

    /**
     * Get the name of this thread.
     *
     * @return the thread name
     */
    public String getThreadName() {
        return _name;
    }

    /**
     * Rename this thread. Does nothing if the name is unchanged.
     *
     * @param name the new unique name
     * @throws IllegalArgumentException if another thread has this name
     */
    public void setThreadName(@Nonnull String name) {
        synchronized (LogixNG_Thread.class) {
            if (_name.equals(name)) return;

            if (_threadNames.containsKey(name)) {
                throw new IllegalArgumentException(String.format("Thread name %s already exists", name));
            }
            _threadNames.remove(_name);
            _threadNames.put(name, this);
            _name = name;
        }
    }

    /**
     * Check whether this thread has been marked as in use.
     *
     * @return true if {@link #setThreadInUse()} has been called
     */
    public boolean getThreadInUse() {
        return _threadInUse;
    }

    /**
     * Set the thread to "in use".
     * If a thread is in use, it cannot be unset as not in use.
     */
    public void setThreadInUse() {
        _threadInUse = true;
    }

    /**
     * Run some LogixNG-specific code before returning.
     * <p>
     * If the caller already is on this LogixNG thread, the code is run
     * directly. Otherwise it is queued and the caller waits until the
     * LogixNG thread has handled it. If the caller is interrupted while
     * waiting, this method returns early with the interrupt flag set.
     * <p>
     * Typical uses:
     * <p> {@code
     * logixNGThread.runOnLogixNG(() -> {
     *     logixNG.doSomething(value);
     * });
     * }
     *
     * @param ta What to run, usually as a lambda expression
     * @throws IllegalStateException if the event queue is full
     */
    public void runOnLogixNG(@Nonnull ThreadAction ta) {
        if (Thread.currentThread() == _logixNGThread) {
            // Queueing and waiting here would wait for ourselves forever.
            ta.run();
            return;
        }

        Object lock = new Object();
        ThreadEvent event = new ThreadEvent(ta, lock);
        synchronized(lock) {
            // The worker cannot enter the monitor, and so cannot notify,
            // until wait() below has released it.
            _eventQueue.add(event);
            try {
                // Loop on the condition to be safe against spurious wakeups.
                while (!event._done) {
                    lock.wait();
                }
            } catch (InterruptedException e) {
                log.debug("Interrupted while running on LogixNG thread");
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Run some LogixNG-specific code at some later point.
     * <p>
     * Please note the operation may have happened before this returns. Or
     * later. No long-term guarantees.
     * <p>
     * Typical uses:
     * <p> {@code
     * logixNGThread.runOnLogixNGEventually(() -> {
     *     sensor.setState(value);
     * });
     * }
     *
     * @param ta What to run, usually as a lambda expression
     * @throws IllegalStateException if the event queue is full
     */
    public void runOnLogixNGEventually(@Nonnull ThreadAction ta) {
        _eventQueue.add(new ThreadEvent(ta));
    }

    /**
     * Run some LogixNG-specific code at some later point, at least a known time
     * in the future.
     * <p>
     * There is no long-term guarantee about the accuracy of the interval.
     * <p>
     * Typical uses:
     * <p> {@code
     * logixNGThread.runOnLogixNGDelayed(() -> {
     *     sensor.setState(value);
     * }, 1000);
     * }
     *
     * @param ta What to run, usually as a lambda expression
     * @param delay interval in milliseconds
     * @return reference to timer object handling delay so you can cancel if desired; note that operation may have already taken place.
     */
    @Nonnull
    public Timer runOnLogixNGDelayed(@Nonnull ThreadAction ta, int delay) {
        // dispatch to logixng thread via timer. We are forced to use a
        // Swing Timer since the method returns a Timer object and we don't
        // want to change the method interface.
        Timer timer = new Timer(delay, (ActionEvent e) -> {
            // Dispatch the event to the LogixNG event handler once the time
            // has passed.
            _eventQueue.add(new ThreadEvent(ta));
        });
        timer.setRepeats(false);
        timer.start();
        return timer;
    }

    /**
     * Check if the event queue of this thread is empty.
     *
     * @return true if no event is waiting in the queue
     */
    public boolean isQueueEmpty() {
        return _eventQueue.isEmpty();
    }

    /**
     * Check if on the LogixNG-operation thread.
     *
     * @return true if on the LogixNG-operation thread
     */
    public boolean isLogixNGThread() {
        return _logixNGThread == Thread.currentThread();
    }

    /**
     * Checks if the current thread is the LogixNG thread.
     * The check is only done if debug is enabled.
     */
    public void checkIsLogixNGThread() {
        if (log.isDebugEnabled()) {
            if (!isLogixNGThread()) {
                LoggingUtil.warnOnce(log, "checkIsLogixNGThread() called on wrong thread", new Exception());
            }
        }
    }

    /**
     * An action queued for the worker thread, optionally with a lock that
     * a caller waits on until the action has been handled.
     */
    private static class ThreadEvent {
        private final ThreadAction _threadAction;
        private final Object _lock;
        // Set once the worker has handled the event. Only accessed while
        // holding _lock, and only used when _lock is not null.
        private boolean _done = false;

        public ThreadEvent(ThreadAction threadAction) {
            _threadAction = threadAction;
            _lock = null;
        }

        public ThreadEvent(ThreadAction threadAction,
                Object lock) {
            _threadAction = threadAction;
            _lock = lock;
        }
    }

    /**
     * Stop the worker thread, wait for it to terminate and remove this
     * thread from the registry.
     *
     * @throws RuntimeException if the caller is interrupted while waiting,
     *                          or if the worker thread did not terminate
     */
    private void stopLogixNGThread() {
        synchronized(LogixNG_Thread.class) {
            _stopThread = true;
            _logixNGThread.interrupt();
            try {
                _logixNGThread.join(0);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and keep the cause.
                Thread.currentThread().interrupt();
                throw new RuntimeException("stopLogixNGThread() was interrupted", e);
            }
            if (_logixNGThread.getState() != Thread.State.TERMINATED) {
                throw new RuntimeException("Could not stop logixNGThread. Current state: "+_logixNGThread.getState().name());
            }
            _threads.remove(_threadID);
            _threadNames.remove(_name);
            _stopThread = false;
        }
    }

    /**
     * Stop all registered LogixNG threads and remove them from the registry.
     */
    public static void stopAllLogixNGThreads() {
        synchronized(LogixNG_Thread.class) {
            // Iterate over a copy since stopLogixNGThread() removes the
            // thread from the registry.
            List<LogixNG_Thread> list = new ArrayList<>(_threads.values());
            for (LogixNG_Thread thread : list) {
                thread.stopLogixNGThread();
            }
        }
    }

    /**
     * Assert that no registered LogixNG thread is running. Any thread found
     * running is stopped before the exception is thrown.
     *
     * @throws RuntimeException if at least one thread was running
     */
    public static void assertLogixNGThreadNotRunning() {
        synchronized(LogixNG_Thread.class) {
            boolean aThreadIsRunning = false;
            // Iterate over a copy since stopLogixNGThread() removes the
            // thread from the registry; iterating the map directly would
            // throw ConcurrentModificationException.
            for (LogixNG_Thread thread : new ArrayList<>(_threads.values())) {
                if (!thread._threadIsStopped) {
                    aThreadIsRunning = true;
                    thread.stopLogixNGThread();
                }
            }
            if (aThreadIsRunning) {
                throw new RuntimeException("logixNGThread is running");
            }
        }
    }

    private final static org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(LogixNG_Thread.class);

}