Update testing
authorJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 29 Jul 2020 10:16:37 +0000 (12:16 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 29 Jul 2020 10:16:37 +0000 (12:16 +0200)
include/siri/db/initsync.h
itest/run_all.py
itest/test_cluster.py
itest/test_tags.py [new file with mode: 0644]
src/siri/db/initsync.c
src/siri/db/listener.c
src/siri/db/reindex.c
src/siri/net/bserver.c
test/test_siridb/sources

index a76c4c82ba33efd0bca072a0ad31c03689ba306b..8642cd54e720d4b37a31edd9182da8be73a361a1 100644 (file)
@@ -26,7 +26,8 @@ struct siridb_initsync_s
     int fd;
     long int size;
     uint32_t * next_series_id;
-    sirinet_pkg_t * pkg;
+    sirinet_pkg_t * pkg_points;
+    sirinet_pkg_t * pkg_tags;
 };
 
 #endif  /* SIRIDB_INITSYNC_H_ */
index 155389c2ecd3b07a034d4eb7a8405248d82fef1d..3751587656997dc6c12436b295df78a13555e45d 100644 (file)
@@ -20,6 +20,7 @@ from test_select import TestSelect
 from test_select_ns import TestSelectNano
 from test_series import TestSeries
 from test_server import TestServer
+from test_tags import TestTags
 from test_tee import TestTee
 from test_user import TestUser
 
@@ -42,5 +43,6 @@ if __name__ == '__main__':
     run_test(TestSelectNano())
     run_test(TestSeries())
     run_test(TestServer())
+    run_test(TestTags())
     run_test(TestTee())
     run_test(TestUser())
