Work on tag support
authorJeroen van der Heijden <jeroen@transceptor.technology>
Tue, 28 Jul 2020 07:16:51 +0000 (09:16 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Tue, 28 Jul 2020 07:16:51 +0000 (09:16 +0200)
include/siri/db/tag.h
include/siri/db/tags.h
include/siri/net/protocol.h
include/siri/version.h
src/siri/db/listener.c
src/siri/db/tag.c
src/siri/db/tags.c
src/siri/net/protocol.c

index 60406a977b127d984c70ad824244879027691d76..960cc5712f8efb4ac54ed058b9c374f79fd08a49 100644 (file)
@@ -16,12 +16,13 @@ enum
 #include <imap/imap.h>
 #include <siri/db/db.h>
 
-siridb_tag_t * siridb_tag_new(uint32_t id, const char * tags_path);
+siridb_tag_t * siridb_tag_new(siridb_tags_t * tags, uint64_t id);
 void siridb__tag_decref(siridb_tag_t * tag);
 void siridb__tag_free(siridb_tag_t * tag);
 int siridb_tag_is_valid_fn(const char * fn);
-siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn);
+siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * _fn);
 int siridb_tag_save(siridb_tag_t * tag);
+char * siridb_tag_fn(siridb_tag_t * tag);
 int siridb_tag_is_remote_prop(uint32_t prop);
 void siridb_tag_prop(siridb_tag_t * tag, qp_packer_t * packer, int prop);
 int siridb_tag_cexpr_cb(siridb_tag_t * tag, cexpr_condition_t * cond);
@@ -36,7 +37,9 @@ struct siridb_tag_s
     uint16_t ref;
     uint16_t flags;
     uint32_t n;
+    uint64_t id;
     char * name;
+    siridb_tags_t * tags;
     imap_t * series;
 };
 
index c8355661cfb01ec1935a248dc1e413a6673ee89b..5bf36ed01054073af01244f22c7017a29762596d 100644 (file)
@@ -25,19 +25,21 @@ struct siridb_tags_s
 {
     uint16_t flags;
     uint16_t ref;
+    uint32_t pad0;
+    uint64_t next_id;
     char * path;
     ct_t * tags;
-    vec_t * cleanup;
     uv_mutex_t mutex;
 };
 
 int siridb_tags_init(siridb_t * siridb);
 void siridb_tags_incref(siridb_tags_t * tags);
 void siridb_tags_decref(siridb_tags_t * tags);
+int siridb_tags_drop_tag(
+        siridb_tags_t * tags,
+        const char * name,
+        char * err_msg);
 siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name);
-sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid);
-ct_t * siridb_tags_lookup(siridb_tags_t * tags);
-void siridb_tags_cleanup(uv_async_t * handle);
 void siridb_tags_dropped_series(siridb_tags_t * tags);
 void siridb_tags_save(siridb_tags_t * tags);
 void siridb_tags_init_nseries(siridb_tags_t * tags);
index fb7dedb5a3ea5487223bd1ec786e6eeb7a35b848..23c5f17d9f25122a74c9fb348d5e5f45638cb3a2 100644 (file)
@@ -79,7 +79,6 @@ typedef enum
     BPROTO_DISABLE_BACKUP_MODE,         /* empty                            */
     BPROTO_TEE_PIPE_NAME_UPDATE,        /* tee pipe name                    */
     BPROTO_DROP_DATABASE,               /* empty                            */
-    BPROTO_REQ_TAGS,                    /* empty                            */
 } bproto_client_t;
 
 /*
index 98e2795bee900628ed0d2e60d004a62bda6ddeb4..912ebc22f375181c4353b2898319b868acc7d3e0 100644 (file)
@@ -6,7 +6,7 @@
 
 #define SIRIDB_VERSION_MAJOR 2
 #define SIRIDB_VERSION_MINOR 0
-#define SIRIDB_VERSION_PATCH 37
+#define SIRIDB_VERSION_PATCH 38
 
 /*
  * Use SIRIDB_VERSION_PRE_RELEASE for alpha release versions.
@@ -15,7 +15,7 @@
  * Note that debian alpha packages should use versions like this:
  *   2.0.34-0alpha0
  */
-#define SIRIDB_VERSION_PRE_RELEASE ""
+#define SIRIDB_VERSION_PRE_RELEASE "-alpha-0"
 
 #ifndef NDEBUG
 #define SIRIDB_VERSION_BUILD_RELEASE "+debug"
