#include <lib/http_parser.h>
#include <uv.h>
#include <siri/db/db.h>
+#include <siri/service/request.h>
typedef enum
{
siri_api_content_t content_type;
siri_api_req_t request_type;
service_request_t service_type;
+ _Bool service_authenticated;
http_parser parser;
uv_write_t req;
};
qp_obj_t * qp_account,
qp_obj_t * qp_password,
char * err_msg);
+_Bool siri_service_account_check_basic(
+ siri_t * siri,
+ const char * data,
+ size_t n);
int siri_service_account_drop(
siri_t * siri,
qp_obj_t * qp_account,
cproto_server_t siri_service_request(
int tp,
qp_unpacker_t * qp_unpacker,
- qp_obj_t * qp_account,
qp_packer_t ** packaddr,
uint16_t pid,
sirinet_stream_t * client,
from testing import run_test
from testing import Server
from testing import parse_args
+from test_buffer import TestBuffer
from test_cluster import TestCluster
+from test_compression import TestCompression
+from test_create_database import TestCreateDatabase
from test_group import TestGroup
-from test_list import TestList
+from test_http_api import TestHTTPAPI
from test_insert import TestInsert
+from test_list import TestList
+from test_log import TestLog
+from test_log import TestLog
+from test_parentheses import TestParenth
+from test_pipe_support import TestPipeSupport
from test_pool import TestPool
from test_select import TestSelect
from test_select_ns import TestSelectNano
from test_series import TestSeries
from test_server import TestServer
-from test_user import TestUser
-from test_compression import TestCompression
-from test_log import TestLog
-from test_log import TestLog
-from test_pipe_support import TestPipeSupport
-from test_buffer import TestBuffer
from test_tee import TestTee
-from test_parentheses import TestParenth
-from test_create_database import TestCreateDatabase
+from test_user import TestUser
if __name__ == '__main__':
parse_args()
+ run_test(TestBuffer())
run_test(TestCompression())
+ run_test(TestCreateDatabase())
run_test(TestGroup())
- run_test(TestList())
+ run_test(TestHTTPAPI())
run_test(TestInsert())
+ run_test(TestList())
+ run_test(TestLog())
+ run_test(TestParenth())
+ run_test(TestPipeSupport())
run_test(TestPool())
run_test(TestSelect())
run_test(TestSelectNano())
run_test(TestSeries())
run_test(TestServer())
- run_test(TestUser())
- run_test(TestLog())
- run_test(TestPipeSupport())
- run_test(TestBuffer())
run_test(TestTee())
- run_test(TestParenth())
- run_test(TestCreateDatabase())
+ run_test(TestUser())
import requests
import json
from testing import gen_points
+import asyncio
+import functools
+import random
+import time
+import math
+import re
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+from testing import parse_args
TIME_PRECISION = 's'
-data = {
- 'q': 'select * from "aggr"',
-}
-
-x = requests.post(
- f'http://localhost:9020/query/dbtest',
- data=json.dumps(data),
- auth=('iris', 'siri'),
- headers={'Content-Type': 'application/json'}
-)
-
-print(x.status_code)
-
-if x.status_code == 200:
- print(x.json())
-else:
- print(x.text)
-
-series_float = gen_points(
- tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
-
-series_int = gen_points(
- tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
-
-data = {
- 'my_float': series_float,
- 'my_int': series_int
-}
-
-x = requests.post(
- f'http://localhost:9020/insert/dbtest',
- data=json.dumps(data),
- auth=('iris', 'siri'),
- headers={'Content-Type': 'application/json'}
-)
-
-print(x.status_code)
-if x.status_code == 200:
- print(x.json())
-else:
- print(x.text)
+
+class TestHTTPAPI(TestBase):
+ title = 'Test HTTP API requests'
+
+ @default_test_setup(3, time_precision=TIME_PRECISION)
+ async def run(self):
+ await self.client0.connect()
+
+ series_float = gen_points(
+ tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
+
+ series_int = gen_points(
+ tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
+
+ data = {
+ 'my_float': series_float,
+ 'my_int': series_int
+ }
+
+ x = requests.post(
+ f'http://localhost:9020/insert/dbtest',
+ data=json.dumps(data),
+ auth=('iris', 'siri'),
+ headers={'Content-Type': 'application/json'}
+ )
+
+ self.assertEqual(x.status_code, 200)
+ self.assertDictEqual(x.json(), {
+ 'success_msg': 'Successfully inserted 20000 point(s).'})
+
+ data = {
+ 'dbname': 'dbtest',
+ 'host': 'localhost',
+ 'port': 9000,
+ 'username': 'iris',
+ 'password': 'siri'
+ }
+
+ x = requests.post(
+ f'http://localhost:9021/new-pool',
+ data=json.dumps(data),
+ auth=('sa', 'siri'),
+ headers={'Content-Type': 'application/json'})
+
+ self.assertEqual(x.status_code, 200)
+ self.assertEqual(x.json(), 'OK')
+
+ self.db.servers.append(self.server1)
+ await self.assertIsRunning(self.db, self.client0, timeout=30)
+
+ data = {'data': [[1579521271, 10], [1579521573, 20]]}
+ x = requests.post(
+ f'http://localhost:9020/insert/dbtest',
+ json=data,
+ auth=('iris', 'siri'))
+
+ self.assertEqual(x.status_code, 200)
+ self.assertDictEqual(x.json(), {
+ 'success_msg': 'Successfully inserted 2 point(s).'})
+
+ x = requests.post(
+ f'http://localhost:9020/query/dbtest',
+ json={'q': 'select * from "data"'},
+ auth=('iris', 'siri'))
+
+ self.assertEqual(x.status_code, 200)
+ self.assertEqual(x.json(), data)
+
+ x = requests.post(
+ f'http://localhost:9020/query/dbtest',
+ json={'q': 'select * from "data"', 't': 'ms'},
+ auth=('iris', 'siri'))
+
+ data = {
+ 'data': [[p[0] * 1000, p[1]] for p in data['data']]
+ }
+
+ self.assertEqual(x.status_code, 200)
+ self.assertEqual(x.json(), data)
+
+ x = requests.post(
+ f'http://localhost:9021/new-account',
+ json={'account': 't', 'password': ''},
+ auth=('sa', 'siri'))
+
+ self.assertEqual(x.status_code, 400)
+ self.assertEqual(x.json(), {'error_msg':
+ 'service account name should have at least 2 characters'})
+
+ x = requests.post(
+ f'http://localhost:9021/new-account',
+ json={'account': 'tt', 'password': 'pass'},
+ auth=('sa', 'siri'))
+
+ self.assertEqual(x.status_code, 200)
+
+ data = {
+ 'dbname': 'dbtest',
+ 'host': 'localhost',
+ 'port': 1234,
+ 'pool': 0,
+ 'username': 'iris',
+ 'password': 'siri'
+ }
+
+ auth = ('tt', 'pass')
+ x = requests.post(
+ f'http://localhost:9021/new-replica', json=data, auth=auth)
+
+ self.assertEqual(x.status_code, 400)
+ self.assertEqual(x.json(), {
+ 'error_msg': "database name already exists: 'dbtest'"})
+
+ x = requests.post(
+ f'http://localhost:9022/new-replica', json=data, auth=auth)
+ self.assertEqual(x.status_code, 401)
+
+ auth = ('sa', 'siri')
+ x = requests.post(
+ f'http://localhost:9022/new-replica', json=data, auth=auth)
+
+ self.assertEqual(x.status_code, 400)
+ self.assertEqual(x.json(), {
+ 'error_msg':
+ "connecting to server 'localhost:1234' failed with error: "
+ "connection refused"})
+
+ data['port'] = 9000
+ x = requests.post(
+ f'http://localhost:9022/new-replica', json=data, auth=auth)
+ self.assertEqual(x.status_code, 200)
+ self.assertEqual(x.json(), 'OK')
+
+ self.db.servers.append(self.server2)
+ await self.assertIsRunning(self.db, self.client0, timeout=30)
+
+ x = requests.get(
+ f'http://localhost:9022/get-databases', auth=auth)
+ self.assertEqual(x.status_code, 200)
+ self.assertEqual(x.json(), ['dbtest'])
+
+ self.client0.close()
+
+
+if __name__ == '__main__':
+ parse_args()
+ run_test(TestHTTPAPI())
+
#include <qpjson/qpjson.h>
#include <siri/db/query.h>
#include <siri/db/insert.h>
+#include <siri/service/account.h>
#define API__HEADER_MAX_SZ 256
if (api__istarts_with(&at, &n, "basic ", strlen("basic ")))
{
+ if (ar->request_type == SIRI_APT_RT_SERVICE)
+ {
+ ar->service_authenticated = siri_service_account_check_basic(
+ &siri, at, n);
+ return 0;
+ }
siridb_user_t * user;
user = ar->siridb
? siridb_users_get_user_from_basic(ar->siridb, at, n)
static int api__service_cb(http_parser * parser)
{
- api__query_t q;
+ qp_unpacker_t up;
+ cproto_server_t res;
siri_api_request_t * ar = parser->data;
+ qp_packer_t * packer = NULL;
+ sirinet_pkg_t * package;
+ char err_msg[SIRI_MAX_SIZE_ERR_MSG];
- switch (ar->service_type)
+ switch ((service_request_t) ar->service_type)
{
case SERVICE_NEW_ACCOUNT:
case SERVICE_CHANGE_PASSWORD:
break;
}
+ if (!ar->service_authenticated)
+ return api__plain_response(ar, E401_UNAUTHORIZED);
+
+ switch (ar->content_type)
+ {
+ case SIRI_API_CT_TEXT:
+ return api__plain_response(ar, E415_UNSUPPORTED_MEDIA_TYPE);
+ case SIRI_API_CT_JSON:
+ {
+ char * dst;
+ size_t dst_n;
+ if (qpjson_json_to_qp(ar->buf, ar->len, &dst, &dst_n))
+ return api__plain_response(ar, E400_BAD_REQUEST);
+ free(ar->buf);
+ ar->buf = dst;
+ ar->len = dst_n;
+ break;
+ }
+ case SIRI_API_CT_QPACK:
+ break;
+ }
+ qp_unpacker_init(&up, (unsigned char *) ar->buf, ar->len);
+
+ res = siri_service_request(
+ ar->service_type,
+ &up,
+ &packer,
+ 0,
+ (sirinet_stream_t *) ar,
+ err_msg);
+
+ if (res == CPROTO_DEFERRED)
+ {
+ sirinet_stream_incref(ar);
+ }
+
+ package =
+ (res == CPROTO_DEFERRED) ? NULL :
+ (res == CPROTO_ERR_SERVICE) ? sirinet_pkg_err(
+ 0,
+ strlen(err_msg),
+ res,
+ err_msg) :
+ (res == CPROTO_ACK_SERVICE_DATA) ? sirinet_packer2pkg(
+ packer,
+ 0,
+ res) : sirinet_pkg_new(0, 0, res, NULL);
+
+ return package ? sirinet_pkg_send((sirinet_stream_t *) ar, package) : 0;
}
static int api__message_complete_cb(http_parser * parser)
return api__plain_response(ar, E500_INTERNAL_SERVER_ERROR);
}
-static int api__chunk_header_cb(http_parser * parser)
-{
- LOGC("Chunk header\n Content-Length: %zu", parser->content_length);
- return 0;
-}
-
-static int api__chunk_complete_cb(http_parser * parser)
-{
- LOGC("Chunk complete\n Content-Length: %zu", parser->content_length);
- return 0;
-}
-
static void api__write_free_cb(uv_write_t * req, int status)
{
free(req->data);
api__settings.on_header_value = api__header_value_cb;
api__settings.on_message_complete = api__message_complete_cb;
api__settings.on_body = api__body_cb;
- api__settings.on_chunk_header = api__chunk_header_cb;
- api__settings.on_chunk_complete = api__chunk_complete_cb;
api__settings.on_headers_complete = api__headers_complete_cb;
if (
size_t n)
{
unsigned char * data;
+ if (n == 0)
+ {
+ if (ht != E200_OK)
+ {
+ return api__plain_response(ar, ht);
+ }
+ src = (unsigned char *) "\x82OK";
+ n = 3;
+ }
+
if (ar->content_type == SIRI_API_CT_JSON)
{
size_t tmp_sz;
siri_service_request(
qp_request.via.int64,
&unpacker,
- &qp_username,
&packer,
pkg->pid,
client,
#include <siri/siri.h>
#include <stdarg.h>
#include <logger/logger.h>
+#include <base64/base64.h>
#define SIRI_SERVICE_ACCOUNT_SCHEMA 1
#define FILENAME ".accounts.dat"
return -1;
}
- password= strndup(
+ password = strndup(
(const char *) qp_password->via.raw, qp_password->len);
if (password == NULL)
return 0;
}
+static int ACCOUNT_cmp_str(siri_service_account_t * account, char * str)
+{
+ return strcmp(account->account, str) == 0;
+}
+
+
+_Bool siri_service_account_check_basic(
+ siri_t * siri,
+ const char * data,
+ size_t n)
+{
+ siri_service_account_t * account;
+ size_t size, nn, end;
+ char * b64 = base64_decode(data, n, &size);
+ _Bool is_valid = false;
+ char pw[OWCRYPT_SZ];
+
+ for (nn = 0, end = size; nn < end; ++nn)
+ {
+ if (b64[nn] == ':')
+ {
+ b64[nn] = '\0';
+
+ if (++nn > end)
+ break;
+
+ account = (siri_service_account_t *) llist_get(
+ siri->accounts,
+ (llist_cb) ACCOUNT_cmp_str,
+ b64);
+ if (account)
+ {
+ owcrypt(b64 + nn, account->password, pw);
+ is_valid = strcmp(pw, account->password) == 0;
+ }
+ break;
+ }
+ }
+
+ free(b64);
+ return is_valid;
+}
+
/*
* Returns 0 if dropped or -1 in case the account was not found.
*
"invalid database name: '%.*s'", \
(int) qp_dbname.len, \
qp_dbname.via.raw); \
- return CPROTO_ERR_SERVICE; \
+ return CPROTO_ERR_SERVICE; \
} \
\
if (llist_get( \
siri.siridb_list, \
- (llist_cb) SERVICE_find_database, \
+ (llist_cb) SERVICE_find_database, \
&qp_dbname) != NULL) \
{ \
snprintf( \
"database name already exists: '%.*s'", \
(int) qp_dbname.len, \
qp_dbname.via.raw); \
- return CPROTO_ERR_SERVICE; \
+ return CPROTO_ERR_SERVICE; \
} \
\
dbpath_len = strlen(siri.cfg->default_db_path) + qp_dbname.len + 2; \
SIRI_MAX_SIZE_ERR_MSG, \
"database directory already exists: %s", \
dbpath); \
- return CPROTO_ERR_SERVICE; \
+ return CPROTO_ERR_SERVICE; \
} \
\
if (mkdir(dbpath, 0700) == -1) \
SIRI_MAX_SIZE_ERR_MSG, \
"cannot create directory: %s", \
dbpath); \
- return CPROTO_ERR_SERVICE; \
+ return CPROTO_ERR_SERVICE; \
} \
\
char dbfn[dbpath_len + max_filename_sz]; \
fp = fopen(dbfn, "w"); \
if (fp == NULL) \
{ \
- siri_service_request_rollback(dbpath); \
+ siri_service_request_rollback(dbpath); \
snprintf( \
err_msg, \
SIRI_MAX_SIZE_ERR_MSG, \
"cannot open file for writing: %s", \
dbfn); \
- return CPROTO_ERR_SERVICE; \
+ return CPROTO_ERR_SERVICE; \
} \
\
rc = fputs(DEFAULT_CONF, fp); \
\
if (fclose(fp) || rc < 0) \
{ \
- siri_service_request_rollback(dbpath); \
+ siri_service_request_rollback(dbpath); \
snprintf( \
err_msg, \
SIRI_MAX_SIZE_ERR_MSG, \
"cannot write file: %s", \
dbfn); \
- return CPROTO_ERR_SERVICE; \
+ return CPROTO_ERR_SERVICE; \
}
static cproto_server_t SERVICE_on_new_account(
char * err_msg);
static cproto_server_t SERVICE_on_drop_account(
qp_unpacker_t * qp_unpacker,
- qp_obj_t * qp_account,
char * err_msg);
static cproto_server_t SERVICE_on_drop_database(
qp_unpacker_t * qp_unpacker,
cproto_server_t siri_service_request(
int tp,
qp_unpacker_t * qp_unpacker,
- qp_obj_t * qp_account,
qp_packer_t ** packaddr,
uint16_t pid,
sirinet_stream_t * client,
case SERVICE_CHANGE_PASSWORD:
return SERVICE_on_change_password(qp_unpacker, err_msg);
case SERVICE_DROP_ACCOUNT:
- return SERVICE_on_drop_account(qp_unpacker, qp_account, err_msg);
+ return SERVICE_on_drop_account(qp_unpacker, err_msg);
case SERVICE_NEW_DATABASE:
return SERVICE_on_new_database(qp_unpacker, err_msg);
case SERVICE_NEW_POOL:
*/
static cproto_server_t SERVICE_on_drop_account(
qp_unpacker_t * qp_unpacker,
- qp_obj_t * qp_account,
char * err_msg)
{
qp_obj_t qp_key, qp_target;
return CPROTO_ERR_SERVICE_INVALID_REQUEST;
}
- if (qp_target.len == qp_account->len &&
- strncmp(
- (const char *) qp_target.via.raw,
- (const char *) qp_account->via.raw,
- qp_target.len) == 0)
+ if (siri.accounts->len == 1)
{
- sprintf(err_msg, "cannot drop your own account");
+ sprintf(err_msg,
+ "at least one service account is required, "
+ "cannot drop the last service account");
return CPROTO_ERR_SERVICE;
}
if (!ignore_offline && !siridb_servers_online(siridb))
{
sprintf(err_msg,
- "at least one server is offline, "
+ "at least one server is off-line, "
"set `ignore_offline` to true if you want to "
- "ignore offline servers");
+ "ignore off-line servers");
return CPROTO_ERR_SERVICE;
}
qp_username.tp = QP_HOOK;
qp_password.tp = QP_HOOK;
- if (!qp_is_map(qp_next(qp_unpacker, NULL)))
+ if (!qp_is_map(qp_next(qp_unpacker, &qp_key)))
{
return CPROTO_ERR_SERVICE_INVALID_REQUEST;
}