# -*- coding: utf-8 -*-
"""
File module.
All file functionality should go here. Deals with uploads and delivery of uploaded files.
"""
# import imghdr
import os
import shutil
import fnmatch
import platform
from typing import List
from threading import Thread
from json import JSONDecodeError
from flask import (
Blueprint,
flash,
request,
redirect,
url_for,
abort,
current_app,
send_from_directory,
make_response,
g,
)
from PIL import Image
from werkzeug.utils import secure_filename
from werkzeug.wrappers.response import Response
from ResearchNotes.auth import login_required
from ResearchNotes.database import db, Documents
import ResearchNotes.database_transactions as dbt
# Blueprint collecting all file related views; every route below is served under the "/files" prefix.
bp = Blueprint("files", __name__, url_prefix="/files")
# def validate_image(stream):
# """
# Determine image extension from an upload stream.
# Parameters
# ----------
# stream : TYPE
# DESCRIPTION.
# Returns
# -------
# TYPE
# DESCRIPTION.
# """
# header = stream.read(512)
# stream.seek(0)
# format_ = imghdr.what(None, header)
# if not format_:
# return None
# return "." + (format_ if format_ != "jpeg" else "jpg")
def uploaddir_path(info: List, only_sub_dir: bool = False) -> str:
    """
    Provide a string with the complete path to the upload directory of an item.

    Parameters
    ----------
    info : List
        The first entry selects the directory type, the remaining entries carry
        the numbers/identifiers used to build the directory names:
        ["s", sample.id, sample.identifier]
        ['m', measurement.id, measurement.sample_id, measurement.measurement_sample.identifier]
        ['r', report.id, report.sample_id, report.reports_sample.identifier]
        ['d', document number]
        ['i', instrument.id, instrument.identifier]
        ['e', entry number, instrument number, instrument identifier]
        ['it', item number]
    only_sub_dir : bool, optional
        Only return the subdirectories without the UPLOAD_PATH prefix. The default is False.

    Returns
    -------
    path : str
        String containing the path to the directory. For the use in static
        only the sub-dirs starting from the upload directory can be given.

    Raises
    ------
    werkzeug.exceptions.BadRequest
        Raised through ``abort(400)`` if the type is unknown or ``info`` has
        too few entries for the requested type.
    """

    def _numbered(prefix: str, number) -> str:
        # Zero-pad the number to four digits and sanitise the name for the file system.
        return secure_filename(prefix + str(number).zfill(4))

    def _labelled(prefix: str, number, label) -> str:
        # Same as _numbered, but with the identifier appended after a dash.
        return secure_filename(prefix + str(number).zfill(4) + "-" + str(label))

    try:
        dir_type = info[0]
        # samples, measurements, and reports
        if dir_type == "s":
            pre = _labelled("Sample_", info[1], info[2])
        elif dir_type == "m":
            pre = os.path.join(
                _labelled("Sample_", info[2], info[3]),
                _numbered("Measurement", info[1]),
            )
        elif dir_type == "r":
            pre = os.path.join(
                _labelled("Sample_", info[2], info[3]),
                _numbered("Report", info[1]),
            )
        # Documents
        elif dir_type == "d":
            pre = _numbered("Document_", info[1])
        # instruments, instrumentation journal entries
        elif dir_type == "i":
            pre = _labelled("Instrument_", info[1], info[2])
        elif dir_type == "e":
            pre = os.path.join(
                _labelled("Instrument_", info[2], info[3]),
                _numbered("Entry", info[1]),
            )
        # inventory items
        elif dir_type == "it":
            pre = os.path.join("Inventory", _numbered("Item_", info[1]))
        else:
            # The f-prefix was missing before, so the offending type never showed up in the message.
            abort(
                400,
                description=f": uploaddir_path : Resource not found or type unclear {dir_type}",
            )
    except IndexError:
        # A truncated info list is a client error, not a server fault.
        abort(
            400,
            description=f": uploaddir_path : Too few entries in info {info}",
        )

    if only_sub_dir:
        return pre

    path = os.path.normpath(os.path.join(current_app.config["UPLOAD_PATH"], pre))
    current_app.logger.debug(f" uploaddir_path : {path}")
    return path
