Source code for ResearchNotes.cli

# -*- coding: utf-8 -*-
"""
Module to take command line instructions. We later register them as click commands.

This module should not contain any views or calls to web functions. For some reason,
functions are not found by sphinx autodoc - most likely has to do with the cli.command
decorator.
"""

# import csv
import datetime
import os
import typing
import re
import dataclasses
import pprint

import click

from flask import current_app, Blueprint

from werkzeug.utils import secure_filename

# from flask.cli import with_appcontext
# from flask.json import dump, load

from ResearchNotes.database import (
    db,
    User,
    Groups,
    Samples,
    MeasurementType,
    Measurements,
    Role,
    ParentLocations,
    Locations,
    Reports,
    Documents,
    TemplateSamples,
    TemplateMeasurements,
    Instrument,
    InstrumentationJournalEntry,
    TemplateInstrument,
    TemplateInstrumentationJournalEntry,
    EntryType,
    Items,
    ItemType,
    ItemsTemplate,
    ItemsCheckOut,
)

# from ResearchNotes.search import add_to_index, query_index

import ResearchNotes.database_transactions as dbt


# bp = Blueprint("main", __name__, url_prefix="")

# =============================================================================
# Init database commands
# =============================================================================

bp = Blueprint("cli_commands", __name__)


@bp.cli.command("init_db")
def init_db():
    """
    Reset the database to a fresh, minimally populated state.

    All tables known to ``db`` are dropped and created again, so any data
    stored before is lost. Afterwards ``dbt.create_roles`` and
    ``dbt.create_admin`` are run, in that order, to seed the initial entries.

    Returns
    -------
    None.

    """
    click.echo("Creating databases ...")

    # Start from empty tables.
    db.drop_all()
    db.create_all()

    # Seed the initial entries; roles are created before the admin account.
    for seed in (dbt.create_roles, dbt.create_admin):
        seed(db)

    click.echo("Initialized the database.")


# ===============================================================================
# Search related stuff
# ===============================================================================


@bp.cli.command("update_search_index")
def update_search_index():
    """Rebuild the search index for samples, measurements, reports and documents."""
    # The models are reindexed one after the other, in this order.
    for model in (Samples, Measurements, Reports, Documents):
        model.reindex()


# =============================================================================
# Functions for database maintenance
# =============================================================================


def _repair_documents():
    """Remove document self-references, report orphans and delete the old index page."""
    # ``scalars()`` hands back a single-pass result. It is materialised here so
    # that BOTH loops below see every document; reusing the raw result would
    # leave the second loop with nothing to iterate over.
    docs = db.session.execute(db.select(Documents)).scalars().all()

    click.echo("    Checking for self-reference ...")
    for doc in docs:
        if doc.is_child(doc):
            click.echo(f"{doc} is its own child! Fixing")
            with dbt.Transaction(db):
                doc.remove_child(doc)

    click.echo("     Checking for orphans ...")
    for doc in docs:
        # list() instead of plain truthiness: parent_docs may be a lazy
        # collection whose bool() says nothing about its content.
        if not list(doc.parent_docs):
            click.echo(f"{doc} is orphan")
            if doc.label.endswith("_index"):
                click.echo("        But it is an index page, therefor ok.")
        if doc.id == 1 and doc.label == "index" and doc.group_id == 1:
            click.echo(f"           {doc} is the old index page - it will be removed.")
            with dbt.Transaction(db) as db_session:
                db_session.delete(doc)


def _ensure_roles():
    """Create every missing required role and make sure an administrator exists."""
    for role in ("admin", "Supervisor", "Student", "ExStudent", "StudentAdmin"):
        found = db.session.execute(db.select(Role).filter_by(name=role)).scalar()
        if found is None:
            click.echo(f"   Did not find role: {role}. Creating it.")
            with dbt.Transaction(db) as db_session:
                db_session.add(Role(name=role, description="Missing role by database_maintenance"))
        else:
            click.echo(f"   Found {found.name} with ID: {found.id}")

        if role != "admin":
            continue

        # ``found`` is None when the admin role had to be created just now. Such
        # a role cannot have members yet, and dereferencing ``found`` would fail.
        if found is None or found.members.scalar() is None:
            click.echo("    Did not find an administrator. Create one.")
            dbt.create_admin(db)
        else:
            click.echo("    Found at minimum one active administrator.")


