static deltas: Process each part as soon as its done
authorAlexander Larsson <alexl@redhat.com>
Wed, 25 Oct 2017 20:32:02 +0000 (22:32 +0200)
committerAtomic Bot <atomic-devel@projectatomic.io>
Fri, 27 Oct 2017 21:49:26 +0000 (21:49 +0000)
Directly when we allocate a new part we finish the old one,
writing the compressed data to a temporary file and generating
the delta header for it.

When all these are done we loop over them and collect the headers,
sizes and either copy the tempfile data into the inlined superblock
or link the tempfiles to disk with the proper names.

Closes: #1309
Approved by: cgwalters

src/libostree/ostree-repo-static-delta-compilation.c

index 70a18c365793bbf519e328a6f95f6b695e99a01f..41cd347eadaf3b50ce901b81c522b9e55e9c313c 100644 (file)
@@ -44,6 +44,7 @@ typedef enum {
 } DeltaOpts;
 
 typedef struct {
+  guint64 compressed_size;
   guint64 uncompressed_size;
   GPtrArray *objects;
   GString *payload;
@@ -52,6 +53,8 @@ typedef struct {
   GPtrArray *modes;
   GHashTable *xattr_set; /* GVariant(ayay) -> offset */
   GPtrArray *xattrs;
+  GLnxTmpfile part_tmpf;
+  GVariant *header;
 } OstreeStaticDeltaPartBuilder;
 
 typedef struct {
@@ -66,6 +69,8 @@ typedef struct {
   guint n_bsdiff;
   guint n_fallback;
   gboolean swap_endian;
+  int parts_dfd;
+  DeltaOpts delta_opts;
 } OstreeStaticDeltaBuilder;
 
 /* Get an input stream for a GVariant */
@@ -119,6 +124,9 @@ ostree_static_delta_part_builder_unref (OstreeStaticDeltaPartBuilder *part_build
   g_ptr_array_unref (part_builder->modes);
   g_hash_table_unref (part_builder->xattr_set);
   g_ptr_array_unref (part_builder->xattrs);
+  glnx_tmpfile_clear (&part_builder->part_tmpf);
+  if (part_builder->header)
+    g_variant_unref (part_builder->header);
   g_free (part_builder);
 }
 
@@ -200,10 +208,123 @@ xattr_chunk_equals (const void *one, const void *two)
   return memcmp (g_variant_get_data (v1), g_variant_get_data (v2), l1) == 0;
 }
 
+static gboolean
+finish_part (OstreeStaticDeltaBuilder *builder, GError **error)
+{
+  OstreeStaticDeltaPartBuilder *part_builder = builder->parts->pdata[builder->parts->len - 1];
+  g_autofree guchar *part_checksum = NULL;
+  g_autoptr(GBytes) objtype_checksum_array = NULL;
+  g_autoptr(GBytes) checksum_bytes = NULL;
+  g_autoptr(GOutputStream) part_temp_outstream = NULL;
+  g_autoptr(GInputStream) part_in = NULL;
+  g_autoptr(GInputStream) part_payload_in = NULL;
+  g_autoptr(GMemoryOutputStream) part_payload_out = NULL;
+  g_autoptr(GConverterOutputStream) part_payload_compressor = NULL;
+  g_autoptr(GConverter) compressor = NULL;
+  g_autoptr(GVariant) delta_part_content = NULL;
+  g_autoptr(GVariant) delta_part = NULL;
+  g_autoptr(GVariant) delta_part_header = NULL;
+  g_auto(GVariantBuilder) mode_builder = OT_VARIANT_BUILDER_INITIALIZER;
+  g_auto(GVariantBuilder) xattr_builder = OT_VARIANT_BUILDER_INITIALIZER;
+  guint8 compression_type_char;
+
+  g_variant_builder_init (&mode_builder, G_VARIANT_TYPE ("a(uuu)"));
+  g_variant_builder_init (&xattr_builder, G_VARIANT_TYPE ("aa(ayay)"));
+  guint j;
+
+  for (j = 0; j < part_builder->modes->len; j++)
+    g_variant_builder_add_value (&mode_builder, part_builder->modes->pdata[j]);
+
+  for (j = 0; j < part_builder->xattrs->len; j++)
+    g_variant_builder_add_value (&xattr_builder, part_builder->xattrs->pdata[j]);
+
+  {
+    g_autoptr(GBytes) payload_b;
+    g_autoptr(GBytes) operations_b;
+
+    payload_b = g_string_free_to_bytes (part_builder->payload);
+    part_builder->payload = NULL;
+
+    operations_b = g_string_free_to_bytes (part_builder->operations);
+    part_builder->operations = NULL;
+
+    delta_part_content = g_variant_new ("(a(uuu)aa(ayay)@ay@ay)",
+                                        &mode_builder, &xattr_builder,
+                                        ot_gvariant_new_ay_bytes (payload_b),
+                                        ot_gvariant_new_ay_bytes (operations_b));
+    g_variant_ref_sink (delta_part_content);
+  }
+
+  /* Hardcode xz for now */
+  compressor = (GConverter*)_ostree_lzma_compressor_new (NULL);
+  compression_type_char = 'x';
+  part_payload_in = variant_to_inputstream (delta_part_content);
+  part_payload_out = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+  part_payload_compressor = (GConverterOutputStream*)g_converter_output_stream_new ((GOutputStream*)part_payload_out, compressor);
+
+  {
+    gssize n_bytes_written = g_output_stream_splice ((GOutputStream*)part_payload_compressor, part_payload_in,
+                                                     G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET | G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
+                                                     NULL, error);
+    if (n_bytes_written < 0)
+      return FALSE;
+  }
+
+  g_clear_pointer (&delta_part_content, g_variant_unref);
+
+  { g_autoptr(GBytes) payload = g_memory_output_stream_steal_as_bytes (part_payload_out);
+    delta_part = g_variant_ref_sink (g_variant_new ("(y@ay)",
+                                                    compression_type_char,
+                                                    ot_gvariant_new_ay_bytes (payload)));
+  }
+
+  if (!glnx_open_tmpfile_linkable_at (builder->parts_dfd, ".", O_RDWR | O_CLOEXEC,
+                                      &part_builder->part_tmpf, error))
+    return FALSE;
+
+  part_temp_outstream = g_unix_output_stream_new (part_builder->part_tmpf.fd, FALSE);
+
+  part_in = variant_to_inputstream (delta_part);
+  if (!ot_gio_splice_get_checksum (part_temp_outstream, part_in,
+                                   &part_checksum,
+                                   NULL, error))
+    return FALSE;
+
+  checksum_bytes = g_bytes_new (part_checksum, OSTREE_SHA256_DIGEST_LEN);
+  objtype_checksum_array = objtype_checksum_array_new (part_builder->objects);
+  delta_part_header = g_variant_new ("(u@aytt@ay)",
+                                     maybe_swap_endian_u32 (builder->swap_endian, OSTREE_DELTAPART_VERSION),
+                                     ot_gvariant_new_ay_bytes (checksum_bytes),
+                                     maybe_swap_endian_u64 (builder->swap_endian, (guint64) g_variant_get_size (delta_part)),
+                                     maybe_swap_endian_u64 (builder->swap_endian, part_builder->uncompressed_size),
+                                     ot_gvariant_new_ay_bytes (objtype_checksum_array));
+  g_variant_ref_sink (delta_part_header);
+
+  part_builder->header = g_variant_ref (delta_part_header);
+  part_builder->compressed_size = g_variant_get_size (delta_part);
+
+  if (builder->delta_opts & DELTAOPT_FLAG_VERBOSE)
+    {
+      g_printerr ("part %u n:%u compressed:%" G_GUINT64_FORMAT " uncompressed:%" G_GUINT64_FORMAT "\n",
+                  builder->parts->len, part_builder->objects->len,
+                  part_builder->compressed_size,
+                  part_builder->uncompressed_size);
+    }
+
+  return TRUE;
+}
+
 static OstreeStaticDeltaPartBuilder *
-allocate_part (OstreeStaticDeltaBuilder *builder)
+allocate_part (OstreeStaticDeltaBuilder *builder, GError **error)
 {
   OstreeStaticDeltaPartBuilder *part = g_new0 (OstreeStaticDeltaPartBuilder, 1);
+
+  if (builder->parts->len > 0)
+    {
+      if (!finish_part (builder, error))
+        return NULL;
+    }
+
   part->objects = g_ptr_array_new_with_free_func ((GDestroyNotify)g_variant_unref);
   part->payload = g_string_new (NULL);
   part->operations = g_string_new (NULL);
@@ -351,7 +472,10 @@ process_one_object (OstreeRepo                       *repo,
   if (current_part->objects->len > 0 &&
       current_part->payload->len + content_size > builder->max_chunk_size_bytes)
     {
-      *current_part_val = current_part = allocate_part (builder);
+      current_part = allocate_part (builder, error);
+      if (current_part == NULL)
+        return FALSE;
+      *current_part_val = current_part;
     }
 
   guint64 compressed_size;
@@ -590,7 +714,10 @@ process_one_rollsum (OstreeRepo                       *repo,
   if (current_part->objects->len > 0 &&
       current_part->payload->len > builder->max_chunk_size_bytes)
     {
-      *current_part_val = current_part = allocate_part (builder);
+      current_part = allocate_part (builder, error);
+      if (current_part == NULL)
+        return FALSE;
+      *current_part_val = current_part;
     }
 
   g_autoptr(GBytes) tmp_to = NULL;
@@ -705,7 +832,10 @@ process_one_bsdiff (OstreeRepo                       *repo,
   if (current_part->objects->len > 0 &&
       current_part->payload->len > builder->max_chunk_size_bytes)
     {
-      *current_part_val = current_part = allocate_part (builder);
+      current_part = allocate_part (builder, error);
+      if (current_part == NULL)
+        return FALSE;
+      *current_part_val = current_part;
     }
 
   g_autoptr(GBytes) tmp_from = NULL;
@@ -977,7 +1107,9 @@ generate_delta_lowlatency (OstreeRepo                       *repo,
                   g_hash_table_size (modified_regfile_content));
     }
 
-  current_part = allocate_part (builder);
+  current_part = allocate_part (builder, error);
+  if (current_part == NULL)
+    return FALSE;
 
   /* Pack the metadata first */
   g_hash_table_iter_init (&hashiter, new_reachable_metadata);
@@ -1093,6 +1225,9 @@ generate_delta_lowlatency (OstreeRepo                       *repo,
         return FALSE;
     }
 
+  if (!finish_part (builder, error))
+    return FALSE;
+
   return TRUE;
 }
 
@@ -1258,6 +1393,26 @@ ostree_repo_static_delta_generate (OstreeRepo                   *self,
                                  &to_commit, error))
     goto out;
 
+  if (opt_filename)
+    {
+      g_autofree char *dnbuf = g_strdup (opt_filename);
+      const char *dn = dirname (dnbuf);
+      if (!glnx_opendirat (AT_FDCWD, dn, TRUE, &tmp_dfd, error))
+        goto out;
+    }
+  else
+    {
+      tmp_dfd = fcntl (self->tmp_dir_fd, F_DUPFD_CLOEXEC, 3);
+      if (tmp_dfd < 0)
+        {
+          glnx_set_error_from_errno (error);
+          goto out;
+        }
+    }
+
+  builder.parts_dfd = tmp_dfd;
+  builder.delta_opts = delta_opts;
+
   /* Ignore optimization flags */
   if (!generate_delta_lowlatency (self, from, to, delta_opts, &builder,
                                   cancellable, error))
@@ -1331,153 +1486,45 @@ ostree_repo_static_delta_generate (OstreeRepo                   *self,
       goto out;
   }
 
-  if (opt_filename)
-    {
-      g_autofree char *dnbuf = g_strdup (opt_filename);
-      const char *dn = dirname (dnbuf);
-      if (!glnx_opendirat (AT_FDCWD, dn, TRUE, &tmp_dfd, error))
-        goto out;
-    }
-  else
-    {
-      tmp_dfd = fcntl (self->tmp_dir_fd, F_DUPFD_CLOEXEC, 3);
-      if (tmp_dfd < 0)
-        {
-          glnx_set_error_from_errno (error);
-          goto out;
-        }
-    }
-
   part_headers = g_variant_builder_new (G_VARIANT_TYPE ("a" OSTREE_STATIC_DELTA_META_ENTRY_FORMAT));
   part_temp_paths = g_ptr_array_new_with_free_func ((GDestroyNotify)glnx_tmpfile_clear);
   for (i = 0; i < builder.parts->len; i++)
     {
       OstreeStaticDeltaPartBuilder *part_builder = builder.parts->pdata[i];
-      g_autoptr(GBytes) payload_b;
-      g_autoptr(GBytes) operations_b;
-      g_autofree guchar *part_checksum = NULL;
-      g_autoptr(GBytes) objtype_checksum_array = NULL;
-      g_autoptr(GBytes) checksum_bytes = NULL;
-      g_autoptr(GOutputStream) part_temp_outstream = NULL;
-      g_autoptr(GInputStream) part_in = NULL;
-      g_autoptr(GInputStream) part_payload_in = NULL;
-      g_autoptr(GMemoryOutputStream) part_payload_out = NULL;
-      g_autoptr(GConverterOutputStream) part_payload_compressor = NULL;
-      g_autoptr(GConverter) compressor = NULL;
-      g_autoptr(GVariant) delta_part_content = NULL;
-      g_autoptr(GVariant) delta_part = NULL;
-      g_autoptr(GVariant) delta_part_header = NULL;
-      g_auto(GVariantBuilder) mode_builder = OT_VARIANT_BUILDER_INITIALIZER;
-      g_auto(GVariantBuilder) xattr_builder = OT_VARIANT_BUILDER_INITIALIZER;
-      guint8 compression_type_char;
-
-      g_variant_builder_init (&mode_builder, G_VARIANT_TYPE ("a(uuu)"));
-      g_variant_builder_init (&xattr_builder, G_VARIANT_TYPE ("aa(ayay)"));
-      { guint j;
-        for (j = 0; j < part_builder->modes->len; j++)
-          g_variant_builder_add_value (&mode_builder, part_builder->modes->pdata[j]);
-
-        for (j = 0; j < part_builder->xattrs->len; j++)
-          g_variant_builder_add_value (&xattr_builder, part_builder->xattrs->pdata[j]);
-      }
-
-      payload_b = g_string_free_to_bytes (part_builder->payload);
-      part_builder->payload = NULL;
-
-      operations_b = g_string_free_to_bytes (part_builder->operations);
-      part_builder->operations = NULL;
-      /* FIXME - avoid duplicating memory here */
-      delta_part_content = g_variant_new ("(a(uuu)aa(ayay)@ay@ay)",
-                                          &mode_builder, &xattr_builder,
-                                          ot_gvariant_new_ay_bytes (payload_b),
-                                          ot_gvariant_new_ay_bytes (operations_b));
-      g_variant_ref_sink (delta_part_content);
-
-      /* Hardcode xz for now */
-      compressor = (GConverter*)_ostree_lzma_compressor_new (NULL);
-      compression_type_char = 'x';
-      part_payload_in = variant_to_inputstream (delta_part_content);
-      part_payload_out = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
-      part_payload_compressor = (GConverterOutputStream*)g_converter_output_stream_new ((GOutputStream*)part_payload_out, compressor);
-
-      {
-        gssize n_bytes_written = g_output_stream_splice ((GOutputStream*)part_payload_compressor, part_payload_in,
-                                                         G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET | G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
-                                                         cancellable, error);
-        if (n_bytes_written < 0)
-          goto out;
-      }
-
-      /* FIXME - avoid duplicating memory here */
-      { g_autoptr(GBytes) payload = g_memory_output_stream_steal_as_bytes (part_payload_out);
-        delta_part = g_variant_ref_sink (g_variant_new ("(y@ay)",
-                                                        compression_type_char,
-                                                        ot_gvariant_new_ay_bytes (payload)));
-      }
 
       if (inline_parts)
         {
           g_autofree char *part_relpath = _ostree_get_relative_static_delta_part_path (from, to, i);
-          if (!ot_variant_builder_add (descriptor_builder, error, "{sv}", part_relpath, delta_part))
+
+          lseek (part_builder->part_tmpf.fd, 0, SEEK_SET);
+
+          if (!ot_variant_builder_open (descriptor_builder, G_VARIANT_TYPE ("{sv}"), error) ||
+              !ot_variant_builder_add (descriptor_builder, error, "s", part_relpath) ||
+              !ot_variant_builder_open (descriptor_builder, G_VARIANT_TYPE ("v"), error) ||
+              !ot_variant_builder_add_from_fd (descriptor_builder, G_VARIANT_TYPE ("(yay)"), part_builder->part_tmpf.fd, part_builder->compressed_size, error) ||
+              !ot_variant_builder_close (descriptor_builder, error) ||
+              !ot_variant_builder_close (descriptor_builder, error))
             goto out;
         }
       else
         {
-          GLnxTmpfile *part_tmpf = g_new0 (GLnxTmpfile, 1);
+          g_autofree char *partstr = g_strdup_printf ("%u", i);
 
-          if (!glnx_open_tmpfile_linkable_at (tmp_dfd, ".", O_WRONLY | O_CLOEXEC,
-                                              part_tmpf, error))
-            goto out;
+          if (fchmod (part_builder->part_tmpf.fd, 0644) < 0)
+            {
+              glnx_set_error_from_errno (error);
+              goto out;
+            }
 
-          /* Transfer tempfile ownership */
-          part_temp_outstream = g_unix_output_stream_new (part_tmpf->fd, FALSE);
-          g_ptr_array_add (part_temp_paths, g_steal_pointer (&part_tmpf));
+          if (!glnx_link_tmpfile_at (&part_builder->part_tmpf, GLNX_LINK_TMPFILE_REPLACE,
+                                     descriptor_dfd, partstr, error))
+            goto out;
         }
 
-      part_in = variant_to_inputstream (delta_part);
-      if (!ot_gio_splice_get_checksum (part_temp_outstream, part_in,
-                                       &part_checksum,
-                                       cancellable, error))
-        goto out;
-
-      checksum_bytes = g_bytes_new (part_checksum, OSTREE_SHA256_DIGEST_LEN);
-      objtype_checksum_array = objtype_checksum_array_new (part_builder->objects);
-      delta_part_header = g_variant_new ("(u@aytt@ay)",
-                                         maybe_swap_endian_u32 (builder.swap_endian, OSTREE_DELTAPART_VERSION),
-                                         ot_gvariant_new_ay_bytes (checksum_bytes),
-                                         maybe_swap_endian_u64 (builder.swap_endian, (guint64) g_variant_get_size (delta_part)),
-                                         maybe_swap_endian_u64 (builder.swap_endian, part_builder->uncompressed_size),
-                                         ot_gvariant_new_ay_bytes (objtype_checksum_array));
+      g_variant_builder_add_value (part_headers, g_variant_ref (part_builder->header));
 
-      g_variant_builder_add_value (part_headers, g_variant_ref (delta_part_header));
-
-      total_compressed_size += g_variant_get_size (delta_part);
+      total_compressed_size += part_builder->compressed_size;
       total_uncompressed_size += part_builder->uncompressed_size;
-
-      if (delta_opts & DELTAOPT_FLAG_VERBOSE)
-        {
-          g_printerr ("part %u n:%u compressed:%" G_GUINT64_FORMAT " uncompressed:%" G_GUINT64_FORMAT "\n",
-                      i, part_builder->objects->len,
-                      (guint64)g_variant_get_size (delta_part),
-                      part_builder->uncompressed_size);
-        }
-    }
-  for (i = 0; i < part_temp_paths->len; i++)
-    {
-      g_autofree char *partstr = g_strdup_printf ("%u", i);
-      /* Take ownership of the path/fd here */
-      g_auto(GLnxTmpfile) tmpf = *((GLnxTmpfile*)part_temp_paths->pdata[i]);
-      g_clear_pointer (&(part_temp_paths->pdata[i]), g_free);
-
-      if (fchmod (tmpf.fd, 0644) < 0)
-        {
-          glnx_set_error_from_errno (error);
-          goto out;
-        }
-
-      if (!glnx_link_tmpfile_at (&tmpf, GLNX_LINK_TMPFILE_REPLACE,
-                                 descriptor_dfd, partstr, error))
-        goto out;
     }
 
   if (!get_fallback_headers (self, &builder, &fallback_headers,