更多内容详见【MQTT协议】使用c++实现mqtt协议(Mosquitto源码编译)

一、Mosquitto中的QoS定义

MQTT协议中的QoS(Quality of Service)表示消息传输的服务质量等级,它是MQTT协议中非常重要的一个概念。
MQTT协议中定义了三个不同等级的QoS:

QoS 0:最多一次(At most once)传输。消息发布者只发送一次消息,不进行确认,也不关心消息是否到达订阅者。这种QoS等级的消息传输效率最高,但可靠性最低。

QoS 1:最少一次(At least once)传输。消息发布者会发送消息,并等待确认。如果消息没有被确认,会再次发送,直到收到确认为止。这种QoS等级的消息传输可靠性较高,但效率稍低。

QoS 2:恰好一次(Exactly once)传输。消息发布者发送消息,并等待确认。如果没有收到确认,会再次发送,直到收到确认为止。订阅者接收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。这种QoS等级的消息传输可靠性最高,但效率最低。

QoS1和3区别

在MQTT协议中,客户端和服务端可以通过协商来确定消息传输的QoS等级,以满足消息传输效率和可靠性的要求。
MQTT协议中的QoS 1和QoS 2都保证了消息的可靠性,但它们之间有一些区别。
QoS 1(最少一次)保证了消息至少会被传输一次,但不保证消息一定会被准确地传输一次。在QoS 1级别下,发布者发送消息并等待一个确认,如果没有收到确认,会再次发送消息,直到收到确认为止。但是,如果确认消息丢失或延迟,可能会导致发布者不断地发送相同的消息。因此,QoS 1级别下可能会出现重复的消息。
QoS 2(恰好一次)保证了消息会被恰好传输一次,不会出现重复的消息。在QoS 2级别下,发布者发送消息并等待一个确认。订阅者收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。如果订阅者收到重复的消息,可以通过消息中的标识符进行去重,以保证只处理一次。
综上所述,QoS 2级别下的消息传输可靠性更高,但是会带来更大的网络开销和延迟。在选择QoS等级时,需要根据具体的应用需求来确定。

二、安装base64库

base64库用于完成图片与base64之间的转换工作。
git clone https://github.com/ReneNyffenegger/cpp-base64.git
得到base64.h及base64.cpp,直接在工程中进行引用即可。

三、cjson简介

由于Mosquitto传递字节消息,需使用CJSON完成对象的序列化:
如果需要将C语言结构体转化为JSON字符串,需要手动遍历结构体中的成员,并使用CJSON提供的接口将它们转化为JSON对象。
cjson库在上一篇博文中已安装。这里使用cjson获取字符串实现mqtt节点间的信息传递

四、主程序代码

视频在拉流后,视频帧到不同算法间的关系是发布/订阅关系,
下面代码进行了2节点简单模拟,文中对于Mosquitto原始库做了一个简易封装,了解逻辑即可