def make_file_list(path: str) -> list[str]:
    """
    Make a list of the files in the directory, sorted by extension and then by name.

    Parameters
    ----------
    path : str
        Path to the files. Normally comes from files.uploaddir_path.

    Returns
    -------
    list[str]
        List of files in the path to be shown (and handed to preview).
        Empty if the path does not exist or is not a directory.
    """
    # isdir instead of exists: a path that points to a regular file would make listdir raise.
    if not os.path.isdir(path):
        return []
    names = [entry for entry in os.listdir(path) if os.path.isfile(os.path.join(path, entry))]
    # One sort with a composite key gives the same order as sorting by name and
    # then (stable) by extension.
    names.sort(key=lambda name: (os.path.splitext(name)[1], name))
    return names
def make_image_url_list(path: str, prefix: str, img_1st: str = None, zip_files=False) -> list[str]:
    """
    Create a list of all image files in a given path and return a preview url for each.

    Files ending in .gif, .jpg, .jpeg, .svg or .png are treated as images; the
    comparison ignores case on every platform.

    Parameters
    ----------
    path : str
        The real path to the files.
    prefix : str
        Static file prefix to be given to the send_preview function.
    img_1st : str, optional
        Name of an image file to be sorted to the top of the list. Ignored if no
        such image exists in the directory. Default is None.
    zip_files : bool
        If true, zip urls and file names for later iteration. Default is False.

    Returns
    -------
    list[str]
        List of urls to be given e.g. to send_preview to show the image. If
        ``zip_files`` is true, a ``zip`` of (url, file name) pairs is returned instead.
    """
    image_suffixes = (".gif", ".jpg", ".jpeg", ".svg", ".png")
    imgs = []
    if os.path.isdir(path):
        imgs = sorted(
            os.path.join(prefix, name)
            for name in os.listdir(path)
            if os.path.isfile(os.path.join(path, name)) and name.lower().endswith(image_suffixes)
        )
        current_app.logger.debug(f"Image list: {imgs}")

    if img_1st:
        preferred = os.path.join(prefix, secure_filename(img_1st))
        # Only reorder when the requested image is really there; list.index
        # would raise ValueError otherwise.
        if preferred in imgs:
            imgs.remove(preferred)
            imgs.insert(0, preferred)

    urls = [url_for("files.send_preview", filename=img) for img in imgs]
    if zip_files:
        # Built from imgs so that the names exist even when the directory is missing.
        names = [os.path.split(img)[1] for img in imgs]
        return zip(urls, names)
    return urls
def async_tif_covert(tif_file: str) -> None:
    """
    Make a jpg file for an uploaded tif file.

    The function is started in a worker thread, where neither the Flask
    application context nor the request context of the upload is available.
    It therefore only touches ``current_app`` and ``flash`` when the matching
    context exists and falls back to the standard logging module otherwise.

    Parameters
    ----------
    tif_file : str
        File name (including path) of the tif file. The jpg is written next to it.

    Returns
    -------
    None
    """
    # Local imports keep the fix self-contained without touching the module import block.
    import logging
    from flask import has_app_context, has_request_context

    logger = current_app.logger if has_app_context() else logging.getLogger(__name__)
    logger.debug(tif_file)

    f_name, _ = os.path.splitext(tif_file)
    outfile = f_name + ".jpg"
    if tif_file == outfile:
        # Nothing to convert, the input already is the jpg.
        return

    try:
        with Image.open(tif_file) as image:
            image.save(outfile)
    except OSError:
        message = f"Cannot convert {tif_file}"
        if has_request_context():
            flash(message, "alert-danger")
        else:
            # flash needs the session of a request; in a worker thread the log is all we have.
            logger.warning(message)
