TCP tee
authorJeroen van der Heijden <jeroen@cesbit.com>
Wed, 20 Apr 2022 14:16:23 +0000 (16:16 +0200)
committerJeroen van der Heijden <jeroen@cesbit.com>
Wed, 20 Apr 2022 14:16:23 +0000 (16:16 +0200)
21 files changed:
grammar/grammar.py
include/siri/db/db.h
include/siri/db/tee.h
include/siri/grammar/grammar.h
include/siri/net/protocol.h
itest/tee_server.py [new file with mode: 0644]
itest/test_integer_load.py [new file with mode: 0644]
src/siri/db/db.c
src/siri/db/insert.c
src/siri/db/listener.c
src/siri/db/props.c
src/siri/db/server.c
src/siri/db/servers.c
src/siri/db/tee.c
src/siri/grammar/grammar.c
src/siri/heartbeat.c
src/siri/net/bserver.c
src/siri/net/protocol.c
src/siri/service/client.c
src/siri/service/request.c
src/siri/siri.c

index 2b689c23ef6bc04362369df45f0541fff1d0f9cc..ddc2b1cf1a69c0929e9dbe02dca50736219e03a1 100644 (file)
@@ -165,7 +165,7 @@ class SiriGrammar(Grammar):
     k_sync_progress = Keyword('sync_progress')
     k_tag = Keyword('tag')
     k_tags = Keyword('tags')
-    k_tee_pipe_name = Keyword('tee_pipe_name')
+    k_tee = Keyword('tee')
     k_time_precision = Keyword('time_precision')
     k_timeit = Keyword('timeit')
     k_timeval = Keyword('timeval')
@@ -287,7 +287,6 @@ class SiriGrammar(Grammar):
         k_reindex_progress,
         k_selected_points,
         k_sync_progress,
-        k_tee_pipe_name,
         k_uptime,
         most_greedy=False), ',', 1)
 
@@ -394,7 +393,6 @@ class SiriGrammar(Grammar):
             k_status,
             k_reindex_progress,
             k_sync_progress,
-            k_tee_pipe_name,
             most_greedy=False), str_operator, string),
         Sequence(k_online, bool_operator, _boolean),
         Sequence(k_log_level, int_operator, log_keywords),
@@ -606,7 +604,7 @@ class SiriGrammar(Grammar):
         Optional(Sequence(k_using, aggregate_functions)))
 
     set_address = Sequence(k_set, k_address, string)
