-#!/usr/bin/python2
+#!/usr/bin/python
from wsgiref.handlers import format_date_time
from email.utils import parsedate
from calendar import timegm
import gzip
-from urlparse import parse_qs
-import BaseHTTPServer
+import sys
import time
import zlib
-from StringIO import StringIO
+
+if sys.version_info[0] >= 3:
+ from urllib.parse import parse_qs
+ import http.server as http_server
+ from io import BytesIO
+else:
+ from urlparse import parse_qs
+ import BaseHTTPServer as http_server
+ from StringIO import StringIO as BytesIO
server_start_time = int(time.time())
else:
return None
-class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+class RequestHandler(http_server.BaseHTTPRequestHandler):
def do_GET(self):
parts = self.path.split('?', 1)
path = parts[0]
if accept_encoding and accept_encoding == 'gzip':
self.send_header("Content-Encoding", "gzip")
- buf = StringIO()
- gzfile = gzip.GzipFile(mode='w', fileobj=buf)
- gzfile.write(contents)
+ buf = BytesIO()
+ gzfile = gzip.GzipFile(mode='wb', fileobj=buf)
+ if isinstance(contents, bytes):
+ gzfile.write(contents)
+ else:
+ gzfile.write(contents.encode('utf-8'))
gzfile.close()
contents = buf.getvalue()
self.end_headers()
if response == 200:
- self.wfile.write(contents)
+ if isinstance(contents, bytes):
+ self.wfile.write(contents)
+ else:
+ self.wfile.write(contents.encode('utf-8'))
def test():
- BaseHTTPServer.test(RequestHandler)
+ if sys.version_info[0] >= 3:
+ http_server.test(RequestHandler, port=0)
+ else:
+ http_server.test(RequestHandler)
if __name__ == '__main__':
test()
-#!/usr/bin/python2
+#!/usr/bin/python
+
+from __future__ import print_function
-import httplib
-import urllib
import sys
+if sys.version_info[0] >= 3:
+ import http.client as http_client
+ import urllib.parse as urllib_parse
+else:
+ import httplib as http_client
+ import urllib as urllib_parse
+
if sys.argv[2] == 'add':
detach_icons = '--detach-icons' in sys.argv
if detach_icons:
params = {'d': sys.argv[5]}
if detach_icons:
params['detach-icons'] = 1
- query = urllib.urlencode(params)
- conn = httplib.HTTPConnection(sys.argv[1])
+ query = urllib_parse.urlencode(params)
+ conn = http_client.HTTPConnection(sys.argv[1])
path = "/testing/{repo}/{tag}?{query}".format(repo=sys.argv[3],
tag=sys.argv[4],
query=query)
conn.request("POST", path)
response = conn.getresponse()
if response.status != 200:
- print >>sys.stderr, response.read()
- print >>sys.stderr, "Failed: status={}".format(response.status)
+ print(response.read(), file=sys.stderr)
+ print("Failed: status={}".format(response.status), file=sys.stderr)
sys.exit(1)
elif sys.argv[2] == 'delete':
- conn = httplib.HTTPConnection(sys.argv[1])
+ conn = http_client.HTTPConnection(sys.argv[1])
path = "/testing/{repo}/{ref}".format(repo=sys.argv[3],
ref=sys.argv[4])
conn.request("DELETE", path)
response = conn.getresponse()
if response.status != 200:
- print >>sys.stderr, response.read()
- print >>sys.stderr, "Failed: status={}".format(response.status)
+ print(response.read(), file=sys.stderr)
+ print("Failed: status={}".format(response.status), file=sys.stderr)
sys.exit(1)
else:
- print >>sys.stderr, "Usage: oci-registry-client.py [add|remove] ARGS"
+ print("Usage: oci-registry-client.py [add|remove] ARGS", file=sys.stderr)
sys.exit(1)
-#!/usr/bin/python2
+#!/usr/bin/python
+
+from __future__ import print_function
-import BaseHTTPServer
import base64
import hashlib
import json
import os
import sys
-from urlparse import parse_qs
import time
+if sys.version_info[0] >= 3:
+ from urllib.parse import parse_qs
+ import http.server as http_server
+else:
+ from urlparse import parse_qs
+ import BaseHTTPServer as http_server
+
repositories = {}
icons = {}
else:
return None
-class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+class RequestHandler(http_server.BaseHTTPRequestHandler):
def check_route(self, route):
parts = self.path.split('?', 1)
path = parts[0].split('/')
result = []
route_path = route.split('/')
- print(route_path, path)
+ print((route_path, path))
if len(route_path) != len(path):
return False
if response == 200:
if response_file:
- with open(response_file) as f:
+ with open(response_file, 'rb') as f:
response_string = f.read()
- self.wfile.write(response_string)
+
+ if isinstance(response_string, bytes):
+ self.wfile.write(response_string)
+ else:
+ self.wfile.write(response_string.encode('utf-8'))
def do_POST(self):
if self.check_route('/testing/@repo_name/@tag'):
return
def test():
- BaseHTTPServer.test(RequestHandler)
+ if sys.version_info[0] >= 3:
+ http_server.test(RequestHandler, port=0)
+ else:
+ http_server.test(RequestHandler)
if __name__ == '__main__':
test()
. $(dirname $0)/libtest.sh
-$(dirname $0)/test-webserver.sh "" "python2 $test_srcdir/http-utils-test-server.py 0"
+$(dirname $0)/test-webserver.sh "" "python $test_srcdir/http-utils-test-server.py 0"
FLATPAK_HTTP_PID=$(cat httpd-pid)
mv httpd-port httpd-port-main
port=$(cat httpd-port-main)
# Start the fake registry server
-$(dirname $0)/test-webserver.sh "" "python2 $test_srcdir/oci-registry-server.py 0"
+$(dirname $0)/test-webserver.sh "" "python $test_srcdir/oci-registry-server.py 0"
FLATPAK_HTTP_PID=$(cat httpd-pid)
mv httpd-port httpd-port-main
port=$(cat httpd-port-main)
-client="python2 $test_srcdir/oci-registry-client.py 127.0.0.1:$port"
+client="python $test_srcdir/oci-registry-client.py 127.0.0.1:$port"
setup_repo_no_add oci