Work on tag support
authorJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 15 Jul 2020 17:34:09 +0000 (19:34 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 15 Jul 2020 17:34:09 +0000 (19:34 +0200)
include/siri/db/queries.h
include/siri/db/tag.h
include/siri/db/tags.h
src/siri/db/listener.c
src/siri/db/tag.c
src/siri/db/tags.c

index 7e7f5cfc58b4f5ac2de6436975d709d8e4ae9db1..d6f8d0824e83d260000b10b30d2a98a171322d54 100644 (file)
@@ -36,7 +36,8 @@ typedef enum
     QUERY_ALTER_GROUP,
     QUERY_ALTER_SERVER,
     QUERY_ALTER_SERVERS,
-    QUERY_ALTER_USER
+    QUERY_ALTER_USER,
+    QUERY_ALTER_SERIES
 } query_alter_tp;
 
 #define QUERY_DEF               \
index c0c4f19b48b7cd395de4da1e43217dff9b46df98..60406a977b127d984c70ad824244879027691d76 100644 (file)
@@ -35,9 +35,8 @@ struct siridb_tag_s
 {
     uint16_t ref;
     uint16_t flags;
-    uint32_t id;
+    uint32_t n;
     char * name;
-    char * fn;
     imap_t * series;
 };
 
index 42e262d4204fcd5518d3158d1e621b513b2ae1ea..c8355661cfb01ec1935a248dc1e413a6673ee89b 100644 (file)
@@ -25,7 +25,6 @@ struct siridb_tags_s
 {
     uint16_t flags;
     uint16_t ref;
-    uint32_t next_id;
     char * path;
     ct_t * tags;
     vec_t * cleanup;
@@ -41,14 +40,13 @@ ct_t * siridb_tags_lookup(siridb_tags_t * tags);
 void siridb_tags_cleanup(uv_async_t * handle);
 void siridb_tags_dropped_series(siridb_tags_t * tags);
 void siridb_tags_save(siridb_tags_t * tags);
+void siridb_tags_init_nseries(siridb_tags_t * tags);
 
 
-static inline void siridb_tags_set_require_save(
-        siridb_tags_t * tags,
-        siridb_tag_t * tag)
-{
-    tags->flags |= TAGS_FLAG_REQUIRE_SAVE;
-    tag->flags |= TAG_FLAG_REQUIRE_SAVE;
-}
+#define siridb_tags_set_require_save(__tags, __tag) \
+do{                                                 \
+    (__tags)->flags |= TAGS_FLAG_REQUIRE_SAVE;      \
+    (__tag)->flags |= TAG_FLAG_REQUIRE_SAVE;        \
+}while(0)
 
 #endif  /* SIRIDB_TAGS_H_ */
index 94b80ec544c6afbf9661c4cbfb2c7368d176c960..28764293ffc45c4c279839fa775d601fe6ed0931 100644 (file)
@@ -21,6 +21,7 @@
 #include <siri/db/servers.h>
 #include <siri/db/shard.h>
 #include <siri/db/shards.h>
+#include <siri/db/tags.h>
 #include <siri/db/user.h>
 #include <siri/db/users.h>
 #include <siri/db/listener.h>
@@ -191,7 +192,7 @@ static void enter_create_stmt(uv_async_t * handle);
 static void enter_create_user(uv_async_t * handle);
 static void enter_drop_stmt(uv_async_t * handle);
 static void enter_grant_user(uv_async_t * handle);
-static void enter_group_match(uv_async_t * handle);
+static void enter_group_tag_match(uv_async_t * handle);
 static void enter_help(uv_async_t * handle);
 static void enter_limit_expr(uv_async_t * handle);
 static void enter_list_stmt(uv_async_t * handle);
@@ -208,6 +209,7 @@ static void enter_series_match(uv_async_t * handle);
 static void enter_series_parentheses(uv_async_t * handle);
 static void enter_series_re(uv_async_t * handle);
 static void enter_series_setopr(uv_async_t * handle);
+static void enter_tag_series(uv_async_t * handle);
 static void enter_timeit_stmt(uv_async_t * handle);
 static void enter_where_xxx(uv_async_t * handle);
 static void enter_xxx_columns(uv_async_t * handle);
@@ -227,6 +229,7 @@ static void exit_count_servers_received(uv_async_t * handle);
 static void exit_count_servers_selected(uv_async_t * handle);
 static void exit_count_shards(uv_async_t * handle);
 static void exit_count_shards_size(uv_async_t * handle);
+static void exit_count_tags(uv_async_t * handle);
 static void exit_count_users(uv_async_t * handle);
 static void exit_create_group(uv_async_t * handle);
 static void exit_create_user(uv_async_t * handle);
@@ -242,6 +245,7 @@ static void exit_list_pools(uv_async_t * handle);
 static void exit_list_series(uv_async_t * handle);
 static void exit_list_servers(uv_async_t * handle);
 static void exit_list_shards(uv_async_t * handle);
+static void exit_list_tags(uv_async_t * handle);
 static void exit_list_users(uv_async_t * handle);
 static void exit_revoke_user(uv_async_t * handle);
 static void exit_select_aggregate(uv_async_t * handle);
@@ -283,9 +287,11 @@ static void on_count_xxx_response(vec_t * promises, uv_async_t * handle);
 static void on_drop_series_response(vec_t * promises, uv_async_t * handle);
 static void on_drop_shards_response(vec_t * promises, uv_async_t * handle);
 static void on_groups_response(vec_t * promises, uv_async_t * handle);
+static void on_tags_response(vec_t * promises, uv_async_t * handle);
 static void on_list_xxx_response(vec_t * promises, uv_async_t * handle);
 static void on_select_response(vec_t * promises, uv_async_t * handle);
 static void on_update_xxx_response(vec_t * promises, uv_async_t * handle);
+static void on_tag_response(vec_t * promises, uv_async_t * handle);
 
 /* helper functions */
 static void master_select_work(uv_work_t * handle);
@@ -331,6 +337,11 @@ static int values_list_groups(siridb_group_t * group, uv_async_t * handle);
 static int values_count_groups(siridb_group_t * group, uv_async_t * handle);
 static void finish_list_groups(uv_async_t * handle);
 static void finish_count_groups(uv_async_t * handle);
+static int values_list_tags(siridb_tag_t * tag, uv_async_t * handle);
+static int values_count_tags(siridb_tag_t * tag, uv_async_t * handle);
+static void finish_list_tags(uv_async_t * handle);
+static void finish_count_tags(uv_async_t * handle);
+
 
 /* address bindings for default list properties */
 static uint32_t GID_K_NAME = CLERI_GID_K_NAME;
@@ -419,7 +430,7 @@ void siridb_init_listener(void)
     siridb_listen_enter[CLERI_GID_DROP_STMT] = enter_drop_stmt;
     siridb_listen_enter[CLERI_GID_GRANT_USER] = enter_grant_user;
     siridb_listen_enter[CLERI_GID_GROUP_COLUMNS] = enter_xxx_columns;
-    siridb_listen_enter[CLERI_GID_GROUP_MATCH] = enter_group_match;
+    siridb_listen_enter[CLERI_GID_GROUP_TAG_MATCH] = enter_group_tag_match;
     siridb_listen_enter[CLERI_GID_HELP_STMT] = enter_help;
     siridb_listen_enter[CLERI_GID_LIMIT_EXPR] = enter_limit_expr;
     siridb_listen_enter[CLERI_GID_LIST_STMT] = enter_list_stmt;
@@ -440,6 +451,8 @@ void siridb_init_listener(void)
     siridb_listen_enter[CLERI_GID_SERIES_RE] = enter_series_re;
     siridb_listen_enter[CLERI_GID_SERIES_SETOPR] = enter_series_setopr;
     siridb_listen_enter[CLERI_GID_SHARD_COLUMNS] = enter_xxx_columns;
+    siridb_listen_enter[CLERI_GID_TAG_COLUMNS] = enter_xxx_columns;
+    siridb_listen_enter[CLERI_GID_TAG_SERIES] = enter_tag_series;
     siridb_listen_enter[CLERI_GID_TIMEIT_STMT] = enter_timeit_stmt;
     siridb_listen_enter[CLERI_GID_USER_COLUMNS] = enter_xxx_columns;
     siridb_listen_enter[CLERI_GID_WHERE_GROUP] = enter_where_xxx;
@@ -447,6 +460,7 @@ void siridb_init_listener(void)
     siridb_listen_enter[CLERI_GID_WHERE_SERIES] = enter_where_xxx;
     siridb_listen_enter[CLERI_GID_WHERE_SERVER] = enter_where_xxx;
     siridb_listen_enter[CLERI_GID_WHERE_SHARD] = enter_where_xxx;
+    siridb_listen_enter[CLERI_GID_WHERE_TAG] = enter_where_xxx;
     siridb_listen_enter[CLERI_GID_WHERE_USER] = enter_where_xxx;
 
 
@@ -465,6 +479,7 @@ void siridb_init_listener(void)
     siridb_listen_exit[CLERI_GID_COUNT_SERVERS_SELECTED] = exit_count_servers_selected;
     siridb_listen_exit[CLERI_GID_COUNT_SHARDS] = exit_count_shards;
     siridb_listen_exit[CLERI_GID_COUNT_SHARDS_SIZE] = exit_count_shards_size;
+    siridb_listen_exit[CLERI_GID_COUNT_TAGS] = exit_count_tags;
     siridb_listen_exit[CLERI_GID_COUNT_USERS] = exit_count_users;
     siridb_listen_exit[CLERI_GID_CREATE_GROUP] = exit_create_group;
     siridb_listen_exit[CLERI_GID_CREATE_USER] = exit_create_user;
@@ -479,6 +494,7 @@ void siridb_init_listener(void)
     siridb_listen_exit[CLERI_GID_LIST_SERIES] = exit_list_series;
     siridb_listen_exit[CLERI_GID_LIST_SERVERS] = exit_list_servers;
     siridb_listen_exit[CLERI_GID_LIST_SHARDS] = exit_list_shards;
+    siridb_listen_exit[CLERI_GID_LIST_TAGS] = exit_list_tags;
     siridb_listen_exit[CLERI_GID_LIST_USERS] = exit_list_users;
     siridb_listen_exit[CLERI_GID_REVOKE_USER] = exit_revoke_user;
     siridb_listen_exit[CLERI_GID_SELECT_AGGREGATE] = exit_select_aggregate;
@@ -780,12 +796,15 @@ static void enter_grant_user(uv_async_t * handle)
         SIRIPARSER_NEXT_NODE
     }
 }
