Skip to content

[Bug] Continuous ownership of shared memory #1267

@lawaarch

Description

@lawaarch

Describe the bug

I create a shared-memory provider with z_shm_provider_default_new. Each time I send data, I call z_shm_provider_alloc_gc_defrag_blocking to obtain a shared-memory buffer, then call z_bytes_from_shm_mut and z_put to publish it. On the subscriber side, z_shm_clone is used to hand the data over for further processing. During testing I found that every time the publisher is run again it creates a new shared-memory segment, and when the publisher exits normally that segment is not released: ownership of the memory appears to pass to the subscriber process. Is this caused by my using the API incorrectly, or is something else the cause of the problem?

To reproduce

pub_shm_demo

#include <string.h>
#include <stdio.h>
#include "zenoh.h"
#include "chrono"
#include <cinttypes>
#include <iomanip>
#include <iostream>
#include <thread>
#include <unistd.h>
#include <string>
#include "struct_test.h"
// Build-time switch: 1 = connect to an explicit IP endpoint, 0 = use the default configuration (relies on multicast scouting)
#define USE_EXPLICIT_ENDPOINT 0
// IP address of the target device (used when USE_EXPLICIT_ENDPOINT=1)
#define TARGET_IP "192.168.20.146"
// TCP port of the target device; combined with TARGET_IP to build the "tcp/<ip>:<port>" connect endpoint
#define TARGET_PORT 7447
/// Writes one line to stdout: the current local time formatted as
/// "%Y-%m-%d %X", the " [pub] " tag, and `message`; the stream is flushed
/// afterwards.
void log(const std::string &message)
{
    const auto stamp = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    const auto *local_parts = std::localtime(&stamp);

    std::cout << std::put_time(local_parts, "%Y-%m-%d %X");
    std::cout << " [pub] " << message << '\n';
    std::cout.flush();
}
/// Publisher demo: opens a Zenoh session, creates a POSIX shared-memory
/// provider sized for two payloads, and publishes `loop_count` messages on
/// "mec/supervisor/publish_fp". Each message is a DataTable header followed
/// by DATA_SIZE image bytes, written directly into an SHM buffer that is then
/// handed to z_put.
///
/// @return 0 after the publish loop finishes (even if it stopped early on an
///         allocation failure), -1 if the configuration, session or SHM
///         provider could not be created.
int main(int argc, char **argv)
{
    (void)argc; // command-line arguments are not used by this demo
    (void)argv;

    // Zenoh handles
    z_owned_config_t config;   // session configuration
    z_owned_session_t s;       // session
    z_view_keyexpr_t key_expr; // key expression to publish on
    z_put_options_t options;   // put options
    // Shared-memory provider
    z_owned_shm_provider_t provider;
    // Test parameters
    const int DATA_SIZE = 4096 * 2160 * 3; // 4K, 3-channel image size
    const size_t TOTAL_PAYLOAD_SIZE = sizeof(DataTable) + DATA_SIZE;
    int loop_count = 3;
    size_t total_bytes_sent_all_iterations = 0;

    // --- Configuration ---
    // Exactly one constructor initialises `config`. zc_config_from_str builds
    // a new configuration into its output argument, so calling
    // z_config_default first would leak the default configuration.
#if USE_EXPLICIT_ENDPOINT
    // Peer mode with an explicit connect endpoint
    std::string config_json = R"({
      "mode": "peer",
      "connect": {
        "endpoints": ["tcp/)" + std::string(TARGET_IP) + ":" + std::to_string(TARGET_PORT) + R"("]
      }
    })";

    if (zc_config_from_str(&config, config_json.c_str()) < 0)
    {
        printf("Failed to configure Zenoh from string\n");
        return -1;
    }
    log("Publisher configured as peer, connecting to tcp/" + std::string(TARGET_IP) + ":" + std::to_string(TARGET_PORT) + "...");
#else
    z_config_default(&config);
    log("Publisher using default peer configuration (multicast scouting)...");