@bp.route("/<list:info>/upload_files", methods=["POST"])
@login_required
def upload_files(info: List) -> Response:
    """
    Upload files from user to Sample/Measurement/Report directory.

    Responses are changed to work with dropzone. So, make_response is used and not abort.

    Parameters
    ----------
    info : List
        List of information needed, e.g. kind of directory (ESS, PPM, report) and ids etc.

    Returns
    -------
    Flask.Response
        200 on success, 404 if no file was sent, 415 for a forbidden extension,
        500 if the upload directory cannot be created.
    """
    path = uploaddir_path(info)

    # .get instead of [...]: a missing form field should give the dropzone
    # friendly response below and not an automatic 400 abort.
    uploaded_file = request.files.get("file")
    if uploaded_file is None or uploaded_file.filename is None:
        current_app.logger.info(": upload_file : File not found in arg")
        return make_response("File not found", 404)

    filename = secure_filename(uploaded_file.filename)
    if filename == "":
        current_app.logger.info(": upload file : File not found")
        return make_response("File not found", 404)

    file_ext = os.path.splitext(filename)[1]
    forbidden = current_app.config["FORBIDDEN_EXTENSIONS"]
    # Also test the lower-cased extension so that e.g. ".EXE" cannot slip past a ".exe" entry.
    if file_ext in forbidden or file_ext.lower() in forbidden:
        current_app.logger.info(
            f": upload_file : Forbidden file extension by user {g.user.id} ({g.user.name})"
        )
        return make_response("Forbidden file extension", 415)

    try:
        os.makedirs(path, exist_ok=True)
    except OSError:
        current_app.logger.error(f": upload_file : Try to make {path} and failed.")
        return make_response("Could not create upload directory", 500)

    target = os.path.join(path, filename)
    uploaded_file.save(target)

    if file_ext.lower() in (".tif", ".tiff"):
        flash("Auto create a .jpg version of file", "alert-info")
        # current_app is a proxy bound to this request; the worker thread needs
        # the real application object to open its own application context.
        app = current_app._get_current_object()

        def _convert_in_app_context() -> None:
            with app.app_context():
                async_tif_covert(target)

        Thread(target=_convert_in_app_context).start()

    return make_response("Should be fine", 200)