-static void enter_group_match(uv_async_t * handle)
+static void enter_group_tag_match(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
     siridb_t * siridb = query->client->siridb;
     cleri_node_t * node = query->nodes->node;
     query_wrapper_t * q_wrapper = query->data;
+    siridb_group_t * group;
+    siridb_tag_t * tag;
+
 
     /* we must send this query to all pools */
     if (q_wrapper->pmap != NULL)
@@ -794,20 +813,20 @@ static void enter_group_match(uv_async_t * handle)
         q_wrapper->pmap = NULL;
     }
 
-    char group_name[node->len - 1];
+    char group_or_tag_name[node->len - 1];
 
     /* extract series name */
-    xstr_extract_string(group_name, node->str, node->len);
+    xstr_extract_string(group_or_tag_name, node->str, node->len);
 
-    siridb_group_t * group =
-            (siridb_group_t *) ct_get(siridb->groups->groups, group_name);
+    group = ct_get(siridb->groups->groups, group_or_tag_name);
 
-    if (group == NULL)
+    if (group == NULL &&
+        (tag = ct_get(siridb->tags->tags, group_or_tag_name)) == NULL)
     {
         snprintf(query->err_msg,
                 SIRIDB_MAX_SIZE_ERR_MSG,
-                "Cannot find group '%s'",
-                group_name);
+                "Cannot find group or tag '%s'",
+                group_or_tag_name);
         siridb_query_send_error(handle, CPROTO_ERR_QUERY);
     }
     else
