From cc11ea806416065d2b8efe6f786b3767120ddbb6 Mon Sep 17 00:00:00 2001 From: Mark Pitblado Date: Thu, 17 Jul 2025 12:20:04 -0700 Subject: [PATCH] fix: generate thumbnail for multilayer tiff When there is more than one layer in a tiff file, ImageMagick's convert will generate multiple files as output, with a -0, -1 (and so on) suffix. This causes a 404 error when specify goes to search for the thumbnails, and produces excess images in the thumbnails directly. This commit checks if there are multiple layers in the tiff, and if so, uses the first layer to generate the thumbnail (similar to how PDF thumbnails are handled). --- server.py | 207 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 133 insertions(+), 74 deletions(-) diff --git a/server.py b/server.py index b906f66..65b5391 100644 --- a/server.py +++ b/server.py @@ -10,12 +10,19 @@ import hmac import json import time -from sh import convert +from sh import convert, identify, ErrorReturnCode import settings from bottle import ( - Response, request, response, static_file, template, abort, - HTTPResponse, route) + Response, + request, + response, + static_file, + template, + abort, + HTTPResponse, + route, +) def log(msg): @@ -23,6 +30,16 @@ def log(msg): print(msg) +def has_multiple_layers(image_path): + """Checks if a tiff image contains multiple layers.""" + try: + result = identify("-format", "%n", image_path) + frame_count = int(result.strip()) + return frame_count > 1 + except ErrorReturnCode: + return False + + def get_rel_path(coll, thumb_p): """Return originals or thumbnails subdirectory of the main attachments directory for the given collection. @@ -45,12 +62,13 @@ def generate_token(timestamp, filename): This is for comparing to the client submited token. """ timestamp = str(timestamp) - mac = hmac.new(settings.KEY.encode(), timestamp.encode() + filename.encode(), 'md5') - return ':'.join((mac.hexdigest(), timestamp)) + mac = hmac.new(settings.KEY.encode(), timestamp.encode() + filename.encode(), "md5") + return ":".join((mac.hexdigest(), timestamp)) class TokenException(Exception): """Raised when an auth token is invalid for some reason.""" + pass @@ -68,12 +86,12 @@ def validate_token(token_in, filename): """ if settings.KEY is None: return - if token_in == '': + if token_in == "": raise TokenException("Auth token is missing.") - if ':' not in token_in: + if ":" not in token_in: raise TokenException("Auth token is malformed.") - mac_in, timestr = token_in.split(':') + mac_in, timestr = token_in.split(":") try: timestamp = int(timestr) except ValueError: @@ -82,7 +100,10 @@ def validate_token(token_in, filename): if settings.TIME_TOLERANCE is not None: current_time = get_timestamp() if not abs(current_time - timestamp) < settings.TIME_TOLERANCE: - raise TokenException("Auth token timestamp out of range: %s vs %s" % (timestamp, current_time)) + raise TokenException( + "Auth token timestamp out of range: %s vs %s" + % (timestamp, current_time) + ) if token_in != generate_token(timestamp, filename): raise TokenException("Auth token is invalid.") @@ -105,12 +126,16 @@ def decorator(func): @include_timestamp @wraps(func) def wrapper(*args, **kwargs): - if always or request.method not in ('GET', 'HEAD') or settings.REQUIRE_KEY_FOR_GET: - params = request.forms if request.method == 'POST' else request.query + if ( + always + or request.method not in ("GET", "HEAD") + or settings.REQUIRE_KEY_FOR_GET + ): + params = request.forms if request.method == "POST" else request.query try: validate_token(params.token, params.get(filename_param)) except TokenException as e: - response.content_type = 'text/plain; charset=utf-8' + response.content_type = "text/plain; charset=utf-8" response.status = 403 return e return func(*args, **kwargs) @@ -128,8 +153,9 @@ def include_timestamp(func): @wraps(func) def wrapper(*args, **kwargs): result = func(*args, **kwargs) - (result if isinstance(result, Response) else response) \ - .set_header('X-Timestamp', str(get_timestamp())) + (result if isinstance(result, Response) else response).set_header( + "X-Timestamp", str(get_timestamp()) + ) return result return wrapper @@ -143,11 +169,12 @@ def wrapper(*args, **kwargs): try: result = func(*args, **kwargs) except HTTPResponse as r: - r.set_header('Access-Control-Allow-Origin', '*') + r.set_header("Access-Control-Allow-Origin", "*") raise - (result if isinstance(result, Response) else response) \ - .set_header('Access-Control-Allow-Origin', '*') + (result if isinstance(result, Response) else response).set_header( + "Access-Control-Allow-Origin", "*" + ) return result return wrapper @@ -161,7 +188,7 @@ def resolve_file(): Returns the relative path to the requested file in the base attachments directory. """ - thumb_p = (request.query['type'] == "T") + thumb_p = request.query["type"] == "T" storename = request.query.filename relpath = get_rel_path(request.query.coll, thumb_p) @@ -177,9 +204,9 @@ def resolve_file(): root, ext = path.splitext(storename) - if mimetype in ('application/pdf', 'image/tiff'): + if mimetype in ("application/pdf", "image/tiff"): # use PNG for PDF thumbnails - ext = '.png' + ext = ".png" scaled_name = "%s_%d%s" % (root, scale, ext) scaled_pathname = path.join(basepath, scaled_name) @@ -191,17 +218,27 @@ def resolve_file(): if not path.exists(basepath): mkdir(basepath) - orig_dir = path.join(settings.BASE_DIR, get_rel_path(request.query.coll, thumb_p=False)) + orig_dir = path.join( + settings.BASE_DIR, get_rel_path(request.query.coll, thumb_p=False) + ) orig_path = path.join(orig_dir, storename) if not path.exists(orig_path): abort(404, "Missing original: %s" % orig_path) input_spec = orig_path - convert_args = ('-resize', "%dx%d>" % (scale, scale)) - if mimetype == 'application/pdf': - input_spec += '[0]' # only thumbnail first page of PDF - convert_args += ('-background', 'white', '-flatten') # add white background to PDFs + convert_args = ("-resize", "%dx%d>" % (scale, scale)) + if mimetype == "application/pdf": + input_spec += "[0]" # only thumbnail first page of PDF + convert_args += ( + "-background", + "white", + "-flatten", + ) # add white background to PDFs + if (mimetype == "image/tiff" or mimetype == "image/tif") and has_multiple_layers( + input_spec + ): + input_spec += "[0]" # only thumbnail the first layer of the TIFF image log("Scaling thumbnail to %d" % scale) convert(input_spec, *(convert_args + (scaled_pathname,))) @@ -209,7 +246,7 @@ def resolve_file(): return path.join(relpath, scaled_name) -@route('/static/') +@route("/static/") def static(path): """Serve static files to the client. Primarily for Web Portal.""" if not settings.ALLOW_STATIC_FILE_ACCESS: @@ -217,50 +254,55 @@ def static(path): return static_file(path, root=settings.BASE_DIR) -@route('/getfileref') +@route("/getfileref") @allow_cross_origin def getfileref(): """Returns a URL to the static file indicated by the query parameters.""" if not settings.ALLOW_STATIC_FILE_ACCESS: abort(404) - response.content_type = 'text/plain; charset=utf-8' - return "http://%s:%d/static/%s" % (settings.HOST, settings.PORT, - pathname2url(resolve_file())) + response.content_type = "text/plain; charset=utf-8" + return "http://%s:%d/static/%s" % ( + settings.HOST, + settings.PORT, + pathname2url(resolve_file()), + ) -@route('/fileget') -@require_token('filename') +@route("/fileget") +@require_token("filename") def fileget(): """Returns the file data of the file indicated by the query parameters.""" r = static_file(resolve_file(), root=settings.BASE_DIR) download_name = request.query.downloadname if download_name: - download_name = quote(path.basename(download_name).encode('ascii', 'replace')) - r.set_header('Content-Disposition', "inline; filename*=utf-8''%s" % download_name) + download_name = quote(path.basename(download_name).encode("ascii", "replace")) + r.set_header( + "Content-Disposition", "inline; filename*=utf-8''%s" % download_name + ) return r -@route('/fileupload', method='OPTIONS') +@route("/fileupload", method="OPTIONS") @allow_cross_origin def fileupload_options(): response.content_type = "text/plain; charset=utf-8" - return '' + return "" -@route('/fileupload', method='POST') +@route("/fileupload", method="POST") @allow_cross_origin -@require_token('store') +@require_token("store") def fileupload(): """Accept original file uploads and store them in the proper attchment subdirectory. """ - thumb_p = (request.forms['type'] == "T") + thumb_p = request.forms["type"] == "T" storename = request.forms.store basepath = path.join(settings.BASE_DIR, get_rel_path(request.forms.coll, thumb_p)) pathname = path.join(basepath, storename) if thumb_p: - return 'Ignoring thumbnail upload!' + return "Ignoring thumbnail upload!" if not path.exists(basepath): mkdir(basepath) @@ -268,20 +310,24 @@ def fileupload(): upload = list(request.files.values())[0] upload.save(pathname, overwrite=True) - response.content_type = 'text/plain; charset=utf-8' - return 'Ok.' + response.content_type = "text/plain; charset=utf-8" + return "Ok." -@route('/filedelete', method='POST') -@require_token('filename') +@route("/filedelete", method="POST") +@require_token("filename") def filedelete(): """Delete the file indicated by the query parameters. Returns 404 if the original file does not exist. Any associated thumbnails will also be deleted. """ storename = request.forms.filename - basepath = path.join(settings.BASE_DIR, get_rel_path(request.forms.coll, thumb_p=False)) - thumbpath = path.join(settings.BASE_DIR, get_rel_path(request.forms.coll, thumb_p=True)) + basepath = path.join( + settings.BASE_DIR, get_rel_path(request.forms.coll, thumb_p=False) + ) + thumbpath = path.join( + settings.BASE_DIR, get_rel_path(request.forms.coll, thumb_p=True) + ) pathname = path.join(basepath, storename) if not path.exists(pathname): @@ -290,84 +336,97 @@ def filedelete(): log("Deleting %s" % pathname) remove(pathname) - prefix = storename.split('.att')[0] - pattern = path.join(thumbpath, prefix + '*') + prefix = storename.split(".att")[0] + pattern = path.join(thumbpath, prefix + "*") log("Deleting thumbnails matching %s" % pattern) for name in glob(pattern): remove(name) - response.content_type = 'text/plain; charset=utf-8' - return 'Ok.' + response.content_type = "text/plain; charset=utf-8" + return "Ok." -@route('/getmetadata') -@require_token('filename') +@route("/getmetadata") +@require_token("filename") def getmetadata(): """Provides access to EXIF metadata.""" storename = request.query.filename - basepath = path.join(settings.BASE_DIR, get_rel_path(request.query.coll, thumb_p=False)) + basepath = path.join( + settings.BASE_DIR, get_rel_path(request.query.coll, thumb_p=False) + ) pathname = path.join(basepath, storename) datatype = request.query.dt if not path.exists(pathname): abort(404) - with open(pathname, 'rb') as f: + with open(pathname, "rb") as f: try: tags = exifread.process_file(f) except: log("Error reading exif data.") tags = {} - if datatype == 'date': + if datatype == "date": try: - return str(tags['EXIF DateTimeOriginal']) + return str(tags["EXIF DateTimeOriginal"]) except KeyError: - abort(404, 'DateTime not found in EXIF') + abort(404, "DateTime not found in EXIF") data = defaultdict(dict) for key, value in list(tags.items()): parts = key.split() - if len(parts) < 2: continue + if len(parts) < 2: + continue try: - v = str(value).decode('ascii', 'replace').encode('utf-8') + v = str(value).decode("ascii", "replace").encode("utf-8") except TypeError: v = repr(value) data[parts[0]][parts[1]] = str(v) - response.content_type = 'application/json' - data = [OrderedDict((('Name', key), ('Fields', value))) - for key, value in list(data.items())] + response.content_type = "application/json" + data = [ + OrderedDict((("Name", key), ("Fields", value))) + for key, value in list(data.items()) + ] return json.dumps(data, indent=4) -@route('/testkey') -@require_token('random', always=True) +@route("/testkey") +@require_token("random", always=True) def testkey(): """If access to this resource succeeds, clients can conclude that they have a valid access key. """ - response.content_type = 'text/plain; charset=utf-8' - return 'Ok.' + response.content_type = "text/plain; charset=utf-8" + return "Ok." -@route('/web_asset_store.xml') +@route("/web_asset_store.xml") @include_timestamp def web_asset_store(): """Serve an XML description of the URLs available here.""" - response.content_type = 'text/xml; charset=utf-8' - return template('web_asset_store.xml', host="%s:%d" % (settings.SERVER_NAME, settings.SERVER_PORT)) + response.content_type = "text/xml; charset=utf-8" + return template( + "web_asset_store.xml", + host="%s:%d" % (settings.SERVER_NAME, settings.SERVER_PORT), + ) -@route('/') +@route("/") def main_page(): - return 'It works!' + return "It works!" -if __name__ == '__main__': +if __name__ == "__main__": from bottle import run - run(host='0.0.0.0', port=settings.PORT, server=settings.SERVER, - debug=settings.DEBUG, reloader=settings.DEBUG) + run( + host="0.0.0.0", + port=settings.PORT, + server=settings.SERVER, + debug=settings.DEBUG, + reloader=settings.DEBUG, + )