} OstreeFetcherState;
typedef struct {
- OstreeFetcher *self;
+ volatile int ref_count;
+
+ SoupSession *session;
+ GMainContext *main_context;
+ GMainLoop *main_loop;
+
+ int tmpdir_dfd;
+ int max_outstanding;
+
+ /* Queue for libsoup, see bgo#708591 */
+ GQueue pending_queue;
+ GHashTable *outstanding;
+
+ /* Shared across threads; be sure to lock. */
+ GHashTable *output_stream_set; /* set<GOutputStream> */
+ GMutex output_stream_set_lock;
+
+ /* Also protected by output_stream_set_lock. */
+ guint64 total_downloaded;
+} ThreadClosure;
+
+typedef struct {
+ ThreadClosure *thread_closure;
SoupURI *uri;
OstreeFetcherState state;
GTask *task;
} OstreeFetcherPendingURI;
+/* Used by session_thread_idle_add() */
+typedef void (*SessionThreadFunc) (ThreadClosure *thread_closure,
+ gpointer data);
+
+/* Used by session_thread_idle_add() */
+typedef struct {
+ ThreadClosure *thread_closure;
+ SessionThreadFunc function;
+ gpointer data;
+ GDestroyNotify notify;
+} IdleClosure;
+
struct OstreeFetcher
{
GObject parent_instance;
OstreeFetcherConfigFlags config_flags;
- int tmpdir_dfd;
+
char *tmpdir_name;
GLnxLockFile tmpdir_lock;
int base_tmpdir_dfd;
- GTlsCertificate *client_cert;
-
- SoupSession *session;
- SoupRequester *requester;
-
- GHashTable *output_stream_set; /* set<GOutputStream> */
-
- guint64 total_downloaded;
-
- /* Queue for libsoup, see bgo#708591 */
- GQueue pending_queue;
- GHashTable *outstanding;
- gint max_outstanding;
+ GThread *session_thread;
+ ThreadClosure *thread_closure;
};
enum {
G_DEFINE_TYPE (OstreeFetcher, _ostree_fetcher, G_TYPE_OBJECT)
+static ThreadClosure *
+thread_closure_ref (ThreadClosure *thread_closure)
+{
+ g_return_val_if_fail (thread_closure != NULL, NULL);
+ g_return_val_if_fail (thread_closure->ref_count > 0, NULL);
+
+ g_atomic_int_inc (&thread_closure->ref_count);
+
+ return thread_closure;
+}
+
+static void
+thread_closure_unref (ThreadClosure *thread_closure)
+{
+ g_return_if_fail (thread_closure != NULL);
+ g_return_if_fail (thread_closure->ref_count > 0);
+
+ if (g_atomic_int_dec_and_test (&thread_closure->ref_count))
+ {
+ g_clear_object (&thread_closure->session);
+
+ g_clear_pointer (&thread_closure->main_context, g_main_context_unref);
+ g_clear_pointer (&thread_closure->main_loop, g_main_loop_unref);
+
+ if (thread_closure->tmpdir_dfd != -1)
+ close (thread_closure->tmpdir_dfd);
+
+ while (!g_queue_is_empty (&thread_closure->pending_queue))
+ g_object_unref (g_queue_pop_head (&thread_closure->pending_queue));
+
+ g_clear_pointer (&thread_closure->outstanding, g_hash_table_unref);
+
+ g_clear_pointer (&thread_closure->output_stream_set, g_hash_table_unref);
+ g_mutex_clear (&thread_closure->output_stream_set_lock);
+
+ g_slice_free (ThreadClosure, thread_closure);
+ }
+}
+
+static void
+idle_closure_free (IdleClosure *idle_closure)
+{
+ g_clear_pointer (&idle_closure->thread_closure, thread_closure_unref);
+
+ if (idle_closure->notify != NULL)
+ idle_closure->notify (idle_closure->data);
+
+ g_slice_free (IdleClosure, idle_closure);
+}
+
static int
pending_task_compare (gconstpointer a,
gconstpointer b,
static void
pending_uri_free (OstreeFetcherPendingURI *pending)
{
- g_hash_table_remove (pending->self->outstanding, pending);
+ g_hash_table_remove (pending->thread_closure->outstanding, pending);
+
+ g_clear_pointer (&pending->thread_closure, thread_closure_unref);
soup_uri_free (pending->uri);
- g_clear_object (&pending->self);
g_clear_object (&pending->request);
g_clear_object (&pending->request_body);
g_free (pending->out_tmpfile);
g_free (pending);
}
+static gboolean
+session_thread_idle_dispatch (gpointer data)
+{
+ IdleClosure *idle_closure = data;
+
+ idle_closure->function (idle_closure->thread_closure,
+ idle_closure->data);
+
+ return G_SOURCE_REMOVE;
+}
+
+static void
+session_thread_idle_add (ThreadClosure *thread_closure,
+ SessionThreadFunc function,
+ gpointer data,
+ GDestroyNotify notify)
+{
+ IdleClosure *idle_closure;
+
+ g_return_if_fail (thread_closure != NULL);
+ g_return_if_fail (function != NULL);
+
+ idle_closure = g_slice_new (IdleClosure);
+ idle_closure->thread_closure = thread_closure_ref (thread_closure);
+ idle_closure->function = function;
+ idle_closure->data = data;
+ idle_closure->notify = notify;
+
+ g_main_context_invoke_full (thread_closure->main_context,
+ G_PRIORITY_DEFAULT,
+ session_thread_idle_dispatch,
+ idle_closure, /* takes ownership */
+ (GDestroyNotify) idle_closure_free);
+}
+
+static void
+session_thread_add_logger (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ glnx_unref_object SoupLogger *logger = NULL;
+
+ logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, 500);
+ soup_session_add_feature (thread_closure->session,
+ SOUP_SESSION_FEATURE (logger));
+}
+
+static void
+session_thread_config_flags (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ OstreeFetcherConfigFlags config_flags;
+
+ config_flags = GPOINTER_TO_UINT (data);
+
+ if ((config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0)
+ {
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_SSL_STRICT,
+ FALSE, NULL);
+ }
+}
+
+static void
+session_thread_set_proxy_cb (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ SoupURI *proxy_uri = data;
+
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_PROXY_URI,
+ proxy_uri, NULL);
+}
+
+static void
+session_thread_set_tls_interaction_cb (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ GTlsInteraction *interaction = data;
+
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_TLS_INTERACTION,
+ interaction, NULL);
+}
+
+static void
+session_thread_set_tls_database_cb (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ GTlsDatabase *database = data;
+
+ if (database != NULL)
+ {
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_TLS_DATABASE,
+ database, NULL);
+ }
+ else
+ {
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE,
+ TRUE, NULL);
+ }
+}
+
+static void
+on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
+
+static void
+session_thread_process_pending_queue (ThreadClosure *thread_closure)
+{
+
+ while (g_queue_peek_head (&thread_closure->pending_queue) != NULL &&
+ g_hash_table_size (thread_closure->outstanding) < thread_closure->max_outstanding)
+ {
+ GTask *task;
+ OstreeFetcherPendingURI *pending;
+ GCancellable *cancellable;
+
+ task = g_queue_pop_head (&thread_closure->pending_queue);
+
+ pending = g_task_get_task_data (task);
+ cancellable = g_task_get_cancellable (task);
+
+ /* pending_uri_free() removes this. */
+ g_hash_table_add (thread_closure->outstanding, pending);
+
+ soup_request_send_async (pending->request,
+ cancellable,
+ on_request_sent,
+ g_object_ref (task));
+
+ g_object_unref (task);
+ }
+}
+
+static void
+session_thread_request_uri (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ GTask *task = G_TASK (data);
+ OstreeFetcherPendingURI *pending;
+ GCancellable *cancellable;
+ GError *local_error = NULL;
+
+ pending = g_task_get_task_data (task);
+ cancellable = g_task_get_cancellable (task);
+
+ pending->request = soup_session_request_uri (thread_closure->session,
+ pending->uri,
+ &local_error);
+
+ if (local_error != NULL)
+ {
+ g_task_return_error (task, local_error);
+ return;
+ }
+
+ if (pending->is_stream)
+ {
+ soup_request_send_async (pending->request,
+ cancellable,
+ on_request_sent,
+ g_object_ref (task));
+ }
+ else
+ {
+ g_autofree char *uristring = soup_uri_to_string (pending->uri, FALSE);
+ g_autofree char *tmpfile = NULL;
+ struct stat stbuf;
+ gboolean exists;
+
+ tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring));
+
+ if (fstatat (thread_closure->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0)
+ exists = TRUE;
+ else
+ {
+ if (errno == ENOENT)
+ exists = FALSE;
+ else
+ {
+ gs_set_error_from_errno (&local_error, errno);
+ g_task_return_error (task, local_error);
+ return;
+ }
+ }
+
+ if (SOUP_IS_REQUEST_HTTP (pending->request))
+ {
+ glnx_unref_object SoupMessage *msg = NULL;
+ msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
+ if (exists && stbuf.st_size > 0)
+ soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1);
+ }
+ pending->out_tmpfile = tmpfile;
+ tmpfile = NULL; /* Transfer ownership */
+
+ g_queue_insert_sorted (&thread_closure->pending_queue,
+ g_object_ref (task),
+ pending_task_compare, NULL);
+ session_thread_process_pending_queue (thread_closure);
+ }
+}
+
+static gpointer
+ostree_fetcher_session_thread (gpointer data)
+{
+ ThreadClosure *closure = data;
+ gint max_conns;
+
+ /* This becomes the GMainContext that SoupSession schedules async
+ * callbacks and emits signals from. Make it the thread-default
+ * context for this thread before creating the session. */
+ g_main_context_push_thread_default (closure->main_context);
+
+ closure->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
+ SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
+ SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
+ SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
+ SOUP_SESSION_TIMEOUT, 60,
+ SOUP_SESSION_IDLE_TIMEOUT, 60,
+ NULL);
+
+ g_object_get (closure->session, "max-conns-per-host", &max_conns, NULL);
+ if (max_conns < 8)
+ {
+ /* We download a lot of small objects in ostree, so this
+ * helps a lot. Also matches what most modern browsers do. */
+ max_conns = 8;
+ g_object_set (closure->session,
+ "max-conns-per-host",
+ max_conns, NULL);
+ }
+ closure->max_outstanding = 3 * max_conns;
+
+ g_main_loop_run (closure->main_loop);
+
+ g_main_context_pop_thread_default (closure->main_context);
+
+ thread_closure_unref (closure);
+
+ return NULL;
+}
+
static void
_ostree_fetcher_set_property (GObject *object,
guint prop_id,
static void
_ostree_fetcher_finalize (GObject *object)
{
- OstreeFetcher *self;
-
- self = OSTREE_FETCHER (object);
+ OstreeFetcher *self = OSTREE_FETCHER (object);
- if (self->tmpdir_dfd != -1)
- close (self->tmpdir_dfd);
+ /* Terminate the session thread. */
+ g_main_loop_quit (self->thread_closure->main_loop);
+ g_clear_pointer (&self->session_thread, g_thread_unref);
+ g_clear_pointer (&self->thread_closure, thread_closure_unref);
/* Note: We don't remove the tmpdir here, because that would cause
us to not reuse it on resume. This happens because we use two
g_free (self->tmpdir_name);
glnx_release_lock_file (&self->tmpdir_lock);
- g_clear_object (&self->session);
- g_clear_object (&self->client_cert);
+ G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
+}
- g_hash_table_destroy (self->output_stream_set);
+static void
+_ostree_fetcher_constructed (GObject *object)
+{
+ OstreeFetcher *self = OSTREE_FETCHER (object);
+ g_autoptr(GMainContext) main_context = NULL;
+ const char *http_proxy;
- while (!g_queue_is_empty (&self->pending_queue))
- g_object_unref (g_queue_pop_head (&self->pending_queue));
+ main_context = g_main_context_new ();
- g_hash_table_destroy (self->outstanding);
+ self->thread_closure = g_slice_new0 (ThreadClosure);
+ self->thread_closure->ref_count = 1;
+ self->thread_closure->main_context = g_main_context_ref (main_context);
+ self->thread_closure->main_loop = g_main_loop_new (main_context, FALSE);
+ self->thread_closure->tmpdir_dfd = -1;
- G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
+ self->thread_closure->outstanding = g_hash_table_new (NULL, NULL);
+ self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
+ (GDestroyNotify) NULL,
+ (GDestroyNotify) g_object_unref);
+
+ if (g_getenv ("OSTREE_DEBUG_HTTP"))
+ {
+ session_thread_idle_add (self->thread_closure,
+ session_thread_add_logger,
+ NULL, (GDestroyNotify) NULL);
+ }
+
+ if (self->config_flags != 0)
+ {
+ session_thread_idle_add (self->thread_closure,
+ session_thread_config_flags,
+ GUINT_TO_POINTER (self->config_flags),
+ (GDestroyNotify) NULL);
+ }
+
+ http_proxy = g_getenv ("http_proxy");
+ if (http_proxy != NULL)
+ _ostree_fetcher_set_proxy (self, http_proxy);
+
+ /* FIXME Maybe implement GInitableIface and use g_thread_try_new()
+ * so we can try to handle thread creation errors gracefully? */
+ self->session_thread = g_thread_new ("fetcher-session-thread",
+ ostree_fetcher_session_thread,
+ thread_closure_ref (self->thread_closure));
+
+ G_OBJECT_CLASS (_ostree_fetcher_parent_class)->constructed (object);
}
static void
gobject_class->set_property = _ostree_fetcher_set_property;
gobject_class->get_property = _ostree_fetcher_get_property;
gobject_class->finalize = _ostree_fetcher_finalize;
+ gobject_class->constructed = _ostree_fetcher_constructed;
g_object_class_install_property (gobject_class,
PROP_CONFIG_FLAGS,
static void
_ostree_fetcher_init (OstreeFetcher *self)
{
- gint max_conns;
- const char *http_proxy;
GLnxLockFile empty_lockfile = GLNX_LOCK_FILE_INIT;
- g_queue_init (&self->pending_queue);
- self->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
- SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
- SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
- SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
- SOUP_SESSION_TIMEOUT, 60,
- SOUP_SESSION_IDLE_TIMEOUT, 60,
- NULL);
-
- http_proxy = g_getenv ("http_proxy");
- if (http_proxy)
- {
- _ostree_fetcher_set_proxy (self, http_proxy);
- }
-
- if (g_getenv ("OSTREE_DEBUG_HTTP"))
- soup_session_add_feature (self->session, (SoupSessionFeature*)soup_logger_new (SOUP_LOGGER_LOG_BODY, 500));
-
- if ((self->config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0)
- g_object_set (self->session, SOUP_SESSION_SSL_STRICT, FALSE, NULL);
-
- self->requester = (SoupRequester *)soup_session_get_feature (self->session, SOUP_TYPE_REQUESTER);
- g_object_get (self->session, "max-conns-per-host", &max_conns, NULL);
- if (max_conns <= 8)
- {
- // We download a lot of small objects in ostree, so this helps a
- // lot. Also matches what most modern browsers do.
- max_conns = 8;
- g_object_set (self->session, "max-conns-per-host", max_conns, NULL);
- }
-
- self->max_outstanding = 3 * max_conns;
-
- self->output_stream_set = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)g_object_unref);
-
- self->outstanding = g_hash_table_new_full (NULL, NULL, NULL, NULL);
-
- self->tmpdir_dfd = -1;
self->tmpdir_lock = empty_lockfile;
-
}
OstreeFetcher *
if (!_ostree_repo_allocate_tmpdir (tmpdir_dfd,
"fetcher-",
&self->tmpdir_name,
- &self->tmpdir_dfd,
+ &self->thread_closure->tmpdir_dfd,
&self->tmpdir_lock,
NULL,
cancellable, error))
return NULL;
- self->tmpdir_dfd = tmpdir_dfd;
+ self->base_tmpdir_dfd = tmpdir_dfd;
return self;
}
int
_ostree_fetcher_get_dfd (OstreeFetcher *fetcher)
{
- return fetcher->tmpdir_dfd;
+ return fetcher->thread_closure->tmpdir_dfd;
}
void
_ostree_fetcher_set_proxy (OstreeFetcher *self,
const char *http_proxy)
{
- SoupURI *proxy_uri = soup_uri_new (http_proxy);
+ SoupURI *proxy_uri;
+
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+ g_return_if_fail (http_proxy != NULL);
+
+ proxy_uri = soup_uri_new (http_proxy);
+
if (!proxy_uri)
{
g_warning ("Invalid proxy URI '%s'", http_proxy);
}
else
{
- g_object_set (self->session, SOUP_SESSION_PROXY_URI, proxy_uri, NULL);
- soup_uri_free (proxy_uri);
+ session_thread_idle_add (self->thread_closure,
+ session_thread_set_proxy_cb,
+ proxy_uri, /* takes ownership */
+ (GDestroyNotify) soup_uri_free);
}
}
void
-_ostree_fetcher_set_client_cert (OstreeFetcher *fetcher,
- GTlsCertificate *cert)
+_ostree_fetcher_set_client_cert (OstreeFetcher *self,
+ GTlsCertificate *cert)
{
- g_clear_object (&fetcher->client_cert);
- fetcher->client_cert = g_object_ref (cert);
- if (fetcher->client_cert)
- {
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+ g_return_if_fail (G_IS_TLS_CERTIFICATE (cert));
+
#ifdef HAVE_LIBSOUP_CLIENT_CERTS
- g_autoptr(GTlsInteraction) interaction =
- (GTlsInteraction*)_ostree_tls_cert_interaction_new (fetcher->client_cert);
- g_object_set (fetcher->session, "tls-interaction", interaction, NULL);
+ session_thread_idle_add (self->thread_closure,
+ session_thread_set_tls_interaction_cb,
+ _ostree_tls_cert_interaction_new (cert),
+ (GDestroyNotify) g_object_unref);
#else
- g_warning ("This version of OSTree is compiled without client side certificate support");
+ g_warning ("This version of OSTree is compiled without client side certificate support");
#endif
- }
}
void
_ostree_fetcher_set_tls_database (OstreeFetcher *self,
GTlsDatabase *db)
{
- if (db)
- g_object_set ((GObject*)self->session, "tls-database", db, NULL);
- else
- g_object_set ((GObject*)self->session, "ssl-use-system-ca-file", TRUE, NULL);
-}
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+ g_return_if_fail (db == NULL || G_IS_TLS_DATABASE (db));
-static void
-on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
-
-static void
-ostree_fetcher_process_pending_queue (OstreeFetcher *self)
-{
-
- while (g_queue_peek_head (&self->pending_queue) != NULL &&
- g_hash_table_size (self->outstanding) < self->max_outstanding)
+ if (db != NULL)
{
- GTask *task;
- OstreeFetcherPendingURI *pending;
- GCancellable *cancellable;
-
- task = g_queue_pop_head (&self->pending_queue);
-
- pending = g_task_get_task_data (task);
- cancellable = g_task_get_cancellable (task);
-
- /* pending_uri_free() removes this. */
- g_hash_table_add (self->outstanding, pending);
-
- soup_request_send_async (pending->request,
- cancellable,
- on_request_sent,
- g_object_ref (task));
-
- g_object_unref (task);
+ session_thread_idle_add (self->thread_closure,
+ session_thread_set_tls_database_cb,
+ g_object_ref (db),
+ (GDestroyNotify) g_object_unref);
+ }
+ else
+ {
+ session_thread_idle_add (self->thread_closure,
+ session_thread_set_tls_database_cb,
+ NULL, (GDestroyNotify) NULL);
}
}
{
if (!g_output_stream_close (pending->out_stream, cancellable, error))
goto out;
- g_hash_table_remove (pending->self->output_stream_set, pending->out_stream);
+
+ g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+ g_hash_table_remove (pending->thread_closure->output_stream_set,
+ pending->out_stream);
+ g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
- if (fstatat (pending->self->tmpdir_dfd, pending->out_tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
+ if (fstatat (pending->thread_closure->tmpdir_dfd,
+ pending->out_tmpfile,
+ &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
{
gs_set_error_from_errno (error, errno);
goto out;
/* Now that we've finished downloading, continue with other queued
* requests.
*/
- ostree_fetcher_process_pending_queue (pending->self);
+ session_thread_process_pending_queue (pending->thread_closure);
if (stbuf.st_size < pending->content_length)
{
}
else
{
- pending->self->total_downloaded += stbuf.st_size;
+ g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+ pending->thread_closure->total_downloaded += stbuf.st_size;
+ g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
ret = TRUE;
else
oflags |= O_TRUNC;
- fd = openat (pending->self->tmpdir_dfd, pending->out_tmpfile, oflags, 0666);
+ fd = openat (pending->thread_closure->tmpdir_dfd,
+ pending->out_tmpfile, oflags, 0666);
if (fd == -1)
{
gs_set_error_from_errno (&local_error, errno);
goto out;
}
pending->out_stream = g_unix_output_stream_new (fd, TRUE);
- g_hash_table_add (pending->self->output_stream_set, g_object_ref (pending->out_stream));
+
+ g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+ g_hash_table_add (pending->thread_closure->output_stream_set,
+ g_object_ref (pending->out_stream));
+ g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
gpointer user_data,
gpointer source_tag)
{
- GTask *task;
- OstreeFetcherPendingURI *pending = g_new0 (OstreeFetcherPendingURI, 1);
- GError *local_error = NULL;
+ g_autoptr(GTask) task = NULL;
+ OstreeFetcherPendingURI *pending;
- pending->request = soup_requester_request_uri (self->requester, uri, &local_error);
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+ g_return_if_fail (uri != NULL);
- pending->self = g_object_ref (self);
+ /* SoupRequest is created in session thread. */
+ pending = g_new0 (OstreeFetcherPendingURI, 1);
+ pending->thread_closure = thread_closure_ref (self->thread_closure);
pending->uri = soup_uri_copy (uri);
pending->max_size = max_size;
pending->is_stream = is_stream;
/* We'll use the GTask priority for our own priority queue. */
g_task_set_priority (task, priority);
- if (is_stream)
- {
- soup_request_send_async (pending->request,
- cancellable,
- on_request_sent,
- g_object_ref (task));
- }
- else
- {
- g_autofree char *uristring = soup_uri_to_string (uri, FALSE);
- g_autofree char *tmpfile = NULL;
- struct stat stbuf;
- gboolean exists;
-
- tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring));
-
- if (fstatat (self->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0)
- exists = TRUE;
- else
- {
- if (errno == ENOENT)
- exists = FALSE;
- else
- {
- gs_set_error_from_errno (&local_error, errno);
- goto out;
- }
- }
-
- if (SOUP_IS_REQUEST_HTTP (pending->request))
- {
- glnx_unref_object SoupMessage *msg = NULL;
- msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
- if (exists && stbuf.st_size > 0)
- soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1);
- }
- pending->out_tmpfile = tmpfile;
- tmpfile = NULL; /* Transfer ownership */
-
- g_queue_insert_sorted (&self->pending_queue,
- g_object_ref (task),
- pending_task_compare, NULL);
- ostree_fetcher_process_pending_queue (self);
- }
-
- g_assert_no_error (local_error);
-
-out:
- g_object_unref (task);
+ session_thread_idle_add (self->thread_closure,
+ session_thread_request_uri,
+ g_object_ref (task),
+ (GDestroyNotify) g_object_unref);
}
void
guint64
_ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{
- guint64 ret = self->total_downloaded;
GHashTableIter hiter;
gpointer key, value;
+ guint64 ret;
+
+ g_return_val_if_fail (OSTREE_IS_FETCHER (self), 0);
+
+ g_mutex_lock (&self->thread_closure->output_stream_set_lock);
- g_hash_table_iter_init (&hiter, self->output_stream_set);
+ ret = self->thread_closure->total_downloaded;
+
+ g_hash_table_iter_init (&hiter, self->thread_closure->output_stream_set);
while (g_hash_table_iter_next (&hiter, &key, &value))
{
GFileOutputStream *stream = key;
ret += stbuf.st_size;
}
}
-
+
+ g_mutex_unlock (&self->thread_closure->output_stream_set_lock);
+
return ret;
}