../src/siri/db/shard.c \
../src/siri/db/shards.c \
../src/siri/db/sset.c \
+../src/siri/db/tag.c \
+../src/siri/db/tags.c \
../src/siri/db/tasks.c \
../src/siri/db/tee.c \
../src/siri/db/time.c \
./src/siri/db/shard.o \
./src/siri/db/shards.o \
./src/siri/db/sset.o \
+./src/siri/db/tag.o \
+./src/siri/db/tags.o \
./src/siri/db/tasks.o \
./src/siri/db/tee.o \
./src/siri/db/time.o \
./src/siri/db/shard.d \
./src/siri/db/shards.d \
./src/siri/db/sset.d \
+./src/siri/db/tag.d \
+./src/siri/db/tags.d \
./src/siri/db/tasks.d \
./src/siri/db/tee.d \
./src/siri/db/time.d \
../src/siri/db/shard.c \
../src/siri/db/shards.c \
../src/siri/db/sset.c \
+../src/siri/db/tag.c \
+../src/siri/db/tags.c \
../src/siri/db/tasks.c \
../src/siri/db/tee.c \
../src/siri/db/time.c \
./src/siri/db/shard.o \
./src/siri/db/shards.o \
./src/siri/db/sset.o \
+./src/siri/db/tag.o \
+./src/siri/db/tags.o \
./src/siri/db/tasks.o \
./src/siri/db/tee.o \
./src/siri/db/time.o \
./src/siri/db/shard.d \
./src/siri/db/shards.d \
./src/siri/db/sset.d \
+./src/siri/db/tag.d \
+./src/siri/db/tags.d \
./src/siri/db/tasks.d \
./src/siri/db/tee.d \
./src/siri/db/time.d \
Keyword('symmetric_difference'),
most_greedy=False)
k_sync_progress = Keyword('sync_progress')
+ k_tag = Keyword('tag')
+ k_tags = Keyword('tags')
k_tee_pipe_name = Keyword('tee_pipe_name')
k_timeit = Keyword('timeit')
k_timezone = Keyword('timezone')
Tokens(', |'),
Keyword('union'),
most_greedy=False)
+ k_untag = Keyword('untag')
k_uptime = Keyword('uptime')
k_user = Keyword('user')
k_users = Keyword('users')
k_access,
most_greedy=False), ',', 1)
+ tag_columns = List(Choice(
+ k_name,
+ k_series,
+ most_greedy=False), ',', 1)
+
pool_props = Choice(
k_pool,
k_servers,
Sequence(THIS, k_and, THIS),
Sequence(THIS, k_or, THIS)))
+ # where tag
+ where_tag = Sequence(k_where, Prio(
+ Sequence(k_name, str_operator, string),
+ Sequence(k_series, int_operator, int_expr),
+ Sequence('(', THIS, ')'),
+ Sequence(THIS, k_and, THIS),
+ Sequence(THIS, k_or, THIS)))
+
# where pool
where_pool = Sequence(k_where, Prio(
Sequence(pool_props, int_operator, int_expr),
series_all = Choice(Token('*'), k_all, most_greedy=False)
series_name = Repeat(string, 1, 1)
group_name = Repeat(r_grave_str, 1, 1)
+ tag_name = Repeat(r_grave_str, 1, 1)
series_re = Repeat(r_regex, 1, 1)
uuid = Choice(r_uuid_str, string, most_greedy=False)
- group_match = Repeat(r_grave_str, 1, 1)
+ group_tag_match = Repeat(r_grave_str, 1, 1)
series_match = Prio(
List(Choice(
series_all,
series_name,
- group_match,
+ group_tag_match,
series_re,
most_greedy=False), series_setopr, 1),
Choice(
series_all,
series_name,
- group_match,
+ group_tag_match,
series_re,
most_greedy=False),
series_parentheses,
set_select_points_limit = Sequence(
k_set, k_select_points_limit, r_uinteger)
set_timezone = Sequence(k_set, k_timezone, string)
+ tag_series = Sequence(k_tag, tag_name)
+ untag_series = Sequence(k_untag, tag_name)
set_expiration_num = Sequence(
k_set,
k_expiration_num,
- time_expr,
+ time_expr,
Optional(set_ignore_threshold))
set_expiration_log = Sequence(
k_set,
k_expiration_log,
- time_expr,
+ time_expr,
Optional(set_ignore_threshold))
alter_database = Sequence(k_database, Choice(
set_name,
most_greedy=False))
+ alter_series = Sequence(
+ k_series,
+ series_match,
+ Optional(where_series),
+ Choice(tag_series, untag_series, most_greedy=False))
+
count_groups = Sequence(
k_groups, Optional(where_group))
+ count_tags = Sequence(
+ k_tags, Optional(where_tag))
count_pools = Sequence(
k_pools, Optional(where_pool))
count_series = Sequence(
k_user, string, set_password)
drop_group = Sequence(k_group, group_name)
+ drop_tag = Sequence(k_tag, tag_name)
+
# Drop statement needs at least a series_math or where STMT or both
drop_series = Sequence(
k_series,
list_groups = Sequence(
k_groups, Optional(group_columns), Optional(where_group))
+ list_tags = Sequence(
+ k_tags, Optional(tag_columns), Optional(where_tag))
list_pools = Sequence(
k_pools, Optional(pool_columns), Optional(where_pool))
list_series = Sequence(
revoke_user = Sequence(k_user, string)
alter_stmt = Sequence(k_alter, Choice(
+ alter_series,
alter_user,
alter_group,
alter_server,
count_shards,
count_shards_size,
count_users,
+ count_tags,
count_series_length,
most_greedy=True))
drop_stmt = Sequence(k_drop, Choice(
drop_group,
+ drop_tag,
drop_series,
drop_shards,
drop_server,
list_stmt = Sequence(k_list, Choice(
list_series,
+ list_tags,
list_users,
list_shards,
list_groups,
#include <siri/db/time.h>
#include <siri/db/buffer.h>
#include <siri/db/tee.h>
+#include <siri/db/tags.h>
int32_t siridb_get_uptime(siridb_t * siridb);
siridb_replicate_t * replicate;
siridb_reindex_t * reindex;
siridb_groups_t * groups;
+ siridb_tags_t * tags;
siridb_buffer_t * buffer;
siridb_tee_t * tee;
siridb_tasks_t tasks;
#define SIRIDB_GROUP_H_
#define PCRE2_CODE_UNIT_WIDTH 8
-#define GROUP_FLAG_INIT 1
-#define GROUP_FLAG_DROPPED 2
+
+enum
+{
+ GROUP_FLAG_INIT = 1<<0,
+ GROUP_FLAG_DROPPED = 1<<1,
+};
typedef struct siridb_group_s siridb_group_t;
void siridb__group_decref(siridb_group_t * group);
void siridb__group_free(siridb_group_t * group);
-#define siridb_group_incref(group) group->ref++
+#define siridb_group_incref(group__) (group__)->ref++
#define siridb_group_decref(group__) \
- if (!--group__->ref) siridb__group_free(group__)
+ if (!--(group__)->ref) siridb__group_free(group__)
struct siridb_group_s
{
typedef struct siridb_groups_s siridb_groups_t;
-#define GROUPS_FLAG_DROPPED_SERIES 1
+enum
+{
+ GROUPS_FLAG_DROPPED_SERIES = 1<<0,
+};
#include <ctree/ctree.h>
#include <vec/vec.h>
const char * name,
char * err_msg);
void siridb_groups_destroy(siridb_groups_t * groups);
+void siridb_groups_incref(siridb_groups_t * groups);
void siridb_groups_decref(siridb_groups_t * groups);
int siridb_groups_add_group(
siridb_groups_t * groups,
--- /dev/null
+/*
+ * tag.h - Tag (tag series).
+ */
+#ifndef SIRIDB_TAG_H_
+#define SIRIDB_TAG_H_
+
+typedef struct siridb_tag_s siridb_tag_t;
+
+enum
+{
+ TAG_FLAG_CLEANUP = 1<<0,
+ TAG_FLAG_REQUIRE_SAVE = 1<<1,
+};
+
+#include <inttypes.h>
+#include <imap/imap.h>
+#include <siri/db/db.h>
+
+siridb_tag_t * siridb_tag_new(uint32_t id, const char * tags_path);
+void siridb__tag_decref(siridb_tag_t * tag);
+void siridb__tag_free(siridb_tag_t * tag);
+int siridb_tag_is_valid_fn(const char * fn);
+siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn);
+int siridb_tag_save(siridb_tag_t * tag);
+int siridb_tag_is_remote_prop(uint32_t prop);
+void siridb_tag_prop(siridb_tag_t * tag, qp_packer_t * packer, int prop);
+int siridb_tag_cexpr_cb(siridb_tag_t * tag, cexpr_condition_t * cond);
+
+
+#define siridb_tag_incref(tag__) (tag__)->ref++
+#define siridb_tag_decref(tag__) \
+ if (!--(tag__)->ref) siridb__tag_free(tag__)
+
+struct siridb_tag_s
+{
+ uint16_t ref;
+ uint16_t flags;
+ uint32_t id;
+ char * name;
+ char * fn;
+ imap_t * series;
+};
+
+
+#endif /* SIRIDB_TAG_H_ */
--- /dev/null
+/*
+ * tags.h - Tag (tagged series).
+ */
+#ifndef SIRIDB_TAGS_H_
+#define SIRIDB_TAGS_H_
+
+typedef struct siridb_tags_s siridb_tags_t;
+
+#include <inttypes.h>
+#include <ctree/ctree.h>
+#include <vec/vec.h>
+#include <uv.h>
+#include <siri/db/db.h>
+#include <siri/db/tag.h>
+
+#define SIRIDB_TAGS_PATH "tags/"
+
+enum
+{
+ TAGS_FLAG_DROPPED_SERIES = 1<<0,
+ TAGS_FLAG_REQUIRE_SAVE = 1<<1,
+};
+
+struct siridb_tags_s
+{
+ uint16_t flags;
+ uint16_t ref;
+ uint32_t next_id;
+ char * path;
+ ct_t * tags;
+ vec_t * cleanup;
+ uv_mutex_t mutex;
+};
+
+int siridb_tags_init(siridb_t * siridb);
+void siridb_tags_incref(siridb_tags_t * tags);
+void siridb_tags_decref(siridb_tags_t * tags);
+siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name);
+sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid);
+ct_t * siridb_tags_lookup(siridb_tags_t * tags);
+void siridb_tags_cleanup(uv_async_t * handle);
+void siridb_tags_dropped_series(siridb_tags_t * tags);
+void siridb_tags_save(siridb_tags_t * tags);
+
+
+static inline void siridb_tags_set_require_save(
+ siridb_tags_t * tags,
+ siridb_tag_t * tag)
+{
+ tags->flags |= TAGS_FLAG_REQUIRE_SAVE;
+ tag->flags |= TAG_FLAG_REQUIRE_SAVE;
+}
+
+#endif /* SIRIDB_TAGS_H_ */
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2020-01-23 14:08:47
+ * Created at: 2020-06-17 15:21:09
*/
#ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
#define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_
CLERI_GID_AGGREGATE_FUNCTIONS,
CLERI_GID_ALTER_DATABASE,
CLERI_GID_ALTER_GROUP,
+ CLERI_GID_ALTER_SERIES,
CLERI_GID_ALTER_SERVER,
CLERI_GID_ALTER_SERVERS,
CLERI_GID_ALTER_STMT,
CLERI_GID_COUNT_SHARDS,
CLERI_GID_COUNT_SHARDS_SIZE,
CLERI_GID_COUNT_STMT,
+ CLERI_GID_COUNT_TAGS,
CLERI_GID_COUNT_USERS,
CLERI_GID_CREATE_GROUP,
CLERI_GID_CREATE_STMT,
CLERI_GID_DROP_SERVER,
CLERI_GID_DROP_SHARDS,
CLERI_GID_DROP_STMT,
+ CLERI_GID_DROP_TAG,
CLERI_GID_DROP_USER,
CLERI_GID_F_ALL,
CLERI_GID_F_COUNT,
CLERI_GID_GRANT_STMT,
CLERI_GID_GRANT_USER,
CLERI_GID_GROUP_COLUMNS,
- CLERI_GID_GROUP_MATCH,
CLERI_GID_GROUP_NAME,
+ CLERI_GID_GROUP_TAG_MATCH,
CLERI_GID_HELP_ACCESS,
CLERI_GID_HELP_ALTER,
CLERI_GID_HELP_ALTER_DATABASE,
CLERI_GID_K_SUM,
CLERI_GID_K_SYMMETRIC_DIFFERENCE,
CLERI_GID_K_SYNC_PROGRESS,
+ CLERI_GID_K_TAG,
+ CLERI_GID_K_TAGS,
CLERI_GID_K_TEE_PIPE_NAME,
CLERI_GID_K_TIMEIT,
CLERI_GID_K_TIMEZONE,
CLERI_GID_K_TRUE,
CLERI_GID_K_TYPE,
CLERI_GID_K_UNION,
+ CLERI_GID_K_UNTAG,
CLERI_GID_K_UPTIME,
CLERI_GID_K_USER,
CLERI_GID_K_USERS,
CLERI_GID_LIST_SERVERS,
CLERI_GID_LIST_SHARDS,
CLERI_GID_LIST_STMT,
+ CLERI_GID_LIST_TAGS,
CLERI_GID_LIST_USERS,
CLERI_GID_LOG_KEYWORDS,
CLERI_GID_MERGE_AS,
CLERI_GID_STRING,
CLERI_GID_STR_OPERATOR,
CLERI_GID_SUFFIX_EXPR,
+ CLERI_GID_TAG_COLUMNS,
+ CLERI_GID_TAG_NAME,
+ CLERI_GID_TAG_SERIES,
CLERI_GID_TIMEIT_STMT,
CLERI_GID_TIME_EXPR,
+ CLERI_GID_UNTAG_SERIES,
CLERI_GID_USER_COLUMNS,
CLERI_GID_UUID,
CLERI_GID_WHERE_GROUP,
CLERI_GID_WHERE_SERIES,
CLERI_GID_WHERE_SERVER,
CLERI_GID_WHERE_SHARD,
+ CLERI_GID_WHERE_TAG,
CLERI_GID_WHERE_USER,
CLERI_GID__BOOLEAN,
CLERI_END // can be used to get the enum length
BPROTO_DISABLE_BACKUP_MODE, /* empty */
BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */
BPROTO_DROP_DATABASE, /* empty */
+ BPROTO_REQ_TAGS, /* empty */
} bproto_client_t;
/*
BPROTO_RES_GROUPS, /* [[name, series], ...] */
BPROTO_ACK_TEE_PIPE_NAME, /* empty */
BPROTO_ACK_DROP_DATABASE, /* empty */
+ BPROTO_RES_TAGS, /* [[name, series], ...] */
} bproto_server_t;
return NULL;
}
+ /* load tags */
+ if (siridb_tags_init(siridb))
+ {
+ log_error("Cannot read tags for database '%s'", siridb->dbname);
+ siridb_decref(siridb);
+ return NULL;
+ }
+
/* update series props */
log_info("Updating series properties");
siridb_groups_decref(siridb->groups);
}
+ if (siridb->tags != NULL)
+ {
+ siridb_tags_decref(siridb->tags);
+ }
+
if (siridb->tee != NULL)
{
siridb_tee_free(siridb->tee);
}
+
/* unlock the database in case no siri_err occurred */
if (!siri_err)
{
}
else
{
- groups->ref = 2; /* for the main thread and for the groups thread */
+ groups->ref = 1;
groups->fn = NULL;
groups->groups = ct_new();
groups->nseries = vec_new(VEC_DEFAULT_SIZE);
return 0;
}
+void siridb_groups_incref(siridb_groups_t * groups)
+{
+ groups->ref++;
+}
+
void siridb_groups_decref(siridb_groups_t * groups)
{
if (!--groups->ref)
siridb_groups_t * groups = siridb->groups;
uint64_t mod_test = 0;
+ siridb_groups_incref(siridb->groups);
+ siridb_tags_incref(siridb->tags);
+
while (groups->status != GROUPS_STOPPING)
{
sleep(GROUPS_LOOP_SLEEP);
{
GROUPS_cleanup(siridb->groups);
}
+ if (siridb->tags->flags & TAGS_FLAG_DROPPED_SERIES)
+ {
+ siridb_tags_dropped_series(siridb->tags);
+ }
+ if (siridb->tags->flags & TAGS_FLAG_REQUIRE_SAVE)
+ {
+ siridb_tags_save(siridb->tags);
+ }
break;
case GROUPS_STOPPING:
}
groups->status = GROUPS_CLOSED;
+
+ siridb_tags_decref(siridb->tags);
siridb_groups_decref(siridb->groups);
}
}
siridb->groups->flags |= GROUPS_FLAG_DROPPED_SERIES;
+ siridb->tags->flags |= TAGS_FLAG_DROPPED_SERIES;
return rc;
}
--- /dev/null
+/*
+ * tag.c - Tag.
+ *
+ * author : Jeroen van der Heijden
+ * email : jeroen@transceptor.technology
+ * copyright : 2017, Transceptor Technology
+ *
+ * changes
+ * - initial version, 16-06-2017
+ *
+ */
+#define _GNU_SOURCE
+#include <assert.h>
+#include <logger/logger.h>
+#include <siri/db/tag.h>
+#include <stdlib.h>
+#include <siri/db/series.h>
+#include <ctype.h>
+#include <uv.h>
+#include <unistd.h>
+#include <siri/grammar/grammar.h>
+
+#define TAGFN_NUMBERS 9
+
+/*
+ * Returns tag when successful or NULL in case of an error.
+ */
+siridb_tag_t * siridb_tag_new(uint32_t id, const char * tags_path)
+{
+ siridb_tag_t * tag = (siridb_tag_t *) malloc(sizeof(siridb_tag_t));
+ if (tag != NULL)
+ {
+ tag->ref = 1;
+ tag->flags = 0;
+ tag->id = id;
+ tag->name = NULL;
+ ;
+ tag->series = imap_new();
+
+ if (asprintf(
+ &tag->fn,
+ "%s%0*" PRIu32 ".tag",
+ tags_path,
+ TAGFN_NUMBERS,
+ id) < 0 || tag->series == NULL)
+ {
+ siridb__tag_free(tag);
+ tag = NULL;
+ }
+ }
+ return tag;
+}
+
+/*
+ * Returns tag when successful or NULL in case of an error.
+ */
+siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn)
+{
+ siridb_tag_t * tag = siridb_tag_new(
+ (uint32_t) atoll(fn),
+ siridb->tags->path);
+ if (tag != NULL)
+ {
+ qp_unpacker_t * unpacker = qp_unpacker_ff(tag->fn);
+ if (unpacker == NULL)
+ {
+ log_critical("cannot open tag file for reading: %s", tag->fn);
+ siridb__tag_free(tag);
+ tag = NULL;
+ }
+ else
+ {
+ qp_obj_t qp_tn;
+
+ if (!qp_is_array(qp_next(unpacker, NULL)) ||
+ qp_next(unpacker, &qp_tn) != QP_RAW ||
+ (tag->name = strndup(qp_tn.via.raw, qp_tn.len)) == NULL)
+ {
+ /* or a memory allocation error, but the same result */
+ log_critical(
+ "expected an array with a tag name in file: %s",
+ tag->fn);
+ siridb__tag_free(tag);
+ tag = NULL;
+ }
+ else
+ {
+ qp_obj_t qp_series_id;
+ uint64_t series_id;
+ siridb_series_t * series;
+
+ while (qp_next(unpacker, &qp_series_id) == QP_INT64)
+ {
+ series_id = (uint64_t) qp_series_id.via.int64;
+ series = imap_get(siridb->series_map, series_id);
+
+ if (series == NULL)
+ {
+ siridb_tags_require_save(siridb->tags, tag);
+
+ log_error(
+ "cannot find series id %" PRId64
+ " which was tagged with '%s'",
+ qp_series_id.via.int64,
+ tag->name);
+ }
+ else if (imap_add(tag->series, series_id, series) == 0)
+ {
+ siridb_series_incref(series);
+ }
+ else
+ {
+ log_critical(
+ "cannot add series '%s' to tag '%s'",
+ series->name,
+ tag->name);
+ }
+ }
+ }
+ qp_unpacker_ff_free(unpacker);
+ }
+ }
+ return tag;
+}
+
+/*
+ * Lock is required
+ */
+int siridb_tag_save(siridb_tag_t * tag)
+{
+ qp_fpacker_t * fpacker;
+
+ fpacker = qp_open(tag->fn, "w");
+ if (fpacker == NULL)
+ {
+ return -1;
+ }
+
+ if (/* open a new array */
+ qp_fadd_type(fpacker, QP_ARRAY_OPEN) ||
+
+ /* write the tag name */
+ qp_fadd_string(fpacker, tag->name))
+ {
+ qp_close(fpacker);
+ return -1;
+ }
+
+ /* TODO: maybe replace with walk */
+ vec_t * series_list = imap_vec(tag->series);
+
+ if (series_list != NULL)
+ {
+ siridb_series_t * series;
+ for (size_t i = 0; i < series_list->len; i++)
+ {
+ series = (siridb_series_t *) series_list->data[i];
+ qp_fadd_int64(fpacker, (int64_t) series->id);
+ }
+ }
+
+ if (qp_close(fpacker) || series_list == NULL)
+ {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * Returns true when the given property (CLERI keyword) needs a remote query
+ */
+int siridb_tag_is_remote_prop(uint32_t prop)
+{
+ return (prop == CLERI_GID_K_SERIES) ? 1 : 0;
+}
+
+/*
+ * This function can raise a SIGNAL. In this case the packer is not filled
+ * with the correct values.
+ */
+void siridb_tag_prop(siridb_tag_t * tag, qp_packer_t * packer, int prop)
+{
+ switch (prop)
+ {
+ case CLERI_GID_K_NAME:
+ qp_add_string(packer, tag->name);
+ break;
+ case CLERI_GID_K_SERIES:
+ qp_add_int64(packer, (int64_t) tag->id);
+ break;
+ }
+}
+
+int siridb_tag_cexpr_cb(siridb_tag_t * tag, cexpr_condition_t * cond)
+{
+ switch (cond->prop)
+ {
+ case CLERI_GID_K_NAME:
+ return cexpr_str_cmp(cond->operator, tag->name, cond->str);
+ case CLERI_GID_K_SERIES:
+ return cexpr_int_cmp(cond->operator, (int64_t) tag->id, cond->int64);
+ }
+
+ log_critical("Unknown group property received: %d", cond->prop);
+ assert (0);
+ return -1;
+}
+
+/*
+ * Can be used as a callback, in other cases go for the macro.
+ */
+void siridb__tag_decref(siridb_tag_t * tag)
+{
+ if (!--tag->ref)
+ {
+ siridb__tag_free(tag);
+ }
+}
+
+/*
+ * NEVER call this function but rather call siridb_tag_decref instead.
+ *
+ * Destroy a tag object. Parsing NULL is not allowed.
+ */
+void siridb__tag_free(siridb_tag_t * tag)
+{
+#ifdef DEBUG
+ log_debug("Free tag: '%s'", tag->name);
+#endif
+
+ if ((tag->flags & TAG_FLAG_CLEANUP) && unlink(tag->fn))
+ {
+ log_critical("Cannot remove tag file: '%s'", tag->fn);
+ }
+ else if ((tag->flags & TAG_FLAG_REQUIRE_SAVE) && siridb_tag_save(tag))
+ {
+ log_critical("Cannot save tag file: '%s'", tag->fn);
+ }
+
+ free(tag->name);
+ free(tag->fn);
+ if (tag->series != NULL)
+ {
+ imap_free(tag->series, (imap_free_cb) siridb__series_decref);
+ }
+
+ free(tag);
+}
+
+/*
+ * Returns 1 (true) if the file name is valid and 0 (false) if not
+ */
+int siridb_tag_is_valid_fn(const char * fn)
+{
+ int i = 0;
+ while (*fn && isdigit(*fn))
+ {
+ fn++;
+ i++;
+ }
+ return (i == TAGFN_NUMBERS) ? (strcmp(fn, ".tag") == 0) : 0;
+}
--- /dev/null
+/*
+ * tags.c - Tags.
+ *
+ * author : Jeroen van der Heijden
+ * email : jeroen@transceptor.technology
+ * copyright : 2017, Transceptor Technology
+ *
+ * changes
+ * - initial version, 16-06-2017
+ *
+ */
+#define _GNU_SOURCE
+#include <assert.h>
+#include <logger/logger.h>
+#include <siri/db/tags.h>
+#include <stdlib.h>
+#include <vec/vec.h>
+#include <siri/db/series.h>
+#include <siri/net/protocol.h>
+#include <unistd.h>
+#include <siri/siri.h>
+
+static void TAGS_free(siridb_tags_t * tags);
+static int TAGS_load(siridb_t * siridb);
+static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer);
+static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup);
+static int TAGS_2slist(siridb_tag_t * tag, vec_t * tags_list);
+static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag);
+
+/*
+ * Initialize tags. Returns 0 if successful or -1 in case of an error.
+ */
+int siridb_tags_init(siridb_t * siridb)
+{
+ log_info("Loading tags");
+ siridb->tags = (siridb_tags_t *) malloc(sizeof(siridb_tags_t));
+ if (siridb->tags == NULL)
+ {
+ return -1;
+ }
+ siridb->tags->flags = 0;
+ siridb->tags->ref = 1;
+ siridb->tags->tags = ct_new();
+ siridb->tags->cleanup = vec_new(VEC_DEFAULT_SIZE);
+ siridb->tags->next_id = 0;
+
+ uv_mutex_init(&siridb->tags->mutex);
+
+ if (asprintf(
+ &siridb->tags->path,
+ "%s%s",
+ siridb->dbpath,
+ SIRIDB_TAGS_PATH) < 0 ||
+ siridb->tags->tags == NULL ||
+ siridb->tags->cleanup == NULL ||
+ TAGS_load(siridb))
+ {
+ TAGS_free(siridb->tags);
+ siridb->tags = NULL;
+ return -1;
+ }
+
+ return 0;
+}
+
+void siridb_tags_incref(siridb_tags_t * tags)
+{
+ tags->ref++;
+}
+
+void siridb_tags_decref(siridb_tags_t * tags)
+{
+ if (!--tags->ref)
+ {
+ TAGS_free(tags);
+ }
+}
+
+siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name)
+{
+ siridb_tag_t * tag = siridb_tag_new(tags->next_id++, tags->path);
+ if (tag != NULL)
+ {
+ tag->name = strdup(name);
+ if (tag->name == NULL || ct_add(tags->tags, name, tag))
+ {
+ siridb_tag_decref(tag);
+ tag = NULL;
+ }
+ }
+ return tag;
+}
+
+/*
+ * Main thread.
+ *
+ * Returns NULL and raises a signal in case of an error.
+ */
+sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid)
+{
+ qp_packer_t * packer = sirinet_packer_new(8192);
+ int rc;
+
+ if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN))
+ {
+ return NULL; /* signal is raised */
+ }
+
+ rc = ct_values(tags->tags, (ct_val_cb) TAGS_pkg, packer);
+
+ if (rc)
+ {
+ /* signal is raised when not 0 */
+ qp_packer_free(packer);
+ return NULL;
+ }
+
+ return sirinet_packer2pkg(packer, pid, BPROTO_RES_TAGS);
+}
+
+/*
+ * This function will set and unset the mutex lock.
+ */
+void siridb_tags_cleanup(uv_async_t * handle)
+{
+ siridb_tags_t * tags = (siridb_tags_t *) handle->data;
+ siridb_tag_t * tag, * rmtag;
+
+ uv_mutex_lock(&tags->mutex);
+
+ while (tags->cleanup->len)
+ {
+ tag = (siridb_tag_t *) slist_pop(tags->cleanup);
+
+ if (!tag->series->len &&
+ (rmtag = (siridb_tag_t *) ct_pop(tags->tags, tag->name)) != NULL)
+ {
+#ifdef DEBUG
+ assert(rmtag == tag && (tag->flags & TAG_FLAG_CLEANUP));
+#endif
+ siridb_tag_decref(rmtag);
+ }
+ }
+
+ uv_mutex_unlock(&tags->mutex);
+
+ siridb_tags_decref(tags);
+
+ uv_close((uv_handle_t *) handle, (uv_close_cb) free);
+}
+
+ct_t * siridb_tags_lookup(siridb_tags_t * tags)
+{
+ ct_t * lookup = ct_new();
+ if (lookup != NULL)
+ {
+ ct_values(tags->tags, (ct_val_cb) &TAGS_ctmap_update, lookup);
+ }
+ return lookup;
+}
+
+/*
+ * This function is called from the "Group" thread.
+ */
+void siridb_tags_dropped_series(siridb_tags_t * tags)
+{
+ siridb_tag_t * tag;
+ vec_t * tags_list;
+
+ uv_mutex_lock(&tags->mutex);
+
+ tags_list = slist_new(tags->tags->len);
+
+ tags->flags &= ~TAGS_FLAG_DROPPED_SERIES;
+
+ ct_values(tags->tags, (ct_val_cb) TAGS_2slist, tags_list);
+
+ uv_mutex_unlock(&tags->mutex);
+
+ while (tags_list->len)
+ {
+ tag = (siridb_tag_t *) slist_pop(tags_list);
+
+ uv_mutex_lock(&tags->mutex);
+
+ TAGS_dropped_series(tags, tag);
+
+ siridb_tag_decref(tag);
+
+ uv_mutex_unlock(&tags->mutex);
+
+ usleep(10000); // 10ms
+ }
+
+ slist_free(tags_list);
+
+ if (tags->cleanup->len)
+ {
+ uv_async_t * cleanup = (uv_async_t *) malloc(sizeof(uv_async_t));
+
+ if (cleanup == NULL)
+ {
+ log_critical("Allocation error while creating cleanup task");
+ return;
+ }
+ siridb_tags_incref(tags);
+
+ cleanup->data = (void *) tags;
+
+ uv_async_init(siri.loop,
+ cleanup,
+ (uv_async_cb) siridb_tags_cleanup);
+ uv_async_send(cleanup);
+ }
+}
+
+void siridb_tags_save(siridb_tags_t * tags)
+{
+ siridb_tag_t * tag;
+ vec_t * tags_list;
+
+ uv_mutex_lock(&tags->mutex);
+
+ tags_list = slist_new(tags->tags->len);
+
+ tags->flags &= ~TAGS_FLAG_REQUIRE_SAVE;
+
+ ct_values(tags->tags, (ct_val_cb) TAGS_2slist, tags_list);
+
+ uv_mutex_unlock(&tags->mutex);
+
+ while (tags_list->len)
+ {
+ tag = (siridb_tag_t *) slist_pop(tags_list);
+
+ if (tag->flags & TAG_FLAG_REQUIRE_SAVE)
+ {
+ uv_mutex_lock(&tags->mutex);
+
+ siridb_tag_save(tag);
+
+ tag->flags &= ~ TAG_FLAG_REQUIRE_SAVE;
+
+ uv_mutex_unlock(&tags->mutex);
+ }
+
+ siridb_tag_decref(tag);
+
+ usleep(10000); // 10ms
+ }
+
+ slist_free(tags_list);
+}
+
+/*
+ * This function is called from the "Group" thread.
+ */
+static int TAGS_2slist(siridb_tag_t * tag, vec_t * tags_list)
+{
+ siridb_tag_incref(tag);
+ slist_append(tags_list, tag);
+ return 0;
+}
+
+/*
+ * This function is called from the "Group" thread.
+ */
+static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag)
+{
+ vec_t * tag_series = imap_slist_pop(tag->series);
+ siridb_series_t * series, * s = NULL;
+
+ if (tag_series != NULL)
+ {
+ for (size_t i = 0; i < tag_series->len; i++)
+ {
+ series = (siridb_series_t *) tag_series->data[i];
+ if (series->flags & SIRIDB_SERIES_IS_DROPPED)
+ {
+ s = (siridb_series_t *) imap_pop(tag->series, series->id);
+ assert (s != NULL);
+ siridb_series_decref(s);
+ }
+ }
+
+ if (s == NULL)
+ {
+ /* unchanged, we can put the list back */
+ tag->series->vec = tag_series;
+ }
+ else
+ {
+ slist_free(tag_series);
+
+ if (!tag->series->len && (~tag->flags & TAG_FLAG_CLEANUP))
+ {
+ tag->flags |= TAG_FLAG_CLEANUP;
+ if (slist_append_safe(&tags->cleanup, tag))
+ {
+ log_critical(
+ "Unexpected error while appending tag to "
+ "cleanup list");
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
+static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup)
+{
+ if (tag->series->len)
+ {
+ volatile uintptr_t iptr = (uint32_t) tag->series->len;
+ return ct_add(lookup, tag->name, (uint32_t *) iptr);
+ }
+ return 0;
+}
+
+/*
+ * Main thread.
+ */
+static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer)
+{
+ int rc = 0;
+ rc += qp_add_type(packer, QP_ARRAY2);
+ rc += qp_add_string_term(packer, tag->name);
+ rc += qp_add_int64(packer, (int64_t) tag->series->len);
+ return rc;
+}
+
+static int TAGS_load(siridb_t * siridb)
+{
+ struct stat st = {0};
+ struct dirent ** tags_list;
+ int total, n, rc;
+ siridb_tag_t * tag;
+
+ if (stat(siridb->tags->path, &st) == -1)
+ {
+ log_warning(
+ "Tags directory not found, creating directory '%s'.",
+ siridb->tags->path);
+ if (mkdir(siridb->tags->path, 0700) == -1)
+ {
+ log_error("Cannot create directory '%s'.", siridb->tags->path);
+ return -1;
+ }
+ }
+
+ total = scandir(siridb->tags->path, &tags_list, NULL, alphasort);
+
+ if (total < 0)
+ {
+ /* no need to free tags_list when total < 0 */
+ log_error("Cannot read tags directory '%s'.", siridb->tags->path);
+ return -1;
+ }
+
+ rc = 0;
+
+ for (n = 0; n < total; n++)
+ {
+ if (!siridb_tag_is_valid_fn(tags_list[n]->d_name))
+ {
+ continue;
+ }
+
+ /* we are sure this fits since the filename is checked */
+ tag = siridb_tag_load(siridb, tags_list[n]->d_name);
+ if (tag == NULL)
+ {
+ log_error("Error while loading tag: '%s'", tags_list[n]->d_name);
+ rc = -1;
+ break;
+ }
+
+ if (!tag->series->len)
+ {
+ log_warning("Removing tag '%s' since it has no series", tag->name);
+ tag->flags |= TAG_FLAG_CLEANUP;
+ siridb_tag_decref(tag);
+ continue;
+ }
+
+ if (ct_add(siridb->tags->tags, tag->name, tag))
+ {
+ log_error("Cannot add tag to collection");
+ siridb_tag_decref(tag);
+ rc = -1;
+ break;
+ }
+
+ if (tag->id >= siridb->tags->next_id)
+ {
+ siridb->tags->next_id = tag->id + 1;
+ }
+
+ }
+
+ while (total--)
+ {
+ free(tags_list[total]);
+ }
+
+ free(tags_list);
+
+ return rc;
+}
+
+static void TAGS_free(siridb_tags_t * tags)
+{
+#ifdef DEBUG
+ log_debug("Free tags");
+#endif
+ free(tags->path);
+
+ if (tags->cleanup != NULL)
+ {
+ slist_free(tags->cleanup);
+ }
+
+ uv_mutex_lock(&tags->mutex);
+
+ if (tags->tags != NULL)
+ {
+ ct_free(tags->tags, (ct_free_cb) siridb__tag_decref);
+ }
+
+ uv_mutex_unlock(&tags->mutex);
+
+ uv_mutex_destroy(&tags->mutex);
+
+ free(tags);
+}
* should be used with the libcleri module.
*
* Source class: SiriGrammar
- * Created at: 2020-01-23 14:08:47
+ * Created at: 2020-06-17 15:21:09
*/
#include "siri/grammar/grammar.h"
cleri_keyword(CLERI_NONE, "symmetric_difference", CLERI_CASE_SENSITIVE)
);
cleri_t * k_sync_progress = cleri_keyword(CLERI_GID_K_SYNC_PROGRESS, "sync_progress", CLERI_CASE_SENSITIVE);
+ cleri_t * k_tag = cleri_keyword(CLERI_GID_K_TAG, "tag", CLERI_CASE_SENSITIVE);
+ cleri_t * k_tags = cleri_keyword(CLERI_GID_K_TAGS, "tags", CLERI_CASE_SENSITIVE);
cleri_t * k_tee_pipe_name = cleri_keyword(CLERI_GID_K_TEE_PIPE_NAME, "tee_pipe_name", CLERI_CASE_SENSITIVE);
cleri_t * k_timeit = cleri_keyword(CLERI_GID_K_TIMEIT, "timeit", CLERI_CASE_SENSITIVE);
cleri_t * k_timezone = cleri_keyword(CLERI_GID_K_TIMEZONE, "timezone", CLERI_CASE_SENSITIVE);
cleri_tokens(CLERI_NONE, ", |"),
cleri_keyword(CLERI_NONE, "union", CLERI_CASE_SENSITIVE)
);
+ cleri_t * k_untag = cleri_keyword(CLERI_GID_K_UNTAG, "untag", CLERI_CASE_SENSITIVE);
cleri_t * k_uptime = cleri_keyword(CLERI_GID_K_UPTIME, "uptime", CLERI_CASE_SENSITIVE);
cleri_t * k_user = cleri_keyword(CLERI_GID_K_USER, "user", CLERI_CASE_SENSITIVE);
cleri_t * k_users = cleri_keyword(CLERI_GID_K_USERS, "users", CLERI_CASE_SENSITIVE);
k_name,
k_access
), cleri_token(CLERI_NONE, ","), 1, 0, 0);
+ cleri_t * tag_columns = cleri_list(CLERI_GID_TAG_COLUMNS, cleri_choice(
+ CLERI_NONE,
+ CLERI_FIRST_MATCH,
+ 2,
+ k_name,
+ k_series
+ ), cleri_token(CLERI_NONE, ","), 1, 0, 0);
cleri_t * pool_props = cleri_choice(
CLERI_GID_POOL_PROPS,
CLERI_FIRST_MATCH,
)
)
);
+ cleri_t * where_tag = cleri_sequence(
+ CLERI_GID_WHERE_TAG,
+ 2,
+ k_where,
+ cleri_prio(
+ CLERI_NONE,
+ 5,
+ cleri_sequence(
+ CLERI_NONE,
+ 3,
+ k_name,
+ str_operator,
+ string
+ ),
+ cleri_sequence(
+ CLERI_NONE,
+ 3,
+ k_series,
+ int_operator,
+ int_expr
+ ),
+ cleri_sequence(
+ CLERI_NONE,
+ 3,
+ cleri_token(CLERI_NONE, "("),
+ CLERI_THIS,
+ cleri_token(CLERI_NONE, ")")
+ ),
+ cleri_sequence(
+ CLERI_NONE,
+ 3,
+ CLERI_THIS,
+ k_and,
+ CLERI_THIS
+ ),
+ cleri_sequence(
+ CLERI_NONE,
+ 3,
+ CLERI_THIS,
+ k_or,
+ CLERI_THIS
+ )
+ )
+ );
cleri_t * where_pool = cleri_sequence(
CLERI_GID_WHERE_POOL,
2,
);
cleri_t * series_name = cleri_dup(CLERI_GID_SERIES_NAME, string);
cleri_t * group_name = cleri_dup(CLERI_GID_GROUP_NAME, r_grave_str);
+ cleri_t * tag_name = cleri_dup(CLERI_GID_TAG_NAME, r_grave_str);
cleri_t * series_re = cleri_dup(CLERI_GID_SERIES_RE, r_regex);
cleri_t * uuid = cleri_choice(
CLERI_GID_UUID,
r_uuid_str,
string
);
- cleri_t * group_match = cleri_dup(CLERI_GID_GROUP_MATCH, r_grave_str);
+ cleri_t * group_tag_match = cleri_dup(CLERI_GID_GROUP_TAG_MATCH, r_grave_str);
cleri_t * series_match = cleri_prio(
CLERI_GID_SERIES_MATCH,
4,
4,
series_all,
series_name,
- group_match,
+ group_tag_match,
series_re
), series_setopr, 1, 0, 0),
cleri_choice(
4,
series_all,
series_name,
- group_match,
+ group_tag_match,
series_re
),
series_parentheses,
k_timezone,
string
);
+ cleri_t * tag_series = cleri_sequence(
+ CLERI_GID_TAG_SERIES,
+ 2,
+ k_tag,
+ tag_name
+ );
+ cleri_t * untag_series = cleri_sequence(
+ CLERI_GID_UNTAG_SERIES,
+ 2,
+ k_untag,
+ tag_name
+ );
cleri_t * set_expiration_num = cleri_sequence(
CLERI_GID_SET_EXPIRATION_NUM,
4,
set_name
)
);
+ cleri_t * alter_series = cleri_sequence(
+ CLERI_GID_ALTER_SERIES,
+ 4,
+ k_series,
+ series_match,
+ cleri_optional(CLERI_NONE, where_series),
+ cleri_choice(
+ CLERI_NONE,
+ CLERI_FIRST_MATCH,
+ 2,
+ tag_series,
+ untag_series
+ )
+ );
cleri_t * count_groups = cleri_sequence(
CLERI_GID_COUNT_GROUPS,
2,
k_groups,
cleri_optional(CLERI_NONE, where_group)
);
+ cleri_t * count_tags = cleri_sequence(
+ CLERI_GID_COUNT_TAGS,
+ 2,
+ k_tags,
+ cleri_optional(CLERI_NONE, where_tag)
+ );
cleri_t * count_pools = cleri_sequence(
CLERI_GID_COUNT_POOLS,
2,
k_group,
group_name
);
+ cleri_t * drop_tag = cleri_sequence(
+ CLERI_GID_DROP_TAG,
+ 2,
+ k_tag,
+ tag_name
+ );
cleri_t * drop_series = cleri_sequence(
CLERI_GID_DROP_SERIES,
4,
cleri_optional(CLERI_NONE, group_columns),
cleri_optional(CLERI_NONE, where_group)
);
+ cleri_t * list_tags = cleri_sequence(
+ CLERI_GID_LIST_TAGS,
+ 3,
+ k_tags,
+ cleri_optional(CLERI_NONE, tag_columns),
+ cleri_optional(CLERI_NONE, where_tag)
+ );
cleri_t * list_pools = cleri_sequence(
CLERI_GID_LIST_POOLS,
3,
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 5,
+ 6,
+ alter_series,
alter_user,
alter_group,
alter_server,
cleri_choice(
CLERI_NONE,
CLERI_MOST_GREEDY,
- 10,
+ 11,
count_groups,
count_pools,
count_series,
count_shards,
count_shards_size,
count_users,
+ count_tags,
count_series_length
)
);
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 5,
+ 6,
drop_group,
+ drop_tag,
drop_series,
drop_shards,
drop_server,
cleri_choice(
CLERI_NONE,
CLERI_FIRST_MATCH,
- 6,
+ 7,
list_series,
+ list_tags,
list_users,
list_shards,
list_groups,
case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
+ case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS";
default:
sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
return protocol_str;
case BPROTO_RES_GROUPS: return "BPROTO_RES_GROUPS";
case BPROTO_ACK_TEE_PIPE_NAME: return "BPROTO_ACK_TEE_PIPE_NAME";
case BPROTO_ACK_DROP_DATABASE: return "BPROTO_ACK_DROP_DATABASE";
+ case BPROTO_RES_TAGS: return "BPROTO_RES_TAGS";
default:
sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n);
return protocol_str;