001package jmri.jmrix.mqtt.logixng; 002 003import jmri.jmrit.logixng.actions.*; 004 005import java.util.*; 006 007import jmri.*; 008import jmri.jmrit.logixng.*; 009import jmri.jmrit.logixng.util.parser.ParserException; 010import jmri.jmrix.mqtt.MqttEventListener; 011import jmri.jmrix.mqtt.MqttSystemConnectionMemo; 012import jmri.util.ThreadingUtil; 013 014/** 015 * This action subscribes to a topic to MQTT. 016 * 017 * @author Daniel Bergqvist Copyright 2022 018 */ 019public class Subscribe extends AbstractDigitalAction 020 implements MqttEventListener { 021 022 private MqttSystemConnectionMemo _memo; 023 private String _subscribeToTopic; 024 private String _lastTopic; 025 private String _lastMessage; 026 private String _lastTopicLocalVariable; 027 private boolean _removeChannelFromLastTopic; 028 private String _lastMessageLocalVariable; 029 030 031 public Subscribe(String sys, String user, MqttSystemConnectionMemo memo) 032 throws BadUserNameException, BadSystemNameException { 033 super(sys, user); 034 _memo = memo; 035 } 036 037 @Override 038 public Base getDeepCopy(Map<String, String> systemNames, Map<String, String> userNames) throws ParserException { 039 DigitalActionManager manager = InstanceManager.getDefault(DigitalActionManager.class); 040 String sysName = systemNames.get(getSystemName()); 041 String userName = userNames.get(getSystemName()); 042 if (sysName == null) sysName = manager.getAutoSystemName(); 043 Subscribe copy = new Subscribe(sysName, userName, _memo); 044 copy.setComment(getComment()); 045 copy._subscribeToTopic = _subscribeToTopic; 046 copy._lastTopicLocalVariable = _lastTopicLocalVariable; 047 copy._removeChannelFromLastTopic = _removeChannelFromLastTopic; 048 copy._lastMessageLocalVariable = _lastMessageLocalVariable; 049 return manager.registerAction(copy); 050 } 051 052 public void setMemo(MqttSystemConnectionMemo memo) { 053 assertListenersAreNotRegistered(log, "setMemo"); 054 _memo = memo; 055 } 056 057 public MqttSystemConnectionMemo getMemo() { 058 return _memo; 059 } 060 061 public String getSubscribeToTopic() { 062 return _subscribeToTopic; 063 } 064 065 public void setSubscribeToTopic(String topic) { 066 _subscribeToTopic = topic; 067 } 068 069 public String getLastTopicLocalVariable() { 070 return _lastTopicLocalVariable; 071 } 072 073 public void setLastTopicLocalVariable(String variable) { 074 _lastTopicLocalVariable = variable; 075 } 076 077 public boolean getRemoveChannelFromLastTopic() { 078 return _removeChannelFromLastTopic; 079 } 080 081 public void setRemoveChannelFromLastTopic(boolean value) { 082 _removeChannelFromLastTopic = value; 083 } 084 085 086 public String getLastMessageLocalVariable() { 087 return _lastMessageLocalVariable; 088 } 089 090 public void setLastMessageLocalVariable(String variable) { 091 _lastMessageLocalVariable = variable; 092 } 093 094 /** {@inheritDoc} */ 095 @Override 096 public Category getCategory() { 097 return Category.ITEM; 098 } 099 100 /** {@inheritDoc} */ 101 @Override 102 public void execute() throws JmriException { 103 if ((_lastTopicLocalVariable != null) && (!_lastTopicLocalVariable.isBlank())) { 104 getConditionalNG().getSymbolTable().setValue(_lastTopicLocalVariable, _lastTopic); 105 } 106 if ((_lastMessageLocalVariable != null) && (!_lastMessageLocalVariable.isBlank())) { 107 getConditionalNG().getSymbolTable().setValue(_lastMessageLocalVariable, _lastMessage); 108 } 109 } 110 111 @Override 112 public FemaleSocket getChild(int index) throws IllegalArgumentException, UnsupportedOperationException { 113 throw new UnsupportedOperationException("Not supported."); 114 } 115 116 @Override 117 public int getChildCount() { 118 return 0; 119 } 120 121 @Override 122 public String getShortDescription(Locale locale) { 123 return Bundle.getMessage(locale, "Subscribe_Short"); 124 } 125 126 @Override 127 public String getLongDescription(Locale locale) { 128 return Bundle.getMessage(locale, "Subscribe_Long", _subscribeToTopic); 129 } 130 131 /** {@inheritDoc} */ 132 @Override 133 public void setup() { 134 // Do nothing 135 } 136 137 /** {@inheritDoc} */ 138 @Override 139 public void registerListenersForThisClass() { 140 if (! _listenersAreRegistered) { 141 if (_subscribeToTopic == null) return; 142 ThreadingUtil.runOnLayout(() -> { 143 _memo.getMqttAdapter().subscribe(_subscribeToTopic, this); 144 }); 145 _listenersAreRegistered = true; 146 } 147 } 148 149 /** {@inheritDoc} */ 150 @Override 151 public void unregisterListenersForThisClass() { 152 if (_listenersAreRegistered) { 153 if (_subscribeToTopic == null) return; 154 ThreadingUtil.runOnLayout(() -> { 155 _memo.getMqttAdapter().unsubscribe(_subscribeToTopic, this); 156 }); 157 _listenersAreRegistered = false; 158 } 159 } 160 161 /** {@inheritDoc} */ 162 @Override 163 public void notifyMqttMessage(String topic, String message) { 164 _lastTopic = topic; 165 if (_removeChannelFromLastTopic && _lastTopic.startsWith(_memo.getMqttAdapter().baseTopic)) { 166 _lastTopic = _lastTopic.substring(_memo.getMqttAdapter().baseTopic.length()); 167 } 168 _lastMessage = message; 169 getConditionalNG().execute(); 170 } 171 172 /** {@inheritDoc} */ 173 @Override 174 public void disposeMe() { 175 } 176 177 private final static org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(Subscribe.class); 178 179}