From: Jeroen van der Heijden Date: Wed, 29 Jul 2020 10:16:37 +0000 (+0200) Subject: Update testing X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~3^2~6^2~7 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=c86eb73c6c3233bc81f738f930cc973dc2c81169;p=siridb-server.git Update testing --- diff --git a/include/siri/db/initsync.h b/include/siri/db/initsync.h index a76c4c82..8642cd54 100644 --- a/include/siri/db/initsync.h +++ b/include/siri/db/initsync.h @@ -26,7 +26,8 @@ struct siridb_initsync_s int fd; long int size; uint32_t * next_series_id; - sirinet_pkg_t * pkg; + sirinet_pkg_t * pkg_points; + sirinet_pkg_t * pkg_tags; }; #endif /* SIRIDB_INITSYNC_H_ */ diff --git a/itest/run_all.py b/itest/run_all.py index 155389c2..37515876 100644 --- a/itest/run_all.py +++ b/itest/run_all.py @@ -20,6 +20,7 @@ from test_select import TestSelect from test_select_ns import TestSelectNano from test_series import TestSeries from test_server import TestServer +from test_tags import TestTags from test_tee import TestTee from test_user import TestUser @@ -42,5 +43,6 @@ if __name__ == '__main__': run_test(TestSelectNano()) run_test(TestSeries()) run_test(TestServer()) + run_test(TestTags()) run_test(TestTee()) run_test(TestUser()) diff --git a/itest/test_cluster.py b/itest/test_cluster.py index 3a23e280..98e422eb 100644 --- a/itest/test_cluster.py +++ b/itest/test_cluster.py @@ -95,7 +95,7 @@ DATA = { class TestCluster(TestBase): title = 'Test siridb-cluster' - @default_test_setup(2, time_precision='s') + @default_test_setup(3, time_precision='s') async def run(self): await self.client0.connect() @@ -105,18 +105,19 @@ class TestCluster(TestBase): alter series /series.*/ tag `SERIES` ''') + await asyncio.sleep(3.0) + await self.client0.query(''' - alter series /series.*/ tag `OTHER` + alter series /.*/ - `SERIES` tag `OTHER` ''') await self.db.add_pool(self.server1) - await self.assertIsRunning(self.db, self.client0, timeout=12) - - # await asyncio.sleep(45) + await self.assertIsRunning(self.db, self.client0, timeout=30) - # await self.db.add_replica(self.server2, 0) - # await self.assertIsRunning(self.db, self.client0, timeout=12) + await asyncio.sleep(35) + await self.db.add_replica(self.server2, 0) + await self.assertIsRunning(self.db, self.client0, timeout=30) # await asyncio.sleep(45) diff --git a/itest/test_tags.py b/itest/test_tags.py new file mode 100644 index 00000000..6b871a43 --- /dev/null +++ b/itest/test_tags.py @@ -0,0 +1,245 @@ +import asyncio +import functools +import random +import time +import math +from testing import Client +from testing import default_test_setup +from testing import gen_data +from testing import gen_points +from testing import gen_series +from testing import InsertError +from testing import PoolError +from testing import QueryError +from testing import run_test +from testing import Series +from testing import Server +from testing import ServerError +from testing import SiriDB +from testing import TestBase +from testing import UserAuthError +from testing import parse_args + + +DATA = { + 'series-001 float': [ + [1471254705, 1.5], + [1471254707, -3.5], + [1471254710, -7.3]], + 'series-001 integer': [ + [1471254705, 5], + [1471254708, -3], + [1471254710, -7]], + 'series-002 float': [ + [1471254705, 3.5], + [1471254707, -2.5], + [1471254710, -8.3]], + 'series-002 integer': [ + [1471254705, 4], + [1471254708, -1], + [1471254710, -8]], + 'aggr': [ + [1447249033, 531], [1447249337, 534], + [1447249633, 535], [1447249937, 531], + [1447250249, 532], [1447250549, 537], + [1447250868, 530], [1447251168, 520], + [1447251449, 54], [1447251749, 54], + [1447252049, 513], [1447252349, 537], + [1447252649, 528], [1447252968, 531], + [1447253244, 533], [1447253549, 538], + [1447253849, 534], [1447254149, 532], + [1447254449, 533], [1447254748, 537]], + 'huge': [ + [1471254705, 9223372036854775807], + [1471254706, 9223372036854775806], + [1471254707, 9223372036854775805], + [1471254708, 9223372036854775804]], + 'equal ts': [ + [1471254705, 0], [1471254705, 1], [1471254705, 1], + [1471254707, 0], [1471254707, 1], [1471254708, 0], + ], + 'variance': [ + [1471254705, 2.75], [1471254706, 1.75], [1471254707, 1.25], + [1471254708, 0.25], [1471254709, 0.5], [1471254710, 1.25], + [1471254711, 3.5] + ], + 'pvariance': [ + [1471254705, 0.0], [1471254706, 0.25], [1471254707, 0.25], + [1471254708, 1.25], [1471254709, 1.5], [1471254710, 1.75], + [1471254711, 2.75], [1471254712, 3.25] + ], + 'filter': [ + [1471254705, 5], + [1471254710, -3], + [1471254715, -7], + [1471254720, 7] + ], + 'one': [ + [1471254710, 1] + ], + 'log': [ + [1471254710, 'log line one'], + [1471254712, 'log line two'], + [1471254714, 'another line (three)'], + [1471254716, 'and yet one more'], + ], + 'special': [ + [1471254705, 0.1], + [1471254706, math.nan], + [1471254707, math.inf], + [1471254708, -math.inf], + ] +} + + +class TestTags(TestBase): + title = 'Test tag and untag series' + + @default_test_setup(3, time_precision='s') + async def run(self): + await self.client0.connect() + + await self.client0.insert(DATA) + + res = await self.client0.query(''' + alter series /series.*/ tag `SERIES` + ''') + self.assertEqual( + res, {"success_msg": "Successfully tagged 4 series."}) + + res = await self.client0.query(''' + alter series /.*/ tag `ALL` + ''') + self.assertEqual( + res, {"success_msg": "Successfully tagged 13 series."}) + + await asyncio.sleep(3.0) + + res = await self.client0.query(''' + alter series `ALL` - `SERIES` tag `OTHER` + ''') + self.assertEqual( + res, {"success_msg": "Successfully tagged 9 series."}) + + await self.db.add_pool(self.server1) + + await asyncio.sleep(3.0) + + res = await self.client0.query(''' + alter series /series-00(1|2) integer/ tag `SERIES_INT` + ''') + self.assertEqual( + res, {"success_msg": "Successfully tagged 2 series."}) + + res = await self.client0.query(''' + alter series 'one' untag `OTHER` + ''') + self.assertEqual( + res, {"success_msg": "Successfully untagged 1 series."}) + + await self.assertIsRunning(self.db, self.client0, timeout=45) + + await asyncio.sleep(5) + + await self.db.add_replica(self.server2, 0) + await asyncio.sleep(3.0) + + res = await self.client0.query(''' + alter series /series-00(1|2) float/ tag `SERIES_FLOAT` + ''') + self.assertEqual( + res, {"success_msg": "Successfully tagged 2 series."}) + + res = await self.client0.query(''' + alter series 'huge' untag `OTHER` + ''') + self.assertEqual( + res, {"success_msg": "Successfully untagged 1 series."}) + + await self.assertIsRunning(self.db, self.client0, timeout=45) + + res = await self.client0.query(''' + alter series 'one', 'huge', 'log' tag `SPECIAL` + ''') + self.assertEqual( + res, {"success_msg": "Successfully tagged 3 series."}) + + res = await self.client0.query(''' + alter series /empty/ tag `EMPTY` + ''') + self.assertEqual( + res, {"success_msg": "Successfully tagged 0 series."}) + + await self.client0.query(''' + alter series 'variance', 'pvariance' untag `OTHER` + ''') + + await self.client1.connect() + await self.client2.connect() + + for client in (self.client0, self.client1, self.client2): + res = await self.client0.query(''' + list tags name, series + ''') + tags = sorted(res['tags']) + self.assertEqual(tags, [ + ["ALL", 13], + ["EMPTY", 0], + ["OTHER", 5], + ["SERIES", 4], + ["SERIES_FLOAT", 2], + ["SERIES_INT", 2], + ["SPECIAL", 3], + ]) + + for series in ('huge', 'log', 'series-001 integer', 'one'): + await self.client0.query(''' + drop series '{0}' + '''.format(series)) + + await asyncio.sleep(3.0) + + for client in (self.client0, self.client1, self.client2): + res = await self.client0.query(''' + list tags name, series + ''') + tags = sorted(res['tags']) + self.assertEqual(tags, [ + ["ALL", 9], + ["EMPTY", 0], + ["OTHER", 4], + ["SERIES", 3], + ["SERIES_FLOAT", 2], + ["SERIES_INT", 1], + ["SPECIAL", 0], + ]) + + for tag in ( + 'ALL', + 'EMPTY', + 'OTHER', + 'SERIES', + 'SERIES_FLOAT', + 'SERIES_INT', + 'SPECIAL'): + await self.client0.query(''' + drop tag `{0}` + '''.format(tag)) + + await asyncio.sleep(3.0) + + for client in (self.client0, self.client1, self.client2): + res = await self.client0.query(''' + list tags name, series + ''') + tags = sorted(res['tags']) + self.assertEqual(tags, []) + + self.client2.close() + self.client1.close() + self.client0.close() + + +if __name__ == '__main__': + parse_args() + run_test(TestTags()) diff --git a/src/siri/db/initsync.c b/src/siri/db/initsync.c index 771dcbe9..d8d5702e 100644 --- a/src/siri/db/initsync.c +++ b/src/siri/db/initsync.c @@ -57,7 +57,8 @@ siridb_initsync_t * siridb_initsync_open(siridb_t * siridb, int create_new) initsync->fn = NULL; initsync->fp = NULL; initsync->next_series_id = NULL; - initsync->pkg = NULL; + initsync->pkg_points = NULL; + initsync->pkg_tags = NULL; if (INITSYNC_fn(siridb, initsync) < 0) { @@ -161,7 +162,8 @@ void siridb_initsync_free(siridb_initsync_t ** initsync) } free((*initsync)->fn); free((*initsync)->next_series_id); - free((*initsync)->pkg); + free((*initsync)->pkg_points); + free((*initsync)->pkg_tags); free(*initsync); *initsync = NULL; } @@ -174,7 +176,7 @@ void siridb_initsync_run(uv_timer_t * timer) siridb_t * siridb = (siridb_t *) timer->data; uv_timer_start( timer, - (siridb->replicate->initsync->pkg == NULL) ? + (siridb->replicate->initsync->pkg_points == NULL) ? INITSYNC_work : INITSYNC_send, INITSYNC_SLEEP * siridb->tasks.active, 0); @@ -237,8 +239,10 @@ static void INITSYNC_next_series_id(siridb_t * siridb) siridb_initsync_t * initsync = siridb->replicate->initsync; /* free the current package (can be NULL already) */ - free(initsync->pkg); - initsync->pkg = NULL; + free(initsync->pkg_points); + free(initsync->pkg_tags); + initsync->pkg_points = NULL; + initsync->pkg_tags = NULL; if (initsync->size >= SIZE2) { @@ -303,7 +307,7 @@ static void INITSYNC_pause(siridb_replicate_t * replicate) static void INITSYNC_send(uv_timer_t * timer) { siridb_t * siridb = (siridb_t *) timer->data; - assert (siridb->replicate->initsync->pkg != NULL); + assert (siridb->replicate->initsync->pkg_points != NULL); if (siridb->replicate->status == REPLICATE_STOPPING) { @@ -315,7 +319,7 @@ static void INITSYNC_send(uv_timer_t * timer) { siridb_server_send_pkg( siridb->replica, - siridb->replicate->initsync->pkg, + siridb->replicate->initsync->pkg_points, INITSYNC_TIMEOUT, (sirinet_promise_cb) INITSYNC_on_insert_response, siridb, @@ -344,7 +348,8 @@ static void INITSYNC_work(uv_timer_t * timer) siridb->replicate->status == REPLICATE_STOPPING); assert (siridb->replicate->initsync != NULL); assert (siridb->replicate->initsync->fp != NULL); - assert (siridb->replicate->initsync->pkg == NULL); + assert (siridb->replicate->initsync->pkg_points == NULL); + assert (siridb->replicate->initsync->pkg_tags == NULL); if (siridb->insert_tasks) { @@ -385,11 +390,13 @@ static void INITSYNC_work(uv_timer_t * timer) { series->flags &= ~SIRIDB_SERIES_INIT_REPL; - initsync->pkg = sirinet_packer2pkg( + initsync->pkg_points = sirinet_packer2pkg( packer, 0, BPROTO_INSERT_SERVER); + initsync->pkg_tags = siridb_tags_series(series); + uv_timer_start( siridb->replicate->timer, INITSYNC_send, @@ -411,6 +418,28 @@ static void INITSYNC_work(uv_timer_t * timer) } } +/* + * Call-back function: sirinet_promise_cb + */ +static void INITSYNC_on_tag_response( + sirinet_promise_t * promise, + sirinet_pkg_t * pkg, + int status) +{ + if (status) + { + log_error("Error while sending tags (%d)", status); + } + else if (sirinet_protocol_is_error(pkg->tp)) + { + log_error( + "Error occurred while processing data on the replica: " + "(response type: %u)", pkg->tp); + } + + sirinet_promise_decref(promise); +} + /* * Call-back function: sirinet_promise_cb */ @@ -461,6 +490,16 @@ static void INITSYNC_on_insert_response( "(response type: %u)", pkg->tp); /* TODO: maybe write pkg to an error queue ? */ } + if (siridb->replicate->initsync->pkg_tags) + { + siridb_server_send_pkg( + siridb->replica, + siridb->replicate->initsync->pkg_tags, + INITSYNC_TIMEOUT, + (sirinet_promise_cb) INITSYNC_on_tag_response, + NULL, + FLAG_KEEP_PKG); + } INITSYNC_next_series_id(siridb); break; default: diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index 1019a0c2..2b97dc3b 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -196,7 +196,7 @@ if (IS_MASTER && siridb_is_reindexing(siridb)) \ #define MSG_SUCCESS_TAG \ "Successfully tagged %zu series." #define MSG_SUCCESS_UNTAG \ - "Successfully unagged %zu series." + "Successfully untagged %zu series." #define MSG_SUCCESS_DROP_TAG \ "Successfully dropped tag '%s'." diff --git a/src/siri/db/reindex.c b/src/siri/db/reindex.c index f98cea5c..9906ea50 100644 --- a/src/siri/db/reindex.c +++ b/src/siri/db/reindex.c @@ -339,16 +339,6 @@ static void REINDEX_send(uv_timer_t * timer) (sirinet_promise_cb) REINDEX_on_insert_response, siridb, FLAG_KEEP_PKG); - if (siridb->reindex->pkg_tags) - { - siridb_server_send_pkg( - siridb->reindex->server, - siridb->reindex->pkg_tags, - REINDEX_TIMEOUT, - (sirinet_promise_cb) REINDEX_on_tag_response, - NULL, - FLAG_KEEP_PKG); - } } else { @@ -484,6 +474,10 @@ static void REINDEX_work(uv_timer_t * timer) if (points != NULL) /* signal is raised in case NULL */ { + /* tag package may be NULL when no tag need to be + * synchronized */ + reindex->pkg_tags = siridb_tags_series(reindex->series); + /* * Prepare drop, increasing the reference counter is not needed * since the series can only be decremented when dropped. since @@ -511,10 +505,6 @@ static void REINDEX_work(uv_timer_t * timer) 0, BPROTO_INSERT_TESTED_SERVER); - /* tag package may be NULL when no tag need to be - * synchronized */ - reindex->pkg_tags = siridb_tags_series(reindex->series); - uv_timer_start( reindex->timer, REINDEX_send, @@ -527,8 +517,6 @@ static void REINDEX_work(uv_timer_t * timer) } } siridb_points_free(points); - - } } } @@ -567,6 +555,17 @@ static void REINDEX_commit_series(siridb_t * siridb) } } + if (siridb->reindex->pkg_tags) + { + siridb_server_send_pkg( + siridb->reindex->server, + siridb->reindex->pkg_tags, + REINDEX_TIMEOUT, + (sirinet_promise_cb) REINDEX_on_tag_response, + NULL, + FLAG_KEEP_PKG); + } + /* commit the drop */ if (siridb_series_drop_commit(siridb, siridb->reindex->series) == 0) { @@ -611,7 +610,7 @@ static void REINDEX_on_insert_response( * Commit with error since this package has result in an unknown * package type. */ - log_error("Error occurred while sending series to the replica (%d)", + log_error("Error occurred while sending series to the new server (%d)", status); REINDEX_commit_series(siridb); REINDEX_next(siridb); @@ -620,7 +619,7 @@ static void REINDEX_on_insert_response( if (sirinet_protocol_is_error(pkg->tp)) { log_error( - "Error occurred while processing data on the replica: " + "Error occurred while processing data on the new server: " "(response type: %u)", pkg->tp); } REINDEX_commit_series(siridb); @@ -646,6 +645,12 @@ static void REINDEX_on_tag_response( { log_error("Error while sending tags (%d)", status); } + else if (sirinet_protocol_is_error(pkg->tp)) + { + log_error( + "Error occurred while processing data on the new server: " + "(response type: %u)", pkg->tp); + } sirinet_promise_decref(promise); } diff --git a/src/siri/net/bserver.c b/src/siri/net/bserver.c index 1cbcc74c..0b6fca95 100644 --- a/src/siri/net/bserver.c +++ b/src/siri/net/bserver.c @@ -904,6 +904,8 @@ static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg) { ++series->ref; } + + siridb_tags_set_require_save(siridb->tags, tag); } uv_mutex_unlock(&siridb->tags->mutex); diff --git a/test/test_siridb/sources b/test/test_siridb/sources index f7b6dba6..f638f653 100644 --- a/test/test_siridb/sources +++ b/test/test_siridb/sources @@ -71,6 +71,8 @@ ../src/siri/db/shard.c ../src/siri/db/shards.c ../src/siri/db/sset.c +../src/siri/db/tag.c +../src/siri/db/tags.c ../src/siri/db/tasks.c ../src/siri/db/tee.c ../src/siri/db/time.c