-    set_tee_pipe_name = Sequence(k_set, k_tee_pipe_name, Choice(
+    set_tee = Sequence(k_set, k_tee, Choice(
         k_false,
         string,
         most_greedy=False))
@@ -642,6 +640,7 @@ class SiriGrammar(Grammar):
         set_timezone,
         set_expiration_num,
         set_expiration_log,
+        set_tee,
         most_greedy=False))
 
     alter_group = Sequence(k_group, group_name, Choice(
@@ -656,14 +655,12 @@ class SiriGrammar(Grammar):
     alter_server = Sequence(k_server, uuid, Choice(
         set_log_level,
         set_backup_mode,
-        set_tee_pipe_name,
         set_address,
         set_port,
         most_greedy=False))
 
     alter_servers = Sequence(k_servers, Optional(where_server), Choice(
         set_log_level,
-        set_tee_pipe_name,
         most_greedy=False))
 
     alter_user = Sequence(k_user, string, Choice(
@@ -853,7 +850,7 @@ class SiriGrammar(Grammar):
         k_startup_time,
         k_status,
         k_sync_progress,
-        k_tee_pipe_name,
+        k_tee,
         k_time_precision,
         k_timezone,
         k_uptime,
index 20fb8fadf9861c83c36a3e83087db96c63253c9b..25c9bd482f26e13f90f1ef34fec03df0514d0a92 100644 (file)
@@ -8,7 +8,7 @@ typedef struct siridb_s siridb_t;
 
 #define SIRIDB_MAX_SIZE_ERR_MSG 1024
 #define SIRIDB_MAX_DBNAME_LEN 256  /*    255 + NULL     */
-#define SIRIDB_SCHEMA 6
+#define SIRIDB_SCHEMA 7
 #define SIRIDB_FLAG_REINDEXING 1
 #define SIRIDB_FLAG_DROPPED 2
 
index 16ff3f38b08fb414a1f425a6f9eb5562e151639d..cbee3562eb0a5c22c4e4d8562a5b139234f6f594 100644 (file)
@@ -6,50 +6,59 @@
 
 typedef struct siridb_tee_s siridb_tee_t;
 
+#define SIRIDB_TEE_DEFAULT_TCP_PORT 9104
+
 enum
 {
-    SIRIDB_TEE_FLAG_INIT = 1<<0,
-    SIRIDB_TEE_FLAG_CONNECTING = 1<<1,
-    SIRIDB_TEE_FLAG_CONNECTED = 1<<2,
     SIRIDB_TEE_FLAG = 1<<31,
 };
 
+enum siridb_tee_e_t
+{
+    SIRIDB_TEE_E_OK=0,
+    SIRIDB_TEE_E_ALLOC,
+    SIRIDB_TEE_E_READ,
+    SIRIDB_TEE_E_CONNECT,
+};
+
+
 #include <uv.h>
 #include <stdbool.h>
 #include <siri/net/promise.h>
 
 siridb_tee_t * siridb_tee_new(void);
-void siridb_tee_free(siridb_tee_t * tee);
-int siridb_tee_connect(siridb_tee_t * tee);
-int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name);
+int siridb_tee_set_address_port(
+        siridb_tee_t * tee,
+        const char * address,
+        uint16_t port);
 void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise);
-const char * tee_str(siridb_tee_t * tee);
-static inline bool siridb_tee_is_configured(siridb_tee_t * tee);
-static inline bool siridb_tee_is_connected(siridb_tee_t * tee);
-static inline bool siridb_tee_is_handle(uv_handle_t * handle);
+void siridb_tee_free(siridb_tee_t * tee);
+const char * siridb_tee_str(siridb_tee_t * tee);
+
+typedef void (*siridb_tee_cb)(uv_handle_t *);
+
 
 struct siridb_tee_s
 {
     uint32_t flags;  /* maps to sirnet_stream_t tp for cleanup */
-    char * pipe_name_;
-    char * err_msg_;
-    uv_pipe_t pipe;
+    uint16_t port;
+    uint16_t err_code;
+    char * address;
+    uv_tcp_t * tcp;
+    uv_mutex_t lock_;
+    sirinet_promise_t * promise_;
 };
 
+
 static inline bool siridb_tee_is_configured(siridb_tee_t * tee)
 {
-    return tee->pipe_name_ != NULL;
+    return tee->address != NULL;
 };
 
-static inline bool siridb_tee_is_connected(siridb_tee_t * tee)
-{
-    return tee->flags & SIRIDB_TEE_FLAG_CONNECTED;
-}
-
 static inline bool siridb_tee_is_handle(uv_handle_t * handle)
 {
     return
-        handle->type == UV_NAMED_PIPE &&
+        handle->type == UV_TCP &&
         handle->data &&
         (((siridb_tee_t *) handle->data)->flags & SIRIDB_TEE_FLAG);
 }
index 6a696e81e5561a727c9be41efec7b6e8903cd327..466e09164fc6a8dbcaf2a8595e0d0c941bc02d91 100644 (file)
@@ -5,7 +5,7 @@
  * should be used with the libcleri module.
  *
  * Source class: SiriGrammar
- * Created at: 2020-09-25 10:57:26
+ * Created at: 2022-04-15 12:10:03
  */
 #ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
 #define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
@@ -236,7 +236,7 @@ enum cleri_grammar_ids {
     CLERI_GID_K_SYNC_PROGRESS,
     CLERI_GID_K_TAG,
     CLERI_GID_K_TAGS,
-    CLERI_GID_K_TEE_PIPE_NAME,
+    CLERI_GID_K_TEE,
     CLERI_GID_K_TIMEIT,
     CLERI_GID_K_TIMEVAL,
     CLERI_GID_K_TIMEZONE,
@@ -307,7 +307,7 @@ enum cleri_grammar_ids {
     CLERI_GID_SET_PASSWORD,
     CLERI_GID_SET_PORT,
     CLERI_GID_SET_SELECT_POINTS_LIMIT,
-    CLERI_GID_SET_TEE_PIPE_NAME,
+    CLERI_GID_SET_TEE,
     CLERI_GID_SET_TIMEZONE,
     CLERI_GID_SHARD_COLUMNS,
     CLERI_GID_SHOW_STMT,
index d3787e84b2c6f3acd137fc1a0bc4fe95128ed9dc..ff37e4e38828f72c26c8d87b6dc1b93984e18d6e 100644 (file)
@@ -77,7 +77,7 @@ typedef enum
     BPROTO_REQ_GROUPS,                  /* empty                            */
     BPROTO_ENABLE_BACKUP_MODE,          /* empty                            */
     BPROTO_DISABLE_BACKUP_MODE,         /* empty                            */
-    BPROTO_TEE_PIPE_NAME_UPDATE,        /* tee pipe name                    */
+    _BPROTO_TEE_PIPE_NAME_UPDATE,       /* deprecated                       */
     BPROTO_DROP_DATABASE,               /* empty                            */
     BPROTO_REQ_TAGS,                    /* empty                            */
     BPROTO_SERIES_TAGS,                 /* [series name, tag name, ...]     */
diff --git a/itest/tee_server.py b/itest/tee_server.py
new file mode 100644 (file)
index 0000000..11b8ffc
--- /dev/null
@@ -0,0 +1,6 @@
+import asyncio
+
+
+
+if __name__ == '__main__':
+    pass
\ No newline at end of file
diff --git a/itest/test_integer_load.py b/itest/test_integer_load.py
new file mode 100644 (file)
index 0000000..acb0e4b
--- /dev/null
@@ -0,0 +1,81 @@
+import asyncio
+import functools
+import random
+import time
+import string
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+from testing import parse_args
+
+
+TIME_PRECISION = 'ms'
+
+
+class TestIntegerLoad(TestBase):
+    title = 'Test inserts and response'
+
+    GEN_POINTS = functools.partial(
+        gen_points,
+        tp=int,
+        mi=-2**10,
+        ma=2**10,
+        n=5,
+        time_precision=TIME_PRECISION,
+        ts_gap=3)
+
+    async def select(self, client, series, n):
+        series = {s.name: s for s in series}
+        chars=string.ascii_letters + string.digits
+        for _ in range(n):
+            regex = ''.join(random.choice(chars) for _ in range(3))
+            res = await client.query(f'select max() prefix "max-", min() prefix "min-", mean() prefix "mean-" from /.*{regex}.*/i')
+            if res:
+                print(res)
+            # for s, p in res.items():
+            #     self.assertEqual(sorted(series[s].points), p)
+            await asyncio.sleep(0.2)
+
+    async def insert(self, client, series, n):
+        for _ in range(n):
+            await client.insert_some_series(
+                series, n=0.04, points=self.GEN_POINTS)
+            await asyncio.sleep(0.2)
+
+    @default_test_setup(
+            1,
+            time_precision=TIME_PRECISION,
+            compression=True,
+            auto_duration=True,
+            optimize_interval=20)
+    async def run(self):
+        await self.client0.connect()
+
+        series = gen_series(n=1000)
+
+        insert = asyncio.ensure_future(
+            self.insert(self.client0, series, n=50000))
+        select = asyncio.ensure_future(
+            self.select(self.client0, series, n=50000))
+        await asyncio.gather(insert, select)
+
+        self.client0.close()
+        await asyncio.sleep(1800)
+
+
+if __name__ == '__main__':
+    random.seed(1)
+    parse_args()
+    run_test(TestIntegerLoad())
index 72749f1be0b35cb87ca19a6bb662f337f4a2f234..374fe8b763ef5f8142a42e1903ff635cd8a7f8f0 100644 (file)
@@ -278,12 +278,6 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags)
     /* start tasks */
     siridb_tasks_init(&siridb->tasks);
 
-    /* init tee if configured */
-    if (siridb_tee_is_configured(siridb->tee))
-    {
-         siridb_tee_connect(siridb->tee);
-    }
-
     log_info("Finished loading database: '%s'", siridb->dbname);
 
     return siridb;
@@ -319,7 +313,8 @@ static int siridb__from_unpacker(
             qp_schema.via.int64 == 2 ||
             qp_schema.via.int64 == 3 ||
             qp_schema.via.int64 == 4 ||
-            qp_schema.via.int64 == 5)
+            qp_schema.via.int64 == 5 ||
+            qp_schema.via.int64 == 6)
     {
         log_info(
                 "Found an old database schema (v%d), "
@@ -481,31 +476,18 @@ static int siridb__from_unpacker(
     }
 
     /* for older schemas we keep the default tee_pipe_name=NULL */
-    if (qp_schema.via.int64 >= 5)
+    if (qp_schema.via.int64 >= 5 && qp_schema.via.int64 <=6)
     {
+        LOGC("HERE");
         qp_next(unpacker, &qp_obj);
-
-        if (qp_obj.tp == QP_RAW)
-        {
-            (*siridb)->tee->pipe_name_ = strndup(
-                (char *) qp_obj.via.raw,
-                qp_obj.len);
-
-            if (!(*siridb)->tee->pipe_name_)
-            {
-                READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.")
-            }
-        }
-        else if (qp_obj.tp != QP_NULL)
-        {
-            READ_DB_EXIT_WITH_ERROR("Cannot read tee pipe name.")
-        }
+        /* Skip the tee pipe name */
     }
     if (qp_schema.via.int64 >= 6)
     {
         /* read select points limit */
         if (qp_next(unpacker, &qp_obj) != QP_INT64 || qp_obj.via.int64 < 0)
         {
+
             READ_DB_EXIT_WITH_ERROR(
                     "Cannot read shard (log) expiration time.")
         }
@@ -519,17 +501,48 @@ static int siridb__from_unpacker(
         }
         (*siridb)->expiration_num = qp_obj.via.int64;
     }
-    if ((*siridb)->tee->pipe_name_ == NULL)
+    if (qp_schema.via.int64 >= 7)
+    {
+        qp_next(unpacker, &qp_obj);
+
+        if (qp_obj.tp == QP_RAW)
+        {
+            (*siridb)->tee->address = strndup(
+                (char *) qp_obj.via.raw,
+                qp_obj.len);
+
+            if (!(*siridb)->tee->address)
+            {
+                READ_DB_EXIT_WITH_ERROR("Cannot allocate tee address.")
+            }
+        }
+        else if (qp_obj.tp != QP_NULL)
+        {
+            READ_DB_EXIT_WITH_ERROR("Cannot read tee address.")
+        }
+
+        if (qp_next(unpacker, &qp_obj) != QP_INT64 ||
+            qp_obj.via.int64 < 0 ||
+            qp_obj.via.int64 > 65535)
+        {
+            READ_DB_EXIT_WITH_ERROR("Cannot read tee port.")
+        }
+
+        (*siridb)->tee->port = qp_obj.via.int64;
+    }
+
+    if ((*siridb)->tee->address == NULL)
     {
         log_debug(
-            "No tee pipe name configured for database: %s",
+            "No tee configured for database: %s",
             (*siridb)->dbname);
     }
     else
     {
         log_debug(
-            "Using tee pipe name '%s' for database: '%s'",
-            (*siridb)->tee->pipe_name_,
+            "Using tee '%s:%u' for database: '%s'",
+            (*siridb)->tee->address,
+            (*siridb)->tee->port,
             (*siridb)->dbname);
     }
 
@@ -675,11 +688,12 @@ int siridb_save(siridb_t * siridb)
             qp_fadd_double(fpacker, siridb->drop_threshold) ||
             qp_fadd_int64(fpacker, siridb->select_points_limit) ||
             qp_fadd_int64(fpacker, siridb->list_limit) ||
-            (siridb->tee->pipe_name_ == NULL
-                ? qp_fadd_type(fpacker, QP_NULL)
-                : qp_fadd_string(fpacker, siridb->tee->pipe_name_)) ||
             qp_fadd_int64(fpacker, siridb->expiration_log) ||
             qp_fadd_int64(fpacker, siridb->expiration_num) ||
+            (siridb->tee->address == NULL
+                ? qp_fadd_type(fpacker, QP_NULL)
+                : qp_fadd_string(fpacker, siridb->tee->address)) ||
+            qp_fadd_int64(fpacker, siridb->tee->port) ||
             qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
             qp_close(fpacker));
 }
@@ -888,6 +902,7 @@ static siridb_t * siridb__new(void)
     siridb->users = NULL;
     siridb->servers = NULL;
     siridb->pools = NULL;
+    siridb->dropped_fp = NULL;
     siridb->max_series_id = 0;
     siridb->received_points = 0;
     siridb->selected_points = 0;
index e67b7913394432a472cb5dee4335df14810cb6c6..a581a5571aa7032d48ced5f86e61794b20aea2f6 100644 (file)
@@ -345,7 +345,7 @@ int insert_init_backend_local(
     siridb_tasks_inc(siridb->tasks);
     siridb->insert_tasks++;
 
-    if (siridb_tee_is_connected(siridb->tee))
+    if (siridb_tee_is_configured(siridb->tee))
     {
         siridb_tee_write(siridb->tee, promise);
     }
@@ -1075,7 +1075,7 @@ static int INSERT_init_local(
     siridb_tasks_inc(siridb->tasks);
     siridb->insert_tasks++;
 
-    if (siridb_tee_is_connected(siridb->tee))
+    if (siridb_tee_is_configured(siridb->tee))
     {
         siridb_tee_write(siridb->tee, promise);
     }
index ec42c8c94eeaf77ecb440c412a0cb4ce78ee91b0..d2919a07c56b906c7799f508e6bf2f86ae090b22 100644 (file)
@@ -33,6 +33,7 @@
 #include <siri/net/promises.h>
 #include <siri/net/protocol.h>
 #include <siri/net/clserver.h>
+#include <siri/net/tcp.h>
 #include <siri/siri.h>
 #include <xstr/xstr.h>
 #include <sys/time.h>
@@ -175,12 +176,12 @@ if (IS_MASTER && siridb_is_reindexing(siridb))                              \
     "Successfully dropped server '%s'."
 #define MSG_SUCCES_SET_LOG_LEVEL_MULTI \
     "Successfully set log level to '%s' on %lu servers."
-#define MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI \
-    "Successfully set tee_pipe name on %lu servers."
+#define MSG_SUCCES_SET_TEE_DISABLED \
+    "Successfully disabled tee."
+#define MSG_SUCCES_SET_TEE_ENABLED \
+    "Successfully configures tee to %s."
 #define MSG_SUCCES_SET_LOG_LEVEL \
     "Successfully set log level to '%s' on '%s'."
-#define MSG_SUCCES_SET_TEE_PIPE_NAME \
-    "Successfully set tee pipe name to '%s' on '%s'."
 #define MSG_SUCCESS_SET_SELECT_POINTS_LIMIT \
     "Successfully changed select points limit from %" PRIu32 " to %" PRIu32 "."
 #define MSG_SUCCES_DROP_SERIES \
@@ -287,7 +288,7 @@ static void exit_set_list_limit(uv_async_t * handle);
 static void exit_set_log_level(uv_async_t * handle);
 static void exit_set_port(uv_async_t * handle);
 static void exit_set_select_points_limit(uv_async_t * handle);
-static void exit_set_tee_pipe_name(uv_async_t * handle);
+static void exit_set_tee(uv_async_t * handle);
 static void exit_set_timezone(uv_async_t * handle);
 static void exit_show_stmt(uv_async_t * handle);
 static void exit_tag_series(uv_async_t * handle);
@@ -543,7 +544,7 @@ void siridb_init_listener(void)
     SIRIDB_NODE_EXIT[CLERI_GID_SET_LOG_LEVEL] = exit_set_log_level;
     SIRIDB_NODE_EXIT[CLERI_GID_SET_PORT] = exit_set_port;
     SIRIDB_NODE_EXIT[CLERI_GID_SET_SELECT_POINTS_LIMIT] = exit_set_select_points_limit;
-    SIRIDB_NODE_EXIT[CLERI_GID_SET_TEE_PIPE_NAME] = exit_set_tee_pipe_name;
+    SIRIDB_NODE_EXIT[CLERI_GID_SET_TEE] = exit_set_tee;
     SIRIDB_NODE_EXIT[CLERI_GID_SET_TIMEZONE] = exit_set_timezone;
     SIRIDB_NODE_EXIT[CLERI_GID_SHOW_STMT] = exit_show_stmt;
     SIRIDB_NODE_EXIT[CLERI_GID_TAG_SERIES] = exit_tag_series;
@@ -4737,137 +4738,93 @@ static void exit_set_select_points_limit(uv_async_t * handle)
     }
 }
 
-static void exit_set_tee_pipe_name(uv_async_t * handle)
+static void exit_set_tee(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
-    query_alter_t * q_alter = (query_alter_t *) query->data;
     siridb_t * siridb = query->client->siridb;
 
-    assert (query->data != NULL);
+    MASTER_CHECK_ACCESSIBLE(siridb)
 
     cleri_node_t * node = cleri_gn(cleri_gn(
             query->nodes->node->children->next->next)->children);
 
-    char pipe_name[node->len - 1];
-    char * p_pipe_name = NULL;
+    char tee_addr_port[node->len - 1];
+    char tee_address[SIRI_CFG_MAX_LEN_ADDRESS];
+    uint16_t tee_port = SIRIDB_TEE_DEFAULT_TCP_PORT;
 
-    if (node->cl_obj->gid == CLERI_GID_STRING)
-    {
-        xstr_extract_string(pipe_name, node->str, node->len);
-        p_pipe_name = pipe_name;
-    }
 
-    if (q_alter->alter_tp == QUERY_ALTER_SERVERS)
+    if (node->cl_obj->gid == CLERI_GID_STRING)
     {
-        /*
-         * alter_servers
-         */
-        cexpr_t * where_expr = ((query_list_t *) query->data)->where_expr;
-        siridb_server_walker_t wserver = {
-            .server=siridb->server,
-            .siridb=siridb
-        };
+        xstr_extract_string(tee_addr_port, node->str, node->len);
 
-        if (where_expr == NULL || cexpr_run(
-                where_expr,
-                (cexpr_cb_t) siridb_server_cexpr_cb,
-                &wserver))
+        if (tee_addr_port[0] == 0)
         {
-            (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name);
-            if (siridb_save(siridb))
-            {
-                log_critical("Could not save database changes (database: '%s')",
-                        siridb->dbname);
-            }
-            q_alter->n++;
+            snprintf(query->err_msg,
+                    SIRIDB_MAX_SIZE_ERR_MSG,
+                    "Tee address must not be empty");
+            siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+            return;
         }
 
-        if (IS_MASTER)
+        if (sirinet_extract_addr_port(tee_addr_port, tee_address, &tee_port))
         {
-            /*
-             * This is a trick because we share with setting log level on
-             * multiple servers at once.
-             */
-            q_alter->n += LOGGER_NUM_LEVELS << 16;
-            siridb_query_forward(
-                    handle,
-                    SIRIDB_QUERY_FWD_SERVERS,
-                    (sirinet_promises_cb) on_alter_xxx_response,
-                    0);
+            snprintf(query->err_msg,
+                    SIRIDB_MAX_SIZE_ERR_MSG,
+                    "Invalid tee address; expecting ADDRESS[:PORT]");
+            siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+            return;
         }
-        else
+
+        if (siridb_tee_set_address_port(siridb->tee, tee_address, tee_port))
         {
-            qp_add_raw(query->packer, (const unsigned char *) "servers", 7);
-            qp_add_int64(query->packer, q_alter->n);
-            SIRIPARSER_ASYNC_NEXT_NODE
+            log_error("Failed to set tee address");  /* continue on error..*/
         }
     }
     else
     {
-        /*
-         * alter_server
-         *
-         * we can set the success message, we just ignore the message in case
-         * an error occurs.
-         */
-        siridb_server_t * server = q_alter->via.server;
+        /* disable the tee */
+        (void) siridb_tee_set_address_port(siridb->tee, NULL, 0);
+    }
 
+    if (siridb_save(siridb))
+    {
+        snprintf(query->err_msg,
+                SIRIDB_MAX_SIZE_ERR_MSG,
+                "Error while saving database changes!");
+        siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+    }
+    else
+    {
         QP_ADD_SUCCESS
-        qp_add_fmt_safe(query->packer,
-                    MSG_SUCCES_SET_TEE_PIPE_NAME,
-                    p_pipe_name ? p_pipe_name : "disabled",
-                    server->name);
 
-        if (server == siridb->server)
+        if (siridb->tee->address)
         {
-            (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name);
-            if (siridb_save(siridb))
-            {
-                log_critical("Could not save database changes (database: '%s')",
-                        siridb->dbname);
-            }
-
-            SIRIPARSER_ASYNC_NEXT_NODE
+            log_info(
+                    MSG_SUCCES_SET_TEE_ENABLED,
+                    siridb->tee->address,
+                    siridb->tee->port);
+            qp_add_fmt_safe(query->packer,
+                    MSG_SUCCES_SET_TEE_ENABLED,
+                    siridb->tee->address,
+                    siridb->tee->port);
         }
         else
         {
+            log_info(MSG_SUCCES_SET_TEE_DISABLED);
+            qp_add_string(query->packer, MSG_SUCCES_SET_TEE_DISABLED);
+        }
 
-            if (siridb_server_is_online(server))
-            {
-                sirinet_pkg_t * pkg = sirinet_pkg_new(
-                        0,
-                        p_pipe_name ? strlen(p_pipe_name) : 0,
-                        BPROTO_TEE_PIPE_NAME_UPDATE,
-                        (unsigned char *) p_pipe_name);
-                if (pkg != NULL)
-                {
-                    /* handle will be bound to a timer so we should increment */
-                    siri_async_incref(handle);
-                    if (siridb_server_send_pkg(
-                            server,
-                            pkg,
-                            0,
-                            (sirinet_promise_cb) on_ack_response,
-                            handle,
-                            0))
-                    {
-                        /*
-                         * signal is raised and 'on_ack_response' will not be
-                         * called
-                         */
-                        free(pkg);
-                        siri_async_decref(&handle);
-                    }
-                }
-            }
-            else
-            {
-                snprintf(query->err_msg,
-                        SIRIDB_MAX_SIZE_ERR_MSG,
-                        "Cannot set pipe name, '%s' is currently unavailable",
-                        server->name);
-                siridb_query_send_error(handle, CPROTO_ERR_QUERY);
-            }
+        if (IS_MASTER)
+        {
+            siridb_query_forward(
+                    handle,
+                    SIRIDB_QUERY_FWD_UPDATE,
+                    (sirinet_promises_cb) on_update_xxx_response,
+                    0);
+        }
+        else
+        {
+            SIRIPARSER_ASYNC_NEXT_NODE
         }
     }
 }
@@ -5928,27 +5885,15 @@ static void on_alter_xxx_response(vec_t * promises, uv_async_t * handle)
      */
     QP_ADD_SUCCESS
 
-    if ((q_alter->n >> 16) >= LOGGER_NUM_LEVELS)
-    {
-        log_info(MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI, q_alter->n & 0xffff);
-        qp_add_fmt_safe(
-                query->packer,
-                MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI,
-                q_alter->n & 0xffff);
-    }
-    else
-    {
-        log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI,
-                logger_level_name(q_alter->n >> 16),
-                q_alter->n & 0xffff);
-
-        qp_add_fmt_safe(
-                query->packer,
-                MSG_SUCCES_SET_LOG_LEVEL_MULTI,
-                logger_level_name(q_alter->n >> 16),
-                q_alter->n & 0xffff);
-    }
+    log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI,
+            logger_level_name(q_alter->n >> 16),
+            q_alter->n & 0xffff);
 
