MQTT integration with spring boot

Harkesh Kumar
4 min readJan 19, 2021

--

How Does MQTT Works

MQTT is a publish/subscribe protocol that allows edge-of-network devices to publish to a broker. Clients connect to this broker, which then mediates communication between the two devices. … When another client publishes a message on a subscribed topic, the broker forwards the message to any client that has subscribed.

One of Most Common Example of MQTT

The Most Popular MQTT Broker are

  1. Mosqitto
  2. VerneMq
  3. RabbitMq
  4. EMQ
  5. hivemq

And MQTT Broker run on 1883 as its default port.

MQTT: The Standard for IoT Messaging

MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc

Spring Boot With MQTT

Step1 . Setup a Maven Project With eclipse poho dependency into spring boot pom.xml file in project.

       <dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>

Step 2. Create Mqtt configuration class where we have broker credentials broker address etc.

public abstract class MqttConfig {

protected final String broker = "your broker address or IP ";
protected final int qos = 1;
protected Boolean hasSSL = false; /*By default SSL is disabled */
protected Integer port = 1883; /* Default port */
protected final String userName = "your username";
protected final String password = "Password";
protected final String TCP = "tcp://";
protected final String SSL = "ssl://";

/**
* Custom Configuration
*
*
@param broker
* @param port
* @param ssl
* @param withUserNamePass
*/
protected abstract void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass);

/**
* Default Configuration
*/
protected abstract void config();


}

Step 3. we can create a class when we can fetch data after subscribe to mqtt topic on which data is coming. And to annotate with @component

@Component
public class MqttSubscriberImpl extends MqttConfig implements MqttCallback{

private static final String fota_fetch_record = "fota_fetch_record";
private String brokerUrl = null;
final private String colon = ":";
final private String clientId = UUID.randomUUID().toString();

private MqttClient mqttClient = null;
private MqttConnectOptions connectionOptions = null;
private MemoryPersistence persistence = null;

private static final Logger logger = LoggerFactory.getLogger(MqttSubscriberImpl.class);

public MqttSubscriberImpl() {
logger.info("I am MqttSub impl");
this.config();
}

@Override
public void connectionLost(Throwable cause) {
logger.info("Connection Lost" + cause);
this.config();
}

@Override
protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
logger.info("Inside Parameter Config");
String protocal = this.TCP;

this.brokerUrl = protocal + this.broker + colon + port;
this.persistence = new MemoryPersistence();
this.connectionOptions = new MqttConnectOptions();

try {
this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
this.connectionOptions.setCleanSession(true);
this.connectionOptions.setPassword(this.password.toCharArray());
this.connectionOptions.setUserName(this.userName);
this.mqttClient.connect(this.connectionOptions);
this.mqttClient.setCallback(this);
} catch (MqttException me) {
throw new com.bms.exceptions.MqttException("Not Connected");
}
}

@Override
protected void config() {
logger.info("Inside Config with parameter");
this.brokerUrl = this.TCP + this.broker + colon + this.port;
this.persistence = new MemoryPersistence();
this.connectionOptions = new MqttConnectOptions();
try {
this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
this.connectionOptions.setCleanSession(true);
this.connectionOptions.setPassword(this.password.toCharArray());
this.connectionOptions.setUserName(this.userName);
this.mqttClient.connect(this.connectionOptions);
this.mqttClient.setCallback(this);
} catch (MqttException me) {
throw new com.bms.exceptions.MqttException("Not Connected");
}
}

@Override
public void subscribeMessage(String topic) {
try {

this.mqttClient.subscribe(topic, this.qos);
} catch (MqttException me) {
System.out.println("Not able to Read Topic "+ topic);
// me.printStackTrace();
}
}

@Override
public void disconnect() {
try {
this.mqttClient.disconnect();
} catch (MqttException me) {
logger.error("ERROR", me);
}
}


@Override
public void messageArrived(String mqttTopic, MqttMessage mqttMessage) throws Exception {
String time = new Timestamp(System.currentTimeMillis()).toString();
System.out.println("***********************************************************************");
System.out.println("Message Arrived at Time: " + time + " Topic: " + mqttTopic + " Message: "
+ new String(mqttMessage.getPayload()));
// System.out.println("***********************************************************************");


}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {

}
}

Step 4. To subscriber we have create Listener which sync in subscribe when a data set publish by any publisher or any IOT device

@Component
public class MqttMessageListener implements Runnable {
@Autowired
MqttSubscriberApi subscriber;

@Override
public void run() {
while (true) {
subscriber.subscribeMessage("your mqtt topic name");
}

}

Step 5. We also need to implement Runnable task to run this Task when spring boot application will bootstrap

@SpringBootApplication
public class Application extends SpringBootServletInitializer {


@Autowired
Runnable MessageListener;

@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(Application.class);
}

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
public CommandLineRunner schedulingRunner(TaskExecutor executor) {
return new CommandLineRunner() {
public void run(String... args) throws Exception {
executor.execute(MessageListener);
}
};
}

Finally we done for subscribe part now we check how we can publish data over the Mqtt Protocol to send broker when other device can subscribe

@Component
public class MqttPublisherImpl extends MqttConfig implements MqttCallback{

private static final Logger logger = LoggerFactory.getLogger(MqttPublisherImpl.class);


private MqttPublisherImpl() {
this.config();
}
private MqttPublisherImpl(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
this.config(broker, port, ssl, withUserNamePass);
}
public static MqttPublisherImpl getInstance() {
return new MqttPublisherImpl();
}


public static MqttPublisherImpl getInstance(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
return new MqttPublisherImpl(broker, port, ssl, withUserNamePass);
}

@Override
public void publishMessage(String topic, String message) {

try {
MqttMessage mqttmessage = new MqttMessage(message.getBytes());
mqttmessage.setQos(this.qos);
mqttmessage.setRetained(false);
this.mqttClient.publish(topic, mqttmessage);
} catch (MqttException me) {
logger.error("ERROR", me);
}
return null;
}

@Override
public void disconnect() {
try {
this.mqttClient.disconnect();
} catch (MqttException me) {
logger.error("ERROR", me);
}
}

@Override
public void connectionLost(Throwable arg0) {
logger.info("Connection Lost");

}

@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
logger.info("delivery completed");

}


@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
// Leave it blank for Publisher

}
@Override
protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
// Like we did in MqttSubscribe
}

@Override
protected void config() {
// Like we did in MqttSubscribe
}


}

There are Many MQTT client are available as MQTT.fx , mqttBox

which we can you use to testing Mqtt Data transfer like publish and subscribe

--

--

Harkesh Kumar
Harkesh Kumar

Written by Harkesh Kumar

Full Stack Developer, Springboot, MicroServices, MQTT, Apache kafka, Google vision

Responses (2)