001package jmri.jmrix.mqtt; 002 003import java.io.IOException; 004import java.util.*; 005 006import javax.annotation.Nonnull; 007 008import org.apiguardian.api.API; 009import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 010import org.eclipse.paho.client.mqttv3.MqttCallback; 011import org.eclipse.paho.client.mqttv3.MqttClient; 012import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 013import org.eclipse.paho.client.mqttv3.MqttException; 014import org.eclipse.paho.client.mqttv3.MqttMessage; 015import org.eclipse.paho.client.mqttv3.MqttTopic; 016import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; 017 018import org.slf4j.Logger; 019import org.slf4j.LoggerFactory; 020 021/** 022 * Communications adapter for Mqtt communications links. 023 * 024 * @author Lionel Jeanson 025 * @author Bob Jacobsen Copyright (c) 2019, 2023 026 */ 027@API(status=API.Status.MAINTAINED) 028public class MqttAdapter extends jmri.jmrix.AbstractNetworkPortController implements MqttCallback { 029 030 private final static String PROTOCOL = "tcp://"; 031 private final static String DEFAULT_BASETOPIC = Bundle.getMessage("TopicBase"); 032 033 // 0.1 to get it to the front of the list 034 private final static String MQTT_USERNAME_OPTION = "0.1"; 035 036 // 0.2 to get it to the front of the list 037 private final static String MQTT_PASSWORD_OPTION = "0.2"; 038 039 public boolean retained = true; // public for script access 040 public int qosflag = 2; // public for script access 041 042 /** 043 * Otherwise known as "Channel", this is prepended to the 044 * topic for all JMRI inward and outward communications. 045 * Typically set by preferences at startup. Changing it 046 * after startup might have no or bad effect. 047 */ 048 @API(status=API.Status.MAINTAINED) 049 public String baseTopic = DEFAULT_BASETOPIC; 050 051 HashMap<String, ArrayList<MqttEventListener>> mqttEventListeners = new HashMap<>(); 052 053 MqttClient mqttClient; 054 055 @API(status=API.Status.INTERNAL) 056 public MqttAdapter() { 057 super(new MqttSystemConnectionMemo()); 058 log.debug("Doing ctor..."); 059 060 options.put(MQTT_USERNAME_OPTION, new Option(Bundle.getMessage("MQTT_Username"), 061 new String[]{""}, Option.Type.TEXT)); 062 063 options.put(MQTT_PASSWORD_OPTION, new Option(Bundle.getMessage("MQTT_Password"), 064 new String[]{""}, Option.Type.PASSWORD)); 065 066 option2Name = "0 MQTTchannel"; // 0 to get it to the front of the list 067 options.put(option2Name, new Option(Bundle.getMessage("NameTopicBase"), 068 new String[]{baseTopic}, Option.Type.TEXT)); 069 070 options.put("10.3", new Option(Bundle.getMessage("NameTopicTurnoutSend"), 071 new String[]{Bundle.getMessage("TopicTurnoutSend")}, Option.Type.TEXT)); 072 options.put("10.5", new Option(Bundle.getMessage("NameTopicTurnoutRcv"), 073 new String[]{Bundle.getMessage("TopicTurnoutRcv")}, Option.Type.TEXT)); 074 075 076 options.put("11.3", new Option(Bundle.getMessage("NameTopicSensorSend"), 077 new String[]{Bundle.getMessage("TopicSensorSend")}, Option.Type.TEXT)); 078 options.put("11.5", new Option(Bundle.getMessage("NameTopicSensorRcv"), 079 new String[]{Bundle.getMessage("TopicSensorRcv")}, Option.Type.TEXT)); 080 081 options.put("12.3", new Option(Bundle.getMessage("NameTopicLightSend"), 082 new String[]{Bundle.getMessage("TopicLightSend")}, Option.Type.TEXT)); 083 options.put("12.5", new Option(Bundle.getMessage("NameTopicLightRcv"), 084 new String[]{Bundle.getMessage("TopicLightRcv")}, Option.Type.TEXT)); 085 086 options.put("13", new Option("Reporter topic :", new String[]{Bundle.getMessage("TopicReporter")}, Option.Type.TEXT)); 087 options.put("14", new Option("Signal Head topic :", new String[]{Bundle.getMessage("TopicSignalHead")}, Option.Type.TEXT)); 088 options.put("15", new Option("Signal Mast topic :", new String[]{Bundle.getMessage("TopicSignalMast")}, Option.Type.TEXT)); 089 options.put("16.3", new Option(Bundle.getMessage("NameTopicThrottleSend"), 090 new String[]{Bundle.getMessage("TopicThrottleSend")}, Option.Type.TEXT)); 091 options.put("16.5", new Option(Bundle.getMessage("NameTopicThrottleRcv"), 092 new String[]{Bundle.getMessage("TopicThrottleRcv")}, Option.Type.TEXT)); 093 options.put("17.3", new Option(Bundle.getMessage("NameTopicDirectionSend"), 094 new String[]{Bundle.getMessage("TopicDirectionSend")}, Option.Type.TEXT)); 095 options.put("17.5", new Option(Bundle.getMessage("NameTopicDirectionRcv"), 096 new String[]{Bundle.getMessage("TopicDirectionRcv")}, Option.Type.TEXT)); 097 options.put("18.3", new Option(Bundle.getMessage("NameTopicFunctionSend"), 098 new String[]{Bundle.getMessage("TopicFunctionSend")}, Option.Type.TEXT)); 099 options.put("18.5", new Option(Bundle.getMessage("NameTopicFunctionRcv"), 100 new String[]{Bundle.getMessage("TopicFunctionRcv")}, Option.Type.TEXT)); 101 options.put("19.3", new Option(Bundle.getMessage("NameTopicConsistSend"), 102 new String[]{Bundle.getMessage("TopicConsistSend")}, Option.Type.TEXT)); 103 options.put("20.3", new Option(Bundle.getMessage("NameTopicPowerSend"), 104 new String[]{Bundle.getMessage("TopicPowerSend")}, Option.Type.TEXT)); 105 options.put("20.5", new Option(Bundle.getMessage("NameTopicPowerRcv"), 106 new String[]{Bundle.getMessage("TopicPowerRcv")}, Option.Type.TEXT)); 107 108 options.put("LastWillTopic", new Option(Bundle.getMessage("NameTopicLastWill"), 109 new String[]{Bundle.getMessage("TopicLastWill")}, Option.Type.TEXT)); 110 options.put("LastWillMessage", new Option(Bundle.getMessage("NameMessageLastWill"), 111 new String[]{Bundle.getMessage("MessageLastWill")}, Option.Type.TEXT)); 112 allowConnectionRecovery = true; 113 114 } 115 116 public MqttConnectOptions getMqttConnectionOptions() { 117 118 // Setup the MQTT Connection Options 119 MqttConnectOptions mqttConnOpts = new MqttConnectOptions(); 120 mqttConnOpts.setCleanSession(true); 121 mqttConnOpts.setMaxInflight(100); 122 123 if ( getOptionState(MQTT_USERNAME_OPTION) != null 124 && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) { 125 mqttConnOpts.setUserName(getOptionState(MQTT_USERNAME_OPTION)); 126 mqttConnOpts.setPassword(getOptionState(MQTT_PASSWORD_OPTION).toCharArray()); 127 } 128 129 //set Last Will 130 if (! getOptionState("LastWillTopic").isEmpty() 131 && ! getOptionState("LastWillMessage").isEmpty()) { 132 mqttConnOpts.setWill(baseTopic + getOptionState("LastWillTopic"), 133 getOptionState("LastWillMessage").getBytes(), 134 qosflag, 135 true); 136 } 137 138 return mqttConnOpts; 139 } 140 141 @Override 142 @API(status=API.Status.INTERNAL) 143 public void configure() { 144 log.debug("Doing configure..."); 145 mqttEventListeners = new HashMap<>(); 146 getSystemConnectionMemo().setMqttAdapter(this); 147 getSystemConnectionMemo().configureManagers(); 148 } 149 150 @Override 151 @API(status=API.Status.INTERNAL) 152 public void connect() throws IOException { 153 log.info("MQTT starting connect with MQTTchannel = \"{}\"", getOptionState(option2Name)); 154 155 try { 156 if ( getOptionState(option2Name)!= null && ! getOptionState(option2Name).trim().isEmpty()) { 157 baseTopic = getOptionState(option2Name); 158 } 159 160 // have to make that a valid choice, overriding the original above. This 161 // is ugly and temporary. 162 if (! DEFAULT_BASETOPIC.equals(baseTopic)) { 163 options.put(option2Name, new Option("MQTT channel: ", new String[]{baseTopic, DEFAULT_BASETOPIC})); 164 } 165 166 // generate a unique client ID based on the network ID and the system prefix of the MQTT connection. 167 String clientID = jmri.InstanceManager.getDefault(jmri.web.server.WebServerPreferences.class).getRailroadName(); 168 169 // ensure that only guaranteed valid characters are included in the client ID 170 clientID = clientID.replaceAll("[^A-Za-z0-9]", ""); 171 172 String clientIDsuffix = "JMRI" + Integer.toHexString(jmri.util.node.NodeIdentity.networkIdentity().hashCode()) .toUpperCase() + getSystemPrefix(); 173 174 // Trim railroad name to fit within MQTT client id 23 character limit. 175 if (clientID.length() > 23 - clientIDsuffix.length()) 176 clientID = clientID.substring(0,23 - clientIDsuffix.length()); 177 178 clientID = clientID + clientIDsuffix; 179 180 log.info("Connection {} is using a clientID of \"{}\"", getSystemPrefix(), clientID); 181 182 String tempdirName = jmri.util.FileUtil.getExternalFilename(jmri.util.FileUtil.PROFILE); 183 log.debug("will use {} as temporary directory", tempdirName); 184 185 mqttClient = getNewMqttClient(clientID, tempdirName); 186 187 if ((getOptionState(MQTT_USERNAME_OPTION) != null 188 && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) 189 || ( ! getOptionState("LastWillTopic").isEmpty() 190 && ! getOptionState("LastWillMessage").isEmpty())) { 191 mqttClient.connect(getMqttConnectionOptions()); 192 193 } else { 194 mqttClient.connect(); 195 } 196 197 if ( ! getOptionState("LastWillTopic").isEmpty()) { 198 publish(getOptionState("LastWillTopic"), ""); 199 } 200 201 mqttClient.setCallback(this); 202 203 } catch (MqttException ex) { 204 throw new IOException("Can't create MQTT client", ex); 205 } 206 } 207 208 MqttClient getNewMqttClient(String clientID, String tempdirName) throws MqttException { 209 return new MqttClient(PROTOCOL + getCurrentPortName(), 210 clientID, new MqttDefaultFilePersistence(tempdirName)); 211 } 212 213 @Override 214 @API(status=API.Status.MAINTAINED) 215 public MqttSystemConnectionMemo getSystemConnectionMemo() { 216 return (MqttSystemConnectionMemo) super.getSystemConnectionMemo(); 217 } 218 219 @API(status=API.Status.MAINTAINED) 220 public void subscribe(String topic, MqttEventListener mel) { 221 if (mqttEventListeners == null || mqttClient == null) { 222 jmri.util.LoggingUtil.warnOnce(log, "Trying to subscribe before connect/configure is done"); 223 return; 224 } 225 try { 226 String fullTopic = baseTopic + topic; 227 if (mqttEventListeners.containsKey(fullTopic)) { 228 if (!mqttEventListeners.get(fullTopic).contains(mel)) { 229 mqttEventListeners.get(fullTopic).add(mel); 230 } 231 return; 232 } 233 ArrayList<MqttEventListener> mels = new ArrayList<>(); 234 mels.add(mel); 235 mqttEventListeners.put(fullTopic, mels); 236 mqttClient.subscribe(fullTopic); 237 log.debug("Subscribed : \"{}\"", fullTopic); 238 } catch (MqttException ex) { 239 log.error("Can't subscribe : ", ex); 240 } 241 } 242 243 @API(status=API.Status.MAINTAINED) 244 public void unsubscribe(String topic, MqttEventListener mel) { 245 String fullTopic = baseTopic + topic; 246 if (mqttEventListeners == null || mqttClient == null) { 247 jmri.util.LoggingUtil.warnOnce(log, "Trying to unsubscribe before connect/configure is done"); 248 return; 249 } 250 try { 251 mqttEventListeners.get(fullTopic).remove(mel); 252 } catch (NullPointerException e) { 253 // Not subscribed 254 log.debug("Unsubscribe but not subscribed: \"{}\"", fullTopic); 255 return; 256 } 257 if (mqttEventListeners.get(fullTopic).isEmpty()) { 258 try { 259 mqttClient.unsubscribe(fullTopic); 260 mqttEventListeners.remove(fullTopic); 261 log.debug("Unsubscribed : \"{}\"", fullTopic); 262 } catch (MqttException ex) { 263 log.error("Can't unsubscribe : ", ex); 264 } 265 } 266 } 267 268 @API(status=API.Status.MAINTAINED) 269 public void unsubscribeall(MqttEventListener mel) { 270 mqttEventListeners.keySet().forEach((t) -> { 271 unsubscribe(t, mel); 272 }); 273 } 274 275 /** 276 * Send a message over the existing link to a broker. 277 * @param topic The topic, which follows the channel and precedes the payload in the message 278 * @param payload The payload makes up the final part of the message 279 */ 280 @API(status=API.Status.MAINTAINED) 281 public void publish(@Nonnull String topic, @Nonnull byte[] payload) { 282 publish(topic, payload, retained); 283 } 284 285 /** 286 * Send a message over the existing link to a broker. 287 * @param topic The topic, which follows the channel and precedes the payload in the message 288 * @param payload The payload makes up the final part of the message 289 * @param retain Should the message be retained? 290 */ 291 @API(status=API.Status.MAINTAINED) 292 public void publish(@Nonnull String topic, @Nonnull byte[] payload, boolean retain) { 293 try { 294 String fullTopic = baseTopic + topic; 295 mqttClient.publish(fullTopic, payload, qosflag, retain); 296 } catch (MqttException ex) { 297 log.error("Can't publish : ", ex); 298 } 299 } 300 301 /** 302 * Send a message over the existing link to a broker. 303 * @param topic The topic, which follows the channel and precedes the payload in the message 304 * @param payload The payload makes up the final part of the message 305 */ 306 @API(status=API.Status.MAINTAINED) 307 public void publish(@Nonnull String topic, @Nonnull String payload) { 308 publish(topic, payload.getBytes()); 309 } 310 311 /** 312 * Send a message over the existing link to a broker. 313 * @param topic The topic, which follows the channel and precedes the payload in the message 314 * @param retain Should the message be retained? 315 * @param payload The payload makes up the final part of the message 316 */ 317 @API(status=API.Status.MAINTAINED) 318 public void publish(@Nonnull String topic, @Nonnull String payload, boolean retain) { 319 publish(topic, payload.getBytes(), retain); 320 } 321 322 public MqttClient getMQttClient() { 323 return (mqttClient); 324 } 325 326 private void tryToReconnect(boolean showLogMessages) { 327 if (showLogMessages) log.warn("Try to reconnect"); 328 try { 329 if ((getOptionState(MQTT_USERNAME_OPTION) != null 330 && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) 331 || ( ! getOptionState("LastWillTopic").isEmpty() 332 && ! getOptionState("LastWillMessage").isEmpty())) { 333 mqttClient.connect(getMqttConnectionOptions()); 334 } else { 335 mqttClient.connect(); 336 } 337 338 if (! getOptionState("LastWillTopic").isEmpty()) { 339 publish(getOptionState("LastWillTopic"), ""); 340 } 341 342 log.warn("Succeeded to reconnect"); 343 344 mqttClient.setCallback(this); 345 Set<String> set = new HashSet<>(mqttEventListeners.keySet()); 346 for (String t : set) { 347 mqttClient.subscribe(t); 348 } 349 } catch (MqttException ex) { 350 if (showLogMessages) log.error("Unable to reconnect", ex); 351 scheduleReconnectTimer(false); 352 } 353 } 354 355 private void scheduleReconnectTimer(boolean showLogMessages) { 356 jmri.util.TimerUtil.scheduleOnLayoutThread(new java.util.TimerTask() { 357 @Override 358 public void run() { 359 tryToReconnect(showLogMessages); 360 } 361 }, 500); 362 } 363 364 @Override 365 @API(status=API.Status.INTERNAL) 366 public void connectionLost(Throwable thrwbl) { 367 log.warn("Lost MQTT broker connection..."); 368 if (this.allowConnectionRecovery) { 369 log.info("...trying to reconnect repeatedly"); 370 scheduleReconnectTimer(true); 371 return; 372 } 373 log.error("Won't reconnect"); 374 } 375 376 @Override 377 @API(status=API.Status.INTERNAL) 378 public void messageArrived(String topic, MqttMessage mm) throws Exception { 379 log.debug("Message received, topic : {} - '{}'", topic, mm); 380 381 boolean found = false; 382 Map<String,ArrayList<MqttEventListener>> tempMap 383 = new HashMap<> (mqttEventListeners); // Avoid ConcurrentModificationException 384 for (Map.Entry<String,ArrayList<MqttEventListener>> e : tempMap.entrySet()) { 385 // does key match received topic, including wildcards? 386 if (MqttTopic.isMatched(e.getKey(), topic) ) { 387 found = true; 388 e.getValue().forEach((mel) -> { 389 try { 390 mel.notifyMqttMessage(topic, mm.toString()); 391 } 392 catch (Exception exception) { 393 log.error("MqttEventListener exception: ", exception); 394 } 395 }); 396 } 397 } 398 399 if (!found) { 400 log.error("No one subscribed to {}", topic); 401 throw new Exception("No subscriber for MQTT topic " + topic); 402 } 403 } 404 405 @Override 406 @API(status=API.Status.INTERNAL) 407 public void deliveryComplete(IMqttDeliveryToken imdt) { 408 log.debug("Message delivered"); 409 } 410 411 412 @Override 413 protected void closeConnection(){ 414 log.debug("Closing MqttAdapter"); 415 try { 416 if (mqttClient != null) { 417 mqttClient.disconnect(); 418 } 419 } 420 catch (Exception exception) { 421 log.error("MqttEventListener exception: ", exception); 422 } 423 424 } 425 426 @Override 427 public void dispose() { 428 log.debug("Disposing MqttAdapter"); 429 closeConnection(); 430 super.dispose(); 431 } 432 433 private final static Logger log = LoggerFactory.getLogger(MqttAdapter.class); 434 435}