From: Jeroen van der Heijden Date: Fri, 24 Jan 2020 09:54:42 +0000 (+0100) Subject: Update version, fixed shard expiration issues X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~5^2~21 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=bb2f6aee956b9fb1e4f30c4b0abdadfea82d8ac2;p=siridb-server.git Update version, fixed shard expiration issues --- diff --git a/include/siri/version.h b/include/siri/version.h index 80fdeff0..b693ec24 100644 --- a/include/siri/version.h +++ b/include/siri/version.h @@ -15,7 +15,7 @@ * Note that debian alpha packages should use versions like this: * 2.0.34-0alpha0 */ -#define SIRIDB_VERSION_PRE_RELEASE "-alpha-3" +#define SIRIDB_VERSION_PRE_RELEASE "-alpha-4" #ifndef NDEBUG #define SIRIDB_VERSION_BUILD_RELEASE "+debug" diff --git a/itest/run_all.py b/itest/run_all.py index dccabad1..155389c2 100644 --- a/itest/run_all.py +++ b/itest/run_all.py @@ -6,6 +6,7 @@ from test_buffer import TestBuffer from test_cluster import TestCluster from test_compression import TestCompression from test_create_database import TestCreateDatabase +from test_expiration import TestExpiration from test_group import TestGroup from test_http_api import TestHTTPAPI from test_insert import TestInsert @@ -28,6 +29,7 @@ if __name__ == '__main__': run_test(TestBuffer()) run_test(TestCompression()) run_test(TestCreateDatabase()) + run_test(TestExpiration()) run_test(TestGroup()) run_test(TestHTTPAPI()) run_test(TestInsert()) diff --git a/itest/test_expiration.py b/itest/test_expiration.py new file mode 100644 index 00000000..5559f676 --- /dev/null +++ b/itest/test_expiration.py @@ -0,0 +1,164 @@ +import asyncio +import functools +import random +import time +from testing import Client +from testing import default_test_setup +from testing import gen_data +from testing import gen_points +from testing import gen_series +from testing import InsertError +from testing import PoolError +from testing import QueryError +from testing import run_test +from testing import Series +from testing import Server +from testing import ServerError +from testing import SiriDB +from testing import TestBase +from testing import UserAuthError +from testing import parse_args + + +TIME_PRECISION = 's' + + +class TestExpiration(TestBase): + title = 'Test shard expiration' + + GEN_POINTS = functools.partial( + gen_points, n=1, time_precision=TIME_PRECISION) + + async def _test_series(self, client): + + result = await client.query('select * from "series float"') + self.assertEqual(result['series float'], self.series_float) + + result = await client.query('select * from "series int"') + self.assertEqual(result['series int'], self.series_int) + + result = await client.query( + 'list series name, length, type, start, end') + result['series'].sort() + self.assertEqual( + result, + { + 'columns': ['name', 'length', 'type', 'start', 'end'], + 'series': [ + [ + 'series float', + 10000, 'float', + self.series_float[0][0], + self.series_float[-1][0]], + [ + 'series int', 10000, + 'integer', + self.series_int[0][0], + self.series_int[-1][0]], + ] + }) + + async def insert(self, client, series, n, timeout=1): + for _ in range(n): + await client.insert_some_series( + series, timeout=timeout, points=self.GEN_POINTS) + await asyncio.sleep(1.0) + + @default_test_setup( + 2, + time_precision=TIME_PRECISION, + compression=True, + optimize_interval=20) + async def run(self): + await self.client0.connect() + + await self.db.add_replica(self.server1, 0, sleep=30) + # await self.db.add_pool(self.server1, sleep=30) + + await self.assertIsRunning(self.db, self.client0, timeout=30) + + await self.client1.connect() + + self.series_float = gen_points( + tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='10m') + random.shuffle(self.series_float) + + self.series_int = gen_points( + tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='10m') + random.shuffle(self.series_int) + + self.assertEqual( + await self.client0.insert({ + 'series float': self.series_float, + 'series int': self.series_int + }), {'success_msg': 'Successfully inserted 20000 point(s).'}) + + self.series_float.sort() + self.series_int.sort() + + await self._test_series(self.client0) + + total = (await self.client0.query('count shards'))['shards'] + rest = ( + await self.client0.query('count shards where end > now - 3w') + )['shards'] + + self.assertGreater(total, rest) + + await self.client0.query('alter database set expiration_num 3w') + await asyncio.sleep(40) # wait for optimize to complete + + total = (await self.client0.query('count shards'))['shards'] + self.assertEqual(total, rest) + + await self.client0.query('alter database set expiration_log 2w') + await self.client0.insert({ + 'series_log': [ + [int(time.time()) - 3600*24*15, "expired_log"] + ] + }) + + res = await self.client0.query('list series name, length "series_log"') + self.assertEqual(len(res['series']), 0) + + await self.client0.insert({ + 'series_log': [ + [int(time.time()) - 3600*24*15, "expired_log"], + [int(time.time()) - 3600*24*7, "valid_log"], + ] + }) + + res = await self.client0.query('list series name, length "series_log"') + self.assertEqual(len(res['series']), 1) + self.assertEqual(res['series'], [['series_log', 1]]) + + await self.client0.query('alter database set drop_threshold 0.1') + + with self.assertRaisesRegex( + QueryError, + "This query would drop .*"): + result = await self.client0.query( + 'alter database set expiration_num 1w') + + total = (await self.client0.query('count shards'))['shards'] + rest = ( + await self.client0.query('count shards where end > now - 1w') + )['shards'] + + result = await self.client0.query( + 'alter database set expiration_num 1w ' + 'set ignore_threshold true') + + await asyncio.sleep(40) # wait for optimize to complete + + total = (await self.client0.query('count shards'))['shards'] + self.assertEqual(total, rest) + + self.client0.close() + self.client1.close() + + +if __name__ == '__main__': + random.seed(1) + parse_args() + run_test(TestExpiration()) diff --git a/itest/test_http_api.py b/itest/test_http_api.py index 8c3cb572..1a6577dc 100644 --- a/itest/test_http_api.py +++ b/itest/test_http_api.py @@ -141,7 +141,8 @@ class TestHTTPAPI(TestBase): auth=('sa', 'siri')) self.assertEqual(x.status_code, 400) - self.assertEqual(x.json(), {'error_msg': + self.assertEqual(x.json(), { + 'error_msg': 'service account name should have at least 2 characters'}) x = requests.post( @@ -202,4 +203,3 @@ class TestHTTPAPI(TestBase): if __name__ == '__main__': parse_args() run_test(TestHTTPAPI()) - diff --git a/src/siri/db/shards.c b/src/siri/db/shards.c index 0e729b9a..5eb1c95a 100644 --- a/src/siri/db/shards.c +++ b/src/siri/db/shards.c @@ -234,39 +234,43 @@ double siridb_shards_count_percent( uint64_t end_ts, uint8_t tp) { - double percent; size_t i; - vec_t * shards_list; + double percent = 1.0; + vec_t * shards_list = NULL; size_t count = 0; + size_t total = 0; uint64_t duration = tp == SIRIDB_SHARD_TP_NUMBER ? siridb->duration_num : siridb->duration_log; uv_mutex_lock(&siridb->shards_mutex); - shards_list = imap_2vec_ref(siridb->shards); + if (siridb->shards->len == 0) + { + percent = 0.0; + } + else + { + shards_list = imap_2vec_ref(siridb->shards); + } uv_mutex_unlock(&siridb->shards_mutex); if (shards_list == NULL) - return 1.0; /* error, return as if all were removed */ - - if (shards_list->len == 0) - { - vec_free(shards_list); - return 0.0; - } + return percent; for (i = 0; i < shards_list->len; i++) { siridb_shard_t * shard = (siridb_shard_t *) shards_list->data[i]; - if (shard->tp != tp) - continue; - - count += ((shard->id - shard->id % duration) + duration) < end_ts; + if (shard->tp == tp) + { + ++total; + count += ((shard->id - shard->id % duration) + duration) < end_ts; + } + siridb_shard_decref(shard); } - percent = count / shards_list->len; + percent = total ? (double) count / (double) total : 0.0; vec_free(shards_list); return percent; } diff --git a/src/siri/service/client.c b/src/siri/service/client.c index dc9608af..0ffc9e83 100644 --- a/src/siri/service/client.c +++ b/src/siri/service/client.c @@ -630,7 +630,9 @@ static void CLIENT_on_file_database( qp_timezone, qp_drop_threshold, qp_points_limit, - qp_list_limit; + qp_list_limit, + qp_exp_log, + qp_exp_num; siridb_t * siridb; int rc; /* 13 = strlen("database.dat")+1 */ @@ -659,7 +661,11 @@ static void CLIENT_on_file_database( (void) qp_next(&unpacker, &qp_list_limit); /* this is the tee pipe name when schema is >= 5 */ - (void) qp_next(&unpacker, NULL); + (void) qp_next(&unpacker, &qp_exp_log); + (void) qp_next(&unpacker, &qp_exp_num); + + /* these are the expiration times when schema is >= 6 */ + if ((fpacker = qp_open(fn, "w")) == NULL) { @@ -684,6 +690,12 @@ static void CLIENT_on_file_database( ? qp_list_limit.via.int64 : DEF_LIST_LIMIT) || qp_fadd_type(fpacker, QP_NULL) || + qp_fadd_int64(fpacker, qp_exp_log.tp == QP_INT64 + ? qp_exp_log.via.int64 + : 0) || + qp_fadd_int64(fpacker, qp_exp_num.tp == QP_INT64 + ? qp_exp_num.via.int64 + : 0) || qp_fadd_type(fpacker, QP_ARRAY_CLOSE) || qp_close(fpacker)); diff --git a/src/siri/service/request.c b/src/siri/service/request.c index e73f12ed..11f4d707 100644 --- a/src/siri/service/request.c +++ b/src/siri/service/request.c @@ -720,6 +720,8 @@ static cproto_server_t SERVICE_on_new_database( qp_fadd_int64(fp, DEF_SELECT_POINTS_LIMIT) || qp_fadd_int64(fp, DEF_LIST_LIMIT) || qp_fadd_type(fp, QP_NULL) || + qp_fadd_int64(fp, 0) || + qp_fadd_int64(fp, 0) || qp_fadd_type(fp, QP_ARRAY_CLOSE)) { rc = -1;