Added services requests
authorJeroen van der Heijden <jeroen@transceptor.technology>
Mon, 20 Jan 2020 12:57:41 +0000 (13:57 +0100)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Mon, 20 Jan 2020 12:57:41 +0000 (13:57 +0100)
include/siri/api.h
include/siri/service/account.h
include/siri/service/request.h
itest/run_all.py
itest/test_http_api.py
src/siri/api.c
src/siri/net/clserver.c
src/siri/service/account.c
src/siri/service/request.c

index 24857a9996557a3edfb94e256d5d186ec5116cf7..cad41e58ca1a1af03304d7d0dc0b4da9a4fddc09 100644 (file)
@@ -7,6 +7,7 @@
 #include <lib/http_parser.h>
 #include <uv.h>
 #include <siri/db/db.h>
+#include <siri/service/request.h>
 
 typedef enum
 {
@@ -62,6 +63,7 @@ struct siri_api_request_s
     siri_api_content_t content_type;
     siri_api_req_t request_type;
     service_request_t service_type;
+    _Bool service_authenticated;
     http_parser parser;
     uv_write_t req;
 };
index 1de94bf6cbc524a8380028bbadfa6a48ff46fd07..8467cb65d836957afe149bca9b3f1216a64ee2e0 100644 (file)
@@ -27,6 +27,10 @@ int siri_service_account_change_password(
         qp_obj_t * qp_account,
         qp_obj_t * qp_password,
         char * err_msg);
