From: Jeroen van der Heijden Date: Tue, 28 Jul 2020 14:35:17 +0000 (+0200) Subject: work on tag support X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~3^2~6^2~8 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=fb189fb7ebef17ca779cb8fd85aa79179d646a33;p=siridb-server.git work on tag support --- diff --git a/include/qpack/qpack.h b/include/qpack/qpack.h index 3c8e731f..f6659d76 100644 --- a/include/qpack/qpack.h +++ b/include/qpack/qpack.h @@ -175,6 +175,7 @@ static inline int qp_is_raw_term(qp_obj_t * qp_obj) int qp_add_raw(qp_packer_t * packer, const unsigned char * raw, size_t len); int qp_add_string(qp_packer_t * packer, const char * str); int qp_add_string_term(qp_packer_t * packer, const char * str); +int qp_add_string_term_n(qp_packer_t * packer, const char * str, size_t n); int qp_add_raw_term(qp_packer_t * packer, const unsigned char * raw, size_t len); int qp_add_double(qp_packer_t * packer, double real); diff --git a/include/siri/db/reindex.h b/include/siri/db/reindex.h index db80b426..63eda084 100644 --- a/include/siri/db/reindex.h +++ b/include/siri/db/reindex.h @@ -42,7 +42,8 @@ struct siridb_reindex_s int fd; long int size; uint32_t * next_series_id; - sirinet_pkg_t * pkg; + sirinet_pkg_t * pkg_points; + sirinet_pkg_t * pkg_tags; siridb_series_t * series; siridb_server_t * server; uv_timer_t * timer; diff --git a/include/siri/db/tags.h b/include/siri/db/tags.h index 5bf36ed0..95c5b879 100644 --- a/include/siri/db/tags.h +++ b/include/siri/db/tags.h @@ -39,10 +39,16 @@ int siridb_tags_drop_tag( siridb_tags_t * tags, const char * name, char * err_msg); +siridb_tag_t * siridb_tags_add_n( + siridb_tags_t * tags, + const char * name, + size_t name_len); siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name); void siridb_tags_dropped_series(siridb_tags_t * tags); void siridb_tags_save(siridb_tags_t * tags); void siridb_tags_init_nseries(siridb_tags_t * tags); +sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid); +sirinet_pkg_t * siridb_tags_series(siridb_series_t * series); #define siridb_tags_set_require_save(__tags, __tag) \ diff --git a/include/siri/net/protocol.h b/include/siri/net/protocol.h index 23c5f17d..7efb3374 100644 --- a/include/siri/net/protocol.h +++ b/include/siri/net/protocol.h @@ -79,6 +79,8 @@ typedef enum BPROTO_DISABLE_BACKUP_MODE, /* empty */ BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */ BPROTO_DROP_DATABASE, /* empty */ + BPROTO_REQ_TAGS, /* empty */ + BPROTO_SERIES_TAGS, /* [series name, tag name, ...] */ } bproto_client_t; /* @@ -128,8 +130,8 @@ typedef enum BPROTO_RES_GROUPS, /* [[name, series], ...] */ BPROTO_ACK_TEE_PIPE_NAME, /* empty */ BPROTO_ACK_DROP_DATABASE, /* empty */ - BPROTO_RES_TAGS, /* [[name, series], ...] */ - + BPROTO_RES_TAGS, /* [[name, series], ...] */ + BPROTO_ACK_SERIES_TAGS /* empty */ } bproto_server_t; #define sirinet_protocol_is_error(tp) (tp >= 64 && tp < 192) diff --git a/itest/test_cluster.py b/itest/test_cluster.py index a686d393..3a23e280 100644 --- a/itest/test_cluster.py +++ b/itest/test_cluster.py @@ -2,6 +2,7 @@ import asyncio import functools import random import time +import math from testing import Client from testing import default_test_setup from testing import gen_data @@ -20,25 +21,107 @@ from testing import UserAuthError from testing import parse_args +DATA = { + 'series-001 float': [ + [1471254705, 1.5], + [1471254707, -3.5], + [1471254710, -7.3]], + 'series-001 integer': [ + [1471254705, 5], + [1471254708, -3], + [1471254710, -7]], + 'series-002 float': [ + [1471254705, 3.5], + [1471254707, -2.5], + [1471254710, -8.3]], + 'series-002 integer': [ + [1471254705, 4], + [1471254708, -1], + [1471254710, -8]], + 'aggr': [ + [1447249033, 531], [1447249337, 534], + [1447249633, 535], [1447249937, 531], + [1447250249, 532], [1447250549, 537], + [1447250868, 530], [1447251168, 520], + [1447251449, 54], [1447251749, 54], + [1447252049, 513], [1447252349, 537], + [1447252649, 528], [1447252968, 531], + [1447253244, 533], [1447253549, 538], + [1447253849, 534], [1447254149, 532], + [1447254449, 533], [1447254748, 537]], + 'huge': [ + [1471254705, 9223372036854775807], + [1471254706, 9223372036854775806], + [1471254707, 9223372036854775805], + [1471254708, 9223372036854775804]], + 'equal ts': [ + [1471254705, 0], [1471254705, 1], [1471254705, 1], + [1471254707, 0], [1471254707, 1], [1471254708, 0], + ], + 'variance': [ + [1471254705, 2.75], [1471254706, 1.75], [1471254707, 1.25], + [1471254708, 0.25], [1471254709, 0.5], [1471254710, 1.25], + [1471254711, 3.5] + ], + 'pvariance': [ + [1471254705, 0.0], [1471254706, 0.25], [1471254707, 0.25], + [1471254708, 1.25], [1471254709, 1.5], [1471254710, 1.75], + [1471254711, 2.75], [1471254712, 3.25] + ], + 'filter': [ + [1471254705, 5], + [1471254710, -3], + [1471254715, -7], + [1471254720, 7] + ], + 'one': [ + [1471254710, 1] + ], + 'log': [ + [1471254710, 'log line one'], + [1471254712, 'log line two'], + [1471254714, 'another line (three)'], + [1471254716, 'and yet one more'], + ], + 'special': [ + [1471254705, 0.1], + [1471254706, math.nan], + [1471254707, math.inf], + [1471254708, -math.inf], + ] +} + + class TestCluster(TestBase): title = 'Test siridb-cluster' - @default_test_setup(4, time_precision='s') + @default_test_setup(2, time_precision='s') async def run(self): await self.client0.connect() + await self.client0.insert(DATA) + + await self.client0.query(''' + alter series /series.*/ tag `SERIES` + ''') + + await self.client0.query(''' + alter series /series.*/ tag `OTHER` + ''') + await self.db.add_pool(self.server1) await self.assertIsRunning(self.db, self.client0, timeout=12) - await asyncio.sleep(45) + # await asyncio.sleep(45) - await self.db.add_replica(self.server2, 0) - await self.assertIsRunning(self.db, self.client0, timeout=12) + # await self.db.add_replica(self.server2, 0) + # await self.assertIsRunning(self.db, self.client0, timeout=12) - await asyncio.sleep(45) - await self.db.add_replica(self.server3, 1) - await self.assertIsRunning(self.db, self.client0, timeout=12) + # await asyncio.sleep(45) + + # await self.db.add_replica(self.server3, 1) + # await self.assertIsRunning(self.db, self.client0, timeout=12) # await asyncio.sleep(35) diff --git a/src/qpack/qpack.c b/src/qpack/qpack.c index 07c6b5e5..adbc7d98 100644 --- a/src/qpack/qpack.c +++ b/src/qpack/qpack.c @@ -418,6 +418,10 @@ int qp_add_string_term(qp_packer_t * packer, const char * str) { return qp_add_raw(packer, (unsigned char *) str, strlen(str) + 1); } +int qp_add_string_term_n(qp_packer_t * packer, const char * str, size_t n) +{ + return qp_add_raw(packer, (unsigned char *) str, n + 1); +} /* * Adds a raw string to the packer and appends a terminator (0) so the written diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index 74f7bd4f..1019a0c2 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -2452,7 +2452,7 @@ static void exit_count_tags(uv_async_t * handle) } else { - sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_GROUPS, NULL); + sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_TAGS, NULL); if (pkg != NULL) { @@ -3550,7 +3550,7 @@ static void exit_list_tags(uv_async_t * handle) } else { - sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_GROUPS, NULL); + sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_TAGS, NULL); if (pkg != NULL) { @@ -6059,7 +6059,7 @@ static void on_tags_response(vec_t * promises, uv_async_t * handle) pkg = (sirinet_pkg_t *) promise->data; - if (pkg != NULL && pkg->tp == BPROTO_RES_GROUPS) + if (pkg != NULL && pkg->tp == BPROTO_RES_TAGS) { qp_unpacker_init(&unpacker, pkg->data, pkg->len); diff --git a/src/siri/db/reindex.c b/src/siri/db/reindex.c index d85e94c2..f98cea5c 100644 --- a/src/siri/db/reindex.c +++ b/src/siri/db/reindex.c @@ -55,6 +55,10 @@ static void REINDEX_on_insert_response( sirinet_promise_t * promise, sirinet_pkg_t * pkg, int status); +static void REINDEX_on_tag_response( + sirinet_promise_t * promise, + sirinet_pkg_t * pkg, + int status); static char reindex_progress[30]; @@ -76,7 +80,8 @@ siridb_reindex_t * siridb_reindex_open(siridb_t * siridb, int create_new) reindex->fn = NULL; reindex->fp = NULL; reindex->next_series_id = NULL; - reindex->pkg = NULL; + reindex->pkg_points = NULL; + reindex->pkg_tags = NULL; reindex->timer = NULL; reindex->server = NULL; if (REINDEX_fn(siridb, reindex) < 0) @@ -235,7 +240,8 @@ void siridb_reindex_free(siridb_reindex_t ** reindex) } free((*reindex)->fn); free((*reindex)->next_series_id); - free((*reindex)->pkg); + free((*reindex)->pkg_points); + free((*reindex)->pkg_tags); free(*reindex); *reindex = NULL; } @@ -320,7 +326,7 @@ void siridb_reindex_start(uv_timer_t * timer) static void REINDEX_send(uv_timer_t * timer) { siridb_t * siridb = (siridb_t *) timer->data; - assert (siridb->reindex->pkg != NULL); + assert (siridb->reindex->pkg_points != NULL); /* actually 'available' is sufficient since the destination server has * never status 're-indexing' unless one day we support down-scaling. */ @@ -328,11 +334,21 @@ static void REINDEX_send(uv_timer_t * timer) { siridb_server_send_pkg( siridb->reindex->server, - siridb->reindex->pkg, + siridb->reindex->pkg_points, REINDEX_TIMEOUT, (sirinet_promise_cb) REINDEX_on_insert_response, siridb, FLAG_KEEP_PKG); + if (siridb->reindex->pkg_tags) + { + siridb_server_send_pkg( + siridb->reindex->server, + siridb->reindex->pkg_tags, + REINDEX_TIMEOUT, + (sirinet_promise_cb) REINDEX_on_tag_response, + NULL, + FLAG_KEEP_PKG); + } } else { @@ -358,8 +374,10 @@ static void REINDEX_send(uv_timer_t * timer) static int REINDEX_next_series_id(siridb_reindex_t * reindex) { /* free re-index package */ - free(reindex->pkg); - reindex->pkg = NULL; + free(reindex->pkg_points); + free(reindex->pkg_tags); + reindex->pkg_points = NULL; + reindex->pkg_tags = NULL; int rc; reindex->size -= sizeof(uint32_t); @@ -436,7 +454,8 @@ static void REINDEX_work(uv_timer_t * timer) assert (SIRI_OPTIMZE_IS_PAUSED); assert (reindex != NULL); - assert (siridb->reindex->pkg == NULL); + assert (siridb->reindex->pkg_points == NULL); + assert (siridb->reindex->pkg_tags == NULL); reindex->series = imap_get(siridb->series_map, *reindex->next_series_id); @@ -487,10 +506,15 @@ static void REINDEX_work(uv_timer_t * timer) if (siridb_points_pack(points, packer) == 0) { - reindex->pkg = sirinet_packer2pkg( + reindex->pkg_points = sirinet_packer2pkg( packer, 0, BPROTO_INSERT_TESTED_SERVER); + + /* tag package may be NULL when no tag need to be + * synchronized */ + reindex->pkg_tags = siridb_tags_series(reindex->series); + uv_timer_start( reindex->timer, REINDEX_send, @@ -503,6 +527,8 @@ static void REINDEX_work(uv_timer_t * timer) } } siridb_points_free(points); + + } } } @@ -547,6 +573,7 @@ static void REINDEX_commit_series(siridb_t * siridb) siridb_series_flush_dropped(siridb); } } + /* * Call-back function: sirinet_promise_cb */ @@ -607,6 +634,22 @@ static void REINDEX_on_insert_response( sirinet_promise_decref(promise); } +/* + * Call-back function: sirinet_promise_cb + */ +static void REINDEX_on_tag_response( + sirinet_promise_t * promise, + sirinet_pkg_t * pkg, + int status) +{ + if (status) + { + log_error("Error while sending tags (%d)", status); + } + + sirinet_promise_decref(promise); +} + /* * Typedef: imap_cb * diff --git a/src/siri/db/tags.c b/src/siri/db/tags.c index 66b8ee71..91b1c6a9 100644 --- a/src/siri/db/tags.c +++ b/src/siri/db/tags.c @@ -111,6 +111,25 @@ int siridb_tags_drop_tag( return 0; } +siridb_tag_t * siridb_tags_add_n( + siridb_tags_t * tags, + const char * name, + size_t name_len) +{ + siridb_tag_t * tag = siridb_tag_new(tags, tags->next_id++); + + if (tag != NULL) + { + tag->name = strndup(name, name_len); + if (tag->name == NULL || ct_add(tags->tags, tag->name, tag)) + { + siridb_tag_decref(tag); + tag = NULL; + } + } + return tag; +} + siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name) { siridb_tag_t * tag = siridb_tag_new(tags, tags->next_id++); @@ -172,6 +191,92 @@ void siridb_tags_init_nseries(siridb_tags_t * tags) ct_values(tags->tags, (ct_val_cb) TAGS_nseries, NULL); } +/* + * Main thread. + */ +static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer) +{ + int rc = 0; + rc += qp_add_type(packer, QP_ARRAY2); + rc += qp_add_string_term(packer, tag->name); + rc += qp_add_int64(packer, tag->series->len); + return rc; +} + +/* + * Main thread. + * + * Returns NULL and raises a signal in case of an error. + */ +sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid) +{ + qp_packer_t * packer = sirinet_packer_new(8192); + int rc; + + if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN)) + { + return NULL; /* signal is raised */ + } + + rc = ct_values(tags->tags, (ct_val_cb) TAGS_pkg, packer); + + if (rc) + { + /* signal is raised when not 0 */ + qp_packer_free(packer); + return NULL; + } + + return sirinet_packer2pkg(packer, pid, BPROTO_RES_TAGS); +} + +typedef struct +{ + qp_packer_t * packer; + uint64_t id; +} TAGS_series_t; + +/* + * Main thread. + */ +static int TAGS_series_pkg(siridb_tag_t * tag, TAGS_series_t * w) +{ + return imap_get(tag->series, w->id) + ? qp_add_string(w->packer, tag->name) == 0 + : 0; +} + +/* + * Main thread. + * + * Returns NULL and raises a signal in case of an error. + */ +sirinet_pkg_t * siridb_tags_series(siridb_series_t * series) +{ + TAGS_series_t w = { + .packer = sirinet_packer_new(1024), + .id = series->id, + }; + + if (w.packer == NULL || + qp_add_type(w.packer, QP_ARRAY_OPEN) || + qp_add_string_term_n(w.packer, series->name, series->name_len)) + { + return NULL; + } + + if (ct_values( + series->siridb->tags->tags, + (ct_val_cb) TAGS_series_pkg, + &w) == 0) + { + free(w.packer); + return NULL; + } + + return sirinet_packer2pkg(w.packer, 0, BPROTO_SERIES_TAGS); +} + /* * Main thread. */ diff --git a/src/siri/net/bserver.c b/src/siri/net/bserver.c index b4792d44..1cbcc74c 100644 --- a/src/siri/net/bserver.c +++ b/src/siri/net/bserver.c @@ -63,6 +63,8 @@ static void on_enable_backup_mode( static void on_disable_backup_mode( sirinet_stream_t * client, sirinet_pkg_t * pkg); +static void on_req_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg); +static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg); static uv_loop_t * loop = NULL; static struct sockaddr_storage server_addr; @@ -283,6 +285,12 @@ static void on_data(sirinet_stream_t * client, sirinet_pkg_t * pkg) case BPROTO_DROP_DATABASE: on_drop_database(client, pkg); break; + case BPROTO_REQ_TAGS: + on_req_tags(client, pkg); + break; + case BPROTO_SERIES_TAGS: + on_series_tags(client, pkg); + break; } } @@ -819,3 +827,114 @@ static void on_disable_backup_mode(sirinet_stream_t * client, sirinet_pkg_t * pk sirinet_pkg_send(client, package); } } + +static void on_req_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg) +{ + SERVER_CHECK_AUTHENTICATED(client, server) + + siridb_t * siridb = client->siridb; + sirinet_pkg_t * package = siridb_tags_pkg(siridb->tags, pkg->pid); + + if (package != NULL) + { + sirinet_pkg_send(client, package); + } +} + +static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg) +{ + SERVER_CHECK_AUTHENTICATED(client, server) + + sirinet_pkg_t * package = NULL; + siridb_t * siridb = client->siridb; + + if (~siridb->server->flags & SERVER_FLAG_RUNNING) + { + log_error("Cannot tag series because of having status %d", + siridb->server->flags); + + package = sirinet_pkg_new( + pkg->pid, + 0, + BPROTO_ERR_DROP_SERIES, + NULL); + } + else + { + qp_obj_t qp_series_name; + qp_unpacker_t unpacker; + qp_unpacker_init(&unpacker, pkg->data, pkg->len); + + if (qp_is_array(qp_next(&unpacker, NULL)) && + qp_next(&unpacker, &qp_series_name) == QP_RAW && + qp_is_raw_term(&qp_series_name)) + { + siridb_series_t * series; + + series = ct_get( + siridb->series, + (const char *) qp_series_name.via.raw); + + if (series != NULL) + { + qp_obj_t qp_tag_name; + + /* take a reference since this task might wait for a lock */ + + ++series->ref; + + uv_mutex_lock(&siridb->tags->mutex); + + while (qp_next(&unpacker, &qp_tag_name) == QP_RAW) + { + siridb_tag_t * tag = ct_getn( + siridb->tags->tags, + qp_tag_name.via.str, + qp_tag_name.len); + + if (tag == NULL) + { + tag = siridb_tags_add_n( + siridb->tags, + qp_tag_name.via.str, + qp_tag_name.len); + } + + if (tag && imap_add(tag->series, series->id, series) == 0) + { + ++series->ref; + } + } + + uv_mutex_unlock(&siridb->tags->mutex); + + siridb_series_decref(series); + } + else + { + log_warning( + "Received a request to tag series '%s' but " + "the series is not found (already dropped?)", + qp_series_name.via.raw); + } + + package = sirinet_pkg_new( + pkg->pid, + 0, + BPROTO_ACK_SERIES_TAGS, + NULL); + } + else + { + log_error( + "Illegal back-end tag series package " + "received, probably the series name was not " + "terminated?"); + } + } + + if (package != NULL) + { + sirinet_pkg_send(client, package); + } +} diff --git a/src/siri/net/protocol.c b/src/siri/net/protocol.c index 0d50d335..f07d2508 100644 --- a/src/siri/net/protocol.c +++ b/src/siri/net/protocol.c @@ -86,6 +86,8 @@ const char * sirinet_bproto_client_str(bproto_client_t n) case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE"; case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE"; case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE"; + case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS"; + case BPROTO_SERIES_TAGS: return "BPROTO_SERIES_TAGS"; default: sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n); return protocol_str; @@ -124,6 +126,7 @@ const char * sirinet_bproto_server_str(bproto_server_t n) case BPROTO_ACK_TEE_PIPE_NAME: return "BPROTO_ACK_TEE_PIPE_NAME"; case BPROTO_ACK_DROP_DATABASE: return "BPROTO_ACK_DROP_DATABASE"; case BPROTO_RES_TAGS: return "BPROTO_RES_TAGS"; + case BPROTO_ACK_SERIES_TAGS: return "BPROTO_ACK_SERIES_TAGS"; default: sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n); return protocol_str;