+#!/usr/bin/env python
+import argparse
import asyncio
+import qpack
+class TeeServerProtocol(asyncio.Protocol):
+ def connection_made(self, transport):
+ peername = transport.get_extra_info('peername')
+ print('Connection from {}'.format(peername))
+ self.transport = transport
+
+ def data_received(self, data):
+ print('Data received: {}'.format(len(data)))
+ self.transport.write(b'test')
+
+
+async def main(args):
+ loop = asyncio.get_running_loop()
+
+ server = await loop.create_server(
+ lambda: TeeServerProtocol(),
+ '127.0.0.1',
+ args.port)
+
+ async with server:
+ await server.serve_forever()
+
if __name__ == '__main__':
- pass
\ No newline at end of file
+
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument("-p", "--port", type=int, default=9104)
+
+ args = parser.parse_args()
+
+ loop = asyncio.get_event_loop()
+
+ loop.run_until_complete(main(args))
class TestCluster(TestBase):
title = 'Test siridb-cluster'
- @default_test_setup(3, time_precision='s')
+ @default_test_setup(4, time_precision='s')
async def run(self):
await self.client0.connect()
# alter series /.*/ - `SERIES` tag `OTHER`
# ''')
- # await self.db.add_replica(self.server1, 0)
- # await self.assertIsRunning(self.db, self.client0, timeout=30)
+ await self.db.add_replica(self.server1, 0)
+ await self.assertIsRunning(self.db, self.client0, timeout=30)
# await asyncio.sleep(35)
# await self.db.add_replica(self.server3, 1)
# await self.assertIsRunning(self.db, self.client0, timeout=12)
- # await asyncio.sleep(35)
+ await asyncio.sleep(35)
- # await self.db.add_pool(self.server4)
- # await self.assertIsRunning(self.db, self.client0, timeout=12)
+ await self.db.add_pool(self.server2)
+ await self.assertIsRunning(self.db, self.client0, timeout=12)
- # await asyncio.sleep(35)
+ await asyncio.sleep(35)
# await self.db.add_pool(self.server5)
# await self.assertIsRunning(self.db, self.client0, timeout=12)
- # await self.db.add_replica(self.server1, 0)
- # await asyncio.sleep(5)
+ await self.db.add_replica(self.server3, 1)
+ await asyncio.sleep(5)
# await self.db.add_replica(self.server3, 1)
# await asyncio.sleep(5)
buf->len = TEE__BUF_SZ;
}
-static inline void tee__release(siridb_tee_t * tee)
-{
- sirinet_promise_decref(tee->promise_);
- tee->promise_ = NULL;
- uv_mutex_unlock(&tee->lock_);
-}
-
static void tee__write_cb(uv_write_t * req, int status)
{
- sirinet_promise_t * promise = req->data;
+ sirinet_pkg_t * pkg = req->data;
if (status)
{
log_error("Socket (tee) write error: %s", uv_strerror(status));
}
- sirinet_promise_decref(promise);
+ free(pkg);
free(req);
}
tee->address);
}
-static void tee__do_write(siridb_tee_t * tee, sirinet_promise_t * promise)
+static void tee__do_write(siridb_tee_t * tee, sirinet_pkg_t * pkg)
{
- uv_write_t * req = malloc(sizeof(uv_write_t));
- uv_buf_t wrbuf = uv_buf_init(
- (char *) promise->pkg,
- sizeof(sirinet_pkg_t) + promise->pkg->len);
+ uv_write_t * req;
+ uv_buf_t buf;
+
+ pkg = sirinet_pkg_dup(pkg);
+ if (!pkg)
+ {
+ log_error("Failed to create duplicate for tee");
+ return;
+ }
+
+ req = malloc(sizeof(uv_write_t));
+ buf = uv_buf_init((char *) pkg, sizeof(sirinet_pkg_t) + pkg->len);
if (req)
{
int rc;
- req->data = promise;
- sirinet_promise_incref(promise);
- rc = uv_write(req, (uv_stream_t *) tee->tcp, &wrbuf, 1, tee__write_cb);
+ req->data = pkg;
+ rc = uv_write(req, (uv_stream_t *) tee->tcp, &buf, 1, tee__write_cb);
if (rc == 0)
{
return; /* success */
}
- sirinet_promise_decref(promise);
+ free(req);
}
+ free(pkg);
log_error("Cannot write to tee");
return;
}
static void tee__on_connect(uv_connect_t * req, int status)
{
- siridb_tee_t * tee = req->data;
+ uv_tcp_t * tcp = (uv_tcp_t *) req->handle;
+ siridb_tee_t * tee = tcp->data;
if (status == 0)
{
{
/* Failed to start reading the tee connection */
tee->err_code = SIRIDB_TEE_E_READ;
- log_error("Failed to open tee `%s` for reading", tee->address);
+ log_error(
+ "Failed to open tee `%s:%u` for reading",
+ tee->address, tee->port);
goto fail;
}
/* success */
- log_info("Connection created to tee: '%s'", tee->address);
- tee__do_write(tee, tee->promise_);
+ tee->tcp = tcp;
+ log_info(
+ "Connection created to tee: '%s:%u'", tee->address, tee->port);
goto done;
}
uv_strerror(status));
fail:
- uv_close((uv_handle_t *) req->handle, (uv_close_cb) free);
- tee->tcp = NULL;
+ uv_close((uv_handle_t *) tcp, (uv_close_cb) free);
done:
- tee__release(tee);
free(req);
+ uv_mutex_unlock(&tee->lock_);
}
void tee__make_connection(siridb_tee_t * tee, const struct sockaddr * dest)
{
uv_connect_t * req = malloc(sizeof(uv_connect_t));
- tee->tcp = malloc(sizeof(uv_tcp_t));
- if (tee->tcp == NULL || req == NULL)
+ uv_tcp_t * tcp = malloc(sizeof(uv_tcp_t));
+ if (tcp == NULL || req == NULL)
{
goto fail0;
}
+ tcp->data = tee;
- req->data = tee;
+ log_debug("Trying to connect to tee '%s:%u'...", tee->address, tee->port);
+
+ (void) uv_tcp_init(siri.loop, tcp);
+ (void) uv_tcp_connect(req, tcp, dest, tee__on_connect);
- log_debug("Trying to connect to tee '%s'...", tee->address);
- (void) uv_tcp_init(siri.loop, tee->tcp);
- (void) uv_tcp_connect(req, tee->tcp, dest, tee__on_connect);
return;
fail0:
free(req);
- free(tee->tcp);
- tee->tcp = NULL;
- tee__release(tee);
+ free(tcp);
+ uv_mutex_unlock(&tee->lock_);
}
static void tee__on_resolved(
log_error("Cannot resolve ip address for tee '%s' (error: %s)",
tee->address,
uv_err_name(status));
- tee__release(tee);
- return;
+ uv_mutex_unlock(&tee->lock_);
+ goto final;
}
if (Logger.level == LOGGER_DEBUG)
"Resolved ip address '%s' for tee '%s', "
"trying to connect...",
addr, tee->address);
-
}
tee__make_connection(tee, (const struct sockaddr *) res->ai_addr);
+final:
+ uv_freeaddrinfo(res);
}
static void tee__resolve_dns(siridb_tee_t * tee, int ai_family)
}
tee->address = NULL;
tee->tcp = NULL;
- tee->promise_ = NULL;
tee->flags = SIRIDB_TEE_FLAG;
tee->err_code = 0;
uv_mutex_init(&tee->lock_);
return tee;
}
+void siridb_tee_close(siridb_tee_t * tee)
+{
+ uv_mutex_lock(&tee->lock_);
+ if (tee->tcp && !uv_is_closing((uv_handle_t *) tee->tcp))
+ {
+ uv_close((uv_handle_t *) tee->tcp, (uv_close_cb) free);
+ tee->tcp = NULL;
+ }
+ uv_mutex_unlock(&tee->lock_);
+}
+
void siridb_tee_free(siridb_tee_t * tee)
{
/* must be closed before free can be used */
assert (tee->tcp == NULL);
- assert (tee->promise_ == NULL);
uv_mutex_destroy(&tee->lock_);
free(tee->address);
return "disabled";
}
-void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise)
+void siridb_tee_write(siridb_tee_t * tee, sirinet_pkg_t * pkg)
{
- if (!tee->address)
- {
- /* Tee is not configured */
- return;
- }
-
+ assert (tee->address);
if (tee->tcp)
{
- tee__do_write(tee, promise);
+ tee__do_write(tee, pkg);
}
else
{
- uv_mutex_lock(&tee->lock_);
+ log_debug("No tee connection (yet)...");
- if (!tee->address)
+ if (uv_mutex_trylock(&tee->lock_))
{
- /* Tee is not configured */
- uv_mutex_unlock(&tee->lock_);
return;
}
- assert (tee->promise_ == NULL);
- tee->promise_ = promise;
- sirinet_promise_incref(promise);
tee__connect(tee);
}
}