ref: 第三章 MQTT通訊協議
MQTT通訊為一對多的M2M傳輸,使用發佈(Publish)/訂閱(Subscribe)的訊息傳送機制,此機制中包含4個主要的元素,發佈者(Publisher)、訂閱者(Subscriber)、主題(Topic)、訊息中轉站(Broker)。
Publisher為訊息的來源,傳送夾帶有Topic資訊的訊息至Broker,訂閱者向Broker註冊想要接收之訊息的Topic,例如有一Publisher發佈一則Topic為"Test"的訊息,只要是有對Broker註冊Topic為"Test"的Subscriber都能接收到此訊息。
除了發佈/訂閱的機制外,MQTT通訊協議有幾項特點:
- 使用TCP/IP作為基本的網路連線
- 提供三種訊息傳送服務的QoS
a. QoS0:At most once(最多一次),訊息最多只會被送達一次,可能遺失但不會重複送達,適合使用於感測器的原始資料傳送,因為下一則訊息將馬上被送出
b. QoS1:At least once(至少一次),保證訊息會被送達,但可能會發生重複發送的情形
c. QoS2:Exactly once(確保一次),保證訊息只會被送達一次,適用於要求高度嚴謹的系統 - ...more
- A message has a topic and a payload, like the subject and the content of an e-mail.
- The Publisher sends a message to the network.
- The Subscriber listens for messages with a particular topic.
- The Broker is responsible for coordinating the communication between publishers and subscribers. It can also store messages while subscribers are offline (a feature not used in this tutorial).
- ...more
(ref: iosphere/mosquitto 依自己需求改了一點test5.c裡的test7)
安裝: libssl-dev
快速編譯: git clone 上面連結,替換裡面的test5.c後再make就行了
#include "MQTTAsync.h"
#include <string.h>
#include <stdlib.h>
#include "Thread.h"
#if defined(_WINDOWS)
#include <windows.h>
#include <openssl/applink.c>
#define MAXHOSTNAMELEN 256
#define snprintf _snprintf
#else
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#endif
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
/*********************************************************************
Test7: Send and receive big messages
*********************************************************************/
/* Payload pointer and length declared for test 7. Nothing in the visible
 * part of this file reads or writes them after initialization. */
void *test7_payload = NULL;
int test7_payloadlen = 99;

/*
 * State shared between test7() and the MQTT callbacks, which receive a
 * pointer to it as their `context` argument.
 *
 * Members must stay in this order: AsyncTestClient_initializer fills them
 * positionally.
 */
typedef struct
{
    MQTTAsync client;     /* client handle; assigned in test7() */
    char clientid[24];    /* printable id; test7() copies the test name here */
    char topic[100];      /* topic subscribed to and published on by the callbacks */
    int maxmsgs;          /* set from MAXMSGS in test7() */
    int rcvdmsgs[3];      /* zero-initialized; not updated in the visible code */
    int sentmsgs[3];      /* zero-initialized; not updated in the visible code */
    int testFinished;     /* set to 1 by callbacks; polled by test7() */
    int subscribed;       /* cleared in test7() */
} AsyncTestClient;
#define LOGA_DEBUG 0
#define LOGA_INFO 1
#include <stdarg.h>
#include <time.h>
#include <sys/timeb.h>
/**
 * Print one timestamped, printf-style log line to stdout.
 *
 * The line has the form "YYYYMMDD HHMMSS.mmm <message>\n" and stdout is
 * flushed afterwards. Calls made with LOGA_DEBUG are discarded. The
 * formatted line is truncated to fit a 256-byte buffer.
 *
 * @param LOGA_level  LOGA_DEBUG (suppressed) or any other level (printed)
 * @param format      printf-style format string for the message
 * @param ...         arguments consumed by `format`
 */
void MyLog(int LOGA_level, char* format, ...)
{
    /* Automatic rather than static storage so concurrent callers (the MQTT
     * callbacks run outside the main thread) cannot share one buffer. */
    char msg_buf[256];
    size_t used = 0;
    int n;
    va_list args;
    struct timeb ts;
    struct tm *timeinfo;

    if (LOGA_level == LOGA_DEBUG)
        return;

    ftime(&ts);
    timeinfo = localtime(&ts.time);
    /* localtime() can return NULL and strftime() returns 0 on failure; in
     * both cases continue with an empty date prefix instead of reading an
     * undefined buffer. */
    if (timeinfo != NULL)
        used = strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
    msg_buf[used] = '\0';

    n = snprintf(msg_buf + used, sizeof(msg_buf) - used, ".%.3hu ", ts.millitm);
    if (n > 0)
    {
        size_t room = sizeof(msg_buf) - used;
        used += ((size_t) n < room) ? (size_t) n : room - 1;
    }

    va_start(args, format);
    vsnprintf(msg_buf + used, sizeof(msg_buf) - used, format, args);
    va_end(args);
    /* Guarantee termination even with snprintf variants that do not
     * terminate on truncation. */
    msg_buf[sizeof(msg_buf) - 1] = '\0';

    printf("%s\n", msg_buf);
    fflush(stdout);
}
/* Positional initializer for AsyncTestClient: NULL client handle, empty
 * clientid and topic, and every counter and flag set to zero. */