+    qp_add_fmt_safe(
+            query->packer,
+            MSG_SUCCES_SET_LOG_LEVEL_MULTI,
+            logger_level_name(q_alter->n >> 16),
+            q_alter->n & 0xffff);
 
     SIRIPARSER_ASYNC_NEXT_NODE
 }
index a555bf25f079993c14424bc95760810eb776e527..a14e5a68e1dae2383e677c662c80e17e7fec117a 100644 (file)
@@ -172,7 +172,7 @@ static void prop_sync_progress(
         siridb_t * siridb,
         qp_packer_t * packer,
         int map);
-static void prop_tee_pipe_name(
+static void prop_tee(
         siridb_t * siridb,
         qp_packer_t * packer,
         int map);
@@ -270,8 +270,8 @@ void siridb_init_props(void)
             prop_status);
     props_set_cb(CLERI_GID_K_SYNC_PROGRESS - KW_OFFSET,
             prop_sync_progress);
-    props_set_cb(CLERI_GID_K_TEE_PIPE_NAME - KW_OFFSET,
-            prop_tee_pipe_name);
+    props_set_cb(CLERI_GID_K_TEE - KW_OFFSET,
+            prop_tee);
     props_set_cb(CLERI_GID_K_TIMEZONE - KW_OFFSET,
             prop_timezone);
     props_set_cb(CLERI_GID_K_TIME_PRECISION - KW_OFFSET,
@@ -572,13 +572,13 @@ static void prop_sync_progress(
     qp_add_string(packer, siridb_initsync_sync_progress(siridb));
 }
 
-static void prop_tee_pipe_name(
+static void prop_tee(
         siridb_t * siridb,
         qp_packer_t * packer,
         int map)
 {
-    SIRIDB_PROP_MAP("tee_pipe_name", 13)
-    qp_add_string(packer, tee_str(siridb->tee));
+    SIRIDB_PROP_MAP("tee", 3)
+    qp_add_string(packer, siridb_tee_str(siridb->tee));
 }
 
 static void prop_timezone(
index 7b4299223676b1ed692e7e997c7dcc6e9b929450..ccd2f6452df740c6be764fd51adbd16b43ee80f6 100644 (file)
@@ -1175,12 +1175,6 @@ int siridb_server_cexpr_cb(
                 cond->operator,
                 siridb_initsync_sync_progress(wserver->siridb),
                 cond->str);
-
-    case CLERI_GID_K_TEE_PIPE_NAME:
-        return cexpr_str_cmp(
-                cond->operator,
-                tee_str(wserver->siridb->tee),
-                cond->str);
     }
 
     log_critical("Unexpected server property received: %d", cond->prop);
index f4bebd05607abdcff8f95d44dbb4499836c01846..690f3b26170fc1281fa02348a2db6449b7e69569 100644 (file)
@@ -701,9 +701,6 @@ int siridb_servers_list(siridb_server_t * server, uv_async_t * handle)
         case CLERI_GID_K_SYNC_PROGRESS:
             qp_add_string(query->packer, siridb_initsync_sync_progress(siridb));
             break;
-        case CLERI_GID_K_TEE_PIPE_NAME:
-            qp_add_string(query->packer, tee_str(siridb->tee));
-            break;
         case CLERI_GID_K_UPTIME:
             qp_add_int64(
                     query->packer,
index 449dbab7a06ac50a82eabf75944c5d63140ec654..568f2ebccd738913a04f85fee4c7d19173ab9146 100644 (file)
 #include <assert.h>
 #include <siri/db/tee.h>
 #include <siri/siri.h>
-#include <siri/net/pipe.h>
+#include <siri/net/tcp.h>
 #include <logger/logger.h>
 
 #define TEE__BUF_SZ 512
 static char tee__buf[TEE__BUF_SZ];
 
-static void tee__runtime_init(uv_pipe_t * pipe);
-static void tee__write_cb(uv_write_t * req, int status);
-static void tee__on_connect(uv_connect_t * req, int status);
-static void tee__close_cb(uv_pipe_t * pipe);
 static void tee__alloc_buffer(
-    uv_handle_t * handle,
-    size_t suggsz,
-    uv_buf_t * buf);
-static void tee__on_data(
-    uv_stream_t * client,
-    ssize_t nread,
-    const uv_buf_t * buf);
-
-siridb_tee_t * siridb_tee_new(void)
+    uv_handle_t * handle __attribute__((unused)),
+    size_t suggsz __attribute__((unused)),
+    uv_buf_t * buf)
 {
-    siridb_tee_t * tee = malloc(sizeof(siridb_tee_t));
-    if (tee == NULL)
-    {
-        return NULL;
-    }
-    tee->pipe_name_ = NULL;
-    tee->err_msg_ = NULL;
-    tee->pipe.data = tee;
-    tee->flags = SIRIDB_TEE_FLAG;
-    return tee;
+    buf->base = tee__buf;
+    buf->len = TEE__BUF_SZ;
 }
 
-void siridb_tee_free(siridb_tee_t * tee)
+static inline void tee__release(siridb_tee_t * tee)
 {
-    free(tee->err_msg_);
-    free(tee->pipe_name_);
-    free(tee);
+    sirinet_promise_decref(tee->promise_);
+    tee->promise_ = NULL;
+    uv_mutex_unlock(&tee->lock_);
 }
 
-int siridb_tee_connect(siridb_tee_t * tee)
+static void tee__write_cb(uv_write_t * req, int status)
 {
-    if (tee->flags & SIRIDB_TEE_FLAG_CONNECTING)
+    sirinet_promise_t * promise = req->data;
+    if (status)
     {
-        return 0;
+        log_error("Socket (tee) write error: %s", uv_strerror(status));
     }
-    tee->flags |= SIRIDB_TEE_FLAG_CONNECTING;
-    uv_connect_t * req = malloc(sizeof(uv_connect_t));
-    if (req == NULL)
+    sirinet_promise_decref(promise);
+    free(req);
+}
+
+static void tee__on_data(
+    uv_tcp_t * tcp,
+    ssize_t nread,
+    const uv_buf_t * buf __attribute__((unused)))
+{
+    siridb_tee_t * tee = tcp->data;
+    if (!tee)
     {
-        return -1;
+        return;
     }
 
-    req->data = tee;
-
-    if (uv_pipe_init(siri.loop, &tee->pipe, 0))
+    if (nread < 0)
     {
-        tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTING;
-        free(req);
-        return -1;
+        if (nread != UV_EOF)
+        {
+            log_error("Read error on tee '%s': '%s'",
+                tee->address,
+                uv_err_name(nread));
+        }
+
+        log_info("Disconnected from tee `%s`", tee->address);
+
+        uv_close((uv_handle_t *) tcp, (uv_close_cb) free);
+        tee->tcp = NULL;
+        return;
     }
-    tee->flags |= SIRIDB_TEE_FLAG_INIT;
-    tee->pipe.data = tee;
 
-    uv_pipe_connect(req, &tee->pipe, tee->pipe_name_, tee__on_connect);
-    return 0;
+    log_debug("Got %zd bytes on tee `%s` which will be ignored",
+            nread,
+            tee->address);
 }
 
-int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name)
+static void tee__do_write(siridb_tee_t * tee, sirinet_promise_t * promise)
 {
-    free(tee->pipe_name_);
-    free(tee->err_msg_);
-    tee->err_msg_ = NULL;
+    uv_write_t * req = malloc(sizeof(uv_write_t));
+    uv_buf_t wrbuf = uv_buf_init(
+            (char *) promise->pkg,
+            sizeof(sirinet_pkg_t) + promise->pkg->len);
 
-    if (pipe_name == NULL)
+    if (req)
     {
-        tee->pipe_name_ = NULL;
+        int rc;
+        req->data = promise;
+        sirinet_promise_incref(promise);
+        rc = uv_write(req, (uv_stream_t *) tee->tcp, &wrbuf, 1, tee__write_cb);
+        if (rc == 0)
+        {
+            return;  /* success */
+        }
+        sirinet_promise_decref(promise);
+    }
+    log_error("Cannot write to tee");
+    return;
+}
 
-        if (tee->flags & SIRIDB_TEE_FLAG_CONNECTED)
+static void tee__on_connect(uv_connect_t * req, int status)
+{
+    siridb_tee_t * tee = req->data;
+
+    if (status == 0)
+    {
+        if (uv_read_start(
+                req->handle,
+                tee__alloc_buffer,
+                (uv_read_cb) tee__on_data))
         {
-            uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__close_cb);
+            /* Failed to start reading the tee connection */
+            tee->err_code = SIRIDB_TEE_E_READ;
+            log_error("Failed to open tee `%s` for reading", tee->address);
+            goto fail;
         }
-        return 0;
+
+        /* success */
+        log_info("Connection created to tee: '%s'", tee->address);
+        tee__do_write(tee, tee->promise_);
+        goto done;
     }
 
-    tee->pipe_name_ = strdup(pipe_name);
-    if (!tee->pipe_name_)
+    /* failed */
+    tee->err_code = SIRIDB_TEE_E_CONNECT;
+    log_warning(
+            "Cannot connect to tee '%s' (%s)",
+            tee->address,
+            uv_strerror(status));
+
+fail:
+    uv_close((uv_handle_t *) req->handle, (uv_close_cb) free);
+    tee->tcp = NULL;
+done:
+    tee__release(tee);
+    free(req);
+}
+
+void tee__make_connection(siridb_tee_t * tee, const struct sockaddr * dest)
+{
+    uv_connect_t * req = malloc(sizeof(uv_connect_t));
+    tee->tcp = malloc(sizeof(uv_tcp_t));
+    if (tee->tcp == NULL || req == NULL)
     {
-        return -1;
+        goto fail0;
     }
-    if (tee->flags & SIRIDB_TEE_FLAG_INIT)
+
+    req->data = tee;
+
+    log_debug("Trying to connect to tee '%s'...", tee->address);
+    (void) uv_tcp_init(siri.loop, tee->tcp);
+    (void) uv_tcp_connect(req, tee->tcp, dest, tee__on_connect);
+    return;
+
+fail0:
+    free(req);
+    free(tee->tcp);
+    tee->tcp = NULL;
+    tee__release(tee);
+}
+
+static void tee__on_resolved(
+        uv_getaddrinfo_t * resolver,
+        int status,
+        struct addrinfo * res)
+{
+    siridb_tee_t * tee = resolver->data;
+    free(resolver);
+
+    if (status < 0)
     {
-        uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__runtime_init);
+        log_error("Cannot resolve ip address for tee '%s' (error: %s)",
+                tee->address,
+                uv_err_name(status));
+        tee__release(tee);
+        return;
     }
