Update version, fix shard expiration issues
authorJeroen van der Heijden <jeroen@transceptor.technology>
Fri, 24 Jan 2020 09:54:42 +0000 (10:54 +0100)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Fri, 24 Jan 2020 09:54:42 +0000 (10:54 +0100)
include/siri/version.h
itest/run_all.py
itest/test_expiration.py [new file with mode: 0644]
itest/test_http_api.py
src/siri/db/shards.c
src/siri/service/client.c
src/siri/service/request.c

index 80fdeff059d36e5e0a98242cec2c3c52ad2cbae1..b693ec24976f2f838a43ecbad8f013c57e3c7858 100644 (file)
@@ -15,7 +15,7 @@
  * Note that debian alpha packages should use versions like this:
  *   2.0.34-0alpha0
  */
-#define SIRIDB_VERSION_PRE_RELEASE "-alpha-3"
+#define SIRIDB_VERSION_PRE_RELEASE "-alpha-4"
 
 #ifndef NDEBUG
 #define SIRIDB_VERSION_BUILD_RELEASE "+debug"
index dccabad1bc036a4659fe5ab32a3ec2e48082c507..155389c2ecd3b07a034d4eb7a8405248d82fef1d 100644 (file)
@@ -6,6 +6,7 @@ from test_buffer import TestBuffer
 from test_cluster import TestCluster
 from test_compression import TestCompression
 from test_create_database import TestCreateDatabase
+from test_expiration import TestExpiration
 from test_group import TestGroup
 from test_http_api import TestHTTPAPI
 from test_insert import TestInsert
@@ -28,6 +29,7 @@ if __name__ == '__main__':
     run_test(TestBuffer())
     run_test(TestCompression())
     run_test(TestCreateDatabase())
+    run_test(TestExpiration())
     run_test(TestGroup())
     run_test(TestHTTPAPI())
     run_test(TestInsert())
diff --git a/itest/test_expiration.py b/itest/test_expiration.py
new file mode 100644 (file)
index 0000000..5559f67
--- /dev/null
@@ -0,0 +1,164 @@
+import asyncio
+import functools
+import random
+import time
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+from testing import parse_args
+
+
+TIME_PRECISION = 's'
+
+
+class TestExpiration(TestBase):
+    title = 'Test shard expiration'
+
+    GEN_POINTS = functools.partial(
+        gen_points, n=1, time_precision=TIME_PRECISION)
+
+    async def _test_series(self, client):
+
+        result = await client.query('select * from "series float"')
+        self.assertEqual(result['series float'], self.series_float)
+
+        result = await client.query('select * from "series int"')
+        self.assertEqual(result['series int'], self.series_int)
+
+        result = await client.query(
+            'list series name, length, type, start, end')
+        result['series'].sort()
+        self.assertEqual(
+            result,
+            {
+                'columns': ['name', 'length', 'type', 'start', 'end'],
+                'series': [
+                    [
+                        'series float',
+                        10000, 'float',
+                        self.series_float[0][0],
+                        self.series_float[-1][0]],
+                    [
+                        'series int', 10000,
+                        'integer',
+                        self.series_int[0][0],
+                        self.series_int[-1][0]],
+                ]
+            })
+
+    async def insert(self, client, series, n, timeout=1):
+        for _ in range(n):
+            await client.insert_some_series(
+                series, timeout=timeout, points=self.GEN_POINTS)
+            await asyncio.sleep(1.0)
+
+    @default_test_setup(
+        2,
+        time_precision=TIME_PRECISION,
+        compression=True,
+        optimize_interval=20)
+    async def run(self):
+        await self.client0.connect()
+
+        await self.db.add_replica(self.server1, 0, sleep=30)
+        # await self.db.add_pool(self.server1, sleep=30)
+
+        await self.assertIsRunning(self.db, self.client0, timeout=30)
+
+        await self.client1.connect()
+
+        self.series_float = gen_points(
+            tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='10m')
+        random.shuffle(self.series_float)
+
+        self.series_int = gen_points(
+            tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='10m')
+        random.shuffle(self.series_int)
+
+        self.assertEqual(
+            await self.client0.insert({
+                'series float': self.series_float,
+                'series int': self.series_int
+            }), {'success_msg': 'Successfully inserted 20000 point(s).'})
+
+        self.series_float.sort()
+        self.series_int.sort()
+
+        await self._test_series(self.client0)
+
+        total = (await self.client0.query('count shards'))['shards']
+        rest = (
+            await self.client0.query('count shards where end > now - 3w')
+        )['shards']
+
+        self.assertGreater(total, rest)
+
+        await self.client0.query('alter database set expiration_num 3w')
+        await asyncio.sleep(40)  # wait for optimize to complete
+
+        total = (await self.client0.query('count shards'))['shards']
+        self.assertEqual(total, rest)
+
+        await self.client0.query('alter database set expiration_log 2w')
+        await self.client0.insert({
+            'series_log': [
+                [int(time.time()) - 3600*24*15, "expired_log"]
+            ]
+        })
+
+        res = await self.client0.query('list series name, length "series_log"')
+        self.assertEqual(len(res['series']), 0)
+
+        await self.client0.insert({
+            'series_log': [
+                [int(time.time()) - 3600*24*15, "expired_log"],
+                [int(time.time()) - 3600*24*7, "valid_log"],
+            ]
+        })
+
+        res = await self.client0.query('list series name, length "series_log"')
+        self.assertEqual(len(res['series']), 1)
+        self.assertEqual(res['series'], [['series_log', 1]])
+
+        await self.client0.query('alter database set drop_threshold 0.1')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                "This query would drop .*"):
+            result = await self.client0.query(
+                'alter database set expiration_num 1w')
+
+        total = (await self.client0.query('count shards'))['shards']
+        rest = (
+            await self.client0.query('count shards where end > now - 1w')
+        )['shards']
+
+        result = await self.client0.query(
+                'alter database set expiration_num 1w '
+                'set ignore_threshold true')
+
+        await asyncio.sleep(40)  # wait for optimize to complete
+
+        total = (await self.client0.query('count shards'))['shards']
+        self.assertEqual(total, rest)
+
+        self.client0.close()
+        self.client1.close()
+
+
+if __name__ == '__main__':
+    random.seed(1)
+    parse_args()
+    run_test(TestExpiration())
index 8c3cb572d0d9cf686537b013efb1b79213d93e7d..1a6577dc5fdd1adbaae1cf519635be664e4bb5fa 100644 (file)
@@ -141,7 +141,8 @@ class TestHTTPAPI(TestBase):
             auth=('sa', 'siri'))
 
         self.assertEqual(x.status_code, 400)
-        self.assertEqual(x.json(), {'error_msg':
+        self.assertEqual(x.json(), {
+            'error_msg':
                 'service account name should have at least 2 characters'})
 
         x = requests.post(
@@ -202,4 +203,3 @@ class TestHTTPAPI(TestBase):
 if __name__ == '__main__':
     parse_args()
     run_test(TestHTTPAPI())
-
index 0e729b9ae7a322c2d8f5fe40aeffa2824f738d63..5eb1c95a918f158f38da8f64e7e17b32e77273ff 100644 (file)
@@ -234,39 +234,43 @@ double siridb_shards_count_percent(
         uint64_t end_ts,
         uint8_t tp)
 {
-    double percent;
     size_t i;
-    vec_t * shards_list;
+    double percent = 1.0;
+    vec_t * shards_list = NULL;
     size_t count = 0;
+    size_t total = 0;
     uint64_t duration = tp == SIRIDB_SHARD_TP_NUMBER
             ? siridb->duration_num
             : siridb->duration_log;
 
     uv_mutex_lock(&siridb->shards_mutex);
 
-    shards_list = imap_2vec_ref(siridb->shards);
+    if (siridb->shards->len == 0)
+    {
+        percent = 0.0;
+    }
+    else
+    {
+        shards_list = imap_2vec_ref(siridb->shards);
+    }
 
     uv_mutex_unlock(&siridb->shards_mutex);
 
     if (shards_list == NULL)
-        return 1.0;  /* error, return as if all were removed */
-
-    if (shards_list->len == 0)
-    {
-        vec_free(shards_list);
-        return 0.0;
-    }
+        return percent;
 
     for (i = 0; i < shards_list->len; i++)
     {
         siridb_shard_t * shard = (siridb_shard_t *) shards_list->data[i];
-        if (shard->tp != tp)
-            continue;
-
-        count += ((shard->id - shard->id % duration) + duration) < end_ts;
+        if (shard->tp == tp)
+        {
+            ++total;
+            count += ((shard->id - shard->id % duration) + duration) < end_ts;
+        }
+        siridb_shard_decref(shard);
     }
 
-    percent = count / shards_list->len;
+    percent = total ? (double) count / (double) total : 0.0;
     vec_free(shards_list);
     return percent;
 }
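
The rewritten loop above fixes three issues in siridb_shards_count_percent: the old
count / shards_list->len performed integer division, it divided by every shard instead
of only those matching the requested type, and the references taken via imap_2vec_ref
were never released with siridb_shard_decref. The following is a minimal standalone
sketch of the same boundary arithmetic, not SiriDB code; the shard list is reduced to a
plain array of ids and all values (one-week duration, second precision, end_ts) are
hypothetical:

    #include <stdio.h>
    #include <stdint.h>
    #include <stddef.h>

    /*
     * Return the fraction of shards whose duration window ends before
     * end_ts, i.e. the shards an expiration setting would remove.
     */
    static double count_expired_percent(
            const uint64_t * shard_ids,
            size_t n,
            uint64_t duration,
            uint64_t end_ts)
    {
        size_t count = 0;
        size_t i;

        for (i = 0; i < n; i++)
        {
            /* end of the duration window containing this shard id,
             * mirroring the expression in the patch above */
            uint64_t shard_end =
                    (shard_ids[i] - shard_ids[i] % duration) + duration;
            count += shard_end < end_ts;
        }

        /* cast before dividing; an empty set counts as 0.0, as in the patch */
        return n ? (double) count / (double) n : 0.0;
    }

    int main(void)
    {
        const uint64_t week = 604800;
        const uint64_t ids[] = { 0, 604800, 1209600, 1814400 };

        printf("%.2f%% of the shards would expire\n",
                count_expired_percent(ids, 4, week, 1300000) * 100.0);
        return 0;
    }

Compiled on its own, the sketch prints 50.00%: two of the four hypothetical shards end
before the chosen end_ts, which is presumably the fraction compared against the
drop_threshold exercised in the new test.
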
index dc9608af24ac81eafa8863bbd7f6b66e6af953d5..0ffc9e833d7c3a402a324b0881f73bb0d4ed87d2 100644 (file)
@@ -630,7 +630,9 @@ static void CLIENT_on_file_database(
         qp_timezone,
         qp_drop_threshold,
         qp_points_limit,
-        qp_list_limit;
+        qp_list_limit,
+        qp_exp_log,
+        qp_exp_num;
     siridb_t * siridb;
     int rc;
     /* 13 = strlen("database.dat")+1  */
@@ -659,7 +661,9 @@
     (void) qp_next(&unpacker, &qp_list_limit);
 
     /* this is the tee pipe name when schema is >= 5 */
-    (void) qp_next(&unpacker, NULL);
+    /* these are the expiration times when schema is >= 6 */
+    (void) qp_next(&unpacker, &qp_exp_log);
+    (void) qp_next(&unpacker, &qp_exp_num);
 
     if ((fpacker = qp_open(fn, "w")) == NULL)
     {
@@ -684,6 +690,12 @@ static void CLIENT_on_file_database(
                     ? qp_list_limit.via.int64
                     : DEF_LIST_LIMIT) ||
             qp_fadd_type(fpacker, QP_NULL) ||
+            qp_fadd_int64(fpacker, qp_exp_log.tp == QP_INT64
+                    ? qp_exp_log.via.int64
+                    : 0) ||
+            qp_fadd_int64(fpacker, qp_exp_num.tp == QP_INT64
+                    ? qp_exp_num.via.int64
+                    : 0) ||
             qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
             qp_close(fpacker));
 
index e73f12ed76bbcaec086f95ffa895b5525c1cd48b..11f4d707c7c10fb3461cfd5bf14385edb4d7d9f7 100644 (file)
@@ -720,6 +720,8 @@ static cproto_server_t SERVICE_on_new_database(
         qp_fadd_int64(fp, DEF_SELECT_POINTS_LIMIT) ||
         qp_fadd_int64(fp, DEF_LIST_LIMIT) ||
         qp_fadd_type(fp, QP_NULL) ||
+        qp_fadd_int64(fp, 0) ||
+        qp_fadd_int64(fp, 0) ||
         qp_fadd_type(fp, QP_ARRAY_CLOSE))
     {
         rc = -1;
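
Taken together, the client.c and request.c hunks above append two integers to the
serialized database settings: the log-shard and number-shard expiration, written after
the tee-pipe placeholder, defaulting to 0 for a brand-new database and whenever the
value received from a peer is not a QP_INT64. Below is a minimal sketch of that
fallback only; maybe_int64_t and expiration_or_default are illustrative stand-ins, not
the real libqpack qp_obj type or any SiriDB API:

    #include <stdio.h>
    #include <stdint.h>
    #include <stdbool.h>

    /* stand-in for "qp_next() decoded an integer at this position" */
    typedef struct
    {
        bool is_int64;
        int64_t value;
    } maybe_int64_t;

    /* mirrors the ternary in the client.c hunk: anything that is not an
     * int64 (missing field, QP_NULL, ...) becomes 0 */
    static int64_t expiration_or_default(maybe_int64_t field)
    {
        return field.is_int64 ? field.value : 0;
    }

    int main(void)
    {
        maybe_int64_t from_new_peer = { true, 604800 };  /* one week, seconds */
        maybe_int64_t from_old_peer = { false, 0 };      /* field not present */

        printf("expiration from schema >= 6 peer: %lld\n",
                (long long) expiration_or_default(from_new_peer));
        printf("expiration from older peer:       %lld\n",
                (long long) expiration_or_default(from_old_peer));
        return 0;
    }

The two qp_fadd_int64(fp, 0) calls in SERVICE_on_new_database give new databases the
same default, so expiration presumably only takes effect once it is set with an
"alter database set expiration_num ..." or "... expiration_log ..." query, as the new
test does.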