Added tee test
authorJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 31 Oct 2018 13:36:55 +0000 (14:36 +0100)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 31 Oct 2018 13:36:55 +0000 (14:36 +0100)
15 files changed:
itest/run_all.py
itest/test_tee.py [new file with mode: 0644]
itest/testing/__init__.py
itest/testing/pipe_server.py [new file with mode: 0644]
src/siri/db/db.c
src/siri/db/insert.c
src/siri/db/listener.c
src/siri/db/tee.c
src/siri/service/client.c
src/siri/service/request.c
src/xstr/xstr.c
test/test_grammar/test_grammar.c
test/test_lookup/sources
test/test_siridb/sources
test/test_xstr/test_xstr.c

index 344c1d9b297ac17c6484c2719fc5f19980229ebd..822f36ec4b99bf22807cbbc61f915c01796eaf4c 100644 (file)
@@ -16,6 +16,7 @@ from test_log import TestLog
 from test_log import TestLog
 from test_pipe_support import TestPipeSupport
 from test_buffer import TestBuffer
+from test_tee import TestTee
 
 
 if __name__ == '__main__':
@@ -32,3 +33,4 @@ if __name__ == '__main__':
     run_test(TestLog())
     run_test(TestPipeSupport())
     run_test(TestBuffer())
+    run_test(TestTee())
diff --git a/itest/test_tee.py b/itest/test_tee.py
new file mode 100644 (file)
index 0000000..fb07c30
--- /dev/null
@@ -0,0 +1,97 @@
+import os
+import asyncio
+import functools
+import random
+import time
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import SiriDB
+from testing import TestBase
+from testing import SiriDBAsyncUnixServer
+from testing import parse_args
+
+
+PIPE_NAME = '/tmp/dbtest_tee.sock'
+
+DATA = {
+    'series num_float': [
+        [1471254705, 1.5],
+        [1471254707, -3.5],
+        [1471254710, -7.3]],
+    'series num_integer': [
+        [1471254705, 5],
+        [1471254708, -3],
+        [1471254710, -7]],
+    'series_log': [
+        [1471254710, 'log line one'],
+        [1471254712, 'log line two'],
+        [1471254714, 'another line (three)'],
+        [1471254716, 'and yet one more']]
+}
+
+if os.path.exists(PIPE_NAME):
+    os.unlink(PIPE_NAME)
+
+
+class TestTee(TestBase):
+    title = 'Test tee'
+
+    def on_data(self, data):
+        for k, v in data.items():
+            if k not in self._tee_data:
+                self._tee_data[k] = []
+            self._tee_data[k].extend(v)
+
+
+    @default_test_setup(1, pipe_name=PIPE_NAME)
+    async def run(self):
+        self._tee_data = {}
+
+        server = SiriDBAsyncUnixServer(PIPE_NAME, self.on_data)
+
+        await server.create()
+
+        await self.client0.connect()
+
+        await self.client0.query(
+            'alter servers set tee_pipe_name "{}"'.format(PIPE_NAME))
+
+        await asyncio.sleep(1)
+
+        self.assertEqual(
+            await self.client0.insert(DATA),
+            {'success_msg': 'Successfully inserted 10 point(s).'})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select * from "series num_float"'),
+            {'series num_float': DATA['series num_float']})
+
+        self.assertEqual(
+            await self.client0.query('select * from "series num_integer"'),
+            {'series num_integer': DATA['series num_integer']})
+
+        self.assertEqual(
+            await self.client0.query('select * from "series_log"'),
+            {'series_log': DATA['series_log']})
+
+        await asyncio.sleep(1)
+
+        self.assertEqual(DATA, self._tee_data)
+
+        self.client0.close()
+
+        return False
+
+
+if __name__ == '__main__':
+    parse_args()
+    run_test(TestTee())
index 9ce56c322f355ba52bc75134257a83a0f77ebe4f..3f6cb861bff9a4d22dd4cf9a5b1c197a2e4fd6e4 100644 (file)
@@ -16,6 +16,7 @@ from .testbase import default_test_setup
 from .testbase import TestBase
 from .series import Series
 from .pipe_client import PipeClient as SiriDBAsyncUnixConnection
+from .pipe_server import PipeServer as SiriDBAsyncUnixServer
 from .args import parse_args
 from .task import Task
 