-    else
+
+    if (Logger.level == LOGGER_DEBUG)
     {
-        tee__runtime_init(&tee->pipe);
+        char addr[47] = {'\0'};  /* enough for both ipv4 and ipv6 */
+
+        switch (res->ai_family)
+        {
+        case AF_INET:
+            uv_ip4_name((struct sockaddr_in *) res->ai_addr, addr, 16);
+            break;
+
+        case AF_INET6:
+            uv_ip6_name((struct sockaddr_in6 *) res->ai_addr, addr, 46);
+            break;
+
+        default:
+            sprintf(addr, "unsupported family");
+        }
+
+        log_debug(
+                "Resolved ip address '%s' for tee '%s', "
+                "trying to connect...",
+                addr, tee->address);
+
     }
-    return 0;
+
+    tee__make_connection(tee, (const struct sockaddr *) res->ai_addr);
 }
 
-void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise)
+static void tee__resolve_dns(siridb_tee_t * tee, int ai_family)
 {
-    uv_write_t * req = malloc(sizeof(uv_write_t));
-    if (!req)
+    int result;
+    struct addrinfo hints;
+    char port[6]= {0};
+    uv_getaddrinfo_t * resolver = malloc(sizeof(uv_getaddrinfo_t));
+
+    hints.ai_family = ai_family;
+    hints.ai_socktype = SOCK_STREAM;
+    hints.ai_protocol = IPPROTO_TCP;
+    hints.ai_flags = AI_NUMERICSERV;
+
+    if (resolver == NULL)
     {
-        log_error("Cannot allocate memory for tee request");
         return;
     }
 
-    req->data = promise;
-    sirinet_promise_incref(promise);
+    resolver->data = tee;
 
-    uv_buf_t wrbuf = uv_buf_init(
-            (char *) promise->pkg,
-            sizeof(sirinet_pkg_t) + promise->pkg->len);
+    sprintf(port, "%u", tee->port);
+
+    result = uv_getaddrinfo(
+            siri.loop,
+            resolver,
+            tee__on_resolved,
+            tee->address,
+            port,
+            &hints);
 
-    if (uv_write(req, (uv_stream_t *) &tee->pipe, &wrbuf, 1, tee__write_cb))
+    if (result)
     {
-        log_error("Cannot write to tee");
-        sirinet_promise_decref(promise);
+        log_error("getaddrinfo call error %s", uv_err_name(result));
+        free(resolver);
     }
 }
 
