Work on auto sharding
authorJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 10 Sep 2020 12:45:20 +0000 (14:45 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 10 Sep 2020 12:45:20 +0000 (14:45 +0200)
21 files changed:
Debug/makefile
Debug/sources.mk
Debug/src/omap/subdir.mk [new file with mode: 0644]
Release/makefile
Release/sources.mk
Release/src/omap/subdir.mk [new file with mode: 0644]
include/omap/omap.h [new file with mode: 0644]
include/siri/db/db.h
include/siri/db/points.h
include/siri/db/query.h
include/siri/db/series.h
include/siri/db/shard.h
include/siri/db/shards.h
src/omap/omap.c [new file with mode: 0644]
src/siri/db/db.c
src/siri/db/points.c
src/siri/db/series.c
src/siri/db/shard.c
src/siri/db/shards.c
test/test_siridb/sources
test/test_siridb/test_siridb.c

index c43b8139d85ac0b24de1e3827cb3c71b6cccf890..0a89e84eba1fa401f2acfcedca5f08f8a0900b38 100644 (file)
@@ -31,6 +31,7 @@ RM := rm -rf
 -include src/iso8601/subdir.mk
 -include src/lib/subdir.mk
 -include src/imap/subdir.mk
+-include src/omap/subdir.mk
 -include src/expr/subdir.mk
 -include src/ctree/subdir.mk
 -include src/cfgparser/subdir.mk
index 35930d3fcefa2299821ba8df11a1a4c2b2484783..02a3425b82edf321d3c5fceb05f61abb7adbbd97 100644 (file)
@@ -22,6 +22,7 @@ src/lib \
 src/llist \
 src/lock \
 src/logger \
+src/omap \
 src/owcrypt \
 src/procinfo \
 src/qpack \
diff --git a/Debug/src/omap/subdir.mk b/Debug/src/omap/subdir.mk
new file mode 100644 (file)
index 0000000..8a60ff0
--- /dev/null
@@ -0,0 +1,20 @@
+# Debug build fragment for src/omap; it is pulled into the makefile through
+# the `-include src/omap/subdir.mk` line.
+# Add inputs and outputs from these tool invocations to the build variables
+C_SRCS += \
+../src/omap/omap.c
+
+OBJS += \
+./src/omap/omap.o
+
+C_DEPS += \
+./src/omap/omap.d
+
+
+# Each subdirectory must supply rules for building sources it contributes
+# Debug flags: no optimisation (-O0) with full debug info (-g3); the
+# -MMD -MP -MF options write the omap.d dependency file next to the object.
+src/omap/%.o: ../src/omap/%.c
+       @echo 'Building file: $<'
+       @echo 'Invoking: GCC C Compiler'
+       gcc -I../include -O0 -g3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<"
+       @echo 'Finished building: $<'
+       @echo ' '
+
+
index cb978b7ba3f5cc0fce119f08bfad2fb13bad52f4..497e41988a7761bc338d26317292800a93bb5aa4 100644 (file)
@@ -31,6 +31,7 @@ RM := rm -rf
 -include src/iso8601/subdir.mk
 -include src/lib/subdir.mk
 -include src/imap/subdir.mk
+-include src/omap/subdir.mk
 -include src/expr/subdir.mk
 -include src/ctree/subdir.mk
 -include src/cfgparser/subdir.mk
index 35930d3fcefa2299821ba8df11a1a4c2b2484783..02a3425b82edf321d3c5fceb05f61abb7adbbd97 100644 (file)
@@ -22,6 +22,7 @@ src/lib \
 src/llist \
 src/lock \
 src/logger \
+src/omap \
 src/owcrypt \
 src/procinfo \
 src/qpack \
diff --git a/Release/src/omap/subdir.mk b/Release/src/omap/subdir.mk
new file mode 100644 (file)
index 0000000..cd3db93
--- /dev/null
@@ -0,0 +1,20 @@
+# Release build fragment for src/omap; it is pulled into the makefile through
+# the `-include src/omap/subdir.mk` line.
+# Add inputs and outputs from these tool invocations to the build variables
+C_SRCS += \
+../src/omap/omap.c
+
+OBJS += \
+./src/omap/omap.o
+
+C_DEPS += \
+./src/omap/omap.d
+
+
+# Each subdirectory must supply rules for building sources it contributes
+# Release flags: -O3 and -DNDEBUG, so the assert() calls in omap.c compile to
+# nothing; the -MMD -MP -MF options write the omap.d dependency file.
+src/omap/%.o: ../src/omap/%.c
+       @echo 'Building file: $<'
+       @echo 'Invoking: GCC C Compiler'
+       gcc -DNDEBUG -I../include -O3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<"
+       @echo 'Finished building: $<'
+       @echo ' '
+
+
diff --git a/include/omap/omap.h b/include/omap/omap.h
new file mode 100644 (file)
index 0000000..7b5d5fd
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * omap/omap.h - Map from uint64_t ids to data pointers, stored as a singly
+ *               linked list which is kept ordered by ascending id.
+ */
+#ifndef OMAP_H_
+#define OMAP_H_
+
+#include <inttypes.h>   /* uint64_t */
+#include <stddef.h>     /* size_t */
+
+/* Return codes of omap_add(). */
+enum
+{
+    OMAP_ERR_EXIST  =-2,    /* the id is already in the map */
+    OMAP_ERR_ALLOC  =-1,    /* memory allocation failed */
+    OMAP_SUCCESS    =0
+};
+
+typedef struct omap_s omap_t;
+typedef struct omap__s omap__t;
+typedef struct omap__s * omap_iter_t;
+
+/* Callback invoked by omap_destroy() with the data pointer of each item. */
+typedef void (*omap_destroy_cb)(void * data);
+
+/* private: a single item in the list */
+struct omap__s
+{
+    omap__t * next_;
+    uint64_t id_;
+    void * data_;
+};
+
+/*
+ * Map head. Keep `next_` as the first member so the head starts with the
+ * same member as an item.
+ */
+struct omap_s
+{
+    omap__t * next_;    /* item with the lowest id, NULL when the map is empty */
+    size_t n;           /* number of items in the map */
+};
+
+/* Returns a new, empty map or NULL when the allocation has failed. */
+omap_t * omap_create(void);
+
+/*
+ * Frees the map and all items; `omap` may be NULL. When `cb` is not NULL it
+ * is called with the data pointer of each item.
+ */
+void omap_destroy(omap_t * omap, omap_destroy_cb cb);
+
+/*
+ * Inserts `data` for `id`. Returns OMAP_SUCCESS, OMAP_ERR_EXIST when the id
+ * is already present (the existing data is kept) or OMAP_ERR_ALLOC.
+ */
+int omap_add(omap_t * omap, uint64_t id, void * data);
+
+/*
+ * Inserts or overwrites the data for `id`. Returns the previous data when the
+ * id was present, `data` when a new item is inserted or NULL when the
+ * allocation of a new item has failed.
+ */
+void * omap_set(omap_t * omap, uint64_t id, void * data);
+
+/* Returns the data stored for `id` or NULL when the id is not found. */
+void * omap_get(omap_t * omap, uint64_t id);
+
+/*
+ * NOTE(review): src/omap/omap.c does not define this function, so calling it
+ * results in a link error; confirm the intended contract before use.
+ */
+uint64_t * omap_last_id(omap_t * omap);
+
+/* Removes `id` and returns its data, or NULL when the id is not found. */
+void * omap_rm(omap_t * omap, uint64_t id);
+
+/*
+ * Loop header which visits the items in ascending id order; intended to be
+ * used as:
+ *
+ *      omap_iter_t iter = omap_iter(omap);
+ *      for (omap_each(iter, my_type_t, item)) { ... }
+ *
+ * `item` is declared by the macro as `my_type_t *`. The loop stops at the end
+ * of the list and also when an item has a NULL data pointer.
+ */
+#define omap_each(iter__, dt__, var__) \
+        dt__ * var__; \
+        (iter__) && \
+        (var__ = (dt__ *) (iter__)->data_); \
+        (iter__) = (iter__)->next_
+
+/* Returns an iterator positioned at the first item (NULL when empty). */
+static inline omap_iter_t omap_iter(omap_t * omap)
+{
+    return omap->next_;
+}
+
+/* Returns the id of the item the iterator points to; iter must not be NULL. */
+static inline uint64_t omap_iter_id(omap_iter_t iter)
+{
+    return iter->id_;
+}
+
+
+#endif  /* OMAP_H_ */
index bfdea944e996bab40da51f27c8b9b3c42cbdc6cb..be216938ad77540a65e377ec5953b5671b3e2d93 100644 (file)
@@ -96,7 +96,7 @@ struct siridb_s
     uv_mutex_t series_mutex;
     uv_mutex_t shards_mutex;
     uv_mutex_t values_mutex;
-    imap_t * shards;
+    imap_t * shards;                /* contains lists with shards */
     FILE * dropped_fp;
     qp_fpacker_t * store;
     siridb_fifo_t * fifo;
index 19172635499e8088e816e77ce39613e0d7dcf478..bee1fdd33bf5acb0dd410d84ed0cc34bcf5f3e25 100644 (file)
@@ -86,6 +86,7 @@ int siridb_points_unzip_string_raw(
         uint8_t * bits,
         uint16_t len);
 size_t siridb_points_get_size_zipped(uint16_t cinfo, uint16_t len);
+uint64_t siridb_points_get_interval(siridb_points_t * points);
 
 #define siridb_points_zip(p__, s__, e__, c__, z__) \
 ((p__)->tp == TP_INT) ? \
index fbe2ef761dd1ca714001d369c5d5275d3b32f648..d522b1a04c59a1b6398f2c599929dd0071fb3d84 100644 (file)
@@ -82,7 +82,7 @@ struct siridb_query_s
     qp_packer_t * timeit;
     cleri_parse_t * pr;
     siridb_nodes_t * nodes;
-    struct timespec start;
+    struct timespec start;SIRIDB_IS64BIT
 };
 
 #endif  /* SIRIDB_QUERY_H_ */