#endif

    // --- Initialization ---
    if (z_open(&s, z_move(config), NULL) < 0) // open the session (consumes config)
    {
        printf("Failed to open Zenoh session\n");
        return -1;
    }
    log("Zenoh session opened successfully.");

    z_view_keyexpr_from_str(&key_expr, "mec/supervisor/publish_fp");
    z_put_options_default(&options);
    // BLOCK congestion control is selected here: under congestion the put
    // waits instead of dropping the message. Switch to
    // Z_CONGESTION_CONTROL_DROP if non-blocking behaviour is preferred.
    options.congestion_control = Z_CONGESTION_CONTROL_BLOCK;

    // Create the shared-memory pool
    printf("Creating POSIX SHM Provider...\n");
    // We create a provider that can hold at least 2 messages in flight
    if (z_shm_provider_default_new(&provider, TOTAL_PAYLOAD_SIZE * 2) < 0) {
        printf("Failed to create SHM provider\n");
        z_drop(z_move(s));
        return -1;
    }

    // --- Publish loop ---
    auto start_time = std::chrono::high_resolution_clock::now();
    while (loop_count--)
    {
        // Allocate a buffer from the shared-memory pool
        z_buf_layout_alloc_result_t alloc;
        z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), TOTAL_PAYLOAD_SIZE);

        if (alloc.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
            printf("Failed to allocate from SHM provider.\n");
            break;
        }

        // Get the writable pointer and view the start of the buffer as a DataTable
        uint8_t* raw_buf = z_shm_mut_data_mut(z_loan_mut(alloc.buf));
        DataTable* header = (DataTable*)raw_buf;
        // Fill in the header metadata
        header->data_len = TOTAL_PAYLOAD_SIZE;
        // system_clock is used so timestamps are comparable across devices (Unix epoch: 1970-01-01)
        long long send_timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
        header->data.device_time.utc_offset = send_timestamp;
        header->image_data = NULL; // This pointer is part of the header but not used for data storage in this model
        // The image bytes live immediately after the header
        uint8_t* image_data_ptr = raw_buf + sizeof(DataTable);
        memset(image_data_ptr, loop_count, DATA_SIZE); // Example: fill with some data

        // Debug: show what was written. data_len is cast explicitly so the
        // %zu conversion is correct whatever integer type the field has.
        printf("[DEBUG PUB] Message %d: timestamp=%lld, data_len=%zu, sizeof(DataTable)=%zu, TOTAL_SIZE=%zu\n",
               loop_count, send_timestamp, static_cast<size_t>(header->data_len), sizeof(DataTable), TOTAL_PAYLOAD_SIZE);
        printf("[DEBUG PUB] First 16 bytes of header: ");
        for (size_t i = 0; i < 16 && i < sizeof(DataTable); i++) {
            printf("%02x ", raw_buf[i]);
        }
        printf("\n");

        // Wrap the SHM buffer in a Zenoh payload (zero-copy); alloc.buf is consumed
        z_owned_bytes_t payload;
        z_bytes_from_shm_mut(&payload, z_move(alloc.buf));
        // Publish. The payload is consumed by z_put whether or not it succeeds,
        // so there is nothing to release on failure; the message is simply not
        // counted as sent.
        if (z_put(z_loan(s), z_loan(key_expr), z_move(payload), &options) < 0) {
            printf("Failed to publish message %d\n", loop_count);
        } else {
            total_bytes_sent_all_iterations += TOTAL_PAYLOAD_SIZE;
            printf("Sent message %d with timestamp %lld\n", loop_count, send_timestamp);
        }

        usleep(100000); // simulate the interval between sends
    }
    auto end_time = std::chrono::high_resolution_clock::now();

    // --- Statistics ---
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
    double duration_seconds = duration / 1000000.0;
    double megabytes = total_bytes_sent_all_iterations / (1024.0 * 1024.0);
    double throughput = (total_bytes_sent_all_iterations > 0 && duration_seconds > 0) ? megabytes / duration_seconds : 0.0;
    log("-------------------- Pub Results --------------------");
    log("Total data sent: " + std::to_string(megabytes) + " MB");
    log("Total time: " + std::to_string(duration_seconds) + " s");
    log("Throughput: " + std::to_string(throughput) + " MB/s");
    log("---------------------------------------------------");

    z_drop(z_move(provider));
    z_drop(z_move(s));
    return 0;
}

sub_shm_demo

#include <stdio.h>
#include "zenoh.h"
#include <chrono>
#include <cinttypes>
#include <unistd.h>
#include "struct_test.h"
#include <iostream>
#include <mutex>
#include <vector>
#include <numeric>
#include <algorithm>
#include <iomanip>
#include <string.h>
#include <string>

// Build-time switch: 1 = listen on an explicit endpoint, 0 = use the default configuration (relies on multicast scouting)
#define USE_EXPLICIT_ENDPOINT 0

