Change compression so it's streamed. Decompression is still on a chunk-by-
authorJonathan Dieter <jdieter@gmail.com>
Sat, 24 Mar 2018 14:54:19 +0000 (16:54 +0200)
committerJonathan Dieter <jdieter@gmail.com>
Sat, 24 Mar 2018 14:54:19 +0000 (16:54 +0200)
chunk basis.

Signed-off-by: Jonathan Dieter <jdieter@gmail.com>
include/zck.h
src/lib/comp/comp.c
src/lib/comp/nocomp/nocomp.c
src/lib/comp/zstd/zstd.c
src/lib/index/index_create.c
src/lib/zck.c
src/lib/zck_private.h
src/zck.c

index 193e4e69cf2182776dfd2071d8a468460baa3340..59d833d08d7350b992ce7b07813772f1341eba3f 100644 (file)
@@ -118,6 +118,8 @@ int zck_comp_init(zckCtx *zck);
 int zck_comp_close(zckCtx *zck);
 /* Compress data src of size src_size, and write to chunk */
 int zck_compress(zckCtx *zck, const char *src, const size_t src_size);
+/* Finish compressing chunk */
+int zck_end_chunk(zckCtx *zck);
 /* Decompress data src of size src_size, and write to dst, while setting
  * dst_size */
 int zck_decompress(zckCtx *zck, const char *src, const size_t src_size,
index 11c75062711518fe1232b07ef1f3f63022ef8464..59e1bd5a2e19935430b621d810e0ab0287acee85 100644 (file)
 #endif
 
 #define BLK_SIZE 32768
+#define VALIDATE(f)     if(!f) { \
+                            zck_log(ZCK_LOG_ERROR, "zckCtx not initialized\n"); \
+                            return False; \
+                        }
 
 static char unknown[] = "Unknown(\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0";
 
