From: Jeroen van der Heijden Date: Thu, 23 Jan 2020 16:11:05 +0000 (+0100) Subject: Work on expire shard X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~5^2~22 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=6e3eb62db28f35dd6ce49f5a0840b57b2aa785a4;p=siridb-server.git Work on expire shard --- diff --git a/grammar/grammar.py b/grammar/grammar.py index ac5d6011..dd7ec0f0 100644 --- a/grammar/grammar.py +++ b/grammar/grammar.py @@ -596,15 +596,13 @@ class SiriGrammar(Grammar): set_expiration_num = Sequence( k_set, k_expiration_num, - Choice( - k_false, - time_expr, - most_greedy=False), + time_expr, + Optional(set_ignore_threshold)) + set_expiration_log = Sequence( + k_set, + k_expiration_log, + time_expr, Optional(set_ignore_threshold)) - set_expiration_log = Sequence(k_set, k_expiration_log, Choice( - k_false, - time_expr, - most_greedy=False)) alter_database = Sequence(k_database, Choice( set_drop_threshold, @@ -783,6 +781,8 @@ class SiriGrammar(Grammar): k_duration_log, k_duration_num, k_fifo_files, + k_expiration_log, + k_expiration_num, k_idle_percentage, k_idle_time, k_ip_support, diff --git a/include/siri/db/db.h b/include/siri/db/db.h index 961338d1..3671a47f 100644 --- a/include/siri/db/db.h +++ b/include/siri/db/db.h @@ -8,7 +8,7 @@ typedef struct siridb_s siridb_t; #define SIRIDB_MAX_SIZE_ERR_MSG 1024 #define SIRIDB_MAX_DBNAME_LEN 256 /* 255 + NULL */ -#define SIRIDB_SCHEMA 5 +#define SIRIDB_SCHEMA 6 #define SIRIDB_FLAG_REINDEXING 1 #define SIRIDB_FLAG_DROPPED 2 @@ -52,6 +52,7 @@ int siridb_open_files(siridb_t * siridb); int siridb_save(siridb_t * siridb); void siridb__free(siridb_t * siridb); void siridb_drop(siridb_t * siridb); +void siridb_update_shard_expiration(siridb_t * siridb); #define siridb_incref(siridb) siridb->ref++ #define siridb_decref(_siridb) if (!--_siridb->ref) siridb__free(_siridb) @@ -93,6 +94,7 @@ struct siridb_s imap_t * series_map; uv_mutex_t series_mutex; uv_mutex_t shards_mutex; + uv_mutex_t values_mutex; imap_t * shards; FILE * dropped_fp; qp_fpacker_t * store; diff --git a/include/siri/db/series.h b/include/siri/db/series.h index c708489f..96f2d8f9 100644 --- a/include/siri/db/series.h +++ b/include/siri/db/series.h @@ -85,6 +85,7 @@ int siridb_series_add_idx( uint32_t pos, uint16_t len, uint16_t cinfo); +void series_update_start_end(siridb_series_t * series); int siridb_series_add_point( siridb_t *__restrict siridb, siridb_series_t *__restrict series, diff --git a/include/siri/db/shards.h b/include/siri/db/shards.h index 48b3736e..773bdbfb 100644 --- a/include/siri/db/shards.h +++ b/include/siri/db/shards.h @@ -24,5 +24,9 @@ int siridb_shards_add_points( siridb_t * siridb, siridb_series_t * series, siridb_points_t * points); +double siridb_shards_count_percent( + siridb_t * siridb, + uint64_t end_ts, + uint8_t tp); #endif /* SIRIDB_SHARDS_H_ */ diff --git a/include/siri/grammar/grammar.h b/include/siri/grammar/grammar.h index 83ea352a..2dbe8283 100644 --- a/include/siri/grammar/grammar.h +++ b/include/siri/grammar/grammar.h @@ -5,7 +5,7 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2020-01-21 16:05:35 + * Created at: 2020-01-23 14:08:47 */ #ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ #define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ diff --git a/src/siri/db/db.c b/src/siri/db/db.c index fb2c4bf5..d9d1425e 100644 --- a/src/siri/db/db.c +++ b/src/siri/db/db.c @@ -310,7 +310,8 @@ static int siridb__from_unpacker( if ( qp_schema.via.int64 == 1 || qp_schema.via.int64 == 2 || qp_schema.via.int64 == 3 || - qp_schema.via.int64 == 4) + qp_schema.via.int64 == 4 || + qp_schema.via.int64 == 5) { log_info( "Found an old database schema (v%d), " @@ -492,6 +493,24 @@ static int siridb__from_unpacker( READ_DB_EXIT_WITH_ERROR("Cannot read tee pipe name.") } } + if (qp_schema.via.int64 >= 6) + { + /* read select points limit */ + if (qp_next(unpacker, &qp_obj) != QP_INT64 || qp_obj.via.int64 < 0) + { + READ_DB_EXIT_WITH_ERROR( + "Cannot read shard (log) expiration time.") + } + (*siridb)->expiration_log = qp_obj.via.int64; + + /* read list limit */ + if (qp_next(unpacker, &qp_obj) != QP_INT64 || qp_obj.via.int64 < 0) + { + READ_DB_EXIT_WITH_ERROR( + "Cannot read shard (number) expiration time.") + } + (*siridb)->expiration_num = qp_obj.via.int64; + } if ((*siridb)->tee->pipe_name_ == NULL) { log_debug( @@ -651,6 +670,8 @@ int siridb_save(siridb_t * siridb) (siridb->tee->pipe_name_ == NULL ? qp_fadd_type(fpacker, QP_NULL) : qp_fadd_string(fpacker, siridb->tee->pipe_name_)) || + qp_fadd_int64(fpacker, siridb->expiration_log) || + qp_fadd_int64(fpacker, siridb->expiration_num) || qp_fadd_type(fpacker, QP_ARRAY_CLOSE) || qp_close(fpacker)); } @@ -761,6 +782,7 @@ void siridb__free(siridb_t * siridb) uv_mutex_destroy(&siridb->series_mutex); uv_mutex_destroy(&siridb->shards_mutex); + uv_mutex_destroy(&siridb->values_mutex); if (siridb->flags & SIRIDB_FLAG_DROPPED) { @@ -808,6 +830,25 @@ void siridb_drop(siridb_t * siridb) siridb_decref(siridb); } +void siridb_update_shard_expiration(siridb_t * siridb) +{ + struct timespec now; + clock_gettime(CLOCK_REALTIME, &now); + + uv_mutex_lock(&siridb->values_mutex); + + siridb->exp_at_num = siridb->expiration_num + ? siridb_time_now(siridb, now) - siridb->expiration_num + : 0; + + siridb->exp_at_log = siridb->expiration_log + ? siridb_time_now(siridb, now) - siridb->expiration_log + : 0; + + uv_mutex_unlock(&siridb->values_mutex); +} + + /* * Returns NULL and raises a SIGNAL in case an error has occurred. */ @@ -843,6 +884,10 @@ static siridb_t * siridb__new(void) siridb->groups = NULL; siridb->dropped_fp = NULL; siridb->store = NULL; + siridb->exp_at_log = 0; + siridb->exp_at_num = 0; + siridb->expiration_log = 0; + siridb->expiration_num = 0; siridb->series = ct_new(); if (siridb->series == NULL) @@ -876,6 +921,7 @@ static siridb_t * siridb__new(void) uv_mutex_init(&siridb->series_mutex); uv_mutex_init(&siridb->shards_mutex); + uv_mutex_init(&siridb->values_mutex); return siridb; diff --git a/src/siri/db/insert.c b/src/siri/db/insert.c index f2faf9e5..e67b7913 100644 --- a/src/siri/db/insert.c +++ b/src/siri/db/insert.c @@ -638,6 +638,14 @@ static int8_t INSERT_local_work( } } + if (series->length == 0) + { + if (siridb_series_drop(siridb, series)) + { + siridb_series_flush_dropped(siridb); + } + } + if (tp == QP_ARRAY_CLOSE) { qp_next(unpacker, qp_series_name); @@ -852,6 +860,15 @@ static int INSERT_local_work_test( siridb_points_free((siridb_points_t *) *pcache); *pcache = NULL; } + + } + + if (series->length == 0) + { + if (siridb_series_drop(siridb, series)) + { + siridb_series_flush_dropped(siridb); + } } if (tp == QP_ARRAY_CLOSE) diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index 4e703ea1..94b80ec5 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -149,6 +150,8 @@ if (IS_MASTER && siridb_is_reindexing(siridb)) \ "Successfully changed drop_threshold from %g to %g." #define MSG_SUCCESS_SET_LIST_LIMIT \ "Successfully changed list limit from %" PRIu32 " to %" PRIu32 "." +#define MSG_SUCCESS_SET_EXPIRATION \ + "Successfully changed expiration from %" PRIu64 " to %" PRIu64 "." #define MSG_SUCCESS_SET_ADDR_PORT \ "Successfully changed server address to '%s'." #define MSG_SUCCESS_DROP_SERVER \ @@ -248,6 +251,8 @@ static void exit_series_parentheses(uv_async_t * handle); static void exit_set_address(uv_async_t * handle); static void exit_set_backup_mode(uv_async_t * handle); static void exit_set_drop_threshold(uv_async_t * handle); +static void exit_set_expiration_log(uv_async_t * handle); +static void exit_set_expiration_num(uv_async_t * handle); static void exit_set_list_limit(uv_async_t * handle); static void exit_set_log_level(uv_async_t * handle); static void exit_set_port(uv_async_t * handle); @@ -483,6 +488,8 @@ void siridb_init_listener(void) siridb_listen_exit[CLERI_GID_SET_ADDRESS] = exit_set_address; siridb_listen_exit[CLERI_GID_SET_BACKUP_MODE] = exit_set_backup_mode; siridb_listen_exit[CLERI_GID_SET_DROP_THRESHOLD] = exit_set_drop_threshold; + siridb_listen_exit[CLERI_GID_SET_EXPIRATION_LOG] = exit_set_expiration_log; + siridb_listen_exit[CLERI_GID_SET_EXPIRATION_NUM] = exit_set_expiration_num; siridb_listen_exit[CLERI_GID_SET_LIST_LIMIT] = exit_set_list_limit; siridb_listen_exit[CLERI_GID_SET_LOG_LEVEL] = exit_set_log_level; siridb_listen_exit[CLERI_GID_SET_PORT] = exit_set_port; @@ -3779,6 +3786,123 @@ static void exit_set_drop_threshold(uv_async_t * handle) } } +static void exit_set_expiration_xxx( + uv_async_t * handle, + uint64_t * expirep, + uint8_t tp) +{ + siridb_query_t * query = handle->data; + siridb_t * siridb = query->client->siridb; + + MASTER_CHECK_ACCESSIBLE(siridb) + MASTER_CHECK_VERSION(siridb, "2.0.35") + + cleri_node_t * node = query->nodes->node->children->next->next->node; + uint64_t expiration = (uint64_t) CLERI_NODE_DATA(node); + + if (IS_MASTER && expiration) + { + struct timespec now; + clock_gettime(CLOCK_REALTIME, &now); + now.tv_sec -= (3600*24); /* remove one dat to be save */ + uint64_t now_ts = siridb_time_now(siridb, now); + + if (expiration >= now_ts) + { + snprintf(query->err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "Shard expiration time should be a value greater " + "than or equal to zero (0) and smaller " + "than %"PRIu64" but got %" PRId64, + now_ts, (int64_t) CLERI_NODE_DATA(node)); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } + + query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data; + double percent = siridb_shards_count_percent( + siridb, + now_ts - expiration, + tp); + + if ((~q_wrapper->flags & QUERIES_IGNORE_DROP_THRESHOLD) && + percent >= siridb->drop_threshold) + { + snprintf(query->err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "This query would drop %0.2f%% of the shards in pool %u. " + "Add \'set ignore_threshold true\' to the query " + "statement if you really want to do this.", + percent * 100, + siridb->server->pool); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } + } + + uint64_t old = *expirep; + *expirep = expiration; + + siridb_update_shard_expiration(siridb); + + if (siridb_save(siridb)) + { + snprintf(query->err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "Error while saving database changes!"); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + } + else + { + QP_ADD_SUCCESS + + log_info( + MSG_SUCCESS_SET_EXPIRATION, + old, + *expirep); + + qp_add_fmt_safe(query->packer, + MSG_SUCCESS_SET_EXPIRATION, + old, + *expirep); + + if (IS_MASTER) + { + siridb_query_forward( + handle, + SIRIDB_QUERY_FWD_UPDATE, + (sirinet_promises_cb) on_update_xxx_response, + 0); + } + else + { + SIRIPARSER_ASYNC_NEXT_NODE + } + } +} +static void exit_set_expiration_log(uv_async_t * handle) +{ + siridb_query_t * query = handle->data; + siridb_t * siridb = query->client->siridb; + + exit_set_expiration_xxx( + handle, + &siridb->expiration_log, + SIRIDB_SHARD_TP_LOG); +} + + +static void exit_set_expiration_num(uv_async_t * handle) +{ + siridb_query_t * query = handle->data; + siridb_t * siridb = query->client->siridb; + + exit_set_expiration_xxx( + handle, + &siridb->expiration_num, + SIRIDB_SHARD_TP_NUMBER); +} + static void exit_set_list_limit(uv_async_t * handle) { siridb_query_t * query = handle->data; diff --git a/src/siri/db/props.c b/src/siri/db/props.c index 5ac92a5c..02cee86f 100644 --- a/src/siri/db/props.c +++ b/src/siri/db/props.c @@ -64,6 +64,14 @@ static void prop_duration_num( siridb_t * siridb, qp_packer_t * packer, int map); +static void prop_expiration_log( + siridb_t * siridb, + qp_packer_t * packer, + int map); +static void prop_expiration_num( + siridb_t * siridb, + qp_packer_t * packer, + int map); static void prop_fifo_files( siridb_t * siridb, qp_packer_t * packer, @@ -200,6 +208,10 @@ void siridb_init_props(void) prop_duration_num; siridb_props[CLERI_GID_K_FIFO_FILES - KW_OFFSET] = prop_fifo_files; + siridb_props[CLERI_GID_K_EXPIRATION_LOG - KW_OFFSET] = + prop_expiration_log; + siridb_props[CLERI_GID_K_EXPIRATION_NUM - KW_OFFSET] = + prop_expiration_num; siridb_props[CLERI_GID_K_IDLE_PERCENTAGE - KW_OFFSET] = prop_idle_percentage; siridb_props[CLERI_GID_K_IDLE_TIME - KW_OFFSET] = @@ -342,6 +354,38 @@ static void prop_fifo_files( qp_add_int64(packer, (int64_t) siridb_fifo_size(siridb->fifo)); } +static void prop_expiration_log( + siridb_t * siridb, + qp_packer_t * packer, + int map) +{ + SIRIDB_PROP_MAP("expiration_log", 14) + if (siridb->expiration_log) + { + qp_add_int64(packer, (int64_t) siridb->expiration_log); + } + else + { + qp_add_null(packer); + } +} + +static void prop_expiration_num( + siridb_t * siridb, + qp_packer_t * packer, + int map) +{ + SIRIDB_PROP_MAP("expiration_num", 14) + if (siridb->expiration_num) + { + qp_add_int64(packer, (int64_t) siridb->expiration_num); + } + else + { + qp_add_null(packer); + } +} + static void prop_idle_percentage( siridb_t * siridb, qp_packer_t * packer, diff --git a/src/siri/db/series.c b/src/siri/db/series.c index 3dce1c73..edb83586 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -103,6 +103,12 @@ int siridb_series_cexpr_cb(siridb_series_t * series, cexpr_condition_t * cond) return -1; } +void series_update_start_end(siridb_series_t * series) +{ + SERIES_update_start(series); + SERIES_update_end(series); +} + /* * Returns 0 if successful; -1 and a SIGNAL is raised in case an error occurred. * diff --git a/src/siri/db/shards.c b/src/siri/db/shards.c index a2c1f4bd..0e729b9a 100644 --- a/src/siri/db/shards.c +++ b/src/siri/db/shards.c @@ -126,8 +126,14 @@ int siridb_shards_add_points( { _Bool is_num = siridb_series_isnum(series); siridb_shard_t * shard; + + uv_mutex_lock(&siridb->values_mutex); + uint64_t duration = is_num ? siridb->duration_num : siridb->duration_log; uint64_t expire_at = is_num ? siridb->exp_at_num : siridb->exp_at_log; + + uv_mutex_unlock(&siridb->values_mutex); + uint64_t shard_start, shard_end, shard_id; uint_fast32_t start, end, num_chunks, pstart, pend; uint16_t chunk_sz; @@ -145,7 +151,11 @@ int siridb_shards_add_points( end++); if (shard_end < expire_at) + { + series->length -= end - start; + series_update_start_end(series); continue; + } if ((shard = imap_get(siridb->shards, shard_id)) == NULL) { @@ -219,6 +229,48 @@ int siridb_shards_add_points( return siri_err; } +double siridb_shards_count_percent( + siridb_t * siridb, + uint64_t end_ts, + uint8_t tp) +{ + double percent; + size_t i; + vec_t * shards_list; + size_t count = 0; + uint64_t duration = tp == SIRIDB_SHARD_TP_NUMBER + ? siridb->duration_num + : siridb->duration_log; + + uv_mutex_lock(&siridb->shards_mutex); + + shards_list = imap_2vec_ref(siridb->shards); + + uv_mutex_unlock(&siridb->shards_mutex); + + if (shards_list == NULL) + return 1.0; /* error, return as if all were removed */ + + if (shards_list->len == 0) + { + vec_free(shards_list); + return 0.0; + } + + for (i = 0; i < shards_list->len; i++) + { + siridb_shard_t * shard = (siridb_shard_t *) shards_list->data[i]; + if (shard->tp != tp) + continue; + + count += ((shard->id - shard->id % duration) + duration) < end_ts; + } + + percent = count / shards_list->len; + vec_free(shards_list); + return percent; +} + /* * Returns true if fn is a shard filename, false if not. * Argument ext should be either ".sdb" or ".idx". diff --git a/src/siri/grammar/grammar.c b/src/siri/grammar/grammar.c index 12d90266..4aba9137 100644 --- a/src/siri/grammar/grammar.c +++ b/src/siri/grammar/grammar.c @@ -5,7 +5,7 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2020-01-21 16:05:35 + * Created at: 2020-01-23 14:08:47 */ #include "siri/grammar/grammar.h" @@ -1172,27 +1172,16 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) 4, k_set, k_expiration_num, - cleri_choice( - CLERI_NONE, - CLERI_FIRST_MATCH, - 2, - k_false, - time_expr - ), + time_expr, cleri_optional(CLERI_NONE, set_ignore_threshold) ); cleri_t * set_expiration_log = cleri_sequence( CLERI_GID_SET_EXPIRATION_LOG, - 3, + 4, k_set, k_expiration_log, - cleri_choice( - CLERI_NONE, - CLERI_FIRST_MATCH, - 2, - k_false, - time_expr - ) + time_expr, + cleri_optional(CLERI_NONE, set_ignore_threshold) ); cleri_t * alter_database = cleri_sequence( CLERI_GID_ALTER_DATABASE, @@ -1566,7 +1555,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_list(CLERI_NONE, cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 35, + 37, k_active_handles, k_active_tasks, k_buffer_path, @@ -1577,6 +1566,8 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) k_duration_log, k_duration_num, k_fifo_files, + k_expiration_log, + k_expiration_num, k_idle_percentage, k_idle_time, k_ip_support, diff --git a/src/siri/heartbeat.c b/src/siri/heartbeat.c index c8e58cb4..06011e17 100644 --- a/src/siri/heartbeat.c +++ b/src/siri/heartbeat.c @@ -51,26 +51,15 @@ static void HEARTBEAT_cb(uv_timer_t * handle __attribute__((unused))) llist_node_t * siridb_node; llist_node_t * server_node; - uint64_t now_ts; - struct timespec now; - log_debug("Start heart-beat task"); - clock_gettime(CLOCK_REALTIME, &now); - siridb_node = siri.siridb_list->first; while (siridb_node != NULL) { siridb = (siridb_t *) siridb_node->data; - siridb->exp_at_num = siridb->expiration_num - ? siridb_time_now(siridb, now) - siridb->expiration_num - : 0; - - siridb->exp_at_log = siridb->expiration_log - ? siridb_time_now(siridb, now) - siridb->expiration_log - : 0; + siridb_update_shard_expiration(siridb); if ( siridb_tee_is_configured(siridb->tee) && !siridb_tee_is_connected(siridb->tee)) diff --git a/src/siri/optimize.c b/src/siri/optimize.c index 62454016..cfaf958d 100644 --- a/src/siri/optimize.c +++ b/src/siri/optimize.c @@ -262,6 +262,10 @@ static void OPTIMIZE_work(uv_work_t * work __attribute__((unused))) siridb_shard_t * shard; uint8_t c = siri.cfg->shard_compression; size_t i; + uint64_t expi[2]; + uint64_t dura[2]; + + log_info("Start optimize task"); @@ -303,6 +307,15 @@ static void OPTIMIZE_work(uv_work_t * work __attribute__((unused))) uv_mutex_unlock(&siridb->shards_mutex); + uv_mutex_lock(&siridb->values_mutex); + + dura[SIRIDB_SHARD_TP_NUMBER] = siridb->duration_num; + dura[SIRIDB_SHARD_TP_LOG] = siridb->duration_log; + expi[SIRIDB_SHARD_TP_NUMBER] = siridb->exp_at_num; + expi[SIRIDB_SHARD_TP_LOG] = siridb->exp_at_log; + + uv_mutex_unlock(&siridb->values_mutex); + if (slshards == NULL) { log_error("Error creating reference list for shards."); @@ -316,7 +329,16 @@ static void OPTIMIZE_work(uv_work_t * work __attribute__((unused))) { shard = (siridb_shard_t *) slshards->data[j]; - if (!siri_err && + if ((shard->id - shard->id % dura[shard->tp]) + dura[shard->tp] < + expi[shard->tp]) + { + log_info( + "Shard id %" PRIu64 " (%" PRIu8 ") is expired " + "and will be dropped", + shard->id, shard->flags); + siridb_shard_drop(shard, siridb); + } + else if (!siri_err && optimize.status != SIRI_OPTIMIZE_CANCELLED && ((shard->flags & SIRIDB_SHARD_NEED_OPTIMIZE) || ((!(shard->flags & SIRIDB_SHARD_IS_COMPRESSED)) == c)) &&