def _repair_locations():
    """Ensure parent location 1 exists, adopt parentless locations and fix legacy names."""
    if db.session.get(ParentLocations, 1) is None:
        click.echo("    Required Location entry does not exist. Recreating")
        with dbt.Transaction(db) as db_session:
            db_session.add(ParentLocations(name="Not defined", group_id=1))

    click.echo("    Scanning for Location without parent")
    # Fetched up front because the loop body runs its own transactions.
    parentless = (
        db.session.execute(db.select(Locations).filter_by(parent_location_id=None)).scalars().all()
    )
    for entry in parentless:
        click.echo(f"    Locations {entry.name} without parent. Setting it to 'Not Defined'.")
        with dbt.Transaction(db):
            entry.parent_location_id = 1

    click.echo("    Rename old Location 'Not defined' to 'None'")
    parent = db.session.get(ParentLocations, 1)
    if parent is None:
        # A freshly created parent location is not guaranteed to get id 1.
        click.echo("    Parent location 1 does not exist. Skipping rename.")
        return

    if parent.name != "Not defined":
        click.echo("    Rename Parent location 1 to 'Not Defined'.")
        with dbt.Transaction(db):
            parent.name = "Not defined"

    if any(sub.name == "Not defined" for sub in parent.sublocations):
        loc = db.session.get(Locations, 1)
        if loc is None:
            click.echo("    Location 1 does not exist. Skipping rename.")
            return
        click.echo("    Rename location 1 to 'None'.")
        with dbt.Transaction(db):
            loc.name = "None"
            loc.group_id = 1
            loc.parent_location_id = 1


def _reset_dangling_instruments():
    """Set ``instrument_id`` to None where it is 0 or names a missing instrument."""
    # Fetched up front because the loop body runs its own transactions.
    measurements = db.session.execute(db.select(Measurements)).scalars().all()
    for entry in measurements:
        instrument_id = entry.instrument_id
        # 0 is never a valid reference; any other non-empty id must resolve to
        # an existing Instrument row. None means "no instrument" and is fine.
        if instrument_id == 0 or (
            instrument_id and db.session.get(Instrument, instrument_id) is None
        ):
            click.echo(f"Upsi for entry {entry} with instrument_id: {entry.instrument_id}. Fixing.")
            with dbt.Transaction(db):
                entry.instrument_id = None


@bp.cli.command("database_maintenance")
def database_maintenance():
    """
    Clean and correct entries in the database.

    The following steps are run in order:

    1. Documents: remove self-references (a document listed as its own child),
       report documents without parents, and delete the old index page
       (id 1, label "index", group 1).
    2. Roles: create every missing required role and create an administrator
       if the admin role has no member.
    3. Locations: make sure parent location 1 exists, attach locations without
       a parent to it and rename the legacy "Not defined" entries.
    4. Measurements: reset ``instrument_id`` values that are 0 or point to an
       instrument that does not exist.

    Returns
    -------
    None.

    """
    click.echo("Checking Document database ...")
    _repair_documents()

    click.echo("Checking if all roles exist ...")
    _ensure_roles()

    click.echo(
        "Checking if needed entries for location, measurement type, journal entry type as ell as EES and PPM template exist ..."
    )
    _repair_locations()

    click.echo("Checking for wrong instrument_id entries in PPMs ...")
    _reset_dangling_instruments()

    # required_entry = db.session.get(MeasurementType, 1)
    # if (required_entry.name != "None") or (required_entry is None):
    #     click.echo(f"   Required MeasurementType entry does not exist. Recreating")


