Source code for ResearchNotes.documents

# -*- coding: utf-8 -*-
"""
All documentation related functions/views go here.

Basically, all the wiki functionality is defined here.
"""

import os

import typing
from typing import Tuple, Dict, List

from flask import (
    Blueprint,
    flash,
    redirect,
    request,
    url_for,
    current_app,
    abort,
    send_from_directory,
    g,
    render_template,
    session as login_session,
)

from werkzeug.utils import secure_filename
from werkzeug.wrappers.response import Response

from sqlalchemy import desc

import ResearchNotes.database_transactions as dbt
from ResearchNotes.database import db, User, Documents
from ResearchNotes.form import DocCreateForm
from ResearchNotes.auth import login_required, role_required
from ResearchNotes.files import make_file_list, uploaddir_path

# import ResearchNotes.database_transactions as dbt

# visited_label = None

bp = Blueprint("documents", __name__, url_prefix="/documents")


[docs]@bp.route("/<list:label>/create", methods=("GET", "POST")) @login_required def create(label: typing.List) -> typing.Union[str, Response]: """ Create document entry. We do a quick check, if document exist and in case do an internal server error abort (abort(500)). Parameters ---------- label : List List containing the parent id (p_id) and the label of document (e_label). Returns ------- str, Flask.Response Redirects to document. show or shows the creation form. """ p_id, d_label = label doc = db.session.execute( db.select(Documents).filter_by(label=d_label, group_id=g.user.group_id) ).scalar() if doc is not None: flash("Document already exits", "alert-warning") abort(500, description=f" Create failure : User {g.user} tried to recreate document {label}") form = DocCreateForm( d_label, data={ "label": d_label, "title": d_label, "body": "", }, ) if form.validate_on_submit(): dbt.create_document( db, { "label": form.label.data, "title": form.title.data, "body": form.body.data, "group_id": g.user.group_id, "creator_id": g.user.id, "updatetor_id": g.user.id, }, p_id, ) flash("Document created.", "alert-info") if form.label.data != d_label: flash( "Document label was changed. It is not in your text body and you have to change " + f"the original label [[{d_label}]]!", "alert-warning", ) return redirect(url_for("documents.show", label=form.label.data, p_id=p_id)) return render_template("docs/create.html", form=form, label=label)
[docs]def get_doc(label: str, check_author: bool = True) -> Documents: """ Get a single Document (Wiki page) from the database. Parameters ---------- label : Str Name of the document in the database. check_author : TYPE, optional DESCRIPTION. The default is True. Returns ------- doc : Documents Database entry of the document. """ doc = db.first_or_404( db.select(Documents).filter_by(label=label, group_id=g.user.group_id), description=f"User {g.user} tried to load not existing document", ) # That check here should not be needed. Index is available per grop and search is happening on # per group id. if check_author and int(doc.group_id) != g.user.group_id and label != "index": abort( 403, description=f" : Authorization failure. User {g.user} tried to load document {label}", ) return doc
[docs]@bp.route("/<list:label>/update", methods=("GET", "POST")) @login_required def update(label: typing.List) -> typing.Union[str, Response]: """ Update document entry. Parameters ---------- label : List List containing the parent id (p_id) and the label of document (e_label). Returns ------- str, Flask.Response Redirects to document. show or shows the creation form. """ p_id, d_label = label doc = get_doc(d_label) form = DocCreateForm(d_label, obj=doc) if form.validate_on_submit(): dbt.update_document( db, doc, { "label": form.label.data, "title": form.title.data, "body": form.body.data, "updator": g.user.id, }, ) flash(f"Document {doc.label} updated", "alert-info") if form.label.data != d_label: flash( "Document label was changed. It is not in your text body and you have to change" + f" the original label [[{d_label}]]!", "alert-warning", ) return redirect(url_for("documents.show", label=form.label.data, p_id=p_id)) return render_template("docs/create.html", form=form, doc=doc, label=label)
[docs]@bp.route("/<string:label>/show") @login_required def show(label: str, check_author: bool = True) -> typing.Union[str, Response]: """ Display as well as create documents. If the document is not found but implemented as link in an existing document, we create it if first clicked. Parameters ---------- label : str Label of the document to be shown (or None, if index). check_author : bool, optional If ture access is checked. The default is True. Returns ------- str|Flask.Response Render a Documents class database entry. """ # f_id,e_label=label e_label = label pid = request.args.get("p_id", 0) urls = login_session["urls"] # Handel our index label that is special. Every group has its own index and the index label is # rewritten. # if e_label not in urls: urls.append(e_label) else: u_index = urls.index(e_label) del urls[u_index + 1 :] if label == "index": e_label = g.user.group_member.name + "_index" urls = [e_label] login_session["urls"] = urls # Try to load the doc. If the result is None, we have to check, if it is the index and create it # as index. If it comes without pid, we redirect to index. If it does not exist but can be created # inside the wiki, we redirect to the create view. # doc = db.session.execute( db.select(Documents).filter_by(label=e_label, group_id=g.user.group_id) ).scalar() if doc is None: if label == "index": flash("Documentation index not found. Creating it.", "alert-info") dbt.create_index( db, e_label, {"group": g.user.group_id, "group_name": g.user.group_member.name, "user_id": g.user.id}, ) return redirect(url_for("documents.show", label="index")) if pid == 0: flash( "Documents can only be created from an existing Document page", "alert-info", ) return redirect(url_for("documents.show", label="index")) return redirect(url_for("documents.create", label=[pid, e_label])) # # Check, if user has the right to see document as it is not picked over doc_get() if check_author and int(doc.group_id) != g.user.group_id: abort( 403, description=f": Authorization failure. {g.user} tried to load document {label}", ) # # Pylint error and correct. We have to see, if this is solved g.pid = doc.id # pylint: disable=assigning-non-slot back = "index" creator = db.session.get(User, doc.creator_id).UserName updator = db.session.get(User, doc.updatetor_id).UserName files = make_file_list( uploaddir_path( [ "d", doc.id, ] ) ) if pid != 0: p_doc = db.session.get(Documents, pid) back = p_doc.label # If not child of the called document add parent but make sure, we do # not make child and parent same document. if (not p_doc.is_child(doc)) and (int(pid) != int(doc.id)): current_app.logger.debug(f"Creating a child for {doc.id} to {pid}") with dbt.Transaction(db): p_doc.add_child(doc) tree, child_dict = _create_tree() return render_template( "docs/show_new_layout.html", tree=tree, urls=urls, doc=doc, back=back, creator=creator, update=updator, files=files, )
[docs]def export_doc_text(doc: Documents) -> str: """ Create a Markdown string for a document for later export. Parameters ---------- doc : Documents Entry of Documents to create string for. Returns ------- str Markdown string. """ summery = f"# {doc.title} \n" summery += ( f'by *{ doc.creator_id }* created *{ doc.created.strftime("%Y-%m-%d") }*. ' + f'Last updated by *{ doc.updatetor_id }* in *{ doc.updated.strftime("%Y-%m-%d") }*.' ) summery += "\n \n" summery += doc.body.replace("\r\n", "\n") summery.encode("utf-8") return summery
[docs]@bp.route("/<string:label>/download") @login_required def download(label: str) -> Response: """ Download a given document as a Markdown text file. This is a good way, e.g. for archiving it or sharing. Parameters ---------- label : str Label of the document to download. Returns ------- Flask.Response Sends file as attachment to user. """ doc = get_doc(label) summery = export_doc_text(doc) with open( os.path.join(current_app.config["UPLOAD_PATH"], secure_filename(doc.label + ".md")), "w", encoding="utf-8", ) as out_file: out_file.write(summery) return send_from_directory( current_app.config["UPLOAD_PATH"], secure_filename(doc.label + ".md"), as_attachment=True, download_name=secure_filename(doc.label + ".md"), )
def _create_tree(for_menu: bool = True) -> tuple[str, dict[str, list[tuple[int, str]]]]: """ Create entries for the tree view on the side of the menu in the diocument view. Parameters ---------- for_menu : bool Tell if the tree is used for the side menu or not. Returns ------- """ child_dict = {} # type: typing.Dict[str, typing.List[typing.Tuple[int,str]]] parent_dict = {} # type: typing.Dict def r_tree(index: str, label: str, depth: int) -> str: """ Recursive function that calls its self to go though the childs of a given parent side. Parameters ---------- index : str String of the database id of the document label : str Label of the document depth : int Increase which each recursive back call. Returns ------- """ r_dict = child_dict.pop(str(index)) rstr = f"<li> <a href={url_for('documents.show', label=label)} > {label}</a> </li>\n" if len(r_dict) == 0: # No children return rstr else: # Children and go through children entries rstr += "<ol>" for d_index, d_label in r_dict: if str(d_index) in child_dict: rstr += f"{r_tree(d_index,d_label,depth+1)}\n" else: rstr += f"<li> <a href={url_for('documents.show', label=d_label)} > {d_label}</a> </li> \n" rstr += "</ol>" return rstr docs = db.session.execute(db.select(Documents).filter_by(group_id=g.user.group_id)).scalars() for doc in docs: child_dict[str(doc.id)] = [(c.id, c.label) for c in doc.children] parent_dict[str(doc.label)] = (doc.id, doc.label) start, start_label = parent_dict[str(g.user.group_member.name + "_index")] body = f"{r_tree(start, start_label, depth=0)}" current_app.logger.debug(f"Child dict for group {child_dict}") if len(child_dict) != 0: flash(f"Group {g.user.group_member.name} seem to have unreferenced documents.", "alert-warning") return body, child_dict
[docs]@bp.route("/tree") @login_required def docs_tree() -> str: """ Display the document tree, i.e. all documents linked to and below this document. Returns ------- str Returns the document tree view for the group of the user. """ login_session["urls"] = [] body, child_dict = _create_tree(for_menu=False) return render_template("docs/tree.html", body=body, child_dict=child_dict)
[docs]@bp.route("<int:d_id>/remove_child") @login_required def remove_child(d_id: int) -> Response: """ Remove a child from the child list, if there is more than one parent around. This still can lead to orphans, but should minimize the risk. Parameters ---------- d_id : int ID of the document to be removed. As parameter, we also have to pass the parent doc.id Returns ------- Flask.Response Redirect back to parent page. """ pid = request.args.get("p_id", 0) if pid == 0: abort(500, description="Failed to provide a parent document") doc = db.get_or_404( Documents, d_id, description=f" sample.get_sample : {g.user} tried to load sample {d_id}" + " which does not exist", ) p_doc = db.get_or_404( Documents, pid, description=f" sample.get_sample : {g.user} tried to load sample {pid}" + " which does not exist", ) parent_dict = [(p.id, p.label) for p in doc.parent_docs] if len(parent_dict) > 1: flash(f"{doc.label} was removed from {p_doc.label} as child.", "alert-info") with dbt.Transaction(db): p_doc.remove_child(doc) return redirect(url_for("documents.show", label=p_doc.label)) flash( f"Document {doc.label} cannot be removed as {p_doc.label} is the only parent.", "alert-warning" ) return redirect(url_for("documents.show", label=p_doc.label))
[docs]@bp.route("<string:label>/delete") @role_required(["Supervisor"]) def delete(label: str) -> Response: """ Delete a document from the database. The document to be deleted is not allowed to have any children to not create orphan pages that are not linked. Parameters ---------- label : str Document label of document to be deleted. Returns ------- Flask.Response Redirect to 'doc.index' """ doc = get_doc(label) if doc.children.count() > 0: flash("A document with children cannot be deleted.", "alert-warning") return redirect(url_for("documents.show", label=doc.label)) with dbt.Transaction(db) as db_session: db_session.delete(doc) return redirect(url_for("documents.show", label="index"))
[docs]def last_updates() -> Documents: """ Return the database records of the last updated documents. Needed for the index page at sign in. Returns ------- Documents Last five database entries of Documents ordered after update. """ return db.session.execute( db.select(Documents).filter_by(group_id=g.user.group_id).order_by(desc("updated")).limit(5) ).scalars()