Fix check shard size and add configurable auto sharding
authorJeroen van der Heijden <jeroen@transceptor.technology>
Tue, 15 Sep 2020 12:02:25 +0000 (14:02 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Tue, 15 Sep 2020 12:02:25 +0000 (14:02 +0200)
include/siri/cfg/cfg.h
itest/test_auto_duration.py
itest/testing/server.py
main.c
siridb.conf
src/siri/cfg/cfg.c
src/siri/db/shard.c
src/siri/db/shards.c
src/siri/evars.c

index 49d197c9fa3fe970d8489d0c506c669384e95453..725cbf850f535a5b8fb39dfc3eeb0acc088e7c23 100644 (file)
@@ -38,7 +38,7 @@ struct siri_cfg_s
     uint8_t pipe_support;
     uint8_t ip_support;
     uint8_t shard_compression;
-    uint8_t pad1_;
+    uint8_t shard_auto_duration;
 
     char * bind_client_addr;
     char * bind_backend_addr;
index a6c88aa9281fb8b598ec4d418167e4ab6ef810b6..fe9d04e50e3ee7e28bf32d88e1a78bf9dab1a84d 100644 (file)
@@ -22,10 +22,20 @@ from testing import UserAuthError
 from testing import parse_args
 
 
+TIME_PRECISION = 's'
+factor = 1
+
+
 class TestAutoDuration(TestBase):
-    title = 'Test select and aggregate functions'
+    title = 'Test auto duration'
 
-    @default_test_setup(2, compression=False, buffer_size=1024)
+    @default_test_setup(
+        2,
+        compression=True,
+        buffer_size=1024,
+        time_precision=TIME_PRECISION,
+        auto_duration=True,
+        optimize_interval=20)
     async def run(self):
         await self.client0.connect()
 
@@ -42,7 +52,7 @@ class TestAutoDuration(TestBase):
             for nameval in [['int', 42], ['float', 3.14], ['str', 'hi']]:
                 name, val = nameval
                 series['{}-{}'.format(name, i)] = [
-                    [t + random.randint(-r, r), val]
+                    [(t + random.randint(-r, r)) * factor, val]
                     for t in range(start_ts, end_td, interval)
                 ]
 
index adff52ac664563f256d5e6d139a948cf3018c11a..ad4b9dac437d89840f6c95aaecfcf92106022112 100644 (file)
@@ -43,11 +43,13 @@ class Server:
                  heartbeat_interval=30,
                  buffer_sync_interval=500,
                  compression=True,
+                 auto_duration=True,
                  pipe_name=None,
                  **unused):
         self.n = n
         self.test_title = title.lower().replace(' ', '_')
         self.compression = compression
+        self.auto_duration = auto_duration
         self.enable_pipe_support = int(bool(pipe_name))
         self.pipe_name = \
             'siridb_client.sock' if not self.enable_pipe_support else \
@@ -95,6 +97,9 @@ class Server:
         config.set('siridb', 'default_db_path', self.dbpath)
         config.set('siridb', 'max_open_files', MAX_OPEN_FILES)
         config.set('siridb', 'enable_shard_compression', int(self.compression))
+        config.set(
+            'siridb', 'enable_shard_auto_duration',
+            int(self.auto_duration))
         config.set('siridb', 'enable_pipe_support', self.enable_pipe_support)
         config.set('siridb', 'pipe_client_name',  self.pipe_name)
         config.set('siridb', 'http_status_port',  self.http_status_port)
diff --git a/main.c b/main.c
index 5a19d1b7b1ad2ef4861e7406ec848ab15af0f4e9..563fd9a66b887d5a28a1fb12dedcfc537b9f13bb 100644 (file)
--- a/main.c
+++ b/main.c
@@ -68,6 +68,7 @@ int main(int argc, char * argv[])
     set_max_open_files_limit();
 
     log_debug("Shard compression: %s", siri.cfg->shard_compression ? "enabled" : "disabled");
+    log_debug("Shard auto duration: %s", siri.cfg->shard_auto_duration ? "enabled" : "disabled");
     log_debug("Pipe support: %s", siri.cfg->pipe_support ? "enabled" : "disabled");
     log_debug("IP support: %s", sirinet_tcp_ip_support_str(siri.cfg->ip_support));
 
index 6588e0d7fcec788b3b7950b4b2af67228fb3c6e7..a8ef21c5cf6421c5cd037dc8781ef700dba5a6d0 100644 (file)
@@ -74,6 +74,13 @@ max_open_files = 32768
 #
 enable_shard_compression = 1
 
+#
+# Let SiriDB control shard duration when possible. When enabled, the configured 
+# shard duration fot both number and log values are still used when SiriDB is 
+# not able to detect a sensible duration. 
+#
+enable_shard_auto_duration = 1
+
 #
 # Enable named pipe support for client connections.
 #