index 3b7eb4fafcefa29c06de1e99831dd338b9c2e0db..74f7bd4f2dc36eedba5c94c45b577706ea1da89b 100644 (file)
@@ -193,7 +193,12 @@ if (IS_MASTER && siridb_is_reindexing(siridb))                              \
 #define MSG_ERR_SERVER_ADDRESS \
     "Its only possible to change a servers address or port when the server " \
     "is not connected."
-
+#define MSG_SUCCESS_TAG \
+    "Successfully tagged %zu series."
+#define MSG_SUCCESS_UNTAG \
+    "Successfully unagged %zu series."
+#define MSG_SUCCESS_DROP_TAG \
+    "Successfully dropped tag '%s'."
 
 static void enter_access_expr(uv_async_t * handle);
 static void enter_alter_group(uv_async_t * handle);
@@ -225,6 +230,7 @@ static void enter_series_re(uv_async_t * handle);
 static void enter_series_setopr(uv_async_t * handle);
 static void enter_tag_series(uv_async_t * handle);
 static void enter_timeit_stmt(uv_async_t * handle);
+static void enter_untag_series(uv_async_t * handle);
 static void enter_where_xxx(uv_async_t * handle);
 static void enter_xxx_columns(uv_async_t * handle);
 
@@ -251,6 +257,7 @@ static void exit_drop_group(uv_async_t * handle);
 static void exit_drop_series(uv_async_t * handle);
 static void exit_drop_server(uv_async_t * handle);
 static void exit_drop_shards(uv_async_t * handle);
+static void exit_drop_tag(uv_async_t * handle);
 static void exit_drop_user(uv_async_t * handle);
 static void exit_grant_user(uv_async_t * handle);
 static void exit_help_xxx(uv_async_t * handle);
@@ -278,7 +285,9 @@ static void exit_set_select_points_limit(uv_async_t * handle);
 static void exit_set_tee_pipe_name(uv_async_t * handle);
 static void exit_set_timezone(uv_async_t * handle);
 static void exit_show_stmt(uv_async_t * handle);
+static void exit_tag_series(uv_async_t * handle);
 static void exit_timeit_stmt(uv_async_t * handle);
+static void exit_untag_series(uv_async_t * handle);
 
 /* async loop functions */
 static void async_count_series(uv_async_t * handle);
@@ -468,6 +477,7 @@ void siridb_init_listener(void)
     SIRIDB_NODE_ENTER[CLERI_GID_TAG_COLUMNS] = enter_xxx_columns;
     SIRIDB_NODE_ENTER[CLERI_GID_TAG_SERIES] = enter_tag_series;
     SIRIDB_NODE_ENTER[CLERI_GID_TIMEIT_STMT] = enter_timeit_stmt;
+    SIRIDB_NODE_ENTER[CLERI_GID_UNTAG_SERIES] = enter_untag_series;
     SIRIDB_NODE_ENTER[CLERI_GID_USER_COLUMNS] = enter_xxx_columns;
     SIRIDB_NODE_ENTER[CLERI_GID_WHERE_GROUP] = enter_where_xxx;
     SIRIDB_NODE_ENTER[CLERI_GID_WHERE_POOL] = enter_where_xxx;
@@ -501,6 +511,7 @@ void siridb_init_listener(void)
     SIRIDB_NODE_EXIT[CLERI_GID_DROP_SERIES] = exit_drop_series;
     SIRIDB_NODE_EXIT[CLERI_GID_DROP_SERVER] = exit_drop_server;
     SIRIDB_NODE_EXIT[CLERI_GID_DROP_SHARDS] = exit_drop_shards;
+    SIRIDB_NODE_EXIT[CLERI_GID_DROP_TAG] = exit_drop_tag;
     SIRIDB_NODE_EXIT[CLERI_GID_DROP_USER] = exit_drop_user;
     SIRIDB_NODE_EXIT[CLERI_GID_GRANT_USER] = exit_grant_user;
     SIRIDB_NODE_EXIT[CLERI_GID_LIST_GROUPS] = exit_list_groups;
@@ -527,7 +538,9 @@ void siridb_init_listener(void)
     SIRIDB_NODE_EXIT[CLERI_GID_SET_TEE_PIPE_NAME] = exit_set_tee_pipe_name;
     SIRIDB_NODE_EXIT[CLERI_GID_SET_TIMEZONE] = exit_set_timezone;
     SIRIDB_NODE_EXIT[CLERI_GID_SHOW_STMT] = exit_show_stmt;
