001package jmri.jmrix.mqtt; 002 003import jmri.IdTagManager; 004import jmri.InstanceManager; 005import jmri.IdTag; 006 007import jmri.util.ThreadingUtil; 008 009import jmri.implementation.AbstractIdTagReporter; 010 011/** 012 * Provide a Reporter implementation for MQTT communications 013 * 014 * @author Bob Jacobsen Copyright (C) 2023 015 */ 016 017class MqttReporter extends AbstractIdTagReporter implements MqttEventListener { 018 019 /** 020 * Requires, but does not check, that the system name and topic be consistent 021 * @param ma Adapter to specific connection 022 * @param systemName System Name for this Sensor 023 * @param rcvTopic Topic string to be used when receiving by JMRI 024 */ 025 MqttReporter(MqttAdapter ma, String systemName, String rcvTopic) { 026 super(systemName); 027 this.rcvTopic = rcvTopic; 028 mqttAdapter = ma; 029 mqttAdapter.subscribe(rcvTopic, MqttReporter.this); 030 } 031 032 private final MqttAdapter mqttAdapter; 033 private final String rcvTopic; 034 035 @Override 036 public void notifyMqttMessage(String receivedTopic, String message) { 037 if (! receivedTopic.endsWith(rcvTopic) ) { 038 log.error("Got a message whose topic ({}) wasn't for me ({})", receivedTopic, rcvTopic); 039 return; 040 } 041 042 log.trace("start parse of {}", message); 043 044 // parse content 045 String[] terms = message.split(" ", 2); 046 047 if ( terms.length < 1 || terms[0].isEmpty()) { 048 log.debug("No loco ID present in ({}), record empty report", message); 049 ThreadingUtil.runOnLayout(() -> { 050 notify(null); 051 }); 052 return; 053 } 054 // normal condition 055 String loco = terms[0]; 056 057 String content = ""; 058 if (terms.length > 1 ) { 059 content = terms[1]; 060 } 061 062 // IdTags can throw IllegalArgumentException on some inputs. 063 try { 064 IdTag idTag = InstanceManager.getDefault(IdTagManager.class).provideIdTag(loco); 065 idTag.setProperty("content", content); 066 // always send a null report as workaround for IdTag equality not checking properties 067 ThreadingUtil.runOnLayout(() -> { 068 notify(null); 069 }); 070 // and then send the real report 071 ThreadingUtil.runOnLayout(() -> { 072 notify(idTag); 073 }); 074 } catch (IllegalArgumentException e) { 075 log.error("Reporter {} cannot make a tag from input ({})", getSystemName(), message); 076 } 077 } 078 079 @Override 080 public void dispose() { 081 mqttAdapter.unsubscribe(rcvTopic, this); 082 super.dispose(); 083 } 084 085 private final static org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(MqttReporter.class); 086 087}