index 3a23e28093acc92f289e926bd517fcd084981911..98e422eb7b24a0ddf758244112f6ce6ed3b2f33d 100644 (file)
@@ -95,7 +95,7 @@ DATA = {
 class TestCluster(TestBase):
     title = 'Test siridb-cluster'
 
-    @default_test_setup(2, time_precision='s')
+    @default_test_setup(3, time_precision='s')
     async def run(self):
         await self.client0.connect()
 
@@ -105,18 +105,19 @@ class TestCluster(TestBase):
             alter series /series.*/ tag `SERIES`
         ''')
 
+        await asyncio.sleep(3.0)
+
         await self.client0.query('''
-            alter series /series.*/ tag `OTHER`
+            alter series /.*/ - `SERIES` tag `OTHER`
         ''')
 
         await self.db.add_pool(self.server1)
-        await self.assertIsRunning(self.db, self.client0, timeout=12)
-
-        # await asyncio.sleep(45)
+        await self.assertIsRunning(self.db, self.client0, timeout=30)
 
-        # await self.db.add_replica(self.server2, 0)
-        # await self.assertIsRunning(self.db, self.client0, timeout=12)
+        await asyncio.sleep(35)
 
+        await self.db.add_replica(self.server2, 0)
+        await self.assertIsRunning(self.db, self.client0, timeout=30)
 
         # await asyncio.sleep(45)
 
diff --git a/itest/test_tags.py b/itest/test_tags.py
new file mode 100644 (file)
index 0000000..6b871a4
--- /dev/null
@@ -0,0 +1,245 @@
+import asyncio
+import functools
+import random
+import time
+import math
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+from testing import parse_args
+
+
+DATA = {
+    'series-001 float': [
+        [1471254705, 1.5],
+        [1471254707, -3.5],
+        [1471254710, -7.3]],
+    'series-001 integer': [
+        [1471254705, 5],
+        [1471254708, -3],
+        [1471254710, -7]],
+    'series-002 float': [
+        [1471254705, 3.5],
+        [1471254707, -2.5],
+        [1471254710, -8.3]],
+    'series-002 integer': [
+        [1471254705, 4],
+        [1471254708, -1],
+        [1471254710, -8]],
+    'aggr': [
+        [1447249033, 531], [1447249337, 534],
+        [1447249633, 535], [1447249937, 531],
+        [1447250249, 532], [1447250549, 537],
+        [1447250868, 530], [1447251168, 520],
+        [1447251449, 54], [1447251749, 54],
+        [1447252049, 513], [1447252349, 537],
+        [1447252649, 528], [1447252968, 531],
+        [1447253244, 533], [1447253549, 538],
+        [1447253849, 534], [1447254149, 532],
+        [1447254449, 533], [1447254748, 537]],
+    'huge': [
+        [1471254705, 9223372036854775807],
+        [1471254706, 9223372036854775806],
+        [1471254707, 9223372036854775805],
+        [1471254708, 9223372036854775804]],
+    'equal ts': [
+        [1471254705, 0], [1471254705, 1], [1471254705, 1],
+        [1471254707, 0], [1471254707, 1], [1471254708, 0],
+    ],
+    'variance': [
+        [1471254705, 2.75], [1471254706, 1.75], [1471254707, 1.25],
+        [1471254708, 0.25], [1471254709, 0.5], [1471254710, 1.25],
+        [1471254711, 3.5]
+    ],
+    'pvariance': [
+        [1471254705, 0.0], [1471254706, 0.25], [1471254707, 0.25],
+        [1471254708, 1.25], [1471254709, 1.5], [1471254710, 1.75],
+        [1471254711, 2.75], [1471254712, 3.25]
+    ],
+    'filter': [
+        [1471254705, 5],
+        [1471254710, -3],
+        [1471254715, -7],
+        [1471254720, 7]
+    ],
+    'one': [
+        [1471254710, 1]
+    ],
+    'log': [
+        [1471254710, 'log line one'],
+        [1471254712, 'log line two'],
+        [1471254714, 'another line (three)'],
+        [1471254716, 'and yet one more'],
+    ],
+    'special': [
+        [1471254705, 0.1],
+        [1471254706, math.nan],
+        [1471254707, math.inf],
+        [1471254708, -math.inf],
+    ]
+}
+
+
+class TestTags(TestBase):
+    title = 'Test tag and untag series'
+
+    @default_test_setup(3, time_precision='s')
+    async def run(self):
+        await self.client0.connect()
+
+        await self.client0.insert(DATA)
+
+        res = await self.client0.query('''
+            alter series /series.*/ tag `SERIES`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully tagged 4 series."})
+
+        res = await self.client0.query('''
+            alter series /.*/ tag `ALL`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully tagged 13 series."})
+
+        await asyncio.sleep(3.0)
+
+        res = await self.client0.query('''
+            alter series `ALL` - `SERIES` tag `OTHER`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully tagged 9 series."})
+
+        await self.db.add_pool(self.server1)
+
+        await asyncio.sleep(3.0)
+
+        res = await self.client0.query('''
+            alter series /series-00(1|2) integer/ tag `SERIES_INT`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully tagged 2 series."})
+
+        res = await self.client0.query('''
+            alter series 'one' untag `OTHER`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully untagged 1 series."})
+
+        await self.assertIsRunning(self.db, self.client0, timeout=45)
+
+        await asyncio.sleep(5)
+
+        await self.db.add_replica(self.server2, 0)
+        await asyncio.sleep(3.0)
+
+        res = await self.client0.query('''
+            alter series /series-00(1|2) float/ tag `SERIES_FLOAT`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully tagged 2 series."})
+
+        res = await self.client0.query('''
+            alter series 'huge' untag `OTHER`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully untagged 1 series."})
+
+        await self.assertIsRunning(self.db, self.client0, timeout=45)
+
+        res = await self.client0.query('''
+            alter series 'one', 'huge', 'log' tag `SPECIAL`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully tagged 3 series."})
+
+        res = await self.client0.query('''
+            alter series /empty/ tag `EMPTY`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully tagged 0 series."})
+
+        await self.client0.query('''
+            alter series 'variance', 'pvariance' untag `OTHER`
+        ''')
+
+        await self.client1.connect()
+        await self.client2.connect()
+
+        for client in (self.client0, self.client1, self.client2):
+            res = await self.client0.query('''
+                list tags name, series
+            ''')
+            tags = sorted(res['tags'])
+            self.assertEqual(tags, [
+                ["ALL", 13],
+                ["EMPTY", 0],
+                ["OTHER", 5],
+                ["SERIES", 4],
+                ["SERIES_FLOAT", 2],
+                ["SERIES_INT", 2],
+                ["SPECIAL", 3],
+            ])
+
+        for series in ('huge', 'log', 'series-001 integer', 'one'):
+            await self.client0.query('''
+                drop series '{0}'
+            '''.format(series))
+
+        await asyncio.sleep(3.0)
+
+        for client in (self.client0, self.client1, self.client2):
+            res = await self.client0.query('''
+                list tags name, series
+            ''')
+            tags = sorted(res['tags'])
+            self.assertEqual(tags, [
+                ["ALL", 9],
+                ["EMPTY", 0],
+                ["OTHER", 4],
+                ["SERIES", 3],
+                ["SERIES_FLOAT", 2],
+                ["SERIES_INT", 1],
+                ["SPECIAL", 0],
+            ])
+
+        for tag in (
+                'ALL',
+                'EMPTY',
+                'OTHER',
+                'SERIES',
+                'SERIES_FLOAT',
+                'SERIES_INT',
+                'SPECIAL'):
+            await self.client0.query('''
+                drop tag `{0}`
+            '''.format(tag))
+
+        await asyncio.sleep(3.0)
+
+        for client in (self.client0, self.client1, self.client2):
+            res = await self.client0.query('''
+                list tags name, series
+            ''')
+            tags = sorted(res['tags'])
+            self.assertEqual(tags, [])
+
+        self.client2.close()
+        self.client1.close()
+        self.client0.close()
+
+
+if __name__ == '__main__':
+    parse_args()
+    run_test(TestTags())
index 771dcbe9a89ffe327745eb7e806422a45f304763..d8d5702ec0b93bd6bd9d10ad234721f8be18f468 100644 (file)
@@ -57,7 +57,8 @@ siridb_initsync_t * siridb_initsync_open(siridb_t * siridb, int create_new)
         initsync->fn = NULL;
         initsync->fp = NULL;
         initsync->next_series_id = NULL;
-        initsync->pkg = NULL;
+        initsync->pkg_points = NULL;
+        initsync->pkg_tags = NULL;
 
         if (INITSYNC_fn(siridb, initsync) < 0)
         {
@@ -161,7 +162,8 @@ void siridb_initsync_free(siridb_initsync_t ** initsync)
     }
     free((*initsync)->fn);
     free((*initsync)->next_series_id);
-    free((*initsync)->pkg);
+    free((*initsync)->pkg_points);
+    free((*initsync)->pkg_tags);
     free(*initsync);
     *initsync = NULL;
 }
@@ -174,7 +176,7 @@ void siridb_initsync_run(uv_timer_t * timer)
     siridb_t * siridb = (siridb_t *) timer->data;
     uv_timer_start(
             timer,
-            (siridb->replicate->initsync->pkg == NULL) ?
+            (siridb->replicate->initsync->pkg_points == NULL) ?
                     INITSYNC_work : INITSYNC_send,
             INITSYNC_SLEEP * siridb->tasks.active,
             0);
@@ -237,8 +239,10 @@ static void INITSYNC_next_series_id(siridb_t * siridb)
     siridb_initsync_t * initsync = siridb->replicate->initsync;
 
     /* free the current package (can be NULL already) */
-    free(initsync->pkg);
-    initsync->pkg = NULL;
+    free(initsync->pkg_points);
+    free(initsync->pkg_tags);
+    initsync->pkg_points = NULL;
+    initsync->pkg_tags = NULL;
 
     if (initsync->size >= SIZE2)
     {
@@ -303,7 +307,7 @@ static void INITSYNC_pause(siridb_replicate_t * replicate)
 static void INITSYNC_send(uv_timer_t * timer)
 {
     siridb_t * siridb = (siridb_t *) timer->data;
-    assert (siridb->replicate->initsync->pkg != NULL);
+    assert (siridb->replicate->initsync->pkg_points != NULL);
 
     if (siridb->replicate->status == REPLICATE_STOPPING)
     {
@@ -315,7 +319,7 @@ static void INITSYNC_send(uv_timer_t * timer)
         {
             siridb_server_send_pkg(
                     siridb->replica,
-                    siridb->replicate->initsync->pkg,
+                    siridb->replicate->initsync->pkg_points,
                     INITSYNC_TIMEOUT,
                     (sirinet_promise_cb) INITSYNC_on_insert_response,
                     siridb,
@@ -344,7 +348,8 @@ static void INITSYNC_work(uv_timer_t * timer)
             siridb->replicate->status == REPLICATE_STOPPING);
     assert (siridb->replicate->initsync != NULL);
     assert (siridb->replicate->initsync->fp != NULL);
-    assert (siridb->replicate->initsync->pkg == NULL);
+    assert (siridb->replicate->initsync->pkg_points == NULL);
+    assert (siridb->replicate->initsync->pkg_tags == NULL);
 
     if (siridb->insert_tasks)
     {
@@ -385,11 +390,13 @@ static void INITSYNC_work(uv_timer_t * timer)
                 {
                     series->flags &= ~SIRIDB_SERIES_INIT_REPL;
 
-                    initsync->pkg = sirinet_packer2pkg(
+                    initsync->pkg_points = sirinet_packer2pkg(
                             packer,
                             0,
                             BPROTO_INSERT_SERVER);
 
+                    initsync->pkg_tags = siridb_tags_series(series);
+
                     uv_timer_start(
                             siridb->replicate->timer,
                             INITSYNC_send,
@@ -411,6 +418,28 @@ static void INITSYNC_work(uv_timer_t * timer)
     }
 }
 
+/*
+ * Call-back function: sirinet_promise_cb
+ */
+static void INITSYNC_on_tag_response(
+        sirinet_promise_t * promise,
+        sirinet_pkg_t * pkg,
+        int status)
+{
+    if (status)
+    {
+        log_error("Error while sending tags (%d)", status);
+    }
+    else if (sirinet_protocol_is_error(pkg->tp))
+    {
+        log_error(
+                "Error occurred while processing data on the replica: "
+                "(response type: %u)", pkg->tp);
+    }
+
+    sirinet_promise_decref(promise);
+}
+
 /*
  * Call-back function: sirinet_promise_cb
  */
@@ -461,6 +490,16 @@ static void INITSYNC_on_insert_response(
                     "(response type: %u)", pkg->tp);
             /* TODO: maybe write pkg to an error queue ? */
         }
+        if (siridb->replicate->initsync->pkg_tags)
+        {
+            siridb_server_send_pkg(
+                    siridb->replica,
+                    siridb->replicate->initsync->pkg_tags,
+                    INITSYNC_TIMEOUT,
+                    (sirinet_promise_cb) INITSYNC_on_tag_response,
+                    NULL,
+                    FLAG_KEEP_PKG);
+        }
         INITSYNC_next_series_id(siridb);
         break;
     default:
index 1019a0c21f060ae0fb9d89ba26a13af98eccbb6d..2b97dc3b9a43f1853093024600929e5d42ef389e 100644 (file)
@@ -196,7 +196,7 @@ if (IS_MASTER && siridb_is_reindexing(siridb))                              \
 #define MSG_SUCCESS_TAG \
     "Successfully tagged %zu series."
 #define MSG_SUCCESS_UNTAG \
-    "Successfully unagged %zu series."
+    "Successfully untagged %zu series."
 #define MSG_SUCCESS_DROP_TAG \
     "Successfully dropped tag '%s'."
 
index f98cea5c49ad94eacea285511d046006f9ba47cc..9906ea50fd46a88e1829574d31b8a9a81b9d8099 100644 (file)
@@ -339,16 +339,6 @@ static void REINDEX_send(uv_timer_t * timer)
                 (sirinet_promise_cb) REINDEX_on_insert_response,
                 siridb,
                 FLAG_KEEP_PKG);
-        if (siridb->reindex->pkg_tags)
-        {
-            siridb_server_send_pkg(
-                    siridb->reindex->server,
-                    siridb->reindex->pkg_tags,
-                    REINDEX_TIMEOUT,
-                    (sirinet_promise_cb) REINDEX_on_tag_response,
-                    NULL,
-                    FLAG_KEEP_PKG);
-        }
     }
     else
     {
@@ -484,6 +474,10 @@ static void REINDEX_work(uv_timer_t * timer)
 
         if (points != NULL)  /* signal is raised in case NULL */
         {
+            /* tag package may be NULL when no tag need to be
+             * synchronized */
+            reindex->pkg_tags = siridb_tags_series(reindex->series);
+
             /*
              * Prepare drop, increasing the reference counter is not needed
              * since the series can only be decremented when dropped. since
@@ -511,10 +505,6 @@ static void REINDEX_work(uv_timer_t * timer)
                             0,
                             BPROTO_INSERT_TESTED_SERVER);
 
-                    /* tag package may be NULL when no tag need to be
-                     * synchronized */
-                    reindex->pkg_tags = siridb_tags_series(reindex->series);
-
                     uv_timer_start(
                             reindex->timer,
                             REINDEX_send,
@@ -527,8 +517,6 @@ static void REINDEX_work(uv_timer_t * timer)
                 }
             }
             siridb_points_free(points);
-
-
         }
     }
 }
@@ -567,6 +555,17 @@ static void REINDEX_commit_series(siridb_t * siridb)
         }
     }
 
+    if (siridb->reindex->pkg_tags)
+    {
+        siridb_server_send_pkg(
+                siridb->reindex->server,
+                siridb->reindex->pkg_tags,
+                REINDEX_TIMEOUT,
+                (sirinet_promise_cb) REINDEX_on_tag_response,
+                NULL,
+                FLAG_KEEP_PKG);
+    }
+
     /* commit the drop */
     if (siridb_series_drop_commit(siridb, siridb->reindex->series) == 0)
     {
@@ -611,7 +610,7 @@ static void REINDEX_on_insert_response(
          * Commit with error since this package has result in an unknown
          * package type.
          */
-        log_error("Error occurred while sending series to the replica (%d)",
+        log_error("Error occurred while sending series to the new server (%d)",
                 status);
         REINDEX_commit_series(siridb);
         REINDEX_next(siridb);
@@ -620,7 +619,7 @@ static void REINDEX_on_insert_response(
         if (sirinet_protocol_is_error(pkg->tp))
         {
             log_error(
-                    "Error occurred while processing data on the replica: "
+                    "Error occurred while processing data on the new server: "
                     "(response type: %u)", pkg->tp);
         }
         REINDEX_commit_series(siridb);
@@ -646,6 +645,12 @@ static void REINDEX_on_tag_response(
     {
         log_error("Error while sending tags (%d)", status);
     }
+    else if (sirinet_protocol_is_error(pkg->tp))
+    {
+        log_error(
+                "Error occurred while processing data on the new server: "
+                "(response type: %u)", pkg->tp);
+    }
 
     sirinet_promise_decref(promise);
 }
index 1cbcc74c7e6b4c124e0eb9d68cee05be099d3981..0b6fca9504c242a8c1784e6dde2bcfc60330f7b6 100644 (file)
@@ -904,6 +904,8 @@ static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg)
                     {
                         ++series->ref;
                     }
+
+                    siridb_tags_set_require_save(siridb->tags, tag);
                 }
 
                 uv_mutex_unlock(&siridb->tags->mutex);
index f7b6dba6ad6729eed4cf2c95333dc24418e57633..f638f65393439f6548795259d6815c6b35afe480 100644 (file)
@@ -71,6 +71,8 @@
 ../src/siri/db/shard.c
 ../src/siri/db/shards.c
 ../src/siri/db/sset.c
+../src/siri/db/tag.c
+../src/siri/db/tags.c
 ../src/siri/db/tasks.c
 ../src/siri/db/tee.c
 ../src/siri/db/time.c