@@ -46,6 +50,8 @@ const static char *COMP_NAME[] = {
 };
 
 int zck_comp_init(zckCtx *zck) {
+    VALIDATE(zck);
+
     zckComp *comp = &(zck->comp);
     char *dst = NULL;
     size_t dst_size = 0;
@@ -66,12 +72,22 @@ int zck_comp_init(zckCtx *zck) {
         if(!zck->comp.compress(comp, zck->comp.dict, zck->comp.dict_size, &dst,
                                &dst_size, 0))
             return False;
-
         if(!zck_write(zck->temp_fd, dst, dst_size)) {
             free(dst);
             return False;
         }
         zck_index_add_to_chunk(zck, dst, dst_size, zck->comp.dict_size);
+        free(dst);
+        dst = NULL;
+        dst_size = 0;
+
+        if(!zck->comp.end_chunk(comp, &dst, &dst_size, 0))
+            return False;
+        if(!zck_write(zck->temp_fd, dst, dst_size)) {
+            free(dst);
+            return False;
+        }
+        zck_index_add_to_chunk(zck, dst, dst_size, 0);
         zck_index_finish_chunk(zck);
         free(dst);
     }
@@ -82,9 +98,7 @@ int zck_comp_init(zckCtx *zck) {
 }
 
 int zck_compress(zckCtx *zck, const char *src, const size_t src_size) {
-    zckComp *comp = &(zck->comp);
-    char *dst;
-    size_t dst_size;
+    VALIDATE(zck);
 
     if(!zck->comp.started) {
         zck_log(ZCK_LOG_ERROR, "Compression hasn't been initialized yet\n");
@@ -94,21 +108,61 @@ int zck_compress(zckCtx *zck, const char *src, const size_t src_size) {
     if(src_size == 0)
         return True;
 
-    if(!zck->comp.compress(comp, src, src_size, &dst, &dst_size, 1))
+    char *dst = NULL;
+    size_t dst_size = 0;
+    if(!zck->comp.compress(&(zck->comp), src, src_size, &dst, &dst_size, 1))
         return False;
+    if(dst_size > 0 && !zck_write(zck->temp_fd, dst, dst_size)) {
+        free(dst);
+        return False;
+    }
+    if(!zck_index_add_to_chunk(zck, dst, dst_size, src_size)) {
+        free(dst);
+        return False;
+    }
+    free(dst);
+    return True;
+}
 
-    if(!zck_write(zck->temp_fd, dst, dst_size)) {
+int zck_end_chunk(zckCtx *zck) {
+    VALIDATE(zck);
+
+    if(!zck->comp.started) {
+        zck_log(ZCK_LOG_ERROR, "Compression hasn't been initialized yet\n");
+        return False;
+    }
+
+    /* No point in compressing empty data */
+    if(zck->comp.data_size == 0) {
+        if(!zck_index_finish_chunk(zck))
+            return False;
+        return True;
+    }
+
+    char *dst = NULL;
+    size_t dst_size = 0;
+    if(!zck->comp.end_chunk(&(zck->comp), &dst, &dst_size, 1))
+        return False;
+    if(dst_size > 0 && !zck_write(zck->temp_fd, dst, dst_size)) {
+        free(dst);
+        return False;
+    }
+    if(!zck_index_add_to_chunk(zck, dst, dst_size, 0)) {
+        free(dst);
+        return False;
+    }
+    if(!zck_index_finish_chunk(zck)) {
         free(dst);
         return False;
     }
-    zck_index_add_to_chunk(zck, dst, dst_size, src_size);
-    zck_index_finish_chunk(zck);
     free(dst);
     return True;
 }
 
 int zck_decompress(zckCtx *zck, const char *src, const size_t src_size,
                    char **dst, size_t dst_size) {
+    VALIDATE(zck);
+
     zckComp *comp = &(zck->comp);
     *dst = NULL;
 
@@ -127,14 +181,17 @@ int zck_decompress(zckCtx *zck, const char *src, const size_t src_size,
 }
 
 int zck_comp_close(zckCtx *zck) {
-    if(zck->comp.cctx) {
-        zck->comp.started = 0;
-        return zck->comp.close(&(zck->comp));
-    }
-    return True;
+    VALIDATE(zck);
+
+    zck->comp.started = 0;
+    if(zck->comp.close == NULL)
+        return True;
+    return zck->comp.close(&(zck->comp));
 }
 
 int zck_set_compression_type(zckCtx *zck, int type) {
+    VALIDATE(zck);
+
     zckComp *comp = &(zck->comp);
 
     /* Cannot change compression type after compression has started */
@@ -163,6 +220,8 @@ int zck_set_compression_type(zckCtx *zck, int type) {
 }
 
 int zck_set_comp_parameter(zckCtx *zck, int option, void *value) {
+    VALIDATE(zck);
+
     /* Cannot change compression parameters after compression has started */
     if(zck && zck->comp.started) {
         zck_log(ZCK_LOG_ERROR,
index 11cb77c983d163c655b64fd87b76ef7c3024a0b5..0b672066d634325ea5d50c9955b46cf240edbb4b 100644 (file)
@@ -35,8 +35,16 @@ static int zck_nocomp_init(zckComp *comp) {
     return True;
 }
 
+static int zck_nocomp_end_chunk(zckComp *comp, char **dst, size_t *dst_size,
+                                int use_dict) {
+    *dst = NULL;
+    *dst_size = 0;
+
+    return True;
+}
+
 static int zck_nocomp_comp(zckComp *comp, const char *src, const size_t src_size,
-                        char **dst, size_t *dst_size, int use_dict) {
+                           char **dst, size_t *dst_size, int use_dict) {
     *dst = zmalloc(src_size);
     if(dst == NULL) {
         zck_log(ZCK_LOG_ERROR, "Unable to allocate %lu bytes\n", src_size);
@@ -49,8 +57,9 @@ static int zck_nocomp_comp(zckComp *comp, const char *src, const size_t src_size
     return True;
 }
 
-static int zck_nocomp_decomp(zckComp *comp, const char *src, const size_t src_size,
-                        char **dst, size_t dst_size, int use_dict) {
+static int zck_nocomp_decomp(zckComp *comp, const char *src,
+                             const size_t src_size, char **dst, size_t dst_size,
+                             int use_dict) {
     *dst = zmalloc(src_size);
     if(dst == NULL) {
         zck_log(ZCK_LOG_ERROR, "Unable to allocate %lu bytes\n", src_size);
@@ -82,6 +91,7 @@ int zck_nocomp_setup(zckComp *comp) {
     comp->init = zck_nocomp_init;
     comp->set_parameter = zck_nocomp_set_parameter;
     comp->compress = zck_nocomp_comp;
+    comp->end_chunk = zck_nocomp_end_chunk;
     comp->decompress = zck_nocomp_decomp;
     comp->close = zck_nocomp_close;
     comp->type = ZCK_COMP_NONE;
index 924f6d69af6213202fb365f34b0534ec699b42f1..51a9ed2c36888c5036c0b7b4eaebc50eef85dd85 100644 (file)
 
 #include <stdlib.h>
 #include <stdint.h>
+#include <string.h>
 #include <zstd.h>
 #include <zck.h>
 
 #include "zck_private.h"
 
-static int zck_zstd_init(zckComp *comp) {
+#define VALIDATE(f)     if(!f) { \
+                            zck_log(ZCK_LOG_ERROR, "zckComp not initialized\n"); \
+                            return False; \
+                        }
+
+static int init(zckComp *comp) {
     comp->cctx = ZSTD_createCCtx();
     comp->dctx = ZSTD_createDCtx();
     if(comp->dict && comp->dict_size > 0) {
@@ -52,10 +58,31 @@ static int zck_zstd_init(zckComp *comp) {
     return True;
 }
 
-static int zck_zstd_compress(zckComp *comp, const char *src,
-                             const size_t src_size, char **dst,
-                             size_t *dst_size, int use_dict) {
-    size_t max_size = ZSTD_compressBound(src_size);
+/* The zstd compression format doesn't allow streaming compression with a dict
+ * unless you statically link to it.  If we have a dict, we do pseudo-streaming
+ * compression where we buffer the data until the chunk ends. */
+static int compress(zckComp *comp, const char *src, const size_t src_size,
+                    char **dst, size_t *dst_size, int use_dict) {
+    if(comp->data == NULL)
+        comp->data = zmalloc(src_size);
+    else
+        comp->data = realloc(comp->data, comp->data_size + src_size);
+    if(comp->data == NULL) {
+        zck_log(ZCK_LOG_ERROR, "Unable to allocate %lu bytes\n",
+                comp->data_size + src_size);
+        return False;
+    }
+    memcpy(comp->data + comp->data_size, src, src_size);
+    *dst = NULL;
+    *dst_size = 0;
+    comp->data_size += src_size;
+    return True;
+}
+
+static int end_chunk(zckComp *comp, char **dst, size_t *dst_size,
+                     int use_dict) {
+    VALIDATE(comp);
+    size_t max_size = ZSTD_compressBound(comp->data_size);
     if(ZSTD_isError(max_size)) {
         zck_log(ZCK_LOG_ERROR, "zstd compression error: %s\n",
                 ZSTD_getErrorName(max_size));
@@ -69,12 +96,16 @@ static int zck_zstd_compress(zckComp *comp, const char *src,
     }
 
     if(use_dict && comp->cdict_ctx) {
-        *dst_size = ZSTD_compress_usingCDict(comp->cctx, *dst, max_size, src,
-                                             src_size, comp->cdict_ctx);
+        *dst_size = ZSTD_compress_usingCDict(comp->cctx, *dst, max_size,
+                                             comp->data, comp->data_size,
+                                             comp->cdict_ctx);
     } else {
-        *dst_size = ZSTD_compressCCtx(comp->cctx, *dst, max_size, src, src_size,
-                                      comp->level);
+        *dst_size = ZSTD_compressCCtx(comp->cctx, *dst, max_size, comp->data,
+                                      comp->data_size, comp->level);
     }
+    free(comp->data);
+    comp->data = NULL;
+    comp->data_size = 0;
     if(ZSTD_isError(*dst_size)) {
         zck_log(ZCK_LOG_ERROR, "zstd compression error: %s\n",
                 ZSTD_getErrorName(*dst_size));
@@ -83,9 +114,10 @@ static int zck_zstd_compress(zckComp *comp, const char *src,
     return True;
 }
 
-static int zck_zstd_decompress(zckComp *comp, const char *src,
-                               const size_t src_size, char **dst,
-                               size_t dst_size, int use_dict) {
+static int decompress(zckComp *comp, const char *src, const size_t src_size,
+                      char **dst, size_t dst_size, int use_dict) {
+    VALIDATE(comp);
+
     *dst = zmalloc(dst_size);
     if(dst == NULL) {
         zck_log(ZCK_LOG_ERROR, "Unable to allocate %lu bytes\n", dst_size);
@@ -109,7 +141,7 @@ static int zck_zstd_decompress(zckComp *comp, const char *src,
     return True;
 }
 
-static int zck_zstd_close(zckComp *comp) {
+static int close(zckComp *comp) {
     if(comp->cdict_ctx) {
         ZSTD_freeCDict(comp->cdict_ctx);
         comp->cdict_ctx = NULL;
@@ -126,10 +158,15 @@ static int zck_zstd_close(zckComp *comp) {
         ZSTD_freeDCtx(comp->dctx);
         comp->dctx = NULL;
     }
+    if(comp->data) {
+        free(comp->data);
+        comp->data = NULL;
+    }
+    comp->data_size = 0;
     return True;
 }
 
-static int zck_zstd_set_parameter(zckComp *comp, int option, void *value) {
+static int set_parameter(zckComp *comp, int option, void *value) {
     if(option == ZCK_ZCK_COMP_LEVEL) {
         if(*(int*)value >= 0 && *(int*)value <= ZSTD_maxCLevel()) {
             comp->level = *(int*)value;
@@ -140,18 +177,19 @@ static int zck_zstd_set_parameter(zckComp *comp, int option, void *value) {
     return False;
 }
 
-static int zck_zstd_set_default_parameters(zckComp *comp) {
+static int set_default_parameters(zckComp *comp) {
     /* Set default compression level to 16 */
     int level=16;
-    return zck_zstd_set_parameter(comp, ZCK_ZCK_COMP_LEVEL, &level);
+    return set_parameter(comp, ZCK_ZCK_COMP_LEVEL, &level);
 }
 
 int zck_zstd_setup(zckComp *comp) {
-    comp->init = zck_zstd_init;
-    comp->set_parameter = zck_zstd_set_parameter;
-    comp->compress = zck_zstd_compress;
-    comp->decompress = zck_zstd_decompress;
-    comp->close = zck_zstd_close;
+    comp->init = init;
+    comp->set_parameter = set_parameter;
+    comp->compress = compress;
+    comp->end_chunk = end_chunk;
+    comp->decompress = decompress;
+    comp->close = close;
     comp->type = ZCK_COMP_ZSTD;
-    return zck_zstd_set_default_parameters(comp);
+    return set_default_parameters(comp);
 }
index 9902b380c651f98570367d9d657feb75219dce56..d0eeeae27c952dde22ef3e08964849e8ce84e5a2 100644 (file)
@@ -199,19 +199,19 @@ int zck_index_add_to_chunk(zckCtx *zck, char *data, size_t comp_size,
                            size_t orig_size) {
     VALIDATE(zck);
 
-    if(comp_size == 0)
-        return True;
-
     if(zck->work_index_item == NULL && !zck_index_create_chunk(zck))
         return False;
 
+    zck->work_index_item->length += orig_size;
+    if(comp_size == 0)
+        return True;
+
     if(!zck_hash_update(&(zck->full_hash), data, comp_size))
         return False;
     if(!zck_hash_update(&(zck->work_index_hash), data, comp_size))
         return False;
 
     zck->work_index_item->comp_length += comp_size;
-    zck->work_index_item->length += orig_size;
     return True;
 }
 
index d236b1b53f9250154d05a8c127150d1ac01cf339..63c713cd24b65ac7fe40a98d9be3f0cb33189a65 100644 (file)
@@ -262,12 +262,16 @@ int zck_import_dict(zckCtx *zck, char *data, size_t size) {
                 "Attempting to initialize an empty compression dictionary\n");
         return False;
     }
+    zck_log(ZCK_LOG_DEBUG, "Closing compression\n");
     if(!zck_comp_close(zck))
         return False;
+    zck_log(ZCK_LOG_DEBUG, "setting dict 1\n");
     if(!zck_set_comp_parameter(zck, ZCK_COMMON_DICT, data))
         return False;
+    zck_log(ZCK_LOG_DEBUG, "setting dict 2\n");
     if(!zck_set_comp_parameter(zck, ZCK_COMMON_DICT_SIZE, &size))
         return False;
+    zck_log(ZCK_LOG_DEBUG, "Initializing\n");
     if(!zck_comp_init(zck))
         return False;
     return True;
@@ -325,11 +329,13 @@ int zck_validate_file(zckCtx *zck) {
                 "Unable to calculate %s checksum for full file\n");
         return False;
     }
+    zck_log(ZCK_LOG_DEBUG, "Checking data checksum\n");
     if(memcmp(digest, zck->full_hash_digest, zck->hash_type.digest_size) != 0) {
         free(digest);
-        zck_log(ZCK_LOG_ERROR, "Final file failed checksum\n");
+        zck_log(ZCK_LOG_ERROR, "Data checksum failed!\n");
         return False;
     }
+    zck_log(ZCK_LOG_DEBUG, "Data checksum valid\n");
     free(digest);
     return True;
 }
index 80f8c899351b461159d8cf50b5fa2a37e8baaa15..5aa57647722fab70ad6c3f52619c0d9ce28ebb07 100644 (file)
@@ -15,6 +15,8 @@ struct zckComp;
 
 typedef int (*finit)(struct zckComp *comp);
 typedef int (*fparam)(struct zckComp *comp, int option, void *value);
+typedef int (*fcompend)(struct zckComp *comp, char **dst, size_t *dst_size,
+                        int use_dict);
 typedef int (*fcomp)(struct zckComp *comp, const char *src,
                      const size_t src_size, char **dst, size_t *dst_size,
                      int use_dict);
@@ -71,9 +73,14 @@ typedef struct zckComp {
     void *ddict_ctx;
     void *dict;
     size_t dict_size;
+
+    char *data;
+    size_t data_size;
+
     finit init;
     fparam set_parameter;
     fcomp compress;
+    fcompend end_chunk;
     fdecomp decompress;
     fcclose close;
 } zckComp;
index e2cfc13dc4c41b300d4dd3eff617489b03b77484..23d84ce627968078d1ec19749ae9cabce430233a 100644 (file)
--- a/src/zck.c
+++ b/src/zck.c
@@ -43,7 +43,6 @@ int main (int argc, char *argv[]) {
     char *out_name;
     char *dict = NULL;
     size_t dict_size = 0;
-    zckCtx *zck = zck_create();
 
     zck_set_log_level(ZCK_LOG_DEBUG);
 
@@ -86,6 +85,10 @@ int main (int argc, char *argv[]) {
         exit(1);
     }
     free(out_name);
+
+    zckCtx *zck = zck_create();
+    if(zck == NULL)
+        exit(1);
     if(!zck_init_write(zck, dst_fd))
         exit(1);
 
@@ -161,6 +164,8 @@ int main (int argc, char *argv[]) {
                     printf("Compressing %li bytes\n", next-found);
                     if(!zck_compress(zck, found, next-found))
                         exit(1);
+                    if(!zck_end_chunk(zck))
+                        exit(1);
                     found = next;
                     search = next + 1;
                     if(search > data + in_size)
@@ -169,6 +174,8 @@ int main (int argc, char *argv[]) {
                     printf("Completing %li bytes\n", data+in_size-found);
                     if(!zck_compress(zck, found, data+in_size-found))
                         exit(1);
+                    if(!zck_end_chunk(zck))
+                        exit(1);
                     search = NULL;
                 }
             }
@@ -201,6 +208,8 @@ int main (int argc, char *argv[]) {
                 printf("Completing %li bytes\n", cur_loc-start);
                 if(!zck_compress(zck, start, cur_loc-start))
                     exit(1);
+                if(!zck_end_chunk(zck))
+                    exit(1);
                 start = cur_loc;
             }
         }