import functools
import random
import time
+import math
from testing import Client
from testing import default_test_setup
from testing import gen_data
PIPE_NAME = '/tmp/dbtest_tee.sock'
DATA = {
- 'series num_float': [
+ 'series float': [
[1471254705, 1.5],
[1471254707, -3.5],
[1471254710, -7.3]],
- 'series num_integer': [
+ 'series integer': [
[1471254705, 5],
[1471254708, -3],
[1471254710, -7]],
- 'series_log': [
+ 'aggr': [
+ [1447249033, 531], [1447249337, 534],
+ [1447249633, 535], [1447249937, 531],
+ [1447250249, 532], [1447250549, 537],
+ [1447250868, 530], [1447251168, 520],
+ [1447251449, 54], [1447251749, 54],
+ [1447252049, 513], [1447252349, 537],
+ [1447252649, 528], [1447252968, 531],
+ [1447253244, 533], [1447253549, 538],
+ [1447253849, 534], [1447254149, 532],
+ [1447254449, 533], [1447254748, 537]],
+ # 'huge': [
+ # [1471254705, 9223372036854775807],
+ # [1471254706, 9223372036854775806],
+ # [1471254707, 9223372036854775805],
+ # [1471254708, 9223372036854775804]],
+ 'equal ts': [
+ [1471254705, 0], [1471254705, 1], [1471254705, 1],
+ [1471254707, 0], [1471254707, 1], [1471254708, 0],
+ ],
+ 'variance': [
+ [1471254705, 2.75], [1471254706, 1.75], [1471254707, 1.25],
+ [1471254708, 0.25], [1471254709, 0.5], [1471254710, 1.25],
+ [1471254711, 3.5]
+ ],
+ 'pvariance': [
+ [1471254705, 0.0], [1471254706, 0.25], [1471254707, 0.25],
+ [1471254708, 1.25], [1471254709, 1.5], [1471254710, 1.75],
+ [1471254711, 2.75], [1471254712, 3.25]
+ ],
+ 'filter': [
+ [1471254705, 5],
+ [1471254710, -3],
+ [1471254715, -7],
+ [1471254720, 7]
+ ],
+ 'one': [
+ [1471254710, 1]
+ ],
+ 'log': [
[1471254710, 'log line one'],
[1471254712, 'log line two'],
[1471254714, 'another line (three)'],
- [1471254716, 'and yet one more']]
+ [1471254716, 'and yet one more'],
+ ],
+ # 'special': [
+ # [1471254705, 0.1],
+ # [1471254706, math.nan],
+ # [1471254707, math.inf],
+ # [1471254708, -math.inf],
+ # ]
}
if os.path.exists(PIPE_NAME):
self._tee_data[k].extend(v)
- @default_test_setup(1, pipe_name=PIPE_NAME)
+ @default_test_setup(2)
async def run(self):
self._tee_data = {}
self.assertEqual(
await self.client0.insert(DATA),
- {'success_msg': 'Successfully inserted 10 point(s).'})
+ {'success_msg': 'Successfully inserted 56 point(s).'})
self.assertAlmostEqual(
- await self.client0.query('select * from "series num_float"'),
- {'series num_float': DATA['series num_float']})
+ await self.client0.query('select * from "series float"'),
+ {'series float': DATA['series float']})
self.assertEqual(
- await self.client0.query('select * from "series num_integer"'),
- {'series num_integer': DATA['series num_integer']})
+ await self.client0.query('select * from "series integer"'),
+ {'series integer': DATA['series integer']})
self.assertEqual(
- await self.client0.query('select * from "series_log"'),
- {'series_log': DATA['series_log']})
+ await self.client0.query('select * from "log"'),
+ {'log': DATA['log']})
+
+ await asyncio.sleep(1)
+
+ self.assertEqual(DATA, self._tee_data)
+
+ await self.db.add_pool(self.server1, sleep=3)
+ await self.assertIsRunning(self.db, self.client0, timeout=50)
+
+ await self.client1.connect()
+
+ self._tee_data = {}
+ await self.client0.query('drop series set ignore_threshold true')
await asyncio.sleep(1)
+ await self.client0.query(
+ 'alter servers set tee_pipe_name "{}"'.format(PIPE_NAME))
+
+ await asyncio.sleep(1)
+
+ self.assertEqual(
+ await self.client0.insert(DATA),
+ {'success_msg': 'Successfully inserted 56 point(s).'})
+
self.assertEqual(DATA, self._tee_data)
self.client0.close()
query->nodes->node->children->next->next->node->children->node;
char pipe_name[node->len - 1];
- xstr_extract_string(pipe_name, node->str, node->len);
+ char * p_pipe_name = NULL;
+
+ if (node->cl_obj->gid == CLERI_GID_STRING)
+ {
+ xstr_extract_string(pipe_name, node->str, node->len);
+ p_pipe_name = pipe_name;
+ }
if (q_alter->alter_tp == QUERY_ALTER_SERVERS)
{
(cexpr_cb_t) siridb_server_cexpr_cb,
&wserver))
{
- (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+ (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name);
if (siridb_save(siridb))
{
log_critical("Could not save database changes (database: '%s')",
QP_ADD_SUCCESS
qp_add_fmt_safe(query->packer,
MSG_SUCCES_SET_TEE_PIPE_NAME,
- pipe_name,
+ p_pipe_name ? p_pipe_name : "disabled",
server->name);
if (server == siridb->server)
{
- (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+ (void) siridb_tee_set_pipe_name(siridb->tee, p_pipe_name);
if (siridb_save(siridb))
{
log_critical("Could not save database changes (database: '%s')",
{
sirinet_pkg_t * pkg = sirinet_pkg_new(
0,
- strlen(pipe_name),
+ p_pipe_name ? strlen(p_pipe_name) : 0,
BPROTO_TEE_PIPE_NAME_UPDATE,
- (unsigned char *) pipe_name);
+ (unsigned char *) p_pipe_name);
if (pkg != NULL)
{
/* handle will be bound to a timer so we should increment */
static void tee__runtime_init(uv_pipe_t * pipe);
static void tee__write_cb(uv_write_t * req, int status);
static void tee__on_connect(uv_connect_t * req, int status);
+static void tee__close_cb(uv_pipe_t * pipe);
static void tee__alloc_buffer(
uv_handle_t * handle,
size_t suggsz,
tee->flags |= SIRIDB_TEE_FLAG_INIT;
tee->pipe.data = tee;
- free(tee->err_msg_);
-
uv_pipe_connect(req, &tee->pipe, tee->pipe_name_, tee__on_connect);
return 0;
}
int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name)
{
free(tee->pipe_name_);
+ free(tee->err_msg_);
+ tee->err_msg_ = NULL;
+
+ if (pipe_name == NULL)
+ {
+ tee->pipe_name_ = NULL;
+
+ if (tee->flags & SIRIDB_TEE_FLAG_CONNECTED)
+ {
+ uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__close_cb);
+ }
+ return 0;
+ }
+
tee->pipe_name_ = strdup(pipe_name);
if (!tee->pipe_name_)
{
}
}
+static void tee__close_cb(uv_pipe_t * pipe)
+{
+ siridb_tee_t * tee = pipe->data;
+
+ tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
+ tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
+}
+
static void tee__write_cb(uv_write_t * req, int status)
{
sirinet_promise_t * promise = req->data;
log_info("Connection created to pipe: '%s'", tee->pipe_name_);
if (uv_read_start(req->handle, tee__alloc_buffer, tee__on_data))
{
+ free(tee->err_msg_);
if (asprintf(&tee->err_msg_,
"Cannot open pipe '%s' for reading",
tee->pipe_name_) >= 0)
goto done;
}
+ free(tee->err_msg_);
if (asprintf(
&tee->err_msg_,
"Cannot connect to pipe '%s' (%s)",
}
fail:
- uv_close((uv_handle_t *) req->handle, NULL);
+ uv_close((uv_handle_t *) req->handle, (uv_close_cb) tee__close_cb);
done:
free(req);
}
ssize_t nread,
const uv_buf_t * buf __attribute__((unused)))
{
- siridb_tee_t * tee = client->data;
-
if (nread < 0)
{
if (nread != UV_EOF)
uv_err_name(nread));
}
log_info("Disconnected from tee");
- tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
- tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
- uv_close((uv_handle_t *) client, NULL);
+ uv_close((uv_handle_t *) client, (uv_close_cb) tee__close_cb);
}
if (nread > 0)