-const char * tee_str(siridb_tee_t * tee)
+void tee__connect(siridb_tee_t * tee)
 {
-    if (tee->err_msg_)
+    struct in_addr sa;
+    struct in6_addr sa6;
+
+    if (inet_pton(AF_INET, tee->address, &sa))
     {
-        return tee->err_msg_;
+        /* IPv4 */
+        struct sockaddr_in dest;
+        (void) uv_ip4_addr(tee->address, tee->port, &dest);
+        tee__make_connection(tee, (const struct sockaddr *) &dest);
+        return;
     }
-    if (tee->pipe_name_)
+
+    if (inet_pton(AF_INET6, tee->address, &sa6))
     {
-        return tee->pipe_name_;
+        /* IPv6 */
+        struct sockaddr_in6 dest6;
+        (void) uv_ip6_addr(tee->address, tee->port, &dest6);
+        tee__make_connection(tee, (const struct sockaddr *) &dest6);
+        return;
     }
-    return "disabled";
-}
 
+    /* Try DNS */
+    tee__resolve_dns(tee, dns_req_family_map(siri.cfg->ip_support));
+}
 
-static void tee__runtime_init(uv_pipe_t * pipe)
+siridb_tee_t * siridb_tee_new(void)
 {
-    siridb_tee_t * tee = pipe->data;
-
-    tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
-    tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTING;
-    tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
-
-     if (siridb_tee_connect(tee))
-     {
-         log_error("Could not connect to tee at runtime");
-     }
+    siridb_tee_t * tee = malloc(sizeof(siridb_tee_t));
+    if (tee == NULL)
+    {
+        return NULL;
+    }
+    tee->address = NULL;
+    tee->tcp = NULL;
+    tee->promise_ = NULL;
+    tee->flags = SIRIDB_TEE_FLAG;
+    tee->err_code = 0;
+    uv_mutex_init(&tee->lock_);
+    return tee;
 }
 