#include "mqtt-client.hpp"
#include <opencv2/opencv.hpp>
#include <base64.h>
#include "cJSON.h"
#include <fstream>
using namespace std;
string matToBase64(const cv::Mat& image) {
    // 将图像编码为base64字符串
    vector<uchar> data;
    cv::imencode(".jpg", image, data);
    string base64_img = base64_encode(data.data(), data.size());
    return base64_img;
}
cv::Mat base64ToMat(const std::string& base64_str) {
    // 解码base64字符串
    std::string decoded_str = base64_decode(base64_str);
    // 将解码后的字符串转换为cv::Mat
    cv::Mat image = cv::imdecode(std::vector<uchar>(decoded_str.begin(), decoded_str.end()), cv::IMREAD_COLOR);
    return image;
}
int main(){  
    typedef struct {
    int id;
    std::string pic;
    } Student;
    // 读取PNG图片为Mat
    cv::Mat image = cv::imread("/home/trtlearn/mqtt-client/workspace/pic.jpg");
    string base64_img = matToBase64(image);
    // 创建一个JSON对象
    cJSON *root = cJSON_CreateObject();
    // 将结构体成员转化为JSON对象
    cJSON_AddNumberToObject(root, "num", 0.75);
    cJSON_AddStringToObject(root, "pic", base64_img.c_str());
    // 将JSON对象转换为JSON字符串
    char* json_str = cJSON_Print(root);
    cJSON_Delete(root);
    //新建两个客户端,其中一个发布消息接收数值,另一个接收图片。其中第一个节点即是发布者又是订阅者。
    string strPic(json_str);
    MQTTInitializer _;
    MQTTConnectCredential credential;
    credential.client_id = "wish";
    credential.subscribe_topic = "cc";
    credential.server_address = "127.0.0.1";
    credential.port = 9999;
    MQTTConnectCredential credential2;
    credential2.client_id = "wish1";
    credential2.subscribe_topic = "cc";
    credential2.server_address = "127.0.0.1";
    credential2.port = 9999;
    auto client = create_mqtt_client(credential);
    auto client2 = create_mqtt_client(credential2);

    if(client == nullptr){
        printf("Failed to create client.\n");
        return -1;
    }
    if(client2 == nullptr){
        printf("Failed to create client2.\n");
        return -1;
    }
    printf("Create client success.\n");
    //设置订阅后,收到消息时的回调函数
    client->set_message_callback([](MQTTClient* client, std::string topic, std::string message){
        cJSON *root = cJSON_Parse(message.c_str());
        cJSON *num = cJSON_GetObjectItem(root, "num");
        printf("客户端1收到 [%s] 数值: %f\n", topic.c_str(), (float)(num->valuedouble));
        cJSON_Delete(root);
    });
    client2->set_message_callback([&](MQTTClient* client, std::string topic, std::string message){
        cJSON *root = cJSON_Parse(message.c_str());
        cJSON *pics = cJSON_GetObjectItem(root, "pic");
        cv::Mat image=base64ToMat(string(pics->valuestring));
    if (!image.empty()) {
        cv::imwrite("receiver.jpg",image);
    }
        printf("客户端2收到 [%s] 图片\n", topic.c_str());
        cJSON_Delete(root);
    });
    while(true){
        char c = getchar();
        if(c == 'q'){
            break;
        }

        if(c == 's'){
            if(client->publish("cc", strPic)){
                printf("Publish success\n");
            }else{
                printf("Publish failed\n");
            }
        }
    }
    printf("Done.\n");
    return 0;
}

运行程序,输入s发布数值与图片信息,打印订阅输出结果:
【MQTT协议】使用Mosquitto实现mqtt协议(二):编写视频帧的发布/订阅服务-LMLPHP

五、调用Mosquitto库使用的cmakelist

cmake_minimum_required(VERSION 3.0)
project(pro)
add_definitions(-std=c++11)

option(CUDA_USE_STATIC_CUDA_RUNTIME OFF)
set(CMAKE_CXX_STANDARD 11)
#set(CMAKE_BUILD_TYPE Release)
set(CMAKE_BUILD_TYPE Debug)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/workspace)

set(OpenCV_DIR   "/usr/local/opencv/lib64/cmake/opencv4")
set(mosquitto_Include   "/usr/local/mosquitto/include/")
set(CJSON_INCLUDE   "/usr/local/include/cjson")
set(mosquitto_DIR   "/usr/local/mosquitto")
set(mosquitto_LIBS mosquitto mosquittopp)
set(CJSON_LIBS cjson)

find_package(OpenCV)

include_directories(
    ${PROJECT_SOURCE_DIR}/src
    ${OpenCV_INCLUDE_DIRS}
    ${mosquitto_Include}
    ${CJSON_INCLUDE}
)

link_directories(
    ${TENSORRT_DIR}/lib
    ${CUDA_DIR}/lib64
    ${CUDNN_DIR}/lib
    ${mosquitto_DIR}/lib
)

set(CMAKE_CXX_FLAGS  "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -O0 -Wfatal-errors -pthread -w -g")

#递归地添加的相关文件
file(GLOB_RECURSE cpp_srcs ${PROJECT_SOURCE_DIR}/src/*.cpp)
file(GLOB_RECURSE c_srcs ${PROJECT_SOURCE_DIR}/src/*.c)

add_executable(pro ${cpp_srcs} ${c_srcs})

target_link_libraries(pro ${OpenCV_LIBS} ${mosquitto_LIBS} ${CJSON_LIBS})

add_custom_target(
    run
    DEPENDS pro
    WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/workspace
    COMMAND ./pro
)
04-14 20:37