ondata
authorJeroen van der Heijden <jeroen@transceptor.technology>
Mon, 29 Oct 2018 16:11:41 +0000 (17:11 +0100)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Mon, 29 Oct 2018 16:11:41 +0000 (17:11 +0100)
29 files changed:
Debug/src/siri/db/subdir.mk
Release/src/siri/db/subdir.mk
grammar/export_grammar.py
grammar/grammar.py
include/siri/db/db.h
include/siri/db/tee.h [new file with mode: 0644]
include/siri/grammar/grammar.h
include/siri/net/promise.h
include/siri/net/protocol.h
include/siri/net/stream.h
siridb.conf
src/cexpr/cexpr.c
src/logger/logger.c
src/qpack/qpack.c
src/siri/db/db.c
src/siri/db/insert.c
src/siri/db/listener.c
src/siri/db/props.c
src/siri/db/server.c
src/siri/db/servers.c
src/siri/db/tee.c [new file with mode: 0644]
src/siri/grammar/grammar.c
src/siri/heartbeat.c
src/siri/net/bserver.c
src/siri/net/pipe.c
src/siri/net/pkg.c
src/siri/net/protocol.c
src/siri/net/stream.c
src/siri/siri.c

index 57a30dea04a157305d1a9b23e2b144ec1b11f617..c294f8c25ea593e9b1ba8597b16ea31c5e6e7fb4 100644 (file)
@@ -38,6 +38,7 @@ C_SRCS += \
 ../src/siri/db/shard.c \
 ../src/siri/db/shards.c \
 ../src/siri/db/tasks.c \
+../src/siri/db/tee.c \
 ../src/siri/db/time.c \
 ../src/siri/db/user.c \
 ../src/siri/db/users.c \
@@ -79,6 +80,7 @@ OBJS += \
 ./src/siri/db/shard.o \
 ./src/siri/db/shards.o \
 ./src/siri/db/tasks.o \
+./src/siri/db/tee.o \
 ./src/siri/db/time.o \
 ./src/siri/db/user.o \
 ./src/siri/db/users.o \
@@ -120,6 +122,7 @@ C_DEPS += \
 ./src/siri/db/shard.d \
 ./src/siri/db/shards.d \
 ./src/siri/db/tasks.d \
+./src/siri/db/tee.d \
 ./src/siri/db/time.d \
 ./src/siri/db/user.d \
 ./src/siri/db/users.d \
index 96d8a76852dbaf236ec7030dea608e3f0f007558..154d084329c5219723e36a70453e9da05c6536e7 100644 (file)
@@ -38,6 +38,7 @@ C_SRCS += \
 ../src/siri/db/shard.c \
 ../src/siri/db/shards.c \
 ../src/siri/db/tasks.c \
+../src/siri/db/tee.c \
 ../src/siri/db/time.c \
 ../src/siri/db/user.c \
 ../src/siri/db/users.c \
@@ -79,6 +80,7 @@ OBJS += \
 ./src/siri/db/shard.o \
 ./src/siri/db/shards.o \
 ./src/siri/db/tasks.o \
+./src/siri/db/tee.o \
 ./src/siri/db/time.o \
 ./src/siri/db/user.o \
 ./src/siri/db/users.o \
@@ -120,6 +122,7 @@ C_DEPS += \
 ./src/siri/db/shard.d \
 ./src/siri/db/shards.d \
 ./src/siri/db/tasks.d \
+./src/siri/db/tee.d \
 ./src/siri/db/time.d \
 ./src/siri/db/user.d \
 ./src/siri/db/users.d \
