Added where support for tag series selection
authorJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 29 Jul 2020 13:47:12 +0000 (15:47 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 29 Jul 2020 13:47:12 +0000 (15:47 +0200)
src/imap/imap.c
src/siri/db/listener.c
src/siri/db/queries.c

index dac1eaf646e227320d538efe95f59791d923e1b4..dd47739dba5b7d28be3804c28d2906456850ee2d 100644 (file)
@@ -96,8 +96,6 @@ void imap_free(imap_t * imap, imap_free_cb cb)
     free(imap);
 }
 
-
-
 /*
  * Add data by id to the map.
  *
index 2b97dc3b9a43f1853093024600929e5d42ef389e..01c6213e46a3d0ba96934f28670133aef5e9cede 100644 (file)
@@ -202,6 +202,7 @@ if (IS_MASTER && siridb_is_reindexing(siridb))                              \
 
 static void enter_access_expr(uv_async_t * handle);
 static void enter_alter_group(uv_async_t * handle);
+static void enter_alter_series(uv_async_t * handle);
 static void enter_alter_server(uv_async_t * handle);
 static void enter_alter_servers(uv_async_t * handle);
 static void enter_alter_stmt(uv_async_t * handle);
@@ -443,6 +444,7 @@ void siridb_init_listener(void)
 
     SIRIDB_NODE_ENTER[CLERI_GID_ACCESS_EXPR] = enter_access_expr;
     SIRIDB_NODE_ENTER[CLERI_GID_ALTER_GROUP] = enter_alter_group;
+    SIRIDB_NODE_ENTER[CLERI_GID_ALTER_SERIES] = enter_alter_series;
     SIRIDB_NODE_ENTER[CLERI_GID_ALTER_SERVER] = enter_alter_server;
     SIRIDB_NODE_ENTER[CLERI_GID_ALTER_SERVERS] = enter_alter_servers;
     SIRIDB_NODE_ENTER[CLERI_GID_ALTER_STMT] = enter_alter_stmt;
@@ -595,6 +597,19 @@ static void enter_alter_group(uv_async_t * handle)
     }
 }
 
+static void enter_alter_series(uv_async_t * handle)
+{
+    siridb_query_t * query = handle->data;
+    siridb_t * siridb = query->client->siridb;
+    query_alter_t * q_alter = (query_alter_t *) query->data;
+
+    MASTER_CHECK_ACCESSIBLE(siridb)
+
+    q_alter->alter_tp = QUERY_ALTER_SERIES;
+
+    SIRIPARSER_NEXT_NODE
+}
+
 static void enter_alter_server(uv_async_t * handle)
 {
     siridb_query_t * query = handle->data;
@@ -1591,6 +1606,42 @@ static void enter_series_setopr(uv_async_t * handle)
     SIRIPARSER_NEXT_NODE
 }
 
+typedef struct
+{
+    cexpr_t * where_expr;
+    siridb_tag_t * tag;
+} LISTENER_tag_t;
+
+static int LISTENER_tag__cb(siridb_series_t * series, LISTENER_tag_t * w)
+{
+    int rc = cexpr_run(
+        w->where_expr,
+        (cexpr_cb_t) siridb_series_cexpr_cb,
+        series);
+
+    if (rc == 0 || imap_add(w->tag->series, series->id, series) != 0)
+    {
+        siridb_series_decref(series);
+    }
+    return rc;
+}
+
+static int LISTENER_untag__cb(siridb_series_t * series, LISTENER_tag_t * w)
+{
+    int rc = cexpr_run(
+        w->where_expr,
+        (cexpr_cb_t) siridb_series_cexpr_cb,
+        series);
+
+    if (rc == 1 && imap_pop(w->tag->series, series->id) == series)
+    {
+        siridb_series_decref(series);
+    }
+
+    siridb_series_decref(series);
+    return rc;
+}
+
 static void enter_tag_series(uv_async_t * handle)
 {
 
@@ -1598,20 +1649,16 @@ static void enter_tag_series(uv_async_t * handle)
     siridb_t * siridb = query->client->siridb;
     query_alter_t * q_alter = (query_alter_t *) query->data;
 
-    q_alter->tp = QUERY_ALTER_SERIES;
-
     MASTER_CHECK_ACCESSIBLE(siridb)
     MASTER_CHECK_VERSION(siridb, "2.0.38")
 
     cleri_node_t * tag_node =
                     query->nodes->node->children->next->node;
     siridb_tag_t * tag;
-
     char name[tag_node->len - 1];
     xstr_extract_string(name, tag_node->str, tag_node->len);
 
     tag = ct_get(siridb->tags->tags, name);
-
     if (tag == NULL)
     {
         if (ct_get(siridb->groups->groups, name) != NULL)
@@ -1645,12 +1692,27 @@ static void enter_tag_series(uv_async_t * handle)
         uv_mutex_lock(&siridb->tags->mutex);
     }
 
-    q_alter->n = q_alter->series_map->len;
+    if (q_alter->where_expr == NULL)
+    {
+        q_alter->n = q_alter->series_map->len;
+
+        imap_union_ref(
+                tag->series,
+                q_alter->series_map,
+                (imap_free_cb) &siridb__series_decref);
+    }
+    else
+    {
+        LISTENER_tag_t w = {
+                .where_expr = q_alter->where_expr,
+                .tag = tag,
+        };
+
+        q_alter->n = imap_walk(
+                q_alter->series_map, (imap_cb) LISTENER_tag__cb, &w);
 
-    imap_union_ref(
-            tag->series,
-            q_alter->series_map,
-            (imap_free_cb) &siridb__series_decref);
+        imap_free(q_alter->series_map, NULL);
+    }
 
     siridb_tags_set_require_save(siridb->tags, tag);
 
@@ -1724,12 +1786,28 @@ static void enter_untag_series(uv_async_t * handle)
 
     uv_mutex_lock(&siridb->tags->mutex);
 
-    q_alter->n = q_alter->series_map->len;
+    if (q_alter->where_expr == NULL)
+    {
+        q_alter->n = q_alter->series_map->len;
+
+        imap_difference_ref(
+                tag->series,
+                q_alter->series_map,
+                (imap_free_cb) &siridb__series_decref);
+    }
+    else
+    {
+        LISTENER_tag_t w = {
+                .where_expr = q_alter->where_expr,
+                .tag = tag,
+        };
+
+        q_alter->n = imap_walk(
+                q_alter->series_map, (imap_cb) LISTENER_untag__cb, &w);
+
+        imap_free(q_alter->series_map, NULL);
+    }
 
-    imap_difference_ref(
-            tag->series,
-            q_alter->series_map,
-            (imap_free_cb) &siridb__series_decref);
 
     siridb_tags_set_require_save(siridb->tags, tag);
 
index 7668b2c479f457f01dc3fac6abe771af6137e883..5f8341f84492fe8b91c5d40216e0058623c62593 100644 (file)
@@ -179,6 +179,7 @@ void query_alter_free(uv_handle_t * handle)
     case QUERY_ALTER_NONE:
     case QUERY_ALTER_DATABASE:
     case QUERY_ALTER_SERVERS:
+    case QUERY_ALTER_SERIES:
         break;
     case QUERY_ALTER_GROUP:
         siridb_group_decref(q_alter->via.group);