Fix some on and off switching tee
authorJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 1 Nov 2018 10:29:59 +0000 (11:29 +0100)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 1 Nov 2018 10:29:59 +0000 (11:29 +0100)
itest/test_tee.py
src/siri/db/listener.c
src/siri/db/tee.c
src/siri/net/bserver.c

index fb07c306e7a11738bd96b7f6d80344c244d435a1..86bb104d1c68461a83fd2d14514a63f86da096aa 100644 (file)
@@ -3,6 +3,7 @@ import asyncio
 import functools
 import random
 import time
+import math
 from testing import Client
 from testing import default_test_setup
 from testing import gen_data
@@ -23,19 +24,65 @@ from testing import parse_args
 PIPE_NAME = '/tmp/dbtest_tee.sock'
 
 DATA = {
-    'series num_float': [
+    'series float': [
         [1471254705, 1.5],
         [1471254707, -3.5],
         [1471254710, -7.3]],
-    'series num_integer': [
+    'series integer': [
         [1471254705, 5],
         [1471254708, -3],
         [1471254710, -7]],
-    'series_log': [
+    'aggr': [
+        [1447249033, 531], [1447249337, 534],
+        [1447249633, 535], [1447249937, 531],
+        [1447250249, 532], [1447250549, 537],
+        [1447250868, 530], [1447251168, 520],
+        [1447251449, 54], [1447251749, 54],
+        [1447252049, 513], [1447252349, 537],
+        [1447252649, 528], [1447252968, 531],
+        [1447253244, 533], [1447253549, 538],
+        [1447253849, 534], [1447254149, 532],
+        [1447254449, 533], [1447254748, 537]],
+    # 'huge': [
+    #     [1471254705, 9223372036854775807],
+    #     [1471254706, 9223372036854775806],
+    #     [1471254707, 9223372036854775805],
+    #     [1471254708, 9223372036854775804]],
+    'equal ts': [
+        [1471254705, 0], [1471254705, 1], [1471254705, 1],
+        [1471254707, 0], [1471254707, 1], [1471254708, 0],
+    ],
+    'variance': [
+        [1471254705, 2.75], [1471254706, 1.75], [1471254707, 1.25],
+        [1471254708, 0.25], [1471254709, 0.5], [1471254710, 1.25],
+        [1471254711, 3.5]
+    ],
+    'pvariance': [
+        [1471254705, 0.0], [1471254706, 0.25], [1471254707, 0.25],
+        [1471254708, 1.25], [1471254709, 1.5], [1471254710, 1.75],
+        [1471254711, 2.75], [1471254712, 3.25]
+    ],
+    'filter': [
+        [1471254705, 5],
+        [1471254710, -3],
+        [1471254715, -7],
+        [1471254720, 7]
+    ],
+    'one': [
+        [1471254710, 1]
+    ],
+    'log': [
         [1471254710, 'log line one'],
         [1471254712, 'log line two'],
         [1471254714, 'another line (three)'],
-        [1471254716, 'and yet one more']]
+        [1471254716, 'and yet one more'],
+    ],
+    # 'special': [
+    #     [1471254705, 0.1],
+    #     [1471254706, math.nan],
+    #     [1471254707, math.inf],
+    #     [1471254708, -math.inf],
+    # ]
 }
 
 if os.path.exists(PIPE_NAME):
@@ -52,7 +99,7 @@ class TestTee(TestBase):
             self._tee_data[k].extend(v)
 
 
-    @default_test_setup(1, pipe_name=PIPE_NAME)
+    @default_test_setup(2)
     async def run(self):
         self._tee_data = {}
 
@@ -69,22 +116,43 @@ class TestTee(TestBase):
 
         self.assertEqual(
             await self.client0.insert(DATA),
-            {'success_msg': 'Successfully inserted 10 point(s).'})
+            {'success_msg': 'Successfully inserted 56 point(s).'})
 
         self.assertAlmostEqual(
-            await self.client0.query('select * from "series num_float"'),
-            {'series num_float': DATA['series num_float']})
+            await self.client0.query('select * from "series float"'),
+            {'series float': DATA['series float']})
 
         self.assertEqual(
-            await self.client0.query('select * from "series num_integer"'),
-            {'series num_integer': DATA['series num_integer']})
+            await self.client0.query('select * from "series integer"'),
+            {'series integer': DATA['series integer']})
 
         self.assertEqual(
-            await self.client0.query('select * from "series_log"'),
-            {'series_log': DATA['series_log']})
+            await self.client0.query('select * from "log"'),
+            {'log': DATA['log']})
+
+        await asyncio.sleep(1)
+
+        self.assertEqual(DATA, self._tee_data)
+
+        await self.db.add_pool(self.server1, sleep=3)
+        await self.assertIsRunning(self.db, self.client0, timeout=50)
+
+        await self.client1.connect()
+
+        self._tee_data = {}
+        await self.client0.query('drop series set ignore_threshold true')
 
         await asyncio.sleep(1)
 
