Skip to content

Commit

Permalink
added z_bytes_writer implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Jun 27, 2024
1 parent baf0d00 commit f36e2e4
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 58 deletions.
10 changes: 10 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ TODO: owned type description
Represents an array of non null-terminated string.

.. c:type:: z_owned_bytes_writer_t
Represents a writer for serialized data.

Loaned Types
~~~~~~~~~~~

Expand Down Expand Up @@ -205,6 +209,10 @@ TODO: loaned type description
Represents an array of non null-terminated string.

.. c:type:: z_loaned_bytes_writer_t
Represents a writer for serialized data.

View Types
~~~~~~~~~~~

Expand Down Expand Up @@ -332,6 +340,8 @@ Primitives
.. autocfunction:: primitives.h::z_bytes_reader_read
.. autocfunction:: primitives.h::z_bytes_reader_seek
.. autocfunction:: primitives.h::z_bytes_reader_tell
.. autocfunction:: primitives.h::z_bytes_get_writer
.. autocfunction:: primitives.h::z_bytes_writer_write
.. autocfunction:: primitives.h::z_timestamp_check
.. autocfunction:: primitives.h::z_query_target_default
.. autocfunction:: primitives.h::z_query_consolidation_auto
Expand Down
26 changes: 26 additions & 0 deletions include/zenoh-pico/api/olv_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,28 @@
} \
}

#define _Z_OWNED_FUNCTIONS_PTR_TRIVIAL_IMPL(type, name) \
_Bool z_##name##_check(const z_owned_##name##_t *obj) { return obj->_val != NULL; } \
const z_loaned_##name##_t *z_##name##_loan(const z_owned_##name##_t *obj) { return obj->_val; } \
z_loaned_##name##_t *z_##name##_loan_mut(z_owned_##name##_t *obj) { return obj->_val; } \
void z_##name##_null(z_owned_##name##_t *obj) { obj->_val = NULL; } \
z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *obj) { return obj; } \
int8_t z_##name##_clone(z_owned_##name##_t *obj, const z_loaned_##name##_t *src) { \
int8_t ret = _Z_RES_OK; \
obj->_val = (type *)z_malloc(sizeof(type)); \
if (obj->_val != NULL) { \
*obj->_val = *src; \
} else { \
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; \
} \
return ret; \
} \
void z_##name##_drop(z_owned_##name##_t *obj) { \
if ((obj != NULL) && (obj->_val != NULL)) { \
z_free(&obj->_val); \
} \
}

#define _Z_OWNED_FUNCTIONS_RC_IMPL(name) \
_Bool z_##name##_check(const z_owned_##name##_t *val) { return val->_rc.in != NULL; } \
const z_loaned_##name##_t *z_##name##_loan(const z_owned_##name##_t *val) { return &val->_rc; } \
Expand Down Expand Up @@ -132,7 +154,11 @@
// Gets internal value from refcounted type (e.g. z_loaned_session_t, z_query_t)
#define _Z_RC_IN_VAL(arg) ((arg)->in->val)

// Checks if refcounted type is initialized
#define _Z_RC_IS_NULL(arg) ((arg)->in == NULL)

// Gets internal value from refcounted owned type (e.g. z_owned_session_t, z_owned_query_t)
#define _Z_OWNED_RC_IN_VAL(arg) ((arg)->_rc.in->val)


#endif /* INCLUDE_ZENOH_PICO_API_OLV_MACROS_H */
24 changes: 18 additions & 6 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -919,17 +919,29 @@ int8_t z_bytes_reader_seek(z_bytes_reader_t *reader, int64_t offset, int origin)
int64_t z_bytes_reader_tell(z_bytes_reader_t *reader);

/**
* Constructs :c:type:`z_owned_bytes_t` object corresponding to the next element of serialized data.
* Constructs writer for :c:type:`z_loaned_bytes_t`.
*
* Will construct null-state `z_owned_bytes_t` when iterator reaches the end (or in case of error).
* Parameters:
* bytes: Data container to write to.
* writer: Uninitialized memory location where writer is to be constructed.
*
* Return:
* ``0`` if encode successful, ``negative value`` otherwise.
*/
int8_t z_bytes_get_writer(z_loaned_bytes_t *bytes, z_owned_bytes_writer_t* writer);

