k_as = Keyword('as')
k_backup_mode = Keyword('backup_mode')
k_before = Keyword('before')
- k_buffer_size = Keyword('buffer_size')
- k_buffer_path = Keyword('buffer_path')
k_between = Keyword('between')
+ k_buffer_path = Keyword('buffer_path')
+ k_buffer_size = Keyword('buffer_size')
k_count = Keyword('count')
k_create = Keyword('create')
k_critical = Keyword('critical')
k_grant = Keyword('grant')
k_group = Keyword('group')
k_groups = Keyword('groups')
+ k_head = Keyword('head')
k_help = Choice(Keyword('help'), Token('?'))
k_idle_percentage = Keyword('idle_percentage')
k_idle_time = Keyword('idle_time')
k_sync_progress = Keyword('sync_progress')
k_tag = Keyword('tag')
k_tags = Keyword('tags')
+ k_tail = Keyword('tail')
k_tee = Keyword('tee')
k_time_precision = Keyword('time_precision')
k_timeit = Keyword('timeit')
before_expr = Sequence(k_before, time_expr)
after_expr = Sequence(k_after, time_expr)
between_expr = Sequence(k_between, time_expr, k_and, time_expr)
+ head_expr = Sequence(k_head, int_expr)
+ tail_expr = Sequence(k_tail, int_expr)
access_expr = List(access_keywords, ',', 1)
prefix_expr = Sequence(k_prefix, string)
after_expr,
between_expr,
before_expr,
+ tail_expr,
+ head_expr,
most_greedy=False)),
Optional(merge_as))
siridb_points_t * siridb_points_new(size_t size, points_tp tp);
void siridb_points_free(siridb_points_t * points);
int siridb_points_resize(siridb_points_t * points, size_t n);
+void siridb_points_tail(siridb_points_t * points, size_t n);
+void siridb_points_head(siridb_points_t * points, size_t n);
void siridb_points_add_point(
siridb_points_t *__restrict points,
uint64_t * ts,
QUERY_DEF
size_t n;
size_t nselects;
+ ssize_t headtail; /* negative is tail, positive is head */
uint64_t * start_ts; /* will NOT be freed */
uint64_t * end_ts; /* will NOT be freed */
siridb_presuf_t * presuf;
siridb_series_t *__restrict series,
uint64_t *__restrict start_ts,
uint64_t *__restrict end_ts);
+siridb_points_t * siridb_series_get_points_tail(
+ siridb_series_t *__restrict series,
+ size_t tail);
+siridb_points_t * siridb_series_get_points_head(
+ siridb_series_t *__restrict series,
+ size_t head);
void siridb_series_remove_shard(
siridb_t *__restrict siridb,
siridb_series_t *__restrict series,
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2022-04-15 12:10:03
+ * Created at: 2022-05-05 15:08:05
*/
#ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
#define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
CLERI_GID_GROUP_COLUMNS,
CLERI_GID_GROUP_NAME,
CLERI_GID_GROUP_TAG_MATCH,
+ CLERI_GID_HEAD_EXPR,
CLERI_GID_HELP_ACCESS,
CLERI_GID_HELP_ALTER,
CLERI_GID_HELP_ALTER_DATABASE,
CLERI_GID_K_GRANT,
CLERI_GID_K_GROUP,
CLERI_GID_K_GROUPS,
+ CLERI_GID_K_HEAD,
CLERI_GID_K_HELP,
CLERI_GID_K_IDLE_PERCENTAGE,
CLERI_GID_K_IDLE_TIME,
CLERI_GID_K_SYNC_PROGRESS,
CLERI_GID_K_TAG,
CLERI_GID_K_TAGS,
+ CLERI_GID_K_TAIL,
CLERI_GID_K_TEE,
CLERI_GID_K_TIMEIT,
CLERI_GID_K_TIMEVAL,
CLERI_GID_TAG_COLUMNS,
CLERI_GID_TAG_NAME,
CLERI_GID_TAG_SERIES,
+ CLERI_GID_TAIL_EXPR,
CLERI_GID_TIMEIT_STMT,
CLERI_GID_TIME_EXPR,
CLERI_GID_UNTAG_SERIES,
#define SIRI_MAX_SIZE_ERR_MSG 1024
#define MAX_NUMBER_DB 1024
+#define MAX_HEADTAIL 1000000L
#if defined(__GLIBC__)
#define strerror_si(__err, __buf, __sz) \
static void exit_calc_stmt(uv_async_t * handle);
static void exit_count_groups(uv_async_t * handle);
static void exit_count_pools(uv_async_t * handle);
-static void exit_count_series(uv_async_t * handle);
static void exit_count_series_length(uv_async_t * handle);
-static void exit_count_servers(uv_async_t * handle);
+static void exit_count_series(uv_async_t * handle);
static void exit_count_servers_received(uv_async_t * handle);
static void exit_count_servers_selected(uv_async_t * handle);
-static void exit_count_shards(uv_async_t * handle);
+static void exit_count_servers(uv_async_t * handle);
static void exit_count_shards_size(uv_async_t * handle);
+static void exit_count_shards(uv_async_t * handle);
static void exit_count_tags(uv_async_t * handle);
static void exit_count_users(uv_async_t * handle);
static void exit_create_group(uv_async_t * handle);
static void exit_drop_tag(uv_async_t * handle);
static void exit_drop_user(uv_async_t * handle);
static void exit_grant_user(uv_async_t * handle);
+static void exit_head_expr(uv_async_t * handle);
static void exit_help_xxx(uv_async_t * handle);
static void exit_list_groups(uv_async_t * handle);
static void exit_list_pools(uv_async_t * handle);
static void exit_set_timezone(uv_async_t * handle);
static void exit_show_stmt(uv_async_t * handle);
static void exit_tag_series(uv_async_t * handle);
+static void exit_tail_expr(uv_async_t * handle);
static void exit_timeit_stmt(uv_async_t * handle);
static void exit_untag_series(uv_async_t * handle);
SIRIDB_NODE_EXIT[CLERI_GID_CALC_STMT] = exit_calc_stmt;
SIRIDB_NODE_EXIT[CLERI_GID_COUNT_GROUPS] = exit_count_groups;
SIRIDB_NODE_EXIT[CLERI_GID_COUNT_POOLS] = exit_count_pools;
- SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SERIES] = exit_count_series;
SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SERIES_LENGTH] = exit_count_series_length;
- SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SERVERS] = exit_count_servers;
+ SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SERIES] = exit_count_series;
SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SERVERS_RECEIVED] = exit_count_servers_received;
SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SERVERS_SELECTED] = exit_count_servers_selected;
- SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SHARDS] = exit_count_shards;
+ SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SERVERS] = exit_count_servers;
SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SHARDS_SIZE] = exit_count_shards_size;
+ SIRIDB_NODE_EXIT[CLERI_GID_COUNT_SHARDS] = exit_count_shards;
SIRIDB_NODE_EXIT[CLERI_GID_COUNT_TAGS] = exit_count_tags;
SIRIDB_NODE_EXIT[CLERI_GID_COUNT_USERS] = exit_count_users;
SIRIDB_NODE_EXIT[CLERI_GID_CREATE_GROUP] = exit_create_group;
SIRIDB_NODE_EXIT[CLERI_GID_DROP_TAG] = exit_drop_tag;
SIRIDB_NODE_EXIT[CLERI_GID_DROP_USER] = exit_drop_user;
SIRIDB_NODE_EXIT[CLERI_GID_GRANT_USER] = exit_grant_user;
+ SIRIDB_NODE_EXIT[CLERI_GID_HEAD_EXPR] = exit_head_expr;
SIRIDB_NODE_EXIT[CLERI_GID_LIST_GROUPS] = exit_list_groups;
SIRIDB_NODE_EXIT[CLERI_GID_LIST_POOLS] = exit_list_pools;
SIRIDB_NODE_EXIT[CLERI_GID_LIST_SERIES] = exit_list_series;
SIRIDB_NODE_EXIT[CLERI_GID_SET_TIMEZONE] = exit_set_timezone;
SIRIDB_NODE_EXIT[CLERI_GID_SHOW_STMT] = exit_show_stmt;
SIRIDB_NODE_EXIT[CLERI_GID_TAG_SERIES] = exit_tag_series;
+ SIRIDB_NODE_EXIT[CLERI_GID_TAIL_EXPR] = exit_tail_expr;
SIRIDB_NODE_EXIT[CLERI_GID_TIMEIT_STMT] = exit_timeit_stmt;
SIRIDB_NODE_EXIT[CLERI_GID_UNTAG_SERIES] = exit_untag_series;
SIRIPARSER_NEXT_NODE
}
+static void exit_head_expr(uv_async_t * handle)
+{
+ siridb_query_t * query = handle->data;
+ ssize_t head = *((ssize_t *) CLERI_NODE_DATA_ADDR(
+ cleri_gn(query->nodes->node->children->next)));
+
+
+ if (head <= 0 || head > MAX_HEADTAIL)
+ {
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Head must be a value between 1 and %ld, got %zd",
+ MAX_HEADTAIL, head);
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ return;
+ }
+
+ ((query_select_t *) query->data)->headtail = head;
+
+ LOGC("Head: %zd", ((query_select_t *) query->data)->headtail);
+
+ SIRIPARSER_NEXT_NODE
+}
+
+static void exit_tail_expr(uv_async_t * handle)
+{
+ siridb_query_t * query = handle->data;
+ ssize_t tail = *((ssize_t *) CLERI_NODE_DATA_ADDR(
+ cleri_gn(query->nodes->node->children->next)));
+
+ if (tail < 1 || tail > MAX_HEADTAIL)
+ {
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Tail must be a value between 1 and %ld, got %zd",
+ MAX_HEADTAIL, tail);
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ return;
+ }
+
+ ((query_select_t *) query->data)->headtail = -tail;
+
+ LOGC("Tail: %zd", ((query_select_t *) query->data)->headtail);
+
+ SIRIPARSER_NEXT_NODE
+}
+
static void exit_alter_group(uv_async_t * handle)
{
siridb_query_t * query = handle->data;
{
uv_mutex_lock(&siridb->series_mutex);
- points = (series->flags & SIRIDB_SERIES_IS_DROPPED) ?
- NULL : siridb_series_get_points(
+ points = (series->flags & SIRIDB_SERIES_IS_DROPPED)
+ ? NULL
+ : q_select->headtail == 0
+ ? siridb_series_get_points(
series,
q_select->start_ts,
- q_select->end_ts);
+ q_select->end_ts)
+ : q_select->headtail < 0
+ ? siridb_series_get_points_tail(
+ series,
+ -q_select->headtail)
+ : siridb_series_get_points_head(
+ series,
+ q_select->headtail);
+
uv_mutex_unlock(&siridb->series_mutex);
/* when having a cache and points, add a copy of points to the cache */
return 0;
}
+/*
+ * Resize points to a new size. Returns 0 when successful or -1 if failed.
+ */
+void siridb_points_tail(siridb_points_t * points, size_t n)
+{
+ if (n < points->len)
+ {
+ size_t i;
+ siridb_point_t * point = points->data + (points->len - n);
+ if (points->tp == TP_STRING)
+ {
+ for (i = 0; i < n; ++i, ++point)
+ {
+ free((points->data + i)->val.str);
+ *(points->data + i) = *point;
+ }
+ }
+ else
+ {
+ for (i = 0; i < n; ++i, ++point)
+ {
+ *(points->data + i) = *point;
+ }
+ }
+ points->len = n;
+ (void) siridb_points_resize(points, n);
+ }
+}
+
+/*
+ * Resize points to a new size. Returns 0 when successful or -1 if failed.
+ */
+void siridb_points_head(siridb_points_t * points, size_t n)
+{
+ if (n < points->len)
+ {
+ if (points->tp == TP_STRING)
+ {
+ size_t i = n, m = points->len;
+ for (; i < m; ++i)
+ {
+ free((points->data + i)->val.str);
+ }
+ }
+ points->len = n;
+ (void) siridb_points_resize(points, n);
+ }
+}
+
/*
* Returns a copy of points or NULL in case of an error. NULL is also returned
* if points is NULL.
query_select_t * query_select_new(void)
{
- query_select_t * q_select = malloc(sizeof(query_select_t));
+ query_select_t * q_select = calloc(1, sizeof(query_select_t));
if (q_select == NULL)
{
QUERIES_NEW(q_select)
q_select->tp = QUERIES_SELECT;
- q_select->start_ts = NULL;
- q_select->end_ts = NULL;
- q_select->presuf = NULL;
- q_select->merge_as = NULL;
- q_select->n = 0;
q_select->nselects = 1; /* we have at least one select function */
- q_select->points_map = NULL;
- q_select->alist = NULL;
- q_select->mlist = NULL;
q_select->result = ct_new();
if (q_select->result == NULL)
}
}
+static inline idx_t * series__last_idx(siridb_series_t *__restrict series)
+{
+ size_t i = series->idx_len - 1;
+ idx_t * idx = series->idx + i;
+ idx_t * last = idx;
+
+ for (; i && last->shard == (--idx)->shard; --i)
+ {
+ if (idx->end_ts > last->end_ts)
+ {
+ last = idx;
+ }
+ }
+ return last;
+}
+
+siridb_points_t * siridb_series_get_points_tail(
+ siridb_series_t *__restrict series,
+ size_t tail)
+{
+ idx_t * idx;
+ siridb_points_t * points;
+ siridb_point_t * point;
+ size_t n = 0, ibuf = 0, iidx = series->idx_len, size = 0;
+ uint64_t buf_start_ts = 0;
+
+ if (series->buffer)
+ {
+ /* We only have a buffer */
+ point = series->buffer->data;
+ size = ibuf = series->buffer->len;
+
+ if (tail < ibuf)
+ {
+ point += ibuf-tail;
+ size = ibuf = tail;
+ }
+
+ if (ibuf)
+ {
+ buf_start_ts = series->buffer->data->ts;
+ }
+ }
+
+ if (iidx == 0)
+ {
+ /* no shards, just use the buffer */
+ goto done;
+ }
+
+ idx = series->idx + series->idx_len - 1;
+
+ if (series->flags & SIRIDB_SERIES_HAS_OVERLAP)
+ {
+ if (series__last_idx(series)->end_ts <= buf_start_ts)
+ {
+ /* buffer is in line */
+ n = size;
+ }
+
+ while (n < tail && iidx)
+ {
+ siridb_shard_t * shard = idx->shard;
+ int overlap = shard->flags & SIRIDB_SHARD_HAS_OVERLAP;
+
+ /* if the shard is without overlaps, we can quite the loop
+ * once we have enough points; otherwise we read the whole shard
+ * even if too much points */
+
+ do
+ {
+ n += idx->len;
+ size += idx->len;
+ --idx;
+ --iidx;
+ }
+ while ((overlap || n < tail) && iidx && idx->shard == shard);
+ }
+ }
+ else
+ {
+ /* no overlap in this series */
+
+ if (idx->end_ts <= buf_start_ts)
+ {
+ /* buffer is in line */
+ n = size;
+ }
+
+ while (n < tail && iidx)
+ {
+ n += idx->len;
+ size += idx->len;
+ --idx;
+ --iidx;
+ }
+ }
+
+done:
+ points = siridb_points_new(size, series->tp);
+
+ if (points == NULL)
+ {
+ ERR_ALLOC /* TODO: maybe remove ERR_ALLOC */
+ return NULL;
+ }
+
+ for (; iidx < series->idx_len; ++iidx)
+ {
+ idx = series->idx + iidx;
+
+ (void) siridb_shard_get_points_callback(idx->shard->flags, series)(
+ points,
+ idx,
+ NULL,
+ NULL,
+ series->flags & SIRIDB_SERIES_HAS_OVERLAP);
+ /* errors can be ignored here */
+ }
+
+ /* add buffer points */
+ for (; ibuf; ++point, --ibuf)
+ {
+ siridb_points_add_point(points, &point->ts, &point->val);
+ }
+
+ siridb_points_tail(points, tail);
+ return points;
+}
+
+siridb_points_t * siridb_series_get_points_head(
+ siridb_series_t *__restrict series,
+ size_t head)
+{
+ idx_t * idx;
+ siridb_points_t * points;
+ siridb_point_t * point;
+ size_t n = 0, ibuf = 0, iidx = series->idx_len, size = 0;
+ int btest = series->flags & SIRIDB_SERIES_HAS_OVERLAP;
+
+ idx = series->idx;
+
+ while (n < head && iidx)
+ {
+ siridb_shard_t * shard = idx->shard;
+ int overlap = btest && (shard->flags & SIRIDB_SHARD_HAS_OVERLAP);
+
+ /* if the shard is without overlaps, we can quite the loop
+ * once we have enough points; otherwise we read the whole shard
+ * even if too much points */
+ do
+ {
+ n += idx->len;
+ size += idx->len;
+ ++idx;
+ --iidx;
+ }
+ while ((overlap || n < head) && iidx && idx->shard == shard);
+ }
+
+ /* get the number of index we need to load */
+ iidx = series->idx_len - iidx;
+
+ if (series->buffer && series->buffer->len)
+ {
+ /* We only have a buffer */
+ point = series->buffer->data;
+ btest = iidx && (series->idx + (iidx - 1))->end_ts <= point->ts;
+
+ if (btest && n < head)
+ {
+ ibuf = head - n;
+ if (ibuf > series->buffer->len)
+ {
+ ibuf = series->buffer->len;
+ }
+ }
+ else if (!btest)
+ {
+ ibuf = series->buffer->len;
+ if (ibuf > head)
+ {
+ ibuf = head;
+ }
+ }
+ size += ibuf;
+ }
+
+ points = siridb_points_new(size, series->tp);
+
+ if (points == NULL)
+ {
+ ERR_ALLOC /* TODO: maybe remove ERR_ALLOC */
+ return NULL;
+ }
+
+ idx = series->idx;
+
+ for (; iidx; --iidx, ++idx)
+ {
+ (void) siridb_shard_get_points_callback(idx->shard->flags, series)(
+ points,
+ idx,
+ NULL,
+ NULL,
+ series->flags & SIRIDB_SERIES_HAS_OVERLAP);
+ /* errors can be ignored here */
+ }
+
+ /* add buffer points */
+ for (; ibuf; ++point, --ibuf)
+ {
+ siridb_points_add_point(points, &point->ts, &point->val);
+ }
+
+ siridb_points_head(points, head);
+ return points;
+}
+
+
/*
* Returns NULL and raises a SIGNAL in case an error has occurred.
*/
siridb_points_t * buf = series->buffer;
siridb_points_t * points;
siridb_point_t * point;
+ idx_t * last;
if (buf != NULL &&
buf->len &&
assert (series->idx_len);
/* if not in the buffer, then if must be in a shard */
-
- size_t i = series->idx_len - 1;
- idx_t * idx = series->idx + i;
- idx_t * last = idx;
-
- for (; i && last->shard == (--idx)->shard; --i)
- {
- if (idx->end_ts > last->end_ts)
- {
- last = idx;
- }
- }
-
+ last = series__last_idx(series);
points = siridb_points_new(last->len, series->tp);
if (points == NULL)
{
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2022-04-15 12:10:03
+ * Created at: 2022-05-05 15:08:05
*/
#include "siri/grammar/grammar.h"
cleri_t * k_as = cleri_keyword(CLERI_GID_K_AS, "as", CLERI_CASE_SENSITIVE);
cleri_t * k_backup_mode = cleri_keyword(CLERI_GID_K_BACKUP_MODE, "backup_mode", CLERI_CASE_SENSITIVE);
cleri_t * k_before = cleri_keyword(CLERI_GID_K_BEFORE, "before", CLERI_CASE_SENSITIVE);
- cleri_t * k_buffer_size = cleri_keyword(CLERI_GID_K_BUFFER_SIZE, "buffer_size", CLERI_CASE_SENSITIVE);
- cleri_t * k_buffer_path = cleri_keyword(CLERI_GID_K_BUFFER_PATH, "buffer_path", CLERI_CASE_SENSITIVE);
cleri_t * k_between = cleri_keyword(CLERI_GID_K_BETWEEN, "between", CLERI_CASE_SENSITIVE);
+ cleri_t * k_buffer_path = cleri_keyword(CLERI_GID_K_BUFFER_PATH, "buffer_path", CLERI_CASE_SENSITIVE);
+ cleri_t * k_buffer_size = cleri_keyword(CLERI_GID_K_BUFFER_SIZE, "buffer_size", CLERI_CASE_SENSITIVE);
cleri_t * k_count = cleri_keyword(CLERI_GID_K_COUNT, "count", CLERI_CASE_SENSITIVE);
cleri_t * k_create = cleri_keyword(CLERI_GID_K_CREATE, "create", CLERI_CASE_SENSITIVE);
cleri_t * k_critical = cleri_keyword(CLERI_GID_K_CRITICAL, "critical", CLERI_CASE_SENSITIVE);
cleri_t * k_grant = cleri_keyword(CLERI_GID_K_GRANT, "grant", CLERI_CASE_SENSITIVE);
cleri_t * k_group = cleri_keyword(CLERI_GID_K_GROUP, "group", CLERI_CASE_SENSITIVE);
cleri_t * k_groups = cleri_keyword(CLERI_GID_K_GROUPS, "groups", CLERI_CASE_SENSITIVE);
+ cleri_t * k_head = cleri_keyword(CLERI_GID_K_HEAD, "head", CLERI_CASE_SENSITIVE);
cleri_t * k_help = cleri_choice(
CLERI_GID_K_HELP,
CLERI_MOST_GREEDY,
cleri_t * k_sync_progress = cleri_keyword(CLERI_GID_K_SYNC_PROGRESS, "sync_progress", CLERI_CASE_SENSITIVE);
cleri_t * k_tag = cleri_keyword(CLERI_GID_K_TAG, "tag", CLERI_CASE_SENSITIVE);
cleri_t * k_tags = cleri_keyword(CLERI_GID_K_TAGS, "tags", CLERI_CASE_SENSITIVE);
+ cleri_t * k_tail = cleri_keyword(CLERI_GID_K_TAIL, "tail", CLERI_CASE_SENSITIVE);
cleri_t * k_tee = cleri_keyword(CLERI_GID_K_TEE, "tee", CLERI_CASE_SENSITIVE);
cleri_t * k_time_precision = cleri_keyword(CLERI_GID_K_TIME_PRECISION, "time_precision", CLERI_CASE_SENSITIVE);
cleri_t * k_timeit = cleri_keyword(CLERI_GID_K_TIMEIT, "timeit", CLERI_CASE_SENSITIVE);
k_and,
time_expr
);
+ cleri_t * head_expr = cleri_sequence(
+ CLERI_GID_HEAD_EXPR,
+ 2,
+ k_head,
+ int_expr
+ );
+ cleri_t * tail_expr = cleri_sequence(
+ CLERI_GID_TAIL_EXPR,
+ 2,
+ k_tail,
+ int_expr
+ );
cleri_t * access_expr = cleri_list(CLERI_GID_ACCESS_EXPR, access_keywords, cleri_token(CLERI_NONE, ","), 1, 0, 0);
cleri_t * prefix_expr = cleri_sequence(
CLERI_GID_PREFIX_EXPR,
cleri_optional(CLERI_NONE, cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 3,
+ 5,
after_expr,
between_expr,
- before_expr
+ before_expr,
+ tail_expr,
+ head_expr
)),
cleri_optional(CLERI_NONE, merge_as)
);