Work on expire shard
authorJeroen van der Heijden <jeroen@transceptor.technology>
Tue, 21 Jan 2020 14:54:18 +0000 (15:54 +0100)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Tue, 21 Jan 2020 14:54:18 +0000 (15:54 +0100)
grammar/grammar.py
include/siri/db/db.h
src/siri/db/listener.c
src/siri/db/shards.c
src/siri/heartbeat.c

index fea49e33a8bade4d2c6d26db4f6765ac9179136a..ac5d6011526f812c59b6c9723138a984ef3ec752 100644 (file)
@@ -111,14 +111,14 @@ class SiriGrammar(Grammar):
     k_max_open_files = Keyword('max_open_files')
     k_mean = Keyword('mean')
     k_median = Keyword('median')
-    k_median_low = Keyword('median_low')
     k_median_high = Keyword('median_high')
+    k_median_low = Keyword('median_low')
     k_mem_usage = Keyword('mem_usage')
     k_merge = Keyword('merge')
     k_min = Keyword('min')
     k_modify = Keyword('modify')
-    k_nan = Keyword('nan')
     k_name = Keyword('name')
+    k_nan = Keyword('nan')
     k_ninf = Sequence('-', k_inf)
     k_now = Keyword('now')
     k_number = Keyword('number')
@@ -143,9 +143,11 @@ class SiriGrammar(Grammar):
     k_server = Keyword('server')
     k_servers = Keyword('servers')
     k_set = Keyword('set')
-    k_sid = Keyword('sid')
+    k_expiration_log = Keyword('expiration_log')
+    k_expiration_num = Keyword('expiration_num')
     k_shards = Keyword('shards')
     k_show = Keyword('show')
+    k_sid = Keyword('sid')
     k_size = Keyword('size')
     k_start = Keyword('start')
     k_startup_time = Keyword('startup_time')
@@ -591,12 +593,26 @@ class SiriGrammar(Grammar):
     set_select_points_limit = Sequence(
         k_set, k_select_points_limit, r_uinteger)
     set_timezone = Sequence(k_set, k_timezone, string)
+    set_expiration_num = Sequence(
+        k_set,
+        k_expiration_num,
+        Choice(
+            k_false,
+            time_expr,
+            most_greedy=False),
+        Optional(set_ignore_threshold))
+    set_expiration_log = Sequence(k_set, k_expiration_log, Choice(
+        k_false,
+        time_expr,
+        most_greedy=False))
 
     alter_database = Sequence(k_database, Choice(
         set_drop_threshold,
         set_list_limit,
         set_select_points_limit,
         set_timezone,
+        set_expiration_num,
+        set_expiration_log,
         most_greedy=False))
 
     alter_group = Sequence(k_group, group_name, Choice(
index b5b5a3fe9491c86d8a9fc11a9965910c52c61b53..2c0457842f5d864a2f5f93c01e8d5e281ac9dc80 100644 (file)
@@ -73,6 +73,10 @@ struct siridb_s
     struct timespec start_time;     /* to calculate up-time.                */
     uint64_t duration_num;          /* number duration in s, ms, us or ns   */
     uint64_t duration_log;          /* log duration in s, ms, us or ns      */
+    uint64_t expire_at_num;         /* UNIX time stamp in s, ms, us or ns   */
+    uint64_t expire_at_log;         /* UNIX time stamp in s, ms, us or ns   */
+    uint64_t expiration_num;        /* number duration in s, ms, us or ns   */
+    uint64_t expiration_log;        /* log duration in s, ms, us or ns      */
     char * dbname;
     char * dbpath;
     double drop_threshold;
index 9f300fbe89beb950151a27cbbbccd73a802c38c4..4e703ea1e0a1be8fd332716a3298def50bdba177 100644 (file)
@@ -1080,12 +1080,12 @@ static void enter_set_expression(uv_async_t * handle)
 static void enter_set_ignore_threshold(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
-    query_drop_t * q_drop = (query_drop_t *) query->data;
+    query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
 
     if (    query->nodes->node->children->next->next->node->children->node->
             cl_obj->gid == CLERI_GID_K_TRUE)
     {
-        q_drop->flags |= QUERIES_IGNORE_DROP_THRESHOLD;
+        q_wrapper->flags |= QUERIES_IGNORE_DROP_THRESHOLD;
     }
 
     SIRIPARSER_NEXT_NODE
index 290875ac7826e148b67d8488ded4b448ea8e5826..83b5111fd19e0727ec72be69face55a79289ac5f 100644 (file)
@@ -124,9 +124,10 @@ int siridb_shards_add_points(
         siridb_series_t * series,
         siridb_points_t * points)
 {
+    _Bool is_num = siridb_series_isnum(series);
     siridb_shard_t * shard;
-    uint64_t duration = siridb_series_isnum(series) ?
-            siridb->duration_num : siridb->duration_log;
+    uint64_t duration = is_num ? siridb->duration_num : siridb->duration_log;
+    uint64_t expire = is_num ? siridb->expire_num : siridb->expire_log;
     uint64_t shard_start, shard_end, shard_id;
     uint_fast32_t start, end, num_chunks, pstart, pend;
     uint16_t chunk_sz;
@@ -139,14 +140,20 @@ int siridb_shards_add_points(
         shard_end = shard_start + duration;
         shard_id = shard_start + series->mask;
 
+        for (   start = end;
+                end < points->len && points->data[end].ts < shard_end;
+                end++);
+
+        if (shard_end < expire)
+            continue;
+
         if ((shard = imap_get(siridb->shards, shard_id)) == NULL)
         {
             shard = siridb_shard_create(
                     siridb,
                     shard_id,
                     duration,
-                    siridb_series_isnum(series) ?
-                            SIRIDB_SHARD_TP_NUMBER : SIRIDB_SHARD_TP_LOG,
+                    is_num ? SIRIDB_SHARD_TP_NUMBER : SIRIDB_SHARD_TP_LOG,
                     NULL);
             if (shard == NULL)
             {
@@ -154,10 +161,6 @@ int siridb_shards_add_points(
             }
         }
 
-        for (   start = end;
-                end < points->len && points->data[end].ts < shard_end;
-                end++);
-
         if (start != end)
         {
             size = end - start;
index f0ed77804db34df93b33d82f578227ccf31fab4d..b1ec11a14fba957b57ba315a86c8989cc55837f7 100644 (file)
@@ -51,14 +51,27 @@ static void HEARTBEAT_cb(uv_timer_t * handle __attribute__((unused)))
     llist_node_t * siridb_node;
     llist_node_t * server_node;
 
+    uint64_t now_ts;
+    struct timespec now;
+
     log_debug("Start heart-beat task");
 
+    clock_gettime(CLOCK_REALTIME, &now);
+
     siridb_node = siri.siridb_list->first;
 
     while (siridb_node != NULL)
     {
         siridb = (siridb_t *) siridb_node->data;
 
+        siridb->expire_at_num = siridb->expiration_num
+                ? siridb_time_now(siridb, now) - siridb->expiration_num
+                : 0;
+
+        siridb->expire_at_log = siridb->expiration_log
+                ? siridb_time_now(siridb, now) - siridb->expiration_log
+                : 0;
+
         if (    siridb_tee_is_configured(siridb->tee) &&
                 !siridb_tee_is_connected(siridb->tee))
         {