/**
* Writes `len` bytes from `src` into underlying :c:type:`z_loaned_bytes_t.
*
* Parameters:
* iter: An iterator over multi-element serialized data.
* out: An uninitialized :c:type:`z_owned_bytes_t` that will contained next serialized element.
* writer: A data writer
* src: Buffer to write from.
* len: Number of bytes to write.
*
* Return:
* ``false`` when iterator reaches the end, ``true`` otherwise.
* ``0`` if encode successful, ``negative value`` otherwise.
*/
_Bool z_bytes_iterator_next(z_bytes_iterator_t *iter, z_owned_bytes_t *out);
int8_t z_bytes_writer_write(z_loaned_bytes_writer_t *writer, const uint8_t *src, size_t len);

/**
* Checks validity of a timestamp
Expand Down
6 changes: 6 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ _Z_LOANED_TYPE(_z_slice_t, slice)
_Z_OWNED_TYPE_PTR(_z_bytes_t, bytes)
_Z_LOANED_TYPE(_z_bytes_t, bytes)

/**
* Represents a writer for serialized data.
*/
_Z_OWNED_TYPE_PTR(_z_bytes_writer_t, bytes_writer)
_Z_LOANED_TYPE(_z_bytes_writer_t, bytes_writer)

/**
* An iterator over multi-element serialized data.
*/
Expand Down
22 changes: 19 additions & 3 deletions include/zenoh-pico/collections/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ typedef struct {

_Bool _z_bytes_check(const _z_bytes_t *bytes);
_z_bytes_t _z_bytes_null(void);
int8_t _z_bytes_append(_z_bytes_t *dst, _z_bytes_t *src);
_Bool _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s);
int8_t _z_bytes_append_bytes(_z_bytes_t *dst, _z_bytes_t *src);
int8_t _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s);
int8_t _z_bytes_copy(_z_bytes_t *dst, const _z_bytes_t *src);
_z_bytes_t _z_bytes_duplicate(const _z_bytes_t *src);
void _z_bytes_move(_z_bytes_t *dst, _z_bytes_t *src);
Expand Down Expand Up @@ -90,7 +90,6 @@ int8_t _z_bytes_reader_seek(_z_bytes_reader_t *reader, int64_t offset, int origi
int64_t _z_bytes_reader_tell(const _z_bytes_reader_t *reader);
int8_t _z_bytes_reader_read_slices(_z_bytes_reader_t *reader, size_t len, _z_bytes_t *out);
int8_t _z_bytes_reader_read(_z_bytes_reader_t *reader, uint8_t *buf, size_t len);
int8_t _z_bytes_reader_read_next(_z_bytes_reader_t *reader, _z_bytes_t *out);

typedef struct {
_z_bytes_reader_t _reader;
Expand All @@ -99,4 +98,21 @@ typedef struct {
_z_bytes_iterator_t _z_bytes_get_iterator(const _z_bytes_t *bytes);
_Bool _z_bytes_iterator_next(_z_bytes_iterator_t *iter, _z_bytes_t *b);


typedef struct {
uint8_t* cache;
size_t cache_size;
_z_bytes_t *bytes;
} _z_bytes_writer_t;

_z_bytes_writer_t _z_bytes_get_writer(_z_bytes_t *bytes, size_t cache_size);
int8_t _z_bytes_writer_write(_z_bytes_writer_t *writer, const uint8_t *src, size_t len);
int8_t _z_bytes_writer_ensure_cache(_z_bytes_writer_t *writer);

typedef struct {
_z_bytes_writer_t writer;
} _z_bytes_iterator_writer_t;

_z_bytes_iterator_writer_t _z_bytes_get_iterator_writer(_z_bytes_t *bytes);
int8_t _z_bytes_iterator_writer_write(_z_bytes_iterator_writer_t *writer, _z_bytes_t *bytes);
#endif /* ZENOH_PICO_COLLECTIONS_BYTES_H */
16 changes: 15 additions & 1 deletion src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,9 @@ int8_t z_bytes_serialize_from_iter(z_owned_bytes_t *bytes, _Bool (*iterator_body
// Init owned bytes
_Z_RETURN_IF_ERR(z_bytes_empty(bytes));
z_owned_bytes_t data;
_z_bytes_iterator_writer_t iter_writer = _z_bytes_get_iterator_writer(bytes->_val);
while (iterator_body(&data, context)) {
_Z_CLEAN_RETURN_IF_ERR(_z_bytes_append(bytes->_val, data._val), z_bytes_drop(bytes));
_Z_CLEAN_RETURN_IF_ERR(_z_bytes_iterator_writer_write(&iter_writer, data._val), z_bytes_drop(bytes));
}
return _Z_RES_OK;
}
Expand Down Expand Up @@ -492,6 +493,17 @@ _Bool z_bytes_iterator_next(z_bytes_iterator_t *iter, z_owned_bytes_t *bytes) {
return true;
}

int8_t z_bytes_get_writer(z_loaned_bytes_t *bytes, z_owned_bytes_writer_t* writer) {
writer->_val = (z_loaned_bytes_writer_t *)z_malloc(sizeof(z_loaned_bytes_writer_t));
if (writer->_val == NULL) return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
*writer->_val = _z_bytes_get_writer(bytes, Z_IOSLICE_SIZE);
return _Z_RES_OK;
}

int8_t z_bytes_writer_write(z_loaned_bytes_writer_t *writer, const uint8_t *src, size_t len) {
return _z_bytes_writer_write(writer, src, len);
}

_Bool z_timestamp_check(z_timestamp_t ts) { return _z_timestamp_check(&ts); }

z_query_target_t z_query_target_default(void) { return Z_QUERY_TARGET_DEFAULT; }
Expand Down Expand Up @@ -593,6 +605,8 @@ _Z_OWNED_FUNCTIONS_PTR_IMPL(_z_string_vec_t, string_array, _z_owner_noop_copy, _
_Z_VIEW_FUNCTIONS_PTR_IMPL(_z_string_vec_t, string_array)
_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_slice_t, slice, _z_slice_copy, _z_slice_free)
_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_bytes_t, bytes, _z_bytes_copy, _z_bytes_free)
_Z_OWNED_FUNCTIONS_PTR_TRIVIAL_IMPL(_z_bytes_writer_t, bytes_writer)


#if Z_FEATURE_PUBLICATION == 1 || Z_FEATURE_QUERYABLE == 1 || Z_FEATURE_QUERY == 1
// Convert a user owned bytes payload to an internal bytes payload, returning an empty one if value invalid
Expand Down
148 changes: 100 additions & 48 deletions src/collections/bytes.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "zenoh-pico/system/platform.h"
#include "zenoh-pico/utils/endianness.h"
#include "zenoh-pico/utils/result.h"
#include "zenoh-pico/api/olv_macros.h"

/*-------- Bytes --------*/
_Bool _z_bytes_check(const _z_bytes_t *bytes) { return !_z_bytes_is_empty(bytes); }
Expand Down Expand Up @@ -132,13 +133,19 @@ int8_t _z_bytes_to_slice(const _z_bytes_t *bytes, _z_slice_t *s) {
return _Z_RES_OK;
}

_Bool _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s) { return _z_arc_slice_svec_append(&dst->_slices, s); }
int8_t _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s) {
_Z_CLEAN_RETURN_IF_ERR(
_z_arc_slice_svec_append(&dst->_slices, s) ? _Z_RES_OK : _Z_ERR_SYSTEM_OUT_OF_MEMORY,
_z_arc_slice_drop(s)
);
return _Z_RES_OK;
}

int8_t _z_bytes_append_inner(_z_bytes_t *dst, _z_bytes_t *src) {
int8_t _z_bytes_append_bytes(_z_bytes_t *dst, _z_bytes_t *src) {
_Bool success = true;
for (size_t i = 0; i < _z_bytes_num_slices(src); ++i) {
_z_arc_slice_t *s = _z_bytes_get_slice(src, i);
success = success && _z_bytes_append_slice(dst, s);
success = success && _z_bytes_append_slice(dst, s) == _Z_RES_OK;
}
if (!success) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
Expand All @@ -148,48 +155,20 @@ int8_t _z_bytes_append_inner(_z_bytes_t *dst, _z_bytes_t *src) {
return _Z_RES_OK;
}

int8_t _z_bytes_append(_z_bytes_t *dst, _z_bytes_t *src) {
uint8_t l_buf[16];
size_t l_len = _z_zsize_encode_buf(l_buf, _z_bytes_len(src));
_z_slice_t s = _z_slice_wrap_copy(l_buf, l_len);
if (!_z_slice_check(s)) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
_z_arc_slice_t arc_s = _z_arc_slice_wrap(s, 0, l_len);
_z_bytes_append_slice(dst, &arc_s);

if (dst->_slices._val == NULL) {
_z_arc_slice_drop(&arc_s);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
int8_t res = _z_bytes_append_inner(dst, src);
if (res != _Z_RES_OK) {
return res;
}
return _Z_RES_OK;
}

int8_t _z_bytes_serialize_from_pair(_z_bytes_t *out, _z_bytes_t *first, _z_bytes_t *second) {
int8_t res = _z_bytes_append(out, first);
if (res != _Z_RES_OK) {
_z_bytes_drop(out);
_z_bytes_drop(first);
return res;
}
res = _z_bytes_append(out, second);
if (res != _Z_RES_OK) {
_z_bytes_drop(out);
_z_bytes_drop(second);
}
*out = _z_bytes_null();
_z_bytes_iterator_writer_t writer = _z_bytes_get_iterator_writer(out);
_Z_RETURN_IF_ERR(_z_bytes_iterator_writer_write(&writer, first));
_Z_CLEAN_RETURN_IF_ERR(_z_bytes_iterator_writer_write(&writer, second), _z_bytes_drop(out));

return res;
return _Z_RES_OK;
}

int8_t _z_bytes_deserialize_into_pair(const _z_bytes_t *bs, _z_bytes_t *first_out, _z_bytes_t *second_out) {
_z_bytes_reader_t reader = _z_bytes_get_reader(bs);
int8_t res = _z_bytes_reader_read_next(&reader, first_out);
_z_bytes_iterator_t iter = _z_bytes_get_iterator(bs);
int8_t res = _z_bytes_iterator_next(&iter, first_out);
if (res != _Z_RES_OK) return res;
res = _z_bytes_reader_read_next(&reader, second_out);
res = _z_bytes_iterator_next(&iter, second_out);
if (res != _Z_RES_OK) {
_z_bytes_drop(first_out);
};
Expand Down Expand Up @@ -419,7 +398,8 @@ int8_t _z_bytes_reader_read_slices(_z_bytes_reader_t *reader, size_t len, _z_byt
res = _Z_ERR_SYSTEM_OUT_OF_MEMORY;
break;
}
res = _z_bytes_append_slice(out, &ss) ? _Z_RES_OK : _Z_ERR_SYSTEM_OUT_OF_MEMORY;

res = _z_bytes_append_slice(out, &ss);
if (res != _Z_RES_OK) {
_z_arc_slice_drop(&ss);
break;
Expand All @@ -433,20 +413,92 @@ int8_t _z_bytes_reader_read_slices(_z_bytes_reader_t *reader, size_t len, _z_byt
return res;
}

int8_t _z_bytes_reader_read_next(_z_bytes_reader_t *reader, _z_bytes_t *out) {
*out = _z_bytes_null();
_z_bytes_iterator_t _z_bytes_get_iterator(const _z_bytes_t *bytes) {
return (_z_bytes_iterator_t){._reader = _z_bytes_get_reader(bytes)};
}

_Bool _z_bytes_iterator_next(_z_bytes_iterator_t *iter, _z_bytes_t *b) {
*b = _z_bytes_null();
_z_zint_t len;
if (_z_bytes_reader_read_zint(reader, &len) != _Z_RES_OK) {
if (_z_bytes_reader_read_zint(&iter->_reader, &len) != _Z_RES_OK) {
return _Z_ERR_DID_NOT_READ;
}

return _z_bytes_reader_read_slices(reader, len, out);
return _z_bytes_reader_read_slices(&iter->_reader, len, b);
}

_z_bytes_iterator_t _z_bytes_get_iterator(const _z_bytes_t *bytes) {
return (_z_bytes_iterator_t){._reader = _z_bytes_get_reader(bytes)};
_z_bytes_writer_t _z_bytes_get_writer(_z_bytes_t *bytes, size_t cache_size) {
return (_z_bytes_writer_t) {
.cache = NULL,
.cache_size = cache_size,
.bytes = bytes
};
}

_Bool _z_bytes_iterator_next(_z_bytes_iterator_t *iter, _z_bytes_t *b) {
return _z_bytes_reader_read_next(&iter->_reader, b) == _Z_RES_OK;
int8_t _z_bytes_writer_ensure_cache(_z_bytes_writer_t *writer) {
// first we check if cache stayed untouched since previous write operation
if (writer->cache != NULL) {
_z_arc_slice_t *arc_s = _z_bytes_get_slice(writer->bytes, _z_bytes_num_slices(writer->bytes) - 1);
if (_Z_RC_IN_VAL(&arc_s->slice).start + arc_s->len == writer->cache) {
size_t remaining_in_cache = _Z_RC_IN_VAL(&arc_s->slice).len - arc_s->len;
if (remaining_in_cache > 0) return _Z_RES_OK;
}
}
// otherwise we allocate a new cache
assert(writer->cache_size > 0);
_z_slice_t s = _z_slice_make(writer->cache_size);
if (s.start == NULL) return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
_z_arc_slice_t cache = _z_arc_slice_wrap(s, 0, 0);
if (_Z_RC_IS_NULL(&cache.slice)) {
_z_slice_clear(&s);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}

_Z_CLEAN_RETURN_IF_ERR(
_z_bytes_append_slice(writer->bytes, &cache),
_z_arc_slice_drop(&cache)
);
writer->cache = (uint8_t*)_Z_RC_IN_VAL(&cache.slice).start;
return _Z_RES_OK;
}

int8_t _z_bytes_writer_write(_z_bytes_writer_t *writer, const uint8_t *src, size_t len) {
if (writer->cache_size == 0) { // no cache append data as a single slice
_z_slice_t s = _z_slice_wrap_copy(src, len);
if (s.len != len) return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
_z_arc_slice_t arc_s = _z_arc_slice_wrap(s, 0, len);
if _Z_RC_IS_NULL(&arc_s.slice) {
_z_slice_clear(&s);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
return _z_bytes_append_slice(writer->bytes, &arc_s);
}

while (len > 0) {
_Z_RETURN_IF_ERR(_z_bytes_writer_ensure_cache(writer));
_z_arc_slice_t *arc_s = _z_bytes_get_slice(writer->bytes, _z_bytes_num_slices(writer->bytes) - 1);
size_t remaining_in_cache = _Z_RC_IN_VAL(&arc_s->slice).len - arc_s->len;
size_t to_copy = remaining_in_cache < len ? remaining_in_cache : len;
memcpy(writer->cache, src, to_copy);
len -= to_copy;
arc_s->len += to_copy;
src += to_copy;
writer->cache += to_copy;
}
return _Z_RES_OK;
}


_z_bytes_iterator_writer_t _z_bytes_get_iterator_writer(_z_bytes_t *bytes) {
return (_z_bytes_iterator_writer_t){
.writer = _z_bytes_get_writer(bytes, 0)
};
}
int8_t _z_bytes_iterator_writer_write(_z_bytes_iterator_writer_t *writer, _z_bytes_t *src) {
uint8_t l_buf[16];
size_t l_len = _z_zsize_encode_buf(l_buf, _z_bytes_len(src));
_Z_RETURN_IF_ERR(_z_bytes_writer_write(&writer->writer, l_buf, l_len));
_Z_CLEAN_RETURN_IF_ERR(_z_bytes_append_bytes(writer->writer.bytes, src), _z_bytes_drop(src));

return _Z_RES_OK;
}
Loading

0 comments on commit f36e2e4

Please sign in to comment.