Work on autoshard
authorJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 10 Sep 2020 14:30:31 +0000 (16:30 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 10 Sep 2020 14:30:31 +0000 (16:30 +0200)
include/siri/db/series.h
include/siri/db/series.inline.h [new file with mode: 0644]
include/siri/version.h
src/siri/db/listener.c
src/siri/db/series.c
src/siri/db/shard.c
src/siri/db/shards.c
src/siri/optimize.c

index d3aa23c92b35e3acf3a7104d4236d25ed0c7e130..0e051aa8796c005699885e093190982045607592 100644 (file)
@@ -65,7 +65,6 @@ struct siridb_series_s
     uint32_t length;
     uint32_t idx_len;
     long int bf_offset;
-    uint64_t interval;
     siridb_points_t * buffer;
     char * name;
     idx_t * idx;
@@ -152,6 +151,4 @@ struct idx_s
     uint64_t end_ts;
 };
 
-
-
 #endif  /* SIRIDB_SERIES_H_ */
diff --git a/include/siri/db/series.inline.h b/include/siri/db/series.inline.h
new file mode 100644 (file)
index 0000000..5361cbf
--- /dev/null
@@ -0,0 +1,7 @@
+#include <siri/db/series.h>
+#include <siri/db/shard.h>
+
+static inline size_t siridb_series_duration(siridb_series_t * series)
+{
+    return series->idx_len ? series->idx->shard->duration : 0;
+}
index 26b90996afe8cc7fe5e0f4021096dad3197226b7..b834c833d55d0b166a6b9673c755e5d62dd74295 100644 (file)
@@ -15,7 +15,7 @@
  * Note that debian alpha packages should use versions like this:
  *   2.0.34-0alpha0
  */
-#define SIRIDB_VERSION_PRE_RELEASE "-alpha-0"
+#define SIRIDB_VERSION_PRE_RELEASE "-alpha-1"
 
 #ifndef NDEBUG
 #define SIRIDB_VERSION_BUILD_RELEASE "+debug"
index 4141cfbb93136f5c7902a071019c93c55f2ec9f0..593860914a2c13b6bdad709f9dfa392b3cb3e03e 100644 (file)
@@ -2417,8 +2417,7 @@ static void exit_count_shards(uv_async_t * handle)
             vshard.shard = (siridb_shard_t *) shards_list->data[i];
 
             /* set start and end properties */
-            duration = (vshard.shard->tp == SIRIDB_SHARD_TP_NUMBER) ?
-                    siridb->duration_num : siridb->duration_log;
+            duration = vshard.shard->duration;
             vshard.start = vshard.shard->id - vshard.shard->id % duration;
             vshard.end = vshard.start + duration;
 
@@ -2481,8 +2480,8 @@ static void exit_count_shards_size(uv_async_t * handle)
         vshard.shard = (siridb_shard_t *) shards_list->data[i];
 
         /* set start and end properties */
-        duration = (vshard.shard->tp == SIRIDB_SHARD_TP_NUMBER) ?
-                siridb->duration_num : siridb->duration_log;
+        duration = vshard.shard->duration;
+
         vshard.start = vshard.shard->id - vshard.shard->id % duration;
         vshard.end = vshard.start + duration;
 
@@ -2950,8 +2949,7 @@ static void exit_drop_shards(uv_async_t * handle)
             vshard.shard = (siridb_shard_t *) q_drop->shards_list->data[i];
 
             /* set start and end properties */
-            duration = (vshard.shard->tp == SIRIDB_SHARD_TP_NUMBER) ?
-                    siridb->duration_num : siridb->duration_log;
+            duration = vshard.shard->duration;
 
             vshard.start = vshard.shard->id - vshard.shard->id % duration;
             vshard.end = vshard.start + duration;
@@ -3510,8 +3508,8 @@ static void exit_list_shards(uv_async_t * handle)
         vshard.shard = (siridb_shard_t *) shards_list->data[i];
 
         /* set start and end properties */
-        duration = (vshard.shard->tp == SIRIDB_SHARD_TP_NUMBER) ?
-                siridb->duration_num : siridb->duration_log;
+        duration = vshard.shard->duration;
+
         vshard.start = vshard.shard->id - vshard.shard->id % duration;
         vshard.end = vshard.start + duration;
 
