001package jmri.jmrix.mqtt;
002
003import java.io.IOException;
004import java.util.*;
005
006import javax.annotation.Nonnull;
007
008import org.apiguardian.api.API;
009import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
010import org.eclipse.paho.client.mqttv3.MqttCallback;
011import org.eclipse.paho.client.mqttv3.MqttClient;
012import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
013import org.eclipse.paho.client.mqttv3.MqttException;
014import org.eclipse.paho.client.mqttv3.MqttMessage;
015import org.eclipse.paho.client.mqttv3.MqttTopic;
016import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
017
018import org.slf4j.Logger;
019import org.slf4j.LoggerFactory;
020
021/**
022 * Communications adapter for Mqtt communications links.
023 *
024 * @author Lionel Jeanson
025 * @author Bob Jacobsen   Copyright (c) 2019, 2023
026 */
027@API(status=API.Status.MAINTAINED)
028public class MqttAdapter extends jmri.jmrix.AbstractNetworkPortController implements MqttCallback {
029
030    private final static String PROTOCOL = "tcp://";
031    private final static String DEFAULT_BASETOPIC = Bundle.getMessage("TopicBase");
032
033    // 0.1 to get it to the front of the list
034    private final static String MQTT_USERNAME_OPTION = "0.1";
035
036    // 0.2 to get it to the front of the list
037    private final static String MQTT_PASSWORD_OPTION = "0.2";
038
039    public boolean retained = true;  // public for script access
040    public int      qosflag = 2;     // public for script access
041
042    /**
043     * Otherwise known as "Channel", this is prepended to the
044     * topic for all JMRI inward and outward communications.
045     * Typically set by preferences at startup.  Changing it
046     * after startup might have no or bad effect.
047     */
048    @API(status=API.Status.MAINTAINED)
049    public String baseTopic = DEFAULT_BASETOPIC;
050
051    HashMap<String, ArrayList<MqttEventListener>> mqttEventListeners = new HashMap<>();
052
053    MqttClient mqttClient;
054
055    @API(status=API.Status.INTERNAL)
056    public MqttAdapter() {
057        super(new MqttSystemConnectionMemo());
058        log.debug("Doing ctor...");
059
060        options.put(MQTT_USERNAME_OPTION, new Option(Bundle.getMessage("MQTT_Username"),
061                new String[]{""},  Option.Type.TEXT));
062
063        options.put(MQTT_PASSWORD_OPTION, new Option(Bundle.getMessage("MQTT_Password"),
064                new String[]{""},  Option.Type.PASSWORD));
065
066        option2Name = "0 MQTTchannel"; // 0 to get it to the front of the list
067        options.put(option2Name, new Option(Bundle.getMessage("NameTopicBase"),
068                                            new String[]{baseTopic}, Option.Type.TEXT));
069
070        options.put("10.3", new Option(Bundle.getMessage("NameTopicTurnoutSend"),
071                new String[]{Bundle.getMessage("TopicTurnoutSend")},  Option.Type.TEXT));
072        options.put("10.5", new Option(Bundle.getMessage("NameTopicTurnoutRcv"),
073                new String[]{Bundle.getMessage("TopicTurnoutRcv")},  Option.Type.TEXT));
074
075
076        options.put("11.3", new Option(Bundle.getMessage("NameTopicSensorSend"),
077                                            new String[]{Bundle.getMessage("TopicSensorSend")},   Option.Type.TEXT));
078        options.put("11.5", new Option(Bundle.getMessage("NameTopicSensorRcv"),
079                                            new String[]{Bundle.getMessage("TopicSensorRcv")},   Option.Type.TEXT));
080
081        options.put("12.3", new Option(Bundle.getMessage("NameTopicLightSend"),
082                                       new String[]{Bundle.getMessage("TopicLightSend")},  Option.Type.TEXT));
083        options.put("12.5", new Option(Bundle.getMessage("NameTopicLightRcv"),
084                                       new String[]{Bundle.getMessage("TopicLightRcv")},  Option.Type.TEXT));
085
086        options.put("13", new Option("Reporter topic :",    new String[]{Bundle.getMessage("TopicReporter")}, Option.Type.TEXT));
087        options.put("14", new Option("Signal Head topic :", new String[]{Bundle.getMessage("TopicSignalHead")}, Option.Type.TEXT));
088        options.put("15", new Option("Signal Mast topic :", new String[]{Bundle.getMessage("TopicSignalMast")}, Option.Type.TEXT));
089        options.put("16.3", new Option(Bundle.getMessage("NameTopicThrottleSend"),
090                                       new String[]{Bundle.getMessage("TopicThrottleSend")},  Option.Type.TEXT));
091        options.put("16.5", new Option(Bundle.getMessage("NameTopicThrottleRcv"),
092                                       new String[]{Bundle.getMessage("TopicThrottleRcv")},  Option.Type.TEXT));
093        options.put("17.3", new Option(Bundle.getMessage("NameTopicDirectionSend"),
094                                       new String[]{Bundle.getMessage("TopicDirectionSend")},  Option.Type.TEXT));
095        options.put("17.5", new Option(Bundle.getMessage("NameTopicDirectionRcv"),
096                                       new String[]{Bundle.getMessage("TopicDirectionRcv")},  Option.Type.TEXT));
097        options.put("18.3", new Option(Bundle.getMessage("NameTopicFunctionSend"),
098                                       new String[]{Bundle.getMessage("TopicFunctionSend")},  Option.Type.TEXT));
099        options.put("18.5", new Option(Bundle.getMessage("NameTopicFunctionRcv"),
100                                       new String[]{Bundle.getMessage("TopicFunctionRcv")},  Option.Type.TEXT));
101        options.put("19.3", new Option(Bundle.getMessage("NameTopicConsistSend"),
102                                       new String[]{Bundle.getMessage("TopicConsistSend")},  Option.Type.TEXT));
103        options.put("20.3", new Option(Bundle.getMessage("NameTopicPowerSend"),
104                                       new String[]{Bundle.getMessage("TopicPowerSend")},  Option.Type.TEXT));
105        options.put("20.5", new Option(Bundle.getMessage("NameTopicPowerRcv"),
106                                       new String[]{Bundle.getMessage("TopicPowerRcv")},  Option.Type.TEXT));
107
108        options.put("LastWillTopic", new Option(Bundle.getMessage("NameTopicLastWill"),
109                    new String[]{Bundle.getMessage("TopicLastWill")}, Option.Type.TEXT));
110        options.put("LastWillMessage", new Option(Bundle.getMessage("NameMessageLastWill"),
111                    new String[]{Bundle.getMessage("MessageLastWill")}, Option.Type.TEXT));
112        allowConnectionRecovery = true;
113
114    }
115
116    public MqttConnectOptions getMqttConnectionOptions() {
117
118        // Setup the MQTT Connection Options
119        MqttConnectOptions mqttConnOpts = new MqttConnectOptions();
120        mqttConnOpts.setCleanSession(true);
121        mqttConnOpts.setMaxInflight(100);
122        
123        if ( getOptionState(MQTT_USERNAME_OPTION) != null
124                && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) {
125            mqttConnOpts.setUserName(getOptionState(MQTT_USERNAME_OPTION));
126            mqttConnOpts.setPassword(getOptionState(MQTT_PASSWORD_OPTION).toCharArray());
127        }
128
129        //set Last Will
130        if (! getOptionState("LastWillTopic").isEmpty()
131                && ! getOptionState("LastWillMessage").isEmpty()) {
132            mqttConnOpts.setWill(baseTopic + getOptionState("LastWillTopic"),
133                    getOptionState("LastWillMessage").getBytes(),
134                    qosflag,
135                    true);
136        }
137
138        return mqttConnOpts;
139    }
140
141    @Override
142    @API(status=API.Status.INTERNAL)
143    public void configure() {
144        log.debug("Doing configure...");
145        mqttEventListeners = new HashMap<>();
146        getSystemConnectionMemo().setMqttAdapter(this);
147        getSystemConnectionMemo().configureManagers();
148    }
149
150    @Override
151    @API(status=API.Status.INTERNAL)
152    public void connect() throws IOException {
153        log.info("MQTT starting connect with MQTTchannel = \"{}\"", getOptionState(option2Name));
154
155        try {
156            if ( getOptionState(option2Name)!= null && ! getOptionState(option2Name).trim().isEmpty()) {
157                baseTopic = getOptionState(option2Name);
158            }
159
160            // have to make that a valid choice, overriding the original above. This
161            // is ugly and temporary.
162            if (! DEFAULT_BASETOPIC.equals(baseTopic)) {
163                options.put(option2Name, new Option("MQTT channel: ", new String[]{baseTopic, DEFAULT_BASETOPIC}));
164            }
165
166            // generate a unique client ID based on the network ID and the system prefix of the MQTT connection.
167            String clientID = jmri.InstanceManager.getDefault(jmri.web.server.WebServerPreferences.class).getRailroadName();
168
169            // ensure that only guaranteed valid characters are included in the client ID
170            clientID = clientID.replaceAll("[^A-Za-z0-9]", "");
171
172            String clientIDsuffix = "JMRI" + Integer.toHexString(jmri.util.node.NodeIdentity.networkIdentity().hashCode()) .toUpperCase() + getSystemPrefix();
173
174            // Trim railroad name to fit within MQTT client id 23 character limit.
175            if (clientID.length() > 23 - clientIDsuffix.length())
176                clientID = clientID.substring(0,23 - clientIDsuffix.length());
177
178            clientID = clientID + clientIDsuffix;
179
180            log.info("Connection {} is using a clientID of \"{}\"", getSystemPrefix(), clientID);
181            
182            String tempdirName = jmri.util.FileUtil.getExternalFilename(jmri.util.FileUtil.PROFILE);
183            log.debug("will use {} as temporary directory", tempdirName);
184
185            mqttClient = getNewMqttClient(clientID, tempdirName);
186
187            if ((getOptionState(MQTT_USERNAME_OPTION) != null
188                    && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty())
189                    || ( ! getOptionState("LastWillTopic").isEmpty()
190                    && ! getOptionState("LastWillMessage").isEmpty())) {
191                mqttClient.connect(getMqttConnectionOptions());
192
193            } else {
194                mqttClient.connect();
195            }
196
197            if ( ! getOptionState("LastWillTopic").isEmpty()) {
198                publish(getOptionState("LastWillTopic"), "");
199            }
200
201            mqttClient.setCallback(this);
202
203        } catch (MqttException ex) {
204            throw new IOException("Can't create MQTT client", ex);
205        }
206    }
207
208    MqttClient getNewMqttClient(String clientID, String tempdirName) throws MqttException {
209        return new MqttClient(PROTOCOL + getCurrentPortName(),
210            clientID, new MqttDefaultFilePersistence(tempdirName));
211    }
212
213    @Override
214    @API(status=API.Status.MAINTAINED)
215    public MqttSystemConnectionMemo getSystemConnectionMemo() {
216        return (MqttSystemConnectionMemo) super.getSystemConnectionMemo();
217    }
218
219    @API(status=API.Status.MAINTAINED)
220    public void subscribe(String topic, MqttEventListener mel) {
221        if (mqttEventListeners == null || mqttClient == null) {
222            jmri.util.LoggingUtil.warnOnce(log, "Trying to subscribe before connect/configure is done");
223            return;
224        }
225        try {
226            String fullTopic = baseTopic + topic;
227            if (mqttEventListeners.containsKey(fullTopic)) {
228                if (!mqttEventListeners.get(fullTopic).contains(mel)) {
229                    mqttEventListeners.get(fullTopic).add(mel);
230                }
231                return;
232            }
233            ArrayList<MqttEventListener> mels = new ArrayList<>();
234            mels.add(mel);
235            mqttEventListeners.put(fullTopic, mels);
236            mqttClient.subscribe(fullTopic);
237            log.debug("Subscribed : \"{}\"", fullTopic);
238        } catch (MqttException ex) {
239            log.error("Can't subscribe : ", ex);
240        }
241    }
242
243    @API(status=API.Status.MAINTAINED)
244    public void unsubscribe(String topic, MqttEventListener mel) {
245        String fullTopic = baseTopic + topic;
246        if (mqttEventListeners == null || mqttClient == null) {
247            jmri.util.LoggingUtil.warnOnce(log, "Trying to unsubscribe before connect/configure is done");
248            return;
249        }
250        try {
251            mqttEventListeners.get(fullTopic).remove(mel);
252        } catch (NullPointerException e) {
253            // Not subscribed
254            log.debug("Unsubscribe but not subscribed: \"{}\"", fullTopic);
255            return;
256        }
257        if (mqttEventListeners.get(fullTopic).isEmpty()) {
258            try {
259                mqttClient.unsubscribe(fullTopic);
260                mqttEventListeners.remove(fullTopic);
261                log.debug("Unsubscribed : \"{}\"", fullTopic);
262            } catch (MqttException ex) {
263                log.error("Can't unsubscribe : ", ex);
264            }
265        }
266    }
267
268    @API(status=API.Status.MAINTAINED)
269    public void unsubscribeall(MqttEventListener mel) {
270        mqttEventListeners.keySet().forEach((t) -> {
271            unsubscribe(t, mel);
272        });
273    }
274
275    /**
276     * Send a message over the existing link to a broker.
277     * @param topic The topic, which follows the channel and precedes the payload in the message
278     * @param payload The payload makes up the final part of the message
279     */
280    @API(status=API.Status.MAINTAINED)
281    public void publish(@Nonnull String topic, @Nonnull byte[] payload) {
282        publish(topic, payload, retained);
283    }
284
285    /**
286     * Send a message over the existing link to a broker.
287     * @param topic The topic, which follows the channel and precedes the payload in the message
288     * @param payload The payload makes up the final part of the message
289     * @param retain Should the message be retained?
290     */
291    @API(status=API.Status.MAINTAINED)
292    public void publish(@Nonnull String topic, @Nonnull byte[] payload, boolean retain) {
293        try {
294            String fullTopic = baseTopic + topic;
295            mqttClient.publish(fullTopic, payload, qosflag, retain);
296        } catch (MqttException ex) {
297            log.error("Can't publish : ", ex);
298        }
299    }
300
301    /**
302     * Send a message over the existing link to a broker.
303     * @param topic The topic, which follows the channel and precedes the payload in the message
304     * @param payload The payload makes up the final part of the message
305     */
306    @API(status=API.Status.MAINTAINED)
307    public void publish(@Nonnull String topic, @Nonnull String payload) {
308        publish(topic, payload.getBytes());
309    }
310
311    /**
312     * Send a message over the existing link to a broker.
313     * @param topic The topic, which follows the channel and precedes the payload in the message
314     * @param retain Should the message be retained?
315     * @param payload The payload makes up the final part of the message
316     */
317    @API(status=API.Status.MAINTAINED)
318    public void publish(@Nonnull String topic, @Nonnull String payload, boolean retain) {
319        publish(topic, payload.getBytes(), retain);
320    }
321
322    public MqttClient getMQttClient() {
323        return (mqttClient);
324    }
325
326    private void tryToReconnect(boolean showLogMessages) {
327        if (showLogMessages) log.warn("Try to reconnect");
328        try {
329             if ((getOptionState(MQTT_USERNAME_OPTION) != null
330                    && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty())
331                    || ( ! getOptionState("LastWillTopic").isEmpty()
332                    && ! getOptionState("LastWillMessage").isEmpty())) {
333                mqttClient.connect(getMqttConnectionOptions());
334            } else {
335                mqttClient.connect();
336            }
337
338            if (! getOptionState("LastWillTopic").isEmpty()) {
339                publish(getOptionState("LastWillTopic"), "");
340            }
341
342            log.warn("Succeeded to reconnect");
343
344            mqttClient.setCallback(this);
345            Set<String> set = new HashSet<>(mqttEventListeners.keySet());
346            for (String t : set) {
347                mqttClient.subscribe(t);
348            }
349        } catch (MqttException ex) {
350            if (showLogMessages) log.error("Unable to reconnect", ex);
351            scheduleReconnectTimer(false);
352        }
353    }
354
355    private void scheduleReconnectTimer(boolean showLogMessages) {
356        jmri.util.TimerUtil.scheduleOnLayoutThread(new java.util.TimerTask() {
357            @Override
358            public void run() {
359                tryToReconnect(showLogMessages);
360            }
361        }, 500);
362    }
363
364    @Override
365    @API(status=API.Status.INTERNAL)
366    public void connectionLost(Throwable thrwbl) {
367        log.warn("Lost MQTT broker connection...");
368        if (this.allowConnectionRecovery) {
369            log.info("...trying to reconnect repeatedly");
370            scheduleReconnectTimer(true);
371            return;
372        }
373        log.error("Won't reconnect");
374    }
375
376    @Override
377    @API(status=API.Status.INTERNAL)
378    public void messageArrived(String topic, MqttMessage mm) throws Exception {
379        log.debug("Message received, topic : {} - '{}'", topic, mm);
380
381        boolean found = false;
382        Map<String,ArrayList<MqttEventListener>> tempMap
383            = new HashMap<> (mqttEventListeners); // Avoid ConcurrentModificationException
384        for (Map.Entry<String,ArrayList<MqttEventListener>> e : tempMap.entrySet()) {
385            // does key match received topic, including wildcards?
386            if (MqttTopic.isMatched(e.getKey(), topic) ) {
387                found = true;
388                e.getValue().forEach((mel) -> {
389                    try {
390                        mel.notifyMqttMessage(topic, mm.toString());
391                    }
392                    catch (Exception exception) {
393                        log.error("MqttEventListener exception: ", exception);
394                    }
395                });
396            }
397        }
398
399        if (!found) {
400            log.error("No one subscribed to {}", topic);
401            throw new Exception("No subscriber for MQTT topic " + topic);
402        }
403    }
404
405    @Override
406    @API(status=API.Status.INTERNAL)
407    public void deliveryComplete(IMqttDeliveryToken imdt) {
408        log.debug("Message delivered");
409    }
410
411
412    @Override
413    protected void closeConnection(){
414       log.debug("Closing MqttAdapter");
415        try {
416            mqttClient.disconnect();
417        }
418        catch (Exception exception) {
419            log.error("MqttEventListener exception: ", exception);
420        }
421
422    }
423
424    @Override
425    public void dispose() {
426        log.debug("Disposing MqttAdapter");
427        closeConnection();
428        super.dispose();
429    }
430
431    private final static Logger log = LoggerFactory.getLogger(MqttAdapter.class);
432
433}