// TCP port to listen on (used when USE_EXPLICIT_ENDPOINT=1)
#define LISTEN_PORT 7447

// Number of raw data bytes carried after the header in each message.
constexpr int DATA_SIZE = 4096 * 2160 * 3;
// Largest message the subscriber accepts: one DataTable header plus DATA_SIZE bytes.
constexpr size_t TOTAL_PAYLOAD_SIZE = sizeof(DataTable) + DATA_SIZE;

// Context object handed to the sample callback: the reusable receive buffer
// plus the running reception statistics.
struct UserData {
    std::mutex mtx;              // guards the statistics fields below
    uint8_t* buffer{nullptr};    // pre-allocated receive buffer (allocated and freed by main)
    long long message_count{0};  // number of samples processed so far
    double total_latency{0.0};   // sum of per-message latencies (ms)
    long long min_latency{-1};   // smallest latency seen (ms); -1 until set
    long long max_latency{-1};   // largest latency seen (ms); -1 until set
    uint64_t total_bytes{0};     // total payload bytes received
};
/// Writes one line to stdout: the current local time formatted as
/// "%Y-%m-%d %X", the " [SUB] " tag, and `message`; the stream is flushed
/// afterwards.
void log(const std::string &message)
{
    const auto stamp = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    const auto *local_parts = std::localtime(&stamp);

    std::cout << std::put_time(local_parts, "%Y-%m-%d %X");
    std::cout << " [SUB] " << message << '\n';
    std::cout.flush();
}

