Work on TCP tee
authorJeroen van der Heijden <jeroen@cesbit.com>
Thu, 21 Apr 2022 10:31:03 +0000 (12:31 +0200)
committerJeroen van der Heijden <jeroen@cesbit.com>
Thu, 21 Apr 2022 10:31:03 +0000 (12:31 +0200)
include/siri/db/tee.h
include/siri/version.h
itest/tee_server.py [changed mode: 0644->0755]
itest/test_cluster.py
src/siri/db/db.c
src/siri/db/insert.c
src/siri/db/tee.c
src/siri/service/client.c
src/siri/siri.c

index cbee3562eb0a5c22c4e4d8562a5b139234f6f594..7a2be870e9e4e86fc27584fa31a60e821ee30d6a 100644 (file)
@@ -24,14 +24,15 @@ enum siridb_tee_e_t
 
 #include <uv.h>
 #include <stdbool.h>
-#include <siri/net/promise.h>
+#include <siri/net/pkg.h>
 
 siridb_tee_t * siridb_tee_new(void);
+void siridb_tee_close(siridb_tee_t * tee);
 int siridb_tee_set_address_port(
         siridb_tee_t * tee,
         const char * address,
         uint16_t port);
-void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise);
+void siridb_tee_write(siridb_tee_t * tee, sirinet_pkg_t * pkg);
 void siridb_tee_free(siridb_tee_t * tee);
 const char * siridb_tee_str(siridb_tee_t * tee);
 
@@ -46,7 +47,6 @@ struct siridb_tee_s
     char * address;
     uv_tcp_t * tcp;
     uv_mutex_t lock_;
-    sirinet_promise_t * promise_;
 };
 
 
index 5c05559cd12695034508787ac85f8d819ae08faf..19a3834248b0726abdfe57835a6cc39580443d8d 100644 (file)
@@ -15,7 +15,7 @@
  * Note that debian alpha packages should use versions like this:
  *   2.0.34-0alpha0
  */
-#define SIRIDB_VERSION_PRE_RELEASE "-alpha-1"
+#define SIRIDB_VERSION_PRE_RELEASE "-alpha-2"
 
 #ifndef NDEBUG
 #define SIRIDB_VERSION_BUILD_RELEASE "+debug"
old mode 100644 (file)
new mode 100755 (executable)
index 11b8ffc..1989b5f
@@ -1,6 +1,40 @@
+#!/usr/bin/env python
+import argparse
 import asyncio
+import qpack
 
 
+class TeeServerProtocol(asyncio.Protocol):
+    def connection_made(self, transport):
+        peername = transport.get_extra_info('peername')
+        print('Connection from {}'.format(peername))
+        self.transport = transport
+
+    def data_received(self, data):
+        print('Data received: {}'.format(len(data)))
+        self.transport.write(b'test')
+
+
+async def main(args):
+    loop = asyncio.get_running_loop()
+
+    server = await loop.create_server(
+        lambda: TeeServerProtocol(),
+        '127.0.0.1',
+        args.port)
+
+    async with server:
+        await server.serve_forever()
+
 
 if __name__ == '__main__':
-    pass
\ No newline at end of file
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument("-p", "--port", type=int, default=9104)
+
+    args = parser.parse_args()
+
+    loop = asyncio.get_event_loop()
+
+    loop.run_until_complete(main(args))
index 34be4b8612a358bc4f876367bffaf1edec2dc386..34af260c6c5c7bc083ff505c693eb1ba5ee4f00b 100644 (file)
@@ -95,7 +95,7 @@ DATA = {
 class TestCluster(TestBase):
     title = 'Test siridb-cluster'
 
-    @default_test_setup(3, time_precision='s')
+    @default_test_setup(4, time_precision='s')
     async def run(self):
         await self.client0.connect()
 
@@ -111,8 +111,8 @@ class TestCluster(TestBase):
         #     alter series /.*/ - `SERIES` tag `OTHER`
         # ''')
 
-        await self.db.add_replica(self.server1, 0)
-        await self.assertIsRunning(self.db, self.client0, timeout=30)
+        await self.db.add_replica(self.server1, 0)
+        await self.assertIsRunning(self.db, self.client0, timeout=30)
 
         # await asyncio.sleep(35)
 
@@ -124,18 +124,18 @@ class TestCluster(TestBase):
         # await self.db.add_replica(self.server3, 1)
         # await self.assertIsRunning(self.db, self.client0, timeout=12)
 
-        await asyncio.sleep(35)
+        await asyncio.sleep(35)
 
-        # await self.db.add_pool(self.server4)
-        await self.assertIsRunning(self.db, self.client0, timeout=12)
+        await self.db.add_pool(self.server2)
+        await self.assertIsRunning(self.db, self.client0, timeout=12)
 
-        await asyncio.sleep(35)
+        await asyncio.sleep(35)
 
         # await self.db.add_pool(self.server5)
         # await self.assertIsRunning(self.db, self.client0, timeout=12)
 
-        # await self.db.add_replica(self.server1, 0)
-        await asyncio.sleep(5)
+        await self.db.add_replica(self.server3, 1)
+        await asyncio.sleep(5)
 
         # await self.db.add_replica(self.server3, 1)
         # await asyncio.sleep(5)
index 374fe8b763ef5f8142a42e1903ff635cd8a7f8f0..0da96dfd074dec69fb2ca948266e815c865e88c1 100644 (file)
@@ -478,7 +478,6 @@ static int siridb__from_unpacker(
     /* for older schemas we keep the default tee_pipe_name=NULL */
     if (qp_schema.via.int64 >= 5 && qp_schema.via.int64 <=6)
     {
-        LOGC("HERE");
         qp_next(unpacker, &qp_obj);
         /* Skip the tee pipe name */
     }
index a581a5571aa7032d48ced5f86e61794b20aea2f6..6b2c290814c109e1072024539fed6cc4e13240ff 100644 (file)
@@ -345,9 +345,9 @@ int insert_init_backend_local(
     siridb_tasks_inc(siridb->tasks);
     siridb->insert_tasks++;
 
-    if (siridb_tee_is_configured(siridb->tee))
+    if (siridb_tee_is_configured(siridb->tee) && (flags & INSERT_FLAG_POOL))
     {
-        siridb_tee_write(siridb->tee, promise);
+        siridb_tee_write(siridb->tee, pkg);
     }
 
     uv_async_init(siri.loop, handle, INSERT_local_task);
@@ -1077,7 +1077,7 @@ static int INSERT_init_local(
 
     if (siridb_tee_is_configured(siridb->tee))
     {
-        siridb_tee_write(siridb->tee, promise);
+        siridb_tee_write(siridb->tee, pkg);
     }
 
     uv_async_init(siri.loop, handle, INSERT_local_task);
index 568f2ebccd738913a04f85fee4c7d19173ab9146..c10ecc8b552c5966c8592ef39db0e9a2d7ec4a82 100644 (file)
@@ -22,21 +22,14 @@ static void tee__alloc_buffer(
     buf->len = TEE__BUF_SZ;
 }
 
-static inline void tee__release(siridb_tee_t * tee)
-{
-    sirinet_promise_decref(tee->promise_);
-    tee->promise_ = NULL;
-    uv_mutex_unlock(&tee->lock_);
-}
-
 static void tee__write_cb(uv_write_t * req, int status)
 {
-    sirinet_promise_t * promise = req->data;
+    sirinet_pkg_t * pkg = req->data;
     if (status)
     {
         log_error("Socket (tee) write error: %s", uv_strerror(status));
     }
-    sirinet_promise_decref(promise);
+    free(pkg);
     free(req);
 }
 
@@ -72,32 +65,41 @@ static void tee__on_data(
             tee->address);
 }
 
-static void tee__do_write(siridb_tee_t * tee, sirinet_promise_t * promise)
+static void tee__do_write(siridb_tee_t * tee, sirinet_pkg_t * pkg)
 {
-    uv_write_t * req = malloc(sizeof(uv_write_t));
-    uv_buf_t wrbuf = uv_buf_init(
-            (char *) promise->pkg,
-            sizeof(sirinet_pkg_t) + promise->pkg->len);
+    uv_write_t * req;
+    uv_buf_t buf;
+
+    pkg = sirinet_pkg_dup(pkg);
+    if (!pkg)
+    {
+        log_error("Failed to create duplicate for tee");
+        return;
+    }
+
+    req = malloc(sizeof(uv_write_t));
+    buf = uv_buf_init((char *) pkg, sizeof(sirinet_pkg_t) + pkg->len);
 
     if (req)
     {
         int rc;
-        req->data = promise;
-        sirinet_promise_incref(promise);
-        rc = uv_write(req, (uv_stream_t *) tee->tcp, &wrbuf, 1, tee__write_cb);
+        req->data = pkg;
+        rc = uv_write(req, (uv_stream_t *) tee->tcp, &buf, 1, tee__write_cb);
         if (rc == 0)
         {
             return;  /* success */
         }
-        sirinet_promise_decref(promise);
+        free(req);
     }
+    free(pkg);
     log_error("Cannot write to tee");
     return;
 }
 
 static void tee__on_connect(uv_connect_t * req, int status)
 {
-    siridb_tee_t * tee = req->data;
+    uv_tcp_t * tcp = (uv_tcp_t *) req->handle;
+    siridb_tee_t * tee = tcp->data;
 
     if (status == 0)
     {
@@ -108,13 +110,16 @@ static void tee__on_connect(uv_connect_t * req, int status)
         {
             /* Failed to start reading the tee connection */
             tee->err_code = SIRIDB_TEE_E_READ;
-            log_error("Failed to open tee `%s` for reading", tee->address);
+            log_error(
+                    "Failed to open tee `%s:%u` for reading",
+                    tee->address, tee->port);
             goto fail;
         }
 
         /* success */
-        log_info("Connection created to tee: '%s'", tee->address);
-        tee__do_write(tee, tee->promise_);
+        tee->tcp = tcp;
+        log_info(
+                "Connection created to tee: '%s:%u'", tee->address, tee->port);
         goto done;
     }
 
@@ -126,34 +131,33 @@ static void tee__on_connect(uv_connect_t * req, int status)
             uv_strerror(status));
 
 fail:
-    uv_close((uv_handle_t *) req->handle, (uv_close_cb) free);
-    tee->tcp = NULL;
+    uv_close((uv_handle_t *) tcp, (uv_close_cb) free);
 done:
-    tee__release(tee);
     free(req);
+    uv_mutex_unlock(&tee->lock_);
 }
 
 void tee__make_connection(siridb_tee_t * tee, const struct sockaddr * dest)
 {
     uv_connect_t * req = malloc(sizeof(uv_connect_t));
-    tee->tcp = malloc(sizeof(uv_tcp_t));
-    if (tee->tcp == NULL || req == NULL)
+    uv_tcp_t * tcp = malloc(sizeof(uv_tcp_t));
+    if (tcp == NULL || req == NULL)
     {
         goto fail0;
     }
+    tcp->data = tee;
 
-    req->data = tee;
+    log_debug("Trying to connect to tee '%s:%u'...", tee->address, tee->port);
+
+    (void) uv_tcp_init(siri.loop, tcp);
+    (void) uv_tcp_connect(req, tcp, dest, tee__on_connect);
 
-    log_debug("Trying to connect to tee '%s'...", tee->address);
-    (void) uv_tcp_init(siri.loop, tee->tcp);
-    (void) uv_tcp_connect(req, tee->tcp, dest, tee__on_connect);
     return;
 
 fail0:
     free(req);
-    free(tee->tcp);
-    tee->tcp = NULL;
-    tee__release(tee);
+    free(tcp);
+    uv_mutex_unlock(&tee->lock_);
 }
 
 static void tee__on_resolved(
@@ -169,8 +173,8 @@ static void tee__on_resolved(
         log_error("Cannot resolve ip address for tee '%s' (error: %s)",
                 tee->address,
                 uv_err_name(status));
-        tee__release(tee);
-        return;
+        uv_mutex_unlock(&tee->lock_);
+        goto final;
     }
 
     if (Logger.level == LOGGER_DEBUG)
@@ -195,10 +199,11 @@ static void tee__on_resolved(
                 "Resolved ip address '%s' for tee '%s', "
                 "trying to connect...",
                 addr, tee->address);
-
     }
 
     tee__make_connection(tee, (const struct sockaddr *) res->ai_addr);
+final:
+    uv_freeaddrinfo(res);
 }
 
 static void tee__resolve_dns(siridb_tee_t * tee, int ai_family)
@@ -273,18 +278,27 @@ siridb_tee_t * siridb_tee_new(void)
     }
     tee->address = NULL;
     tee->tcp = NULL;
