From: Jeroen van der Heijden Date: Wed, 20 Apr 2022 14:16:23 +0000 (+0200) Subject: TCP tee X-Git-Tag: archive/raspbian/2.0.48-1+rpi1^2~6^2^2~9^2~5 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=ed5d63342eaecf982193f107b449a64c227670f3;p=siridb-server.git TCP tee --- diff --git a/grammar/grammar.py b/grammar/grammar.py index 2b689c23..ddc2b1cf 100644 --- a/grammar/grammar.py +++ b/grammar/grammar.py @@ -165,7 +165,7 @@ class SiriGrammar(Grammar): k_sync_progress = Keyword('sync_progress') k_tag = Keyword('tag') k_tags = Keyword('tags') - k_tee_pipe_name = Keyword('tee_pipe_name') + k_tee = Keyword('tee') k_time_precision = Keyword('time_precision') k_timeit = Keyword('timeit') k_timeval = Keyword('timeval') @@ -287,7 +287,6 @@ class SiriGrammar(Grammar): k_reindex_progress, k_selected_points, k_sync_progress, - k_tee_pipe_name, k_uptime, most_greedy=False), ',', 1) @@ -394,7 +393,6 @@ class SiriGrammar(Grammar): k_status, k_reindex_progress, k_sync_progress, - k_tee_pipe_name, most_greedy=False), str_operator, string), Sequence(k_online, bool_operator, _boolean), Sequence(k_log_level, int_operator, log_keywords), @@ -606,7 +604,7 @@ class SiriGrammar(Grammar): Optional(Sequence(k_using, aggregate_functions))) set_address = Sequence(k_set, k_address, string) - set_tee_pipe_name = Sequence(k_set, k_tee_pipe_name, Choice( + set_tee = Sequence(k_set, k_tee, Choice( k_false, string, most_greedy=False)) @@ -642,6 +640,7 @@ class SiriGrammar(Grammar): set_timezone, set_expiration_num, set_expiration_log, + set_tee, most_greedy=False)) alter_group = Sequence(k_group, group_name, Choice( @@ -656,14 +655,12 @@ class SiriGrammar(Grammar): alter_server = Sequence(k_server, uuid, Choice( set_log_level, set_backup_mode, - set_tee_pipe_name, set_address, set_port, most_greedy=False)) alter_servers = Sequence(k_servers, Optional(where_server), Choice( set_log_level, - set_tee_pipe_name, most_greedy=False)) alter_user = Sequence(k_user, string, Choice( @@ -853,7 +850,7 @@ class SiriGrammar(Grammar): k_startup_time, k_status, k_sync_progress, - k_tee_pipe_name, + k_tee, k_time_precision, k_timezone, k_uptime, diff --git a/include/siri/db/db.h b/include/siri/db/db.h index 20fb8fad..25c9bd48 100644 --- a/include/siri/db/db.h +++ b/include/siri/db/db.h @@ -8,7 +8,7 @@ typedef struct siridb_s siridb_t; #define SIRIDB_MAX_SIZE_ERR_MSG 1024 #define SIRIDB_MAX_DBNAME_LEN 256 /* 255 + NULL */ -#define SIRIDB_SCHEMA 6 +#define SIRIDB_SCHEMA 7 #define SIRIDB_FLAG_REINDEXING 1 #define SIRIDB_FLAG_DROPPED 2 diff --git a/include/siri/db/tee.h b/include/siri/db/tee.h index 16ff3f38..cbee3562 100644 --- a/include/siri/db/tee.h +++ b/include/siri/db/tee.h @@ -6,50 +6,59 @@ typedef struct siridb_tee_s siridb_tee_t; +#define SIRIDB_TEE_DEFAULT_TCP_PORT 9104 + enum { - SIRIDB_TEE_FLAG_INIT = 1<<0, - SIRIDB_TEE_FLAG_CONNECTING = 1<<1, - SIRIDB_TEE_FLAG_CONNECTED = 1<<2, SIRIDB_TEE_FLAG = 1<<31, }; +enum siridb_tee_e_t +{ + SIRIDB_TEE_E_OK=0, + SIRIDB_TEE_E_ALLOC, + SIRIDB_TEE_E_READ, + SIRIDB_TEE_E_CONNECT, +}; + + #include #include #include siridb_tee_t * siridb_tee_new(void); -void siridb_tee_free(siridb_tee_t * tee); -int siridb_tee_connect(siridb_tee_t * tee); -int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name); +int siridb_tee_set_address_port( + siridb_tee_t * tee, + const char * address, + uint16_t port); void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise); -const char * tee_str(siridb_tee_t * tee); -static inline bool siridb_tee_is_configured(siridb_tee_t * tee); -static inline bool siridb_tee_is_connected(siridb_tee_t * tee); -static inline bool siridb_tee_is_handle(uv_handle_t * handle); +void siridb_tee_free(siridb_tee_t * tee); +const char * siridb_tee_str(siridb_tee_t * tee); + +typedef void (*siridb_tee_cb)(uv_handle_t *); + struct siridb_tee_s { uint32_t flags; /* maps to sirnet_stream_t tp for cleanup */ - char * pipe_name_; - char * err_msg_; - uv_pipe_t pipe; + uint16_t port; + uint16_t err_code; + char * address; + uv_tcp_t * tcp; + uv_mutex_t lock_; + sirinet_promise_t * promise_; }; + static inline bool siridb_tee_is_configured(siridb_tee_t * tee) { - return tee->pipe_name_ != NULL; + return tee->address != NULL; }; -static inline bool siridb_tee_is_connected(siridb_tee_t * tee) -{ - return tee->flags & SIRIDB_TEE_FLAG_CONNECTED; -} - static inline bool siridb_tee_is_handle(uv_handle_t * handle) { return - handle->type == UV_NAMED_PIPE && + handle->type == UV_TCP && handle->data && (((siridb_tee_t *) handle->data)->flags & SIRIDB_TEE_FLAG); } diff --git a/include/siri/grammar/grammar.h b/include/siri/grammar/grammar.h index 6a696e81..466e0916 100644 --- a/include/siri/grammar/grammar.h +++ b/include/siri/grammar/grammar.h @@ -5,7 +5,7 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2020-09-25 10:57:26 + * Created at: 2022-04-15 12:10:03 */ #ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ #define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ @@ -236,7 +236,7 @@ enum cleri_grammar_ids { CLERI_GID_K_SYNC_PROGRESS, CLERI_GID_K_TAG, CLERI_GID_K_TAGS, - CLERI_GID_K_TEE_PIPE_NAME, + CLERI_GID_K_TEE, CLERI_GID_K_TIMEIT, CLERI_GID_K_TIMEVAL, CLERI_GID_K_TIMEZONE, @@ -307,7 +307,7 @@ enum cleri_grammar_ids { CLERI_GID_SET_PASSWORD, CLERI_GID_SET_PORT, CLERI_GID_SET_SELECT_POINTS_LIMIT, - CLERI_GID_SET_TEE_PIPE_NAME, + CLERI_GID_SET_TEE, CLERI_GID_SET_TIMEZONE, CLERI_GID_SHARD_COLUMNS, CLERI_GID_SHOW_STMT, diff --git a/include/siri/net/protocol.h b/include/siri/net/protocol.h index d3787e84..ff37e4e3 100644 --- a/include/siri/net/protocol.h +++ b/include/siri/net/protocol.h @@ -77,7 +77,7 @@ typedef enum BPROTO_REQ_GROUPS, /* empty */ BPROTO_ENABLE_BACKUP_MODE, /* empty */ BPROTO_DISABLE_BACKUP_MODE, /* empty */ - BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */ + _BPROTO_TEE_PIPE_NAME_UPDATE, /* deprecated */ BPROTO_DROP_DATABASE, /* empty */ BPROTO_REQ_TAGS, /* empty */ BPROTO_SERIES_TAGS, /* [series name, tag name, ...] */ diff --git a/itest/tee_server.py b/itest/tee_server.py new file mode 100644 index 00000000..11b8ffc0 --- /dev/null +++ b/itest/tee_server.py @@ -0,0 +1,6 @@ +import asyncio + + + +if __name__ == '__main__': + pass \ No newline at end of file diff --git a/itest/test_integer_load.py b/itest/test_integer_load.py new file mode 100644 index 00000000..acb0e4b5 --- /dev/null +++ b/itest/test_integer_load.py @@ -0,0 +1,81 @@ +import asyncio +import functools +import random +import time +import string +from testing import Client +from testing import default_test_setup +from testing import gen_data +from testing import gen_points +from testing import gen_series +from testing import InsertError +from testing import PoolError +from testing import QueryError +from testing import run_test +from testing import Series +from testing import Server +from testing import ServerError +from testing import SiriDB +from testing import TestBase +from testing import UserAuthError +from testing import parse_args + + +TIME_PRECISION = 'ms' + + +class TestIntegerLoad(TestBase): + title = 'Test inserts and response' + + GEN_POINTS = functools.partial( + gen_points, + tp=int, + mi=-2**10, + ma=2**10, + n=5, + time_precision=TIME_PRECISION, + ts_gap=3) + + async def select(self, client, series, n): + series = {s.name: s for s in series} + chars=string.ascii_letters + string.digits + for _ in range(n): + regex = ''.join(random.choice(chars) for _ in range(3)) + res = await client.query(f'select max() prefix "max-", min() prefix "min-", mean() prefix "mean-" from /.*{regex}.*/i') + if res: + print(res) + # for s, p in res.items(): + # self.assertEqual(sorted(series[s].points), p) + await asyncio.sleep(0.2) + + async def insert(self, client, series, n): + for _ in range(n): + await client.insert_some_series( + series, n=0.04, points=self.GEN_POINTS) + await asyncio.sleep(0.2) + + @default_test_setup( + 1, + time_precision=TIME_PRECISION, + compression=True, + auto_duration=True, + optimize_interval=20) + async def run(self): + await self.client0.connect() + + series = gen_series(n=1000) + + insert = asyncio.ensure_future( + self.insert(self.client0, series, n=50000)) + select = asyncio.ensure_future( + self.select(self.client0, series, n=50000)) + await asyncio.gather(insert, select) + + self.client0.close() + await asyncio.sleep(1800) + + +if __name__ == '__main__': + random.seed(1) + parse_args() + run_test(TestIntegerLoad()) diff --git a/src/siri/db/db.c b/src/siri/db/db.c index 72749f1b..374fe8b7 100644 --- a/src/siri/db/db.c +++ b/src/siri/db/db.c @@ -278,12 +278,6 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags) /* start tasks */ siridb_tasks_init(&siridb->tasks); - /* init tee if configured */ - if (siridb_tee_is_configured(siridb->tee)) - { - siridb_tee_connect(siridb->tee); - } - log_info("Finished loading database: '%s'", siridb->dbname); return siridb; @@ -319,7 +313,8 @@ static int siridb__from_unpacker( qp_schema.via.int64 == 2 || qp_schema.via.int64 == 3 || qp_schema.via.int64 == 4 || - qp_schema.via.int64 == 5) + qp_schema.via.int64 == 5 || + qp_schema.via.int64 == 6) { log_info( "Found an old database schema (v%d), " @@ -481,31 +476,18 @@ static int siridb__from_unpacker( } /* for older schemas we keep the default tee_pipe_name=NULL */ - if (qp_schema.via.int64 >= 5) + if (qp_schema.via.int64 >= 5 && qp_schema.via.int64 <=6) { + LOGC("HERE"); qp_next(unpacker, &qp_obj); - - if (qp_obj.tp == QP_RAW) - { - (*siridb)->tee->pipe_name_ = strndup( - (char *) qp_obj.via.raw, - qp_obj.len); - - if (!(*siridb)->tee->pipe_name_) - { - READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.") - } - } - else if (qp_obj.tp != QP_NULL) - { - READ_DB_EXIT_WITH_ERROR("Cannot read tee pipe name.") - } + /* Skip the tee pipe name */ } if (qp_schema.via.int64 >= 6) { /* read select points limit */ if (qp_next(unpacker, &qp_obj) != QP_INT64 || qp_obj.via.int64 < 0) { + READ_DB_EXIT_WITH_ERROR( "Cannot read shard (log) expiration time.") } @@ -519,17 +501,48 @@ static int siridb__from_unpacker( } (*siridb)->expiration_num = qp_obj.via.int64; } - if ((*siridb)->tee->pipe_name_ == NULL) + if (qp_schema.via.int64 >= 7) + { + qp_next(unpacker, &qp_obj); + + if (qp_obj.tp == QP_RAW) + { + (*siridb)->tee->address = strndup( + (char *) qp_obj.via.raw, + qp_obj.len); + + if (!(*siridb)->tee->address) + { + READ_DB_EXIT_WITH_ERROR("Cannot allocate tee address.") + } + } + else if (qp_obj.tp != QP_NULL) + { + READ_DB_EXIT_WITH_ERROR("Cannot read tee address.") + } + + if (qp_next(unpacker, &qp_obj) != QP_INT64 || + qp_obj.via.int64 < 0 || + qp_obj.via.int64 > 65535) + { + READ_DB_EXIT_WITH_ERROR("Cannot read tee port.") + } + + (*siridb)->tee->port = qp_obj.via.int64; + } + + if ((*siridb)->tee->address == NULL) { log_debug( - "No tee pipe name configured for database: %s", + "No tee configured for database: %s", (*siridb)->dbname); } else { log_debug( - "Using tee pipe name '%s' for database: '%s'", - (*siridb)->tee->pipe_name_, + "Using tee '%s:%u' for database: '%s'", + (*siridb)->tee->address, + (*siridb)->tee->port, (*siridb)->dbname); } @@ -675,11 +688,12 @@ int siridb_save(siridb_t * siridb) qp_fadd_double(fpacker, siridb->drop_threshold) || qp_fadd_int64(fpacker, siridb->select_points_limit) || qp_fadd_int64(fpacker, siridb->list_limit) || - (siridb->tee->pipe_name_ == NULL - ? qp_fadd_type(fpacker, QP_NULL) - : qp_fadd_string(fpacker, siridb->tee->pipe_name_)) || qp_fadd_int64(fpacker, siridb->expiration_log) || qp_fadd_int64(fpacker, siridb->expiration_num) || + (siridb->tee->address == NULL + ? qp_fadd_type(fpacker, QP_NULL) + : qp_fadd_string(fpacker, siridb->tee->address)) || + qp_fadd_int64(fpacker, siridb->tee->port) || qp_fadd_type(fpacker, QP_ARRAY_CLOSE) || qp_close(fpacker)); } @@ -888,6 +902,7 @@ static siridb_t * siridb__new(void) siridb->users = NULL; siridb->servers = NULL; siridb->pools = NULL; + siridb->dropped_fp = NULL; siridb->max_series_id = 0; siridb->received_points = 0; siridb->selected_points = 0; diff --git a/src/siri/db/insert.c b/src/siri/db/insert.c index e67b7913..a581a557 100644 --- a/src/siri/db/insert.c +++ b/src/siri/db/insert.c @@ -345,7 +345,7 @@ int insert_init_backend_local( siridb_tasks_inc(siridb->tasks); siridb->insert_tasks++; - if (siridb_tee_is_connected(siridb->tee)) + if (siridb_tee_is_configured(siridb->tee)) { siridb_tee_write(siridb->tee, promise); } @@ -1075,7 +1075,7 @@ static int INSERT_init_local( siridb_tasks_inc(siridb->tasks); siridb->insert_tasks++; - if (siridb_tee_is_connected(siridb->tee)) + if (siridb_tee_is_configured(siridb->tee)) { siridb_tee_write(siridb->tee, promise); } diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index ec42c8c9..d2919a07 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -175,12 +176,12 @@ if (IS_MASTER && siridb_is_reindexing(siridb)) \ "Successfully dropped server '%s'." #define MSG_SUCCES_SET_LOG_LEVEL_MULTI \ "Successfully set log level to '%s' on %lu servers." -#define MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI \ - "Successfully set tee_pipe name on %lu servers." +#define MSG_SUCCES_SET_TEE_DISABLED \ + "Successfully disabled tee." +#define MSG_SUCCES_SET_TEE_ENABLED \ + "Successfully configures tee to %s." #define MSG_SUCCES_SET_LOG_LEVEL \ "Successfully set log level to '%s' on '%s'." -#define MSG_SUCCES_SET_TEE_PIPE_NAME \ - "Successfully set tee pipe name to '%s' on '%s'." #define MSG_SUCCESS_SET_SELECT_POINTS_LIMIT \ "Successfully changed select points limit from %" PRIu32 " to %" PRIu32 "." #define MSG_SUCCES_DROP_SERIES \ @@ -287,7 +288,7 @@ static void exit_set_list_limit(uv_async_t * handle); static void exit_set_log_level(uv_async_t * handle); static void exit_set_port(uv_async_t * handle); static void exit_set_select_points_limit(uv_async_t * handle); -static void exit_set_tee_pipe_name(uv_async_t * handle); +static void exit_set_tee(uv_async_t * handle); static void exit_set_timezone(uv_async_t * handle); static void exit_show_stmt(uv_async_t * handle); static void exit_tag_series(uv_async_t * handle); @@ -543,7 +544,7 @@ void siridb_init_listener(void) SIRIDB_NODE_EXIT[CLERI_GID_SET_LOG_LEVEL] = exit_set_log_level; SIRIDB_NODE_EXIT[CLERI_GID_SET_PORT] = exit_set_port; SIRIDB_NODE_EXIT[CLERI_GID_SET_SELECT_POINTS_LIMIT] = exit_set_select_points_limit; - SIRIDB_NODE_EXIT[CLERI_GID_SET_TEE_PIPE_NAME] = exit_set_tee_pipe_name; + SIRIDB_NODE_EXIT[CLERI_GID_SET_TEE] = exit_set_tee; SIRIDB_NODE_EXIT[CLERI_GID_SET_TIMEZONE] = exit_set_timezone; SIRIDB_NODE_EXIT[CLERI_GID_SHOW_STMT] = exit_show_stmt; SIRIDB_NODE_EXIT[CLERI_GID_TAG_SERIES] = exit_tag_series; @@ -4737,137 +4738,93 @@ static void exit_set_select_points_limit(uv_async_t * handle) } } -static void exit_set_tee_pipe_name(uv_async_t * handle) +static void exit_set_tee(uv_async_t * handle) { siridb_query_t * query = handle->data; - query_alter_t * q_alter = (query_alter_t *) query->data; siridb_t * siridb = query->client->siridb; - assert (query->data != NULL); + MASTER_CHECK_ACCESSIBLE(siridb) cleri_node_t * node = cleri_gn(cleri_gn( query->nodes->node->children->next->next)->children); - char pipe_name[node->len - 1]; - char * p_pipe_name = NULL; + char tee_addr_port[node->len - 1]; + char tee_address[SIRI_CFG_MAX_LEN_ADDRESS]; + uint16_t tee_port = SIRIDB_TEE_DEFAULT_TCP_PORT; - if (node->cl_obj->gid == CLERI_GID_STRING) - { - xstr_extract_string(pipe_name, node->str, node->len); - p_pipe_name = pipe_name; - } - if (q_alter->alter_tp == QUERY_ALTER_SERVERS) + if (node->cl_obj->gid == CLERI_GID_STRING) { - /* - * alter_servers - */ - cexpr_t * where_expr = ((query_list_t *) query->data)->where_expr; - siridb_server_walker_t wserver = { - .server=siridb->server, - .siridb=siridb - }; + xstr_extract_string(tee_addr_port, node->str, node->len); - if (where_expr == NULL || cexpr_run( - where_expr, - (cexpr_cb_t) siridb_server_cexpr_cb, - &wserver)) + if (tee_addr_port[0] == 0) { - (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name); - if (siridb_save(siridb)) - { - log_critical("Could not save database changes (database: '%s')", - siridb->dbname); - } - q_alter->n++; + snprintf(query->err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "Tee address must not be empty"); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; } - if (IS_MASTER) + if (sirinet_extract_addr_port(tee_addr_port, tee_address, &tee_port)) { - /* - * This is a trick because we share with setting log level on - * multiple servers at once. - */ - q_alter->n += LOGGER_NUM_LEVELS << 16; - siridb_query_forward( - handle, - SIRIDB_QUERY_FWD_SERVERS, - (sirinet_promises_cb) on_alter_xxx_response, - 0); + snprintf(query->err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "Invalid tee address; expecting ADDRESS[:PORT]"); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; } - else + + if (siridb_tee_set_address_port(siridb->tee, tee_address, tee_port)) { - qp_add_raw(query->packer, (const unsigned char *) "servers", 7); - qp_add_int64(query->packer, q_alter->n); - SIRIPARSER_ASYNC_NEXT_NODE + log_error("Failed to set tee address"); /* continue on error..*/ } } else { - /* - * alter_server - * - * we can set the success message, we just ignore the message in case - * an error occurs. - */ - siridb_server_t * server = q_alter->via.server; + /* disable the tee */ + (void) siridb_tee_set_address_port(siridb->tee, NULL, 0); + } + if (siridb_save(siridb)) + { + snprintf(query->err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "Error while saving database changes!"); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + } + else + { QP_ADD_SUCCESS - qp_add_fmt_safe(query->packer, - MSG_SUCCES_SET_TEE_PIPE_NAME, - p_pipe_name ? p_pipe_name : "disabled", - server->name); - if (server == siridb->server) + if (siridb->tee->address) { - (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name); - if (siridb_save(siridb)) - { - log_critical("Could not save database changes (database: '%s')", - siridb->dbname); - } - - SIRIPARSER_ASYNC_NEXT_NODE + log_info( + MSG_SUCCES_SET_TEE_ENABLED, + siridb->tee->address, + siridb->tee->port); + qp_add_fmt_safe(query->packer, + MSG_SUCCES_SET_TEE_ENABLED, + siridb->tee->address, + siridb->tee->port); } else { + log_info(MSG_SUCCES_SET_TEE_DISABLED); + qp_add_string(query->packer, MSG_SUCCES_SET_TEE_DISABLED); + } - if (siridb_server_is_online(server)) - { - sirinet_pkg_t * pkg = sirinet_pkg_new( - 0, - p_pipe_name ? strlen(p_pipe_name) : 0, - BPROTO_TEE_PIPE_NAME_UPDATE, - (unsigned char *) p_pipe_name); - if (pkg != NULL) - { - /* handle will be bound to a timer so we should increment */ - siri_async_incref(handle); - if (siridb_server_send_pkg( - server, - pkg, - 0, - (sirinet_promise_cb) on_ack_response, - handle, - 0)) - { - /* - * signal is raised and 'on_ack_response' will not be - * called - */ - free(pkg); - siri_async_decref(&handle); - } - } - } - else - { - snprintf(query->err_msg, - SIRIDB_MAX_SIZE_ERR_MSG, - "Cannot set pipe name, '%s' is currently unavailable", - server->name); - siridb_query_send_error(handle, CPROTO_ERR_QUERY); - } + if (IS_MASTER) + { + siridb_query_forward( + handle, + SIRIDB_QUERY_FWD_UPDATE, + (sirinet_promises_cb) on_update_xxx_response, + 0); + } + else + { + SIRIPARSER_ASYNC_NEXT_NODE } } } @@ -5928,27 +5885,15 @@ static void on_alter_xxx_response(vec_t * promises, uv_async_t * handle) */ QP_ADD_SUCCESS - if ((q_alter->n >> 16) >= LOGGER_NUM_LEVELS) - { - log_info(MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI, q_alter->n & 0xffff); - qp_add_fmt_safe( - query->packer, - MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI, - q_alter->n & 0xffff); - } - else - { - log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI, - logger_level_name(q_alter->n >> 16), - q_alter->n & 0xffff); - - qp_add_fmt_safe( - query->packer, - MSG_SUCCES_SET_LOG_LEVEL_MULTI, - logger_level_name(q_alter->n >> 16), - q_alter->n & 0xffff); - } + log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI, + logger_level_name(q_alter->n >> 16), + q_alter->n & 0xffff); + qp_add_fmt_safe( + query->packer, + MSG_SUCCES_SET_LOG_LEVEL_MULTI, + logger_level_name(q_alter->n >> 16), + q_alter->n & 0xffff); SIRIPARSER_ASYNC_NEXT_NODE } diff --git a/src/siri/db/props.c b/src/siri/db/props.c index a555bf25..a14e5a68 100644 --- a/src/siri/db/props.c +++ b/src/siri/db/props.c @@ -172,7 +172,7 @@ static void prop_sync_progress( siridb_t * siridb, qp_packer_t * packer, int map); -static void prop_tee_pipe_name( +static void prop_tee( siridb_t * siridb, qp_packer_t * packer, int map); @@ -270,8 +270,8 @@ void siridb_init_props(void) prop_status); props_set_cb(CLERI_GID_K_SYNC_PROGRESS - KW_OFFSET, prop_sync_progress); - props_set_cb(CLERI_GID_K_TEE_PIPE_NAME - KW_OFFSET, - prop_tee_pipe_name); + props_set_cb(CLERI_GID_K_TEE - KW_OFFSET, + prop_tee); props_set_cb(CLERI_GID_K_TIMEZONE - KW_OFFSET, prop_timezone); props_set_cb(CLERI_GID_K_TIME_PRECISION - KW_OFFSET, @@ -572,13 +572,13 @@ static void prop_sync_progress( qp_add_string(packer, siridb_initsync_sync_progress(siridb)); } -static void prop_tee_pipe_name( +static void prop_tee( siridb_t * siridb, qp_packer_t * packer, int map) { - SIRIDB_PROP_MAP("tee_pipe_name", 13) - qp_add_string(packer, tee_str(siridb->tee)); + SIRIDB_PROP_MAP("tee", 3) + qp_add_string(packer, siridb_tee_str(siridb->tee)); } static void prop_timezone( diff --git a/src/siri/db/server.c b/src/siri/db/server.c index 7b429922..ccd2f645 100644 --- a/src/siri/db/server.c +++ b/src/siri/db/server.c @@ -1175,12 +1175,6 @@ int siridb_server_cexpr_cb( cond->operator, siridb_initsync_sync_progress(wserver->siridb), cond->str); - - case CLERI_GID_K_TEE_PIPE_NAME: - return cexpr_str_cmp( - cond->operator, - tee_str(wserver->siridb->tee), - cond->str); } log_critical("Unexpected server property received: %d", cond->prop); diff --git a/src/siri/db/servers.c b/src/siri/db/servers.c index f4bebd05..690f3b26 100644 --- a/src/siri/db/servers.c +++ b/src/siri/db/servers.c @@ -701,9 +701,6 @@ int siridb_servers_list(siridb_server_t * server, uv_async_t * handle) case CLERI_GID_K_SYNC_PROGRESS: qp_add_string(query->packer, siridb_initsync_sync_progress(siridb)); break; - case CLERI_GID_K_TEE_PIPE_NAME: - qp_add_string(query->packer, tee_str(siridb->tee)); - break; case CLERI_GID_K_UPTIME: qp_add_int64( query->packer, diff --git a/src/siri/db/tee.c b/src/siri/db/tee.c index 449dbab7..568f2ebc 100644 --- a/src/siri/db/tee.c +++ b/src/siri/db/tee.c @@ -7,248 +7,359 @@ #include #include #include -#include +#include #include #define TEE__BUF_SZ 512 static char tee__buf[TEE__BUF_SZ]; -static void tee__runtime_init(uv_pipe_t * pipe); -static void tee__write_cb(uv_write_t * req, int status); -static void tee__on_connect(uv_connect_t * req, int status); -static void tee__close_cb(uv_pipe_t * pipe); static void tee__alloc_buffer( - uv_handle_t * handle, - size_t suggsz, - uv_buf_t * buf); -static void tee__on_data( - uv_stream_t * client, - ssize_t nread, - const uv_buf_t * buf); - -siridb_tee_t * siridb_tee_new(void) + uv_handle_t * handle __attribute__((unused)), + size_t suggsz __attribute__((unused)), + uv_buf_t * buf) { - siridb_tee_t * tee = malloc(sizeof(siridb_tee_t)); - if (tee == NULL) - { - return NULL; - } - tee->pipe_name_ = NULL; - tee->err_msg_ = NULL; - tee->pipe.data = tee; - tee->flags = SIRIDB_TEE_FLAG; - return tee; + buf->base = tee__buf; + buf->len = TEE__BUF_SZ; } -void siridb_tee_free(siridb_tee_t * tee) +static inline void tee__release(siridb_tee_t * tee) { - free(tee->err_msg_); - free(tee->pipe_name_); - free(tee); + sirinet_promise_decref(tee->promise_); + tee->promise_ = NULL; + uv_mutex_unlock(&tee->lock_); } -int siridb_tee_connect(siridb_tee_t * tee) +static void tee__write_cb(uv_write_t * req, int status) { - if (tee->flags & SIRIDB_TEE_FLAG_CONNECTING) + sirinet_promise_t * promise = req->data; + if (status) { - return 0; + log_error("Socket (tee) write error: %s", uv_strerror(status)); } - tee->flags |= SIRIDB_TEE_FLAG_CONNECTING; - uv_connect_t * req = malloc(sizeof(uv_connect_t)); - if (req == NULL) + sirinet_promise_decref(promise); + free(req); +} + +static void tee__on_data( + uv_tcp_t * tcp, + ssize_t nread, + const uv_buf_t * buf __attribute__((unused))) +{ + siridb_tee_t * tee = tcp->data; + if (!tee) { - return -1; + return; } - req->data = tee; - - if (uv_pipe_init(siri.loop, &tee->pipe, 0)) + if (nread < 0) { - tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTING; - free(req); - return -1; + if (nread != UV_EOF) + { + log_error("Read error on tee '%s': '%s'", + tee->address, + uv_err_name(nread)); + } + + log_info("Disconnected from tee `%s`", tee->address); + + uv_close((uv_handle_t *) tcp, (uv_close_cb) free); + tee->tcp = NULL; + return; } - tee->flags |= SIRIDB_TEE_FLAG_INIT; - tee->pipe.data = tee; - uv_pipe_connect(req, &tee->pipe, tee->pipe_name_, tee__on_connect); - return 0; + log_debug("Got %zd bytes on tee `%s` which will be ignored", + nread, + tee->address); } -int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name) +static void tee__do_write(siridb_tee_t * tee, sirinet_promise_t * promise) { - free(tee->pipe_name_); - free(tee->err_msg_); - tee->err_msg_ = NULL; + uv_write_t * req = malloc(sizeof(uv_write_t)); + uv_buf_t wrbuf = uv_buf_init( + (char *) promise->pkg, + sizeof(sirinet_pkg_t) + promise->pkg->len); - if (pipe_name == NULL) + if (req) { - tee->pipe_name_ = NULL; + int rc; + req->data = promise; + sirinet_promise_incref(promise); + rc = uv_write(req, (uv_stream_t *) tee->tcp, &wrbuf, 1, tee__write_cb); + if (rc == 0) + { + return; /* success */ + } + sirinet_promise_decref(promise); + } + log_error("Cannot write to tee"); + return; +} - if (tee->flags & SIRIDB_TEE_FLAG_CONNECTED) +static void tee__on_connect(uv_connect_t * req, int status) +{ + siridb_tee_t * tee = req->data; + + if (status == 0) + { + if (uv_read_start( + req->handle, + tee__alloc_buffer, + (uv_read_cb) tee__on_data)) { - uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__close_cb); + /* Failed to start reading the tee connection */ + tee->err_code = SIRIDB_TEE_E_READ; + log_error("Failed to open tee `%s` for reading", tee->address); + goto fail; } - return 0; + + /* success */ + log_info("Connection created to tee: '%s'", tee->address); + tee__do_write(tee, tee->promise_); + goto done; } - tee->pipe_name_ = strdup(pipe_name); - if (!tee->pipe_name_) + /* failed */ + tee->err_code = SIRIDB_TEE_E_CONNECT; + log_warning( + "Cannot connect to tee '%s' (%s)", + tee->address, + uv_strerror(status)); + +fail: + uv_close((uv_handle_t *) req->handle, (uv_close_cb) free); + tee->tcp = NULL; +done: + tee__release(tee); + free(req); +} + +void tee__make_connection(siridb_tee_t * tee, const struct sockaddr * dest) +{ + uv_connect_t * req = malloc(sizeof(uv_connect_t)); + tee->tcp = malloc(sizeof(uv_tcp_t)); + if (tee->tcp == NULL || req == NULL) { - return -1; + goto fail0; } - if (tee->flags & SIRIDB_TEE_FLAG_INIT) + + req->data = tee; + + log_debug("Trying to connect to tee '%s'...", tee->address); + (void) uv_tcp_init(siri.loop, tee->tcp); + (void) uv_tcp_connect(req, tee->tcp, dest, tee__on_connect); + return; + +fail0: + free(req); + free(tee->tcp); + tee->tcp = NULL; + tee__release(tee); +} + +static void tee__on_resolved( + uv_getaddrinfo_t * resolver, + int status, + struct addrinfo * res) +{ + siridb_tee_t * tee = resolver->data; + free(resolver); + + if (status < 0) { - uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__runtime_init); + log_error("Cannot resolve ip address for tee '%s' (error: %s)", + tee->address, + uv_err_name(status)); + tee__release(tee); + return; } - else + + if (Logger.level == LOGGER_DEBUG) { - tee__runtime_init(&tee->pipe); + char addr[47] = {'\0'}; /* enough for both ipv4 and ipv6 */ + + switch (res->ai_family) + { + case AF_INET: + uv_ip4_name((struct sockaddr_in *) res->ai_addr, addr, 16); + break; + + case AF_INET6: + uv_ip6_name((struct sockaddr_in6 *) res->ai_addr, addr, 46); + break; + + default: + sprintf(addr, "unsupported family"); + } + + log_debug( + "Resolved ip address '%s' for tee '%s', " + "trying to connect...", + addr, tee->address); + } - return 0; + + tee__make_connection(tee, (const struct sockaddr *) res->ai_addr); } -void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise) +static void tee__resolve_dns(siridb_tee_t * tee, int ai_family) { - uv_write_t * req = malloc(sizeof(uv_write_t)); - if (!req) + int result; + struct addrinfo hints; + char port[6]= {0}; + uv_getaddrinfo_t * resolver = malloc(sizeof(uv_getaddrinfo_t)); + + hints.ai_family = ai_family; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_NUMERICSERV; + + if (resolver == NULL) { - log_error("Cannot allocate memory for tee request"); return; } - req->data = promise; - sirinet_promise_incref(promise); + resolver->data = tee; - uv_buf_t wrbuf = uv_buf_init( - (char *) promise->pkg, - sizeof(sirinet_pkg_t) + promise->pkg->len); + sprintf(port, "%u", tee->port); + + result = uv_getaddrinfo( + siri.loop, + resolver, + tee__on_resolved, + tee->address, + port, + &hints); - if (uv_write(req, (uv_stream_t *) &tee->pipe, &wrbuf, 1, tee__write_cb)) + if (result) { - log_error("Cannot write to tee"); - sirinet_promise_decref(promise); + log_error("getaddrinfo call error %s", uv_err_name(result)); + free(resolver); } } -const char * tee_str(siridb_tee_t * tee) +void tee__connect(siridb_tee_t * tee) { - if (tee->err_msg_) + struct in_addr sa; + struct in6_addr sa6; + + if (inet_pton(AF_INET, tee->address, &sa)) { - return tee->err_msg_; + /* IPv4 */ + struct sockaddr_in dest; + (void) uv_ip4_addr(tee->address, tee->port, &dest); + tee__make_connection(tee, (const struct sockaddr *) &dest); + return; } - if (tee->pipe_name_) + + if (inet_pton(AF_INET6, tee->address, &sa6)) { - return tee->pipe_name_; + /* IPv6 */ + struct sockaddr_in6 dest6; + (void) uv_ip6_addr(tee->address, tee->port, &dest6); + tee__make_connection(tee, (const struct sockaddr *) &dest6); + return; } - return "disabled"; -} + /* Try DNS */ + tee__resolve_dns(tee, dns_req_family_map(siri.cfg->ip_support)); +} -static void tee__runtime_init(uv_pipe_t * pipe) +siridb_tee_t * siridb_tee_new(void) { - siridb_tee_t * tee = pipe->data; - - tee->flags &= ~SIRIDB_TEE_FLAG_INIT; - tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTING; - tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED; - - if (siridb_tee_connect(tee)) - { - log_error("Could not connect to tee at runtime"); - } + siridb_tee_t * tee = malloc(sizeof(siridb_tee_t)); + if (tee == NULL) + { + return NULL; + } + tee->address = NULL; + tee->tcp = NULL; + tee->promise_ = NULL; + tee->flags = SIRIDB_TEE_FLAG; + tee->err_code = 0; + uv_mutex_init(&tee->lock_); + return tee; } -static void tee__close_cb(uv_pipe_t * pipe) +void siridb_tee_free(siridb_tee_t * tee) { - siridb_tee_t * tee = pipe->data; + /* must be closed before free can be used */ + assert (tee->tcp == NULL); + assert (tee->promise_ == NULL); - tee->flags &= ~SIRIDB_TEE_FLAG_INIT; - tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTING; - tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED; + uv_mutex_destroy(&tee->lock_); + free(tee->address); + free(tee); } -static void tee__write_cb(uv_write_t * req, int status) +const char * siridb_tee_str(siridb_tee_t * tee) { - sirinet_promise_t * promise = req->data; - sirinet_promise_decref(promise); - if (status) + if (tee->err_code) { - log_error("Socket (tee) write error: %s", uv_strerror(status)); + switch((enum siridb_tee_e_t) tee->err_code) + { + case SIRIDB_TEE_E_OK: return "OK"; + case SIRIDB_TEE_E_ALLOC: return "memory allocation error"; + case SIRIDB_TEE_E_READ: return "failed to open socket for reading"; + case SIRIDB_TEE_E_CONNECT: return "failed to make connection"; + } } - free(req); + if (tee->address) + { + return tee->address; + } + return "disabled"; } -static void tee__on_connect(uv_connect_t * req, int status) +void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise) { - siridb_tee_t * tee = req->data; - - if (status == 0) + if (!tee->address) { - log_info("Connection created to pipe: '%s'", tee->pipe_name_); - if (uv_read_start(req->handle, tee__alloc_buffer, tee__on_data)) - { - free(tee->err_msg_); - if (asprintf(&tee->err_msg_, - "Cannot open pipe '%s' for reading", - tee->pipe_name_) >= 0) - { - log_warning(tee->err_msg_); - } - goto fail; - } - tee->flags |= SIRIDB_TEE_FLAG_CONNECTED; - goto done; + /* Tee is not configured */ + return; } - free(tee->err_msg_); - tee->err_msg_ = NULL; - - if (asprintf( - &tee->err_msg_, - "Cannot connect to pipe '%s' (%s)", - tee->pipe_name_, - uv_strerror(status)) >= 0) + if (tee->tcp) { - log_warning(tee->err_msg_); + tee__do_write(tee, promise); } + else + { + uv_mutex_lock(&tee->lock_); -fail: - uv_close((uv_handle_t *) req->handle, (uv_close_cb) tee__close_cb); -done: - free(req); + if (!tee->address) + { + /* Tee is not configured */ + uv_mutex_unlock(&tee->lock_); + return; + } + + assert (tee->promise_ == NULL); + tee->promise_ = promise; + sirinet_promise_incref(promise); + tee__connect(tee); + } } -static void tee__alloc_buffer( - uv_handle_t * handle __attribute__((unused)), - size_t suggsz __attribute__((unused)), - uv_buf_t * buf) +int siridb_tee_set_address_port( + siridb_tee_t * tee, + const char * address, + uint16_t port) { - buf->base = tee__buf; - buf->len = TEE__BUF_SZ; -} + uv_mutex_lock(&tee->lock_); + free(tee->address); + tee->err_code = SIRIDB_TEE_E_OK; + tee->address = address ? strdup(address) : NULL; + tee->port = port; -static void tee__on_data( - uv_stream_t * client, - ssize_t nread, - const uv_buf_t * buf __attribute__((unused))) -{ - if (nread < 0) + if (tee->tcp && !uv_is_closing((uv_handle_t *) tee->tcp)) { - if (nread != UV_EOF) - { - log_error("Read error on pipe '%s' : '%s'", - sirinet_pipe_name((uv_pipe_t * ) client), - uv_err_name(nread)); - } - log_info("Disconnected from tee"); - uv_close((uv_handle_t *) client, (uv_close_cb) tee__close_cb); + uv_close((uv_handle_t *) tee->tcp, (uv_close_cb) free); + tee->tcp = NULL; } - if (nread > 0) - { - log_debug("Got %zd bytes on tee which will be ignored", nread); - } + uv_mutex_unlock(&tee->lock_); + + return address && !tee->address ? -1 : 0; } diff --git a/src/siri/grammar/grammar.c b/src/siri/grammar/grammar.c index c533570e..403c6b04 100644 --- a/src/siri/grammar/grammar.c +++ b/src/siri/grammar/grammar.c @@ -5,7 +5,7 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2020-09-25 10:57:26 + * Created at: 2022-04-15 12:10:03 */ #include "siri/grammar/grammar.h" @@ -166,7 +166,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_t * k_sync_progress = cleri_keyword(CLERI_GID_K_SYNC_PROGRESS, "sync_progress", CLERI_CASE_SENSITIVE); cleri_t * k_tag = cleri_keyword(CLERI_GID_K_TAG, "tag", CLERI_CASE_SENSITIVE); cleri_t * k_tags = cleri_keyword(CLERI_GID_K_TAGS, "tags", CLERI_CASE_SENSITIVE); - cleri_t * k_tee_pipe_name = cleri_keyword(CLERI_GID_K_TEE_PIPE_NAME, "tee_pipe_name", CLERI_CASE_SENSITIVE); + cleri_t * k_tee = cleri_keyword(CLERI_GID_K_TEE, "tee", CLERI_CASE_SENSITIVE); cleri_t * k_time_precision = cleri_keyword(CLERI_GID_K_TIME_PRECISION, "time_precision", CLERI_CASE_SENSITIVE); cleri_t * k_timeit = cleri_keyword(CLERI_GID_K_TIMEIT, "timeit", CLERI_CASE_SENSITIVE); cleri_t * k_timeval = cleri_keyword(CLERI_GID_K_TIMEVAL, "timeval", CLERI_CASE_SENSITIVE); @@ -312,7 +312,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_t * server_columns = cleri_list(CLERI_GID_SERVER_COLUMNS, cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 29, + 28, k_address, k_buffer_path, k_buffer_size, @@ -340,7 +340,6 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) k_reindex_progress, k_selected_points, k_sync_progress, - k_tee_pipe_name, k_uptime ), cleri_token(CLERI_NONE, ","), 1, 0, 0); cleri_t * group_columns = cleri_list(CLERI_GID_GROUP_COLUMNS, cleri_choice( @@ -625,7 +624,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 12, + 11, k_address, k_buffer_path, k_dbpath, @@ -636,8 +635,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) k_version, k_status, k_reindex_progress, - k_sync_progress, - k_tee_pipe_name + k_sync_progress ), str_operator, string @@ -1153,11 +1151,11 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) k_address, string ); - cleri_t * set_tee_pipe_name = cleri_sequence( - CLERI_GID_SET_TEE_PIPE_NAME, + cleri_t * set_tee = cleri_sequence( + CLERI_GID_SET_TEE, 3, k_set, - k_tee_pipe_name, + k_tee, cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, @@ -1278,13 +1276,14 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 6, + 7, set_drop_threshold, set_list_limit, set_select_points_limit, set_timezone, set_expiration_num, - set_expiration_log + set_expiration_log, + set_tee ) ); cleri_t * alter_group = cleri_sequence( @@ -1320,10 +1319,9 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 5, + 4, set_log_level, set_backup_mode, - set_tee_pipe_name, set_address, set_port ) @@ -1336,9 +1334,8 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 2, - set_log_level, - set_tee_pipe_name + 1, + set_log_level ) ); cleri_t * alter_user = cleri_sequence( @@ -1724,7 +1721,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) k_startup_time, k_status, k_sync_progress, - k_tee_pipe_name, + k_tee, k_time_precision, k_timezone, k_uptime, diff --git a/src/siri/heartbeat.c b/src/siri/heartbeat.c index 200740ce..5c292869 100644 --- a/src/siri/heartbeat.c +++ b/src/siri/heartbeat.c @@ -61,12 +61,6 @@ static void HEARTBEAT_cb(uv_timer_t * handle __attribute__((unused))) siridb_update_shard_expiration(siridb); - if ( siridb_tee_is_configured(siridb->tee) && - !siridb_tee_is_connected(siridb->tee)) - { - siridb_tee_connect(siridb->tee); - } - server_node = siridb->servers->first; while (server_node != NULL) { diff --git a/src/siri/net/bserver.c b/src/siri/net/bserver.c index 2d59bab3..2c2d3402 100644 --- a/src/siri/net/bserver.c +++ b/src/siri/net/bserver.c @@ -43,9 +43,6 @@ static void on_flags_update(sirinet_stream_t * client, sirinet_pkg_t * pkg); static void on_log_level_update( sirinet_stream_t * client, sirinet_pkg_t * pkg); -static void on_tee_pipe_name_update( - sirinet_stream_t * client, - sirinet_pkg_t * pkg); static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg); static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg); static void on_query( @@ -280,8 +277,8 @@ static void on_data(sirinet_stream_t * client, sirinet_pkg_t * pkg) case BPROTO_DISABLE_BACKUP_MODE: on_disable_backup_mode(client, pkg); break; - case BPROTO_TEE_PIPE_NAME_UPDATE: - on_tee_pipe_name_update(client, pkg); + case _BPROTO_TEE_PIPE_NAME_UPDATE: + log_warning("BPROTO_TEE_PIPE_NAME_UPDATE is deprecated"); break; case BPROTO_DROP_DATABASE: on_drop_database(client, pkg); @@ -450,30 +447,6 @@ static void on_log_level_update(sirinet_stream_t * client, sirinet_pkg_t * pkg) } } -static void on_tee_pipe_name_update( - sirinet_stream_t * client, - sirinet_pkg_t * pkg) -{ - SERVER_CHECK_AUTHENTICATED(client, server); - siridb_t * siridb = client->siridb; - sirinet_pkg_t * package; - - char * pipe_name = pkg->len - ? strndup((const char *) pkg->data, pkg->len) - : NULL; - - (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name); - - free(pipe_name); - - package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_TEE_PIPE_NAME, NULL); - if (package != NULL) - { - /* ignore result code, signal can be raised */ - sirinet_pkg_send(client, package); - } -} - static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg) { SERVER_CHECK_AUTHENTICATED(client, server) diff --git a/src/siri/net/protocol.c b/src/siri/net/protocol.c index fc90cd98..a3b8682b 100644 --- a/src/siri/net/protocol.c +++ b/src/siri/net/protocol.c @@ -84,7 +84,7 @@ const char * sirinet_bproto_client_str(bproto_client_t n) case BPROTO_REQ_GROUPS: return "BPROTO_REQ_GROUPS"; case BPROTO_ENABLE_BACKUP_MODE: return "BPROTO_ENABLE_BACKUP_MODE"; case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE"; - case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE"; + case _BPROTO_TEE_PIPE_NAME_UPDATE: return "_BPROTO_TEE_PIPE_NAME_UPDATE"; case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE"; case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS"; case BPROTO_SERIES_TAGS: return "BPROTO_SERIES_TAGS"; diff --git a/src/siri/service/client.c b/src/siri/service/client.c index 1eb4fb20..6e433f7e 100644 --- a/src/siri/service/client.c +++ b/src/siri/service/client.c @@ -620,6 +620,7 @@ static void CLIENT_on_file_database( qp_fpacker_t * fpacker; qp_unpacker_t unpacker; qp_obj_t + qp_obj, qp_uuid, qp_schema, qp_dbname, @@ -632,7 +633,9 @@ static void CLIENT_on_file_database( qp_points_limit, qp_list_limit, qp_exp_log, - qp_exp_num; + qp_exp_num, + qp_tee_address, + qp_tee_port; siridb_t * siridb; int rc; /* 13 = strlen("database.dat")+1 */ @@ -656,16 +659,57 @@ static void CLIENT_on_file_database( return; } - /* list and points limit require at least schema 1 */ - (void) qp_next(&unpacker, &qp_points_limit); - (void) qp_next(&unpacker, &qp_list_limit); + if (qp_schema.via.int64 >= 2) + { + if (qp_next(&unpacker, &qp_points_limit) != QP_INT64|| + qp_next(&unpacker, &qp_list_limit) != QP_INT64) + { + CLIENT_err(adm_client, "invalid database file received"); + return; + } + } + else + { + qp_points_limit.via.int64 = DEF_SELECT_POINTS_LIMIT; + qp_list_limit.via.int64 = DEF_LIST_LIMIT; + } + + if (qp_schema.via.int64 >= 5 && qp_schema.via.int64 <=6) + { + qp_next(&unpacker, &qp_obj); + /* Skip the tee pipe name */ + } - /* this is the tee pipe name when schema is >= 5 */ - (void) qp_next(&unpacker, &qp_exp_log); - (void) qp_next(&unpacker, &qp_exp_num); + if (qp_schema.via.int64 >= 6) + { + if (qp_next(&unpacker, &qp_exp_log) != QP_INT64|| + qp_next(&unpacker, &qp_exp_num) != QP_INT64) + { + CLIENT_err(adm_client, "invalid database file received"); + return; + } + } + else + { + qp_exp_log.via.int64 = 0; + qp_exp_num.via.int64 = 0; + } - /* these are the expiration times when schema is >= 6 */ + if (qp_schema.via.int64 >= 7) + { + if (qp_next(&unpacker, &qp_tee_address) == QP_ERR || + qp_next(&unpacker, &qp_tee_port) != QP_INT64) + { + CLIENT_err(adm_client, "invalid database file received"); + return; + } + } + else + { + qp_tee_address.tp = QP_NULL; + qp_tee_port.via.int64 = SIRIDB_TEE_DEFAULT_TCP_PORT; + } if ((fpacker = qp_open(fn, "w")) == NULL) { @@ -673,6 +717,8 @@ static void CLIENT_on_file_database( return; } + LOGC("HERE"); + rc = ( qp_fadd_type(fpacker, QP_ARRAY_OPEN) || qp_fadd_int64(fpacker, SIRIDB_SCHEMA) || qp_fadd_raw(fpacker, (const unsigned char *) adm_client->uuid, 16) || @@ -683,19 +729,17 @@ static void CLIENT_on_file_database( qp_fadd_int64(fpacker, qp_duration_log.via.int64) || qp_fadd_raw(fpacker, qp_timezone.via.raw, qp_timezone.len) || qp_fadd_double(fpacker, qp_drop_threshold.via.real) || - qp_fadd_int64(fpacker, qp_points_limit.tp == QP_INT64 - ? qp_points_limit.via.int64 - : DEF_SELECT_POINTS_LIMIT) || - qp_fadd_int64(fpacker, qp_list_limit.tp == QP_INT64 - ? qp_list_limit.via.int64 - : DEF_LIST_LIMIT) || - qp_fadd_type(fpacker, QP_NULL) || - qp_fadd_int64(fpacker, qp_exp_log.tp == QP_INT64 - ? qp_exp_log.via.int64 - : 0) || - qp_fadd_int64(fpacker, qp_exp_num.tp == QP_INT64 - ? qp_exp_num.via.int64 - : 0) || + qp_fadd_int64(fpacker, qp_points_limit.via.int64) || + qp_fadd_int64(fpacker, qp_list_limit.via.int64) || + qp_fadd_int64(fpacker, qp_exp_log.via.int64) || + qp_fadd_int64(fpacker, qp_exp_num.via.int64) || + (qp_tee_address.tp == QP_RAW + ? qp_fadd_raw( + fpacker, + qp_tee_address.via.raw, + qp_tee_address.len) + : qp_fadd_type(fpacker, QP_NULL)) || + qp_fadd_int64(fpacker, qp_tee_port.via.int64) || qp_fadd_type(fpacker, QP_ARRAY_CLOSE) || qp_close(fpacker)); diff --git a/src/siri/service/request.c b/src/siri/service/request.c index 12560ce1..bc0fc239 100644 --- a/src/siri/service/request.c +++ b/src/siri/service/request.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -720,9 +721,10 @@ static cproto_server_t SERVICE_on_new_database( qp_fadd_double(fp, DEF_DROP_THRESHOLD) || qp_fadd_int64(fp, DEF_SELECT_POINTS_LIMIT) || qp_fadd_int64(fp, DEF_LIST_LIMIT) || - qp_fadd_type(fp, QP_NULL) || qp_fadd_int64(fp, 0) || qp_fadd_int64(fp, 0) || + qp_fadd_type(fp, QP_NULL) || + qp_fadd_int64(fp, SIRIDB_TEE_DEFAULT_TCP_PORT) || qp_fadd_type(fp, QP_ARRAY_CLOSE)) { rc = -1; diff --git a/src/siri/siri.c b/src/siri/siri.c index fd250e34..ce112dd2 100644 --- a/src/siri/siri.c +++ b/src/siri/siri.c @@ -631,10 +631,15 @@ static void SIRI_walk_close_handlers( case UV_TCP: case UV_NAMED_PIPE: { - if (handle->data == NULL || siridb_tee_is_handle(handle)) + if (handle->data == NULL) { uv_close(handle, NULL); } + else if (siridb_tee_is_handle(handle)) + { + // TODO: close tee handle + assert (0); + } else if (siri_health_is_handle(handle)) { siri_health_close((siri_health_request_t *) handle->data);