+        await self.client0.query(
+            'alter servers set tee_pipe_name "{}"'.format(PIPE_NAME))
+
+        await asyncio.sleep(1)
+
+        self.assertEqual(
+            await self.client0.insert(DATA),
+            {'success_msg': 'Successfully inserted 56 point(s).'})
+
         self.assertEqual(DATA, self._tee_data)
 
         self.client0.close()
index abccf5088bd4c5eed84118012321f8c11127ed01..1128157dd7a14d60c2e0efc2b6f2b8e386972d92 100644 (file)
@@ -4034,7 +4034,13 @@ static void exit_set_tee_pipe_name(uv_async_t * handle)
             query->nodes->node->children->next->next->node->children->node;
 
     char pipe_name[node->len - 1];
-    xstr_extract_string(pipe_name, node->str, node->len);
+    char * p_pipe_name = NULL;
+
+    if (node->cl_obj->gid == CLERI_GID_STRING)
+    {
+        xstr_extract_string(pipe_name, node->str, node->len);
+        p_pipe_name = pipe_name;
+    }
 
     if (q_alter->alter_tp == QUERY_ALTER_SERVERS)
     {
@@ -4052,7 +4058,7 @@ static void exit_set_tee_pipe_name(uv_async_t * handle)
                 (cexpr_cb_t) siridb_server_cexpr_cb,
                 &wserver))
         {
-            (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+            (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name);
             if (siridb_save(siridb))
             {
                 log_critical("Could not save database changes (database: '%s')",
@@ -4094,12 +4100,12 @@ static void exit_set_tee_pipe_name(uv_async_t * handle)
         QP_ADD_SUCCESS
         qp_add_fmt_safe(query->packer,
                     MSG_SUCCES_SET_TEE_PIPE_NAME,
-                    pipe_name,
+                    p_pipe_name ? p_pipe_name : "disabled",
                     server->name);
 
         if (server == siridb->server)
         {
-            (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+            (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name);
             if (siridb_save(siridb))
             {
                 log_critical("Could not save database changes (database: '%s')",
@@ -4115,9 +4121,9 @@ static void exit_set_tee_pipe_name(uv_async_t * handle)
             {
                 sirinet_pkg_t * pkg = sirinet_pkg_new(
                         0,
-                        strlen(pipe_name),
+                        p_pipe_name ? strlen(p_pipe_name) : 0,
                         BPROTO_TEE_PIPE_NAME_UPDATE,
-                        (unsigned char *) pipe_name);
+                        (unsigned char *) p_pipe_name);
                 if (pkg != NULL)
                 {
                     /* handle will be bound to a timer so we should increment */
index 32787e90da1b7a4b2f14a615f71768d20d859dc2..51f876689e848b1beb272314e067d6da0303a1d0 100644 (file)
@@ -16,6 +16,7 @@ static char tee__buf[TEE__BUF_SZ];
 static void tee__runtime_init(uv_pipe_t * pipe);
 static void tee__write_cb(uv_write_t * req, int status);
 static void tee__on_connect(uv_connect_t * req, int status);
+static void tee__close_cb(uv_pipe_t * pipe);
 static void tee__alloc_buffer(
     uv_handle_t * handle,
     size_t suggsz,
@@ -63,8 +64,6 @@ int siridb_tee_connect(siridb_tee_t * tee)
     tee->flags |= SIRIDB_TEE_FLAG_INIT;
     tee->pipe.data = tee;
 
-    free(tee->err_msg_);
-
     uv_pipe_connect(req, &tee->pipe, tee->pipe_name_, tee__on_connect);
     return 0;
 }
@@ -72,6 +71,20 @@ int siridb_tee_connect(siridb_tee_t * tee)
 int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name)
 {
     free(tee->pipe_name_);
+    free(tee->err_msg_);
+    tee->err_msg_ = NULL;
+
+    if (pipe_name == NULL)
+    {
+        tee->pipe_name_ = NULL;
+
+        if (tee->flags & SIRIDB_TEE_FLAG_CONNECTED)
+        {
+            uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__close_cb);
+        }
+        return 0;
+    }
+
     tee->pipe_name_ = strdup(pipe_name);
     if (!tee->pipe_name_)
     {
@@ -138,6 +151,14 @@ static void tee__runtime_init(uv_pipe_t * pipe)
     }
 }
 
+static void tee__close_cb(uv_pipe_t * pipe)
+{
+    siridb_tee_t * tee = pipe->data;
+
+    tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
+    tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
+}
+
 static void tee__write_cb(uv_write_t * req, int status)
 {
     sirinet_promise_t * promise = req->data;
@@ -158,6 +179,7 @@ static void tee__on_connect(uv_connect_t * req, int status)
         log_info("Connection created to pipe: '%s'", tee->pipe_name_);
         if (uv_read_start(req->handle, tee__alloc_buffer, tee__on_data))
         {
+            free(tee->err_msg_);
             if (asprintf(&tee->err_msg_,
                     "Cannot open pipe '%s' for reading",
                     tee->pipe_name_) >= 0)
@@ -170,6 +192,7 @@ static void tee__on_connect(uv_connect_t * req, int status)
         goto done;
     }
 
+    free(tee->err_msg_);
     if (asprintf(
             &tee->err_msg_,
             "Cannot connect to pipe '%s' (%s)",
@@ -180,7 +203,7 @@ static void tee__on_connect(uv_connect_t * req, int status)
     }
 
 fail:
-    uv_close((uv_handle_t *) req->handle, NULL);
+    uv_close((uv_handle_t *) req->handle, (uv_close_cb) tee__close_cb);
 done:
     free(req);
 }
@@ -201,8 +224,6 @@ static void tee__on_data(
     ssize_t nread,
     const uv_buf_t * buf __attribute__((unused)))
 {
-    siridb_tee_t * tee = client->data;
-
     if (nread < 0)
     {
         if (nread != UV_EOF)
@@ -212,9 +233,7 @@ static void tee__on_data(
                 uv_err_name(nread));
         }
         log_info("Disconnected from tee");
-        tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
-        tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
-        uv_close((uv_handle_t *) client, NULL);
+        uv_close((uv_handle_t *) client, (uv_close_cb) tee__close_cb);
     }
 
     if (nread > 0)
index 816687ff658e9b35b6ac42c4ee3eda9bb3cf3702..3854db7e630c41051210e957dc079a48876d99d0 100644 (file)
@@ -441,11 +441,14 @@ static void on_tee_pipe_name_update(
     SERVER_CHECK_AUTHENTICATED(client, server);
     siridb_t * siridb = client->siridb;
     sirinet_pkg_t * package;
-    char * pipe_name = strndup((const char *) pkg->data, pkg->len);
-    if (pipe_name != NULL)
-    {
-        (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
-    }
+
+    char * pipe_name = pkg->len
+            ? strndup((const char *) pkg->data, pkg->len)
+            : NULL;
+
+    (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+
+    free(pipe_name);
 
     package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_TEE_PIPE_NAME, NULL);
     if (package != NULL)