@@ -823,20 +842,49 @@ static void enter_group_match(uv_async_t * handle)
             MEM_ERR_RET
         }
 
-        uv_mutex_lock(&siridb->groups->mutex);
-
-        for (i = 0; i < group->series->len; i++)
+        if (group)
         {
-            series = (siridb_series_t *) group->series->data[i];
-            siridb_series_incref(series);
-            if (imap_add(q_wrapper->series_tmp, series->id, series))
+            uv_mutex_lock(&siridb->groups->mutex);
+
+            for (i = 0; i < group->series->len; i++)
             {
-                log_critical("Cannot add series to temporary map.");
-                siridb_series_decref(series);
+                series = (siridb_series_t *) group->series->data[i];
+                siridb_series_incref(series);
+                if (imap_add(q_wrapper->series_tmp, series->id, series))
+                {
+                    log_critical("Cannot add series to temporary map.");
+                    siridb_series_decref(series);
+                }
             }
+
+            uv_mutex_unlock(&siridb->groups->mutex);
         }
+        else /* tag */
+        {
+            vec_t * tag_series;
+
+            assert (tag != NULL);
+
+            uv_mutex_lock(&siridb->tags->mutex);
 
-        uv_mutex_unlock(&siridb->groups->mutex);
+            tag_series = imap_vec(tag->series);
+
+            if (tag_series != NULL)
+            {
+                for (size_t i = 0; i < tag_series->len; i++)
+                {
+                    series = (siridb_series_t *) tag_series->data[i];
+                    siridb_series_incref(series);
+                    if (imap_add(q_wrapper->series_tmp, series->id, series))
+                    {
+                        log_critical("Cannot add series to temporary map.");
+                        siridb_series_decref(series);
+                    }
+                }
+            }
+
+            uv_mutex_unlock(&siridb->tags->mutex);
+        }
 
         if (q_wrapper->update_cb != NULL)
         {
@@ -1516,6 +1564,91 @@ static void enter_series_setopr(uv_async_t * handle)
     SIRIPARSER_NEXT_NODE
 }
 
