work on auto shard
author    Jeroen van der Heijden <jeroen@transceptor.technology>
          Fri, 11 Sep 2020 14:55:36 +0000 (16:55 +0200)
committer Jeroen van der Heijden <jeroen@transceptor.technology>
          Fri, 11 Sep 2020 14:55:36 +0000 (16:55 +0200)
debian/tests/control
include/siri/db/shard.h
itest/test_auto_duration.py [new file with mode: 0644]
itest/test_http_api.py
itest/test_insert.py
src/siri/backup.c
src/siri/db/listener.c
src/siri/db/points.c
src/siri/db/shard.c
src/siri/db/shards.c

index d6881040782fd940501f80cc0aaaf336e85ee47f..8ce3097d72c52b7f848219d228ffda530cf0697e 100644 (file)
@@ -1,3 +1,3 @@
-Test-Command: make --directory=Release test
+Test-Command: NOMEMTEST=1 make --directory=Release test
 Features: test-name=siridb-unit-tests
 Depends: @, @builddeps@
index 4d6813abeab14c915f0f1543a63fb284f735c249..969ef74dbcbe88c09480ca5a7ccd02bdec174615 100644 (file)
@@ -105,6 +105,10 @@ int siridb_shard_get_points_log_compressed(
         uint64_t * start_ts,
         uint64_t * end_ts,
         uint8_t has_overlap);
+int siridb_shard_migrate(
+        siridb_t * siridb,
+        uint64_t shard_id,
+        uint64_t * duration);
 int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb);
 void siridb__shard_free(siridb_shard_t * shard);
 void siridb__shard_decref(siridb_shard_t * shard);
diff --git a/itest/test_auto_duration.py b/itest/test_auto_duration.py
new file mode 100644 (file)
index 0000000..2c0f147
--- /dev/null
@@ -0,0 +1,462 @@
+import asyncio
+import functools
+import random
+import time
+import math
+import re
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+from testing import parse_args
+
+
+class TestAutoDuration(TestBase):
+    title = 'Test auto duration'
+
+    @default_test_setup(2, compression=False, buffer_size=1024)
+    async def run(self):
+        await self.client0.connect()
+
+        # NOTE: this block was left incomplete in the original commit; the
+        # gap, offset and number of points below are placeholder assumptions.
+        gap = 3600  # assumed interval between points, in seconds
+        ts = int(time.time()) - 24 * 3600
+
+        points = [
+            [ts + i * gap, i]
+            for i in range(24)
+        ]
+
+        DATA = {'series-001 integer': points}
+        LENPOINTS = sum(len(v) for v in DATA.values())
+
+        self.assertEqual(
+            await self.client0.insert(DATA),
+            {'success_msg': 'Successfully inserted {} point(s).'.format(
+                LENPOINTS)})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select difference() from "series-001 integer"'),
+            {'series-001 integer': [[1471254708, -8], [1471254710, -4]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select difference() => difference() '
+                'from "series-001 integer"'),
+            {'series-001 integer': [[1471254710, 4]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select difference() => difference() => difference() '
+                'from "series-001 integer"'),
+            {'series-001 integer': []})
+
+        now = int(time.time())
+        self.assertEqual(
+            await self.client0.query(
+                'select difference({}) from "series-001 integer"'.format(now)),
+            {'series-001 integer': [[now, -12]]})
+
+        now = int(time.time())
+        self.assertEqual(
+            await self.client0.query(
+                'select difference({}) from "series-001 integer"'.format(now)),
+            {'series-001 integer': [[now, -12]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select * from /series-001.*/ '
+                'merge as "median_low" using median_low({})'
+                .format(now)),
+            {'median_low': [[now, -3.5]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select * from /series-001.*/ '
+                'merge as "median_high" using median_high({})'
+                .format(now)),
+            {'median_high': [[now, -3.0]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select * from /series.*/ '
+                'merge as "max" using max(1s)'),
+            {'max': [
+                [1471254705, 5.0],
+                [1471254707, -2.5],
+                [1471254708, -1.0],
+                [1471254710, -7.0]
+            ]})
+
+        # Test all aggregation methods
+
+        self.assertEqual(
+            await self.client0.query('select sum(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 2663], [1447254000, 5409], [1447257600, 1602]]})
+
+        self.assertEqual(
+            await self.client0.query('select count(1h) from "aggr"'),
+            {'aggr': [[1447250400, 5], [1447254000, 12], [1447257600, 3]]})
+
+        self.assertEqual(
+            await self.client0.query('select mean(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 532.6],
+                [1447254000, 450.75],
+                [1447257600, 534.0]]})
+
+        self.assertEqual(
+            await self.client0.query('select median(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 532.0],
+                [1447254000, 530.5],
+                [1447257600, 533.0]]})
+
+        self.assertEqual(
+            await self.client0.query('select median_low(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 532], [1447254000, 530], [1447257600, 533]]})
+
+        self.assertEqual(
+            await self.client0.query('select median_high(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 532], [1447254000, 531], [1447257600, 533]]})
+
+        self.assertEqual(
+            await self.client0.query('select min(1h) from "aggr"'),
+            {'aggr': [[1447250400, 531], [1447254000, 54], [1447257600, 532]]})
+
+        self.assertEqual(
+            await self.client0.query('select max(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 535], [1447254000, 538], [1447257600, 537]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select variance(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 3.3],
+                [1447254000, 34396.931818181816],
+                [1447257600, 7.0]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select pvariance(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 2.6399999999999997],
+                [1447254000, 31530.520833333332],
+                [1447257600, 4.666666666666667]]})
+
+        self.assertEqual(
+            await self.client0.query('select * from ({}) - ("a", "b")'.format(
+                ','.join(['"aggr"'] * 600)
+            )),
+            {'aggr': DATA['aggr']}
+        )
+
+        self.assertEqual(
+            await self.client0.query('select difference(1h) from "aggr"'),
+            {'aggr': [[1447250400, 1], [1447254000, -3], [1447257600, 5]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select derivative(1, 1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 0.0002777777777777778],
+                [1447254000, -0.0008333333333333333],
+                [1447257600, 0.001388888888888889]]})
+
+        self.assertEqual(
+            await self.client0.query('select filter(>534) from "aggr"'),
+            {'aggr': [
+                [1447249633, 535],
+                [1447250549, 537],
+                [1447252349, 537],
+                [1447253549, 538],
+                [1447254748, 537]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(/l.*/) from * where type == string'),
+            {'log': [p for p in DATA['log'] if re.match('l.*', p[1])]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(==/l.*/) from * where type == string'),
+            {'log': [p for p in DATA['log'] if re.match('l.*', p[1])]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(!=/l.*/) from * where type == string'),
+            {'log': [p for p in DATA['log'] if not re.match('l.*', p[1])]})
+
+        self.assertEqual(
+            await self.client0.query('select limit(300, mean) from "aggr"'),
+            {'aggr': DATA['aggr']})
+
+        self.assertEqual(
+            await self.client0.query('select limit(1, sum)  from "aggr"'),
+            {'aggr': [[1447254748, 9674]]})
+
+        self.assertEqual(
+            await self.client0.query('select limit(3, mean) from "aggr"'),
+            {'aggr': [
+                [1447250938, 532.8571428571429],
+                [1447252844, 367.6666666666667],
+                [1447254750, 534.0]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select limit(2, max)  from "series-001 float"'),
+            {'series-001 float': [[1471254707, 1.5], [1471254713, -7.3]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select variance(1471254712) from "variance"'),
+            {'variance': [[1471254712, 1.3720238095238095]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select pvariance(1471254715) from "pvariance"'),
+            {'pvariance': [[1471254715, 1.25]]})
+
+        self.assertEqual(
+            await self.client0.query('select * from "one"'),
+            {'one': [[1471254710, 1]]})
+
+        self.assertEqual(
+            await self.client0.query('select * from "log"'),
+            {'log': DATA['log']})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(~"log") => filter(!~"one") from "log"'),
+            {'log': [DATA['log'][1]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(!=nan) from "special"'),
+            {'special': [p for p in DATA['special'] if not math.isnan(p[1])]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(==nan) from "special"'),
+            {'special': [p for p in DATA['special'] if math.isnan(p[1])]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(>=nan) from "special"'),
+            {'special': [p for p in DATA['special'] if math.isnan(p[1])]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(<=nan) from "special"'),
+            {'special': [p for p in DATA['special'] if math.isnan(p[1])]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(==inf) from "special"'),
+            {'special': [p for p in DATA['special'] if p[1] == math.inf]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(<inf) from "special"'),
+            {'special': [p for p in DATA['special'] if p[1] < math.inf]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(>inf) from "special"'),
+            {'special': []})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(==-inf) from "special"'),
+            {'special': [p for p in DATA['special'] if p[1] == -math.inf]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(>-inf) from "special"'),
+            {'special': [p for p in DATA['special'] if p[1] > -math.inf]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(<-inf) from "special"'),
+            {'special': []})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(~"one") prefix "1-", '
+                'filter(~"two") prefix "2-" from "log"'),
+            {
+                '1-log': [
+                    [1471254710, 'log line one'],
+                    [1471254716, 'and yet one more']],
+                '2-log': [[1471254712, 'log line two']]
+            })
+
+        self.assertEqual(
+            await self.client0.query('select difference() from "one"'),
+            {'one': []})
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Regular expressions can only be used with.*'):
+            await self.client0.query('select filter(~//) from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Cannot use a string filter on number type.'):
+            await self.client0.query('select filter(//) from "aggr"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use mean\(\) on string type\.'):
+            await self.client0.query('select mean(1w) from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Group by time must be an integer value larger than zero\.'):
+            await self.client0.query('select mean(0) from "aggr"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Limit must be an integer value larger than zero\.'):
+            await self.client0.query('select limit(6 - 6, mean) from "aggr"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use a string filter on number type\.'):
+            await self.client0.query(
+                'select * from "aggr" '
+                'merge as "t" using filter("0")')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use difference\(\) on string type\.'):
+            await self.client0.query('select difference() from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use derivative\(\) on string type\.'):
+            await self.client0.query('select derivative(6, 3) from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use derivative\(\) on string type\.'):
+            await self.client0.query('select derivative() from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Overflow detected while using sum\(\)\.'):
+            await self.client0.query('select sum(now) from "huge"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Max depth reached in \'where\' expression!'):
+            await self.client0.query(
+                'select * from "aggr" where ((((((length > 1))))))')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Cannot compile regular expression.*'):
+            await self.client0.query(
+                'select * from /(bla/')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Memory allocation error or maximum recursion depth reached.'):
+            await self.client0.query(
+                'select * from {}"aggr"{}'.format(
+                    '(' * 501,
+                    ')' * 501))
+
+        with self.assertRaisesRegex(
+                    QueryError,
+                    'Query too long.'):
+            await self.client0.query('select * from "{}"'.format('a' * 65535))
+
+        with self.assertRaisesRegex(
+                    QueryError,
+                    'Error while merging points. Make sure the destination '
+                    'series name is valid.'):
+            await self.client0.query(
+                'select * from "aggr", "huge" merge as ""')
+
+        self.assertEqual(
+            await self.client0.query(
+                'select min(2h) prefix "min-", max(1h) prefix "max-" '
+                'from /.*/ where type == integer and name != "filter" '
+                'and name != "one" and name != "series-002 integer" '
+                'merge as "int_min_max" using median_low(1) => difference()'),
+            {
+                'max-int_min_max': [
+                    [1447254000, 3], [1447257600, -1], [1471255200, -532]],
+                'min-int_min_max': [
+                    [1447257600, -477], [1471255200, -54]]})
+
+        await self.client0.query('select derivative() from "equal ts"')
+
+        self.assertEqual(
+            await self.client0.query('select first() from *'),
+            {k: [v[0]] for k, v in DATA.items()})
+
+        self.assertEqual(
+            await self.client0.query('select last() from *'),
+            {k: [v[-1]] for k, v in DATA.items()})
+
+        self.assertEqual(
+            await self.client0.query('select count() from *'),
+            {k: [[v[-1][0], len(v)]] for k, v in DATA.items()})
+
+        self.assertEqual(
+            await self.client0.query('select mean() from "aggr"'),
+            {'aggr': [[
+                DATA['aggr'][-1][0],
+                sum([x[1] for x in DATA['aggr']]) / len(DATA['aggr'])]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select stddev() from "aggr"'),
+            {'aggr': [[
+                DATA['aggr'][-1][0],
+                147.07108914792838]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select stddev(1h) from "aggr"'),
+            {"aggr": [
+                [1447250400, 1.8165902124584952],
+                [1447254000, 185.46409846162092],
+                [1447257600, 2.6457513110645907]]})
+
+        # test prefix, suffix
+        result = await self.client0.query(
+                'select sum(1d) prefix "sum-" suffix "-sum", '
+                'min(1d) prefix "minimum-", '
+                'max(1d) suffix "-maximum" from "aggr"')
+
+        self.assertIn('sum-aggr-sum', result)
+        self.assertIn('minimum-aggr', result)
+        self.assertIn('aggr-maximum', result)
+
+        await self.client0.query('alter database set select_points_limit 10')
+        with self.assertRaisesRegex(
+                QueryError,
+                'Query has reached the maximum number of selected points.*'):
+            await self.client0.query(
+                'select * from /.*/')
+        await self.client0.query(
+            'alter database set select_points_limit 1000000')
+
+        self.client0.close()
+
+
+if __name__ == '__main__':
+    parse_args()
+    run_test(TestAutoDuration())
index 1a6577dc5fdd1adbaae1cf519635be664e4bb5fa..37282103ce7b807a0b263cfe3af9948c8f47dc51 100644 (file)
@@ -190,7 +190,7 @@ class TestHTTPAPI(TestBase):
         self.assertEqual(x.json(), 'OK')
 
         self.db.servers.append(self.server2)
-        await self.assertIsRunning(self.db, self.client0, timeout=30)
+        await self.assertIsRunning(self.db, self.client0, timeout=50)
 
         x = requests.get(
             f'http://localhost:9022/get-databases', auth=auth)
index bc0aebb08c5c6c2b61100c7f7e6ce1312ed06f75..6dcbc20d73901baf8784edc06cbed7abc6d1c5d0 100644 (file)
@@ -165,19 +165,19 @@ class TestInsert(TestBase):
         await self.assertSeries(self.client0, series)
         await self.assertSeries(self.client1, series)
 
-        tasks = [
-            asyncio.ensure_future(self.client0.query(
-                    'drop series /.*/ set ignore_threshold true'))
-            for i in range(5)]
+        tasks = [
+            asyncio.ensure_future(self.client0.query(
+                    'drop series /.*/ set ignore_threshold true'))
+            for i in range(5)]
 
-        await asyncio.gather(*tasks)
+        await asyncio.gather(*tasks)
 
-        tasks = [
-            asyncio.ensure_future(self.client0.query(
-                    'drop shards set ignore_threshold true'))
-            for i in range(5)]
+        tasks = [
+            asyncio.ensure_future(self.client0.query(
+                    'drop shards set ignore_threshold true'))
+            for i in range(5)]
 
-        await asyncio.gather(*tasks)
+        await asyncio.gather(*tasks)
 
         await asyncio.sleep(2)
 
index 3da9f531caa59c0b10747725d6a0fa5e2a6302bd..217f4ee73d81c0165e676ceed2c13507b51eba12 100644 (file)
@@ -8,6 +8,7 @@
 #include <siri/db/server.h>
 #include <siri/db/servers.h>
 #include <siri/db/shard.h>
+#include <siri/db/shards.h>
 #include <siri/optimize.h>
 #include <siri/siri.h>
 #include <stddef.h>
@@ -206,7 +207,7 @@ static int BACKUP_walk(siridb_t * siridb, void * args __attribute__((unused)))
          * A lock is not needed since the optimize thread is paused and this
          * is running from the main thread.
          */
-        vec_t * shard_list = imap_2vec(siridb->shards);
+        vec_t * shard_list = siridb_shards_vec(siridb);
 
         if (shard_list == NULL)
         {
@@ -227,6 +228,7 @@ static int BACKUP_walk(siridb_t * siridb, void * args __attribute__((unused)))
             {
                 siri_fp_close(shard->replacing->fp);
             }
+            --shard->ref;  /* at least two references exist */
         }
 
         vec_free(shard_list);
index 693ca0081b8ee146ae495090f1b296728a781781..dbbdd4e4f455cc9729b5c56106cb60f70a96699b 100644 (file)
@@ -2390,7 +2390,7 @@ static void exit_count_shards(uv_async_t * handle)
 
     if (q_count->where_expr == NULL)
     {
-        q_count->n = siridb->shards->len;
+        q_count->n = siridb_shards_n(siridb);
     }
     else
     {
@@ -2403,7 +2403,7 @@ static void exit_count_shards(uv_async_t * handle)
 
         uv_mutex_lock(&siridb->shards_mutex);
 
-        shards_list = imap_2vec_ref(siridb->shards);
+        shards_list = siridb_shards_vec(siridb);
 
         uv_mutex_unlock(&siridb->shards_mutex);
 
@@ -2466,7 +2466,7 @@ static void exit_count_shards_size(uv_async_t * handle)
 
     uv_mutex_lock(&siridb->shards_mutex);
 
-    shards_list = imap_2vec_ref(siridb->shards);
+    shards_list = siridb_shards_vec(siridb);
 
     uv_mutex_unlock(&siridb->shards_mutex);
 
@@ -2926,7 +2926,7 @@ static void exit_drop_shards(uv_async_t * handle)
 
     uv_mutex_lock(&siridb->shards_mutex);
 
-    q_drop->shards_list = imap_2vec_ref(siridb->shards);
+    q_drop->shards_list = siridb_shards_vec(siridb);
 
     uv_mutex_unlock(&siridb->shards_mutex);
 
@@ -3468,7 +3468,7 @@ static void exit_list_shards(uv_async_t * handle)
 
     uv_mutex_lock(&siridb->shards_mutex);
 
-    shards_list = imap_2vec_ref(siridb->shards);
+    shards_list = siridb_shards_vec(siridb);
 
     uv_mutex_unlock(&siridb->shards_mutex);
 
index e672a776fb8991d9c5b1eabdb5117922775c0db9..b3939a82527e75a5a7ddf7f44b0bb485912ed1e1 100644 (file)
@@ -1693,13 +1693,14 @@ uint64_t siridb_points_get_interval(siridb_points_t * points)
     uint64_t * arr;
     uint64_t x, a, b, c;
 
-    n = points->len - 1;
-    n = n > 63 ? 63 : n;
-    if (n < 7)
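+    /* fewer than 8 points is not enough to estimate an interval */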
+    if (points->len < 8)
     {
         return 0;
     }
 
+    n = points->len - 1;
+    n = n > 63 ? 63 : n;
+
     arr = malloc(n * sizeof(uint64_t));
     if (arr == NULL)
     {
index 7a5ff7c6cef57ec24d8f53b1ba2ba43e94f28202..286df429737957f26365465b97ee96386a8e0187 100644 (file)
@@ -173,6 +173,96 @@ uint64_t siridb_shard_interval_from_duration(uint64_t duration)
     return duration / OPTIMAL_POINTS_PER_SHARD;;
 }
 
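+/*
+ * Renames an old-format shard file (named by shard id only, without a
+ * duration part) and its index file to the new naming which includes the
+ * shard duration, and sets 'duration' based on the type read from the
+ * shard header.
+ *
+ * Returns 0 if successful or -1 in case of an error.
+ */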
+int siridb_shard_migrate(
+        siridb_t * siridb,
+        uint64_t shard_id,
+        uint64_t * duration)
+{
+    FILE * fp;
+    char * fn, * new_fn;
+    int rc;
+    size_t n;
+    uint8_t schema, tp;
+    rc = asprintf(
+            &fn,
+            "%s%s%" PRIu64 ".sdb",
+             siridb->dbpath,
+             SIRIDB_SHARDS_PATH,
+             shard_id);
+    if (rc < 0)
+    {
+        log_error("Cannot create shard filename");
+        return -1;
+    }
+
+    if ((fp = fopen(fn, "r")) == NULL)
+    {
+        log_error("Cannot open (old) shard file for reading: '%s'", fn);
+        free(fn);
+        return -1;
+    }
+
+    char header[HEADER_SIZE];
+
+    if (fread(&header, HEADER_SIZE, 1, fp) != 1)
+    {
+        /* cannot read the header from the shard file;
+         * close the file, free the file name and return -1
+         */
+        fclose(fp);
+        log_critical("Missing header in (old) shard file: '%s'", fn);
+        free(fn);
+        return -1;
+    }
+
+    schema = (uint8_t) header[HEADER_SCHEMA];
+    if (schema > SIRIDB_SHARD_SHEMA)
+    {
+        fclose(fp);
+        log_critical(
+                "Shard file '%s' has schema '%u' which is not supported with "
+                "this version of SiriDB.", fn, schema);
+        free(fn);
+        return -1;
+    }
+
+    tp = (uint8_t) header[HEADER_TP];
+    fclose(fp);
+
+    *duration = tp == SIRIDB_SHARD_TP_NUMBER
+            ? siridb->duration_num
+            : siridb->duration_log;
+
+    rc = asprintf(
+            &new_fn,
+            "%s%s%016"PRIX64"_%016"PRIX64".sdb",
+            siridb->dbpath,
+            SIRIDB_SHARDS_PATH,
+            shard_id,
+            *duration);
+    if (rc < 0)
+    {
+        log_error("Cannot create new shard file name");
+        free(fn);
+        free(new_fn);
+        return -1;
+    }
+
+    (void) rename(fn, new_fn);
+
+    n = strlen(fn);
+    fn[n-3] = 'i';
+    fn[n-1] = 'x';
+
+    n = strlen(new_fn);
+    new_fn[n-3] = 'i';
+    new_fn[n-1] = 'x';
+
+    (void) rename(fn, new_fn);
+
+    free(fn);
+    free(new_fn);
+
+    return 0;
+}
+
 /*
  * Returns 0 if successful or -1 in case of an error.
  * When an error occurs, a SIGNAL can be raised in some cases but not for sure.
@@ -183,6 +273,7 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id, uint64_t duration)
     FILE * fp;
     off_t shard_sz;
     siridb_shard_t * shard = malloc(sizeof(siridb_shard_t));
+    omap_t * shards;
 
     if (shard == NULL)
     {
@@ -316,7 +407,18 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id, uint64_t duration)
         return -1;
     }
 
-    if (imap_set(siridb->shards, id, shard) == -1)
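+    /* shards are grouped by id; each id maps to an omap keyed by duration */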
+    shards = imap_get(siridb->shards, id);
+    if (shards == NULL)
+    {
+        shards = omap_create();
+        if (shards == NULL || imap_set(siridb->shards, id, shards) == -1)
+        {
+            siridb_shard_decref(shard);
+            return -1;
+        }
+    }
+
+    if (omap_set(shards, duration, shard) == NULL)
     {
         siridb_shard_decref(shard);
         return -1;
@@ -1476,13 +1578,22 @@ void siridb__shard_decref(siridb_shard_t * shard)
 void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb)
 {
     siridb_series_t * series;
-    siridb_shard_t * pop_shard;
+    siridb_shard_t * pop_shard = NULL;
+    omap_t * shards;
     int optimizing = 0;
 
     uv_mutex_lock(&siridb->series_mutex);
     uv_mutex_lock(&siridb->shards_mutex);
 
-    pop_shard = (siridb_shard_t *) imap_pop(siridb->shards, shard->id);
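+    /* remove the shard from the omap for its id; drop the omap when empty */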
+    shards = imap_get(siridb->shards, shard->id);
+    if (shards)
+    {
+        pop_shard = omap_rm(shards, shard->duration);
+        if (shards->n == 0)
+        {
+            free(imap_pop(siridb->shards, shard->id));
+        }
+    }
 
     /*
      * When optimizing, 'pop_shard' is always the new shard and 'shard'
index 784a5062c8ddf8c3d326550f7ea3816d4a9beff9..b7c061e7cffeeabc0f58d44145f5035cf95faa44 100644 (file)
 
 #define SIRIDB_SHARD_LEN 37
 
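+/*
+ * Returns true if 'fn' is an old-format shard file name consisting of the
+ * shard id followed by 'ext', without a duration part. On success the parsed
+ * shard id is written to 'shard_id'.
+ */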
+static bool SHARDS_must_migrate_shard(
+        char * fn,
+        const char * ext,
+        uint64_t * shard_id)
+{
+    size_t n = strlen(fn);
+    char * tmp = NULL;
+
+    if (n < 6)
+    {
+        return false;
+    }
+
+    *shard_id = strtoull(fn, &tmp, 16);
+
+    if (tmp == NULL)
+    {
+        return false;
+    }
+
+    return strcmp(tmp, ext) == 0;
+}
 
 static bool SHARDS_read_id_and_duration(
         char * fn,
@@ -57,13 +79,14 @@ static bool SHARDS_read_id_and_duration(
         return false;
     }
 
+    ++fn;
+
     *duration = strtoull(fn, &tmp, 16);
     if (tmp == NULL)
     {
         return false;
     }
     fn = tmp;
-
     return strcmp(fn, ext) == 0;
 }
 
@@ -72,20 +95,20 @@ static bool SHARDS_read_id_and_duration(
  */
 static bool SHARDS_is_temp_fn(char * fn)
 {
-    int i;
-    uint64_t shard_id, duration;
-    for (i = 0; i < 2; i++, fn++)
-    {
-        if (*fn != '_')
-        {
-            return false;
-        }
-    }
+    size_t n = strlen(fn);
 
-    return (
-        SHARDS_read_id_and_duration(fn, ".sdb", &shard_id, &duration) ||
-        SHARDS_read_id_and_duration(fn, ".idx", &shard_id, &duration)
-    );
+    return (n > 8 &&
+            fn[0] == '_' &&
+            fn[1] == '_' &&
+            fn[n-4] == '.' && ((
+                fn[n-3] == 's' &&
+                fn[n-2] == 'd' &&
+                fn[n-1] == 'b'
+            ) || (
+                fn[n-3] == 'i' &&
+                fn[n-2] == 'd' &&
+                fn[n-1] == 'x'
+    )));
 }
 
 
@@ -136,10 +159,11 @@ int siridb_shards_load(siridb_t * siridb)
 
     for (n = 0; n < total; n++)
     {
-        if (SHARDS_is_temp_fn(shard_list[n]->d_name))
+        char * base_fn = shard_list[n]->d_name;
+
+        if (SHARDS_is_temp_fn(base_fn))
         {
-            snprintf(buffer, XPATH_MAX, "%s%s",
-                   path, shard_list[n]->d_name);
+            snprintf(buffer, XPATH_MAX, "%s%s", path, base_fn);
 
             log_warning("Removing temporary file: '%s'", buffer);
 
@@ -152,19 +176,34 @@ int siridb_shards_load(siridb_t * siridb)
         }
 
         if (!SHARDS_read_id_and_duration(
-                shard_list[n]->d_name,
+                base_fn,
                 ".sdb",
                 &shard_id,
                 &duration))
         {
-            /* TODO: migration code, for backwards compatibility */
-            continue;
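+            /* old shard file name without a duration part; migrate it */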
+            if (SHARDS_must_migrate_shard(
+                    base_fn,
+                    ".sdb",
+                    &shard_id))
+            {
+                log_info("Migrate shard: '%s'", base_fn);
+                if (siridb_shard_migrate(siridb, shard_id, &duration))
+                {
+                    log_error("Error while migrating shard: '%s'", base_fn);
+                    rc = -1;
+                    break;
+                }
+            }
+            else
+            {
+                continue;
+            }
         }
 
         /* we are sure this fits since the filename is checked */
         if (siridb_shard_load(siridb, shard_id, duration))
         {
-           log_error("Error while loading shard: '%s'", shard_list[n]->d_name);
+           log_error("Error while loading shard: '%s'", base_fn);
            rc = -1;
            break;
         }
@@ -361,7 +400,7 @@ vec_t * siridb_shards_vec(siridb_t * siridb)
     {
         return NULL;
     }
-    imap_walk(siridb->shards, (imap_cb) SHARDS_to_vec_cb, &n);
+    (void) imap_walk(siridb->shards, (imap_cb) SHARDS_to_vec_cb, vec);
     return vec;
 }
 
@@ -381,19 +420,15 @@ double siridb_shards_count_percent(
 
     uv_mutex_lock(&siridb->shards_mutex);
 
-    if (siridb->shards->len == 0)
-    {
-        percent = 0.0;
-    }
-    else
-    {
-        shards_list = imap_2vec_ref(siridb->shards);
-    }
+    shards_list = siridb_shards_vec(siridb);
 
     uv_mutex_unlock(&siridb->shards_mutex);
 
-    if (shards_list == NULL)
-        return percent;
+    if (shards_list == NULL || shards_list->len == 0)
+    {
+        free(shards_list);
+        return 0.0;
+    }
 
     for (i = 0; i < shards_list->len; i++)
     {
@@ -407,6 +442,6 @@ double siridb_shards_count_percent(
     }
 
     percent = total ? (double) count / (double) total : 0.0;
-    vec_free(shards_list);
+    free(shards_list);
     return percent;
 }