Java 集成 MQTT,与硬件设备通信

Java 集成 MQTT,与 ESP8266 硬件设备通信

Java 集成

maven依赖

	<!-- Eclipse Paho MQTT v3 client library (provides the org.eclipse.paho.client.mqttv3 package used by the Java classes in this article) -->
	<dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

配置参数

package com.ruoyi.demo;

/**
 * Shared connection settings read by the MQTT demo publisher and subscriber.
 *
 * @author Zhang Pu
 * @date 2022/4/2 16:51
 */
public class MqConfig {
    /** URI of the MQTT broker to connect to. */
    public static String broker = "tcp://xxx.110.32.xxx:1883";

    /** User name presented to the broker when connecting. */
    public static String userName = "xxxxx";

    /** Password presented to the broker when connecting. */
    public static String passWord = "xxxxx";

    /** MQTT quality-of-service level used by the demo; 2 means exactly-once delivery. */
    public static int qos = 2;
}

发布消息

package com.ruoyi.demo;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Demo publisher: opens a short-lived connection to the broker configured in
 * {@code MqConfig}, publishes one message and releases the connection again.
 *
 * @author Zhang Pu
 * @date 2022/3/30 17:51
 */
public class PubMsg {

    /**
     * Creates a client for the configured broker and connects it.
     *
     * @param clientId identifier this client presents to the broker
     * @param userName user name used to authenticate
     * @param password password used to authenticate
     * @return a connected client; the caller is responsible for disconnecting and closing it
     * @throws MqttException if the client cannot be created or the connection attempt fails
     */
    private static MqttClient connect(String clientId, String userName, String password) throws MqttException {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setUserName(userName);
        connOpts.setPassword(password.toCharArray());
        connOpts.setConnectionTimeout(10); // connection timeout
        connOpts.setKeepAliveInterval(20); // keep-alive (heartbeat) interval

        // In-memory persistence: in-flight message state is not kept across restarts.
        MqttClient mqttClient = new MqttClient(MqConfig.broker, clientId, new MemoryPersistence());
        mqttClient.setCallback(new PushCallback("test"));
        try {
            mqttClient.connect(connOpts);
        } catch (MqttException e) {
            // A client that never connected still holds resources; release them before propagating.
            try {
                mqttClient.close();
            } catch (MqttException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
        return mqttClient;
    }

    /**
     * Disconnects the client if it is still connected and then closes it.
     *
     * @param mqttClient client to release
     * @throws MqttException if disconnecting or closing fails
     */
    private static void release(MqttClient mqttClient) throws MqttException {
        try {
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
        } finally {
            mqttClient.close();
        }
    }

    /**
     * Publishes a single non-retained message and releases the connection afterwards,
     * whether or not publishing succeeded.
     *
     * @param msg      message text; encoded as UTF-8 on the wire
     * @param clientId identifier this client presents to the broker
     * @param topic    topic to publish to
     * @throws MqttException if connecting, publishing or releasing the connection fails
     */
    private static void publish(String msg, String clientId, String topic) throws MqttException {
        MqttClient mqttClient = connect(clientId, MqConfig.userName, MqConfig.passWord);
        try {
            // Explicit charset so the payload does not depend on the JVM's platform default.
            MqttMessage message = new MqttMessage(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            message.setQos(MqConfig.qos);
            // retained=false: the broker does not keep this message for clients that subscribe later.
            // Set it to true if late subscribers should receive the topic's latest message.
            message.setRetained(false);
            mqttClient.publish(topic, message);
        } finally {
            release(mqttClient);
        }
    }

    /**
     * Publishes one sample message to {@code test-topic}.
     *
     * @param args unused
     * @throws MqttException if the message cannot be published
     */
    public static void main(String[] args) throws MqttException {
        publish("message content", "client-id-0", "test-topic");
    }
}


发布回调

package com.ruoyi.demo;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @author 张璞
 * @date 2022/3/30 17:52
 */
public class PushCallback implements MqttCallback {
    private String threadId;

    public PushCallback(String threadId) {
        this.threadId = threadId;
    }

    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        System.out.println("-------messageArrived-------"+threadId + " " + msg);
    }
}


订阅

package com.ruoyi.demo;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Demo subscriber: connects to the broker configured in {@code MqConfig} and
 * hands every message on the subscribed topic to a {@code SubMessageListener}.
 *
 * @author Zhang Pu
 * @date 2022/3/30 17:53
 */
public class SubMsg {

    /**
     * Creates a client for the configured broker and connects it with a
     * non-clean session, using the credentials from {@code MqConfig}.
     *
     * @param clientId unique identifier this client presents to the broker
     * @return a connected client
     * @throws MqttException if the client cannot be created or the connection attempt fails
     */
    private static MqttClient connect(String clientId) throws MqttException {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);
        connOpts.setConnectionTimeout(10);
        connOpts.setKeepAliveInterval(20);
        connOpts.setUserName(MqConfig.userName);
        connOpts.setPassword(MqConfig.passWord.toCharArray());

        MqttClient mqttClient = new MqttClient(MqConfig.broker, clientId, new MemoryPersistence());
        try {
            mqttClient.connect(connOpts);
        } catch (MqttException e) {
            // A client that never connected still holds resources; release them before propagating.
            try {
                mqttClient.close();
            } catch (MqttException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
        return mqttClient;
    }


    /**
     * Connects and subscribes to a topic at the configured QoS, registering the
     * listener in the same call so no message is delivered before a handler exists.
     *
     * @param clientId unique identifier this client presents to the broker
     * @param topic    topic filter to subscribe to
     * @throws MqttException if connecting or subscribing fails
     */
    private static void subMsg(String clientId, String topic) throws MqttException {
        // connect() either returns a connected client or throws, so no null check is needed.
        MqttClient mqttClient = connect(clientId);
        // One subscription only: a prior listener-less subscribe to the same topic is redundant
        // and would leave early messages without any handler.
        mqttClient.subscribe(topic, MqConfig.qos, new SubMessageListener());
    }

    /**
     * Subscribes to {@code test-topic} as client {@code testSub}.
     *
     * @param args unused
     * @throws MqttException if the subscription cannot be established
     */
    public static void main(String[] args) throws MqttException {
        subMsg("testSub", "test-topic");
    }
}

订阅监听

package com.ruoyi.demo;

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Per-subscription listener that prints the topic, QoS and payload of every
 * received message to standard output.
 *
 * @author Zhang Pu
 * @date 2022/4/2 15:46
 */
public class SubMessageListener implements IMqttMessageListener {
    /**
     * Prints the details of a received message.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage received message; its payload is decoded as UTF-8
     * @throws Exception declared by the listener interface; not thrown by this implementation's own logic
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // Decode with an explicit charset instead of the JVM's platform default.
        String content = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + mqttMessage.getQos());
        System.out.println("接收消息内容 : " + content);
    }
}

C++ 集成(ESP8266 / Arduino)

/**
 * @author 张璞
 * @date 2022/3/30 17:51
 */
#include <ESP8266WiFi.h>   
#include <PubSubClient.h>       
#include <DNSServer.h>
#include <ESP8266WebServer.h>
#include <WiFiManager.h>  
#include <Ticker.h>       

const char* mqttServer = "xxx.xxx.xxx.xxx";// MQTT broker address
const char* mqttUserName = "xxx"; // MQTT user name
const char* mqttPassword = "xxx";// MQTT password

Ticker ticker;                       // periodic timer that drives tickerCount()
WiFiClient wifiClient;               // network client handed to the MQTT client
PubSubClient mqttClient(wifiClient); // MQTT client used for publishing and subscribing
int count;    // counter incremented by the Ticker callback and reset in loop()

/**
 * @brief Program entry point: initialises serial output and the on-board LED,
 *        joins WiFi, connects to the MQTT broker and starts the ticker.
 */
void setup() {
    Serial.begin(9600);
    pinMode(LED_BUILTIN, OUTPUT);             // configure the on-board LED pin as an output
    digitalWrite(LED_BUILTIN, HIGH);          // turn the on-board LED off at startup
    connectEquipment();                       // join WiFi via WiFiManager
    mqttClient.setServer(mqttServer, 1883);   // set the MQTT broker address and port
    mqttClient.setCallback(receiveCallback);  // register the handler for messages on subscribed topics
    connectMQTTServer();                     // connect to the MQTT broker
    ticker.attach(1, tickerCount);          // have the ticker call tickerCount() periodically (interval argument 1)
}
 
void loop() {
  // Re-establish the broker connection whenever it is down.
  if (!mqttClient.connected()) {
    connectMQTTServer();
  }

  // Let the MQTT client process its pending work.
  mqttClient.loop();

  // Nothing to publish until the ticker has counted three ticks.
  if (count < 3) {
    return;
  }

  pubMQTTmsg();
  count = 0;
}

// Ticker callback: advances the counter that loop() polls to pace publishing.
void tickerCount(){
  count += 1;
}

/**
 * @brief 建立网络配置
 * 
 */
void connectEquipment(){
    WiFiManager wifiManager;// 建立WiFiManager对象
    wifiManager.autoConnect("zhangpuArduino", "xxxxxxx");//wifi名:密码
    // WiFi连接成功后将通过串口监视器输出连接成功信息 
    Serial.println(""); 
    Serial.print("ESP8266 Connected to ");
    Serial.println(WiFi.SSID());              // WiFi名称
    Serial.print("IP address:\t");
    Serial.println(WiFi.localIP());           // IP
    Serial.print("MAC address:\t");   
    Serial.println(WiFi.macAddress());
  }

/**
 * @brief 连接mqtt
 * 
 */
void connectMQTTServer(){
  String clientId = "esp8266-" + WiFi.macAddress();
  if (mqttClient.connect(clientId.c_str(), mqttUserName, mqttPassword)) 
  { 
    Serial.println("MQTT Server Connected.");
    Serial.print("Server Address: ");
    Serial.println(mqttServer);
    Serial.print("ClientId: ");
    Serial.println(clientId);
    subscribeTopic(); // 订阅指定主题
  } else {
    Serial.print("MQTT Server Connect Failed. Client State:");
    Serial.println(mqttClient.state());
    delay(3000);
  }   
}
 
/**
 * @brief Publish "Hello World <n>" to the topic "Taichi-Maker-Pub-<MAC address>",
 *        where <n> increases by one on every call, and log the outcome.
 */
void pubMQTTmsg(){
  static int value; // running number appended to each published message

  // The String objects stay alive for the whole call, so their buffers can be
  // passed directly; no variable-length stack copies are needed.
  String topicString = "Taichi-Maker-Pub-" + WiFi.macAddress();
  String messageString = "Hello World " + String(value++);
  const char* publishTopic = topicString.c_str();
  const char* publishMsg = messageString.c_str();

  if (mqttClient.publish(publishTopic, publishMsg)) {
    Serial.println("Message Publish Success.");
    Serial.println("Publish Topic:");Serial.println(publishTopic);
    Serial.println("Publish message:");Serial.println(publishMsg);
  } else {
    Serial.println("Message Publish Failed.");
  }
}


/**
 * @brief Subscribe to the topic "Taichi-Maker-Sub-<MAC address>" and log the outcome.
 */
void subscribeTopic(){
  // The String stays alive for the whole call, so its buffer can be passed
  // directly; no variable-length stack copy is needed.
  String topicString = "Taichi-Maker-Sub-" + WiFi.macAddress();
  const char* subTopic = topicString.c_str();

  if (mqttClient.subscribe(subTopic)) {
    Serial.println("Subscrib Topic:");
    Serial.println(subTopic);
  } else {
    Serial.print("Subscribe Fail...");
  }
}

/**
 * @brief 收到信息后的回调函数
 * @param topic 
 * @param payload 
 * @param length 
 */
void receiveCallback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message Received [");
  Serial.print(topic);
  Serial.print("] ");
  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println("");
  Serial.print("Message Length(Bytes) ");
  Serial.println(length);

  if ((char)payload[0] == '1') {     // 如果收到的信息以“1”为开始
    digitalWrite(BUILTIN_LED, LOW);  // 则点亮LED。
    Serial.println("LED ON");
  } else {                           
    digitalWrite(BUILTIN_LED, HIGH); // 否则熄灭LED。
    Serial.println("LED OFF");
  }
}


版权声明:本文为weixin_42456784原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>