From: Jeroen van der Heijden Date: Tue, 21 Jan 2020 14:54:18 +0000 (+0100) Subject: Work on expire shard X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~5^2~24 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=7f904d23fd8c7fef8a15599cc000cd034a502fb4;p=siridb-server.git Work on expire shard --- diff --git a/grammar/grammar.py b/grammar/grammar.py index fea49e33..ac5d6011 100644 --- a/grammar/grammar.py +++ b/grammar/grammar.py @@ -111,14 +111,14 @@ class SiriGrammar(Grammar): k_max_open_files = Keyword('max_open_files') k_mean = Keyword('mean') k_median = Keyword('median') - k_median_low = Keyword('median_low') k_median_high = Keyword('median_high') + k_median_low = Keyword('median_low') k_mem_usage = Keyword('mem_usage') k_merge = Keyword('merge') k_min = Keyword('min') k_modify = Keyword('modify') - k_nan = Keyword('nan') k_name = Keyword('name') + k_nan = Keyword('nan') k_ninf = Sequence('-', k_inf) k_now = Keyword('now') k_number = Keyword('number') @@ -143,9 +143,11 @@ class SiriGrammar(Grammar): k_server = Keyword('server') k_servers = Keyword('servers') k_set = Keyword('set') - k_sid = Keyword('sid') + k_expiration_log = Keyword('expiration_log') + k_expiration_num = Keyword('expiration_num') k_shards = Keyword('shards') k_show = Keyword('show') + k_sid = Keyword('sid') k_size = Keyword('size') k_start = Keyword('start') k_startup_time = Keyword('startup_time') @@ -591,12 +593,26 @@ class SiriGrammar(Grammar): set_select_points_limit = Sequence( k_set, k_select_points_limit, r_uinteger) set_timezone = Sequence(k_set, k_timezone, string) + set_expiration_num = Sequence( + k_set, + k_expiration_num, + Choice( + k_false, + time_expr, + most_greedy=False), + Optional(set_ignore_threshold)) + set_expiration_log = Sequence(k_set, k_expiration_log, Choice( + k_false, + time_expr, + most_greedy=False)) alter_database = Sequence(k_database, Choice( set_drop_threshold, set_list_limit, set_select_points_limit, set_timezone, + set_expiration_num, + set_expiration_log, most_greedy=False)) alter_group = Sequence(k_group, group_name, Choice( diff --git a/include/siri/db/db.h b/include/siri/db/db.h index b5b5a3fe..2c045784 100644 --- a/include/siri/db/db.h +++ b/include/siri/db/db.h @@ -73,6 +73,10 @@ struct siridb_s struct timespec start_time; /* to calculate up-time. */ uint64_t duration_num; /* number duration in s, ms, us or ns */ uint64_t duration_log; /* log duration in s, ms, us or ns */ + uint64_t expire_at_num; /* UNIX time stamp in s, ms, us or ns */ + uint64_t expire_at_log; /* UNIX time stamp in s, ms, us or ns */ + uint64_t expiration_num; /* number duration in s, ms, us or ns */ + uint64_t expiration_log; /* log duration in s, ms, us or ns */ char * dbname; char * dbpath; double drop_threshold; diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index 9f300fbe..4e703ea1 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -1080,12 +1080,12 @@ static void enter_set_expression(uv_async_t * handle) static void enter_set_ignore_threshold(uv_async_t * handle) { siridb_query_t * query = handle->data; - query_drop_t * q_drop = (query_drop_t *) query->data; + query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data; if ( query->nodes->node->children->next->next->node->children->node-> cl_obj->gid == CLERI_GID_K_TRUE) { - q_drop->flags |= QUERIES_IGNORE_DROP_THRESHOLD; + q_wrapper->flags |= QUERIES_IGNORE_DROP_THRESHOLD; } SIRIPARSER_NEXT_NODE diff --git a/src/siri/db/shards.c b/src/siri/db/shards.c index 290875ac..83b5111f 100644 --- a/src/siri/db/shards.c +++ b/src/siri/db/shards.c @@ -124,9 +124,10 @@ int siridb_shards_add_points( siridb_series_t * series, siridb_points_t * points) { + _Bool is_num = siridb_series_isnum(series); siridb_shard_t * shard; - uint64_t duration = siridb_series_isnum(series) ? - siridb->duration_num : siridb->duration_log; + uint64_t duration = is_num ? siridb->duration_num : siridb->duration_log; + uint64_t expire = is_num ? siridb->expire_num : siridb->expire_log; uint64_t shard_start, shard_end, shard_id; uint_fast32_t start, end, num_chunks, pstart, pend; uint16_t chunk_sz; @@ -139,14 +140,20 @@ int siridb_shards_add_points( shard_end = shard_start + duration; shard_id = shard_start + series->mask; + for ( start = end; + end < points->len && points->data[end].ts < shard_end; + end++); + + if (shard_end < expire) + continue; + if ((shard = imap_get(siridb->shards, shard_id)) == NULL) { shard = siridb_shard_create( siridb, shard_id, duration, - siridb_series_isnum(series) ? - SIRIDB_SHARD_TP_NUMBER : SIRIDB_SHARD_TP_LOG, + is_num ? SIRIDB_SHARD_TP_NUMBER : SIRIDB_SHARD_TP_LOG, NULL); if (shard == NULL) { @@ -154,10 +161,6 @@ int siridb_shards_add_points( } } - for ( start = end; - end < points->len && points->data[end].ts < shard_end; - end++); - if (start != end) { size = end - start; diff --git a/src/siri/heartbeat.c b/src/siri/heartbeat.c index f0ed7780..b1ec11a1 100644 --- a/src/siri/heartbeat.c +++ b/src/siri/heartbeat.c @@ -51,14 +51,27 @@ static void HEARTBEAT_cb(uv_timer_t * handle __attribute__((unused))) llist_node_t * siridb_node; llist_node_t * server_node; + uint64_t now_ts; + struct timespec now; + log_debug("Start heart-beat task"); + clock_gettime(CLOCK_REALTIME, &now); + siridb_node = siri.siridb_list->first; while (siridb_node != NULL) { siridb = (siridb_t *) siridb_node->data; + siridb->expire_at_num = siridb->expiration_num + ? siridb_time_now(siridb, now) - siridb->expiration_num + : 0; + + siridb->expire_at_log = siridb->expiration_log + ? siridb_time_now(siridb, now) - siridb->expiration_log + : 0; + if ( siridb_tee_is_configured(siridb->tee) && !siridb_tee_is_connected(siridb->tee)) {