-    tee->promise_ = NULL;
     tee->flags = SIRIDB_TEE_FLAG;
     tee->err_code = 0;
     uv_mutex_init(&tee->lock_);
     return tee;
 }
 
+void siridb_tee_close(siridb_tee_t * tee)
+{
+    uv_mutex_lock(&tee->lock_);
+    if (tee->tcp && !uv_is_closing((uv_handle_t *) tee->tcp))
+    {
+        uv_close((uv_handle_t *) tee->tcp, (uv_close_cb) free);
+        tee->tcp = NULL;
+    }
+    uv_mutex_unlock(&tee->lock_);
+}
+
 void siridb_tee_free(siridb_tee_t * tee)
 {
     /* must be closed before free can be used */
     assert (tee->tcp == NULL);
-    assert (tee->promise_ == NULL);
 
     uv_mutex_destroy(&tee->lock_);
     free(tee->address);
@@ -310,32 +324,22 @@ const char * siridb_tee_str(siridb_tee_t * tee)
     return "disabled";
 }
 
-void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise)
+void siridb_tee_write(siridb_tee_t * tee, sirinet_pkg_t * pkg)
 {
-    if (!tee->address)
-    {
-        /* Tee is not configured */
-        return;
-    }
-
+    assert (tee->address);
     if (tee->tcp)
     {
-        tee__do_write(tee, promise);
+        tee__do_write(tee, pkg);
     }
     else
     {
-        uv_mutex_lock(&tee->lock_);
+        log_debug("No tee connection (yet)...");
 
-        if (!tee->address)
+        if (uv_mutex_trylock(&tee->lock_))
         {
-            /* Tee is not configured */
-            uv_mutex_unlock(&tee->lock_);
             return;
         }
 
-        assert (tee->promise_ == NULL);
-        tee->promise_ = promise;
-        sirinet_promise_incref(promise);
         tee__connect(tee);
     }
 }