+static void enter_tag_series(uv_async_t * handle)
+{
+
+    siridb_query_t * query = (siridb_query_t *) handle->data;
+    siridb_t * siridb = query->client->siridb;
+    query_alter_t * q_alter = (query_alter_t *) query->data;
+
+    q_alter->tp = QUERY_ALTER_SERIES;
+
+    MASTER_CHECK_ACCESSIBLE(siridb)
+    MASTER_CHECK_VERSION(siridb, "2.0.19")
+
+    cleri_node_t * tag_node =
+                    query->nodes->node->children->next->node;
+    siridb_tag_t * tag;
+
+    char name[tag_node->len - 1];
+    xstr_extract_string(name, tag_node->str, tag_node->len);
+
+    tag = ct_get(siridb->tags->tags, name);
+
+    if (tag == NULL)
+    {
+        if (ct_get(siridb->groups->groups, name) != NULL)
+        {
+            snprintf(query->err_msg,
+                    SIRIDB_MAX_SIZE_ERR_MSG,
+                    "Cannot create tag `%s` because a group with this name "
+                    "already exist.",
+                    name);
+            siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+            return;
+        }
+
+        uv_mutex_lock(&siridb->tags->mutex);
+
+        tag = siridb_tags_add(siridb->tags, name);
+
+        if (tag == NULL)
+        {
+            uv_mutex_unlock(&siridb->tags->mutex);
+            snprintf(query->err_msg,
+                    SIRIDB_MAX_SIZE_ERR_MSG,
+                    "Unexpected error while creating tag: `%s`",
+                    name);
+            siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+            return;
+        }
+    }
+    else
+    {
+        uv_mutex_lock(&siridb->tags->mutex);
+    }
+
+    q_alter->n = q_alter->series_map->len;
+
+    imap_union_ref(
+            tag->series,
+            q_alter->series_map,
+            (imap_free_cb) &siridb__series_decref);
+
+    siridb_tags_set_require_save(siridb->tags, tag);
+
+    uv_mutex_unlock(&siridb->tags->mutex);
+
+    q_alter->series_map = NULL;
+
+    QP_ADD_SUCCESS
+
+    if (IS_MASTER)
+    {
+        siridb_query_forward(
+                handle,
+                SIRIDB_QUERY_FWD_UPDATE,
+                (sirinet_promises_cb) on_tag_response,
+                0);
+    }
+    else
+    {
+        qp_add_int64(query->packer, q_alter->n);
+
+        SIRIPARSER_ASYNC_NEXT_NODE
+    }
+}
+
 static void enter_timeit_stmt(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
@@ -2216,6 +2349,39 @@ static void exit_count_shards_size(uv_async_t * handle)
     }
 }
 