@bp.route("/upload_json", methods=["POST"])
@login_required
def upload_json() -> Response:
    """
    Upload a JSON file containing the information to create a sample.

    Re-imports the JSON that the export function wrote. Only little checking is
    done and the data in the JSON is assumed to be sane: the top level has to be
    a dict with an "identifier" key; "ppm_list" and "report_list" are optional.

    Responses are changed to work with dropzone. So, make_response is used and not abort.

    Returns
    -------
    Flask.Response
        200 on success, 404 if no file was sent, 415 if the file is not a JSON
        file, cannot be decoded, or has the wrong form.
    """
    current_app.logger.debug(" htmx : Process uploaded json")

    uploaded_file = request.files.get("file")
    if uploaded_file is None or uploaded_file.filename is None:
        current_app.logger.info("File not found")
        return make_response("File not found", 404)

    filename = secure_filename(uploaded_file.filename)
    if filename == "":
        current_app.logger.info(": htmx_upload_json : File not found")
        return make_response("File not found", 404)

    if os.path.splitext(filename)[1].lower() != ".json":
        current_app.logger.info(": upload_json : Forbidden file extension")
        return make_response("Forbidden file extension", 415)

    uppath = os.path.join(current_app.config["UPLOAD_PATH"], "json_upload")
    os.makedirs(uppath, exist_ok=True)
    json_path = os.path.join(uppath, filename)
    uploaded_file.save(json_path)

    try:
        with open(json_path, "r", encoding="utf-8") as file:
            ess_dict = current_app.json.load(file)
    except (JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError: the upload is not valid utf-8 text at all.
        return make_response("Not an json file or could not decode", 415)

    # A JSON array or scalar has no keys; reject it instead of failing with AttributeError.
    if not isinstance(ess_dict, dict) or "identifier" not in ess_dict:
        current_app.logger.info("Dict has the wrong form")
        return make_response("Dict has the wrong form", 415)

    ess_dict["creator_id"] = g.user.id
    ess_dict["group_id"] = g.user.group_id
    sid = dbt.create_sample(db, ess_dict)
    for ppm in ess_dict.get("ppm_list", []):
        ppm["creator_id"] = g.user.id
        ppm["mtype_id"] = 1
        ppm["instrument_id"] = None
        ppm["sample_id"] = sid
        mid = dbt.create_measurement(db, ppm)
        for report in ppm.get("report_list", []):
            report["creator_id"] = g.user.id
            report["sample_id"] = sid
            report["measurement_id"] = mid
            dbt.create_report(db, report)

    current_app.logger.debug(" htmx : Done")
    return make_response("Should be fine", 200)
@bp.route("/send_upload/<list:info>")
@login_required
def send_upload(info: List) -> Response:
    """
    Deliver one uploaded file to the user as a download.

    The file name is taken from the ``file`` query argument, the directory is
    derived from ``info`` through uploaddir_path.

    Parameters
    ----------
    info : List
        List of information needed, e.g. kind of directory (ESS, PPM, report) and ids etc.

    Returns
    -------
    Flask.Response
        The file as attachment; aborts with 404 if no file name was given.
    """
    requested = request.args.get("file")
    directory = uploaddir_path(info)
    if requested is not None:
        return send_from_directory(
            directory,
            requested,
            as_attachment=True,
            download_name=requested,
        )
    # No file name in the query string.
    abort(
        404, description=f"file : send_upload : File is None in call to function for user {g.user}"
    )
@bp.route("/send_preview/<path:filename>")
@login_required
def send_preview(filename: str) -> Response:
    """
    Send a file in the upload directory.

    For sending files in the UPLOAD dir, a send_file function is used that checks
    if the user is signed in. Later, access could also be restricted to the
    directories the user has access to.

    Due to the behavior of send_from_directory, the full path is created first and
    then split into directory and file name. Because the directory part is derived
    from user input, the normalised path is checked explicitly: it has to stay
    inside the upload directory and inside one of the known top-level folders.
    Otherwise a name such as ``Sample/../../x`` would escape the upload directory.

    Parameters
    ----------
    filename : str
        File name to the file to be sent as preview. Can include a path.

    Returns
    -------
    Flask.Response
        File to be sent as preview. Normally, this should be an image to be displayed
        either in image preview or markup text. 403 for a path outside the allowed
        folders, 404 if the file does not exist.
    """
    allowed_prefixes = ("Sample", "Instrument", "Document", "Inventory")
    upload_root = os.path.normpath(current_app.config["UPLOAD_PATH"])
    full_path = os.path.normpath(os.path.join(upload_root, filename))

    try:
        relative = os.path.relpath(full_path, upload_root)
    except ValueError:
        # relpath refuses paths on different drives (Windows); treat as outside.
        relative = os.pardir

    escapes_root = relative == os.pardir or relative.startswith(os.pardir + os.sep)
    # Check both the raw name and the normalised relative path, so that ".."
    # segments cannot be used to reach another folder after the prefix test.
    if (
        escapes_root
        or not filename.startswith(allowed_prefixes)
        or not relative.startswith(allowed_prefixes)
    ):
        current_app.logger.warning(f": file.send_preview : {g.user} tried to send {filename}")
        return make_response("Forbidden", 403)

    if not os.path.isfile(full_path):
        current_app.logger.info(f": file.send_preview : {g.user} tried to send not existing {filename}")
        return make_response("Not found.", 404)

    directory, name = os.path.split(full_path)
    return send_from_directory(directory, name)
@bp.route("/delete_upload/<list:info>")
@login_required
def delete_upload(info: List) -> Response:
    """
    Delete uploaded file.

    The file name comes from the ``file`` query argument and has to be a plain
    file name inside the directory selected by ``info``; names containing
    directory parts are rejected so that nothing outside that directory can be
    removed.

    Parameters
    ----------
    info : List
        List of information needed, e.g. kind of directory (ESS, PPM, report) and ids etc.

    Returns
    -------
    Flask.Response
        Go back to original view. Aborts with 400 for an invalid file name,
        404 if the file name is missing or the file does not exist, and 500
        if the deletion fails for another reason.
    """
    file = request.args.get("file")
    path = uploaddir_path(info)
    if file is None:
        abort(404, description=f"file : delete_upload : File None call to function for user {g.user}")

    # Guard against directory traversal ("../x", absolute paths, drive prefixes).
    if file in ("", os.curdir, os.pardir) or os.path.basename(file) != file:
        abort(
            400,
            description=f": delete_upload : Invalid file name {file} given by user {g.user}",
        )

    try:
        os.remove(os.path.join(path, file))
    except FileNotFoundError:
        abort(404, description=f": delete_upload : File {file} not found in {path}.")
    except OSError as error:
        abort(
            500,
            description=f": delete_upload : Failed to delete file {file} in {path}."
            + f" Exception: {str(error)}",
        )
    flash(f"Deleted {file}", "alert-success")
    return redirect_to_view(view_type=info[0], vid=info[1])
def _prepare_download(root: str, path: str):
    """
    Create ``download.zip`` with the content of a directory and place it in that directory.

    The archive is built in a private temporary directory below ``root`` so that
    parallel calls do not overwrite each other's intermediate file. An older
    ``download.zip`` in the directory is removed first; otherwise it would be
    packed into the new archive.

    Parameters
    ----------
    root : str
        Root dir used for the intermediate archive.
    path : str
        Directory to be zipped and the file later moved to. Either a path that
        exists as given or a sub directory relative to ``root``.

    Returns
    -------
    None

    Raises
    ------
    FileNotFoundError
        If the directory to be zipped does not exist.
    """
    import tempfile

    # Callers hand over the full upload path; fall back to a root relative path
    # for callers that pass only the sub directory.
    source = path if os.path.isdir(path) else os.path.join(root, path)
    if not os.path.isdir(source):
        raise FileNotFoundError(source)

    target = os.path.join(source, "download.zip")
    if os.path.exists(target):
        os.remove(target)

    os.makedirs(root, exist_ok=True)
    staging = tempfile.mkdtemp(prefix="download_", dir=root)
    try:
        archive = shutil.make_archive(os.path.join(staging, "download"), "zip", source)
        shutil.move(archive, target)
    finally:
        shutil.rmtree(staging, ignore_errors=True)
@bp.route("/download/<list:info>", methods=["POST"])
@login_required
def download(info: List) -> Response:
    """
    Start the preparation of a zip archive with all uploaded files of an item.

    The archive is created by _prepare_download in a background thread; the user
    is told via a flash message to reload the page later and fetch download.zip.

    Note from the original author: the function is broken and leads to a
    time-out for large directories.

    Parameters
    ----------
    info : List
        First entry defines the type (sample, measurement or report). The
        following members of the list are sample id and identifier as well as
        measurement id and report id, respectively.

    Returns
    -------
    Flask.Response
        Redirect back to the view of the item.
    """
    target_dir = uploaddir_path(info)
    flash("Download is prepared. Refresh page after a while and use download.zip", "alert-info")
    # The config value is read here, inside the request, and handed to the worker.
    worker = Thread(
        target=_prepare_download,
        args=(current_app.config["UPLOAD_PATH"], target_dir),
    )
    worker.start()
    return redirect_to_view(view_type=info[0], vid=info[1])
########################################################################################################
# This reproduces the same functionality as preview.redirect_home. The two functions should be merged,
# e.g. by moving them to a shared util module.
########################################################################################################
def redirect_to_view(view_type: str, vid: int) -> Response:
    """
    Create a redirect to the view of an item depending on the type of directory or view.

    Parameters
    ----------
    view_type : str
        Denote the type of view to return to ("s", "m", "r", "d", "i", "e" or "it").
    vid : int
        ID of the view item to return to.

    Returns
    -------
    Response
        Redirect to the matching view, or a 404 response if the view type is
        unknown or the document does not exist.
    """
    # view type -> (endpoint, name of the id argument of that endpoint)
    endpoints = {
        "s": ("samples.sample_view", "sid"),
        "m": ("measurements.measurement_view", "mid"),
        "r": ("report.report_view", "rid"),
        "i": ("instruments.instrument_view", "iid"),
        "e": ("entry.entry_view", "eid"),
        "it": ("inventory.item_view", "iid"),
    }

    if view_type == "d":
        # Documents are addressed by label, so the row has to be looked up first.
        doc = db.session.get(Documents, int(vid))
        if doc is None:
            # The lookup returns None for an unknown id; avoid AttributeError on doc.label.
            return make_response("Not found", 404)
        return redirect(url_for("documents.show", label=doc.label))

    try:
        endpoint, id_name = endpoints[view_type]
    except KeyError:
        return make_response("Not found", 404)
    return redirect(url_for(endpoint, **{id_name: int(vid)}))