index 6e433f7e4a47e446be2e6450f88d8ae5a244c885..b35baa089c883dd26bd833d89b882e34dd92e615 100644 (file)
@@ -717,8 +717,6 @@ static void CLIENT_on_file_database(
         return;
     }
 
-    LOGC("HERE");
-
     rc = (  qp_fadd_type(fpacker, QP_ARRAY_OPEN) ||
             qp_fadd_int64(fpacker, SIRIDB_SCHEMA) ||
             qp_fadd_raw(fpacker, (const unsigned char *) adm_client->uuid, 16) ||
index ce112dd2abf38d5726a635b9437e95db449c8cdd..83b4fc0d9f096e46782d369f2f73ff28a3fedf0b 100644 (file)
@@ -29,6 +29,7 @@
 #include <siri/db/series.h>
 #include <siri/db/server.h>
 #include <siri/db/servers.h>
+#include <siri/db/tee.h>
 #include <siri/db/users.h>
 #include <siri/api.h>
 #include <siri/err.h>
@@ -637,8 +638,7 @@ static void SIRI_walk_close_handlers(
             }
             else if (siridb_tee_is_handle(handle))
             {
-                // TODO: close tee handle
-                assert (0);
+                siridb_tee_close((siridb_tee_t *) handle->data);
             }
             else if (siri_health_is_handle(handle))
             {