+    SIRIDB_NODE_EXIT[CLERI_GID_TAG_SERIES] = exit_tag_series;
     SIRIDB_NODE_EXIT[CLERI_GID_TIMEIT_STMT] = exit_timeit_stmt;
+    SIRIDB_NODE_EXIT[CLERI_GID_UNTAG_SERIES] = exit_untag_series;
 
     for (i = HELP_OFFSET; i < HELP_OFFSET + HELP_COUNT; i++)
     {
@@ -1588,7 +1601,7 @@ static void enter_tag_series(uv_async_t * handle)
     q_alter->tp = QUERY_ALTER_SERIES;
 
     MASTER_CHECK_ACCESSIBLE(siridb)
-    MASTER_CHECK_VERSION(siridb, "2.0.19")
+    MASTER_CHECK_VERSION(siridb, "2.0.38")
 
     cleri_node_t * tag_node =
                     query->nodes->node->children->next->node;
@@ -1679,6 +1692,68 @@ static void enter_timeit_stmt(uv_async_t * handle)
     SIRIPARSER_NEXT_NODE
 }
 
+static void enter_untag_series(uv_async_t * handle)
+{
+    siridb_query_t * query = (siridb_query_t *) handle->data;
+    siridb_t * siridb = query->client->siridb;
+    query_alter_t * q_alter = (query_alter_t *) query->data;
+
+    q_alter->tp = QUERY_ALTER_SERIES;
+
+    MASTER_CHECK_ACCESSIBLE(siridb)
+    MASTER_CHECK_VERSION(siridb, "2.0.38")
+
+    cleri_node_t * tag_node =
+                    query->nodes->node->children->next->node;
+    siridb_tag_t * tag;
+
+    char name[tag_node->len - 1];
+    xstr_extract_string(name, tag_node->str, tag_node->len);
+
+    tag = ct_get(siridb->tags->tags, name);
+
+    if (tag == NULL)
+    {
+        snprintf(query->err_msg,
+                SIRIDB_MAX_SIZE_ERR_MSG,
+                "Cannot find tag: '%s'",
+                name);
+        siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+        return;
+    }
+
+    uv_mutex_lock(&siridb->tags->mutex);
+
+    q_alter->n = q_alter->series_map->len;
+
+    imap_difference_ref(
+            tag->series,
+            q_alter->series_map,
+            (imap_free_cb) &siridb__series_decref);
+
+    siridb_tags_set_require_save(siridb->tags, tag);
+
+    uv_mutex_unlock(&siridb->tags->mutex);
+
+    q_alter->series_map = NULL;
+
+    QP_ADD_SUCCESS
+
+    if (IS_MASTER)
+    {
+        siridb_query_forward(
+                handle,
+                SIRIDB_QUERY_FWD_UPDATE,
+                (sirinet_promises_cb) on_tag_response,
+                0);
+    }
+    else
+    {
+        qp_add_int64(query->packer, q_alter->n);
+        SIRIPARSER_ASYNC_NEXT_NODE
+    }
+}
+
 static void enter_where_xxx(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
@@ -2867,6 +2942,45 @@ static void exit_drop_shards(uv_async_t * handle)
     }
 }
 
+static void exit_drop_tag(uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+    siridb_t * siridb = query->client->siridb;
+
+    MASTER_CHECK_ACCESSIBLE(siridb)
+
+    cleri_node_t * tag_node =
+            query->nodes->node->children->next->node;
+
+    char name[tag_node->len - 1];
+
+    xstr_extract_string(name, tag_node->str, tag_node->len);
+
+    if (siridb_tags_drop_tag(siridb->tags, name, query->err_msg))
+    {
+        siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+    }
+    else
+    {
+        QP_ADD_SUCCESS
+        log_info(MSG_SUCCESS_DROP_TAG, name);
+        qp_add_fmt_safe(query->packer, MSG_SUCCESS_DROP_TAG, name);
+
+        if (IS_MASTER)
+        {
+            siridb_query_forward(
+                    handle,
+                    SIRIDB_QUERY_FWD_UPDATE,
+                    (sirinet_promises_cb) on_update_xxx_response,
+                    0);
+        }
+        else
+        {
+            SIRIPARSER_ASYNC_NEXT_NODE
+        }
+    }
+}
+
 static void exit_drop_user(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
@@ -4728,6 +4842,21 @@ static void exit_show_stmt(uv_async_t * handle)
     SIRIPARSER_ASYNC_NEXT_NODE
 }
 
