001package jmri.jmrix.mqtt; 002 003import jmri.DccLocoAddress; 004import jmri.LocoAddress; 005import jmri.SpeedStepMode; 006import jmri.jmrix.AbstractThrottle; 007import org.slf4j.Logger; 008import org.slf4j.LoggerFactory; 009import javax.annotation.Nonnull; 010import java.util.regex.*; 011 012 013/** 014 * An implementation of AbstractThrottle with code specific to a MQTT 015 * connection. 016 * 017 * @author Dean Cording (C) 2023 018 */ 019 020 021 public class MqttThrottle extends AbstractThrottle implements MqttEventListener{ 022 023 private final MqttAdapter mqttAdapter; 024 @Nonnull 025 public String sendThrottleTopic = "cab/{0}/throttle"; 026 @Nonnull 027 public String rcvThrottleTopic ="cab/{0}/throttle"; 028 @Nonnull 029 public String sendDirectionTopic = "cab/{0}/direction"; 030 @Nonnull 031 public String rcvDirectionTopic = "cab/{0}/direction"; 032 @Nonnull 033 public String sendFunctionTopic = "cab/{0}/function/{1}"; 034 @Nonnull 035 public String rcvFunctionTopic = "cab/{0}/function/{1}"; 036 037 protected int address = -1; 038 039 private Pattern functionPattern; 040 041 private MqttConsistManager consistManager; 042 043 /** 044 * Constructor. 045 * @param memo system connection. 046 */ 047 048 public MqttThrottle(MqttSystemConnectionMemo memo) { 049 super(memo); 050 mqttAdapter = memo.getMqttAdapter(); 051 consistManager = memo.getConsistManager(); 052 053 this.speedStepMode = SpeedStepMode.NMRA_DCC_128; 054 055 this.isForward = true; //loco should default to forward 056 log.debug("MqttThrottle constructor"); 057 } 058 059 060 061 public MqttThrottle(MqttSystemConnectionMemo memo, String sendThrottleTopic, String rcvThrottleTopic, 062 String sendDirectionTopic, String rcvDirectionTopic, String sendFunctionTopic, 063 String rcvFunctionTopic) { 064 super(memo); 065 mqttAdapter = memo.getMqttAdapter(); 066 consistManager = memo.getConsistManager(); 067 this.sendThrottleTopic = sendThrottleTopic; 068 this.rcvThrottleTopic = rcvThrottleTopic; 069 this.sendDirectionTopic = sendDirectionTopic; 070 this.rcvDirectionTopic = rcvDirectionTopic; 071 this.sendFunctionTopic = sendFunctionTopic; 072 this.rcvFunctionTopic = rcvFunctionTopic; 073 074 this.speedStepMode = SpeedStepMode.NMRA_DCC_128; 075 076 this.isForward = true; //loco should default to forward 077 log.debug("MqttThrottle constructor"); 078 } 079 080 /** 081 * Constructor. 082 * @param memo system connection 083 * @param sendThrottleTopic MQTT topic for sending speed 084 * @param rcvThrottleTopic MQTT topic for receiving speed 085 * @param sendDirectionTopic MQTT topic for sending direction 086 * @param rcvDirectionTopic MQTT topic for receiving direction 087 * @param sendFunctionTopic MQTT topic for sending function values 088 * @param rcvFunctionTopic MQTT topic for receiving function values 089 * @param address loco address to set on throttle 090 */ 091 public MqttThrottle(MqttSystemConnectionMemo memo, String sendThrottleTopic, String rcvThrottleTopic, 092 String sendDirectionTopic, String rcvDirectionTopic, String sendFunctionTopic, String rcvFunctionTopic, LocoAddress address) { 093 super(memo); 094 mqttAdapter = memo.getMqttAdapter(); 095 consistManager = memo.getConsistManager(); 096 this.sendThrottleTopic = sendThrottleTopic; 097 this.rcvThrottleTopic = rcvThrottleTopic; 098 this.sendDirectionTopic = sendDirectionTopic; 099 this.rcvDirectionTopic = rcvDirectionTopic; 100 this.sendFunctionTopic = sendFunctionTopic; 101 this.rcvFunctionTopic = rcvFunctionTopic; 102 103 this.setDccAddress(address.getNumber()); 104 this.speedStepMode = SpeedStepMode.NMRA_DCC_128; 105 106 this.isForward = true; //loco should default to forward 107 108 log.debug("MqttThrottle constructor called for address {}", address); 109 } 110 111 112 /** 113 * {@inheritDoc} 114 */ 115 @Override 116 public void setSpeedSetting(float speed) { 117 118 super.setSpeedSetting(speed); 119 120 if (speed < 0) { 121 speed = 0; 122 // Send MQTT message 123 jmri.util.ThreadingUtil.runOnLayout(() -> { 124 mqttAdapter.publish(this.sendDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), "STOP"); 125 }); 126 super.setSpeedSetting(0); 127 log.debug("sent address {} direction {}", address, "STOP"); 128 } 129 130 int intSpeed = Math.round(speed * 100); 131 132 // ensure non-zero input will result in non-zero output 133 if (speed > 0 && intSpeed == 0) 134 { 135 intSpeed = 1; 136 } 137 138 final String stringSpeed = String.valueOf(intSpeed); 139 140 // Send MQTT message 141 jmri.util.ThreadingUtil.runOnLayout(() -> { 142 143 mqttAdapter.publish(this.sendThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), stringSpeed); 144 }); 145 log.debug("sent address {} speed {}", address, intSpeed); 146 147 148 } 149 150 /** 151 * Set the direction 152 * 153 * @param forward true if forward; false otherwise 154 */ 155 @Override 156 public void setIsForward(boolean forward) { 157 158 super.setIsForward(forward); 159 // Send MQTT message 160 jmri.util.ThreadingUtil.runOnLayout(() -> { 161 mqttAdapter.publish(this.sendDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), (forward ? "FORWARD" : "REVERSE")); 162 }); 163 log.debug("sent address {} direction {}", address, (forward ? "FORWARD" : "REVERSE")); 164 165 } 166 167 /** 168 * {@inheritDoc} 169 */ 170 @Override 171 public void sendFunctionGroup(int functionNum, boolean momentary) { 172 173 // Send MQTT message 174 jmri.util.ThreadingUtil.runOnLayoutEventually(() -> { 175 mqttAdapter.publish(this.sendFunctionTopic.replaceFirst("\\{0\\}", String.valueOf(address)).replaceFirst("\\{1\\}",String.valueOf(functionNum)), (getFunction(functionNum) ? "ON" : "OFF")); 176 }); 177 178 log.debug("sent address {} function {} {}", address, functionNum, (getFunction(functionNum) ? "ON" : "OFF")); 179 180 } 181 182 183 protected void throttleRelease() { 184 185 active = false; 186 187 // Send blank MQTT message to remove any persistent message 188 jmri.util.ThreadingUtil.runOnLayout(() -> { 189 mqttAdapter.publish(this.sendThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), ""); 190 mqttAdapter.publish(this.sendDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), ""); 191 192 for (int functionNum = 0; functionNum < getFunctions().length; functionNum++) { 193 mqttAdapter.publish(this.sendFunctionTopic.replaceFirst("\\{0\\}", 194 String.valueOf(address)).replaceFirst("\\{1\\}",String.valueOf(functionNum)), ""); 195 } 196 }); 197 consistManager.deactivateConsist(getLocoAddress()); 198 199 mqttAdapter.unsubscribe(this.rcvThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this); 200 mqttAdapter.unsubscribe(this.rcvDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this); 201 mqttAdapter.unsubscribe(this.rcvFunctionTopic.replaceFirst("\\{0\\}", String.valueOf(address)).replaceFirst("\\{1\\}", "+"), this); 202 203 204 } 205 206 /** 207 * Dispose when finished with this object. After this, further usage of this 208 * Throttle object will result in a JmriException. 209 * 210 * This is quite problematic, because a using object doesn't know when it's 211 * the last user. 212 */ 213 @Override 214 protected void throttleDispose() { 215 log.debug("throttleDispose {}", address); 216 217 finishRecord(); 218 } 219 220 221 222 public int setDccAddress(int newaddress) { 223 224 if (address > 0) { 225 // Send blank MQTT message to remove any persistent message 226 jmri.util.ThreadingUtil.runOnLayout(() -> { 227 mqttAdapter.publish(this.sendThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), ""); 228 mqttAdapter.publish(this.sendDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), ""); 229 230 for (int functionNum = 0; functionNum < getFunctions().length; functionNum++) { 231 mqttAdapter.publish(this.sendFunctionTopic.replaceFirst("\\{0\\}", 232 String.valueOf(address)).replaceFirst("\\{1\\}",String.valueOf(functionNum)), ""); 233 } 234 }); 235 236 mqttAdapter.unsubscribe(this.rcvThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this); 237 mqttAdapter.unsubscribe(this.rcvDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this); 238 mqttAdapter.unsubscribe(this.rcvFunctionTopic.replaceFirst("\\{0\\}", String.valueOf(address)).replaceFirst("\\{1\\}", "+"), this); 239 } 240 address = newaddress; 241 242 mqttAdapter.subscribe(this.rcvThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this); 243 mqttAdapter.subscribe(this.rcvDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this); 244 mqttAdapter.subscribe(this.rcvFunctionTopic.replaceFirst("\\{0\\}", String.valueOf(address)).replaceFirst("\\{1\\}", "+"), this); 245 246 consistManager.activateConsist(getLocoAddress()); 247 setSpeedSetting(0); 248 setIsForward(true); 249 250 functionPattern = Pattern.compile(this.rcvFunctionTopic.replaceFirst("\\{0\\}", 251 String.valueOf(address)).replaceFirst("\\{1\\}", "(\\\\d+)")); 252 253 return address; 254 } 255 256 public int getDccAddress() { 257 return address; 258 } 259 260 @Override 261 public LocoAddress getLocoAddress() { 262 return new DccLocoAddress(address, MqttThrottleManager.isLongAddress(address)); 263 } 264 265 @Override 266 public void notifyMqttMessage(String receivedTopic, String message) { 267 268 if (receivedTopic.endsWith(this.rcvThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)))) { 269 270 Float speed ; 271 272 try { 273 speed = Math.max(0.0f,Math.min(Float.parseFloat(message)/100.0f,1.0f)); 274 } 275 catch (Exception e){ 276 if (message.length() != 0) { 277 log.error("Invalid throttle speed: '{}'", message); 278 } 279 speed = -1.0f; 280 } 281 282 super.setSpeedSetting(speed); 283 284 } else if (receivedTopic.endsWith(this.rcvDirectionTopic.replaceFirst("\\{0\\}", 285 String.valueOf(address)))) { 286 switch (message) { 287 case "FORWARD": 288 super.setIsForward(true); 289 break; 290 case "REVERSE": 291 super.setIsForward(false); 292 break; 293 case "STOP": 294 case "": 295 super.setSpeedSetting(-1); 296 break; 297 default: 298 log.error("Invalid message {}", message); 299 } 300 } else { 301 302 Matcher functionMatcher = functionPattern.matcher(receivedTopic); 303 if (functionMatcher.matches()) { 304 updateFunction(Integer.parseInt(functionMatcher.group(1)),(message.equals("ON"))); 305 } 306 } 307 } 308 309 // register for notification 310 private final static Logger log = LoggerFactory.getLogger(MqttThrottle.class); 311 312 }