fetcher/soup3: Rewrite without threads
authorDan Nicholson <dbn@endlessos.org>
Fri, 7 Apr 2023 15:47:14 +0000 (09:47 -0600)
committerDan Nicholson <dbn@endlessos.org>
Thu, 13 Apr 2023 15:00:23 +0000 (09:00 -0600)
soup3 works best using only the async API from a single thread[1].
Rework the fetcher to stop using worker threads. In order to maximize
session usage across requests, sessions will be reused for each main
context.

1. https://libsoup.org/libsoup-3.0/client-thread-safety.html

src/libostree/ostree-fetcher-soup3.c

index 9f495a72037c024fd7f8d6f8a0f431e1b4ee7383..b46ce34f8ef7599b429e8b989f87aaf634cf5a92 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2011 Colin Walters <walters@verbum.org>
  * Copyright (C) 2022 Igalia S.L.
+ * Copyright (C) 2023 Endless OS Foundation, LLC
  *
  * SPDX-License-Identifier: LGPL-2.0+
  *
  *
  * Author: Colin Walters <walters@verbum.org>
  * Author: Daniel Kolesa <dkolesa@igalia.com>
+ * Author: Dan Nicholson <dbn@endlessos.org>
  */
 
 #include "config.h"
 
 #include <gio/gio.h>
-#include <gio/gfiledescriptorbased.h>
 #include <gio/gunixoutputstream.h>
 #include <libsoup/soup.h>
 
 #include "ostree-fetcher-util.h"
 #include "ostree-tls-cert-interaction-private.h"
 #include "ostree-enumtypes.h"
-#include "ostree.h"
 #include "ostree-repo-private.h"