-static void tee__close_cb(uv_pipe_t * pipe)
+void siridb_tee_free(siridb_tee_t * tee)
 {
-    siridb_tee_t * tee = pipe->data;
+    /* must be closed before free can be used */
+    assert (tee->tcp == NULL);
+    assert (tee->promise_ == NULL);
 
-    tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
-    tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTING;
-    tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
+    uv_mutex_destroy(&tee->lock_);
+    free(tee->address);
+    free(tee);
 }
 
-static void tee__write_cb(uv_write_t * req, int status)
+const char * siridb_tee_str(siridb_tee_t * tee)
 {
-    sirinet_promise_t * promise = req->data;
-    sirinet_promise_decref(promise);
-    if (status)
+    if (tee->err_code)
     {
-        log_error("Socket (tee) write error: %s", uv_strerror(status));
+        switch((enum siridb_tee_e_t) tee->err_code)
+        {
+        case SIRIDB_TEE_E_OK:       return "OK";
+        case SIRIDB_TEE_E_ALLOC:    return "memory allocation error";
+        case SIRIDB_TEE_E_READ:     return "failed to open socket for reading";
+        case SIRIDB_TEE_E_CONNECT:  return "failed to make connection";
+        }
     }
-    free(req);
+    if (tee->address)
+    {
+        return tee->address;
+    }
+    return "disabled";
 }
 
-static void tee__on_connect(uv_connect_t * req, int status)
+void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise)
 {
-    siridb_tee_t * tee = req->data;
-
-    if (status == 0)
+    if (!tee->address)
     {
-        log_info("Connection created to pipe: '%s'", tee->pipe_name_);
-        if (uv_read_start(req->handle, tee__alloc_buffer, tee__on_data))
-        {
-            free(tee->err_msg_);
-            if (asprintf(&tee->err_msg_,
-                    "Cannot open pipe '%s' for reading",
-                    tee->pipe_name_) >= 0)
-            {
-                log_warning(tee->err_msg_);
-            }
-            goto fail;
-        }
-        tee->flags |= SIRIDB_TEE_FLAG_CONNECTED;
-        goto done;
+        /* Tee is not configured */
+        return;
     }
 
-    free(tee->err_msg_);
-    tee->err_msg_ = NULL;
-
-    if (asprintf(
-            &tee->err_msg_,
-            "Cannot connect to pipe '%s' (%s)",
-            tee->pipe_name_,
-            uv_strerror(status)) >= 0)
+    if (tee->tcp)
     {
-        log_warning(tee->err_msg_);
+        tee__do_write(tee, promise);
     }
+    else
+    {
+        uv_mutex_lock(&tee->lock_);
 
-fail:
-    uv_close((uv_handle_t *) req->handle, (uv_close_cb) tee__close_cb);
-done:
-    free(req);
+        if (!tee->address)
+        {
+            /* Tee is not configured */
+            uv_mutex_unlock(&tee->lock_);
+            return;
+        }
+
+        assert (tee->promise_ == NULL);
+        tee->promise_ = promise;
+        sirinet_promise_incref(promise);
+        tee__connect(tee);
+    }
 }
 
-static void tee__alloc_buffer(
-    uv_handle_t * handle __attribute__((unused)),
-    size_t suggsz __attribute__((unused)),
-    uv_buf_t * buf)
+int siridb_tee_set_address_port(
+        siridb_tee_t * tee,
+        const char * address,
+        uint16_t port)
 {
-    buf->base = tee__buf;
-    buf->len = TEE__BUF_SZ;
-}
+    uv_mutex_lock(&tee->lock_);
 
+    free(tee->address);
 
+    tee->err_code = SIRIDB_TEE_E_OK;
+    tee->address = address ? strdup(address) : NULL;
+    tee->port = port;
 
