set_expiration_num = Sequence(
k_set,
k_expiration_num,
- Choice(
- k_false,
- time_expr,
- most_greedy=False),
+ time_expr,
+ Optional(set_ignore_threshold))
+ set_expiration_log = Sequence(
+ k_set,
+ k_expiration_log,
+ time_expr,
Optional(set_ignore_threshold))
- set_expiration_log = Sequence(k_set, k_expiration_log, Choice(
- k_false,
- time_expr,
- most_greedy=False))
alter_database = Sequence(k_database, Choice(
set_drop_threshold,
k_duration_log,
k_duration_num,
k_fifo_files,
+ k_expiration_log,
+ k_expiration_num,
k_idle_percentage,
k_idle_time,
k_ip_support,
#define SIRIDB_MAX_SIZE_ERR_MSG 1024
#define SIRIDB_MAX_DBNAME_LEN 256 /* 255 + NULL */
-#define SIRIDB_SCHEMA 5
+#define SIRIDB_SCHEMA 6
#define SIRIDB_FLAG_REINDEXING 1
#define SIRIDB_FLAG_DROPPED 2
int siridb_save(siridb_t * siridb);
void siridb__free(siridb_t * siridb);
void siridb_drop(siridb_t * siridb);
+void siridb_update_shard_expiration(siridb_t * siridb);
#define siridb_incref(siridb) siridb->ref++
#define siridb_decref(_siridb) if (!--_siridb->ref) siridb__free(_siridb)
imap_t * series_map;
uv_mutex_t series_mutex;
uv_mutex_t shards_mutex;
+ uv_mutex_t values_mutex;
imap_t * shards;
FILE * dropped_fp;
qp_fpacker_t * store;
uint32_t pos,
uint16_t len,
uint16_t cinfo);
+void series_update_start_end(siridb_series_t * series);
int siridb_series_add_point(
siridb_t *__restrict siridb,
siridb_series_t *__restrict series,
siridb_t * siridb,
siridb_series_t * series,
siridb_points_t * points);
+double siridb_shards_count_percent(
+ siridb_t * siridb,
+ uint64_t end_ts,
+ uint8_t tp);
#endif /* SIRIDB_SHARDS_H_ */
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2020-01-21 16:05:35
+ * Created at: 2020-01-23 14:08:47
*/
#ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
#define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
if ( qp_schema.via.int64 == 1 ||
qp_schema.via.int64 == 2 ||
qp_schema.via.int64 == 3 ||
- qp_schema.via.int64 == 4)
+ qp_schema.via.int64 == 4 ||
+ qp_schema.via.int64 == 5)
{
log_info(
"Found an old database schema (v%d), "
READ_DB_EXIT_WITH_ERROR("Cannot read tee pipe name.")
}
}
+ if (qp_schema.via.int64 >= 6)
+ {
+ /* read select points limit */
+ if (qp_next(unpacker, &qp_obj) != QP_INT64 || qp_obj.via.int64 < 0)
+ {
+ READ_DB_EXIT_WITH_ERROR(
+ "Cannot read shard (log) expiration time.")
+ }
+ (*siridb)->expiration_log = qp_obj.via.int64;
+
+ /* read list limit */
+ if (qp_next(unpacker, &qp_obj) != QP_INT64 || qp_obj.via.int64 < 0)
+ {
+ READ_DB_EXIT_WITH_ERROR(
+ "Cannot read shard (number) expiration time.")
+ }
+ (*siridb)->expiration_num = qp_obj.via.int64;
+ }
if ((*siridb)->tee->pipe_name_ == NULL)
{
log_debug(
(siridb->tee->pipe_name_ == NULL
? qp_fadd_type(fpacker, QP_NULL)
: qp_fadd_string(fpacker, siridb->tee->pipe_name_)) ||
+ qp_fadd_int64(fpacker, siridb->expiration_log) ||
+ qp_fadd_int64(fpacker, siridb->expiration_num) ||
qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
qp_close(fpacker));
}
uv_mutex_destroy(&siridb->series_mutex);
uv_mutex_destroy(&siridb->shards_mutex);
+ uv_mutex_destroy(&siridb->values_mutex);
if (siridb->flags & SIRIDB_FLAG_DROPPED)
{
siridb_decref(siridb);
}
+void siridb_update_shard_expiration(siridb_t * siridb)
+{
+ struct timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ uv_mutex_lock(&siridb->values_mutex);
+
+ siridb->exp_at_num = siridb->expiration_num
+ ? siridb_time_now(siridb, now) - siridb->expiration_num
+ : 0;
+
+ siridb->exp_at_log = siridb->expiration_log
+ ? siridb_time_now(siridb, now) - siridb->expiration_log
+ : 0;
+
+ uv_mutex_unlock(&siridb->values_mutex);
+}
+
+
/*
* Returns NULL and raises a SIGNAL in case an error has occurred.
*/
siridb->groups = NULL;
siridb->dropped_fp = NULL;
siridb->store = NULL;
+ siridb->exp_at_log = 0;
+ siridb->exp_at_num = 0;
+ siridb->expiration_log = 0;
+ siridb->expiration_num = 0;
siridb->series = ct_new();
if (siridb->series == NULL)
uv_mutex_init(&siridb->series_mutex);
uv_mutex_init(&siridb->shards_mutex);
+ uv_mutex_init(&siridb->values_mutex);
return siridb;
}
}
+ if (series->length == 0)
+ {
+ if (siridb_series_drop(siridb, series))
+ {
+ siridb_series_flush_dropped(siridb);
+ }
+ }
+
if (tp == QP_ARRAY_CLOSE)
{
qp_next(unpacker, qp_series_name);
siridb_points_free((siridb_points_t *) *pcache);
*pcache = NULL;
}
+
+ }
+
+ if (series->length == 0)
+ {
+ if (siridb_series_drop(siridb, series))
+ {
+ siridb_series_flush_dropped(siridb);
+ }
}
if (tp == QP_ARRAY_CLOSE)
#include <siri/db/server.h>
#include <siri/db/servers.h>
#include <siri/db/shard.h>
+#include <siri/db/shards.h>
#include <siri/db/user.h>
#include <siri/db/users.h>
#include <siri/db/listener.h>
"Successfully changed drop_threshold from %g to %g."
#define MSG_SUCCESS_SET_LIST_LIMIT \
"Successfully changed list limit from %" PRIu32 " to %" PRIu32 "."
+#define MSG_SUCCESS_SET_EXPIRATION \
+ "Successfully changed expiration from %" PRIu64 " to %" PRIu64 "."
#define MSG_SUCCESS_SET_ADDR_PORT \
"Successfully changed server address to '%s'."
#define MSG_SUCCESS_DROP_SERVER \
static void exit_set_address(uv_async_t * handle);
static void exit_set_backup_mode(uv_async_t * handle);
static void exit_set_drop_threshold(uv_async_t * handle);
+static void exit_set_expiration_log(uv_async_t * handle);
+static void exit_set_expiration_num(uv_async_t * handle);
static void exit_set_list_limit(uv_async_t * handle);
static void exit_set_log_level(uv_async_t * handle);
static void exit_set_port(uv_async_t * handle);
siridb_listen_exit[CLERI_GID_SET_ADDRESS] = exit_set_address;
siridb_listen_exit[CLERI_GID_SET_BACKUP_MODE] = exit_set_backup_mode;
siridb_listen_exit[CLERI_GID_SET_DROP_THRESHOLD] = exit_set_drop_threshold;
+ siridb_listen_exit[CLERI_GID_SET_EXPIRATION_LOG] = exit_set_expiration_log;
+ siridb_listen_exit[CLERI_GID_SET_EXPIRATION_NUM] = exit_set_expiration_num;
siridb_listen_exit[CLERI_GID_SET_LIST_LIMIT] = exit_set_list_limit;
siridb_listen_exit[CLERI_GID_SET_LOG_LEVEL] = exit_set_log_level;
siridb_listen_exit[CLERI_GID_SET_PORT] = exit_set_port;
}
}
+static void exit_set_expiration_xxx(
+ uv_async_t * handle,
+ uint64_t * expirep,
+ uint8_t tp)
+{
+ siridb_query_t * query = handle->data;
+ siridb_t * siridb = query->client->siridb;
+
+ MASTER_CHECK_ACCESSIBLE(siridb)
+ MASTER_CHECK_VERSION(siridb, "2.0.35")
+
+ cleri_node_t * node = query->nodes->node->children->next->next->node;
+ uint64_t expiration = (uint64_t) CLERI_NODE_DATA(node);
+
+ if (IS_MASTER && expiration)
+ {
+ struct timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+ now.tv_sec -= (3600*24); /* remove one dat to be save */
+ uint64_t now_ts = siridb_time_now(siridb, now);
+
+ if (expiration >= now_ts)
+ {
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Shard expiration time should be a value greater "
+ "than or equal to zero (0) and smaller "
+ "than %"PRIu64" but got %" PRId64,
+ now_ts, (int64_t) CLERI_NODE_DATA(node));
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ return;
+ }
+
+ query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
+ double percent = siridb_shards_count_percent(
+ siridb,
+ now_ts - expiration,
+ tp);
+
+ if ((~q_wrapper->flags & QUERIES_IGNORE_DROP_THRESHOLD) &&
+ percent >= siridb->drop_threshold)
+ {
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "This query would drop %0.2f%% of the shards in pool %u. "
+ "Add \'set ignore_threshold true\' to the query "
+ "statement if you really want to do this.",
+ percent * 100,
+ siridb->server->pool);
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ return;
+ }
+ }
+
+ uint64_t old = *expirep;
+ *expirep = expiration;
+
+ siridb_update_shard_expiration(siridb);
+
+ if (siridb_save(siridb))
+ {
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Error while saving database changes!");
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ }
+ else
+ {
+ QP_ADD_SUCCESS
+
+ log_info(
+ MSG_SUCCESS_SET_EXPIRATION,
+ old,
+ *expirep);
+
+ qp_add_fmt_safe(query->packer,
+ MSG_SUCCESS_SET_EXPIRATION,
+ old,
+ *expirep);
+
+ if (IS_MASTER)
+ {
+ siridb_query_forward(
+ handle,
+ SIRIDB_QUERY_FWD_UPDATE,
+ (sirinet_promises_cb) on_update_xxx_response,
+ 0);
+ }
+ else
+ {
+ SIRIPARSER_ASYNC_NEXT_NODE
+ }
+ }
+}
+static void exit_set_expiration_log(uv_async_t * handle)
+{
+ siridb_query_t * query = handle->data;
+ siridb_t * siridb = query->client->siridb;
+
+ exit_set_expiration_xxx(
+ handle,
+ &siridb->expiration_log,
+ SIRIDB_SHARD_TP_LOG);
+}
+
+
+static void exit_set_expiration_num(uv_async_t * handle)
+{
+ siridb_query_t * query = handle->data;
+ siridb_t * siridb = query->client->siridb;
+
+ exit_set_expiration_xxx(
+ handle,
+ &siridb->expiration_num,
+ SIRIDB_SHARD_TP_NUMBER);
+}
+
static void exit_set_list_limit(uv_async_t * handle)
{
siridb_query_t * query = handle->data;
siridb_t * siridb,
qp_packer_t * packer,
int map);
+static void prop_expiration_log(
+ siridb_t * siridb,
+ qp_packer_t * packer,
+ int map);
+static void prop_expiration_num(
+ siridb_t * siridb,
+ qp_packer_t * packer,
+ int map);
static void prop_fifo_files(
siridb_t * siridb,
qp_packer_t * packer,
prop_duration_num;
siridb_props[CLERI_GID_K_FIFO_FILES - KW_OFFSET] =
prop_fifo_files;
+ siridb_props[CLERI_GID_K_EXPIRATION_LOG - KW_OFFSET] =
+ prop_expiration_log;
+ siridb_props[CLERI_GID_K_EXPIRATION_NUM - KW_OFFSET] =
+ prop_expiration_num;
siridb_props[CLERI_GID_K_IDLE_PERCENTAGE - KW_OFFSET] =
prop_idle_percentage;
siridb_props[CLERI_GID_K_IDLE_TIME - KW_OFFSET] =
qp_add_int64(packer, (int64_t) siridb_fifo_size(siridb->fifo));
}
+static void prop_expiration_log(
+ siridb_t * siridb,
+ qp_packer_t * packer,
+ int map)
+{
+ SIRIDB_PROP_MAP("expiration_log", 14)
+ if (siridb->expiration_log)
+ {
+ qp_add_int64(packer, (int64_t) siridb->expiration_log);
+ }
+ else
+ {
+ qp_add_null(packer);
+ }
+}
+
+static void prop_expiration_num(
+ siridb_t * siridb,
+ qp_packer_t * packer,
+ int map)
+{
+ SIRIDB_PROP_MAP("expiration_num", 14)
+ if (siridb->expiration_num)
+ {
+ qp_add_int64(packer, (int64_t) siridb->expiration_num);
+ }
+ else
+ {
+ qp_add_null(packer);
+ }
+}
+
static void prop_idle_percentage(
siridb_t * siridb,
qp_packer_t * packer,
return -1;
}
+void series_update_start_end(siridb_series_t * series)
+{
+ SERIES_update_start(series);
+ SERIES_update_end(series);
+}
+
/*
* Returns 0 if successful; -1 and a SIGNAL is raised in case an error occurred.
*
{
_Bool is_num = siridb_series_isnum(series);
siridb_shard_t * shard;
+
+ uv_mutex_lock(&siridb->values_mutex);
+
uint64_t duration = is_num ? siridb->duration_num : siridb->duration_log;
uint64_t expire_at = is_num ? siridb->exp_at_num : siridb->exp_at_log;
+
+ uv_mutex_unlock(&siridb->values_mutex);
+
uint64_t shard_start, shard_end, shard_id;
uint_fast32_t start, end, num_chunks, pstart, pend;
uint16_t chunk_sz;
end++);
if (shard_end < expire_at)
+ {
+ series->length -= end - start;
+ series_update_start_end(series);
continue;
+ }
if ((shard = imap_get(siridb->shards, shard_id)) == NULL)
{
return siri_err;
}
+double siridb_shards_count_percent(
+ siridb_t * siridb,
+ uint64_t end_ts,
+ uint8_t tp)
+{
+ double percent;
+ size_t i;
+ vec_t * shards_list;
+ size_t count = 0;
+ uint64_t duration = tp == SIRIDB_SHARD_TP_NUMBER
+ ? siridb->duration_num
+ : siridb->duration_log;
+
+ uv_mutex_lock(&siridb->shards_mutex);
+
+ shards_list = imap_2vec_ref(siridb->shards);
+
+ uv_mutex_unlock(&siridb->shards_mutex);
+
+ if (shards_list == NULL)
+ return 1.0; /* error, return as if all were removed */
+
+ if (shards_list->len == 0)
+ {
+ vec_free(shards_list);
+ return 0.0;
+ }
+
+ for (i = 0; i < shards_list->len; i++)
+ {
+ siridb_shard_t * shard = (siridb_shard_t *) shards_list->data[i];
+ if (shard->tp != tp)
+ continue;
+
+ count += ((shard->id - shard->id % duration) + duration) < end_ts;
+ }
+
+ percent = count / shards_list->len;
+ vec_free(shards_list);
+ return percent;
+}
+
/*
* Returns true if fn is a shard filename, false if not.
* Argument ext should be either ".sdb" or ".idx".
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2020-01-21 16:05:35
+ * Created at: 2020-01-23 14:08:47
*/
#include "siri/grammar/grammar.h"
4,
k_set,
k_expiration_num,
- cleri_choice(
- CLERI_NONE,
- CLERI_FIRST_MATCH,
- 2,
- k_false,
- time_expr
- ),
+ time_expr,
cleri_optional(CLERI_NONE, set_ignore_threshold)
);
cleri_t * set_expiration_log = cleri_sequence(
CLERI_GID_SET_EXPIRATION_LOG,
- 3,
+ 4,
k_set,
k_expiration_log,
- cleri_choice(
- CLERI_NONE,
- CLERI_FIRST_MATCH,
- 2,
- k_false,
- time_expr
- )
+ time_expr,
+ cleri_optional(CLERI_NONE, set_ignore_threshold)
);
cleri_t * alter_database = cleri_sequence(
CLERI_GID_ALTER_DATABASE,
cleri_list(CLERI_NONE, cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 35,
+ 37,
k_active_handles,
k_active_tasks,
k_buffer_path,
k_duration_log,
k_duration_num,
k_fifo_files,
+ k_expiration_log,
+ k_expiration_num,
k_idle_percentage,
k_idle_time,
k_ip_support,
llist_node_t * siridb_node;
llist_node_t * server_node;
- uint64_t now_ts;
- struct timespec now;
-
log_debug("Start heart-beat task");
- clock_gettime(CLOCK_REALTIME, &now);
-
siridb_node = siri.siridb_list->first;
while (siridb_node != NULL)
{
siridb = (siridb_t *) siridb_node->data;
- siridb->exp_at_num = siridb->expiration_num
- ? siridb_time_now(siridb, now) - siridb->expiration_num
- : 0;
-
- siridb->exp_at_log = siridb->expiration_log
- ? siridb_time_now(siridb, now) - siridb->expiration_log
- : 0;
+ siridb_update_shard_expiration(siridb);
if ( siridb_tee_is_configured(siridb->tee) &&
!siridb_tee_is_connected(siridb->tee))
siridb_shard_t * shard;
uint8_t c = siri.cfg->shard_compression;
size_t i;
+ uint64_t expi[2];
+ uint64_t dura[2];
+
+
log_info("Start optimize task");
uv_mutex_unlock(&siridb->shards_mutex);
+ uv_mutex_lock(&siridb->values_mutex);
+
+ dura[SIRIDB_SHARD_TP_NUMBER] = siridb->duration_num;
+ dura[SIRIDB_SHARD_TP_LOG] = siridb->duration_log;
+ expi[SIRIDB_SHARD_TP_NUMBER] = siridb->exp_at_num;
+ expi[SIRIDB_SHARD_TP_LOG] = siridb->exp_at_log;
+
+ uv_mutex_unlock(&siridb->values_mutex);
+
if (slshards == NULL)
{
log_error("Error creating reference list for shards.");
{
shard = (siridb_shard_t *) slshards->data[j];
- if (!siri_err &&
+ if ((shard->id - shard->id % dura[shard->tp]) + dura[shard->tp] <
+ expi[shard->tp])
+ {
+ log_info(
+ "Shard id %" PRIu64 " (%" PRIu8 ") is expired "
+ "and will be dropped",
+ shard->id, shard->flags);
+ siridb_shard_drop(shard, siridb);
+ }
+ else if (!siri_err &&
optimize.status != SIRI_OPTIMIZE_CANCELLED &&
((shard->flags & SIRIDB_SHARD_NEED_OPTIMIZE) ||
((!(shard->flags & SIRIDB_SHARD_IS_COMPRESSED)) == c)) &&