diff --git a/itest/testing/pipe_server.py b/itest/testing/pipe_server.py
new file mode 100644 (file)
index 0000000..3d0f2a0
--- /dev/null
@@ -0,0 +1,83 @@
+
+import logging
+import asyncio
+import struct
+import qpack
+from siridb.connector import SiriDBProtocol
+from siridb.connector.lib.connection import SiriDBAsyncConnection
+
+
+class Package:
+
+    __slots__ = ('pid', 'length', 'tipe', 'checkbit', 'data')
+
+    struct_datapackage = struct.Struct('<IHBB')
+
+    def __init__(self, barray):
+        self.length, self.pid, self.tipe, self.checkbit = \
+            self.__class__.struct_datapackage.unpack_from(barray, offset=0)
+        self.length += self.__class__.struct_datapackage.size
+        self.data = None
+
+    def extract_data_from(self, barray):
+        try:
+            self.data = qpack.unpackb(
+                barray[self.__class__.struct_datapackage.size:self.length],
+                decode='utf-8')
+        finally:
+            del barray[:self.length]
+
+
+class SiriDBServerProtocol(asyncio.Protocol):
+
+    def __init__(self, on_package_received):
+        self._buffered_data = bytearray()
+        self._data_package = None
+        self._on_package_received = on_package_received
+
+    def data_received(self, data):
+        '''
+        override asyncio.Protocol
+        '''
+        self._buffered_data.extend(data)
+        while self._buffered_data:
+            size = len(self._buffered_data)
+            if self._data_package is None:
+                if size < Package.struct_datapackage.size:
+                    return None
+                self._data_package = Package(self._buffered_data)
+            if size < self._data_package.length:
+                return None
+            try:
+                self._data_package.extract_data_from(self._buffered_data)
+            except KeyError as e:
+                logging.error('Unsupported package received: {}'.format(e))
+            except Exception as e:
+                logging.exception(e)
+                # empty the byte-array to recover from this error
+                self._buffered_data.clear()
+            else:
+                self._on_package_received(self._data_package.data)
+            self._data_package = None
+
+
+class PipeServer(SiriDBAsyncConnection):
+    def __init__(self, pipe_name, on_data):
+        self._pipe_name = pipe_name
+        self._protocol = None
+        self._server = None
+        self._on_data_cb = on_data
+
+    async def create(self, loop=None):
+        loop = loop or asyncio.get_event_loop()
+
+        self._server = await loop.create_unix_server(
+            path=self._pipe_name,
+            protocol_factory=lambda: SiriDBServerProtocol(self._on_data))
+
+    def _on_data(self, data):
+        '''
+        series names are returned as c strings (0 terminated)
+        '''
+        data = {k.rstrip('\x00'): v for k, v in data.items()}
+        self._on_data_cb(data)
index cf765268e3af8733f1de75c157b6ee1a4def2860..a1cd56de6d76fa84ab7083b87e279ae444922d18 100644 (file)
@@ -480,7 +480,11 @@ static int siridb__from_unpacker(
             (*siridb)->tee->pipe_name_ = strndup(
                 (char *) qp_obj.via.raw,
                 qp_obj.len);
-            READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.")
+
+            if (!(*siridb)->tee->pipe_name_)
+            {
+                READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.")
+            }
         }
         else if (qp_obj.tp != QP_NULL)
         {
index 920d0b39571573a3a21e4cf05de4c27fa561775d..e8f1eebefebd73192346ab7fd0ced8181231a26a 100644 (file)
@@ -1060,6 +1060,11 @@ static int INSERT_init_local(
     siridb_tasks_inc(siridb->tasks);
     siridb->insert_tasks++;
 
+    if (siridb_tee_is_connected(siridb->tee))
+    {
+        siridb_tee_write(siridb->tee, promise);
+    }
+
     uv_async_init(siri.loop, handle, INSERT_local_task);
     uv_async_send(handle);
 
index bc3ead7c8f31fa5ca6a711e9e467b708dc4ccecb..abccf5088bd4c5eed84118012321f8c11127ed01 100644 (file)
@@ -4052,7 +4052,12 @@ static void exit_set_tee_pipe_name(uv_async_t * handle)
                 (cexpr_cb_t) siridb_server_cexpr_cb,
                 &wserver))
         {
-            siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+            (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+            if (siridb_save(siridb))
+            {
+                log_critical("Could not save database changes (database: '%s')",
+                        siridb->dbname);
+            }
             q_alter->n++;
         }
 
@@ -4095,6 +4100,11 @@ static void exit_set_tee_pipe_name(uv_async_t * handle)
         if (server == siridb->server)
         {
             (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+            if (siridb_save(siridb))
+            {
+                log_critical("Could not save database changes (database: '%s')",
+                        siridb->dbname);
+            }
 
             SIRIPARSER_ASYNC_NEXT_NODE
         }
index 53070f2dcf3f6555e298f0b6211a0fd511c81d82..32787e90da1b7a4b2f14a615f71768d20d859dc2 100644 (file)
@@ -162,25 +162,26 @@ static void tee__on_connect(uv_connect_t * req, int status)
                     "Cannot open pipe '%s' for reading",
                     tee->pipe_name_) >= 0)
             {
-                log_error(tee->err_msg_);
+                log_warning(tee->err_msg_);
             }