+static void exit_tag_series(uv_async_t * handle)
+{
+    siridb_query_t * query = (siridb_query_t *) handle->data;
+
+    if (IS_MASTER)
+    {
+        qp_add_fmt_safe(
+                query->packer,
+                MSG_SUCCESS_TAG,
+                ((query_alter_t *) query->data)->n);
+    }
+
+    SIRIPARSER_ASYNC_NEXT_NODE
+}
+
 static void exit_timeit_stmt(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
@@ -4768,6 +4897,21 @@ static void exit_timeit_stmt(uv_async_t * handle)
     SIRIPARSER_ASYNC_NEXT_NODE
 }
 
+static void exit_untag_series(uv_async_t * handle)
+{
+    siridb_query_t * query = (siridb_query_t *) handle->data;
+
+    if (IS_MASTER)
+    {
+        qp_add_fmt_safe(
+                query->packer,
+                MSG_SUCCESS_UNTAG,
+                ((query_alter_t *) query->data)->n);
+    }
+
+    SIRIPARSER_ASYNC_NEXT_NODE
+}
+
 /******************************************************************************
  * Async loop functions.
  *****************************************************************************/
index d2954b2c9b739cb264944c6610093034ffc1157c..bc9dfd6ac18732a46081f8cd8769640c1a8f0f3a 100644 (file)
 /*
  * Returns tag when successful or NULL in case of an error.
  */
-siridb_tag_t * siridb_tag_new(char * name)
+siridb_tag_t * siridb_tag_new(siridb_tags_t * tags, uint64_t id)
 {
     siridb_tag_t * tag = (siridb_tag_t *) malloc(sizeof(siridb_tag_t));
     if (tag != NULL)
     {
         tag->ref = 1;
         tag->flags = 0;
-        tag->name = name;
+        tag->id = id;
+        tag->name = NULL;
+        tag->tags = tags;
         tag->series = imap_new();
     }
     return tag;
 }
 
+char * siridb_tag_fn(siridb_tag_t * tag)
+{
+    char * fn;
+    if (asprintf(&fn, "%s%09"PRIx64".tag", tag->tags->path, tag->id) < 0)
+    {
+        return NULL;
+    }
+    return fn;
+}
+
 /*
  * Returns tag when successful or NULL in case of an error.
  */
-siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn)
+siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * _fn)
 {
-    siridb_tag_t * tag = siridb_tag_new(
-            (uint32_t) atoll(fn),
-            siridb->tags->path);
-    if (tag != NULL)
+    char * fn;
+    qp_obj_t qp_tn;
+    qp_obj_t qp_series_id;
+    uint64_t series_id;
+    siridb_series_t * series;
+    qp_unpacker_t * unpacker;
+    siridb_tag_t * tag = siridb_tag_new(siridb->tags, (uint32_t) atoll(_fn));
+
+    if (tag == NULL)
+    {
+        log_critical("Memory allocation error");
+        return NULL;
+    }
+
+    fn = siridb_tag_fn(tag);
+    if (fn == NULL)
     {
-        qp_unpacker_t * unpacker = qp_unpacker_ff(tag->fn);
-        if (unpacker == NULL)
+        log_critical("Memory allocation error");
+        goto fail0;
+    }
+
+    unpacker = qp_unpacker_ff(fn);
+    if (unpacker == NULL)
+    {
+        log_critical("Cannot open tag file for reading: %s", fn);
+        goto fail1;
+    }
+
+
+    if (!qp_is_array(qp_next(unpacker, NULL)) ||
+        qp_next(unpacker, &qp_tn) != QP_RAW ||
+        (tag->name = strndup((const char *) qp_tn.via.raw, qp_tn.len)) == NULL)
+    {
+        /* or a memory allocation error, but the same result */
+        log_critical("Expected an array with a tag name in file: %s", fn);
+        goto fail2;
+    }
+
+
+    while (qp_next(unpacker, &qp_series_id) == QP_INT64)
+    {
+        series_id = (uint64_t) qp_series_id.via.int64;
+        series = imap_get(siridb->series_map, series_id);
+
+        if (series == NULL)
         {
-            log_critical("cannot open tag file for reading: %s", tag->fn);
-            siridb__tag_free(tag);
-            tag = NULL;
+            siridb_tags_set_require_save(siridb->tags, tag);
+
+            log_error(
+                    "Cannot find series id %" PRId64
+                    " which was tagged with '%s'",
+                    qp_series_id.via.int64,
+                    tag->name);
+        }
+        else if (imap_add(tag->series, series_id, series) == 0)
+        {
+            siridb_series_incref(series);
         }
         else
         {
-            qp_obj_t qp_tn;
-
-            if (!qp_is_array(qp_next(unpacker, NULL)) ||
-                qp_next(unpacker, &qp_tn) != QP_RAW ||
-                (tag->name = strndup(
-                        (const char *) qp_tn.via.raw,
-                        qp_tn.len)) == NULL)
-            {
-                /* or a memory allocation error, but the same result */
-                log_critical(
-                        "expected an array with a tag name in file: %s",
-                        tag->fn);
-                siridb__tag_free(tag);
-                tag = NULL;
-            }
-            else
-            {
-                qp_obj_t qp_series_id;
-                uint64_t series_id;
-                siridb_series_t * series;
-
-                while (qp_next(unpacker, &qp_series_id) == QP_INT64)
-                {
-                    series_id = (uint64_t) qp_series_id.via.int64;
-                    series = imap_get(siridb->series_map, series_id);
-
-                    if (series == NULL)
-                    {
-                        siridb_tags_set_require_save(siridb->tags, tag);
-
-                        log_error(
-                                "cannot find series id %" PRId64
-                                " which was tagged with '%s'",
-                                qp_series_id.via.int64,
-                                tag->name);
-                    }
-                    else if (imap_add(tag->series, series_id, series) == 0)
-                    {
-                        siridb_series_incref(series);
-                    }
-                    else
-                    {
-                        log_critical(
-                                "cannot add series '%s' to tag '%s'",
-                                series->name,
-                                tag->name);
-                    }
-                }
-            }
-            qp_unpacker_ff_free(unpacker);
+            log_error(
+                    "Cannot add series '%s' to tag '%s'",
+                    series->name,
+                    tag->name);
         }
     }
+
+    qp_unpacker_ff_free(unpacker);
+    free(fn);
     return tag;
+
+fail2:
+    qp_unpacker_ff_free(unpacker);
+fail1:
+    free(fn);
+fail0:
+    siridb__tag_free(tag);
+    return NULL;
+}
+
+
+
+static int tag__save_cb(siridb_series_t * series, qp_fpacker_t * fpacker)
+{
+    return qp_fadd_int64(fpacker, (int64_t) series->id);
 }
 
 /*
@@ -117,12 +147,19 @@ siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn)
  */
 int siridb_tag_save(siridb_tag_t * tag)
 {
+    int rc = -1;
     qp_fpacker_t * fpacker;
 
-    fpacker = qp_open(tag->fn, "w");
+    char * fn = siridb_tag_fn(tag);
+    if (fn == NULL)
+    {
+        return rc;
+    }
+
+    fpacker = qp_open(fn, "w");
     if (fpacker == NULL)
     {
-        return -1;
+        goto fail0;
     }
 
     if (/* open a new array */
@@ -131,29 +168,17 @@ int siridb_tag_save(siridb_tag_t * tag)
         /* write the tag name */
         qp_fadd_string(fpacker, tag->name))
     {
-        qp_close(fpacker);
-        return -1;
+        goto fail1;
     }
 
-    /* TODO: maybe replace with walk */
-    vec_t * series_list = imap_vec(tag->series);
+    rc = imap_walk(tag->series, (imap_cb) tag__save_cb, fpacker);
 
-    if (series_list != NULL)
-    {
-        siridb_series_t * series;
-        for (size_t i = 0; i < series_list->len; i++)
-        {
-            series = (siridb_series_t *) series_list->data[i];
-            qp_fadd_int64(fpacker, (int64_t) series->id);
-        }
-    }
+fail1:
+    rc = qp_close(fpacker) || rc;
 
-    if (qp_close(fpacker) || series_list == NULL)
-    {
-        return -1;
-    }
-
-    return 0;
+fail0:
+    free(fn);
+    return rc;
 }
 
 /*
@@ -176,7 +201,7 @@ void siridb_tag_prop(siridb_tag_t * tag, qp_packer_t * packer, int prop)
         qp_add_string(packer, tag->name);
         break;
     case CLERI_GID_K_SERIES:
-        qp_add_int64(packer, (int64_t) tag->id);
+        qp_add_int64(packer, (int64_t) tag->n);
         break;
     }
 }
@@ -188,7 +213,7 @@ int siridb_tag_cexpr_cb(siridb_tag_t * tag, cexpr_condition_t * cond)
     case CLERI_GID_K_NAME:
         return cexpr_str_cmp(cond->operator, tag->name, cond->str);
     case CLERI_GID_K_SERIES:
-        return cexpr_int_cmp(cond->operator, (int64_t) tag->id, cond->int64);
+        return cexpr_int_cmp(cond->operator, (int64_t) tag->n, cond->int64);
     }
 
     log_critical("Unknown group property received: %d", cond->prop);
@@ -218,17 +243,22 @@ void siridb__tag_free(siridb_tag_t * tag)
     log_debug("Free tag: '%s'", tag->name);
 #endif
 
-    if ((tag->flags & TAG_FLAG_CLEANUP) && unlink(tag->fn))
+    if ((tag->flags & TAG_FLAG_CLEANUP))
     {
-        log_critical("Cannot remove tag file: '%s'", tag->fn);
+        char * fn = siridb_tag_fn(tag);
+        if (!fn || unlink(fn))
+        {
+            log_critical("Cannot remove tag (file): '%s'", tag->name);
+        }
+        free(fn);
     }
     else if ((tag->flags & TAG_FLAG_REQUIRE_SAVE) && siridb_tag_save(tag))
     {
-        log_critical("Cannot save tag file: '%s'", tag->fn);
+        log_critical("Cannot save tag '%s'", tag->name);
     }
 
     free(tag->name);
-    free(tag->fn);
+
     if (tag->series != NULL)
     {
         imap_free(tag->series, (imap_free_cb) siridb__series_decref);
index 9c8a88b047ecae41cbabd8975726e9c0b938fb03..66b8ee71b5f5dc67889d1398114c50599d73fbc5 100644 (file)
 
 static void TAGS_free(siridb_tags_t * tags);
 static int TAGS_load(siridb_t * siridb);
-static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer);
-static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup);
-static int TAGS_to_vec_cb(siridb_tag_t * tag, vec_t * tags_list);
-static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag);
+static int TAGS_dropped_series(
+        siridb_tag_t * tag,
+        void * data __attribute__((unused)));
 static int TAGS_nseries(
         siridb_tag_t * tag,
         void * data __attribute__((unused)));
@@ -43,8 +42,8 @@ int siridb_tags_init(siridb_t * siridb)
     }
     siridb->tags->flags = 0;
     siridb->tags->ref = 1;
+    siridb->tags->next_id = 0;
     siridb->tags->tags = ct_new();
-    siridb->tags->cleanup = vec_new(VEC_DEFAULT_SIZE);
 
     uv_mutex_init(&siridb->tags->mutex);
 
@@ -54,8 +53,7 @@ int siridb_tags_init(siridb_t * siridb)
             siridb->dbpath,
             SIRIDB_TAGS_PATH) < 0 ||
             siridb->tags->tags == NULL ||
-            siridb->tags->cleanup == NULL ||
-        TAGS_load(siridb))
+            TAGS_load(siridb))
     {
         TAGS_free(siridb->tags);
         siridb->tags = NULL;
@@ -78,180 +76,92 @@ void siridb_tags_decref(siridb_tags_t * tags)
     }
 }
 
-siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name)
-{
-    siridb_tag_t * tag = siridb_tag_new(name);
-    if (tag != NULL)
-    {
-        tag->name = strdup(name);
-        if (tag->name == NULL || ct_add(tags->tags, name, tag))
-        {
-            siridb_tag_decref(tag);
-            tag = NULL;
-        }
-    }
-    return tag;
-}
-
 /*
  * Main thread.
  *
- * Returns NULL and raises a signal in case of an error.
+ * Returns 0 if successful or -1 when the group is not found.
+ * (in case not found an error message is set)
+ *
+ * Note: when saving the groups to disk has failed, we log critical but
+ * the function still returns 0;
  */
-sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid)
+int siridb_tags_drop_tag(
+        siridb_tags_t * tags,
+        const char * name,
+        char * err_msg)
 {
-    qp_packer_t * packer = sirinet_packer_new(8192);
-    int rc;
+    uv_mutex_lock(&tags->mutex);
 
-    if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN))
-    {
-        return NULL;  /* signal is raised */
-    }
+    siridb_tag_t * tag = (siridb_tag_t *) ct_pop(tags->tags, name);
 
-    rc = ct_values(tags->tags, (ct_val_cb) TAGS_pkg, packer);
+    uv_mutex_unlock(&tags->mutex);
 
-    if (rc)
+    if (tag == NULL)
     {
-        /*  signal is raised when not 0 */
-        qp_packer_free(packer);
-        return NULL;
+        snprintf(err_msg,
+                SIRIDB_MAX_SIZE_ERR_MSG,
+                "Tag '%s' does not exist.",
+                name);
+        return -1;
     }
 
-    return sirinet_packer2pkg(packer, pid, BPROTO_RES_TAGS);
+    tag->flags |= TAG_FLAG_CLEANUP;
+    siridb_tag_decref(tag);
+
+    return 0;
 }
 
-/*
- * This function will set and unset the mutex lock.
- */
-void siridb_tags_cleanup(uv_async_t * handle)
+siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name)
 {
-    siridb_tags_t * tags = (siridb_tags_t *) handle->data;
-    siridb_tag_t * tag, * rmtag;
-
-    uv_mutex_lock(&tags->mutex);
+    siridb_tag_t * tag = siridb_tag_new(tags, tags->next_id++);
 
-    while (tags->cleanup->len)
+    if (tag != NULL)
     {
-        tag = (siridb_tag_t *) vec_pop(tags->cleanup);
-
-        if (!tag->series->len &&
-            (rmtag = (siridb_tag_t *) ct_pop(tags->tags, tag->name)) != NULL)
+        tag->name = strdup(name);
+        if (tag->name == NULL || ct_add(tags->tags, name, tag))
         {
-#ifdef DEBUG
-            assert(rmtag == tag && (tag->flags & TAG_FLAG_CLEANUP));
-#endif
-            siridb_tag_decref(rmtag);
+            siridb_tag_decref(tag);
+            tag = NULL;
         }
     }
-
-    uv_mutex_unlock(&tags->mutex);
-
-    siridb_tags_decref(tags);
-
-    uv_close((uv_handle_t *) handle, (uv_close_cb) free);
+    return tag;
 }
 