index 1652774431f04de47f0662ee9af8e0f2aa1dcc13..b062db4d6d63163c6aaea4e95de6524746286160 100644 (file)
@@ -26,6 +26,7 @@ static siri_cfg_t siri_cfg = {
         .optimize_interval=3600,
         .ip_support=IP_SUPPORT_ALL,
         .shard_compression=0,
+        .shard_auto_duration=0,
         .server_address="localhost",
         .default_db_path="",
         .pipe_support=0,
@@ -53,6 +54,7 @@ static void SIRI_CFG_read_default_db_path(cfgparser_t * cfgparser);
 static void SIRI_CFG_read_max_open_files(cfgparser_t * cfgparser);
 static void SIRI_CFG_read_ip_support(cfgparser_t * cfgparser);
 static void SIRI_CFG_read_shard_compression(cfgparser_t * cfgparser);
+static void SIRI_CFG_read_shard_auto_duration(cfgparser_t * cfgparser);
 static void SIRI_CFG_read_pipe_support(cfgparser_t * cfgparser);
 
 void siri_cfg_init(siri_t * siri)
@@ -130,6 +132,7 @@ void siri_cfg_init(siri_t * siri)
     SIRI_CFG_read_max_open_files(cfgparser);
     SIRI_CFG_read_ip_support(cfgparser);
     SIRI_CFG_read_shard_compression(cfgparser);
+    SIRI_CFG_read_shard_auto_duration(cfgparser);
 
     SIRI_CFG_read_addr(
             cfgparser,
@@ -293,6 +296,35 @@ static void SIRI_CFG_read_shard_compression(cfgparser_t * cfgparser)
     }
 }
 
+static void SIRI_CFG_read_shard_auto_duration(cfgparser_t * cfgparser)
+{
+    cfgparser_option_t * option;
+    cfgparser_return_t rc;
+    rc = cfgparser_get_option(
+                &option,
+                cfgparser,
+                "siridb",
+                "enable_shard_auto_duration");
+    if (rc != CFGPARSER_SUCCESS)
+    {
+        log_warning(
+                "Missing 'enable_shard_auto_duration' in '%s' (%s).",
+                siri.args->config,
+                cfgparser_errmsg(rc));
+    }
+    else if (option->tp != CFGPARSER_TP_INTEGER || option->val->integer > 1)
+    {
+        log_warning(
+                "Error reading 'enable_shard_auto_duration' in '%s': %s.",
+                siri.args->config,
+                "error: expecting 0 or 1");
+    }
+    else if (option->val->integer == 1)
+    {
+        siri_cfg.shard_auto_duration = 1;
+    }
+}
+
 static void SIRI_CFG_read_pipe_support(cfgparser_t * cfgparser)
 {
     cfgparser_option_t * option;
index 286df429737957f26365465b97ee96386a8e0187..3bdf141c5625b4eaebe4c17bad09a26721f16fb5 100644 (file)
@@ -558,7 +558,7 @@ int siridb_shard_cexpr_cb(
     case CLERI_GID_K_POOL:
         return cexpr_int_cmp(cond->operator, vshard->server->pool, cond->int64);
     case CLERI_GID_K_SIZE:
-        return cexpr_int_cmp(cond->operator, vshard->shard->size, cond->int64);
+        return cexpr_int_cmp(cond->operator, vshard->shard->len, cond->int64);
     case CLERI_GID_K_START:
         return cexpr_int_cmp(cond->operator, vshard->start, cond->int64);
     case CLERI_GID_K_END:
index 3980bd4dd866ff47e4d4d79d9c74f3b36124a639..d7a8307c13a63ad1331a223e01abec1408fe9d6b 100644 (file)
@@ -243,7 +243,9 @@ int siridb_shards_add_points(
 
     if (duration == 0)
     {
-        uint64_t interval = siridb_points_get_interval(points);
+        uint64_t interval = siri.cfg->shard_auto_duration
+                ? siridb_points_get_interval(points)
+                : 0;
 
         duration = interval
             ? siridb_shard_duration_from_interval(siridb, interval)
index 9f645ecbfc7629fcb3b3a13ea621277666407932..8f9d77630cd0049af362f0c14417c9175dc008be 100644 (file)
@@ -140,6 +140,9 @@ void siri_evars_parse(siri_t * siri)
     evars__bool(
             "SIRIDB_ENABLE_SHARD_COMPRESSION",
             &siri->cfg->shard_compression);
+    evars__bool(
+            "SIRIDB_ENABLE_SHARD_AUTO_DURATION",
+            &siri->cfg->shard_auto_duration);
     evars__to_strn(
             "SIRIDB_DEFAULT_DB_PATH",
             siri->cfg->default_db_path,