index 96f2d8f950cfc2a16d756e46839c5b10ccec24f2..d3aa23c92b35e3acf3a7104d4236d25ed0c7e130 100644 (file)
@@ -65,11 +65,13 @@ struct siridb_series_s
     uint32_t length;
     uint32_t idx_len;
     long int bf_offset;
+    uint64_t interval;
     siridb_points_t * buffer;
     char * name;
     idx_t * idx;
     siridb_t * siridb;
 };
+
 #include <siri/db/shard.h>
 
 int siridb_series_load(siridb_t * siridb);
index 85bacfa559e6cb24a4a9b8587e26b1d1713b3983..4d6813abeab14c915f0f1543a63fb284f735c249 100644 (file)
@@ -37,18 +37,22 @@ typedef struct siridb_shard_view_s siridb_shard_view_t;
 #include <siri/db/points.h>
 #include <siri/db/series.h>
 #include <siri/file/handler.h>
+#include <omap/omap.h>
 
 siridb_shard_t * siridb_shard_create(
         siridb_t * siridb,
+        omap_t * shards,
         uint64_t id,
         uint64_t duration,
         uint8_t tp,
         siridb_shard_t * replacing);
+uint64_t siridb_shard_duration_from_interval(siridb_t * siridb, uint64_t interval);
+uint64_t siridb_shard_interval_from_duration(uint64_t duration);
 int siridb_shard_cexpr_cb(
         siridb_shard_view_t * vshard,
         cexpr_condition_t * cond);
 int siridb_shard_status(char * str, siridb_shard_t * shard);
