From: Jeroen van der Heijden Date: Wed, 17 Jun 2020 14:54:02 +0000 (+0200) Subject: Work on tag support X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~3^2~6^2~12 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=6e41faffb03240060f9bb6866786ea27d12396f9;p=siridb-server.git Work on tag support --- diff --git a/Debug/src/siri/db/subdir.mk b/Debug/src/siri/db/subdir.mk index 668065df..f189bf83 100644 --- a/Debug/src/siri/db/subdir.mk +++ b/Debug/src/siri/db/subdir.mk @@ -34,6 +34,8 @@ C_SRCS += \ ../src/siri/db/shard.c \ ../src/siri/db/shards.c \ ../src/siri/db/sset.c \ +../src/siri/db/tag.c \ +../src/siri/db/tags.c \ ../src/siri/db/tasks.c \ ../src/siri/db/tee.c \ ../src/siri/db/time.c \ @@ -77,6 +79,8 @@ OBJS += \ ./src/siri/db/shard.o \ ./src/siri/db/shards.o \ ./src/siri/db/sset.o \ +./src/siri/db/tag.o \ +./src/siri/db/tags.o \ ./src/siri/db/tasks.o \ ./src/siri/db/tee.o \ ./src/siri/db/time.o \ @@ -120,6 +124,8 @@ C_DEPS += \ ./src/siri/db/shard.d \ ./src/siri/db/shards.d \ ./src/siri/db/sset.d \ +./src/siri/db/tag.d \ +./src/siri/db/tags.d \ ./src/siri/db/tasks.d \ ./src/siri/db/tee.d \ ./src/siri/db/time.d \ diff --git a/Release/src/siri/db/subdir.mk b/Release/src/siri/db/subdir.mk index 6618c22f..17bd2152 100644 --- a/Release/src/siri/db/subdir.mk +++ b/Release/src/siri/db/subdir.mk @@ -34,6 +34,8 @@ C_SRCS += \ ../src/siri/db/shard.c \ ../src/siri/db/shards.c \ ../src/siri/db/sset.c \ +../src/siri/db/tag.c \ +../src/siri/db/tags.c \ ../src/siri/db/tasks.c \ ../src/siri/db/tee.c \ ../src/siri/db/time.c \ @@ -77,6 +79,8 @@ OBJS += \ ./src/siri/db/shard.o \ ./src/siri/db/shards.o \ ./src/siri/db/sset.o \ +./src/siri/db/tag.o \ +./src/siri/db/tags.o \ ./src/siri/db/tasks.o \ ./src/siri/db/tee.o \ ./src/siri/db/time.o \ @@ -120,6 +124,8 @@ C_DEPS += \ ./src/siri/db/shard.d \ ./src/siri/db/shards.d \ ./src/siri/db/sset.d \ +./src/siri/db/tag.d \ +./src/siri/db/tags.d \ ./src/siri/db/tasks.d \ ./src/siri/db/tee.d \ ./src/siri/db/time.d \ diff --git a/grammar/grammar.py b/grammar/grammar.py index dd7ec0f0..fca6316d 100644 --- a/grammar/grammar.py +++ b/grammar/grammar.py @@ -161,6 +161,8 @@ class SiriGrammar(Grammar): Keyword('symmetric_difference'), most_greedy=False) k_sync_progress = Keyword('sync_progress') + k_tag = Keyword('tag') + k_tags = Keyword('tags') k_tee_pipe_name = Keyword('tee_pipe_name') k_timeit = Keyword('timeit') k_timezone = Keyword('timezone') @@ -172,6 +174,7 @@ class SiriGrammar(Grammar): Tokens(', |'), Keyword('union'), most_greedy=False) + k_untag = Keyword('untag') k_uptime = Keyword('uptime') k_user = Keyword('user') k_users = Keyword('users') @@ -295,6 +298,11 @@ class SiriGrammar(Grammar): k_access, most_greedy=False), ',', 1) + tag_columns = List(Choice( + k_name, + k_series, + most_greedy=False), ',', 1) + pool_props = Choice( k_pool, k_servers, @@ -317,6 +325,14 @@ class SiriGrammar(Grammar): Sequence(THIS, k_and, THIS), Sequence(THIS, k_or, THIS))) + # where tag + where_tag = Sequence(k_where, Prio( + Sequence(k_name, str_operator, string), + Sequence(k_series, int_operator, int_expr), + Sequence('(', THIS, ')'), + Sequence(THIS, k_and, THIS), + Sequence(THIS, k_or, THIS))) + # where pool where_pool = Sequence(k_where, Prio( Sequence(pool_props, int_operator, int_expr), @@ -421,20 +437,21 @@ class SiriGrammar(Grammar): series_all = Choice(Token('*'), k_all, most_greedy=False) series_name = Repeat(string, 1, 1) group_name = Repeat(r_grave_str, 1, 1) + tag_name = Repeat(r_grave_str, 1, 1) series_re = Repeat(r_regex, 1, 1) uuid = Choice(r_uuid_str, string, most_greedy=False) - group_match = Repeat(r_grave_str, 1, 1) + group_tag_match = Repeat(r_grave_str, 1, 1) series_match = Prio( List(Choice( series_all, series_name, - group_match, + group_tag_match, series_re, most_greedy=False), series_setopr, 1), Choice( series_all, series_name, - group_match, + group_tag_match, series_re, most_greedy=False), series_parentheses, @@ -593,15 +610,17 @@ class SiriGrammar(Grammar): set_select_points_limit = Sequence( k_set, k_select_points_limit, r_uinteger) set_timezone = Sequence(k_set, k_timezone, string) + tag_series = Sequence(k_tag, tag_name) + untag_series = Sequence(k_untag, tag_name) set_expiration_num = Sequence( k_set, k_expiration_num, - time_expr, + time_expr, Optional(set_ignore_threshold)) set_expiration_log = Sequence( k_set, k_expiration_log, - time_expr, + time_expr, Optional(set_ignore_threshold)) alter_database = Sequence(k_database, Choice( @@ -636,8 +655,16 @@ class SiriGrammar(Grammar): set_name, most_greedy=False)) + alter_series = Sequence( + k_series, + series_match, + Optional(where_series), + Choice(tag_series, untag_series, most_greedy=False)) + count_groups = Sequence( k_groups, Optional(where_group)) + count_tags = Sequence( + k_tags, Optional(where_tag)) count_pools = Sequence( k_pools, Optional(where_pool)) count_series = Sequence( @@ -670,6 +697,8 @@ class SiriGrammar(Grammar): k_user, string, set_password) drop_group = Sequence(k_group, group_name) + drop_tag = Sequence(k_tag, tag_name) + # Drop statement needs at least a series_math or where STMT or both drop_series = Sequence( k_series, @@ -688,6 +717,8 @@ class SiriGrammar(Grammar): list_groups = Sequence( k_groups, Optional(group_columns), Optional(where_group)) + list_tags = Sequence( + k_tags, Optional(tag_columns), Optional(where_tag)) list_pools = Sequence( k_pools, Optional(pool_columns), Optional(where_pool)) list_series = Sequence( @@ -705,6 +736,7 @@ class SiriGrammar(Grammar): revoke_user = Sequence(k_user, string) alter_stmt = Sequence(k_alter, Choice( + alter_series, alter_user, alter_group, alter_server, @@ -724,6 +756,7 @@ class SiriGrammar(Grammar): count_shards, count_shards_size, count_users, + count_tags, count_series_length, most_greedy=True)) @@ -733,6 +766,7 @@ class SiriGrammar(Grammar): drop_stmt = Sequence(k_drop, Choice( drop_group, + drop_tag, drop_series, drop_shards, drop_server, @@ -745,6 +779,7 @@ class SiriGrammar(Grammar): list_stmt = Sequence(k_list, Choice( list_series, + list_tags, list_users, list_shards, list_groups, diff --git a/include/siri/db/db.h b/include/siri/db/db.h index 3671a47f..bfdea944 100644 --- a/include/siri/db/db.h +++ b/include/siri/db/db.h @@ -37,6 +37,7 @@ typedef struct siridb_s siridb_t; #include #include #include +#include int32_t siridb_get_uptime(siridb_t * siridb); @@ -102,6 +103,7 @@ struct siridb_s siridb_replicate_t * replicate; siridb_reindex_t * reindex; siridb_groups_t * groups; + siridb_tags_t * tags; siridb_buffer_t * buffer; siridb_tee_t * tee; siridb_tasks_t tasks; diff --git a/include/siri/db/group.h b/include/siri/db/group.h index 08667331..6a902ec4 100644 --- a/include/siri/db/group.h +++ b/include/siri/db/group.h @@ -5,8 +5,12 @@ #define SIRIDB_GROUP_H_ #define PCRE2_CODE_UNIT_WIDTH 8 -#define GROUP_FLAG_INIT 1 -#define GROUP_FLAG_DROPPED 2 + +enum +{ + GROUP_FLAG_INIT = 1<<0, + GROUP_FLAG_DROPPED = 1<<1, +}; typedef struct siridb_group_s siridb_group_t; @@ -37,9 +41,9 @@ int siridb_group_is_remote_prop(uint32_t prop); void siridb__group_decref(siridb_group_t * group); void siridb__group_free(siridb_group_t * group); -#define siridb_group_incref(group) group->ref++ +#define siridb_group_incref(group__) (group__)->ref++ #define siridb_group_decref(group__) \ - if (!--group__->ref) siridb__group_free(group__) + if (!--(group__)->ref) siridb__group_free(group__) struct siridb_group_s { diff --git a/include/siri/db/groups.h b/include/siri/db/groups.h index 1af1feed..5e5a90d7 100644 --- a/include/siri/db/groups.h +++ b/include/siri/db/groups.h @@ -32,7 +32,10 @@ typedef enum typedef struct siridb_groups_s siridb_groups_t; -#define GROUPS_FLAG_DROPPED_SERIES 1 +enum +{ + GROUPS_FLAG_DROPPED_SERIES = 1<<0, +}; #include #include @@ -51,6 +54,7 @@ int siridb_groups_drop_group( const char * name, char * err_msg); void siridb_groups_destroy(siridb_groups_t * groups); +void siridb_groups_incref(siridb_groups_t * groups); void siridb_groups_decref(siridb_groups_t * groups); int siridb_groups_add_group( siridb_groups_t * groups, diff --git a/include/siri/db/tag.h b/include/siri/db/tag.h new file mode 100644 index 00000000..c0c4f19b --- /dev/null +++ b/include/siri/db/tag.h @@ -0,0 +1,45 @@ +/* + * tag.h - Tag (tag series). + */ +#ifndef SIRIDB_TAG_H_ +#define SIRIDB_TAG_H_ + +typedef struct siridb_tag_s siridb_tag_t; + +enum +{ + TAG_FLAG_CLEANUP = 1<<0, + TAG_FLAG_REQUIRE_SAVE = 1<<1, +}; + +#include +#include +#include + +siridb_tag_t * siridb_tag_new(uint32_t id, const char * tags_path); +void siridb__tag_decref(siridb_tag_t * tag); +void siridb__tag_free(siridb_tag_t * tag); +int siridb_tag_is_valid_fn(const char * fn); +siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn); +int siridb_tag_save(siridb_tag_t * tag); +int siridb_tag_is_remote_prop(uint32_t prop); +void siridb_tag_prop(siridb_tag_t * tag, qp_packer_t * packer, int prop); +int siridb_tag_cexpr_cb(siridb_tag_t * tag, cexpr_condition_t * cond); + + +#define siridb_tag_incref(tag__) (tag__)->ref++ +#define siridb_tag_decref(tag__) \ + if (!--(tag__)->ref) siridb__tag_free(tag__) + +struct siridb_tag_s +{ + uint16_t ref; + uint16_t flags; + uint32_t id; + char * name; + char * fn; + imap_t * series; +}; + + +#endif /* SIRIDB_TAG_H_ */ diff --git a/include/siri/db/tags.h b/include/siri/db/tags.h new file mode 100644 index 00000000..42e262d4 --- /dev/null +++ b/include/siri/db/tags.h @@ -0,0 +1,54 @@ +/* + * tags.h - Tag (tagged series). + */ +#ifndef SIRIDB_TAGS_H_ +#define SIRIDB_TAGS_H_ + +typedef struct siridb_tags_s siridb_tags_t; + +#include +#include +#include +#include +#include +#include + +#define SIRIDB_TAGS_PATH "tags/" + +enum +{ + TAGS_FLAG_DROPPED_SERIES = 1<<0, + TAGS_FLAG_REQUIRE_SAVE = 1<<1, +}; + +struct siridb_tags_s +{ + uint16_t flags; + uint16_t ref; + uint32_t next_id; + char * path; + ct_t * tags; + vec_t * cleanup; + uv_mutex_t mutex; +}; + +int siridb_tags_init(siridb_t * siridb); +void siridb_tags_incref(siridb_tags_t * tags); +void siridb_tags_decref(siridb_tags_t * tags); +siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name); +sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid); +ct_t * siridb_tags_lookup(siridb_tags_t * tags); +void siridb_tags_cleanup(uv_async_t * handle); +void siridb_tags_dropped_series(siridb_tags_t * tags); +void siridb_tags_save(siridb_tags_t * tags); + + +static inline void siridb_tags_set_require_save( + siridb_tags_t * tags, + siridb_tag_t * tag) +{ + tags->flags |= TAGS_FLAG_REQUIRE_SAVE; + tag->flags |= TAG_FLAG_REQUIRE_SAVE; +} + +#endif /* SIRIDB_TAGS_H_ */ diff --git a/include/siri/grammar/grammar.h b/include/siri/grammar/grammar.h index 2dbe8283..362d5e74 100644 --- a/include/siri/grammar/grammar.h +++ b/include/siri/grammar/grammar.h @@ -5,7 +5,7 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2020-01-23 14:08:47 + * Created at: 2020-06-17 15:21:09 */ #ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ #define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ @@ -22,6 +22,7 @@ enum cleri_grammar_ids { CLERI_GID_AGGREGATE_FUNCTIONS, CLERI_GID_ALTER_DATABASE, CLERI_GID_ALTER_GROUP, + CLERI_GID_ALTER_SERIES, CLERI_GID_ALTER_SERVER, CLERI_GID_ALTER_SERVERS, CLERI_GID_ALTER_STMT, @@ -40,6 +41,7 @@ enum cleri_grammar_ids { CLERI_GID_COUNT_SHARDS, CLERI_GID_COUNT_SHARDS_SIZE, CLERI_GID_COUNT_STMT, + CLERI_GID_COUNT_TAGS, CLERI_GID_COUNT_USERS, CLERI_GID_CREATE_GROUP, CLERI_GID_CREATE_STMT, @@ -50,6 +52,7 @@ enum cleri_grammar_ids { CLERI_GID_DROP_SERVER, CLERI_GID_DROP_SHARDS, CLERI_GID_DROP_STMT, + CLERI_GID_DROP_TAG, CLERI_GID_DROP_USER, CLERI_GID_F_ALL, CLERI_GID_F_COUNT, @@ -73,8 +76,8 @@ enum cleri_grammar_ids { CLERI_GID_GRANT_STMT, CLERI_GID_GRANT_USER, CLERI_GID_GROUP_COLUMNS, - CLERI_GID_GROUP_MATCH, CLERI_GID_GROUP_NAME, + CLERI_GID_GROUP_TAG_MATCH, CLERI_GID_HELP_ACCESS, CLERI_GID_HELP_ALTER, CLERI_GID_HELP_ALTER_DATABASE, @@ -226,6 +229,8 @@ enum cleri_grammar_ids { CLERI_GID_K_SUM, CLERI_GID_K_SYMMETRIC_DIFFERENCE, CLERI_GID_K_SYNC_PROGRESS, + CLERI_GID_K_TAG, + CLERI_GID_K_TAGS, CLERI_GID_K_TEE_PIPE_NAME, CLERI_GID_K_TIMEIT, CLERI_GID_K_TIMEZONE, @@ -234,6 +239,7 @@ enum cleri_grammar_ids { CLERI_GID_K_TRUE, CLERI_GID_K_TYPE, CLERI_GID_K_UNION, + CLERI_GID_K_UNTAG, CLERI_GID_K_UPTIME, CLERI_GID_K_USER, CLERI_GID_K_USERS, @@ -252,6 +258,7 @@ enum cleri_grammar_ids { CLERI_GID_LIST_SERVERS, CLERI_GID_LIST_SHARDS, CLERI_GID_LIST_STMT, + CLERI_GID_LIST_TAGS, CLERI_GID_LIST_USERS, CLERI_GID_LOG_KEYWORDS, CLERI_GID_MERGE_AS, @@ -302,8 +309,12 @@ enum cleri_grammar_ids { CLERI_GID_STRING, CLERI_GID_STR_OPERATOR, CLERI_GID_SUFFIX_EXPR, + CLERI_GID_TAG_COLUMNS, + CLERI_GID_TAG_NAME, + CLERI_GID_TAG_SERIES, CLERI_GID_TIMEIT_STMT, CLERI_GID_TIME_EXPR, + CLERI_GID_UNTAG_SERIES, CLERI_GID_USER_COLUMNS, CLERI_GID_UUID, CLERI_GID_WHERE_GROUP, @@ -311,6 +322,7 @@ enum cleri_grammar_ids { CLERI_GID_WHERE_SERIES, CLERI_GID_WHERE_SERVER, CLERI_GID_WHERE_SHARD, + CLERI_GID_WHERE_TAG, CLERI_GID_WHERE_USER, CLERI_GID__BOOLEAN, CLERI_END // can be used to get the enum length diff --git a/include/siri/net/protocol.h b/include/siri/net/protocol.h index f5a1f147..fb7dedb5 100644 --- a/include/siri/net/protocol.h +++ b/include/siri/net/protocol.h @@ -79,6 +79,7 @@ typedef enum BPROTO_DISABLE_BACKUP_MODE, /* empty */ BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */ BPROTO_DROP_DATABASE, /* empty */ + BPROTO_REQ_TAGS, /* empty */ } bproto_client_t; /* @@ -128,6 +129,7 @@ typedef enum BPROTO_RES_GROUPS, /* [[name, series], ...] */ BPROTO_ACK_TEE_PIPE_NAME, /* empty */ BPROTO_ACK_DROP_DATABASE, /* empty */ + BPROTO_RES_TAGS, /* [[name, series], ...] */ } bproto_server_t; diff --git a/src/siri/db/db.c b/src/siri/db/db.c index a51c7559..24256b46 100644 --- a/src/siri/db/db.c +++ b/src/siri/db/db.c @@ -217,6 +217,14 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags) return NULL; } + /* load tags */ + if (siridb_tags_init(siridb)) + { + log_error("Cannot read tags for database '%s'", siridb->dbname); + siridb_decref(siridb); + return NULL; + } + /* update series props */ log_info("Updating series properties"); @@ -765,11 +773,17 @@ void siridb__free(siridb_t * siridb) siridb_groups_decref(siridb->groups); } + if (siridb->tags != NULL) + { + siridb_tags_decref(siridb->tags); + } + if (siridb->tee != NULL) { siridb_tee_free(siridb->tee); } + /* unlock the database in case no siri_err occurred */ if (!siri_err) { diff --git a/src/siri/db/groups.c b/src/siri/db/groups.c index f8420397..5a1a0311 100644 --- a/src/siri/db/groups.c +++ b/src/siri/db/groups.c @@ -69,7 +69,7 @@ siridb_groups_t * siridb_groups_new(siridb_t * siridb) } else { - groups->ref = 2; /* for the main thread and for the groups thread */ + groups->ref = 1; groups->fn = NULL; groups->groups = ct_new(); groups->nseries = vec_new(VEC_DEFAULT_SIZE); @@ -329,6 +329,11 @@ int siridb_groups_drop_group( return 0; } +void siridb_groups_incref(siridb_groups_t * groups) +{ + groups->ref++; +} + void siridb_groups_decref(siridb_groups_t * groups) { if (!--groups->ref) @@ -417,6 +422,9 @@ static void GROUPS_loop(void * arg) siridb_groups_t * groups = siridb->groups; uint64_t mod_test = 0; + siridb_groups_incref(siridb->groups); + siridb_tags_incref(siridb->tags); + while (groups->status != GROUPS_STOPPING) { sleep(GROUPS_LOOP_SLEEP); @@ -445,6 +453,14 @@ static void GROUPS_loop(void * arg) { GROUPS_cleanup(siridb->groups); } + if (siridb->tags->flags & TAGS_FLAG_DROPPED_SERIES) + { + siridb_tags_dropped_series(siridb->tags); + } + if (siridb->tags->flags & TAGS_FLAG_REQUIRE_SAVE) + { + siridb_tags_save(siridb->tags); + } break; case GROUPS_STOPPING: @@ -457,6 +473,8 @@ static void GROUPS_loop(void * arg) } groups->status = GROUPS_CLOSED; + + siridb_tags_decref(siridb->tags); siridb_groups_decref(siridb->groups); } diff --git a/src/siri/db/series.c b/src/siri/db/series.c index a4802ad3..1510b237 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -569,6 +569,7 @@ int siridb_series_flush_dropped(siridb_t * siridb) } siridb->groups->flags |= GROUPS_FLAG_DROPPED_SERIES; + siridb->tags->flags |= TAGS_FLAG_DROPPED_SERIES; return rc; } diff --git a/src/siri/db/tag.c b/src/siri/db/tag.c new file mode 100644 index 00000000..14eaaf54 --- /dev/null +++ b/src/siri/db/tag.c @@ -0,0 +1,263 @@ +/* + * tag.c - Tag. + * + * author : Jeroen van der Heijden + * email : jeroen@transceptor.technology + * copyright : 2017, Transceptor Technology + * + * changes + * - initial version, 16-06-2017 + * + */ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define TAGFN_NUMBERS 9 + +/* + * Returns tag when successful or NULL in case of an error. + */ +siridb_tag_t * siridb_tag_new(uint32_t id, const char * tags_path) +{ + siridb_tag_t * tag = (siridb_tag_t *) malloc(sizeof(siridb_tag_t)); + if (tag != NULL) + { + tag->ref = 1; + tag->flags = 0; + tag->id = id; + tag->name = NULL; + ; + tag->series = imap_new(); + + if (asprintf( + &tag->fn, + "%s%0*" PRIu32 ".tag", + tags_path, + TAGFN_NUMBERS, + id) < 0 || tag->series == NULL) + { + siridb__tag_free(tag); + tag = NULL; + } + } + return tag; +} + +/* + * Returns tag when successful or NULL in case of an error. + */ +siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn) +{ + siridb_tag_t * tag = siridb_tag_new( + (uint32_t) atoll(fn), + siridb->tags->path); + if (tag != NULL) + { + qp_unpacker_t * unpacker = qp_unpacker_ff(tag->fn); + if (unpacker == NULL) + { + log_critical("cannot open tag file for reading: %s", tag->fn); + siridb__tag_free(tag); + tag = NULL; + } + else + { + qp_obj_t qp_tn; + + if (!qp_is_array(qp_next(unpacker, NULL)) || + qp_next(unpacker, &qp_tn) != QP_RAW || + (tag->name = strndup(qp_tn.via.raw, qp_tn.len)) == NULL) + { + /* or a memory allocation error, but the same result */ + log_critical( + "expected an array with a tag name in file: %s", + tag->fn); + siridb__tag_free(tag); + tag = NULL; + } + else + { + qp_obj_t qp_series_id; + uint64_t series_id; + siridb_series_t * series; + + while (qp_next(unpacker, &qp_series_id) == QP_INT64) + { + series_id = (uint64_t) qp_series_id.via.int64; + series = imap_get(siridb->series_map, series_id); + + if (series == NULL) + { + siridb_tags_require_save(siridb->tags, tag); + + log_error( + "cannot find series id %" PRId64 + " which was tagged with '%s'", + qp_series_id.via.int64, + tag->name); + } + else if (imap_add(tag->series, series_id, series) == 0) + { + siridb_series_incref(series); + } + else + { + log_critical( + "cannot add series '%s' to tag '%s'", + series->name, + tag->name); + } + } + } + qp_unpacker_ff_free(unpacker); + } + } + return tag; +} + +/* + * Lock is required + */ +int siridb_tag_save(siridb_tag_t * tag) +{ + qp_fpacker_t * fpacker; + + fpacker = qp_open(tag->fn, "w"); + if (fpacker == NULL) + { + return -1; + } + + if (/* open a new array */ + qp_fadd_type(fpacker, QP_ARRAY_OPEN) || + + /* write the tag name */ + qp_fadd_string(fpacker, tag->name)) + { + qp_close(fpacker); + return -1; + } + + /* TODO: maybe replace with walk */ + vec_t * series_list = imap_vec(tag->series); + + if (series_list != NULL) + { + siridb_series_t * series; + for (size_t i = 0; i < series_list->len; i++) + { + series = (siridb_series_t *) series_list->data[i]; + qp_fadd_int64(fpacker, (int64_t) series->id); + } + } + + if (qp_close(fpacker) || series_list == NULL) + { + return -1; + } + + return 0; +} + +/* + * Returns true when the given property (CLERI keyword) needs a remote query + */ +int siridb_tag_is_remote_prop(uint32_t prop) +{ + return (prop == CLERI_GID_K_SERIES) ? 1 : 0; +} + +/* + * This function can raise a SIGNAL. In this case the packer is not filled + * with the correct values. + */ +void siridb_tag_prop(siridb_tag_t * tag, qp_packer_t * packer, int prop) +{ + switch (prop) + { + case CLERI_GID_K_NAME: + qp_add_string(packer, tag->name); + break; + case CLERI_GID_K_SERIES: + qp_add_int64(packer, (int64_t) tag->id); + break; + } +} + +int siridb_tag_cexpr_cb(siridb_tag_t * tag, cexpr_condition_t * cond) +{ + switch (cond->prop) + { + case CLERI_GID_K_NAME: + return cexpr_str_cmp(cond->operator, tag->name, cond->str); + case CLERI_GID_K_SERIES: + return cexpr_int_cmp(cond->operator, (int64_t) tag->id, cond->int64); + } + + log_critical("Unknown group property received: %d", cond->prop); + assert (0); + return -1; +} + +/* + * Can be used as a callback, in other cases go for the macro. + */ +void siridb__tag_decref(siridb_tag_t * tag) +{ + if (!--tag->ref) + { + siridb__tag_free(tag); + } +} + +/* + * NEVER call this function but rather call siridb_tag_decref instead. + * + * Destroy a tag object. Parsing NULL is not allowed. + */ +void siridb__tag_free(siridb_tag_t * tag) +{ +#ifdef DEBUG + log_debug("Free tag: '%s'", tag->name); +#endif + + if ((tag->flags & TAG_FLAG_CLEANUP) && unlink(tag->fn)) + { + log_critical("Cannot remove tag file: '%s'", tag->fn); + } + else if ((tag->flags & TAG_FLAG_REQUIRE_SAVE) && siridb_tag_save(tag)) + { + log_critical("Cannot save tag file: '%s'", tag->fn); + } + + free(tag->name); + free(tag->fn); + if (tag->series != NULL) + { + imap_free(tag->series, (imap_free_cb) siridb__series_decref); + } + + free(tag); +} + +/* + * Returns 1 (true) if the file name is valid and 0 (false) if not + */ +int siridb_tag_is_valid_fn(const char * fn) +{ + int i = 0; + while (*fn && isdigit(*fn)) + { + fn++; + i++; + } + return (i == TAGFN_NUMBERS) ? (strcmp(fn, ".tag") == 0) : 0; +} diff --git a/src/siri/db/tags.c b/src/siri/db/tags.c new file mode 100644 index 00000000..5675ea3b --- /dev/null +++ b/src/siri/db/tags.c @@ -0,0 +1,436 @@ +/* + * tags.c - Tags. + * + * author : Jeroen van der Heijden + * email : jeroen@transceptor.technology + * copyright : 2017, Transceptor Technology + * + * changes + * - initial version, 16-06-2017 + * + */ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static void TAGS_free(siridb_tags_t * tags); +static int TAGS_load(siridb_t * siridb); +static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer); +static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup); +static int TAGS_2slist(siridb_tag_t * tag, vec_t * tags_list); +static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag); + +/* + * Initialize tags. Returns 0 if successful or -1 in case of an error. + */ +int siridb_tags_init(siridb_t * siridb) +{ + log_info("Loading tags"); + siridb->tags = (siridb_tags_t *) malloc(sizeof(siridb_tags_t)); + if (siridb->tags == NULL) + { + return -1; + } + siridb->tags->flags = 0; + siridb->tags->ref = 1; + siridb->tags->tags = ct_new(); + siridb->tags->cleanup = vec_new(VEC_DEFAULT_SIZE); + siridb->tags->next_id = 0; + + uv_mutex_init(&siridb->tags->mutex); + + if (asprintf( + &siridb->tags->path, + "%s%s", + siridb->dbpath, + SIRIDB_TAGS_PATH) < 0 || + siridb->tags->tags == NULL || + siridb->tags->cleanup == NULL || + TAGS_load(siridb)) + { + TAGS_free(siridb->tags); + siridb->tags = NULL; + return -1; + } + + return 0; +} + +void siridb_tags_incref(siridb_tags_t * tags) +{ + tags->ref++; +} + +void siridb_tags_decref(siridb_tags_t * tags) +{ + if (!--tags->ref) + { + TAGS_free(tags); + } +} + +siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name) +{ + siridb_tag_t * tag = siridb_tag_new(tags->next_id++, tags->path); + if (tag != NULL) + { + tag->name = strdup(name); + if (tag->name == NULL || ct_add(tags->tags, name, tag)) + { + siridb_tag_decref(tag); + tag = NULL; + } + } + return tag; +} + +/* + * Main thread. + * + * Returns NULL and raises a signal in case of an error. + */ +sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid) +{ + qp_packer_t * packer = sirinet_packer_new(8192); + int rc; + + if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN)) + { + return NULL; /* signal is raised */ + } + + rc = ct_values(tags->tags, (ct_val_cb) TAGS_pkg, packer); + + if (rc) + { + /* signal is raised when not 0 */ + qp_packer_free(packer); + return NULL; + } + + return sirinet_packer2pkg(packer, pid, BPROTO_RES_TAGS); +} + +/* + * This function will set and unset the mutex lock. + */ +void siridb_tags_cleanup(uv_async_t * handle) +{ + siridb_tags_t * tags = (siridb_tags_t *) handle->data; + siridb_tag_t * tag, * rmtag; + + uv_mutex_lock(&tags->mutex); + + while (tags->cleanup->len) + { + tag = (siridb_tag_t *) slist_pop(tags->cleanup); + + if (!tag->series->len && + (rmtag = (siridb_tag_t *) ct_pop(tags->tags, tag->name)) != NULL) + { +#ifdef DEBUG + assert(rmtag == tag && (tag->flags & TAG_FLAG_CLEANUP)); +#endif + siridb_tag_decref(rmtag); + } + } + + uv_mutex_unlock(&tags->mutex); + + siridb_tags_decref(tags); + + uv_close((uv_handle_t *) handle, (uv_close_cb) free); +} + +ct_t * siridb_tags_lookup(siridb_tags_t * tags) +{ + ct_t * lookup = ct_new(); + if (lookup != NULL) + { + ct_values(tags->tags, (ct_val_cb) &TAGS_ctmap_update, lookup); + } + return lookup; +} + +/* + * This function is called from the "Group" thread. + */ +void siridb_tags_dropped_series(siridb_tags_t * tags) +{ + siridb_tag_t * tag; + vec_t * tags_list; + + uv_mutex_lock(&tags->mutex); + + tags_list = slist_new(tags->tags->len); + + tags->flags &= ~TAGS_FLAG_DROPPED_SERIES; + + ct_values(tags->tags, (ct_val_cb) TAGS_2slist, tags_list); + + uv_mutex_unlock(&tags->mutex); + + while (tags_list->len) + { + tag = (siridb_tag_t *) slist_pop(tags_list); + + uv_mutex_lock(&tags->mutex); + + TAGS_dropped_series(tags, tag); + + siridb_tag_decref(tag); + + uv_mutex_unlock(&tags->mutex); + + usleep(10000); // 10ms + } + + slist_free(tags_list); + + if (tags->cleanup->len) + { + uv_async_t * cleanup = (uv_async_t *) malloc(sizeof(uv_async_t)); + + if (cleanup == NULL) + { + log_critical("Allocation error while creating cleanup task"); + return; + } + siridb_tags_incref(tags); + + cleanup->data = (void *) tags; + + uv_async_init(siri.loop, + cleanup, + (uv_async_cb) siridb_tags_cleanup); + uv_async_send(cleanup); + } +} + +void siridb_tags_save(siridb_tags_t * tags) +{ + siridb_tag_t * tag; + vec_t * tags_list; + + uv_mutex_lock(&tags->mutex); + + tags_list = slist_new(tags->tags->len); + + tags->flags &= ~TAGS_FLAG_REQUIRE_SAVE; + + ct_values(tags->tags, (ct_val_cb) TAGS_2slist, tags_list); + + uv_mutex_unlock(&tags->mutex); + + while (tags_list->len) + { + tag = (siridb_tag_t *) slist_pop(tags_list); + + if (tag->flags & TAG_FLAG_REQUIRE_SAVE) + { + uv_mutex_lock(&tags->mutex); + + siridb_tag_save(tag); + + tag->flags &= ~ TAG_FLAG_REQUIRE_SAVE; + + uv_mutex_unlock(&tags->mutex); + } + + siridb_tag_decref(tag); + + usleep(10000); // 10ms + } + + slist_free(tags_list); +} + +/* + * This function is called from the "Group" thread. + */ +static int TAGS_2slist(siridb_tag_t * tag, vec_t * tags_list) +{ + siridb_tag_incref(tag); + slist_append(tags_list, tag); + return 0; +} + +/* + * This function is called from the "Group" thread. + */ +static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag) +{ + vec_t * tag_series = imap_slist_pop(tag->series); + siridb_series_t * series, * s = NULL; + + if (tag_series != NULL) + { + for (size_t i = 0; i < tag_series->len; i++) + { + series = (siridb_series_t *) tag_series->data[i]; + if (series->flags & SIRIDB_SERIES_IS_DROPPED) + { + s = (siridb_series_t *) imap_pop(tag->series, series->id); + assert (s != NULL); + siridb_series_decref(s); + } + } + + if (s == NULL) + { + /* unchanged, we can put the list back */ + tag->series->vec = tag_series; + } + else + { + slist_free(tag_series); + + if (!tag->series->len && (~tag->flags & TAG_FLAG_CLEANUP)) + { + tag->flags |= TAG_FLAG_CLEANUP; + if (slist_append_safe(&tags->cleanup, tag)) + { + log_critical( + "Unexpected error while appending tag to " + "cleanup list"); + } + } + } + } + + return 0; +} + +static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup) +{ + if (tag->series->len) + { + volatile uintptr_t iptr = (uint32_t) tag->series->len; + return ct_add(lookup, tag->name, (uint32_t *) iptr); + } + return 0; +} + +/* + * Main thread. + */ +static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer) +{ + int rc = 0; + rc += qp_add_type(packer, QP_ARRAY2); + rc += qp_add_string_term(packer, tag->name); + rc += qp_add_int64(packer, (int64_t) tag->series->len); + return rc; +} + +static int TAGS_load(siridb_t * siridb) +{ + struct stat st = {0}; + struct dirent ** tags_list; + int total, n, rc; + siridb_tag_t * tag; + + if (stat(siridb->tags->path, &st) == -1) + { + log_warning( + "Tags directory not found, creating directory '%s'.", + siridb->tags->path); + if (mkdir(siridb->tags->path, 0700) == -1) + { + log_error("Cannot create directory '%s'.", siridb->tags->path); + return -1; + } + } + + total = scandir(siridb->tags->path, &tags_list, NULL, alphasort); + + if (total < 0) + { + /* no need to free tags_list when total < 0 */ + log_error("Cannot read tags directory '%s'.", siridb->tags->path); + return -1; + } + + rc = 0; + + for (n = 0; n < total; n++) + { + if (!siridb_tag_is_valid_fn(tags_list[n]->d_name)) + { + continue; + } + + /* we are sure this fits since the filename is checked */ + tag = siridb_tag_load(siridb, tags_list[n]->d_name); + if (tag == NULL) + { + log_error("Error while loading tag: '%s'", tags_list[n]->d_name); + rc = -1; + break; + } + + if (!tag->series->len) + { + log_warning("Removing tag '%s' since it has no series", tag->name); + tag->flags |= TAG_FLAG_CLEANUP; + siridb_tag_decref(tag); + continue; + } + + if (ct_add(siridb->tags->tags, tag->name, tag)) + { + log_error("Cannot add tag to collection"); + siridb_tag_decref(tag); + rc = -1; + break; + } + + if (tag->id >= siridb->tags->next_id) + { + siridb->tags->next_id = tag->id + 1; + } + + } + + while (total--) + { + free(tags_list[total]); + } + + free(tags_list); + + return rc; +} + +static void TAGS_free(siridb_tags_t * tags) +{ +#ifdef DEBUG + log_debug("Free tags"); +#endif + free(tags->path); + + if (tags->cleanup != NULL) + { + slist_free(tags->cleanup); + } + + uv_mutex_lock(&tags->mutex); + + if (tags->tags != NULL) + { + ct_free(tags->tags, (ct_free_cb) siridb__tag_decref); + } + + uv_mutex_unlock(&tags->mutex); + + uv_mutex_destroy(&tags->mutex); + + free(tags); +} diff --git a/src/siri/grammar/grammar.c b/src/siri/grammar/grammar.c index 4aba9137..34bff80f 100644 --- a/src/siri/grammar/grammar.c +++ b/src/siri/grammar/grammar.c @@ -5,7 +5,7 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2020-01-23 14:08:47 + * Created at: 2020-06-17 15:21:09 */ #include "siri/grammar/grammar.h" @@ -162,6 +162,8 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_keyword(CLERI_NONE, "symmetric_difference", CLERI_CASE_SENSITIVE) ); cleri_t * k_sync_progress = cleri_keyword(CLERI_GID_K_SYNC_PROGRESS, "sync_progress", CLERI_CASE_SENSITIVE); + cleri_t * k_tag = cleri_keyword(CLERI_GID_K_TAG, "tag", CLERI_CASE_SENSITIVE); + cleri_t * k_tags = cleri_keyword(CLERI_GID_K_TAGS, "tags", CLERI_CASE_SENSITIVE); cleri_t * k_tee_pipe_name = cleri_keyword(CLERI_GID_K_TEE_PIPE_NAME, "tee_pipe_name", CLERI_CASE_SENSITIVE); cleri_t * k_timeit = cleri_keyword(CLERI_GID_K_TIMEIT, "timeit", CLERI_CASE_SENSITIVE); cleri_t * k_timezone = cleri_keyword(CLERI_GID_K_TIMEZONE, "timezone", CLERI_CASE_SENSITIVE); @@ -176,6 +178,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_tokens(CLERI_NONE, ", |"), cleri_keyword(CLERI_NONE, "union", CLERI_CASE_SENSITIVE) ); + cleri_t * k_untag = cleri_keyword(CLERI_GID_K_UNTAG, "untag", CLERI_CASE_SENSITIVE); cleri_t * k_uptime = cleri_keyword(CLERI_GID_K_UPTIME, "uptime", CLERI_CASE_SENSITIVE); cleri_t * k_user = cleri_keyword(CLERI_GID_K_USER, "user", CLERI_CASE_SENSITIVE); cleri_t * k_users = cleri_keyword(CLERI_GID_K_USERS, "users", CLERI_CASE_SENSITIVE); @@ -351,6 +354,13 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) k_name, k_access ), cleri_token(CLERI_NONE, ","), 1, 0, 0); + cleri_t * tag_columns = cleri_list(CLERI_GID_TAG_COLUMNS, cleri_choice( + CLERI_NONE, + CLERI_FIRST_MATCH, + 2, + k_name, + k_series + ), cleri_token(CLERI_NONE, ","), 1, 0, 0); cleri_t * pool_props = cleri_choice( CLERI_GID_POOL_PROPS, CLERI_FIRST_MATCH, @@ -413,6 +423,50 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) ) ) ); + cleri_t * where_tag = cleri_sequence( + CLERI_GID_WHERE_TAG, + 2, + k_where, + cleri_prio( + CLERI_NONE, + 5, + cleri_sequence( + CLERI_NONE, + 3, + k_name, + str_operator, + string + ), + cleri_sequence( + CLERI_NONE, + 3, + k_series, + int_operator, + int_expr + ), + cleri_sequence( + CLERI_NONE, + 3, + cleri_token(CLERI_NONE, "("), + CLERI_THIS, + cleri_token(CLERI_NONE, ")") + ), + cleri_sequence( + CLERI_NONE, + 3, + CLERI_THIS, + k_and, + CLERI_THIS + ), + cleri_sequence( + CLERI_NONE, + 3, + CLERI_THIS, + k_or, + CLERI_THIS + ) + ) + ); cleri_t * where_pool = cleri_sequence( CLERI_GID_WHERE_POOL, 2, @@ -772,6 +826,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) ); cleri_t * series_name = cleri_dup(CLERI_GID_SERIES_NAME, string); cleri_t * group_name = cleri_dup(CLERI_GID_GROUP_NAME, r_grave_str); + cleri_t * tag_name = cleri_dup(CLERI_GID_TAG_NAME, r_grave_str); cleri_t * series_re = cleri_dup(CLERI_GID_SERIES_RE, r_regex); cleri_t * uuid = cleri_choice( CLERI_GID_UUID, @@ -780,7 +835,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) r_uuid_str, string ); - cleri_t * group_match = cleri_dup(CLERI_GID_GROUP_MATCH, r_grave_str); + cleri_t * group_tag_match = cleri_dup(CLERI_GID_GROUP_TAG_MATCH, r_grave_str); cleri_t * series_match = cleri_prio( CLERI_GID_SERIES_MATCH, 4, @@ -790,7 +845,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) 4, series_all, series_name, - group_match, + group_tag_match, series_re ), series_setopr, 1, 0, 0), cleri_choice( @@ -799,7 +854,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) 4, series_all, series_name, - group_match, + group_tag_match, series_re ), series_parentheses, @@ -1167,6 +1222,18 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) k_timezone, string ); + cleri_t * tag_series = cleri_sequence( + CLERI_GID_TAG_SERIES, + 2, + k_tag, + tag_name + ); + cleri_t * untag_series = cleri_sequence( + CLERI_GID_UNTAG_SERIES, + 2, + k_untag, + tag_name + ); cleri_t * set_expiration_num = cleri_sequence( CLERI_GID_SET_EXPIRATION_NUM, 4, @@ -1254,12 +1321,32 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) set_name ) ); + cleri_t * alter_series = cleri_sequence( + CLERI_GID_ALTER_SERIES, + 4, + k_series, + series_match, + cleri_optional(CLERI_NONE, where_series), + cleri_choice( + CLERI_NONE, + CLERI_FIRST_MATCH, + 2, + tag_series, + untag_series + ) + ); cleri_t * count_groups = cleri_sequence( CLERI_GID_COUNT_GROUPS, 2, k_groups, cleri_optional(CLERI_NONE, where_group) ); + cleri_t * count_tags = cleri_sequence( + CLERI_GID_COUNT_TAGS, + 2, + k_tags, + cleri_optional(CLERI_NONE, where_tag) + ); cleri_t * count_pools = cleri_sequence( CLERI_GID_COUNT_POOLS, 2, @@ -1341,6 +1428,12 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) k_group, group_name ); + cleri_t * drop_tag = cleri_sequence( + CLERI_GID_DROP_TAG, + 2, + k_tag, + tag_name + ); cleri_t * drop_series = cleri_sequence( CLERI_GID_DROP_SERIES, 4, @@ -1382,6 +1475,13 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_optional(CLERI_NONE, group_columns), cleri_optional(CLERI_NONE, where_group) ); + cleri_t * list_tags = cleri_sequence( + CLERI_GID_LIST_TAGS, + 3, + k_tags, + cleri_optional(CLERI_NONE, tag_columns), + cleri_optional(CLERI_NONE, where_tag) + ); cleri_t * list_pools = cleri_sequence( CLERI_GID_LIST_POOLS, 3, @@ -1431,7 +1531,8 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 5, + 6, + alter_series, alter_user, alter_group, alter_server, @@ -1447,7 +1548,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_choice( CLERI_NONE, CLERI_MOST_GREEDY, - 10, + 11, count_groups, count_pools, count_series, @@ -1457,6 +1558,7 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) count_shards, count_shards_size, count_users, + count_tags, count_series_length ) ); @@ -1479,8 +1581,9 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 5, + 6, drop_group, + drop_tag, drop_series, drop_shards, drop_server, @@ -1507,8 +1610,9 @@ cleri_grammar_t * compile_siri_grammar_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 6, + 7, list_series, + list_tags, list_users, list_shards, list_groups, diff --git a/src/siri/net/protocol.c b/src/siri/net/protocol.c index e0572c0d..5c685d2a 100644 --- a/src/siri/net/protocol.c +++ b/src/siri/net/protocol.c @@ -86,6 +86,7 @@ const char * sirinet_bproto_client_str(bproto_client_t n) case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE"; case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE"; case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE"; + case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS"; default: sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n); return protocol_str; @@ -123,6 +124,7 @@ const char * sirinet_bproto_server_str(bproto_server_t n) case BPROTO_RES_GROUPS: return "BPROTO_RES_GROUPS"; case BPROTO_ACK_TEE_PIPE_NAME: return "BPROTO_ACK_TEE_PIPE_NAME"; case BPROTO_ACK_DROP_DATABASE: return "BPROTO_ACK_DROP_DATABASE"; + case BPROTO_RES_TAGS: return "BPROTO_RES_TAGS"; default: sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n); return protocol_str;