-#include "otutil.h"
-
-typedef enum {
-  OSTREE_FETCHER_STATE_PENDING,
-  OSTREE_FETCHER_STATE_DOWNLOADING,
-  OSTREE_FETCHER_STATE_COMPLETE
-} OstreeFetcherState;
-
-typedef struct {
-  int ref_count;  /* atomic */
-
-  SoupSession *session;  /* not referenced */
-  GMainContext *main_context;
-  volatile gint running;
-  GError *initialization_error; /* Any failure to load the db */
-
-  char *remote_name;
-  int base_tmpdir_dfd;
-
-  GVariant *extra_headers;
-  gboolean transfer_gzip;
-
-  /* Our active HTTP requests */
-  GHashTable *outstanding;
-
-  /* Shared across threads; be sure to lock. */
-  GHashTable *output_stream_set;  /* set<GOutputStream> */
-  GMutex output_stream_set_lock;
-
-  /* Also protected by output_stream_set_lock. */
-  guint64 total_downloaded;
-
-  GError *oob_error;
-} ThreadClosure;
 
 typedef struct {
-  int ref_count;  /* atomic */
-
-  ThreadClosure *thread_closure;
   GPtrArray *mirrorlist; /* list of base URIs */
   char *filename; /* relative name to fetch or NULL */
   guint mirrorlist_idx;
 
-  OstreeFetcherState state;
-
   SoupMessage *message;
-  GFile *file;
   struct OstreeFetcher *fetcher;
+  GMainContext *mainctx;
+  SoupSession *session;
+  GFile *file;
 
   gboolean is_membuf;
   OstreeFetcherRequestFlags flags;
   char *if_none_match;  /* request ETag */
   guint64 if_modified_since;  /* seconds since the epoch */
-  GInputStream *request_body;
+  GInputStream *response_body;
   GLnxTmpfile tmpf;
   GOutputStream *out_stream;
   gboolean out_not_modified;  /* TRUE if the server gave a HTTP 304 Not Modified response, which we don’t propagate as an error */
@@ -97,29 +60,26 @@ typedef struct {
 
   guint64 max_size;
   guint64 current_size;
-  guint64 content_length;
-} OstreeFetcherPendingURI;
-
-/* Used by session_thread_idle_add() */
-typedef void (*SessionThreadFunc) (ThreadClosure *thread_closure,
-                                   gpointer data);
-
-/* Used by session_thread_idle_add() */
-typedef struct {
-  ThreadClosure *thread_closure;
-  SessionThreadFunc function;
-  gpointer data;
-  GDestroyNotify notify;
-} IdleClosure;
+  goffset content_length;
+} FetcherRequest;
 
 struct OstreeFetcher
 {
   GObject parent_instance;
 
   OstreeFetcherConfigFlags config_flags;
+  char *remote_name;
+  int tmpdir_dfd;
 
-  GThread *session_thread;
-  ThreadClosure *thread_closure;
+  GHashTable *sessions; /* (element-type GMainContext SoupSession ) */
+  GProxyResolver *proxy_resolver;
+  SoupCookieJar *cookie_jar;
+  OstreeTlsCertInteraction *client_cert;
+  GTlsDatabase *tls_database;
+  GVariant *extra_headers;
+  char *user_agent;
+
+  guint64 bytes_transferred;
 };
 
 enum {
@@ -129,399 +89,113 @@ enum {
 
 G_DEFINE_TYPE (OstreeFetcher, _ostree_fetcher, G_TYPE_OBJECT)
 
-static ThreadClosure *
-thread_closure_ref (ThreadClosure *thread_closure)
-{
-  int refcount;
-  g_return_val_if_fail (thread_closure != NULL, NULL);
-  refcount = g_atomic_int_add (&thread_closure->ref_count, 1);
-  g_assert (refcount > 0);
-  return thread_closure;
-}
-
-static void
-thread_closure_unref (ThreadClosure *thread_closure)
-{
-  g_return_if_fail (thread_closure != NULL);
-
-  if (g_atomic_int_dec_and_test (&thread_closure->ref_count))
-    {
-      /* The session thread should have cleared this by now. */
-      g_assert (thread_closure->session == NULL);
-
-      g_clear_pointer (&thread_closure->main_context, g_main_context_unref);
-
-      g_clear_pointer (&thread_closure->extra_headers, g_variant_unref);
-
-      g_clear_pointer (&thread_closure->output_stream_set, g_hash_table_unref);
-      g_mutex_clear (&thread_closure->output_stream_set_lock);
-
-      g_clear_pointer (&thread_closure->oob_error, g_error_free);
-
-      g_free (thread_closure->remote_name);
-
-      g_slice_free (ThreadClosure, thread_closure);
-    }
-}
-
 static void
-idle_closure_free (IdleClosure *idle_closure)
-{
-  g_clear_pointer (&idle_closure->thread_closure, thread_closure_unref);
-
-  if (idle_closure->notify != NULL)
-    idle_closure->notify (idle_closure->data);
-
-  g_slice_free (IdleClosure, idle_closure);
-}
-
-static OstreeFetcherPendingURI *
-pending_uri_ref (OstreeFetcherPendingURI *pending)
-{
-  gint refcount;
-  g_assert (pending);
-  refcount = g_atomic_int_add (&pending->ref_count, 1);
-  g_assert (refcount > 0);
-  return pending;
+fetcher_request_free (FetcherRequest *request)
+{
+  g_debug ("Freeing request for %s", request->filename);
+  g_clear_pointer (&request->mirrorlist, g_ptr_array_unref);
+  g_clear_pointer (&request->filename, g_free);
+  g_clear_object (&request->message);
+  g_clear_pointer (&request->mainctx, g_main_context_unref);
+  g_clear_object (&request->session);
+  g_clear_object (&request->file);
+  g_clear_pointer (&request->if_none_match, g_free);
+  g_clear_object (&request->response_body);
+  glnx_tmpfile_clear (&request->tmpf);
+  g_clear_object (&request->out_stream);
+  g_clear_pointer (&request->out_etag, g_free);
+  g_free (request);
 }
 
 static void
-pending_uri_unref (OstreeFetcherPendingURI *pending)
-{
-  if (!g_atomic_int_dec_and_test (&pending->ref_count))
-    return;
-
-  g_clear_pointer (&pending->thread_closure, thread_closure_unref);
-
-  g_clear_pointer (&pending->mirrorlist, g_ptr_array_unref);
-  g_free (pending->filename);
-  g_clear_object (&pending->message);
-  g_clear_object (&pending->file);
-  g_clear_object (&pending->request_body);
-  g_free (pending->if_none_match);
-  glnx_tmpfile_clear (&pending->tmpf);
-  g_clear_object (&pending->out_stream);
-  g_free (pending->out_etag);
-  g_free (pending);
-}
-
-static gboolean
-session_thread_idle_dispatch (gpointer data)
-{
-  IdleClosure *idle_closure = data;
-
-  idle_closure->function (idle_closure->thread_closure,
-                          idle_closure->data);
-
-  return G_SOURCE_REMOVE;
-}
-
-static void
-session_thread_idle_add (ThreadClosure *thread_closure,
-                         SessionThreadFunc function,
-                         gpointer data,
-                         GDestroyNotify notify)
-{
-  IdleClosure *idle_closure;
-
-  g_return_if_fail (thread_closure != NULL);
-  g_return_if_fail (function != NULL);
-
-  idle_closure = g_slice_new (IdleClosure);
-  idle_closure->thread_closure = thread_closure_ref (thread_closure);
-  idle_closure->function = function;
-  idle_closure->data = data;
-  idle_closure->notify = notify;
-
-  g_main_context_invoke_full (thread_closure->main_context,
-                              G_PRIORITY_DEFAULT,
-                              session_thread_idle_dispatch,
-                              idle_closure,  /* takes ownership */
-                              (GDestroyNotify) idle_closure_free);
-}
-
-static void
-session_thread_add_logger (ThreadClosure *thread_closure,
-                           gpointer data)
-{
-  glnx_unref_object SoupLogger *logger = NULL;
-
-  logger = soup_logger_new (SOUP_LOGGER_LOG_BODY);
-  soup_logger_set_max_body_size (logger, 500);
-  soup_session_add_feature (thread_closure->session,
-                            SOUP_SESSION_FEATURE (logger));
-}
-
-static void
-session_thread_set_proxy_cb (ThreadClosure *thread_closure,
-                             gpointer data)
-{
-  GProxyResolver *resolver = data;
-
-  g_object_set (thread_closure->session,
-                "proxy-resolver",
-                g_object_ref (resolver), NULL);
-}
-
-static void
-session_thread_set_cookie_jar_cb (ThreadClosure *thread_closure,
-                                  gpointer data)
-{
-  SoupCookieJar *jar = data;
-
-  soup_session_add_feature (thread_closure->session,
-                            SOUP_SESSION_FEATURE (jar));
-}
-
-static void
-session_thread_set_headers_cb (ThreadClosure *thread_closure,
-                               gpointer data)
-{
-  GVariant *headers = data;
-
-  g_clear_pointer (&thread_closure->extra_headers, g_variant_unref);
-  thread_closure->extra_headers = g_variant_ref (headers);
-}
-
-static void
-session_thread_set_tls_interaction_cb (ThreadClosure *thread_closure,
-                                       gpointer data)
-{
-  const char *cert_and_key_path = data; /* str\0str\0 in one malloc buf */
-  const char *cert_path = cert_and_key_path;
-  const char *key_path = cert_and_key_path + strlen (cert_and_key_path) + 1;
-  g_autoptr(OstreeTlsCertInteraction) interaction = NULL;
-
-  /* The GTlsInteraction instance must be created in the
-   * session thread so it uses the correct GMainContext. */
-  interaction = _ostree_tls_cert_interaction_new (cert_path, key_path);
-
-  g_object_set (thread_closure->session,
-                "tls-interaction",
-                interaction, NULL);
-}
-
-static void
-session_thread_set_tls_database_cb (ThreadClosure *thread_closure,
-                                    gpointer data)
-{
-  const char *db_path = data;
-
-  if (db_path != NULL)
-    {
-      glnx_unref_object GTlsDatabase *tlsdb = NULL;
-
-      g_clear_error (&thread_closure->initialization_error);
-      tlsdb = g_tls_file_database_new (db_path, &thread_closure->initialization_error);
-
-      if (tlsdb)
-        g_object_set (thread_closure->session,
-                      "tls-database",
-                      tlsdb, NULL);
-    }
-}
-
-static void
-session_thread_set_extra_user_agent_cb (ThreadClosure *thread_closure,
-                                        gpointer data)
-{
-  const char *extra_user_agent = data;
-  if (extra_user_agent != NULL)
-    {
-      g_autofree char *ua =
-        g_strdup_printf ("%s %s", OSTREE_FETCHER_USERAGENT_STRING, extra_user_agent);
-      g_object_set (thread_closure->session, "user-agent", ua, NULL);
-    }
-  else
-    {
-      g_object_set (thread_closure->session, "user-agent",
-                    OSTREE_FETCHER_USERAGENT_STRING, NULL);
-    }
-}
-
-static void
-on_request_sent (GObject        *object, GAsyncResult   *result, gpointer        user_data);
-
-static void
-request_send_async (ThreadClosure *thread_closure,
-                    GTask *task)
-{
-  OstreeFetcherPendingURI *pending;
-  GCancellable *cancellable;
-
-  pending = g_task_get_task_data (task);
-  cancellable = g_task_get_cancellable (task);
-
-  if (pending->file)
-    g_file_read_async (pending->file, g_task_get_priority (task),
-                       cancellable, on_request_sent, task);
-  else
-    {
-      soup_session_send_async (thread_closure->session, pending->message,
-                               g_task_get_priority (task),
-                               cancellable, on_request_sent, task);
-    }
-}
-
-static void
-start_pending_request (ThreadClosure *thread_closure,
-                       GTask         *task)
-{
-
-  OstreeFetcherPendingURI *pending;
-
-  pending = g_task_get_task_data (task);
-
-  g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
-  request_send_async (thread_closure, g_object_ref (task));
-}
+on_request_sent (GObject      *object,
+                 GAsyncResult *result,
+                 gpointer      user_data);
 
 static gboolean
 _message_accept_cert_loose (SoupMessage          *msg,
                             GTlsCertificate      *tls_peer_certificate,
-                            GTlsCertificateFlags tls_peer_errors,
+                            GTlsCertificateFlags  tls_peer_errors,
                             gpointer              data)
 {
-  OstreeFetcherConfigFlags config_flags;
-
-  config_flags = GPOINTER_TO_UINT (data);
-
-  return ((config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0);
+  return TRUE;
 }
 
 static void
-create_pending_soup_request (OstreeFetcherPendingURI  *pending)
+create_request_message (FetcherRequest *request)
 {
-  OstreeFetcherURI *next_mirror = NULL;
-  g_autoptr(OstreeFetcherURI) uri = NULL;
-  GUri *guri = NULL;
+  g_assert (request->mirrorlist);
+  g_assert (request->mirrorlist_idx < request->mirrorlist->len);
 
-  g_assert (pending->mirrorlist);
-  g_assert (pending->mirrorlist_idx < pending->mirrorlist->len);
-
-  next_mirror = g_ptr_array_index (pending->mirrorlist, pending->mirrorlist_idx);
-  if (pending->filename)
-    uri = _ostree_fetcher_uri_new_subpath (next_mirror, pending->filename);
+  OstreeFetcherURI *next_mirror = g_ptr_array_index (request->mirrorlist, request->mirrorlist_idx);
+  g_autoptr(OstreeFetcherURI) uri = NULL;
+  if (request->filename)
+    uri = _ostree_fetcher_uri_new_subpath (next_mirror, request->filename);
   if (!uri)
-    uri = (OstreeFetcherURI*)g_uri_ref ((GUri*)next_mirror);
+    uri = (OstreeFetcherURI *) g_uri_ref ((GUri *) next_mirror);
 
-  g_clear_object (&pending->message);
-  g_clear_object (&pending->file);
+  g_clear_object (&request->message);
+  g_clear_object (&request->file);
+  g_clear_object (&request->response_body);
 
-  guri = (GUri*)(uri ? uri : next_mirror);
+  GUri *guri = (GUri *) uri;
 
   /* file:// URI is handle via GFile */
   if (!strcmp (g_uri_get_scheme (guri), "file"))
     {
-      char *str = g_uri_to_string (guri);
-      pending->file = g_file_new_for_uri (str);
-      g_free (str);
+      g_autofree char *str = g_uri_to_string (guri);
+      request->file = g_file_new_for_uri (str);
       return;
     }
 
-  pending->message = soup_message_new_from_uri ("GET", guri);
+  request->message = soup_message_new_from_uri ("GET", guri);
 
-  if (pending->if_none_match != NULL)
-    {
-      soup_message_headers_append (soup_message_get_request_headers (pending->message),
-                                   "If-None-Match", pending->if_none_match);
-    }
+  if (request->if_none_match != NULL)
+    soup_message_headers_append (soup_message_get_request_headers (request->message),
+                                 "If-None-Match", request->if_none_match);
 
-  if (pending->if_modified_since > 0)
+  if (request->if_modified_since > 0)
     {
-      g_autoptr(GDateTime) date_time = g_date_time_new_from_unix_utc (pending->if_modified_since);
+      g_autoptr(GDateTime) date_time = g_date_time_new_from_unix_utc (request->if_modified_since);
       g_autofree char *mod_date = g_date_time_format (date_time, "%a, %d %b %Y %H:%M:%S %Z");
-      soup_message_headers_append (soup_message_get_request_headers (pending->message),
+      soup_message_headers_append (soup_message_get_request_headers (request->message),
                                    "If-Modified-Since", mod_date);
     }
 
-  if (pending->message && pending->fetcher->config_flags != 0)
-    {
-      g_signal_connect (pending->message, "accept-certificate",
-                        G_CALLBACK (_message_accept_cert_loose),
-                        GUINT_TO_POINTER (pending->fetcher->config_flags));
-    }
-}
+  if ((request->fetcher->config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) != 0)
+    g_signal_connect (request->message, "accept-certificate",
+                      G_CALLBACK (_message_accept_cert_loose), NULL);
 
-static void
-session_thread_request_uri (ThreadClosure *thread_closure,
-                            gpointer data)
-{
-  GTask *task = G_TASK (data);
-  OstreeFetcherPendingURI *pending;
-
-  pending = g_task_get_task_data (task);
-
-  /* If we caught an error in init, re-throw it for every request */
-  if (thread_closure->initialization_error)
+  if (request->fetcher->extra_headers)
     {
-      g_task_return_error (task, g_error_copy (thread_closure->initialization_error));
-      return;
-    }
-
-  create_pending_soup_request (pending);
-
-  if (pending->message && thread_closure->extra_headers)
-    {
-      g_autoptr(GVariantIter) viter = g_variant_iter_new (thread_closure->extra_headers);
+      g_autoptr(GVariantIter) viter = g_variant_iter_new (request->fetcher->extra_headers);
       const char *key;
       const char *value;
 
       while (g_variant_iter_next (viter, "(&s&s)", &key, &value))
-        soup_message_headers_append (soup_message_get_request_headers (pending->message), key, value);
+        soup_message_headers_append (soup_message_get_request_headers (request->message), key, value);
     }
-
-  if (pending->is_membuf)
-    request_send_async (thread_closure, g_object_ref (task));
-  else
-    start_pending_request (thread_closure, task);
 }
 
-static gpointer
-ostree_fetcher_session_thread (gpointer data)
+static void
+initiate_task_request (GTask *task)
 {
-  ThreadClosure *closure = data;
-  g_autoptr(GMainContext) mainctx = g_main_context_ref (closure->main_context);
-
-  /* This becomes the GMainContext that SoupSession schedules async
-   * callbacks and emits signals from.  Make it the thread-default
-   * context for this thread before creating the session. */
-  g_main_context_push_thread_default (mainctx);
-
-  /* We retain ownership of the SoupSession reference. */
-  closure->session = soup_session_new_with_options ("user-agent", OSTREE_FETCHER_USERAGENT_STRING,
-                                                    "timeout", 60,
-                                                    "idle-timeout", 60,
-                                                    "max-conns-per-host", _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS,
-                                                    NULL);
+  FetcherRequest *request = g_task_get_task_data (task);
+  create_request_message (request);
 
-  /* SoupContentDecoder is included in the session by default. Remove it
-   * if gzip compression isn't in use.
-   */
-  if (!closure->transfer_gzip)
-    soup_session_remove_feature_by_type (closure->session, SOUP_TYPE_CONTENT_DECODER);
-
-  /* This model ensures we don't hit a race using g_main_loop_quit();
-   * see also what pull_termination_condition() in ostree-repo-pull.c
-   * is doing.
-   */
-  while (g_atomic_int_get (&closure->running))
-    g_main_context_iteration (closure->main_context, TRUE);
-
-  /* Since the ThreadClosure may be finalized from any thread we
-   * unreference all data related to the SoupSession ourself to ensure
-   * it's freed in the same thread where it was created. */
-  g_clear_pointer (&closure->outstanding, g_hash_table_unref);
-  g_clear_pointer (&closure->session, g_object_unref);
-
-  thread_closure_unref (closure);
-
-  /* Do this last, since libsoup uses g_main_current_source() which
-   * relies on it.
-   */
-  g_main_context_pop_thread_default (mainctx);
-
-  return NULL;
+  GCancellable *cancellable = g_task_get_cancellable (task);
+  int priority = g_task_get_priority (task);
+  if (request->file)
+    g_file_read_async (request->file, priority, cancellable, on_request_sent, task);
+  else
+    {
+      g_autofree char *uri = g_uri_to_string (soup_message_get_uri (request->message));
+      const char *dest = request->is_membuf ? "memory" : "tmpfile";
+      g_debug ("Requesting %s to %s for session %p in main context %p",
+               uri, dest, request->session, request->mainctx);
+      soup_session_send_async (request->session, request->message,
+                               priority, cancellable, on_request_sent, task);
+    }
 }
 
 static void
@@ -567,18 +241,14 @@ _ostree_fetcher_finalize (GObject *object)
 {
   OstreeFetcher *self = OSTREE_FETCHER (object);
 
-  /* Terminate the session thread. */
-  g_atomic_int_set (&self->thread_closure->running, 0);
-  g_main_context_wakeup (self->thread_closure->main_context);
-  if (self->session_thread)
-    {
-      /* We need to explicitly synchronize to clean up TLS */
-      if (self->session_thread != g_thread_self ())
-        g_thread_join (self->session_thread);
-      else
-        g_clear_pointer (&self->session_thread, g_thread_unref);
-    }
-  g_clear_pointer (&self->thread_closure, thread_closure_unref);
+  g_clear_pointer (&self->remote_name, g_free);
+  g_clear_pointer (&self->sessions, g_hash_table_unref);
+  g_clear_object (&self->proxy_resolver);
+  g_clear_object (&self->cookie_jar);
+  g_clear_object (&self->client_cert);
+  g_clear_object (&self->tls_database);
+  g_clear_pointer (&self->extra_headers, g_variant_unref);
+  g_clear_pointer (&self->user_agent, g_free);
 
   G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
 }
@@ -587,40 +257,11 @@ static void
 _ostree_fetcher_constructed (GObject *object)
 {
   OstreeFetcher *self = OSTREE_FETCHER (object);
-  g_autoptr(GMainContext) main_context = NULL;
-  const char *http_proxy;
-
-  main_context = g_main_context_new ();
 
-  self->thread_closure = g_slice_new0 (ThreadClosure);
-  self->thread_closure->ref_count = 1;
-  self->thread_closure->main_context = g_main_context_ref (main_context);
-  self->thread_closure->running = 1;
-  self->thread_closure->transfer_gzip = (self->config_flags & OSTREE_FETCHER_FLAGS_TRANSFER_GZIP) != 0;
-
-  self->thread_closure->outstanding = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)pending_uri_unref);
-  self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
-                                                                   (GDestroyNotify) NULL,
-                                                                   (GDestroyNotify) g_object_unref);
-  g_mutex_init (&self->thread_closure->output_stream_set_lock);
-
-  if (g_getenv ("OSTREE_DEBUG_HTTP"))
-    {
-      session_thread_idle_add (self->thread_closure,
-                               session_thread_add_logger,
-                               NULL, (GDestroyNotify) NULL);
-    }
-
-  http_proxy = g_getenv ("http_proxy");
+  const char *http_proxy = g_getenv ("http_proxy");
   if (http_proxy != NULL && http_proxy[0] != '\0')
     _ostree_fetcher_set_proxy (self, http_proxy);
 
-  /* FIXME Maybe implement GInitableIface and use g_thread_try_new()
-   *       so we can try to handle thread creation errors gracefully? */
-  self->session_thread = g_thread_new ("fetcher-session-thread",
-                                       ostree_fetcher_session_thread,
-                                       thread_closure_ref (self->thread_closure));
-
   G_OBJECT_CLASS (_ostree_fetcher_parent_class)->constructed (object);
 }
 
@@ -649,72 +290,56 @@ _ostree_fetcher_class_init (OstreeFetcherClass *klass)
 static void
 _ostree_fetcher_init (OstreeFetcher *self)
 {
+  self->sessions = g_hash_table_new (g_direct_hash, g_direct_equal);
 }
 
 OstreeFetcher *
-_ostree_fetcher_new (int                      tmpdir_dfd,
-                     const char              *remote_name,
-                     OstreeFetcherConfigFlags flags)
+_ostree_fetcher_new (int                       tmpdir_dfd,
+                     const char               *remote_name,
+                     OstreeFetcherConfigFlags  flags)
 {
-  OstreeFetcher *self;
-
-  self = g_object_new (OSTREE_TYPE_FETCHER, "config-flags", flags, NULL);
-  self->thread_closure->remote_name = g_strdup (remote_name);
-  self->thread_closure->base_tmpdir_dfd = tmpdir_dfd;
+  OstreeFetcher *self = g_object_new (OSTREE_TYPE_FETCHER, "config-flags", flags, NULL);
+  self->remote_name = g_strdup (remote_name);
+  self->tmpdir_dfd = tmpdir_dfd;
 
   return self;
 }
 
 int
-_ostree_fetcher_get_dfd (OstreeFetcher *fetcher)
+_ostree_fetcher_get_dfd (OstreeFetcher *self)
 {
-  return fetcher->thread_closure->base_tmpdir_dfd;
+  return self->tmpdir_dfd;
 }
 
 void
 _ostree_fetcher_set_proxy (OstreeFetcher *self,
                            const char    *http_proxy)
 {
-  GProxyResolver *resolver;
-  GUri *guri;
-
   g_return_if_fail (OSTREE_IS_FETCHER (self));
   g_return_if_fail (http_proxy != NULL && http_proxy[0] != '\0');
 
   /* validate first */
-  guri = g_uri_parse (http_proxy, SOUP_HTTP_URI_FLAGS, NULL);
-
-  if (!guri)
+  g_autoptr(GError) local_error = NULL;
+  g_autoptr(GUri) guri = g_uri_parse (http_proxy, G_URI_FLAGS_NONE, &local_error);
+  if (guri == NULL)
     {
-      g_warning ("Invalid proxy URI '%s'", http_proxy);
+      g_warning ("Invalid proxy URI '%s': %s", http_proxy, local_error->message);
       return;
     }
 
-  g_uri_unref (guri);
-
-  resolver = g_simple_proxy_resolver_new (http_proxy, NULL);
-
-  session_thread_idle_add (self->thread_closure,
-                           session_thread_set_proxy_cb,
-                           resolver,  /* takes ownership */
-                           (GDestroyNotify) g_object_unref);
-    }
+  g_clear_object (&self->proxy_resolver);
+  self->proxy_resolver = g_simple_proxy_resolver_new (http_proxy, NULL);
+}
 
 void
 _ostree_fetcher_set_cookie_jar (OstreeFetcher *self,
                                 const char    *jar_path)
 {
-  SoupCookieJar *jar;
-
   g_return_if_fail (OSTREE_IS_FETCHER (self));
   g_return_if_fail (jar_path != NULL);
 
-  jar = soup_cookie_jar_text_new (jar_path, TRUE);
-
-  session_thread_idle_add (self->thread_closure,
-                           session_thread_set_cookie_jar_cb,
-                           jar,  /* takes ownership */
-                           (GDestroyNotify) g_object_unref);
+  g_clear_object (&self->cookie_jar);
+  self->cookie_jar = soup_cookie_jar_text_new (jar_path, TRUE);
 }
 
 void
@@ -722,20 +347,10 @@ _ostree_fetcher_set_client_cert (OstreeFetcher   *self,
                                  const char      *cert_path,
                                  const char      *key_path)
 {
-  g_autoptr(GString) buf = NULL;
   g_return_if_fail (OSTREE_IS_FETCHER (self));
 
-  if (cert_path)
-    {
-      buf = g_string_new (cert_path);
-      g_string_append_c (buf, '\0');
-      g_string_append (buf, key_path);
-    }
-
-  session_thread_idle_add (self->thread_closure,
-                           session_thread_set_tls_interaction_cb,
-                           g_string_free (g_steal_pointer (&buf), FALSE),
-                           (GDestroyNotify) g_free);
+  g_clear_object (&self->client_cert);
+  self->client_cert = _ostree_tls_cert_interaction_new (cert_path, key_path);
 }
 
 void
@@ -744,235 +359,201 @@ _ostree_fetcher_set_tls_database (OstreeFetcher *self,
 {
   g_return_if_fail (OSTREE_IS_FETCHER (self));
 
-  session_thread_idle_add (self->thread_closure,
-                           session_thread_set_tls_database_cb,
-                           g_strdup (tlsdb_path),
-                           (GDestroyNotify) g_free);
+  g_autoptr(GError) local_error = NULL;
+  GTlsDatabase *tlsdb = g_tls_file_database_new (tlsdb_path, &local_error);
+  if (tlsdb == NULL)
+    {
+      g_warning ("Invalid TLS database '%s': %s", tlsdb_path, local_error->message);
+      return;
+    }
+
+  g_clear_object (&self->tls_database);
+  self->tls_database = tlsdb;
 }
 
 void
 _ostree_fetcher_set_extra_headers (OstreeFetcher *self,
                                    GVariant      *extra_headers)
 {
-  session_thread_idle_add (self->thread_closure,
-                           session_thread_set_headers_cb,
-                           g_variant_ref (extra_headers),
-                           (GDestroyNotify) g_variant_unref);
+  g_return_if_fail (OSTREE_IS_FETCHER (self));
+
+  g_clear_pointer (&self->extra_headers, g_variant_unref);
+  self->extra_headers = g_variant_ref (extra_headers);
 }
 
 void
 _ostree_fetcher_set_extra_user_agent (OstreeFetcher *self,
                                       const char    *extra_user_agent)
 {
-  session_thread_idle_add (self->thread_closure,
-                           session_thread_set_extra_user_agent_cb,
-                           g_strdup (extra_user_agent),
-                           (GDestroyNotify) g_free);
+  g_return_if_fail (OSTREE_IS_FETCHER (self));
+
+  g_clear_pointer (&self->user_agent, g_free);
+  if (extra_user_agent != NULL)
+    self->user_agent = g_strdup_printf ("%s %s",
+                                        OSTREE_FETCHER_USERAGENT_STRING,
+                                        extra_user_agent);
 }
 
 static gboolean
-finish_stream (OstreeFetcherPendingURI *pending,
-               GCancellable            *cancellable,
-               GError                 **error)
+finish_stream (FetcherRequest  *request,
+               GCancellable    *cancellable,
+               GError         **error)
 {
-  gboolean ret = FALSE;
-  struct stat stbuf;
-
   /* Close it here since we do an async fstat(), where we don't want
    * to hit a bad fd.
    */
-  if (pending->out_stream)
+  if (request->out_stream)
     {
-      if ((pending->flags & OSTREE_FETCHER_REQUEST_NUL_TERMINATION) > 0)
+      if ((request->flags & OSTREE_FETCHER_REQUEST_NUL_TERMINATION) > 0)
         {
           const guint8 nulchar = 0;
-          gsize bytes_written;
 
-          if (!g_output_stream_write_all (pending->out_stream, &nulchar, 1, &bytes_written,
+          if (!g_output_stream_write_all (request->out_stream, &nulchar, 1, NULL,
                                           cancellable, error))
-            goto out;
+            return FALSE;
         }
 
-      if (!g_output_stream_close (pending->out_stream, cancellable, error))
-        goto out;
-
-      g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
-      g_hash_table_remove (pending->thread_closure->output_stream_set,
-                           pending->out_stream);
-      g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+      if (!g_output_stream_close (request->out_stream, cancellable, error))
+        return FALSE;
     }
 
-  if (!pending->is_membuf)
+  if (!request->is_membuf)
     {
-      if (!glnx_fstat (pending->tmpf.fd, &stbuf, error))
-        goto out;
-    }
+      struct stat stbuf;
 
-  pending->state = OSTREE_FETCHER_STATE_COMPLETE;
+      if (!glnx_fstat (request->tmpf.fd, &stbuf, error))
+        return FALSE;
 
-  if (!pending->is_membuf)
-    {
-      if (stbuf.st_size < pending->content_length)
+      if (request->content_length >= 0 && stbuf.st_size < request->content_length)
         {
           g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
-          goto out;
-        }
-      else
-        {
-          g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
-          pending->thread_closure->total_downloaded += stbuf.st_size;
-          g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+          return FALSE;
         }
     }
 
-  ret = TRUE;
- out:
-  (void) g_input_stream_close (pending->request_body, NULL, NULL);
-  return ret;
+  return TRUE;
 }
 
 static void
-on_stream_read (GObject        *object,
-                GAsyncResult   *result,
-                gpointer        user_data);
+on_stream_read (GObject      *object,
+                GAsyncResult *result,
+                gpointer      user_data);
 
 static void
-remove_pending (OstreeFetcherPendingURI *pending)
+on_out_splice_complete (GObject      *object,
+                        GAsyncResult *result,
+                        gpointer      user_data)
 {
-  /* Hold a temporary ref to ensure the reference to
-   * pending->thread_closure is valid.
-   */
-  pending_uri_ref (pending);
-  g_hash_table_remove (pending->thread_closure->outstanding, pending);
-  pending_uri_unref (pending);
-}
-
-static void
-on_out_splice_complete (GObject        *object,
-                        GAsyncResult   *result,
-                        gpointer        user_data)
-{
-  GTask *task = G_TASK (user_data);
-  OstreeFetcherPendingURI *pending;
-  GCancellable *cancellable;
-  gssize bytes_written;
+  g_autoptr(GTask) task = G_TASK (user_data);
   GError *local_error = NULL;
 
-  pending = g_task_get_task_data (task);
-  cancellable = g_task_get_cancellable (task);
-
-  bytes_written = g_output_stream_splice_finish ((GOutputStream *)object,
-                                                 result,
-                                                 &local_error);
+  gssize bytes_written = g_output_stream_splice_finish ((GOutputStream *) object, result, &local_error);
   if (bytes_written < 0)
-    goto out;
+    {
+      g_task_return_error (task, local_error);
+      return;
+    }
+
+  FetcherRequest *request = g_task_get_task_data (task);
+  request->fetcher->bytes_transferred += bytes_written;
 
-  g_input_stream_read_bytes_async (pending->request_body,
+  GCancellable *cancellable = g_task_get_cancellable (task);
+  g_input_stream_read_bytes_async (request->response_body,
                                    8192, G_PRIORITY_DEFAULT,
                                    cancellable,
                                    on_stream_read,
                                    g_object_ref (task));
-
- out:
-  if (local_error)
-    {
-      g_task_return_error (task, local_error);
-      remove_pending (pending);
-    }
-
-  g_object_unref (task);
 }
 
 static void
-on_stream_read (GObject        *object,
-                GAsyncResult   *result,
-                gpointer        user_data)
+on_stream_read (GObject      *object,
+                GAsyncResult *result,
+                gpointer      user_data)
 {
-  GTask *task = G_TASK (user_data);
-  OstreeFetcherPendingURI *pending;
-  GCancellable *cancellable;
-  g_autoptr(GBytes) bytes = NULL;
-  gsize bytes_read;
+  g_autoptr(GTask) task = G_TASK (user_data);
+  FetcherRequest *request = g_task_get_task_data (task);
   GError *local_error = NULL;
 
-  pending = g_task_get_task_data (task);
-  cancellable = g_task_get_cancellable (task);
-
   /* Only open the output stream on demand to ensure we use as
    * few file descriptors as possible.
    */
-  if (!pending->out_stream)
+  if (!request->out_stream)
     {
-      if (!pending->is_membuf)
+      if (!request->is_membuf)
         {
-          if (!_ostree_fetcher_tmpf_from_flags (pending->flags, pending->thread_closure->base_tmpdir_dfd,
-                                                &pending->tmpf, &local_error))
-            goto out;
-          pending->out_stream = g_unix_output_stream_new (pending->tmpf.fd, FALSE);
+          if (!_ostree_fetcher_tmpf_from_flags (request->flags, request->fetcher->tmpdir_dfd,
+                                                &request->tmpf, &local_error))
+            {
+              g_task_return_error (task, local_error);
+              return;
+            }
+          request->out_stream = g_unix_output_stream_new (request->tmpf.fd, FALSE);
         }
       else
         {
-          pending->out_stream = g_memory_output_stream_new_resizable ();
+          request->out_stream = g_memory_output_stream_new_resizable ();
         }
-
-      g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
-      g_hash_table_add (pending->thread_closure->output_stream_set,
-                        g_object_ref (pending->out_stream));
-      g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
     }
 
   /* Get a GBytes buffer */
-  bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error);
+  g_autoptr(GBytes) bytes = g_input_stream_read_bytes_finish ((GInputStream *) object, result, &local_error);
   if (!bytes)
-    goto out;
-  bytes_read = g_bytes_get_size (bytes);
+    {
+      g_task_return_error (task, local_error);
+      return;
+    }
 
   /* Was this the end of the stream? */
+  GCancellable *cancellable = g_task_get_cancellable (task);
+  gsize bytes_read = g_bytes_get_size (bytes);
   if (bytes_read == 0)
     {
-      if (!finish_stream (pending, cancellable, &local_error))
-        goto out;
-      if (pending->is_membuf)
+      if (!finish_stream (request, cancellable, &local_error))
+        {
+          g_task_return_error (task, local_error);
+          return;
+        }
+      if (request->is_membuf)
         {
-          g_task_return_pointer (task,
-                                 g_memory_output_stream_steal_as_bytes ((GMemoryOutputStream*)pending->out_stream),
-                                 (GDestroyNotify) g_bytes_unref);
+          GBytes *mem_bytes = g_memory_output_stream_steal_as_bytes ((GMemoryOutputStream *) request->out_stream);
+          g_task_return_pointer (task, mem_bytes, (GDestroyNotify) g_bytes_unref);
         }
       else
         {
-          if (lseek (pending->tmpf.fd, 0, SEEK_SET) < 0)
+          if (lseek (request->tmpf.fd, 0, SEEK_SET) < 0)
             {
               glnx_set_error_from_errno (&local_error);
               g_task_return_error (task, g_steal_pointer (&local_error));
+              return;
             }
-          else
-            g_task_return_boolean (task, TRUE);
+
+          g_task_return_boolean (task, TRUE);
         }
-      remove_pending (pending);
     }
   else
     {
       /* Verify max size */
-      if (pending->max_size > 0)
+      if (request->max_size > 0)
         {
-          if (bytes_read > pending->max_size ||
-              (bytes_read + pending->current_size) > pending->max_size)
+          if (bytes_read > request->max_size ||
+              (bytes_read + request->current_size) > request->max_size)
             {
               g_autofree char *uristr = NULL;
 
-              if (pending->file)
-                uristr = g_file_get_uri (pending->file);
+              if (request->file)
+                uristr = g_file_get_uri (request->file);
               else
-                uristr = g_uri_to_string (soup_message_get_uri (pending->message));
+                uristr = g_uri_to_string (soup_message_get_uri (request->message));
 
               local_error = g_error_new (G_IO_ERROR, G_IO_ERROR_FAILED,
                                          "URI %s exceeded maximum size of %" G_GUINT64_FORMAT " bytes",
-                                         uristr, pending->max_size);
-              goto out;
+                                         uristr, request->max_size);
+              g_task_return_error (task, local_error);
+              return;
             }
         }
 
-      pending->current_size += bytes_read;
+      request->current_size += bytes_read;
 
       /* We do this instead of _write_bytes_async() as that's not
        * guaranteed to do a complete write.
@@ -980,7 +561,7 @@ on_stream_read (GObject        *object,
       {
         g_autoptr(GInputStream) membuf =
           g_memory_input_stream_new_from_bytes (bytes);
-        g_output_stream_splice_async (pending->out_stream, membuf,
+        g_output_stream_splice_async (request->out_stream, membuf,
                                       G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
                                       G_PRIORITY_DEFAULT,
                                       cancellable,
@@ -988,214 +569,251 @@ on_stream_read (GObject        *object,
                                       g_object_ref (task));
       }
     }
-
- out:
-  if (local_error)
-    {
-      g_task_return_error (task, local_error);
-      remove_pending (pending);
-    }
-
-  g_object_unref (task);
 }
 
 static void
-on_request_sent (GObject        *object,
-                 GAsyncResult   *result,
-                 gpointer        user_data)
+on_request_sent (GObject      *object,
+                 GAsyncResult *result,
+                 gpointer      user_data)
 {
-  GTask *task = G_TASK (user_data);
-  /* Hold a ref to the pending across this function, since we remove
-   * it from the hash early in some cases, not in others. */
-  OstreeFetcherPendingURI *pending = pending_uri_ref (g_task_get_task_data (task));
-  GCancellable *cancellable = g_task_get_cancellable (task);
+  g_autoptr(GTask) task = G_TASK (user_data);
+  FetcherRequest *request = g_task_get_task_data (task);
   GError *local_error = NULL;
-  glnx_unref_object SoupMessage *msg = NULL;
-  SoupStatus status;
-
-  pending->state = OSTREE_FETCHER_STATE_COMPLETE;
-  if (pending->file)
-    pending->request_body = (GInputStream *)g_file_read_finish ((GFile *) object,
-                                                                result,
-                                                                &local_error);
+
+  if (request->file)
+    request->response_body = (GInputStream *) g_file_read_finish ((GFile *) object,
+                                                                  result,
+                                                                  &local_error);
   else
-    pending->request_body = soup_session_send_finish ((SoupSession*) object,
-                                                      result, &local_error);
+    request->response_body = soup_session_send_finish ((SoupSession *) object,
+                                                       result, &local_error);
 
-  if (!pending->request_body)
-    goto out;
+  if (!request->response_body)
+    {
+      g_task_return_error (task, local_error);
+      return;
+    }
 
-  if (pending->message)
+  if (request->message)
     {
-      status = soup_message_get_status (pending->message);
+      SoupStatus status = soup_message_get_status (request->message);
       if (status == SOUP_STATUS_NOT_MODIFIED &&
-          (pending->if_none_match != NULL || pending->if_modified_since > 0))
+          (request->if_none_match != NULL || request->if_modified_since > 0))
         {
           /* Version on the server is unchanged from the version we have cached locally;
            * report this as an out-argument, a zero-length response buffer, and no error */
-          pending->out_not_modified = TRUE;
+          request->out_not_modified = TRUE;
         }
       else if (!SOUP_STATUS_IS_SUCCESSFUL (status))
         {
           /* is there another mirror we can try? */
-          if (pending->mirrorlist_idx + 1 < pending->mirrorlist->len)
+          if (request->mirrorlist_idx + 1 < request->mirrorlist->len)
             {
-              pending->mirrorlist_idx++;
-              create_pending_soup_request (pending);
-
-              (void) g_input_stream_close (pending->request_body, NULL, NULL);
-
-              start_pending_request (pending->thread_closure, task);
+              request->mirrorlist_idx++;
+              initiate_task_request (g_object_ref (task));
+              return;
             }
           else
             {
-              g_autofree char *uristring = g_uri_to_string (soup_message_get_uri (pending->message));
+              g_autofree char *uristring = g_uri_to_string (soup_message_get_uri (request->message));
               GIOErrorEnum code = _ostree_fetcher_http_status_code_to_io_error (status);
               {
                 g_autofree char *errmsg =
                   g_strdup_printf ("Server returned status %u: %s",
                                    status,
                                    soup_status_get_phrase (status));
-
-                /* Let's make OOB errors be the final one since they're probably
-                 * the cause for the error here. */
-                if (pending->thread_closure->oob_error)
-                  {
-                    local_error =
-                      g_error_copy (pending->thread_closure->oob_error);
-                    g_prefix_error (&local_error, "%s: ", errmsg);
-                  }
-                else
-                  local_error = g_error_new_literal (G_IO_ERROR, code, errmsg);
+                local_error = g_error_new_literal (G_IO_ERROR, code, errmsg);
               }
 
-              if (pending->mirrorlist->len > 1)
+              if (request->mirrorlist->len > 1)
                 g_prefix_error (&local_error,
                                 "All %u mirrors failed. Last error was: ",
-                                pending->mirrorlist->len);
-              if (pending->thread_closure->remote_name &&
-                  !((pending->flags & OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT) > 0 &&
+                                request->mirrorlist->len);
+              if (request->fetcher->remote_name &&
+                  !((request->flags & OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT) > 0 &&
                     code == G_IO_ERROR_NOT_FOUND))
-                _ostree_fetcher_journal_failure (pending->thread_closure->remote_name,
+                _ostree_fetcher_journal_failure (request->fetcher->remote_name,
                                                  uristring, local_error->message);
 
+              g_task_return_error (task, local_error);
+              return;
             }
-          goto out;
         }
 
       /* Grab cache properties from the response */
-      pending->out_etag = g_strdup (soup_message_headers_get_one (soup_message_get_response_headers (pending->message), "ETag"));
-      pending->out_last_modified = 0;
+      request->out_etag = g_strdup (soup_message_headers_get_one (soup_message_get_response_headers (request->message), "ETag"));
+      request->out_last_modified = 0;
 
-      const char *last_modified_str = soup_message_headers_get_one (soup_message_get_response_headers (pending->message), "Last-Modified");
+      const char *last_modified_str = soup_message_headers_get_one (soup_message_get_response_headers (request->message), "Last-Modified");
       if (last_modified_str != NULL)
         {
           GDateTime *soup_date = soup_date_time_new_from_http_string (last_modified_str);
           if (soup_date != NULL)
             {
-              pending->out_last_modified = g_date_time_to_unix (soup_date);
+              request->out_last_modified = g_date_time_to_unix (soup_date);
               g_date_time_unref (soup_date);
             }
         }
     }
 
-  pending->state = OSTREE_FETCHER_STATE_DOWNLOADING;
-
-  if (pending->file)
+  if (request->file)
     {
-      GFileInfo *info = g_file_query_info (pending->file,
+      GFileInfo *info = g_file_query_info (request->file,
                                            G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE ","
                                            G_FILE_ATTRIBUTE_STANDARD_SIZE,
                                            0, NULL, NULL);
       if (info)
         {
-          pending->content_length = g_file_info_get_size (info);
+          request->content_length = g_file_info_get_size (info);
           g_object_unref (info);
         }
       else
-        pending->content_length = -1;
+        request->content_length = -1;
     }
   else
-    pending->content_length = soup_message_headers_get_content_length (soup_message_get_response_headers (pending->message));
+    {
+      SoupMessageHeaders *headers = soup_message_get_response_headers (request->message);
+      if (soup_message_headers_get_list (headers, "Content-Encoding") == NULL)
+        request->content_length = soup_message_headers_get_content_length (headers);
+      else
+        request->content_length = -1;
+    }
 
-  g_input_stream_read_bytes_async (pending->request_body,
+  GCancellable *cancellable = g_task_get_cancellable (task);
+  g_input_stream_read_bytes_async (request->response_body,
                                    8192, G_PRIORITY_DEFAULT,
                                    cancellable,
                                    on_stream_read,
                                    g_object_ref (task));
+}
 
- out:
-  if (local_error)
+static SoupSession *
+create_soup_session (OstreeFetcher *self)
+{
+  const char *user_agent = self->user_agent ?: OSTREE_FETCHER_USERAGENT_STRING;
+  SoupSession *session =
+    soup_session_new_with_options ("user-agent", user_agent,
+                                   "timeout", 60,
+                                   "idle-timeout", 60,
+                                   "max-conns-per-host", _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS,
+                                   NULL);
+
+  /* SoupContentDecoder is included in the session by default. Remove it
+   * if gzip compression isn't in use.
+   */
+  if ((self->config_flags & OSTREE_FETCHER_FLAGS_TRANSFER_GZIP) == 0)
+    soup_session_remove_feature_by_type (session, SOUP_TYPE_CONTENT_DECODER);
+
+  if (g_getenv ("OSTREE_DEBUG_HTTP"))
     {
-      if (pending->request_body)
-        (void) g_input_stream_close (pending->request_body, NULL, NULL);
-      g_task_return_error (task, local_error);
-      remove_pending (pending);
+      glnx_unref_object SoupLogger *logger = soup_logger_new (SOUP_LOGGER_LOG_BODY);
+      soup_session_add_feature (session, SOUP_SESSION_FEATURE (logger));
     }
 
-  pending_uri_unref (pending);
-  g_object_unref (task);
+  if (self->proxy_resolver != NULL)
+    soup_session_set_proxy_resolver (session, self->proxy_resolver);
+  if (self->cookie_jar != NULL)
+    soup_session_add_feature (session, SOUP_SESSION_FEATURE (self->cookie_jar));
+  if (self->client_cert != NULL)
+    soup_session_set_tls_interaction (session, (GTlsInteraction *) self->client_cert);
+  if (self->tls_database != NULL)
+    soup_session_set_tls_database (session, self->tls_database);
+
+  return session;
+}
+
+static gboolean
+match_value (gpointer key,
+             gpointer value,
+             gpointer user_data)
+{
+  return value == user_data;
 }
 
 static void
-_ostree_fetcher_request_async (OstreeFetcher         *self,
-                               GPtrArray             *mirrorlist,
-                               const char            *filename,
-                               OstreeFetcherRequestFlags flags,
-                               const char            *if_none_match,
-                               guint64                if_modified_since,
-                               gboolean               is_membuf,
-                               guint64                max_size,
-                               int                    priority,
-                               GCancellable          *cancellable,
-                               GAsyncReadyCallback    callback,
-                               gpointer               user_data)
+on_session_finalized (gpointer  data,
+                      GObject  *object)
 {
-  g_autoptr(GTask) task = NULL;
-  OstreeFetcherPendingURI *pending;
+  GHashTable *sessions = data;
+  g_debug ("Removing session %p from sessions hash table", object);
+  (void) g_hash_table_foreach_remove (sessions, match_value, object);
+}
 
+static void
+_ostree_fetcher_request_async (OstreeFetcher             *self,
+                               GPtrArray                 *mirrorlist,
+                               const char                *filename,
+                               OstreeFetcherRequestFlags  flags,
+                               const char                *if_none_match,
+                               guint64                    if_modified_since,
+                               gboolean                   is_membuf,
+                               guint64                    max_size,
+                               int                        priority,
+                               GCancellable              *cancellable,
+                               GAsyncReadyCallback        callback,
+                               gpointer                   user_data)
+{
   g_return_if_fail (OSTREE_IS_FETCHER (self));
   g_return_if_fail (mirrorlist != NULL);
   g_return_if_fail (mirrorlist->len > 0);
 
-  /* SoupRequest is created in session thread. */
-  pending = g_new0 (OstreeFetcherPendingURI, 1);
-  pending->ref_count = 1;
-  pending->thread_closure = thread_closure_ref (self->thread_closure);
-  pending->mirrorlist = g_ptr_array_ref (mirrorlist);
-  pending->filename = g_strdup (filename);
-  pending->flags = flags;
-  pending->if_none_match = g_strdup (if_none_match);
-  pending->if_modified_since = if_modified_since;
-  pending->max_size = max_size;
-  pending->is_membuf = is_membuf;
-  pending->fetcher = self;
-
-  task = g_task_new (self, cancellable, callback, user_data);
+  FetcherRequest *request = g_new0 (FetcherRequest, 1);
+  request->mirrorlist = g_ptr_array_ref (mirrorlist);
+  request->filename = g_strdup (filename);
+  request->flags = flags;
+  request->if_none_match = g_strdup (if_none_match);
+  request->if_modified_since = if_modified_since;
+  request->max_size = max_size;
+  request->is_membuf = is_membuf;
+  request->fetcher = self;
+  request->mainctx = g_main_context_ref_thread_default ();
+
+  /* Ideally each fetcher would have a single soup session. However, each
+   * session needs to be used from a single main context and the fetcher
+   * doesn't have that limitation. Instead, a mapping from main context to
+   * session is kept in the fetcher.
+   */
+  g_debug ("Looking up session for main context %p", request->mainctx);
+  request->session = (SoupSession *) g_hash_table_lookup (self->sessions, request->mainctx);
+  if (request->session != NULL)
+    {
+      g_debug ("Using existing session %p", request->session);
+      g_object_ref (request->session);
+    }
+  else
+    {
+      request->session = create_soup_session (self);
+      g_debug ("Created new session %p", request->session);
+      g_hash_table_insert (self->sessions, request->mainctx, request->session);
+
+      /* Add a weak ref to the session so that when it's finalized it can be
+       * removed from the hash table.
+       */
+      g_object_weak_ref (G_OBJECT (request->session), on_session_finalized, self->sessions);
+    }
+
+  g_autoptr(GTask) task = g_task_new (self, cancellable, callback, user_data);
   g_task_set_source_tag (task, _ostree_fetcher_request_async);
-  g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_unref);
+  g_task_set_task_data (task, request, (GDestroyNotify) fetcher_request_free);
 
   /* We'll use the GTask priority for our own priority queue. */
   g_task_set_priority (task, priority);
 
-  session_thread_idle_add (self->thread_closure,
-                           session_thread_request_uri,
-                           g_object_ref (task),
-                           (GDestroyNotify) g_object_unref);
+  initiate_task_request (g_object_ref (task));
 }
 
 void
-_ostree_fetcher_request_to_tmpfile (OstreeFetcher         *self,
-                                    GPtrArray             *mirrorlist,
-                                    const char            *filename,
-                                    OstreeFetcherRequestFlags flags,
-                                    const char            *if_none_match,
-                                    guint64                if_modified_since,
-                                    guint64                max_size,
-                                    int                    priority,
-                                    GCancellable          *cancellable,
-                                    GAsyncReadyCallback    callback,
-                                    gpointer               user_data)
+_ostree_fetcher_request_to_tmpfile (OstreeFetcher             *self,
+                                    GPtrArray                 *mirrorlist,
+                                    const char                *filename,
+                                    OstreeFetcherRequestFlags  flags,
+                                    const char                *if_none_match,
+                                    guint64                    if_modified_since,
+                                    guint64                    max_size,
+                                    int                        priority,
+                                    GCancellable              *cancellable,
+                                    GAsyncReadyCallback        callback,
+                                    gpointer                   user_data)
 {
   _ostree_fetcher_request_async (self, mirrorlist, filename, flags,
                                  if_none_match, if_modified_since, FALSE,
@@ -1204,54 +822,49 @@ _ostree_fetcher_request_to_tmpfile (OstreeFetcher         *self,
 }
 
 gboolean
-_ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
-                                           GAsyncResult  *result,
-                                           GLnxTmpfile   *out_tmpf,
-                                           gboolean      *out_not_modified,
-                                           char         **out_etag,
-                                           guint64       *out_last_modified,
-                                           GError       **error)
+_ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher  *self,
+                                           GAsyncResult   *result,
+                                           GLnxTmpfile    *out_tmpf,
+                                           gboolean       *out_not_modified,
+                                           char          **out_etag,
+                                           guint64        *out_last_modified,
+                                           GError        **error)
 {
-  GTask *task;
-  OstreeFetcherPendingURI *pending;
-  gpointer ret;
-
   g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
   g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
 
-  task = (GTask*)result;
-  pending = g_task_get_task_data (task);
-
-  ret = g_task_propagate_pointer (task, error);
+  GTask *task = (GTask *) result;
+  gpointer ret = g_task_propagate_pointer (task, error);
   if (!ret)
     return FALSE;
 
-  g_assert (!pending->is_membuf);
-  *out_tmpf = pending->tmpf;
-  pending->tmpf.initialized = FALSE; /* Transfer ownership */
+  FetcherRequest *request = g_task_get_task_data (task);
+  g_assert (!request->is_membuf);
+  *out_tmpf = request->tmpf;
+  request->tmpf.initialized = FALSE; /* Transfer ownership */
 
   if (out_not_modified != NULL)
-    *out_not_modified = pending->out_not_modified;
+    *out_not_modified = request->out_not_modified;
   if (out_etag != NULL)
-    *out_etag = g_steal_pointer (&pending->out_etag);
+    *out_etag = g_steal_pointer (&request->out_etag);
   if (out_last_modified != NULL)
-    *out_last_modified = pending->out_last_modified;
+    *out_last_modified = request->out_last_modified;
 
   return TRUE;
 }
 
 void
-_ostree_fetcher_request_to_membuf (OstreeFetcher         *self,
-                                   GPtrArray             *mirrorlist,
-                                   const char            *filename,
-                                   OstreeFetcherRequestFlags flags,
-                                   const char            *if_none_match,
-                                   guint64                if_modified_since,
-                                   guint64                max_size,
-                                   int                    priority,
-                                   GCancellable          *cancellable,
-                                   GAsyncReadyCallback    callback,
-                                   gpointer               user_data)
+_ostree_fetcher_request_to_membuf (OstreeFetcher             *self,
+                                   GPtrArray                 *mirrorlist,
+                                   const char                *filename,
+                                   OstreeFetcherRequestFlags  flags,
+                                   const char                *if_none_match,
+                                   guint64                    if_modified_since,
+                                   guint64                    max_size,
+                                   int                        priority,
+                                   GCancellable              *cancellable,
+                                   GAsyncReadyCallback        callback,
+                                   gpointer                   user_data)
 {
   _ostree_fetcher_request_async (self, mirrorlist, filename, flags,
                                  if_none_match, if_modified_since, TRUE,
@@ -1260,66 +873,40 @@ _ostree_fetcher_request_to_membuf (OstreeFetcher         *self,
 }
 
 gboolean
-_ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
-                                          GAsyncResult  *result,
-                                          GBytes       **out_buf,
-                                          gboolean      *out_not_modified,
-                                          char         **out_etag,
-                                          guint64       *out_last_modified,
-                                          GError       **error)
+_ostree_fetcher_request_to_membuf_finish (OstreeFetcher  *self,
+                                          GAsyncResult   *result,
+                                          GBytes        **out_buf,
+                                          gboolean       *out_not_modified,
+                                          char          **out_etag,
+                                          guint64        *out_last_modified,
+                                          GError        **error)
 {
-  GTask *task;
-  OstreeFetcherPendingURI *pending;
-  gpointer ret;
-
   g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
   g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
 
-  task = (GTask*)result;
-  pending = g_task_get_task_data (task);
-
-  ret = g_task_propagate_pointer (task, error);
+  GTask *task = (GTask *) result;
+  gpointer ret = g_task_propagate_pointer (task, error);
   if (!ret)
     return FALSE;
 
-  g_assert (pending->is_membuf);
+  FetcherRequest *request = g_task_get_task_data (task);
+  g_assert (request->is_membuf);
   g_assert (out_buf);
   *out_buf = ret;
 
   if (out_not_modified != NULL)
-    *out_not_modified = pending->out_not_modified;
+    *out_not_modified = request->out_not_modified;
   if (out_etag != NULL)
-    *out_etag = g_steal_pointer (&pending->out_etag);
+    *out_etag = g_steal_pointer (&request->out_etag);
   if (out_last_modified != NULL)
-    *out_last_modified = pending->out_last_modified;
+    *out_last_modified = request->out_last_modified;
 
   return TRUE;
 }
 
 
 guint64
-_ostree_fetcher_bytes_transferred (OstreeFetcher       *self)
+_ostree_fetcher_bytes_transferred (OstreeFetcher *self)
 {
-  g_return_val_if_fail (OSTREE_IS_FETCHER (self), 0);
-
-  g_mutex_lock (&self->thread_closure->output_stream_set_lock);
-
-  guint64 ret = self->thread_closure->total_downloaded;
-
-  GLNX_HASH_TABLE_FOREACH (self->thread_closure->output_stream_set,
-                           GFileOutputStream*, stream)
-    {
-      if (G_IS_FILE_DESCRIPTOR_BASED (stream))
-        {
-          int fd = g_file_descriptor_based_get_fd ((GFileDescriptorBased*)stream);
-          struct stat stbuf;
-
-          if (glnx_fstat (fd, &stbuf, NULL))
-            ret += stbuf.st_size;
-        }
-    }
-
-  g_mutex_unlock (&self->thread_closure->output_stream_set_lock);
-
-  return ret;
+  return self->bytes_transferred;
 }