MQTT Integration with Spring Boot
How Does MQTT Work?
MQTT is a publish/subscribe protocol that allows edge-of-network devices to publish messages to a broker. Clients connect to this broker, which then mediates communication between the devices. Each client can subscribe to one or more topics. When another client publishes a message on a subscribed topic, the broker forwards the message to every client that has subscribed to it.
The most popular MQTT brokers are:
An MQTT broker listens on port 1883 by default.
MQTT: The Standard for IoT Messaging
MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. Today MQTT is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, and oil and gas.
Spring Boot With MQTT
Step 1. Set up a Maven project and add the Eclipse Paho dependency to the pom.xml file of the Spring Boot project.
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
Step 2. Create an MQTT configuration class that holds the broker address, the broker credentials, and the other connection settings.
/**
 * Base class that carries the broker connection settings used by the MQTT
 * clients in this application.
 *
 * <p>Subclasses supply the actual connection logic by implementing the two
 * {@code config} variants.
 */
public abstract class MqttConfig {

    /** URI prefix for a plain TCP connection. */
    protected final String TCP = "tcp://";

    /** URI prefix for an SSL connection. */
    protected final String SSL = "ssl://";

    /** Host name or IP address of the broker. */
    protected final String broker = "your broker address or IP ";

    /** Broker port; 1883 is the default. */
    protected Integer port = 1883;

    /** Whether SSL is used; disabled by default. */
    protected Boolean hasSSL = false;

    /** Quality-of-service level applied to subscriptions and published messages. */
    protected final int qos = 1;

    /** User name presented to the broker. */
    protected final String userName = "your username";

    /** Password presented to the broker. */
    protected final String password = "Password";

    /**
     * Configures the client with the default settings held by this class.
     */
    protected abstract void config();

    /**
     * Configures the client with caller-supplied settings.
     *
     * @param broker           host name or IP address of the broker
     * @param port             port the broker listens on
     * @param ssl              whether the connection should use SSL
     * @param withUserNamePass whether user name and password should be sent
     */
    protected abstract void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass);
}
Step 3. Create a class that subscribes to the MQTT topic on which the data arrives and receives the incoming messages. Annotate it with @Component.
/**
 * MQTT subscriber built on the Eclipse Paho client.
 *
 * <p>The instance connects to the broker when it is constructed, registers
 * itself as the Paho callback and logs every message it receives.
 *
 * <p>NOTE(review): {@code subscribeMessage} and {@code disconnect} are marked
 * {@code @Override}, but neither {@code MqttConfig} nor {@code MqttCallback}
 * declares them. Presumably this class is also meant to implement the
 * subscriber API interface that the message listener injects
 * ({@code MqttSubscriberApi}) - confirm and add it to the {@code implements}
 * clause.
 */
@Component
public class MqttSubscriberImpl extends MqttConfig implements MqttCallback {

    private static final Logger logger = LoggerFactory.getLogger(MqttSubscriberImpl.class);

    /** Not referenced anywhere in this class. */
    private static final String fota_fetch_record = "fota_fetch_record";

    private final String colon = ":";
    private final String clientId = UUID.randomUUID().toString();

    private String brokerUrl = null;
    private MqttClient mqttClient = null;
    private MqttConnectOptions connectionOptions = null;
    private MemoryPersistence persistence = null;

    /**
     * Creates the subscriber and connects it with the default settings.
     *
     * @throws com.bms.exceptions.MqttException if the broker cannot be reached
     */
    public MqttSubscriberImpl() {
        logger.info("I am MqttSub impl");
        this.config();
    }

    /**
     * Called by Paho when the connection drops; reconnects with the settings
     * that were last applied.
     *
     * @param cause reason for the connection loss
     */
    @Override
    public void connectionLost(Throwable cause) {
        logger.warn("Connection Lost", cause);
        try {
            // Reuse the current URL/options so a custom configuration survives the reconnect.
            openConnection();
        } catch (RuntimeException e) {
            // Never let a failed reconnect escape into the Paho callback thread.
            logger.error("Reconnect after connection loss failed", e);
        }
    }

