From: Jeroen van der Heijden Date: Mon, 20 Jan 2020 12:57:41 +0000 (+0100) Subject: Added services requests X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~5^2~28 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=ea1c15dba0ed94af0bdf56ee6f7af427ba7f394d;p=siridb-server.git Added services requests --- diff --git a/include/siri/api.h b/include/siri/api.h index 24857a99..cad41e58 100644 --- a/include/siri/api.h +++ b/include/siri/api.h @@ -7,6 +7,7 @@ #include #include #include +#include typedef enum { @@ -62,6 +63,7 @@ struct siri_api_request_s siri_api_content_t content_type; siri_api_req_t request_type; service_request_t service_type; + _Bool service_authenticated; http_parser parser; uv_write_t req; }; diff --git a/include/siri/service/account.h b/include/siri/service/account.h index 1de94bf6..8467cb65 100644 --- a/include/siri/service/account.h +++ b/include/siri/service/account.h @@ -27,6 +27,10 @@ int siri_service_account_change_password( qp_obj_t * qp_account, qp_obj_t * qp_password, char * err_msg); +_Bool siri_service_account_check_basic( + siri_t * siri, + const char * data, + size_t n); int siri_service_account_drop( siri_t * siri, qp_obj_t * qp_account, diff --git a/include/siri/service/request.h b/include/siri/service/request.h index cb58fad7..825bf504 100644 --- a/include/siri/service/request.h +++ b/include/siri/service/request.h @@ -26,7 +26,6 @@ void siri_service_request_destroy(void); cproto_server_t siri_service_request( int tp, qp_unpacker_t * qp_unpacker, - qp_obj_t * qp_account, qp_packer_t ** packaddr, uint16_t pid, sirinet_stream_t * client, diff --git a/itest/run_all.py b/itest/run_all.py index 7c67b200..dccabad1 100644 --- a/itest/run_all.py +++ b/itest/run_all.py @@ -2,41 +2,43 @@ from testing import run_test from testing import Server from testing import parse_args +from test_buffer import TestBuffer from test_cluster import TestCluster +from test_compression import TestCompression +from test_create_database import TestCreateDatabase from test_group import TestGroup -from test_list import TestList +from test_http_api import TestHTTPAPI from test_insert import TestInsert +from test_list import TestList +from test_log import TestLog +from test_log import TestLog +from test_parentheses import TestParenth +from test_pipe_support import TestPipeSupport from test_pool import TestPool from test_select import TestSelect from test_select_ns import TestSelectNano from test_series import TestSeries from test_server import TestServer -from test_user import TestUser -from test_compression import TestCompression -from test_log import TestLog -from test_log import TestLog -from test_pipe_support import TestPipeSupport -from test_buffer import TestBuffer from test_tee import TestTee -from test_parentheses import TestParenth -from test_create_database import TestCreateDatabase +from test_user import TestUser if __name__ == '__main__': parse_args() + run_test(TestBuffer()) run_test(TestCompression()) + run_test(TestCreateDatabase()) run_test(TestGroup()) - run_test(TestList()) + run_test(TestHTTPAPI()) run_test(TestInsert()) + run_test(TestList()) + run_test(TestLog()) + run_test(TestParenth()) + run_test(TestPipeSupport()) run_test(TestPool()) run_test(TestSelect()) run_test(TestSelectNano()) run_test(TestSeries()) run_test(TestServer()) - run_test(TestUser()) - run_test(TestLog()) - run_test(TestPipeSupport()) - run_test(TestBuffer()) run_test(TestTee()) - run_test(TestParenth()) - run_test(TestCreateDatabase()) + run_test(TestUser()) diff --git a/itest/test_http_api.py b/itest/test_http_api.py index 7f8f62b4..5967a0b7 100644 --- a/itest/test_http_api.py +++ b/itest/test_http_api.py @@ -1,47 +1,176 @@ import requests import json from testing import gen_points +import asyncio +import functools +import random +import time +import math +import re +from testing import Client +from testing import default_test_setup +from testing import gen_data +from testing import gen_points +from testing import gen_series +from testing import InsertError +from testing import PoolError +from testing import QueryError +from testing import run_test +from testing import Series +from testing import Server +from testing import ServerError +from testing import SiriDB +from testing import TestBase +from testing import UserAuthError +from testing import parse_args TIME_PRECISION = 's' -data = { - 'q': 'select * from "aggr"', -} - -x = requests.post( - f'http://localhost:9020/query/dbtest', - data=json.dumps(data), - auth=('iris', 'siri'), - headers={'Content-Type': 'application/json'} -) - -print(x.status_code) - -if x.status_code == 200: - print(x.json()) -else: - print(x.text) - -series_float = gen_points( - tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m') - -series_int = gen_points( - tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m') - -data = { - 'my_float': series_float, - 'my_int': series_int -} - -x = requests.post( - f'http://localhost:9020/insert/dbtest', - data=json.dumps(data), - auth=('iris', 'siri'), - headers={'Content-Type': 'application/json'} -) - -print(x.status_code) -if x.status_code == 200: - print(x.json()) -else: - print(x.text) + +class TestHTTPAPI(TestBase): + title = 'Test HTTP API requests' + + @default_test_setup(3, time_precision=TIME_PRECISION) + async def run(self): + await self.client0.connect() + + series_float = gen_points( + tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m') + + series_int = gen_points( + tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m') + + data = { + 'my_float': series_float, + 'my_int': series_int + } + + x = requests.post( + f'http://localhost:9020/insert/dbtest', + data=json.dumps(data), + auth=('iris', 'siri'), + headers={'Content-Type': 'application/json'} + ) + + self.assertEqual(x.status_code, 200) + self.assertDictEqual(x.json(), { + 'success_msg': 'Successfully inserted 20000 point(s).'}) + + data = { + 'dbname': 'dbtest', + 'host': 'localhost', + 'port': 9000, + 'username': 'iris', + 'password': 'siri' + } + + x = requests.post( + f'http://localhost:9021/new-pool', + data=json.dumps(data), + auth=('sa', 'siri'), + headers={'Content-Type': 'application/json'}) + + self.assertEqual(x.status_code, 200) + self.assertEqual(x.json(), 'OK') + + self.db.servers.append(self.server1) + await self.assertIsRunning(self.db, self.client0, timeout=30) + + data = {'data': [[1579521271, 10], [1579521573, 20]]} + x = requests.post( + f'http://localhost:9020/insert/dbtest', + json=data, + auth=('iris', 'siri')) + + self.assertEqual(x.status_code, 200) + self.assertDictEqual(x.json(), { + 'success_msg': 'Successfully inserted 2 point(s).'}) + + x = requests.post( + f'http://localhost:9020/query/dbtest', + json={'q': 'select * from "data"'}, + auth=('iris', 'siri')) + + self.assertEqual(x.status_code, 200) + self.assertEqual(x.json(), data) + + x = requests.post( + f'http://localhost:9020/query/dbtest', + json={'q': 'select * from "data"', 't': 'ms'}, + auth=('iris', 'siri')) + + data = { + 'data': [[p[0] * 1000, p[1]] for p in data['data']] + } + + self.assertEqual(x.status_code, 200) + self.assertEqual(x.json(), data) + + x = requests.post( + f'http://localhost:9021/new-account', + json={'account': 't', 'password': ''}, + auth=('sa', 'siri')) + + self.assertEqual(x.status_code, 400) + self.assertEqual(x.json(), {'error_msg': + 'service account name should have at least 2 characters'}) + + x = requests.post( + f'http://localhost:9021/new-account', + json={'account': 'tt', 'password': 'pass'}, + auth=('sa', 'siri')) + + self.assertEqual(x.status_code, 200) + + data = { + 'dbname': 'dbtest', + 'host': 'localhost', + 'port': 1234, + 'pool': 0, + 'username': 'iris', + 'password': 'siri' + } + + auth = ('tt', 'pass') + x = requests.post( + f'http://localhost:9021/new-replica', json=data, auth=auth) + + self.assertEqual(x.status_code, 400) + self.assertEqual(x.json(), { + 'error_msg': "database name already exists: 'dbtest'"}) + + x = requests.post( + f'http://localhost:9022/new-replica', json=data, auth=auth) + self.assertEqual(x.status_code, 401) + + auth = ('sa', 'siri') + x = requests.post( + f'http://localhost:9022/new-replica', json=data, auth=auth) + + self.assertEqual(x.status_code, 400) + self.assertEqual(x.json(), { + 'error_msg': + "connecting to server 'localhost:1234' failed with error: " + "connection refused"}) + + data['port'] = 9000 + x = requests.post( + f'http://localhost:9022/new-replica', json=data, auth=auth) + self.assertEqual(x.status_code, 200) + self.assertEqual(x.json(), 'OK') + + self.db.servers.append(self.server2) + await self.assertIsRunning(self.db, self.client0, timeout=30) + + x = requests.get( + f'http://localhost:9022/get-databases', auth=auth) + self.assertEqual(x.status_code, 200) + self.assertEqual(x.json(), ['dbtest']) + + self.client0.close() + + +if __name__ == '__main__': + parse_args() + run_test(TestHTTPAPI()) + diff --git a/src/siri/api.c b/src/siri/api.c index 45f8c2e8..56315187 100644 --- a/src/siri/api.c +++ b/src/siri/api.c @@ -10,6 +10,7 @@ #include #include #include +#include #define API__HEADER_MAX_SZ 256 @@ -347,6 +348,12 @@ static int api__on_authorization(siri_api_request_t * ar, const char * at, size_ if (api__istarts_with(&at, &n, "basic ", strlen("basic "))) { + if (ar->request_type == SIRI_APT_RT_SERVICE) + { + ar->service_authenticated = siri_service_account_check_basic( + &siri, at, n); + return 0; + } siridb_user_t * user; user = ar->siridb ? siridb_users_get_user_from_basic(ar->siridb, at, n) @@ -684,10 +691,14 @@ static int api__query_cb(http_parser * parser) static int api__service_cb(http_parser * parser) { - api__query_t q; + qp_unpacker_t up; + cproto_server_t res; siri_api_request_t * ar = parser->data; + qp_packer_t * packer = NULL; + sirinet_pkg_t * package; + char err_msg[SIRI_MAX_SIZE_ERR_MSG]; - switch (ar->service_type) + switch ((service_request_t) ar->service_type) { case SERVICE_NEW_ACCOUNT: case SERVICE_CHANGE_PASSWORD: @@ -708,7 +719,56 @@ static int api__service_cb(http_parser * parser) break; } + if (!ar->service_authenticated) + return api__plain_response(ar, E401_UNAUTHORIZED); + + switch (ar->content_type) + { + case SIRI_API_CT_TEXT: + return api__plain_response(ar, E415_UNSUPPORTED_MEDIA_TYPE); + case SIRI_API_CT_JSON: + { + char * dst; + size_t dst_n; + if (qpjson_json_to_qp(ar->buf, ar->len, &dst, &dst_n)) + return api__plain_response(ar, E400_BAD_REQUEST); + free(ar->buf); + ar->buf = dst; + ar->len = dst_n; + break; + } + case SIRI_API_CT_QPACK: + break; + } + qp_unpacker_init(&up, (unsigned char *) ar->buf, ar->len); + + res = siri_service_request( + ar->service_type, + &up, + &packer, + 0, + (sirinet_stream_t *) ar, + err_msg); + + if (res == CPROTO_DEFERRED) + { + sirinet_stream_incref(ar); + } + + package = + (res == CPROTO_DEFERRED) ? NULL : + (res == CPROTO_ERR_SERVICE) ? sirinet_pkg_err( + 0, + strlen(err_msg), + res, + err_msg) : + (res == CPROTO_ACK_SERVICE_DATA) ? sirinet_packer2pkg( + packer, + 0, + res) : sirinet_pkg_new(0, 0, res, NULL); + + return package ? sirinet_pkg_send((sirinet_stream_t *) ar, package) : 0; } static int api__message_complete_cb(http_parser * parser) @@ -730,18 +790,6 @@ static int api__message_complete_cb(http_parser * parser) return api__plain_response(ar, E500_INTERNAL_SERVER_ERROR); } -static int api__chunk_header_cb(http_parser * parser) -{ - LOGC("Chunk header\n Content-Length: %zu", parser->content_length); - return 0; -} - -static int api__chunk_complete_cb(http_parser * parser) -{ - LOGC("Chunk complete\n Content-Length: %zu", parser->content_length); - return 0; -} - static void api__write_free_cb(uv_write_t * req, int status) { free(req->data); @@ -787,8 +835,6 @@ int siri_api_init(void) api__settings.on_header_value = api__header_value_cb; api__settings.on_message_complete = api__message_complete_cb; api__settings.on_body = api__body_cb; - api__settings.on_chunk_header = api__chunk_header_cb; - api__settings.on_chunk_complete = api__chunk_complete_cb; api__settings.on_headers_complete = api__headers_complete_cb; if ( @@ -869,6 +915,16 @@ int siri_api_send( size_t n) { unsigned char * data; + if (n == 0) + { + if (ht != E200_OK) + { + return api__plain_response(ar, ht); + } + src = (unsigned char *) "\x82OK"; + n = 3; + } + if (ar->content_type == SIRI_API_CT_JSON) { size_t tmp_sz; diff --git a/src/siri/net/clserver.c b/src/siri/net/clserver.c index 21c37092..c8324efd 100644 --- a/src/siri/net/clserver.c +++ b/src/siri/net/clserver.c @@ -847,7 +847,6 @@ static void on_req_service(sirinet_stream_t * client, sirinet_pkg_t * pkg) siri_service_request( qp_request.via.int64, &unpacker, - &qp_username, &packer, pkg->pid, client, diff --git a/src/siri/service/account.c b/src/siri/service/account.c index 1f7af030..b0a43a51 100644 --- a/src/siri/service/account.c +++ b/src/siri/service/account.c @@ -8,6 +8,7 @@ #include #include #include +#include #define SIRI_SERVICE_ACCOUNT_SCHEMA 1 #define FILENAME ".accounts.dat" @@ -263,7 +264,7 @@ int siri_service_account_change_password( return -1; } - password= strndup( + password = strndup( (const char *) qp_password->via.raw, qp_password->len); if (password == NULL) @@ -294,6 +295,49 @@ int siri_service_account_change_password( return 0; } +static int ACCOUNT_cmp_str(siri_service_account_t * account, char * str) +{ + return strcmp(account->account, str) == 0; +} + + +_Bool siri_service_account_check_basic( + siri_t * siri, + const char * data, + size_t n) +{ + siri_service_account_t * account; + size_t size, nn, end; + char * b64 = base64_decode(data, n, &size); + _Bool is_valid = false; + char pw[OWCRYPT_SZ]; + + for (nn = 0, end = size; nn < end; ++nn) + { + if (b64[nn] == ':') + { + b64[nn] = '\0'; + + if (++nn > end) + break; + + account = (siri_service_account_t *) llist_get( + siri->accounts, + (llist_cb) ACCOUNT_cmp_str, + b64); + if (account) + { + owcrypt(b64 + nn, account->password, pw); + is_valid = strcmp(pw, account->password) == 0; + } + break; + } + } + + free(b64); + return is_valid; +} + /* * Returns 0 if dropped or -1 in case the account was not found. * diff --git a/src/siri/service/request.c b/src/siri/service/request.c index 421e2987..e73f12ed 100644 --- a/src/siri/service/request.c +++ b/src/siri/service/request.c @@ -60,12 +60,12 @@ "invalid database name: '%.*s'", \ (int) qp_dbname.len, \ qp_dbname.via.raw); \ - return CPROTO_ERR_SERVICE; \ + return CPROTO_ERR_SERVICE; \ } \ \ if (llist_get( \ siri.siridb_list, \ - (llist_cb) SERVICE_find_database, \ + (llist_cb) SERVICE_find_database, \ &qp_dbname) != NULL) \ { \ snprintf( \ @@ -74,7 +74,7 @@ "database name already exists: '%.*s'", \ (int) qp_dbname.len, \ qp_dbname.via.raw); \ - return CPROTO_ERR_SERVICE; \ + return CPROTO_ERR_SERVICE; \ } \ \ dbpath_len = strlen(siri.cfg->default_db_path) + qp_dbname.len + 2; \ @@ -92,7 +92,7 @@ SIRI_MAX_SIZE_ERR_MSG, \ "database directory already exists: %s", \ dbpath); \ - return CPROTO_ERR_SERVICE; \ + return CPROTO_ERR_SERVICE; \ } \ \ if (mkdir(dbpath, 0700) == -1) \ @@ -102,7 +102,7 @@ SIRI_MAX_SIZE_ERR_MSG, \ "cannot create directory: %s", \ dbpath); \ - return CPROTO_ERR_SERVICE; \ + return CPROTO_ERR_SERVICE; \ } \ \ char dbfn[dbpath_len + max_filename_sz]; \ @@ -111,26 +111,26 @@ fp = fopen(dbfn, "w"); \ if (fp == NULL) \ { \ - siri_service_request_rollback(dbpath); \ + siri_service_request_rollback(dbpath); \ snprintf( \ err_msg, \ SIRI_MAX_SIZE_ERR_MSG, \ "cannot open file for writing: %s", \ dbfn); \ - return CPROTO_ERR_SERVICE; \ + return CPROTO_ERR_SERVICE; \ } \ \ rc = fputs(DEFAULT_CONF, fp); \ \ if (fclose(fp) || rc < 0) \ { \ - siri_service_request_rollback(dbpath); \ + siri_service_request_rollback(dbpath); \ snprintf( \ err_msg, \ SIRI_MAX_SIZE_ERR_MSG, \ "cannot write file: %s", \ dbfn); \ - return CPROTO_ERR_SERVICE; \ + return CPROTO_ERR_SERVICE; \ } static cproto_server_t SERVICE_on_new_account( @@ -141,7 +141,6 @@ static cproto_server_t SERVICE_on_change_password( char * err_msg); static cproto_server_t SERVICE_on_drop_account( qp_unpacker_t * qp_unpacker, - qp_obj_t * qp_account, char * err_msg); static cproto_server_t SERVICE_on_drop_database( qp_unpacker_t * qp_unpacker, @@ -238,7 +237,6 @@ void siri_service_request_destroy(void) cproto_server_t siri_service_request( int tp, qp_unpacker_t * qp_unpacker, - qp_obj_t * qp_account, qp_packer_t ** packaddr, uint16_t pid, sirinet_stream_t * client, @@ -251,7 +249,7 @@ cproto_server_t siri_service_request( case SERVICE_CHANGE_PASSWORD: return SERVICE_on_change_password(qp_unpacker, err_msg); case SERVICE_DROP_ACCOUNT: - return SERVICE_on_drop_account(qp_unpacker, qp_account, err_msg); + return SERVICE_on_drop_account(qp_unpacker, err_msg); case SERVICE_NEW_DATABASE: return SERVICE_on_new_database(qp_unpacker, err_msg); case SERVICE_NEW_POOL: @@ -397,7 +395,6 @@ static cproto_server_t SERVICE_on_change_password( */ static cproto_server_t SERVICE_on_drop_account( qp_unpacker_t * qp_unpacker, - qp_obj_t * qp_account, char * err_msg) { qp_obj_t qp_key, qp_target; @@ -427,13 +424,11 @@ static cproto_server_t SERVICE_on_drop_account( return CPROTO_ERR_SERVICE_INVALID_REQUEST; } - if (qp_target.len == qp_account->len && - strncmp( - (const char *) qp_target.via.raw, - (const char *) qp_account->via.raw, - qp_target.len) == 0) + if (siri.accounts->len == 1) { - sprintf(err_msg, "cannot drop your own account"); + sprintf(err_msg, + "at least one service account is required, " + "cannot drop the last service account"); return CPROTO_ERR_SERVICE; } @@ -511,9 +506,9 @@ static cproto_server_t SERVICE_on_drop_database( if (!ignore_offline && !siridb_servers_online(siridb)) { sprintf(err_msg, - "at least one server is offline, " + "at least one server is off-line, " "set `ignore_offline` to true if you want to " - "ignore offline servers"); + "ignore off-line servers"); return CPROTO_ERR_SERVICE; } @@ -802,7 +797,7 @@ static cproto_server_t SERVICE_on_new_replica_or_pool( qp_username.tp = QP_HOOK; qp_password.tp = QP_HOOK; - if (!qp_is_map(qp_next(qp_unpacker, NULL))) + if (!qp_is_map(qp_next(qp_unpacker, &qp_key))) { return CPROTO_ERR_SERVICE_INVALID_REQUEST; }