@bp.cli.command("dump_database_json")
def dump_database_json():
    """
    Dump databases into json files for saving and/or reimporting.

    One file per model is written to ``<UPLOAD_PATH>/database_dump``. Two extra
    files, ``shared_sample.json`` and ``doc_children.json``, store the
    sample-sharing and document parent/child relations.

    We only dump the important stuff - so, some information will get lost.

    Returns
    -------
    None.

    """
    current_app.logger.info(": Dumping databases in to json")
    click.echo("Dumping the database to json.")

    dump_path = os.path.join(current_app.config["UPLOAD_PATH"], "database_dump")
    os.makedirs(dump_path, exist_ok=True)

    # File name and model are kept side by side so the two can never get out
    # of step (two parallel lists joined by zip() are truncated silently).
    tables = (
        ("group.json", Groups),
        ("measurementtype.json", MeasurementType),
        ("role.json", Role),
        ("ParentLocations.json", ParentLocations),
        ("locations.json", Locations),
        ("user.json", User),
        ("samples.json", Samples),
        ("measurements.json", Measurements),
        ("reports.json", Reports),
        ("documents.json", Documents),
        ("temp_samples.json", TemplateSamples),
        ("temp_measurements.json", TemplateMeasurements),
        ("instrument.json", Instrument),
        ("InstrumentationJournalEntry.json", InstrumentationJournalEntry),
        ("TemplateInstrument.json", TemplateInstrument),
        ("TemplateInstrumentationJournalEntry.json", TemplateInstrumentationJournalEntry),
        ("EntryType.json", EntryType),
        ("Items.json", Items),
        ("ItemType.json", ItemType),
        ("ItemsTemplate.json", ItemsTemplate),
        ("ItemsCheckOut.json", ItemsCheckOut),
    )

    # A sized sequence lets the progress bar know its length; a bare zip()
    # object has none.
    with click.progressbar(tables) as p_bar:
        for f_name, model in p_bar:
            # Query first, open afterwards: a failing query must not truncate
            # an already existing dump file.
            to_dump = [
                dataclasses.asdict(obj) for obj in db.session.execute(db.select(model)).scalars()
            ]
            with open(os.path.join(dump_path, f_name), "w", encoding="utf-8") as file:
                current_app.json.dump(to_dump, file)

    click.echo("Dumping shared samples")
    # One single-key dict per shared sample: {"<sample id>": sharedsamplelist}.
    shared_sample_list = [
        {str(sample.id): sample.sharedsamplelist}
        for sample in db.session.execute(db.select(Samples)).scalars()
        if sample.is_shared
    ]
    with open(os.path.join(dump_path, "shared_sample.json"), "w", encoding="utf-8") as file:
        current_app.json.dump(shared_sample_list, file)

    click.echo("Dumping document relations")
    # One single-key dict per document with children: {"<doc id>": [children]}.
    doc_child_list = [
        {str(doc.id): list(doc.childrenlist)}
        for doc in db.session.execute(db.select(Documents)).scalars()
        if len(doc.childrenlist) > 0
    ]
    with open(os.path.join(dump_path, "doc_children.json"), "w", encoding="utf-8") as file:
        current_app.json.dump(doc_child_list, file)


def check_datetime(obj: str) -> typing.Union[str, datetime.datetime]:
    """
    Convert a string to a datetime object.

    If the conversion fails, the input is returned unchanged.

    Parameters
    ----------
    obj : str
        String to be converted; should follow the fingerprint
        "%a, %d %b %Y %H:%M:%S %Z". This is the current format of a json dump.

    Returns
    -------
    str|datetime
        The parsed datetime object on success, otherwise the original input.

    """
    try:
        return datetime.datetime.strptime(obj, "%a, %d %b %Y %H:%M:%S %Z")
    except (ValueError, TypeError):
        # ValueError: the text does not match the format.
        # TypeError: the value is not a string at all (e.g. None).
        # Either way the caller gets its input back untouched.
        return obj
