From 64153055ecfadef6c8d790051e8b8861a81225f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Fri, 7 Feb 2025 14:16:12 +0100 Subject: [PATCH] Remove APIs for separate init and connect of async cluster contexts (#165) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Embed a `valkeyClusterContext` in a created `valkeyClusterAsyncContext` to be able to cast between these types. This enables users to access the async context in an event callback without using `privdata`, and we can remove the APIs previously kept just for this scenario: valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(const valkeyClusterOptions *options); int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc); int valkeyClusterAsyncSetEventCallback(valkeyClusterAsyncContext *acc, void(fn)(const valkeyClusterContext *cc, int event, void *privdata), Also removing the testcase based on `clusterclient_reconnect_async.c` since it was using the removed APIs. There is now a symmetry to how standalone contexts works, i.e. `valkeyAsyncContext` embeds the `valkeyContext`. Signed-off-by: Björn Svensson --- docs/cluster.md | 36 ++--- docs/migration-guide.md | 9 +- examples/cluster-clientside-caching-async.c | 20 +-- include/valkey/cluster.h | 20 +-- src/cluster.c | 170 ++++++++------------ tests/CMakeLists.txt | 6 - tests/clusterclient_async.c | 19 ++- tests/clusterclient_reconnect_async.c | 116 ------------- tests/ct_async.c | 17 +- tests/ct_connection.c | 8 +- tests/ct_out_of_memory_handling.c | 20 +-- tests/ct_specific_nodes.c | 10 +- tests/scripts/reconnect-test.sh | 64 -------- tests/ut_slotmap_update.c | 32 ++-- 14 files changed, 154 insertions(+), 393 deletions(-) delete mode 100644 tests/clusterclient_reconnect_async.c delete mode 100755 tests/scripts/reconnect-test.sh diff --git a/docs/cluster.md b/docs/cluster.md index a08ef367..b17dfc7f 100644 --- a/docs/cluster.md +++ b/docs/cluster.md @@ -193,8 +193,9 @@ The callback is called with `event` set to one of the following values: * `VALKEYCLUSTER_EVENT_SLOTMAP_UPDATED` when the slot mapping has been updated; * `VALKEYCLUSTER_EVENT_READY` when the slot mapping has been fetched for the first time and the client is ready to accept commands, useful when initiating the - client with `valkeyClusterAsyncConnect` where a client is not immediately - ready after a successful call; + client using `valkeyClusterAsyncConnectWithOptions` without enabling the option + `VALKEY_OPT_BLOCKING_INITIAL_UPDATE` where a client is not immediately ready + after a successful call; * `VALKEYCLUSTER_EVENT_FREE_CONTEXT` when the cluster context is being freed, so that the user can free the event `privdata`. @@ -255,26 +256,6 @@ When enabled `valkeyClusterAsyncConnectWithOptions` will initially connect to th Any command sent by the user thereafter will create a new non-blocking connection, unless a non-blocking connection already exists to the destination. The function returns a pointer to a newly created `valkeyClusterAsyncContext` struct and its `err` field should be checked to make sure the initial slot map update was successful. -There is also a separate API to perform the context initiation and initial connect in separate steps. -This is useful when there is a need to provide an event callback with the current `valkeyClusterAsyncContext`. -The context is first initiated using `valkeyClusterAsyncContextInit` and then `valkeyClusterAsyncConnect` will initiate connection attempts. - -```c -valkeyClusterOptions options = { - .initial_nodes = "127.0.0.1:7000"; -}; -valkeyClusterOptionsUseLibev(&options, EV_DEFAULT); - -// Initiate the context. -valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); - -// Set the event callback using the context as privdata. -valkeyClusterAsyncSetEventCallback(acc, eventCallback, acc); - -// Start connecting to the initial nodes. -valkeyClusterAsyncConnect(acc) -``` - ### Connection options There is a variety of options you can specify using the `valkeyClusterOptions` struct when connecting to a cluster. @@ -339,9 +320,14 @@ After this, the disconnection callback is executed with the `VALKEY_OK` status a Use [`event_callback` in `valkeyClusterOptions`](#events-per-cluster-context) to get notified when certain events occur. -When the callback function requires the current `valkeyClusterAsyncContext` it can be provided in the `privdata`. -In this case initiate the context using `valkeyClusterAsyncContextInit`, set the callback and `privdata` using `valkeyClusterAsyncSetEventCallback`, -and initiate connection attempts using `valkeyClusterAsyncConnect` as described under the [Connecting](#connecting-1) section. +When the callback function requires the current `valkeyClusterAsyncContext`, it can typecast the given `valkeyClusterContext` to a `valkeyClusterAsyncContext`. +The `valkeyClusterAsyncContext` struct is an extension of the `valkeyClusterContext` struct. + +```c +void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) { + valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)cc; +} +``` #### Events per connection diff --git a/docs/migration-guide.md b/docs/migration-guide.md index afa1c346..53a979c3 100644 --- a/docs/migration-guide.md +++ b/docs/migration-guide.md @@ -36,6 +36,8 @@ The type `sds` is removed from the public API. initiation examples that might be helpful. * The default command to update the internal slot map is changed to `CLUSTER SLOTS`. `CLUSTER NODES` can be re-enabled through options using `VALKEY_OPT_USE_CLUSTER_NODES`. +* A `valkeyClusterAsyncContext` now embeds a `valkeyClusterContext` instead of + holding a pointer to it. Replace any use of `acc->cc` with `&acc->cc` or similar. ### Renamed API functions @@ -61,10 +63,13 @@ The type `sds` is removed from the public API. * `redisClusterSetOptionConnectNonBlock` removed since it was deprecated. * `redisClusterSetOptionConnectTimeout` removed, use `valkeyClusterOptions.connect_timeout`. * `redisClusterSetOptionMaxRetry` removed, use `valkeyClusterOptions.max_retry`. -* `redisClusterSetOptionParseSlaves` removed, use `valkeyClusterOptions.flags` and `VALKEY_OPT_USE_REPLICAS`. +* `redisClusterSetOptionParseSlaves` removed, use `valkeyClusterOptions.options` and `VALKEY_OPT_USE_REPLICAS`. * `redisClusterSetOptionPassword` removed, use `valkeyClusterOptions.password`. -* `redisClusterSetOptionRouteUseSlots` removed, the use of `CLUSTER SLOTS` is enabled by default. +* `redisClusterSetOptionRouteUseSlots` removed, `CLUSTER SLOTS` is used by default. * `redisClusterSetOptionUsername` removed, use `valkeyClusterOptions.username`. +* `redisClusterAsyncConnect` removed, use `valkeyClusterAsyncConnectWithOptions` with options flag `VALKEY_OPT_BLOCKING_INITIAL_UPDATE`. +* `redisClusterAsyncConnect2` removed, use `valkeyClusterAsyncConnectWithOptions`. +* `redisClusterAsyncContextInit` removed, `valkeyClusterAsyncConnectWithOptions` will initiate the context. * `redisClusterAsyncSetConnectCallback` removed, but `valkeyClusterOptions.async_connect_callback` can be used which accepts a non-const callback function prototype. * `redisClusterAsyncSetConnectCallbackNC` removed, use `valkeyClusterOptions.async_connect_callback`. * `redisClusterAsyncSetDisconnectCallback` removed, use `valkeyClusterOptions.async_disconnect_callback`. diff --git a/examples/cluster-clientside-caching-async.c b/examples/cluster-clientside-caching-async.c index c153ef58..0ccf721a 100644 --- a/examples/cluster-clientside-caching-async.c +++ b/examples/cluster-clientside-caching-async.c @@ -36,7 +36,10 @@ void connectCallback(valkeyAsyncContext *ac, int status) { commands. A reply is expected via a call to 'setCallback()' */ void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) { (void)cc; - valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata; + (void)privdata; + /* Get the async context by a simple cast since in the Async API a + * valkeyClusterAsyncContext is an extension of the valkeyClusterContext. */ + valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)cc; /* We send our commands when the client is ready to accept commands. */ if (event == VALKEYCLUSTER_EVENT_READY) { @@ -145,17 +148,14 @@ int main(int argc, char **argv) { options.initial_nodes = CLUSTER_NODE; options.async_connect_callback = connectCallback; options.async_disconnect_callback = disconnectCallback; + options.event_callback = eventCallback; valkeyClusterOptionsUseLibevent(&options, base); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); - assert(acc); - - int status; - status = valkeyClusterAsyncSetEventCallback(acc, eventCallback, acc); - assert(status == VALKEY_OK); - - status = valkeyClusterAsyncConnect(acc); - assert(status == VALKEY_OK); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + if (acc == NULL || acc->err != 0) { + printf("Connect error: %s\n", acc ? acc->errstr : "OOM"); + exit(2); + } event_base_dispatch(base); diff --git a/include/valkey/cluster.h b/include/valkey/cluster.h index 56a5eef5..fd71e06d 100644 --- a/include/valkey/cluster.h +++ b/include/valkey/cluster.h @@ -115,10 +115,13 @@ typedef struct valkeyClusterContext { /* Context for accessing a Valkey Cluster asynchronously */ typedef struct valkeyClusterAsyncContext { - valkeyClusterContext *cc; + /* Hold the regular context. */ + valkeyClusterContext cc; - int err; /* Error flags, 0 when there is no error */ - char errstr[128]; /* String representation of error when applicable */ + int err; /* Error flag, 0 when there is no error, + * a copy of cc->err for convenience. */ + char *errstr; /* String representation of error when applicable, + * always pointing to cc->errstr[] */ int64_t lastSlotmapUpdateAttempt; /* Timestamp */ @@ -289,17 +292,6 @@ valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClus void valkeyClusterAsyncDisconnect(valkeyClusterAsyncContext *acc); void valkeyClusterAsyncFree(valkeyClusterAsyncContext *acc); -/* Initiate and connect as separate steps. */ -valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(const valkeyClusterOptions *options); -int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc); - -/* Callback option configurable after a context initiation, enabling that the - * valkeyClusterAsyncContext pointer can be given as privdata in the callback. */ -int valkeyClusterAsyncSetEventCallback(valkeyClusterAsyncContext *acc, - void(fn)(const valkeyClusterContext *cc, - int event, void *privdata), - void *privdata); - /* Commands */ int valkeyClusterAsyncCommand(valkeyClusterAsyncContext *acc, valkeyClusterCallbackFn *fn, void *privdata, diff --git a/src/cluster.c b/src/cluster.c index 6878ec7d..3a3f223c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -114,6 +114,7 @@ static int valkeyClusterSetOptionAddNodes(valkeyClusterContext *cc, const char * static int valkeyClusterSetOptionConnectTimeout(valkeyClusterContext *cc, const struct timeval tv); static int valkeyClusterSetOptionPassword(valkeyClusterContext *cc, const char *password); static int valkeyClusterSetOptionUsername(valkeyClusterContext *cc, const char *username); +static int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc); void listClusterNodeDestructor(void *val) { freeValkeyClusterNode(val); } @@ -1256,22 +1257,17 @@ int valkeyClusterUpdateSlotmap(valkeyClusterContext *cc) { return VALKEY_ERR; } -static valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions *options) { - valkeyClusterContext *cc; - - cc = vk_calloc(1, sizeof(valkeyClusterContext)); - if (cc == NULL) - return NULL; - +static int valkeyClusterContextInit(valkeyClusterContext *cc, + const valkeyClusterOptions *options) { cc->nodes = dictCreate(&clusterNodesDictType, NULL); if (cc->nodes == NULL) { - valkeyClusterFree(cc); - return NULL; + valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); + return VALKEY_ERR; } cc->requests = listCreate(); if (cc->requests == NULL) { - valkeyClusterFree(cc); - return NULL; + valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); + return VALKEY_ERR; } cc->requests->free = listCommandFree; @@ -1291,23 +1287,23 @@ static valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions } if (options->initial_nodes != NULL && valkeyClusterSetOptionAddNodes(cc, options->initial_nodes) != VALKEY_OK) { - return cc; /* err and errstr already set. */ + return VALKEY_ERR; /* err and errstr already set. */ } if (options->connect_timeout != NULL && valkeyClusterSetOptionConnectTimeout(cc, *options->connect_timeout) != VALKEY_OK) { - return cc; /* err and errstr already set. */ + return VALKEY_ERR; /* err and errstr already set. */ } if (options->command_timeout != NULL && valkeyClusterSetOptionTimeout(cc, *options->command_timeout) != VALKEY_OK) { - return cc; /* err and errstr already set. */ + return VALKEY_ERR; /* err and errstr already set. */ } if (options->username != NULL && valkeyClusterSetOptionUsername(cc, options->username) != VALKEY_OK) { - return cc; /* err and errstr already set. */ + return VALKEY_ERR; /* err and errstr already set. */ } if (options->password != NULL && valkeyClusterSetOptionPassword(cc, options->password) != VALKEY_OK) { - return cc; /* err and errstr already set. */ + return VALKEY_ERR; /* err and errstr already set. */ } if (options->connect_callback) { cc->on_connect = options->connect_callback; @@ -1321,7 +1317,7 @@ static valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions cc->tls_init_fn = options->tls_init_fn; } - return cc; + return VALKEY_OK; } void valkeyClusterFree(valkeyClusterContext *cc) { @@ -1346,12 +1342,14 @@ void valkeyClusterFree(valkeyClusterContext *cc) { } valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options) { - valkeyClusterContext *cc = valkeyClusterContextInit(options); - if (cc == NULL) { + valkeyClusterContext *cc; + + cc = vk_calloc(1, sizeof(valkeyClusterContext)); + if (cc == NULL) return NULL; - } - /* Only connect if options are ok. */ - if (cc->err == 0) { + + if (valkeyClusterContextInit(cc, options) == VALKEY_OK) { + /* Only connect if options are ok. */ valkeyClusterUpdateSlotmap(cc); } return cc; @@ -2613,22 +2611,13 @@ void valkeyClusterReset(valkeyClusterContext *cc) { static void valkeyClusterAsyncSetError(valkeyClusterAsyncContext *acc, int type, const char *str) { - valkeyClusterSetError(acc->cc, type, str); /* Keep error flags identical. */ - acc->err = type; - - assert(str != NULL); - if (str != NULL && str != acc->errstr) { - size_t len = strlen(str); - len = len < (sizeof(acc->errstr) - 1) ? len : (sizeof(acc->errstr) - 1); - memcpy(acc->errstr, str, len); - acc->errstr[len] = '\0'; - } + valkeyClusterSetError(&acc->cc, type, str); /* Keep error flags identical. */ + acc->err = acc->cc.err; } static inline void valkeyClusterAsyncClearError(valkeyClusterAsyncContext *acc) { - valkeyClusterClearError(acc->cc); - acc->err = 0; - acc->errstr[0] = '\0'; + valkeyClusterClearError(&acc->cc); + acc->err = acc->cc.err; } static cluster_async_data *cluster_async_data_create(void) { @@ -2690,8 +2679,8 @@ valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc, valkeyOptions options = {0}; VALKEY_OPTIONS_SET_TCP(&options, node->host, node->port); - options.connect_timeout = acc->cc->connect_timeout; - options.command_timeout = acc->cc->command_timeout; + options.connect_timeout = acc->cc.connect_timeout; + options.command_timeout = acc->cc.command_timeout; node->lastConnectionAttempt = vk_usec_now(); @@ -2707,21 +2696,21 @@ valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc, return NULL; } - if (acc->cc->tls && - acc->cc->tls_init_fn(&ac->c, acc->cc->tls) != VALKEY_OK) { + if (acc->cc.tls && + acc->cc.tls_init_fn(&ac->c, acc->cc.tls) != VALKEY_OK) { valkeyClusterAsyncSetError(acc, ac->c.err, ac->c.errstr); valkeyAsyncFree(ac); return NULL; } // Authenticate when needed - if (acc->cc->password != NULL) { - if (acc->cc->username != NULL) { + if (acc->cc.password != NULL) { + if (acc->cc.username != NULL) { ret = valkeyAsyncCommand(ac, NULL, NULL, "AUTH %s %s", - acc->cc->username, acc->cc->password); + acc->cc.username, acc->cc.password); } else { ret = valkeyAsyncCommand(ac, NULL, NULL, "AUTH %s", - acc->cc->password); + acc->cc.password); } if (ret != VALKEY_OK) { @@ -2756,22 +2745,15 @@ valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc, return ac; } -valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(const valkeyClusterOptions *options) { - valkeyClusterContext *cc; - valkeyClusterAsyncContext *acc; +static int valkeyClusterAsyncContextInit(valkeyClusterAsyncContext *acc, + const valkeyClusterOptions *options) { + /* Setup errstr to point to common error string in valkeyClusterContext. */ + acc->errstr = acc->cc.errstr; - cc = valkeyClusterContextInit(options); - if (cc == NULL) { - return NULL; - } - - acc = vk_calloc(1, sizeof(valkeyClusterAsyncContext)); - if (acc == NULL) { - valkeyClusterFree(cc); - return NULL; + if (valkeyClusterContextInit(&acc->cc, options) != VALKEY_OK) { + valkeyClusterAsyncSetError(acc, acc->cc.err, acc->cc.errstr); + return VALKEY_ERR; } - acc->cc = cc; - valkeyClusterAsyncSetError(acc, cc->err, cc->errstr); if (options->async_connect_callback != NULL) { acc->onConnect = options->async_connect_callback; @@ -2779,38 +2761,36 @@ valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(const valkeyClusterOpti if (options->async_disconnect_callback != NULL) { acc->onDisconnect = options->async_disconnect_callback; } - if (options->attach_fn != NULL) { - acc->attach_fn = options->attach_fn; - acc->attach_data = options->attach_data; + if (options->attach_fn == NULL) { + valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER, + "No event library configured"); + return VALKEY_ERR; } - - return acc; + acc->attach_fn = options->attach_fn; + acc->attach_data = options->attach_data; + return VALKEY_OK; } valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClusterOptions *options) { - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(options); - if (acc == NULL) { + valkeyClusterAsyncContext *acc; + + acc = vk_calloc(1, sizeof(valkeyClusterAsyncContext)); + if (acc == NULL) return NULL; + + if (valkeyClusterAsyncContextInit(acc, options) == VALKEY_OK) { + /* Only connect if options are ok. */ + valkeyClusterAsyncConnect(acc); } - valkeyClusterAsyncConnect(acc); return acc; } -int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc) { - /* An attach function for an async event library is required. */ - if (acc->attach_fn == NULL) { - return VALKEY_ERR; - } - - /* Clear a previously set shutdown flag to allow a - * reconnection of an async context using this API. */ - acc->cc->flags &= ~VALKEY_FLAG_DISCONNECTING; - +static int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc) { /* Use blocking initial slotmap update when configured. */ - if (acc->cc->flags & VALKEY_FLAG_BLOCKING_INITIAL_UPDATE) { - if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) { - valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr); + if (acc->cc.flags & VALKEY_FLAG_BLOCKING_INITIAL_UPDATE) { + if (valkeyClusterUpdateSlotmap(&acc->cc) != VALKEY_OK) { + valkeyClusterAsyncSetError(acc, acc->cc.err, acc->cc.errstr); return VALKEY_ERR; } return VALKEY_OK; @@ -2819,18 +2799,6 @@ int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc) { return updateSlotMapAsync(acc, NULL /*any node*/); } -int valkeyClusterAsyncSetEventCallback(valkeyClusterAsyncContext *acc, - void(fn)(const valkeyClusterContext *cc, - int event, void *privdata), - void *privdata) { - if (acc->cc->event_callback == NULL) { - acc->cc->event_callback = fn; - acc->cc->event_privdata = privdata; - return VALKEY_OK; - } - return VALKEY_ERR; -} - /* Reply callback function for CLUSTER SLOTS */ void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r, void *privdata) { @@ -2844,7 +2812,7 @@ void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r, return; } - valkeyClusterContext *cc = acc->cc; + valkeyClusterContext *cc = &acc->cc; dict *nodes = parse_cluster_slots(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) { /* Retry using available nodes */ @@ -2865,7 +2833,7 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r, return; } - valkeyClusterContext *cc = acc->cc; + valkeyClusterContext *cc = &acc->cc; dict *nodes = parse_cluster_nodes(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) { /* Retry using available nodes */ @@ -2926,13 +2894,13 @@ static int updateSlotMapAsync(valkeyClusterAsyncContext *acc, /* Don't allow concurrent slot map updates. */ return VALKEY_ERR; } - if (acc->cc->flags & VALKEY_FLAG_DISCONNECTING) { + if (acc->cc.flags & VALKEY_FLAG_DISCONNECTING) { /* No slot map updates during a cluster client disconnect. */ return VALKEY_ERR; } if (ac == NULL) { - valkeyClusterNode *node = selectNode(acc->cc->nodes); + valkeyClusterNode *node = selectNode(acc->cc.nodes); if (node == NULL) { goto error; } @@ -2945,7 +2913,7 @@ static int updateSlotMapAsync(valkeyClusterAsyncContext *acc, /* Send a command depending of config */ int status; - if (acc->cc->flags & VALKEY_FLAG_USE_CLUSTER_NODES) { + if (acc->cc.flags & VALKEY_FLAG_USE_CLUSTER_NODES) { status = valkeyAsyncCommand(ac, clusterNodesReplyCallback, acc, VALKEY_COMMAND_CLUSTER_NODES); } else { @@ -2994,7 +2962,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r, goto error; } - cc = acc->cc; + cc = &acc->cc; if (cc == NULL) { goto error; } @@ -3128,7 +3096,7 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc, return VALKEY_ERR; } - cc = acc->cc; + cc = &acc->cc; /* Don't accept new commands when the client is about to disconnect. */ if (cc->flags & VALKEY_FLAG_DISCONNECTING) { @@ -3205,7 +3173,7 @@ int valkeyClusterAsyncFormattedCommandToNode(valkeyClusterAsyncContext *acc, valkeyClusterCallbackFn *fn, void *privdata, char *cmd, int len) { - valkeyClusterContext *cc = acc->cc; + valkeyClusterContext *cc = &acc->cc; valkeyAsyncContext *ac; int status; cluster_async_data *cad = NULL; @@ -3393,7 +3361,7 @@ void valkeyClusterAsyncDisconnect(valkeyClusterAsyncContext *acc) { return; } - cc = acc->cc; + cc = &acc->cc; cc->flags |= VALKEY_FLAG_DISCONNECTING; dictIterator di; @@ -3416,11 +3384,9 @@ void valkeyClusterAsyncFree(valkeyClusterAsyncContext *acc) { if (acc == NULL) return; - valkeyClusterContext *cc = acc->cc; + valkeyClusterContext *cc = &acc->cc; cc->flags |= VALKEY_FLAG_DISCONNECTING; valkeyClusterFree(cc); - - vk_free(acc); } struct nodeIterator { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 39db3f50..7fa0f9bd 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -161,8 +161,6 @@ if (LIBEVENT_LIBRARY) target_link_libraries(clusterclient valkey ${TLS_LIBRARY}) add_executable(clusterclient_async clusterclient_async.c) target_link_libraries(clusterclient_async valkey ${TLS_LIBRARY} ${LIBEVENT_LIBRARY}) - add_executable(clusterclient_reconnect_async clusterclient_reconnect_async.c) - target_link_libraries(clusterclient_reconnect_async valkey ${TLS_LIBRARY} ${LIBEVENT_LIBRARY}) add_test(NAME set-get-test COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/set-get-test.sh" "$" @@ -211,10 +209,6 @@ if (LIBEVENT_LIBRARY) COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/dbsize-to-all-nodes-during-scaledown-test-async.sh" "$" WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") - add_test(NAME reconnect-test - COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/reconnect-test.sh" - "$" - WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") add_test(NAME timeout-handling-test COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/timeout-handling-test.sh" "$" diff --git a/tests/clusterclient_async.c b/tests/clusterclient_async.c index 84c4c322..f8f000be 100644 --- a/tests/clusterclient_async.c +++ b/tests/clusterclient_async.c @@ -156,7 +156,7 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) { if (send_to_all) { valkeyClusterNodeIterator ni; - valkeyClusterInitNodeIterator(&ni, acc->cc); + valkeyClusterInitNodeIterator(&ni, &acc->cc); valkeyClusterNode *node; while ((node = valkeyClusterNodeNext(&ni)) != NULL) { @@ -193,9 +193,12 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) { void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) { (void)cc; + (void)privdata; if (event == VALKEYCLUSTER_EVENT_READY) { - /* Schedule a read from stdin and send next command. */ - valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata; + /* Schedule a read from stdin and send next command. + * Get the async context by a simple cast since in the Async API a + * valkeyClusterAsyncContext is an extension of the valkeyClusterContext. */ + valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)cc; struct timeval timeout = {0, 10}; struct event_base *base = acc->attach_data; event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, &timeout); @@ -268,6 +271,7 @@ int main(int argc, char **argv) { options.initial_nodes = initnode; options.connect_timeout = &timeout; options.command_timeout = &timeout; + options.event_callback = eventCallback; options.max_retry = 1; if (!async_initial_update) { options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; @@ -281,14 +285,9 @@ int main(int argc, char **argv) { } valkeyClusterOptionsUseLibevent(&options, base); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); if (acc == NULL || acc->err != 0) { - printf("Initiation failure: %s\n", acc ? acc->errstr : "OOM"); - exit(2); - } - valkeyClusterAsyncSetEventCallback(acc, eventCallback, acc); - if (valkeyClusterAsyncConnect(acc) != VALKEY_OK) { - printf("Connect error: %s\n", acc->errstr); + printf("Connect error: %s\n", acc ? acc->errstr : "OOM"); exit(2); } diff --git a/tests/clusterclient_reconnect_async.c b/tests/clusterclient_reconnect_async.c deleted file mode 100644 index 6af5451f..00000000 --- a/tests/clusterclient_reconnect_async.c +++ /dev/null @@ -1,116 +0,0 @@ -/* - * This program connects to a Valkey node and then reads commands from stdin, such - * as "SET foo bar", one per line and prints the results to stdout. - * - * The behaviour is similar to that of clusterclient_async.c, but it sends the - * next command after receiving a reply from the previous command. It also works - * for standalone Valkey nodes (without cluster mode), and uses the - * valkeyClusterAsyncCommandToNode function to send the command to the first node. - * If it receives any I/O error, the program performs a reconnect. - */ - -#include "adapters/libevent.h" -#include "cluster.h" -#include "test_utils.h" - -#include -#include -#include -#include - -/* Unfortunately there is no error code for this error to match */ -#define VALKEY_ENOCLUSTER "ERR This instance has cluster support disabled" - -void sendNextCommand(evutil_socket_t, short, void *); - -void connectToValkey(valkeyClusterAsyncContext *acc) { - /* reset context in case of reconnect */ - valkeyClusterAsyncDisconnect(acc); - - if (valkeyClusterAsyncConnect(acc) == VALKEY_OK) { - // cluster mode - } else if (acc->err && - strcmp(acc->errstr, VALKEY_ENOCLUSTER) == 0) { - printf("[no cluster]\n"); - acc->err = 0; - memset(acc->errstr, '\0', strlen(acc->errstr)); - } else { - printf("Connect error: %s\n", acc->errstr); - exit(-1); - } -} - -void replyCallback(valkeyClusterAsyncContext *acc, void *r, void *privdata) { - UNUSED(privdata); - valkeyReply *reply = (valkeyReply *)r; - - if (reply == NULL) { - if (acc->err == VALKEY_ERR_IO || acc->err == VALKEY_ERR_EOF) { - printf("[reconnect]\n"); - connectToValkey(acc); - } else if (acc->err) { - printf("error: %s\n", acc->errstr); - } else { - printf("unknown error\n"); - } - } else { - printf("%s\n", reply->str); - } - - // schedule reading from stdin and sending next command - struct event_base *base = acc->attach_data; - event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); -} - -void sendNextCommand(evutil_socket_t fd, short kind, void *arg) { - UNUSED(fd); - UNUSED(kind); - valkeyClusterAsyncContext *acc = arg; - - char command[256]; - if (fgets(command, 256, stdin)) { - size_t len = strlen(command); - if (command[len - 1] == '\n') // Chop trailing line break - command[len - 1] = '\0'; - - valkeyClusterNodeIterator ni; - valkeyClusterInitNodeIterator(&ni, acc->cc); - valkeyClusterNode *node = valkeyClusterNodeNext(&ni); - assert(node); - - // coverity[tainted_scalar] - int status = valkeyClusterAsyncCommandToNode(acc, node, replyCallback, - NULL, command); - ASSERT_MSG(status == VALKEY_OK, acc->errstr); - } else { - // disconnect if nothing is left to read from stdin - valkeyClusterAsyncDisconnect(acc); - } -} - -int main(int argc, char **argv) { - if (argc <= 1) { - fprintf(stderr, "Usage: %s HOST:PORT\n", argv[0]); - exit(1); - } - const char *initnode = argv[1]; - struct event_base *base = event_base_new(); - - valkeyClusterOptions options = {0}; - options.initial_nodes = initnode; - options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; - valkeyClusterOptionsUseLibevent(&options, base); - - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); - assert(acc); - - connectToValkey(acc); - // schedule reading from stdin and sending next command - event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); - - event_base_dispatch(base); - - valkeyClusterAsyncFree(acc); - event_base_free(base); - return 0; -} diff --git a/tests/ct_async.c b/tests/ct_async.c index 4aff38c5..5af0e88d 100644 --- a/tests/ct_async.c +++ b/tests/ct_async.c @@ -35,7 +35,10 @@ void disconnectCallback(const valkeyAsyncContext *ac, int status) { void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) { (void)cc; - valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata; + (void)privdata; + /* Get the async context by a simple cast since in the Async API a + * valkeyClusterAsyncContext is an extension of the valkeyClusterContext. */ + valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)cc; /* We send our commands when the client is ready to accept commands. */ if (event == VALKEYCLUSTER_EVENT_READY) { @@ -66,17 +69,11 @@ int main(void) { options.initial_nodes = CLUSTER_NODE; options.async_connect_callback = connectCallback; options.async_disconnect_callback = disconnectCallback; + options.event_callback = eventCallback; valkeyClusterOptionsUseLibevent(&options, base); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); - assert(acc); - - /* Set an event callback that uses acc as privdata */ - int status = valkeyClusterAsyncSetEventCallback(acc, eventCallback, acc); - assert(status == VALKEY_OK); - - status = valkeyClusterAsyncConnect(acc); - assert(status == VALKEY_OK); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + assert(acc && acc->err == 0); event_base_dispatch(base); diff --git a/tests/ct_connection.c b/tests/ct_connection.c index b257203c..0e371ded 100644 --- a/tests/ct_connection.c +++ b/tests/ct_connection.c @@ -333,9 +333,9 @@ void test_async_password_wrong(void) { assert(acc); assert(acc->err == VALKEY_ERR_OTHER); if (valkey_version_less_than(6, 0)) - assert(strcmp(acc->cc->errstr, "ERR invalid password") == 0); + assert(strcmp(acc->errstr, "ERR invalid password") == 0); else - assert(strncmp(acc->cc->errstr, "WRONGPASS", 9) == 0); + assert(strncmp(acc->errstr, "WRONGPASS", 9) == 0); // No connection ExpectedResult r; @@ -364,7 +364,7 @@ void test_async_password_missing(void) { valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); assert(acc); assert(acc->err == VALKEY_ERR_OTHER); - assert(strncmp(acc->cc->errstr, "NOAUTH", 6) == 0); + assert(strncmp(acc->errstr, "NOAUTH", 6) == 0); // No connection ExpectedResult r; @@ -519,7 +519,7 @@ void test_async_command_timeout(void) { ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; - valkeyClusterInitNodeIterator(&ni, acc->cc); + valkeyClusterInitNodeIterator(&ni, &acc->cc); valkeyClusterNode *node = valkeyClusterNodeNext(&ni); assert(node); diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index ad5992f7..561697e7 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -125,13 +125,11 @@ void test_alloc_failure_handling(void) { // Connect valkeyClusterContext *cc; { - for (int i = 0; i < 3; ++i) { - successfulAllocations = i; - cc = valkeyClusterConnectWithOptions(&options); - assert(cc == NULL); - } + successfulAllocations = 0; + cc = valkeyClusterConnectWithOptions(&options); + assert(cc == NULL); - for (int i = 3; i < 100; ++i) { + for (int i = 1; i < 100; ++i) { successfulAllocations = i; cc = valkeyClusterConnectWithOptions(&options); assert(cc); @@ -462,13 +460,11 @@ void test_alloc_failure_handling_async(void) { // Connect valkeyClusterAsyncContext *acc; { - for (int i = 0; i < 13; ++i) { - successfulAllocations = i; - acc = valkeyClusterAsyncConnectWithOptions(&options); - assert(acc == NULL); - } + successfulAllocations = 0; + acc = valkeyClusterAsyncConnectWithOptions(&options); + assert(acc == NULL); - for (int i = 13; i < 100; ++i) { + for (int i = 1; i < 100; ++i) { successfulAllocations = i; acc = valkeyClusterAsyncConnectWithOptions(&options); ASSERT_STR_EQ(acc->errstr, "Out of memory"); diff --git a/tests/ct_specific_nodes.c b/tests/ct_specific_nodes.c index 8e1063c9..7eedcbb3 100644 --- a/tests/ct_specific_nodes.c +++ b/tests/ct_specific_nodes.c @@ -332,7 +332,7 @@ void test_async_to_single_node(void) { ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; - valkeyClusterInitNodeIterator(&ni, acc->cc); + valkeyClusterInitNodeIterator(&ni, &acc->cc); valkeyClusterNode *node = valkeyClusterNodeNext(&ni); assert(node); @@ -363,7 +363,7 @@ void test_async_formatted_to_single_node(void) { ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; - valkeyClusterInitNodeIterator(&ni, acc->cc); + valkeyClusterInitNodeIterator(&ni, &acc->cc); valkeyClusterNode *node = valkeyClusterNodeNext(&ni); assert(node); @@ -395,7 +395,7 @@ void test_async_command_argv_to_single_node(void) { ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; - valkeyClusterInitNodeIterator(&ni, acc->cc); + valkeyClusterInitNodeIterator(&ni, &acc->cc); valkeyClusterNode *node = valkeyClusterNodeNext(&ni); assert(node); @@ -427,7 +427,7 @@ void test_async_to_all_nodes(void) { ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; - valkeyClusterInitNodeIterator(&ni, acc->cc); + valkeyClusterInitNodeIterator(&ni, &acc->cc); int status; ExpectedResult r1 = {.type = VALKEY_REPLY_INTEGER}; @@ -466,7 +466,7 @@ void test_async_transaction(void) { valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); - valkeyClusterNode *node = valkeyClusterGetNodeByKey(acc->cc, (char *)"foo"); + valkeyClusterNode *node = valkeyClusterGetNodeByKey(&acc->cc, (char *)"foo"); assert(node); int status; diff --git a/tests/scripts/reconnect-test.sh b/tests/scripts/reconnect-test.sh deleted file mode 100755 index 60e0e2e8..00000000 --- a/tests/scripts/reconnect-test.sh +++ /dev/null @@ -1,64 +0,0 @@ -#!/bin/sh - -# Usage: $0 /path/to/clusterclient-binary - -clientprog=${1:-./clusterclient_reconnect_async} -testname=reconnect-test - -# Sync process just waiting for server to be ready to accept connection. -perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' & -syncpid=$! - -# Start simulated server -timeout 5s ./simulated-valkey.pl -p 7400 -d --sigcont $syncpid <<'EOF' & -EXPECT CONNECT -EXPECT ["CLUSTER", "SLOTS"] -SEND -ERR This instance has cluster support disabled -EXPECT CLOSE -EXPECT CONNECT -EXPECT ["SET", "foo", "bar"] -SEND +OK -EXPECT ["GET", "foo"] -SEND "bar" -CLOSE -EXPECT CONNECT -EXPECT ["CLUSTER", "SLOTS"] -SEND -ERR This instance has cluster support disabled -EXPECT CLOSE -EXPECT CONNECT -EXPECT ["GET", "foo"] -SEND "bar" -EXPECT CLOSE -EOF -server=$! - -# Wait until server is ready to accept client connection -wait $syncpid; - -# Run client -timeout 3s "$clientprog" 127.0.0.1:7400 > "$testname.out" <<'EOF' -SET foo bar -GET foo -GET foo -GET foo -EOF -clientexit=$? - -# Wait for server to exit -wait $server; serverexit=$? - -# Check exit statuses -if [ $serverexit -ne 0 ]; then - echo "Simulated server exited with status $serverexit" - exit $serverexit -fi -if [ $clientexit -ne 0 ]; then - echo "$clientprog exited with status $clientexit" - exit $clientexit -fi - -# Check the output from clusterclient -printf '[no cluster]\nOK\nbar\n[reconnect]\n[no cluster]\nbar\n' | cmp "$testname.out" - || exit 99 - -# Clean up -rm "$testname.out" diff --git a/tests/ut_slotmap_update.c b/tests/ut_slotmap_update.c index f0b05556..b53333c5 100644 --- a/tests/ut_slotmap_update.c +++ b/tests/ut_slotmap_update.c @@ -22,6 +22,12 @@ const char *__asan_default_options(void) { valkeyReply *create_reply(const char *buf, size_t len); char *resp_encode_array(char *p, sds *resp); +valkeyClusterContext *createClusterContext(const valkeyClusterOptions *options) { + valkeyClusterContext *cc = vk_calloc(1, sizeof(valkeyClusterContext)); + assert(valkeyClusterContextInit(cc, options) == VALKEY_OK); + return cc; +} + /* Helper to create a valkeyReply that contains a bulkstring. */ valkeyReply *create_cluster_nodes_reply(const char *str) { /* Create a RESP Bulk String. */ @@ -127,7 +133,7 @@ void test_parse_cluster_nodes(bool parse_replicas) { if (parse_replicas) options.options |= VALKEY_OPT_USE_REPLICAS; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; @@ -211,7 +217,7 @@ void test_parse_cluster_nodes(bool parse_replicas) { void test_parse_cluster_nodes_during_failover(void) { valkeyClusterOptions options = {0}; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; @@ -277,7 +283,7 @@ void test_parse_cluster_nodes_during_failover(void) { /* Skip nodes with the `noaddr` flag. */ void test_parse_cluster_nodes_with_noaddr(void) { valkeyClusterOptions options = {0}; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; dictIterator di; @@ -306,7 +312,7 @@ void test_parse_cluster_nodes_with_noaddr(void) { void test_parse_cluster_nodes_with_empty_ip(void) { valkeyClusterOptions options = {0}; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyClusterNode *node; dictIterator di; @@ -342,7 +348,7 @@ void test_parse_cluster_nodes_with_empty_ip(void) { /* Parse replies with additional importing and migrating information. */ void test_parse_cluster_nodes_with_special_slot_entries(void) { valkeyClusterOptions options = {0}; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; @@ -385,7 +391,7 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) { valkeyClusterOptions options = {0}; options.options |= VALKEY_OPT_USE_REPLICAS; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; @@ -449,7 +455,7 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) { /* Give error when parsing erroneous data. */ void test_parse_cluster_nodes_with_parse_error(void) { valkeyClusterOptions options = {0}; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyReply *reply; dict *nodes; @@ -498,7 +504,7 @@ void test_parse_cluster_nodes_with_parse_error(void) { * i.e. `ip:port` instead of `ip:port@cport` */ void test_parse_cluster_nodes_with_legacy_format(void) { valkeyClusterOptions options = {0}; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; dictIterator di; @@ -526,7 +532,7 @@ void test_parse_cluster_slots(bool parse_replicas) { if (parse_replicas) options.options |= VALKEY_OPT_USE_REPLICAS; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; @@ -608,7 +614,7 @@ void test_parse_cluster_slots(bool parse_replicas) { void test_parse_cluster_slots_with_empty_ip(void) { valkeyClusterOptions options = {0}; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyClusterNode *node; dictIterator di; @@ -644,7 +650,7 @@ void test_parse_cluster_slots_with_empty_ip(void) { void test_parse_cluster_slots_with_null_ip(void) { valkeyClusterOptions options = {0}; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyClusterNode *node; dictIterator di; @@ -683,7 +689,7 @@ void test_parse_cluster_slots_with_multiple_replicas(void) { valkeyClusterOptions options = {0}; options.options |= VALKEY_OPT_USE_REPLICAS; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; @@ -743,7 +749,7 @@ void test_parse_cluster_slots_with_noncontiguous_slots(void) { valkeyClusterOptions options = {0}; options.options |= VALKEY_OPT_USE_REPLICAS; - valkeyClusterContext *cc = valkeyClusterContextInit(&options); + valkeyClusterContext *cc = createClusterContext(&options); valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot;