Source code for ResearchNotes.files

# -*- coding: utf-8 -*-
"""
File module.

All file functionality should go here. Deals with uploads and delivery of uploaded files.
"""
# import imghdr
import os
import shutil
import fnmatch
import platform

from typing import List
from threading import Thread
from json import JSONDecodeError

from flask import (
    Blueprint,
    flash,
    request,
    redirect,
    url_for,
    abort,
    current_app,
    send_from_directory,
    make_response,
    g,
)

from PIL import Image

from werkzeug.utils import secure_filename
from werkzeug.wrappers.response import Response

from ResearchNotes.auth import login_required
from ResearchNotes.database import db, Documents
import ResearchNotes.database_transactions as dbt

bp = Blueprint("files", __name__, url_prefix="/files")


# def validate_image(stream):
#     """
#     Determine image extension from an upload stream.

#     Parameters
#     ----------
#     stream : TYPE
#         DESCRIPTION.

#     Returns
#     -------
#     TYPE
#         DESCRIPTION.

#     """
#     header = stream.read(512)
#     stream.seek(0)
#     format_ = imghdr.what(None, header)
#     if not format_:
#         return None
#     return "." + (format_ if format_ != "jpeg" else "jpg")


[docs]def uploaddir_path(info: List, only_sub_dir: bool = False) -> str: """ Provide a string with the complete path to the ESS, PPM or report directory. Parameters ---------- info : List ["s", sample.id,sample.identifier] ['m',measurement.id, measurement.sample_id, measurement.measurement_sample.identifier] ['r',report.id, report.sample_id, report.reports_sample.identifier] ['i',instrument.id, instrument.identifier] only_sub_dir : bool, optional Only return the subdirectories without the UPLOAD_DIR prefix The default is False. Returns ------- path : str String containing the path to the ESS, PMM, or report. For the use in static only the sub-dirs starting from the UPLOAD_DIR can be given. """ dir_type = info[0] # samples, measurements, and reports if dir_type == "s": pre = secure_filename("Sample_" + str(info[1]).zfill(4) + "-" + str(info[2])) elif dir_type == "m": pre = os.path.join( secure_filename("Sample_" + str(info[2]).zfill(4) + "-" + str(info[3])), secure_filename("Measurement" + str(info[1]).zfill(4)), ) elif dir_type == "r": pre = os.path.join( secure_filename("Sample_" + str(info[2]).zfill(4) + "-" + str(info[3])), secure_filename("Report" + str(info[1]).zfill(4)), ) # Documents elif dir_type == "d": pre = secure_filename("Document_" + str(info[1]).zfill(4)) # instruments, instrumentation journal entries elif dir_type == "i": pre = os.path.join( secure_filename("Instrument_" + str(info[1]).zfill(4) + "-" + str(info[2])), ) elif dir_type == "e": pre = os.path.join( secure_filename("Instrument_" + str(info[2]).zfill(4) + "-" + str(info[3])), secure_filename("Entry" + str(info[1]).zfill(4)), ) elif dir_type == "it": pre = os.path.join("Inventory", secure_filename("Item_" + str(info[1]).zfill(4))) else: abort( 400, description=": uploaddir_path : Resource not found or type unclear {dir_type}", ) # if only_sub_dir: return pre # path = os.path.normpath(os.path.join(current_app.config["UPLOAD_PATH"], pre)) current_app.logger.debug(f" uploaddir_path : {path}") return path
[docs]def make_file_list(path: str) -> list[str]: """ Make a list of the files in the directory and sort after extension. Parameters ---------- path : str Path to the files. Normally comes from files.uploaddir_path. Returns ------- list[str] List of files in the path to be shown (and handed to preview). """ files = [] if os.path.exists(path): files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))] files.sort() files.sort(key=lambda f: os.path.splitext(f)[1]) return files
[docs]def make_image_url_list(path: str, prefix: str, img_1st: str = None, zip_files=False) -> list[str]: """ Creates a list of all image files in a given path and returns a preview url for it. Parameters ---------- prefix : str Static file prefix to be given to the send_preview function. path : str The real path to the files img_1st : str (Optional) Name of an image file toi be sorted to the top of the list. Default is None. zip_files : bool If true, zip files and url for later iteration. Default is False Returns ------- list[str] List of url to be given e.g. to send_preview to show the image. """ urls = [] if os.path.exists(path): files = [ os.path.join(prefix, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) ] imgs = fnmatch.filter(files, "*.gif") imgs.extend(fnmatch.filter(files, "*.jpg")) imgs.extend(fnmatch.filter(files, "*.jpeg")) # imgs.extend(fnmatch.filter(files, "*.tif")) imgs.extend(fnmatch.filter(files, "*.svg")) imgs.extend(fnmatch.filter(files, "*.png")) if platform.system() != "Windows": imgs.extend(fnmatch.filter(files, "*.JPG")) imgs.extend(fnmatch.filter(files, "*.JPEG")) # imgs.extend(fnmatch.filter(files, "*.TIF")) imgs.extend(fnmatch.filter(files, "*.SVG")) imgs.extend(fnmatch.filter(files, "*.PNG")) imgs.extend(fnmatch.filter(files, "*.GIF")) imgs.sort() current_app.logger.debug(f"Image list: {imgs}") if img_1st: img = os.path.join(prefix, secure_filename(img_1st)) imgs.insert(0, imgs.pop(imgs.index(img))) urls = [url_for("files.send_preview", filename=img) for img in imgs] files = [os.path.split(f)[1] for f in imgs] if zip_files: return zip(urls, files) return urls
[docs]def async_tif_covert(tif_file: str) -> None: """ Make a jpg file for a uploaded tif file. Parameters ---------- tif_file : str File name Returns ------- None """ current_app.logger.debug(tif_file) f_name, _ = os.path.splitext(tif_file) outfile = f_name + ".jpg" if tif_file != outfile: try: with Image.open(tif_file) as image: image.save(outfile) except OSError: flash(f"Cannot convert {tif_file}", "alert-danger")
[docs]@bp.route("/<list:info>/upload_files", methods=["POST"]) @login_required def upload_files(info: List) -> Response: """ Upload files from user to Sample/Measurement/Report directory. Responses are changed to work with dropzone. So, make_response is used and not abort. Parameters ---------- info : List List of information needed, e.g. kind of directory (ESS, PPM, report) and ids etc. Returns ------- Flask.Response Redirect to view uploading the file. """ path = uploaddir_path(info) uploaded_file = request.files["file"] if uploaded_file.filename is None: current_app.logger.info(": upload_file : File not found in arg") return make_response("File not found", 404) # assert uploaded_file.filename is not None filename = secure_filename(uploaded_file.filename) if filename != "": file_ext = os.path.splitext(filename)[1] if file_ext in current_app.config["FORBIDDEN_EXTENSIONS"]: current_app.logger.info( f": upload_file : Forbidden file extension by user {g.user.id} ({g.user.name})" ) return make_response("Forbidden file extension", 415) if not os.path.exists(path): try: os.makedirs(path) except OSError: current_app.logger.debug(f": upload_file : Try to make {path} and failed.") uploaded_file.save(os.path.join(path, filename)) else: current_app.logger.info(": upload file : File not found") return make_response("File not found", 404) if os.path.splitext(filename)[1] in [".tif", ".TIF", ".TIFF", ".tiff"]: flash("Auto create a .jpg version of file", "alert-info") Thread(target=async_tif_covert, args=(os.path.join(path, filename),)).start() return make_response("Should be fine", 200)
[docs]@bp.route("/upload_json", methods=["POST"]) @login_required def upload_json() -> Response: """ Upload a JSON file containing the information to create a sample. Re-importing the JSOn that the export fucntion wrote. We do little checking and assume that the is not any bad data in the JSON. But the identifier does through secure_file and long description is treated by bleach. Responses are changed to work with dropzone. So, make_response is used and not abort. Returns ------- """ current_app.logger.debug(" htmx : Process uploaded json") uploaded_file = request.files["file"] if uploaded_file.filename is None: current_app.logger.info("File not found") return make_response("File not found", 404) filename = secure_filename(uploaded_file.filename) uppath = os.path.join(current_app.config["UPLOAD_PATH"], "json_upload") if filename != "": file_ext = os.path.splitext(filename)[1] if file_ext not in [".JSON", ".json"]: current_app.logger.info(": upload_json : Forbidden file extension") return make_response("Forbidden file extension", 415) if not os.path.exists(uppath): os.makedirs(uppath) uploaded_file.save(os.path.join(uppath, filename)) else: current_app.logger.info(": htmx_upload_json : File not found") return make_response("File not found", 404) with open(os.path.join(uppath, filename), "r", encoding="utf-8") as file: try: ess_dict = current_app.json.load(file) except JSONDecodeError: return make_response(f"Not an json file or could not decode", 415) if "identifier" in list(ess_dict.keys()): ess_dict["creator_id"] = g.user.id ess_dict["group_id"] = g.user.group_id sid = dbt.create_sample(db, ess_dict) for ppm in ess_dict["ppm_list"]: ppm["creator_id"] = g.user.id ppm["mtype_id"] = 1 ppm["instrument_id"] = None ppm["sample_id"] = sid mid = dbt.create_measurement(db, ppm) for report in ppm["report_list"]: report["creator_id"] = g.user.id report["sample_id"] = sid report["measurement_id"] = mid rid = dbt.create_report(db, report) else: current_app.logger.info("Dict has the wrong form") return make_response("Dict has the wrong form", 415) current_app.logger.debug(" htmx : Done") return make_response("Should be fine", 200)
[docs]@bp.route("/send_upload/<list:info>") @login_required def send_upload(info: List) -> Response: """ Send uploaded file to user. Parameters ---------- info : List List of information needed, e.g. kind of directory (ESS, PPM, report) and ids etc. Returns ------- Flask.Response Send the file as attachment to user. """ file = request.args.get("file") path = uploaddir_path(info) if file is None: abort( 404, description=f"file : send_upload : File is None in call to function for user {g.user}" ) return send_from_directory(path, file, as_attachment=True, download_name=file)
[docs]@bp.route("/send_preview/<path:filename>") @login_required def send_preview(filename: str) -> Response: """ Send a file in the upload directory. For sending files in the UPLOAD dir, we implemented a send_file function that checks, if the user is signed in. Later, we could also restrict access too files to directories that the user has access. As dirs and filenames are hard to guess, we do not do this at the current state. Due to the behavior of send_from_directory, we create first the full path and then split it into filename and directory. By adding UPLOAD dir as default, no files from other directories can be sent (path strings and filename are joined by saFe-Join to avoid dir escapes or directory transversal). Parameters ---------- filename : str File name to the file to be sent as preview. Can include a path. Returns ------- Flask.Response File to be sent as preview. Normally, this should be an image to be displayed either in image preview or markup text. """ if not ( filename.startswith("Sample") or filename.startswith("Instrument") or filename.startswith("Document") or filename.startswith("Inventory") ): current_app.logger.warning(f": file.send_preview : {g.user} tried to send {filename}") return make_response("Forbidden", 403) if not os.path.isfile(os.path.normpath(os.path.join(current_app.config["UPLOAD_PATH"], filename))): current_app.logger.info(f": file.send_preview : {g.user} tried to send not existing {filename}") return make_response("Not found.", 404) path, file = os.path.split( os.path.normpath(os.path.join(current_app.config["UPLOAD_PATH"], filename)) ) return send_from_directory(path, file)
[docs]@bp.route("/delete_upload/<list:info>") @login_required def delete_upload(info: List) -> Response: """ Delete uploaded file. Parameters ---------- info : List List of information needed, e.g. kind of directory (ESS, PPM, report) and ids etc. Returns ------- Flask.Response Go back to original view. """ file = request.args.get("file") path = uploaddir_path(info) if file is None: abort(404, description=f"file : delete_upload : File None call to function for user {g.user}") try: os.remove(os.path.join(path, file)) flash(f"Deleted {file}", "alert-success") except OSError as error: abort( 500, description=f": delete_upload : Failed to delete file {file} in {path}." + f" Exception: {str(error)}", ) return redirect_to_view(view_type=info[0], vid=info[1])
def _prepare_download(root: str, path: str): """ Create a zip file of directory path in root. Parameters ---------- root : str Root dir to create the file in path : str Sub directory to be zipped and the file later moved to. Returns ------- """ shutil.make_archive(os.path.join(root, "download"), "zip", path) shutil.move( os.path.join(root, "download.zip"), os.path.join(root, path, "download.zip"), )
[docs]@bp.route("/download/<list:info>", methods=["POST"]) @login_required def download(info: List) -> Response: """ Download all files in a Sample/Measurement/Report directory uploaded before. Function is broken and leads to time-out for large directories Parameters ---------- info : List First entry defines the type (sample,measurement or report). The following members of the list are sample id and identifier as well as measurement id and report id, respectively. Returns ------- Flask.Response Sends zip file from directory. """ path = uploaddir_path(info) flash("Download is prepared. Refresh page after a while and use download.zip", "alert-info") Thread(target=_prepare_download, args=(current_app.config["UPLOAD_PATH"], path)).start() return redirect_to_view(view_type=info[0], vid=info[1])
######################################################################################################## # This reproduces the same function as preview.redirect_home. The two functions should be merged. # Maybe move to util therefore ########################################################################################################
[docs]def redirect_to_view(view_type: str, vid: int): """ Create a return url depending on the type of directory or view. Parameters ---------- view_type : str Denote the type of view to return to vid : int ID of the view item to return to Returns ------- Response """ match view_type: case "s": response = redirect(url_for("samples.sample_view", sid=int(vid))) case "m": response = redirect(url_for("measurements.measurement_view", mid=int(vid))) case "r": response = redirect(url_for("report.report_view", rid=int(vid))) case "d": doc = db.session.get(Documents, int(vid)) response = redirect(url_for("documents.show", label=doc.label)) case "i": response = redirect(url_for("instruments.instrument_view", iid=int(vid))) case "e": response = redirect(url_for("entry.entry_view", eid=int(vid))) case "it": response = redirect(url_for("inventory.item_view", iid=int(vid))) case _: response = make_response("Not found", 404) return response