@bp.cli.command("load_database_json")
def load_database_json():
    """
    Import database from json dump.

    Looks for a json dump of the database in ``<UPLOAD_PATH>/database_dump``
    and reimports it. All existing entries in the database are deleted first.
    Nothing is deleted unless the user confirms and every expected dump file
    is present.

    Returns
    -------
    None.

    """
    current_app.logger.info(": Loading databases from json")
    click.echo(
        "Loading the database from json. This will reinitialize the database and"
        + " all existing data will be overwritten!"
    )
    if not click.confirm("Are you sure?"):
        return

    dump_path = os.path.join(current_app.config["UPLOAD_PATH"], "database_dump")

    # File name and model are kept side by side so the two can never get out
    # of step. The order matters: it is the order in which tables are filled.
    tables = (
        ("group.json", Groups),
        ("measurementtype.json", MeasurementType),
        ("role.json", Role),
        ("ParentLocations.json", ParentLocations),
        ("locations.json", Locations),
        ("user.json", User),
        ("samples.json", Samples),
        ("measurements.json", Measurements),
        ("reports.json", Reports),
        ("documents.json", Documents),
        ("temp_samples.json", TemplateSamples),
        ("temp_measurements.json", TemplateMeasurements),
        ("instrument.json", Instrument),
        ("InstrumentationJournalEntry.json", InstrumentationJournalEntry),
        ("TemplateInstrument.json", TemplateInstrument),
        ("TemplateInstrumentationJournalEntry.json", TemplateInstrumentationJournalEntry),
        ("EntryType.json", EntryType),
        ("Items.json", Items),
        ("ItemType.json", ItemType),
        ("ItemsTemplate.json", ItemsTemplate),
        ("ItemsCheckOut.json", ItemsCheckOut),
    )

    # Verify the dump is complete BEFORE anything is dropped; a missing file
    # would otherwise abort the import half way with the old data already gone.
    expected = [name for name, _ in tables] + ["shared_sample.json", "doc_children.json"]
    missing = [name for name in expected if not os.path.isfile(os.path.join(dump_path, name))]
    if missing:
        click.echo(f"Dump in {dump_path} is incomplete, missing: {', '.join(missing)}")
        click.echo("Nothing was changed.")
        return

    db.drop_all()
    db.create_all()

    # A sized sequence lets the progress bar know its length.
    with click.progressbar(tables) as p_bar:
        for j_file, db_model in p_bar:
            with open(os.path.join(dump_path, j_file), "r", encoding="utf-8") as file:
                entries = current_app.json.load(file)
            for entry in entries:
                # Datetimes were dumped as "... GMT" strings; turn them back
                # into datetime objects (other strings stay untouched).
                for key, value in entry.items():
                    if isinstance(value, str) and " GMT" in value:
                        entry[key] = check_datetime(value)
                with dbt.Transaction(db) as db_session:
                    db_session.add(db_model(**entry))

    # Restore sample sharing; each item is {"<sample id>": [user ids]}.
    with open(os.path.join(dump_path, "shared_sample.json"), "r", encoding="utf-8") as file:
        shared_sample_list = current_app.json.load(file)
    for shared_sample in shared_sample_list:
        for key, user_ids in shared_sample.items():
            sample = db.session.get(Samples, int(key))
            for uid in user_ids:
                with dbt.Transaction(db):
                    user = db.session.get(User, uid)
                    sample.sharedsamples.append(user)

    # Restore document relations; each item is {"<parent id>": [child ids]}.
    with open(os.path.join(dump_path, "doc_children.json"), "r", encoding="utf-8") as file:
        doc_child_list = current_app.json.load(file)
    for doc_child in doc_child_list:
        for key, child_ids in doc_child.items():
            doc_p = db.session.get(Documents, int(key))
            for did in child_ids:
                with dbt.Transaction(db):
                    doc_c = db.session.get(Documents, did)
                    doc_p.add_child(doc_c)


@bp.cli.command("dump_meilisearch")
def dump_meilisearch() -> None:
    """
    Dump out the meilisearch database.

    Needed for updating from one version to the next.

    Returns
    -------
    None
    """
    # .get(): a missing MEILISEARCH_URL key means "not configured" and must
    # end in the message below, not in a KeyError.
    if not current_app.config.get("MEILISEARCH_URL"):
        click.echo("Meilisearch not active or configured")
        return

    current_app.logger.info(": Dumping meilisearch database out")
    click.echo("Following search indexes exists")
    click.echo(pprint.pformat(current_app.meilisearch.get_all_stats()))
    click.echo("Create dump for Meilisearch database")
    click.echo(pprint.pformat(current_app.meilisearch.create_dump()))


@bp.cli.command("fix_filenames")
def fix_filenames() -> None:
    """
    Rename the files below UPLOAD_PATH to their secure filename.

    Files whose name is already secure are left alone. A file is skipped, with
    a message, if it has no secure equivalent or if the secure name is already
    taken in the same directory.

    Returns
    -------
    None
    """
    for subdir, _dirs, files in os.walk(current_app.config["UPLOAD_PATH"]):
        for file in files:
            safe_name = secure_filename(file)
            if safe_name == file:
                continue

            source = os.path.join(subdir, file)
            if not safe_name:
                # secure_filename() may return an empty string; renaming to
                # that would target the directory itself.
                click.echo(f"file: {source} has no secure filename and is skipped")
                continue

            target = os.path.join(subdir, safe_name)
            if os.path.exists(target):
                # os.rename() may silently replace an existing file.
                click.echo(f"file: {source} is skipped because {target} already exists")
                continue

            click.echo(f"file: {source} will be renamed to {target}")
            os.rename(source, target)