../src/siri/db/shard.c \
../src/siri/db/shards.c \
../src/siri/db/tasks.c \
+../src/siri/db/tee.c \
../src/siri/db/time.c \
../src/siri/db/user.c \
../src/siri/db/users.c \
./src/siri/db/shard.o \
./src/siri/db/shards.o \
./src/siri/db/tasks.o \
+./src/siri/db/tee.o \
./src/siri/db/time.o \
./src/siri/db/user.o \
./src/siri/db/users.o \
./src/siri/db/shard.d \
./src/siri/db/shards.d \
./src/siri/db/tasks.d \
+./src/siri/db/tee.d \
./src/siri/db/time.d \
./src/siri/db/user.d \
./src/siri/db/users.d \
../src/siri/db/shard.c \
../src/siri/db/shards.c \
../src/siri/db/tasks.c \
+../src/siri/db/tee.c \
../src/siri/db/time.c \
../src/siri/db/user.c \
../src/siri/db/users.c \
./src/siri/db/shard.o \
./src/siri/db/shards.o \
./src/siri/db/tasks.o \
+./src/siri/db/tee.o \
./src/siri/db/time.o \
./src/siri/db/user.o \
./src/siri/db/users.o \
./src/siri/db/shard.d \
./src/siri/db/shards.d \
./src/siri/db/tasks.d \
+./src/siri/db/tee.d \
./src/siri/db/time.d \
./src/siri/db/user.d \
./src/siri/db/users.d \
Author: Jeroen van der Heijden (Transceptor Technology)
Date: 2016-10-10
'''
-# import sys
-# sys.path.insert(0, '../../pyleri/')
+import sys
+sys.path.insert(0, '../../pyleri/')
import os
from grammar import siri_grammar
from pyleri import Grammar
Keyword('symmetric_difference'),
most_greedy=False)
k_sync_progress = Keyword('sync_progress')
+ k_tee_pipe_name = Keyword('tee_pipe_name')
k_timeit = Keyword('timeit')
k_timezone = Keyword('timezone')
k_time_precision = Keyword('time_precision')
k_reindex_progress,
k_selected_points,
k_sync_progress,
+ k_tee_pipe_name,
k_uptime,
most_greedy=False), ',', 1)
k_status,
k_reindex_progress,
k_sync_progress,
+ k_tee_pipe_name,
most_greedy=False), str_operator, string),
Sequence(k_online, bool_operator, _boolean),
Sequence(k_log_level, int_operator, log_keywords),
Optional(Sequence(k_using, aggregate_functions)))
set_address = Sequence(k_set, k_address, string)
+ set_tee_pipe_name = Sequence(k_set, k_tee_pipe_name, Choice(
+ k_false,
+ string,
+ most_greedy=False))
set_backup_mode = Sequence(k_set, k_backup_mode, _boolean)
set_drop_threshold = Sequence(k_set, k_drop_threshold, r_float)
set_expression = Sequence(k_set, k_expression, r_regex)
alter_server = Sequence(k_server, uuid, Choice(
set_log_level,
set_backup_mode,
+ set_tee_pipe_name,
set_address,
set_port,
most_greedy=False))
- alter_servers = Sequence(k_servers, Optional(where_server), set_log_level)
+ alter_servers = Sequence(k_servers, Optional(where_server), Choice(
+ set_log_level,
+ set_tee_pipe_name,
+ most_greedy=False))
alter_user = Sequence(k_user, string, Choice(
set_password,
k_startup_time,
k_status,
k_sync_progress,
+ k_tee_pipe_name,
k_time_precision,
k_timezone,
k_uptime,
#define SIRIDB_MAX_SIZE_ERR_MSG 1024
#define SIRIDB_MAX_DBNAME_LEN 256 /* 255 + NULL */
-#define SIRIDB_SCHEMA 4
+#define SIRIDB_SCHEMA 5
#define SIRIDB_FLAG_REINDEXING 1
#define DEF_DROP_THRESHOLD 1.0 /* 100% */
#include <siri/db/tasks.h>
#include <siri/db/time.h>
#include <siri/db/buffer.h>
+#include <siri/db/tee.h>
+
int32_t siridb_get_uptime(siridb_t * siridb);
int8_t siridb_get_idle_percentage(siridb_t * siridb);
siridb_reindex_t * reindex;
siridb_groups_t * groups;
siridb_buffer_t * buffer;
+ siridb_tee_t * tee;
siridb_tasks_t tasks;
};
--- /dev/null
+/*
+ * tee.h - To tee the data for a SiriDB database.
+ */
+#ifndef SIRIDB_TEE_H_
+#define SIRIDB_TEE_H_
+
+typedef struct siridb_tee_s siridb_tee_t;
+
+enum
+{
+ SIRIDB_TEE_FLAG_INIT = 1<<0,
+ SIRIDB_TEE_FLAG_CONNECTED = 1<<1,
+ SIRIDB_TEE_FLAG = 1<<31,
+};
+
+#include <uv.h>
+#include <stdbool.h>
+#include <siri/net/promise.h>
+
+siridb_tee_t * siridb_tee_new(void);
+void siridb_tee_free(siridb_tee_t * tee);
+int siridb_tee_connect(siridb_tee_t * tee);
+int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name);
+void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise);
+const char * tee_str(siridb_tee_t * tee);
+static inline _Bool siridb_tee_is_configured(siridb_tee_t * tee);
+static inline _Bool siridb_tee_is_connected(siridb_tee_t * tee);
+
+struct siridb_tee_s
+{
+ uint32_t flags; /* maps to sirnet_stream_t tp for cleanup */
+ char * pipe_name_;
+ char * err_msg_;
+ uv_pipe_t pipe;
+};
+
+static inline _Bool siridb_tee_is_configured(siridb_tee_t * tee)
+{
+ return tee->pipe_name_ != NULL;
+};
+
+static inline _Bool siridb_tee_is_connected(siridb_tee_t * tee)
+{
+ return tee->flags & SIRIDB_TEE_FLAG_CONNECTED;
+}
+
+#endif /* SIRIDB_TEE_H_ */
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2018-07-05 16:20:26
+ * Created at: 2018-10-29 10:52:57
*/
#ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
#define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
#include <cleri/cleri.h>
-cleri_grammar_t * compile_grammar(void);
+cleri_grammar_t * compile_siri_grammar_grammar(void);
enum cleri_grammar_ids {
- CLERI_NONE, /* used for objects with no name */
+ CLERI_NONE, // used for objects with no name
CLERI_GID_ACCESS_EXPR,
CLERI_GID_ACCESS_KEYWORDS,
CLERI_GID_AFTER_EXPR,
CLERI_GID_K_SUM,
CLERI_GID_K_SYMMETRIC_DIFFERENCE,
CLERI_GID_K_SYNC_PROGRESS,
+ CLERI_GID_K_TEE_PIPE_NAME,
CLERI_GID_K_TIMEIT,
CLERI_GID_K_TIMEZONE,
CLERI_GID_K_TIME_PRECISION,
CLERI_GID_SET_PASSWORD,
CLERI_GID_SET_PORT,
CLERI_GID_SET_SELECT_POINTS_LIMIT,
+ CLERI_GID_SET_TEE_PIPE_NAME,
CLERI_GID_SET_TIMEZONE,
CLERI_GID_SHARD_COLUMNS,
CLERI_GID_SHOW_STMT,
CLERI_GID_WHERE_SHARD,
CLERI_GID_WHERE_USER,
CLERI_GID__BOOLEAN,
- CLERI_END /* can be used to get the enum length */
+ CLERI_END // can be used to get the enum length
};
#endif /* CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ */
const char * sirinet_promise_strstatus(sirinet_promise_status_t status);
-#define sirinet_promise_incref(promise) promise->ref++
-#define sirinet_promise_decref(promise) if (!--promise->ref) free(promise)
+#define sirinet_promise_incref(p__) (p__)->ref++
+#define sirinet_promise_decref(p__) if (!--(p__)->ref) free(p__)
/* the callback will always be called and is responsible to free the promise */
struct sirinet_promise_s
BPROTO_REQ_GROUPS, /* empty */
BPROTO_ENABLE_BACKUP_MODE, /* empty */
BPROTO_DISABLE_BACKUP_MODE, /* empty */
+ BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */
} bproto_client_t;
/*
BPROTO_ACK_DROP_SERIES, /* empty */
BPROTO_ACK_ENABLE_BACKUP_MODE, /* empty */
BPROTO_ACK_DISABLE_BACKUP_MODE, /* empty */
- BPROTO_RES_GROUPS /* [[name, series], ...] */
+ BPROTO_RES_GROUPS, /* [[name, series], ...] */
+ BPROTO_ACK_TEE_PIPE_NAME /* empty */
} bproto_server_t;
struct sirinet_stream_s
{
- sirinet_stream_tp_t tp;
+ uint32_t tp; /* maps to siridb_tee_t flags for cleanup */
uint32_t ref;
on_data_cb_t on_data;
siridb_t * siridb;
heartbeat_interval = 30
#
-# SiriDB can run fsync on the buffer file on an interval in milliseconds.
-# This value is set to 0 by default which tells SiriDB to run fsync after
-# each insert request. When having many insert requests per second, it can be
+# SiriDB can run fsync on the buffer file on an interval in milliseconds.
+# This value is set to 0 by default which tells SiriDB to run fsync after
+# each insert request. When having many insert requests per second, it can be
# useful to use an interval like 500 milliseconds.
#
#buffer_sync_interval = 500
# SiriDB will bind the client named pipe in this location.
#
pipe_client_name = siridb_client.sock
+
*/
static cexpr_condition_t * CEXPR_condition_new(void)
{
- cexpr_condition_t * condition =
- (cexpr_condition_t *) malloc(sizeof(cexpr_condition_t));
+ cexpr_condition_t * condition = malloc(sizeof(cexpr_condition_t));
if (condition != NULL)
{
#include <time.h>
logger_t Logger = {
- .level=10,
+ .level=2,
.level_name=NULL,
.ostream=NULL,
.flags=0
}
else
{
- unpacker = (qp_unpacker_t *) malloc(sizeof(qp_unpacker_t));
+ unpacker = malloc(sizeof(qp_unpacker_t));
if (unpacker == NULL)
{
ERR_ALLOC
}
else
{
- unpacker->source = (unsigned char *) malloc(size);
+ unpacker->source = malloc(size);
if (unpacker->source == NULL)
{
ERR_ALLOC
/* start tasks */
siridb_tasks_init(&siridb->tasks);
+ /* init tee if configured */
+ if (siridb_tee_is_configured(siridb->tee))
+ {
+ siridb_tee_connect(siridb->tee);
+ }
+
log_info("Finished loading database: '%s'", siridb->dbname);
return siridb;
/* check schema */
if ( qp_schema.via.int64 == 1 ||
qp_schema.via.int64 == 2 ||
- qp_schema.via.int64 == 3)
+ qp_schema.via.int64 == 3 ||
+ qp_schema.via.int64 == 4)
{
log_info(
"Found an old database schema (v%d), "
(*siridb)->list_limit = qp_obj.via.int64;
}
+ /* for older schemas we keep the default tee_pipe_name=NULL */
+ if (qp_schema.via.int64 >= 5)
+ {
+ qp_next(unpacker, &qp_obj);
+
+ if (qp_obj.tp == QP_RAW)
+ {
+ (*siridb)->tee->pipe_name_ = strndup(
+ (char *) qp_obj.via.raw,
+ qp_obj.len);
+ READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.")
+ }
+ else if (qp_obj.tp != QP_NULL)
+ {
+ READ_DB_EXIT_WITH_ERROR("Cannot read tee pipe name.")
+ }
+ }
+ if ((*siridb)->tee->pipe_name_ == NULL)
+ {
+ log_debug(
+ "No tee pipe name configured for database: %s",
+ (*siridb)->dbname);
+ }
+ else
+ {
+ log_debug(
+ "Using tee pipe name '%s' for database: '%s'",
+ (*siridb)->tee->pipe_name_,
+ (*siridb)->dbname);
+ }
+
return (qp_schema.via.int64 == SIRIDB_SCHEMA) ? 0 : qp_schema.via.int64;
}
qp_fadd_double(fpacker, siridb->drop_threshold) ||
qp_fadd_int64(fpacker, siridb->select_points_limit) ||
qp_fadd_int64(fpacker, siridb->list_limit) ||
+ (siridb->tee->pipe_name_ == NULL
+ ? qp_fadd_type(fpacker, QP_NULL)
+ : qp_fadd_string(fpacker, siridb->tee->pipe_name_)) ||
qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
qp_close(fpacker));
}
siridb_groups_decref(siridb->groups);
}
+ if (siridb->tee != NULL)
+ {
+ siridb_tee_free(siridb->tee);
+ }
+
/* unlock the database in case no siri_err occurred */
if (!siri_err)
{
siridb_t * siridb = (siridb_t *) malloc(sizeof(siridb_t));
if (siridb == NULL)
{
- ERR_ALLOC
+ goto fail0;
}
- else
+
+ siridb->dbname = NULL;
+ siridb->dbpath = NULL;
+ siridb->ref = 1;
+ siridb->insert_tasks = 0;
+ siridb->flags = 0;
+ siridb->time = NULL;
+ siridb->users = NULL;
+ siridb->servers = NULL;
+ siridb->pools = NULL;
+ siridb->max_series_id = 0;
+ siridb->received_points = 0;
+ siridb->selected_points = 0;
+ siridb->drop_threshold = DEF_DROP_THRESHOLD;
+ siridb->select_points_limit = DEF_SELECT_POINTS_LIMIT;
+ siridb->list_limit = DEF_LIST_LIMIT;
+ siridb->tz = -1;
+ siridb->server = NULL;
+ siridb->replica = NULL;
+ siridb->fifo = NULL;
+ siridb->replicate = NULL;
+ siridb->reindex = NULL;
+ siridb->groups = NULL;
+ siridb->dropped_fp = NULL;
+ siridb->store = NULL;
+
+ siridb->series = ct_new();
+ if (siridb->series == NULL)
{
- siridb->series = ct_new();
- if (siridb->series == NULL)
- {
- ERR_ALLOC
- free(siridb);
- siridb = NULL;
- }
- else
- {
- siridb->series_map = imap_new();
- if (siridb->series_map == NULL)
- {
- ct_free(siridb->series, NULL);
- free(siridb);
- siridb = NULL;
- ERR_ALLOC
- }
- else
- {
- siridb->shards = imap_new();
- if (siridb->shards == NULL)
- {
- imap_free(siridb->series_map, NULL);
- ct_free(siridb->series, NULL);
- free(siridb);
- siridb = NULL;
- ERR_ALLOC
-
- }
- else
- {
- /* allocate a buffer */
- siridb->buffer = siridb_buffer_new();
- if (siridb->buffer == NULL)
- {
- imap_free(siridb->shards, NULL);
- imap_free(siridb->series_map, NULL);
- ct_free(siridb->series, NULL);
- free(siridb);
- siridb = NULL;
- ERR_ALLOC
- }
- else
- {
- siridb->dbname = NULL;
- siridb->dbpath = NULL;
- siridb->ref = 1;
- siridb->insert_tasks = 0;
- siridb->flags = 0;
- siridb->time = NULL;
- siridb->users = NULL;
- siridb->servers = NULL;
- siridb->pools = NULL;
- siridb->max_series_id = 0;
- siridb->received_points = 0;
- siridb->selected_points = 0;
- siridb->drop_threshold = DEF_DROP_THRESHOLD;
- siridb->select_points_limit = DEF_SELECT_POINTS_LIMIT;
- siridb->list_limit = DEF_LIST_LIMIT;
- siridb->tz = -1;
- siridb->server = NULL;
- siridb->replica = NULL;
- siridb->fifo = NULL;
- siridb->replicate = NULL;
- siridb->reindex = NULL;
- siridb->groups = NULL;
-
- /* make file pointers are NULL when file is closed */
- siridb->dropped_fp = NULL;
- siridb->store = NULL;
-
- uv_mutex_init(&siridb->series_mutex);
- uv_mutex_init(&siridb->shards_mutex);
- }
- }
- }
- }
+ goto fail0;
+ }
+
+ siridb->series_map = imap_new();
+ if (siridb->series_map == NULL)
+ {
+ goto fail1;
}
+ siridb->shards = imap_new();
+ if (siridb->shards == NULL)
+ {
+ goto fail2;
+ }
+ /* allocate a buffer */
+ siridb->buffer = siridb_buffer_new();
+ if (siridb->buffer == NULL)
+ {
+ goto fail3;
+ }
+
+ /* allocate tee */
+ siridb->tee = siridb_tee_new();
+ if (siridb->tee == NULL)
+ {
+ goto fail4;
+ }
+
+ uv_mutex_init(&siridb->series_mutex);
+ uv_mutex_init(&siridb->shards_mutex);
+
return siridb;
+
+fail4:
+ siridb_buffer_free(siridb->buffer);
+fail3:
+ imap_free(siridb->shards, NULL);
+fail2:
+ imap_free(siridb->series_map, NULL);
+fail1:
+ ct_free(siridb->series, NULL);
+fail0:
+ free(siridb);
+ ERR_ALLOC
+ return NULL;
}
static siridb_t * siridb__from_dat(const char * dbpath)
sirinet_pkg_t * pkg,
uint8_t flags)
{
- sirinet_promise_t * promise =
- (sirinet_promise_t *) malloc(sizeof(sirinet_promise_t));
+ sirinet_promise_t * promise = malloc(sizeof(sirinet_promise_t));
if (promise == NULL)
{
ERR_ALLOC
return -1;
}
- siridb_insert_local_t * ilocal =
- (siridb_insert_local_t *) malloc(sizeof(siridb_insert_local_t));
+ siridb_insert_local_t * ilocal = malloc(sizeof(siridb_insert_local_t));
if (ilocal == NULL)
{
free(promise);
return -1;
}
- uv_async_t * handle = (uv_async_t *) malloc(sizeof(uv_async_t));
+ uv_async_t * handle = malloc(sizeof(uv_async_t));
if (handle == NULL)
{
free(promise);
siridb_tasks_inc(siridb->tasks);
siridb->insert_tasks++;
+ if (siridb_tee_is_connected(siridb->tee))
+ {
+ siridb_tee_write(siridb->tee, promise);
+ }
+
uv_async_init(siri.loop, handle, INSERT_local_task);
uv_async_send(handle);
"Successfully dropped server '%s'."
#define MSG_SUCCES_SET_LOG_LEVEL_MULTI \
"Successfully set log level to '%s' on %lu servers."
+#define MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI \
+ "Successfully set tee_pipe name on %lu servers."
#define MSG_SUCCES_SET_LOG_LEVEL \
"Successfully set log level to '%s' on '%s'."
+#define MSG_SUCCES_SET_TEE_PIPE_NAME \
+ "Successfully set tee pipe name to '%s' on '%s'."
#define MSG_SUCCESS_SET_SELECT_POINTS_LIMIT \
"Successfully changed select points limit from %" PRIu32 " to %" PRIu32 "."
#define MSG_SUCCES_DROP_SERIES \
static void exit_set_log_level(uv_async_t * handle);
static void exit_set_port(uv_async_t * handle);
static void exit_set_select_points_limit(uv_async_t * handle);
+static void exit_set_tee_pipe_name(uv_async_t * handle);
static void exit_set_timezone(uv_async_t * handle);
static void exit_show_stmt(uv_async_t * handle);
static void exit_timeit_stmt(uv_async_t * handle);
siridb_listen_exit[CLERI_GID_SET_LOG_LEVEL] = exit_set_log_level;
siridb_listen_exit[CLERI_GID_SET_PORT] = exit_set_port;
siridb_listen_exit[CLERI_GID_SET_SELECT_POINTS_LIMIT] = exit_set_select_points_limit;
+ siridb_listen_exit[CLERI_GID_SET_TEE_PIPE_NAME] = exit_set_tee_pipe_name;
siridb_listen_exit[CLERI_GID_SET_TIMEZONE] = exit_set_timezone;
siridb_listen_exit[CLERI_GID_SHOW_STMT] = exit_show_stmt;
siridb_listen_exit[CLERI_GID_TIMEIT_STMT] = exit_timeit_stmt;
}
}
+static void exit_set_tee_pipe_name(uv_async_t * handle)
+{
+ siridb_query_t * query = (siridb_query_t *) handle->data;
+ query_alter_t * q_alter = (query_alter_t *) query->data;
+ siridb_t * siridb = query->client->siridb;
+
+ assert (query->data != NULL);
+
+ cleri_node_t * node =
+ query->nodes->node->children->next->next->node->children->node;
+
+ char pipe_name[node->len - 1];
+ xstr_extract_string(pipe_name, node->str, node->len);
+
+ if (q_alter->alter_tp == QUERY_ALTER_SERVERS)
+ {
+ /*
+ * alter_servers
+ */
+ cexpr_t * where_expr = ((query_list_t *) query->data)->where_expr;
+ siridb_server_walker_t wserver = {
+ .server=siridb->server,
+ .siridb=siridb
+ };
+
+ if (where_expr == NULL || cexpr_run(
+ where_expr,
+ (cexpr_cb_t) siridb_server_cexpr_cb,
+ &wserver))
+ {
+ siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+ q_alter->n++;
+ }
+
+ if (IS_MASTER)
+ {
+ /*
+ * This is a trick because we share with setting log level on
+ * multiple servers at once.
+ */
+ q_alter->n += LOGGER_NUM_LEVELS << 16;
+ siridb_query_forward(
+ handle,
+ SIRIDB_QUERY_FWD_SERVERS,
+ (sirinet_promises_cb) on_alter_xxx_response,
+ 0);
+ }
+ else
+ {
+ qp_add_raw(query->packer, (const unsigned char *) "servers", 7);
+ qp_add_int64(query->packer, q_alter->n);
+ SIRIPARSER_ASYNC_NEXT_NODE
+ }
+ }
+ else
+ {
+ /*
+ * alter_server
+ *
+ * we can set the success message, we just ignore the message in case
+ * an error occurs.
+ */
+ siridb_server_t * server = q_alter->via.server;
+
+ QP_ADD_SUCCESS
+ qp_add_fmt_safe(query->packer,
+ MSG_SUCCES_SET_TEE_PIPE_NAME,
+ pipe_name,
+ server->name);
+
+ if (server == siridb->server)
+ {
+ (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+
+ SIRIPARSER_ASYNC_NEXT_NODE
+ }
+ else
+ {
+
+ if (siridb_server_is_online(server))
+ {
+ sirinet_pkg_t * pkg = sirinet_pkg_new(
+ 0,
+ strlen(pipe_name),
+ BPROTO_TEE_PIPE_NAME_UPDATE,
+ (unsigned char *) pipe_name);
+ if (pkg != NULL)
+ {
+ /* handle will be bound to a timer so we should increment */
+ siri_async_incref(handle);
+ if (siridb_server_send_pkg(
+ server,
+ pkg,
+ 0,
+ (sirinet_promise_cb) on_ack_response,
+ handle,
+ 0))
+ {
+ /*
+ * signal is raised and 'on_ack_response' will not be
+ * called
+ */
+ free(pkg);
+ siri_async_decref(&handle);
+ }
+ }
+ }
+ else
+ {
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Cannot set pipe name, '%s' is currently unavailable",
+ server->name);
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ }
+ }
+ }
+}
+
static void exit_set_timezone(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
case BPROTO_ACK_DISABLE_BACKUP_MODE:
/* success message is already set */
break;
-
+ case BPROTO_ACK_TEE_PIPE_NAME:
+ /* success message is already set */
+ break;
default:
status = PROMISE_PKG_TYPE_ERROR;
break;
}
/*
* Note: since this function has the sole purpose for alter servers
- * and setting log levels, we now simply ad the message here.
+ * and setting log levels or pipe name, we now simply add the
+ * message here.
*/
QP_ADD_SUCCESS
- log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI,
- logger_level_name(q_alter->n >> 16),
- q_alter->n & 0xffff);
+ if ((q_alter->n >> 16) >= LOGGER_NUM_LEVELS)
+ {
+ log_info(MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI, q_alter->n & 0xffff);
+ qp_add_fmt_safe(
+ query->packer,
+ MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI,
+ q_alter->n & 0xffff);
+ }
+ else
+ {
+ log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI,
+ logger_level_name(q_alter->n >> 16),
+ q_alter->n & 0xffff);
+
+ qp_add_fmt_safe(
+ query->packer,
+ MSG_SUCCES_SET_LOG_LEVEL_MULTI,
+ logger_level_name(q_alter->n >> 16),
+ q_alter->n & 0xffff);
+ }
- qp_add_fmt_safe(
- query->packer,
- MSG_SUCCES_SET_LOG_LEVEL_MULTI,
- logger_level_name(q_alter->n >> 16),
- q_alter->n & 0xffff);
SIRIPARSER_ASYNC_NEXT_NODE
}
#include <siri/db/time.h>
#include <siri/grammar/grammar.h>
#include <siri/db/fifo.h>
+#include <siri/db/tee.h>
#include <siri/net/tcp.h>
#include <siri/siri.h>
#include <siri/version.h>
siridb_t * siridb,
qp_packer_t * packer,
int map);
+static void prop_tee_pipe_name(
+ siridb_t * siridb,
+ qp_packer_t * packer,
+ int map);
static void prop_timezone(
siridb_t * siridb,
qp_packer_t * packer,
prop_status;
siridb_props[CLERI_GID_K_SYNC_PROGRESS - KW_OFFSET] =
prop_sync_progress;
+ siridb_props[CLERI_GID_K_TEE_PIPE_NAME - KW_OFFSET] =
+ prop_tee_pipe_name;
siridb_props[CLERI_GID_K_TIMEZONE - KW_OFFSET] =
prop_timezone;
siridb_props[CLERI_GID_K_TIME_PRECISION - KW_OFFSET] =
qp_add_string(packer, siridb_initsync_sync_progress(siridb));
}
+static void prop_tee_pipe_name(
+ siridb_t * siridb,
+ qp_packer_t * packer,
+ int map)
+{
+ SIRIDB_PROP_MAP("tee_pipe_name", 13)
+ qp_add_string(packer, tee_str(siridb->tee));
+}
+
static void prop_timezone(
siridb_t * siridb,
qp_packer_t * packer,
#include <siri/db/server.h>
#include <siri/db/servers.h>
#include <siri/db/fifo.h>
+#include <siri/db/tee.h>
#include <siri/err.h>
#include <siri/net/promise.h>
#include <siri/net/stream.h>
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_NUMERICSERV;
- uv_getaddrinfo_t * resolver =
- (uv_getaddrinfo_t *) malloc(sizeof(uv_getaddrinfo_t));
+ uv_getaddrinfo_t * resolver = malloc(sizeof(uv_getaddrinfo_t));
if (resolver == NULL)
{
int status,
struct addrinfo * res)
{
- siridb_server_t * server = (siridb_server_t *) resolver->data;
+ siridb_server_t * server = resolver->data;
if (status < 0)
{
cond->operator,
siridb_initsync_sync_progress(wserver->siridb),
cond->str);
+
+ case CLERI_GID_K_TEE_PIPE_NAME:
+ return cexpr_str_cmp(
+ cond->operator,
+ tee_str(wserver->siridb->tee),
+ cond->str);
}
log_critical("Unexpected server property received: %d", cond->prop);
#include <siri/db/server.h>
#include <siri/db/servers.h>
#include <siri/db/misc.h>
+#include <siri/db/tee.h>
#include <siri/err.h>
#include <siri/net/promises.h>
#include <siri/net/tcp.h>
case CLERI_GID_K_SYNC_PROGRESS:
qp_add_string(query->packer, siridb_initsync_sync_progress(siridb));
break;
+ case CLERI_GID_K_TEE_PIPE_NAME:
+ qp_add_string(query->packer, tee_str(siridb->tee));
+ break;
case CLERI_GID_K_UPTIME:
qp_add_int64(
query->packer,
--- /dev/null
+/*
+ * tee.c - To tee the data for a SiriDB database.
+ */
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <assert.h>
+#include <siri/db/tee.h>
+#include <siri/siri.h>
+#include <siri/net/pipe.h>
+#include <logger/logger.h>
+
+#define TEE__BUF_SZ 512
+static char tee__buf[TEE__BUF_SZ];
+
+static void tee__runtime_init(uv_pipe_t * pipe);
+static void tee__write_cb(uv_write_t * req, int status);
+static void tee__on_connect(uv_connect_t * req, int status);
+static void tee__alloc_buffer(
+ uv_handle_t * handle,
+ size_t suggsz,
+ uv_buf_t * buf);
+static void tee__on_data(
+ uv_stream_t * client,
+ ssize_t nread,
+ const uv_buf_t * buf);
+
+siridb_tee_t * siridb_tee_new(void)
+{
+ siridb_tee_t * tee = malloc(sizeof(siridb_tee_t));
+ if (tee == NULL)
+ {
+ return NULL;
+ }
+ tee->pipe_name_ = NULL;
+ tee->err_msg_ = NULL;
+ tee->pipe.data = tee;
+ tee->flags = SIRIDB_TEE_FLAG;
+ return tee;
+}
+
+void siridb_tee_free(siridb_tee_t * tee)
+{
+ free(tee->err_msg_);
+ free(tee->pipe_name_);
+ free(tee);
+}
+
+int siridb_tee_connect(siridb_tee_t * tee)
+{
+ uv_connect_t * req = malloc(sizeof(uv_connect_t));
+ if (req == NULL)
+ {
+ return -1;
+ }
+
+ req->data = tee;
+
+ if (uv_pipe_init(siri.loop, &tee->pipe, 0))
+ {
+ return -1;
+ }
+ tee->flags |= SIRIDB_TEE_FLAG_INIT;
+ tee->pipe.data = tee;
+
+ free(tee->err_msg_);
+
+ uv_pipe_connect(req, &tee->pipe, tee->pipe_name_, tee__on_connect);
+ return 0;
+}
+
+int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name)
+{
+ free(tee->pipe_name_);
+ tee->pipe_name_ = strdup(pipe_name);
+ if (!tee->pipe_name_)
+ {
+ return -1;
+ }
+ if (tee->flags & SIRIDB_TEE_FLAG_INIT)
+ {
+ uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__runtime_init);
+ }
+ else
+ {
+ tee__runtime_init(&tee->pipe);
+ }
+ return 0;
+}
+
+void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise)
+{
+ uv_write_t * req = malloc(sizeof(uv_write_t));
+ if (!req)
+ {
+ log_error("Cannot allocate memory for tee request");
+ return;
+ }
+
+ req->data = promise;
+ sirinet_promise_incref(promise);
+
+ uv_buf_t wrbuf = uv_buf_init(
+ (char *) promise->pkg,
+ sizeof(sirinet_pkg_t) + promise->pkg->len);
+
+ if (uv_write(req, (uv_stream_t *) &tee->pipe, &wrbuf, 1, tee__write_cb))
+ {
+ log_error("Cannot write to tee");
+ sirinet_promise_decref(promise);
+ }
+}
+
+const char * tee_str(siridb_tee_t * tee)
+{
+ if (tee->err_msg_)
+ {
+ return tee->err_msg_;
+ }
+ if (tee->pipe_name_)
+ {
+ return tee->pipe_name_;
+ }
+ return "disabled";
+}
+
+
+static void tee__runtime_init(uv_pipe_t * pipe)
+{
+ siridb_tee_t * tee = pipe->data;
+
+ tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
+ tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
+
+ if (siridb_tee_connect(tee))
+ {
+ log_error("Could not connect to tee at runtime");
+ }
+}
+
+static void tee__write_cb(uv_write_t * req, int status)
+{
+ sirinet_promise_t * promise = req->data;
+ sirinet_promise_decref(promise);
+ if (status)
+ {
+ log_error("Socket (tee) write error: %s", uv_strerror(status));
+ }
+ free(req);
+}
+
+static void tee__on_connect(uv_connect_t * req, int status)
+{
+ siridb_tee_t * tee = req->data;
+
+ if (status == 0)
+ {
+ log_info("Connection created to pipe: '%s'", tee->pipe_name_);
+ if (uv_read_start(req->handle, tee__alloc_buffer, tee__on_data))
+ {
+ if (asprintf(&tee->err_msg_,
+ "Cannot open pipe '%s' for reading",
+ tee->pipe_name_) >= 0)
+ {
+ log_error(tee->err_msg_);
+ }
+ }
+ else
+ {
+ tee->flags |= SIRIDB_TEE_FLAG_CONNECTED;
+ }
+ }
+ else
+ {
+ if (asprintf(
+ &tee->err_msg_,
+ "Cannot connect to pipe '%s' (%s)",
+ tee->pipe_name_,
+ uv_strerror(status)) >= 0)
+ {
+ log_error(tee->err_msg_);
+ }
+ }
+ free(req);
+}
+
+static void tee__alloc_buffer(
+ uv_handle_t * handle __attribute__((unused)),
+ size_t suggsz __attribute__((unused)),
+ uv_buf_t * buf)
+{
+ buf->base = tee__buf;
+ buf->len = TEE__BUF_SZ;
+}
+
+
+
+static void tee__on_data(
+ uv_stream_t * client,
+ ssize_t nread,
+ const uv_buf_t * buf __attribute__((unused)))
+{
+ siridb_tee_t * tee = client->data;
+
+ if (nread < 0)
+ {
+ if (nread != UV_EOF)
+ {
+ log_error("Read error on pipe '%s' : '%s'",
+ sirinet_pipe_name((uv_pipe_t * ) client),
+ uv_err_name(nread));
+ }
+ log_info("Disconnected from tee pipe: '%s'",
+ sirinet_pipe_name((uv_pipe_t * ) client));
+ tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
+ uv_close((uv_handle_t *) client, NULL);
+ }
+
+ if (nread > 0)
+ {
+ log_debug("Got %zd bytes on tee which will be ignored", nread);
+ }
+}
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2018-07-05 16:20:26
+ * Created at: 2018-10-29 10:52:57
*/
#include "siri/grammar/grammar.h"
#define CLERI_FIRST_MATCH 0
#define CLERI_MOST_GREEDY 1
-cleri_grammar_t * compile_grammar(void)
+cleri_grammar_t * compile_siri_grammar_grammar(void)
{
cleri_t * r_float = cleri_regex(CLERI_GID_R_FLOAT, "^[-+]?[0-9]*\\.?[0-9]+");
cleri_t * r_integer = cleri_regex(CLERI_GID_R_INTEGER, "^[-+]?[0-9]+");
cleri_keyword(CLERI_NONE, "symmetric_difference", CLERI_CASE_SENSITIVE)
);
cleri_t * k_sync_progress = cleri_keyword(CLERI_GID_K_SYNC_PROGRESS, "sync_progress", CLERI_CASE_SENSITIVE);
+ cleri_t * k_tee_pipe_name = cleri_keyword(CLERI_GID_K_TEE_PIPE_NAME, "tee_pipe_name", CLERI_CASE_SENSITIVE);
cleri_t * k_timeit = cleri_keyword(CLERI_GID_K_TIMEIT, "timeit", CLERI_CASE_SENSITIVE);
cleri_t * k_timezone = cleri_keyword(CLERI_GID_K_TIMEZONE, "timezone", CLERI_CASE_SENSITIVE);
cleri_t * k_time_precision = cleri_keyword(CLERI_GID_K_TIME_PRECISION, "time_precision", CLERI_CASE_SENSITIVE);
cleri_t * server_columns = cleri_list(CLERI_GID_SERVER_COLUMNS, cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 28,
+ 29,
k_address,
k_buffer_path,
k_buffer_size,
k_reindex_progress,
k_selected_points,
k_sync_progress,
+ k_tee_pipe_name,
k_uptime
), cleri_token(CLERI_NONE, ","), 1, 0, 0);
cleri_t * group_columns = cleri_list(CLERI_GID_GROUP_COLUMNS, cleri_choice(
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 11,
+ 12,
k_address,
k_buffer_path,
k_dbpath,
k_version,
k_status,
k_reindex_progress,
- k_sync_progress
+ k_sync_progress,
+ k_tee_pipe_name
),
str_operator,
string
k_address,
string
);
+ cleri_t * set_tee_pipe_name = cleri_sequence(
+ CLERI_GID_SET_TEE_PIPE_NAME,
+ 3,
+ k_set,
+ k_tee_pipe_name,
+ cleri_choice(
+ CLERI_NONE,
+ CLERI_FIRST_MATCH,
+ 2,
+ k_false,
+ string
+ )
+ );
cleri_t * set_backup_mode = cleri_sequence(
CLERI_GID_SET_BACKUP_MODE,
3,
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 4,
+ 5,
set_log_level,
set_backup_mode,
+ set_tee_pipe_name,
set_address,
set_port
)
3,
k_servers,
cleri_optional(CLERI_NONE, where_server),
- set_log_level
+ cleri_choice(
+ CLERI_NONE,
+ CLERI_FIRST_MATCH,
+ 2,
+ set_log_level,
+ set_tee_pipe_name
+ )
);
cleri_t * alter_user = cleri_sequence(
CLERI_GID_ALTER_USER,
cleri_list(CLERI_NONE, cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 34,
+ 35,
k_active_handles,
k_active_tasks,
k_buffer_path,
k_startup_time,
k_status,
k_sync_progress,
+ k_tee_pipe_name,
k_time_precision,
k_timezone,
k_uptime,
{
siridb = (siridb_t *) siridb_node->data;
+ if ( siridb_tee_is_configured(siridb->tee) &&
+ !siridb_tee_is_connected(siridb->tee))
+ {
+ siridb_tee_connect(siridb->tee);
+ }
+
server_node = siridb->servers->first;
while (server_node != NULL)
{
static void on_log_level_update(
sirinet_stream_t * client,
sirinet_pkg_t * pkg);
+static void on_tee_pipe_name_update(
+ sirinet_stream_t * client,
+ sirinet_pkg_t * pkg);
static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg);
static void on_query(
sirinet_stream_t * client,
case BPROTO_DISABLE_BACKUP_MODE:
on_disable_backup_mode(client, pkg);
break;
+ case BPROTO_TEE_PIPE_NAME_UPDATE:
+ on_tee_pipe_name_update(client, pkg);
+ break;
}
}
}
}
+static void on_tee_pipe_name_update(
+ sirinet_stream_t * client,
+ sirinet_pkg_t * pkg)
+{
+ SERVER_CHECK_AUTHENTICATED(client, server);
+ siridb_t * siridb = client->siridb;
+ sirinet_pkg_t * package;
+ char * pipe_name = strndup((const char *) pkg->data, pkg->len);
+ if (pipe_name != NULL)
+ {
+ (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+ }
+
+ package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_TEE_PIPE_NAME, NULL);
+ if (package != NULL)
+ {
+ /* ignore result code, signal can be raised */
+ sirinet_pkg_send(client, package);
+ }
+}
+
static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg)
{
SERVER_CHECK_AUTHENTICATED(client, server)
}
/*
- * Cleanup socket (pipe) file. (Unused)
+ * Cleanup socket (pipe) file. (UNUSED)
*/
void sirinet_pipe_unlink(uv_pipe_t * client)
{
*/
int sirinet_pkg_send(sirinet_stream_t * client, sirinet_pkg_t * pkg)
{
- uv_write_t * req = (uv_write_t *) malloc(sizeof(uv_write_t));
+ uv_write_t * req = malloc(sizeof(uv_write_t));
if (req == NULL)
{
return -1;
}
- pkg_send_t * data = (pkg_send_t *) malloc(sizeof(pkg_send_t));
+ pkg_send_t * data = malloc(sizeof(pkg_send_t));
if (data == NULL)
{
(char *) pkg,
sizeof(sirinet_pkg_t) + pkg->len);
- uv_write(req, client->stream, &wrbuf, 1, PKG_write_cb);
+ if (uv_write(req, client->stream, &wrbuf, 1, PKG_write_cb))
+ {
+ sirinet_stream_decref(data->client);
+ free(pkg);
+ free(req);
+ return -1;
+ }
return 0;
}
sirinet_pkg_t * sirinet_pkg_dup(sirinet_pkg_t * pkg)
{
size_t size = sizeof(sirinet_pkg_t) + pkg->len;
- sirinet_pkg_t * dup = (sirinet_pkg_t *) malloc(size);
+ sirinet_pkg_t * dup = malloc(size);
if (dup == NULL)
{
ERR_ALLOC
case BPROTO_REQ_GROUPS: return "BPROTO_REQ_GROUPS";
case BPROTO_ENABLE_BACKUP_MODE: return "BPROTO_ENABLE_BACKUP_MODE";
case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
+ case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
default:
sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
return protocol_str;
case BPROTO_ACK_ENABLE_BACKUP_MODE: return "BPROTO_ACK_ENABLE_BACKUP_MODE";
case BPROTO_ACK_DISABLE_BACKUP_MODE: return "BPROTO_ACK_DISABLE_BACKUP_MODE";
case BPROTO_RES_GROUPS: return "BPROTO_RES_GROUPS";
+ case BPROTO_ACK_TEE_PIPE_NAME: return "BPROTO_ACK_TEE_PIPE_NAME";
default:
sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n);
return protocol_str;
*/
char * sirinet_stream_name(sirinet_stream_t * client)
{
- switch (client->tp)
+ switch ((sirinet_stream_tp_t) client->tp)
{
case STREAM_TCP_CLIENT:
case STREAM_TCP_BACKEND:
{
sirinet_stream_t * client = uvclient->data;
- switch (client->tp)
+ switch ((sirinet_stream_tp_t) client->tp)
{
case STREAM_PIPE_CLIENT:
case STREAM_TCP_CLIENT: /* listens to client connections */
siridb_init_aggregates();
/* load SiriDB grammar */
- siri.grammar = compile_grammar();
+ siri.grammar = compile_siri_grammar_grammar();
/* create store for SiriDB instances */
siri.siridb_list = llist_new();
case UV_TCP:
case UV_NAMED_PIPE:
- if (handle->data == NULL)
{
- uv_close(handle, NULL);
- }
- else
- {
- sirinet_stream_decref((sirinet_stream_t *) handle->data);
+ sirinet_stream_t * stream = handle->data;
+ if (stream == NULL || (stream->tp & SIRIDB_TEE_FLAG))
+ {
+ uv_close(handle, NULL);
+ }
+ else
+ {
+ sirinet_stream_decref(stream);
+ }
}
break;