-ct_t * siridb_tags_lookup(siridb_tags_t * tags)
-{
-    ct_t * lookup = ct_new();
-    if (lookup != NULL)
-    {
-        ct_values(tags->tags, (ct_val_cb) &TAGS_ctmap_update, lookup);
-    }
-    return lookup;
-}
 
 /*
  * This function is called from the "Group" thread.
  */
 void siridb_tags_dropped_series(siridb_tags_t * tags)
 {
-    siridb_tag_t * tag;
-    vec_t * tags_list;
-
     uv_mutex_lock(&tags->mutex);
 
-    tags_list = vec_new(tags->tags->len);
+    ct_values(tags->tags, (ct_val_cb) TAGS_dropped_series, tags);
 
     tags->flags &= ~TAGS_FLAG_DROPPED_SERIES;
 
-    ct_values(tags->tags, (ct_val_cb) TAGS_to_vec_cb, tags_list);
-
     uv_mutex_unlock(&tags->mutex);
+}
 
-    while (tags_list->len)
-    {
-        tag = (siridb_tag_t *) vec_pop(tags_list);
-
-        uv_mutex_lock(&tags->mutex);
-
-        TAGS_dropped_series(tags, tag);
-
-        siridb_tag_decref(tag);
-
-        uv_mutex_unlock(&tags->mutex);
-
-        usleep(10000);  // 10ms
-    }
-
-    vec_free(tags_list);
-
-    if (tags->cleanup->len)
-    {
-        uv_async_t * cleanup = (uv_async_t *) malloc(sizeof(uv_async_t));
-
-        if (cleanup == NULL)
-        {
-            log_critical("Allocation error while creating cleanup task");
-            return;
-        }
-        siridb_tags_incref(tags);
-
-        cleanup->data = (void *) tags;
+static int TAGS__save_cb(
+        siridb_tag_t * tag,
+        void * data __attribute__((unused)))
+{
+    siridb_tag_save(tag);
+    tag->flags &= ~ TAG_FLAG_REQUIRE_SAVE;
 
-        uv_async_init(siri.loop,
-                cleanup,
-                (uv_async_cb) siridb_tags_cleanup);
-        uv_async_send(cleanup);
-    }
+    usleep(10000);  // 10ms
+    return 0;
 }
 
 void siridb_tags_save(siridb_tags_t * tags)
 {
-    siridb_tag_t * tag;
-    vec_t * tags_list;
-
     uv_mutex_lock(&tags->mutex);
 
-    tags_list = vec_new(tags->tags->len);
+    ct_values(tags->tags, (ct_val_cb) TAGS__save_cb, NULL);
 
     tags->flags &= ~TAGS_FLAG_REQUIRE_SAVE;
 
-    ct_values(tags->tags, (ct_val_cb) TAGS_to_vec_cb, tags_list);
-
     uv_mutex_unlock(&tags->mutex);
-
-    while (tags_list->len)
-    {
-        tag = (siridb_tag_t *) vec_pop(tags_list);
-
-        if (tag->flags & TAG_FLAG_REQUIRE_SAVE)
-        {
-            uv_mutex_lock(&tags->mutex);
-
-            siridb_tag_save(tag);
-
-            tag->flags &= ~ TAG_FLAG_REQUIRE_SAVE;
-
-            uv_mutex_unlock(&tags->mutex);
-        }
-
-        siridb_tag_decref(tag);
-
-        usleep(10000);  // 10ms
-    }
-
-    vec_free(tags_list);
 }
 
 /*
@@ -276,81 +186,34 @@ static int TAGS_nseries(
 /*
  * This function is called from the "Group" thread.
  */
