From: Jeroen van der Heijden Date: Thu, 10 Sep 2020 14:30:31 +0000 (+0200) Subject: Work on autoshard X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~2^2~14^2~8 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=69197b76af21578eb49599bd3cea35c700683500;p=siridb-server.git Work on autoshard --- diff --git a/include/siri/db/series.h b/include/siri/db/series.h index d3aa23c9..0e051aa8 100644 --- a/include/siri/db/series.h +++ b/include/siri/db/series.h @@ -65,7 +65,6 @@ struct siridb_series_s uint32_t length; uint32_t idx_len; long int bf_offset; - uint64_t interval; siridb_points_t * buffer; char * name; idx_t * idx; @@ -152,6 +151,4 @@ struct idx_s uint64_t end_ts; }; - - #endif /* SIRIDB_SERIES_H_ */ diff --git a/include/siri/db/series.inline.h b/include/siri/db/series.inline.h new file mode 100644 index 00000000..5361cbf2 --- /dev/null +++ b/include/siri/db/series.inline.h @@ -0,0 +1,7 @@ +#include +#include + +static inline size_t siridb_series_duration(siridb_series_t * series) +{ + return series->idx_len ? series->idx->shard->duration : 0; +} diff --git a/include/siri/version.h b/include/siri/version.h index 26b90996..b834c833 100644 --- a/include/siri/version.h +++ b/include/siri/version.h @@ -15,7 +15,7 @@ * Note that debian alpha packages should use versions like this: * 2.0.34-0alpha0 */ -#define SIRIDB_VERSION_PRE_RELEASE "-alpha-0" +#define SIRIDB_VERSION_PRE_RELEASE "-alpha-1" #ifndef NDEBUG #define SIRIDB_VERSION_BUILD_RELEASE "+debug" diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index 4141cfbb..59386091 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -2417,8 +2417,7 @@ static void exit_count_shards(uv_async_t * handle) vshard.shard = (siridb_shard_t *) shards_list->data[i]; /* set start and end properties */ - duration = (vshard.shard->tp == SIRIDB_SHARD_TP_NUMBER) ? - siridb->duration_num : siridb->duration_log; + duration = vshard.shard->duration; vshard.start = vshard.shard->id - vshard.shard->id % duration; vshard.end = vshard.start + duration; @@ -2481,8 +2480,8 @@ static void exit_count_shards_size(uv_async_t * handle) vshard.shard = (siridb_shard_t *) shards_list->data[i]; /* set start and end properties */ - duration = (vshard.shard->tp == SIRIDB_SHARD_TP_NUMBER) ? - siridb->duration_num : siridb->duration_log; + duration = vshard.shard->duration; + vshard.start = vshard.shard->id - vshard.shard->id % duration; vshard.end = vshard.start + duration; @@ -2950,8 +2949,7 @@ static void exit_drop_shards(uv_async_t * handle) vshard.shard = (siridb_shard_t *) q_drop->shards_list->data[i]; /* set start and end properties */ - duration = (vshard.shard->tp == SIRIDB_SHARD_TP_NUMBER) ? - siridb->duration_num : siridb->duration_log; + duration = vshard.shard->duration; vshard.start = vshard.shard->id - vshard.shard->id % duration; vshard.end = vshard.start + duration; @@ -3510,8 +3508,8 @@ static void exit_list_shards(uv_async_t * handle) vshard.shard = (siridb_shard_t *) shards_list->data[i]; /* set start and end properties */ - duration = (vshard.shard->tp == SIRIDB_SHARD_TP_NUMBER) ? - siridb->duration_num : siridb->duration_log; + duration = vshard.shard->duration; + vshard.start = vshard.shard->id - vshard.shard->id % duration; vshard.end = vshard.start + duration; diff --git a/src/siri/db/series.c b/src/siri/db/series.c index c7c6b937..5dde9476 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -586,8 +586,8 @@ void siridb_series_remove_shard( { idx_t *__restrict idx; uint_fast32_t i, offset; - uint64_t duration = (shard->tp == SIRIDB_SHARD_TP_NUMBER) ? - siridb->duration_num : siridb->duration_log; + uint64_t start = shard->id - series->mask; + uint64_t end = start + shard->duration; i = offset = 0; @@ -633,8 +633,6 @@ void siridb_series_remove_shard( { series->idx = idx; } - uint64_t start = shard->id - series->mask; - uint64_t end = start + duration; if (series->start >= start && series->start < end) { SERIES_update_start(series); @@ -1353,7 +1351,6 @@ static siridb_series_t * SERIES_new( series->idx_len = 0; series->idx = NULL; series->siridb = siridb; - series->interval = 0; /* get sum series name to calculate series mask (for sharding) */ for (n = 0; *name; name++) diff --git a/src/siri/db/shard.c b/src/siri/db/shard.c index 4b5e1c6d..7a5ff7c6 100644 --- a/src/siri/db/shard.c +++ b/src/siri/db/shard.c @@ -1534,7 +1534,7 @@ void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb) else for (i = 0; i < vec->len; i++) { series = (siridb_series_t *) vec->data[i]; - if (shard->id % siridb->duration_num == series->mask) + if (shard->id % shard->duration == series->mask) { siridb_series_remove_shard(siridb, series, shard); siridb_series_remove_shard(siridb, series, pop_shard); @@ -1556,7 +1556,7 @@ void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb) else for (i = 0; i < vec->len; i++) { series = (siridb_series_t *) vec->data[i]; - if (shard->id % siridb->duration_num == series->mask) + if (shard->id % shard->duration == series->mask) { siridb_series_remove_shard(siridb, series, shard); } @@ -1738,8 +1738,7 @@ static ssize_t SHARD_apply_idx( (uint64_t) *((uint64_t *) (pt + 12)) : (uint64_t) *((uint32_t *) (pt + 8)); uint64_t start = shard->id - series->mask; - uint64_t end = start + ((shard->tp == SIRIDB_SHARD_TP_NUMBER) ? - siridb->duration_num : siridb->duration_log); + uint64_t end = start + shard->duration; if (start_ts < start || end_ts >= end) { diff --git a/src/siri/db/shards.c b/src/siri/db/shards.c index b62a89f6..5fd4ae17 100644 --- a/src/siri/db/shards.c +++ b/src/siri/db/shards.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -156,6 +157,7 @@ int siridb_shards_load(siridb_t * siridb) &shard_id, &duration)) { + /* TODO: migration code, for backwards compatibility */ continue; } @@ -194,26 +196,24 @@ int siridb_shards_add_points( _Bool is_num = siridb_series_isnum(series); siridb_shard_t * shard; omap_t * shards; + uint64_t duration = siridb_series_duration(series); uv_mutex_lock(&siridb->values_mutex); - uint64_t duration = is_num ? siridb->duration_num : siridb->duration_log; uint64_t expire_at = is_num ? siridb->exp_at_num : siridb->exp_at_log; - uv_mutex_unlock(&siridb->values_mutex); - - if (series->interval == 0) + if (duration == 0) { - series->interval = siridb_points_get_interval(points); + uint64_t interval = siridb_points_get_interval(points); - if (series->interval == 0) - { - /* fall-back to default interval */ - series->interval = siridb_shard_interval_from_duration(duration); - } + duration = interval + ? siridb_shard_duration_from_interval(siridb, interval) + : is_num + ? siridb->duration_num + : siridb->duration_log; } - duration = siridb_shard_duration_from_interval(siridb, series->interval); + uv_mutex_unlock(&siridb->values_mutex); uint64_t shard_start, shard_end, shard_id; uint_fast32_t start, end, num_chunks, pstart, pend; diff --git a/src/siri/optimize.c b/src/siri/optimize.c index cfaf958d..b9fb1ccc 100644 --- a/src/siri/optimize.c +++ b/src/siri/optimize.c @@ -263,9 +263,6 @@ static void OPTIMIZE_work(uv_work_t * work __attribute__((unused))) uint8_t c = siri.cfg->shard_compression; size_t i; uint64_t expi[2]; - uint64_t dura[2]; - - log_info("Start optimize task"); @@ -309,8 +306,6 @@ static void OPTIMIZE_work(uv_work_t * work __attribute__((unused))) uv_mutex_lock(&siridb->values_mutex); - dura[SIRIDB_SHARD_TP_NUMBER] = siridb->duration_num; - dura[SIRIDB_SHARD_TP_LOG] = siridb->duration_log; expi[SIRIDB_SHARD_TP_NUMBER] = siridb->exp_at_num; expi[SIRIDB_SHARD_TP_LOG] = siridb->exp_at_log; @@ -329,8 +324,7 @@ static void OPTIMIZE_work(uv_work_t * work __attribute__((unused))) { shard = (siridb_shard_t *) slshards->data[j]; - if ((shard->id - shard->id % dura[shard->tp]) + dura[shard->tp] < - expi[shard->tp]) + if ((shard->id - shard->id % shard->duration) + shard->duration < expi[shard->tp]) { log_info( "Shard id %" PRIu64 " (%" PRIu8 ") is expired "