    /**
     * Connects using caller-supplied settings.
     *
     * @param broker           broker host or IP; {@code null} falls back to the default broker
     * @param port             broker port; {@code null} falls back to the default port
     * @param ssl              {@code true} to connect with {@code ssl://}, otherwise {@code tcp://}
     * @param withUserNamePass {@code false} to connect without credentials; {@code true} or
     *                         {@code null} sends the configured user name and password
     * @throws com.bms.exceptions.MqttException if the broker cannot be reached
     */
    @Override
    protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
        logger.info("Inside Parameter Config");
        applySettings(broker, port, ssl, withUserNamePass);
        openConnection();
    }

    /**
     * Connects using the default settings inherited from {@code MqttConfig}.
     *
     * @throws com.bms.exceptions.MqttException if the broker cannot be reached
     */
    @Override
    protected void config() {
        logger.info("Inside Config without parameter");
        applySettings(this.broker, this.port, this.hasSSL, Boolean.TRUE);
        openConnection();
    }

    /**
     * Subscribes to a topic with the configured QoS. Failures are logged, not thrown.
     *
     * @param topic topic filter to subscribe to
     */
    @Override
    public void subscribeMessage(String topic) {
        try {
            this.mqttClient.subscribe(topic, this.qos);
        } catch (MqttException me) {
            logger.error("Not able to Read Topic {}", topic, me);
        }
    }

    /** Disconnects from the broker. Failures are logged, not thrown. */
    @Override
    public void disconnect() {
        if (this.mqttClient == null) {
            return;
        }
        try {
            this.mqttClient.disconnect();
        } catch (MqttException me) {
            logger.error("ERROR", me);
        }
    }

    /**
     * Called by Paho for every message received on a subscribed topic; logs
     * arrival time, topic and payload.
     *
     * @param mqttTopic   topic the message was published on
     * @param mqttMessage received message
     */
    @Override
    public void messageArrived(String mqttTopic, MqttMessage mqttMessage) throws Exception {
        String time = new Timestamp(System.currentTimeMillis()).toString();
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String payload = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        logger.info("Message Arrived at Time: {} Topic: {} Message: {}", time, mqttTopic, payload);
    }

    /** Not used by the subscriber. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    /** Builds the broker URL and the connect options from the given settings. */
    private void applySettings(String host, Integer portNumber, Boolean ssl, Boolean withUserNamePass) {
        String scheme = Boolean.TRUE.equals(ssl) ? this.SSL : this.TCP;
        String effectiveHost = host != null ? host : this.broker;
        Integer effectivePort = portNumber != null ? portNumber : this.port;
        this.brokerUrl = scheme + effectiveHost + colon + effectivePort;

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        if (!Boolean.FALSE.equals(withUserNamePass)) {
            options.setUserName(this.userName);
            options.setPassword(this.password.toCharArray());
        }
        this.connectionOptions = options;
    }

    /** Replaces the current client with a fresh one connected to {@code brokerUrl}. */
    private void openConnection() {
        // Release the old client first: two live clients sharing one client id would
        // make the broker drop the older connection.
        releaseClient();
        try {
            this.persistence = new MemoryPersistence();
            this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
            // Register the callback before connecting so no early message is missed.
            this.mqttClient.setCallback(this);
            this.mqttClient.connect(this.connectionOptions);
        } catch (MqttException me) {
            logger.error("Unable to connect to MQTT broker {}", brokerUrl, me);
            throw new com.bms.exceptions.MqttException("Not Connected");
        }
    }

    /** Disconnects and closes the current client, if there is one; failures are only logged. */
    private void releaseClient() {
        MqttClient previous = this.mqttClient;
        if (previous == null) {
            return;
        }
        try {
            if (previous.isConnected()) {
                previous.disconnect();
            }
            previous.close();
        } catch (MqttException me) {
            logger.warn("Could not release previous MQTT client", me);
        }
    }
}
Step 4. To subscribe, create a listener that subscribes to the topic, so that the data published by any publisher or IoT device is received.
@Component
public class MqttMessageListener implements Runnable {
@Autowired
MqttSubscriberApi subscriber;
@Override
public void run() {
while (true) {
subscriber.subscribeMessage("your mqtt topic name");
}
}
Step 5. We also need to start this Runnable task when the Spring Boot application bootstraps.
@SpringBootApplication
public class Application extends SpringBootServletInitializer {
@Autowired
Runnable MessageListener;
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(Application.class);
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public CommandLineRunner schedulingRunner(TaskExecutor executor) {
return new CommandLineRunner() {
public void run(String... args) throws Exception {
executor.execute(MessageListener);
}
};
}
Finally, the subscriber part is done. Now let's see how to publish data over the MQTT protocol to the broker, so that other devices can subscribe to it.
@Component
public class MqttPublisherImpl extends MqttConfig implements MqttCallback{
private static final Logger logger = LoggerFactory.getLogger(MqttPublisherImpl.class);
private MqttPublisherImpl() {
this.config();
}
private MqttPublisherImpl(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
this.config(broker, port, ssl, withUserNamePass);
}
public static MqttPublisherImpl getInstance() {
return new MqttPublisherImpl();
}
public static MqttPublisherImpl getInstance(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
return new MqttPublisherImpl(broker, port, ssl, withUserNamePass);
}
@Override
public void publishMessage(String topic, String message) {
try {
MqttMessage mqttmessage = new MqttMessage(message.getBytes());
mqttmessage.setQos(this.qos);
mqttmessage.setRetained(false);
this.mqttClient.publish(topic, mqttmessage);
} catch (MqttException me) {
logger.error("ERROR", me);
}
return null;
}
@Override
public void disconnect() {
try {
this.mqttClient.disconnect();
} catch (MqttException me) {
logger.error("ERROR", me);
}
}
@Override
public void connectionLost(Throwable arg0) {
logger.info("Connection Lost");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
logger.info("delivery completed");
}
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
// Leave it blank for Publisher
}@Override
protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
// Like we did in MqttSubscribe
}
@Override
protected void config() {
// Like we did in MqttSubscribe
}
}
There are many MQTT clients available, such as MQTT.fx and MQTTBox,
which we can use to test MQTT data transfer, i.e. publishing and subscribing.