+static void exit_count_tags(uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+    siridb_t * siridb = query->client->siridb;
+    query_count_t * q_count = (query_count_t *) query->data;
+
+    if (q_count->where_expr == NULL || !cexpr_contains(
+            q_count->where_expr,
+            siridb_tag_is_remote_prop))
+    {
+        finish_count_tags(handle);
+    }
+    else
+    {
+        sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_GROUPS, NULL);
+
+        if (pkg != NULL)
+        {
+            siri_async_incref(handle);
+
+            query->nodes->cb = (uv_async_cb) finish_count_tags;
+
+            siridb_pools_send_pkg(
+                    siridb,
+                    pkg,
+                    0,
+                    (sirinet_promises_cb) on_tags_response,
+                    handle,
+                    0);
+        }
+    }
+}
+
 static void exit_count_users(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
@@ -3218,6 +3384,63 @@ static void exit_list_shards(uv_async_t * handle)
     }
 }
 
+static void exit_list_tags(uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+    siridb_t * siridb = query->client->siridb;
+    query_list_t * q_list = (query_list_t *) query->data;
+
+    int is_local = (q_list->props == NULL);
+
+    /* if not is_local check for 'remote' columns */
+    if (!is_local)
+    {
+        is_local = 1;
+        size_t i;
+        for (i = 0; i < q_list->props->len; i++)
+        {
+            if (siridb_tag_is_remote_prop(
+                    *((uint32_t *) q_list->props->data[i])))
+            {
+                is_local = 0;
+                break;
+            }
+        }
+    }
+
+    /* if is_local, check if we use 'remote' props in where expression */
+    if (is_local && q_list->where_expr != NULL)
+    {
+        is_local = !cexpr_contains(
+                q_list->where_expr,
+                siridb_tag_is_remote_prop);
+    }
+
+    if (is_local)
+    {
+        finish_list_tags(handle);
+    }
+    else
+    {
+        sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_GROUPS, NULL);
+
+        if (pkg != NULL)
+        {
+            siri_async_incref(handle);
+
+            query->nodes->cb = (uv_async_cb) finish_list_tags;
+
+            siridb_pools_send_pkg(
+                    siridb,
+                    pkg,
+                    0,
+                    (sirinet_promises_cb) on_tags_response,
+                    handle,
+                    0);
+        }
+    }
+}
+
 static void exit_list_users(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
@@ -5651,6 +5874,63 @@ static void on_groups_response(vec_t * promises, uv_async_t * handle)
     query->nodes->cb(handle);
 }
 
+static void on_tags_response(vec_t * promises, uv_async_t * handle)
+{
+    ON_PROMISES
+
+    sirinet_pkg_t * pkg;
+    sirinet_promise_t * promise;
+    qp_unpacker_t unpacker;
+    siridb_query_t * query = handle->data;
+    siridb_t * siridb = query->client->siridb;
+    siridb_tag_t * tag;
+    qp_obj_t qp_name;
+    qp_obj_t qp_series;
+    size_t i;
+
+    siridb_tags_init_nseries(siridb->tags);
+
+    for (i = 0; i < promises->len; i++)
+    {
+        promise = promises->data[i];
+
+        if (promise == NULL)
+        {
+            continue;
+        }
+
+        pkg = (sirinet_pkg_t *) promise->data;
+
+        if (pkg != NULL && pkg->tp == BPROTO_RES_GROUPS)
+        {
+            qp_unpacker_init(&unpacker, pkg->data, pkg->len);
+
+            if (    qp_is_array(qp_next(&unpacker, NULL)))
+            {
+                while ( qp_is_array(qp_next(&unpacker, NULL)) &&
+                        qp_is_raw(qp_next(&unpacker, &qp_name)) &&
+                        qp_is_raw_term(&qp_name) &&
+                        qp_is_int(qp_next(&unpacker, &qp_series)))
+                {
+                    tag = (siridb_tag_t *) ct_get(
+                            siridb->tags->tags,
+                            (const char *) qp_name.via.raw);
+                    if (tag != NULL)
+                    {
+                        tag->n += qp_series.via.int64;
+                    }
+                }
+            }
+        }
+
+        /* make sure we free the promise and data */
+        free(promise->data);
+        sirinet_promise_decref(promise);
+    }
+
+    query->nodes->cb(handle);
+}
+
 /*
  * Call-back function: sirinet_promises_cb
  *
@@ -5965,6 +6245,70 @@ static void on_update_xxx_response(vec_t * promises, uv_async_t * handle)
     }
 }
 
+/*
+ * Call-back function: sirinet_promises_cb
+ *
+ * Make sure to run siri_async_incref() on the handle.
+ *
+ * Note: used both for tag and untag response.
+ */
+static void on_tag_response(vec_t * promises, uv_async_t * handle)
+{
+    ON_PROMISES
+
+    siridb_query_t * query = (siridb_query_t *) handle->data;
+    sirinet_pkg_t * pkg;
+    sirinet_promise_t * promise;
+    qp_unpacker_t unpacker;
+    qp_obj_t qp_tag;
+
+    query_alter_t * q_tag = (query_alter_t *) query->data;
+
+    for (size_t i = 0; i < promises->len; i++)
+    {
+        promise = promises->data[i];
+
+        if (promise == NULL)
+        {
+            continue;
+        }
+
+        pkg = (sirinet_pkg_t *) promise->data;
+
+        if (pkg != NULL && pkg->tp == BPROTO_RES_QUERY)
+        {
+            qp_unpacker_init(&unpacker, pkg->data, pkg->len);
+
+            if (    qp_is_map(qp_next(&unpacker, NULL)) &&
+                    qp_is_raw(qp_next(&unpacker, NULL)) &&  // success_msg
+                    qp_is_int(qp_next(&unpacker, &qp_tag))) // one result
+            {
+                q_tag->n += qp_tag.via.int64;
+
+                /* extract time-it info if needed */
+                if (query->timeit != NULL)
+                {
+                    siridb_query_timeit_from_unpacker(query, &unpacker);
+                }
+            }
+        }
+
+        /* make sure we free the promise and data */
+        free(promise->data);
+        sirinet_promise_decref(promise);
+    }
+
+    SIRIPARSER_ASYNC_NEXT_NODE
+}
+
+/*
+ * Call-back function: sirinet_promises_cb
+ *
+ * Make sure to run siri_async_incref() on the handle
+ */
+
+
+
 /******************************************************************************
  * Helper functions
  *****************************************************************************/
