void siridb_tags_init_nseries(siridb_tags_t * tags);
sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid);
sirinet_pkg_t * siridb_tags_series(siridb_series_t * series);
+sirinet_pkg_t * siridb_tags_empty(siridb_tags_t * tags);
#define siridb_tags_set_require_save(__tags, __tag) \
BPROTO_DROP_DATABASE, /* empty */
BPROTO_REQ_TAGS, /* empty */
BPROTO_SERIES_TAGS, /* [series name, tag name, ...] */
+ BPROTO_EMPTY_TAGS, /* [tag name, tag name, ...] */
} bproto_client_t;
/*
BPROTO_ACK_TEE_PIPE_NAME, /* empty */
BPROTO_ACK_DROP_DATABASE, /* empty */
BPROTO_RES_TAGS, /* [[name, series], ...] */
- BPROTO_ACK_SERIES_TAGS /* empty */
+ BPROTO_ACK_SERIES_TAGS, /* empty */
+ BPROTO_ACK_EMPTY_TAGS, /* empty */
} bproto_server_t;
#define sirinet_protocol_is_error(tp) (tp >= 64 && tp < 192)
self.assertEqual(
res, {"success_msg": "Successfully tagged 13 series."})
+ res = await self.client0.query('''
+ alter series /empty/ tag `EMPTY`
+ ''')
+ self.assertEqual(
+ res, {"success_msg": "Successfully tagged 0 series."})
+
await asyncio.sleep(3.0)
res = await self.client0.query('''
self.assertEqual(
res, {"success_msg": "Successfully tagged 3 series."})
- res = await self.client0.query('''
- alter series /empty/ tag `EMPTY`
- ''')
- self.assertEqual(
- res, {"success_msg": "Successfully tagged 0 series."})
-
await self.client0.query('''
alter series 'variance', 'pvariance' untag `OTHER`
''')
{
sleep(GROUPS_LOOP_SLEEP);
- if (siridb_is_reindexing(siridb) && (++mod_test % GROUPS_LOOP_DEEP))
+ if (groups->status == GROUPS_STOPPING)
+ break;
+
+ if ((siridb_is_reindexing(siridb) ||
+ siridb_server_self_synchronizing(siridb->server)) &&
+ (++mod_test % GROUPS_LOOP_DEEP))
{
+ /* less frequently when re-indexing or synchronizing */
continue;
}
}
}
+/*
+ * Call-back function: sirinet_promise_cb
+ */
+static void INITSYNC_on_empty_tags_response(
+ sirinet_promise_t * promise,
+ sirinet_pkg_t * pkg,
+ int status)
+{
+ if (status)
+ {
+ log_error("Error while sending empty tags (%d)", status);
+ }
+ else if (sirinet_protocol_is_error(pkg->tp))
+ {
+ log_error(
+ "Error occurred while processing data on the new server: "
+ "(response type: %u)", pkg->tp);
+ }
+
+ sirinet_promise_decref(promise);
+}
+
/*
* Read the next series id and truncate the synchronization file to remove
* the last synchronization id.
}
else
{
+ sirinet_pkg_t * pkg;
+
+ /* send empty tags if required */
+ pkg = siridb_tags_empty(siridb->tags);
+ if (pkg)
+ {
+ if (siridb_server_send_pkg(
+ siridb->replica,
+ pkg,
+ INITSYNC_TIMEOUT,
+ (sirinet_promise_cb) INITSYNC_on_empty_tags_response,
+ NULL,
+ 0))
+ {
+ free(pkg);
+ }
+ }
+
log_info("Finished initial replica synchronization");
INITSYNC_unlink(initsync);
siridb_initsync_free(&siridb->replicate->initsync);
return rc;
}
+/*
+ * Call-back function: sirinet_promise_cb
+ */
+static void REINDEX_on_empty_tags_response(
+ sirinet_promise_t * promise,
+ sirinet_pkg_t * pkg,
+ int status)
+{
+ if (status)
+ {
+ log_error("Error while sending empty tags (%d)", status);
+ }
+ else if (sirinet_protocol_is_error(pkg->tp))
+ {
+ log_error(
+ "Error occurred while processing data on the new server: "
+ "(response type: %u)", pkg->tp);
+ }
+
+ sirinet_promise_decref(promise);
+}
+
/*
* This function can raise a SIGNAL
*/
break;
case NEXT_SERIES_END:
+ {
+ sirinet_pkg_t * pkg;
+
+ /* send empty tags if required */
+ pkg = siridb_tags_empty(siridb->tags);
+ if (pkg)
+ {
+ if (siridb_server_send_pkg(
+ siridb->reindex->server,
+ pkg,
+ REINDEX_TIMEOUT,
+ (sirinet_promise_cb) REINDEX_on_empty_tags_response,
+ NULL,
+ 0))
+ {
+ free(pkg);
+ }
+ }
+
/* update and send the flags */
siridb->server->flags &= ~SERVER_FLAG_REINDEXING;
siridb_servers_send_flags(siridb->servers);
siri_optimize_continue();
break;
-
+ }
case NEXT_SERIES_ERR:
break; /* signal is raised */
}
/*
* Main thread.
*
- * Returns NULL and raises a signal in case of an error.
+ * Returns NULL in case of an error or no tags.
*/
sirinet_pkg_t * siridb_tags_series(siridb_series_t * series)
{
return sirinet_packer2pkg(w.packer, 0, BPROTO_SERIES_TAGS);
}
+/*
+ * Main thread.
+ */
+static int TAGS_empty_tag_pkg(siridb_tag_t * tag, qp_packer_t * packer)
+{
+ return tag->series->len == 0
+ ? qp_add_string(packer, tag->name) == 0
+ : 0;
+}
+
+/*
+ * Main thread.
+ *
+ * Returns NULL in case of an error or no empty tags.
+ */
+sirinet_pkg_t * siridb_tags_empty(siridb_tags_t * tags)
+{
+ qp_packer_t * packer = sirinet_packer_new(1024);
+
+ if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN))
+ {
+ return NULL;
+ }
+
+ if (ct_values(
+ tags->tags,
+ (ct_val_cb) TAGS_empty_tag_pkg,
+ packer) == 0)
+ {
+ free(packer);
+ return NULL;
+ }
+
+ return sirinet_packer2pkg(packer, 0, BPROTO_EMPTY_TAGS);
+}
+
/*
* Main thread.
*/
sirinet_pkg_t * pkg);
static void on_req_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg);
static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg);
+static void on_empty_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg);
static uv_loop_t * loop = NULL;
static struct sockaddr_storage server_addr;
case BPROTO_SERIES_TAGS:
on_series_tags(client, pkg);
break;
+ case BPROTO_EMPTY_TAGS:
+ on_empty_tags(client, pkg);
+ break;
}
}
sirinet_pkg_send(client, package);
}
}
+
+static void on_empty_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg)
+{
+ SERVER_CHECK_AUTHENTICATED(client, server)
+
+ sirinet_pkg_t * package = NULL;
+ siridb_t * siridb = client->siridb;
+
+ LOGC("on empty tags...");
+
+ if (~siridb->server->flags & SERVER_FLAG_RUNNING)
+ {
+ log_error("Cannot tag series because of having status %d",
+ siridb->server->flags);
+
+ package = sirinet_pkg_new(
+ pkg->pid,
+ 0,
+ BPROTO_ERR_DROP_SERIES,
+ NULL);
+ }
+ else
+ {
+ qp_unpacker_t unpacker;
+ qp_unpacker_init(&unpacker, pkg->data, pkg->len);
+
+ if (qp_is_array(qp_next(&unpacker, NULL)))
+ {
+ qp_obj_t qp_tag_name;
+
+ uv_mutex_lock(&siridb->tags->mutex);
+
+ while (qp_next(&unpacker, &qp_tag_name) == QP_RAW)
+ {
+ siridb_tag_t * tag = ct_getn(
+ siridb->tags->tags,
+ qp_tag_name.via.str,
+ qp_tag_name.len);
+
+ if (tag == NULL)
+ {
+ tag = siridb_tags_add_n(
+ siridb->tags,
+ qp_tag_name.via.str,
+ qp_tag_name.len);
+
+ siridb_tags_set_require_save(siridb->tags, tag);
+ }
+ }
+
+ uv_mutex_unlock(&siridb->tags->mutex);
+
+ package = sirinet_pkg_new(
+ pkg->pid,
+ 0,
+ BPROTO_ACK_EMPTY_TAGS,
+ NULL);
+ }
+ else
+ {
+ log_error("Illegal back-end empty tags package received");
+ }
+ }
+
+ if (package != NULL)
+ {
+ sirinet_pkg_send(client, package);
+ }
+}
case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS";
case BPROTO_SERIES_TAGS: return "BPROTO_SERIES_TAGS";
+ case BPROTO_EMPTY_TAGS: return "BPROTO_EMPTY_TAGS";
default:
sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
return protocol_str;
case BPROTO_ACK_DROP_DATABASE: return "BPROTO_ACK_DROP_DATABASE";
case BPROTO_RES_TAGS: return "BPROTO_RES_TAGS";
case BPROTO_ACK_SERIES_TAGS: return "BPROTO_ACK_SERIES_TAGS";
+ case BPROTO_ACK_EMPTY_TAGS: return "BPROTO_ACK_EMPTY_TAGS";
default:
sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n);
return protocol_str;