commit 6c76b47f040ae7d3c7e8752cf8939fa0e4f487aa
Author: Sudo-JHare
Date:   Thu Apr 10 14:47:33 2025 +1000

    Add files via upload

diff --git a/Database setup.md b/Database setup.md
new file mode 100644
index 0000000..7dbb4f7
--- /dev/null
+++ b/Database setup.md
@@ -0,0 +1,45 @@
+We need to create the actual database file and the user table inside it. We use Flask-Migrate for this. These commands need to be run inside the running container.
+
+Find your container ID (if you don't know it):
+
+Bash
+
+docker ps
+(Copy the CONTAINER ID or Name for webapp-base.)
+
+Initialize the migration repository (ONLY RUN ONCE EVER): this creates a migrations folder in your project. The command runs inside the container, but the folder also appears locally if you map volumes.
+
+Bash
+
+docker exec <container_id_or_name> flask db init
+(Replace <container_id_or_name> with the value you copied from docker ps.)
+
+Create the first migration script: Flask-Migrate compares your models (app/models.py) to the (non-existent) database and generates a script to create the necessary tables.
+
+Bash
+
+docker exec <container_id_or_name> flask db migrate -m "Initial migration; create user table."
+(You can change the message after -m.) This creates a script file inside the migrations/versions/ directory.
+
+Apply the migration to the database: this runs the script generated in the previous step, actually creating the app.db file (in /app/instance/) and the user table inside it.
+
+Bash
+
+docker exec <container_id_or_name> flask db upgrade
+
+After running these docker exec ... flask db ... commands, you should have a migrations folder in your project root locally (because it was created by code running inside the container using the project files mounted/copied) and an app.db file inside the /app/instance/ directory within the container.
+
+Your database is now set up with a user table!
+
+For reference, the full migration history for this project so far:
+
+flask db init
+flask db migrate -m "Initial migration; create user table."
+flask db upgrade
+
+flask db migrate -m "Add role column to user table"
+flask db upgrade
+
+flask db migrate -m "Add ModuleRegistry table"
+flask db upgrade
+
+flask db migrate -m "Add ProcessedIg table"
+flask db upgrade
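+
+The same flask db commands can also be run from the host via Docker Compose (this assumes the Compose service is named webapp-base, matching the container above), for example:
+
+Bash
+
+docker-compose exec webapp-base flask db migrate -m "Add role column to user table"
+docker-compose exec webapp-base flask db upgrade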
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..579885d
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,27 @@
+# 1. Base Image: Use an official Python runtime as a parent image
+FROM python:3.10-slim AS base
+
+# Set environment variables
+# Prevents python creating .pyc files
+ENV PYTHONDONTWRITEBYTECODE=1
+# Prevents python buffering stdout/stderr
+ENV PYTHONUNBUFFERED=1
+
+# 2. Set Work Directory: Create and set the working directory in the container
+WORKDIR /app
+
+# 3. Install Dependencies: Copy only the requirements file first to leverage Docker cache
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# 4. Copy Application Code: Copy the rest of your application code into the work directory
+COPY . .
+
+# 5. Expose Port: Tell Docker the container listens on port 5000
+EXPOSE 5000
+
+# 6. Run Command: Specify the command to run when the container starts
+# Using "flask run --host=0.0.0.0" makes the app accessible from outside the container
+# Note: FLASK_APP should be set, often via ENV or run.py structure
+# Note: For development, FLASK_DEBUG=1 might be useful (e.g., ENV FLASK_DEBUG=1)
+CMD ["flask", "run", "--host=0.0.0.0"]
\ No newline at end of file
diff --git a/app/__init__.py b/app/__init__.py
new file mode 100644
index 0000000..e55779c
--- /dev/null
+++ b/app/__init__.py
@@ -0,0 +1,33 @@
+# app/__init__.py
+
+import datetime
+import os
+import importlib
+import logging
+from flask import Flask, render_template, Blueprint, current_app
+from config import Config
+from flask_sqlalchemy import SQLAlchemy
+from flask_migrate import Migrate
+from flask_login import LoginManager
+
+# Instantiate Extensions
+db = SQLAlchemy()
+migrate = Migrate()
+login = LoginManager()
+login.login_view = 'auth.login'  # where @login_required sends anonymous users
+# NOTE: Flask-Login also needs a user_loader callback; it is expected to be
+# registered next to the User model in app/models.py.
+
+def create_app(config_class='config.DevelopmentConfig'):
+    app = Flask(__name__)
+    app.config.from_object(config_class)
+
+    db.init_app(app)
+    migrate.init_app(app, db)
+    login.init_app(app)
+
+    # Register blueprints. The core blueprint serves '/' (index); auth serves
+    # the login/logout routes used by the templates and decorators below.
+    from app.core import bp as core_bp
+    app.register_blueprint(core_bp)
+
+    from app.auth import bp as auth_bp
+    app.register_blueprint(auth_bp)
+
+    from app.fhir_ig_importer import bp as fhir_ig_importer_bp
+    app.register_blueprint(fhir_ig_importer_bp)
+
+    return app
+
+# Imported at the bottom to avoid circular imports
+from app import models
\ No newline at end of file
diff --git a/app/auth/__init__.py b/app/auth/__init__.py
new file mode 100644
index 0000000..84c30c6
--- /dev/null
+++ b/app/auth/__init__.py
@@ -0,0 +1,8 @@
+# app/auth/__init__.py
+from flask import Blueprint
+
+# Define the auth blueprint
+bp = Blueprint('auth', __name__, template_folder='templates')
+
+# Import routes at the bottom to avoid circular imports
+from app.auth import routes
\ No newline at end of file
diff --git a/app/auth/forms.py b/app/auth/forms.py
new file mode 100644
index 0000000..ae89c0b
--- /dev/null
+++ b/app/auth/forms.py
@@ -0,0 +1,13 @@
+# app/auth/forms.py
+from flask_wtf import FlaskForm
+from wtforms import StringField, PasswordField, BooleanField, SubmitField
+from wtforms.validators import DataRequired
+
+# Moved from control_panel forms
+class LoginForm(FlaskForm):
+    username = StringField('Username', validators=[DataRequired()])
+    password = PasswordField('Password', validators=[DataRequired()])
+    remember_me = BooleanField('Remember Me')
+    submit = SubmitField('Sign In')
+
+# Add RegistrationForm, ResetPasswordRequestForm etc. here later
\ No newline at end of file
diff --git a/app/auth/routes.py b/app/auth/routes.py
new file mode 100644
index 0000000..06ca0af
--- /dev/null
+++ b/app/auth/routes.py
@@ -0,0 +1,59 @@
+# app/auth/routes.py
+from flask import render_template, flash, redirect, url_for, request
+from flask_login import current_user, login_user, logout_user  # Keep current_user for checking auth status
+from app import db
+from app.models import User
+from app.auth import bp  # Import the auth blueprint
+from .forms import LoginForm  # Import LoginForm from within the auth blueprint
+
+@bp.route('/login', methods=['GET', 'POST'])
+def login():
+    if current_user.is_authenticated:
+        # Redirect authenticated users away from the login page
+        # Maybe check role here too? Or just send to core index.
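+        # ('role' is a column on the User model, added by the later
+        # "Add role column to user table" migration listed in Database setup.md.)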
+        if current_user.role == 'admin':
+            return redirect(url_for('control_panel.index'))
+        else:
+            return redirect(url_for('core.index'))
+
+    form = LoginForm()
+    if form.validate_on_submit():
+        user = User.query.filter_by(username=form.username.data).first()
+        if user is None or not user.check_password(form.password.data):
+            flash('Invalid username or password', 'danger')
+            return redirect(url_for('auth.login'))
+
+        # Log the user in
+        login_user(user, remember=form.remember_me.data)
+        flash(f'Welcome back, {user.username}!', 'success')
+
+        # --- Redirect Logic Modified ---
+        next_page = request.args.get('next')
+
+        # IMPORTANT: Validate next_page to prevent Open Redirect attacks.
+        # Require a site-relative path: a single leading '/', so scheme-relative
+        # URLs such as '//evil.example' are rejected as well.
+        if next_page and (not next_page.startswith('/') or next_page.startswith('//')):
+            flash('Invalid redirect specified.', 'warning')  # Optional feedback
+            next_page = None  # Discard invalid or external URLs
+
+        # If no valid 'next' page was provided, determine the default based on role
+        if not next_page:
+            if user.role == 'admin':
+                # Default redirect for admins
+                next_page = url_for('control_panel.index')
+            else:
+                # Default redirect for non-admins (e.g., 'user' role)
+                next_page = url_for('core.index')
+        # --- End of Modified Redirect Logic ---
+
+        return redirect(next_page)
+
+    # Render login template (GET request or failed POST validation)
+    # The template lives directly in the blueprint's template folder
+    return render_template('login.html', title='Sign In', form=form)
+
+@bp.route('/logout')
+def logout():
+    logout_user()
+    flash('You have been logged out.', 'info')
+    return redirect(url_for('core.index'))
\ No newline at end of file
diff --git a/app/auth/templates/login.html b/app/auth/templates/login.html
new file mode 100644
index 0000000..799c20c
--- /dev/null
+++ b/app/auth/templates/login.html
@@ -0,0 +1,43 @@
+{% extends "base.html" %}
+
+{% block content %}
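+{# Simple Bootstrap login card; base.html is assumed to supply the Bootstrap CSS bundle. #}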
+<div class="container mt-4">
+    <div class="row justify-content-center">
+        <div class="col-md-6 col-lg-4">
+            <div class="card">
+                <div class="card-header">Sign In</div>
+                <div class="card-body">
+                    <form action="" method="post" novalidate>
+                        {{ form.hidden_tag() }}
+                        <div class="mb-3">
+                            {{ form.username.label(class="form-label") }}
+                            {{ form.username(class="form-control" + (" is-invalid" if form.username.errors else ""), size=32) }}
+                            {% if form.username.errors %}
+                            <div class="invalid-feedback">
+                                {% for error in form.username.errors %}{{ error }}{% endfor %}
+                            </div>
+                            {% endif %}
+                        </div>
+
+                        <div class="mb-3">
+                            {{ form.password.label(class="form-label") }}
+                            {{ form.password(class="form-control" + (" is-invalid" if form.password.errors else ""), size=32) }}
+                            {% if form.password.errors %}
+                            <div class="invalid-feedback">
+                                {% for error in form.password.errors %}{{ error }}{% endfor %}
+                            </div>
+                            {% endif %}
+                        </div>
+
+                        <div class="mb-3 form-check">
+                            {{ form.remember_me(class="form-check-input") }}
+                            {{ form.remember_me.label(class="form-check-label") }}
+                        </div>
+
+                        <div class="d-grid">
+                            {{ form.submit(class="btn btn-primary") }}
+                        </div>
+                    </form>
+                </div>
+            </div>
+        </div>
+    </div>
+</div>
+{% endblock %} \ No newline at end of file diff --git a/app/core/__init__.py b/app/core/__init__.py new file mode 100644 index 0000000..b6bc2d2 --- /dev/null +++ b/app/core/__init__.py @@ -0,0 +1,14 @@ +# app/core/__init__.py +# Initialize the core blueprint + +from flask import Blueprint + +# Create a Blueprint instance for core routes +# 'core' is the name of the blueprint +# __name__ helps Flask locate the blueprint's resources (like templates) +# template_folder='templates' specifies a blueprint-specific template folder (optional) +bp = Blueprint('core', __name__, template_folder='templates') + +# Import the routes module associated with this blueprint +# This import is at the bottom to avoid circular dependencies +from . import routes # noqa: F401 E402 diff --git a/app/core/routes.py b/app/core/routes.py new file mode 100644 index 0000000..475433d --- /dev/null +++ b/app/core/routes.py @@ -0,0 +1,18 @@ +# app/core/routes.py +# Defines routes for the core part of the application (e.g., home page) + +from flask import render_template +from . import bp # Import the blueprint instance defined in __init__.py + +# --- Core Routes --- + +@bp.route('/') +@bp.route('/index') +def index(): + """Renders the main home page of the application.""" + # This will look for 'index.html' first in the blueprint's template folder + # (if defined, e.g., 'app/core/templates/index.html') + # and then fall back to the main application's template folder ('app/templates/index.html') + return render_template('index.html', title='Home') + +# Add other core routes here (e.g., about page, contact page) if needed diff --git a/app/decorators.py b/app/decorators.py new file mode 100644 index 0000000..6b07869 --- /dev/null +++ b/app/decorators.py @@ -0,0 +1,19 @@ +# app/decorators.py +from functools import wraps +from flask_login import current_user +from flask import abort + +def admin_required(func): + """ + Decorator to ensure the user is logged in and has the 'admin' role. + Aborts with 403 Forbidden if conditions are not met. + """ + @wraps(func) + def decorated_view(*args, **kwargs): + # Check if user is logged in and has the admin role (using the property we added) + if not current_user.is_authenticated or not current_user.is_admin: + # If not admin, return a 403 Forbidden error + abort(403) + # If admin, proceed with the original route function + return func(*args, **kwargs) + return decorated_view \ No newline at end of file diff --git a/app/fhir_ig_importer/__init__.py b/app/fhir_ig_importer/__init__.py new file mode 100644 index 0000000..dff3722 --- /dev/null +++ b/app/fhir_ig_importer/__init__.py @@ -0,0 +1,28 @@ +# app/modules/fhir_ig_importer/__init__.py + +from flask import Blueprint + +# --- Module Metadata --- +metadata = { + 'module_id': 'fhir_ig_importer', # Matches folder name + 'display_name': 'FHIR IG Importer', + 'description': 'Imports FHIR Implementation Guide packages from a registry.', + 'version': '0.1.0', + # No main nav items, will be accessed via Control Panel + 'nav_items': [] +} +# --- End Module Metadata --- + +# Define Blueprint +# We'll mount this under the control panel later +bp = Blueprint( + metadata['module_id'], + __name__, + template_folder='templates', + # Define a URL prefix if mounting standalone, but we'll likely register + # it under /control-panel via app/__init__.py later + # url_prefix='/fhir-importer' +) + +# Import routes after creating blueprint +from . 
import routes, forms # Import forms too \ No newline at end of file diff --git a/app/fhir_ig_importer/forms.py b/app/fhir_ig_importer/forms.py new file mode 100644 index 0000000..a54f25e --- /dev/null +++ b/app/fhir_ig_importer/forms.py @@ -0,0 +1,19 @@ +# app/modules/fhir_ig_importer/forms.py + +from flask_wtf import FlaskForm +from wtforms import StringField, SubmitField +from wtforms.validators import DataRequired, Regexp + +class IgImportForm(FlaskForm): + """Form for specifying an IG package to import.""" + # Basic validation for FHIR package names (e.g., hl7.fhir.r4.core) + package_name = StringField('Package Name (e.g., hl7.fhir.au.base)', validators=[ + DataRequired(), + Regexp(r'^[a-zA-Z0-9]+(\.[a-zA-Z0-9]+)+$', message='Invalid package name format.') + ]) + # Basic validation for version (e.g., 4.1.0, current) + package_version = StringField('Package Version (e.g., 4.1.0 or current)', validators=[ + DataRequired(), + Regexp(r'^[a-zA-Z0-9\.\-]+$', message='Invalid version format.') + ]) + submit = SubmitField('Fetch & Download IG') \ No newline at end of file diff --git a/app/fhir_ig_importer/routes.py b/app/fhir_ig_importer/routes.py new file mode 100644 index 0000000..863330d --- /dev/null +++ b/app/fhir_ig_importer/routes.py @@ -0,0 +1,162 @@ +# app/modules/fhir_ig_importer/routes.py + +import requests +import os +import tarfile # Needed for find_and_extract_sd +import gzip +import json +import io +import re +from flask import (render_template, redirect, url_for, flash, request, + current_app, jsonify, send_file) +from flask_login import login_required +from app.decorators import admin_required +from werkzeug.utils import secure_filename +from . import bp +from .forms import IgImportForm +# Import the services module +from . import services +# Import ProcessedIg model for get_structure_definition +from app.models import ProcessedIg +from app import db + + +# --- Helper: Find/Extract SD --- +# Moved from services.py to be local to routes that use it, or keep in services and call services.find_and_extract_sd +def find_and_extract_sd(tgz_path, resource_identifier): + """Helper to find and extract SD json from a given tgz path by ID, Name, or Type.""" + sd_data = None; found_path = None; logger = current_app.logger # Use current_app logger + if not tgz_path or not os.path.exists(tgz_path): logger.error(f"File not found in find_and_extract_sd: {tgz_path}"); return None, None + try: + with tarfile.open(tgz_path, "r:gz") as tar: + logger.debug(f"Searching for SD matching '{resource_identifier}' in {os.path.basename(tgz_path)}") + for member in tar: + if member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json'): + if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: continue + fileobj = None + try: + fileobj = tar.extractfile(member) + if fileobj: + content_bytes = fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) + if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition': + sd_id = data.get('id'); sd_name = data.get('name'); sd_type = data.get('type') + if resource_identifier == sd_type or resource_identifier == sd_id or resource_identifier == sd_name: + sd_data = data; found_path = member.name; logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}"); break + except Exception as e: logger.warning(f"Could not read/parse potential SD {member.name}: {e}") + finally: + if 
fileobj: fileobj.close() + if sd_data is None: logger.warning(f"SD matching '{resource_identifier}' not found within archive {os.path.basename(tgz_path)}") + except Exception as e: logger.error(f"Error reading archive {tgz_path} in find_and_extract_sd: {e}", exc_info=True); raise + return sd_data, found_path +# --- End Helper --- + + +# --- Route for the main import page --- +@bp.route('/import-ig', methods=['GET', 'POST']) +@login_required +@admin_required +def import_ig(): + """Handles FHIR IG recursive download using services.""" + form = IgImportForm() + template_context = {"title": "Import FHIR IG", "form": form, "results": None } + if form.validate_on_submit(): + package_name = form.package_name.data; package_version = form.package_version.data + template_context.update(package_name=package_name, package_version=package_version) + flash(f"Starting full import for {package_name}#{package_version}...", "info"); current_app.logger.info(f"Calling import service for: {package_name}#{package_version}") + try: + # Call the CORRECT orchestrator service function + import_results = services.import_package_and_dependencies(package_name, package_version) + template_context["results"] = import_results + # Flash summary messages + dl_count = len(import_results.get('downloaded', {})); proc_count = len(import_results.get('processed', set())); error_count = len(import_results.get('errors', [])) + if dl_count > 0: flash(f"Downloaded/verified {dl_count} package file(s).", "success") + if proc_count < dl_count and dl_count > 0 : flash(f"Dependency data extraction failed for {dl_count - proc_count} package(s).", "warning") + if error_count > 0: flash(f"{error_count} total error(s) occurred.", "danger") + elif dl_count == 0 and error_count == 0: flash("No packages needed downloading or initial package failed.", "info") + elif error_count == 0: flash("Import process completed successfully.", "success") + except Exception as e: + fatal_error = f"Critical unexpected error during import: {e}"; template_context["fatal_error"] = fatal_error; current_app.logger.error(f"Critical import error: {e}", exc_info=True); flash(fatal_error, "danger") + return render_template('fhir_ig_importer/import_ig_page.html', **template_context) + return render_template('fhir_ig_importer/import_ig_page.html', **template_context) + + +# --- Route to get StructureDefinition elements --- +@bp.route('/get-structure') +@login_required +@admin_required +def get_structure_definition(): + """API endpoint to fetch SD elements and pre-calculated Must Support paths.""" + package_name = request.args.get('package_name'); package_version = request.args.get('package_version'); resource_identifier = request.args.get('resource_type') + error_response_data = {"elements": [], "must_support_paths": []} + if not all([package_name, package_version, resource_identifier]): error_response_data["error"] = "Missing query parameters"; return jsonify(error_response_data), 400 + current_app.logger.info(f"Request for structure: {package_name}#{package_version} / {resource_identifier}") + + # Find the primary package file + package_dir_name = 'fhir_packages'; download_dir = os.path.join(current_app.instance_path, package_dir_name) + # Use service helper for consistency + filename = services._construct_tgz_filename(package_name, package_version) + tgz_path = os.path.join(download_dir, filename) + if not os.path.exists(tgz_path): error_response_data["error"] = f"Package file not found: {filename}"; return jsonify(error_response_data), 404 + + sd_data = None; 
found_path = None; error_msg = None + try: + # Call the local helper function correctly + sd_data, found_path = find_and_extract_sd(tgz_path, resource_identifier) + # Fallback check + if sd_data is None: + core_pkg_name = "hl7.fhir.r4.core"; core_pkg_version = "4.0.1" # TODO: Make dynamic + core_filename = services._construct_tgz_filename(core_pkg_name, core_pkg_version) + core_tgz_path = os.path.join(download_dir, core_filename) + if os.path.exists(core_tgz_path): + current_app.logger.info(f"Trying fallback search in {core_pkg_name}...") + sd_data, found_path = find_and_extract_sd(core_tgz_path, resource_identifier) # Call local helper + else: current_app.logger.warning(f"Core package {core_tgz_path} not found.") + except Exception as e: + error_msg = f"Error searching package(s): {e}"; current_app.logger.error(error_msg, exc_info=True); error_response_data["error"] = error_msg; return jsonify(error_response_data), 500 + + if sd_data is None: error_msg = f"SD for '{resource_identifier}' not found."; error_response_data["error"] = error_msg; return jsonify(error_response_data), 404 + + # Extract elements + elements = sd_data.get('snapshot', {}).get('element', []) + if not elements: elements = sd_data.get('differential', {}).get('element', []) + + # Fetch pre-calculated Must Support paths from DB + must_support_paths = []; + try: + stmt = db.select(ProcessedIg).filter_by(package_name=package_name, package_version=package_version); processed_ig_record = db.session.scalar(stmt) + if processed_ig_record: all_ms_paths_dict = processed_ig_record.must_support_elements; must_support_paths = all_ms_paths_dict.get(resource_identifier, []) + else: current_app.logger.warning(f"No ProcessedIg record found for {package_name}#{package_version}") + except Exception as e: current_app.logger.error(f"Error fetching MS paths from DB: {e}", exc_info=True) + + current_app.logger.info(f"Returning {len(elements)} elements for {resource_identifier} from {found_path or 'Unknown File'}") + return jsonify({"elements": elements, "must_support_paths": must_support_paths}) + + +# --- Route to get raw example file content --- +@bp.route('/get-example') +@login_required +@admin_required +def get_example_content(): + # ... (Function remains the same as response #147) ... + package_name = request.args.get('package_name'); package_version = request.args.get('package_version'); example_member_path = request.args.get('filename') + if not all([package_name, package_version, example_member_path]): return jsonify({"error": "Missing query parameters"}), 400 + current_app.logger.info(f"Request for example: {package_name}#{package_version} / {example_member_path}") + package_dir_name = 'fhir_packages'; download_dir = os.path.join(current_app.instance_path, package_dir_name) + pkg_filename = services._construct_tgz_filename(package_name, package_version) # Use service helper + tgz_path = os.path.join(download_dir, pkg_filename) + if not os.path.exists(tgz_path): return jsonify({"error": f"Package file not found: {pkg_filename}"}), 404 + # Basic security check on member path + safe_member_path = secure_filename(example_member_path.replace("package/","")) # Allow paths within package/ + if not example_member_path.startswith('package/') or '..' 
in example_member_path: return jsonify({"error": "Invalid example file path."}), 400 + + try: + with tarfile.open(tgz_path, "r:gz") as tar: + try: example_member = tar.getmember(example_member_path) # Use original path here + except KeyError: return jsonify({"error": f"Example file '{example_member_path}' not found."}), 404 + example_fileobj = tar.extractfile(example_member) + if not example_fileobj: return jsonify({"error": "Could not extract example file."}), 500 + try: content_bytes = example_fileobj.read() + finally: example_fileobj.close() + return content_bytes # Return raw bytes + except tarfile.TarError as e: err_msg = f"Error reading {tgz_path}: {e}"; current_app.logger.error(err_msg); return jsonify({"error": err_msg}), 500 + except Exception as e: err_msg = f"Unexpected error getting example {example_member_path}: {e}"; current_app.logger.error(err_msg, exc_info=True); return jsonify({"error": err_msg}), 500 \ No newline at end of file diff --git a/app/fhir_ig_importer/services.py b/app/fhir_ig_importer/services.py new file mode 100644 index 0000000..b98a6ee --- /dev/null +++ b/app/fhir_ig_importer/services.py @@ -0,0 +1,345 @@ +# app/modules/fhir_ig_importer/services.py + +import requests +import os +import tarfile +import gzip +import json +import io +import re +import logging +from flask import current_app +from collections import defaultdict + +# Constants +FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org" +DOWNLOAD_DIR_NAME = "fhir_packages" + +# --- Helper Functions --- + +def _get_download_dir(): + """Gets the absolute path to the download directory, creating it if needed.""" + logger = logging.getLogger(__name__) + instance_path = None # Initialize + try: + # --- FIX: Indent code inside try block --- + instance_path = current_app.instance_path + logger.debug(f"Using instance path from current_app: {instance_path}") + except RuntimeError: + # --- FIX: Indent code inside except block --- + logger.warning("No app context for instance_path, constructing relative path.") + instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'instance')) + logger.debug(f"Constructed instance path: {instance_path}") + + # This part depends on instance_path being set above + if not instance_path: + logger.error("Fatal Error: Could not determine instance path.") + return None + + download_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME) + try: + # --- FIX: Indent code inside try block --- + os.makedirs(download_dir, exist_ok=True) + return download_dir + except OSError as e: + # --- FIX: Indent code inside except block --- + logger.error(f"Fatal Error creating dir {download_dir}: {e}", exc_info=True) + return None + +def sanitize_filename_part(text): # Public version + """Basic sanitization for name/version parts of filename.""" + # --- FIX: Indent function body --- + safe_text = "".join(c if c.isalnum() or c in ['.', '-'] else '_' for c in text) + safe_text = re.sub(r'_+', '_', safe_text) # Uses re + safe_text = safe_text.strip('_-.') + return safe_text if safe_text else "invalid_name" + +def _construct_tgz_filename(name, version): + """Constructs the standard filename using the sanitized parts.""" + # --- FIX: Indent function body --- + return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.tgz" + +def find_and_extract_sd(tgz_path, resource_identifier): # Public version + """Helper to find and extract SD json from a given tgz path by ID, Name, or Type.""" + # --- FIX: Ensure consistent indentation --- + sd_data = None + found_path 
= None + logger = logging.getLogger(__name__) + if not tgz_path or not os.path.exists(tgz_path): + logger.error(f"File not found in find_and_extract_sd: {tgz_path}") + return None, None + try: + with tarfile.open(tgz_path, "r:gz") as tar: + logger.debug(f"Searching for SD matching '{resource_identifier}' in {os.path.basename(tgz_path)}") + for member in tar: + if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')): + continue + if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: + continue + + fileobj = None + try: + fileobj = tar.extractfile(member) + if fileobj: + content_bytes = fileobj.read() + content_string = content_bytes.decode('utf-8-sig') + data = json.loads(content_string) + if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition': + sd_id = data.get('id') + sd_name = data.get('name') + sd_type = data.get('type') + # Match if requested identifier matches ID, Name, or Base Type + if resource_identifier == sd_type or resource_identifier == sd_id or resource_identifier == sd_name: + sd_data = data + found_path = member.name + logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}") + break # Stop searching once found + except Exception as e: + # Log issues reading/parsing individual files but continue search + logger.warning(f"Could not read/parse potential SD {member.name}: {e}") + finally: + if fileobj: fileobj.close() # Ensure resource cleanup + + if sd_data is None: + logger.warning(f"SD matching '{resource_identifier}' not found within archive {os.path.basename(tgz_path)}") + except tarfile.TarError as e: + logger.error(f"TarError reading {tgz_path} in find_and_extract_sd: {e}") + raise tarfile.TarError(f"Error reading package archive: {e}") from e + except FileNotFoundError as e: + logger.error(f"FileNotFoundError reading {tgz_path} in find_and_extract_sd: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error in find_and_extract_sd for {tgz_path}: {e}", exc_info=True) + raise + return sd_data, found_path + +# --- Core Service Functions --- + +def download_package(name, version): + """ Downloads a single FHIR package. Returns (save_path, error_message) """ + # --- FIX: Ensure consistent indentation --- + logger = logging.getLogger(__name__) + download_dir = _get_download_dir() + if not download_dir: + return None, "Could not get/create download directory." 
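+    # packages.fhir.org behaves like an npm registry: each package version is
+    # published as a gzipped tarball at {FHIR_REGISTRY_BASE_URL}/<name>/<version>,
+    # which is the URL constructed below.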
+ + package_id = f"{name}#{version}" + package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}" + filename = _construct_tgz_filename(name, version) # Uses public sanitize via helper + save_path = os.path.join(download_dir, filename) + + if os.path.exists(save_path): + logger.info(f"Exists: {filename}") + return save_path, None + + logger.info(f"Downloading: {package_id} -> {filename}") + try: + with requests.get(package_url, stream=True, timeout=90) as r: + r.raise_for_status() + with open(save_path, 'wb') as f: + logger.debug(f"Opened {save_path} for writing.") + for chunk in r.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + logger.info(f"Success: Downloaded {filename}") + return save_path, None + except requests.exceptions.RequestException as e: + err_msg = f"Download error for {package_id}: {e}"; logger.error(err_msg); return None, err_msg + except OSError as e: + err_msg = f"File save error for {filename}: {e}"; logger.error(err_msg); return None, err_msg + except Exception as e: + err_msg = f"Unexpected download error for {package_id}: {e}"; logger.error(err_msg, exc_info=True); return None, err_msg + +def extract_dependencies(tgz_path): + """ Extracts dependencies dict from package.json. Returns (dep_dict or None on error, error_message) """ + # --- FIX: Ensure consistent indentation --- + logger = logging.getLogger(__name__) + package_json_path = "package/package.json" + dependencies = {} + error_message = None + if not tgz_path or not os.path.exists(tgz_path): + return None, f"File not found at {tgz_path}" + try: + with tarfile.open(tgz_path, "r:gz") as tar: + package_json_member = tar.getmember(package_json_path) + package_json_fileobj = tar.extractfile(package_json_member) + if package_json_fileobj: + try: + package_data = json.loads(package_json_fileobj.read().decode('utf-8-sig')) + dependencies = package_data.get('dependencies', {}) + finally: + package_json_fileobj.close() + else: + raise FileNotFoundError(f"Could not extract {package_json_path}") + except KeyError: + error_message = f"'{package_json_path}' not found in {os.path.basename(tgz_path)}."; + logger.warning(error_message) # OK if missing + except (json.JSONDecodeError, UnicodeDecodeError) as e: + error_message = f"Parse error in {package_json_path}: {e}"; logger.error(error_message); dependencies = None # Parsing failed + except (tarfile.TarError, FileNotFoundError) as e: + error_message = f"Archive error {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None # Archive read failed + except Exception as e: + error_message = f"Unexpected error extracting deps: {e}"; logger.error(error_message, exc_info=True); dependencies = None + return dependencies, error_message + + +# --- Recursive Import Orchestrator --- +def import_package_and_dependencies(initial_name, initial_version): + """Orchestrates recursive download and dependency extraction.""" + # --- FIX: Ensure consistent indentation --- + logger = logging.getLogger(__name__) + logger.info(f"Starting recursive import for {initial_name}#{initial_version}") + results = {'requested': (initial_name, initial_version), 'processed': set(), 'downloaded': {}, 'all_dependencies': {}, 'errors': [] } + pending_queue = [(initial_name, initial_version)] + processed_lookup = set() + + while pending_queue: + name, version = pending_queue.pop(0) + package_id_tuple = (name, version) + + if package_id_tuple in processed_lookup: + continue + + logger.info(f"Processing: {name}#{version}") + processed_lookup.add(package_id_tuple) + + 
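+        # Step 1: download the package tarball (or reuse an already-downloaded
+        # copy); its package.json is then read to queue further dependencies.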
save_path, dl_error = download_package(name, version) + + if dl_error: + error_msg = f"Download failed for {name}#{version}: {dl_error}" + results['errors'].append(error_msg) + if package_id_tuple == results['requested']: + logger.error("Aborting import: Initial package download failed.") + break + else: + continue + else: # Download OK + results['downloaded'][package_id_tuple] = save_path + # --- Correctly indented block --- + dependencies, dep_error = extract_dependencies(save_path) + if dep_error: + results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}") + elif dependencies is not None: + results['all_dependencies'][package_id_tuple] = dependencies + results['processed'].add(package_id_tuple) + logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}") + for dep_name, dep_version in dependencies.items(): + if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version: + dep_tuple = (dep_name, dep_version) + if dep_tuple not in processed_lookup: + if dep_tuple not in pending_queue: + pending_queue.append(dep_tuple) + logger.debug(f"Added to queue: {dep_name}#{dep_version}") + else: + logger.warning(f"Skipping invalid dependency '{dep_name}': '{dep_version}' in {name}#{version}") + # --- End Correctly indented block --- + + proc_count=len(results['processed']); dl_count=len(results['downloaded']); err_count=len(results['errors']) + logger.info(f"Import finished. Processed: {proc_count}, Downloaded/Verified: {dl_count}, Errors: {err_count}") + return results + + +# --- Package File Content Processor (V6.2 - Fixed MS path handling) --- +def process_package_file(tgz_path): + """ Extracts types, profile status, MS elements, and examples from a downloaded .tgz package (Single Pass). 
""" + logger = logging.getLogger(__name__) + logger.info(f"Processing package file details (V6.2 Logic): {tgz_path}") + + results = {'resource_types_info': [], 'must_support_elements': {}, 'examples': {}, 'errors': [] } + resource_info = defaultdict(lambda: {'name': None, 'type': None, 'is_profile': False, 'ms_flag': False, 'ms_paths': set(), 'examples': set()}) + + if not tgz_path or not os.path.exists(tgz_path): + results['errors'].append(f"Package file not found: {tgz_path}"); return results + + try: + with tarfile.open(tgz_path, "r:gz") as tar: + for member in tar: + if not member.isfile() or not member.name.startswith('package/') or not member.name.lower().endswith(('.json', '.xml', '.html')): continue + member_name_lower = member.name.lower(); base_filename_lower = os.path.basename(member_name_lower); fileobj = None + if base_filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: continue + + is_example = member.name.startswith('package/example/') or 'example' in base_filename_lower + is_json = member_name_lower.endswith('.json') + + try: # Process individual member + if is_json: + fileobj = tar.extractfile(member); + if not fileobj: continue + content_bytes = fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) + if not isinstance(data, dict) or 'resourceType' not in data: continue + + resource_type = data['resourceType']; entry_key = resource_type; is_sd = False + + if resource_type == 'StructureDefinition': + is_sd = True; profile_id = data.get('id') or data.get('name'); sd_type = data.get('type'); sd_base = data.get('baseDefinition'); is_profile_sd = bool(sd_base); + if not profile_id or not sd_type: logger.warning(f"SD missing ID or Type: {member.name}"); continue + entry_key = profile_id + + entry = resource_info[entry_key]; entry.setdefault('type', resource_type) # Ensure type exists + + if is_sd: + entry['name'] = entry_key; entry['type'] = sd_type; entry['is_profile'] = is_profile_sd; + if not entry.get('sd_processed'): + has_ms = False; ms_paths_for_sd = set() + for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]: + for element in element_list: + if isinstance(element, dict) and element.get('mustSupport') is True: + # --- FIX: Check path safely --- + element_path = element.get('path') + if element_path: # Only add if path exists + ms_paths_for_sd.add(element_path) + has_ms = True # Mark MS found if we added a path + else: + logger.warning(f"Found mustSupport=true without path in element of {entry_key}") + # --- End FIX --- + if ms_paths_for_sd: entry['ms_paths'] = ms_paths_for_sd # Store the set of paths + if has_ms: entry['ms_flag'] = True; logger.debug(f" Found MS elements in {entry_key}") # Use boolean flag + entry['sd_processed'] = True # Mark MS check done + + elif is_example: # JSON Example + key_to_use = None; profile_meta = data.get('meta', {}).get('profile', []) + if profile_meta and isinstance(profile_meta, list): + for profile_url in profile_meta: profile_id_from_meta = profile_url.split('/')[-1]; + if profile_id_from_meta in resource_info: key_to_use = profile_id_from_meta; break + if not key_to_use: key_to_use = resource_type + if key_to_use not in resource_info: resource_info[key_to_use].update({'name': key_to_use, 'type': resource_type}) + resource_info[key_to_use]['examples'].add(member.name) + + elif is_example: # XML/HTML examples + # ... (XML/HTML example association logic) ... 
+ guessed_type = base_filename_lower.split('-')[0].capitalize(); guessed_profile_id = base_filename_lower.split('-')[0]; key_to_use = None + if guessed_profile_id in resource_info: key_to_use = guessed_profile_id + elif guessed_type in resource_info: key_to_use = guessed_type + if key_to_use: resource_info[key_to_use]['examples'].add(member.name) + else: logger.warning(f"Could not associate non-JSON example {member.name}") + + except Exception as e: logger.warning(f"Could not process member {member.name}: {e}", exc_info=False) + finally: + if fileobj: fileobj.close() + # -- End Member Loop -- + + # --- Final formatting moved INSIDE the main try block --- + final_list = []; final_ms_elements = {}; final_examples = {} + logger.debug(f"Formatting results from resource_info keys: {list(resource_info.keys())}") + for key, info in resource_info.items(): + display_name = info.get('name') or key; base_type = info.get('type') + if display_name or base_type: + logger.debug(f" Formatting item '{display_name}': type='{base_type}', profile='{info.get('is_profile', False)}', ms_flag='{info.get('ms_flag', False)}'") + final_list.append({'name': display_name, 'type': base_type, 'is_profile': info.get('is_profile', False), 'must_support': info.get('ms_flag', False)}) # Ensure 'must_support' key uses 'ms_flag' + if info['ms_paths']: final_ms_elements[display_name] = sorted(list(info['ms_paths'])) + if info['examples']: final_examples[display_name] = sorted(list(info['examples'])) + else: logger.warning(f"Skipping formatting for key: {key}") + + results['resource_types_info'] = sorted(final_list, key=lambda x: (not x.get('is_profile', False), x.get('name', ''))) + results['must_support_elements'] = final_ms_elements + results['examples'] = final_examples + # --- End formatting moved inside --- + + except Exception as e: + err_msg = f"Error processing package file {tgz_path}: {e}"; logger.error(err_msg, exc_info=True); results['errors'].append(err_msg) + + # Logging counts + final_types_count = len(results['resource_types_info']); ms_count = sum(1 for r in results['resource_types_info'] if r['must_support']); total_ms_paths = sum(len(v) for v in results['must_support_elements'].values()); total_examples = sum(len(v) for v in results['examples'].values()) + logger.info(f"V6.2 Extraction: {final_types_count} items ({ms_count} MS; {total_ms_paths} MS paths; {total_examples} examples) from {os.path.basename(tgz_path)}") + + return results \ No newline at end of file diff --git a/app/fhir_ig_importer/services.py.MERGE NAMES.py b/app/fhir_ig_importer/services.py.MERGE NAMES.py new file mode 100644 index 0000000..883a5f0 --- /dev/null +++ b/app/fhir_ig_importer/services.py.MERGE NAMES.py @@ -0,0 +1,364 @@ +# app/modules/fhir_ig_importer/services.py + +import requests +import os +import tarfile +import gzip +import json +import io +import re +import logging +from flask import current_app +from collections import defaultdict + +# Constants +FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org" +DOWNLOAD_DIR_NAME = "fhir_packages" + +# --- Helper Functions --- + +def _get_download_dir(): + """Gets the absolute path to the download directory, creating it if needed.""" + logger = logging.getLogger(__name__) + try: + instance_path = current_app.instance_path + except RuntimeError: + logger.warning("No app context for instance_path, constructing relative path.") + instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'instance')) + logger.debug(f"Constructed instance path: 
{instance_path}") + download_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME) + try: + os.makedirs(download_dir, exist_ok=True) + return download_dir + except OSError as e: + logger.error(f"Fatal Error: Could not create dir {download_dir}: {e}", exc_info=True) + return None + +def sanitize_filename_part(text): + """Basic sanitization for creating filenames.""" + safe_text = "".join(c if c.isalnum() or c in ['.', '-'] else '_' for c in text) + safe_text = re.sub(r'_+', '_', safe_text) + safe_text = safe_text.strip('_-.') + return safe_text if safe_text else "invalid_name" + +def _construct_tgz_filename(name, version): + return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.tgz" + +# --- Helper to Find/Extract SD --- +def _find_and_extract_sd(tgz_path, resource_type_to_find): + sd_data = None + found_path = None + logger = current_app.logger if current_app else logging.getLogger(__name__) + try: + with tarfile.open(tgz_path, "r:gz") as tar: + logger.debug(f"Searching for SD type '{resource_type_to_find}' in {tgz_path}") + potential_paths = [ + f'package/StructureDefinition-{resource_type_to_find.lower()}.json', + f'package/StructureDefinition-{resource_type_to_find}.json' + ] + member_found = None + for potential_path in potential_paths: + try: + member_found = tar.getmember(potential_path) + if member_found: break + except KeyError: + pass + + if not member_found: + for member in tar: + if member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json'): + filename_lower = os.path.basename(member.name).lower() + if filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: + continue + sd_fileobj = None + try: + sd_fileobj = tar.extractfile(member) + if sd_fileobj: + content_bytes = sd_fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) + if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition' and data.get('type') == resource_type_to_find: + member_found = member + break + except Exception: + pass + finally: + if sd_fileobj: sd_fileobj.close() + + if member_found: + sd_fileobj = None + try: + sd_fileobj = tar.extractfile(member_found) + if sd_fileobj: + content_bytes = sd_fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); sd_data = json.loads(content_string) + found_path = member_found.name; logger.info(f"Found matching SD at path: {found_path}") + except Exception as e: + logger.warning(f"Could not read/parse member {member_found.name} after finding it: {e}") + sd_data = None; found_path = None + finally: + if sd_fileobj: sd_fileobj.close() + + except tarfile.TarError as e: + logger.error(f"TarError reading {tgz_path}: {e}") + raise + except FileNotFoundError: + logger.error(f"FileNotFoundError reading {tgz_path}") + raise + except Exception as e: + logger.error(f"Unexpected error in _find_and_extract_sd for {tgz_path}: {e}", exc_info=True) + raise + return sd_data, found_path + +# --- Core Service Functions --- + +def download_package(name, version): + logger = logging.getLogger(__name__) + download_dir = _get_download_dir() + if not download_dir: return None, "Could not get/create download directory." 
+ + package_id = f"{name}#{version}" + package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}" + filename = _construct_tgz_filename(name, version) + save_path = os.path.join(download_dir, filename) + + if os.path.exists(save_path): + logger.info(f"Package already exists: {filename}") + return save_path, None + + logger.info(f"Downloading: {package_id} -> {filename}") + try: + with requests.get(package_url, stream=True, timeout=90) as r: + r.raise_for_status() + with open(save_path, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): f.write(chunk) + logger.info(f"Success: Downloaded {filename}") + return save_path, None + except requests.exceptions.RequestException as e: err_msg = f"Download error for {package_id}: {e}"; logger.error(err_msg); return None, err_msg + except OSError as e: err_msg = f"File save error for {filename}: {e}"; logger.error(err_msg); return None, err_msg + except Exception as e: err_msg = f"Unexpected download error for {package_id}: {e}"; logger.error(err_msg, exc_info=True); return None, err_msg + +def extract_dependencies(tgz_path): + logger = logging.getLogger(__name__) + package_json_path = "package/package.json" + dependencies = {} + error_message = None + if not tgz_path or not os.path.exists(tgz_path): return None, f"File not found at {tgz_path}" + try: + with tarfile.open(tgz_path, "r:gz") as tar: + package_json_member = tar.getmember(package_json_path) + package_json_fileobj = tar.extractfile(package_json_member) + if package_json_fileobj: + try: + package_data = json.loads(package_json_fileobj.read().decode('utf-8-sig')) + dependencies = package_data.get('dependencies', {}) + finally: package_json_fileobj.close() + else: raise FileNotFoundError(f"Could not extract {package_json_path}") + except KeyError: error_message = f"'{package_json_path}' not found in {os.path.basename(tgz_path)}."; logger.warning(error_message) + except (json.JSONDecodeError, UnicodeDecodeError) as e: error_message = f"Parse error in {package_json_path} from {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None + except (tarfile.TarError, FileNotFoundError) as e: error_message = f"Archive error {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None + except Exception as e: error_message = f"Unexpected error extracting deps: {e}"; logger.error(error_message, exc_info=True); dependencies = None + return dependencies, error_message + +def import_package_and_dependencies(initial_name, initial_version): + logger = logging.getLogger(__name__) + logger.info(f"Starting recursive import for {initial_name}#{initial_version}") + results = {'requested': (initial_name, initial_version), 'processed': set(), 'downloaded': {}, 'all_dependencies': {}, 'errors': [] } + pending_queue = [(initial_name, initial_version)]; processed_lookup = set() + + while pending_queue: + name, version = pending_queue.pop(0) + package_id_tuple = (name, version) + + if package_id_tuple in processed_lookup: continue + + logger.info(f"Processing: {name}#{version}"); processed_lookup.add(package_id_tuple) + + save_path, dl_error = download_package(name, version) + + if dl_error: + error_msg = f"Download failed for {name}#{version}: {dl_error}" + results['errors'].append(error_msg) + if package_id_tuple == results['requested']: + logger.error("Aborting import: Initial package download failed.") + break + else: + continue + else: + results['downloaded'][package_id_tuple] = save_path + dependencies, dep_error = extract_dependencies(save_path) + + if 
dep_error: + results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}") + elif dependencies is not None: + results['all_dependencies'][package_id_tuple] = dependencies + results['processed'].add(package_id_tuple) + logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}") + for dep_name, dep_version in dependencies.items(): + if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version: + dep_tuple = (dep_name, dep_version) + if dep_tuple not in processed_lookup: + if dep_tuple not in pending_queue: + pending_queue.append(dep_tuple) + logger.debug(f"Added to queue: {dep_name}#{dep_version}") + else: + logger.warning(f"Skipping invalid dependency entry '{dep_name}': '{dep_version}' in {name}#{version}") + + proc_count=len(results['processed']); dl_count=len(results['downloaded']); err_count=len(results['errors']) + logger.info(f"Import finished. Processed: {proc_count}, Downloaded/Verified: {dl_count}, Errors: {err_count}") + return results + +def process_package_file(tgz_path): + logger = logging.getLogger(__name__) + logger.info(f"Processing package file details: {tgz_path}") + results = {'resource_types_info': [], 'must_support_elements': {}, 'examples': {}, 'errors': [] } + resource_info = {} + + if not os.path.exists(tgz_path): + results['errors'].append(f"Package file not found: {tgz_path}") + return results + + try: + with tarfile.open(tgz_path, "r:gz") as tar: + for member in tar: + if not member.isfile(): + continue + member_name_lower = member.name.lower() + base_filename_lower = os.path.basename(member_name_lower) + fileobj = None + + if member.name.startswith('package/') and member_name_lower.endswith('.json') and \ + base_filename_lower not in ['package.json', '.index.json', 'validation-summary.json']: + try: + fileobj = tar.extractfile(member) + if not fileobj: + continue + content_string = fileobj.read().decode('utf-8-sig') + data = json.loads(content_string) + + if isinstance(data, dict) and data.get('resourceType'): + resource_type = data['resourceType'] + + if member.name.startswith('package/example/'): + ex_type = resource_type + entry = resource_info.setdefault(ex_type, { + 'base_type': ex_type, + 'ms_flag': False, + 'ms_paths': [], + 'examples': [], + 'sd_processed': False + }) + entry['examples'].append(member.name) + continue + + if resource_type == 'StructureDefinition': + profile_id = data.get('id') or data.get('name') + fhir_type = data.get('type') + + if not profile_id: + logger.warning(f"StructureDefinition missing id or name: {member.name}") + continue + + entry = resource_info.setdefault(profile_id, { + 'base_type': fhir_type, + 'ms_flag': False, + 'ms_paths': [], + 'examples': [], + 'sd_processed': False + }) + + if entry['sd_processed']: + continue + + ms_paths = [] + has_ms = False + for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]: + for element in element_list: + if not isinstance(element, dict): + continue + if element.get('mustSupport') is True: + path = element.get('path') + if path: + ms_paths.append(path) + has_ms = True + for t in element.get('type', []): + for ext in t.get('extension', []): + ext_url = ext.get('url') + if ext_url: + ms_paths.append(f"{path}.type.extension[{ext_url}]") + has_ms = True + for ext in element.get('extension', []): + ext_url = ext.get('url') + if ext_url: + ms_paths.append(f"{path}.extension[{ext_url}]") + has_ms = True + if ms_paths: + entry['ms_paths'] = 
sorted(set(ms_paths)) + if has_ms: + entry['ms_flag'] = True + entry['sd_processed'] = True + + except Exception as e: + logger.warning(f"Could not read/parse member {member.name}: {e}") + finally: + if fileobj: + fileobj.close() + + elif (member.name.startswith('package/example/') or ('example' in base_filename_lower and member.name.startswith('package/'))) \ + and (member_name_lower.endswith('.xml') or member_name_lower.endswith('.html')): + guessed_type = base_filename_lower.split('-', 1)[0].capitalize() + if guessed_type in resource_info: + resource_info[guessed_type]['examples'].append(member.name) + + except Exception as e: + err_msg = f"Error processing package file {tgz_path}: {e}" + logger.error(err_msg, exc_info=True) + results['errors'].append(err_msg) + + # --- New logic: merge profiles of same base_type --- + merged_info = {} + grouped_by_type = defaultdict(list) + + for profile_id, entry in resource_info.items(): + base_type = entry['base_type'] or profile_id + grouped_by_type[base_type].append((profile_id, entry)) + + for base_type, profiles in grouped_by_type.items(): + merged_paths = set() + merged_examples = [] + has_ms = False + + for _, profile_entry in profiles: + merged_paths.update(profile_entry.get('ms_paths', [])) + merged_examples.extend(profile_entry.get('examples', [])) + if profile_entry.get('ms_flag'): + has_ms = True + + merged_info[base_type] = { + 'base_type': base_type, + 'ms_flag': has_ms, + 'ms_paths': sorted(merged_paths), + 'examples': sorted(merged_examples), + } + + results['resource_types_info'] = sorted([ + {'name': k, 'base_type': v.get('base_type'), 'must_support': v['ms_flag']} + for k, v in merged_info.items() + ], key=lambda x: x['name']) + + results['must_support_elements'] = { + k: v['ms_paths'] for k, v in merged_info.items() if v['ms_paths'] + } + + results['examples'] = { + k: v['examples'] for k, v in merged_info.items() if v['examples'] + } + + logger.info(f"Extracted {len(results['resource_types_info'])} profiles " + f"({sum(1 for r in results['resource_types_info'] if r['must_support'])} with MS; " + f"{sum(len(v) for v in results['must_support_elements'].values())} MS paths; " + f"{sum(len(v) for v in results['examples'].values())} examples) from {tgz_path}") + + return results + +# --- Remove or Comment Out old/unused functions --- +# def _fetch_package_metadata(package_name, package_version): ... (REMOVED) +# def resolve_all_dependencies(initial_package_name, initial_package_version): ... (REMOVED) +# def process_ig_import(package_name, package_version): ... 
(OLD orchestrator - REMOVED) \ No newline at end of file diff --git a/app/fhir_ig_importer/services.py.old b/app/fhir_ig_importer/services.py.old new file mode 100644 index 0000000..ce60a85 --- /dev/null +++ b/app/fhir_ig_importer/services.py.old @@ -0,0 +1,315 @@ +# app/modules/fhir_ig_importer/services.py + +import requests +import os +import tarfile +import gzip +import json +import io +import re +import logging +from flask import current_app + +# Constants +FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org" +DOWNLOAD_DIR_NAME = "fhir_packages" + +# --- Helper Functions --- + +def _get_download_dir(): + """Gets the absolute path to the download directory, creating it if needed.""" + logger = logging.getLogger(__name__) + try: + instance_path = current_app.instance_path + # logger.debug(f"Using instance path from current_app: {instance_path}") # Can be noisy + except RuntimeError: + logger.warning("No app context for instance_path, constructing relative path.") + instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'instance')) + logger.debug(f"Constructed instance path: {instance_path}") + download_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME) + try: + os.makedirs(download_dir, exist_ok=True) + return download_dir + except OSError as e: + logger.error(f"Fatal Error: Could not create dir {download_dir}: {e}", exc_info=True) + return None + +def sanitize_filename_part(text): + """Basic sanitization for creating filenames.""" + # Replace common invalid chars, keep ., - + safe_text = "".join(c if c.isalnum() or c in ['.', '-'] else '_' for c in text) + # Replace multiple underscores with single one + safe_text = re.sub(r'_+', '_', safe_text) + # Remove leading/trailing underscores/hyphens/periods + safe_text = safe_text.strip('_-.') + return safe_text if safe_text else "invalid_name" # Ensure not empty + +def _construct_tgz_filename(name, version): + """Constructs the standard filename for the package.""" + return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.tgz" + +# --- Helper to Find/Extract SD --- +def _find_and_extract_sd(tgz_path, resource_type_to_find): + """Helper to find and extract SD json from a given tgz path.""" + sd_data = None + found_path = None + logger = current_app.logger if current_app else logging.getLogger(__name__) # Use app logger if possible + try: + with tarfile.open(tgz_path, "r:gz") as tar: + logger.debug(f"Searching for SD type '{resource_type_to_find}' in {tgz_path}") + # Prioritize paths like 'package/StructureDefinition-[Type].json' + potential_paths = [ + f'package/StructureDefinition-{resource_type_to_find.lower()}.json', + f'package/StructureDefinition-{resource_type_to_find}.json' + ] + member_found = None + for potential_path in potential_paths: + try: + member_found = tar.getmember(potential_path) + if member_found: break + except KeyError: + pass + + # If specific paths failed, iterate + if not member_found: + for member in tar: + if member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json'): + filename_lower = os.path.basename(member.name).lower() + if filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: + continue + sd_fileobj = None + try: + sd_fileobj = tar.extractfile(member) + if sd_fileobj: + content_bytes = sd_fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) + if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition' and 
data.get('type') == resource_type_to_find: + member_found = member + break + except Exception: + pass + finally: + if sd_fileobj: sd_fileobj.close() + + if member_found: + sd_fileobj = None + try: + sd_fileobj = tar.extractfile(member_found) + if sd_fileobj: + content_bytes = sd_fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); sd_data = json.loads(content_string) + found_path = member_found.name; logger.info(f"Found matching SD at path: {found_path}") + except Exception as e: + logger.warning(f"Could not read/parse member {member_found.name} after finding it: {e}") + sd_data = None; found_path = None + finally: + if sd_fileobj: sd_fileobj.close() + + except tarfile.TarError as e: + logger.error(f"TarError reading {tgz_path}: {e}") + raise + except FileNotFoundError: + logger.error(f"FileNotFoundError reading {tgz_path}") + raise + except Exception as e: + logger.error(f"Unexpected error in _find_and_extract_sd for {tgz_path}: {e}", exc_info=True) + raise + return sd_data, found_path +# --- End Helper --- + +# --- Core Service Functions --- + +def download_package(name, version): + """ Downloads a single FHIR package. Returns (save_path, error_message) """ + logger = logging.getLogger(__name__) + download_dir = _get_download_dir() + if not download_dir: return None, "Could not get/create download directory." + + package_id = f"{name}#{version}" + package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}" + filename = _construct_tgz_filename(name, version) + save_path = os.path.join(download_dir, filename) + + if os.path.exists(save_path): + logger.info(f"Package already exists: {filename}") + return save_path, None + + logger.info(f"Downloading: {package_id} -> {filename}") + try: + with requests.get(package_url, stream=True, timeout=90) as r: + r.raise_for_status() + with open(save_path, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): f.write(chunk) + logger.info(f"Success: Downloaded {filename}") + return save_path, None + except requests.exceptions.RequestException as e: err_msg = f"Download error for {package_id}: {e}"; logger.error(err_msg); return None, err_msg + except OSError as e: err_msg = f"File save error for {filename}: {e}"; logger.error(err_msg); return None, err_msg + except Exception as e: err_msg = f"Unexpected download error for {package_id}: {e}"; logger.error(err_msg, exc_info=True); return None, err_msg + +def extract_dependencies(tgz_path): + """ Extracts dependencies dict from package.json. 
Returns (dep_dict, error_message) """ + logger = logging.getLogger(__name__) + package_json_path = "package/package.json" + dependencies = {} + error_message = None + if not tgz_path or not os.path.exists(tgz_path): return None, f"File not found at {tgz_path}" + try: + with tarfile.open(tgz_path, "r:gz") as tar: + package_json_member = tar.getmember(package_json_path) # Raises KeyError if not found + package_json_fileobj = tar.extractfile(package_json_member) + if package_json_fileobj: + try: + package_data = json.loads(package_json_fileobj.read().decode('utf-8-sig')) + dependencies = package_data.get('dependencies', {}) + finally: package_json_fileobj.close() + else: raise FileNotFoundError(f"Could not extract {package_json_path}") + except KeyError: error_message = f"'{package_json_path}' not found in {os.path.basename(tgz_path)}."; logger.warning(error_message) # No deps is okay + except (json.JSONDecodeError, UnicodeDecodeError) as e: error_message = f"Parse error in {package_json_path} from {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None # Parsing failed + except (tarfile.TarError, FileNotFoundError) as e: error_message = f"Archive error {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None # Archive read failed + except Exception as e: error_message = f"Unexpected error extracting deps: {e}"; logger.error(error_message, exc_info=True); dependencies = None + return dependencies, error_message + +# --- Recursive Import Orchestrator (Corrected Indentation) --- +def import_package_and_dependencies(initial_name, initial_version): + """Orchestrates recursive download and dependency extraction.""" + logger = logging.getLogger(__name__) + logger.info(f"Starting recursive import for {initial_name}#{initial_version}") + results = {'requested': (initial_name, initial_version), 'processed': set(), 'downloaded': {}, 'all_dependencies': {}, 'errors': [] } + pending_queue = [(initial_name, initial_version)]; processed_lookup = set() + + while pending_queue: + name, version = pending_queue.pop(0) + package_id_tuple = (name, version) + + if package_id_tuple in processed_lookup: continue + + logger.info(f"Processing: {name}#{version}"); processed_lookup.add(package_id_tuple) + + # 1. Download + save_path, dl_error = download_package(name, version) + + if dl_error: + error_msg = f"Download failed for {name}#{version}: {dl_error}" + results['errors'].append(error_msg) + if package_id_tuple == results['requested']: + logger.error("Aborting import: Initial package download failed.") + break # Stop processing queue if initial download fails + else: + continue # Skip dependency extraction for this failed download + else: + # Download succeeded (or file existed) + results['downloaded'][package_id_tuple] = save_path + + # --- FIX: Indent dependency extraction under the else block --- + # 2. Extract Dependencies from downloaded file + dependencies, dep_error = extract_dependencies(save_path) + + if dep_error: + results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}") + # Still mark as 'downloaded', but maybe not 'processed'? Let's allow queue processing. 
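+            # A package that downloaded but failed dependency extraction remains in
+            # results['downloaded'] without ever being added to results['processed'],
+            # which is why the two summary counts below can legitimately differ.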
+            elif dependencies is not None: # Not None means extraction attempt happened (even if deps is {})
+                results['all_dependencies'][package_id_tuple] = dependencies
+                results['processed'].add(package_id_tuple) # Mark as successfully processed (downloaded + deps extracted)
+                logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}")
+                # Add new dependencies to queue
+                for dep_name, dep_version in dependencies.items():
+                    # Basic validation of dependency entry format
+                    if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version:
+                        dep_tuple = (dep_name, dep_version)
+                        if dep_tuple not in processed_lookup: # Check processed_lookup to prevent re-queueing
+                            if dep_tuple not in pending_queue: # Avoid duplicate queue entries
+                                pending_queue.append(dep_tuple)
+                                logger.debug(f"Added to queue: {dep_name}#{dep_version}")
+                    else:
+                        logger.warning(f"Skipping invalid dependency entry '{dep_name}': '{dep_version}' in {name}#{version}")
+        # --- End Indentation Fix ---
+
+    # Final Summary Log
+    proc_count=len(results['processed']); dl_count=len(results['downloaded']); err_count=len(results['errors'])
+    logger.info(f"Import finished. Processed: {proc_count}, Downloaded/Verified: {dl_count}, Errors: {err_count}")
+    return results
+
+# --- Package File Content Processor ---
+def process_package_file(tgz_path):
+    """ Extracts types, MS elements, and examples from a downloaded .tgz package (Single Pass). """
+    logger = logging.getLogger(__name__)
+    logger.info(f"Processing package file details: {tgz_path}")
+    results = {'resource_types_info': [], 'must_support_elements': {}, 'examples': {}, 'errors': [] }
+    resource_info = {} # Temp dict: {'Type': {'ms_flag': False, 'ms_paths': [], 'examples': [], 'sd_processed': False}}
+
+    if not os.path.exists(tgz_path): results['errors'].append(f"Package file not found: {tgz_path}"); return results
+
+    try:
+        with tarfile.open(tgz_path, "r:gz") as tar:
+            for member in tar:
+                if not member.isfile(): continue
+                member_name_lower = member.name.lower()
+                base_filename_lower = os.path.basename(member_name_lower)
+                fileobj = None
+
+                # Check if JSON file inside package/ (excluding known non-resources)
+                if member.name.startswith('package/') and member_name_lower.endswith('.json') and \
+                   base_filename_lower not in ['package.json', '.index.json', 'validation-summary.json']:
+                    try:
+                        fileobj = tar.extractfile(member)
+                        if not fileobj: continue
+                        content_bytes = fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string)
+
+                        if isinstance(data, dict) and 'resourceType' in data:
+                            resource_type = data['resourceType']
+                            # Ensure entry exists for this type
+                            type_entry = resource_info.setdefault(resource_type, {'ms_flag': False, 'ms_paths': [], 'examples': [], 'sd_processed': False})
+
+                            # If it's an example file, record it
+                            if member.name.startswith('package/example/'):
+                                type_entry['examples'].append(member.name)
+                                logger.debug(f"Found example for {resource_type}: {member.name}")
+
+                            # If it's a StructureDefinition, process MS flags (only once per type)
+                            if resource_type == 'StructureDefinition' and not type_entry['sd_processed']:
+                                sd_type = data.get('type')
+                                if sd_type and sd_type in resource_info: # Check against types we've already seen
+                                    ms_paths_for_type = []; has_ms = False
+                                    for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]:
+                                        for element in element_list:
+                                            if isinstance(element, dict) and element.get('mustSupport') is True:
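+                                                # Record the path of this must-support element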
element_path = element.get('path') + if element_path: + ms_paths_for_type.append(element_path); has_ms = True + if ms_paths_for_type: + resource_info[sd_type]['ms_paths'] = sorted(list(set(ms_paths_for_type))) # Store unique sorted paths + if has_ms: + resource_info[sd_type]['ms_flag'] = True + resource_info[sd_type]['sd_processed'] = True # Mark as processed for MS + logger.debug(f"Processed SD for {sd_type}, MS found: {has_ms}") + else: + logger.warning(f"SD {member.name} defines type '{sd_type}' which wasn't found as a resource type key.") + + except Exception as e: logger.warning(f"Could not read/parse member {member.name}: {e}") + finally: + if fileobj: fileobj.close() + + # Also find XML and HTML examples + elif (member.name.startswith('package/example/') or ('example' in base_filename_lower and member.name.startswith('package/'))) \ + and (member_name_lower.endswith('.xml') or member_name_lower.endswith('.html')): + # Try to guess type (imperfect) + guessed_type = base_filename_lower.split('-', 1)[0].capitalize() + if guessed_type in resource_info: # Only add if type is known + resource_info[guessed_type]['examples'].append(member.name) + logger.debug(f"Found non-JSON example for {guessed_type}: {member.name}") + + except Exception as e: err_msg = f"Error processing package file {tgz_path}: {e}"; logger.error(err_msg, exc_info=True); results['errors'].append(err_msg) + + # Format final results + results['resource_types_info'] = sorted([{'name': rt, 'must_support': info['ms_flag']} for rt, info in resource_info.items()], key=lambda x: x['name']) + results['must_support_elements'] = {rt: info['ms_paths'] for rt, info in resource_info.items() if info['ms_paths']} + results['examples'] = {rt: sorted(info['examples']) for rt, info in resource_info.items() if info['examples']} + + # Logging final counts + final_types_count = len(results['resource_types_info']); ms_count = sum(1 for r in results['resource_types_info'] if r['must_support']); total_ms_paths = sum(len(v) for v in results['must_support_elements'].values()); total_examples = sum(len(v) for v in results['examples'].values()) + logger.info(f"Extracted {final_types_count} types ({ms_count} with MS; {total_ms_paths} MS paths; {total_examples} examples) from {tgz_path}") + + return results + +# --- Remove or Comment Out old/unused functions --- +# def _fetch_package_metadata(package_name, package_version): ... (REMOVED) +# def resolve_all_dependencies(initial_package_name, initial_package_version): ... (REMOVED) +# def process_ig_import(package_name, package_version): ... (OLD orchestrator - REMOVED) \ No newline at end of file diff --git a/app/fhir_ig_importer/templates/fhir_ig_importer/import_ig_page.html b/app/fhir_ig_importer/templates/fhir_ig_importer/import_ig_page.html new file mode 100644 index 0000000..1fd8d11 --- /dev/null +++ b/app/fhir_ig_importer/templates/fhir_ig_importer/import_ig_page.html @@ -0,0 +1,124 @@ +{# app/modules/fhir_ig_importer/templates/fhir_ig_importer/import_ig_page.html #} +{% extends "base.html" %} {# Or your control panel base template "cp_base.html" #} +{% from "_form_helpers.html" import render_field %} {# Assumes you have this macro #} + +{% block content %} {# Or your specific CP content block #} +
+

Import FHIR Implementation Guide

+ +
+
+ {# --- Import Form --- #} +
+
+
Enter Package Details
+
{# Use relative endpoint for blueprint #} + {{ form.hidden_tag() }} {# CSRF token #} +
+ {{ render_field(form.package_name, class="form-control" + (" is-invalid" if form.package_name.errors else ""), placeholder="e.g., hl7.fhir.au.base") }} +
+
+ {{ render_field(form.package_version, class="form-control" + (" is-invalid" if form.package_version.errors else ""), placeholder="e.g., 4.1.0 or latest") }} +
+
+ {{ form.submit(class="btn btn-primary", value="Fetch & Download IG") }} +
+
+
+
+ + {# --- Results Section --- #} +
+ {# Display Fatal Error if passed #} + {% if fatal_error %} + + {% endif %} + + {# Display results if the results dictionary exists (meaning POST happened) #} + {% if results %} +
+
+ {# Use results.requested for package info #} +
Import Results for: {{ results.requested[0] }}#{{ results.requested[1] }}
+
+ + {# --- Process Errors --- #} + {% if results.errors %} +
Errors Encountered ({{ results.errors|length }}):
+
    + {% for error in results.errors %} +
  • {{ error }}
  • + {% endfor %} +
+ {% endif %} + + {# --- Downloaded Files --- #} +
Downloaded Packages ({{ results.downloaded|length }} / {{ results.processed|length }} processed):
+ {# Check results.downloaded dictionary #} + {% if results.downloaded %} +
    + {# Iterate through results.downloaded items #} + {% for (name, version), path in results.downloaded.items()|sort %} +
  • + {{ name }}#{{ version }} + {# Display relative path cleanly #} + {{ path | replace('/app/', '') | replace('\\', '/') }} + Downloaded {# Moved badge to end #} +
  • + {% endfor %} +
+

Files saved in the server's `instance/fhir_packages` directory.

+ {% else %} +

No packages were successfully downloaded.

+ {% endif %} + + + {# --- All Dependencies Found --- #} +
Consolidated Dependencies Found:
+ {# Calculate unique_deps based on results.all_dependencies #} + {% set unique_deps = {} %} + {% for pkg_id, deps_dict in results.all_dependencies.items() %} + {% for dep_name, dep_version in deps_dict.items() %} + {% set _ = unique_deps.update({(dep_name, dep_version): true}) %} + {% endfor %} + {% endfor %} + + {% if unique_deps %} +

Unique direct dependencies found across all processed packages:

+
+ {% for (name, version), _ in unique_deps.items()|sort %} +
{{ name }}
+
{{ version }}
+ {% endfor %} +
+ {% else %} + + {% endif %} + {# --- End Dependency Result --- #} + +
{# End card-body #} +
{# End card #} + {% endif %} {# End if results #} +
{# End results #} + {# --- End Results Section --- #} + +
+
+ {# --- Instructions Panel --- #} +
+
+
Instructions
+

Enter the official FHIR package name (e.g., hl7.fhir.au.base) and the desired version (e.g., 4.1.0, latest).

+

The system will download the package and attempt to list its direct dependencies.

+

Downloads are saved to the server's `instance/fhir_packages` folder.

+
+
+
+
{# End row #} +
{# End container #} +{% endblock %} \ No newline at end of file diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..0e80644 --- /dev/null +++ b/app/models.py @@ -0,0 +1,127 @@ +# app/models.py +from app import db +from flask_login import UserMixin +from werkzeug.security import generate_password_hash, check_password_hash +from datetime import datetime +import json + +class User(UserMixin, db.Model): + id = db.Column(db.Integer, primary_key=True) + username = db.Column(db.String(64), index=True, unique=True) + email = db.Column(db.String(120), index=True, unique=True) + password_hash = db.Column(db.String(256)) + # --- ADDED ROLE COLUMN --- + role = db.Column(db.String(20), index=True, default='user', nullable=False) + + # Optional: Add a helper property for easy checking + @property + def is_admin(self): + return self.role == 'admin' + # --- END ROLE COLUMN --- + + def __repr__(self): + # You might want to include the role in the representation + return f'' + + def set_password(self, password): + self.password_hash = generate_password_hash(password) + + def check_password(self, password): + # Ensure password_hash is not None before checking + if self.password_hash is None: + return False + return check_password_hash(self.password_hash, password) + +# Add this new model +class ModuleRegistry(db.Model): + id = db.Column(db.Integer, primary_key=True) + module_id = db.Column(db.String(100), unique=True, nullable=False, index=True) # Matches folder name + is_enabled = db.Column(db.Boolean, default=False, nullable=False) + display_name = db.Column(db.String(100), nullable=True) # Optional override from metadata + description = db.Column(db.Text, nullable=True) # Optional override from metadata + version = db.Column(db.String(30), nullable=True) # Store version discovered + # Add timestamp for when it was first discovered or last updated? + # last_seen = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __repr__(self): + return f"" + +# --- ProcessedIg Model (MODIFIED for Examples) --- +class ProcessedIg(db.Model): + id = db.Column(db.Integer, primary_key=True) + package_name = db.Column(db.String(150), nullable=False, index=True) + package_version = db.Column(db.String(50), nullable=False, index=True) + processed_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + status = db.Column(db.String(50), default='processed', nullable=True) + # Stores list of dicts: [{'name': 'Type', 'must_support': bool}, ...] + resource_types_info_json = db.Column(db.Text, nullable=True) + # Stores dict: {'TypeName': ['path1', 'path2'], ...} + must_support_elements_json = db.Column(db.Text, nullable=True) + # --- ADDED: Store example files found, grouped by type --- + # Structure: {'TypeName': ['example1.json', 'example1.xml'], ...} + examples_json = db.Column(db.Text, nullable=True) + # --- End Add --- + + __table_args__ = (db.UniqueConstraint('package_name', 'package_version', name='uq_processed_ig_name_version'),) + + # Property for resource_types_info + @property + def resource_types_info(self): + # ... (getter as before) ... + if self.resource_types_info_json: + try: return json.loads(self.resource_types_info_json) + except json.JSONDecodeError: return [] + return [] + + @resource_types_info.setter + def resource_types_info(self, types_info_list): + # ... (setter as before) ... 
+ if types_info_list and isinstance(types_info_list, list): + sorted_list = sorted(types_info_list, key=lambda x: x.get('name', '')) + self.resource_types_info_json = json.dumps(sorted_list) + else: self.resource_types_info_json = None + + # Property for must_support_elements + @property + def must_support_elements(self): + # ... (getter as before) ... + if self.must_support_elements_json: + try: return json.loads(self.must_support_elements_json) + except json.JSONDecodeError: return {} + return {} + + @must_support_elements.setter + def must_support_elements(self, ms_elements_dict): + # ... (setter as before) ... + if ms_elements_dict and isinstance(ms_elements_dict, dict): + self.must_support_elements_json = json.dumps(ms_elements_dict) + else: self.must_support_elements_json = None + + # --- ADDED: Property for examples --- + @property + def examples(self): + """Returns the stored example filenames as a Python dict.""" + if self.examples_json: + try: + # Return dict {'TypeName': ['file1.json', 'file2.xml'], ...} + return json.loads(self.examples_json) + except json.JSONDecodeError: + return {} # Return empty dict on parse error + return {} + + @examples.setter + def examples(self, examples_dict): + """Stores a Python dict of example filenames as a JSON string.""" + if examples_dict and isinstance(examples_dict, dict): + # Sort filenames within each list? Optional. + # for key in examples_dict: examples_dict[key].sort() + self.examples_json = json.dumps(examples_dict) + else: + self.examples_json = None + # --- End Add --- + + def __repr__(self): + count = len(self.resource_types_info) + ms_count = sum(1 for item in self.resource_types_info if item.get('must_support')) + ex_count = sum(len(v) for v in self.examples.values()) # Count total example files + return f"" diff --git a/app/static/css/custom.css b/app/static/css/custom.css new file mode 100644 index 0000000..8422d2e --- /dev/null +++ b/app/static/css/custom.css @@ -0,0 +1,14 @@ +/* app/static/css/custom.css */ +/* Add custom styles here to override or supplement Bootstrap */ + +body { + /* Example: Add a subtle background pattern or color */ + /* background-color: #f8f9fa; */ +} + +.footer { + /* Ensure footer stays at the bottom if needed, though */ + /* d-flex flex-column min-vh-100 on body usually handles this */ +} + +/* Add more custom styles as needed */ diff --git a/app/static/js/main.js b/app/static/js/main.js new file mode 100644 index 0000000..71c1266 --- /dev/null +++ b/app/static/js/main.js @@ -0,0 +1,12 @@ +/* app/static/js/main.js */ +// Add global JavaScript functions here + +// Example: Initialize Bootstrap tooltips or popovers if used +// document.addEventListener('DOMContentLoaded', function () { +// var tooltipTriggerList = [].slice.call(document.querySelectorAll('[data-bs-toggle="tooltip"]')) +// var tooltipList = tooltipTriggerList.map(function (tooltipTriggerEl) { +// return new bootstrap.Tooltip(tooltipTriggerEl) +// }) +// }); + +console.log("Custom main.js loaded."); diff --git a/app/templates/404.html b/app/templates/404.html new file mode 100644 index 0000000..ae6523e --- /dev/null +++ b/app/templates/404.html @@ -0,0 +1,12 @@ +{% extends "base.html" %} + +{% block content %} +
+

404

+

Page Not Found

+

+ Sorry, we couldn't find the page you were looking for. +

+ Go Back Home +
+{% endblock %} diff --git a/app/templates/500.html b/app/templates/500.html new file mode 100644 index 0000000..5741e51 --- /dev/null +++ b/app/templates/500.html @@ -0,0 +1,12 @@ +{% extends "base.html" %} + +{% block content %} +
+

500

+

Internal Server Error

+

+ Sorry, something went wrong on our end. We are looking into it. +

+ Go Back Home +
+{% endblock %} diff --git a/app/templates/_form_helpers.html b/app/templates/_form_helpers.html new file mode 100644 index 0000000..113287d --- /dev/null +++ b/app/templates/_form_helpers.html @@ -0,0 +1,26 @@ +{# app/templates/_form_helpers.html #} +{% macro render_field(field, label_visible=true) %} +
{# Add margin bottom for spacing #} + {% if label_visible and field.label %} + {{ field.label(class="form-label") }} {# Render label with Bootstrap class #} + {% endif %} + + {# Add is-invalid class if errors exist #} + {% set css_class = 'form-control ' + kwargs.pop('class', '') %} + {% if field.errors %} + {% set css_class = css_class + ' is-invalid' %} + {% endif %} + + {# Render the field itself, passing any extra attributes #} + {{ field(class=css_class, **kwargs) }} + + {# Display validation errors #} + {% if field.errors %} +
+ {% for error in field.errors %} + {{ error }}
+ {% endfor %} +
+ {% endif %} +
+{% endmacro %} \ No newline at end of file diff --git a/app/templates/base.html b/app/templates/base.html new file mode 100644 index 0000000..84328ac --- /dev/null +++ b/app/templates/base.html @@ -0,0 +1,91 @@ + + + + + + + + + {% if title %}{{ title }} - {% endif %}{{ site_name }} + + + + +
+ {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} + + {% endfor %} + {% endif %} + {% endwith %} + {% block content %}{% endblock %} +
+ +
+
+ {% set current_year = now.year %} © {{ current_year }} {{ site_name }}. All rights reserved. +
+
+ + + + + {% block scripts %} + {# Tooltip Initialization Script #} + + {% endblock %} + + \ No newline at end of file diff --git a/app/templates/cp_downloaded_igs.html b/app/templates/cp_downloaded_igs.html new file mode 100644 index 0000000..588b3a3 --- /dev/null +++ b/app/templates/cp_downloaded_igs.html @@ -0,0 +1,135 @@ +{# app/control_panel/templates/cp_downloaded_igs.html #} +{% extends "base.html" %} + +{% block content %} +
+
+

Manage FHIR Packages

+ +
+ + {% if error_message %} + + {% endif %} + + {# NOTE: The block calculating processed_ids set using {% set %} was REMOVED from here #} + + {# --- Start Two Column Layout --- #} +
+ + {# --- Left Column: Downloaded Packages (Horizontal Buttons) --- #} +
+
+
Downloaded Packages ({{ packages|length }})
+
+ {% if packages %} +
+ + +

Risk = Duplicate dependency with different versions

+ + + + {% for pkg in packages %} + {% set is_processed = (pkg.name, pkg.version) in processed_ids %} + {# --- ADDED: Check for duplicate name --- #} + {% set is_duplicate = pkg.name in duplicate_names %} + {# --- ADDED: Assign row class based on duplicate group --- #} + + + + + + {% endfor %} + +
Package NameVersionActions
+ {# --- ADDED: Risk Badge for duplicates --- #} + {% if is_duplicate %} + Duplicate + {% endif %} + {# --- End Add --- #} + {{ pkg.name }} + {{ pkg.version }} {# Actions #} +
+ {% if is_processed %} + Processed + {% else %} +
+ + + +
+ {% endif %} +
+ + +
+
+
+
+ {% elif not error_message %}

No downloaded FHIR packages found.

{% endif %} +
+
+
{# --- End Left Column --- #} + + + {# --- Right Column: Processed Packages (Vertical Buttons) --- #} +
+
+
Processed Packages ({{ processed_list|length }})
+
+ {% if processed_list %} +

MS = Contains Must Support Elements

+
+ + + + + + {% for processed_ig in processed_list %} + + + + + + + {% endfor %} + +
Package NameVersionResource TypesActions
{# Tooltip for Processed At / Status #} + {% set tooltip_title_parts = [] %} + {% if processed_ig.processed_at %}{% set _ = tooltip_title_parts.append("Processed: " + processed_ig.processed_at.strftime('%Y-%m-%d %H:%M')) %}{% endif %} + {% if processed_ig.status %}{% set _ = tooltip_title_parts.append("Status: " + processed_ig.status) %}{% endif %} + {% set tooltip_text = tooltip_title_parts | join('\n') %} + {{ processed_ig.package_name }} + {{ processed_ig.package_version }} {# Resource Types Cell w/ Badges #} + {% set types_info = processed_ig.resource_types_info %} + {% if types_info %}
{% for type_info in types_info %}{% if type_info.must_support %}{{ type_info.name }}{% else %}{{ type_info.name }}{% endif %}{% endfor %}
{% else %}N/A{% endif %} +
+ {# Vertical Button Group #} +
+ View +
+ + +
+
+
+
+ {% elif not error_message %}

No packages recorded as processed yet.

{% endif %} +
+
+
{# --- End Right Column --- #} + +
{# --- End Row --- #} + +
{# End container #} +{% endblock %} + +{# Tooltip JS Initializer should be in base.html #} +{% block scripts %}{{ super() }}{% endblock %} \ No newline at end of file diff --git a/app/templates/cp_view_processed_ig.html b/app/templates/cp_view_processed_ig.html new file mode 100644 index 0000000..b61a324 --- /dev/null +++ b/app/templates/cp_view_processed_ig.html @@ -0,0 +1,405 @@ +{% extends "base.html" %} + +{% from "_form_helpers.html" import render_field %} + +{% block content %} +
+
+

{{ title }}

+ Back to Package List +
+ + {% if processed_ig %} +
+
Package Details
+
+
+
Package Name
+
{{ processed_ig.package_name }}
+
Package Version
+
{{ processed_ig.package_version }}
+
Processed At
+
{{ processed_ig.processed_at.strftime('%Y-%m-%d %H:%M:%S UTC') if processed_ig.processed_at else 'N/A' }}
+
Processing Status
+
+ {{ processed_ig.status or 'N/A' }} +
+
+
+
+ +
+
Resource Types Found / Defined
+
+ {% if profile_list or base_list %} +

MS = Contains Must Support Elements

+ {% if profile_list %} +
Profiles Defined ({{ profile_list|length }}):
+ + {% endif %} + {% if base_list %} +

Examples = Examples will be displayed when selecting Base Types, if contained in the IG

+
Base Resource Types Referenced ({{ base_list|length }}):
+ + {% endif %} + {% else %} +

No resource type information extracted or stored.

+ {% endif %} +
+
+ + + + + + + + {% else %} + + {% endif %} +
+ +{% block scripts %} +{{ super() }} + + +{% endblock scripts %} +{% endblock content %} {# Main content block END #} \ No newline at end of file diff --git a/app/templates/index.html b/app/templates/index.html new file mode 100644 index 0000000..c0b0407 --- /dev/null +++ b/app/templates/index.html @@ -0,0 +1,15 @@ +{% extends "base.html" %} {% block content %}
+ PAS Logo Placeholder + +

Welcome to FLARE FHIR IG TOOLKIT

+
+

+ This is the starting point for a journey into FHIR IGs from an implementer's view!

+ +
+
+{% endblock %} diff --git a/config.py b/config.py new file mode 100644 index 0000000..77d1543 --- /dev/null +++ b/config.py @@ -0,0 +1,52 @@ +# config.py +# Basic configuration settings for the Flask application + +import os + +# Determine the base directory of the application (where config.py lives) +basedir = os.path.abspath(os.path.dirname(__file__)) + +# Define the instance path relative to the base directory +# This seems correct if instance folder is at the same level as config.py/run.py +instance_path = os.path.join(basedir, 'instance') + +class Config: + """Base configuration class.""" + SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess' + + # Database configuration (Development/Production) + # Points to 'instance/app.db' relative to config.py location + SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \ + 'sqlite:///' + os.path.join(instance_path, 'app.db') + SQLALCHEMY_TRACK_MODIFICATIONS = False # Disable modification tracking + + # Add other global configuration variables here + SITE_NAME = "Modular PAS Framework" + # Add any other default settings your app needs + + +# --- ADDED Testing Configuration --- +class TestingConfig(Config): + """Configuration specific to testing.""" + TESTING = True + + # Use a separate database file for tests inside the instance folder + # Ensures tests don't interfere with development data + SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(instance_path, 'test.db') + + # Disable CSRF protection during tests for simplicity + WTF_CSRF_ENABLED = False + + # Ensure Flask-Login works normally during tests (not disabled) + LOGIN_DISABLED = False + + # Use a fixed, predictable secret key for testing sessions + SECRET_KEY = 'testing-secret-key' + + # Inside class TestingConfig(Config): + SERVER_NAME = 'localhost.test' # Or just 'localhost' is usually fine + +# --- You could add other configurations like ProductionConfig(Config) later --- +# class ProductionConfig(Config): +# SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') # Should be set in prod env +# # etc... \ No newline at end of file diff --git a/instance/app.db b/instance/app.db new file mode 100644 index 0000000..907fa84 Binary files /dev/null and b/instance/app.db differ diff --git a/instance/test.db b/instance/test.db new file mode 100644 index 0000000..4112158 Binary files /dev/null and b/instance/test.db differ diff --git a/migrations/README b/migrations/README new file mode 100644 index 0000000..bc8f3dd --- /dev/null +++ b/migrations/README @@ -0,0 +1 @@ +Single-database configuration for Flask. diff --git a/migrations/alembic.ini b/migrations/alembic.ini new file mode 100644 index 0000000..e7c72d1 --- /dev/null +++ b/migrations/alembic.ini @@ -0,0 +1,50 @@ +# A generic, single database configuration. 
+ +[alembic] +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic,flask_migrate + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[logger_flask_migrate] +level = INFO +handlers = +qualname = flask_migrate + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/migrations/env.py b/migrations/env.py new file mode 100644 index 0000000..2e4a5fa --- /dev/null +++ b/migrations/env.py @@ -0,0 +1,113 @@ +import logging +from logging.config import fileConfig + +from flask import current_app + +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +fileConfig(config.config_file_name) +logger = logging.getLogger('alembic.env') + + +def get_engine(): + try: + # this works with Flask-SQLAlchemy<3 and Alchemical + return current_app.extensions['migrate'].db.get_engine() + except (TypeError, AttributeError): + # this works with Flask-SQLAlchemy>=3 + return current_app.extensions['migrate'].db.engine + + +def get_engine_url(): + try: + return get_engine().url.render_as_string(hide_password=False).replace( + '%', '%%') + except AttributeError: + return str(get_engine().url).replace('%', '%%') + + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +config.set_main_option('sqlalchemy.url', get_engine_url()) +target_db = current_app.extensions['migrate'].db + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def get_metadata(): + if hasattr(target_db, 'metadatas'): + return target_db.metadatas[None] + return target_db.metadata + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, target_metadata=get_metadata(), literal_binds=True + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. 
+ + """ + + # this callback is used to prevent an auto-migration from being generated + # when there are no changes to the schema + # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html + def process_revision_directives(context, revision, directives): + if getattr(config.cmd_opts, 'autogenerate', False): + script = directives[0] + if script.upgrade_ops.is_empty(): + directives[:] = [] + logger.info('No changes in schema detected.') + + conf_args = current_app.extensions['migrate'].configure_args + if conf_args.get("process_revision_directives") is None: + conf_args["process_revision_directives"] = process_revision_directives + + connectable = get_engine() + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=get_metadata(), + **conf_args + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/migrations/script.py.mako b/migrations/script.py.mako new file mode 100644 index 0000000..c95e0ea --- /dev/null +++ b/migrations/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/migrations/versions/5e6021b572ee_add_must_support_elements_json_to_.py b/migrations/versions/5e6021b572ee_add_must_support_elements_json_to_.py new file mode 100644 index 0000000..b244cc4 --- /dev/null +++ b/migrations/versions/5e6021b572ee_add_must_support_elements_json_to_.py @@ -0,0 +1,32 @@ +"""Add must_support_elements_json to ProcessedIg + +Revision ID: 5e6021b572ee +Revises: 8809253da459 +Create Date: 2025-04-08 11:41:38.532125 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '5e6021b572ee' +down_revision = '8809253da459' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('processed_ig', schema=None) as batch_op: + batch_op.add_column(sa.Column('must_support_elements_json', sa.Text(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('processed_ig', schema=None) as batch_op: + batch_op.drop_column('must_support_elements_json') + + # ### end Alembic commands ### diff --git a/migrations/versions/7d0cdff4c7ad_initial_migration_with_user_.py b/migrations/versions/7d0cdff4c7ad_initial_migration_with_user_.py new file mode 100644 index 0000000..f2a1cf2 --- /dev/null +++ b/migrations/versions/7d0cdff4c7ad_initial_migration_with_user_.py @@ -0,0 +1,80 @@ +"""Initial migration with User, ModuleRegistry, ProcessedIg + +Revision ID: 7d0cdff4c7ad +Revises: +Create Date: 2025-04-08 08:35:33.204706 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '7d0cdff4c7ad' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('module_registry', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('module_id', sa.String(length=100), nullable=False), + sa.Column('is_enabled', sa.Boolean(), nullable=False), + sa.Column('display_name', sa.String(length=100), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('version', sa.String(length=30), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('module_registry', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_module_registry_module_id'), ['module_id'], unique=True) + + op.create_table('processed_ig', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('package_name', sa.String(length=150), nullable=False), + sa.Column('package_version', sa.String(length=50), nullable=False), + sa.Column('processed_at', sa.DateTime(), nullable=False), + sa.Column('status', sa.String(length=50), nullable=True), + sa.Column('resource_types_json', sa.Text(), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('package_name', 'package_version', name='uq_processed_ig_name_version') + ) + with op.batch_alter_table('processed_ig', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_processed_ig_package_name'), ['package_name'], unique=False) + batch_op.create_index(batch_op.f('ix_processed_ig_package_version'), ['package_version'], unique=False) + + op.create_table('user', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('username', sa.String(length=64), nullable=True), + sa.Column('email', sa.String(length=120), nullable=True), + sa.Column('password_hash', sa.String(length=256), nullable=True), + sa.Column('role', sa.String(length=20), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('user', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_user_email'), ['email'], unique=True) + batch_op.create_index(batch_op.f('ix_user_role'), ['role'], unique=False) + batch_op.create_index(batch_op.f('ix_user_username'), ['username'], unique=True) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('user', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_user_username')) + batch_op.drop_index(batch_op.f('ix_user_role')) + batch_op.drop_index(batch_op.f('ix_user_email')) + + op.drop_table('user') + with op.batch_alter_table('processed_ig', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_processed_ig_package_version')) + batch_op.drop_index(batch_op.f('ix_processed_ig_package_name')) + + op.drop_table('processed_ig') + with op.batch_alter_table('module_registry', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_module_registry_module_id')) + + op.drop_table('module_registry') + # ### end Alembic commands ### diff --git a/migrations/versions/8809253da459_rename_resource_types_json_to_resource_.py b/migrations/versions/8809253da459_rename_resource_types_json_to_resource_.py new file mode 100644 index 0000000..476d4e5 --- /dev/null +++ b/migrations/versions/8809253da459_rename_resource_types_json_to_resource_.py @@ -0,0 +1,34 @@ +"""Rename resource_types_json to resource_types_info_json in ProcessedIg + +Revision ID: 8809253da459 +Revises: 7d0cdff4c7ad +Create Date: 2025-04-08 09:20:55.669990 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
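+# (down_revision links this script to its parent revision; Alembic follows this chain to order upgrades)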
+revision = '8809253da459'
+down_revision = '7d0cdff4c7ad'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('processed_ig', schema=None) as batch_op:
+        batch_op.add_column(sa.Column('resource_types_info_json', sa.Text(), nullable=True))
+        batch_op.drop_column('resource_types_json')
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('processed_ig', schema=None) as batch_op:
+        batch_op.add_column(sa.Column('resource_types_json', sa.TEXT(), nullable=True))
+        batch_op.drop_column('resource_types_info_json')
+
+    # ### end Alembic commands ###
diff --git a/migrations/versions/d8f620f74fbe_add_examples_json_to_processedig.py b/migrations/versions/d8f620f74fbe_add_examples_json_to_processedig.py
new file mode 100644
index 0000000..de42794
--- /dev/null
+++ b/migrations/versions/d8f620f74fbe_add_examples_json_to_processedig.py
@@ -0,0 +1,32 @@
+"""Add examples_json to ProcessedIg
+
+Revision ID: d8f620f74fbe
+Revises: 5e6021b572ee
+Create Date: 2025-04-08 12:45:21.475913
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'd8f620f74fbe'
+down_revision = '5e6021b572ee'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('processed_ig', schema=None) as batch_op:
+        batch_op.add_column(sa.Column('examples_json', sa.Text(), nullable=True))
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('processed_ig', schema=None) as batch_op:
+        batch_op.drop_column('examples_json')
+
+    # ### end Alembic commands ###
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..18b3697
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,9 @@
+[pytest]
+# Look for tests in the tests directory
+testpaths = tests
+# Set environment variables for testing
+# Ensures Flask uses the testing config and finds the app
+#env =
+#    # Use D: prefix for default values if not set externally
+#    D:FLASK_APP=run.py  # Or app:create_app - CHECK THIS
+#    D:FLASK_ENV=testing
\ No newline at end of file
diff --git a/rebuildDB.md b/rebuildDB.md
new file mode 100644
index 0000000..a4082a9
--- /dev/null
+++ b/rebuildDB.md
@@ -0,0 +1,82 @@
+Step 2: Stop Container(s)
+
+Bash
+
+docker stop <container_id>
+# or if using docker compose:
+# docker compose down
+Step 3: Delete Local Database and Migrations
+
+On your local Windows machine:
+
+Delete the database file: C:\GIT\SudoPas_demo\instance\app.db
+Delete the test database file (if it exists): C:\GIT\SudoPas_demo\instance\test.db
+Delete the entire migrations folder: C:\GIT\SudoPas_demo\migrations\
+Step 4: Start a Temporary Container
+
+Bash
+
+docker compose up -d
+# OR if not using compose:
+# docker run ... your-image-name ...
+(Get the new container ID/name)
+
+Step 5: Initialize Migrations Inside Container
+
+Bash
+
+docker exec -w /app <container_id> flask db init
+Step 6: Copy New migrations Folder Out to Local
+
+Run this in your local PowerShell or Command Prompt:
+
+PowerShell
+
+docker cp <container_id>:/app/migrations C:\GIT\SudoPas_demo\migrations
+Verify the migrations folder now exists locally again, containing alembic.ini, env.py, etc., but the versions subfolder should be empty.
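+
+(Optional check) A quick directory listing can confirm the copy worked, assuming the same local project path used above:
+
+PowerShell
+
+Get-ChildItem C:\GIT\SudoPas_demo\migrations
+# 'versions' should be present but contain no migration scripts yet:
+Get-ChildItem C:\GIT\SudoPas_demo\migrations\versions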
+
+Step 7: Stop Temporary Container
+
+Bash
+
+docker stop <container_id>
+# or if using docker compose:
+# docker compose down
+Step 8: Rebuild Docker Image
+
+Crucially, rebuild the image to include the latest models.py and the new empty migrations folder:
+
+Bash
+
+docker compose build
+# OR
+# docker build -t your-image-name .
+Step 9: Start Final Container
+
+Bash
+
+docker compose up -d
+# OR
+# docker run ... your-image-name ...
+(Get the final container ID/name)
+
+Step 10: Create Initial Migration Script
+
+Now, generate the first migration script based on your current models:
+
+Bash
+
+docker exec -w /app <container_id> flask db migrate -m "Initial migration with User, ModuleRegistry, ProcessedIg"
+Check that a new script appeared in your local migrations/versions/ folder.
+
+Step 11: Apply Migration (Create DB & Tables)
+
+Run upgrade to create the new app.db and apply the initial schema:
+
+Bash
+
+docker exec -w /app <container_id> flask db upgrade
+After this, you should have a completely fresh database matching your latest models. You will need to:
+
+Create your admin user again via signup or a dedicated command (if you create one).
+Re-import any FHIR IGs via the UI if you want data to test with.
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..5ee6edb
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,17 @@
+# requirements.txt
+# List of Python packages required for the project
+
+Flask
+Flask-SQLAlchemy
+Flask-Migrate
+python-dotenv
+Flask-WTF
+email-validator
+requests>=2.20 # Or later version
+Flask-Login # <-- ADD THIS LINE
+pytest>=7.0 # Or a specific later version
+# Optional, but helpful for Flask fixtures later:
+# pytest-flask>=1.0
+
+
+# (Flask-SQLAlchemy and Flask-Migrate were added above when the database was set up)
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..781ccb1
--- /dev/null
+++ b/run.py
@@ -0,0 +1,45 @@
+# run.py
+# Main entry point to start the Flask development server.
+ +import os +from dotenv import load_dotenv + +# Load environment variables from .env file, if it exists +# Useful for storing sensitive info like SECRET_KEY or DATABASE_URL locally +dotenv_path = os.path.join(os.path.dirname(__file__), '.env') +if os.path.exists(dotenv_path): + load_dotenv(dotenv_path) + print("Loaded environment variables from .env file.") +else: + print(".env file not found, using default config or environment variables.") + + +# Import the application factory function (from app/__init__.py, which we'll create content for next) +# We assume the 'app' package exists with an __init__.py containing create_app() +try: + from app import create_app +except ImportError as e: + # Provide a helpful message if the app structure isn't ready yet + print(f"Error importing create_app: {e}") + print("Please ensure the 'app' directory and 'app/__init__.py' exist and define the create_app function.") + # Exit or raise the error depending on desired behavior during setup + raise + +# Create the application instance using the factory +# This allows for different configurations (e.g., testing) if needed later +# We pass the configuration object from config.py +# from config import Config # Assuming Config class is defined in config.py +# flask_app = create_app(Config) +# Simpler approach if create_app handles config loading internally: +flask_app = create_app() + + +if __name__ == '__main__': + # Run the Flask development server + # debug=True enables auto-reloading and detailed error pages (DO NOT use in production) + # host='0.0.0.0' makes the server accessible on your local network + print("Starting Flask development server...") + # Port can be configured via environment variable or default to 5000 + port = int(os.environ.get('PORT', 5000)) + flask_app.run(host='0.0.0.0', port=port, debug=True) + diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..9bab21f --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,110 @@ +# tests/conftest.py + +import pytest +import os +from app import create_app, db, discover_and_register_modules # Keep import +from config import TestingConfig +from app.models import User +from tests.test_control_panel import create_test_user, login # Or move helpers to a shared file + + +# Determine the instance path for removing the test DB later +instance_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'instance') +TEST_DB_PATH = os.path.join(instance_path, 'test.db') + +# --- Use 'function' scope for better isolation --- +@pytest.fixture(scope='function') +def app(): + """ + Function-scoped test Flask application. Configured for testing. + Handles creation and teardown of the test database FOR EACH TEST. + Initializes modules AFTER database creation. 
+ """ + # Create app with TestingConfig, skip module init initially + app = create_app(TestingConfig, init_modules=False) + + # Establish an application context before accessing db + with app.app_context(): + print(f"\n--- Setting up test database for function at {app.config['SQLALCHEMY_DATABASE_URI']} ---") + # Ensure instance folder exists + try: + os.makedirs(app.instance_path) + except OSError: + pass # Already exists + + # Remove old test database file if it exists (paranoid check) + if os.path.exists(TEST_DB_PATH): + # print(f"--- Removing old test database file: {TEST_DB_PATH} ---") # Can be noisy + os.remove(TEST_DB_PATH) + + # Create tables based on models + db.create_all() + print("--- Test database tables created ---") # Keep this print for confirmation + + # --- FIX: Run discovery AFTER DB setup --- + print("--- Initializing modules after DB setup ---") # Keep this print + discover_and_register_modules(app) # <-- UNCOMMENTED / ADDED THIS LINE + # --- End Module Discovery --- + + # Yield the app instance for use in the single test function + yield app + + # --- Teardown (runs after each test function) --- + # print("\n--- Tearing down test database for function ---") # Can be noisy + db.session.remove() + db.drop_all() + # Optional: Remove the test database file after test run + # if os.path.exists(TEST_DB_PATH): + # os.remove(TEST_DB_PATH) + +# --- Use 'function' scope --- +@pytest.fixture(scope='function') +def client(app): + """ + Provides a Flask test client for the function-scoped app. + """ + return app.test_client() + +# --- Use 'function' scope --- +@pytest.fixture(scope='function') +def runner(app): + """ + Provides a Flask test CLI runner for the function-scoped app. + """ + return app.test_cli_runner() + +# --- ADDED Fixture for Logged-In Admin Client --- +@pytest.fixture(scope='function') +def admin_client(client, app): + """ + Provides a test client already logged in as a pre-created admin user. + Uses the function-scoped 'client' and 'app' fixtures. + """ + admin_username = "fixture_admin" + admin_email = "fixture_admin@example.com" # Unique email + admin_password = "password" + + # Create admin user within the app context provided by the 'app' fixture + with app.app_context(): + create_test_user( + username=admin_username, + email=admin_email, + password=admin_password, + role="admin" + ) + + # Log the admin user in using the 'client' fixture + login_res = login(client, admin_username, admin_password) + # Basic check to ensure login likely succeeded (redirect expected) + if login_res.status_code != 302: + pytest.fail("Admin login failed during fixture setup.") + + # Yield the already-logged-in client for the test + yield client + + # Teardown (logout) is optional as function scope cleans up, + # but can be added for explicit cleanup if needed. 
+ # client.get(url_for('auth.logout')) + +# --- Potential Future Fixtures --- +# (Keep commented out potential session fixture) \ No newline at end of file diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..3d1c20b --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,165 @@ +# tests/test_auth.py + +import pytest +from flask import url_for, session, request # Keep request import +from app.models import User +from app import db +from urllib.parse import urlparse, parse_qs # Keep URL parsing tools + +# --- Helper to create a user --- +# (Using the version that defaults email based on username) +def create_test_user(username="testuser", email=None, password="password", role="user"): + """Helper function to add a user to the test database.""" + if email is None: + email = f"{username}@example.test" # Default email based on username + # Check if user already exists by username or email + user = User.query.filter((User.username == username) | (User.email == email)).first() + if user: + print(f"\nDEBUG: Found existing test user '{user.username}' with ID {user.id}") + return user + user = User(username=username, email=email, role=role) + user.set_password(password) + db.session.add(user) + db.session.commit() + print(f"\nDEBUG: Created test user '{username}' (Role: {role}) with ID {user.id}") + return user + +# --- Tests --- + +def test_login_page_loads(client): + """Test that the login page loads correctly.""" + response = client.get(url_for('auth.login')) + assert response.status_code == 200 + assert b"Login" in response.data + assert b"Username" in response.data + assert b"Password" in response.data + +def test_successful_login_as_admin(client, app): + """Test logging in with correct ADMIN credentials.""" + with app.app_context(): + admin_user = create_test_user( + username="test_admin", + email="admin@example.test", + password="password", + role="admin" + ) + assert admin_user.id is not None + assert admin_user.role == 'admin' + + response = client.post(url_for('auth.login'), data={ + 'username': 'test_admin', + 'password': 'password' + }, follow_redirects=True) + + assert response.status_code == 200 + assert b"Logout" in response.data + assert bytes(f"{admin_user.username}", 'utf-8') in response.data + assert b"Control Panel" in response.data + # --- CORRECTED ASSERTION --- + assert b"Manage Modules" in response.data # Check for the button/link text + + +def test_login_wrong_password(client, app): + """Test logging in with incorrect password.""" + with app.app_context(): + create_test_user(username="wrong_pass_user", password="password") + response = client.post(url_for('auth.login'), data={ + 'username': 'wrong_pass_user', + 'password': 'wrongpassword' + }, follow_redirects=True) + assert response.status_code == 200 + assert b"Invalid username or password" in response.data + assert b"Logout" not in response.data + +def test_login_wrong_username(client): + """Test logging in with non-existent username.""" + response = client.post(url_for('auth.login'), data={ + 'username': 'nosuchuser', + 'password': 'password' + }, follow_redirects=True) + assert response.status_code == 200 + assert b"Invalid username or password" in response.data + assert b"Logout" not in response.data + +def test_successful_login_as_user(client, app): + """Test logging in with correct USER credentials.""" + with app.app_context(): + test_user = create_test_user( + username="test_user", + email="user@example.test", + password="password", + role="user" + ) + assert test_user.id is not None + 
assert test_user.role == 'user' + response = client.post(url_for('auth.login'), data={ + 'username': 'test_user', + 'password': 'password' + }, follow_redirects=True) + assert response.status_code == 200 + assert b"Logout" in response.data + assert bytes(f"{test_user.username}", 'utf-8') in response.data + assert b"Control Panel" not in response.data + site_name = app.config.get('SITE_NAME', 'PAS Framework') + assert bytes(site_name, 'utf-8') in response.data + + +# --- Replace the existing test_logout function with this: --- +def test_logout(client, app): + """Test logging out.""" + with app.app_context(): + user = create_test_user(username='logout_user', password='password') + login_res = client.post(url_for('auth.login'), data={'username': 'logout_user', 'password': 'password'}) + assert login_res.status_code == 302 + + logout_response = client.get(url_for('auth.logout'), follow_redirects=True) + assert logout_response.status_code == 200 + assert b"You have been logged out." in logout_response.data + assert b"Login" in logout_response.data + assert b"Logout" not in logout_response.data + + # Assert: Accessing protected page redirects to login + protected_response = client.get(url_for('control_panel.index'), follow_redirects=False) + assert protected_response.status_code == 302 + + # --- Use Manual Path Comparison --- + redirect_location = protected_response.headers.get('Location', '') + parsed_location = urlparse(redirect_location) + query_params = parse_qs(parsed_location.query) + + # Manually define the expected RELATIVE paths + expected_login_path_manual = '/auth/login' + expected_next_path_manual = '/control-panel/' # Includes trailing slash from previous logs + + # Compare the path from the header with the known relative string + assert parsed_location.path == expected_login_path_manual + + # Check the 'next' parameter + assert 'next' in query_params + assert query_params['next'][0] == expected_next_path_manual + + +# --- Replace the existing test_login_required_redirect function with this: --- +def test_login_required_redirect(client, app): + """Test that accessing a protected page redirects to login when logged out.""" + # Act: Attempt to access control panel index + response = client.get(url_for('control_panel.index'), follow_redirects=False) + + # Assert: Check for redirect status code (302) + assert response.status_code == 302 + + # --- Use Manual Path Comparison --- + redirect_location = response.headers.get('Location', '') + parsed_location = urlparse(redirect_location) + query_params = parse_qs(parsed_location.query) + + # Manually define the expected RELATIVE paths + expected_login_path_manual = '/auth/login' + expected_next_path_manual = '/control-panel/' # Includes trailing slash + + # Compare the path from the header with the known relative string + assert parsed_location.path == expected_login_path_manual + + # Check the 'next' query parameter exists and has the correct value + assert 'next' in query_params + assert query_params['next'][0] == expected_next_path_manual \ No newline at end of file diff --git a/tests/test_control_panel.py b/tests/test_control_panel.py new file mode 100644 index 0000000..db07e93 --- /dev/null +++ b/tests/test_control_panel.py @@ -0,0 +1,222 @@ +# tests/test_control_panel.py + +import pytest +from flask import url_for +from app.models import User, ModuleRegistry # Import models +from app import db +from urllib.parse import urlparse, parse_qs # Make sure this is imported + +# --- Test Helpers --- +def create_test_user(username="testuser", 
+
+# --- Test Helpers ---
+def create_test_user(username="testuser", email=None, password="password", role="user"):
+    """Add a user to the test database, or return the existing one."""
+    if email is None:
+        email = f"{username}@example.test"
+    # SQLAlchemy 2.0 style query
+    user = db.session.scalar(db.select(User).filter((User.username == username) | (User.email == email)))
+    if user:
+        print(f"\nDEBUG: Found existing test user '{user.username}' with ID {user.id}")
+        return user
+    user = User(username=username, email=email, role=role)
+    user.set_password(password)
+    db.session.add(user)
+    db.session.commit()
+    print(f"\nDEBUG: Created test user '{username}' (Role: {role}) with ID {user.id}")
+    return user
+
+def login(client, username, password):
+    return client.post(url_for('auth.login'),
+                       data=dict(username=username, password=password),
+                       follow_redirects=False)
+
+# --- Access Control Tests ---
+def test_cp_access_admin(client, app):
+    with app.app_context():
+        create_test_user(username="cp_admin", password="password", role="admin")
+    login_res = login(client, "cp_admin", "password")
+    assert login_res.status_code == 302
+    cp_index_res = client.get(url_for('control_panel.index'))
+    assert cp_index_res.status_code == 200
+    assert b"Control Panel" in cp_index_res.data
+    module_res = client.get(url_for('control_panel.manage_modules'))
+    assert module_res.status_code == 200
+    assert b"Module Management" in module_res.data
+
+def test_cp_access_user(client, app):
+    with app.app_context():
+        create_test_user(username="cp_user", password="password", role="user")
+    login_res = login(client, "cp_user", "password")
+    assert login_res.status_code == 302
+    cp_index_res = client.get(url_for('control_panel.index'))
+    assert cp_index_res.status_code == 403
+    module_res = client.get(url_for('control_panel.manage_modules'))
+    assert module_res.status_code == 403
+
+def test_cp_access_logged_out(client, app):
+    cp_index_res = client.get(url_for('control_panel.index'), follow_redirects=False)
+    assert cp_index_res.status_code == 302
+    module_res = client.get(url_for('control_panel.manage_modules'), follow_redirects=False)
+    assert module_res.status_code == 302
+
+
+# --- Module Management Tests ---
+def test_module_manager_list(client, app):
+    with app.app_context():
+        create_test_user(username="module_admin", password="password", role="admin")
+    login(client, "module_admin", "password")
+    response = client.get(url_for('control_panel.manage_modules'))
+    assert response.status_code == 200
+    assert b"Module Management" in response.data
+    assert b"example_module" in response.data
+    assert b"Disabled" in response.data
+
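+
+# Flask stores flashed messages in the session under the '_flashes' key as
+# (category, message) tuples until a template renders them. Because the toggle
+# test below uses follow_redirects=False, the redirect target is never
+# rendered, so the flashes can be inspected directly via session_transaction().
+# An illustrative helper (an assumption, not used by the tests here):
+def last_flash_message(client):
+    """Return the text of the most recently flashed message, or None."""
+    with client.session_transaction() as sess:
+        flashes = sess.get('_flashes') or []
+        return flashes[-1][1] if flashes else None
+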
+def test_module_manager_toggle(client, app):
+    with app.app_context():
+        create_test_user(username="toggle_admin", password="password", role="admin")
+        # SQLAlchemy 2.0 style query
+        module_entry = db.session.scalar(db.select(ModuleRegistry).filter_by(module_id='example_module'))
+        assert module_entry is not None, "example_module not registered; check the conftest.py discovery call."
+        module_entry.is_enabled = False
+        db.session.commit()
+    login(client, "toggle_admin", "password")
+
+    # Enable
+    enable_url = url_for('control_panel.toggle_module_status', module_id='example_module')
+    response_enable = client.post(enable_url, follow_redirects=False)
+    assert response_enable.status_code == 302
+    redirect_location_enable = response_enable.headers.get('Location', '')
+    parsed_location_enable = urlparse(redirect_location_enable)
+    expected_path = '/control-panel/modules'
+    assert parsed_location_enable.path == expected_path
+    with client.session_transaction() as sess:
+        assert '_flashes' in sess
+        assert "has been enabled" in sess['_flashes'][-1][1]
+    with app.app_context():
+        module_entry_after_enable = db.session.scalar(db.select(ModuleRegistry).filter_by(module_id='example_module'))
+        assert module_entry_after_enable is not None and module_entry_after_enable.is_enabled is True
+
+    # Disable
+    disable_url = url_for('control_panel.toggle_module_status', module_id='example_module')
+    response_disable = client.post(disable_url, follow_redirects=False)
+    assert response_disable.status_code == 302
+    redirect_location_disable = response_disable.headers.get('Location', '')
+    parsed_location_disable = urlparse(redirect_location_disable)
+    assert parsed_location_disable.path == expected_path
+    with client.session_transaction() as sess:
+        assert '_flashes' in sess
+        assert "has been disabled" in sess['_flashes'][-1][1]
+    with app.app_context():
+        module_entry_after_disable = db.session.scalar(db.select(ModuleRegistry).filter_by(module_id='example_module'))
+        assert module_entry_after_disable is not None and module_entry_after_disable.is_enabled is False
+
+
+# --- User CRUD Tests ---
+
+def test_add_user_page_loads(client, app):
+    with app.app_context():
+        create_test_user(username="crud_admin", password="password", role="admin")
+    login(client, "crud_admin", "password")
+    response = client.get(url_for('control_panel.add_user'))
+    assert response.status_code == 200
+    assert b"Add New User" in response.data
+    assert b"Username" in response.data
+    assert b"Email" in response.data
+    assert b"Password" in response.data
+    assert b"Repeat Password" in response.data
+    assert b"Role" in response.data
+    assert b"Add User" in response.data
+
+def test_add_user_success(client, app):
+    with app.app_context():
+        create_test_user(username="crud_admin_adder", password="password", role="admin")
+    login(client, "crud_admin_adder", "password")
+    new_username = "new_test_user"
+    new_email = "new@example.com"
+    new_password = "new_password"
+    new_role = "user"
+    response = client.post(url_for('control_panel.add_user'), data={
+        'username': new_username,
+        'email': new_email,
+        'password': new_password,
+        'password2': new_password,
+        'role': new_role,
+        'submit': 'Add User'
+    }, follow_redirects=True)
+    assert response.status_code == 200
+    assert b"User new_test_user (user) added successfully!" in response.data
+    assert new_username.encode('utf-8') in response.data
+    assert new_email.encode('utf-8') in response.data
+    with app.app_context():
+        # SQLAlchemy 2.0 style query
+        newly_added_user = db.session.scalar(db.select(User).filter_by(username=new_username))
+        assert newly_added_user is not None
+        assert newly_added_user.email == new_email
+        assert newly_added_user.role == new_role
+        assert newly_added_user.check_password(new_password) is True
+
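+
+# The tests below take an admin_client fixture: a test client already logged
+# in as an admin, defined in tests/conftest.py. Its exact implementation is
+# not shown in this diff; a minimal sketch of what it presumably looks like:
+#
+#   @pytest.fixture
+#   def admin_client(client, app):
+#       with app.app_context():
+#           create_test_user(username="fixture_admin", password="password", role="admin")
+#       client.post(url_for('auth.login'),
+#                   data={'username': 'fixture_admin', 'password': 'password'})
+#       return client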
+
+# --- Edit User Tests ---
+
+def test_edit_user_page_loads(admin_client, app):
+    """Test the 'Edit User' page loads correctly with user data."""
+    # Arrange: create ONLY the target user (admin_client supplies the admin)
+    with app.app_context():
+        target_user = create_test_user(username="edit_target", email="edit@target.com", role="user")
+        target_user_id = target_user.id
+        # Capture attributes inside the app context for the assertions below
+        target_user_username = target_user.username
+        target_user_email = target_user.email
+        target_user_role = target_user.role
+
+    # Act: get the 'Edit User' page with the logged-in admin client,
+    # passing the target user's ID
+    response = admin_client.get(url_for('control_panel.edit_user', user_id=target_user_id))
+
+    # Assert: the page loads and contains the correct pre-filled data.
+    # These assertions rely on how edit_user.html renders the form passed
+    # in by the route.
+    assert response.status_code == 200
+    assert b"Edit User" in response.data  # Simple title set by the route
+    assert bytes(f'value="{target_user_username}"', 'utf-8') in response.data
+    assert bytes(f'value="{target_user_email}"', 'utf-8') in response.data
+    assert bytes(f'