/// Subscriber callback: locates the DataTable header of the received sample
/// (directly in shared memory when the payload is SHM-backed, otherwise after
/// copying the payload into the pre-allocated buffer), prints debug
/// information, and updates the latency/byte statistics.
///
/// @param sample  The received sample; only valid for the duration of the call.
/// @param arg     Pointer to the UserData registered with the closure.
///
/// Samples larger than TOTAL_PAYLOAD_SIZE, smaller than a DataTable header,
/// or that cannot be copied completely are reported on stderr and ignored.
void data_handler(z_loaned_sample_t *sample, void *arg)
{
    // system_clock is used so timestamps are comparable across devices (Unix epoch: 1970-01-01)
    const long long recv_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
    UserData *user_data = static_cast<UserData *>(arg);
    // Payload handle and length
    const z_loaned_bytes_t *payload = z_sample_payload(sample);
    const size_t payload_len = z_bytes_len(payload);

    if (payload_len > TOTAL_PAYLOAD_SIZE) {
        fprintf(stderr, "Received a message larger than pre-allocated buffer: %zu\n", payload_len);
        return;
    }
    // The header is dereferenced below, so the payload must contain a whole
    // DataTable; otherwise the read would run past the received data.
    if (payload_len < sizeof(DataTable)) {
        fprintf(stderr, "Received a message too small to contain a DataTable header: %zu\n", payload_len);
        return;
    }

    // ========== Zero-copy path: detect SHM and access it directly ==========
    const z_loaned_shm_t *shm = NULL;
    const DataTable *header = nullptr;
    bool is_zero_copy = false;

    if (z_bytes_as_loaned_shm(payload, &shm) == Z_OK) {
        // Zero-copy: read straight from the shared-memory buffer (same-device communication)
        header = reinterpret_cast<const DataTable *>(z_shm_data(shm));
        is_zero_copy = true;
        printf("[DEBUG SUB] Zero-copy: Direct SHM access, payload_len=%zu\n", payload_len);

        // ========== Notes from the original author: handling SHM asynchronously ==========
        // Option 1: clone the SHM reference with z_shm_clone (shallow copy that
        // increases the reference count) so it can be used after the callback
        // returns; release it with z_shm_drop when processing is finished:
        //
        //   z_owned_shm_t owned_shm;
        //   z_shm_clone(&owned_shm, shm);
        //   // push owned_shm to an async queue (e.g. std::queue<z_owned_shm_t>)
        //   // worker: const unsigned char *data = z_shm_data(z_loan(owned_shm));
        //   // when done: z_shm_drop(z_move(owned_shm));  // gives the reference back
        //
        // Option 2: obtain an owned SHM directly from the payload:
        //
        //   if (z_bytes_to_owned_shm(payload, &owned_shm) == Z_OK) {
        //       // push owned_shm to the async queue
        //       // when done: z_shm_drop(z_move(owned_shm));
        //   }
        //
        // Reference counting and returning memory:
        // 1. The z_loaned_shm_t *shm is only usable inside this callback.
        // 2. To use the data outside the callback an owned handle (z_owned_shm_t) is required.
        // 3. Owned handles are reference counted; several handles share one buffer.
        // 4. Every owned handle must be released with z_shm_drop once processed.
        // 5. Only when all references are released does Zenoh return the buffer to the pool.
        //
        // Key risk: not returning buffers promptly
        // - The publisher allocates with z_shm_provider_alloc_gc_defrag_blocking.
        // - While the subscriber still holds references, those buffers cannot be
        //   reclaimed, not even by GC or defragmentation.
        // - If every buffer is held by the subscriber, the publisher's allocation
        //   blocks and it can no longer publish, which is effectively a deadlock.
        //
        // Mitigations:
        // 1. Bound the async queue so unprocessed messages cannot pile up.
        // 2. Add a timeout after which unprocessed messages are dropped (z_shm_drop).
        // 3. Enlarge the publisher's SHM provider (TOTAL_PAYLOAD_SIZE * N, N = messages in flight).
        // 4. Monitor processing speed; it must keep up with the publish rate.
        // 5. If it cannot keep up: lower the publish rate, add worker threads, or
        //    use non-blocking mode (Z_CONGESTION_CONTROL_DROP) to discard old messages.
        // ============================================
    } else {
        // Not SHM (cross-device or ordinary data): copy into the user buffer
        z_bytes_reader_t reader = z_bytes_get_reader(payload);
        const size_t copied = z_bytes_reader_read(&reader, user_data->buffer, payload_len);
        if (copied != payload_len) {
            // A short read would leave stale bytes from a previous message in the buffer.
            fprintf(stderr, "Short read while copying payload: got %zu of %zu bytes\n", copied, payload_len);
            return;
        }
        header = reinterpret_cast<const DataTable *>(user_data->buffer);
        printf("[DEBUG SUB] Copy: Network or non-SHM data, payload_len=%zu\n", payload_len);
    }

    // Debug: show what was received. In both paths `header` points at the
    // first byte of the message, so the dump reads through it.
    printf("[DEBUG SUB] Received: payload_len=%zu, expected=%zu, zero_copy=%s\n",
           payload_len, TOTAL_PAYLOAD_SIZE, is_zero_copy ? "YES" : "NO");
    printf("[DEBUG SUB] First 16 bytes of received data: ");
    const unsigned char *first_bytes = reinterpret_cast<const unsigned char *>(header);
    for (size_t i = 0; i < 16 && i < payload_len; i++) {
        printf("%02x ", first_bytes[i]);
    }
    printf("\n");

    // Parse the header (zero-copy from SHM, or from the copied buffer)
    long long send_time = header->data.device_time.utc_offset;
    long long latency = recv_time - send_time;

    // Update stats under a lock
    {
        std::lock_guard<std::mutex> lock(user_data->mtx);
        user_data->message_count++;
        user_data->total_latency += latency;
        user_data->total_bytes += payload_len;

        // Debug: printed under the lock so message_count matches this message.
        // Header fields are cast explicitly so the conversions are correct
        // whatever integer types the DataTable fields have.
        printf("[DEBUG SUB] Message #%lld: send_time=%lld, recv_time=%lld, latency=%lld ms\n",
               user_data->message_count, send_time, recv_time, latency);
        printf("[DEBUG SUB] Header data_len=%lld, utc_offset=%lld\n",
               static_cast<long long>(header->data_len),
               static_cast<long long>(header->data.device_time.utc_offset));

        // The first message seeds min/max. A -1 sentinel would be ambiguous
        // because clock skew between devices can make the latency negative.
        if (user_data->message_count == 1) {
            user_data->min_latency = latency;
            user_data->max_latency = latency;
        } else {
            user_data->min_latency = std::min(user_data->min_latency, latency);
            user_data->max_latency = std::max(user_data->max_latency, latency);
        }
    }
}