-int siridb_shard_load(siridb_t * siridb, uint64_t id);
+int siridb_shard_load(siridb_t * siridb, uint64_t id, uint64_t duration);
 void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb);
 size_t siridb_shard_write_points(
         siridb_t * siridb,
@@ -113,13 +117,14 @@ struct siridb_shard_flags_repr_s
 
 struct siridb_shard_s
 {
-    uint32_t ref;   /* keep ref on top */
-    uint8_t tp; /* TP_NUMBER, TP_LOG */
+    uint32_t ref;       /* keep ref on top */
+    uint8_t tp;         /* TP_NUMBER, TP_LOG */
     uint8_t flags;
     uint16_t max_chunk_sz;
     uint64_t id;
-    size_t len;
-    size_t size;
+    size_t len;         /* size of the shard which is used */
+    size_t size;        /* size of shard on disk */
+    size_t duration;    /* based on the interval of series */
     siri_fp_t * fp;
     char * fn;
     siridb_shard_t * replacing;
index 773bdbfb5411abf1226da91f2ea2d2d06391cc53..ed7c64c32de8764594807ce04bdc00a7cf4e23cf 100644 (file)
@@ -18,7 +18,9 @@
 #define SIRIDB_SHARDS_PATH "shards/"
 
 #include <siri/db/db.h>
+#include <omap/omap.h>
 
+void siridb_shards_destroy_cb(omap_t * shards);
 int siridb_shards_load(siridb_t * siridb);
 int siridb_shards_add_points(
         siridb_t * siridb,
diff --git a/src/omap/omap.c b/src/omap/omap.c
new file mode 100644 (file)
index 0000000..999442f
--- /dev/null
@@ -0,0 +1,151 @@
+/*
+ * omap/omap.c
+ */
+#include <assert.h>
+#include <stdlib.h>
+#include <omap/omap.h>
+
+static void * omap__rm(omap_t * omap, omap__t ** omap_);
+static omap__t * omap__new(uint64_t id, void * data, omap__t * next);
+
+/*
+ * Returns a new, empty map or NULL when the allocation has failed.
+ */
+omap_t * omap_create(void)
+{
+    omap_t * map = malloc(sizeof(omap_t));
+
+    if (map != NULL)
+    {
+        /* start without items */
+        map->n = 0;
+        map->next_ = NULL;
+    }
+
+    return map;
+}
+
+/*
+ * Frees the map and all of its items; nothing happens when `omap` is NULL.
+ * When `cb` is not NULL it is called with the data pointer of each item
+ * before the item itself is freed.
+ */
+void omap_destroy(omap_t * omap, omap_destroy_cb cb)
+{
+    omap__t * node, * next;
+
+    if (!omap)
+    {
+        return;
+    }
+
+    /* walk the items directly instead of treating the head as an item */
+    for (node = omap->next_; node != NULL; node = next)
+    {
+        next = node->next_;     /* read before the node is released */
+        if (cb)
+        {
+            (*cb)(node->data_);
+        }
+        free(node);
+    }
+
+    free(omap);
+}
+
+/*
+ * Inserts data for the given id while keeping the list ordered by id.
+ *
+ * In case of a duplicate id the return value is OMAP_ERR_EXIST and data
+ * will NOT be overwritten. On success the return value is OMAP_SUCCESS and
+ * if a memory error has occurred the return value is OMAP_ERR_ALLOC.
+ */
+int omap_add(omap_t * omap, uint64_t id, void * data)
+{
+    omap__t ** link;
+    omap__t * node;
+
+    assert (omap);
+    assert (data);
+
+    /*
+     * Find the link which points to the first item with an id which is not
+     * lower than the requested id; this avoids casting the head to an item.
+     */
+    for (   link = &omap->next_;
+            *link && (*link)->id_ < id;
+            link = &(*link)->next_);
+
+    if (*link && (*link)->id_ == id)
+    {
+        return OMAP_ERR_EXIST;
+    }
+
+    node = omap__new(id, data, *link);
+    if (!node)
+    {
+        return OMAP_ERR_ALLOC;
+    }
+
+    omap->n++;
+    *link = node;
+
+    return OMAP_SUCCESS;
+}
+
+/*
+ * Inserts or overwrites data for the given id while keeping the list ordered
+ * by id.
+ *
+ * In case of a duplicate id the return value is the previous value and data
+ * will be overwritten. On success the return value is equal to void*data and
+ * if a memory error has occurred the return value is NULL.
+ */
+void * omap_set(omap_t * omap, uint64_t id, void * data)
+{
+    omap__t ** link;
+    omap__t * node;
+
+    assert (omap);
+    assert (data);
+
+    /*
+     * Find the link which points to the first item with an id which is not
+     * lower than the requested id; this avoids casting the head to an item.
+     */
+    for (   link = &omap->next_;
+            *link && (*link)->id_ < id;
+            link = &(*link)->next_);
+
+    if (*link && (*link)->id_ == id)
+    {
+        /* existing id: swap the data and hand back the old value */
+        void * prev = (*link)->data_;
+        (*link)->data_ = data;
+        return prev;
+    }
+
+    node = omap__new(id, data, *link);
+    if (!node)
+    {
+        return NULL;
+    }
+
+    omap->n++;
+    *link = node;
+
+    return data;
+}
+
+/*
+ * Returns the data stored for the given id or NULL when the id is not found.
+ */
+void * omap_get(omap_t * omap, uint64_t id)
+{
+    omap__t * node;
+
+    /* the list is ordered, so stop at the first id which is not lower */
+    for (node = omap->next_; node && node->id_ < id; node = node->next_);
+
+    return (node && node->id_ == id) ? node->data_ : NULL;
+}
+
+/*
+ * Removes the item with the given id and returns its data, or returns NULL
+ * when the id is not found.
+ */
+void * omap_rm(omap_t * omap, uint64_t id)
+{
+    omap__t ** link = &omap->next_;
+
+    /* advance to the link which points to the first id which is not lower */
+    while (*link && (*link)->id_ < id)
+    {
+        link = &(*link)->next_;
+    }
+
+    return (*link && (*link)->id_ == id) ? omap__rm(omap, link) : NULL;
+}
+
+/*
+ * Unlinks and frees the item `*omap_` points to, decrements the item counter
+ * and returns the data of the removed item. `*omap_` must not be NULL.
+ */
+static void * omap__rm(omap_t * omap, omap__t ** omap_)
+{
+    omap__t * node = *omap_;
+    void * payload = node->data_;
+
+    /* let the previous link skip the removed item */
+    *omap_ = node->next_;
+    omap->n--;
+
+    free(node);
+    return payload;
+}
+
+/*
+ * Allocates an item for the given id and data which is followed by `next`.
+ * Returns NULL when the allocation has failed.
+ */
+static omap__t * omap__new(uint64_t id, void * data, omap__t * next)
+{
+    omap__t * node = malloc(sizeof(omap__t));
+
+    if (node != NULL)
+    {
+        node->next_ = next;
+        node->data_ = data;
+        node->id_ = id;
+    }
+
+    return node;
+}
index 24256b4687e221cf39acbbced073716ffeeda4f2..152cd12d74300458957d4041e713183961eb368d 100644 (file)
@@ -684,6 +684,7 @@ int siridb_save(siridb_t * siridb)
             qp_close(fpacker));
 }
 
+
 /*
  * Destroy SiriDB object.
  *
@@ -764,7 +765,7 @@ void siridb__free(siridb_t * siridb)
     /* free shards using imap walk an free the imap */
     if (siridb->shards != NULL)
     {
-        imap_free(siridb->shards, (imap_free_cb) &siridb__shard_decref);
+        imap_free(siridb->shards, (imap_free_cb) &siridb_shards_destroy_cb);
     }
 
     if (siridb->groups != NULL)
index 18788724ca0f4ba5872ea33f755e65df9d76a2b0..e672a776fb8991d9c5b1eabdb5117922775c0db9 100644 (file)
@@ -15,6 +15,7 @@
 #define POINTS_MAX_QSORT 250000
 #define RAW_VALUES_THRESHOLD 7
 #define DICT_SZ 0x3fff
+#define TOLERANCE_INTERVAL_DETECT 8
 
 static unsigned char * POINTS_zip_raw(
         siridb_points_t * points,
@@ -1686,6 +1687,47 @@ static int POINTS_set_cinfo_size(uint16_t * cinfo, size_t * size)
     return 0;
 }
 
+/*
+ * Tries to detect a regular interval between the time-stamps of points.
+ *
+ * The differences between successive time-stamps of (at most) the first 64
+ * points are collected in sorted order. The median difference is returned
+ * when the values around the first and third quarter of the sorted
+ * differences are within a tolerance of `median / (100 /
+ * TOLERANCE_INTERVAL_DETECT)` of the median.
+ *
+ * Returns 0 when fewer than 8 points are available or when no regular
+ * interval is detected.
+ *
+ * NOTE(review): assumes the points are ordered by ascending time-stamp; the
+ * unsigned subtraction wraps around otherwise.
+ */
+uint64_t siridb_points_get_interval(siridb_points_t * points)
+{
+    enum { MIN_DIFFS = 7, MAX_DIFFS = 63 };
+    uint64_t diffs[MAX_DIFFS];  /* small and bounded, no heap required */
+    uint64_t x, a, b, c;
+    size_t i, j, n;
+
+    /* test before subtracting since `len - 1` wraps around for len == 0 */
+    if (points->len <= MIN_DIFFS)
+    {
+        return 0;
+    }
+
+    n = points->len - 1;
+    if (n > MAX_DIFFS)
+    {
+        n = MAX_DIFFS;
+    }
+
+    /* insertion sort of the differences between successive time-stamps */
+    for (i = 0; i < n; ++i)
+    {
+        x = points->data[i+1].ts - points->data[i].ts;
+        for (j = i; j > 0 && diffs[j-1] > x; --j)
+        {
+            diffs[j] = diffs[j-1];
+        }
+        diffs[j] = x;
+    }
+
+    a = n/4;
+    b = n/2;
+    c = diffs[(b<<1)-a];        /* mirrors index `a` around the median */
+    a = diffs[a];
+    b = diffs[b];               /* median difference */
+    x = b / (100 / TOLERANCE_INTERVAL_DETECT);
+
+    /* too much spread below or above the median means: no fixed interval */
+    return (a+x < b || c-x > b) ? 0 : b;
+}
+
 inline static uint16_t POINTS_hash(uint32_t h)
 {
     return ((h >> 17) ^ (h & 0xffff)) & DICT_SZ;
index 1510b23772480a32c2afe15b889c84703d36ad87..c7c6b9372fe211414f78dad1a56617948b7608e7 100644 (file)
@@ -1039,9 +1039,7 @@ int siridb_series_optimize_shard(
     siridb_points_t *__restrict points;
     int rc;
     uint16_t cinfo = 0;
-    uint64_t duration = (shard->tp == SIRIDB_SHARD_TP_NUMBER) ?
-                siridb->duration_num : siridb->duration_log;
-    max_ts = (shard->id + duration) - series->mask;
+    max_ts = (shard->id + shard->duration) - series->mask;
 
     rc = new_idx = end = i = size = start = 0;
 
@@ -1355,6 +1353,7 @@ static siridb_series_t * SERIES_new(
             series->idx_len = 0;
             series->idx = NULL;
             series->siridb = siridb;
+            series->interval = 0;
 
             /* get sum series name to calculate series mask (for sharding) */
             for (n = 0; *name; name++)
index 1cfa187847e202cc37ed57f47c1daeff89611a3f..4b5e1c6de964b628bd48dacc0221f3d489a16231 100644 (file)
@@ -32,6 +32,9 @@
 /* shard schema (schemas below 20 are reserved for Python SiriDB) */
 #define SIRIDB_SHARD_SHEMA 21
 
+/* optimal points in a single shard */
+#define OPTIMAL_POINTS_PER_SHARD 2000
+
 /*
  * Header schema layout
  *
@@ -130,11 +133,51 @@ static size_t SHARD_write_header(
         FILE * fp);
 static int SHARD_remove(siridb_shard_t * shard);
 
+/*
+ * Returns a shard duration for series with the given interval.
+ *
+ * The target is `interval * OPTIMAL_POINTS_PER_SHARD`. When the target is
+ * equal to one of the configured durations, that duration is returned.
+ * Otherwise the number of whole weeks in the target plus one week is
+ * returned; when the target is less than a week the same is done using days
+ * and finally using hours.
+ *
+ * NOTE(review): `siridb->time->factor` presumably converts seconds to the
+ * time precision of the database; confirm against the time module.
+ */
+uint64_t siridb_shard_duration_from_interval(siridb_t * siridb, uint64_t interval)
+{
+    uint64_t unit, count;
+    const uint64_t target = interval * OPTIMAL_POINTS_PER_SHARD;
+
+    if (target == siridb->duration_num)
+    {
+        return siridb->duration_num;
+    }
+
+    if (target == siridb->duration_log)
+    {
+        return siridb->duration_log;
+    }
+
+    /* weeks */
+    unit = 3600*24*7*siridb->time->factor;
+    count = target / unit;
+
+    if (count == 0)
+    {
+        /* less than a week, use days */
+        unit = 3600*24*siridb->time->factor;
+        count = target / unit;
+    }
+
+    if (count == 0)
+    {
+        /* less than a day, use hours */
+        unit = 3600*siridb->time->factor;
+        count = target / unit;
+    }
+
+    return (count + 1) * unit;
+}
+
+/*
+ * Returns the interval for which `duration` holds OPTIMAL_POINTS_PER_SHARD
+ * points (integer division, the remainder is dropped).
+ */
+uint64_t siridb_shard_interval_from_duration(uint64_t duration)
+{
+    const uint64_t interval = duration / OPTIMAL_POINTS_PER_SHARD;
+
+    return interval;
+}
+
 /*
  * Returns 0 if successful or -1 in case of an error.
  * When an error occurs, a SIGNAL can be raised in some cases but not for sure.
  */
-int siridb_shard_load(siridb_t * siridb, uint64_t id)
+int siridb_shard_load(siridb_t * siridb, uint64_t id, uint64_t duration)
 {
     int is_ts64;
     FILE * fp;
@@ -152,10 +195,13 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id)
         free(shard);
         return -1;  /* signal is raised */
     }
+
     shard->id = id;
     shard->ref = 1;
     shard->len = HEADER_SIZE;
     shard->replacing = NULL;
+    shard->duration = duration;
+
     if (SHARD_init_fn(siridb, shard) < 0)
     {
         ERR_ALLOC
@@ -163,7 +209,6 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id)
         return -1;  /* signal is raised */
     }
 
-
     log_info("Loading shard %" PRIu64, id);
 
     if ((fp = fopen(shard->fn, "r")) == NULL)
@@ -290,12 +335,15 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id)
  */
 siridb_shard_t *  siridb_shard_create(
         siridb_t * siridb,
+        omap_t * shards,
         uint64_t id,
         uint64_t duration,
         uint8_t tp,
         siridb_shard_t * replacing)
 {
     siridb_shard_t * shard = malloc(sizeof(siridb_shard_t));
+    FILE * fp;
+
     if (shard == NULL)
     {
         ERR_ALLOC
@@ -311,12 +359,12 @@ siridb_shard_t *  siridb_shard_create(
     shard->tp = tp;
     shard->replacing = replacing;
     shard->len = shard->size = HEADER_SIZE;
+    shard->duration = duration;
     shard->max_chunk_sz = (replacing == NULL) ?
             (tp == SIRIDB_SHARD_TP_NUMBER ?
                     DEFAULT_MAX_CHUNK_SZ_NUM : DEFAULT_MAX_CHUNK_SZ_LOG) :
                     replacing->max_chunk_sz;
 
-    FILE * fp;
     if (SHARD_init_fn(siridb, shard) < 0)
     {
         siridb_shard_decref(shard);
@@ -376,7 +424,7 @@ siridb_shard_t *  siridb_shard_create(
         return NULL;
     }
 
-    if (imap_set(siridb->shards, id, shard) == -1)
+    if (omap_set(shards, duration, shard) == NULL)
     {
         siridb_shard_decref(shard);
         ERR_ALLOC
@@ -579,6 +627,7 @@ size_t siridb_shard_write_points(
     {
         size_t p = 0;
         size_t ts_sz = siridb->time->ts_sz;
+
         cdata = malloc(dsize);
         if (cdata == NULL)
         {
@@ -1201,8 +1250,6 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
 {
     int rc = 0;
     siridb_shard_t * new_shard = NULL;
-    uint64_t duration = (shard->tp == SIRIDB_SHARD_TP_NUMBER) ?
-            siridb->duration_num : siridb->duration_log;
     siridb_series_t * series;
     size_t i;
 
@@ -1214,10 +1261,14 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
      */
     if (~shard->flags & SIRIDB_SHARD_IS_REMOVED)
     {
+        omap_t * shards = imap_get(siridb->shards, shard->id);
+        assert (shards);
+
         if ((new_shard = siridb_shard_create(
             siridb,
+            shards,
             shard->id,
-            duration,
+            shard->duration,
             shard->tp,
             shard)) == NULL)
         {
@@ -1289,7 +1340,7 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
 
         if (    !siri_err &&
                 siri.optimize->status != SIRI_OPTIMIZE_CANCELLED &&
-                shard->id % duration == series->mask &&
+                shard->id % shard->duration == series->mask &&
                 (~series->flags & SIRIDB_SERIES_IS_DROPPED) &&
                 (~new_shard->flags & SIRIDB_SHARD_IS_REMOVED))
         {
@@ -1874,12 +1925,12 @@ static inline int SHARD_init_fn(siridb_t * siridb, siridb_shard_t * shard)
 {
      return asprintf(
              &shard->fn,
-             "%s%s%s%" PRIu64 "%s",
+             "%s%s%s%016"PRIX64"_%016"PRIX64".sdb",
              siridb->dbpath,
              SIRIDB_SHARDS_PATH,
              (shard->replacing == NULL) ? "" : "__",
              shard->id,
-             ".sdb");
+             shard->duration);
 }
 
 /*
index 5eb1c95a918f158f38da8f64e7e17b32e77273ff..b62a89f6396b5a84d210f0f90031812e318cdbbf 100644 (file)
 #include <unistd.h>
 #include <siri/db/db.h>
 #include <xpath/xpath.h>
+#include <omap/omap.h>
 
-#define SIRIDB_MAX_SHARD_FN_LEN 23
+/* 16 hex digits (id) + '_' + 16 hex digits (duration) + 4 (extension) */
+#define SIRIDB_SHARD_LEN 37
+
+
+/*
+ * Reads exactly 16 hexadecimal digits from str into *value.
+ * Returns false (and leaves *value untouched) when a character is not a
+ * hexadecimal digit; parsing stops at that character so a short string is
+ * never read beyond its terminator.
+ */
+static bool SHARDS_read_hex64(const char * str, uint64_t * value)
+{
+    uint64_t v = 0;
+    int i;
+
+    for (i = 0; i < 16; ++i)
+    {
+        const char c = str[i];
+        unsigned int digit;
+
+        if (c >= '0' && c <= '9')
+        {
+            digit = (unsigned int) (c - '0');
+        }
+        else if (c >= 'A' && c <= 'F')
+        {
+            digit = (unsigned int) (c - 'A') + 10;
+        }
+        else if (c >= 'a' && c <= 'f')
+        {
+            digit = (unsigned int) (c - 'a') + 10;
+        }
+        else
+        {
+            return false;
+        }
+
+        v = (v << 4) | digit;
+    }
+
+    *value = v;
+    return true;
+}
+
+/*
+ * Parses a shard file name of the form
+ *
+ *      <16 hex digits shard id>_<16 hex digits duration><ext>
+ *
+ * Argument ext should be the expected extension, for example ".sdb".
+ *
+ * Returns true and sets *shard_id and *duration when fn matches, false if
+ * not. The output arguments must not be used when false is returned.
+ */
+static bool SHARDS_read_id_and_duration(
+        char * fn,
+        const char * ext,
+        uint64_t * shard_id,
+        uint64_t * duration)
+{
+    if (strlen(fn) != SIRIDB_SHARD_LEN)
+    {
+        return false;
+    }
+
+    if (!SHARDS_read_hex64(fn, shard_id))
+    {
+        return false;
+    }
+    fn += 16;
+
+    if (*fn != '_')
+    {
+        return false;
+    }
+    ++fn;   /* skip the separator, otherwise the duration cannot be parsed */
+
+    if (!SHARDS_read_hex64(fn, duration))
+    {
+        return false;
+    }
+    fn += 16;
+
+    return strcmp(fn, ext) == 0;
+}
+
+/*
+ * Returns true if fn is a temp shard or index filename, false if not.
+ *
+ * A temp filename is a shard (".sdb") or index (".idx") filename which is
+ * prefixed with two underscores.
+ */
+static bool SHARDS_is_temp_fn(char * fn)
+{
+    uint64_t shard_id, duration;
+
+    /* the second character is only inspected when the first one matches */
+    if (fn[0] != '_' || fn[1] != '_')
+    {
+        return false;
+    }
+
+    fn += 2;    /* continue after the "__" prefix */
+
+    if (SHARDS_read_id_and_duration(fn, ".sdb", &shard_id, &duration))
+    {
+        return true;
+    }
+
+    return SHARDS_read_id_and_duration(fn, ".idx", &shard_id, &duration);
+}
 
-static bool is_shard_fn(const char * fn, const char * ext);
-static bool is_temp_fn(const char * fn);
 
 /*
  * Returns 0 if successful or -1 in case of an error.
@@ -41,6 +98,7 @@ int siridb_shards_load(siridb_t * siridb)
     struct dirent ** shard_list;
     char buffer[XPATH_MAX];
     int n, total, rc = 0;
+    uint64_t shard_id, duration;
 
     memset(&st, 0, sizeof(struct stat));
 
@@ -48,7 +106,7 @@ int siridb_shards_load(siridb_t * siridb)
 
     siridb_misc_get_fn(path, siridb->dbpath, SIRIDB_SHARDS_PATH);
 
-    if (strlen(path) >= XPATH_MAX - SIRIDB_MAX_SHARD_FN_LEN - 1)
+    if (strlen(path) >= XPATH_MAX - SIRIDB_SHARD_LEN - 1)
     {
         log_error("Shard path too long: '%s'", path);
         return -1;
@@ -77,7 +135,7 @@ int siridb_shards_load(siridb_t * siridb)
 
     for (n = 0; n < total; n++)
     {
-        if (is_temp_fn(shard_list[n]->d_name))
+        if (SHARDS_is_temp_fn(shard_list[n]->d_name))
         {
             snprintf(buffer, XPATH_MAX, "%s%s",
                    path, shard_list[n]->d_name);
@@ -92,13 +150,17 @@ int siridb_shards_load(siridb_t * siridb)
             }
         }
 
-        if (!is_shard_fn(shard_list[n]->d_name, ".sdb"))
+        if (!SHARDS_read_id_and_duration(
+                shard_list[n]->d_name,
+                ".sdb",
+                &shard_id,
+                &duration))
         {
             continue;
         }
 
         /* we are sure this fits since the filename is checked */
-        if (siridb_shard_load(siridb, (uint64_t) atoll(shard_list[n]->d_name)))
+        if (siridb_shard_load(siridb, shard_id, duration))
         {
            log_error("Error while loading shard: '%s'", shard_list[n]->d_name);
            rc = -1;
@@ -115,6 +177,11 @@ int siridb_shards_load(siridb_t * siridb)
     return rc;
 }
 
+/*
+ * Destroys a map with shards; siridb__shard_decref is called for each shard
+ * in the map. (used as free callback for the siridb->shards imap)
+ */
+void siridb_shards_destroy_cb(omap_t * shards)
+{
+    omap_destroy_cb decref = (omap_destroy_cb) &siridb__shard_decref;
+
+    omap_destroy(shards, decref);
+}
+
 /*
  * Returns siri_err which is 0 if successful or a negative integer in case
  * of an error. (a SIGNAL is also raised in case of an error)
@@ -126,6 +193,7 @@ int siridb_shards_add_points(
 {
     _Bool is_num = siridb_series_isnum(series);
     siridb_shard_t * shard;
+    omap_t * shards;
 
     uv_mutex_lock(&siridb->values_mutex);
 
@@ -134,6 +202,19 @@ int siridb_shards_add_points(
 
     uv_mutex_unlock(&siridb->values_mutex);
 
+    if (series->interval == 0)
+    {
+        series->interval = siridb_points_get_interval(points);
+
+        if (series->interval == 0)
+        {
+            /* fall-back to default interval */
+            series->interval = siridb_shard_interval_from_duration(duration);
+        }
+    }
+
+    duration = siridb_shard_duration_from_interval(siridb, series->interval);
+
     uint64_t shard_start, shard_end, shard_id;
     uint_fast32_t start, end, num_chunks, pstart, pend;
     uint16_t chunk_sz;
@@ -157,10 +238,28 @@ int siridb_shards_add_points(
             continue;
         }
 
-        if ((shard = imap_get(siridb->shards, shard_id)) == NULL)
+        shard = NULL;
+        shards = imap_get(siridb->shards, shard_id);
+        if (shards != NULL)
+        {
+            shard = omap_get(shards, duration);
+            /* shard may be NULL if no shard according the duration is found */
+        }
+        else
+        {
+            shards = omap_create();
+            if (shards == NULL || imap_add(siridb->shards, shard_id, shards))
+            {
+                ERR_ALLOC
+                return -1;  /* might leak a few bytes */
+            }
+        }
+
+        if (shard == NULL)
         {
             shard = siridb_shard_create(
                     siridb,
+                    shards,
                     shard_id,
                     duration,
                     is_num ? SIRIDB_SHARD_TP_NUMBER : SIRIDB_SHARD_TP_LOG,
@@ -197,8 +296,8 @@ int siridb_shards_add_points(
                         &cinfo)) == 0)
                 {
                     log_critical(
-                            "Could not write points to shard id %" PRIu64,
-                            shard->id);
+                            "Could not write points to shard '%s'",
+                            shard->fn);
                 }
                 else
                 {
@@ -274,39 +373,3 @@ double siridb_shards_count_percent(
     vec_free(shards_list);
     return percent;
 }
-
-/*
- * Returns true if fn is a shard filename, false if not.
- * Argument ext should be either ".sdb" or ".idx".
- */
-static bool is_shard_fn(const char * fn, const char * ext)
-{
-    if (!isdigit(*fn) || strlen(fn) > SIRIDB_MAX_SHARD_FN_LEN)
-    {
-        return false;
-    }
-
-    fn++;
-    while (*fn && isdigit(*fn))
-    {
-        fn++;
-    }
-
-    return (strcmp(fn, ext) == 0);
-}
-
-/*
- * Returns true if fn is a temp shard or index filename, false if not.
- */
-static bool is_temp_fn(const char * fn)
-{
-    int i;
-    for (i = 0; i < 2; i++, fn++)
-    {
-        if (*fn != '_')
-        {
-            return false;
-        }
-    }
-    return is_shard_fn(fn, ".sdb") || is_shard_fn(fn, ".idx");
-}
index f638f65393439f6548795259d6815c6b35afe480..7f4427a4d193e5c2ff8fb07dc154af652f06024d 100644 (file)
@@ -6,6 +6,7 @@
 ../src/qpack/qpack.c
 ../src/qpjson/qpjson.c
 ../src/imap/imap.c
+../src/omap/omap.c
 ../src/llist/llist.c
 ../src/logger/logger.c
 ../src/xstr/xstr.c
index 40771c770efab52585824a639c11f0eda05ef87e..d1161f40530f76f30f9794947503019272475fc2 100644 (file)
@@ -1,6 +1,7 @@
 #include "../test.h"
 #include <locale.h>
 #include <siri/db/series.h>
+#include <siri/db/shard.h>
 
 
 static int test_series_ensure_type(void)
@@ -107,6 +108,45 @@ static int test_series_ensure_type(void)
         _assert (strlen("-1") == qp_obj.len);
         _assert (strncmp("-1", qp_obj.via.str, qp_obj.len) == 0);
     }
+
+    /*
+     * test interval: a duration converted to an interval and back is
+     * expected to result in the original duration, for durations of whole
+     * weeks, days and hours
+     */
+    {
+        uint64_t interval, duration, test, w, d, h;
+        siridb_t siridb;
+        siridb.duration_num = 1000 * 3600 * 24 * 8;
+        siridb.duration_log = 1000 * 3600 * 41;
+        siridb.time = malloc(sizeof(siridb_time_t));
+        _assert (siridb.time != NULL);
+        siridb.time->factor = 1000;
+
+        for (w = 1; w < 8; ++w)
+        {
+            duration = 3600 * 24 * 7 * w * siridb.time->factor;
+            /* the conversion from duration takes the duration only */
+            interval = siridb_shard_interval_from_duration(duration);
+            test = siridb_shard_duration_from_interval(&siridb, interval);
+            printf("%" PRIu64 ": %" PRIu64 " (%" PRIu64 ") %" PRIu64 "\n",
+                    w, duration, interval, test);
+            _assert (duration == test);
+        }
+
+        for (d = 1; d < 8; ++d)
+        {
+            duration = 3600 * 24 * d * siridb.time->factor;
+            interval = siridb_shard_interval_from_duration(duration);
+            test = siridb_shard_duration_from_interval(&siridb, interval);
+            printf("%" PRIu64 ": %" PRIu64 " (%" PRIu64 ") %" PRIu64 "\n",
+                    d, duration, interval, test);
+            _assert (duration == test);
+        }
+
+        /* increment h (not d), the loop does not terminate otherwise */
+        for (h = 1; h < 25; ++h)
+        {
+            duration = 3600 * h * siridb.time->factor;
+            interval = siridb_shard_interval_from_duration(duration);
+            test = siridb_shard_duration_from_interval(&siridb, interval);
+            printf("%" PRIu64 ": %" PRIu64 " (%" PRIu64 ") %" PRIu64 "\n",
+                    h, duration, interval, test);
+            _assert (duration == test);
+        }
+
+        free(siridb.time);
+    }
     (void) setlocale(LC_ALL, NULL);
     return test_end();
 };