+_Bool siri_service_account_check_basic(
+        siri_t * siri,
+        const char * data,
+        size_t n);
 int siri_service_account_drop(
         siri_t * siri,
         qp_obj_t * qp_account,
index cb58fad77bb2a10f847789981cad87f0386b50a4..825bf504c2a4cbaeccc6653467746528ce7cf2f3 100644 (file)
@@ -26,7 +26,6 @@ void siri_service_request_destroy(void);
 cproto_server_t siri_service_request(
         int tp,
         qp_unpacker_t * qp_unpacker,
-        qp_obj_t * qp_account,
         qp_packer_t ** packaddr,
         uint16_t pid,
         sirinet_stream_t * client,
index 7c67b2000c8f0758cb13e993a38b38afc2f6bda4..dccabad1bc036a4659fe5ab32a3ec2e48082c507 100644 (file)
@@ -2,41 +2,43 @@
 from testing import run_test
 from testing import Server
 from testing import parse_args
+from test_buffer import TestBuffer
 from test_cluster import TestCluster
+from test_compression import TestCompression
+from test_create_database import TestCreateDatabase
 from test_group import TestGroup
-from test_list import TestList
+from test_http_api import TestHTTPAPI
 from test_insert import TestInsert
+from test_list import TestList
+from test_log import TestLog
+from test_log import TestLog
+from test_parentheses import TestParenth
+from test_pipe_support import TestPipeSupport
 from test_pool import TestPool
 from test_select import TestSelect
 from test_select_ns import TestSelectNano
 from test_series import TestSeries
 from test_server import TestServer
-from test_user import TestUser
-from test_compression import TestCompression
-from test_log import TestLog
-from test_log import TestLog
-from test_pipe_support import TestPipeSupport
-from test_buffer import TestBuffer
 from test_tee import TestTee
-from test_parentheses import TestParenth
-from test_create_database import TestCreateDatabase
+from test_user import TestUser
 
 
 if __name__ == '__main__':
     parse_args()
+    run_test(TestBuffer())
     run_test(TestCompression())
+    run_test(TestCreateDatabase())
     run_test(TestGroup())
-    run_test(TestList())
+    run_test(TestHTTPAPI())
     run_test(TestInsert())
+    run_test(TestList())
+    run_test(TestLog())
+    run_test(TestParenth())
+    run_test(TestPipeSupport())
     run_test(TestPool())
     run_test(TestSelect())
     run_test(TestSelectNano())
     run_test(TestSeries())
     run_test(TestServer())
-    run_test(TestUser())
-    run_test(TestLog())
-    run_test(TestPipeSupport())
-    run_test(TestBuffer())
     run_test(TestTee())
-    run_test(TestParenth())
-    run_test(TestCreateDatabase())
+    run_test(TestUser())
index 7f8f62b4958b5cc05e49e158068944a3ee271ac2..5967a0b7b6b958b04af04b721f80dbe2b19bf1a0 100644 (file)
 import requests
 import json
 from testing import gen_points
+import asyncio
+import functools
+import random
+import time
+import math
+import re
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+from testing import parse_args
 
 TIME_PRECISION = 's'
 
-data = {
-    'q': 'select * from "aggr"',
-}
-
-x = requests.post(
-    f'http://localhost:9020/query/dbtest',
-    data=json.dumps(data),
-    auth=('iris', 'siri'),
-    headers={'Content-Type': 'application/json'}
-)
-
-print(x.status_code)
-
-if x.status_code == 200:
-    print(x.json())
-else:
-    print(x.text)
-
-series_float = gen_points(
-    tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
-
-series_int = gen_points(
-    tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
-
-data = {
-    'my_float': series_float,
-    'my_int': series_int
-}
-
-x = requests.post(
-    f'http://localhost:9020/insert/dbtest',
-    data=json.dumps(data),
-    auth=('iris', 'siri'),
-    headers={'Content-Type': 'application/json'}
-)
-
-print(x.status_code)
-if x.status_code == 200:
-    print(x.json())
-else:
-    print(x.text)
+
+class TestHTTPAPI(TestBase):
+    title = 'Test HTTP API requests'
+
+    @default_test_setup(3, time_precision=TIME_PRECISION)
+    async def run(self):
+        await self.client0.connect()
+
+        series_float = gen_points(
+            tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
+
+        series_int = gen_points(
+            tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
+
+        data = {
+            'my_float': series_float,
+            'my_int': series_int
+        }
+
+        x = requests.post(
+            f'http://localhost:9020/insert/dbtest',
+            data=json.dumps(data),
+            auth=('iris', 'siri'),
+            headers={'Content-Type': 'application/json'}
+        )
+
+        self.assertEqual(x.status_code, 200)
+        self.assertDictEqual(x.json(), {
+            'success_msg': 'Successfully inserted 20000 point(s).'})
+
+        data = {
+            'dbname': 'dbtest',
+            'host': 'localhost',
+            'port': 9000,
+            'username': 'iris',
+            'password': 'siri'
+        }
+
+        x = requests.post(
+            f'http://localhost:9021/new-pool',
+            data=json.dumps(data),
+            auth=('sa', 'siri'),
+            headers={'Content-Type': 'application/json'})
+
+        self.assertEqual(x.status_code, 200)
+        self.assertEqual(x.json(), 'OK')
+
+        self.db.servers.append(self.server1)
+        await self.assertIsRunning(self.db, self.client0, timeout=30)
+
+        data = {'data': [[1579521271, 10], [1579521573, 20]]}
+        x = requests.post(
+            f'http://localhost:9020/insert/dbtest',
+            json=data,
+            auth=('iris', 'siri'))
+
+        self.assertEqual(x.status_code, 200)
+        self.assertDictEqual(x.json(), {
+            'success_msg': 'Successfully inserted 2 point(s).'})
+
+        x = requests.post(
+            f'http://localhost:9020/query/dbtest',
+            json={'q': 'select * from "data"'},
+            auth=('iris', 'siri'))
+
+        self.assertEqual(x.status_code, 200)
+        self.assertEqual(x.json(), data)
+
+        x = requests.post(
+            f'http://localhost:9020/query/dbtest',
+            json={'q': 'select * from "data"', 't': 'ms'},
+            auth=('iris', 'siri'))
+
+        data = {
+            'data': [[p[0] * 1000, p[1]] for p in data['data']]
+        }
+
+        self.assertEqual(x.status_code, 200)
+        self.assertEqual(x.json(), data)
+
+        x = requests.post(
+            f'http://localhost:9021/new-account',
+            json={'account': 't', 'password': ''},
+            auth=('sa', 'siri'))
+
+        self.assertEqual(x.status_code, 400)
+        self.assertEqual(x.json(), {'error_msg':
+                'service account name should have at least 2 characters'})
+
+        x = requests.post(
+            f'http://localhost:9021/new-account',
+            json={'account': 'tt', 'password': 'pass'},
+            auth=('sa', 'siri'))
+
+        self.assertEqual(x.status_code, 200)
+
+        data = {
+            'dbname': 'dbtest',
+            'host': 'localhost',
+            'port': 1234,
+            'pool': 0,
+            'username': 'iris',
+            'password': 'siri'
+        }
+
+        auth = ('tt', 'pass')
+        x = requests.post(
+            f'http://localhost:9021/new-replica', json=data, auth=auth)
+
+        self.assertEqual(x.status_code, 400)
+        self.assertEqual(x.json(), {
+            'error_msg': "database name already exists: 'dbtest'"})
+
+        x = requests.post(
+            f'http://localhost:9022/new-replica', json=data, auth=auth)
+        self.assertEqual(x.status_code, 401)
+
+        auth = ('sa', 'siri')
+        x = requests.post(
+            f'http://localhost:9022/new-replica', json=data, auth=auth)
+
+        self.assertEqual(x.status_code, 400)
+        self.assertEqual(x.json(), {
+            'error_msg':
+                "connecting to server 'localhost:1234' failed with error: "
+                "connection refused"})
+
+        data['port'] = 9000
+        x = requests.post(
+            f'http://localhost:9022/new-replica', json=data, auth=auth)
+        self.assertEqual(x.status_code, 200)
+        self.assertEqual(x.json(), 'OK')
+
+        self.db.servers.append(self.server2)
+        await self.assertIsRunning(self.db, self.client0, timeout=30)
+
+        x = requests.get(
+            f'http://localhost:9022/get-databases', auth=auth)
+        self.assertEqual(x.status_code, 200)
+        self.assertEqual(x.json(), ['dbtest'])
+
+        self.client0.close()
+
+
+if __name__ == '__main__':
+    parse_args()
+    run_test(TestHTTPAPI())
+
index 45f8c2e8015c3d67386af368d89d8f0b7ded39e3..56315187ebd9fedb3e0d888dd5489d3bc39039b5 100644 (file)
@@ -10,6 +10,7 @@
 #include <qpjson/qpjson.h>
 #include <siri/db/query.h>
 #include <siri/db/insert.h>
+#include <siri/service/account.h>
 
 #define API__HEADER_MAX_SZ 256
 
@@ -347,6 +348,12 @@ static int api__on_authorization(siri_api_request_t * ar, const char * at, size_
 
     if (api__istarts_with(&at, &n, "basic ", strlen("basic ")))
     {
+        if (ar->request_type == SIRI_APT_RT_SERVICE)
+        {
+            ar->service_authenticated = siri_service_account_check_basic(
+                    &siri, at, n);
+            return 0;
+        }
         siridb_user_t * user;
         user = ar->siridb
                 ? siridb_users_get_user_from_basic(ar->siridb, at, n)
@@ -684,10 +691,14 @@ static int api__query_cb(http_parser * parser)
 
 static int api__service_cb(http_parser * parser)
 {
-    api__query_t q;
+    qp_unpacker_t up;
+    cproto_server_t res;
     siri_api_request_t * ar = parser->data;
+    qp_packer_t * packer = NULL;
+    sirinet_pkg_t * package;
+    char err_msg[SIRI_MAX_SIZE_ERR_MSG];
 
-    switch (ar->service_type)
+    switch ((service_request_t) ar->service_type)
     {
     case SERVICE_NEW_ACCOUNT:
     case SERVICE_CHANGE_PASSWORD:
@@ -708,7 +719,56 @@ static int api__service_cb(http_parser * parser)
         break;
     }
 
+    if (!ar->service_authenticated)
+        return api__plain_response(ar, E401_UNAUTHORIZED);
+
+    switch (ar->content_type)
+    {
+    case SIRI_API_CT_TEXT:
+        return api__plain_response(ar, E415_UNSUPPORTED_MEDIA_TYPE);
+    case SIRI_API_CT_JSON:
+    {
+        char * dst;
+        size_t dst_n;
+        if (qpjson_json_to_qp(ar->buf, ar->len, &dst, &dst_n))
+            return api__plain_response(ar, E400_BAD_REQUEST);
+        free(ar->buf);
+        ar->buf = dst;
+        ar->len = dst_n;
+        break;
+    }
+    case SIRI_API_CT_QPACK:
+        break;
+    }
 
+    qp_unpacker_init(&up, (unsigned char *) ar->buf, ar->len);
+
+    res = siri_service_request(
+            ar->service_type,
+            &up,
+            &packer,
+            0,
+            (sirinet_stream_t *) ar,
+            err_msg);
+
+    if (res == CPROTO_DEFERRED)
+    {
+        sirinet_stream_incref(ar);
+    }
+
+    package =
+            (res == CPROTO_DEFERRED) ? NULL :
+            (res == CPROTO_ERR_SERVICE) ? sirinet_pkg_err(
+                    0,
+                    strlen(err_msg),
+                    res,
+                    err_msg) :
+            (res == CPROTO_ACK_SERVICE_DATA) ? sirinet_packer2pkg(
+                    packer,
+                    0,
+                    res) : sirinet_pkg_new(0, 0, res, NULL);
+
+    return package ? sirinet_pkg_send((sirinet_stream_t *) ar, package) : 0;
 }
 
 static int api__message_complete_cb(http_parser * parser)
@@ -730,18 +790,6 @@ static int api__message_complete_cb(http_parser * parser)
     return api__plain_response(ar, E500_INTERNAL_SERVER_ERROR);
 }
 
-static int api__chunk_header_cb(http_parser * parser)
-{
-    LOGC("Chunk header\n Content-Length: %zu", parser->content_length);
-    return 0;
-}
-
-static int api__chunk_complete_cb(http_parser * parser)
-{
-    LOGC("Chunk complete\n Content-Length: %zu", parser->content_length);
-    return 0;
-}
-
 static void api__write_free_cb(uv_write_t * req, int status)
 {
     free(req->data);
@@ -787,8 +835,6 @@ int siri_api_init(void)
     api__settings.on_header_value = api__header_value_cb;
     api__settings.on_message_complete = api__message_complete_cb;
     api__settings.on_body = api__body_cb;
-    api__settings.on_chunk_header = api__chunk_header_cb;
-    api__settings.on_chunk_complete = api__chunk_complete_cb;
     api__settings.on_headers_complete = api__headers_complete_cb;
 
     if (
@@ -869,6 +915,16 @@ int siri_api_send(
         size_t n)
 {
     unsigned char * data;
+    if (n == 0)
+    {
+        if (ht != E200_OK)
+        {
+            return api__plain_response(ar, ht);
+        }
+        src = (unsigned char *) "\x82OK";
+        n = 3;
+    }
+
     if (ar->content_type == SIRI_API_CT_JSON)
     {
         size_t tmp_sz;
index 21c370923e3a61e0a67d4c974724c3b33c3b049c..c8324efdff70aa1575f6418216b42e94c5e7db49 100644 (file)
@@ -847,7 +847,6 @@ static void on_req_service(sirinet_stream_t * client, sirinet_pkg_t * pkg)
             siri_service_request(
                     qp_request.via.int64,
                     &unpacker,
-                    &qp_username,
                     &packer,
                     pkg->pid,
                     client,
index 1f7af0304ee30f5b1d765e5624d4e8afdc0cbbd9..b0a43a5121d735691c908f08ff69c9c1facfb8ad 100644 (file)
@@ -8,6 +8,7 @@
 #include <siri/siri.h>
 #include <stdarg.h>
 #include <logger/logger.h>
+#include <base64/base64.h>
 
 #define SIRI_SERVICE_ACCOUNT_SCHEMA 1
 #define FILENAME ".accounts.dat"
@@ -263,7 +264,7 @@ int siri_service_account_change_password(
         return -1;
     }
 
-    password= strndup(
+    password = strndup(
             (const char *) qp_password->via.raw, qp_password->len);
 
     if (password == NULL)
@@ -294,6 +295,49 @@ int siri_service_account_change_password(
     return 0;
 }
 
+static int ACCOUNT_cmp_str(siri_service_account_t * account, char * str)
+{
+    return strcmp(account->account, str) == 0;
+}
+
+
+_Bool siri_service_account_check_basic(
+        siri_t * siri,
+        const char * data,
+        size_t n)
+{
+    siri_service_account_t * account;
+    size_t size, nn, end;
+    char * b64 = base64_decode(data, n, &size);
+    _Bool is_valid = false;
+    char pw[OWCRYPT_SZ];
+
+    for (nn = 0, end = size; nn < end; ++nn)
+    {
+        if (b64[nn] == ':')
+        {
+            b64[nn] = '\0';
+
+            if (++nn > end)
+                break;
+
+            account = (siri_service_account_t *) llist_get(
+                    siri->accounts,
+                    (llist_cb) ACCOUNT_cmp_str,
+                    b64);
+            if (account)
+            {
+                owcrypt(b64 + nn, account->password, pw);
+                is_valid = strcmp(pw, account->password) == 0;
+            }
+            break;
+        }
+    }
+
+    free(b64);
+    return is_valid;
+}
+
 /*
  * Returns 0 if dropped or -1 in case the account was not found.
  *
index 421e29876e37188452c0e0fc84bdbf11c52ed0ee..e73f12ed76bbcaec086f95ffa895b5525c1cd48b 100644 (file)
                 "invalid database name: '%.*s'",                            \
                 (int) qp_dbname.len,                                        \
                 qp_dbname.via.raw);                                         \
-        return CPROTO_ERR_SERVICE;                                            \
+        return CPROTO_ERR_SERVICE;                                          \
     }                                                                       \
                                                                             \
     if (llist_get(                                                          \
             siri.siridb_list,                                               \
-            (llist_cb) SERVICE_find_database,                                 \
+            (llist_cb) SERVICE_find_database,                               \
             &qp_dbname) != NULL)                                            \
     {                                                                       \
         snprintf(                                                           \
@@ -74,7 +74,7 @@
                 "database name already exists: '%.*s'",                     \
                 (int) qp_dbname.len,                                        \
                 qp_dbname.via.raw);                                         \
-        return CPROTO_ERR_SERVICE;                                            \
+        return CPROTO_ERR_SERVICE;                                          \
     }                                                                       \
                                                                             \
     dbpath_len = strlen(siri.cfg->default_db_path) + qp_dbname.len + 2;     \
@@ -92,7 +92,7 @@
                 SIRI_MAX_SIZE_ERR_MSG,                                      \
                 "database directory already exists: %s",                    \
                 dbpath);                                                    \
-        return CPROTO_ERR_SERVICE;                                            \
+        return CPROTO_ERR_SERVICE;                                          \
     }                                                                       \
                                                                             \
     if (mkdir(dbpath, 0700) == -1)                                          \
                 SIRI_MAX_SIZE_ERR_MSG,                                      \
                 "cannot create directory: %s",                              \
                 dbpath);                                                    \
-        return CPROTO_ERR_SERVICE;                                            \
+        return CPROTO_ERR_SERVICE;                                          \
     }                                                                       \
                                                                             \
     char dbfn[dbpath_len + max_filename_sz];                                \
     fp = fopen(dbfn, "w");                                                  \
     if (fp == NULL)                                                         \
     {                                                                       \
-        siri_service_request_rollback(dbpath);                                \
+        siri_service_request_rollback(dbpath);                              \
         snprintf(                                                           \
                 err_msg,                                                    \
                 SIRI_MAX_SIZE_ERR_MSG,                                      \
                 "cannot open file for writing: %s",                         \
                 dbfn);                                                      \
-        return CPROTO_ERR_SERVICE;                                            \
+        return CPROTO_ERR_SERVICE;                                          \
     }                                                                       \
                                                                             \
     rc = fputs(DEFAULT_CONF, fp);                                           \
                                                                             \
     if (fclose(fp) || rc < 0)                                               \
     {                                                                       \
-        siri_service_request_rollback(dbpath);                                \
+        siri_service_request_rollback(dbpath);                              \
         snprintf(                                                           \
                 err_msg,                                                    \
                 SIRI_MAX_SIZE_ERR_MSG,                                      \
                 "cannot write file: %s",                                    \
                 dbfn);                                                      \
-        return CPROTO_ERR_SERVICE;                                            \
+        return CPROTO_ERR_SERVICE;                                          \
     }
 
 static cproto_server_t SERVICE_on_new_account(
@@ -141,7 +141,6 @@ static cproto_server_t SERVICE_on_change_password(
         char * err_msg);
 static cproto_server_t SERVICE_on_drop_account(
         qp_unpacker_t * qp_unpacker,
-        qp_obj_t * qp_account,
         char * err_msg);
 static cproto_server_t SERVICE_on_drop_database(
         qp_unpacker_t * qp_unpacker,
@@ -238,7 +237,6 @@ void siri_service_request_destroy(void)
 cproto_server_t siri_service_request(
         int tp,
         qp_unpacker_t * qp_unpacker,
-        qp_obj_t * qp_account,
         qp_packer_t ** packaddr,
         uint16_t pid,
         sirinet_stream_t * client,
@@ -251,7 +249,7 @@ cproto_server_t siri_service_request(
     case SERVICE_CHANGE_PASSWORD:
         return SERVICE_on_change_password(qp_unpacker, err_msg);
     case SERVICE_DROP_ACCOUNT:
-        return SERVICE_on_drop_account(qp_unpacker, qp_account, err_msg);
+        return SERVICE_on_drop_account(qp_unpacker, err_msg);
     case SERVICE_NEW_DATABASE:
         return SERVICE_on_new_database(qp_unpacker, err_msg);
     case SERVICE_NEW_POOL:
@@ -397,7 +395,6 @@ static cproto_server_t SERVICE_on_change_password(
  */
 static cproto_server_t SERVICE_on_drop_account(
         qp_unpacker_t * qp_unpacker,
-        qp_obj_t * qp_account,
         char * err_msg)
 {
     qp_obj_t qp_key, qp_target;
@@ -427,13 +424,11 @@ static cproto_server_t SERVICE_on_drop_account(
         return CPROTO_ERR_SERVICE_INVALID_REQUEST;
     }
 
-    if (qp_target.len == qp_account->len &&
-        strncmp(
-            (const char *) qp_target.via.raw,
-            (const char *) qp_account->via.raw,
-            qp_target.len) == 0)
+    if (siri.accounts->len == 1)
     {
-        sprintf(err_msg, "cannot drop your own account");
+        sprintf(err_msg,
+                "at least one service account is required, "
+                "cannot drop the last service account");
         return CPROTO_ERR_SERVICE;
     }
 
@@ -511,9 +506,9 @@ static cproto_server_t SERVICE_on_drop_database(
     if (!ignore_offline && !siridb_servers_online(siridb))
     {
         sprintf(err_msg,
-                "at least one server is offline, "
+                "at least one server is off-line, "
                 "set `ignore_offline` to true if you want to "
-                "ignore offline servers");
+                "ignore off-line servers");
         return CPROTO_ERR_SERVICE;
     }
 
@@ -802,7 +797,7 @@ static cproto_server_t SERVICE_on_new_replica_or_pool(
     qp_username.tp = QP_HOOK;
     qp_password.tp = QP_HOOK;
 
-    if (!qp_is_map(qp_next(qp_unpacker, NULL)))
+    if (!qp_is_map(qp_next(qp_unpacker, &qp_key)))
     {
         return CPROTO_ERR_SERVICE_INVALID_REQUEST;
     }