from test_log import TestLog
from test_pipe_support import TestPipeSupport
from test_buffer import TestBuffer
+from test_tee import TestTee
if __name__ == '__main__':
run_test(TestLog())
run_test(TestPipeSupport())
run_test(TestBuffer())
+ run_test(TestTee())
--- /dev/null
+import os
+import asyncio
+import functools
+import random
+import time
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import SiriDB
+from testing import TestBase
+from testing import SiriDBAsyncUnixServer
+from testing import parse_args
+
+
+PIPE_NAME = '/tmp/dbtest_tee.sock'
+
+DATA = {
+ 'series num_float': [
+ [1471254705, 1.5],
+ [1471254707, -3.5],
+ [1471254710, -7.3]],
+ 'series num_integer': [
+ [1471254705, 5],
+ [1471254708, -3],
+ [1471254710, -7]],
+ 'series_log': [
+ [1471254710, 'log line one'],
+ [1471254712, 'log line two'],
+ [1471254714, 'another line (three)'],
+ [1471254716, 'and yet one more']]
+}
+
+if os.path.exists(PIPE_NAME):
+ os.unlink(PIPE_NAME)
+
+
+class TestTee(TestBase):
+ title = 'Test tee'
+
+ def on_data(self, data):
+ for k, v in data.items():
+ if k not in self._tee_data:
+ self._tee_data[k] = []
+ self._tee_data[k].extend(v)
+
+
+ @default_test_setup(1, pipe_name=PIPE_NAME)
+ async def run(self):
+ self._tee_data = {}
+
+ server = SiriDBAsyncUnixServer(PIPE_NAME, self.on_data)
+
+ await server.create()
+
+ await self.client0.connect()
+
+ await self.client0.query(
+ 'alter servers set tee_pipe_name "{}"'.format(PIPE_NAME))
+
+ await asyncio.sleep(1)
+
+ self.assertEqual(
+ await self.client0.insert(DATA),
+ {'success_msg': 'Successfully inserted 10 point(s).'})
+
+ self.assertAlmostEqual(
+ await self.client0.query('select * from "series num_float"'),
+ {'series num_float': DATA['series num_float']})
+
+ self.assertEqual(
+ await self.client0.query('select * from "series num_integer"'),
+ {'series num_integer': DATA['series num_integer']})
+
+ self.assertEqual(
+ await self.client0.query('select * from "series_log"'),
+ {'series_log': DATA['series_log']})
+
+ await asyncio.sleep(1)
+
+ self.assertEqual(DATA, self._tee_data)
+
+ self.client0.close()
+
+ return False
+
+
+if __name__ == '__main__':
+ parse_args()
+ run_test(TestTee())
from .testbase import TestBase
from .series import Series
from .pipe_client import PipeClient as SiriDBAsyncUnixConnection
+from .pipe_server import PipeServer as SiriDBAsyncUnixServer
from .args import parse_args
from .task import Task
--- /dev/null
+
+import logging
+import asyncio
+import struct
+import qpack
+from siridb.connector import SiriDBProtocol
+from siridb.connector.lib.connection import SiriDBAsyncConnection
+
+
+class Package:
+
+ __slots__ = ('pid', 'length', 'tipe', 'checkbit', 'data')
+
+ struct_datapackage = struct.Struct('<IHBB')
+
+ def __init__(self, barray):
+ self.length, self.pid, self.tipe, self.checkbit = \
+ self.__class__.struct_datapackage.unpack_from(barray, offset=0)
+ self.length += self.__class__.struct_datapackage.size
+ self.data = None
+
+ def extract_data_from(self, barray):
+ try:
+ self.data = qpack.unpackb(
+ barray[self.__class__.struct_datapackage.size:self.length],
+ decode='utf-8')
+ finally:
+ del barray[:self.length]
+
+
+class SiriDBServerProtocol(asyncio.Protocol):
+
+ def __init__(self, on_package_received):
+ self._buffered_data = bytearray()
+ self._data_package = None
+ self._on_package_received = on_package_received
+
+ def data_received(self, data):
+ '''
+ override asyncio.Protocol
+ '''
+ self._buffered_data.extend(data)
+ while self._buffered_data:
+ size = len(self._buffered_data)
+ if self._data_package is None:
+ if size < Package.struct_datapackage.size:
+ return None
+ self._data_package = Package(self._buffered_data)
+ if size < self._data_package.length:
+ return None
+ try:
+ self._data_package.extract_data_from(self._buffered_data)
+ except KeyError as e:
+ logging.error('Unsupported package received: {}'.format(e))
+ except Exception as e:
+ logging.exception(e)
+ # empty the byte-array to recover from this error
+ self._buffered_data.clear()
+ else:
+ self._on_package_received(self._data_package.data)
+ self._data_package = None
+
+
+class PipeServer(SiriDBAsyncConnection):
+ def __init__(self, pipe_name, on_data):
+ self._pipe_name = pipe_name
+ self._protocol = None
+ self._server = None
+ self._on_data_cb = on_data
+
+ async def create(self, loop=None):
+ loop = loop or asyncio.get_event_loop()
+
+ self._server = await loop.create_unix_server(
+ path=self._pipe_name,
+ protocol_factory=lambda: SiriDBServerProtocol(self._on_data))
+
+ def _on_data(self, data):
+ '''
+ series names are returned as c strings (0 terminated)
+ '''
+ data = {k.rstrip('\x00'): v for k, v in data.items()}
+ self._on_data_cb(data)
(*siridb)->tee->pipe_name_ = strndup(
(char *) qp_obj.via.raw,
qp_obj.len);
- READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.")
+
+ if (!(*siridb)->tee->pipe_name_)
+ {
+ READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.")
+ }
}
else if (qp_obj.tp != QP_NULL)
{
siridb_tasks_inc(siridb->tasks);
siridb->insert_tasks++;
+ if (siridb_tee_is_connected(siridb->tee))
+ {
+ siridb_tee_write(siridb->tee, promise);
+ }
+
uv_async_init(siri.loop, handle, INSERT_local_task);
uv_async_send(handle);
(cexpr_cb_t) siridb_server_cexpr_cb,
&wserver))
{
- siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+ (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+ if (siridb_save(siridb))
+ {
+ log_critical("Could not save database changes (database: '%s')",
+ siridb->dbname);
+ }
q_alter->n++;
}
if (server == siridb->server)
{
(void) siridb_tee_set_pipe_name(siridb->tee, pipe_name);
+ if (siridb_save(siridb))
+ {
+ log_critical("Could not save database changes (database: '%s')",
+ siridb->dbname);
+ }
SIRIPARSER_ASYNC_NEXT_NODE
}
"Cannot open pipe '%s' for reading",
tee->pipe_name_) >= 0)
{
- log_error(tee->err_msg_);
+ log_warning(tee->err_msg_);
}
+ goto fail;
}
- else
- {
- tee->flags |= SIRIDB_TEE_FLAG_CONNECTED;
- }
+ tee->flags |= SIRIDB_TEE_FLAG_CONNECTED;
+ goto done;
}
- else
+
+ if (asprintf(
+ &tee->err_msg_,
+ "Cannot connect to pipe '%s' (%s)",
+ tee->pipe_name_,
+ uv_strerror(status)) >= 0)
{
- if (asprintf(
- &tee->err_msg_,
- "Cannot connect to pipe '%s' (%s)",
- tee->pipe_name_,
- uv_strerror(status)) >= 0)
- {
- log_error(tee->err_msg_);
- }
+ log_warning(tee->err_msg_);
}
+
+fail:
+ uv_close((uv_handle_t *) req->handle, NULL);
+done:
free(req);
}
sirinet_pipe_name((uv_pipe_t * ) client),
uv_err_name(nread));
}
- log_info("Disconnected from tee pipe: '%s'",
- sirinet_pipe_name((uv_pipe_t * ) client));
+ log_info("Disconnected from tee");
+ tee->flags &= ~SIRIDB_TEE_FLAG_INIT;
tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED;
uv_close((uv_handle_t *) client, NULL);
}
siri_service_client_t * adm_client,
sirinet_pkg_t * pkg)
{
- FILE * fp;
+ qp_fpacker_t * fpacker;
qp_unpacker_t unpacker;
- qp_obj_t qp_uuid;
+ qp_obj_t
+ qp_uuid,
+ qp_schema,
+ qp_dbname,
+ qp_time_precision,
+ qp_buffer_size,
+ qp_duration_num,
+ qp_duration_log,
+ qp_timezone,
+ qp_drop_threshold,
+ qp_points_limit,
+ qp_list_limit;
siridb_t * siridb;
int rc;
/* 13 = strlen("database.dat")+1 */
qp_unpacker_init(&unpacker, pkg->data, pkg->len);
- if (qp_is_array(qp_next(&unpacker, NULL)) &&
- /* schema check is not required at this moment but can be done here */
- qp_next(&unpacker, NULL) == QP_INT64 &&
- qp_next(&unpacker, &qp_uuid) == QP_RAW &&
- qp_uuid.len == 16)
- {
- memcpy(unpacker.pt - 16, &adm_client->uuid, 16);
- }
- else
+ if (!qp_is_array(qp_next(&unpacker, NULL)) ||
+ qp_next(&unpacker, &qp_schema) != QP_INT64 ||
+ qp_next(&unpacker, &qp_uuid) != QP_RAW || qp_uuid.len != 16 ||
+ qp_next(&unpacker, &qp_dbname) != QP_RAW ||
+ qp_next(&unpacker, &qp_time_precision) != QP_INT64 ||
+ qp_next(&unpacker, &qp_buffer_size) != QP_INT64 ||
+ qp_next(&unpacker, &qp_duration_num) != QP_INT64 ||
+ qp_next(&unpacker, &qp_duration_log) != QP_INT64 ||
+ qp_next(&unpacker, &qp_timezone) != QP_RAW ||
+ qp_next(&unpacker, &qp_drop_threshold) != QP_DOUBLE)
{
CLIENT_err(adm_client, "invalid database file received");
return;
}
- fp = fopen(fn, "w");
+ /* list and points limit require at least schema 1 */
+ (void) qp_next(&unpacker, &qp_points_limit);
+ (void) qp_next(&unpacker, &qp_list_limit);
- if (fp == NULL)
+ /* this is the tee pipe name when schema is >= 5 */
+ (void) qp_next(&unpacker, NULL);
+
+ if ((fpacker = qp_open(fn, "w")) == NULL)
{
CLIENT_err(adm_client, "cannot write or create file: %s", fn);
return;
}
- rc = fwrite(pkg->data, pkg->len, 1, fp);
-
- if (fclose(fp) || rc != 1)
+ rc = ( qp_fadd_type(fpacker, QP_ARRAY_OPEN) ||
+ qp_fadd_int64(fpacker, SIRIDB_SCHEMA) ||
+ qp_fadd_raw(fpacker, (const unsigned char *) adm_client->uuid, 16) ||
+ qp_fadd_raw(fpacker, qp_dbname.via.raw, qp_dbname.len) ||
+ qp_fadd_int64(fpacker, qp_time_precision.via.int64) ||
+ qp_fadd_int64(fpacker, qp_buffer_size.via.int64) ||
+ qp_fadd_int64(fpacker, qp_duration_num.via.int64) ||
+ qp_fadd_int64(fpacker, qp_duration_log.via.int64) ||
+ qp_fadd_raw(fpacker, qp_timezone.via.raw, qp_timezone.len) ||
+ qp_fadd_double(fpacker, qp_drop_threshold.via.real) ||
+ qp_fadd_int64(fpacker, qp_points_limit.tp == QP_INT64
+ ? qp_points_limit.via.int64
+ : DEF_SELECT_POINTS_LIMIT) ||
+ qp_fadd_int64(fpacker, qp_list_limit.tp == QP_INT64
+ ? qp_list_limit.via.int64
+ : DEF_LIST_LIMIT) ||
+ qp_fadd_type(fpacker, QP_NULL) ||
+ qp_fadd_type(fpacker, QP_ARRAY_CLOSE) ||
+ qp_close(fpacker));
+
+ if (rc != 0)
{
CLIENT_err(adm_client, "cannot write or create file: %s", fn);
return;
qp_fadd_double(fp, DEF_DROP_THRESHOLD) ||
qp_fadd_int64(fp, DEF_SELECT_POINTS_LIMIT) ||
qp_fadd_int64(fp, DEF_LIST_LIMIT) ||
+ qp_fadd_type(fp, QP_NULL) ||
qp_fadd_type(fp, QP_ARRAY_CLOSE))
{
rc = -1;
/*
* xstr.c - Extra String functions used by SiriDB.
*/
+#include <assert.h>
#include <ctype.h>
#include <inttypes.h>
#include <logger/logger.h>
double xstr_to_double(const char * src, size_t len)
{
char * pt = (char *) src;
+ assert (len);
double d = 0;
double convert;
+ uint64_t r1 = 0;
switch (*pt)
{
case '-':
+ assert (len > 1);
convert = -1.0;
- pt++;
+ ++pt;
+ --len;
break;
case '+':
- pt++;
- /* FALLTHRU */
- /* no break */
+ assert (len > 1);
+ convert = 1.0;
+ ++pt;
+ --len;
+ break;
default:
convert = 1.0;
}
- uint64_t r1 = *pt - '0';
-
- while (--len && isdigit(*(++pt)))
+ for (; len && isdigit(*pt); --len, ++pt)
{
r1 = 10 * r1 + *pt - '0';
}
d = (double) r1;
- if (--len && *(pt++) == '.')
+ if (len && --len)
{
- uint64_t r2 = *pt - '0';
- ssize_t i;
- for (i = -1; --len && isdigit(*(++pt)); i--)
+ uint64_t r2;
+ double power;
+ ++pt;
+ r2 = *pt - '0';
+ for (power = -1.0f; --len && isdigit(*(++pt)); power--)
{
- r2 = 10 * r2 + *pt - '0';
+ r2 = 10 * r2 + *pt - '0';
}
- d += pow(10, i) * (double) r2;
+ d += pow(10.0f, power) * (double) r2;
}
return convert * d;
{
test_start("grammar");
- cleri_grammar_t * grammar = compile_grammar();
+ cleri_grammar_t * grammar = compile_siri_grammar_grammar();
assert_invalid(grammar, "select * from");
assert_invalid(grammar, "list");
../src/siri/db/lookup.c
../src/siri/err.c
-../src/logger/logger.c
\ No newline at end of file
+../src/logger/logger.c
../src/siri/db/shard.c
../src/siri/db/shards.c
../src/siri/db/tasks.c
+../src/siri/db/tee.c
../src/siri/db/time.c
../src/siri/db/user.c
../src/siri/db/users.c
../src/siri/service/request.c
../src/siri/help/help.c
../src/siri/cfg/cfg.c
-../src/siri/grammar/grammar.c
\ No newline at end of file
+../src/siri/grammar/grammar.c
_assert (xstr_to_double("0.55", 3) == 0.5);
_assert (xstr_to_double("123.456", 7) == 123.456);
_assert (xstr_to_double("123", 3) == 123);
+ _assert (xstr_to_double("1234", 3) == 123);
_assert (xstr_to_double("123.", 4) == 123);
+ _assert (xstr_to_double("123.", 3) == 123);
+ _assert (xstr_to_double("+1234", 4) == 123);
+ _assert (xstr_to_double("-1234", 4) == -123);
_assert (xstr_to_double("123456.", 3) == 123);
- _assert (xstr_to_double("-0.5", 3) == -0.5);
+ _assert (xstr_to_double("-0.5", 4) == -0.5);
+ _assert (xstr_to_double("-0.56", 4) == -0.5);
+ _assert (xstr_to_double("+0.5", 4) == 0.5);
+ _assert (xstr_to_double("+0.56", 4) == 0.5);
+ _assert (xstr_to_double("-.5", 3) == -0.5);
+ _assert (xstr_to_double("+.55", 3) == 0.5);
+ _assert (xstr_to_double(".55", 2) == 0.5);
+ _assert (xstr_to_double("-.55", 3) == -0.5);
}
return test_end();