@@ -6404,3 +6748,97 @@ static void finish_count_groups(uv_async_t * handle)
 
     SIRIPARSER_ASYNC_NEXT_NODE
 }
+
+static int values_list_tags(siridb_tag_t * tag, uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+    cexpr_t * where_expr = ((query_list_t *) query->data)->where_expr;
+    cexpr_cb_t cb = (cexpr_cb_t) siridb_group_cexpr_cb;
+    vec_t * props = ((query_list_t *) query->data)->props;
+
+    if (where_expr == NULL || cexpr_run(where_expr, cb, tag))
+    {
+        size_t i;
+        qp_add_type(query->packer, QP_ARRAY_OPEN);
+
+        for (i = 0; i < props->len; i++)
+        {
+            siridb_tag_prop(
+                    tag,
+                    query->packer,
+                    *((uint32_t *) props->data[i]));
+        }
+
+        qp_add_type(query->packer, QP_ARRAY_CLOSE);
+
+        return 1;
+    }
+
+    return 0;
+}
+
+static int values_count_tags(siridb_tag_t * tag, uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+
+    return cexpr_run(
+            ((query_list_t *) query->data)->where_expr,
+            (cexpr_cb_t) siridb_tag_cexpr_cb,
+            tag);
+}
+
+static void finish_list_tags(uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+    query_list_t * q_list = (query_list_t *) query->data;
+    siridb_t * siridb = query->client->siridb;
+
+    if (q_list->props == NULL)
+    {
+        q_list->props = vec_new(1);
+        if (q_list->props == NULL)
+        {
+            MEM_ERR_RET
+        }
+        vec_append(q_list->props, &GID_K_NAME);
+        qp_add_raw(query->packer, (const unsigned char *) "name", 4);
+    }
+
+    qp_add_type(query->packer, QP_ARRAY_CLOSE);
+
+    qp_add_raw(query->packer, (const unsigned char *) "tags", 4);
+    qp_add_type(query->packer, QP_ARRAY_OPEN);
+
+    ct_valuesn(
+            siridb->tags->tags,
+            &q_list->limit,
+            (ct_val_cb) values_list_tags,
+            handle);
+
+    qp_add_type(query->packer, QP_ARRAY_CLOSE);
+
+    SIRIPARSER_ASYNC_NEXT_NODE
+}
+
+static void finish_count_tags(uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+    query_count_t * q_count = (query_count_t *) query->data;
+    siridb_t * siridb = query->client->siridb;
+
+    /* Note: ct_values(..values_count_tags..) can only result in a positive
+     *       value.
+     */
+    size_t n = (q_count->where_expr == NULL) ?
+            siridb->tags->tags->len :
+            (size_t) ct_values(
+                        siridb->tags->tags,
+                        (ct_val_cb) values_count_tags,
+                        handle);
+
+    qp_add_raw(query->packer, (const unsigned char *) "tags", 4);
+
+    qp_add_int64(query->packer, n);
+
+    SIRIPARSER_ASYNC_NEXT_NODE
+}
index 14eaaf54666b4f5bcbcff622ace0e41519d23d47..d2954b2c9b739cb264944c6610093034ffc1157c 100644 (file)
 /*
  * Returns tag when successful or NULL in case of an error.
  */