#define AsyncTestClient_initializer \
    { NULL, "\0", "\0", 0, { 0, 0, 0 }, { 0, 0, 0 }, 0, 0 }
#if defined(WIN32) || defined(_WINDOWS)
/* Sleep for A seconds. The argument is parenthesized so that an expression
 * such as mqsleep(a + b) is scaled as a whole. */
#define mqsleep(A) Sleep(1000 * (A))
#define START_TIME_TYPE DWORD
static DWORD start_time = 0;
/* Return the current GetTickCount() value as the start timestamp. */
START_TIME_TYPE start_clock(void)
{
    return GetTickCount();
}
#elif defined(AIX)
#define mqsleep sleep
#define START_TIME_TYPE struct timespec
/* Return the current CLOCK_REALTIME reading as the start timestamp. */
START_TIME_TYPE start_clock(void)
{
    static struct timespec start;
    clock_gettime(CLOCK_REALTIME, &start);
    return start;
}
#else
#define mqsleep sleep
#define START_TIME_TYPE struct timeval
/* TODO - unused - remove? static struct timeval start_time; */
/* Return the current gettimeofday() reading as the start timestamp. */
START_TIME_TYPE start_clock(void)
{
    struct timeval start_time;
    gettimeofday(&start_time, NULL);
    return start_time;
}
#endif
#if defined(WIN32)
long elapsed(START_TIME_TYPE start_time)
{
return GetTickCount() - start_time;
}
#elif defined(AIX)
#define assert(a)
long elapsed(struct timespec start)
{
struct timespec now, res;
clock_gettime(CLOCK_REALTIME, &now);
ntimersub(now, start, res);
return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
long elapsed(START_TIME_TYPE start_time)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&now, &start_time, &res);
return (res.tv_sec) * 1000 + (res.tv_usec) / 1000;
}
#endif
/* Test assertions: capture the call site and forward to myassert().
 * These take (description, condition, format, value...) and are not the
 * one-argument assert from <assert.h>. */
#define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
#define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
/* No trailing semicolon: the macro must expand to a plain expression so it
 * can be used inside larger expressions and argument lists. */
#define MAXMSGS 30
int tests = 0;      /* number of assertions evaluated by myassert() */
int failures = 0;   /* number of assertions that failed */
FILE* xml;          /* XML report stream opened in main() */
START_TIME_TYPE global_start_time;  /* set at the start of each test case */
char output[3000];  /* failure records pending for the current test case */
char* cur_output = output;  /* next write position inside `output` */
/*
 * Finish the XML record of the current test case.
 *
 * Writes the elapsed time (seconds with millisecond fraction) that closes
 * the opening tag, then any failure records accumulated in `output`, then
 * the closing </testcase> tag. The failure buffer is rewound afterwards.
 */
void write_test_result(void)
{
    const long elapsed_ms = elapsed(global_start_time);
    const long whole_seconds = elapsed_ms / 1000;
    const long leftover_ms = elapsed_ms % 1000;

    fprintf(xml, " time=\"%ld.%.3ld\" >\n", whole_seconds, leftover_ms);

    if (cur_output != output)
    {
        fputs(output, xml);
        cur_output = output;
    }

    fputs("</testcase>\n", xml);
}
void myassert(char* filename, int lineno, char* description, int value,
char* format, ...)
{
++tests;
if (!value)
{
va_list args;
++failures;
printf("Assertion failed, file %s, line %d, description: %s", filename,
lineno, description);
va_start(args, format);
vprintf(format, args);
va_end(args);
cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
description, filename, lineno);
}
else
MyLog(LOGA_DEBUG,
"Assertion succeeded, file %s, line %d, description: %s",
filename, lineno, description);
}
/*
 * Failure callback for the connect request issued by test7().
 * Records a failed assertion and marks the test as finished so the wait
 * loop in test7() terminates. `response` is not inspected.
 */
