k_sync_progress = Keyword('sync_progress')
k_tag = Keyword('tag')
k_tags = Keyword('tags')
- k_tee_pipe_name = Keyword('tee_pipe_name')
+ k_tee = Keyword('tee')
k_time_precision = Keyword('time_precision')
k_timeit = Keyword('timeit')
k_timeval = Keyword('timeval')
k_reindex_progress,
k_selected_points,
k_sync_progress,
- k_tee_pipe_name,
k_uptime,
most_greedy=False), ',', 1)
k_status,
k_reindex_progress,
k_sync_progress,
- k_tee_pipe_name,
most_greedy=False), str_operator, string),
Sequence(k_online, bool_operator, _boolean),
Sequence(k_log_level, int_operator, log_keywords),
Optional(Sequence(k_using, aggregate_functions)))
set_address = Sequence(k_set, k_address, string)
- set_tee_pipe_name = Sequence(k_set, k_tee_pipe_name, Choice(
+ set_tee = Sequence(k_set, k_tee, Choice(
k_false,
string,
most_greedy=False))
set_timezone,
set_expiration_num,
set_expiration_log,
+ set_tee,
most_greedy=False))
alter_group = Sequence(k_group, group_name, Choice(
alter_server = Sequence(k_server, uuid, Choice(
set_log_level,
set_backup_mode,
- set_tee_pipe_name,
set_address,
set_port,
most_greedy=False))
alter_servers = Sequence(k_servers, Optional(where_server), Choice(
set_log_level,
- set_tee_pipe_name,
most_greedy=False))
alter_user = Sequence(k_user, string, Choice(
k_startup_time,
k_status,
k_sync_progress,
- k_tee_pipe_name,
+ k_tee,
k_time_precision,
k_timezone,
k_uptime,
#define SIRIDB_MAX_SIZE_ERR_MSG 1024
#define SIRIDB_MAX_DBNAME_LEN 256 /* 255 + NULL */
-#define SIRIDB_SCHEMA 6
+#define SIRIDB_SCHEMA 7
#define SIRIDB_FLAG_REINDEXING 1
#define SIRIDB_FLAG_DROPPED 2
typedef struct siridb_tee_s siridb_tee_t;
+#define SIRIDB_TEE_DEFAULT_TCP_PORT 9104
+
enum
{
- SIRIDB_TEE_FLAG_INIT = 1<<0,
- SIRIDB_TEE_FLAG_CONNECTING = 1<<1,
- SIRIDB_TEE_FLAG_CONNECTED = 1<<2,
SIRIDB_TEE_FLAG = 1<<31,
};
+enum siridb_tee_e_t
+{
+ SIRIDB_TEE_E_OK=0,
+ SIRIDB_TEE_E_ALLOC,
+ SIRIDB_TEE_E_READ,
+ SIRIDB_TEE_E_CONNECT,
+};
+
+
#include <uv.h>
#include <stdbool.h>
#include <siri/net/promise.h>
siridb_tee_t * siridb_tee_new(void);
-void siridb_tee_free(siridb_tee_t * tee);
-int siridb_tee_connect(siridb_tee_t * tee);
-int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name);
+int siridb_tee_set_address_port(
+ siridb_tee_t * tee,
+ const char * address,
+ uint16_t port);
void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise);
-const char * tee_str(siridb_tee_t * tee);
-static inline bool siridb_tee_is_configured(siridb_tee_t * tee);
-static inline bool siridb_tee_is_connected(siridb_tee_t * tee);
-static inline bool siridb_tee_is_handle(uv_handle_t * handle);
+void siridb_tee_free(siridb_tee_t * tee);
+const char * siridb_tee_str(siridb_tee_t * tee);
+
+typedef void (*siridb_tee_cb)(uv_handle_t *);
+
struct siridb_tee_s
{
uint32_t flags; /* maps to sirnet_stream_t tp for cleanup */
- char * pipe_name_;
- char * err_msg_;
- uv_pipe_t pipe;
+ uint16_t port;
+ uint16_t err_code;
+ char * address;
+ uv_tcp_t * tcp;
+ uv_mutex_t lock_;
+ sirinet_promise_t * promise_;
};
+
static inline bool siridb_tee_is_configured(siridb_tee_t * tee)
{
- return tee->pipe_name_ != NULL;
+ return tee->address != NULL;
};
-static inline bool siridb_tee_is_connected(siridb_tee_t * tee)
-{
- return tee->flags & SIRIDB_TEE_FLAG_CONNECTED;
-}
-
static inline bool siridb_tee_is_handle(uv_handle_t * handle)
{
return
- handle->type == UV_NAMED_PIPE &&
+ handle->type == UV_TCP &&
handle->data &&
(((siridb_tee_t *) handle->data)->flags & SIRIDB_TEE_FLAG);
}
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2020-09-25 10:57:26
+ * Created at: 2022-04-15 12:10:03
*/
#ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
#define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
CLERI_GID_K_SYNC_PROGRESS,
CLERI_GID_K_TAG,
CLERI_GID_K_TAGS,
- CLERI_GID_K_TEE_PIPE_NAME,
+ CLERI_GID_K_TEE,
CLERI_GID_K_TIMEIT,
CLERI_GID_K_TIMEVAL,
CLERI_GID_K_TIMEZONE,
CLERI_GID_SET_PASSWORD,
CLERI_GID_SET_PORT,
CLERI_GID_SET_SELECT_POINTS_LIMIT,
- CLERI_GID_SET_TEE_PIPE_NAME,
+ CLERI_GID_SET_TEE,
CLERI_GID_SET_TIMEZONE,
CLERI_GID_SHARD_COLUMNS,
CLERI_GID_SHOW_STMT,
BPROTO_REQ_GROUPS, /* empty */
BPROTO_ENABLE_BACKUP_MODE, /* empty */
BPROTO_DISABLE_BACKUP_MODE, /* empty */
- BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */
+ _BPROTO_TEE_PIPE_NAME_UPDATE, /* deprecated */
BPROTO_DROP_DATABASE, /* empty */
BPROTO_REQ_TAGS, /* empty */
BPROTO_SERIES_TAGS, /* [series name, tag name, ...] */
--- /dev/null
+import asyncio
+
+
+
+if __name__ == '__main__':
+ pass
\ No newline at end of file
--- /dev/null
+import asyncio
+import functools
+import random
+import time
+import string
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+from testing import parse_args
+
+
+TIME_PRECISION = 'ms'
+
+
+class TestIntegerLoad(TestBase):
+ title = 'Test inserts and response'
+
+ GEN_POINTS = functools.partial(
+ gen_points,
+ tp=int,
+ mi=-2**10,
+ ma=2**10,
+ n=5,
+ time_precision=TIME_PRECISION,
+ ts_gap=3)
+
+ async def select(self, client, series, n):
+ series = {s.name: s for s in series}
+ chars=string.ascii_letters + string.digits
+ for _ in range(n):
+ regex = ''.join(random.choice(chars) for _ in range(3))
+ res = await client.query(f'select max() prefix "max-", min() prefix "min-", mean() prefix "mean-" from /.*{regex}.*/i')
+ if res:
+ print(res)
+ # for s, p in res.items():
+ # self.assertEqual(sorted(series[s].points), p)
+ await asyncio.sleep(0.2)
+
+ async def insert(self, client, series, n):
+ for _ in range(n):
+ await client.insert_some_series(
+ series, n=0.04, points=self.GEN_POINTS)
+ await asyncio.sleep(0.2)
+
+ @default_test_setup(
+ 1,
+ time_precision=TIME_PRECISION,
+ compression=True,
+ auto_duration=True,
+ optimize_interval=20)
+ async def run(self):
+ await self.client0.connect()
+
+ series = gen_series(n=1000)
+
+ insert = asyncio.ensure_future(
+ self.insert(self.client0, series, n=50000))
+ select = asyncio.ensure_future(
+ self.select(self.client0, series, n=50000))
+ await asyncio.gather(insert, select)
+
+ self.client0.close()
+ await asyncio.sleep(1800)
+
+
+if __name__ == '__main__':
+ random.seed(1)
+ parse_args()
+ run_test(TestIntegerLoad())
/* start tasks */
siridb_tasks_init(&siridb->tasks);
- /* init tee if configured */
- if (siridb_tee_is_configured(siridb->tee))
- {
- siridb_tee_connect(siridb->tee);
- }
-
log_info("Finished loading database: '%s'", siridb->dbname);
return siridb;
qp_schema.via.int64 == 2 ||
qp_schema.via.int64 == 3 ||
qp_schema.via.int64 == 4 ||
- qp_schema.via.int64 == 5)
+ qp_schema.via.int64 == 5 ||
+ qp_schema.via.int64 == 6)
{
log_info(
"Found an old database schema (v%d), "
}
/* for older schemas we keep the default tee_pipe_name=NULL */
- if (qp_schema.via.int64 >= 5)
+ if (qp_schema.via.int64 >= 5 && qp_schema.via.int64 <=6)
{
+ LOGC("HERE");
qp_next(unpacker, &qp_obj);
-
- if (qp_obj.tp == QP_RAW)
- {
- (*siridb)->tee->pipe_name_ = strndup(
- (char *) qp_obj.via.raw,
- qp_obj.len);
-
- if (!(*siridb)->tee->pipe_name_)
- {
- READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.")
- }
- }
- else if (qp_obj.tp != QP_NULL)
- {
- READ_DB_EXIT_WITH_ERROR("Cannot read tee pipe name.")
- }
+ /* Skip the tee pipe name */
}
if (qp_schema.via.int64 >= 6)
{
/* read select points limit */
if (qp_next(unpacker, &qp_obj) != QP_INT64 || qp_obj.via.int64 < 0)
{
+
READ_DB_EXIT_WITH_ERROR(
"Cannot read shard (log) expiration time.")
}
}
(*siridb)->expiration_num = qp_obj.via.int64;
}
- if ((*siridb)->tee->pipe_name_ == NULL)
+ if (qp_schema.via.int64 >= 7)
+ {
+ qp_next(unpacker, &qp_obj);
+
+ if (qp_obj.tp == QP_RAW)
+ {
+ (*siridb)->tee->address = strndup(
+ (char *) qp_obj.via.raw,
+ qp_obj.len);
+
+ if (!(*siridb)->tee->address)
+ {
+ READ_DB_EXIT_WITH_ERROR("Cannot allocate tee address.")
+ }
+ }
+ else if (qp_obj.tp != QP_NULL)
+ {
+ READ_DB_EXIT_WITH_ERROR("Cannot read tee address.")
+ }
+
+ if (qp_next(unpacker, &qp_obj) != QP_INT64 ||
+ qp_obj.via.int64 < 0 ||
+ qp_obj.via.int64 > 65535)
+ {
+ READ_DB_EXIT_WITH_ERROR("Cannot read tee port.")
+ }
+
+ (*siridb)->tee->port = qp_obj.via.int64;
+ }
+
+ if ((*siridb)->tee->address == NULL)
{
log_debug(
- "No tee pipe name configured for database: %s",
+ "No tee configured for database: %s",
(*siridb)->dbname);
}
else
{
log_debug(
- "Using tee pipe name '%s' for database: '%s'",
- (*siridb)->tee->pipe_name_,
+ "Using tee '%s:%u' for database: '%s'",
+ (*siridb)->tee->address,
+ (*siridb)->tee->port,
(*siridb)->dbname);
}
qp_fadd_double(fpacker, siridb->drop_threshold) ||
qp_fadd_int64(fpacker, siridb->select_points_limit) ||
qp_fadd_int64(fpacker, siridb->list_limit) ||
- (siridb->tee->pipe_name_ == NULL
- ? qp_fadd_type(fpacker, QP_NULL)
- : qp_fadd_string(fpacker, siridb->tee->pipe_name_)) ||
qp_fadd_int64(fpacker, siridb->expiration_log) ||
qp_fadd_int64(fpacker, siridb->expiration_num) ||
+ (siridb->tee->address == NULL
+ ? qp_fadd_type(fpacker, QP_NULL)
+ : qp_fadd_string(fpacker, siridb->tee->address)) ||
+ qp_fadd_int64(fpacker, siridb->tee->port) ||
qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
qp_close(fpacker));
}
siridb->users = NULL;
siridb->servers = NULL;
siridb->pools = NULL;
+ siridb->dropped_fp = NULL;
siridb->max_series_id = 0;
siridb->received_points = 0;
siridb->selected_points = 0;
siridb_tasks_inc(siridb->tasks);
siridb->insert_tasks++;
- if (siridb_tee_is_connected(siridb->tee))
+ if (siridb_tee_is_configured(siridb->tee))
{
siridb_tee_write(siridb->tee, promise);
}
siridb_tasks_inc(siridb->tasks);
siridb->insert_tasks++;
- if (siridb_tee_is_connected(siridb->tee))
+ if (siridb_tee_is_configured(siridb->tee))
{
siridb_tee_write(siridb->tee, promise);
}
#include <siri/net/promises.h>
#include <siri/net/protocol.h>
#include <siri/net/clserver.h>
+#include <siri/net/tcp.h>
#include <siri/siri.h>
#include <xstr/xstr.h>
#include <sys/time.h>
"Successfully dropped server '%s'."
#define MSG_SUCCES_SET_LOG_LEVEL_MULTI \
"Successfully set log level to '%s' on %lu servers."
-#define MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI \
- "Successfully set tee_pipe name on %lu servers."
+#define MSG_SUCCES_SET_TEE_DISABLED \
+ "Successfully disabled tee."
+#define MSG_SUCCES_SET_TEE_ENABLED \
+ "Successfully configures tee to %s."
#define MSG_SUCCES_SET_LOG_LEVEL \
"Successfully set log level to '%s' on '%s'."
-#define MSG_SUCCES_SET_TEE_PIPE_NAME \
- "Successfully set tee pipe name to '%s' on '%s'."
#define MSG_SUCCESS_SET_SELECT_POINTS_LIMIT \
"Successfully changed select points limit from %" PRIu32 " to %" PRIu32 "."
#define MSG_SUCCES_DROP_SERIES \
static void exit_set_log_level(uv_async_t * handle);
static void exit_set_port(uv_async_t * handle);
static void exit_set_select_points_limit(uv_async_t * handle);
-static void exit_set_tee_pipe_name(uv_async_t * handle);
+static void exit_set_tee(uv_async_t * handle);
static void exit_set_timezone(uv_async_t * handle);
static void exit_show_stmt(uv_async_t * handle);
static void exit_tag_series(uv_async_t * handle);
SIRIDB_NODE_EXIT[CLERI_GID_SET_LOG_LEVEL] = exit_set_log_level;
SIRIDB_NODE_EXIT[CLERI_GID_SET_PORT] = exit_set_port;
SIRIDB_NODE_EXIT[CLERI_GID_SET_SELECT_POINTS_LIMIT] = exit_set_select_points_limit;
- SIRIDB_NODE_EXIT[CLERI_GID_SET_TEE_PIPE_NAME] = exit_set_tee_pipe_name;
+ SIRIDB_NODE_EXIT[CLERI_GID_SET_TEE] = exit_set_tee;
SIRIDB_NODE_EXIT[CLERI_GID_SET_TIMEZONE] = exit_set_timezone;
SIRIDB_NODE_EXIT[CLERI_GID_SHOW_STMT] = exit_show_stmt;
SIRIDB_NODE_EXIT[CLERI_GID_TAG_SERIES] = exit_tag_series;
}
}
-static void exit_set_tee_pipe_name(uv_async_t * handle)
+static void exit_set_tee(uv_async_t * handle)
{
siridb_query_t * query = handle->data;
- query_alter_t * q_alter = (query_alter_t *) query->data;
siridb_t * siridb = query->client->siridb;
- assert (query->data != NULL);
+ MASTER_CHECK_ACCESSIBLE(siridb)
cleri_node_t * node = cleri_gn(cleri_gn(
query->nodes->node->children->next->next)->children);
- char pipe_name[node->len - 1];
- char * p_pipe_name = NULL;
+ char tee_addr_port[node->len - 1];
+ char tee_address[SIRI_CFG_MAX_LEN_ADDRESS];
+ uint16_t tee_port = SIRIDB_TEE_DEFAULT_TCP_PORT;
- if (node->cl_obj->gid == CLERI_GID_STRING)
- {
- xstr_extract_string(pipe_name, node->str, node->len);
- p_pipe_name = pipe_name;
- }
- if (q_alter->alter_tp == QUERY_ALTER_SERVERS)
+ if (node->cl_obj->gid == CLERI_GID_STRING)
{
- /*
- * alter_servers
- */
- cexpr_t * where_expr = ((query_list_t *) query->data)->where_expr;
- siridb_server_walker_t wserver = {
- .server=siridb->server,
- .siridb=siridb
- };
+ xstr_extract_string(tee_addr_port, node->str, node->len);
- if (where_expr == NULL || cexpr_run(
- where_expr,
- (cexpr_cb_t) siridb_server_cexpr_cb,
- &wserver))
+ if (tee_addr_port[0] == 0)
{
- (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name);
- if (siridb_save(siridb))
- {
- log_critical("Could not save database changes (database: '%s')",
- siridb->dbname);
- }
- q_alter->n++;
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Tee address must not be empty");
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ return;
}
- if (IS_MASTER)
+ if (sirinet_extract_addr_port(tee_addr_port, tee_address, &tee_port))
{
- /*
- * This is a trick because we share with setting log level on
- * multiple servers at once.
- */
- q_alter->n += LOGGER_NUM_LEVELS << 16;
- siridb_query_forward(
- handle,
- SIRIDB_QUERY_FWD_SERVERS,
- (sirinet_promises_cb) on_alter_xxx_response,
- 0);
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Invalid tee address; expecting ADDRESS[:PORT]");
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ return;
}
- else
+
+ if (siridb_tee_set_address_port(siridb->tee, tee_address, tee_port))
{
- qp_add_raw(query->packer, (const unsigned char *) "servers", 7);
- qp_add_int64(query->packer, q_alter->n);
- SIRIPARSER_ASYNC_NEXT_NODE
+ log_error("Failed to set tee address"); /* continue on error..*/
}
}
else
{
- /*
- * alter_server
- *
- * we can set the success message, we just ignore the message in case
- * an error occurs.
- */
- siridb_server_t * server = q_alter->via.server;
+ /* disable the tee */
+ (void) siridb_tee_set_address_port(siridb->tee, NULL, 0);
+ }
+ if (siridb_save(siridb))
+ {
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Error while saving database changes!");
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ }
+ else
+ {
QP_ADD_SUCCESS
- qp_add_fmt_safe(query->packer,
- MSG_SUCCES_SET_TEE_PIPE_NAME,
- p_pipe_name ? p_pipe_name : "disabled",
- server->name);
- if (server == siridb->server)
+ if (siridb->tee->address)
{
- (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name);
- if (siridb_save(siridb))
- {
- log_critical("Could not save database changes (database: '%s')",
- siridb->dbname);
- }
-
- SIRIPARSER_ASYNC_NEXT_NODE
+ log_info(
+ MSG_SUCCES_SET_TEE_ENABLED,
+ siridb->tee->address,
+ siridb->tee->port);
+ qp_add_fmt_safe(query->packer,
+ MSG_SUCCES_SET_TEE_ENABLED,
+ siridb->tee->address,
+ siridb->tee->port);
}
else
{
+ log_info(MSG_SUCCES_SET_TEE_DISABLED);
+ qp_add_string(query->packer, MSG_SUCCES_SET_TEE_DISABLED);
+ }
- if (siridb_server_is_online(server))
- {
- sirinet_pkg_t * pkg = sirinet_pkg_new(
- 0,
- p_pipe_name ? strlen(p_pipe_name) : 0,
- BPROTO_TEE_PIPE_NAME_UPDATE,
- (unsigned char *) p_pipe_name);
- if (pkg != NULL)
- {
- /* handle will be bound to a timer so we should increment */
- siri_async_incref(handle);
- if (siridb_server_send_pkg(
- server,
- pkg,
- 0,
- (sirinet_promise_cb) on_ack_response,
- handle,
- 0))
- {
- /*
- * signal is raised and 'on_ack_response' will not be
- * called
- */
- free(pkg);
- siri_async_decref(&handle);
- }
- }
- }
- else
- {
- snprintf(query->err_msg,
- SIRIDB_MAX_SIZE_ERR_MSG,
- "Cannot set pipe name, '%s' is currently unavailable",
- server->name);
- siridb_query_send_error(handle, CPROTO_ERR_QUERY);
- }
+ if (IS_MASTER)
+ {
+ siridb_query_forward(
+ handle,
+ SIRIDB_QUERY_FWD_UPDATE,
+ (sirinet_promises_cb) on_update_xxx_response,
+ 0);
+ }
+ else
+ {
+ SIRIPARSER_ASYNC_NEXT_NODE
}
}
}
*/
QP_ADD_SUCCESS
- if ((q_alter->n >> 16) >= LOGGER_NUM_LEVELS)
- {
- log_info(MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI, q_alter->n & 0xffff);
- qp_add_fmt_safe(
- query->packer,
- MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI,
- q_alter->n & 0xffff);
- }
- else
- {
- log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI,
- logger_level_name(q_alter->n >> 16),
- q_alter->n & 0xffff);
-
- qp_add_fmt_safe(
- query->packer,
- MSG_SUCCES_SET_LOG_LEVEL_MULTI,
- logger_level_name(q_alter->n >> 16),
- q_alter->n & 0xffff);
- }
+ log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI,
+ logger_level_name(q_alter->n >> 16),
+ q_alter->n & 0xffff);
+ qp_add_fmt_safe(
+ query->packer,
+ MSG_SUCCES_SET_LOG_LEVEL_MULTI,
+ logger_level_name(q_alter->n >> 16),
+ q_alter->n & 0xffff);
SIRIPARSER_ASYNC_NEXT_NODE
}
siridb_t * siridb,
qp_packer_t * packer,
int map);
-static void prop_tee_pipe_name(
+static void prop_tee(
siridb_t * siridb,
qp_packer_t * packer,
int map);
prop_status);
props_set_cb(CLERI_GID_K_SYNC_PROGRESS - KW_OFFSET,
prop_sync_progress);
- props_set_cb(CLERI_GID_K_TEE_PIPE_NAME - KW_OFFSET,
- prop_tee_pipe_name);
+ props_set_cb(CLERI_GID_K_TEE - KW_OFFSET,
+ prop_tee);
props_set_cb(CLERI_GID_K_TIMEZONE - KW_OFFSET,
prop_timezone);
props_set_cb(CLERI_GID_K_TIME_PRECISION - KW_OFFSET,
qp_add_string(packer, siridb_initsync_sync_progress(siridb));
}
-static void prop_tee_pipe_name(
+static void prop_tee(
siridb_t * siridb,
qp_packer_t * packer,
int map)
{
- SIRIDB_PROP_MAP("tee_pipe_name", 13)
- qp_add_string(packer, tee_str(siridb->tee));
+ SIRIDB_PROP_MAP("tee", 3)
+ qp_add_string(packer, siridb_tee_str(siridb->tee));
}
static void prop_timezone(
cond->operator,
siridb_initsync_sync_progress(wserver->siridb),
cond->str);
-
- case CLERI_GID_K_TEE_PIPE_NAME:
- return cexpr_str_cmp(
- cond->operator,
- tee_str(wserver->siridb->tee),
- cond->str);
}
log_critical("Unexpected server property received: %d", cond->prop);
case CLERI_GID_K_SYNC_PROGRESS:
qp_add_string(query->packer, siridb_initsync_sync_progress(siridb));
break;
- case CLERI_GID_K_TEE_PIPE_NAME:
- qp_add_string(query->packer, tee_str(siridb->tee));
- break;
case CLERI_GID_K_UPTIME:
qp_add_int64(
query->packer,
#include <assert.h>
#include <siri/db/tee.h>
#include <siri/siri.h>
-#include <siri/net/pipe.h>
+#include <siri/net/tcp.h>
#include <logger/logger.h>
#define TEE__BUF_SZ 512
static char tee__buf[TEE__BUF_SZ];
-static void tee__runtime_init(uv_pipe_t * pipe);
-static void tee__write_cb(uv_write_t * req, int status);
-static void tee__on_connect(uv_connect_t * req, int status);
-static void tee__close_cb(uv_pipe_t * pipe);
static void tee__alloc_buffer(
- uv_handle_t * handle,
- size_t suggsz,
- uv_buf_t * buf);
-static void tee__on_data(
- uv_stream_t * client,
- ssize_t nread,
- const uv_buf_t * buf);
-
-siridb_tee_t * siridb_tee_new(void)
+ uv_handle_t * handle __attribute__((unused)),
+ size_t suggsz __attribute__((unused)),
+ uv_buf_t * buf)
{
- siridb_tee_t * tee = malloc(sizeof(siridb_tee_t));
- if (tee == NULL)
- {
- return NULL;
- }
- tee->pipe_name_ = NULL;
- tee->err_msg_ = NULL;
- tee->pipe.data = tee;
- tee->flags = SIRIDB_TEE_FLAG;
- return tee;
+ buf->base = tee__buf;
+ buf->len = TEE__BUF_SZ;
}
-void siridb_tee_free(siridb_tee_t * tee)
+static inline void tee__release(siridb_tee_t * tee)
{
- free(tee->err_msg_);
- free(tee->pipe_name_);
- free(tee);
+ sirinet_promise_decref(tee->promise_);
+ tee->promise_ = NULL;
+ uv_mutex_unlock(&tee->lock_);
}
-int siridb_tee_connect(siridb_tee_t * tee)
+static void tee__write_cb(uv_write_t * req, int status)
{
- if (tee->flags & SIRIDB_TEE_FLAG_CONNECTING)
+ sirinet_promise_t * promise = req->data;
+ if (status)
{
- return 0;
+ log_error("Socket (tee) write error: %s", uv_strerror(status));
}
- tee->flags |= SIRIDB_TEE_FLAG_CONNECTING;
- uv_connect_t * req = malloc(sizeof(uv_connect_t));
- if (req == NULL)
+ sirinet_promise_decref(promise);
+ free(req);
+}
+
+static void tee__on_data(
+ uv_tcp_t * tcp,
+ ssize_t nread,
+ const uv_buf_t * buf __attribute__((unused)))
+{
+ siridb_tee_t * tee = tcp->data;
+ if (!tee)
{
- return -1;
+ return;
}
- req->data = tee;
-
- if (uv_pipe_init(siri.loop, &tee->pipe, 0))
+ if (nread < 0)
{
- tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTING;
- free(req);
- return -1;
+ if (nread != UV_EOF)
+ {
+ log_error("Read error on tee '%s': '%s'",
+ tee->address,
+ uv_err_name(nread));
+ }
+
+ log_info("Disconnected from tee `%s`", tee->address);
+
+ uv_close((uv_handle_t *) tcp, (uv_close_cb) free);
+ tee->tcp = NULL;
+ return;
}
- tee->flags |= SIRIDB_TEE_FLAG_INIT;
- tee->pipe.data = tee;
- uv_pipe_connect(req, &tee->pipe, tee->pipe_name_, tee__on_connect);
- return 0;
+ log_debug("Got %zd bytes on tee `%s` which will be ignored",
+ nread,
+ tee->address);
}
-int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name)
+static void tee__do_write(siridb_tee_t * tee, sirinet_promise_t * promise)
{
- free(tee->pipe_name_);
- free(tee->err_msg_);
- tee->err_msg_ = NULL;
+ uv_write_t * req = malloc(sizeof(uv_write_t));
+ uv_buf_t wrbuf = uv_buf_init(
+ (char *) promise->pkg,
+ sizeof(sirinet_pkg_t) + promise->pkg->len);
- if (pipe_name == NULL)
+ if (req)
{
- tee->pipe_name_ = NULL;
+ int rc;
+ req->data = promise;
+ sirinet_promise_incref(promise);
+ rc = uv_write(req, (uv_stream_t *) tee->tcp, &wrbuf, 1, tee__write_cb);
+ if (rc == 0)
+ {
+ return; /* success */
+ }
+ sirinet_promise_decref(promise);
+ }
+ log_error("Cannot write to tee");
+ return;
+}
- if (tee->flags & SIRIDB_TEE_FLAG_CONNECTED)
+static void tee__on_connect(uv_connect_t * req, int status)
+{
+ siridb_tee_t * tee = req->data;
+
+ if (status == 0)
+ {
+ if (uv_read_start(
+ req->handle,
+ tee__alloc_buffer,
+ (uv_read_cb) tee__on_data))
{
- uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__close_cb);
+ /* Failed to start reading the tee connection */
+ tee->err_code = SIRIDB_TEE_E_READ;
+ log_error("Failed to open tee `%s` for reading", tee->address);
+ goto fail;
}
- return 0;
+
+ /* success */
+ log_info("Connection created to tee: '%s'", tee->address);
+ tee__do_write(tee, tee->promise_);
+ goto done;
}
- tee->pipe_name_ = strdup(pipe_name);
- if (!tee->pipe_name_)
+ /* failed */
+ tee->err_code = SIRIDB_TEE_E_CONNECT;
+ log_warning(
+ "Cannot connect to tee '%s' (%s)",
+ tee->address,
+ uv_strerror(status));
+
+fail:
+ uv_close((uv_handle_t *) req->handle, (uv_close_cb) free);
+ tee->tcp = NULL;
+done:
+ tee__release(tee);
+ free(req);
+}
+
+void tee__make_connection(siridb_tee_t * tee, const struct sockaddr * dest)
+{
+ uv_connect_t * req = malloc(sizeof(uv_connect_t));
+ tee->tcp = malloc(sizeof(uv_tcp_t));
+ if (tee->tcp == NULL || req == NULL)
{
- return -1;
+ goto fail0;
}
- if (tee->flags & SIRIDB_TEE_FLAG_INIT)
+
+ req->data = tee;
+
+ log_debug("Trying to connect to tee '%s'...", tee->address);
+ (void) uv_tcp_init(siri.loop, tee->tcp);
+ (void) uv_tcp_connect(req, tee->tcp, dest, tee__on_connect);
+ return;
+
+fail0:
+ free(req);
+ free(tee->tcp);
+ tee->tcp = NULL;
+ tee__release(tee);
+}
+
+static void tee__on_resolved(
+ uv_getaddrinfo_t * resolver,
+ int status,
+ struct addrinfo * res)
+{
+ siridb_tee_t * tee = resolver->data;
+ free(resolver);
+
+ if (status < 0)
{
- uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__runtime_init);
+ log_error("Cannot resolve ip address for tee '%s' (error: %s)",
+ tee->address,
+ uv_err_name(status));
+ tee__release(tee);
+ return;
}
- else
+
+ if (Logger.level == LOGGER_DEBUG)
{
- tee__runtime_init(&tee->pipe);
+ char addr[47] = {'\0'}; /* enough for both ipv4 and ipv6 */
+
+ switch (res->ai_family)
+ {
+ case AF_INET:
+ uv_ip4_name((struct sockaddr_in *) res->ai_addr, addr, 16);
+ break;
+
+ case AF_INET6:
+ uv_ip6_name((struct sockaddr_in6 *) res->ai_addr, addr, 46);
+ break;
+
+ default:
+ sprintf(addr, "unsupported family");
+ }
+
+ log_debug(
+ "Resolved ip address '%s' for tee '%s', "
+ "trying to connect...",
+ addr, tee->address);
+
}
- return 0;
+
+ tee__make_connection(tee, (const struct sockaddr *) res->ai_addr);
}
-void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise)
+static void tee__resolve_dns(siridb_tee_t * tee, int ai_family)
{
- uv_write_t * req = malloc(sizeof(uv_write_t));
- if (!req)
+ int result;
+ struct addrinfo hints;
+ char port[6]= {0};
+ uv_getaddrinfo_t * resolver = malloc(sizeof(uv_getaddrinfo_t));
+
+ hints.ai_family = ai_family;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = IPPROTO_TCP;
+ hints.ai_flags = AI_NUMERICSERV;
+
+ if (resolver == NULL)
{
- log_error("Cannot allocate memory for tee request");
return;
}
- req->data = promise;
- sirinet_promise_incref(promise);
+ resolver->data = tee;
- uv_buf_t wrbuf = uv_buf_init(
- (char *) promise->pkg,
- sizeof(sirinet_pkg_t) + promise->pkg->len);
+ sprintf(port, "%u", tee->port);
+
+ result = uv_getaddrinfo(
+ siri.loop,
+ resolver,
+ tee__on_resolved,
+ tee->address,
+ port,
+ &hints);
- if (uv_write(req, (uv_stream_t *) &tee->pipe, &wrbuf, 1, tee__write_cb))
+ if (result)
{
- log_error("Cannot write to tee");
- sirinet_promise_decref(promise);
+ log_error("getaddrinfo call error %s", uv_err_name(result));
+ free(resolver);
}
}
-const char * tee_str(siridb_tee_t * tee)
+void tee__connect(siridb_tee_t * tee)
{
- if (tee->err_msg_)
+ struct in_addr sa;
+ struct in6_addr sa6;
+
+ if (inet_pton(AF_INET, tee->address, &sa))
{
- return tee->err_msg_;
+ /* IPv4 */
+ struct sockaddr_in dest;
+ (void) uv_ip4_addr(tee->address, tee->port, &dest);
+ tee__make_connection(tee, (const struct sockaddr *) &dest);
+ return;
}
- if (tee->pipe_name_)
+
+ if (inet_pton(AF_INET6, tee->address, &sa6))
{
- return tee->pipe_name_;
+ /* IPv6 */
+ struct sockaddr_in6 dest6;
+ (void) uv_ip6_addr(tee->address, tee->port, &dest6);
+ tee__make_connection(tee, (const struct sockaddr *) &dest6);
+ return;
}
- return "disabled";
-}
+ /* Try DNS */
+ tee__resolve_dns(tee, dns_req_family_map(siri.cfg->ip_support));
+}
-static void tee__runtime_init(uv_pipe_t * pipe)
+siridb_tee_t * siridb_tee_new(void)
{
- siridb_tee_t * tee = pipe->data;
-
- tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
- tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTING;
- tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
-
- if (siridb_tee_connect(tee))
- {
- log_error("Could not connect to tee at runtime");
- }
+ siridb_tee_t * tee = malloc(sizeof(siridb_tee_t));
+ if (tee == NULL)
+ {
+ return NULL;
+ }
+ tee->address = NULL;
+ tee->tcp = NULL;
+ tee->promise_ = NULL;
+ tee->flags = SIRIDB_TEE_FLAG;
+ tee->err_code = 0;
+ uv_mutex_init(&tee->lock_);
+ return tee;
}
-static void tee__close_cb(uv_pipe_t * pipe)
+void siridb_tee_free(siridb_tee_t * tee)
{
- siridb_tee_t * tee = pipe->data;
+ /* must be closed before free can be used */
+ assert (tee->tcp == NULL);
+ assert (tee->promise_ == NULL);
- tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
- tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTING;
- tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
+ uv_mutex_destroy(&tee->lock_);
+ free(tee->address);
+ free(tee);
}
-static void tee__write_cb(uv_write_t * req, int status)
+const char * siridb_tee_str(siridb_tee_t * tee)
{
- sirinet_promise_t * promise = req->data;
- sirinet_promise_decref(promise);
- if (status)
+ if (tee->err_code)
{
- log_error("Socket (tee) write error: %s", uv_strerror(status));
+ switch((enum siridb_tee_e_t) tee->err_code)
+ {
+ case SIRIDB_TEE_E_OK: return "OK";
+ case SIRIDB_TEE_E_ALLOC: return "memory allocation error";
+ case SIRIDB_TEE_E_READ: return "failed to open socket for reading";
+ case SIRIDB_TEE_E_CONNECT: return "failed to make connection";
+ }
}
- free(req);
+ if (tee->address)
+ {
+ return tee->address;
+ }
+ return "disabled";
}
-static void tee__on_connect(uv_connect_t * req, int status)
+void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise)
{
- siridb_tee_t * tee = req->data;
-
- if (status == 0)
+ if (!tee->address)
{
- log_info("Connection created to pipe: '%s'", tee->pipe_name_);
- if (uv_read_start(req->handle, tee__alloc_buffer, tee__on_data))
- {
- free(tee->err_msg_);
- if (asprintf(&tee->err_msg_,
- "Cannot open pipe '%s' for reading",
- tee->pipe_name_) >= 0)
- {
- log_warning(tee->err_msg_);
- }
- goto fail;
- }
- tee->flags |= SIRIDB_TEE_FLAG_CONNECTED;
- goto done;
+ /* Tee is not configured */
+ return;
}
- free(tee->err_msg_);
- tee->err_msg_ = NULL;
-
- if (asprintf(
- &tee->err_msg_,
- "Cannot connect to pipe '%s' (%s)",
- tee->pipe_name_,
- uv_strerror(status)) >= 0)
+ if (tee->tcp)
{
- log_warning(tee->err_msg_);
+ tee__do_write(tee, promise);
}
+ else
+ {
+ uv_mutex_lock(&tee->lock_);
-fail:
- uv_close((uv_handle_t *) req->handle, (uv_close_cb) tee__close_cb);
-done:
- free(req);
+ if (!tee->address)
+ {
+ /* Tee is not configured */
+ uv_mutex_unlock(&tee->lock_);
+ return;
+ }
+
+ assert (tee->promise_ == NULL);
+ tee->promise_ = promise;
+ sirinet_promise_incref(promise);
+ tee__connect(tee);
+ }
}
-static void tee__alloc_buffer(
- uv_handle_t * handle __attribute__((unused)),
- size_t suggsz __attribute__((unused)),
- uv_buf_t * buf)
+int siridb_tee_set_address_port(
+ siridb_tee_t * tee,
+ const char * address,
+ uint16_t port)
{
- buf->base = tee__buf;
- buf->len = TEE__BUF_SZ;
-}
+ uv_mutex_lock(&tee->lock_);
+ free(tee->address);
+ tee->err_code = SIRIDB_TEE_E_OK;
+ tee->address = address ? strdup(address) : NULL;
+ tee->port = port;
-static void tee__on_data(
- uv_stream_t * client,
- ssize_t nread,
- const uv_buf_t * buf __attribute__((unused)))
-{
- if (nread < 0)
+ if (tee->tcp && !uv_is_closing((uv_handle_t *) tee->tcp))
{
- if (nread != UV_EOF)
- {
- log_error("Read error on pipe '%s' : '%s'",
- sirinet_pipe_name((uv_pipe_t * ) client),
- uv_err_name(nread));
- }
- log_info("Disconnected from tee");
- uv_close((uv_handle_t *) client, (uv_close_cb) tee__close_cb);
+ uv_close((uv_handle_t *) tee->tcp, (uv_close_cb) free);
+ tee->tcp = NULL;
}
- if (nread > 0)
- {
- log_debug("Got %zd bytes on tee which will be ignored", nread);
- }
+ uv_mutex_unlock(&tee->lock_);
+
+ return address && !tee->address ? -1 : 0;
}
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2020-09-25 10:57:26
+ * Created at: 2022-04-15 12:10:03
*/
#include "siri/grammar/grammar.h"
cleri_t * k_sync_progress = cleri_keyword(CLERI_GID_K_SYNC_PROGRESS, "sync_progress", CLERI_CASE_SENSITIVE);
cleri_t * k_tag = cleri_keyword(CLERI_GID_K_TAG, "tag", CLERI_CASE_SENSITIVE);
cleri_t * k_tags = cleri_keyword(CLERI_GID_K_TAGS, "tags", CLERI_CASE_SENSITIVE);
- cleri_t * k_tee_pipe_name = cleri_keyword(CLERI_GID_K_TEE_PIPE_NAME, "tee_pipe_name", CLERI_CASE_SENSITIVE);
+ cleri_t * k_tee = cleri_keyword(CLERI_GID_K_TEE, "tee", CLERI_CASE_SENSITIVE);
cleri_t * k_time_precision = cleri_keyword(CLERI_GID_K_TIME_PRECISION, "time_precision", CLERI_CASE_SENSITIVE);
cleri_t * k_timeit = cleri_keyword(CLERI_GID_K_TIMEIT, "timeit", CLERI_CASE_SENSITIVE);
cleri_t * k_timeval = cleri_keyword(CLERI_GID_K_TIMEVAL, "timeval", CLERI_CASE_SENSITIVE);
cleri_t * server_columns = cleri_list(CLERI_GID_SERVER_COLUMNS, cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 29,
+ 28,
k_address,
k_buffer_path,
k_buffer_size,
k_reindex_progress,
k_selected_points,
k_sync_progress,
- k_tee_pipe_name,
k_uptime
), cleri_token(CLERI_NONE, ","), 1, 0, 0);
cleri_t * group_columns = cleri_list(CLERI_GID_GROUP_COLUMNS, cleri_choice(
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 12,
+ 11,
k_address,
k_buffer_path,
k_dbpath,
k_version,
k_status,
k_reindex_progress,
- k_sync_progress,
- k_tee_pipe_name
+ k_sync_progress
),
str_operator,
string
k_address,
string
);
- cleri_t * set_tee_pipe_name = cleri_sequence(
- CLERI_GID_SET_TEE_PIPE_NAME,
+ cleri_t * set_tee = cleri_sequence(
+ CLERI_GID_SET_TEE,
3,
k_set,
- k_tee_pipe_name,
+ k_tee,
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 6,
+ 7,
set_drop_threshold,
set_list_limit,
set_select_points_limit,
set_timezone,
set_expiration_num,
- set_expiration_log
+ set_expiration_log,
+ set_tee
)
);
cleri_t * alter_group = cleri_sequence(
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 5,
+ 4,
set_log_level,
set_backup_mode,
- set_tee_pipe_name,
set_address,
set_port
)
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 2,
- set_log_level,
- set_tee_pipe_name
+ 1,
+ set_log_level
)
);
cleri_t * alter_user = cleri_sequence(
k_startup_time,
k_status,
k_sync_progress,
- k_tee_pipe_name,
+ k_tee,
k_time_precision,
k_timezone,
k_uptime,
siridb_update_shard_expiration(siridb);
- if ( siridb_tee_is_configured(siridb->tee) &&
- !siridb_tee_is_connected(siridb->tee))
- {
- siridb_tee_connect(siridb->tee);
- }
-
server_node = siridb->servers->first;
while (server_node != NULL)
{
static void on_log_level_update(
sirinet_stream_t * client,
sirinet_pkg_t * pkg);
-static void on_tee_pipe_name_update(
- sirinet_stream_t * client,
- sirinet_pkg_t * pkg);
static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg);
static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg);
static void on_query(
case BPROTO_DISABLE_BACKUP_MODE:
on_disable_backup_mode(client, pkg);
break;
- case BPROTO_TEE_PIPE_NAME_UPDATE:
- on_tee_pipe_name_update(client, pkg);
+ case _BPROTO_TEE_PIPE_NAME_UPDATE:
+ log_warning("BPROTO_TEE_PIPE_NAME_UPDATE is deprecated");
break;
case BPROTO_DROP_DATABASE:
on_drop_database(client, pkg);
}
}
-static void on_tee_pipe_name_update(
- sirinet_stream_t * client,
- sirinet_pkg_t * pkg)
-{
- SERVER_CHECK_AUTHENTICATED(client, server);
- siridb_t * siridb = client->siridb;
- sirinet_pkg_t * package;
-
- char * pipe_name = pkg->len
- ? strndup((const char *) pkg->data, pkg->len)
- : NULL;
-
- (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
-
- free(pipe_name);
-
- package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_TEE_PIPE_NAME, NULL);
- if (package != NULL)
- {
- /* ignore result code, signal can be raised */
- sirinet_pkg_send(client, package);
- }
-}
-
static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg)
{
SERVER_CHECK_AUTHENTICATED(client, server)
case BPROTO_REQ_GROUPS: return "BPROTO_REQ_GROUPS";
case BPROTO_ENABLE_BACKUP_MODE: return "BPROTO_ENABLE_BACKUP_MODE";
case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
- case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
+ case _BPROTO_TEE_PIPE_NAME_UPDATE: return "_BPROTO_TEE_PIPE_NAME_UPDATE";
case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS";
case BPROTO_SERIES_TAGS: return "BPROTO_SERIES_TAGS";
qp_fpacker_t * fpacker;
qp_unpacker_t unpacker;
qp_obj_t
+ qp_obj,
qp_uuid,
qp_schema,
qp_dbname,
qp_points_limit,
qp_list_limit,
qp_exp_log,
- qp_exp_num;
+ qp_exp_num,
+ qp_tee_address,
+ qp_tee_port;
siridb_t * siridb;
int rc;
/* 13 = strlen("database.dat")+1 */
return;
}
- /* list and points limit require at least schema 1 */
- (void) qp_next(&unpacker, &qp_points_limit);
- (void) qp_next(&unpacker, &qp_list_limit);
+ if (qp_schema.via.int64 >= 2)
+ {
+ if (qp_next(&unpacker, &qp_points_limit) != QP_INT64||
+ qp_next(&unpacker, &qp_list_limit) != QP_INT64)
+ {
+ CLIENT_err(adm_client, "invalid database file received");
+ return;
+ }
+ }
+ else
+ {
+ qp_points_limit.via.int64 = DEF_SELECT_POINTS_LIMIT;
+ qp_list_limit.via.int64 = DEF_LIST_LIMIT;
+ }
+
+ if (qp_schema.via.int64 >= 5 && qp_schema.via.int64 <=6)
+ {
+ qp_next(&unpacker, &qp_obj);
+ /* Skip the tee pipe name */
+ }
- /* this is the tee pipe name when schema is >= 5 */
- (void) qp_next(&unpacker, &qp_exp_log);
- (void) qp_next(&unpacker, &qp_exp_num);
+ if (qp_schema.via.int64 >= 6)
+ {
+ if (qp_next(&unpacker, &qp_exp_log) != QP_INT64||
+ qp_next(&unpacker, &qp_exp_num) != QP_INT64)
+ {
+ CLIENT_err(adm_client, "invalid database file received");
+ return;
+ }
+ }
+ else
+ {
+ qp_exp_log.via.int64 = 0;
+ qp_exp_num.via.int64 = 0;
+ }
- /* these are the expiration times when schema is >= 6 */
+ if (qp_schema.via.int64 >= 7)
+ {
+ if (qp_next(&unpacker, &qp_tee_address) == QP_ERR ||
+ qp_next(&unpacker, &qp_tee_port) != QP_INT64)
+ {
+ CLIENT_err(adm_client, "invalid database file received");
+ return;
+ }
+ }
+ else
+ {
+ qp_tee_address.tp = QP_NULL;
+ qp_tee_port.via.int64 = SIRIDB_TEE_DEFAULT_TCP_PORT;
+ }
if ((fpacker = qp_open(fn, "w")) == NULL)
{
return;
}
+ LOGC("HERE");
+
rc = ( qp_fadd_type(fpacker, QP_ARRAY_OPEN) ||
qp_fadd_int64(fpacker, SIRIDB_SCHEMA) ||
qp_fadd_raw(fpacker, (const unsigned char *) adm_client->uuid, 16) ||
qp_fadd_int64(fpacker, qp_duration_log.via.int64) ||
qp_fadd_raw(fpacker, qp_timezone.via.raw, qp_timezone.len) ||
qp_fadd_double(fpacker, qp_drop_threshold.via.real) ||
- qp_fadd_int64(fpacker, qp_points_limit.tp == QP_INT64
- ? qp_points_limit.via.int64
- : DEF_SELECT_POINTS_LIMIT) ||
- qp_fadd_int64(fpacker, qp_list_limit.tp == QP_INT64
- ? qp_list_limit.via.int64
- : DEF_LIST_LIMIT) ||
- qp_fadd_type(fpacker, QP_NULL) ||
- qp_fadd_int64(fpacker, qp_exp_log.tp == QP_INT64
- ? qp_exp_log.via.int64
- : 0) ||
- qp_fadd_int64(fpacker, qp_exp_num.tp == QP_INT64
- ? qp_exp_num.via.int64
- : 0) ||
+ qp_fadd_int64(fpacker, qp_points_limit.via.int64) ||
+ qp_fadd_int64(fpacker, qp_list_limit.via.int64) ||
+ qp_fadd_int64(fpacker, qp_exp_log.via.int64) ||
+ qp_fadd_int64(fpacker, qp_exp_num.via.int64) ||
+ (qp_tee_address.tp == QP_RAW
+ ? qp_fadd_raw(
+ fpacker,
+ qp_tee_address.via.raw,
+ qp_tee_address.len)
+ : qp_fadd_type(fpacker, QP_NULL)) ||
+ qp_fadd_int64(fpacker, qp_tee_port.via.int64) ||
qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
qp_close(fpacker));
#include <siri/db/reindex.h>
#include <siri/db/server.h>
#include <siri/db/servers.h>
+#include <siri/db/tee.h>
#include <siri/service/account.h>
#include <siri/service/client.h>
#include <siri/service/request.h>
qp_fadd_double(fp, DEF_DROP_THRESHOLD) ||
qp_fadd_int64(fp, DEF_SELECT_POINTS_LIMIT) ||
qp_fadd_int64(fp, DEF_LIST_LIMIT) ||
- qp_fadd_type(fp, QP_NULL) ||
qp_fadd_int64(fp, 0) ||
qp_fadd_int64(fp, 0) ||
+ qp_fadd_type(fp, QP_NULL) ||
+ qp_fadd_int64(fp, SIRIDB_TEE_DEFAULT_TCP_PORT) ||
qp_fadd_type(fp, QP_ARRAY_CLOSE))
{
rc = -1;
case UV_TCP:
case UV_NAMED_PIPE:
{
- if (handle->data == NULL || siridb_tee_is_handle(handle))
+ if (handle->data == NULL)
{
uv_close(handle, NULL);
}
+ else if (siridb_tee_is_handle(handle))
+ {
+ // TODO: close tee handle
+ assert (0);
+ }
else if (siri_health_is_handle(handle))
{
siri_health_close((siri_health_request_t *) handle->data);