-static void tee__on_data(
-    uv_stream_t * client,
-    ssize_t nread,
-    const uv_buf_t * buf __attribute__((unused)))
-{
-    if (nread < 0)
+    if (tee->tcp && !uv_is_closing((uv_handle_t *) tee->tcp))
     {
-        if (nread != UV_EOF)
-        {
-            log_error("Read error on pipe '%s' : '%s'",
-                sirinet_pipe_name((uv_pipe_t * ) client),
-                uv_err_name(nread));
-        }
-        log_info("Disconnected from tee");
-        uv_close((uv_handle_t *) client, (uv_close_cb) tee__close_cb);
+        uv_close((uv_handle_t *) tee->tcp, (uv_close_cb) free);
+        tee->tcp = NULL;
     }
 
-    if (nread > 0)
-    {
-        log_debug("Got %zd bytes on tee which will be ignored", nread);
-    }
+    uv_mutex_unlock(&tee->lock_);
+
+    return address && !tee->address ? -1 : 0;
 }
index c533570e4772e4a29e931320845848d91ee5f0b2..403c6b04bb66fac9c4d7dd13d9cb7f5b0f9791a2 100644 (file)
@@ -5,7 +5,7 @@
  * should be used with the libcleri module.
  *
  * Source class: SiriGrammar
- * Created at: 2020-09-25 10:57:26
+ * Created at: 2022-04-15 12:10:03
  */
 
 #include "siri/grammar/grammar.h"
@@ -166,7 +166,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
     cleri_t * k_sync_progress = cleri_keyword(CLERI_GID_K_SYNC_PROGRESS, "sync_progress", CLERI_CASE_SENSITIVE);
     cleri_t * k_tag = cleri_keyword(CLERI_GID_K_TAG, "tag", CLERI_CASE_SENSITIVE);
     cleri_t * k_tags = cleri_keyword(CLERI_GID_K_TAGS, "tags", CLERI_CASE_SENSITIVE);
-    cleri_t * k_tee_pipe_name = cleri_keyword(CLERI_GID_K_TEE_PIPE_NAME, "tee_pipe_name", CLERI_CASE_SENSITIVE);
+    cleri_t * k_tee = cleri_keyword(CLERI_GID_K_TEE, "tee", CLERI_CASE_SENSITIVE);
     cleri_t * k_time_precision = cleri_keyword(CLERI_GID_K_TIME_PRECISION, "time_precision", CLERI_CASE_SENSITIVE);
     cleri_t * k_timeit = cleri_keyword(CLERI_GID_K_TIMEIT, "timeit", CLERI_CASE_SENSITIVE);
     cleri_t * k_timeval = cleri_keyword(CLERI_GID_K_TIMEVAL, "timeval", CLERI_CASE_SENSITIVE);
