From: Jeroen van der Heijden Date: Thu, 1 Nov 2018 10:29:59 +0000 (+0100) Subject: Fix some on and off switching tee X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~7^2~2^2~22^2~4 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=dcc022fe6a3d80059b40902213bf5c80c721848c;p=siridb-server.git Fix some on and off switching tee --- diff --git a/itest/test_tee.py b/itest/test_tee.py index fb07c306..86bb104d 100644 --- a/itest/test_tee.py +++ b/itest/test_tee.py @@ -3,6 +3,7 @@ import asyncio import functools import random import time +import math from testing import Client from testing import default_test_setup from testing import gen_data @@ -23,19 +24,65 @@ from testing import parse_args PIPE_NAME = '/tmp/dbtest_tee.sock' DATA = { - 'series num_float': [ + 'series float': [ [1471254705, 1.5], [1471254707, -3.5], [1471254710, -7.3]], - 'series num_integer': [ + 'series integer': [ [1471254705, 5], [1471254708, -3], [1471254710, -7]], - 'series_log': [ + 'aggr': [ + [1447249033, 531], [1447249337, 534], + [1447249633, 535], [1447249937, 531], + [1447250249, 532], [1447250549, 537], + [1447250868, 530], [1447251168, 520], + [1447251449, 54], [1447251749, 54], + [1447252049, 513], [1447252349, 537], + [1447252649, 528], [1447252968, 531], + [1447253244, 533], [1447253549, 538], + [1447253849, 534], [1447254149, 532], + [1447254449, 533], [1447254748, 537]], + # 'huge': [ + # [1471254705, 9223372036854775807], + # [1471254706, 9223372036854775806], + # [1471254707, 9223372036854775805], + # [1471254708, 9223372036854775804]], + 'equal ts': [ + [1471254705, 0], [1471254705, 1], [1471254705, 1], + [1471254707, 0], [1471254707, 1], [1471254708, 0], + ], + 'variance': [ + [1471254705, 2.75], [1471254706, 1.75], [1471254707, 1.25], + [1471254708, 0.25], [1471254709, 0.5], [1471254710, 1.25], + [1471254711, 3.5] + ], + 'pvariance': [ + [1471254705, 0.0], [1471254706, 0.25], [1471254707, 0.25], + [1471254708, 1.25], [1471254709, 1.5], [1471254710, 1.75], + [1471254711, 2.75], [1471254712, 3.25] + ], + 'filter': [ + [1471254705, 5], + [1471254710, -3], + [1471254715, -7], + [1471254720, 7] + ], + 'one': [ + [1471254710, 1] + ], + 'log': [ [1471254710, 'log line one'], [1471254712, 'log line two'], [1471254714, 'another line (three)'], - [1471254716, 'and yet one more']] + [1471254716, 'and yet one more'], + ], + # 'special': [ + # [1471254705, 0.1], + # [1471254706, math.nan], + # [1471254707, math.inf], + # [1471254708, -math.inf], + # ] } if os.path.exists(PIPE_NAME): @@ -52,7 +99,7 @@ class TestTee(TestBase): self._tee_data[k].extend(v) - @default_test_setup(1, pipe_name=PIPE_NAME) + @default_test_setup(2) async def run(self): self._tee_data = {} @@ -69,22 +116,43 @@ class TestTee(TestBase): self.assertEqual( await self.client0.insert(DATA), - {'success_msg': 'Successfully inserted 10 point(s).'}) + {'success_msg': 'Successfully inserted 56 point(s).'}) self.assertAlmostEqual( - await self.client0.query('select * from "series num_float"'), - {'series num_float': DATA['series num_float']}) + await self.client0.query('select * from "series float"'), + {'series float': DATA['series float']}) self.assertEqual( - await self.client0.query('select * from "series num_integer"'), - {'series num_integer': DATA['series num_integer']}) + await self.client0.query('select * from "series integer"'), + {'series integer': DATA['series integer']}) self.assertEqual( - await self.client0.query('select * from "series_log"'), - {'series_log': DATA['series_log']}) + await self.client0.query('select * from "log"'), + {'log': DATA['log']}) + + await asyncio.sleep(1) + + self.assertEqual(DATA, self._tee_data) + + await self.db.add_pool(self.server1, sleep=3) + await self.assertIsRunning(self.db, self.client0, timeout=50) + + await self.client1.connect() + + self._tee_data = {} + await self.client0.query('drop series set ignore_threshold true') await asyncio.sleep(1) + await self.client0.query( + 'alter servers set tee_pipe_name "{}"'.format(PIPE_NAME)) + + await asyncio.sleep(1) + + self.assertEqual( + await self.client0.insert(DATA), + {'success_msg': 'Successfully inserted 56 point(s).'}) + self.assertEqual(DATA, self._tee_data) self.client0.close() diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index abccf508..1128157d 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -4034,7 +4034,13 @@ static void exit_set_tee_pipe_name(uv_async_t * handle) query->nodes->node->children->next->next->node->children->node; char pipe_name[node->len - 1]; - xstr_extract_string(pipe_name, node->str, node->len); + char * p_pipe_name = NULL; + + if (node->cl_obj->gid == CLERI_GID_STRING) + { + xstr_extract_string(pipe_name, node->str, node->len); + p_pipe_name = pipe_name; + } if (q_alter->alter_tp == QUERY_ALTER_SERVERS) { @@ -4052,7 +4058,7 @@ static void exit_set_tee_pipe_name(uv_async_t * handle) (cexpr_cb_t) siridb_server_cexpr_cb, &wserver)) { - (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name); + (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name); if (siridb_save(siridb)) { log_critical("Could not save database changes (database: '%s')", @@ -4094,12 +4100,12 @@ static void exit_set_tee_pipe_name(uv_async_t * handle) QP_ADD_SUCCESS qp_add_fmt_safe(query->packer, MSG_SUCCES_SET_TEE_PIPE_NAME, - pipe_name, + p_pipe_name ? p_pipe_name : "disabled", server->name); if (server == siridb->server) { - (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name); + (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name); if (siridb_save(siridb)) { log_critical("Could not save database changes (database: '%s')", @@ -4115,9 +4121,9 @@ static void exit_set_tee_pipe_name(uv_async_t * handle) { sirinet_pkg_t * pkg = sirinet_pkg_new( 0, - strlen(pipe_name), + p_pipe_name ? strlen(p_pipe_name) : 0, BPROTO_TEE_PIPE_NAME_UPDATE, - (unsigned char *) pipe_name); + (unsigned char *) p_pipe_name); if (pkg != NULL) { /* handle will be bound to a timer so we should increment */ diff --git a/src/siri/db/tee.c b/src/siri/db/tee.c index 32787e90..51f87668 100644 --- a/src/siri/db/tee.c +++ b/src/siri/db/tee.c @@ -16,6 +16,7 @@ static char tee__buf[TEE__BUF_SZ]; static void tee__runtime_init(uv_pipe_t * pipe); static void tee__write_cb(uv_write_t * req, int status); static void tee__on_connect(uv_connect_t * req, int status); +static void tee__close_cb(uv_pipe_t * pipe); static void tee__alloc_buffer( uv_handle_t * handle, size_t suggsz, @@ -63,8 +64,6 @@ int siridb_tee_connect(siridb_tee_t * tee) tee->flags |= SIRIDB_TEE_FLAG_INIT; tee->pipe.data = tee; - free(tee->err_msg_); - uv_pipe_connect(req, &tee->pipe, tee->pipe_name_, tee__on_connect); return 0; } @@ -72,6 +71,20 @@ int siridb_tee_connect(siridb_tee_t * tee) int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name) { free(tee->pipe_name_); + free(tee->err_msg_); + tee->err_msg_ = NULL; + + if (pipe_name == NULL) + { + tee->pipe_name_ = NULL; + + if (tee->flags & SIRIDB_TEE_FLAG_CONNECTED) + { + uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__close_cb); + } + return 0; + } + tee->pipe_name_ = strdup(pipe_name); if (!tee->pipe_name_) { @@ -138,6 +151,14 @@ static void tee__runtime_init(uv_pipe_t * pipe) } } +static void tee__close_cb(uv_pipe_t * pipe) +{ + siridb_tee_t * tee = pipe->data; + + tee->flags &= ~SIRIDB_TEE_FLAG_INIT; + tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED; +} + static void tee__write_cb(uv_write_t * req, int status) { sirinet_promise_t * promise = req->data; @@ -158,6 +179,7 @@ static void tee__on_connect(uv_connect_t * req, int status) log_info("Connection created to pipe: '%s'", tee->pipe_name_); if (uv_read_start(req->handle, tee__alloc_buffer, tee__on_data)) { + free(tee->err_msg_); if (asprintf(&tee->err_msg_, "Cannot open pipe '%s' for reading", tee->pipe_name_) >= 0) @@ -170,6 +192,7 @@ static void tee__on_connect(uv_connect_t * req, int status) goto done; } + free(tee->err_msg_); if (asprintf( &tee->err_msg_, "Cannot connect to pipe '%s' (%s)", @@ -180,7 +203,7 @@ static void tee__on_connect(uv_connect_t * req, int status) } fail: - uv_close((uv_handle_t *) req->handle, NULL); + uv_close((uv_handle_t *) req->handle, (uv_close_cb) tee__close_cb); done: free(req); } @@ -201,8 +224,6 @@ static void tee__on_data( ssize_t nread, const uv_buf_t * buf __attribute__((unused))) { - siridb_tee_t * tee = client->data; - if (nread < 0) { if (nread != UV_EOF) @@ -212,9 +233,7 @@ static void tee__on_data( uv_err_name(nread)); } log_info("Disconnected from tee"); - tee->flags &= ~SIRIDB_TEE_FLAG_INIT; - tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED; - uv_close((uv_handle_t *) client, NULL); + uv_close((uv_handle_t *) client, (uv_close_cb) tee__close_cb); } if (nread > 0) diff --git a/src/siri/net/bserver.c b/src/siri/net/bserver.c index 816687ff..3854db7e 100644 --- a/src/siri/net/bserver.c +++ b/src/siri/net/bserver.c @@ -441,11 +441,14 @@ static void on_tee_pipe_name_update( SERVER_CHECK_AUTHENTICATED(client, server); siridb_t * siridb = client->siridb; sirinet_pkg_t * package; - char * pipe_name = strndup((const char *) pkg->data, pkg->len); - if (pipe_name != NULL) - { - (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name); - } + + char * pipe_name = pkg->len + ? strndup((const char *) pkg->data, pkg->len) + : NULL; + + (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name); + + free(pipe_name); package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_TEE_PIPE_NAME, NULL); if (package != NULL)