index c7c6b9372fe211414f78dad1a56617948b7608e7..5dde94765c196d36d33a37d253e68e81e0a20655 100644 (file)
@@ -586,8 +586,8 @@ void siridb_series_remove_shard(
 {
     idx_t *__restrict idx;
     uint_fast32_t i, offset;
-    uint64_t duration = (shard->tp == SIRIDB_SHARD_TP_NUMBER) ?
-                siridb->duration_num : siridb->duration_log;
+    uint64_t start = shard->id - series->mask;
+    uint64_t end = start + shard->duration;
 
     i = offset = 0;
 
@@ -633,8 +633,6 @@ void siridb_series_remove_shard(
             {
                 series->idx = idx;
             }
-            uint64_t start = shard->id - series->mask;
-            uint64_t end = start + duration;
             if (series->start >= start && series->start < end)
             {
                 SERIES_update_start(series);
@@ -1353,7 +1351,6 @@ static siridb_series_t * SERIES_new(
             series->idx_len = 0;
             series->idx = NULL;
             series->siridb = siridb;
-            series->interval = 0;
 
             /* get sum series name to calculate series mask (for sharding) */
             for (n = 0; *name; name++)
index 4b5e1c6de964b628bd48dacc0221f3d489a16231..7a5ff7c6cef57ec24d8f53b1ba2ba43e94f28202 100644 (file)
@@ -1534,7 +1534,7 @@ void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb)
         else for (i = 0; i < vec->len; i++)
         {
             series = (siridb_series_t *) vec->data[i];
-            if (shard->id % siridb->duration_num == series->mask)
+            if (shard->id % shard->duration == series->mask)
             {
                 siridb_series_remove_shard(siridb, series, shard);
                 siridb_series_remove_shard(siridb, series, pop_shard);
@@ -1556,7 +1556,7 @@ void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb)
         else for (i = 0; i < vec->len; i++)
         {
             series = (siridb_series_t *) vec->data[i];
-            if (shard->id % siridb->duration_num == series->mask)
+            if (shard->id % shard->duration == series->mask)
             {
                 siridb_series_remove_shard(siridb, series, shard);
             }
@@ -1738,8 +1738,7 @@ static ssize_t SHARD_apply_idx(
                 (uint64_t) *((uint64_t *) (pt + 12)) :
                 (uint64_t) *((uint32_t *) (pt + 8));
         uint64_t start = shard->id - series->mask;
-        uint64_t end = start + ((shard->tp == SIRIDB_SHARD_TP_NUMBER) ?
-                    siridb->duration_num : siridb->duration_log);
+        uint64_t end = start + shard->duration;
 
         if (start_ts < start || end_ts >= end)
         {
index b62a89f6396b5a84d210f0f90031812e318cdbbf..5fd4ae17f0d121d5d5f665f848c47e8c619adae1 100644 (file)
@@ -17,6 +17,7 @@
 #include <logger/logger.h>
 #include <siri/db/shard.h>
 #include <siri/db/shards.h>
+#include <siri/db/series.inline.h>
 #include <siri/db/misc.h>
 #include <siri/siri.h>
 #include <stdbool.h>
@@ -156,6 +157,7 @@ int siridb_shards_load(siridb_t * siridb)
                 &shard_id,
                 &duration))
         {
+            /* TODO: migration code, for backwards compatibility */
             continue;
         }
 
@@ -194,26 +196,24 @@ int siridb_shards_add_points(
     _Bool is_num = siridb_series_isnum(series);
     siridb_shard_t * shard;
     omap_t * shards;
+    uint64_t duration = siridb_series_duration(series);
 
     uv_mutex_lock(&siridb->values_mutex);
 
-    uint64_t duration = is_num ? siridb->duration_num : siridb->duration_log;
     uint64_t expire_at = is_num ? siridb->exp_at_num : siridb->exp_at_log;
 
-    uv_mutex_unlock(&siridb->values_mutex);
-
-    if (series->interval == 0)
+    if (duration == 0)
     {
-        series->interval = siridb_points_get_interval(points);
+        uint64_t interval = siridb_points_get_interval(points);
 
-        if (series->interval == 0)
-        {
-            /* fall-back to default interval */
-            series->interval = siridb_shard_interval_from_duration(duration);
-        }
+        duration = interval
+            ? siridb_shard_duration_from_interval(siridb, interval)
+            : is_num
+            ? siridb->duration_num
+            : siridb->duration_log;
     }
 
-    duration = siridb_shard_duration_from_interval(siridb, series->interval);
+    uv_mutex_unlock(&siridb->values_mutex);
 
     uint64_t shard_start, shard_end, shard_id;
     uint_fast32_t start, end, num_chunks, pstart, pend;
index cfaf958d93ab252c36e5b085ad2818f5dd74b92e..b9fb1ccc46d6ca1a6d5ff4246ce365b0c889c7ab 100644 (file)
@@ -263,9 +263,6 @@ static void OPTIMIZE_work(uv_work_t * work  __attribute__((unused)))
     uint8_t c = siri.cfg->shard_compression;
     size_t i;
     uint64_t expi[2];
-    uint64_t dura[2];
-
-
 
     log_info("Start optimize task");
 
@@ -309,8 +306,6 @@ static void OPTIMIZE_work(uv_work_t * work  __attribute__((unused)))
 
         uv_mutex_lock(&siridb->values_mutex);
 
-        dura[SIRIDB_SHARD_TP_NUMBER] = siridb->duration_num;
-        dura[SIRIDB_SHARD_TP_LOG] = siridb->duration_log;
         expi[SIRIDB_SHARD_TP_NUMBER] = siridb->exp_at_num;
         expi[SIRIDB_SHARD_TP_LOG] = siridb->exp_at_log;
 
@@ -329,8 +324,7 @@ static void OPTIMIZE_work(uv_work_t * work  __attribute__((unused)))
         {
             shard = (siridb_shard_t *) slshards->data[j];
 
-            if ((shard->id - shard->id % dura[shard->tp]) + dura[shard->tp] <
-                    expi[shard->tp])
+            if ((shard->id - shard->id % shard->duration) + shard->duration < expi[shard->tp])
             {
                 log_info(
                         "Shard id %" PRIu64 " (%" PRIu8 ") is expired "