@@ -312,7 +312,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
     cleri_t * server_columns = cleri_list(CLERI_GID_SERVER_COLUMNS, cleri_choice(
         CLERI_NONE,
         CLERI_FIRST_MATCH,
-        29,
+        28,
         k_address,
         k_buffer_path,
         k_buffer_size,
@@ -340,7 +340,6 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
         k_reindex_progress,
         k_selected_points,
         k_sync_progress,
-        k_tee_pipe_name,
         k_uptime
     ), cleri_token(CLERI_NONE, ","), 1, 0, 0);
     cleri_t * group_columns = cleri_list(CLERI_GID_GROUP_COLUMNS, cleri_choice(
@@ -625,7 +624,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
                 cleri_choice(
                     CLERI_NONE,
                     CLERI_FIRST_MATCH,
-                    12,
+                    11,
                     k_address,
                     k_buffer_path,
                     k_dbpath,
@@ -636,8 +635,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
                     k_version,
                     k_status,
                     k_reindex_progress,
-                    k_sync_progress,
-                    k_tee_pipe_name
+                    k_sync_progress
                 ),
                 str_operator,
                 string
@@ -1153,11 +1151,11 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
         k_address,
         string
     );
-    cleri_t * set_tee_pipe_name = cleri_sequence(
-        CLERI_GID_SET_TEE_PIPE_NAME,
+    cleri_t * set_tee = cleri_sequence(
+        CLERI_GID_SET_TEE,
         3,
         k_set,
-        k_tee_pipe_name,
+        k_tee,
         cleri_choice(
             CLERI_NONE,
             CLERI_FIRST_MATCH,
@@ -1278,13 +1276,14 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
         cleri_choice(
             CLERI_NONE,
             CLERI_FIRST_MATCH,
-            6,
+            7,
             set_drop_threshold,
             set_list_limit,
             set_select_points_limit,
             set_timezone,
             set_expiration_num,
-            set_expiration_log
+            set_expiration_log,
+            set_tee
         )
     );
     cleri_t * alter_group = cleri_sequence(
@@ -1320,10 +1319,9 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
         cleri_choice(
             CLERI_NONE,
             CLERI_FIRST_MATCH,
-            5,
+            4,
             set_log_level,
             set_backup_mode,
-            set_tee_pipe_name,
             set_address,
             set_port
         )
@@ -1336,9 +1334,8 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
         cleri_choice(
             CLERI_NONE,
             CLERI_FIRST_MATCH,
-            2,
-            set_log_level,
-            set_tee_pipe_name
+            1,
+            set_log_level
         )
     );
     cleri_t * alter_user = cleri_sequence(
@@ -1724,7 +1721,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
             k_startup_time,
             k_status,
             k_sync_progress,
-            k_tee_pipe_name,
+            k_tee,
             k_time_precision,
             k_timezone,
             k_uptime,
index 200740ce55b61af52ffb863b805ab352e9a360af..5c292869499dffa38bd44eaa074708435be6d5d9 100644 (file)
@@ -61,12 +61,6 @@ static void HEARTBEAT_cb(uv_timer_t * handle __attribute__((unused)))
 
         siridb_update_shard_expiration(siridb);
 
-        if (    siridb_tee_is_configured(siridb->tee) &&
-                !siridb_tee_is_connected(siridb->tee))
-        {
-             siridb_tee_connect(siridb->tee);
-        }
-
         server_node = siridb->servers->first;
         while (server_node != NULL)
         {
index 2d59bab3a51903f4ecd8e72144ec9a3f24438594..2c2d34023e54b50b611918311d1f7c3d63cbd19d 100644 (file)
@@ -43,9 +43,6 @@ static void on_flags_update(sirinet_stream_t * client, sirinet_pkg_t * pkg);
 static void on_log_level_update(
         sirinet_stream_t * client,
         sirinet_pkg_t * pkg);
-static void on_tee_pipe_name_update(
-        sirinet_stream_t * client,
-        sirinet_pkg_t * pkg);
 static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg);
 static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg);
 static void on_query(
@@ -280,8 +277,8 @@ static void on_data(sirinet_stream_t * client, sirinet_pkg_t * pkg)
     case BPROTO_DISABLE_BACKUP_MODE:
         on_disable_backup_mode(client, pkg);
         break;
-    case BPROTO_TEE_PIPE_NAME_UPDATE:
-        on_tee_pipe_name_update(client, pkg);
+    case _BPROTO_TEE_PIPE_NAME_UPDATE:
+        log_warning("BPROTO_TEE_PIPE_NAME_UPDATE is deprecated");
         break;
     case BPROTO_DROP_DATABASE:
         on_drop_database(client, pkg);
@@ -450,30 +447,6 @@ static void on_log_level_update(sirinet_stream_t * client, sirinet_pkg_t * pkg)
     }
 }
 
-static void on_tee_pipe_name_update(
-        sirinet_stream_t * client,
-        sirinet_pkg_t * pkg)
-{
-    SERVER_CHECK_AUTHENTICATED(client, server);
-    siridb_t * siridb = client->siridb;
-    sirinet_pkg_t * package;
-
-    char * pipe_name = pkg->len
-            ? strndup((const char *) pkg->data, pkg->len)
-            : NULL;
-
-    (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
-
-    free(pipe_name);
-
-    package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_TEE_PIPE_NAME, NULL);
-    if (package != NULL)
-    {
-        /* ignore result code, signal can be raised */
-        sirinet_pkg_send(client, package);
-    }
-}
-
 static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg)
 {
     SERVER_CHECK_AUTHENTICATED(client, server)
index fc90cd9847af9faaba1b80a8cf44296b094f28e0..a3b8682b457c2e8fadb385dc730c0a2da01626bc 100644 (file)
@@ -84,7 +84,7 @@ const char * sirinet_bproto_client_str(bproto_client_t n)
     case BPROTO_REQ_GROUPS: return "BPROTO_REQ_GROUPS";
     case BPROTO_ENABLE_BACKUP_MODE: return "BPROTO_ENABLE_BACKUP_MODE";
     case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
-    case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
+    case _BPROTO_TEE_PIPE_NAME_UPDATE: return "_BPROTO_TEE_PIPE_NAME_UPDATE";
     case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
     case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS";
     case BPROTO_SERIES_TAGS: return "BPROTO_SERIES_TAGS";
index 1eb4fb20041238d9f4ed81608acca841126275cd..6e433f7e4a47e446be2e6450f88d8ae5a244c885 100644 (file)
@@ -620,6 +620,7 @@ static void CLIENT_on_file_database(
     qp_fpacker_t * fpacker;
     qp_unpacker_t unpacker;
     qp_obj_t
+        qp_obj,
         qp_uuid,
         qp_schema,
         qp_dbname,
@@ -632,7 +633,9 @@ static void CLIENT_on_file_database(
         qp_points_limit,
         qp_list_limit,
         qp_exp_log,
-        qp_exp_num;
+        qp_exp_num,
+        qp_tee_address,
+        qp_tee_port;
     siridb_t * siridb;
     int rc;
     /* 13 = strlen("database.dat")+1  */
@@ -656,16 +659,57 @@ static void CLIENT_on_file_database(
         return;
     }
 
-    /* list and points limit require at least schema 1 */
-    (void) qp_next(&unpacker, &qp_points_limit);
-    (void) qp_next(&unpacker, &qp_list_limit);
+    if (qp_schema.via.int64 >= 2)
+    {
+        if (qp_next(&unpacker, &qp_points_limit) != QP_INT64||
+            qp_next(&unpacker, &qp_list_limit) != QP_INT64)
+        {
+            CLIENT_err(adm_client, "invalid database file received");
+            return;
+        }
+    }
+    else
+    {
+        qp_points_limit.via.int64 = DEF_SELECT_POINTS_LIMIT;
+        qp_list_limit.via.int64 = DEF_LIST_LIMIT;
+    }
+
+    if (qp_schema.via.int64 >= 5 && qp_schema.via.int64 <=6)
+    {
+        qp_next(&unpacker, &qp_obj);
+        /* Skip the tee pipe name */
+    }
 
-    /* this is the tee pipe name when schema is >= 5 */
-    (void) qp_next(&unpacker, &qp_exp_log);
-    (void) qp_next(&unpacker, &qp_exp_num);
+    if (qp_schema.via.int64 >= 6)
+    {
+        if (qp_next(&unpacker, &qp_exp_log) != QP_INT64||
+            qp_next(&unpacker, &qp_exp_num) != QP_INT64)
+        {
+            CLIENT_err(adm_client, "invalid database file received");
+            return;
+        }
+    }
+    else
+    {
+        qp_exp_log.via.int64 = 0;
+        qp_exp_num.via.int64 = 0;
+    }
 
-    /* these are the expiration times when schema is >= 6 */
+    if (qp_schema.via.int64 >= 7)
+    {
 
+        if (qp_next(&unpacker, &qp_tee_address) == QP_ERR ||
+            qp_next(&unpacker, &qp_tee_port) != QP_INT64)
+        {
+            CLIENT_err(adm_client, "invalid database file received");
+            return;
+        }
+    }
+    else
+    {
+        qp_tee_address.tp = QP_NULL;
+        qp_tee_port.via.int64 = SIRIDB_TEE_DEFAULT_TCP_PORT;
+    }
 
     if ((fpacker = qp_open(fn, "w")) == NULL)
     {
@@ -673,6 +717,8 @@ static void CLIENT_on_file_database(
         return;
     }
 
+    LOGC("HERE");
+
     rc = (  qp_fadd_type(fpacker, QP_ARRAY_OPEN) ||
             qp_fadd_int64(fpacker, SIRIDB_SCHEMA) ||
             qp_fadd_raw(fpacker, (const unsigned char *) adm_client->uuid, 16) ||
@@ -683,19 +729,17 @@ static void CLIENT_on_file_database(
             qp_fadd_int64(fpacker, qp_duration_log.via.int64) ||
             qp_fadd_raw(fpacker, qp_timezone.via.raw, qp_timezone.len) ||
             qp_fadd_double(fpacker, qp_drop_threshold.via.real) ||
-            qp_fadd_int64(fpacker, qp_points_limit.tp == QP_INT64
-                    ? qp_points_limit.via.int64
-                    : DEF_SELECT_POINTS_LIMIT) ||
-            qp_fadd_int64(fpacker, qp_list_limit.tp == QP_INT64
-                    ? qp_list_limit.via.int64
-                    : DEF_LIST_LIMIT) ||
-            qp_fadd_type(fpacker, QP_NULL) ||
-            qp_fadd_int64(fpacker, qp_exp_log.tp == QP_INT64
-                    ? qp_exp_log.via.int64
-                    : 0) ||
-            qp_fadd_int64(fpacker, qp_exp_num.tp == QP_INT64
-                    ? qp_exp_num.via.int64
-                    : 0) ||
+            qp_fadd_int64(fpacker, qp_points_limit.via.int64) ||
+            qp_fadd_int64(fpacker, qp_list_limit.via.int64) ||
+            qp_fadd_int64(fpacker, qp_exp_log.via.int64) ||
+            qp_fadd_int64(fpacker, qp_exp_num.via.int64) ||
+            (qp_tee_address.tp == QP_RAW
+                    ? qp_fadd_raw(
+                            fpacker,
+                            qp_tee_address.via.raw,
+                            qp_tee_address.len)
+                    : qp_fadd_type(fpacker, QP_NULL)) ||
+            qp_fadd_int64(fpacker, qp_tee_port.via.int64) ||
             qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
             qp_close(fpacker));
 
index 12560ce1fbe49eb44e8661ee3ec5aafdba137758..bc0fc2396e5ddd57f56db1f7b48d7c5e789aae5a 100644 (file)
@@ -10,6 +10,7 @@
 #include <siri/db/reindex.h>
 #include <siri/db/server.h>
 #include <siri/db/servers.h>
+#include <siri/db/tee.h>
 #include <siri/service/account.h>
 #include <siri/service/client.h>
 #include <siri/service/request.h>
@@ -720,9 +721,10 @@ static cproto_server_t SERVICE_on_new_database(
         qp_fadd_double(fp, DEF_DROP_THRESHOLD) ||
         qp_fadd_int64(fp, DEF_SELECT_POINTS_LIMIT) ||
         qp_fadd_int64(fp, DEF_LIST_LIMIT) ||
-        qp_fadd_type(fp, QP_NULL) ||
         qp_fadd_int64(fp, 0) ||
         qp_fadd_int64(fp, 0) ||
+        qp_fadd_type(fp, QP_NULL) ||
+        qp_fadd_int64(fp, SIRIDB_TEE_DEFAULT_TCP_PORT) ||
         qp_fadd_type(fp, QP_ARRAY_CLOSE))
     {
         rc = -1;
index fd250e3429c695af9d9e2da9a7f892c1d3cb7475..ce112dd2abf38d5726a635b9437e95db449c8cdd 100644 (file)
@@ -631,10 +631,15 @@ static void SIRI_walk_close_handlers(
     case UV_TCP:
     case UV_NAMED_PIPE:
         {
-            if (handle->data == NULL || siridb_tee_is_handle(handle))
+            if (handle->data == NULL)
             {
                 uv_close(handle, NULL);
             }
+            else if (siridb_tee_is_handle(handle))
+            {
+                // TODO: close tee handle
+                assert (0);
+            }
             else if (siri_health_is_handle(handle))
             {
                 siri_health_close((siri_health_request_t *) handle->data);