#include <imap/imap.h>
#include <siri/db/db.h>
-siridb_tag_t * siridb_tag_new(uint32_t id, const char * tags_path);
+siridb_tag_t * siridb_tag_new(siridb_tags_t * tags, uint64_t id);
void siridb__tag_decref(siridb_tag_t * tag);
void siridb__tag_free(siridb_tag_t * tag);
int siridb_tag_is_valid_fn(const char * fn);
-siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn);
+siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * _fn);
int siridb_tag_save(siridb_tag_t * tag);
+char * siridb_tag_fn(siridb_tag_t * tag);
int siridb_tag_is_remote_prop(uint32_t prop);
void siridb_tag_prop(siridb_tag_t * tag, qp_packer_t * packer, int prop);
int siridb_tag_cexpr_cb(siridb_tag_t * tag, cexpr_condition_t * cond);
uint16_t ref;
uint16_t flags;
uint32_t n;
+ uint64_t id;
char * name;
+ siridb_tags_t * tags;
imap_t * series;
};
{
uint16_t flags;
uint16_t ref;
+ uint32_t pad0;
+ uint64_t next_id;
char * path;
ct_t * tags;
- vec_t * cleanup;
uv_mutex_t mutex;
};
int siridb_tags_init(siridb_t * siridb);
void siridb_tags_incref(siridb_tags_t * tags);
void siridb_tags_decref(siridb_tags_t * tags);
+int siridb_tags_drop_tag(
+ siridb_tags_t * tags,
+ const char * name,
+ char * err_msg);
siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name);
-sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid);
-ct_t * siridb_tags_lookup(siridb_tags_t * tags);
-void siridb_tags_cleanup(uv_async_t * handle);
void siridb_tags_dropped_series(siridb_tags_t * tags);
void siridb_tags_save(siridb_tags_t * tags);
void siridb_tags_init_nseries(siridb_tags_t * tags);
BPROTO_DISABLE_BACKUP_MODE, /* empty */
BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */
BPROTO_DROP_DATABASE, /* empty */
- BPROTO_REQ_TAGS, /* empty */
} bproto_client_t;
/*
#define SIRIDB_VERSION_MAJOR 2
#define SIRIDB_VERSION_MINOR 0
-#define SIRIDB_VERSION_PATCH 37
+#define SIRIDB_VERSION_PATCH 38
/*
* Use SIRIDB_VERSION_PRE_RELEASE for alpha release versions.
* Note that debian alpha packages should use versions like this:
* 2.0.34-0alpha0
*/
-#define SIRIDB_VERSION_PRE_RELEASE ""
+#define SIRIDB_VERSION_PRE_RELEASE "-alpha-0"
#ifndef NDEBUG
#define SIRIDB_VERSION_BUILD_RELEASE "+debug"
#define MSG_ERR_SERVER_ADDRESS \
"Its only possible to change a servers address or port when the server " \
"is not connected."
-
+#define MSG_SUCCESS_TAG \
+ "Successfully tagged %zu series."
+#define MSG_SUCCESS_UNTAG \
+ "Successfully unagged %zu series."
+#define MSG_SUCCESS_DROP_TAG \
+ "Successfully dropped tag '%s'."
static void enter_access_expr(uv_async_t * handle);
static void enter_alter_group(uv_async_t * handle);
static void enter_series_setopr(uv_async_t * handle);
static void enter_tag_series(uv_async_t * handle);
static void enter_timeit_stmt(uv_async_t * handle);
+static void enter_untag_series(uv_async_t * handle);
static void enter_where_xxx(uv_async_t * handle);
static void enter_xxx_columns(uv_async_t * handle);
static void exit_drop_series(uv_async_t * handle);
static void exit_drop_server(uv_async_t * handle);
static void exit_drop_shards(uv_async_t * handle);
+static void exit_drop_tag(uv_async_t * handle);
static void exit_drop_user(uv_async_t * handle);
static void exit_grant_user(uv_async_t * handle);
static void exit_help_xxx(uv_async_t * handle);
static void exit_set_tee_pipe_name(uv_async_t * handle);
static void exit_set_timezone(uv_async_t * handle);
static void exit_show_stmt(uv_async_t * handle);
+static void exit_tag_series(uv_async_t * handle);
static void exit_timeit_stmt(uv_async_t * handle);
+static void exit_untag_series(uv_async_t * handle);
/* async loop functions */
static void async_count_series(uv_async_t * handle);
SIRIDB_NODE_ENTER[CLERI_GID_TAG_COLUMNS] = enter_xxx_columns;
SIRIDB_NODE_ENTER[CLERI_GID_TAG_SERIES] = enter_tag_series;
SIRIDB_NODE_ENTER[CLERI_GID_TIMEIT_STMT] = enter_timeit_stmt;
+ SIRIDB_NODE_ENTER[CLERI_GID_UNTAG_SERIES] = enter_untag_series;
SIRIDB_NODE_ENTER[CLERI_GID_USER_COLUMNS] = enter_xxx_columns;
SIRIDB_NODE_ENTER[CLERI_GID_WHERE_GROUP] = enter_where_xxx;
SIRIDB_NODE_ENTER[CLERI_GID_WHERE_POOL] = enter_where_xxx;
SIRIDB_NODE_EXIT[CLERI_GID_DROP_SERIES] = exit_drop_series;
SIRIDB_NODE_EXIT[CLERI_GID_DROP_SERVER] = exit_drop_server;
SIRIDB_NODE_EXIT[CLERI_GID_DROP_SHARDS] = exit_drop_shards;
+ SIRIDB_NODE_EXIT[CLERI_GID_DROP_TAG] = exit_drop_tag;
SIRIDB_NODE_EXIT[CLERI_GID_DROP_USER] = exit_drop_user;
SIRIDB_NODE_EXIT[CLERI_GID_GRANT_USER] = exit_grant_user;
SIRIDB_NODE_EXIT[CLERI_GID_LIST_GROUPS] = exit_list_groups;
SIRIDB_NODE_EXIT[CLERI_GID_SET_TEE_PIPE_NAME] = exit_set_tee_pipe_name;
SIRIDB_NODE_EXIT[CLERI_GID_SET_TIMEZONE] = exit_set_timezone;
SIRIDB_NODE_EXIT[CLERI_GID_SHOW_STMT] = exit_show_stmt;
+ SIRIDB_NODE_EXIT[CLERI_GID_TAG_SERIES] = exit_tag_series;
SIRIDB_NODE_EXIT[CLERI_GID_TIMEIT_STMT] = exit_timeit_stmt;
+ SIRIDB_NODE_EXIT[CLERI_GID_UNTAG_SERIES] = exit_untag_series;
for (i = HELP_OFFSET; i < HELP_OFFSET + HELP_COUNT; i++)
{
q_alter->tp = QUERY_ALTER_SERIES;
MASTER_CHECK_ACCESSIBLE(siridb)
- MASTER_CHECK_VERSION(siridb, "2.0.19")
+ MASTER_CHECK_VERSION(siridb, "2.0.38")
cleri_node_t * tag_node =
query->nodes->node->children->next->node;
SIRIPARSER_NEXT_NODE
}
+static void enter_untag_series(uv_async_t * handle)
+{
+ siridb_query_t * query = (siridb_query_t *) handle->data;
+ siridb_t * siridb = query->client->siridb;
+ query_alter_t * q_alter = (query_alter_t *) query->data;
+
+ q_alter->tp = QUERY_ALTER_SERIES;
+
+ MASTER_CHECK_ACCESSIBLE(siridb)
+ MASTER_CHECK_VERSION(siridb, "2.0.38")
+
+ cleri_node_t * tag_node =
+ query->nodes->node->children->next->node;
+ siridb_tag_t * tag;
+
+ char name[tag_node->len - 1];
+ xstr_extract_string(name, tag_node->str, tag_node->len);
+
+ tag = ct_get(siridb->tags->tags, name);
+
+ if (tag == NULL)
+ {
+ snprintf(query->err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Cannot find tag: '%s'",
+ name);
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ return;
+ }
+
+ uv_mutex_lock(&siridb->tags->mutex);
+
+ q_alter->n = q_alter->series_map->len;
+
+ imap_difference_ref(
+ tag->series,
+ q_alter->series_map,
+ (imap_free_cb) &siridb__series_decref);
+
+ siridb_tags_set_require_save(siridb->tags, tag);
+
+ uv_mutex_unlock(&siridb->tags->mutex);
+
+ q_alter->series_map = NULL;
+
+ QP_ADD_SUCCESS
+
+ if (IS_MASTER)
+ {
+ siridb_query_forward(
+ handle,
+ SIRIDB_QUERY_FWD_UPDATE,
+ (sirinet_promises_cb) on_tag_response,
+ 0);
+ }
+ else
+ {
+ qp_add_int64(query->packer, q_alter->n);
+ SIRIPARSER_ASYNC_NEXT_NODE
+ }
+}
+
static void enter_where_xxx(uv_async_t * handle)
{
siridb_query_t * query = handle->data;
}
}
+static void exit_drop_tag(uv_async_t * handle)
+{
+ siridb_query_t * query = handle->data;
+ siridb_t * siridb = query->client->siridb;
+
+ MASTER_CHECK_ACCESSIBLE(siridb)
+
+ cleri_node_t * tag_node =
+ query->nodes->node->children->next->node;
+
+ char name[tag_node->len - 1];
+
+ xstr_extract_string(name, tag_node->str, tag_node->len);
+
+ if (siridb_tags_drop_tag(siridb->tags, name, query->err_msg))
+ {
+ siridb_query_send_error(handle, CPROTO_ERR_QUERY);
+ }
+ else
+ {
+ QP_ADD_SUCCESS
+ log_info(MSG_SUCCESS_DROP_TAG, name);
+ qp_add_fmt_safe(query->packer, MSG_SUCCESS_DROP_TAG, name);
+
+ if (IS_MASTER)
+ {
+ siridb_query_forward(
+ handle,
+ SIRIDB_QUERY_FWD_UPDATE,
+ (sirinet_promises_cb) on_update_xxx_response,
+ 0);
+ }
+ else
+ {
+ SIRIPARSER_ASYNC_NEXT_NODE
+ }
+ }
+}
+
static void exit_drop_user(uv_async_t * handle)
{
siridb_query_t * query = handle->data;
SIRIPARSER_ASYNC_NEXT_NODE
}
+static void exit_tag_series(uv_async_t * handle)
+{
+ siridb_query_t * query = (siridb_query_t *) handle->data;
+
+ if (IS_MASTER)
+ {
+ qp_add_fmt_safe(
+ query->packer,
+ MSG_SUCCESS_TAG,
+ ((query_alter_t *) query->data)->n);
+ }
+
+ SIRIPARSER_ASYNC_NEXT_NODE
+}
+
static void exit_timeit_stmt(uv_async_t * handle)
{
siridb_query_t * query = handle->data;
SIRIPARSER_ASYNC_NEXT_NODE
}
+static void exit_untag_series(uv_async_t * handle)
+{
+ siridb_query_t * query = (siridb_query_t *) handle->data;
+
+ if (IS_MASTER)
+ {
+ qp_add_fmt_safe(
+ query->packer,
+ MSG_SUCCESS_UNTAG,
+ ((query_alter_t *) query->data)->n);
+ }
+
+ SIRIPARSER_ASYNC_NEXT_NODE
+}
+
/******************************************************************************
* Async loop functions.
*****************************************************************************/
/*
* Returns tag when successful or NULL in case of an error.
*/
-siridb_tag_t * siridb_tag_new(char * name)
+siridb_tag_t * siridb_tag_new(siridb_tags_t * tags, uint64_t id)
{
siridb_tag_t * tag = (siridb_tag_t *) malloc(sizeof(siridb_tag_t));
if (tag != NULL)
{
tag->ref = 1;
tag->flags = 0;
- tag->name = name;
+ tag->id = id;
+ tag->name = NULL;
+ tag->tags = tags;
tag->series = imap_new();
}
return tag;
}
+char * siridb_tag_fn(siridb_tag_t * tag)
+{
+ char * fn;
+ if (asprintf(&fn, "%s%09"PRIx64".tag", tag->tags->path, tag->id) < 0)
+ {
+ return NULL;
+ }
+ return fn;
+}
+
/*
* Returns tag when successful or NULL in case of an error.
*/
-siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * fn)
+siridb_tag_t * siridb_tag_load(siridb_t * siridb, const char * _fn)
{
- siridb_tag_t * tag = siridb_tag_new(
- (uint32_t) atoll(fn),
- siridb->tags->path);
- if (tag != NULL)
+ char * fn;
+ qp_obj_t qp_tn;
+ qp_obj_t qp_series_id;
+ uint64_t series_id;
+ siridb_series_t * series;
+ qp_unpacker_t * unpacker;
+ siridb_tag_t * tag = siridb_tag_new(siridb->tags, (uint32_t) atoll(_fn));
+
+ if (tag == NULL)
+ {
+ log_critical("Memory allocation error");
+ return NULL;
+ }
+
+ fn = siridb_tag_fn(tag);
+ if (fn == NULL)
{
- qp_unpacker_t * unpacker = qp_unpacker_ff(tag->fn);
- if (unpacker == NULL)
+ log_critical("Memory allocation error");
+ goto fail0;
+ }
+
+ unpacker = qp_unpacker_ff(fn);
+ if (unpacker == NULL)
+ {
+ log_critical("Cannot open tag file for reading: %s", fn);
+ goto fail1;
+ }
+
+
+ if (!qp_is_array(qp_next(unpacker, NULL)) ||
+ qp_next(unpacker, &qp_tn) != QP_RAW ||
+ (tag->name = strndup((const char *) qp_tn.via.raw, qp_tn.len)) == NULL)
+ {
+ /* or a memory allocation error, but the same result */
+ log_critical("Expected an array with a tag name in file: %s", fn);
+ goto fail2;
+ }
+
+
+ while (qp_next(unpacker, &qp_series_id) == QP_INT64)
+ {
+ series_id = (uint64_t) qp_series_id.via.int64;
+ series = imap_get(siridb->series_map, series_id);
+
+ if (series == NULL)
{
- log_critical("cannot open tag file for reading: %s", tag->fn);
- siridb__tag_free(tag);
- tag = NULL;
+ siridb_tags_set_require_save(siridb->tags, tag);
+
+ log_error(
+ "Cannot find series id %" PRId64
+ " which was tagged with '%s'",
+ qp_series_id.via.int64,
+ tag->name);
+ }
+ else if (imap_add(tag->series, series_id, series) == 0)
+ {
+ siridb_series_incref(series);
}
else
{
- qp_obj_t qp_tn;
-
- if (!qp_is_array(qp_next(unpacker, NULL)) ||
- qp_next(unpacker, &qp_tn) != QP_RAW ||
- (tag->name = strndup(
- (const char *) qp_tn.via.raw,
- qp_tn.len)) == NULL)
- {
- /* or a memory allocation error, but the same result */
- log_critical(
- "expected an array with a tag name in file: %s",
- tag->fn);
- siridb__tag_free(tag);
- tag = NULL;
- }
- else
- {
- qp_obj_t qp_series_id;
- uint64_t series_id;
- siridb_series_t * series;
-
- while (qp_next(unpacker, &qp_series_id) == QP_INT64)
- {
- series_id = (uint64_t) qp_series_id.via.int64;
- series = imap_get(siridb->series_map, series_id);
-
- if (series == NULL)
- {
- siridb_tags_set_require_save(siridb->tags, tag);
-
- log_error(
- "cannot find series id %" PRId64
- " which was tagged with '%s'",
- qp_series_id.via.int64,
- tag->name);
- }
- else if (imap_add(tag->series, series_id, series) == 0)
- {
- siridb_series_incref(series);
- }
- else
- {
- log_critical(
- "cannot add series '%s' to tag '%s'",
- series->name,
- tag->name);
- }
- }
- }
- qp_unpacker_ff_free(unpacker);
+ log_error(
+ "Cannot add series '%s' to tag '%s'",
+ series->name,
+ tag->name);
}
}
+
+ qp_unpacker_ff_free(unpacker);
+ free(fn);
return tag;
+
+fail2:
+ qp_unpacker_ff_free(unpacker);
+fail1:
+ free(fn);
+fail0:
+ siridb__tag_free(tag);
+ return NULL;
+}
+
+
+
+static int tag__save_cb(siridb_series_t * series, qp_fpacker_t * fpacker)
+{
+ return qp_fadd_int64(fpacker, (int64_t) series->id);
}
/*
*/
int siridb_tag_save(siridb_tag_t * tag)
{
+ int rc = -1;
qp_fpacker_t * fpacker;
- fpacker = qp_open(tag->fn, "w");
+ char * fn = siridb_tag_fn(tag);
+ if (fn == NULL)
+ {
+ return rc;
+ }
+
+ fpacker = qp_open(fn, "w");
if (fpacker == NULL)
{
- return -1;
+ goto fail0;
}
if (/* open a new array */
/* write the tag name */
qp_fadd_string(fpacker, tag->name))
{
- qp_close(fpacker);
- return -1;
+ goto fail1;
}
- /* TODO: maybe replace with walk */
- vec_t * series_list = imap_vec(tag->series);
+ rc = imap_walk(tag->series, (imap_cb) tag__save_cb, fpacker);
- if (series_list != NULL)
- {
- siridb_series_t * series;
- for (size_t i = 0; i < series_list->len; i++)
- {
- series = (siridb_series_t *) series_list->data[i];
- qp_fadd_int64(fpacker, (int64_t) series->id);
- }
- }
+fail1:
+ rc = qp_close(fpacker) || rc;
- if (qp_close(fpacker) || series_list == NULL)
- {
- return -1;
- }
-
- return 0;
+fail0:
+ free(fn);
+ return rc;
}
/*
qp_add_string(packer, tag->name);
break;
case CLERI_GID_K_SERIES:
- qp_add_int64(packer, (int64_t) tag->id);
+ qp_add_int64(packer, (int64_t) tag->n);
break;
}
}
case CLERI_GID_K_NAME:
return cexpr_str_cmp(cond->operator, tag->name, cond->str);
case CLERI_GID_K_SERIES:
- return cexpr_int_cmp(cond->operator, (int64_t) tag->id, cond->int64);
+ return cexpr_int_cmp(cond->operator, (int64_t) tag->n, cond->int64);
}
log_critical("Unknown group property received: %d", cond->prop);
log_debug("Free tag: '%s'", tag->name);
#endif
- if ((tag->flags & TAG_FLAG_CLEANUP) && unlink(tag->fn))
+ if ((tag->flags & TAG_FLAG_CLEANUP))
{
- log_critical("Cannot remove tag file: '%s'", tag->fn);
+ char * fn = siridb_tag_fn(tag);
+ if (!fn || unlink(fn))
+ {
+ log_critical("Cannot remove tag (file): '%s'", tag->name);
+ }
+ free(fn);
}
else if ((tag->flags & TAG_FLAG_REQUIRE_SAVE) && siridb_tag_save(tag))
{
- log_critical("Cannot save tag file: '%s'", tag->fn);
+ log_critical("Cannot save tag '%s'", tag->name);
}
free(tag->name);
- free(tag->fn);
+
if (tag->series != NULL)
{
imap_free(tag->series, (imap_free_cb) siridb__series_decref);
static void TAGS_free(siridb_tags_t * tags);
static int TAGS_load(siridb_t * siridb);
-static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer);
-static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup);
-static int TAGS_to_vec_cb(siridb_tag_t * tag, vec_t * tags_list);
-static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag);
+static int TAGS_dropped_series(
+ siridb_tag_t * tag,
+ void * data __attribute__((unused)));
static int TAGS_nseries(
siridb_tag_t * tag,
void * data __attribute__((unused)));
}
siridb->tags->flags = 0;
siridb->tags->ref = 1;
+ siridb->tags->next_id = 0;
siridb->tags->tags = ct_new();
- siridb->tags->cleanup = vec_new(VEC_DEFAULT_SIZE);
uv_mutex_init(&siridb->tags->mutex);
siridb->dbpath,
SIRIDB_TAGS_PATH) < 0 ||
siridb->tags->tags == NULL ||
- siridb->tags->cleanup == NULL ||
- TAGS_load(siridb))
+ TAGS_load(siridb))
{
TAGS_free(siridb->tags);
siridb->tags = NULL;
}
}
-siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name)
-{
- siridb_tag_t * tag = siridb_tag_new(name);
- if (tag != NULL)
- {
- tag->name = strdup(name);
- if (tag->name == NULL || ct_add(tags->tags, name, tag))
- {
- siridb_tag_decref(tag);
- tag = NULL;
- }
- }
- return tag;
-}
-
/*
* Main thread.
*
- * Returns NULL and raises a signal in case of an error.
+ * Returns 0 if successful or -1 when the group is not found.
+ * (in case not found an error message is set)
+ *
+ * Note: when saving the groups to disk has failed, we log critical but
+ * the function still returns 0;
*/
-sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid)
+int siridb_tags_drop_tag(
+ siridb_tags_t * tags,
+ const char * name,
+ char * err_msg)
{
- qp_packer_t * packer = sirinet_packer_new(8192);
- int rc;
+ uv_mutex_lock(&tags->mutex);
- if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN))
- {
- return NULL; /* signal is raised */
- }
+ siridb_tag_t * tag = (siridb_tag_t *) ct_pop(tags->tags, name);
- rc = ct_values(tags->tags, (ct_val_cb) TAGS_pkg, packer);
+ uv_mutex_unlock(&tags->mutex);
- if (rc)
+ if (tag == NULL)
{
- /* signal is raised when not 0 */
- qp_packer_free(packer);
- return NULL;
+ snprintf(err_msg,
+ SIRIDB_MAX_SIZE_ERR_MSG,
+ "Tag '%s' does not exist.",
+ name);
+ return -1;
}
- return sirinet_packer2pkg(packer, pid, BPROTO_RES_TAGS);
+ tag->flags |= TAG_FLAG_CLEANUP;
+ siridb_tag_decref(tag);
+
+ return 0;
}
-/*
- * This function will set and unset the mutex lock.
- */
-void siridb_tags_cleanup(uv_async_t * handle)
+siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name)
{
- siridb_tags_t * tags = (siridb_tags_t *) handle->data;
- siridb_tag_t * tag, * rmtag;
-
- uv_mutex_lock(&tags->mutex);
+ siridb_tag_t * tag = siridb_tag_new(tags, tags->next_id++);
- while (tags->cleanup->len)
+ if (tag != NULL)
{
- tag = (siridb_tag_t *) vec_pop(tags->cleanup);
-
- if (!tag->series->len &&
- (rmtag = (siridb_tag_t *) ct_pop(tags->tags, tag->name)) != NULL)
+ tag->name = strdup(name);
+ if (tag->name == NULL || ct_add(tags->tags, name, tag))
{
-#ifdef DEBUG
- assert(rmtag == tag && (tag->flags & TAG_FLAG_CLEANUP));
-#endif
- siridb_tag_decref(rmtag);
+ siridb_tag_decref(tag);
+ tag = NULL;
}
}
-
- uv_mutex_unlock(&tags->mutex);
-
- siridb_tags_decref(tags);
-
- uv_close((uv_handle_t *) handle, (uv_close_cb) free);
+ return tag;
}
-ct_t * siridb_tags_lookup(siridb_tags_t * tags)
-{
- ct_t * lookup = ct_new();
- if (lookup != NULL)
- {
- ct_values(tags->tags, (ct_val_cb) &TAGS_ctmap_update, lookup);
- }
- return lookup;
-}
/*
* This function is called from the "Group" thread.
*/
void siridb_tags_dropped_series(siridb_tags_t * tags)
{
- siridb_tag_t * tag;
- vec_t * tags_list;
-
uv_mutex_lock(&tags->mutex);
- tags_list = vec_new(tags->tags->len);
+ ct_values(tags->tags, (ct_val_cb) TAGS_dropped_series, tags);
tags->flags &= ~TAGS_FLAG_DROPPED_SERIES;
- ct_values(tags->tags, (ct_val_cb) TAGS_to_vec_cb, tags_list);
-
uv_mutex_unlock(&tags->mutex);
+}
- while (tags_list->len)
- {
- tag = (siridb_tag_t *) vec_pop(tags_list);
-
- uv_mutex_lock(&tags->mutex);
-
- TAGS_dropped_series(tags, tag);
-
- siridb_tag_decref(tag);
-
- uv_mutex_unlock(&tags->mutex);
-
- usleep(10000); // 10ms
- }
-
- vec_free(tags_list);
-
- if (tags->cleanup->len)
- {
- uv_async_t * cleanup = (uv_async_t *) malloc(sizeof(uv_async_t));
-
- if (cleanup == NULL)
- {
- log_critical("Allocation error while creating cleanup task");
- return;
- }
- siridb_tags_incref(tags);
-
- cleanup->data = (void *) tags;
+static int TAGS__save_cb(
+ siridb_tag_t * tag,
+ void * data __attribute__((unused)))
+{
+ siridb_tag_save(tag);
+ tag->flags &= ~ TAG_FLAG_REQUIRE_SAVE;
- uv_async_init(siri.loop,
- cleanup,
- (uv_async_cb) siridb_tags_cleanup);
- uv_async_send(cleanup);
- }
+ usleep(10000); // 10ms
+ return 0;
}
void siridb_tags_save(siridb_tags_t * tags)
{
- siridb_tag_t * tag;
- vec_t * tags_list;
-
uv_mutex_lock(&tags->mutex);
- tags_list = vec_new(tags->tags->len);
+ ct_values(tags->tags, (ct_val_cb) TAGS__save_cb, NULL);
tags->flags &= ~TAGS_FLAG_REQUIRE_SAVE;
- ct_values(tags->tags, (ct_val_cb) TAGS_to_vec_cb, tags_list);
-
uv_mutex_unlock(&tags->mutex);
-
- while (tags_list->len)
- {
- tag = (siridb_tag_t *) vec_pop(tags_list);
-
- if (tag->flags & TAG_FLAG_REQUIRE_SAVE)
- {
- uv_mutex_lock(&tags->mutex);
-
- siridb_tag_save(tag);
-
- tag->flags &= ~ TAG_FLAG_REQUIRE_SAVE;
-
- uv_mutex_unlock(&tags->mutex);
- }
-
- siridb_tag_decref(tag);
-
- usleep(10000); // 10ms
- }
-
- vec_free(tags_list);
}
/*
/*
* This function is called from the "Group" thread.
*/
-static int TAGS_to_vec_cb(siridb_tag_t * tag, vec_t * tags_list)
-{
- siridb_tag_incref(tag);
- vec_append(tags_list, tag);
- return 0;
-}
-
-/*
- * This function is called from the "Group" thread.
- */
-static int TAGS_dropped_series(siridb_tags_t * tags, siridb_tag_t * tag)
+static int TAGS_dropped_series(
+ siridb_tag_t * tag,
+ void * data __attribute__((unused)))
{
vec_t * tag_series = imap_vec_pop(tag->series);
- siridb_series_t * series, * s = NULL;
+ siridb_series_t * series;
if (tag_series != NULL)
{
for (size_t i = 0; i < tag_series->len; i++)
{
series = (siridb_series_t *) tag_series->data[i];
- if (series->flags & SIRIDB_SERIES_IS_DROPPED)
+ if ((series->flags & SIRIDB_SERIES_IS_DROPPED) &&
+ imap_pop(tag->series, series->id))
{
- s = (siridb_series_t *) imap_pop(tag->series, series->id);
- assert (s != NULL);
- siridb_series_decref(s);
+ siridb_series_decref(series);
+ siridb_tags_set_require_save(tag->tags, tag);
}
}
- if (s == NULL)
- {
- /* unchanged, we can put the list back */
- tag->series->vec = tag_series;
- }
- else
- {
- vec_free(tag_series);
-
- if (!tag->series->len && (~tag->flags & TAG_FLAG_CLEANUP))
- {
- tag->flags |= TAG_FLAG_CLEANUP;
- if (vec_append_safe(&tags->cleanup, tag))
- {
- log_critical(
- "Unexpected error while appending tag to "
- "cleanup list");
- }
- }
- }
+ vec_free(tag_series);
}
- return 0;
-}
+ usleep(10000); // 10ms
-static int TAGS_ctmap_update(siridb_tag_t * tag, ct_t * lookup)
-{
- if (tag->series->len)
- {
- volatile uintptr_t iptr = (uint32_t) tag->series->len;
- return ct_add(lookup, tag->name, (uint32_t *) iptr);
- }
return 0;
}
-/*
- * Main thread.
- */
-static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer)
-{
- int rc = 0;
- rc += qp_add_type(packer, QP_ARRAY2);
- rc += qp_add_string_term(packer, tag->name);
- rc += qp_add_int64(packer, (int64_t) tag->series->len);
- return rc;
-}
-
static int TAGS_load(siridb_t * siridb)
{
struct stat st = {0};
static void TAGS_free(siridb_tags_t * tags)
{
-#ifdef DEBUG
- log_debug("Free tags");
-#endif
free(tags->path);
- if (tags->cleanup != NULL)
- {
- vec_free(tags->cleanup);
- }
-
uv_mutex_lock(&tags->mutex);
if (tags->tags != NULL)
case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
- case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS";
default:
sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
return protocol_str;