#include <assert.h>
#include <qpjson/qpjson.h>
#include <qpack/qpack.h>
+#include <logger/logger.h>
static yajl_gen_status qp__to_json(yajl_gen g, qp_unpacker_t * up, qp_obj_t * obj)
{
return yajl_gen_in_error_state;
}
-
yajl_gen_status qpjson_qp_to_json(
void * src,
size_t src_n,
yajl_gen g;
yajl_status stat;
- assert (src_n);
+ if (src_n == 0)
+ {
+ *dst_n = 0;
+ *dst = NULL;
+ return yajl_gen_status_ok;
+ }
+
qp_unpacker_init(&up, src, src_n);
g = yajl_gen_alloc(NULL);
return stat;
}
+static int reformat_null(void * ctx)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == qp_add_null(pk);
+}
+
+static int reformat_boolean(void * ctx, int boolean)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == (boolean ? qp_add_true(pk) : qp_add_false(pk));
+}
+
+static int reformat_integer(void * ctx, long long i)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == qp_add_int64(pk, i);
+}
+
+static int reformat_double(void * ctx, double d)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == qp_add_double(pk, d);
+}
+
+static int reformat_string(void * ctx, const unsigned char * s, size_t n)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == qp_add_raw(pk, s, n);
+}
+
+static int reformat_map_key(void * ctx, const unsigned char * s, size_t n)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == qp_add_raw(pk, s, n);
+}
+
+static int reformat_start_map(void * ctx)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == qp_add_type(pk, QP_MAP_OPEN);
+}
+
+
+static int reformat_end_map(void * ctx)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == qp_add_type(pk, QP_MAP_CLOSE);
+}
+
+static int reformat_start_array(void * ctx)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == qp_add_type(pk, QP_ARRAY_OPEN);
+}
+
+static int reformat_end_array(void * ctx)
+{
+ qp_packer_t * pk = (qp_packer_t *) ctx;
+ return 0 == qp_add_type(pk, QP_ARRAY_CLOSE);
+}
+
+static void take_buffer(
+ qp_packer_t * pk,
+ char ** dst,
+ size_t * dst_n)
+{
+ *dst = (char *) pk->buffer;
+ *dst_n = pk->len;
+ pk->buffer = NULL;
+ pk->buffer_size = 0;
+}
+
+static yajl_callbacks qpjson__callbacks = {
+ reformat_null,
+ reformat_boolean,
+ reformat_integer,
+ reformat_double,
+ NULL,
+ reformat_string,
+ reformat_start_map,
+ reformat_map_key,
+ reformat_end_map,
+ reformat_start_array,
+ reformat_end_array
+};
yajl_status qpjson_json_to_qp(
const void * src,
{
yajl_handle hand;
yajl_status stat = yajl_status_error;
- qp_packer_t * packer = qp_packer_new(src_n);
- if (!packer)
+ qp_packer_t * pk = qp_packer_new(src_n);
+
+ if (pk == NULL)
return stat;
-// hand = yajl_alloc(&callbacks, NULL, c);
-// if (!hand)
-// goto fail1;
-//
-// stat = yajl_parse(hand, src, src_n);
-// if (stat == yajl_status_ok)
-// take_buffer(&buffer, dst, dst_n);
-// yajl_free(hand);
- qp_packer_free(packer);
+ hand = yajl_alloc(&qpjson__callbacks, NULL, pk);
+ if (!hand)
+ goto fail1;
+
+ stat = yajl_parse(hand, src, src_n);
+
+ if (stat == yajl_status_ok)
+ take_buffer(pk, dst, dst_n);
+
+ yajl_free(hand);
+
+fail1:
+ qp_packer_free(pk);
return stat;
}
/*
* api.c
*/
+#include <string.h>
#include <siri/api.h>
#include <assert.h>
+#include <math.h>
#include <siri/siri.h>
#include <siri/db/users.h>
#include <qpjson/qpjson.h>
#include <siri/db/query.h>
+#include <siri/db/insert.h>
#define API__HEADER_MAX_SZ 256
-#define CONTENT_TYPE_JSON "application/json"
static uv_tcp_t api__uv_server;
static http_parser_settings api__settings;
+typedef struct
+{
+ char * query;
+ size_t query_n;
+ double factor;
+} api__query_t;
+
#define API__ICMP_WITH(__s, __n, __w) \
- __n == strlen(__w) && strncasecmp(__s, __w, __n) == 0
+ (__n == strlen(__w) && strncasecmp(__s, __w, __n) == 0)
-static const char api__content_type[2][20] = {
+static const char api__content_type[3][20] = {
"text/plain",
"application/json",
+ "application/qpack",
};
-static const char api__html_header[8][32] = {
+static const char api__html_header[10][32] = {
"200 OK",
"400 Bad Request",
"401 Unauthorized",
"403 Forbidden",
"404 Not Found",
+ "405 Method Not Allowed",
"415 Unsupported Media Type",
+ "422 Unprocessable Entity",
"500 Internal Server Error",
"503 Service Unavailable",
};
-static const char api__default_body[8][30] = {
+static const char api__default_body[10][30] = {
"OK\r\n",
"BAD REQUEST\r\n",
"UNAUTHORIZED\r\n",
"FORBIDDEN\r\n",
"NOT FOUND\r\n",
+ "METHOD NOT ALLOWED\r\n",
"UNSUPPORTED MEDIA TYPE\r\n",
+ "UNPROCESSABLE ENTITY\r\n",
"INTERNAL SERVER ERROR\r\n",
"SERVICE UNAVAILABLE\r\n",
};
-typedef enum
-{
- E200_OK,
- E400_BAD_REQUEST,
- E401_UNAUTHORIZED,
- E403_FORBIDDEN,
- E404_NOT_FOUND,
- E415_UNSUPPORTED_MEDIA_TYPE,
- E500_INTERNAL_SERVER_ERROR,
- E503_SERVICE_UNAVAILABLE
-} api__header_t;
inline static int api__header(
char * ptr,
- const api__header_t ht,
- const siridb_api_content_t ct,
+ const siri_api_header_t ht,
+ const siri_api_content_t ct,
size_t content_length)
{
int len = sprintf(
return len;
}
-static inline _Bool api__starts_with(
+static inline _Bool api__istarts_with(
const char ** str,
size_t * strn,
const char * with,
return true;
}
+static inline _Bool api__starts_with(
+ const char ** str,
+ size_t * strn,
+ const char * with,
+ size_t withn)
+{
+ if (*strn < withn || strncmp(*str, with, withn))
+ {
+ return false;
+ }
+ *str += withn;
+ *strn -= withn;
+ return true;
+}
+
static void api__alloc_cb(
uv_handle_t * UNUSED_handle __attribute__((unused)),
size_t UNUSED_sugsz __attribute__((unused)),
assert (!ar->buf);
ar->buf = malloc(parser->content_length);
- if (ar->len)
+ if (ar->buf)
{
ar->len = parser->content_length;
}
return 0;
}
+static void api__get_siridb(siri_api_request_t * ar, const char * at, size_t n)
+{
+ const char * pt = at;
+ size_t nn = 0;
+
+ while (n && *pt != '?')
+ {
+ ++pt;
+ --n;
+ ++nn;
+ }
+
+ ar->siridb = siridb_getn(siri.siridb_list, at, nn);
+ if (ar->siridb)
+ {
+ siridb_incref(ar->siridb);
+ }
+}
+
static int api__url_cb(http_parser * parser, const char * at, size_t n)
{
siri_api_request_t * ar = parser->data;
if (api__starts_with(&at, &n, "/query/", strlen("/query/")))
{
- ar->siridb = siridb_getn(siri.siridb_list, at, n);
- if (ar->siridb)
- {
- siridb_incref(ar->siridb);
- }
+ ar->request_type = SIRI_API_RT_QUERY;
+ api__get_siridb(ar, at, n);
+ }
+ else if (api__starts_with(&at, &n, "/insert/", strlen("/insert/")))
+ {
+ ar->request_type = SIRI_API_RT_INSERT;
+ api__get_siridb(ar, at, n);
}
return 0;
static int api__on_content_type(siri_api_request_t * ar, const char * at, size_t n)
{
- if (API__ICMP_WITH(at, n, CONTENT_TYPE_JSON))
+ if (API__ICMP_WITH(at, n, api__content_type[SIRI_API_CT_JSON]) ||
+ API__ICMP_WITH(at, n, "text/json"))
{
- ar->content_type = SIRIDB_API_CT_JSON;
+ ar->content_type = SIRI_API_CT_JSON;
return 0;
}
- if (API__ICMP_WITH(at, n, "text/json"))
+ if (API__ICMP_WITH(at, n, api__content_type[SIRI_API_CT_QPACK]) ||
+ API__ICMP_WITH(at, n, "application/x-qpack"))
{
- ar->content_type = SIRIDB_API_CT_JSON;
+ ar->content_type = SIRI_API_CT_QPACK;
return 0;
}
static int api__on_authorization(siri_api_request_t * ar, const char * at, size_t n)
{
- if (api__starts_with(&at, &n, "token ", strlen("token ")))
+ if (api__istarts_with(&at, &n, "bearer ", strlen("bearer ")))
{
log_debug("token authorization is not supported yet");
}
- if (api__starts_with(&at, &n, "basic ", strlen("basic ")))
+ if (api__istarts_with(&at, &n, "basic ", strlen("basic ")))
{
siridb_user_t * user;
- user = siridb_users_get_user_from_basic(ar->siridb, at, n);
+ user = ar->siridb
+ ? siridb_users_get_user_from_basic(ar->siridb, at, n)
+ : NULL;
if (user)
{
sirinet_stream_decref((siri_api_request_t *) req->handle->data);
}
-static int api__plain_response(siri_api_request_t * ar, const api__header_t ht)
+static int api__plain_response(
+ siri_api_request_t * ar,
+ const siri_api_header_t ht)
{
const char * body = api__default_body[ht];
char header[API__HEADER_MAX_SZ];
int header_size;
body_size = strlen(body);
- header_size = api__header(header, ht, SIRIDB_API_CT_TEXT, body_size);
+ header_size = api__header(header, ht, SIRI_API_CT_TEXT, body_size);
if (header_size > 0)
{
return -1;
}
-static int api__query(siri_api_request_t * ar)
+static int api__query(siri_api_request_t * ar, api__query_t * q)
{
- const char q[100] = "select * from 'aggr'";
+ sirinet_stream_incref(ar);
siridb_query_run(
0,
(sirinet_stream_t *) ar,
- q, strlen(q),
- 0.0,
+ q->query,
+ q->query_n,
+ q->factor,
SIRIDB_QUERY_FLAG_MASTER);
return 0;
}
-static int api__message_complete_cb(http_parser * parser)
+static int api__read_factor(
+ siridb_timep_t precision,
+ qp_unpacker_t * up,
+ api__query_t * q)
{
- siri_api_request_t * ar = parser->data;
+ qp_obj_t obj;
+
+ if (!qp_is_raw(qp_next(up, &obj)))
+ return -1;
+
+ if (obj.len == 1 && obj.via.raw[0] == 's')
+ {
+ q->factor = pow(1000.0, SIRIDB_TIME_SECONDS - precision);
+ return 0;
+ }
+
+ if (obj.len == 2 && obj.via.raw[1] == 's')
+ {
+ switch (obj.via.raw[0])
+ {
+ case 'm':
+ q->factor = pow(1000.0, SIRIDB_TIME_MILLISECONDS - precision);
+ return 0;
+ case 'u':
+ q->factor = pow(1000.0, SIRIDB_TIME_MICROSECONDS - precision);
+ return 0;
+ case 'n':
+ q->factor = pow(1000.0, SIRIDB_TIME_NANOSECONDS - precision);
+ return 0;
+ }
+ }
+ return -1;
+}
+
+static int api__query_from_qp(api__query_t * q, siri_api_request_t * ar)
+{
+ qp_obj_t obj;
+ qp_unpacker_t up;
+ qp_unpacker_init(&up, (unsigned char *) ar->buf, ar->len);
+
+ q->factor = 0.0;
+ q->query = NULL;
+
+ if (!qp_is_map(qp_next(&up, &obj)))
+ {
+ return api__plain_response(ar, E400_BAD_REQUEST);
+ }
+
+ while (qp_next(&up, &obj) && !qp_is_close(obj.tp))
+ {
+ if (!qp_is_raw(obj.tp) || obj.len != 1)
+ {
+ return api__plain_response(ar, E400_BAD_REQUEST);
+ }
+
+ switch(*obj.via.str)
+ {
+ case 't':
+ if (api__read_factor(ar->siridb->time->precision, &up, q))
+ {
+ return api__plain_response(ar, E400_BAD_REQUEST);
+ }
+ break;
+ case 'q':
+ if (!qp_is_raw(qp_next(&up, &obj)))
+ {
+ return api__plain_response(ar, E400_BAD_REQUEST);
+ }
+ q->query = obj.via.str;
+ q->query_n = obj.len;
+ break;
+ default:
+ return api__plain_response(ar, E400_BAD_REQUEST);
+ }
+ }
+
+ return q->query == NULL
+ ? api__plain_response(ar, E400_BAD_REQUEST)
+ : api__query(ar, q);
+}
+
+static int api__insert_from_qp(siri_api_request_t * ar)
+{
+ qp_unpacker_t unpacker;
+ qp_unpacker_init(&unpacker, (unsigned char *) ar->buf, ar->len);
+
+ siridb_insert_t * insert = siridb_insert_new(
+ ar->siridb,
+ 0,
+ (sirinet_stream_t *) ar);
+
+ if (insert == NULL)
+ {
+ return api__plain_response(ar, E500_INTERNAL_SERVER_ERROR);
+ }
+
+ ssize_t rc = siridb_insert_assign_pools(
+ ar->siridb,
+ &unpacker,
+ insert->packer);
+
+ switch ((siridb_insert_err_t) rc)
+ {
+ case ERR_EXPECTING_ARRAY:
+ case ERR_EXPECTING_SERIES_NAME:
+ case ERR_EXPECTING_MAP_OR_ARRAY:
+ case ERR_EXPECTING_INTEGER_TS:
+ case ERR_TIMESTAMP_OUT_OF_RANGE:
+ case ERR_UNSUPPORTED_VALUE:
+ case ERR_EXPECTING_AT_LEAST_ONE_POINT:
+ case ERR_EXPECTING_NAME_AND_POINTS:
+ case ERR_INCOMPATIBLE_SERVER_VERSION:
+ case ERR_MEM_ALLOC:
+ {
+ /* something went wrong, get correct err message */
+ const char * err_msg = siridb_insert_err_msg(rc);
+
+ log_error("Insert error: '%s' at position %lu",
+ err_msg, unpacker.pt - (unsigned char *) ar->buf);
+
+ /* create and send package */
+ sirinet_pkg_t * package = sirinet_pkg_err(
+ 0,
+ strlen(err_msg),
+ CPROTO_ERR_INSERT,
+ err_msg);
+
+ if (package != NULL)
+ {
+ /* ignore result code, signal can be raised */
+ sirinet_pkg_send((sirinet_stream_t *) ar, package);
+ }
+ }
+
+ /* error, free insert */
+ siridb_insert_free(insert);
+ break;
+
+ default:
+ if (siridb_insert_points_to_pools(insert, (size_t) rc))
+ {
+ siridb_insert_free(insert); /* signal is raised */
+ }
+ else
+ {
+ /* extra increment for the insert task */
+ sirinet_stream_incref(ar);
+ }
+ break;
+ }
+ return 0;
+}
+
+static int api__insert_cb(siri_api_request_t * ar)
+{
+ assert (ar->request_type == SIRI_API_RT_INSERT);
if (!ar->siridb)
return api__plain_response(ar, E404_NOT_FOUND);
if (!ar->origin)
return api__plain_response(ar, E401_UNAUTHORIZED);
+
+ if (!(((siridb_user_t *) ar->origin)->access_bit & SIRIDB_ACCESS_INSERT))
+ return api__plain_response(ar, E403_FORBIDDEN);
+
+ if ((
+ ar->siridb->server->flags != SERVER_FLAG_RUNNING &&
+ ar->siridb->server->flags != SERVER_FLAG_RUNNING + SERVER_FLAG_REINDEXING
+ ) ||
+ !siridb_pools_accessible(ar->siridb))
+ return api__plain_response(ar, E503_SERVICE_UNAVAILABLE);
+
switch (ar->content_type)
{
- case SIRIDB_API_CT_TEXT:
+ case SIRI_API_CT_TEXT:
/* Or, shall we allow text and return we some sort of CSV format? */
- return api__plain_response(ar, E400_BAD_REQUEST);
- case SIRIDB_API_CT_JSON:
- return api__query(ar);
+ break;
+ case SIRI_API_CT_JSON:
+ {
+ char * dst;
+ size_t dst_n;
+ if (qpjson_json_to_qp(ar->buf, ar->len, &dst, &dst_n))
+ return api__plain_response(ar, E400_BAD_REQUEST);
+ free(ar->buf);
+ ar->buf = dst;
+ ar->len = dst_n;
+ }
+ /* fall through */
+ case SIRI_API_CT_QPACK:
+ return api__insert_from_qp(ar);
+ }
+
+ return api__plain_response(ar, E415_UNSUPPORTED_MEDIA_TYPE);
+}
+
+static int api__query_cb(siri_api_request_t * ar)
+{
+ api__query_t q;
+ assert (ar->request_type == SIRI_API_RT_QUERY);
+
+ if (!ar->siridb)
+ return api__plain_response(ar, E404_NOT_FOUND);
+
+ if (!ar->origin)
+ return api__plain_response(ar, E401_UNAUTHORIZED);
+
+ switch (ar->content_type)
+ {
+ case SIRI_API_CT_TEXT:
+ /* Or, shall we allow text and return we some sort of CSV format? */
+ break;
+ case SIRI_API_CT_JSON:
+ {
+ char * dst;
+ size_t dst_n;
+ if (qpjson_json_to_qp(ar->buf, ar->len, &dst, &dst_n))
+ return api__plain_response(ar, E400_BAD_REQUEST);
+ free(ar->buf);
+ ar->buf = dst;
+ ar->len = dst_n;
+ }
+ /* fall through */
+ case SIRI_API_CT_QPACK:
+ return api__query_from_qp(&q, ar);
+ }
+
+ return api__plain_response(ar, E415_UNSUPPORTED_MEDIA_TYPE);
+}
+
+static int api__message_complete_cb(http_parser * parser)
+{
+ siri_api_request_t * ar = parser->data;
+
+ if (parser->method != HTTP_POST)
+ return api__plain_response(ar, E405_METHOD_NOT_ALLOWED);
+
+ switch(ar->request_type)
+ {
+ case SIRI_API_RT_NONE:
+ return api__plain_response(ar, E404_NOT_FOUND);
+ case SIRI_API_RT_QUERY:
+ return api__query_cb(ar);
+ case SIRI_API_RT_INSERT:
+ return api__insert_cb(ar);
}
return api__plain_response(ar, E500_INTERNAL_SERVER_ERROR);
api__write_cb(req, status);
}
-static int api__close_resp(siri_api_request_t * ar, void * data, size_t size)
+static int api__close_resp(
+ siri_api_request_t * ar,
+ const siri_api_header_t ht,
+ void * data,
+ size_t size)
{
char header[API__HEADER_MAX_SZ];
int header_size = 0;
- header_size = api__header(header, E200_OK, ar->content_type, size);
+ header_size = api__header(header, ht, ar->content_type, size);
uv_buf_t uvbufs[2] = {
uv_buf_init(header, (unsigned int) header_size),
return 0;
}
-int siri_api_send(siri_api_request_t * ar, unsigned char * src, size_t n)
+#define API__DUP(__data, __n, __s) \
+ do { \
+ __n = strlen("{\"error_msg\":\""__s"\"}"); \
+ __data = strndup("{\"error_msg\":\""__s"\"}", __n); \
+ if (!__data) __n = 0; \
+ } while(0)
+
+static void api__yajl_parse_error(
+ char ** str,
+ size_t * size,
+ siri_api_header_t * ht,
+ yajl_gen_status stat)
+{
+ switch (stat)
+ {
+ case yajl_gen_status_ok:
+ return;
+ case yajl_gen_keys_must_be_strings:
+ *ht = E400_BAD_REQUEST;
+ API__DUP(*str, *size, "JSON keys must be strings");
+ return;
+ case yajl_max_depth_exceeded:
+ *ht = E400_BAD_REQUEST;
+ API__DUP(*str, *size, "JSON max depth exceeded");
+ return;
+ case yajl_gen_in_error_state:
+ *ht = E500_INTERNAL_SERVER_ERROR;
+ API__DUP(*str, *size, "JSON general error");
+ return;
+ case yajl_gen_generation_complete:
+ *ht = E500_INTERNAL_SERVER_ERROR;
+ API__DUP(*str, *size, "JSON completed");
+ return;
+ case yajl_gen_invalid_number:
+ *ht = E400_BAD_REQUEST;
+ API__DUP(*str, *size, "JSON invalid number");
+ return;
+ case yajl_gen_no_buf:
+ *ht = E500_INTERNAL_SERVER_ERROR;
+ API__DUP(*str, *size, "JSON no buffer has been set");
+ return;
+ case yajl_gen_invalid_string:
+ *ht = E400_BAD_REQUEST;
+ API__DUP(*str, *size, "JSON only accepts valid UTF8 strings");
+ return;
+ }
+ *ht = E500_INTERNAL_SERVER_ERROR;
+ API__DUP(*str, *size, "JSON unexpected error");
+}
+
+int siri_api_send(
+ siri_api_request_t * ar,
+ siri_api_header_t ht,
+ unsigned char * src,
+ size_t n)
{
unsigned char * data;
- if (ar->content_type == SIRIDB_API_CT_JSON)
+ if (ar->content_type == SIRI_API_CT_JSON)
{
size_t tmp_sz;
yajl_gen_status stat = qpjson_qp_to_json(
&tmp_sz,
0);
if (stat)
- {
- // api__set_yajl_gen_status_error(&ar->e, stat);
- // return ti_api_close_with_err(ar, &ar->e);
- // TODO : return error
- LOGC("HERE: %d", stat);
- }
+ api__yajl_parse_error((char **) &data, &tmp_sz, &ht, stat);
+
n = tmp_sz;
}
else
{
+ assert (ar->content_type == SIRI_API_CT_QPACK);
data = malloc(n);
if (data == NULL)
{
- // TODO : return error
+ ht = E500_INTERNAL_SERVER_ERROR;
+ n = 0;
+ }
+ else
+ {
+ memcpy(data, src, n);
}
- memcpy(data, src, n);
}
- return api__close_resp(ar, data, n);
+
+ return api__close_resp(ar, ht, data, n);
}