lib/pull: Wait for pending ops to complete on error
authorColin Walters <walters@verbum.org>
Mon, 18 Sep 2017 16:08:48 +0000 (12:08 -0400)
committerAtomic Bot <atomic-devel@projectatomic.io>
Tue, 19 Sep 2017 19:05:26 +0000 (19:05 +0000)
I saw in a stack trace that the main thread was calling `exit()` even while
worker threads were alive and doing sha256/write/fsync etc. for objects.

The stack trace was a SEGV as the main thread was calling into library
`atexit()` handlers and we were a liblz4 destructor:

```
 #0  0x00007f2db790f8d4 _fini (liblz4.so.1)
 #1  0x00007f2dbbae1c68 __run_exit_handlers (libc.so.6)
```

(Why that library has a destructor I don't know offhand, can't find
 it in the source in a quick look)

Anyways, global library destructors and worker threads continuing simply don't
mix. Let's wait for our outstanding operations before we exit. This is also a
good idea for projects using libostree as a shared library, as we don't want
worker threads outliving operations.

Our existing pull corruption tests exercise coverage here.

I added a new `caught-error` status boolean to the progress API, and use it the
commandline to tell the user that we're waiting for outstanding ops.

Closes: #1185
Approved by: jlebon

src/libostree/ostree-repo-pull.c
src/libostree/ostree-repo.c

index 92aeb5bcc57bd96f114c9f9ce249eeec5484584d..91fa7a964ab7110442d18f060e06145a60b5adab 100644 (file)
@@ -218,6 +218,7 @@ static gboolean scan_one_metadata_object_c (OtPullData                 *pull_dat
                                             const OstreeCollectionRef  *ref,
                                             GCancellable               *cancellable,
                                             GError                    **error);
+static void scan_object_queue_data_free (ScanObjectQueueData *scan_data);
 
 static gboolean
 update_progress (gpointer user_data)
@@ -258,6 +259,7 @@ update_progress (gpointer user_data)
                              "fetched", "u", fetched,
                              "requested", "u", requested,
                              "scanning", "u", g_queue_is_empty (&pull_data->scan_object_queue) ? 0 : 1,
+                             "caught-error", "b", pull_data->caught_error,
                              "scanned-metadata", "u", n_scanned_metadata,
                              "bytes-transferred", "t", bytes_transferred,
                              "start-time", "t", start_time,
@@ -309,9 +311,6 @@ pull_termination_condition (OtPullData          *pull_data)
   /* we only enter the main loop when we're fetching objects */
   g_assert (pull_data->phase == OSTREE_PULL_PHASE_FETCHING_OBJECTS);
 
-  if (pull_data->caught_error)
-    return TRUE;
-
   if (pull_data->dry_run)
     return pull_data->dry_run_emitted_progress;
 
@@ -321,6 +320,10 @@ pull_termination_condition (OtPullData          *pull_data)
   return current_idle;
 }
 
+/* Most async operations finish by calling this function; it will consume
+ * @errorp if set, update statistics, and initiate processing of any further
+ * requests as appropriate.
+ */
 static void
 check_outstanding_requests_handle_error (OtPullData          *pull_data,
                                          GError             **errorp)
@@ -340,6 +343,18 @@ check_outstanding_requests_handle_error (OtPullData          *pull_data,
           g_clear_error (errorp);
         }
     }
+
+  /* If we're in error state, we wait for any pending operations to complete,
+   * but ensure that all no further operations are queued.
+   */
+  if (pull_data->caught_error)
+    {
+      g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL);
+      g_queue_clear (&pull_data->scan_object_queue);
+      g_hash_table_remove_all (pull_data->pending_fetch_metadata);
+      g_hash_table_remove_all (pull_data->pending_fetch_deltaparts);
+      g_hash_table_remove_all (pull_data->pending_fetch_content);
+    }
   else
     {
       GHashTableIter hiter;
@@ -424,6 +439,15 @@ fetcher_queue_is_full (OtPullData *pull_data)
   return fetch_full || deltas_full || writes_full;
 }
 
+static void
+scan_object_queue_data_free (ScanObjectQueueData *scan_data)
+{
+  g_free (scan_data->path);
+  if (scan_data->requested_ref != NULL)
+    ostree_collection_ref_free (scan_data->requested_ref);
+  g_free (scan_data);
+}
+
 static gboolean
 idle_worker (gpointer user_data)
 {
@@ -447,11 +471,8 @@ idle_worker (gpointer user_data)
                               pull_data->cancellable,
                               &error);
   check_outstanding_requests_handle_error (pull_data, &error);
+  scan_object_queue_data_free (scan_data);
 
-  g_free (scan_data->path);
-  if (scan_data->requested_ref != NULL)
-    ostree_collection_ref_free (scan_data->requested_ref);
-  g_free (scan_data);
   return G_SOURCE_CONTINUE;
 }
 
@@ -4245,6 +4266,8 @@ ostree_repo_pull_with_options (OstreeRepo             *self,
   g_clear_pointer (&pull_data->pending_fetch_content, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->pending_fetch_metadata, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->pending_fetch_deltaparts, (GDestroyNotify) g_hash_table_unref);
+  g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL);
+  g_queue_clear (&pull_data->scan_object_queue);
   g_clear_pointer (&pull_data->idle_src, (GDestroyNotify) g_source_destroy);
   g_clear_pointer (&pull_data->dirs, (GDestroyNotify) g_ptr_array_unref);
   g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref);
index 440d2bf9b40cf7130c2f9a8027865b838e0523e9..0056d8056eb2b899f54c2a931c93bef4f70041e0 100644 (file)
@@ -4023,7 +4023,7 @@ ostree_repo_pull_default_console_progress_changed (OstreeAsyncProgress *progress
                                                    gpointer             user_data)
 {
   g_autofree char *status = NULL;
-  gboolean scanning;
+  gboolean caught_error, scanning;
   guint outstanding_fetches;
   guint outstanding_metadata_fetches;
   guint outstanding_writes;
@@ -4039,6 +4039,7 @@ ostree_repo_pull_default_console_progress_changed (OstreeAsyncProgress *progress
                              "outstanding-fetches", "u", &outstanding_fetches,
                              "outstanding-metadata-fetches", "u", &outstanding_metadata_fetches,
                              "outstanding-writes", "u", &outstanding_writes,
+                             "caught-error", "b", &caught_error,
                              "scanning", "u", &scanning,
                              "scanned-metadata", "u", &n_scanned_metadata,
                              "fetched-delta-parts", "u", &fetched_delta_parts,
@@ -4052,6 +4053,10 @@ ostree_repo_pull_default_console_progress_changed (OstreeAsyncProgress *progress
     {
       g_string_append (buf, status);
     }
+  else if (caught_error)
+    {
+      g_string_append_printf (buf, "Caught error, waiting for outstanding tasks");
+    }
   else if (outstanding_fetches)
     {
       guint64 bytes_transferred, start_time, total_delta_part_size;