From: Jeroen van der Heijden Date: Thu, 21 Apr 2022 10:31:03 +0000 (+0200) Subject: Work on TCP tee X-Git-Tag: archive/raspbian/2.0.48-1+rpi1^2~6^2^2~9^2~4 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=5c7907f35c4ff3a400dc7267685f56faeccd9884;p=siridb-server.git Work on TCP tee --- diff --git a/include/siri/db/tee.h b/include/siri/db/tee.h index cbee3562..7a2be870 100644 --- a/include/siri/db/tee.h +++ b/include/siri/db/tee.h @@ -24,14 +24,15 @@ enum siridb_tee_e_t #include #include -#include +#include siridb_tee_t * siridb_tee_new(void); +void siridb_tee_close(siridb_tee_t * tee); int siridb_tee_set_address_port( siridb_tee_t * tee, const char * address, uint16_t port); -void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise); +void siridb_tee_write(siridb_tee_t * tee, sirinet_pkg_t * pkg); void siridb_tee_free(siridb_tee_t * tee); const char * siridb_tee_str(siridb_tee_t * tee); @@ -46,7 +47,6 @@ struct siridb_tee_s char * address; uv_tcp_t * tcp; uv_mutex_t lock_; - sirinet_promise_t * promise_; }; diff --git a/include/siri/version.h b/include/siri/version.h index 5c05559c..19a38342 100644 --- a/include/siri/version.h +++ b/include/siri/version.h @@ -15,7 +15,7 @@ * Note that debian alpha packages should use versions like this: * 2.0.34-0alpha0 */ -#define SIRIDB_VERSION_PRE_RELEASE "-alpha-1" +#define SIRIDB_VERSION_PRE_RELEASE "-alpha-2" #ifndef NDEBUG #define SIRIDB_VERSION_BUILD_RELEASE "+debug" diff --git a/itest/tee_server.py b/itest/tee_server.py old mode 100644 new mode 100755 index 11b8ffc0..1989b5fa --- a/itest/tee_server.py +++ b/itest/tee_server.py @@ -1,6 +1,40 @@ +#!/usr/bin/env python +import argparse import asyncio +import qpack +class TeeServerProtocol(asyncio.Protocol): + def connection_made(self, transport): + peername = transport.get_extra_info('peername') + print('Connection from {}'.format(peername)) + self.transport = transport + + def data_received(self, data): + print('Data received: {}'.format(len(data))) + self.transport.write(b'test') + + +async def main(args): + loop = asyncio.get_running_loop() + + server = await loop.create_server( + lambda: TeeServerProtocol(), + '127.0.0.1', + args.port) + + async with server: + await server.serve_forever() + if __name__ == '__main__': - pass \ No newline at end of file + + parser = argparse.ArgumentParser() + + parser.add_argument("-p", "--port", type=int, default=9104) + + args = parser.parse_args() + + loop = asyncio.get_event_loop() + + loop.run_until_complete(main(args)) diff --git a/itest/test_cluster.py b/itest/test_cluster.py index 34be4b86..34af260c 100644 --- a/itest/test_cluster.py +++ b/itest/test_cluster.py @@ -95,7 +95,7 @@ DATA = { class TestCluster(TestBase): title = 'Test siridb-cluster' - @default_test_setup(3, time_precision='s') + @default_test_setup(4, time_precision='s') async def run(self): await self.client0.connect() @@ -111,8 +111,8 @@ class TestCluster(TestBase): # alter series /.*/ - `SERIES` tag `OTHER` # ''') - # await self.db.add_replica(self.server1, 0) - # await self.assertIsRunning(self.db, self.client0, timeout=30) + await self.db.add_replica(self.server1, 0) + await self.assertIsRunning(self.db, self.client0, timeout=30) # await asyncio.sleep(35) @@ -124,18 +124,18 @@ class TestCluster(TestBase): # await self.db.add_replica(self.server3, 1) # await self.assertIsRunning(self.db, self.client0, timeout=12) - # await asyncio.sleep(35) + await asyncio.sleep(35) - # await self.db.add_pool(self.server4) - # await self.assertIsRunning(self.db, self.client0, timeout=12) + await self.db.add_pool(self.server2) + await self.assertIsRunning(self.db, self.client0, timeout=12) - # await asyncio.sleep(35) + await asyncio.sleep(35) # await self.db.add_pool(self.server5) # await self.assertIsRunning(self.db, self.client0, timeout=12) - # await self.db.add_replica(self.server1, 0) - # await asyncio.sleep(5) + await self.db.add_replica(self.server3, 1) + await asyncio.sleep(5) # await self.db.add_replica(self.server3, 1) # await asyncio.sleep(5) diff --git a/src/siri/db/db.c b/src/siri/db/db.c index 374fe8b7..0da96dfd 100644 --- a/src/siri/db/db.c +++ b/src/siri/db/db.c @@ -478,7 +478,6 @@ static int siridb__from_unpacker( /* for older schemas we keep the default tee_pipe_name=NULL */ if (qp_schema.via.int64 >= 5 && qp_schema.via.int64 <=6) { - LOGC("HERE"); qp_next(unpacker, &qp_obj); /* Skip the tee pipe name */ } diff --git a/src/siri/db/insert.c b/src/siri/db/insert.c index a581a557..6b2c2908 100644 --- a/src/siri/db/insert.c +++ b/src/siri/db/insert.c @@ -345,9 +345,9 @@ int insert_init_backend_local( siridb_tasks_inc(siridb->tasks); siridb->insert_tasks++; - if (siridb_tee_is_configured(siridb->tee)) + if (siridb_tee_is_configured(siridb->tee) && (flags & INSERT_FLAG_POOL)) { - siridb_tee_write(siridb->tee, promise); + siridb_tee_write(siridb->tee, pkg); } uv_async_init(siri.loop, handle, INSERT_local_task); @@ -1077,7 +1077,7 @@ static int INSERT_init_local( if (siridb_tee_is_configured(siridb->tee)) { - siridb_tee_write(siridb->tee, promise); + siridb_tee_write(siridb->tee, pkg); } uv_async_init(siri.loop, handle, INSERT_local_task); diff --git a/src/siri/db/tee.c b/src/siri/db/tee.c index 568f2ebc..c10ecc8b 100644 --- a/src/siri/db/tee.c +++ b/src/siri/db/tee.c @@ -22,21 +22,14 @@ static void tee__alloc_buffer( buf->len = TEE__BUF_SZ; } -static inline void tee__release(siridb_tee_t * tee) -{ - sirinet_promise_decref(tee->promise_); - tee->promise_ = NULL; - uv_mutex_unlock(&tee->lock_); -} - static void tee__write_cb(uv_write_t * req, int status) { - sirinet_promise_t * promise = req->data; + sirinet_pkg_t * pkg = req->data; if (status) { log_error("Socket (tee) write error: %s", uv_strerror(status)); } - sirinet_promise_decref(promise); + free(pkg); free(req); } @@ -72,32 +65,41 @@ static void tee__on_data( tee->address); } -static void tee__do_write(siridb_tee_t * tee, sirinet_promise_t * promise) +static void tee__do_write(siridb_tee_t * tee, sirinet_pkg_t * pkg) { - uv_write_t * req = malloc(sizeof(uv_write_t)); - uv_buf_t wrbuf = uv_buf_init( - (char *) promise->pkg, - sizeof(sirinet_pkg_t) + promise->pkg->len); + uv_write_t * req; + uv_buf_t buf; + + pkg = sirinet_pkg_dup(pkg); + if (!pkg) + { + log_error("Failed to create duplicate for tee"); + return; + } + + req = malloc(sizeof(uv_write_t)); + buf = uv_buf_init((char *) pkg, sizeof(sirinet_pkg_t) + pkg->len); if (req) { int rc; - req->data = promise; - sirinet_promise_incref(promise); - rc = uv_write(req, (uv_stream_t *) tee->tcp, &wrbuf, 1, tee__write_cb); + req->data = pkg; + rc = uv_write(req, (uv_stream_t *) tee->tcp, &buf, 1, tee__write_cb); if (rc == 0) { return; /* success */ } - sirinet_promise_decref(promise); + free(req); } + free(pkg); log_error("Cannot write to tee"); return; } static void tee__on_connect(uv_connect_t * req, int status) { - siridb_tee_t * tee = req->data; + uv_tcp_t * tcp = (uv_tcp_t *) req->handle; + siridb_tee_t * tee = tcp->data; if (status == 0) { @@ -108,13 +110,16 @@ static void tee__on_connect(uv_connect_t * req, int status) { /* Failed to start reading the tee connection */ tee->err_code = SIRIDB_TEE_E_READ; - log_error("Failed to open tee `%s` for reading", tee->address); + log_error( + "Failed to open tee `%s:%u` for reading", + tee->address, tee->port); goto fail; } /* success */ - log_info("Connection created to tee: '%s'", tee->address); - tee__do_write(tee, tee->promise_); + tee->tcp = tcp; + log_info( + "Connection created to tee: '%s:%u'", tee->address, tee->port); goto done; } @@ -126,34 +131,33 @@ static void tee__on_connect(uv_connect_t * req, int status) uv_strerror(status)); fail: - uv_close((uv_handle_t *) req->handle, (uv_close_cb) free); - tee->tcp = NULL; + uv_close((uv_handle_t *) tcp, (uv_close_cb) free); done: - tee__release(tee); free(req); + uv_mutex_unlock(&tee->lock_); } void tee__make_connection(siridb_tee_t * tee, const struct sockaddr * dest) { uv_connect_t * req = malloc(sizeof(uv_connect_t)); - tee->tcp = malloc(sizeof(uv_tcp_t)); - if (tee->tcp == NULL || req == NULL) + uv_tcp_t * tcp = malloc(sizeof(uv_tcp_t)); + if (tcp == NULL || req == NULL) { goto fail0; } + tcp->data = tee; - req->data = tee; + log_debug("Trying to connect to tee '%s:%u'...", tee->address, tee->port); + + (void) uv_tcp_init(siri.loop, tcp); + (void) uv_tcp_connect(req, tcp, dest, tee__on_connect); - log_debug("Trying to connect to tee '%s'...", tee->address); - (void) uv_tcp_init(siri.loop, tee->tcp); - (void) uv_tcp_connect(req, tee->tcp, dest, tee__on_connect); return; fail0: free(req); - free(tee->tcp); - tee->tcp = NULL; - tee__release(tee); + free(tcp); + uv_mutex_unlock(&tee->lock_); } static void tee__on_resolved( @@ -169,8 +173,8 @@ static void tee__on_resolved( log_error("Cannot resolve ip address for tee '%s' (error: %s)", tee->address, uv_err_name(status)); - tee__release(tee); - return; + uv_mutex_unlock(&tee->lock_); + goto final; } if (Logger.level == LOGGER_DEBUG) @@ -195,10 +199,11 @@ static void tee__on_resolved( "Resolved ip address '%s' for tee '%s', " "trying to connect...", addr, tee->address); - } tee__make_connection(tee, (const struct sockaddr *) res->ai_addr); +final: + uv_freeaddrinfo(res); } static void tee__resolve_dns(siridb_tee_t * tee, int ai_family) @@ -273,18 +278,27 @@ siridb_tee_t * siridb_tee_new(void) } tee->address = NULL; tee->tcp = NULL; - tee->promise_ = NULL; tee->flags = SIRIDB_TEE_FLAG; tee->err_code = 0; uv_mutex_init(&tee->lock_); return tee; } +void siridb_tee_close(siridb_tee_t * tee) +{ + uv_mutex_lock(&tee->lock_); + if (tee->tcp && !uv_is_closing((uv_handle_t *) tee->tcp)) + { + uv_close((uv_handle_t *) tee->tcp, (uv_close_cb) free); + tee->tcp = NULL; + } + uv_mutex_unlock(&tee->lock_); +} + void siridb_tee_free(siridb_tee_t * tee) { /* must be closed before free can be used */ assert (tee->tcp == NULL); - assert (tee->promise_ == NULL); uv_mutex_destroy(&tee->lock_); free(tee->address); @@ -310,32 +324,22 @@ const char * siridb_tee_str(siridb_tee_t * tee) return "disabled"; } -void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise) +void siridb_tee_write(siridb_tee_t * tee, sirinet_pkg_t * pkg) { - if (!tee->address) - { - /* Tee is not configured */ - return; - } - + assert (tee->address); if (tee->tcp) { - tee__do_write(tee, promise); + tee__do_write(tee, pkg); } else { - uv_mutex_lock(&tee->lock_); + log_debug("No tee connection (yet)..."); - if (!tee->address) + if (uv_mutex_trylock(&tee->lock_)) { - /* Tee is not configured */ - uv_mutex_unlock(&tee->lock_); return; } - assert (tee->promise_ == NULL); - tee->promise_ = promise; - sirinet_promise_incref(promise); tee__connect(tee); } } diff --git a/src/siri/service/client.c b/src/siri/service/client.c index 6e433f7e..b35baa08 100644 --- a/src/siri/service/client.c +++ b/src/siri/service/client.c @@ -717,8 +717,6 @@ static void CLIENT_on_file_database( return; } - LOGC("HERE"); - rc = ( qp_fadd_type(fpacker, QP_ARRAY_OPEN) || qp_fadd_int64(fpacker, SIRIDB_SCHEMA) || qp_fadd_raw(fpacker, (const unsigned char *) adm_client->uuid, 16) || diff --git a/src/siri/siri.c b/src/siri/siri.c index ce112dd2..83b4fc0d 100644 --- a/src/siri/siri.c +++ b/src/siri/siri.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -637,8 +638,7 @@ static void SIRI_walk_close_handlers( } else if (siridb_tee_is_handle(handle)) { - // TODO: close tee handle - assert (0); + siridb_tee_close((siridb_tee_t *) handle->data); } else if (siri_health_is_handle(handle)) {