From: Jeroen van der Heijden Date: Tue, 28 Jul 2020 07:16:51 +0000 (+0200) Subject: Work on tag support X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~3^2~6^2~9 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=3d717f2b0e6753b8bb8577df72f81634c5f05287;p=siridb-server.git Work on tag support --- diff --git a/include/siri/db/tag.h b/include/siri/db/tag.h index 60406a97..960cc571 100644 --- a/include/siri/db/tag.h +++ b/include/siri/db/tag.h @@ -16,12 +16,13 @@ enum #include #include -siridb_tag_t * siridb_tag_new(uint32_t id, const char * tags_path); +siridb_tag_t * siridb_tag_new(siridb_tags_t * tags, uint64_t id); void siridb__tag_decref(siridb_tag_t * tag); void siridb__tag_free(siridb_tag_t * tag); int siridb_tag_is_valid_fn(const char * fn); -siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn); +siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * _fn); int siridb_tag_save(siridb_tag_t * tag); +char * siridb_tag_fn(siridb_tag_t * tag); int siridb_tag_is_remote_prop(uint32_t prop); void siridb_tag_prop(siridb_tag_t * tag, qp_packer_t * packer, int prop); int siridb_tag_cexpr_cb(siridb_tag_t * tag, cexpr_condition_t * cond); @@ -36,7 +37,9 @@ struct siridb_tag_s uint16_t ref; uint16_t flags; uint32_t n; + uint64_t id; char * name; + siridb_tags_t * tags; imap_t * series; }; diff --git a/include/siri/db/tags.h b/include/siri/db/tags.h index c8355661..5bf36ed0 100644 --- a/include/siri/db/tags.h +++ b/include/siri/db/tags.h @@ -25,19 +25,21 @@ struct siridb_tags_s { uint16_t flags; uint16_t ref; + uint32_t pad0; + uint64_t next_id; char * path; ct_t * tags; - vec_t * cleanup; uv_mutex_t mutex; }; int siridb_tags_init(siridb_t * siridb); void siridb_tags_incref(siridb_tags_t * tags); void siridb_tags_decref(siridb_tags_t * tags); +int siridb_tags_drop_tag( + siridb_tags_t * tags, + const char * name, + char * err_msg); siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name); -sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid); -ct_t * siridb_tags_lookup(siridb_tags_t * tags); -void siridb_tags_cleanup(uv_async_t * handle); void siridb_tags_dropped_series(siridb_tags_t * tags); void siridb_tags_save(siridb_tags_t * tags); void siridb_tags_init_nseries(siridb_tags_t * tags); diff --git a/include/siri/net/protocol.h b/include/siri/net/protocol.h index fb7dedb5..23c5f17d 100644 --- a/include/siri/net/protocol.h +++ b/include/siri/net/protocol.h @@ -79,7 +79,6 @@ typedef enum BPROTO_DISABLE_BACKUP_MODE, /* empty */ BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */ BPROTO_DROP_DATABASE, /* empty */ - BPROTO_REQ_TAGS, /* empty */ } bproto_client_t; /* diff --git a/include/siri/version.h b/include/siri/version.h index 98e2795b..912ebc22 100644 --- a/include/siri/version.h +++ b/include/siri/version.h @@ -6,7 +6,7 @@ #define SIRIDB_VERSION_MAJOR 2 #define SIRIDB_VERSION_MINOR 0 -#define SIRIDB_VERSION_PATCH 37 +#define SIRIDB_VERSION_PATCH 38 /* * Use SIRIDB_VERSION_PRE_RELEASE for alpha release versions. @@ -15,7 +15,7 @@ * Note that debian alpha packages should use versions like this: * 2.0.34-0alpha0 */ -#define SIRIDB_VERSION_PRE_RELEASE "" +#define SIRIDB_VERSION_PRE_RELEASE "-alpha-0" #ifndef NDEBUG #define SIRIDB_VERSION_BUILD_RELEASE "+debug" diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index 3b7eb4fa..74f7bd4f 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -193,7 +193,12 @@ if (IS_MASTER && siridb_is_reindexing(siridb)) \ #define MSG_ERR_SERVER_ADDRESS \ "Its only possible to change a servers address or port when the server " \ "is not connected." - +#define MSG_SUCCESS_TAG \ + "Successfully tagged %zu series." +#define MSG_SUCCESS_UNTAG \ + "Successfully unagged %zu series." +#define MSG_SUCCESS_DROP_TAG \ + "Successfully dropped tag '%s'." static void enter_access_expr(uv_async_t * handle); static void enter_alter_group(uv_async_t * handle); @@ -225,6 +230,7 @@ static void enter_series_re(uv_async_t * handle); static void enter_series_setopr(uv_async_t * handle); static void enter_tag_series(uv_async_t * handle); static void enter_timeit_stmt(uv_async_t * handle); +static void enter_untag_series(uv_async_t * handle); static void enter_where_xxx(uv_async_t * handle); static void enter_xxx_columns(uv_async_t * handle); @@ -251,6 +257,7 @@ static void exit_drop_group(uv_async_t * handle); static void exit_drop_series(uv_async_t * handle); static void exit_drop_server(uv_async_t * handle); static void exit_drop_shards(uv_async_t * handle); +static void exit_drop_tag(uv_async_t * handle); static void exit_drop_user(uv_async_t * handle); static void exit_grant_user(uv_async_t * handle); static void exit_help_xxx(uv_async_t * handle); @@ -278,7 +285,9 @@ static void exit_set_select_points_limit(uv_async_t * handle); static void exit_set_tee_pipe_name(uv_async_t * handle); static void exit_set_timezone(uv_async_t * handle); static void exit_show_stmt(uv_async_t * handle); +static void exit_tag_series(uv_async_t * handle); static void exit_timeit_stmt(uv_async_t * handle); +static void exit_untag_series(uv_async_t * handle); /* async loop functions */ static void async_count_series(uv_async_t * handle); @@ -468,6 +477,7 @@ void siridb_init_listener(void) SIRIDB_NODE_ENTER[CLERI_GID_TAG_COLUMNS] = enter_xxx_columns; SIRIDB_NODE_ENTER[CLERI_GID_TAG_SERIES] = enter_tag_series; SIRIDB_NODE_ENTER[CLERI_GID_TIMEIT_STMT] = enter_timeit_stmt; + SIRIDB_NODE_ENTER[CLERI_GID_UNTAG_SERIES] = enter_untag_series; SIRIDB_NODE_ENTER[CLERI_GID_USER_COLUMNS] = enter_xxx_columns; SIRIDB_NODE_ENTER[CLERI_GID_WHERE_GROUP] = enter_where_xxx; SIRIDB_NODE_ENTER[CLERI_GID_WHERE_POOL] = enter_where_xxx; @@ -501,6 +511,7 @@ void siridb_init_listener(void) SIRIDB_NODE_EXIT[CLERI_GID_DROP_SERIES] = exit_drop_series; SIRIDB_NODE_EXIT[CLERI_GID_DROP_SERVER] = exit_drop_server; SIRIDB_NODE_EXIT[CLERI_GID_DROP_SHARDS] = exit_drop_shards; + SIRIDB_NODE_EXIT[CLERI_GID_DROP_TAG] = exit_drop_tag; SIRIDB_NODE_EXIT[CLERI_GID_DROP_USER] = exit_drop_user; SIRIDB_NODE_EXIT[CLERI_GID_GRANT_USER] = exit_grant_user; SIRIDB_NODE_EXIT[CLERI_GID_LIST_GROUPS] = exit_list_groups; @@ -527,7 +538,9 @@ void siridb_init_listener(void) SIRIDB_NODE_EXIT[CLERI_GID_SET_TEE_PIPE_NAME] = exit_set_tee_pipe_name; SIRIDB_NODE_EXIT[CLERI_GID_SET_TIMEZONE] = exit_set_timezone; SIRIDB_NODE_EXIT[CLERI_GID_SHOW_STMT] = exit_show_stmt; + SIRIDB_NODE_EXIT[CLERI_GID_TAG_SERIES] = exit_tag_series; SIRIDB_NODE_EXIT[CLERI_GID_TIMEIT_STMT] = exit_timeit_stmt; + SIRIDB_NODE_EXIT[CLERI_GID_UNTAG_SERIES] = exit_untag_series; for (i = HELP_OFFSET; i < HELP_OFFSET + HELP_COUNT; i++) { @@ -1588,7 +1601,7 @@ static void enter_tag_series(uv_async_t * handle) q_alter->tp = QUERY_ALTER_SERIES; MASTER_CHECK_ACCESSIBLE(siridb) - MASTER_CHECK_VERSION(siridb, "2.0.19") + MASTER_CHECK_VERSION(siridb, "2.0.38") cleri_node_t * tag_node = query->nodes->node->children->next->node; @@ -1679,6 +1692,68 @@ static void enter_timeit_stmt(uv_async_t * handle) SIRIPARSER_NEXT_NODE } +static void enter_untag_series(uv_async_t * handle) +{ + siridb_query_t * query = (siridb_query_t *) handle->data; + siridb_t * siridb = query->client->siridb; + query_alter_t * q_alter = (query_alter_t *) query->data; + + q_alter->tp = QUERY_ALTER_SERIES; + + MASTER_CHECK_ACCESSIBLE(siridb) + MASTER_CHECK_VERSION(siridb, "2.0.38") + + cleri_node_t * tag_node = + query->nodes->node->children->next->node; + siridb_tag_t * tag; + + char name[tag_node->len - 1]; + xstr_extract_string(name, tag_node->str, tag_node->len); + + tag = ct_get(siridb->tags->tags, name); + + if (tag == NULL) + { + snprintf(query->err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "Cannot find tag: '%s'", + name); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } + + uv_mutex_lock(&siridb->tags->mutex); + + q_alter->n = q_alter->series_map->len; + + imap_difference_ref( + tag->series, + q_alter->series_map, + (imap_free_cb) &siridb__series_decref); + + siridb_tags_set_require_save(siridb->tags, tag); + + uv_mutex_unlock(&siridb->tags->mutex); + + q_alter->series_map = NULL; + + QP_ADD_SUCCESS + + if (IS_MASTER) + { + siridb_query_forward( + handle, + SIRIDB_QUERY_FWD_UPDATE, + (sirinet_promises_cb) on_tag_response, + 0); + } + else + { + qp_add_int64(query->packer, q_alter->n); + SIRIPARSER_ASYNC_NEXT_NODE + } +} + static void enter_where_xxx(uv_async_t * handle) { siridb_query_t * query = handle->data; @@ -2867,6 +2942,45 @@ static void exit_drop_shards(uv_async_t * handle) } } +static void exit_drop_tag(uv_async_t * handle) +{ + siridb_query_t * query = handle->data; + siridb_t * siridb = query->client->siridb; + + MASTER_CHECK_ACCESSIBLE(siridb) + + cleri_node_t * tag_node = + query->nodes->node->children->next->node; + + char name[tag_node->len - 1]; + + xstr_extract_string(name, tag_node->str, tag_node->len); + + if (siridb_tags_drop_tag(siridb->tags, name, query->err_msg)) + { + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + } + else + { + QP_ADD_SUCCESS + log_info(MSG_SUCCESS_DROP_TAG, name); + qp_add_fmt_safe(query->packer, MSG_SUCCESS_DROP_TAG, name); + + if (IS_MASTER) + { + siridb_query_forward( + handle, + SIRIDB_QUERY_FWD_UPDATE, + (sirinet_promises_cb) on_update_xxx_response, + 0); + } + else + { + SIRIPARSER_ASYNC_NEXT_NODE + } + } +} + static void exit_drop_user(uv_async_t * handle) { siridb_query_t * query = handle->data; @@ -4728,6 +4842,21 @@ static void exit_show_stmt(uv_async_t * handle) SIRIPARSER_ASYNC_NEXT_NODE } +static void exit_tag_series(uv_async_t * handle) +{ + siridb_query_t * query = (siridb_query_t *) handle->data; + + if (IS_MASTER) + { + qp_add_fmt_safe( + query->packer, + MSG_SUCCESS_TAG, + ((query_alter_t *) query->data)->n); + } + + SIRIPARSER_ASYNC_NEXT_NODE +} + static void exit_timeit_stmt(uv_async_t * handle) { siridb_query_t * query = handle->data; @@ -4768,6 +4897,21 @@ static void exit_timeit_stmt(uv_async_t * handle) SIRIPARSER_ASYNC_NEXT_NODE } +static void exit_untag_series(uv_async_t * handle) +{ + siridb_query_t * query = (siridb_query_t *) handle->data; + + if (IS_MASTER) + { + qp_add_fmt_safe( + query->packer, + MSG_SUCCESS_UNTAG, + ((query_alter_t *) query->data)->n); + } + + SIRIPARSER_ASYNC_NEXT_NODE +} + /****************************************************************************** * Async loop functions. *****************************************************************************/ diff --git a/src/siri/db/tag.c b/src/siri/db/tag.c index d2954b2c..bc9dfd6a 100644 --- a/src/siri/db/tag.c +++ b/src/siri/db/tag.c @@ -25,91 +25,121 @@ /* * Returns tag when successful or NULL in case of an error. */ -siridb_tag_t * siridb_tag_new(char * name) +siridb_tag_t * siridb_tag_new(siridb_tags_t * tags, uint64_t id) { siridb_tag_t * tag = (siridb_tag_t *) malloc(sizeof(siridb_tag_t)); if (tag != NULL) { tag->ref = 1; tag->flags = 0; - tag->name = name; + tag->id = id; + tag->name = NULL; + tag->tags = tags; tag->series = imap_new(); } return tag; } +char * siridb_tag_fn(siridb_tag_t * tag) +{ + char * fn; + if (asprintf(&fn, "%s%09"PRIx64".tag", tag->tags->path, tag->id) < 0) + { + return NULL; + } + return fn; +} + /* * Returns tag when successful or NULL in case of an error. */ -siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn) +siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * _fn) { - siridb_tag_t * tag = siridb_tag_new( - (uint32_t) atoll(fn), - siridb->tags->path); - if (tag != NULL) + char * fn; + qp_obj_t qp_tn; + qp_obj_t qp_series_id; + uint64_t series_id; + siridb_series_t * series; + qp_unpacker_t * unpacker; + siridb_tag_t * tag = siridb_tag_new(siridb->tags, (uint32_t) atoll(_fn)); + + if (tag == NULL) + { + log_critical("Memory allocation error"); + return NULL; + } + + fn = siridb_tag_fn(tag); + if (fn == NULL) { - qp_unpacker_t * unpacker = qp_unpacker_ff(tag->fn); - if (unpacker == NULL) + log_critical("Memory allocation error"); + goto fail0; + } + + unpacker = qp_unpacker_ff(fn); + if (unpacker == NULL) + { + log_critical("Cannot open tag file for reading: %s", fn); + goto fail1; + } + + + if (!qp_is_array(qp_next(unpacker, NULL)) || + qp_next(unpacker, &qp_tn) != QP_RAW || + (tag->name = strndup((const char *) qp_tn.via.raw, qp_tn.len)) == NULL) + { + /* or a memory allocation error, but the same result */ + log_critical("Expected an array with a tag name in file: %s", fn); + goto fail2; + } + + + while (qp_next(unpacker, &qp_series_id) == QP_INT64) + { + series_id = (uint64_t) qp_series_id.via.int64; + series = imap_get(siridb->series_map, series_id); + + if (series == NULL) { - log_critical("cannot open tag file for reading: %s", tag->fn); - siridb__tag_free(tag); - tag = NULL; + siridb_tags_set_require_save(siridb->tags, tag); + + log_error( + "Cannot find series id %" PRId64 + " which was tagged with '%s'", + qp_series_id.via.int64, + tag->name); + } + else if (imap_add(tag->series, series_id, series) == 0) + { + siridb_series_incref(series); } else { - qp_obj_t qp_tn; - - if (!qp_is_array(qp_next(unpacker, NULL)) || - qp_next(unpacker, &qp_tn) != QP_RAW || - (tag->name = strndup( - (const char *) qp_tn.via.raw, - qp_tn.len)) == NULL) - { - /* or a memory allocation error, but the same result */ - log_critical( - "expected an array with a tag name in file: %s", - tag->fn); - siridb__tag_free(tag); - tag = NULL; - } - else - { - qp_obj_t qp_series_id; - uint64_t series_id; - siridb_series_t * series; - - while (qp_next(unpacker, &qp_series_id) == QP_INT64) - { - series_id = (uint64_t) qp_series_id.via.int64; - series = imap_get(siridb->series_map, series_id); - - if (series == NULL) - { - siridb_tags_set_require_save(siridb->tags, tag); - - log_error( - "cannot find series id %" PRId64 - " which was tagged with '%s'", - qp_series_id.via.int64, - tag->name); - } - else if (imap_add(tag->series, series_id, series) == 0) - { - siridb_series_incref(series); - } - else - { - log_critical( - "cannot add series '%s' to tag '%s'", - series->name, - tag->name); - } - } - } - qp_unpacker_ff_free(unpacker); + log_error( + "Cannot add series '%s' to tag '%s'", + series->name, + tag->name); } } + + qp_unpacker_ff_free(unpacker); + free(fn); return tag; + +fail2: + qp_unpacker_ff_free(unpacker); +fail1: + free(fn); +fail0: + siridb__tag_free(tag); + return NULL; +} + + + +static int tag__save_cb(siridb_series_t * series, qp_fpacker_t * fpacker) +{ + return qp_fadd_int64(fpacker, (int64_t) series->id); } /* @@ -117,12 +147,19 @@ siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn) */ int siridb_tag_save(siridb_tag_t * tag) { + int rc = -1; qp_fpacker_t * fpacker; - fpacker = qp_open(tag->fn, "w"); + char * fn = siridb_tag_fn(tag); + if (fn == NULL) + { + return rc; + } + + fpacker = qp_open(fn, "w"); if (fpacker == NULL) { - return -1; + goto fail0; } if (/* open a new array */ @@ -131,29 +168,17 @@ int siridb_tag_save(siridb_tag_t * tag) /* write the tag name */ qp_fadd_string(fpacker, tag->name)) { - qp_close(fpacker); - return -1; + goto fail1; } - /* TODO: maybe replace with walk */ - vec_t * series_list = imap_vec(tag->series); + rc = imap_walk(tag->series, (imap_cb) tag__save_cb, fpacker); - if (series_list != NULL) - { - siridb_series_t * series; - for (size_t i = 0; i < series_list->len; i++) - { - series = (siridb_series_t *) series_list->data[i]; - qp_fadd_int64(fpacker, (int64_t) series->id); - } - } +fail1: + rc = qp_close(fpacker) || rc; - if (qp_close(fpacker) || series_list == NULL) - { - return -1; - } - - return 0; +fail0: + free(fn); + return rc; } /* @@ -176,7 +201,7 @@ void siridb_tag_prop(siridb_tag_t * tag, qp_packer_t * packer, int prop) qp_add_string(packer, tag->name); break; case CLERI_GID_K_SERIES: - qp_add_int64(packer, (int64_t) tag->id); + qp_add_int64(packer, (int64_t) tag->n); break; } } @@ -188,7 +213,7 @@ int siridb_tag_cexpr_cb(siridb_tag_t * tag, cexpr_condition_t * cond) case CLERI_GID_K_NAME: return cexpr_str_cmp(cond->operator, tag->name, cond->str); case CLERI_GID_K_SERIES: - return cexpr_int_cmp(cond->operator, (int64_t) tag->id, cond->int64); + return cexpr_int_cmp(cond->operator, (int64_t) tag->n, cond->int64); } log_critical("Unknown group property received: %d", cond->prop); @@ -218,17 +243,22 @@ void siridb__tag_free(siridb_tag_t * tag) log_debug("Free tag: '%s'", tag->name); #endif - if ((tag->flags & TAG_FLAG_CLEANUP) && unlink(tag->fn)) + if ((tag->flags & TAG_FLAG_CLEANUP)) { - log_critical("Cannot remove tag file: '%s'", tag->fn); + char * fn = siridb_tag_fn(tag); + if (!fn || unlink(fn)) + { + log_critical("Cannot remove tag (file): '%s'", tag->name); + } + free(fn); } else if ((tag->flags & TAG_FLAG_REQUIRE_SAVE) && siridb_tag_save(tag)) { - log_critical("Cannot save tag file: '%s'", tag->fn); + log_critical("Cannot save tag '%s'", tag->name); } free(tag->name); - free(tag->fn); + if (tag->series != NULL) { imap_free(tag->series, (imap_free_cb) siridb__series_decref); diff --git a/src/siri/db/tags.c b/src/siri/db/tags.c index 9c8a88b0..66b8ee71 100644 --- a/src/siri/db/tags.c +++ b/src/siri/db/tags.c @@ -22,10 +22,9 @@ static void TAGS_free(siridb_tags_t * tags); static int TAGS_load(siridb_t * siridb); -static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer); -static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup); -static int TAGS_to_vec_cb(siridb_tag_t * tag, vec_t * tags_list); -static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag); +static int TAGS_dropped_series( + siridb_tag_t * tag, + void * data __attribute__((unused))); static int TAGS_nseries( siridb_tag_t * tag, void * data __attribute__((unused))); @@ -43,8 +42,8 @@ int siridb_tags_init(siridb_t * siridb) } siridb->tags->flags = 0; siridb->tags->ref = 1; + siridb->tags->next_id = 0; siridb->tags->tags = ct_new(); - siridb->tags->cleanup = vec_new(VEC_DEFAULT_SIZE); uv_mutex_init(&siridb->tags->mutex); @@ -54,8 +53,7 @@ int siridb_tags_init(siridb_t * siridb) siridb->dbpath, SIRIDB_TAGS_PATH) < 0 || siridb->tags->tags == NULL || - siridb->tags->cleanup == NULL || - TAGS_load(siridb)) + TAGS_load(siridb)) { TAGS_free(siridb->tags); siridb->tags = NULL; @@ -78,180 +76,92 @@ void siridb_tags_decref(siridb_tags_t * tags) } } -siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name) -{ - siridb_tag_t * tag = siridb_tag_new(name); - if (tag != NULL) - { - tag->name = strdup(name); - if (tag->name == NULL || ct_add(tags->tags, name, tag)) - { - siridb_tag_decref(tag); - tag = NULL; - } - } - return tag; -} - /* * Main thread. * - * Returns NULL and raises a signal in case of an error. + * Returns 0 if successful or -1 when the group is not found. + * (in case not found an error message is set) + * + * Note: when saving the groups to disk has failed, we log critical but + * the function still returns 0; */ -sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid) +int siridb_tags_drop_tag( + siridb_tags_t * tags, + const char * name, + char * err_msg) { - qp_packer_t * packer = sirinet_packer_new(8192); - int rc; + uv_mutex_lock(&tags->mutex); - if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN)) - { - return NULL; /* signal is raised */ - } + siridb_tag_t * tag = (siridb_tag_t *) ct_pop(tags->tags, name); - rc = ct_values(tags->tags, (ct_val_cb) TAGS_pkg, packer); + uv_mutex_unlock(&tags->mutex); - if (rc) + if (tag == NULL) { - /* signal is raised when not 0 */ - qp_packer_free(packer); - return NULL; + snprintf(err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "Tag '%s' does not exist.", + name); + return -1; } - return sirinet_packer2pkg(packer, pid, BPROTO_RES_TAGS); + tag->flags |= TAG_FLAG_CLEANUP; + siridb_tag_decref(tag); + + return 0; } -/* - * This function will set and unset the mutex lock. - */ -void siridb_tags_cleanup(uv_async_t * handle) +siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name) { - siridb_tags_t * tags = (siridb_tags_t *) handle->data; - siridb_tag_t * tag, * rmtag; - - uv_mutex_lock(&tags->mutex); + siridb_tag_t * tag = siridb_tag_new(tags, tags->next_id++); - while (tags->cleanup->len) + if (tag != NULL) { - tag = (siridb_tag_t *) vec_pop(tags->cleanup); - - if (!tag->series->len && - (rmtag = (siridb_tag_t *) ct_pop(tags->tags, tag->name)) != NULL) + tag->name = strdup(name); + if (tag->name == NULL || ct_add(tags->tags, name, tag)) { -#ifdef DEBUG - assert(rmtag == tag && (tag->flags & TAG_FLAG_CLEANUP)); -#endif - siridb_tag_decref(rmtag); + siridb_tag_decref(tag); + tag = NULL; } } - - uv_mutex_unlock(&tags->mutex); - - siridb_tags_decref(tags); - - uv_close((uv_handle_t *) handle, (uv_close_cb) free); + return tag; } -ct_t * siridb_tags_lookup(siridb_tags_t * tags) -{ - ct_t * lookup = ct_new(); - if (lookup != NULL) - { - ct_values(tags->tags, (ct_val_cb) &TAGS_ctmap_update, lookup); - } - return lookup; -} /* * This function is called from the "Group" thread. */ void siridb_tags_dropped_series(siridb_tags_t * tags) { - siridb_tag_t * tag; - vec_t * tags_list; - uv_mutex_lock(&tags->mutex); - tags_list = vec_new(tags->tags->len); + ct_values(tags->tags, (ct_val_cb) TAGS_dropped_series, tags); tags->flags &= ~TAGS_FLAG_DROPPED_SERIES; - ct_values(tags->tags, (ct_val_cb) TAGS_to_vec_cb, tags_list); - uv_mutex_unlock(&tags->mutex); +} - while (tags_list->len) - { - tag = (siridb_tag_t *) vec_pop(tags_list); - - uv_mutex_lock(&tags->mutex); - - TAGS_dropped_series(tags, tag); - - siridb_tag_decref(tag); - - uv_mutex_unlock(&tags->mutex); - - usleep(10000); // 10ms - } - - vec_free(tags_list); - - if (tags->cleanup->len) - { - uv_async_t * cleanup = (uv_async_t *) malloc(sizeof(uv_async_t)); - - if (cleanup == NULL) - { - log_critical("Allocation error while creating cleanup task"); - return; - } - siridb_tags_incref(tags); - - cleanup->data = (void *) tags; +static int TAGS__save_cb( + siridb_tag_t * tag, + void * data __attribute__((unused))) +{ + siridb_tag_save(tag); + tag->flags &= ~ TAG_FLAG_REQUIRE_SAVE; - uv_async_init(siri.loop, - cleanup, - (uv_async_cb) siridb_tags_cleanup); - uv_async_send(cleanup); - } + usleep(10000); // 10ms + return 0; } void siridb_tags_save(siridb_tags_t * tags) { - siridb_tag_t * tag; - vec_t * tags_list; - uv_mutex_lock(&tags->mutex); - tags_list = vec_new(tags->tags->len); + ct_values(tags->tags, (ct_val_cb) TAGS__save_cb, NULL); tags->flags &= ~TAGS_FLAG_REQUIRE_SAVE; - ct_values(tags->tags, (ct_val_cb) TAGS_to_vec_cb, tags_list); - uv_mutex_unlock(&tags->mutex); - - while (tags_list->len) - { - tag = (siridb_tag_t *) vec_pop(tags_list); - - if (tag->flags & TAG_FLAG_REQUIRE_SAVE) - { - uv_mutex_lock(&tags->mutex); - - siridb_tag_save(tag); - - tag->flags &= ~ TAG_FLAG_REQUIRE_SAVE; - - uv_mutex_unlock(&tags->mutex); - } - - siridb_tag_decref(tag); - - usleep(10000); // 10ms - } - - vec_free(tags_list); } /* @@ -276,81 +186,34 @@ static int TAGS_nseries( /* * This function is called from the "Group" thread. */ -static int TAGS_to_vec_cb(siridb_tag_t * tag, vec_t * tags_list) -{ - siridb_tag_incref(tag); - vec_append(tags_list, tag); - return 0; -} - -/* - * This function is called from the "Group" thread. - */ -static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag) +static int TAGS_dropped_series( + siridb_tag_t * tag, + void * data __attribute__((unused))) { vec_t * tag_series = imap_vec_pop(tag->series); - siridb_series_t * series, * s = NULL; + siridb_series_t * series; if (tag_series != NULL) { for (size_t i = 0; i < tag_series->len; i++) { series = (siridb_series_t *) tag_series->data[i]; - if (series->flags & SIRIDB_SERIES_IS_DROPPED) + if ((series->flags & SIRIDB_SERIES_IS_DROPPED) && + imap_pop(tag->series, series->id)) { - s = (siridb_series_t *) imap_pop(tag->series, series->id); - assert (s != NULL); - siridb_series_decref(s); + siridb_series_decref(series); + siridb_tags_set_require_save(tag->tags, tag); } } - if (s == NULL) - { - /* unchanged, we can put the list back */ - tag->series->vec = tag_series; - } - else - { - vec_free(tag_series); - - if (!tag->series->len && (~tag->flags & TAG_FLAG_CLEANUP)) - { - tag->flags |= TAG_FLAG_CLEANUP; - if (vec_append_safe(&tags->cleanup, tag)) - { - log_critical( - "Unexpected error while appending tag to " - "cleanup list"); - } - } - } + vec_free(tag_series); } - return 0; -} + usleep(10000); // 10ms -static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup) -{ - if (tag->series->len) - { - volatile uintptr_t iptr = (uint32_t) tag->series->len; - return ct_add(lookup, tag->name, (uint32_t *) iptr); - } return 0; } -/* - * Main thread. - */ -static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer) -{ - int rc = 0; - rc += qp_add_type(packer, QP_ARRAY2); - rc += qp_add_string_term(packer, tag->name); - rc += qp_add_int64(packer, (int64_t) tag->series->len); - return rc; -} - static int TAGS_load(siridb_t * siridb) { struct stat st = {0}; @@ -432,16 +295,8 @@ static int TAGS_load(siridb_t * siridb) static void TAGS_free(siridb_tags_t * tags) { -#ifdef DEBUG - log_debug("Free tags"); -#endif free(tags->path); - if (tags->cleanup != NULL) - { - vec_free(tags->cleanup); - } - uv_mutex_lock(&tags->mutex); if (tags->tags != NULL) diff --git a/src/siri/net/protocol.c b/src/siri/net/protocol.c index 5c685d2a..0d50d335 100644 --- a/src/siri/net/protocol.c +++ b/src/siri/net/protocol.c @@ -86,7 +86,6 @@ const char * sirinet_bproto_client_str(bproto_client_t n) case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE"; case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE"; case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE"; - case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS"; default: sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n); return protocol_str;