void test7OnConnectFailure(void* context, MQTTAsync_failureData* response)
{
    AsyncTestClient* const tc = context;

    (void) response;
    MyLog(LOGA_DEBUG, "In test7OnConnectFailure callback, %s", tc->clientid);
    assert("There should be no failures in this test. ", 0, "test7OnConnectFailure callback was called\n", 0);
    tc->testFinished = 1;
}
/*
 * Failure callback for the publish issued from test7MessageArrived().
 * Records a failed assertion and marks the test as finished so the wait
 * loop in test7() terminates. `response` is not inspected.
 */
void test7OnPublishFailure(void* context, MQTTAsync_failureData* response)
{
    AsyncTestClient* const tc = context;

    (void) response;
    MyLog(LOGA_DEBUG, "In test7OnPublishFailure callback, %s", tc->clientid);
    assert("There should be no failures in this test. ", 0, "test7OnPublishFailure callback was called\n", 0);
    tc->testFinished = 1;
}
/**
 * Message-arrived callback for test 7.
 *
 * For every incoming message it publishes a fixed RegisterReq JSON request
 * on "ai/speaker/NerERA/04370927/req", prints the received body, and marks
 * the test finished when the received text is longer than one character.
 * The message and topic name are released before returning.
 *
 * @param context    the AsyncTestClient registered with the callbacks
 * @param topicName  topic of the received message (freed here)
 * @param topicLen   unused
 * @param message    received message (freed here)
 * @return always 1
 */
int test7MessageArrived(void* context, char* topicName, int topicLen,
        MQTTAsync_message* message)
{
    /* sizeof(request) is 100: 99 characters plus the terminating NUL, the
     * same 100 bytes that were previously sent with a hard-coded length.
     * NOTE(review): confirm the receiver really wants the trailing NUL. */
    static const char request[] = "{\"Action\": \"RegisterReq\", \"Time\": \"2018-04-13T12:50:00Z\", \"RequestID\": \"xxxxxxx\", \"Version\": \"1.1\"}";
    AsyncTestClient* tc = (AsyncTestClient*) context;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    const char* body = (const char*) message->payload;
    size_t bodylen = 0;
    int rc;

    (void) topicLen;
    printf("in test7MessageArrived\n");
    MyLog(LOGA_DEBUG, "In messageArrived callback %p", tc);

    pubmsg.payload = (void*) request;   /* not modified through this pointer here */
    pubmsg.payloadlen = (int) sizeof(request);
    pubmsg.qos = 0;
    pubmsg.retained = 0;
    opts.onSuccess = NULL;
    opts.onFailure = test7OnPublishFailure;
    opts.context = tc;

    printf("%s, tc->topic=%s, tc->clientid=%s\n", __func__, tc->topic, tc->clientid);
    usleep(50000); //50ms
    rc = MQTTAsync_sendMessage(tc->client, "ai/speaker/NerERA/04370927/req", &pubmsg, &opts);
    assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);

    /* The payload is payloadlen bytes and is not guaranteed to be
     * NUL-terminated, so measure the text inside that bound instead of
     * calling strlen()/printf("%s") on it. */
    if (body != NULL && message->payloadlen > 0)
    {
        const char* terminator = memchr(body, '\0', (size_t) message->payloadlen);
        bodylen = (terminator != NULL) ? (size_t) (terminator - body)
                                       : (size_t) message->payloadlen;
    }
    printf("recv the body is %.*s\n", (int) bodylen, (body != NULL) ? body : "");
    if (bodylen > 1)
        tc->testFinished = 1;

    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}
/**
 * Subscribe-success callback for test 7.
 *
 * Publishes a short trigger message ("0") on the client's own topic. If
 * the send cannot be queued the test is marked finished, because no
 * message would otherwise arrive to end the wait loop in test7().
 *
 * @param context   the AsyncTestClient passed with the subscribe request
 * @param response  unused
 */
