Wednesday, November 22, 2017

Eclipse Paho Java Client (MQTT client) integration with Spring Boot REST API

MQTT is an open, lightweight publish/subscribe messaging transport protocol designed with minimal protocol overhead. Clients connect to a broker, which routes each published message to the clients subscribed to its topic. Eclipse Paho is one of the most popular MQTT client library implementations. In the following example, I will show how to integrate Eclipse Paho with a Spring Boot REST API.

Tested with the following environment and technologies:

a) JDK 1.8
b) Eclipse
c) Maven
d) Spring Boot
e) Eclipse Paho as MQTT client library.
f) Postman as REST API client (Google Chrome extension)
g) MQTTLens as client tool for MQTT (Google Chrome extension)




Figure: Application Architecture (MQTT with Spring Boot)

Application flow:

1. Client1 (Postman) sends data to the server via the REST API.
2. The server receives the data and publishes it to the broker using the Eclipse Paho publisher.
3. The subscriber receives the data from the broker and prints it to the console.
4. Client2 (MQTTLens) also receives the same data from the broker.
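
The four steps above can be sketched with a toy in-memory broker. This is only an illustration of the publish/subscribe flow, not Paho and not a real MQTT broker; the class names ToyBroker and FlowDemo are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for an MQTT broker (not part of Paho). */
class ToyBroker {

    // topic -> callbacks of every client subscribed to that topic
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Registers a callback that is invoked for each message published on the topic. */
    void subscribe(String topic, Consumer<String> callback) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(callback);
    }

    /** Delivers the message to all current subscribers and returns how many received it. */
    int publish(String topic, String message) {
        List<Consumer<String>> callbacks =
                subscribers.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> callback : callbacks) {
            callback.accept(message);
        }
        return callbacks.size();
    }
}

class FlowDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        // Step 3: the server-side subscriber prints what it receives.
        broker.subscribe("demoTopic2017", m -> System.out.println("Subscriber got: " + m));
        // Step 4: a second client (MQTTLens in this post) receives the same data.
        broker.subscribe("demoTopic2017", m -> System.out.println("Client2 got: " + m));
        // Steps 1-2: data arriving over REST is published to the broker.
        broker.publish("demoTopic2017", "This is a sample message");
    }
}
```

Note that each client subscribes once and is then called back for every message; the real application works the same way through Paho's MqttCallback.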


1. Project configuration - POM file


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.monirthought</groupId>
    <artifactId>SpringBootMQTTExample</artifactId>
    <version>0.1.0</version>

 <name>Spring Boot MQTT With Eclipse Paho</name>

 <repositories>
  <repository>
   <id>Eclipse Paho Repo</id>
   <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
  </repository>
 </repositories>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-tomcat</artifactId>
   <scope>provided</scope>
  </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
  <dependency>
   <groupId>org.eclipse.paho</groupId>
   <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
   <version>1.0.2</version>
  </dependency>        
    </dependencies>

    <properties>
        <java.version>1.8</java.version>
    </properties>

 <build>
  <plugins>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
   </plugin>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <configuration>
     <skipTests>true</skipTests>
    </configuration>
   </plugin>
  </plugins>
  <finalName>ROOT</finalName>
 </build>
 <packaging>war</packaging>

</project>

* The Eclipse Paho repository is added under <repositories>.
* The Eclipse Paho client (org.eclipse.paho.client.mqttv3) is added as a dependency.

2. MQTT Configuration:

public abstract class MQTTConfig {

 protected final String broker = "iot.eclipse.org";
 protected final int qos = 2;
 protected Boolean hasSSL = false; /* By default SSL is disabled */
 protected Integer port = 1883; /* Default port */
 protected final String userName = "testUserName";
 protected final String password = "demoPassword";
 protected final String TCP = "tcp://";
 protected final String SSL = "ssl://";

 /**
  * Custom Configuration
  * 
  * @param broker
  * @param port
  * @param ssl
  * @param withUserNamePass
  */
 protected abstract void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass);

 /**
  * Default Configuration
  */
 protected abstract void config();
}

* We will use the default configuration with the TCP protocol.
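
How the two config variants assemble the broker URL can be shown in isolation. The helper below is a hypothetical sketch (the class BrokerUrl is not in the project); it mirrors the TCP/SSL prefixes and the default port from MQTTConfig, and it assumes that a null port or a null SSL flag means "use the default".

```java
/** Hypothetical helper, not part of the project: builds the URI string passed to MqttClient. */
final class BrokerUrl {

    private static final String TCP = "tcp://";
    private static final String SSL = "ssl://";
    private static final int DEFAULT_PORT = 1883; // same default as MQTTConfig

    private BrokerUrl() {
    }

    /**
     * @param host broker host name, for example "iot.eclipse.org"
     * @param port broker port, or null for the default port
     * @param ssl  TRUE for an "ssl://" URL; FALSE or null for "tcp://"
     */
    static String build(String host, Integer port, Boolean ssl) {
        // Boolean.TRUE.equals(...) is null-safe, unlike "true == ssl"
        String scheme = Boolean.TRUE.equals(ssl) ? SSL : TCP;
        int effectivePort = (port != null) ? port : DEFAULT_PORT;
        return scheme + host + ":" + effectivePort;
    }

    public static void main(String[] args) {
        System.out.println(build("iot.eclipse.org", null, null)); // tcp://iot.eclipse.org:1883
        System.out.println(build("iot.eclipse.org", 8883, true)); // ssl://iot.eclipse.org:8883
    }
}
```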

3. Configure Publisher:

3.a) MQTTPublisherBase interface:

public interface MQTTPublisherBase {

 /**
  * Publish message
  * 
  * @param topic   topic to publish to
  * @param message message payload
  */
 public void publishMessage(String topic, String message);

 /**
  * Disconnect MQTT Client
  */
 public void disconnect();

}
* Only two methods: one publishes a message and the other disconnects the client.

3.b) MQTTPublisher Class

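import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.monirthought.config.MQTTConfig;

@Component
public class MQTTPublisher extends MQTTConfig implements MqttCallback, MQTTPublisherBase {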

 private String brokerUrl = null;
 
 final private String colon = ":";
 final private String clientId = "demoClient1";
 
 private MqttClient mqttClient = null;
 private MqttConnectOptions connectionOptions = null;
 private MemoryPersistence persistence = null;

 private static final Logger logger = LoggerFactory.getLogger(MQTTPublisher.class);

 /**
  * Private default constructor
  */
 private MQTTPublisher() {
  this.config();
 }

 /**
  * Private constructor
  */
 private MQTTPublisher(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
  this.config(broker, port, ssl, withUserNamePass);
 }

 /**
  * Factory method to get instance of MQTTPublisher
  * 
  * @return MQTTPublisher
  */
 public static MQTTPublisher getInstance() {
  return new MQTTPublisher();
 }

 /**
  * Factory method to get instance of MQTTPublisher
  * 
  * @param broker
  * @param port
  * @param ssl
  * @param withUserNamePass
  * @return MQTTPublisher
  */
 public static MQTTPublisher getInstance(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
  return new MQTTPublisher(broker, port, ssl, withUserNamePass);
 }

 /*
  * (non-Javadoc)
  * 
  * @see com.monirthought.config.MQTTConfig#config()
  */
 @Override
 protected void config() {

  this.brokerUrl = this.TCP + this.broker + colon + this.port;
  this.persistence = new MemoryPersistence();
  this.connectionOptions = new MqttConnectOptions();
  try {
   this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
   this.connectionOptions.setCleanSession(true);
   // Register the callback before connecting so that no event is missed
   this.mqttClient.setCallback(this);
   this.mqttClient.connect(this.connectionOptions);
  } catch (MqttException me) {
   logger.error("ERROR", me);
  }
 }

 /*
  * (non-Javadoc)
  * 
  * @see com.monirthought.config.MQTTConfig#config(java.lang.String,
  * java.lang.Integer, java.lang.Boolean, java.lang.Boolean)
  */
 @Override
 protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {

  // Boolean.TRUE.equals(...) avoids a NullPointerException when ssl is null
  String protocol = this.TCP;
  if (Boolean.TRUE.equals(ssl)) {
   protocol = this.SSL;
  }

  // Use the broker passed in as a parameter, not the default field
  this.brokerUrl = protocol + broker + colon + port;
  this.persistence = new MemoryPersistence();
  this.connectionOptions = new MqttConnectOptions();

  try {
   this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
   this.connectionOptions.setCleanSession(true);
   if (Boolean.TRUE.equals(withUserNamePass)) {
    if (password != null) {
     this.connectionOptions.setPassword(this.password.toCharArray());
    }
    if (userName != null) {
     this.connectionOptions.setUserName(this.userName);
    }
   }
   // Register the callback before connecting so that no event is missed
   this.mqttClient.setCallback(this);
   this.mqttClient.connect(this.connectionOptions);
  } catch (MqttException me) {
   logger.error("ERROR", me);
  }
 }


 /*
  * (non-Javadoc)
  * @see com.monirthought.mqtt.publisher.MQTTPublisherBase#publishMessage(java.lang.String, java.lang.String)
  */
 @Override
 public void publishMessage(String topic, String message) {

  try {
   MqttMessage mqttmessage = new MqttMessage(message.getBytes());
   mqttmessage.setQos(this.qos);
   this.mqttClient.publish(topic, mqttmessage);
  } catch (MqttException me) {
   logger.error("ERROR", me);
  }

 }

 /*
  * (non-Javadoc)
  * @see org.eclipse.paho.client.mqttv3.MqttCallback#connectionLost(java.lang.Throwable)
  */
 @Override
 public void connectionLost(Throwable arg0) {
  logger.info("Connection Lost");

 }

 /*
  * (non-Javadoc)
  * @see org.eclipse.paho.client.mqttv3.MqttCallback#deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken)
  */
 @Override
 public void deliveryComplete(IMqttDeliveryToken arg0) {
  logger.info("delivery completed");

 }

 /*
  * (non-Javadoc)
  * @see org.eclipse.paho.client.mqttv3.MqttCallback#messageArrived(java.lang.String, org.eclipse.paho.client.mqttv3.MqttMessage)
  */
 @Override
 public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
  // Leave it blank for Publisher

 }

 /*
  * (non-Javadoc)
  * @see com.monirthought.mqtt.publisher.MQTTPublisherBase#disconnect()
  */
 @Override
 public void disconnect() {
  try {
   this.mqttClient.disconnect();
  } catch (MqttException me) {
   logger.error("ERROR", me);
  }
 }
 
}

We implement the MqttCallback interface to be notified when the connection is lost or a delivery completes. For the publisher this is optional.

4. Configure Subscriber

4.a) MQTTSubscriberBase interface

public interface MQTTSubscriberBase {

 public static final Logger logger = LoggerFactory.getLogger(MQTTSubscriberBase.class);

 /**
  * Subscribe to a topic
  * 
  * @param topic topic to subscribe to
  */
 public void subscribeMessage(String topic);

 /**
  * Disconnect MQTT Client
  */
 public void disconnect();
}

4.b) MQTTSubscriber class

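import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.monirthought.config.MQTTConfig;

@Component
public class MQTTSubscriber extends MQTTConfig implements MqttCallback, MQTTSubscriberBase {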

 private String brokerUrl = null;
 final private String colon = ":";
 final private String clientId = "demoClient2";

 private MqttClient mqttClient = null;
 private MqttConnectOptions connectionOptions = null;
 private MemoryPersistence persistence = null;

 private static final Logger logger = LoggerFactory.getLogger(MQTTSubscriber.class);

 public MQTTSubscriber() {
  this.config();
 }

 /*
  * (non-Javadoc)
  * 
  * @see org.eclipse.paho.client.mqttv3.MqttCallback#connectionLost(java.lang.
  * Throwable)
  */
 @Override
 public void connectionLost(Throwable cause) {
  logger.info("Connection Lost");

 }

 /*
  * (non-Javadoc)
  * 
  * @see
  * org.eclipse.paho.client.mqttv3.MqttCallback#messageArrived(java.lang.String,
  * org.eclipse.paho.client.mqttv3.MqttMessage)
  */
 @Override
 public void messageArrived(String topic, MqttMessage message) throws Exception {
  // Called when a message arrives from the server that matches any
  // subscription made by the client
  String time = new Timestamp(System.currentTimeMillis()).toString();
  System.out.println();
  System.out.println("***********************************************************************");
  System.out.println("Message Arrived at Time: " + time + "  Topic: " + topic + "  Message: "
    + new String(message.getPayload()));
  System.out.println("***********************************************************************");
  System.out.println();
 }

 /*
  * (non-Javadoc)
  * 
  * @see
  * org.eclipse.paho.client.mqttv3.MqttCallback#deliveryComplete(org.eclipse.paho
  * .client.mqttv3.IMqttDeliveryToken)
  */
 @Override
 public void deliveryComplete(IMqttDeliveryToken token) {
  // Leave it blank for subscriber

 }

 /*
  * (non-Javadoc)
  * 
  * @see
  * com.monirthought.mqtt.subscriber.MQTTSubscriberBase#subscribeMessage(java.
  * lang.String)
  */
 @Override
 public void subscribeMessage(String topic) {
  try {
   this.mqttClient.subscribe(topic, this.qos);
  } catch (MqttException me) {
   logger.error("ERROR", me);
  }
 }

 /*
  * (non-Javadoc)
  * 
  * @see com.monirthought.mqtt.subscriber.MQTTSubscriberBase#disconnect()
  */
 @Override
 public void disconnect() {
  try {
   this.mqttClient.disconnect();
  } catch (MqttException me) {
   logger.error("ERROR", me);
  }
 }

 /*
  * (non-Javadoc)
  * 
  * @see com.monirthought.config.MQTTConfig#config(java.lang.String,
  * java.lang.Integer, java.lang.Boolean, java.lang.Boolean)
  */
 @Override
 protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {

  // Boolean.TRUE.equals(...) avoids a NullPointerException when ssl is null
  String protocol = this.TCP;
  if (Boolean.TRUE.equals(ssl)) {
   protocol = this.SSL;
  }

  // Use the broker passed in as a parameter, not the default field
  this.brokerUrl = protocol + broker + colon + port;
  this.persistence = new MemoryPersistence();
  this.connectionOptions = new MqttConnectOptions();

  try {
   this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
   this.connectionOptions.setCleanSession(true);
   if (Boolean.TRUE.equals(withUserNamePass)) {
    if (password != null) {
     this.connectionOptions.setPassword(this.password.toCharArray());
    }
    if (userName != null) {
     this.connectionOptions.setUserName(this.userName);
    }
   }
   // Register the callback before connecting so that no message is missed
   this.mqttClient.setCallback(this);
   this.mqttClient.connect(this.connectionOptions);
  } catch (MqttException me) {
   logger.error("ERROR", me);
  }

 }

 /*
  * (non-Javadoc)
  * 
  * @see com.monirthought.config.MQTTConfig#config()
  */
 @Override
 protected void config() {

  this.brokerUrl = this.TCP + this.broker + colon + this.port;
  this.persistence = new MemoryPersistence();
  this.connectionOptions = new MqttConnectOptions();
  try {
   this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
   this.connectionOptions.setCleanSession(true);
   // Register the callback before connecting so that no message is missed
   this.mqttClient.setCallback(this);
   this.mqttClient.connect(this.connectionOptions);
  } catch (MqttException me) {
   logger.error("ERROR", me);
  }

 }

}
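
One detail in the two classes above: message.getBytes() in the publisher and new String(message.getPayload()) in the subscriber both use the JVM's default charset, which can differ between machines. Below is a minimal sketch of pinning the encoding to UTF-8 on both sides; PayloadCodec is a hypothetical helper name, not something from Paho or from this project.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of the project: converts payloads with an explicit charset. */
final class PayloadCodec {

    private PayloadCodec() {
    }

    /** Text to bytes, as needed for the MqttMessage constructor on the publishing side. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Bytes to text, as needed for MqttMessage.getPayload() on the subscribing side. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u00e9" is a non-ASCII character that takes two bytes in UTF-8
        byte[] payload = encode("caf\u00e9");
        System.out.println(payload.length + " bytes, decoded: " + decode(payload));
    }
}
```

With such a helper, the publisher could build its message as new MqttMessage(PayloadCodec.encode(message)) and the subscriber could read it with PayloadCodec.decode(message.getPayload()).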

5. Message Listener

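The subscriber only needs to subscribe to the topic once. After that, Paho delivers every incoming message to messageArrived() through the registered callback, so no polling loop is required. We run the subscription at startup with a TaskExecutor and Spring Boot's CommandLineRunner.

First we create a TaskExecutor bean; in this case we use SimpleAsyncTaskExecutor.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
public class AppConfig {

 @Bean
 public TaskExecutor taskExecutor() {
  return new SimpleAsyncTaskExecutor();
 }
}


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageListener implements Runnable {

 @Autowired
 MQTTSubscriberBase subscriber;

 @Override
 public void run() {
  // A single subscribe call is enough; messages then arrive via messageArrived()
  subscriber.subscribeMessage("demoTopic2017");
 }

}
As you see, the listener subscribes to the topic "demoTopic2017" once. Calling subscribe in an infinite loop would only flood the broker with repeated subscribe requests.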

Finally, we create a CommandLineRunner to run our task. The CommandLineRunner is executed after all beans have been initialized.

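import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.TaskExecutor;

@SpringBootApplication
public class Application extends SpringBootServletInitializer {

 // The MessageListener component is injected through its Runnable interface
 @Autowired
 Runnable messageListener;
 
 @Override
 protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
  return application.sources(Application.class);
 }

 public static void main(String[] args) {
  SpringApplication.run(Application.class, args);
 }
 
 @Bean
 public CommandLineRunner schedulingRunner(TaskExecutor executor) {
     return new CommandLineRunner() {
         @Override
         public void run(String... args) throws Exception {
             executor.execute(messageListener);
         }
     };
 }
}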

6. REST Controller

The controller receives data from the REST client and publishes it to the broker on the topic "demoTopic2017". The REST client then gets the response "Message sent to Broker" from the server.

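import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoRestController {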

 @Autowired
 MQTTPublisherBase publisher;
 
 @RequestMapping(value = "/mqtt/send", method = RequestMethod.POST)
 public String index(@RequestBody String data) {
  publisher.publishMessage("demoTopic2017", data);
  return "Message sent to Broker";
 }

}
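
The endpoint can also be called from plain Java instead of Postman. Here is a minimal sketch using the JDK's HttpURLConnection; the class name MqttSendClient is made up for illustration, and http://localhost:8080 assumes the application is running locally on Spring Boot's default port.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Hypothetical client for the /mqtt/send endpoint; not part of the project. */
class MqttSendClient {

    /** POSTs the message as the request body and returns the response body. */
    static String send(String baseUrl, String message) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(baseUrl + "/mqtt/send").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(message.getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = conn.getInputStream()) {
            // Read the whole response body into memory
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the application is reachable at http://localhost:8080
        System.out.println(send("http://localhost:8080", "This is a sample message"));
    }
}
```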

7. Testing:

a) Set up a connection to the broker in MQTTLens.
(Screenshot: MQTTLens setup)

b) Subscribe to the topic "demoTopic2017" in MQTTLens.
(Screenshot: MQTTLens)

c) Send the message "This is a sample message" to the server using Postman.

d) The subscriber receives the message. MQTTLens also receives the message.

e) Publish the message "This is another test message" from MQTTLens. The subscriber receives the message, and MQTTLens also receives it.



I hope you enjoyed the post. If you have a better idea, please share it in the comment section.

You can download the full code from my GitHub repository.