+            goto fail;
         }
-        else
-        {
-            tee->flags |= SIRIDB_TEE_FLAG_CONNECTED;
-        }
+        tee->flags |= SIRIDB_TEE_FLAG_CONNECTED;
+        goto done;
     }
-    else
+
+    if (asprintf(
+            &tee->err_msg_,
+            "Cannot connect to pipe '%s' (%s)",
+            tee->pipe_name_,
+            uv_strerror(status)) >= 0)
     {
-        if (asprintf(
-                &tee->err_msg_,
-                "Cannot connect to pipe '%s' (%s)",
-                tee->pipe_name_,
-                uv_strerror(status)) >= 0)
-        {
-            log_error(tee->err_msg_);
-        }
+        log_warning(tee->err_msg_);
     }
+
+fail:
+    uv_close((uv_handle_t *) req->handle, NULL);
+done:
     free(req);
 }
 
@@ -210,8 +211,8 @@ static void tee__on_data(
                 sirinet_pipe_name((uv_pipe_t * ) client),
                 uv_err_name(nread));
         }
-        log_info("Disconnected from tee pipe: '%s'",
-            sirinet_pipe_name((uv_pipe_t * ) client));
+        log_info("Disconnected from tee");
+        tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
         tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
         uv_close((uv_handle_t *) client, NULL);
     }