-static int TAGS_to_vec_cb(siridb_tag_t * tag, vec_t * tags_list)
-{
-    siridb_tag_incref(tag);
-    vec_append(tags_list, tag);
-    return 0;
-}
-
-/*
- * This function is called from the "Group" thread.
- */
-static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag)
+static int TAGS_dropped_series(
+        siridb_tag_t * tag,
+        void * data __attribute__((unused)))
 {
     vec_t * tag_series = imap_vec_pop(tag->series);
-    siridb_series_t * series, * s = NULL;
+    siridb_series_t * series;
 
     if (tag_series != NULL)
     {
         for (size_t i = 0; i < tag_series->len; i++)
         {
             series = (siridb_series_t *) tag_series->data[i];
-            if (series->flags & SIRIDB_SERIES_IS_DROPPED)
+            if ((series->flags & SIRIDB_SERIES_IS_DROPPED) &&
+                imap_pop(tag->series, series->id))
             {
-                s = (siridb_series_t *) imap_pop(tag->series, series->id);
-                assert (s != NULL);
-                siridb_series_decref(s);
+                siridb_series_decref(series);
+                siridb_tags_set_require_save(tag->tags, tag);
             }
         }
 
-        if (s == NULL)
-        {
-            /* unchanged, we can put the list back */
-            tag->series->vec = tag_series;
-        }
-        else
-        {
-            vec_free(tag_series);
-
-            if (!tag->series->len && (~tag->flags & TAG_FLAG_CLEANUP))
-            {
-                tag->flags |= TAG_FLAG_CLEANUP;
-                if (vec_append_safe(&tags->cleanup, tag))
-                {
-                    log_critical(
-                            "Unexpected error while appending tag to "
-                            "cleanup list");
-                }
-            }
-        }
+        vec_free(tag_series);
     }
 
-    return 0;
-}
+    usleep(10000);  // 10ms
 
-static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup)
-{
-    if (tag->series->len)
-    {
-        volatile uintptr_t iptr = (uint32_t) tag->series->len;
-        return ct_add(lookup, tag->name, (uint32_t *) iptr);
-    }
     return 0;
 }
 
-/*
- * Main thread.
- */
-static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer)
-{
-    int rc = 0;
-    rc += qp_add_type(packer, QP_ARRAY2);
-    rc += qp_add_string_term(packer, tag->name);
-    rc += qp_add_int64(packer, (int64_t) tag->series->len);
-    return rc;
-}
-
 static int TAGS_load(siridb_t * siridb)
 {
     struct stat st = {0};
@@ -432,16 +295,8 @@ static int TAGS_load(siridb_t * siridb)
 
 static void TAGS_free(siridb_tags_t * tags)
 {
-#ifdef DEBUG
-    log_debug("Free tags");
-#endif
     free(tags->path);
 
-    if (tags->cleanup != NULL)
-    {
-        vec_free(tags->cleanup);
-    }
-
     uv_mutex_lock(&tags->mutex);
 
     if (tags->tags != NULL)
index 5c685d2a41957e993602a1ca18721e46bda9012b..0d50d335bd70f31e514bd589d6b57aecce3d5c65 100644 (file)
@@ -86,7 +86,6 @@ const char * sirinet_bproto_client_str(bproto_client_t n)
     case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
     case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
     case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
-    case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS";
     default:
         sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
         return protocol_str;