/// Subscriber demo: allocates the receive buffer, opens a Zenoh session,
/// subscribes to "mec/supervisor/publish_fp" with data_handler as callback,
/// waits until the user enters 'q', then prints the collected statistics.
///
/// @return 0 on a normal run, -1 if the buffer, configuration, session or
///         subscriber could not be created.
int main(int argc, char **argv)
{
    (void)argc; // command-line arguments are not used by this demo
    (void)argv;

    // Create and pre-allocate user data struct
    UserData user_data;
    user_data.buffer = new (std::nothrow) uint8_t[TOTAL_PAYLOAD_SIZE];
    if (!user_data.buffer) {
        printf("Failed to allocate memory for the reception buffer.\n");
        return -1;
    }

    // --- Configuration ---
    // Exactly one constructor initialises `config`. zc_config_from_str builds
    // a new configuration into its output argument, so calling
    // z_config_default first would leak the default configuration.
    z_owned_config_t config;
#if USE_EXPLICIT_ENDPOINT
    // Peer mode with an explicit listen endpoint
    std::string config_json = R"({
      "mode": "peer",
      "listen": {
        "endpoints": ["tcp/0.0.0.0:)" + std::to_string(LISTEN_PORT) + R"("]
      }
    })";

    if (zc_config_from_str(&config, config_json.c_str()) < 0) {
        printf("Failed to configure Zenoh from string\n");
        delete[] user_data.buffer;
        return -1;
    }
    log("Subscriber configured as peer, listening on tcp/0.0.0.0:" + std::to_string(LISTEN_PORT) + "...");
#else
    z_config_default(&config);
    log("Subscriber using default peer configuration (multicast scouting)...");
#endif

    z_view_keyexpr_t key_expr;
    z_view_keyexpr_from_str(&key_expr, "mec/supervisor/publish_fp");

    // --- Initialization ---
    // Each failure path releases exactly what has been created so far. Nothing
    // is dropped before it is initialised and the buffer is freed only once.
    z_owned_session_t s;
    if (z_open(&s, z_move(config), NULL) < 0)
    {
        printf("Failed to open Zenoh session.\n");
        delete[] user_data.buffer;
        return -1;
    }
    log("Zenoh session opened successfully.");

    z_owned_closure_sample_t callback;
    z_closure(&callback, data_handler, NULL, &user_data);
    z_owned_subscriber_t sub;
    if (z_declare_subscriber(z_loan(s), &sub, z_loan(key_expr), z_move(callback), NULL) < 0)
    {
        printf("Unable to create Zenoh subscriber.\n");
        z_drop(z_move(s));
        delete[] user_data.buffer;
        return -1;
    }
    log("Subscriber declared for key 'mec/supervisor/publish_fp'. Waiting for data...");

    printf("\nPress 'q' then 'Enter' to quit...\n\n");
    // fgetc returns int; storing it in a char would make EOF undetectable on
    // platforms where char is unsigned. When stdin is closed the subscriber
    // keeps running, but sleeps between polls instead of spinning.
    int ch = 0;
    while (ch != 'q')
    {
        ch = fgetc(stdin);
        if (ch == EOF) {
            clearerr(stdin);
            sleep(1);
        }
    }

    // Print final statistics
    {
        std::lock_guard<std::mutex> lock(user_data.mtx);
        log("-------------------- Sub Results --------------------");
        if (user_data.message_count > 0) {
            double avg_latency = user_data.total_latency / user_data.message_count;
            double megabytes = user_data.total_bytes / (1024.0 * 1024.0);
            std::cout << "Total messages received: " << user_data.message_count << std::endl;
            std::cout << "Total data received: " << megabytes << " MB" << std::endl;
            std::cout << "Average latency: " << avg_latency << " ms" << std::endl;
            std::cout << "Minimum latency: " << user_data.min_latency << " ms" << std::endl;
            std::cout << "Maximum latency: " << user_data.max_latency << " ms" << std::endl;
            // The figure below is 1000 / average latency (ms), i.e. a
            // per-message rate derived from latency. It is not
            // message_count divided by elapsed wall-clock time.
            if (avg_latency > 0) {
                double fps_per_message = 1000.0 / avg_latency;
                std::cout << "FPS (per message): " << fps_per_message << std::endl;
            } else {
                std::cout << "FPS: N/A (invalid latency)" << std::endl;
            }
        } else {
            std::cout << "No messages received." << std::endl;
        }
        log("---------------------------------------------------");
    }

    // Undeclare the subscriber before freeing the buffer its callback writes to.
    z_drop(z_move(sub));
    z_drop(z_move(s));
    delete[] user_data.buffer; // IMPORTANT: free the pre-allocated buffer
    return 0;
}

System info

- platform: Debian 11
- zenoh-c: 1.7.1
- arch: aarch64

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions