0

I am trying to connect to mqtt.thingsboard.cloud server with mqtt tls protocol but I encounter the following error :

00:00:18.056,000\] \<inf\> net_mqtt_publisher_sample: Attempting to connect to broker at 23.20.176.222:
\[00:00:18.056,000\] \<inf\> net_mqtt_publisher_sample: Connecting to MQTT broker... Attempt 1
\[00:00:18.058,000\] \<inf\> dns_resolve: mqtt.thingsboard.cloud IPv4 address: 23.20.176.222
\[00:00:18.058,000\] \<inf\> dns_resolve: mqtt.thingsboard.cloud IPv4 address: 54.159.242.170
\[00:00:18.058,000\] \<inf\> dns_resolve: mqtt.thingsboard.cloud IPv4 address: 52.73.3.142
\[00:00:18.058,000\] \<inf\> dns_resolve: mqtt.thingsboard.cloud IPv4 address: 50.19.226.55
\[00:00:18.058,000\] \<inf\> dns_resolve: DNS resolving finished
\[00:00:18.147,000\] \<inf\> net_mqtt_publisher_sample: mqtt_connect: -22 \<ERROR\>
\[00:00:18.147,000\] \<err\> net_mqtt_publisher_sample: mqtt_connect failed: -22
\[00:00:18.647,000\] \<inf\> net_mqtt_publisher_sample: Attempting to connect to broker at 50.19.226.55:
\[00:00:18.647,000\] \<inf\> net_mqtt_publisher_sample: Connecting to MQTT broker... Attempt 2
\[00:00:18.738,000\] \<inf\> net_mqtt_publisher_sample: mqtt_connect: -22 \<ERROR\>
\[00:00:18.738,000\] \<err\> net_mqtt_publisher_sample: mqtt_connect failed: -22
\[00:00:19.239,000\] \<inf\> net_mqtt_publisher_sample: Attempting to connect to broker at 50.19.226.55:
\[00:00:19.239,000\] \<inf\> net_mqtt_publisher_sample: Connecting to MQTT broker... Attempt 3
\[00:00:19.239,000\] \<inf\> net_mqtt_publisher_sample: mqtt_connect: -2 \<ERROR\>
\[00:00:19.239,000\] \<err\> net_mqtt_publisher_sample: mqtt_connect failed: -2

My code run well without tls configuration but when i add the tls_init function, it returns mqtt_connect failed: -2 :

#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(net_mqtt_publisher_sample, LOG_LEVEL_DBG);

#include <zephyr/kernel.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/random/random.h>
#include <zephyr/net/tls_credentials.h>
#include <zephyr/net/dns_resolve.h>


#include <string.h>
#include <errno.h>

#include "config.h"
#include "dns_resolve.h"
#include "credentials.h"

#if defined(CONFIG_USERSPACE)
#include <zephyr/app_memory/app_memdomain.h>
K_APPMEM_PARTITION_DEFINE(app_partition);
struct k_mem_domain app_domain;
#define APP_BMEM K_APP_BMEM(app_partition)
#define APP_DMEM K_APP_DMEM(app_partition)
#else
#define APP_BMEM
#define APP_DMEM
#endif

/* Buffers for MQTT client. */
static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];


/* The mqtt client struct */
static APP_BMEM struct mqtt_client client_ctx;

/* MQTT Broker details. */
static APP_BMEM struct sockaddr_storage broker;


static APP_BMEM struct zsock_pollfd fds[1];
static APP_BMEM int nfds;

static APP_BMEM bool connected;

  /* #define TLS_SNI_HOSTNAME "mqtt.thingsboard.cloud" */

#define APP_CA_CERT_TAG 1
#define APP_PRIVATE_KEY_TAG 2

// Tableau des tags de sécurité utilisés pour les sessions TLS
static const sec_tag_t m_sec_tags[] = {
    APP_CA_CERT_TAG,
    APP_PRIVATE_KEY_TAG,
};

