Fix empty tags synchronization
authorJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 29 Jul 2020 12:26:36 +0000 (14:26 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 29 Jul 2020 12:26:36 +0000 (14:26 +0200)
include/siri/db/tags.h
include/siri/net/protocol.h
itest/test_tags.py
src/siri/db/groups.c
src/siri/db/initsync.c
src/siri/db/reindex.c
src/siri/db/tags.c
src/siri/net/bserver.c
src/siri/net/protocol.c

index 95c5b87911070188b1f6961e416a2674e8a7dbe3..9a24c6c6b12d58ca8a3d9506c4846b7f0d9b1072 100644 (file)
@@ -49,6 +49,7 @@ void siridb_tags_save(siridb_tags_t * tags);
 void siridb_tags_init_nseries(siridb_tags_t * tags);
 sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid);
 sirinet_pkg_t * siridb_tags_series(siridb_series_t * series);
+sirinet_pkg_t * siridb_tags_empty(siridb_tags_t * tags);
 
 
 #define siridb_tags_set_require_save(__tags, __tag) \
index 7efb3374177b3a1cf6eaba906f7bad5c332bcb0c..d3787e84b2c6f3acd137fc1a0bc4fe95128ed9dc 100644 (file)
@@ -81,6 +81,7 @@ typedef enum
     BPROTO_DROP_DATABASE,               /* empty                            */
     BPROTO_REQ_TAGS,                    /* empty                            */
     BPROTO_SERIES_TAGS,                 /* [series name, tag name, ...]     */
+    BPROTO_EMPTY_TAGS,                  /* [tag name, tag name, ...]        */
 } bproto_client_t;
 
 /*
@@ -131,7 +132,8 @@ typedef enum
     BPROTO_ACK_TEE_PIPE_NAME,                   /* empty                    */
     BPROTO_ACK_DROP_DATABASE,                   /* empty                    */
     BPROTO_RES_TAGS,                            /* [[name, series], ...]    */
-    BPROTO_ACK_SERIES_TAGS                      /* empty                    */
+    BPROTO_ACK_SERIES_TAGS,                     /* empty                    */
+    BPROTO_ACK_EMPTY_TAGS,                      /* empty                    */
 } bproto_server_t;
 
 #define sirinet_protocol_is_error(tp) (tp >= 64 && tp < 192)
index 6b871a4394c8baf863b4f9b5aeb7b78d8bbccfe0..f1d86dcc3aca410bbd6ca6ff9e048daa7b5d6aab 100644 (file)
@@ -113,6 +113,12 @@ class TestTags(TestBase):
         self.assertEqual(
             res, {"success_msg": "Successfully tagged 13 series."})
 