void test7OnSubscribe(void* context, MQTTAsync_successData* response)
{
    /* Two bytes: '0' plus the terminating NUL. The terminator is sent on
     * purpose so a receiver that reads the payload as a C string sees
     * exactly "0". The previous length of 100 read far past this literal. */
    static const char trigger[] = "0";
    AsyncTestClient* tc = (AsyncTestClient*) context;
    int rc;

    (void) response;
    printf("in test7OnSubscribe\n");
    MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p", tc);
    printf("in test7OnSubscribe topic=%s\n", tc->topic);
    usleep(50000);
    rc = MQTTAsync_send(tc->client, tc->topic, (int) sizeof(trigger), (void*) trigger,
            0, 0, NULL);
    assert("Good rc from send", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
    if (rc != MQTTASYNC_SUCCESS)
        tc->testFinished = 1;
}
/*
 * Connect-success callback for test 7.
 *
 * Subscribes to the client's topic at QoS 0 with test7OnSubscribe as the
 * success callback. If the subscribe request is rejected immediately, the
 * test is marked finished. `response` is not inspected.
 */
void test7OnConnect(void* context, MQTTAsync_successData* response)
{
    AsyncTestClient* tc = (AsyncTestClient*) context;
    MQTTAsync_responseOptions subscribe_opts = MQTTAsync_responseOptions_initializer;
    int subscribe_rc;

    MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);

    subscribe_opts.onSuccess = test7OnSubscribe;
    subscribe_opts.context = tc;

    printf("subscribe, tc->topic=%s\n", tc->topic);
    usleep(50000);
    subscribe_rc = MQTTAsync_subscribe(tc->client, tc->topic, 0, &subscribe_opts);
    printf("subscribe_rc=%d\n", subscribe_rc);
    assert("Good rc from subscribe", subscribe_rc == MQTTASYNC_SUCCESS, "rc was %d", subscribe_rc);

    if (subscribe_rc == MQTTASYNC_SUCCESS)
        return;
    tc->testFinished = 1;
}
int test7(void)
{
char *testname = "xxxxxxxx";
char *broker="ssl://happy.ai.test.net:8888";
char *server_key_file="../../../test/ssl/ROOTeCA_64.crt";
int subsqos = 0;
AsyncTestClient tc =
AsyncTestClient_initializer;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int rc = 0;
int test_finished;
test_finished = failures = 0;
MyLog(LOGA_INFO, "Starting test 7 - big messages");
fprintf(xml, "testcase classname=\"test5\" name=\"%s\"", testname);
global_start_time = start_clock();
rc = MQTTAsync_create(&c, broker, "async_test_7", MQTTCLIENT_PERSISTENCE_NONE,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, &tc, NULL, test7MessageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
tc.client = c;
sprintf(tc.clientid, "%s", testname);
sprintf(tc.topic, "ai/speaker/NerERA/04370927/rsp");//topic_s
tc.maxmsgs = MAXMSGS;
//tc.rcvdmsgs = 0;
tc.subscribed = 0;
tc.testFinished = 0;
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "de0236c6ece1dd2dfce194c7c4b8d54";
opts.password = "de0236c6ece1dd2dfce194c7c4b8d54";
opts.will = &wopts;
opts.will->message = "{\"Action\": \"RegisterReq\", \"Time\": \"2018-03-14T11:50:00Z\", \"RequestID\": \"xxxxxxxx\", \"Version\": \"1.1\"}";
//opts.will->qos = 1;
opts.will->qos = 0;
opts.will->retained = 0;
opts.will->topicName = "ai/speaker/NerERA/04370927/req"; //topic_p
opts.will = NULL;
opts.onSuccess = test7OnConnect;
opts.onFailure = test7OnConnectFailure;
opts.context = &tc;
sslopts.struct_version = 1;//Must be 0, or 1 to enable TLS version selection
sslopts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;
sslopts.enableServerCertAuth=0;
opts.ssl = &sslopts;
if (server_key_file != NULL)
opts.ssl->trustStore = "../../../test/ssl/ROOTeCA_64.crt"; /*file of certificates trusted by client*/
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!tc.testFinished)
#if defined(WIN32)
Sleep(100);
#else
usleep(1000L);
#endif
MQTTAsync_destroy(&c);
exit: MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;
}
/*
 * Trace callback: prints each trace message on its own line to stdout.
 * The trace level is ignored.
 */
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
    (void) level;
    printf("%s\n", message);
}
/**
 * Test driver: opens the XML report, runs test7() and writes the verdict.
 *
 * @param argc  unused
 * @param argv  unused
 * @return 0 when test7() reports no failures, the failure count otherwise,
 *         or EXIT_FAILURE when the report file cannot be created
 */
int main(int argc, char** argv)
{
    int rc;

    (void) argc;
    (void) argv;

    xml = fopen("TEST-test5.xml", "w");
    if (xml == NULL)
    {
        /* Every later report write would dereference a NULL stream. */
        perror("TEST-test5.xml");
        return EXIT_FAILURE;
    }
    /* XML attribute values must be quoted; exactly one test case is run. */
    fprintf(xml, "<testsuite name=\"test5\" tests=\"1\">\n");

    /* Keep the result: it was previously discarded, so the verdict was
     * always "pass" and the exit status always 0. */
    rc = test7();

    if (rc == 0)
        MyLog(LOGA_INFO, "verdict pass");
    else
        MyLog(LOGA_INFO, "verdict fail");

    fprintf(xml, "</testsuite>\n");
    fclose(xml);
    return rc;
}
ref:Sending and Receiving Messages with MQTT
MQTTTransport_mbedTLS.cpp
iosphere/mosquitto
MQTT C 接入示例
potato papa
沒有留言:
張貼留言