int qp_add_raw(qp_packer_t * packer, const unsigned char * raw, size_t len);
int qp_add_string(qp_packer_t * packer, const char * str);
int qp_add_string_term(qp_packer_t * packer, const char * str);
+int qp_add_string_term_n(qp_packer_t * packer, const char * str, size_t n);
int qp_add_raw_term(qp_packer_t * packer, const unsigned char * raw, size_t len);
int qp_add_double(qp_packer_t * packer, double real);
int fd;
long int size;
uint32_t * next_series_id;
- sirinet_pkg_t * pkg;
+ sirinet_pkg_t * pkg_points;
+ sirinet_pkg_t * pkg_tags;
siridb_series_t * series;
siridb_server_t * server;
uv_timer_t * timer;
siridb_tags_t * tags,
const char * name,
char * err_msg);
+siridb_tag_t * siridb_tags_add_n(
+ siridb_tags_t * tags,
+ const char * name,
+ size_t name_len);
siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name);
void siridb_tags_dropped_series(siridb_tags_t * tags);
void siridb_tags_save(siridb_tags_t * tags);
void siridb_tags_init_nseries(siridb_tags_t * tags);
+sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid);
+sirinet_pkg_t * siridb_tags_series(siridb_series_t * series);
#define siridb_tags_set_require_save(__tags, __tag) \
BPROTO_DISABLE_BACKUP_MODE, /* empty */
BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */
BPROTO_DROP_DATABASE, /* empty */
+ BPROTO_REQ_TAGS, /* empty */
+ BPROTO_SERIES_TAGS, /* [series name, tag name, ...] */
} bproto_client_t;
/*
BPROTO_RES_GROUPS, /* [[name, series], ...] */
BPROTO_ACK_TEE_PIPE_NAME, /* empty */
BPROTO_ACK_DROP_DATABASE, /* empty */
- BPROTO_RES_TAGS, /* [[name, series], ...] */
-
+ BPROTO_RES_TAGS, /* [[name, series], ...] */
+ BPROTO_ACK_SERIES_TAGS /* empty */
} bproto_server_t;
#define sirinet_protocol_is_error(tp) (tp >= 64 && tp < 192)
import functools
import random
import time
+import math
from testing import Client
from testing import default_test_setup
from testing import gen_data
from testing import parse_args
+DATA = {
+ 'series-001 float': [
+ [1471254705, 1.5],
+ [1471254707, -3.5],
+ [1471254710, -7.3]],
+ 'series-001 integer': [
+ [1471254705, 5],
+ [1471254708, -3],
+ [1471254710, -7]],
+ 'series-002 float': [
+ [1471254705, 3.5],
+ [1471254707, -2.5],
+ [1471254710, -8.3]],
+ 'series-002 integer': [
+ [1471254705, 4],
+ [1471254708, -1],
+ [1471254710, -8]],
+ 'aggr': [
+ [1447249033, 531], [1447249337, 534],
+ [1447249633, 535], [1447249937, 531],
+ [1447250249, 532], [1447250549, 537],
+ [1447250868, 530], [1447251168, 520],
+ [1447251449, 54], [1447251749, 54],
+ [1447252049, 513], [1447252349, 537],
+ [1447252649, 528], [1447252968, 531],
+ [1447253244, 533], [1447253549, 538],
+ [1447253849, 534], [1447254149, 532],
+ [1447254449, 533], [1447254748, 537]],
+ 'huge': [
+ [1471254705, 9223372036854775807],
+ [1471254706, 9223372036854775806],
+ [1471254707, 9223372036854775805],
+ [1471254708, 9223372036854775804]],
+ 'equal ts': [
+ [1471254705, 0], [1471254705, 1], [1471254705, 1],
+ [1471254707, 0], [1471254707, 1], [1471254708, 0],
+ ],
+ 'variance': [
+ [1471254705, 2.75], [1471254706, 1.75], [1471254707, 1.25],
+ [1471254708, 0.25], [1471254709, 0.5], [1471254710, 1.25],
+ [1471254711, 3.5]
+ ],
+ 'pvariance': [
+ [1471254705, 0.0], [1471254706, 0.25], [1471254707, 0.25],
+ [1471254708, 1.25], [1471254709, 1.5], [1471254710, 1.75],
+ [1471254711, 2.75], [1471254712, 3.25]
+ ],
+ 'filter': [
+ [1471254705, 5],
+ [1471254710, -3],
+ [1471254715, -7],
+ [1471254720, 7]
+ ],
+ 'one': [
+ [1471254710, 1]
+ ],
+ 'log': [
+ [1471254710, 'log line one'],
+ [1471254712, 'log line two'],
+ [1471254714, 'another line (three)'],
+ [1471254716, 'and yet one more'],
+ ],
+ 'special': [
+ [1471254705, 0.1],
+ [1471254706, math.nan],
+ [1471254707, math.inf],
+ [1471254708, -math.inf],
+ ]
+}
+
+
class TestCluster(TestBase):
title = 'Test siridb-cluster'
- @default_test_setup(4, time_precision='s')
+ @default_test_setup(2, time_precision='s')
async def run(self):
await self.client0.connect()
+ await self.client0.insert(DATA)
+
+ await self.client0.query('''
+ alter series /series.*/ tag `SERIES`
+ ''')
+
+ await self.client0.query('''
+ alter series /series.*/ tag `OTHER`
+ ''')
+
await self.db.add_pool(self.server1)
await self.assertIsRunning(self.db, self.client0, timeout=12)
- await asyncio.sleep(45)
+ # await asyncio.sleep(45)
- await self.db.add_replica(self.server2, 0)
- await self.assertIsRunning(self.db, self.client0, timeout=12)
+ # await self.db.add_replica(self.server2, 0)
+ # await self.assertIsRunning(self.db, self.client0, timeout=12)
- await asyncio.sleep(45)
- await self.db.add_replica(self.server3, 1)
- await self.assertIsRunning(self.db, self.client0, timeout=12)
+ # await asyncio.sleep(45)
+
+ # await self.db.add_replica(self.server3, 1)
+ # await self.assertIsRunning(self.db, self.client0, timeout=12)
# await asyncio.sleep(35)
{
return qp_add_raw(packer, (unsigned char *) str, strlen(str) + 1);
}
+int qp_add_string_term_n(qp_packer_t * packer, const char * str, size_t n)
+{
+ return qp_add_raw(packer, (unsigned char *) str, n + 1);
+}
/*
* Adds a raw string to the packer and appends a terminator (0) so the written
}
else
{
- sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_GROUPS, NULL);
+ sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_TAGS, NULL);
if (pkg != NULL)
{
}
else
{
- sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_GROUPS, NULL);
+ sirinet_pkg_t * pkg = sirinet_pkg_new(0, 0, BPROTO_REQ_TAGS, NULL);
if (pkg != NULL)
{
pkg = (sirinet_pkg_t *) promise->data;
- if (pkg != NULL && pkg->tp == BPROTO_RES_GROUPS)
+ if (pkg != NULL && pkg->tp == BPROTO_RES_TAGS)
{
qp_unpacker_init(&unpacker, pkg->data, pkg->len);
sirinet_promise_t * promise,
sirinet_pkg_t * pkg,
int status);
+static void REINDEX_on_tag_response(
+ sirinet_promise_t * promise,
+ sirinet_pkg_t * pkg,
+ int status);
static char reindex_progress[30];
reindex->fn = NULL;
reindex->fp = NULL;
reindex->next_series_id = NULL;
- reindex->pkg = NULL;
+ reindex->pkg_points = NULL;
+ reindex->pkg_tags = NULL;
reindex->timer = NULL;
reindex->server = NULL;
if (REINDEX_fn(siridb, reindex) < 0)
}
free((*reindex)->fn);
free((*reindex)->next_series_id);
- free((*reindex)->pkg);
+ free((*reindex)->pkg_points);
+ free((*reindex)->pkg_tags);
free(*reindex);
*reindex = NULL;
}
static void REINDEX_send(uv_timer_t * timer)
{
siridb_t * siridb = (siridb_t *) timer->data;
- assert (siridb->reindex->pkg != NULL);
+ assert (siridb->reindex->pkg_points != NULL);
/* actually 'available' is sufficient since the destination server has
* never status 're-indexing' unless one day we support down-scaling.
*/
{
siridb_server_send_pkg(
siridb->reindex->server,
- siridb->reindex->pkg,
+ siridb->reindex->pkg_points,
REINDEX_TIMEOUT,
(sirinet_promise_cb) REINDEX_on_insert_response,
siridb,
FLAG_KEEP_PKG);
+ if (siridb->reindex->pkg_tags)
+ {
+ siridb_server_send_pkg(
+ siridb->reindex->server,
+ siridb->reindex->pkg_tags,
+ REINDEX_TIMEOUT,
+ (sirinet_promise_cb) REINDEX_on_tag_response,
+ NULL,
+ FLAG_KEEP_PKG);
+ }
}
else
{
static int REINDEX_next_series_id(siridb_reindex_t * reindex)
{
/* free re-index package */
- free(reindex->pkg);
- reindex->pkg = NULL;
+ free(reindex->pkg_points);
+ free(reindex->pkg_tags);
+ reindex->pkg_points = NULL;
+ reindex->pkg_tags = NULL;
int rc;
reindex->size -= sizeof(uint32_t);
assert (SIRI_OPTIMZE_IS_PAUSED);
assert (reindex != NULL);
- assert (siridb->reindex->pkg == NULL);
+ assert (siridb->reindex->pkg_points == NULL);
+ assert (siridb->reindex->pkg_tags == NULL);
reindex->series = imap_get(siridb->series_map, *reindex->next_series_id);
if (siridb_points_pack(points, packer) == 0)
{
- reindex->pkg = sirinet_packer2pkg(
+ reindex->pkg_points = sirinet_packer2pkg(
packer,
0,
BPROTO_INSERT_TESTED_SERVER);
+
+ /* tag package may be NULL when no tag need to be
+ * synchronized */
+ reindex->pkg_tags = siridb_tags_series(reindex->series);
+
uv_timer_start(
reindex->timer,
REINDEX_send,
}
}
siridb_points_free(points);
+
+
}
}
}
siridb_series_flush_dropped(siridb);
}
}
+
/*
* Call-back function: sirinet_promise_cb
*/
sirinet_promise_decref(promise);
}
+/*
+ * Call-back function: sirinet_promise_cb
+ */
+static void REINDEX_on_tag_response(
+ sirinet_promise_t * promise,
+ sirinet_pkg_t * pkg,
+ int status)
+{
+ if (status)
+ {
+ log_error("Error while sending tags (%d)", status);
+ }
+
+ sirinet_promise_decref(promise);
+}
+
/*
* Typedef: imap_cb
*
return 0;
}
+siridb_tag_t * siridb_tags_add_n(
+ siridb_tags_t * tags,
+ const char * name,
+ size_t name_len)
+{
+ siridb_tag_t * tag = siridb_tag_new(tags, tags->next_id++);
+
+ if (tag != NULL)
+ {
+ tag->name = strndup(name, name_len);
+ if (tag->name == NULL || ct_add(tags->tags, tag->name, tag))
+ {
+ siridb_tag_decref(tag);
+ tag = NULL;
+ }
+ }
+ return tag;
+}
+
siridb_tag_t * siridb_tags_add(siridb_tags_t * tags, const char * name)
{
siridb_tag_t * tag = siridb_tag_new(tags, tags->next_id++);
ct_values(tags->tags, (ct_val_cb) TAGS_nseries, NULL);
}
+/*
+ * Main thread.
+ */
+static int TAGS_pkg(siridb_tag_t * tag, qp_packer_t * packer)
+{
+ int rc = 0;
+ rc += qp_add_type(packer, QP_ARRAY2);
+ rc += qp_add_string_term(packer, tag->name);
+ rc += qp_add_int64(packer, tag->series->len);
+ return rc;
+}
+
+/*
+ * Main thread.
+ *
+ * Returns NULL and raises a signal in case of an error.
+ */
+sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid)
+{
+ qp_packer_t * packer = sirinet_packer_new(8192);
+ int rc;
+
+ if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN))
+ {
+ return NULL; /* signal is raised */
+ }
+
+ rc = ct_values(tags->tags, (ct_val_cb) TAGS_pkg, packer);
+
+ if (rc)
+ {
+ /* signal is raised when not 0 */
+ qp_packer_free(packer);
+ return NULL;
+ }
+
+ return sirinet_packer2pkg(packer, pid, BPROTO_RES_TAGS);
+}
+
+typedef struct
+{
+ qp_packer_t * packer;
+ uint64_t id;
+} TAGS_series_t;
+
+/*
+ * Main thread.
+ */
+static int TAGS_series_pkg(siridb_tag_t * tag, TAGS_series_t * w)
+{
+ return imap_get(tag->series, w->id)
+ ? qp_add_string(w->packer, tag->name) == 0
+ : 0;
+}
+
+/*
+ * Main thread.
+ *
+ * Returns NULL and raises a signal in case of an error.
+ */
+sirinet_pkg_t * siridb_tags_series(siridb_series_t * series)
+{
+ TAGS_series_t w = {
+ .packer = sirinet_packer_new(1024),
+ .id = series->id,
+ };
+
+ if (w.packer == NULL ||
+ qp_add_type(w.packer, QP_ARRAY_OPEN) ||
+ qp_add_string_term_n(w.packer, series->name, series->name_len))
+ {
+ return NULL;
+ }
+
+ if (ct_values(
+ series->siridb->tags->tags,
+ (ct_val_cb) TAGS_series_pkg,
+ &w) == 0)
+ {
+ free(w.packer);
+ return NULL;
+ }
+
+ return sirinet_packer2pkg(w.packer, 0, BPROTO_SERIES_TAGS);
+}
+
/*
* Main thread.
*/
static void on_disable_backup_mode(
sirinet_stream_t * client,
sirinet_pkg_t * pkg);
+static void on_req_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg);
+static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg);
static uv_loop_t * loop = NULL;
static struct sockaddr_storage server_addr;
case BPROTO_DROP_DATABASE:
on_drop_database(client, pkg);
break;
+ case BPROTO_REQ_TAGS:
+ on_req_tags(client, pkg);
+ break;
+ case BPROTO_SERIES_TAGS:
+ on_series_tags(client, pkg);
+ break;
}
}
sirinet_pkg_send(client, package);
}
}
+
+static void on_req_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg)
+{
+ SERVER_CHECK_AUTHENTICATED(client, server)
+
+ siridb_t * siridb = client->siridb;
+ sirinet_pkg_t * package = siridb_tags_pkg(siridb->tags, pkg->pid);
+
+ if (package != NULL)
+ {
+ sirinet_pkg_send(client, package);
+ }
+}
+
+static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg)
+{
+ SERVER_CHECK_AUTHENTICATED(client, server)
+
+ sirinet_pkg_t * package = NULL;
+ siridb_t * siridb = client->siridb;
+
+ if (~siridb->server->flags & SERVER_FLAG_RUNNING)
+ {
+ log_error("Cannot tag series because of having status %d",
+ siridb->server->flags);
+
+ package = sirinet_pkg_new(
+ pkg->pid,
+ 0,
+ BPROTO_ERR_DROP_SERIES,
+ NULL);
+ }
+ else
+ {
+ qp_obj_t qp_series_name;
+ qp_unpacker_t unpacker;
+ qp_unpacker_init(&unpacker, pkg->data, pkg->len);
+
+ if (qp_is_array(qp_next(&unpacker, NULL)) &&
+ qp_next(&unpacker, &qp_series_name) == QP_RAW &&
+ qp_is_raw_term(&qp_series_name))
+ {
+ siridb_series_t * series;
+
+ series = ct_get(
+ siridb->series,
+ (const char *) qp_series_name.via.raw);
+
+ if (series != NULL)
+ {
+ qp_obj_t qp_tag_name;
+
+ /* take a reference since this task might wait for a lock */
+
+ ++series->ref;
+
+ uv_mutex_lock(&siridb->tags->mutex);
+
+ while (qp_next(&unpacker, &qp_tag_name) == QP_RAW)
+ {
+ siridb_tag_t * tag = ct_getn(
+ siridb->tags->tags,
+ qp_tag_name.via.str,
+ qp_tag_name.len);
+
+ if (tag == NULL)
+ {
+ tag = siridb_tags_add_n(
+ siridb->tags,
+ qp_tag_name.via.str,
+ qp_tag_name.len);
+ }
+
+ if (tag && imap_add(tag->series, series->id, series) == 0)
+ {
+ ++series->ref;
+ }
+ }
+
+ uv_mutex_unlock(&siridb->tags->mutex);
+
+ siridb_series_decref(series);
+ }
+ else
+ {
+ log_warning(
+ "Received a request to tag series '%s' but "
+ "the series is not found (already dropped?)",
+ qp_series_name.via.raw);
+ }
+
+ package = sirinet_pkg_new(
+ pkg->pid,
+ 0,
+ BPROTO_ACK_SERIES_TAGS,
+ NULL);
+ }
+ else
+ {
+ log_error(
+ "Illegal back-end tag series package "
+ "received, probably the series name was not "
+ "terminated?");
+ }
+ }
+
+ if (package != NULL)
+ {
+ sirinet_pkg_send(client, package);
+ }
+}
case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
+ case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS";
+ case BPROTO_SERIES_TAGS: return "BPROTO_SERIES_TAGS";
default:
sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
return protocol_str;
case BPROTO_ACK_TEE_PIPE_NAME: return "BPROTO_ACK_TEE_PIPE_NAME";
case BPROTO_ACK_DROP_DATABASE: return "BPROTO_ACK_DROP_DATABASE";
case BPROTO_RES_TAGS: return "BPROTO_RES_TAGS";
+ case BPROTO_ACK_SERIES_TAGS: return "BPROTO_ACK_SERIES_TAGS";
default:
sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n);
return protocol_str;