-siridb_tag_t * siridb_tag_new(uint32_t id, const char * tags_path)
+siridb_tag_t * siridb_tag_new(char * name)
 {
     siridb_tag_t * tag = (siridb_tag_t *) malloc(sizeof(siridb_tag_t));
     if (tag != NULL)
     {
         tag->ref = 1;
         tag->flags = 0;
-        tag->id = id;
-        tag->name = NULL;
-        ;
+        tag->name = name;
         tag->series = imap_new();
-
-        if (asprintf(
-                &tag->fn,
-                "%s%0*" PRIu32 ".tag",
-                tags_path,
-                TAGFN_NUMBERS,
-                id) < 0 || tag->series == NULL)
-        {
-            siridb__tag_free(tag);
-            tag = NULL;
-        }
     }
     return tag;
 }
@@ -74,7 +61,9 @@ siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn)
 
             if (!qp_is_array(qp_next(unpacker, NULL)) ||
                 qp_next(unpacker, &qp_tn) != QP_RAW ||
-                (tag->name = strndup(qp_tn.via.raw, qp_tn.len)) == NULL)
+                (tag->name = strndup(
+                        (const char *) qp_tn.via.raw,
+                        qp_tn.len)) == NULL)
             {
                 /* or a memory allocation error, but the same result */
                 log_critical(
@@ -96,7 +85,7 @@ siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn)
 
                     if (series == NULL)
                     {
-                        siridb_tags_require_save(siridb->tags, tag);
+                        siridb_tags_set_require_save(siridb->tags, tag);
 
                         log_error(
                                 "cannot find series id %" PRId64
index 5675ea3bbaf29fd23e5a5304dabacd29d4076c77..9c8a88b047ecae41cbabd8975726e9c0b938fb03 100644 (file)
@@ -24,8 +24,11 @@ static void TAGS_free(siridb_tags_t * tags);
 static int TAGS_load(siridb_t * siridb);
 static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer);
 static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup);
-static int TAGS_2slist(siridb_tag_t * tag, vec_t * tags_list);
+static int TAGS_to_vec_cb(siridb_tag_t * tag, vec_t * tags_list);
 static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag);