index 127346cda02c26a29941613cc56bd1257d0668fd..941efb42ed5bde017f6dbe8c3e45ad50daa6c243 100644 (file)
@@ -618,9 +618,20 @@ static void CLIENT_on_file_database(
         siri_service_client_t * adm_client,
         sirinet_pkg_t * pkg)
 {
-    FILE * fp;
+    qp_fpacker_t * fpacker;
     qp_unpacker_t unpacker;
-    qp_obj_t qp_uuid;
+    qp_obj_t
+        qp_uuid,
+        qp_schema,
+        qp_dbname,
+        qp_time_precision,
+        qp_buffer_size,
+        qp_duration_num,
+        qp_duration_log,
+        qp_timezone,
+        qp_drop_threshold,
+        qp_points_limit,
+        qp_list_limit;
     siridb_t * siridb;
     int rc;
     /* 13 = strlen("database.dat")+1  */
@@ -629,31 +640,55 @@ static void CLIENT_on_file_database(
 
     qp_unpacker_init(&unpacker, pkg->data, pkg->len);
 
-    if (qp_is_array(qp_next(&unpacker, NULL)) &&
-        /* schema check is not required at this moment but can be done here */
-        qp_next(&unpacker, NULL) == QP_INT64 &&
-        qp_next(&unpacker, &qp_uuid) == QP_RAW &&
-        qp_uuid.len == 16)
-    {
-        memcpy(unpacker.pt - 16, &adm_client->uuid, 16);
-    }
-    else
+    if (!qp_is_array(qp_next(&unpacker, NULL)) ||
+        qp_next(&unpacker, &qp_schema) != QP_INT64 ||
+        qp_next(&unpacker, &qp_uuid) != QP_RAW || qp_uuid.len != 16 ||
+        qp_next(&unpacker, &qp_dbname) != QP_RAW ||
+        qp_next(&unpacker, &qp_time_precision) != QP_INT64 ||
+        qp_next(&unpacker, &qp_buffer_size) != QP_INT64 ||
+        qp_next(&unpacker, &qp_duration_num) != QP_INT64 ||
+        qp_next(&unpacker, &qp_duration_log) != QP_INT64 ||
+        qp_next(&unpacker, &qp_timezone) != QP_RAW ||
+        qp_next(&unpacker, &qp_drop_threshold) != QP_DOUBLE)
     {
         CLIENT_err(adm_client, "invalid database file received");
         return;
     }
 
-    fp = fopen(fn, "w");
+    /* list and points limit require at least schema 1 */
+    (void) qp_next(&unpacker, &qp_points_limit);
+    (void) qp_next(&unpacker, &qp_list_limit);
 
-    if (fp == NULL)
+    /* this is the tee pipe name when schema is >= 5 */
+    (void) qp_next(&unpacker, NULL);
+
+    if ((fpacker = qp_open(fn, "w")) == NULL)
     {
         CLIENT_err(adm_client, "cannot write or create file: %s", fn);
         return;
     }
 
-    rc = fwrite(pkg->data, pkg->len, 1, fp);
-
-    if (fclose(fp) || rc != 1)
+    rc = (  qp_fadd_type(fpacker, QP_ARRAY_OPEN) ||
+            qp_fadd_int64(fpacker, SIRIDB_SCHEMA) ||
+            qp_fadd_raw(fpacker, (const unsigned char *) adm_client->uuid, 16) ||
+            qp_fadd_raw(fpacker, qp_dbname.via.raw, qp_dbname.len) ||
+            qp_fadd_int64(fpacker, qp_time_precision.via.int64) ||
+            qp_fadd_int64(fpacker, qp_buffer_size.via.int64) ||
+            qp_fadd_int64(fpacker, qp_duration_num.via.int64) ||
+            qp_fadd_int64(fpacker, qp_duration_log.via.int64) ||
+            qp_fadd_raw(fpacker, qp_timezone.via.raw, qp_timezone.len) ||
+            qp_fadd_double(fpacker, qp_drop_threshold.via.real) ||
+            qp_fadd_int64(fpacker, qp_points_limit.tp == QP_INT64
+                    ? qp_points_limit.via.int64
+                    : DEF_SELECT_POINTS_LIMIT) ||
+            qp_fadd_int64(fpacker, qp_list_limit.tp == QP_INT64
+                    ? qp_list_limit.via.int64
+                    : DEF_LIST_LIMIT) ||
+            qp_fadd_type(fpacker, QP_NULL) ||
+            qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
+            qp_close(fpacker));
+
+    if (rc != 0)
     {
         CLIENT_err(adm_client, "cannot write or create file: %s", fn);
         return;
index 1207f35347dd77259c05eaa85a0fdd01a1759db8..c60ab7a6eb18f8e2908cf59d5ffd60a8c57b81b1 100644 (file)
@@ -622,6 +622,7 @@ static cproto_server_t SERVICE_on_new_database(
         qp_fadd_double(fp, DEF_DROP_THRESHOLD) ||
         qp_fadd_int64(fp, DEF_SELECT_POINTS_LIMIT) ||
         qp_fadd_int64(fp, DEF_LIST_LIMIT) ||
+        qp_fadd_type(fp, QP_NULL) ||
         qp_fadd_type(fp, QP_ARRAY_CLOSE))
     {
         rc = -1;
index 358c425293605de3a1ae4f96f01e354543b49841..ee3dcd32b5eaff7b04c11ea7a39cd66d9a5ec02d 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * xstr.c - Extra String functions used by SiriDB.
  */
+#include <assert.h>
 #include <ctype.h>
 #include <inttypes.h>
 #include <logger/logger.h>
@@ -275,42 +276,48 @@ size_t xstr_extract_string(char * dest, const char * source, size_t len)
 double xstr_to_double(const char * src, size_t len)
 {
     char * pt = (char *) src;
+    assert (len);
     double d = 0;
     double convert;
+    uint64_t r1 = 0;
 
     switch (*pt)
     {
     case '-':
+        assert (len > 1);
         convert = -1.0;
-        pt++;
+        ++pt;
+        --len;
         break;
     case '+':
-        pt++;
-        /* FALLTHRU */
-        /* no break */
+        assert (len > 1);
+        convert = 1.0;
+        ++pt;
+        --len;
+        break;
     default:
         convert = 1.0;
     }
 
-    uint64_t r1 = *pt - '0';
-
-    while (--len && isdigit(*(++pt)))
+    for (; len && isdigit(*pt); --len, ++pt)
     {
         r1 = 10 * r1 + *pt - '0';
     }
 
     d = (double) r1;
 
-    if (--len && *(pt++) == '.')
+    if (len && --len)
     {
-        uint64_t r2 = *pt - '0';
-        ssize_t i;
-        for (i = -1; --len && isdigit(*(++pt)); i--)
+        uint64_t r2;
+        double power;
+        ++pt;
+        r2 = *pt - '0';
+        for (power = -1.0f; --len && isdigit(*(++pt)); power--)
         {
-             r2 = 10 * r2 + *pt - '0';
+            r2 = 10 * r2 + *pt - '0';
         }
 
-        d += pow(10, i) * (double) r2;
+        d += pow(10.0f, power) * (double) r2;
     }
 
     return convert * d;
index e5190df7955085a105d0d0a216c4bf011a309562..9f052bbcfd9457272ff4a707f59717c539710e20 100644 (file)
@@ -18,7 +18,7 @@ int main()
 {
     test_start("grammar");
 
-    cleri_grammar_t * grammar = compile_grammar();
+    cleri_grammar_t * grammar = compile_siri_grammar_grammar();
 
     assert_invalid(grammar, "select * from");
     assert_invalid(grammar, "list");
index eee57c6b2fa703c1dcea74575f95dcaa57bc8009..a167b2876344013821c93acef10e9d19806963cc 100644 (file)
@@ -1,3 +1,3 @@
 ../src/siri/db/lookup.c
 ../src/siri/err.c
-../src/logger/logger.c
\ No newline at end of file
+../src/logger/logger.c
index e6b944a3b9f76520b819ddcd4e41079497fe4979..100f4f7692f64326bd35e380712b4c8d85b066e1 100644 (file)
@@ -66,6 +66,7 @@
 ../src/siri/db/shard.c
 ../src/siri/db/shards.c
 ../src/siri/db/tasks.c
+../src/siri/db/tee.c
 ../src/siri/db/time.c
 ../src/siri/db/user.c
 ../src/siri/db/users.c
@@ -78,4 +79,4 @@
 ../src/siri/service/request.c
 ../src/siri/help/help.c
 ../src/siri/cfg/cfg.c
-../src/siri/grammar/grammar.c
\ No newline at end of file
+../src/siri/grammar/grammar.c
index 6e883437ecfb3d617c37aa1b4109f6d12960182c..0cf3a135644b135be2a9a5305629d4b02f785bc0 100644 (file)
@@ -12,9 +12,20 @@ int main()
         _assert (xstr_to_double("0.55", 3) == 0.5);
         _assert (xstr_to_double("123.456", 7) == 123.456);
         _assert (xstr_to_double("123", 3) == 123);
+        _assert (xstr_to_double("1234", 3) == 123);
         _assert (xstr_to_double("123.", 4) == 123);
+        _assert (xstr_to_double("123.", 3) == 123);
+        _assert (xstr_to_double("+1234", 4) == 123);
+        _assert (xstr_to_double("-1234", 4) == -123);
         _assert (xstr_to_double("123456.", 3) == 123);
-        _assert (xstr_to_double("-0.5", 3) == -0.5);
+        _assert (xstr_to_double("-0.5", 4) == -0.5);
+        _assert (xstr_to_double("-0.56", 4) == -0.5);
+        _assert (xstr_to_double("+0.5", 4) == 0.5);
+        _assert (xstr_to_double("+0.56", 4) == 0.5);
+        _assert (xstr_to_double("-.5", 3) == -0.5);
+        _assert (xstr_to_double("+.55", 3) == 0.5);
+        _assert (xstr_to_double(".55", 2) == 0.5);
+        _assert (xstr_to_double("-.55", 3) == -0.5);
     }
 
     return test_end();