Work on expire shard
authorJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 23 Jan 2020 16:11:05 +0000 (17:11 +0100)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 23 Jan 2020 16:11:05 +0000 (17:11 +0100)
14 files changed:
grammar/grammar.py
include/siri/db/db.h
include/siri/db/series.h
include/siri/db/shards.h
include/siri/grammar/grammar.h
src/siri/db/db.c
src/siri/db/insert.c
src/siri/db/listener.c
src/siri/db/props.c
src/siri/db/series.c
src/siri/db/shards.c
src/siri/grammar/grammar.c
src/siri/heartbeat.c
src/siri/optimize.c

index ac5d6011526f812c59b6c9723138a984ef3ec752..dd7ec0f07b51b18cc08bc3566e0594f9ca632cba 100644 (file)
@@ -596,15 +596,13 @@ class SiriGrammar(Grammar):
     set_expiration_num = Sequence(
         k_set,
         k_expiration_num,
-        Choice(
-            k_false,
-            time_expr,
-            most_greedy=False),
+       time_expr,        
+        Optional(set_ignore_threshold))
+    set_expiration_log = Sequence(
+        k_set,
+        k_expiration_log,
+       time_expr,        
         Optional(set_ignore_threshold))
-    set_expiration_log = Sequence(k_set, k_expiration_log, Choice(
-        k_false,
-        time_expr,
-        most_greedy=False))
 
     alter_database = Sequence(k_database, Choice(
         set_drop_threshold,
@@ -783,6 +781,8 @@ class SiriGrammar(Grammar):
         k_duration_log,
         k_duration_num,
         k_fifo_files,
+        k_expiration_log,
+        k_expiration_num,
         k_idle_percentage,
         k_idle_time,
         k_ip_support,
index 961338d16ed4a91e15d60451872770107fd34c11..3671a47f283aa4849355cf833fc2296e57a95d87 100644 (file)
@@ -8,7 +8,7 @@ typedef struct siridb_s siridb_t;
 
 #define SIRIDB_MAX_SIZE_ERR_MSG 1024
 #define SIRIDB_MAX_DBNAME_LEN 256  /*    255 + NULL     */
-#define SIRIDB_SCHEMA 5
+#define SIRIDB_SCHEMA 6
 #define SIRIDB_FLAG_REINDEXING 1
 #define SIRIDB_FLAG_DROPPED 2
 
@@ -52,6 +52,7 @@ int siridb_open_files(siridb_t * siridb);
 int siridb_save(siridb_t * siridb);
 void siridb__free(siridb_t * siridb);
 void siridb_drop(siridb_t * siridb);
+void siridb_update_shard_expiration(siridb_t * siridb);
 
 #define siridb_incref(siridb) siridb->ref++
 #define siridb_decref(_siridb) if (!--_siridb->ref) siridb__free(_siridb)
@@ -93,6 +94,7 @@ struct siridb_s
     imap_t * series_map;
     uv_mutex_t series_mutex;
     uv_mutex_t shards_mutex;
+    uv_mutex_t values_mutex;
     imap_t * shards;
     FILE * dropped_fp;
     qp_fpacker_t * store;
index c708489f8f36f917ac2e805afa669871dc9908c8..96f2d8f950cfc2a16d756e46839c5b10ccec24f2 100644 (file)
@@ -85,6 +85,7 @@ int siridb_series_add_idx(
         uint32_t pos,
         uint16_t len,
         uint16_t cinfo);
+void series_update_start_end(siridb_series_t * series);
 int siridb_series_add_point(
         siridb_t *__restrict siridb,
         siridb_series_t *__restrict series,
index 48b3736e31451f7f422b254fca48d22f46925376..773bdbfb5411abf1226da91f2ea2d2d06391cc53 100644 (file)
@@ -24,5 +24,9 @@ int siridb_shards_add_points(
         siridb_t * siridb,
         siridb_series_t * series,
         siridb_points_t * points);
+double siridb_shards_count_percent(
+        siridb_t * siridb,
+        uint64_t end_ts,
+        uint8_t tp);
 
 #endif  /* SIRIDB_SHARDS_H_ */
index 83ea352ab2cd59e36710277ada8417ad76186b7f..2dbe8283ec92da25cd60c35cbee181d2dd6283f4 100644 (file)
@@ -5,7 +5,7 @@
  * should be used with the libcleri module.
  *
  * Source class: SiriGrammar
- * Created at: 2020-01-21 16:05:35
+ * Created at: 2020-01-23 14:08:47
  */
 #ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
 #define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
index fb2c4bf597b6beb52f59f05c0b900afdd5474aa5..d9d1425e6e282f43a84e0ce7742776fe1019f8e8 100644 (file)
@@ -310,7 +310,8 @@ static int siridb__from_unpacker(
     if (    qp_schema.via.int64 == 1 ||
             qp_schema.via.int64 == 2 ||
             qp_schema.via.int64 == 3 ||
-            qp_schema.via.int64 == 4)
+            qp_schema.via.int64 == 4 ||
+            qp_schema.via.int64 == 5)
     {
         log_info(
                 "Found an old database schema (v%d), "
@@ -492,6 +493,24 @@ static int siridb__from_unpacker(
             READ_DB_EXIT_WITH_ERROR("Cannot read tee pipe name.")
         }
     }
+    if (qp_schema.via.int64 >= 6)
+    {
+        /* read select points limit */
+        if (qp_next(unpacker, &qp_obj) != QP_INT64 || qp_obj.via.int64 < 0)
+        {
+            READ_DB_EXIT_WITH_ERROR(
+                    "Cannot read shard (log) expiration time.")
+        }
+        (*siridb)->expiration_log = qp_obj.via.int64;
+
+        /* read list limit */
+        if (qp_next(unpacker, &qp_obj) != QP_INT64 || qp_obj.via.int64 < 0)
+        {
+            READ_DB_EXIT_WITH_ERROR(
+                    "Cannot read shard (number) expiration time.")
+        }
+        (*siridb)->expiration_num = qp_obj.via.int64;
+    }
     if ((*siridb)->tee->pipe_name_ == NULL)
     {
         log_debug(
@@ -651,6 +670,8 @@ int siridb_save(siridb_t * siridb)
             (siridb->tee->pipe_name_ == NULL
                 ? qp_fadd_type(fpacker, QP_NULL)
                 : qp_fadd_string(fpacker, siridb->tee->pipe_name_)) ||
+            qp_fadd_int64(fpacker, siridb->expiration_log) ||
+            qp_fadd_int64(fpacker, siridb->expiration_num) ||
             qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
             qp_close(fpacker));
 }
@@ -761,6 +782,7 @@ void siridb__free(siridb_t * siridb)
 
     uv_mutex_destroy(&siridb->series_mutex);
     uv_mutex_destroy(&siridb->shards_mutex);
+    uv_mutex_destroy(&siridb->values_mutex);
 
     if (siridb->flags & SIRIDB_FLAG_DROPPED)
     {
@@ -808,6 +830,25 @@ void siridb_drop(siridb_t * siridb)
     siridb_decref(siridb);
 }
 
+void siridb_update_shard_expiration(siridb_t * siridb)
+{
+    struct timespec now;
+    clock_gettime(CLOCK_REALTIME, &now);
+
+    uv_mutex_lock(&siridb->values_mutex);
+
+    siridb->exp_at_num = siridb->expiration_num
+            ? siridb_time_now(siridb, now) - siridb->expiration_num
+            : 0;
+
+    siridb->exp_at_log = siridb->expiration_log
+            ? siridb_time_now(siridb, now) - siridb->expiration_log
+            : 0;
+
+    uv_mutex_unlock(&siridb->values_mutex);
+}
+
+
 /*
  * Returns NULL and raises a SIGNAL in case an error has occurred.
  */
@@ -843,6 +884,10 @@ static siridb_t * siridb__new(void)
     siridb->groups = NULL;
     siridb->dropped_fp = NULL;
     siridb->store = NULL;
+    siridb->exp_at_log = 0;
+    siridb->exp_at_num = 0;
+    siridb->expiration_log = 0;
+    siridb->expiration_num = 0;
 
     siridb->series = ct_new();
     if (siridb->series == NULL)
@@ -876,6 +921,7 @@ static siridb_t * siridb__new(void)
 
     uv_mutex_init(&siridb->series_mutex);
     uv_mutex_init(&siridb->shards_mutex);
+    uv_mutex_init(&siridb->values_mutex);
 
     return siridb;
 
index f2faf9e549bfa8e7dceb1bed84300e2c0293e268..e67b7913394432a472cb5dee4335df14810cb6c6 100644 (file)
@@ -638,6 +638,14 @@ static int8_t INSERT_local_work(
             }
         }
 
+        if (series->length == 0)
+        {
+            if (siridb_series_drop(siridb, series))
+            {
+                siridb_series_flush_dropped(siridb);
+            }
+        }
+
         if (tp == QP_ARRAY_CLOSE)
         {
             qp_next(unpacker, qp_series_name);
@@ -852,6 +860,15 @@ static int INSERT_local_work_test(
                 siridb_points_free((siridb_points_t *) *pcache);
                 *pcache = NULL;
             }
+
+        }
+
+        if (series->length == 0)
+        {
+            if (siridb_series_drop(siridb, series))
+            {
+                siridb_series_flush_dropped(siridb);
+            }
         }
 
         if (tp == QP_ARRAY_CLOSE)
index 4e703ea1e0a1be8fd332716a3298def50bdba177..94b80ec544c6afbf9661c4cbfb2c7368d176c960 100644 (file)
@@ -20,6 +20,7 @@
 #include <siri/db/server.h>
 #include <siri/db/servers.h>
 #include <siri/db/shard.h>
+#include <siri/db/shards.h>
 #include <siri/db/user.h>
 #include <siri/db/users.h>
 #include <siri/db/listener.h>
@@ -149,6 +150,8 @@ if (IS_MASTER && siridb_is_reindexing(siridb))                              \
     "Successfully changed drop_threshold from %g to %g."
 #define MSG_SUCCESS_SET_LIST_LIMIT \
     "Successfully changed list limit from %" PRIu32 " to %" PRIu32 "."
+#define MSG_SUCCESS_SET_EXPIRATION \
+    "Successfully changed expiration from %" PRIu64 " to %" PRIu64 "."
 #define MSG_SUCCESS_SET_ADDR_PORT \
     "Successfully changed server address to '%s'."
 #define MSG_SUCCESS_DROP_SERVER \
@@ -248,6 +251,8 @@ static void exit_series_parentheses(uv_async_t * handle);
 static void exit_set_address(uv_async_t * handle);
 static void exit_set_backup_mode(uv_async_t * handle);
 static void exit_set_drop_threshold(uv_async_t * handle);
+static void exit_set_expiration_log(uv_async_t * handle);
+static void exit_set_expiration_num(uv_async_t * handle);
 static void exit_set_list_limit(uv_async_t * handle);
 static void exit_set_log_level(uv_async_t * handle);
 static void exit_set_port(uv_async_t * handle);
@@ -483,6 +488,8 @@ void siridb_init_listener(void)
     siridb_listen_exit[CLERI_GID_SET_ADDRESS] = exit_set_address;
     siridb_listen_exit[CLERI_GID_SET_BACKUP_MODE] = exit_set_backup_mode;
     siridb_listen_exit[CLERI_GID_SET_DROP_THRESHOLD] = exit_set_drop_threshold;
+    siridb_listen_exit[CLERI_GID_SET_EXPIRATION_LOG] = exit_set_expiration_log;
+    siridb_listen_exit[CLERI_GID_SET_EXPIRATION_NUM] = exit_set_expiration_num;
     siridb_listen_exit[CLERI_GID_SET_LIST_LIMIT] = exit_set_list_limit;
     siridb_listen_exit[CLERI_GID_SET_LOG_LEVEL] = exit_set_log_level;
     siridb_listen_exit[CLERI_GID_SET_PORT] = exit_set_port;
@@ -3779,6 +3786,123 @@ static void exit_set_drop_threshold(uv_async_t * handle)
     }
 }
 
+static void exit_set_expiration_xxx(
+        uv_async_t * handle,
+        uint64_t * expirep,
+        uint8_t tp)
+{
+    siridb_query_t * query = handle->data;
+    siridb_t * siridb = query->client->siridb;
+
+    MASTER_CHECK_ACCESSIBLE(siridb)
+    MASTER_CHECK_VERSION(siridb, "2.0.35")
+
+    cleri_node_t * node = query->nodes->node->children->next->next->node;
+    uint64_t expiration = (uint64_t) CLERI_NODE_DATA(node);
+
+    if (IS_MASTER && expiration)
+    {
+        struct timespec now;
+        clock_gettime(CLOCK_REALTIME, &now);
+        now.tv_sec -= (3600*24);  /* remove one dat to be save */
+        uint64_t now_ts = siridb_time_now(siridb, now);
+
+        if (expiration >= now_ts)
+        {
+            snprintf(query->err_msg,
+                    SIRIDB_MAX_SIZE_ERR_MSG,
+                    "Shard expiration time should be a value greater "
+                    "than or equal to zero (0) and smaller "
+                    "than %"PRIu64" but got %" PRId64,
+                    now_ts, (int64_t) CLERI_NODE_DATA(node));
+            siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+            return;
+        }
+
+        query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
+        double percent = siridb_shards_count_percent(
+                siridb,
+                now_ts - expiration,
+                tp);
+
+        if ((~q_wrapper->flags & QUERIES_IGNORE_DROP_THRESHOLD) &&
+            percent >= siridb->drop_threshold)
+        {
+            snprintf(query->err_msg,
+                    SIRIDB_MAX_SIZE_ERR_MSG,
+                    "This query would drop %0.2f%% of the shards in pool %u. "
+                    "Add \'set ignore_threshold true\' to the query "
+                    "statement if you really want to do this.",
+                    percent * 100,
+                    siridb->server->pool);
+            siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+            return;
+        }
+    }
+
+    uint64_t old = *expirep;
+    *expirep = expiration;
+
+    siridb_update_shard_expiration(siridb);
+
+    if (siridb_save(siridb))
+    {
+        snprintf(query->err_msg,
+                SIRIDB_MAX_SIZE_ERR_MSG,
+                "Error while saving database changes!");
+        siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+    }
+    else
+    {
+        QP_ADD_SUCCESS
+
+        log_info(
+                MSG_SUCCESS_SET_EXPIRATION,
+                old,
+                *expirep);
+
+        qp_add_fmt_safe(query->packer,
+                MSG_SUCCESS_SET_EXPIRATION,
+                old,
+                *expirep);
+
+        if (IS_MASTER)
+        {
+            siridb_query_forward(
+                    handle,
+                    SIRIDB_QUERY_FWD_UPDATE,
+                    (sirinet_promises_cb) on_update_xxx_response,
+                    0);
+        }
+        else
+        {
+            SIRIPARSER_ASYNC_NEXT_NODE
+        }
+    }
+}
+static void exit_set_expiration_log(uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+    siridb_t * siridb = query->client->siridb;
+
+    exit_set_expiration_xxx(
+            handle,
+            &siridb->expiration_log,
+            SIRIDB_SHARD_TP_LOG);
+}
+
+
+static void exit_set_expiration_num(uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+    siridb_t * siridb = query->client->siridb;
+
+    exit_set_expiration_xxx(
+            handle,
+            &siridb->expiration_num,
+            SIRIDB_SHARD_TP_NUMBER);
+}
+
 static void exit_set_list_limit(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
index 5ac92a5cb153bd6932543ba62e0cd6908b4d7c51..02cee86fdc2b95253a65d6516268c593ae6000d0 100644 (file)
@@ -64,6 +64,14 @@ static void prop_duration_num(
         siridb_t * siridb,
         qp_packer_t * packer,
         int map);
+static void prop_expiration_log(
+        siridb_t * siridb,
+        qp_packer_t * packer,
+        int map);
+static void prop_expiration_num(
+        siridb_t * siridb,
+        qp_packer_t * packer,
+        int map);
 static void prop_fifo_files(
         siridb_t * siridb,
         qp_packer_t * packer,
@@ -200,6 +208,10 @@ void siridb_init_props(void)
             prop_duration_num;
     siridb_props[CLERI_GID_K_FIFO_FILES - KW_OFFSET] =
             prop_fifo_files;
+    siridb_props[CLERI_GID_K_EXPIRATION_LOG - KW_OFFSET] =
+            prop_expiration_log;
+    siridb_props[CLERI_GID_K_EXPIRATION_NUM - KW_OFFSET] =
+            prop_expiration_num;
     siridb_props[CLERI_GID_K_IDLE_PERCENTAGE - KW_OFFSET] =
             prop_idle_percentage;
     siridb_props[CLERI_GID_K_IDLE_TIME - KW_OFFSET] =
@@ -342,6 +354,38 @@ static void prop_fifo_files(
     qp_add_int64(packer, (int64_t) siridb_fifo_size(siridb->fifo));
 }
 
+static void prop_expiration_log(
+        siridb_t * siridb,
+        qp_packer_t * packer,
+        int map)
+{
+    SIRIDB_PROP_MAP("expiration_log", 14)
+    if (siridb->expiration_log)
+    {
+        qp_add_int64(packer, (int64_t) siridb->expiration_log);
+    }
+    else
+    {
+        qp_add_null(packer);
+    }
+}
+
+static void prop_expiration_num(
+        siridb_t * siridb,
+        qp_packer_t * packer,
+        int map)
+{
+    SIRIDB_PROP_MAP("expiration_num", 14)
+    if (siridb->expiration_num)
+    {
+        qp_add_int64(packer, (int64_t) siridb->expiration_num);
+    }
+    else
+    {
+        qp_add_null(packer);
+    }
+}
+
 static void prop_idle_percentage(
         siridb_t * siridb,
         qp_packer_t * packer,
index 3dce1c73c4affed649a8e3a356b827021ece6001..edb835864b94d86054db95f484e5ced5b068ab4d 100644 (file)
@@ -103,6 +103,12 @@ int siridb_series_cexpr_cb(siridb_series_t * series, cexpr_condition_t * cond)
     return -1;
 }
 
+void series_update_start_end(siridb_series_t * series)
+{
+    SERIES_update_start(series);
+    SERIES_update_end(series);
+}
+
 /*
  * Returns 0 if successful; -1 and a SIGNAL is raised in case an error occurred.
  *
index a2c1f4bd1b8964697d2f1a4a71c2e143233c06d6..0e729b9ae7a322c2d8f5fe40aeffa2824f738d63 100644 (file)
@@ -126,8 +126,14 @@ int siridb_shards_add_points(
 {
     _Bool is_num = siridb_series_isnum(series);
     siridb_shard_t * shard;
+
+    uv_mutex_lock(&siridb->values_mutex);
+
     uint64_t duration = is_num ? siridb->duration_num : siridb->duration_log;
     uint64_t expire_at = is_num ? siridb->exp_at_num : siridb->exp_at_log;
+
+    uv_mutex_unlock(&siridb->values_mutex);
+
     uint64_t shard_start, shard_end, shard_id;
     uint_fast32_t start, end, num_chunks, pstart, pend;
     uint16_t chunk_sz;
@@ -145,7 +151,11 @@ int siridb_shards_add_points(
                 end++);
 
         if (shard_end < expire_at)
+        {
+            series->length -= end - start;
+            series_update_start_end(series);
             continue;
+        }
 
         if ((shard = imap_get(siridb->shards, shard_id)) == NULL)
         {
@@ -219,6 +229,48 @@ int siridb_shards_add_points(
     return siri_err;
 }
 
+double siridb_shards_count_percent(
+        siridb_t * siridb,
+        uint64_t end_ts,
+        uint8_t tp)
+{
+    double percent;
+    size_t i;
+    vec_t * shards_list;
+    size_t count = 0;
+    uint64_t duration = tp == SIRIDB_SHARD_TP_NUMBER
+            ? siridb->duration_num
+            : siridb->duration_log;
+
+    uv_mutex_lock(&siridb->shards_mutex);
+
+    shards_list = imap_2vec_ref(siridb->shards);
+
+    uv_mutex_unlock(&siridb->shards_mutex);
+
+    if (shards_list == NULL)
+        return 1.0;  /* error, return as if all were removed */
+
+    if (shards_list->len == 0)
+    {
+        vec_free(shards_list);
+        return 0.0;
+    }
+
+    for (i = 0; i < shards_list->len; i++)
+    {
+        siridb_shard_t * shard = (siridb_shard_t *) shards_list->data[i];
+        if (shard->tp != tp)
+            continue;
+
+        count += ((shard->id - shard->id % duration) + duration) < end_ts;
+    }
+
+    percent = count / shards_list->len;
+    vec_free(shards_list);
+    return percent;
+}
+
 /*
  * Returns true if fn is a shard filename, false if not.
  * Argument ext should be either ".sdb" or ".idx".
index 12d90266a778b66d3d07b369cbe19687e07854f8..4aba91376121d95ca37b2675c5102c0acb795bdf 100644 (file)
@@ -5,7 +5,7 @@
  * should be used with the libcleri module.
  *
  * Source class: SiriGrammar
- * Created at: 2020-01-21 16:05:35
+ * Created at: 2020-01-23 14:08:47
  */
 
 #include "siri/grammar/grammar.h"
@@ -1172,27 +1172,16 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
         4,
         k_set,
         k_expiration_num,
-        cleri_choice(
-            CLERI_NONE,
-            CLERI_FIRST_MATCH,
-            2,
-            k_false,
-            time_expr
-        ),
+        time_expr,
         cleri_optional(CLERI_NONE, set_ignore_threshold)
     );
     cleri_t * set_expiration_log = cleri_sequence(
         CLERI_GID_SET_EXPIRATION_LOG,
-        3,
+        4,
         k_set,
         k_expiration_log,
-        cleri_choice(
-            CLERI_NONE,
-            CLERI_FIRST_MATCH,
-            2,
-            k_false,
-            time_expr
-        )
+        time_expr,
+        cleri_optional(CLERI_NONE, set_ignore_threshold)
     );
     cleri_t * alter_database = cleri_sequence(
         CLERI_GID_ALTER_DATABASE,
@@ -1566,7 +1555,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
         cleri_list(CLERI_NONE, cleri_choice(
             CLERI_NONE,
             CLERI_FIRST_MATCH,
-            35,
+            37,
             k_active_handles,
             k_active_tasks,
             k_buffer_path,
@@ -1577,6 +1566,8 @@ cleri_grammar_t * compile_siri_grammar_grammar(void)
             k_duration_log,
             k_duration_num,
             k_fifo_files,
+            k_expiration_log,
+            k_expiration_num,
             k_idle_percentage,
             k_idle_time,
             k_ip_support,
index c8e58cb45ea8e3eb27dc9fba1249956e05f1d914..06011e1725f45303095f8e76e3519e1cf5b694ad 100644 (file)
@@ -51,26 +51,15 @@ static void HEARTBEAT_cb(uv_timer_t * handle __attribute__((unused)))
     llist_node_t * siridb_node;
     llist_node_t * server_node;
 
-    uint64_t now_ts;
-    struct timespec now;
-
     log_debug("Start heart-beat task");
 
-    clock_gettime(CLOCK_REALTIME, &now);
-
     siridb_node = siri.siridb_list->first;
 
     while (siridb_node != NULL)
     {
         siridb = (siridb_t *) siridb_node->data;
 
-        siridb->exp_at_num = siridb->expiration_num
-                ? siridb_time_now(siridb, now) - siridb->expiration_num
-                : 0;
-
-        siridb->exp_at_log = siridb->expiration_log
-                ? siridb_time_now(siridb, now) - siridb->expiration_log
-                : 0;
+        siridb_update_shard_expiration(siridb);
 
         if (    siridb_tee_is_configured(siridb->tee) &&
                 !siridb_tee_is_connected(siridb->tee))
index 624540165b8af39e701a1eded09d3a71dff8a321..cfaf958d93ab252c36e5b085ad2818f5dd74b92e 100644 (file)
@@ -262,6 +262,10 @@ static void OPTIMIZE_work(uv_work_t * work  __attribute__((unused)))
     siridb_shard_t * shard;
     uint8_t c = siri.cfg->shard_compression;
     size_t i;
+    uint64_t expi[2];
+    uint64_t dura[2];
+
+
 
     log_info("Start optimize task");
 
@@ -303,6 +307,15 @@ static void OPTIMIZE_work(uv_work_t * work  __attribute__((unused)))
 
         uv_mutex_unlock(&siridb->shards_mutex);
 
+        uv_mutex_lock(&siridb->values_mutex);
+
+        dura[SIRIDB_SHARD_TP_NUMBER] = siridb->duration_num;
+        dura[SIRIDB_SHARD_TP_LOG] = siridb->duration_log;
+        expi[SIRIDB_SHARD_TP_NUMBER] = siridb->exp_at_num;
+        expi[SIRIDB_SHARD_TP_LOG] = siridb->exp_at_log;
+
+        uv_mutex_unlock(&siridb->values_mutex);
+
         if (slshards == NULL)
         {
             log_error("Error creating reference list for shards.");
@@ -316,7 +329,16 @@ static void OPTIMIZE_work(uv_work_t * work  __attribute__((unused)))
         {
             shard = (siridb_shard_t *) slshards->data[j];
 
-            if (!siri_err &&
+            if ((shard->id - shard->id % dura[shard->tp]) + dura[shard->tp] <
+                    expi[shard->tp])
+            {
+                log_info(
+                        "Shard id %" PRIu64 " (%" PRIu8 ") is expired "
+                        "and will be dropped",
+                        shard->id, shard->flags);
+                siridb_shard_drop(shard, siridb);
+            }
+            else if (!siri_err &&
                 optimize.status != SIRI_OPTIMIZE_CANCELLED &&
                 ((shard->flags & SIRIDB_SHARD_NEED_OPTIMIZE) ||
                     ((!(shard->flags & SIRIDB_SHARD_IS_COMPRESSED)) == c)) &&