+        res = await self.client0.query('''
+            alter series /empty/ tag `EMPTY`
+        ''')
+        self.assertEqual(
+            res, {"success_msg": "Successfully tagged 0 series."})
+
         await asyncio.sleep(3.0)
 
         res = await self.client0.query('''
@@ -164,12 +170,6 @@ class TestTags(TestBase):
         self.assertEqual(
             res, {"success_msg": "Successfully tagged 3 series."})
 
-        res = await self.client0.query('''
-            alter series /empty/ tag `EMPTY`
-        ''')
-        self.assertEqual(
-            res, {"success_msg": "Successfully tagged 0 series."})
-
         await self.client0.query('''
             alter series 'variance', 'pvariance' untag `OTHER`
         ''')
index 5a1a0311f5867d4af1290ad6d47ca63657e22e97..274667c550cff49a92d11e481f98db2d1ce2d60d 100644 (file)
@@ -429,8 +429,14 @@ static void GROUPS_loop(void * arg)
     {
         sleep(GROUPS_LOOP_SLEEP);
 
-        if (siridb_is_reindexing(siridb) && (++mod_test % GROUPS_LOOP_DEEP))
+        if (groups->status == GROUPS_STOPPING)
+            break;
+
+        if ((siridb_is_reindexing(siridb) ||
+                siridb_server_self_synchronizing(siridb->server)) &&
+                (++mod_test % GROUPS_LOOP_DEEP))
         {
+            /* less frequently when re-indexing or synchronizing */
             continue;
         }
 
index d8d5702ec0b93bd6bd9d10ad234721f8be18f468..3d7f3ca48e98f6257a33794bc10efa838e4a7ef3 100644 (file)
@@ -223,6 +223,28 @@ void siridb_initsync_fopen(siridb_initsync_t * initsync, const char * opentype)
     }
 }
 
+/*
+ * Call-back function: sirinet_promise_cb
+ */
+static void INITSYNC_on_empty_tags_response(
+        sirinet_promise_t * promise,
+        sirinet_pkg_t * pkg,
+        int status)
+{
+    if (status)
+    {
+        log_error("Error while sending empty tags (%d)", status);
+    }
+    else if (sirinet_protocol_is_error(pkg->tp))
+    {
+        log_error(
+                "Error occurred while processing data on the new server: "
+                "(response type: %u)", pkg->tp);
+    }
+
+    sirinet_promise_decref(promise);
+}
+
 /*
  * Read the next series id and truncate the synchronization file to remove
  * the last synchronization id.
@@ -271,6 +293,24 @@ static void INITSYNC_next_series_id(siridb_t * siridb)
     }
     else
     {
+        sirinet_pkg_t * pkg;
+
+        /* send empty tags if required */
+        pkg = siridb_tags_empty(siridb->tags);
+        if (pkg)
+        {
+            if (siridb_server_send_pkg(
+                    siridb->replica,
+                    pkg,
+                    INITSYNC_TIMEOUT,
+                    (sirinet_promise_cb) INITSYNC_on_empty_tags_response,
+                    NULL,
+                    0))
+            {
+                free(pkg);
+            }
+        }
+
         log_info("Finished initial replica synchronization");
         INITSYNC_unlink(initsync);
         siridb_initsync_free(&siridb->replicate->initsync);
index 9906ea50fd46a88e1829574d31b8a9a81b9d8099..3370df91f1a41d17b13809104cb36228fa6595a4 100644 (file)
@@ -400,6 +400,28 @@ static int REINDEX_next_series_id(siridb_reindex_t * reindex)
     return rc;
 }
 
+/*
+ * Call-back function: sirinet_promise_cb
+ */
+static void REINDEX_on_empty_tags_response(
+        sirinet_promise_t * promise,
+        sirinet_pkg_t * pkg,
+        int status)
+{
+    if (status)
+    {
+        log_error("Error while sending empty tags (%d)", status);
+    }
+    else if (sirinet_protocol_is_error(pkg->tp))
+    {
+        log_error(
+                "Error occurred while processing data on the new server: "
+                "(response type: %u)", pkg->tp);
+    }
+
+    sirinet_promise_decref(promise);
+}
+
 /*
  * This function can raise a SIGNAL
  */
@@ -416,6 +438,25 @@ static void REINDEX_next(siridb_t * siridb)
         break;
 
     case NEXT_SERIES_END:
+    {
+        sirinet_pkg_t * pkg;
+
+        /* send empty tags if required */
+        pkg = siridb_tags_empty(siridb->tags);
+        if (pkg)
+        {
+            if (siridb_server_send_pkg(
+                    siridb->reindex->server,
+                    pkg,
+                    REINDEX_TIMEOUT,
+                    (sirinet_promise_cb) REINDEX_on_empty_tags_response,
+                    NULL,
+                    0))
+            {
+                free(pkg);
+            }
+        }
+
         /* update and send the flags */
         siridb->server->flags &= ~SERVER_FLAG_REINDEXING;
         siridb_servers_send_flags(siridb->servers);
@@ -431,7 +472,7 @@ static void REINDEX_next(siridb_t * siridb)
 
         siri_optimize_continue();
         break;
-
+    }
     case NEXT_SERIES_ERR:
         break; /* signal is raised */
     }
index 91b1c6a912f22fb345f7c4d3dfcb45dc6d032299..b36fd8c44cc0b073d2700a490d6cedf6c4d61031 100644 (file)
@@ -249,7 +249,7 @@ static int TAGS_series_pkg(siridb_tag_t * tag, TAGS_series_t * w)
 /*
  * Main thread.
  *
- * Returns NULL and raises a signal in case of an error.
+ * Returns NULL in case of an error or no tags.
  */
 sirinet_pkg_t * siridb_tags_series(siridb_series_t * series)
 {
@@ -277,6 +277,42 @@ sirinet_pkg_t * siridb_tags_series(siridb_series_t * series)
     return sirinet_packer2pkg(w.packer, 0, BPROTO_SERIES_TAGS);
 }
 
+/*
+ * Main thread.
+ */
+static int TAGS_empty_tag_pkg(siridb_tag_t * tag, qp_packer_t * packer)
+{
+    return tag->series->len == 0
+            ? qp_add_string(packer, tag->name) == 0
+            : 0;
+}
+
+/*
+ * Main thread.
+ *
+ * Returns NULL in case of an error or no empty tags.
+ */
+sirinet_pkg_t * siridb_tags_empty(siridb_tags_t * tags)
+{
+    qp_packer_t * packer = sirinet_packer_new(1024);
+
+    if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN))
+    {
+        return NULL;
+    }
+
+    if (ct_values(
+            tags->tags,
+            (ct_val_cb) TAGS_empty_tag_pkg,
+            packer) == 0)
+    {
+        free(packer);
+        return NULL;
+    }
+
+    return sirinet_packer2pkg(packer, 0, BPROTO_EMPTY_TAGS);
+}
+
 /*
  * Main thread.
  */
index 0b6fca9504c242a8c1784e6dde2bcfc60330f7b6..1a7744a4da8397641481ae67bd3f03247ec5ff1d 100644 (file)
@@ -65,6 +65,7 @@ static void on_disable_backup_mode(
         sirinet_pkg_t * pkg);
 static void on_req_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg);
 static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg);
+static void on_empty_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg);
 
 static uv_loop_t * loop = NULL;
 static struct sockaddr_storage server_addr;
@@ -291,6 +292,9 @@ static void on_data(sirinet_stream_t * client, sirinet_pkg_t * pkg)
     case BPROTO_SERIES_TAGS:
         on_series_tags(client, pkg);
         break;
+    case BPROTO_EMPTY_TAGS:
+        on_empty_tags(client, pkg);
+        break;
     }
 
 }
@@ -940,3 +944,72 @@ static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg)
         sirinet_pkg_send(client, package);
     }
 }
+
+static void on_empty_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg)
+{
+    SERVER_CHECK_AUTHENTICATED(client, server)
+
+    sirinet_pkg_t * package = NULL;
+    siridb_t * siridb = client->siridb;
+
+    LOGC("on empty tags...");
+
+    if (~siridb->server->flags & SERVER_FLAG_RUNNING)
+    {
+        log_error("Cannot tag series because of having status %d",
+                siridb->server->flags);
+
+        package = sirinet_pkg_new(
+                pkg->pid,
+                0,
+                BPROTO_ERR_DROP_SERIES,
+                NULL);
+    }
+    else
+    {
+        qp_unpacker_t unpacker;
+        qp_unpacker_init(&unpacker, pkg->data, pkg->len);
+
+        if (qp_is_array(qp_next(&unpacker, NULL)))
+        {
+            qp_obj_t qp_tag_name;
+
+            uv_mutex_lock(&siridb->tags->mutex);
+
+            while (qp_next(&unpacker, &qp_tag_name) == QP_RAW)
+            {
+                siridb_tag_t * tag = ct_getn(
+                        siridb->tags->tags,
+                        qp_tag_name.via.str,
+                        qp_tag_name.len);
+
+                if (tag == NULL)
+                {
+                    tag = siridb_tags_add_n(
+                            siridb->tags,
+                            qp_tag_name.via.str,
+                            qp_tag_name.len);
+
+                    siridb_tags_set_require_save(siridb->tags, tag);
+                }
+            }
+
+            uv_mutex_unlock(&siridb->tags->mutex);
+
+            package = sirinet_pkg_new(
+                    pkg->pid,
+                    0,
+                    BPROTO_ACK_EMPTY_TAGS,
+                    NULL);
+        }
+        else
+        {
+            log_error("Illegal back-end empty tags package received");
+        }
+    }
+
+    if (package != NULL)
+    {
+        sirinet_pkg_send(client, package);
+    }
+}
index f07d2508f2a51bc1d71d6b6fe46283a251d16a25..fc90cd9847af9faaba1b80a8cf44296b094f28e0 100644 (file)
@@ -88,6 +88,7 @@ const char * sirinet_bproto_client_str(bproto_client_t n)
     case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
     case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS";
     case BPROTO_SERIES_TAGS: return "BPROTO_SERIES_TAGS";
+    case BPROTO_EMPTY_TAGS: return "BPROTO_EMPTY_TAGS";
     default:
         sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
         return protocol_str;
@@ -127,6 +128,7 @@ const char * sirinet_bproto_server_str(bproto_server_t n)
     case BPROTO_ACK_DROP_DATABASE: return "BPROTO_ACK_DROP_DATABASE";
     case BPROTO_RES_TAGS: return "BPROTO_RES_TAGS";
     case BPROTO_ACK_SERIES_TAGS: return "BPROTO_ACK_SERIES_TAGS";
+    case BPROTO_ACK_EMPTY_TAGS: return "BPROTO_ACK_EMPTY_TAGS";
     default:
         sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n);
         return protocol_str;