+static int TAGS_nseries(
+        siridb_tag_t * tag,
+        void * data __attribute__((unused)));
 
 /*
  * Initialize tags. Returns 0 if successful or -1 in case of an error.
@@ -42,7 +45,6 @@ int siridb_tags_init(siridb_t * siridb)
     siridb->tags->ref = 1;
     siridb->tags->tags = ct_new();
     siridb->tags->cleanup = vec_new(VEC_DEFAULT_SIZE);
-    siridb->tags->next_id = 0;
 
     uv_mutex_init(&siridb->tags->mutex);
 
@@ -78,7 +80,7 @@ void siridb_tags_decref(siridb_tags_t * tags)
 
 siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name)
 {
-    siridb_tag_t * tag = siridb_tag_new(tags->next_id++, tags->path);
+    siridb_tag_t * tag = siridb_tag_new(name);
     if (tag != NULL)
     {
         tag->name = strdup(name);
@@ -130,7 +132,7 @@ void siridb_tags_cleanup(uv_async_t * handle)
 
     while (tags->cleanup->len)
     {
-        tag = (siridb_tag_t *) slist_pop(tags->cleanup);
+        tag = (siridb_tag_t *) vec_pop(tags->cleanup);
 
         if (!tag->series->len &&
             (rmtag = (siridb_tag_t *) ct_pop(tags->tags, tag->name)) != NULL)
@@ -169,17 +171,17 @@ void siridb_tags_dropped_series(siridb_tags_t * tags)
 
     uv_mutex_lock(&tags->mutex);
 
-    tags_list = slist_new(tags->tags->len);
+    tags_list = vec_new(tags->tags->len);
 
     tags->flags &= ~TAGS_FLAG_DROPPED_SERIES;
 
-    ct_values(tags->tags, (ct_val_cb) TAGS_2slist, tags_list);
+    ct_values(tags->tags, (ct_val_cb) TAGS_to_vec_cb, tags_list);
 
     uv_mutex_unlock(&tags->mutex);
 
     while (tags_list->len)
     {
-        tag = (siridb_tag_t *) slist_pop(tags_list);
+        tag = (siridb_tag_t *) vec_pop(tags_list);
 
         uv_mutex_lock(&tags->mutex);
 
@@ -192,7 +194,7 @@ void siridb_tags_dropped_series(siridb_tags_t * tags)
         usleep(10000);  // 10ms
     }
 
-    slist_free(tags_list);
+    vec_free(tags_list);
 
     if (tags->cleanup->len)
     {
@@ -221,17 +223,17 @@ void siridb_tags_save(siridb_tags_t * tags)
 
     uv_mutex_lock(&tags->mutex);
 
-    tags_list = slist_new(tags->tags->len);
+    tags_list = vec_new(tags->tags->len);
 
     tags->flags &= ~TAGS_FLAG_REQUIRE_SAVE;
 
-    ct_values(tags->tags, (ct_val_cb) TAGS_2slist, tags_list);
+    ct_values(tags->tags, (ct_val_cb) TAGS_to_vec_cb, tags_list);
 
     uv_mutex_unlock(&tags->mutex);
 
     while (tags_list->len)
     {
-        tag = (siridb_tag_t *) slist_pop(tags_list);
+        tag = (siridb_tag_t *) vec_pop(tags_list);
 
         if (tag->flags & TAG_FLAG_REQUIRE_SAVE)
         {
@@ -249,16 +251,35 @@ void siridb_tags_save(siridb_tags_t * tags)
         usleep(10000);  // 10ms
     }
 
-    slist_free(tags_list);
+    vec_free(tags_list);
+}
+
+/*
+ * Initialize each 'n' group property with the local value.
+ */
+void siridb_tags_init_nseries(siridb_tags_t * tags)
+{
+    ct_values(tags->tags, (ct_val_cb) TAGS_nseries, NULL);
+}
+
+/*
+ * Main thread.
+ */
+static int TAGS_nseries(
+        siridb_tag_t * tag,
+        void * data __attribute__((unused)))
+{
+    tag->n = tag->series->len;
+    return 0;
 }
 
 /*
  * This function is called from the "Group" thread.
  */
-static int TAGS_2slist(siridb_tag_t * tag, vec_t * tags_list)
+static int TAGS_to_vec_cb(siridb_tag_t * tag, vec_t * tags_list)
 {
     siridb_tag_incref(tag);
-    slist_append(tags_list, tag);
+    vec_append(tags_list, tag);
     return 0;
 }
 
@@ -267,7 +288,7 @@ static int TAGS_2slist(siridb_tag_t * tag, vec_t * tags_list)
  */
 static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag)
 {
-    vec_t * tag_series = imap_slist_pop(tag->series);
+    vec_t * tag_series = imap_vec_pop(tag->series);
     siridb_series_t * series, * s = NULL;
 
     if (tag_series != NULL)
@@ -290,12 +311,12 @@ static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag)
         }
         else
         {
-            slist_free(tag_series);
+            vec_free(tag_series);
 
             if (!tag->series->len && (~tag->flags & TAG_FLAG_CLEANUP))
             {
                 tag->flags |= TAG_FLAG_CLEANUP;
-                if (slist_append_safe(&tags->cleanup, tag))
+                if (vec_append_safe(&tags->cleanup, tag))
                 {
                     log_critical(
                             "Unexpected error while appending tag to "
@@ -418,7 +439,7 @@ static void TAGS_free(siridb_tags_t * tags)
 
     if (tags->cleanup != NULL)
     {
-        slist_free(tags->cleanup);
+        vec_free(tags->cleanup);
     }
 
     uv_mutex_lock(&tags->mutex);