index 22e14bc0abd65aab1d7f742c69454a217de74215..b0b1d72d133bbf444f3a96c4b0fa9e2db684e295 100755 (executable)
@@ -4,8 +4,8 @@
 Author: Jeroen van der Heijden (Transceptor Technology)
 Date: 2016-10-10
 '''
-import sys
-sys.path.insert(0, '../../pyleri/')
+import sys
+sys.path.insert(0, '../../pyleri/')
 import os
 from grammar import siri_grammar
 from pyleri import Grammar
index 3f2affa7c7571134bf17241aa92344d8c2cbfa9f..5f6d96ffdc98257bd78b8648dc279be407a3c8a3 100644 (file)
@@ -159,6 +159,7 @@ class SiriGrammar(Grammar):
         Keyword('symmetric_difference'),
         most_greedy=False)
     k_sync_progress = Keyword('sync_progress')
+    k_tee_pipe_name = Keyword('tee_pipe_name')
     k_timeit = Keyword('timeit')
     k_timezone = Keyword('timezone')
     k_time_precision = Keyword('time_precision')
@@ -277,6 +278,7 @@ class SiriGrammar(Grammar):
         k_reindex_progress,
         k_selected_points,
         k_sync_progress,
+        k_tee_pipe_name,
         k_uptime,
         most_greedy=False), ',', 1)
 
@@ -370,6 +372,7 @@ class SiriGrammar(Grammar):
             k_status,
             k_reindex_progress,
             k_sync_progress,
+            k_tee_pipe_name,
             most_greedy=False), str_operator, string),
         Sequence(k_online, bool_operator, _boolean),
         Sequence(k_log_level, int_operator, log_keywords),
@@ -562,6 +565,10 @@ class SiriGrammar(Grammar):
         Optional(Sequence(k_using, aggregate_functions)))
 
     set_address = Sequence(k_set, k_address, string)
+    set_tee_pipe_name = Sequence(k_set, k_tee_pipe_name, Choice(
+        k_false,
+        string,
+        most_greedy=False))
     set_backup_mode = Sequence(k_set, k_backup_mode, _boolean)
     set_drop_threshold = Sequence(k_set, k_drop_threshold, r_float)
     set_expression = Sequence(k_set, k_expression, r_regex)
@@ -590,11 +597,15 @@ class SiriGrammar(Grammar):
     alter_server = Sequence(k_server, uuid, Choice(
         set_log_level,
         set_backup_mode,
+        set_tee_pipe_name,
         set_address,
         set_port,
         most_greedy=False))
 
-    alter_servers = Sequence(k_servers, Optional(where_server), set_log_level)
+    alter_servers = Sequence(k_servers, Optional(where_server), Choice(
+        set_log_level,
+        set_tee_pipe_name,
+        most_greedy=False))
 
     alter_user = Sequence(k_user, string, Choice(
         set_password,
@@ -764,6 +775,7 @@ class SiriGrammar(Grammar):
         k_startup_time,
         k_status,
         k_sync_progress,
+        k_tee_pipe_name,
         k_time_precision,
         k_timezone,
         k_uptime,
index 1fcf2448d4afa603b744d767f3bcbd4fc44f0f8a..952f96d89644ed9d380c7ceb3f428b4d266a81e2 100644 (file)
@@ -8,7 +8,7 @@ typedef struct siridb_s siridb_t;
 
 #define SIRIDB_MAX_SIZE_ERR_MSG 1024
 #define SIRIDB_MAX_DBNAME_LEN 256  /*    255 + NULL     */
-#define SIRIDB_SCHEMA 4
+#define SIRIDB_SCHEMA 5
 #define SIRIDB_FLAG_REINDEXING 1
 
 #define DEF_DROP_THRESHOLD 1.0              /* 100%         */
@@ -35,6 +35,8 @@ typedef struct siridb_s siridb_t;
 #include <siri/db/tasks.h>
 #include <siri/db/time.h>
 #include <siri/db/buffer.h>
+#include <siri/db/tee.h>
+
 
 int32_t siridb_get_uptime(siridb_t * siridb);
 int8_t siridb_get_idle_percentage(siridb_t * siridb);
@@ -91,6 +93,7 @@ struct siridb_s
     siridb_reindex_t * reindex;
     siridb_groups_t * groups;
     siridb_buffer_t * buffer;
+    siridb_tee_t * tee;
     siridb_tasks_t tasks;
 };
 
diff --git a/include/siri/db/tee.h b/include/siri/db/tee.h
new file mode 100644 (file)
index 0000000..36739ee
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * tee.h - To tee the data for a SiriDB database.
+ */
+#ifndef SIRIDB_TEE_H_
+#define SIRIDB_TEE_H_
+
+typedef struct siridb_tee_s siridb_tee_t;
+
+enum
+{
+    SIRIDB_TEE_FLAG_INIT = 1<<0,
+    SIRIDB_TEE_FLAG_CONNECTED = 1<<1,
+    SIRIDB_TEE_FLAG = 1<<31,
+};
+
+#include <uv.h>
+#include <stdbool.h>
+#include <siri/net/promise.h>
+
+siridb_tee_t * siridb_tee_new(void);
+void siridb_tee_free(siridb_tee_t * tee);
+int siridb_tee_connect(siridb_tee_t * tee);
+int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name);
+void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise);
+const char * tee_str(siridb_tee_t * tee);
+static inline _Bool siridb_tee_is_configured(siridb_tee_t * tee);
+static inline _Bool siridb_tee_is_connected(siridb_tee_t * tee);
+
+struct siridb_tee_s
+{
+    uint32_t flags;  /* maps to sirnet_stream_t tp for cleanup */
+    char * pipe_name_;
+    char * err_msg_;
+    uv_pipe_t pipe;
+};
+
+static inline _Bool siridb_tee_is_configured(siridb_tee_t * tee)
+{
+    return tee->pipe_name_ != NULL;
+};
+
+static inline _Bool siridb_tee_is_connected(siridb_tee_t * tee)
+{
+    return tee->flags & SIRIDB_TEE_FLAG_CONNECTED;
+}
+
+#endif /* SIRIDB_TEE_H_ */
index b7c9033f34c2e82b79b7290a978309a46d92e7d3..45b8a8b2fcdcade40cb03cf71f9077a85fe154ff 100644 (file)
@@ -5,17 +5,17 @@
  * should be used with the libcleri module.
  *
  * Source class: SiriGrammar
- * Created at: 2018-07-05 16:20:26
+ * Created at: 2018-10-29 10:52:57
  */
 #ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
 #define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
 
 #include <cleri/cleri.h>
 
-cleri_grammar_t * compile_grammar(void);
+cleri_grammar_t * compile_siri_grammar_grammar(void);
 
 enum cleri_grammar_ids {
-    CLERI_NONE,   /* used for objects with no name  */
+    CLERI_NONE,   // used for objects with no name
     CLERI_GID_ACCESS_EXPR,
     CLERI_GID_ACCESS_KEYWORDS,
     CLERI_GID_AFTER_EXPR,
@@ -224,6 +224,7 @@ enum cleri_grammar_ids {
     CLERI_GID_K_SUM,
     CLERI_GID_K_SYMMETRIC_DIFFERENCE,
     CLERI_GID_K_SYNC_PROGRESS,
+    CLERI_GID_K_TEE_PIPE_NAME,
     CLERI_GID_K_TIMEIT,
     CLERI_GID_K_TIMEZONE,
     CLERI_GID_K_TIME_PRECISION,
@@ -288,6 +289,7 @@ enum cleri_grammar_ids {
     CLERI_GID_SET_PASSWORD,
     CLERI_GID_SET_PORT,
     CLERI_GID_SET_SELECT_POINTS_LIMIT,
+    CLERI_GID_SET_TEE_PIPE_NAME,
     CLERI_GID_SET_TIMEZONE,
     CLERI_GID_SHARD_COLUMNS,
     CLERI_GID_SHOW_STMT,
@@ -306,7 +308,7 @@ enum cleri_grammar_ids {
     CLERI_GID_WHERE_SHARD,
     CLERI_GID_WHERE_USER,
     CLERI_GID__BOOLEAN,
-    CLERI_END  /* can be used to get the enum length  */
+    CLERI_END // can be used to get the enum length
 };
 
 #endif /* CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ */
index 61f7e0575832664eb413d23ead6bbb585126fc5c..85273e27f78ffd7f96165f825faff35b5cfab2d9 100644 (file)
@@ -30,8 +30,8 @@ typedef void (* sirinet_promise_cb)(
 
 const char * sirinet_promise_strstatus(sirinet_promise_status_t status);
 
-#define sirinet_promise_incref(promise) promise->ref++
-#define sirinet_promise_decref(promise) if (!--promise->ref) free(promise)
+#define sirinet_promise_incref(p__) (p__)->ref++
+#define sirinet_promise_decref(p__) if (!--(p__)->ref) free(p__)
 
 /* the callback will always be called and is responsible to free the promise */
 struct sirinet_promise_s
index 682e5c3015d4c7f937cfd5f06b4b3be7484a140c..455f6c9885e88366c42263a4ed0ef32730f8adbf 100644 (file)
@@ -77,6 +77,7 @@ typedef enum
     BPROTO_REQ_GROUPS,                  /* empty                            */
     BPROTO_ENABLE_BACKUP_MODE,          /* empty                            */
     BPROTO_DISABLE_BACKUP_MODE,         /* empty                            */
+    BPROTO_TEE_PIPE_NAME_UPDATE,        /* tee pipe name                    */
 } bproto_client_t;
 
 /*
@@ -123,7 +124,8 @@ typedef enum
     BPROTO_ACK_DROP_SERIES,                     /* empty                    */
     BPROTO_ACK_ENABLE_BACKUP_MODE,              /* empty                    */
     BPROTO_ACK_DISABLE_BACKUP_MODE,             /* empty                    */
-    BPROTO_RES_GROUPS                           /* [[name, series], ...]    */
+    BPROTO_RES_GROUPS,                          /* [[name, series], ...]    */
+    BPROTO_ACK_TEE_PIPE_NAME                    /* empty                    */
 
 } bproto_server_t;
 
index 48afb3d37372b513bc1f4c5a7b419077322c8ded..2e06b6e2d7092bbf8cd1b8fe6571aab4d5608a51 100644 (file)
@@ -48,7 +48,7 @@ void sirinet__stream_free(uv_stream_t * uvclient);
 
 struct sirinet_stream_s
 {
-    sirinet_stream_tp_t tp;
+    uint32_t tp;        /* maps to siridb_tee_t flags for cleanup */
     uint32_t ref;
     on_data_cb_t on_data;
     siridb_t * siridb;
index 689e11a47ca55413dda1f50fe89b01edc254a248..21c6ab23d2feb9ac20368541f2fcfeeaa364337f 100644 (file)
@@ -53,9 +53,9 @@ optimize_interval = 3600
 heartbeat_interval = 30
 
 #
-# SiriDB can run fsync on the buffer file on an interval in milliseconds. 
-# This value is set to 0 by default which tells SiriDB to run fsync after 
-# each insert request. When having many insert requests per second, it can be 
+# SiriDB can run fsync on the buffer file on an interval in milliseconds.
+# This value is set to 0 by default which tells SiriDB to run fsync after
+# each insert request. When having many insert requests per second, it can be
 # useful to use an interval like 500 milliseconds.
 #
 #buffer_sync_interval = 500
@@ -83,3 +83,4 @@ enable_pipe_support = 0
 # SiriDB will bind the client named pipe in this location.
 #
 pipe_client_name = siridb_client.sock
+
index 6bd01fd1a07b9d1edffea92dd6a2689155724d4e..54ce516d9b2571985027838f02f014c86252a534 100644 (file)
@@ -570,8 +570,7 @@ static cexpr_t * CEXPR_new(void)
  */
 static cexpr_condition_t * CEXPR_condition_new(void)
 {
-    cexpr_condition_t * condition =
-            (cexpr_condition_t *) malloc(sizeof(cexpr_condition_t));
+    cexpr_condition_t * condition = malloc(sizeof(cexpr_condition_t));
 
     if (condition != NULL)
     {
index 6cad93508283753fa5ae1e71c42ff1a98d15e7a6..e1750ee273aad2a8d1258b546e7929fdc01d0ad7 100644 (file)
@@ -8,7 +8,7 @@
 #include <time.h>
 
 logger_t Logger = {
-        .level=10,
+        .level=2,
         .level_name=NULL,
         .ostream=NULL,
         .flags=0
index 8bade381b589c7c9d288db8687a6034ec5deee36..032bbf258ebc979dea3a4fccea2be7fadd3732ed 100644 (file)
@@ -169,14 +169,14 @@ qp_unpacker_t * qp_unpacker_ff(const char * fn)
     }
     else
     {
-        unpacker = (qp_unpacker_t *) malloc(sizeof(qp_unpacker_t));
+        unpacker = malloc(sizeof(qp_unpacker_t));
         if (unpacker == NULL)
         {
             ERR_ALLOC
         }
         else
         {
-            unpacker->source = (unsigned char *) malloc(size);
+            unpacker->source = malloc(size);
             if (unpacker->source == NULL)
             {
                 ERR_ALLOC
index 7c6f329cf8a0dff2fea2098e3e77a9aea1e68fac..cf765268e3af8733f1de75c157b6ee1a4def2860 100644 (file)
@@ -269,6 +269,12 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags)
     /* start tasks */
     siridb_tasks_init(&siridb->tasks);
 
+    /* init tee if configured */
+    if (siridb_tee_is_configured(siridb->tee))
+    {
+        siridb_tee_connect(siridb->tee);
+    }
+
     log_info("Finished loading database: '%s'", siridb->dbname);
 
     return siridb;
@@ -302,7 +308,8 @@ static int siridb__from_unpacker(
     /* check schema */
     if (    qp_schema.via.int64 == 1 ||
             qp_schema.via.int64 == 2 ||
-            qp_schema.via.int64 == 3)
+            qp_schema.via.int64 == 3 ||
+            qp_schema.via.int64 == 4)
     {
         log_info(
                 "Found an old database schema (v%d), "
@@ -463,6 +470,37 @@ static int siridb__from_unpacker(
         (*siridb)->list_limit = qp_obj.via.int64;
     }
 
+    /* for older schemas we keep the default tee_pipe_name=NULL */
+    if (qp_schema.via.int64 >= 5)
+    {
+        qp_next(unpacker, &qp_obj);
+
+        if (qp_obj.tp == QP_RAW)
+        {
+            (*siridb)->tee->pipe_name_ = strndup(
+                (char *) qp_obj.via.raw,
+                qp_obj.len);
+            READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.")
+        }
+        else if (qp_obj.tp != QP_NULL)
+        {
+            READ_DB_EXIT_WITH_ERROR("Cannot read tee pipe name.")
+        }
+    }
+    if ((*siridb)->tee->pipe_name_ == NULL)
+    {
+        log_debug(
+            "No tee pipe name configured for database: %s",
+            (*siridb)->dbname);
+    }
+    else
+    {
+        log_debug(
+            "Using tee pipe name '%s' for database: '%s'",
+            (*siridb)->tee->pipe_name_,
+            (*siridb)->dbname);
+    }
+
     return (qp_schema.via.int64 == SIRIDB_SCHEMA) ? 0 : qp_schema.via.int64;
 }
 
@@ -553,6 +591,9 @@ int siridb_save(siridb_t * siridb)
             qp_fadd_double(fpacker, siridb->drop_threshold) ||
             qp_fadd_int64(fpacker, siridb->select_points_limit) ||
             qp_fadd_int64(fpacker, siridb->list_limit) ||
+            (siridb->tee->pipe_name_ == NULL
+                ? qp_fadd_type(fpacker, QP_NULL)
+                : qp_fadd_string(fpacker, siridb->tee->pipe_name_)) ||
             qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
             qp_close(fpacker));
 }
@@ -645,6 +686,11 @@ void siridb__free(siridb_t * siridb)
         siridb_groups_decref(siridb->groups);
     }
 
+    if (siridb->tee != NULL)
+    {
+        siridb_tee_free(siridb->tee);
+    }
+
     /* unlock the database in case no siri_err occurred */
     if (!siri_err)
     {
@@ -673,89 +719,81 @@ static siridb_t * siridb__new(void)
     siridb_t * siridb = (siridb_t *) malloc(sizeof(siridb_t));
     if (siridb == NULL)
     {
-        ERR_ALLOC
+        goto fail0;
     }
-    else
+
+    siridb->dbname = NULL;
+    siridb->dbpath = NULL;
+    siridb->ref = 1;
+    siridb->insert_tasks = 0;
+    siridb->flags = 0;
+    siridb->time = NULL;
+    siridb->users = NULL;
+    siridb->servers = NULL;
+    siridb->pools = NULL;
+    siridb->max_series_id = 0;
+    siridb->received_points = 0;
+    siridb->selected_points = 0;
+    siridb->drop_threshold = DEF_DROP_THRESHOLD;
+    siridb->select_points_limit = DEF_SELECT_POINTS_LIMIT;
+    siridb->list_limit = DEF_LIST_LIMIT;
+    siridb->tz = -1;
+    siridb->server = NULL;
+    siridb->replica = NULL;
+    siridb->fifo = NULL;
+    siridb->replicate = NULL;
+    siridb->reindex = NULL;
+    siridb->groups = NULL;
+    siridb->dropped_fp = NULL;
+    siridb->store = NULL;
+
+    siridb->series = ct_new();
+    if (siridb->series == NULL)
     {
-        siridb->series = ct_new();
-        if (siridb->series == NULL)
-        {
-            ERR_ALLOC
-            free(siridb);
-            siridb = NULL;
-        }
-        else
-        {
-            siridb->series_map = imap_new();
-            if (siridb->series_map == NULL)
-            {
-                ct_free(siridb->series, NULL);
-                free(siridb);
-                siridb = NULL;
-                ERR_ALLOC
-            }
-            else
-            {
-                siridb->shards = imap_new();
-                if (siridb->shards == NULL)
-                {
-                    imap_free(siridb->series_map, NULL);
-                    ct_free(siridb->series, NULL);
-                    free(siridb);
-                    siridb = NULL;
-                    ERR_ALLOC
-
-                }
-                else
-                {
-                    /* allocate a buffer */
-                    siridb->buffer = siridb_buffer_new();
-                    if (siridb->buffer == NULL)
-                    {
-                        imap_free(siridb->shards, NULL);
-                        imap_free(siridb->series_map, NULL);
-                        ct_free(siridb->series, NULL);
-                        free(siridb);
-                        siridb = NULL;
-                        ERR_ALLOC
-                    }
-                    else
-                    {
-                        siridb->dbname = NULL;
-                        siridb->dbpath = NULL;
-                        siridb->ref = 1;
-                        siridb->insert_tasks = 0;
-                        siridb->flags = 0;
-                        siridb->time = NULL;
-                        siridb->users = NULL;
-                        siridb->servers = NULL;
-                        siridb->pools = NULL;
-                        siridb->max_series_id = 0;
-                        siridb->received_points = 0;
-                        siridb->selected_points = 0;
-                        siridb->drop_threshold = DEF_DROP_THRESHOLD;
-                        siridb->select_points_limit = DEF_SELECT_POINTS_LIMIT;
-                        siridb->list_limit = DEF_LIST_LIMIT;
-                        siridb->tz = -1;
-                        siridb->server = NULL;
-                        siridb->replica = NULL;
-                        siridb->fifo = NULL;
-                        siridb->replicate = NULL;
-                        siridb->reindex = NULL;
-                        siridb->groups = NULL;
-
-                        /* make file pointers are NULL when file is closed */
-                        siridb->dropped_fp = NULL;
-                        siridb->store = NULL;
-
-                        uv_mutex_init(&siridb->series_mutex);
-                        uv_mutex_init(&siridb->shards_mutex);
-                    }
-                }
-            }
-        }
+        goto fail0;
+    }
+
+    siridb->series_map = imap_new();
+    if (siridb->series_map == NULL)
+    {
+        goto fail1;
     }
+    siridb->shards = imap_new();
+    if (siridb->shards == NULL)
+    {
+        goto fail2;
+    }
+    /* allocate a buffer */
+    siridb->buffer = siridb_buffer_new();
+    if (siridb->buffer == NULL)
+    {
+        goto fail3;
+    }
+
+    /* allocate tee */
+    siridb->tee = siridb_tee_new();
+    if (siridb->tee == NULL)
+    {
+        goto fail4;
+    }
+
+    uv_mutex_init(&siridb->series_mutex);
+    uv_mutex_init(&siridb->shards_mutex);
+
     return siridb;
+
+fail4:
+    siridb_buffer_free(siridb->buffer);
+fail3:
+    imap_free(siridb->shards, NULL);
+fail2:
+    imap_free(siridb->series_map, NULL);
+fail1:
+    ct_free(siridb->series, NULL);
+fail0:
+    free(siridb);
+    ERR_ALLOC
+    return NULL;
 }
 
 static siridb_t * siridb__from_dat(const char * dbpath)
index 6c9c4551a3d156a0ff023fe9a96a82311cd0488e..920d0b39571573a3a21e4cf05de4c27fa561775d 100644 (file)
@@ -284,16 +284,14 @@ int insert_init_backend_local(
         sirinet_pkg_t * pkg,
         uint8_t flags)
 {
-    sirinet_promise_t * promise =
-            (sirinet_promise_t *) malloc(sizeof(sirinet_promise_t));
+    sirinet_promise_t * promise = malloc(sizeof(sirinet_promise_t));
     if (promise == NULL)
     {
         ERR_ALLOC
         return -1;
     }
 
-    siridb_insert_local_t * ilocal =
-            (siridb_insert_local_t *) malloc(sizeof(siridb_insert_local_t));
+    siridb_insert_local_t * ilocal = malloc(sizeof(siridb_insert_local_t));
     if (ilocal == NULL)
     {
         free(promise);
@@ -301,7 +299,7 @@ int insert_init_backend_local(
         return -1;
     }
 
-    uv_async_t * handle = (uv_async_t *) malloc(sizeof(uv_async_t));
+    uv_async_t * handle = malloc(sizeof(uv_async_t));
     if (handle == NULL)
     {
         free(promise);
@@ -347,6 +345,11 @@ int insert_init_backend_local(
     siridb_tasks_inc(siridb->tasks);
     siridb->insert_tasks++;
 
+    if (siridb_tee_is_connected(siridb->tee))
+    {
+        siridb_tee_write(siridb->tee, promise);
+    }
+
     uv_async_init(siri.loop, handle, INSERT_local_task);
     uv_async_send(handle);
 
index 70ab542b8f3b2735ab24e6559fd4aaafa12c53e0..bc3ead7c8f31fa5ca6a711e9e467b708dc4ccecb 100644 (file)
@@ -154,8 +154,12 @@ if (IS_MASTER && siridb_is_reindexing(siridb))                              \
     "Successfully dropped server '%s'."
 #define MSG_SUCCES_SET_LOG_LEVEL_MULTI \
     "Successfully set log level to '%s' on %lu servers."
+#define MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI \
+    "Successfully set tee_pipe name on %lu servers."
 #define MSG_SUCCES_SET_LOG_LEVEL \
     "Successfully set log level to '%s' on '%s'."
+#define MSG_SUCCES_SET_TEE_PIPE_NAME \
+    "Successfully set tee pipe name to '%s' on '%s'."
 #define MSG_SUCCESS_SET_SELECT_POINTS_LIMIT \
     "Successfully changed select points limit from %" PRIu32 " to %" PRIu32 "."
 #define MSG_SUCCES_DROP_SERIES \
@@ -245,6 +249,7 @@ static void exit_set_list_limit(uv_async_t * handle);
 static void exit_set_log_level(uv_async_t * handle);
 static void exit_set_port(uv_async_t * handle);
 static void exit_set_select_points_limit(uv_async_t * handle);
+static void exit_set_tee_pipe_name(uv_async_t * handle);
 static void exit_set_timezone(uv_async_t * handle);
 static void exit_show_stmt(uv_async_t * handle);
 static void exit_timeit_stmt(uv_async_t * handle);
@@ -477,6 +482,7 @@ void siridb_init_listener(void)
     siridb_listen_exit[CLERI_GID_SET_LOG_LEVEL] = exit_set_log_level;
     siridb_listen_exit[CLERI_GID_SET_PORT] = exit_set_port;
     siridb_listen_exit[CLERI_GID_SET_SELECT_POINTS_LIMIT] = exit_set_select_points_limit;
+    siridb_listen_exit[CLERI_GID_SET_TEE_PIPE_NAME] = exit_set_tee_pipe_name;
     siridb_listen_exit[CLERI_GID_SET_TIMEZONE] = exit_set_timezone;
     siridb_listen_exit[CLERI_GID_SHOW_STMT] = exit_show_stmt;
     siridb_listen_exit[CLERI_GID_TIMEIT_STMT] = exit_timeit_stmt;
@@ -4016,6 +4022,125 @@ static void exit_set_select_points_limit(uv_async_t * handle)
     }
 }
 
+static void exit_set_tee_pipe_name(uv_async_t * handle)
+{
+    siridb_query_t * query = (siridb_query_t *) handle->data;
+    query_alter_t * q_alter = (query_alter_t *) query->data;
+    siridb_t * siridb = query->client->siridb;
+
+    assert (query->data != NULL);
+
+    cleri_node_t * node =
+            query->nodes->node->children->next->next->node->children->node;
+
+    char pipe_name[node->len - 1];
+    xstr_extract_string(pipe_name, node->str, node->len);
+
+    if (q_alter->alter_tp == QUERY_ALTER_SERVERS)
+    {
+        /*
+         * alter_servers
+         */
+        cexpr_t * where_expr = ((query_list_t *) query->data)->where_expr;
+        siridb_server_walker_t wserver = {
+            .server=siridb->server,
+            .siridb=siridb
+        };
+
+        if (where_expr == NULL || cexpr_run(
+                where_expr,
+                (cexpr_cb_t) siridb_server_cexpr_cb,
+                &wserver))
+        {
+            siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+            q_alter->n++;
+        }
+
+        if (IS_MASTER)
+        {
+            /*
+             * This is a trick because we share with setting log level on
+             * multiple servers at once.
+             */
+            q_alter->n += LOGGER_NUM_LEVELS << 16;
+            siridb_query_forward(
+                    handle,
+                    SIRIDB_QUERY_FWD_SERVERS,
+                    (sirinet_promises_cb) on_alter_xxx_response,
+                    0);
+        }
+        else
+        {
+            qp_add_raw(query->packer, (const unsigned char *) "servers", 7);
+            qp_add_int64(query->packer, q_alter->n);
+            SIRIPARSER_ASYNC_NEXT_NODE
+        }
+    }
+    else
+    {
+        /*
+         * alter_server
+         *
+         * we can set the success message, we just ignore the message in case
+         * an error occurs.
+         */
+        siridb_server_t * server = q_alter->via.server;
+
+        QP_ADD_SUCCESS
+        qp_add_fmt_safe(query->packer,
+                    MSG_SUCCES_SET_TEE_PIPE_NAME,
+                    pipe_name,
+                    server->name);
+
+        if (server == siridb->server)
+        {
+            (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+
+            SIRIPARSER_ASYNC_NEXT_NODE
+        }
+        else
+        {
+
+            if (siridb_server_is_online(server))
+            {
+                sirinet_pkg_t * pkg = sirinet_pkg_new(
+                        0,
+                        strlen(pipe_name),
+                        BPROTO_TEE_PIPE_NAME_UPDATE,
+                        (unsigned char *) pipe_name);
+                if (pkg != NULL)
+                {
+                    /* handle will be bound to a timer so we should increment */
+                    siri_async_incref(handle);
+                    if (siridb_server_send_pkg(
+                            server,
+                            pkg,
+                            0,
+                            (sirinet_promise_cb) on_ack_response,
+                            handle,
+                            0))
+                    {
+                        /*
+                         * signal is raised and 'on_ack_response' will not be
+                         * called
+                         */
+                        free(pkg);
+                        siri_async_decref(&handle);
+                    }
+                }
+            }
+            else
+            {
+                snprintf(query->err_msg,
+                        SIRIDB_MAX_SIZE_ERR_MSG,
+                        "Cannot set pipe name, '%s' is currently unavailable",
+                        server->name);
+                siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+            }
+        }
+    }
+}
+
 static void exit_set_timezone(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
@@ -4951,7 +5076,9 @@ static void on_ack_response(
             case BPROTO_ACK_DISABLE_BACKUP_MODE:
                 /* success message is already set */
                 break;
-
+            case BPROTO_ACK_TEE_PIPE_NAME:
+                /* success message is already set */
+                break;
             default:
                 status = PROMISE_PKG_TYPE_ERROR;
                 break;
@@ -5029,19 +5156,32 @@ static void on_alter_xxx_response(vec_t * promises, uv_async_t * handle)
     }
     /*
      * Note: since this function has the sole purpose for alter servers
-     *       and setting log levels, we now simply ad the message here.
+     *       and setting log levels or pipe name, we now simply add the
+     *       message here.
      */
     QP_ADD_SUCCESS
 
-    log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI,
-            logger_level_name(q_alter->n >> 16),
-            q_alter->n & 0xffff);
+    if ((q_alter->n >> 16) >= LOGGER_NUM_LEVELS)
+    {
+        log_info(MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI, q_alter->n & 0xffff);
+        qp_add_fmt_safe(
+                query->packer,
+                MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI,
+                q_alter->n & 0xffff);
+    }
+    else
+    {
+        log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI,
+                logger_level_name(q_alter->n >> 16),
+                q_alter->n & 0xffff);
+
+        qp_add_fmt_safe(
+                query->packer,
+                MSG_SUCCES_SET_LOG_LEVEL_MULTI,
+                logger_level_name(q_alter->n >> 16),
+                q_alter->n & 0xffff);
+    }
 
-    qp_add_fmt_safe(
-            query->packer,
-            MSG_SUCCES_SET_LOG_LEVEL_MULTI,
-            logger_level_name(q_alter->n >> 16),
-            q_alter->n & 0xffff);
 
     SIRIPARSER_ASYNC_NEXT_NODE
 }
index 596a3e1353ea591cb03cdb15d3f85a58326404c5..5ac92a5cb153bd6932543ba62e0cd6908b4d7c51 100644 (file)
@@ -10,6 +10,7 @@
 #include <siri/db/time.h>
 #include <siri/grammar/grammar.h>
 #include <siri/db/fifo.h>
+#include <siri/db/tee.h>
 #include <siri/net/tcp.h>
 #include <siri/siri.h>
 #include <siri/version.h>
@@ -139,6 +140,10 @@ static void prop_sync_progress(
         siridb_t * siridb,
         qp_packer_t * packer,
         int map);
+static void prop_tee_pipe_name(
+        siridb_t * siridb,
+        qp_packer_t * packer,
+        int map);
 static void prop_timezone(
         siridb_t * siridb,
         qp_packer_t * packer,
@@ -231,6 +236,8 @@ void siridb_init_props(void)
             prop_status;
     siridb_props[CLERI_GID_K_SYNC_PROGRESS - KW_OFFSET] =
             prop_sync_progress;
+    siridb_props[CLERI_GID_K_TEE_PIPE_NAME - KW_OFFSET] =
+            prop_tee_pipe_name;
     siridb_props[CLERI_GID_K_TIMEZONE - KW_OFFSET] =
             prop_timezone;
     siridb_props[CLERI_GID_K_TIME_PRECISION - KW_OFFSET] =
@@ -499,6 +506,15 @@ static void prop_sync_progress(
     qp_add_string(packer, siridb_initsync_sync_progress(siridb));
 }
 
+static void prop_tee_pipe_name(
+        siridb_t * siridb,
+        qp_packer_t * packer,
+        int map)
+{
+    SIRIDB_PROP_MAP("tee_pipe_name", 13)
+    qp_add_string(packer, tee_str(siridb->tee));
+}
+
 static void prop_timezone(
         siridb_t * siridb,
         qp_packer_t * packer,
index 8272072d82c184da54d246fb39cf089b15755da8..800f7434d9761aa6ec87bdd086cf8c1739977672 100644 (file)
@@ -8,6 +8,7 @@
 #include <siri/db/server.h>
 #include <siri/db/servers.h>
 #include <siri/db/fifo.h>
+#include <siri/db/tee.h>
 #include <siri/err.h>
 #include <siri/net/promise.h>
 #include <siri/net/stream.h>
@@ -479,8 +480,7 @@ static int SERVER_resolve_dns(
     hints.ai_protocol = IPPROTO_TCP;
     hints.ai_flags = AI_NUMERICSERV;
 
-    uv_getaddrinfo_t * resolver =
-            (uv_getaddrinfo_t *) malloc(sizeof(uv_getaddrinfo_t));
+    uv_getaddrinfo_t * resolver = malloc(sizeof(uv_getaddrinfo_t));
 
     if (resolver == NULL)
     {
@@ -519,7 +519,7 @@ static void SERVER_on_resolved(
         int status,
         struct addrinfo * res)
 {
-    siridb_server_t * server = (siridb_server_t *) resolver->data;
+    siridb_server_t * server = resolver->data;
 
     if (status < 0)
     {
@@ -1173,6 +1173,12 @@ int siridb_server_cexpr_cb(
                 cond->operator,
                 siridb_initsync_sync_progress(wserver->siridb),
                 cond->str);
+
+    case CLERI_GID_K_TEE_PIPE_NAME:
+        return cexpr_str_cmp(
+                cond->operator,
+                tee_str(wserver->siridb->tee),
+                cond->str);
     }
 
     log_critical("Unexpected server property received: %d", cond->prop);
index 065d6dc7c2612c591aa750c912e16d1c1b046686..56d26a8fb5c0caea8286a427c79eab6321f69386 100644 (file)
@@ -11,6 +11,7 @@
 #include <siri/db/server.h>
 #include <siri/db/servers.h>
 #include <siri/db/misc.h>
+#include <siri/db/tee.h>
 #include <siri/err.h>
 #include <siri/net/promises.h>
 #include <siri/net/tcp.h>
@@ -697,6 +698,9 @@ int siridb_servers_list(siridb_server_t * server, uv_async_t * handle)
         case CLERI_GID_K_SYNC_PROGRESS:
             qp_add_string(query->packer, siridb_initsync_sync_progress(siridb));
             break;
+        case CLERI_GID_K_TEE_PIPE_NAME:
+            qp_add_string(query->packer, tee_str(siridb->tee));
+            break;
         case CLERI_GID_K_UPTIME:
             qp_add_int64(
                     query->packer,
diff --git a/src/siri/db/tee.c b/src/siri/db/tee.c
new file mode 100644 (file)
index 0000000..53070f2
--- /dev/null
@@ -0,0 +1,223 @@
+/*
+ * tee.c - To tee the data for a SiriDB database.
+ */
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <assert.h>
+#include <siri/db/tee.h>
+#include <siri/siri.h>
+#include <siri/net/pipe.h>
+#include <logger/logger.h>
+
+#define TEE__BUF_SZ 512
+static char tee__buf[TEE__BUF_SZ];
+
+static void tee__runtime_init(uv_pipe_t * pipe);
+static void tee__write_cb(uv_write_t * req, int status);
+static void tee__on_connect(uv_connect_t * req, int status);
+static void tee__alloc_buffer(
+    uv_handle_t * handle,
+    size_t suggsz,
+    uv_buf_t * buf);
+static void tee__on_data(
+    uv_stream_t * client,
+    ssize_t nread,
+    const uv_buf_t * buf);
+
+siridb_tee_t * siridb_tee_new(void)
+{
+    siridb_tee_t * tee = malloc(sizeof(siridb_tee_t));
+    if (tee == NULL)
+    {
+        return NULL;
+    }
+    tee->pipe_name_ = NULL;
+    tee->err_msg_ = NULL;
+    tee->pipe.data = tee;
+    tee->flags = SIRIDB_TEE_FLAG;
+    return tee;
+}
+
+void siridb_tee_free(siridb_tee_t * tee)
+{
+    free(tee->err_msg_);
+    free(tee->pipe_name_);
+    free(tee);
+}
+
+int siridb_tee_connect(siridb_tee_t * tee)
+{
+    uv_connect_t * req = malloc(sizeof(uv_connect_t));
+    if (req == NULL)
+    {
+        return -1;
+    }
+
+    req->data = tee;
+
+    if (uv_pipe_init(siri.loop, &tee->pipe, 0))
+    {
+        return -1;
+    }
+    tee->flags |= SIRIDB_TEE_FLAG_INIT;
+    tee->pipe.data = tee;
+
+    free(tee->err_msg_);
+
+    uv_pipe_connect(req, &tee->pipe, tee->pipe_name_, tee__on_connect);
+    return 0;
+}
+
+int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name)
+{
+    free(tee->pipe_name_);
+    tee->pipe_name_ = strdup(pipe_name);
+    if (!tee->pipe_name_)
+    {
+        return -1;
+    }
+    if (tee->flags & SIRIDB_TEE_FLAG_INIT)
+    {
+        uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__runtime_init);
+    }
+    else
+    {
+        tee__runtime_init(&tee->pipe);
+    }
+    return 0;
+}
+
+void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise)
+{
+    uv_write_t * req = malloc(sizeof(uv_write_t));
+    if (!req)
+    {
+        log_error("Cannot allocate memory for tee request");
+        return;
+    }
+
+    req->data = promise;
+    sirinet_promise_incref(promise);
+
+    uv_buf_t wrbuf = uv_buf_init(
+            (char *) promise->pkg,
+            sizeof(sirinet_pkg_t) + promise->pkg->len);
+
+    if (uv_write(req, (uv_stream_t *) &tee->pipe, &wrbuf, 1, tee__write_cb))
+    {
+        log_error("Cannot write to tee");
+        sirinet_promise_decref(promise);
+    }
+}
+
+const char * tee_str(siridb_tee_t * tee)
+{
+    if (tee->err_msg_)
+    {
+        return tee->err_msg_;
+    }
+    if (tee->pipe_name_)
+    {
+        return tee->pipe_name_;
+    }
+    return "disabled";
+}
+
+
+static void tee__runtime_init(uv_pipe_t * pipe)
+{
+    siridb_tee_t * tee = pipe->data;
+
+    tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
+    tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
+
+    if (siridb_tee_connect(tee))
+    {
+        log_error("Could not connect to tee at runtime");
+    }
+}
+
+static void tee__write_cb(uv_write_t * req, int status)
+{
+    sirinet_promise_t * promise = req->data;
+    sirinet_promise_decref(promise);
+    if (status)
+    {
+        log_error("Socket (tee) write error: %s", uv_strerror(status));
+    }
+    free(req);
+}
+
+static void tee__on_connect(uv_connect_t * req, int status)
+{
+    siridb_tee_t * tee = req->data;
+
+    if (status == 0)
+    {
+        log_info("Connection created to pipe: '%s'", tee->pipe_name_);
+        if (uv_read_start(req->handle, tee__alloc_buffer, tee__on_data))
+        {
+            if (asprintf(&tee->err_msg_,
+                    "Cannot open pipe '%s' for reading",
+                    tee->pipe_name_) >= 0)
+            {
+                log_error(tee->err_msg_);
+            }
+        }
+        else
+        {
+            tee->flags |= SIRIDB_TEE_FLAG_CONNECTED;
+        }
+    }
+    else
+    {
+        if (asprintf(
+                &tee->err_msg_,
+                "Cannot connect to pipe '%s' (%s)",
+                tee->pipe_name_,
+                uv_strerror(status)) >= 0)
+        {
+            log_error(tee->err_msg_);
+        }
+    }
+    free(req);
+}
+
+static void tee__alloc_buffer(
+    uv_handle_t * handle __attribute__((unused)),
+    size_t suggsz __attribute__((unused)),
+    uv_buf_t * buf)
+{
+    buf->base = tee__buf;
+    buf->len = TEE__BUF_SZ;
+}
+
+
+
+static void tee__on_data(
+    uv_stream_t * client,
+    ssize_t nread,
+    const uv_buf_t * buf __attribute__((unused)))
+{
+    siridb_tee_t * tee = client->data;
+
+    if (nread < 0)
+    {
+        if (nread != UV_EOF)
+        {
+            log_error("Read error on pipe '%s' : '%s'",
+                sirinet_pipe_name((uv_pipe_t * ) client),
+                uv_err_name(nread));
+        }
+        log_info("Disconnected from tee pipe: '%s'",
+            sirinet_pipe_name((uv_pipe_t * ) client));
+        tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
+        uv_close((uv_handle_t *) client, NULL);
+    }
+
+    if (nread > 0)
+    {
+        log_debug("Got %zd bytes on tee which will be ignored", nread);
+    }
+}
index cdc883eb52fed8db0b435c997a8429eb658892a2..0f14df216dfb46c18003ba33c7d751b65d0bd9de 100644 (file)
@@ -5,7 +5,7 @@
  * should be used with the libcleri module.
  *
  * Source class: SiriGrammar
- * Created at: 2018-07-05 16:20:26
+ * Created at: 2018-10-29 10:52:57
  */
 
 #include "siri/grammar/grammar.h"
@@ -17,7 +17,7 @@
 #define CLERI_FIRST_MATCH 0
 #define CLERI_MOST_GREEDY 1
 
-cleri_grammar_t * compile_grammar(void)
+cleri_grammar_t * compile_siri_grammar_grammar(void)
 {
     cleri_t * r_float = cleri_regex(CLERI_GID_R_FLOAT, "^[-+]?[0-9]*\\.?[0-9]+");
     cleri_t * r_integer = cleri_regex(CLERI_GID_R_INTEGER, "^[-+]?[0-9]+");
@@ -160,6 +160,7 @@ cleri_grammar_t * compile_grammar(void)
         cleri_keyword(CLERI_NONE, "symmetric_difference", CLERI_CASE_SENSITIVE)
     );
     cleri_t * k_sync_progress = cleri_keyword(CLERI_GID_K_SYNC_PROGRESS, "sync_progress", CLERI_CASE_SENSITIVE);
+    cleri_t * k_tee_pipe_name = cleri_keyword(CLERI_GID_K_TEE_PIPE_NAME, "tee_pipe_name", CLERI_CASE_SENSITIVE);
     cleri_t * k_timeit = cleri_keyword(CLERI_GID_K_TIMEIT, "timeit", CLERI_CASE_SENSITIVE);
     cleri_t * k_timezone = cleri_keyword(CLERI_GID_K_TIMEZONE, "timezone", CLERI_CASE_SENSITIVE);
     cleri_t * k_time_precision = cleri_keyword(CLERI_GID_K_TIME_PRECISION, "time_precision", CLERI_CASE_SENSITIVE);
@@ -302,7 +303,7 @@ cleri_grammar_t * compile_grammar(void)
     cleri_t * server_columns = cleri_list(CLERI_GID_SERVER_COLUMNS, cleri_choice(
         CLERI_NONE,
         CLERI_FIRST_MATCH,
-        28,
+        29,
         k_address,
         k_buffer_path,
         k_buffer_size,
@@ -330,6 +331,7 @@ cleri_grammar_t * compile_grammar(void)
         k_reindex_progress,
         k_selected_points,
         k_sync_progress,
+        k_tee_pipe_name,
         k_uptime
     ), cleri_token(CLERI_NONE, ","), 1, 0, 0);
     cleri_t * group_columns = cleri_list(CLERI_GID_GROUP_COLUMNS, cleri_choice(
@@ -562,7 +564,7 @@ cleri_grammar_t * compile_grammar(void)
                 cleri_choice(
                     CLERI_NONE,
                     CLERI_FIRST_MATCH,
-                    11,
+                    12,
                     k_address,
                     k_buffer_path,
                     k_dbpath,
@@ -573,7 +575,8 @@ cleri_grammar_t * compile_grammar(void)
                     k_version,
                     k_status,
                     k_reindex_progress,
-                    k_sync_progress
+                    k_sync_progress,
+                    k_tee_pipe_name
                 ),
                 str_operator,
                 string
@@ -1044,6 +1047,19 @@ cleri_grammar_t * compile_grammar(void)
         k_address,
         string
     );
+    cleri_t * set_tee_pipe_name = cleri_sequence(
+        CLERI_GID_SET_TEE_PIPE_NAME,
+        3,
+        k_set,
+        k_tee_pipe_name,
+        cleri_choice(
+            CLERI_NONE,
+            CLERI_FIRST_MATCH,
+            2,
+            k_false,
+            string
+        )
+    );
     cleri_t * set_backup_mode = cleri_sequence(
         CLERI_GID_SET_BACKUP_MODE,
         3,
@@ -1156,9 +1172,10 @@ cleri_grammar_t * compile_grammar(void)
         cleri_choice(
             CLERI_NONE,
             CLERI_FIRST_MATCH,
-            4,
+            5,
             set_log_level,
             set_backup_mode,
+            set_tee_pipe_name,
             set_address,
             set_port
         )
@@ -1168,7 +1185,13 @@ cleri_grammar_t * compile_grammar(void)
         3,
         k_servers,
         cleri_optional(CLERI_NONE, where_server),
-        set_log_level
+        cleri_choice(
+            CLERI_NONE,
+            CLERI_FIRST_MATCH,
+            2,
+            set_log_level,
+            set_tee_pipe_name
+        )
     );
     cleri_t * alter_user = cleri_sequence(
         CLERI_GID_ALTER_USER,
@@ -1484,7 +1507,7 @@ cleri_grammar_t * compile_grammar(void)
         cleri_list(CLERI_NONE, cleri_choice(
             CLERI_NONE,
             CLERI_FIRST_MATCH,
-            34,
+            35,
             k_active_handles,
             k_active_tasks,
             k_buffer_path,
@@ -1513,6 +1536,7 @@ cleri_grammar_t * compile_grammar(void)
             k_startup_time,
             k_status,
             k_sync_progress,
+            k_tee_pipe_name,
             k_time_precision,
             k_timezone,
             k_uptime,
index 7c659f8bbcce548333cce34325ad4b1ca24d1008..f4ff0dce50e1a80b396b7affbdb4ba820cf23b7e 100644 (file)
@@ -59,6 +59,12 @@ static void HEARTBEAT_cb(uv_timer_t * handle __attribute__((unused)))
     {
         siridb = (siridb_t *) siridb_node->data;
 
+        if (    siridb_tee_is_configured(siridb->tee) &&
+                !siridb_tee_is_connected(siridb->tee))
+        {
+            siridb_tee_connect(siridb->tee);
+        }
+
         server_node = siridb->servers->first;
         while (server_node != NULL)
         {
index f2a36b862f9ac2cc9a8821a512863a8c9462c43b..816687ff658e9b35b6ac42c4ee3eda9bb3cf3702 100644 (file)
@@ -43,6 +43,9 @@ static void on_flags_update(sirinet_stream_t * client, sirinet_pkg_t * pkg);
 static void on_log_level_update(
         sirinet_stream_t * client,
         sirinet_pkg_t * pkg);
+static void on_tee_pipe_name_update(
+        sirinet_stream_t * client,
+        sirinet_pkg_t * pkg);
 static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg);
 static void on_query(
         sirinet_stream_t * client,
@@ -273,6 +276,9 @@ static void on_data(sirinet_stream_t * client, sirinet_pkg_t * pkg)
     case BPROTO_DISABLE_BACKUP_MODE:
         on_disable_backup_mode(client, pkg);
         break;
+    case BPROTO_TEE_PIPE_NAME_UPDATE:
+        on_tee_pipe_name_update(client, pkg);
+        break;
     }
 
 }
@@ -428,6 +434,27 @@ static void on_log_level_update(sirinet_stream_t * client, sirinet_pkg_t * pkg)
     }
 }
 
+static void on_tee_pipe_name_update(
+        sirinet_stream_t * client,
+        sirinet_pkg_t * pkg)
+{
+    SERVER_CHECK_AUTHENTICATED(client, server);
+    siridb_t * siridb = client->siridb;
+    sirinet_pkg_t * package;
+    char * pipe_name = strndup((const char *) pkg->data, pkg->len);
+    if (pipe_name != NULL)
+    {
+        (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+    }
+
+    package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_TEE_PIPE_NAME, NULL);
+    if (package != NULL)
+    {
+        /* ignore result code, signal can be raised */
+        sirinet_pkg_send(client, package);
+    }
+}
+
 static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg)
 {
     SERVER_CHECK_AUTHENTICATED(client, server)
index 8ca2d40562e23b681329990cc03e81cd2d9d99e2..55b028b8cffefc1405d38dfd0f2715bd4e195288 100644 (file)
@@ -32,7 +32,7 @@ char * sirinet_pipe_name(uv_pipe_t * client)
 }
 
 /*
- * Cleanup socket (pipe) file. (Unused)
+ * Cleanup socket (pipe) file. (UNUSED)
  */
 void sirinet_pipe_unlink(uv_pipe_t * client)
 {
index 883c3c9d3d3334d65a96ae640e57da1ed06e95cf..c94ee780b1603efac9ee513698ef6883fd748f26 100644 (file)
@@ -135,7 +135,7 @@ sirinet_pkg_t * sirinet_pkg_err(
  */
 int sirinet_pkg_send(sirinet_stream_t * client, sirinet_pkg_t * pkg)
 {
-    uv_write_t * req = (uv_write_t *) malloc(sizeof(uv_write_t));
+    uv_write_t * req = malloc(sizeof(uv_write_t));
 
     if (req == NULL)
     {
@@ -144,7 +144,7 @@ int sirinet_pkg_send(sirinet_stream_t * client, sirinet_pkg_t * pkg)
         return -1;
     }
 
-    pkg_send_t * data = (pkg_send_t *) malloc(sizeof(pkg_send_t));
+    pkg_send_t * data = malloc(sizeof(pkg_send_t));
 
     if (data == NULL)
     {
@@ -168,7 +168,13 @@ int sirinet_pkg_send(sirinet_stream_t * client, sirinet_pkg_t * pkg)
             (char *) pkg,
             sizeof(sirinet_pkg_t) + pkg->len);
 
-    uv_write(req, client->stream, &wrbuf, 1, PKG_write_cb);
+    if (uv_write(req, client->stream, &wrbuf, 1, PKG_write_cb))
+    {
+        sirinet_stream_decref(data->client);
+        free(pkg);
+        free(req);
+        return -1;
+    }
 
     return 0;
 }
@@ -180,7 +186,7 @@ int sirinet_pkg_send(sirinet_stream_t * client, sirinet_pkg_t * pkg)
 sirinet_pkg_t * sirinet_pkg_dup(sirinet_pkg_t * pkg)
 {
     size_t size = sizeof(sirinet_pkg_t) + pkg->len;
-    sirinet_pkg_t * dup = (sirinet_pkg_t *) malloc(size);
+    sirinet_pkg_t * dup = malloc(size);
     if (dup == NULL)
     {
         ERR_ALLOC
index 856948ced95dff07e2a8c494d78467a7b16f82e8..b9b6122bf6b8da26cd10eaa7ff9c2a1761a70173 100644 (file)
@@ -84,6 +84,7 @@ const char * sirinet_bproto_client_str(bproto_client_t n)
     case BPROTO_REQ_GROUPS: return "BPROTO_REQ_GROUPS";
     case BPROTO_ENABLE_BACKUP_MODE: return "BPROTO_ENABLE_BACKUP_MODE";
     case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
+    case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
     default:
         sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
         return protocol_str;
@@ -119,6 +120,7 @@ const char * sirinet_bproto_server_str(bproto_server_t n)
     case BPROTO_ACK_ENABLE_BACKUP_MODE: return "BPROTO_ACK_ENABLE_BACKUP_MODE";
     case BPROTO_ACK_DISABLE_BACKUP_MODE: return "BPROTO_ACK_DISABLE_BACKUP_MODE";
     case BPROTO_RES_GROUPS: return "BPROTO_RES_GROUPS";
+    case BPROTO_ACK_TEE_PIPE_NAME: return "BPROTO_ACK_TEE_PIPE_NAME";
     default:
         sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n);
         return protocol_str;
index 30f1d293c310cd14a2ff14b7ccb40cd01fae08b6..2c522775e30b068a7a792e61d1a8a6c860bc6dde 100644 (file)
@@ -85,7 +85,7 @@ sirinet_stream_t * sirinet_stream_new(sirinet_stream_tp_t tp, on_data_cb_t cb)
  */
 char * sirinet_stream_name(sirinet_stream_t * client)
 {
-    switch (client->tp)
+    switch ((sirinet_stream_tp_t) client->tp)
     {
     case STREAM_TCP_CLIENT:
     case STREAM_TCP_BACKEND:
@@ -242,7 +242,7 @@ void sirinet__stream_free(uv_stream_t * uvclient)
 {
     sirinet_stream_t * client = uvclient->data;
 
-    switch (client->tp)
+    switch ((sirinet_stream_tp_t) client->tp)
     {
     case STREAM_PIPE_CLIENT:
     case STREAM_TCP_CLIENT:  /* listens to client connections  */
index 11e42256ab8781129d70448a07b6ac2fd738101e..4cdd17749f637a44242217cad200ad71cde93e04 100644 (file)
@@ -142,7 +142,7 @@ int siri_start(void)
     siridb_init_aggregates();
 
     /* load SiriDB grammar */
-    siri.grammar = compile_grammar();
+    siri.grammar = compile_siri_grammar_grammar();
 
     /* create store for SiriDB instances */
     siri.siridb_list = llist_new();
@@ -495,13 +495,16 @@ static void SIRI_walk_close_handlers(
 
     case UV_TCP:
     case UV_NAMED_PIPE:
-        if (handle->data == NULL)
         {
-            uv_close(handle, NULL);
-        }
-        else
-        {
-            sirinet_stream_decref((sirinet_stream_t *) handle->data);
+            sirinet_stream_t * stream = handle->data;
+            if (stream == NULL || (stream->tp & SIRIDB_TEE_FLAG))
+            {
+                uv_close(handle, NULL);
+            }
+            else
+            {
+                sirinet_stream_decref(stream);
+            }
         }
         break;