
Testing jbpf_io_channel_share_data_ptr in single-threaded and multithreaded scenarios #25


Closed · wants to merge 11 commits
205 changes: 205 additions & 0 deletions jbpf_tests/concurrency/mem/mem_jbpf_io_channel_share_data_ptr_tests.c
@@ -0,0 +1,205 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
/*
Tests the jbpf_io_channel_share_data_ptr functionality in a multithreaded environment.
The test creates a channel with a capacity of 255 elements and spawns the following threads:
- 1 producer thread that reserves and submits buffers to the channel
- 5 consumer threads that consume the elements from the channel, share the data pointer with the worker threads, and
release the buffer (but the buffer should still be accessible via the shared pointer)
- 5 worker threads that process the elements consumed by the consumer threads

The test verifies that the jbpf_io_channel_share_data_ptr functionality works correctly in a multithreaded
environment.
*/

Review comment (Collaborator):
What is the ultimate goal of the test? My understanding is that the test checks whether, by the end of the execution of the worker and consumer threads, the buffer has still not been released.
If so, it should also check that the release works as expected (i.e., once all threads using it release the buffer, it should go back to the pool).
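
A sketch of the reviewer's suggested follow-up check (hypothetical: it assumes the consumers record every shared pointer in an all_shared[CHANNEL_CAPACITY] array, which the test below does not do):

    // Once every thread has released its reference, the buffers should
    // return to the pool and a reserve should succeed again.
    for (uint32_t j = 0; j < CHANNEL_CAPACITY; j++)
        jbpf_io_channel_release_buf(all_shared[j]);
    assert(jbpf_io_channel_reserve_buf(local_channel) != NULL);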

#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>

#include "jbpf_io.h"
#include "jbpf_io_defs.h"
#include "jbpf_io_queue.h"
#include "jbpf_io_channel.h"
#include "jbpf_io_utils.h"

#define NUM_THREADS 5
#define CHANNEL_NUMBER_OF_ELEMENTS 100
#define CHANNEL_CAPACITY 255

struct test_struct
{
    uint32_t id;
    uint32_t value;
};

struct jbpf_io_stream_id local_stream_id = {
    .id = {0xF5, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}};

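// Semaphore handshake: the producer posts consumer_sem, each consumer posts
// worker_start_sem for its worker, and each worker posts worker_done_sem back.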
sem_t producer_sem;
sem_t consumer_sem[NUM_THREADS];
sem_t worker_start_sem[NUM_THREADS];
sem_t worker_done_sem[NUM_THREADS];
void* data_pointers[NUM_THREADS];

jbpf_io_channel_t* local_channel;

struct worker_thread_args
{
    int thread_index;
};

void*
producer_thread_func(void* arg)
{
    jbpf_io_register_thread();
    for (uint32_t i = 0; i < CHANNEL_CAPACITY; i++) {
        // producer_sem starts at CHANNEL_CAPACITY and is never posted again,
        // so it simply bounds the total number of buffers the producer reserves
        sem_wait(&producer_sem);

        struct test_struct* buffer = jbpf_io_channel_reserve_buf(local_channel);
        assert(buffer);
        buffer->id = i;
        buffer->value = i * 10;

        assert(jbpf_io_channel_submit_buf(local_channel) == 0);

        // Signal the consumer thread responsible for this element
        sem_post(&consumer_sem[i % NUM_THREADS]);
    }
    return NULL;
}

void*
consumer_thread_func(void* arg)
{
    jbpf_io_register_thread();
    for (uint32_t i = 0; i < CHANNEL_CAPACITY; i++) {
        if (i % NUM_THREADS != ((struct worker_thread_args*)arg)->thread_index) {
            continue;
        }
        sem_wait(&consumer_sem[i % NUM_THREADS]);

        void* recv_ptr[10];
        int num_received = jbpf_io_channel_recv_data(local_channel, recv_ptr, 1);
        assert(num_received == 1);

        data_pointers[i % NUM_THREADS] = (struct test_struct*)recv_ptr[0];

        // Test jbpf_io_channel_share_data_ptr functionality
        void* shared_ptr = jbpf_io_channel_share_data_ptr(data_pointers[i % NUM_THREADS]);
        assert(shared_ptr == data_pointers[i % NUM_THREADS]);

        // Signal the associated worker to start processing
        sem_post(&worker_start_sem[i % NUM_THREADS]);

        // Release the buffer; it should still be accessible via shared_ptr
        jbpf_io_channel_release_buf(data_pointers[i % NUM_THREADS]);

        // Wait for the worker to complete processing
        sem_wait(&worker_done_sem[i % NUM_THREADS]);
    }
    return NULL;
}

void*
worker_thread_func(void* arg)
{
    jbpf_io_register_thread();
    for (uint32_t i = 0; i < CHANNEL_CAPACITY; i++) {
        if (i % NUM_THREADS != ((struct worker_thread_args*)arg)->thread_index) {
            continue;
        }
        sem_wait(&worker_start_sem[i % NUM_THREADS]);

        // Process the shared buffer
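        // The consumer may already have released its reference by now; the
        // share taken in consumer_thread_func keeps the buffer alive here.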
        struct test_struct* buffer = data_pointers[i % NUM_THREADS];
        assert(buffer);
        uint32_t id = buffer->id;
        uint32_t value = buffer->value;
        assert(id * 10 == value);

        // Signal the consumer thread that processing is done
        sem_post(&worker_done_sem[i % NUM_THREADS]);
    }
    return NULL;
}

int
main(int argc, char* argv[])
{
    struct jbpf_io_config io_config = {0};
    struct jbpf_io_ctx* io_ctx;

    strncpy(io_config.ipc_config.addr.jbpf_io_ipc_name, "test", JBPF_IO_IPC_MAX_NAMELEN);
    io_ctx = jbpf_io_init(&io_config);
    assert(io_ctx);

    jbpf_io_register_thread();

    // Create a channel with a capacity of 255 elements

Review comment (Collaborator):
The comment is not in line with the actual test and description. This should be 255.

    local_channel = jbpf_io_create_channel(
        io_ctx,
        JBPF_IO_CHANNEL_OUTPUT,
        JBPF_IO_CHANNEL_QUEUE,
        CHANNEL_NUMBER_OF_ELEMENTS,
        sizeof(struct test_struct),
        local_stream_id,
        NULL,
        0);

    assert(local_channel);

    pthread_t producer_thread, consumer_threads[NUM_THREADS], worker_threads[NUM_THREADS];

    sem_init(&producer_sem, 0, CHANNEL_CAPACITY);
    for (int i = 0; i < NUM_THREADS; i++) {
        sem_init(&consumer_sem[i], 0, 0);
        sem_init(&worker_start_sem[i], 0, 0);
        sem_init(&worker_done_sem[i], 0, 0);
    }

    // Start the producer thread
    assert(pthread_create(&producer_thread, NULL, producer_thread_func, NULL) == 0);

    // Start the consumer and worker threads
    struct worker_thread_args worker_args[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++) {
        worker_args[i].thread_index = i;

        assert(pthread_create(&consumer_threads[i], NULL, consumer_thread_func, &worker_args[i]) == 0);
        assert(pthread_create(&worker_threads[i], NULL, worker_thread_func, &worker_args[i]) == 0);
    }

    // Wait for the producer to finish
    assert(pthread_join(producer_thread, NULL) == 0);

    // Wait for the consumer and worker threads to finish
    for (int i = 0; i < NUM_THREADS; i++) {
        assert(pthread_join(consumer_threads[i], NULL) == 0);
        assert(pthread_join(worker_threads[i], NULL) == 0);
    }
    // Reserving another buffer must fail: every buffer is still referenced
    // through the pointers shared with the workers, so none has returned to the pool
    assert(jbpf_io_channel_reserve_buf(local_channel) == NULL);

    // Cleanup
    sem_destroy(&producer_sem);
    for (int i = 0; i < NUM_THREADS; i++) {
        sem_destroy(&consumer_sem[i]);
        sem_destroy(&worker_start_sem[i]);
        sem_destroy(&worker_done_sem[i]);
    }

    jbpf_io_destroy_channel(io_ctx, local_channel);
    jbpf_io_stop();

    return 0;
}
89 changes: 89 additions & 0 deletions jbpf_tests/functional/io/jbpf_io_channel_share_data_ptr_tests.c
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
/*
This test exercises the single-threaded behavior of the jbpf_io_channel_share_data_ptr() function.
It creates a channel with 200 requested elements, which the underlying mempool rounds up to a capacity of 255.
It then reserves a buffer, shares it (to increase the reference count), and releases it.
It then submits the buffer to the channel and repeats the process until the mempool is exhausted.
Finally, it destroys the channel and checks that it is gone.
The counter verifies that the expected number of buffers (255) was reserved.
*/
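/*
For reference, one iteration of the main loop below, annotated with the
reference counts the test description implies (the counts are inferred from
the description, not from the jbpf implementation itself):

    local_data = jbpf_io_channel_reserve_buf(local_channel);  // refcount 1
    p = jbpf_io_channel_share_data_ptr(local_data);           // refcount 2, same pointer returned
    jbpf_io_channel_release_buf(local_data);                  // refcount back to 1
    jbpf_io_channel_submit_buf(local_channel);                // buffer queued, still referenced
*/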

#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <fcntl.h>

#include "jbpf_io.h"
#include "jbpf_io_defs.h"
#include "jbpf_io_queue.h"
#include "jbpf_io_channel.h"
#include "jbpf_io_utils.h"

struct test_struct
{
    uint32_t counter_a;
    uint32_t counter_b;
};

struct jbpf_io_stream_id local_stream_id = {
    .id = {0xF5, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}};

int
main(int argc, char* argv[])
{
    struct jbpf_io_config io_config = {0};
    struct jbpf_io_ctx* io_ctx;

    strncpy(io_config.ipc_config.addr.jbpf_io_ipc_name, "test", JBPF_IO_IPC_MAX_NAMELEN);
    io_ctx = jbpf_io_init(&io_config);
    assert(io_ctx);

    jbpf_io_register_thread();

    jbpf_io_channel_t* local_channel;
    struct test_struct* local_data;

    // Create an output channel with 200 requested elements
    local_channel = jbpf_io_create_channel(
        io_ctx,
        JBPF_IO_CHANNEL_OUTPUT,
        JBPF_IO_CHANNEL_QUEUE,
        200,
        sizeof(struct test_struct),
        local_stream_id,
        NULL,
        0);

    assert(local_channel);

    int i = 0;
    while (true) {
        local_data = jbpf_io_channel_reserve_buf(local_channel);
        if (!local_data) {
            break;
        }
        void* p = jbpf_io_channel_share_data_ptr(local_data);
        assert(p == local_data);
        jbpf_io_channel_release_buf(local_data);
        assert(jbpf_io_channel_submit_buf(local_channel) == 0);
        i++;
    }
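    // The 200 requested elements appear to be rounded up by the mempool to a
    // capacity of 255, hence the expected count below.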
    assert(i == 255);

    jbpf_io_destroy_channel(io_ctx, local_channel);

    // The local channel should be gone by now
    assert(!jbpf_io_find_channel(io_ctx, local_stream_id, true));

    // Cleanup
    jbpf_io_stop();

    return 0;
}
}