static int tls_init()
{
    int err;
    LOG_INF("Setting up credentials...");

    // Ajouter le certificat CA
    err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
                             ca_cert, strlen(ca_cert));
    if (err < 0) {
        LOG_ERR("Failed to register CA certificate: %d", err);
        return err;
    }

    // Ajouter le certificat serveur
    err = tls_credential_add(APP_PRIVATE_KEY_TAG, TLS_CREDENTIAL_SERVER_CERTIFICATE,
                             client_cert, strlen(client_cert));
    if (err < 0) {
        LOG_ERR("Failed to add device certificate: %d", err);
        return err;
    }

    // Ajouter la clé privée du serveur
    err = tls_credential_add(APP_PRIVATE_KEY_TAG, TLS_CREDENTIAL_PRIVATE_KEY,
                             client_key, strlen(client_key));
    if (err < 0) {
        LOG_ERR("Failed to add device private key: %d", err);
        return err;
    }

    LOG_INF("Credentials setup successfully");
    return 0;
}


static void prepare_fds(struct mqtt_client *client)
{
    if (client->transport.type == MQTT_TRANSPORT_SECURE) {
        fds[0].fd = client->transport.tls.sock;
    }

    fds[0].events = ZSOCK_POLLIN;
    nfds = 1;
}

static void clear_fds(void)
{
    nfds = 0;
}

static int wait(int timeout)
{
    int ret = 0;

    if (nfds > 0) {
        ret = zsock_poll(fds, nfds, timeout);
        if (ret < 0) {
            LOG_ERR("poll error: %d", errno);
        }
    }

    return ret;
}

void mqtt_evt_handler(struct mqtt_client *const client,
              const struct mqtt_evt *evt)
{
    int err;

    switch (evt->type) {
    case MQTT_EVT_CONNACK:
        if (evt->result != 0) {
            LOG_ERR("MQTT connect failed %d", evt->result);
            break;
        }

        connected = true;
        LOG_INF("MQTT client connected!");

        break;

    case MQTT_EVT_DISCONNECT:
        LOG_INF("MQTT client disconnected %d", evt->result);

        connected = false;
        clear_fds();

        break;

    case MQTT_EVT_PUBACK:
        if (evt->result != 0) {
            LOG_ERR("MQTT PUBACK error %d", evt->result);
            break;
        }

        LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);

        break;

    case MQTT_EVT_PUBREC:
        if (evt->result != 0) {
            LOG_ERR("MQTT PUBREC error %d", evt->result);
            break;
        }

        LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);

        const struct mqtt_pubrel_param rel_param = {
            .message_id = evt->param.pubrec.message_id
        };

        err = mqtt_publish_qos2_release(client, &rel_param);
        if (err != 0) {
            LOG_ERR("Failed to send MQTT PUBREL: %d", err);
        }

        break;

    case MQTT_EVT_PUBCOMP:
        if (evt->result != 0) {
            LOG_ERR("MQTT PUBCOMP error %d", evt->result);
            break;
        }

        LOG_INF("PUBCOMP packet id: %u",
            evt->param.pubcomp.message_id);

        break;

    case MQTT_EVT_PINGRESP:
        LOG_INF("PINGRESP packet");
        break;

    default:
        break;
    }
}

static char *get_mqtt_payload(enum mqtt_qos qos)
{
    static APP_BMEM char payload[50];
    
    int temperature = (uint8_t)sys_rand32_get() % 100; 
    snprintk(payload, sizeof(payload), "{\"temperature\": %d}", temperature);

    return payload;
}

static char *get_mqtt_topic(void)
{
    return "v1/devices/me/telemetry";
}

static int publish(struct mqtt_client *client, enum mqtt_qos qos)
{
    struct mqtt_publish_param param;

    param.message.topic.qos = qos;
    param.message.topic.topic.utf8 = (uint8_t *)get_mqtt_topic();
    param.message.topic.topic.size =
            strlen(param.message.topic.topic.utf8);
    param.message.payload.data = get_mqtt_payload(qos);
    param.message.payload.len =
            strlen(param.message.payload.data);
    param.message_id = sys_rand32_get();
    param.dup_flag = 0U;
    param.retain_flag = 0U;

    return mqtt_publish(client, &param);
}

#define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")

#define PRINT_RESULT(func, rc) \
    LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc))

static void broker_init(void)
{
    struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;

    broker4->sin_family = AF_INET;
    broker4->sin_port = htons(SERVER_PORT);
    zsock_inet_pton(AF_INET, resolved_ip, &broker4->sin_addr);
}

static void client_init(struct mqtt_client *client)
{
    mqtt_client_init(client);

    broker_init();

    /* MQTT client configuration */
    client->broker = &broker;
    client->evt_cb = mqtt_evt_handler;
    client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
    client->client_id.size = strlen(MQTT_CLIENTID);
    client->user_name = NULL;
    client->password = NULL;

    client->protocol_version = MQTT_VERSION_3_1_1;

    LOG_INF("Attempting to connect to broker at %s:", resolved_ip);
    
    /* MQTT buffers configuration */
    client->rx_buf = rx_buffer;
    client->rx_buf_size = sizeof(rx_buffer);
    client->tx_buf = tx_buffer;
    client->tx_buf_size = sizeof(tx_buffer);

    /* MQTT transport configuration */
    client->transport.type = MQTT_TRANSPORT_SECURE;
    struct mqtt_sec_config *tls_config = &client->transport.tls.config;

    tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
    tls_config->cipher_list = NULL;
    tls_config->sec_tag_list = m_sec_tags;
    tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
    tls_config->hostname = SERVER_ADDR;
    tls_config->cert_nocopy = TLS_CERT_NOCOPY_OPTIONAL;

 }

/* In this routine we block until the connected variable is 1 */
static int try_to_connect(struct mqtt_client *client)
{
    int rc, i = 0;

    rc = do_ipv4_lookup();
    if (rc != 0) {
        LOG_ERR("DNS lookup failed: %d", rc);
        return rc;
    }

    while (i++ < APP_CONNECT_TRIES && !connected) {

        client_init(client);

        LOG_INF("Connecting to MQTT broker... Attempt %d", i);

        rc = mqtt_connect(client);
        PRINT_RESULT("mqtt_connect", rc);
        if (rc != 0) {
            LOG_ERR("mqtt_connect failed: %d", rc);
            k_sleep(K_MSEC(APP_SLEEP_MSECS));
            continue;
        }

        prepare_fds(client);

        if (wait(APP_CONNECT_TIMEOUT_MS)) {
            mqtt_input(client);
        }

        if (!connected) {
            mqtt_abort(client);
        }
    }

    if (connected) {
        return 0;
    }

    return -EINVAL;
}

static int process_mqtt_and_sleep(struct mqtt_client *client, int timeout)
{
    int64_t remaining = timeout;
    int64_t start_time = k_uptime_get();
    int rc;

    while (remaining > 0 && connected) {
        if (wait(remaining)) {
            rc = mqtt_input(client);
            if (rc != 0) {
                PRINT_RESULT("mqtt_input", rc);
                return rc;
            }
        }

        rc = mqtt_live(client);
        if (rc != 0 && rc != -EAGAIN) {
            PRINT_RESULT("mqtt_live", rc);
            return rc;
        } else if (rc == 0) {
            rc = mqtt_input(client);
            if (rc != 0) {
                PRINT_RESULT("mqtt_input", rc);
                return rc;
            }
        }

        remaining = timeout + start_time - k_uptime_get();
    }

    return 0;
}

#define SUCCESS_OR_EXIT(rc) { if (rc != 0) { return 1; } }
#define SUCCESS_OR_BREAK(rc) { if (rc != 0) { break; } }

static int publisher(void)
{
    int i, rc, r = 0;

    LOG_INF("attempting to connect: ");
    rc = try_to_connect(&client_ctx);
    PRINT_RESULT("try_to_connect", rc);
    SUCCESS_OR_EXIT(rc);

    i = 0;
    while (i++ < CONFIG_NET_SAMPLE_APP_MAX_ITERATIONS && connected) {
        r = -1;

        rc = mqtt_ping(&client_ctx);
        PRINT_RESULT("mqtt_ping", rc);
        SUCCESS_OR_BREAK(rc);

        rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
        SUCCESS_OR_BREAK(rc);

        rc = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE);
        PRINT_RESULT("mqtt_publish", rc);
        SUCCESS_OR_BREAK(rc);

        rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
        SUCCESS_OR_BREAK(rc);

        rc = publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE);
        PRINT_RESULT("mqtt_publish", rc);
        SUCCESS_OR_BREAK(rc);

        rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
        SUCCESS_OR_BREAK(rc);

        rc = publish(&client_ctx, MQTT_QOS_2_EXACTLY_ONCE);
        PRINT_RESULT("mqtt_publish", rc);
        SUCCESS_OR_BREAK(rc);

        rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
        SUCCESS_OR_BREAK(rc);

        r = 0;
    }

    rc = mqtt_disconnect(&client_ctx);
    PRINT_RESULT("mqtt_disconnect", rc);

    LOG_INF("Bye!");

    return r;
}

static int start_app(void)
{
    int r = 0, i = 0;

    while (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS ||
           i++ < CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
        r = publisher();

        if (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
            k_sleep(K_MSEC(5000));
        }
    }

    return r;
}

#if defined(CONFIG_USERSPACE)
#define STACK_SIZE 2048

#if defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
#define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1)
#else
#define THREAD_PRIORITY K_PRIO_PREEMPT(8)
#endif

K_THREAD_DEFINE(app_thread, STACK_SIZE,
        start_app, NULL, NULL, NULL,
        THREAD_PRIORITY, K_USER, -1);

static K_HEAP_DEFINE(app_mem_pool, 1024 * 2);
#endif

int main(void)
{
    int rc;

    rc = tls_init();
    PRINT_RESULT("tls_init", rc);

#if defined(CONFIG_USERSPACE)
    int ret;

    struct k_mem_partition *parts[] = {
#if Z_LIBC_PARTITION_EXISTS
        &z_libc_partition,
#endif
        &app_partition
    };

    ret = k_mem_domain_init(&app_domain, ARRAY_SIZE(parts), parts);
    __ASSERT(ret == 0, "k_mem_domain_init() failed %d", ret);
    ARG_UNUSED(ret);

    k_mem_domain_add_thread(&app_domain, app_thread);
    k_thread_heap_assign(app_thread, &app_mem_pool);

    k_thread_start(app_thread);
    k_thread_join(app_thread, K_FOREVER);
#else
    exit(start_app());
#endif
    return 0;
}

and prj.conf :

CONFIG_NETWORKING=y
CONFIG_NET_SOCKETS=y
CONFIG_NET_TCP=y
CONFIG_NET_LOG=y

CONFIG_NET_IPV6_RA_RDNSS=y
CONFIG_NET_IF_UNICAST_IPV6_ADDR_COUNT=3
CONFIG_NET_IF_MCAST_IPV6_ADDR_COUNT=2

CONFIG_PRINTK=y
CONFIG_STDOUT_CONSOLE=y

# Enable IPv6 support
CONFIG_NET_IPV6=n
# Enable IPv4 support
CONFIG_NET_IPV4=y

# Enable the DNS resolver
CONFIG_DNS_RESOLVER=y

CONFIG_NET_CONFIG_MY_IPV4_ADDR="192.0.2.1"
CONFIG_NET_CONFIG_PEER_IPV4_ADDR="mqtt.thingsboard.cloud"


# Enable the MQTT Lib
CONFIG_MQTT_LIB=y

CONFIG_NET_CONFIG_SETTINGS=y


CONFIG_MAIN_STACK_SIZE=2048

CONFIG_NET_DHCPV4=y

# For IPv6
CONFIG_NET_BUF_DATA_SIZE=256

CONFIG_NET_SHELL=y

CONFIG_ENTROPY_GENERATOR=y
CONFIG_TEST_RANDOM_GENERATOR=y

# TLS configuration
CONFIG_MQTT_LIB_TLS=y
CONFIG_NET_SOCKETS_SOCKOPT_TLS=y
CONFIG_MBEDTLS=y
CONFIG_MBEDTLS_BUILTIN=y
CONFIG_MBEDTLS_ENABLE_HEAP=y

CONFIG_TLS_CREDENTIAL_FILENAMES=y

CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y

/*
 * Copyright (c) 2017 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(dns_resolve, LOG_LEVEL_DBG);

#include "dns_resolve.h"
#include "config.h"

#define DNS_TIMEOUT (2 * MSEC_PER_SEC)

char resolved_ip[NET_IPV4_ADDR_LEN]; // Global variable to store the resolved IP address

void dns_result_cb(enum dns_resolve_status status, struct dns_addrinfo *info, void *user_data) {
    char hr_addr[NET_IPV4_ADDR_LEN];
    char *hr_family;
    void *addr;

    switch (status) {
        case DNS_EAI_CANCELED:
            LOG_INF("DNS query was canceled");
            return;
        case DNS_EAI_FAIL:
            LOG_INF("DNS resolve failed");
            return;
        case DNS_EAI_NODATA:
            LOG_INF("Cannot resolve address");
            return;
        case DNS_EAI_ALLDONE:
            LOG_INF("DNS resolving finished");
            return;
        case DNS_EAI_INPROGRESS:
            break;
        default:
            LOG_INF("DNS resolving error (%d)", status);
            return;
    }

    if (!info) {
        return;
    }

    if (info->ai_family == AF_INET) {
        hr_family = "IPv4";
        addr = &net_sin(&info->ai_addr)->sin_addr;
    } else {
        LOG_ERR("Invalid IP address family %d", info->ai_family);
        return;
    }

    net_addr_ntop(info->ai_family, addr, hr_addr, sizeof(hr_addr));
    LOG_INF("%s %s address: %s", user_data ? (char *)user_data : "<null>",
            hr_family, hr_addr);
    strncpy(resolved_ip, hr_addr, sizeof(resolved_ip) - 1);
    resolved_ip[sizeof(resolved_ip) - 1] = '\0';
}

int do_ipv4_lookup(void) {
    static const char *query = SERVER_ADDR;
    static uint16_t dns_id;
    int ret;

    ret = dns_get_addr_info(query, DNS_QUERY_TYPE_A, &dns_id, dns_result_cb, (void *)query, DNS_TIMEOUT);
    if (ret < 0) {
        LOG_ERR("Cannot resolve IPv4 address (%d)", ret);
        return ret;
    }

    LOG_DBG("DNS id %u", dns_id);
    return 0;
}
#ifndef DNS_RESOLVE_H
#define DNS_RESOLVE_H

#include <zephyr/net/socket.h>
#include <zephyr/net/dns_resolve.h>

extern char resolved_ip[NET_IPV4_ADDR_LEN];

void dns_result_cb(enum dns_resolve_status status, struct dns_addrinfo *info, void *user_data);
int do_ipv4_lookup(void);

#endif // DNS_RESOLVE_H

and this i how the certificate.h is implemented

// credentials.h
static const char ca_cert[] = 
    "-----BEGIN CERTIFICATE-----\n"
    "MIIDdTCCAl2gAwIBAgIJ..."
    "-----END CERTIFICATE-----\n";

static const char client_cert[] = 
    "-----BEGIN CERTIFICATE-----\n"
    "MIIEqDCCA5CgAwIBAgIBA..."
    "-----END CERTIFICATE-----\n";

static const char client_key[] = 
    "-----BEGIN PRIVATE KEY-----\n"
    "MIIJKQIBAAKCAgEA7..."
    "-----END PRIVATE KEY-----\n";
config.h : /*
 * Copyright (c) 2017 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#ifndef __CONFIG_H__
#define __CONFIG_H__

#define SERVER_ADDR        "mqtt.thingsboard.cloud"

#define SERVER_PORT        8883

#define APP_CONNECT_TIMEOUT_MS    2000
#define APP_SLEEP_MSECS        500

#define APP_CONNECT_TRIES    10

#define APP_MQTT_BUFFER_SIZE    10

#define MQTT_CLIENTID        "RD23 - Demo Device 03"

#endif

i think the problem comes from tls_init function and certificats in pem format but I don't see what's missing.

3
  • where have you set a port number & what is the default value?
    – hardillb
    Commented Jul 11 at 6:02
  • Please do not try and post code as comments, edit the question if needed.
    – hardillb
    Commented Jul 11 at 9:39
  • Hello hardillb I add this information.
    – Hally_BDX
    Commented Jul 11 at 19:50

0

Browse other questions tagged or ask your own question.