最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

整合MQTT

来源:博客园

1、步骤(1)dependencecom.google.code.gsongsonorg.springframework.integrationspring-integration-streamorg.springframework.integrationspring-integration-mqtt(2)application.properties


(资料图)

用户名(这里为空)

mqtt.username=iot

密码(这里为空)

mqtt.password=iot

推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://192.168.60.133:1883

mqtt.url=tcp://ccc.lmuiot.cn:1883

客户端ID(这里使用随机数)

mqtt.clientId=$

默认的推送主题,实际可在调用接口时指定

mqtt.sender.defaultTopic=test

默认的接收主题,实际可在调用接口时指定

mqtt.receiver.defaultTopic=testaa/#,testbb(3)Configuration@Configurationpublic class MqttConfig {/*** 发布的bean名称*/public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";public static final String CHANNEL_NAME_IN = "mqttInboundChannel";// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息private static final byte[] WILL_DATA;static {WILL_DATA = "offline".getBytes();}

@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.url}")private String url;@Value("${mqtt.clientId}")private String clientId;@Value("${mqtt.sender.defaultTopic}")private String defaultTopic;@Value("${mqtt.receiver.defaultTopic}")private String defaultreceiverTopic;/** * MQTT连接器选项 */@Beanpublic MqttConnectOptions getSenderMqttConnectOptions(){    MqttConnectOptions options=new MqttConnectOptions();    // 设置连接的用户名    System.out.println(username);    if(!username.trim().equals("")){        options.setUserName(username);    }    // 设置连接的密码    options.setPassword(password.toCharArray());    // 设置连接的地址    options.setServerURIs(new String[]{url});    // 设置超时时间 单位为秒    options.setConnectionTimeout(100);    // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线    // 但这个方法并没有重连的机制    options.setKeepAliveInterval(30);    // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。    options.setWill("willTopic", WILL_DATA, 2, false);    return options;}/** * MQTT客户端 */@Beanpublic MqttPahoClientFactory senderMqttClientFactory() {    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();    factory.setConnectionOptions(getSenderMqttConnectOptions());    return factory;}/** * MQTT信息通道(生产者) */@Bean(name = CHANNEL_NAME_OUT)public MessageChannel mqttOutboundChannel() {    return new DirectChannel();}/** * MQTT消息处理器(生产者) */@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)public MessageHandler mqttOutbound() {    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, senderMqttClientFactory());    messageHandler.setAsync(true);    messageHandler.setDefaultTopic(defaultTopic);    return messageHandler;}/** * MQTT信息通道(消费者) */@Bean(name = CHANNEL_NAME_IN)public MessageChannel mqttInboundChannel() {    return new DirectChannel();}/** * MQTT消息订阅绑定(消费者) */@Beanpublic MessageProducer inbound() {    // 可以同时消费(订阅)多个Topic    MqttPahoMessageDrivenChannelAdapter adapter =            new MqttPahoMessageDrivenChannelAdapter(                    clientId, senderMqttClientFactory(),                    StringUtils.split(defaultreceiverTopic, ","));    adapter.setCompletionTimeout(5000);    adapter.setConverter(new DefaultPahoMessageConverter());    adapter.setQos(1);    // 设置订阅通道    adapter.setOutputChannel(mqttInboundChannel());    return adapter;}/** * MQTT消息处理器(消费者) */@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_IN)public MessageHandler handler() {    return new MqttCaseServiceImpl();}

}(4)MqttCaseServiceImplpublic class MqttCaseServiceImpl implements MessageHandler {@AutowiredGson gson;@AutowiredSensor sensor;

@Overridepublic void handleMessage(Message arg0) throws MessagingException {// TODO Auto-generated method stubSystem.out.println("ok 00");    String topic = (String) arg0.getHeaders().get("mqtt_receivedTopic");    String payload = (String) arg0.getPayload();    System.out.println("headers:"+topic+"   "+payload);    if(topic.equals("testaa/sensor"))    {     System.out.println("testaa/sensor"+"   "+payload);     try {     sensor=gson.fromJson(payload, Sensor.class); System.out.println("sensor:"+sensor.toString());} catch (JsonSyntaxException e) {// TODO Auto-generated catch blocke.printStackTrace();}    }}

}(5)IMqttSender/**

  • MQTT生产者消息发送接口/@Component@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)public interface IMqttSender {/*

    • 发送信息到MQTT服务器
    • @param data 发送的文本/void sendToMqtt(String data);/*
    • 发送信息到MQTT服务器
    • @param topic 主题
    • @param payload 消息主体/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,String payload);/*
    • 发送信息到MQTT服务器
    • @param topic 主题
    • @param qos 对消息处理的几种机制。
    • 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
    • 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
    • 2 多了一次去重的动作,确保订阅者收到的消息有一次。
    • @param payload 消息主体/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos,String payload);}(6)发送Controller@RestControllerpublic class MQTTController {/*
    • 注入发送MQTT的Bean*/@Autowiredprivate IMqttSender iMqttSender;

    // 发送自定义消息内容(使用默认主题)@RequestMapping("/test1/{data}")public void test1(@PathVariable("data") String data) {System.out.println(data);iMqttSender.sendToMqtt(data);}

    // 发送自定义消息内容,且指定主题@RequestMapping("/test2/{topic}/{data}")public void test2(@PathVariable("topic") String topic, @PathVariable("data") String data) {iMqttSender.sendToMqtt(topic, data);}}2、测试(1)发送消息

(2)接收消息

关键词: 发送信息 信息通道 表示的是