From cf055a67a80995c5b3950bf7afee70d3e2966f87 Mon Sep 17 00:00:00 2001 From: Sudo-JHare Date: Thu, 10 Apr 2025 15:15:35 +1000 Subject: [PATCH] rebase --- Database setup.md | 45 -- Dockerfile | 37 +- app.py | 162 +++++++ app/__init__.py | 33 -- app/core/__init__.py | 14 - app/core/routes.py | 18 - app/decorators.py | 19 - app/fhir_ig_importer/__init__.py | 28 -- app/fhir_ig_importer/forms.py | 19 - app/fhir_ig_importer/routes.py | 162 ------- app/fhir_ig_importer/services.py | 345 --------------- .../services.py.MERGE NAMES.py | 364 ---------------- app/fhir_ig_importer/services.py.old | 315 -------------- .../fhir_ig_importer/import_ig_page.html | 124 ------ app/models.py | 86 ---- app/static/css/custom.css | 14 - app/static/js/main.js | 12 - app/templates/404.html | 12 - app/templates/500.html | 12 - app/templates/_form_helpers.html | 26 -- app/templates/base.html | 91 ---- app/templates/downloaded_igs.html | 135 ------ app/templates/index.html | 15 - app/templates/view_processed_ig.html | 405 ------------------ config.py | 52 --- instance/app.db | Bin 167936 -> 0 bytes instance/test.db | Bin 45056 -> 0 bytes pytest.ini | 9 - rebuildDB.md | 82 ---- requirements.txt | 20 +- run.py | 45 -- tests/__init__.py | 0 tests/conftest.py | 110 ----- tests/test_auth.py | 165 ------- tests/test_control_panel.py | 222 ---------- tests/test_core.py | 31 -- 36 files changed, 185 insertions(+), 3044 deletions(-) delete mode 100644 Database setup.md create mode 100644 app.py delete mode 100644 app/__init__.py delete mode 100644 app/core/__init__.py delete mode 100644 app/core/routes.py delete mode 100644 app/decorators.py delete mode 100644 app/fhir_ig_importer/__init__.py delete mode 100644 app/fhir_ig_importer/forms.py delete mode 100644 app/fhir_ig_importer/routes.py delete mode 100644 app/fhir_ig_importer/services.py delete mode 100644 app/fhir_ig_importer/services.py.MERGE NAMES.py delete mode 100644 app/fhir_ig_importer/services.py.old delete mode 100644 app/fhir_ig_importer/templates/fhir_ig_importer/import_ig_page.html delete mode 100644 app/models.py delete mode 100644 app/static/css/custom.css delete mode 100644 app/static/js/main.js delete mode 100644 app/templates/404.html delete mode 100644 app/templates/500.html delete mode 100644 app/templates/_form_helpers.html delete mode 100644 app/templates/base.html delete mode 100644 app/templates/downloaded_igs.html delete mode 100644 app/templates/index.html delete mode 100644 app/templates/view_processed_ig.html delete mode 100644 config.py delete mode 100644 instance/app.db delete mode 100644 instance/test.db delete mode 100644 pytest.ini delete mode 100644 rebuildDB.md delete mode 100644 run.py delete mode 100644 tests/__init__.py delete mode 100644 tests/conftest.py delete mode 100644 tests/test_auth.py delete mode 100644 tests/test_control_panel.py delete mode 100644 tests/test_core.py diff --git a/Database setup.md b/Database setup.md deleted file mode 100644 index 7dbb4f7..0000000 --- a/Database setup.md +++ /dev/null @@ -1,45 +0,0 @@ -we need to create the actual database file and the user table inside it. We use Flask-Migrate for this. These commands need to be run inside the running container. - -Find your container ID (if you don't know it): - -Bash - -docker ps -(Copy the CONTAINER ID or Name for webapp-base). - -Initialize the Migration Repository (ONLY RUN ONCE EVER): This creates a migrations folder in your project (inside the container, and locally if you map volumes later, but the command runs inside). 
-
-Bash
-
-docker exec <container_id> flask db init
-(Replace <container_id> with your container's ID or name.)
-
-Create the First Migration Script: Flask-Migrate compares your models (app/models.py) to the (non-existent) database and generates a script to create the necessary tables.
-
-Bash
-
-docker exec <container_id> flask db migrate -m "Initial migration; create user table."
-(You can change the message after -m.) This creates a script file inside the migrations/versions/ directory.
-
-Apply the Migration to the Database: This runs the script generated in the previous step, actually creating the app.db file (in /app/instance/) and the user table inside it.
-
-Bash
-
-docker exec <container_id> flask db upgrade
-
-After running these docker exec <container_id> flask db ... commands, you should have a migrations folder in your project root locally (because it was created by code running inside the container using the project files mounted or copied) and an app.db file inside the /app/instance/ directory within the container.
-
-Your database is now set up with a user table!
-
-
-flask db init
-flask db migrate -m "Initial migration; create user table."
-flask db upgrade
-
-flask db migrate -m "Add role column to user table"
-flask db upgrade
-
-flask db migrate -m "Add ModuleRegistry table"
-flask db upgrade
-
-flask db migrate -m "Add ProcessedIg table"
-flask db upgrade
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 579885d..52c735a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,27 +1,30 @@
-# 1. Base Image: Use an official Python runtime as a parent image
-FROM python:3.10-slim AS base
+FROM python:3.9-slim
 
-# Set environment variables
-# Prevents python creating .pyc files
-ENV PYTHONDONTWRITEBYTECODE=1
-# Prevents python buffering stdout/stderr
-ENV PYTHONUNBUFFERED=1
-
-# 2. Set Work Directory: Create and set the working directory in the container
+# Set working directory
 WORKDIR /app
 
-# 3. Install Dependencies: Copy only the requirements file first to leverage Docker cache
+# Install system dependencies (if any are needed by services.py, e.g., for FHIR processing)
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    && rm -rf /var/lib/apt/lists/*
+
+# Copy requirements file and install Python dependencies
 COPY requirements.txt .
 RUN pip install --no-cache-dir -r requirements.txt
 
-# 4. Copy Application Code: Copy the rest of your application code into the work directory
-COPY . .
+# Copy application files
+COPY app.py .
+# services.py is assumed to exist at the repo root; replace with the actual file if different
+COPY services.py .
+# Pre-create the instance dir if needed for SQLite/packages
+COPY instance/ instance/
 
-# 5. Expose Port: Tell Docker the container listens on port 5000
+# Ensure instance directory exists for SQLite DB and FHIR packages
+RUN mkdir -p /app/instance/fhir_packages
+
+# Expose Flask port
 EXPOSE 5000
 
-# 6. Run Command: Specify the command to run when the container starts
-# Using "flask run --host=0.0.0.0" makes the app accessible from outside the container
-# Note: FLASK_APP should be set, often via ENV or run.py structure
-# Note: For development, FLASK_DEBUG=1 might be useful (e.g., ENV FLASK_DEBUG=1)
+# Set environment variables
+ENV FLASK_APP=app.py
+ENV FLASK_ENV=development
+
+# Run the app
 CMD ["flask", "run", "--host=0.0.0.0"]
\ No newline at end of file
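Both the Dockerfile above and the new app.py below assume a root-level services.py that this patch does not add. As a reference, here is a minimal sketch of the interface app.py expects; the function names come from its call sites, but every signature and return shape shown here is an assumption, not shipped code:

# services.py -- illustrative sketch only, inferred from app.py's call sites
def import_package_and_dependencies(name, version, packages_dir):
    """Download name#version and its dependencies into packages_dir.

    app.py expects a dict containing at least {'downloaded': [paths to .tgz files]}.
    """
    raise NotImplementedError  # assumption: the real implementation ships separately

def process_package_file(tgz_path):
    """Extract metadata from one downloaded package file.

    app.py expects keys: name, version, processed_date (a datetime), and
    resource_types_info, plus optional must_support_elements and examples,
    matching the ProcessedIg columns defined below.
    """
    raise NotImplementedError  # assumption: the real implementation ships separately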
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..cfe370f
--- /dev/null
+++ b/app.py
@@ -0,0 +1,162 @@
+from flask import Flask, render_template_string, request, redirect, url_for, flash
+from flask_sqlalchemy import SQLAlchemy
+import os
+import services  # Assuming your existing services module for FHIR IG handling
+
+app = Flask(__name__)
+app.config['SECRET_KEY'] = 'your-secret-key-here'
+app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///instance/fhir_ig.db'
+app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
+app.config['FHIR_PACKAGES_DIR'] = os.path.join(app.instance_path, 'fhir_packages')
+os.makedirs(app.config['FHIR_PACKAGES_DIR'], exist_ok=True)
+
+db = SQLAlchemy(app)
+
+# Simplified ProcessedIg model (no user-related fields)
+class ProcessedIg(db.Model):
+    id = db.Column(db.Integer, primary_key=True)
+    package_name = db.Column(db.String(128), nullable=False)
+    version = db.Column(db.String(32), nullable=False)
+    processed_date = db.Column(db.DateTime, nullable=False)
+    resource_types_info = db.Column(db.JSON, nullable=False)  # List of resource type metadata
+    must_support_elements = db.Column(db.JSON, nullable=True)  # Dict of MS elements
+    examples = db.Column(db.JSON, nullable=True)  # Dict of example filepaths
+
+# Landing page with two buttons
+@app.route('/')
+def index():
+    return render_template_string('''
+<!DOCTYPE html>
+<html>
+<head>
+    <title>FHIR IG Toolkit</title>
+</head>
+<body>

+    <h1>FHIR IG Toolkit</h1>
+    <p>Simple tool for importing and viewing FHIR Implementation Guides.</p>
+    <a href="{{ url_for('import_ig') }}">Import FHIR IG</a>
+    <a href="{{ url_for('view_igs') }}">View Downloaded IGs</a>
+</body>
+</html>
+    ''')
+
+# Import IG route
+@app.route('/import-ig', methods=['GET', 'POST'])
+def import_ig():
+    if request.method == 'POST':
+        name = request.form.get('name')
+        version = request.form.get('version', 'latest')
+        try:
+            # Call your existing service to download the package and its dependencies
+            result = services.import_package_and_dependencies(name, version, app.config['FHIR_PACKAGES_DIR'])
+            downloaded_files = result.get('downloaded', [])
+            for file_path in downloaded_files:
+                # Process each downloaded package
+                package_info = services.process_package_file(file_path)
+                processed_ig = ProcessedIg(
+                    package_name=package_info['name'],
+                    version=package_info['version'],
+                    processed_date=package_info['processed_date'],
+                    resource_types_info=package_info['resource_types_info'],
+                    must_support_elements=package_info.get('must_support_elements'),
+                    examples=package_info.get('examples')
+                )
+                db.session.add(processed_ig)
+            db.session.commit()
+            flash(f"Successfully imported {name} {version} and dependencies!", "success")
+            return redirect(url_for('view_igs'))
+        except Exception as e:
+            flash(f"Error importing IG: {str(e)}", "error")
+            return redirect(url_for('import_ig'))
+    return render_template_string('''
+<!DOCTYPE html>
+<html>
+<head>
+    <title>Import FHIR IG</title>
+</head>
+<body>

+    <h1>Import FHIR IG</h1>
+    <form method="POST">
+        <label for="name">Package Name (e.g., hl7.fhir.au.base)</label>
+        <input type="text" id="name" name="name" required>
+        <label for="version">Package Version</label>
+        <input type="text" id="version" name="version" value="latest">
+        <button type="submit">Import</button>
+    </form>
+ {% with messages = get_flashed_messages(with_categories=True) %} + {% if messages %} + {% for category, message in messages %} +

+        <div class="{{ category }}">{{ message }}</div>

+
+    {% endfor %}
+    {% endif %}
+    {% endwith %}
+</body>
+</html>
+    ''')
+
+# View Downloaded IGs route
+@app.route('/view-igs')
+def view_igs():
+    igs = ProcessedIg.query.all()
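+    # ProcessedIg.query.all() loads every stored IG into memory; fine for a
+    # small local toolkit, though pagination would help if the table grows.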

+    return render_template_string('''
+<!DOCTYPE html>
+<html>
+<head>
+    <title>View Downloaded IGs</title>
+</head>
+<body>
+    <h1>Downloaded FHIR IGs</h1>
+    <table>
+        <thead>
+            <tr>
+                <th>Package Name</th>
+                <th>Version</th>
+                <th>Processed Date</th>
+                <th>Resource Types</th>
+            </tr>
+        </thead>
+        <tbody>
+            {% for ig in igs %}
+            <tr>
+                <td>{{ ig.package_name }}</td>
+                <td>{{ ig.version }}</td>
+                <td>{{ ig.processed_date }}</td>
+                <td>{{ ig.resource_types_info | length }} types</td>
+            </tr>
+            {% endfor %}
+        </tbody>
+    </table>
+ + + + ''', igs=igs) + +# Initialize DB +with app.app_context(): + db.create_all() + +if __name__ == '__main__': + app.run(debug=True) \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py deleted file mode 100644 index e55779c..0000000 --- a/app/__init__.py +++ /dev/null @@ -1,33 +0,0 @@ -# app/__init__.py - -import datetime -import os -import importlib -import logging -from flask import Flask, render_template, Blueprint, current_app -from config import Config -from flask_sqlalchemy import SQLAlchemy -from flask_migrate import Migrate -from flask_login import LoginManager - -# Instantiate Extensions -db = SQLAlchemy() -migrate = Migrate() - -def create_app(config_class='config.DevelopmentConfig'): - app = Flask(__name__) - app.config.from_object(config_class) - - db.init_app(app) - migrate.init_app(app, db) - - from app.fhir_ig_importer import bp as fhir_ig_importer_bp - app.register_blueprint(fhir_ig_importer_bp) - - @app.route('/') - def index(): - return render_template('index.html') - - return app - -from app import models \ No newline at end of file diff --git a/app/core/__init__.py b/app/core/__init__.py deleted file mode 100644 index b6bc2d2..0000000 --- a/app/core/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# app/core/__init__.py -# Initialize the core blueprint - -from flask import Blueprint - -# Create a Blueprint instance for core routes -# 'core' is the name of the blueprint -# __name__ helps Flask locate the blueprint's resources (like templates) -# template_folder='templates' specifies a blueprint-specific template folder (optional) -bp = Blueprint('core', __name__, template_folder='templates') - -# Import the routes module associated with this blueprint -# This import is at the bottom to avoid circular dependencies -from . import routes # noqa: F401 E402 diff --git a/app/core/routes.py b/app/core/routes.py deleted file mode 100644 index 475433d..0000000 --- a/app/core/routes.py +++ /dev/null @@ -1,18 +0,0 @@ -# app/core/routes.py -# Defines routes for the core part of the application (e.g., home page) - -from flask import render_template -from . import bp # Import the blueprint instance defined in __init__.py - -# --- Core Routes --- - -@bp.route('/') -@bp.route('/index') -def index(): - """Renders the main home page of the application.""" - # This will look for 'index.html' first in the blueprint's template folder - # (if defined, e.g., 'app/core/templates/index.html') - # and then fall back to the main application's template folder ('app/templates/index.html') - return render_template('index.html', title='Home') - -# Add other core routes here (e.g., about page, contact page) if needed diff --git a/app/decorators.py b/app/decorators.py deleted file mode 100644 index 6b07869..0000000 --- a/app/decorators.py +++ /dev/null @@ -1,19 +0,0 @@ -# app/decorators.py -from functools import wraps -from flask_login import current_user -from flask import abort - -def admin_required(func): - """ - Decorator to ensure the user is logged in and has the 'admin' role. - Aborts with 403 Forbidden if conditions are not met. 
- """ - @wraps(func) - def decorated_view(*args, **kwargs): - # Check if user is logged in and has the admin role (using the property we added) - if not current_user.is_authenticated or not current_user.is_admin: - # If not admin, return a 403 Forbidden error - abort(403) - # If admin, proceed with the original route function - return func(*args, **kwargs) - return decorated_view \ No newline at end of file diff --git a/app/fhir_ig_importer/__init__.py b/app/fhir_ig_importer/__init__.py deleted file mode 100644 index dff3722..0000000 --- a/app/fhir_ig_importer/__init__.py +++ /dev/null @@ -1,28 +0,0 @@ -# app/modules/fhir_ig_importer/__init__.py - -from flask import Blueprint - -# --- Module Metadata --- -metadata = { - 'module_id': 'fhir_ig_importer', # Matches folder name - 'display_name': 'FHIR IG Importer', - 'description': 'Imports FHIR Implementation Guide packages from a registry.', - 'version': '0.1.0', - # No main nav items, will be accessed via Control Panel - 'nav_items': [] -} -# --- End Module Metadata --- - -# Define Blueprint -# We'll mount this under the control panel later -bp = Blueprint( - metadata['module_id'], - __name__, - template_folder='templates', - # Define a URL prefix if mounting standalone, but we'll likely register - # it under /control-panel via app/__init__.py later - # url_prefix='/fhir-importer' -) - -# Import routes after creating blueprint -from . import routes, forms # Import forms too \ No newline at end of file diff --git a/app/fhir_ig_importer/forms.py b/app/fhir_ig_importer/forms.py deleted file mode 100644 index a54f25e..0000000 --- a/app/fhir_ig_importer/forms.py +++ /dev/null @@ -1,19 +0,0 @@ -# app/modules/fhir_ig_importer/forms.py - -from flask_wtf import FlaskForm -from wtforms import StringField, SubmitField -from wtforms.validators import DataRequired, Regexp - -class IgImportForm(FlaskForm): - """Form for specifying an IG package to import.""" - # Basic validation for FHIR package names (e.g., hl7.fhir.r4.core) - package_name = StringField('Package Name (e.g., hl7.fhir.au.base)', validators=[ - DataRequired(), - Regexp(r'^[a-zA-Z0-9]+(\.[a-zA-Z0-9]+)+$', message='Invalid package name format.') - ]) - # Basic validation for version (e.g., 4.1.0, current) - package_version = StringField('Package Version (e.g., 4.1.0 or current)', validators=[ - DataRequired(), - Regexp(r'^[a-zA-Z0-9\.\-]+$', message='Invalid version format.') - ]) - submit = SubmitField('Fetch & Download IG') \ No newline at end of file diff --git a/app/fhir_ig_importer/routes.py b/app/fhir_ig_importer/routes.py deleted file mode 100644 index 863330d..0000000 --- a/app/fhir_ig_importer/routes.py +++ /dev/null @@ -1,162 +0,0 @@ -# app/modules/fhir_ig_importer/routes.py - -import requests -import os -import tarfile # Needed for find_and_extract_sd -import gzip -import json -import io -import re -from flask import (render_template, redirect, url_for, flash, request, - current_app, jsonify, send_file) -from flask_login import login_required -from app.decorators import admin_required -from werkzeug.utils import secure_filename -from . import bp -from .forms import IgImportForm -# Import the services module -from . 
import services -# Import ProcessedIg model for get_structure_definition -from app.models import ProcessedIg -from app import db - - -# --- Helper: Find/Extract SD --- -# Moved from services.py to be local to routes that use it, or keep in services and call services.find_and_extract_sd -def find_and_extract_sd(tgz_path, resource_identifier): - """Helper to find and extract SD json from a given tgz path by ID, Name, or Type.""" - sd_data = None; found_path = None; logger = current_app.logger # Use current_app logger - if not tgz_path or not os.path.exists(tgz_path): logger.error(f"File not found in find_and_extract_sd: {tgz_path}"); return None, None - try: - with tarfile.open(tgz_path, "r:gz") as tar: - logger.debug(f"Searching for SD matching '{resource_identifier}' in {os.path.basename(tgz_path)}") - for member in tar: - if member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json'): - if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: continue - fileobj = None - try: - fileobj = tar.extractfile(member) - if fileobj: - content_bytes = fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) - if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition': - sd_id = data.get('id'); sd_name = data.get('name'); sd_type = data.get('type') - if resource_identifier == sd_type or resource_identifier == sd_id or resource_identifier == sd_name: - sd_data = data; found_path = member.name; logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}"); break - except Exception as e: logger.warning(f"Could not read/parse potential SD {member.name}: {e}") - finally: - if fileobj: fileobj.close() - if sd_data is None: logger.warning(f"SD matching '{resource_identifier}' not found within archive {os.path.basename(tgz_path)}") - except Exception as e: logger.error(f"Error reading archive {tgz_path} in find_and_extract_sd: {e}", exc_info=True); raise - return sd_data, found_path -# --- End Helper --- - - -# --- Route for the main import page --- -@bp.route('/import-ig', methods=['GET', 'POST']) -@login_required -@admin_required -def import_ig(): - """Handles FHIR IG recursive download using services.""" - form = IgImportForm() - template_context = {"title": "Import FHIR IG", "form": form, "results": None } - if form.validate_on_submit(): - package_name = form.package_name.data; package_version = form.package_version.data - template_context.update(package_name=package_name, package_version=package_version) - flash(f"Starting full import for {package_name}#{package_version}...", "info"); current_app.logger.info(f"Calling import service for: {package_name}#{package_version}") - try: - # Call the CORRECT orchestrator service function - import_results = services.import_package_and_dependencies(package_name, package_version) - template_context["results"] = import_results - # Flash summary messages - dl_count = len(import_results.get('downloaded', {})); proc_count = len(import_results.get('processed', set())); error_count = len(import_results.get('errors', [])) - if dl_count > 0: flash(f"Downloaded/verified {dl_count} package file(s).", "success") - if proc_count < dl_count and dl_count > 0 : flash(f"Dependency data extraction failed for {dl_count - proc_count} package(s).", "warning") - if error_count > 0: flash(f"{error_count} total error(s) occurred.", "danger") - elif dl_count == 0 and error_count == 0: flash("No 
packages needed downloading or initial package failed.", "info") - elif error_count == 0: flash("Import process completed successfully.", "success") - except Exception as e: - fatal_error = f"Critical unexpected error during import: {e}"; template_context["fatal_error"] = fatal_error; current_app.logger.error(f"Critical import error: {e}", exc_info=True); flash(fatal_error, "danger") - return render_template('fhir_ig_importer/import_ig_page.html', **template_context) - return render_template('fhir_ig_importer/import_ig_page.html', **template_context) - - -# --- Route to get StructureDefinition elements --- -@bp.route('/get-structure') -@login_required -@admin_required -def get_structure_definition(): - """API endpoint to fetch SD elements and pre-calculated Must Support paths.""" - package_name = request.args.get('package_name'); package_version = request.args.get('package_version'); resource_identifier = request.args.get('resource_type') - error_response_data = {"elements": [], "must_support_paths": []} - if not all([package_name, package_version, resource_identifier]): error_response_data["error"] = "Missing query parameters"; return jsonify(error_response_data), 400 - current_app.logger.info(f"Request for structure: {package_name}#{package_version} / {resource_identifier}") - - # Find the primary package file - package_dir_name = 'fhir_packages'; download_dir = os.path.join(current_app.instance_path, package_dir_name) - # Use service helper for consistency - filename = services._construct_tgz_filename(package_name, package_version) - tgz_path = os.path.join(download_dir, filename) - if not os.path.exists(tgz_path): error_response_data["error"] = f"Package file not found: {filename}"; return jsonify(error_response_data), 404 - - sd_data = None; found_path = None; error_msg = None - try: - # Call the local helper function correctly - sd_data, found_path = find_and_extract_sd(tgz_path, resource_identifier) - # Fallback check - if sd_data is None: - core_pkg_name = "hl7.fhir.r4.core"; core_pkg_version = "4.0.1" # TODO: Make dynamic - core_filename = services._construct_tgz_filename(core_pkg_name, core_pkg_version) - core_tgz_path = os.path.join(download_dir, core_filename) - if os.path.exists(core_tgz_path): - current_app.logger.info(f"Trying fallback search in {core_pkg_name}...") - sd_data, found_path = find_and_extract_sd(core_tgz_path, resource_identifier) # Call local helper - else: current_app.logger.warning(f"Core package {core_tgz_path} not found.") - except Exception as e: - error_msg = f"Error searching package(s): {e}"; current_app.logger.error(error_msg, exc_info=True); error_response_data["error"] = error_msg; return jsonify(error_response_data), 500 - - if sd_data is None: error_msg = f"SD for '{resource_identifier}' not found."; error_response_data["error"] = error_msg; return jsonify(error_response_data), 404 - - # Extract elements - elements = sd_data.get('snapshot', {}).get('element', []) - if not elements: elements = sd_data.get('differential', {}).get('element', []) - - # Fetch pre-calculated Must Support paths from DB - must_support_paths = []; - try: - stmt = db.select(ProcessedIg).filter_by(package_name=package_name, package_version=package_version); processed_ig_record = db.session.scalar(stmt) - if processed_ig_record: all_ms_paths_dict = processed_ig_record.must_support_elements; must_support_paths = all_ms_paths_dict.get(resource_identifier, []) - else: current_app.logger.warning(f"No ProcessedIg record found for {package_name}#{package_version}") - except 
Exception as e: current_app.logger.error(f"Error fetching MS paths from DB: {e}", exc_info=True) - - current_app.logger.info(f"Returning {len(elements)} elements for {resource_identifier} from {found_path or 'Unknown File'}") - return jsonify({"elements": elements, "must_support_paths": must_support_paths}) - - -# --- Route to get raw example file content --- -@bp.route('/get-example') -@login_required -@admin_required -def get_example_content(): - # ... (Function remains the same as response #147) ... - package_name = request.args.get('package_name'); package_version = request.args.get('package_version'); example_member_path = request.args.get('filename') - if not all([package_name, package_version, example_member_path]): return jsonify({"error": "Missing query parameters"}), 400 - current_app.logger.info(f"Request for example: {package_name}#{package_version} / {example_member_path}") - package_dir_name = 'fhir_packages'; download_dir = os.path.join(current_app.instance_path, package_dir_name) - pkg_filename = services._construct_tgz_filename(package_name, package_version) # Use service helper - tgz_path = os.path.join(download_dir, pkg_filename) - if not os.path.exists(tgz_path): return jsonify({"error": f"Package file not found: {pkg_filename}"}), 404 - # Basic security check on member path - safe_member_path = secure_filename(example_member_path.replace("package/","")) # Allow paths within package/ - if not example_member_path.startswith('package/') or '..' in example_member_path: return jsonify({"error": "Invalid example file path."}), 400 - - try: - with tarfile.open(tgz_path, "r:gz") as tar: - try: example_member = tar.getmember(example_member_path) # Use original path here - except KeyError: return jsonify({"error": f"Example file '{example_member_path}' not found."}), 404 - example_fileobj = tar.extractfile(example_member) - if not example_fileobj: return jsonify({"error": "Could not extract example file."}), 500 - try: content_bytes = example_fileobj.read() - finally: example_fileobj.close() - return content_bytes # Return raw bytes - except tarfile.TarError as e: err_msg = f"Error reading {tgz_path}: {e}"; current_app.logger.error(err_msg); return jsonify({"error": err_msg}), 500 - except Exception as e: err_msg = f"Unexpected error getting example {example_member_path}: {e}"; current_app.logger.error(err_msg, exc_info=True); return jsonify({"error": err_msg}), 500 \ No newline at end of file diff --git a/app/fhir_ig_importer/services.py b/app/fhir_ig_importer/services.py deleted file mode 100644 index b98a6ee..0000000 --- a/app/fhir_ig_importer/services.py +++ /dev/null @@ -1,345 +0,0 @@ -# app/modules/fhir_ig_importer/services.py - -import requests -import os -import tarfile -import gzip -import json -import io -import re -import logging -from flask import current_app -from collections import defaultdict - -# Constants -FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org" -DOWNLOAD_DIR_NAME = "fhir_packages" - -# --- Helper Functions --- - -def _get_download_dir(): - """Gets the absolute path to the download directory, creating it if needed.""" - logger = logging.getLogger(__name__) - instance_path = None # Initialize - try: - # --- FIX: Indent code inside try block --- - instance_path = current_app.instance_path - logger.debug(f"Using instance path from current_app: {instance_path}") - except RuntimeError: - # --- FIX: Indent code inside except block --- - logger.warning("No app context for instance_path, constructing relative path.") - instance_path = 
os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'instance')) - logger.debug(f"Constructed instance path: {instance_path}") - - # This part depends on instance_path being set above - if not instance_path: - logger.error("Fatal Error: Could not determine instance path.") - return None - - download_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME) - try: - # --- FIX: Indent code inside try block --- - os.makedirs(download_dir, exist_ok=True) - return download_dir - except OSError as e: - # --- FIX: Indent code inside except block --- - logger.error(f"Fatal Error creating dir {download_dir}: {e}", exc_info=True) - return None - -def sanitize_filename_part(text): # Public version - """Basic sanitization for name/version parts of filename.""" - # --- FIX: Indent function body --- - safe_text = "".join(c if c.isalnum() or c in ['.', '-'] else '_' for c in text) - safe_text = re.sub(r'_+', '_', safe_text) # Uses re - safe_text = safe_text.strip('_-.') - return safe_text if safe_text else "invalid_name" - -def _construct_tgz_filename(name, version): - """Constructs the standard filename using the sanitized parts.""" - # --- FIX: Indent function body --- - return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.tgz" - -def find_and_extract_sd(tgz_path, resource_identifier): # Public version - """Helper to find and extract SD json from a given tgz path by ID, Name, or Type.""" - # --- FIX: Ensure consistent indentation --- - sd_data = None - found_path = None - logger = logging.getLogger(__name__) - if not tgz_path or not os.path.exists(tgz_path): - logger.error(f"File not found in find_and_extract_sd: {tgz_path}") - return None, None - try: - with tarfile.open(tgz_path, "r:gz") as tar: - logger.debug(f"Searching for SD matching '{resource_identifier}' in {os.path.basename(tgz_path)}") - for member in tar: - if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')): - continue - if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: - continue - - fileobj = None - try: - fileobj = tar.extractfile(member) - if fileobj: - content_bytes = fileobj.read() - content_string = content_bytes.decode('utf-8-sig') - data = json.loads(content_string) - if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition': - sd_id = data.get('id') - sd_name = data.get('name') - sd_type = data.get('type') - # Match if requested identifier matches ID, Name, or Base Type - if resource_identifier == sd_type or resource_identifier == sd_id or resource_identifier == sd_name: - sd_data = data - found_path = member.name - logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}") - break # Stop searching once found - except Exception as e: - # Log issues reading/parsing individual files but continue search - logger.warning(f"Could not read/parse potential SD {member.name}: {e}") - finally: - if fileobj: fileobj.close() # Ensure resource cleanup - - if sd_data is None: - logger.warning(f"SD matching '{resource_identifier}' not found within archive {os.path.basename(tgz_path)}") - except tarfile.TarError as e: - logger.error(f"TarError reading {tgz_path} in find_and_extract_sd: {e}") - raise tarfile.TarError(f"Error reading package archive: {e}") from e - except FileNotFoundError as e: - logger.error(f"FileNotFoundError reading {tgz_path} in find_and_extract_sd: {e}") - raise - except Exception as e: - 
logger.error(f"Unexpected error in find_and_extract_sd for {tgz_path}: {e}", exc_info=True) - raise - return sd_data, found_path - -# --- Core Service Functions --- - -def download_package(name, version): - """ Downloads a single FHIR package. Returns (save_path, error_message) """ - # --- FIX: Ensure consistent indentation --- - logger = logging.getLogger(__name__) - download_dir = _get_download_dir() - if not download_dir: - return None, "Could not get/create download directory." - - package_id = f"{name}#{version}" - package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}" - filename = _construct_tgz_filename(name, version) # Uses public sanitize via helper - save_path = os.path.join(download_dir, filename) - - if os.path.exists(save_path): - logger.info(f"Exists: {filename}") - return save_path, None - - logger.info(f"Downloading: {package_id} -> {filename}") - try: - with requests.get(package_url, stream=True, timeout=90) as r: - r.raise_for_status() - with open(save_path, 'wb') as f: - logger.debug(f"Opened {save_path} for writing.") - for chunk in r.iter_content(chunk_size=8192): - if chunk: - f.write(chunk) - logger.info(f"Success: Downloaded {filename}") - return save_path, None - except requests.exceptions.RequestException as e: - err_msg = f"Download error for {package_id}: {e}"; logger.error(err_msg); return None, err_msg - except OSError as e: - err_msg = f"File save error for {filename}: {e}"; logger.error(err_msg); return None, err_msg - except Exception as e: - err_msg = f"Unexpected download error for {package_id}: {e}"; logger.error(err_msg, exc_info=True); return None, err_msg - -def extract_dependencies(tgz_path): - """ Extracts dependencies dict from package.json. Returns (dep_dict or None on error, error_message) """ - # --- FIX: Ensure consistent indentation --- - logger = logging.getLogger(__name__) - package_json_path = "package/package.json" - dependencies = {} - error_message = None - if not tgz_path or not os.path.exists(tgz_path): - return None, f"File not found at {tgz_path}" - try: - with tarfile.open(tgz_path, "r:gz") as tar: - package_json_member = tar.getmember(package_json_path) - package_json_fileobj = tar.extractfile(package_json_member) - if package_json_fileobj: - try: - package_data = json.loads(package_json_fileobj.read().decode('utf-8-sig')) - dependencies = package_data.get('dependencies', {}) - finally: - package_json_fileobj.close() - else: - raise FileNotFoundError(f"Could not extract {package_json_path}") - except KeyError: - error_message = f"'{package_json_path}' not found in {os.path.basename(tgz_path)}."; - logger.warning(error_message) # OK if missing - except (json.JSONDecodeError, UnicodeDecodeError) as e: - error_message = f"Parse error in {package_json_path}: {e}"; logger.error(error_message); dependencies = None # Parsing failed - except (tarfile.TarError, FileNotFoundError) as e: - error_message = f"Archive error {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None # Archive read failed - except Exception as e: - error_message = f"Unexpected error extracting deps: {e}"; logger.error(error_message, exc_info=True); dependencies = None - return dependencies, error_message - - -# --- Recursive Import Orchestrator --- -def import_package_and_dependencies(initial_name, initial_version): - """Orchestrates recursive download and dependency extraction.""" - # --- FIX: Ensure consistent indentation --- - logger = logging.getLogger(__name__) - logger.info(f"Starting recursive import for 
{initial_name}#{initial_version}") - results = {'requested': (initial_name, initial_version), 'processed': set(), 'downloaded': {}, 'all_dependencies': {}, 'errors': [] } - pending_queue = [(initial_name, initial_version)] - processed_lookup = set() - - while pending_queue: - name, version = pending_queue.pop(0) - package_id_tuple = (name, version) - - if package_id_tuple in processed_lookup: - continue - - logger.info(f"Processing: {name}#{version}") - processed_lookup.add(package_id_tuple) - - save_path, dl_error = download_package(name, version) - - if dl_error: - error_msg = f"Download failed for {name}#{version}: {dl_error}" - results['errors'].append(error_msg) - if package_id_tuple == results['requested']: - logger.error("Aborting import: Initial package download failed.") - break - else: - continue - else: # Download OK - results['downloaded'][package_id_tuple] = save_path - # --- Correctly indented block --- - dependencies, dep_error = extract_dependencies(save_path) - if dep_error: - results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}") - elif dependencies is not None: - results['all_dependencies'][package_id_tuple] = dependencies - results['processed'].add(package_id_tuple) - logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}") - for dep_name, dep_version in dependencies.items(): - if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version: - dep_tuple = (dep_name, dep_version) - if dep_tuple not in processed_lookup: - if dep_tuple not in pending_queue: - pending_queue.append(dep_tuple) - logger.debug(f"Added to queue: {dep_name}#{dep_version}") - else: - logger.warning(f"Skipping invalid dependency '{dep_name}': '{dep_version}' in {name}#{version}") - # --- End Correctly indented block --- - - proc_count=len(results['processed']); dl_count=len(results['downloaded']); err_count=len(results['errors']) - logger.info(f"Import finished. Processed: {proc_count}, Downloaded/Verified: {dl_count}, Errors: {err_count}") - return results - - -# --- Package File Content Processor (V6.2 - Fixed MS path handling) --- -def process_package_file(tgz_path): - """ Extracts types, profile status, MS elements, and examples from a downloaded .tgz package (Single Pass). 
""" - logger = logging.getLogger(__name__) - logger.info(f"Processing package file details (V6.2 Logic): {tgz_path}") - - results = {'resource_types_info': [], 'must_support_elements': {}, 'examples': {}, 'errors': [] } - resource_info = defaultdict(lambda: {'name': None, 'type': None, 'is_profile': False, 'ms_flag': False, 'ms_paths': set(), 'examples': set()}) - - if not tgz_path or not os.path.exists(tgz_path): - results['errors'].append(f"Package file not found: {tgz_path}"); return results - - try: - with tarfile.open(tgz_path, "r:gz") as tar: - for member in tar: - if not member.isfile() or not member.name.startswith('package/') or not member.name.lower().endswith(('.json', '.xml', '.html')): continue - member_name_lower = member.name.lower(); base_filename_lower = os.path.basename(member_name_lower); fileobj = None - if base_filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: continue - - is_example = member.name.startswith('package/example/') or 'example' in base_filename_lower - is_json = member_name_lower.endswith('.json') - - try: # Process individual member - if is_json: - fileobj = tar.extractfile(member); - if not fileobj: continue - content_bytes = fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) - if not isinstance(data, dict) or 'resourceType' not in data: continue - - resource_type = data['resourceType']; entry_key = resource_type; is_sd = False - - if resource_type == 'StructureDefinition': - is_sd = True; profile_id = data.get('id') or data.get('name'); sd_type = data.get('type'); sd_base = data.get('baseDefinition'); is_profile_sd = bool(sd_base); - if not profile_id or not sd_type: logger.warning(f"SD missing ID or Type: {member.name}"); continue - entry_key = profile_id - - entry = resource_info[entry_key]; entry.setdefault('type', resource_type) # Ensure type exists - - if is_sd: - entry['name'] = entry_key; entry['type'] = sd_type; entry['is_profile'] = is_profile_sd; - if not entry.get('sd_processed'): - has_ms = False; ms_paths_for_sd = set() - for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]: - for element in element_list: - if isinstance(element, dict) and element.get('mustSupport') is True: - # --- FIX: Check path safely --- - element_path = element.get('path') - if element_path: # Only add if path exists - ms_paths_for_sd.add(element_path) - has_ms = True # Mark MS found if we added a path - else: - logger.warning(f"Found mustSupport=true without path in element of {entry_key}") - # --- End FIX --- - if ms_paths_for_sd: entry['ms_paths'] = ms_paths_for_sd # Store the set of paths - if has_ms: entry['ms_flag'] = True; logger.debug(f" Found MS elements in {entry_key}") # Use boolean flag - entry['sd_processed'] = True # Mark MS check done - - elif is_example: # JSON Example - key_to_use = None; profile_meta = data.get('meta', {}).get('profile', []) - if profile_meta and isinstance(profile_meta, list): - for profile_url in profile_meta: profile_id_from_meta = profile_url.split('/')[-1]; - if profile_id_from_meta in resource_info: key_to_use = profile_id_from_meta; break - if not key_to_use: key_to_use = resource_type - if key_to_use not in resource_info: resource_info[key_to_use].update({'name': key_to_use, 'type': resource_type}) - resource_info[key_to_use]['examples'].add(member.name) - - elif is_example: # XML/HTML examples - # ... (XML/HTML example association logic) ... 
- guessed_type = base_filename_lower.split('-')[0].capitalize(); guessed_profile_id = base_filename_lower.split('-')[0]; key_to_use = None - if guessed_profile_id in resource_info: key_to_use = guessed_profile_id - elif guessed_type in resource_info: key_to_use = guessed_type - if key_to_use: resource_info[key_to_use]['examples'].add(member.name) - else: logger.warning(f"Could not associate non-JSON example {member.name}") - - except Exception as e: logger.warning(f"Could not process member {member.name}: {e}", exc_info=False) - finally: - if fileobj: fileobj.close() - # -- End Member Loop -- - - # --- Final formatting moved INSIDE the main try block --- - final_list = []; final_ms_elements = {}; final_examples = {} - logger.debug(f"Formatting results from resource_info keys: {list(resource_info.keys())}") - for key, info in resource_info.items(): - display_name = info.get('name') or key; base_type = info.get('type') - if display_name or base_type: - logger.debug(f" Formatting item '{display_name}': type='{base_type}', profile='{info.get('is_profile', False)}', ms_flag='{info.get('ms_flag', False)}'") - final_list.append({'name': display_name, 'type': base_type, 'is_profile': info.get('is_profile', False), 'must_support': info.get('ms_flag', False)}) # Ensure 'must_support' key uses 'ms_flag' - if info['ms_paths']: final_ms_elements[display_name] = sorted(list(info['ms_paths'])) - if info['examples']: final_examples[display_name] = sorted(list(info['examples'])) - else: logger.warning(f"Skipping formatting for key: {key}") - - results['resource_types_info'] = sorted(final_list, key=lambda x: (not x.get('is_profile', False), x.get('name', ''))) - results['must_support_elements'] = final_ms_elements - results['examples'] = final_examples - # --- End formatting moved inside --- - - except Exception as e: - err_msg = f"Error processing package file {tgz_path}: {e}"; logger.error(err_msg, exc_info=True); results['errors'].append(err_msg) - - # Logging counts - final_types_count = len(results['resource_types_info']); ms_count = sum(1 for r in results['resource_types_info'] if r['must_support']); total_ms_paths = sum(len(v) for v in results['must_support_elements'].values()); total_examples = sum(len(v) for v in results['examples'].values()) - logger.info(f"V6.2 Extraction: {final_types_count} items ({ms_count} MS; {total_ms_paths} MS paths; {total_examples} examples) from {os.path.basename(tgz_path)}") - - return results \ No newline at end of file diff --git a/app/fhir_ig_importer/services.py.MERGE NAMES.py b/app/fhir_ig_importer/services.py.MERGE NAMES.py deleted file mode 100644 index 883a5f0..0000000 --- a/app/fhir_ig_importer/services.py.MERGE NAMES.py +++ /dev/null @@ -1,364 +0,0 @@ -# app/modules/fhir_ig_importer/services.py - -import requests -import os -import tarfile -import gzip -import json -import io -import re -import logging -from flask import current_app -from collections import defaultdict - -# Constants -FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org" -DOWNLOAD_DIR_NAME = "fhir_packages" - -# --- Helper Functions --- - -def _get_download_dir(): - """Gets the absolute path to the download directory, creating it if needed.""" - logger = logging.getLogger(__name__) - try: - instance_path = current_app.instance_path - except RuntimeError: - logger.warning("No app context for instance_path, constructing relative path.") - instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'instance')) - logger.debug(f"Constructed instance path: 
{instance_path}") - download_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME) - try: - os.makedirs(download_dir, exist_ok=True) - return download_dir - except OSError as e: - logger.error(f"Fatal Error: Could not create dir {download_dir}: {e}", exc_info=True) - return None - -def sanitize_filename_part(text): - """Basic sanitization for creating filenames.""" - safe_text = "".join(c if c.isalnum() or c in ['.', '-'] else '_' for c in text) - safe_text = re.sub(r'_+', '_', safe_text) - safe_text = safe_text.strip('_-.') - return safe_text if safe_text else "invalid_name" - -def _construct_tgz_filename(name, version): - return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.tgz" - -# --- Helper to Find/Extract SD --- -def _find_and_extract_sd(tgz_path, resource_type_to_find): - sd_data = None - found_path = None - logger = current_app.logger if current_app else logging.getLogger(__name__) - try: - with tarfile.open(tgz_path, "r:gz") as tar: - logger.debug(f"Searching for SD type '{resource_type_to_find}' in {tgz_path}") - potential_paths = [ - f'package/StructureDefinition-{resource_type_to_find.lower()}.json', - f'package/StructureDefinition-{resource_type_to_find}.json' - ] - member_found = None - for potential_path in potential_paths: - try: - member_found = tar.getmember(potential_path) - if member_found: break - except KeyError: - pass - - if not member_found: - for member in tar: - if member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json'): - filename_lower = os.path.basename(member.name).lower() - if filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: - continue - sd_fileobj = None - try: - sd_fileobj = tar.extractfile(member) - if sd_fileobj: - content_bytes = sd_fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) - if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition' and data.get('type') == resource_type_to_find: - member_found = member - break - except Exception: - pass - finally: - if sd_fileobj: sd_fileobj.close() - - if member_found: - sd_fileobj = None - try: - sd_fileobj = tar.extractfile(member_found) - if sd_fileobj: - content_bytes = sd_fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); sd_data = json.loads(content_string) - found_path = member_found.name; logger.info(f"Found matching SD at path: {found_path}") - except Exception as e: - logger.warning(f"Could not read/parse member {member_found.name} after finding it: {e}") - sd_data = None; found_path = None - finally: - if sd_fileobj: sd_fileobj.close() - - except tarfile.TarError as e: - logger.error(f"TarError reading {tgz_path}: {e}") - raise - except FileNotFoundError: - logger.error(f"FileNotFoundError reading {tgz_path}") - raise - except Exception as e: - logger.error(f"Unexpected error in _find_and_extract_sd for {tgz_path}: {e}", exc_info=True) - raise - return sd_data, found_path - -# --- Core Service Functions --- - -def download_package(name, version): - logger = logging.getLogger(__name__) - download_dir = _get_download_dir() - if not download_dir: return None, "Could not get/create download directory." 
- - package_id = f"{name}#{version}" - package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}" - filename = _construct_tgz_filename(name, version) - save_path = os.path.join(download_dir, filename) - - if os.path.exists(save_path): - logger.info(f"Package already exists: {filename}") - return save_path, None - - logger.info(f"Downloading: {package_id} -> {filename}") - try: - with requests.get(package_url, stream=True, timeout=90) as r: - r.raise_for_status() - with open(save_path, 'wb') as f: - for chunk in r.iter_content(chunk_size=8192): f.write(chunk) - logger.info(f"Success: Downloaded {filename}") - return save_path, None - except requests.exceptions.RequestException as e: err_msg = f"Download error for {package_id}: {e}"; logger.error(err_msg); return None, err_msg - except OSError as e: err_msg = f"File save error for {filename}: {e}"; logger.error(err_msg); return None, err_msg - except Exception as e: err_msg = f"Unexpected download error for {package_id}: {e}"; logger.error(err_msg, exc_info=True); return None, err_msg - -def extract_dependencies(tgz_path): - logger = logging.getLogger(__name__) - package_json_path = "package/package.json" - dependencies = {} - error_message = None - if not tgz_path or not os.path.exists(tgz_path): return None, f"File not found at {tgz_path}" - try: - with tarfile.open(tgz_path, "r:gz") as tar: - package_json_member = tar.getmember(package_json_path) - package_json_fileobj = tar.extractfile(package_json_member) - if package_json_fileobj: - try: - package_data = json.loads(package_json_fileobj.read().decode('utf-8-sig')) - dependencies = package_data.get('dependencies', {}) - finally: package_json_fileobj.close() - else: raise FileNotFoundError(f"Could not extract {package_json_path}") - except KeyError: error_message = f"'{package_json_path}' not found in {os.path.basename(tgz_path)}."; logger.warning(error_message) - except (json.JSONDecodeError, UnicodeDecodeError) as e: error_message = f"Parse error in {package_json_path} from {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None - except (tarfile.TarError, FileNotFoundError) as e: error_message = f"Archive error {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None - except Exception as e: error_message = f"Unexpected error extracting deps: {e}"; logger.error(error_message, exc_info=True); dependencies = None - return dependencies, error_message - -def import_package_and_dependencies(initial_name, initial_version): - logger = logging.getLogger(__name__) - logger.info(f"Starting recursive import for {initial_name}#{initial_version}") - results = {'requested': (initial_name, initial_version), 'processed': set(), 'downloaded': {}, 'all_dependencies': {}, 'errors': [] } - pending_queue = [(initial_name, initial_version)]; processed_lookup = set() - - while pending_queue: - name, version = pending_queue.pop(0) - package_id_tuple = (name, version) - - if package_id_tuple in processed_lookup: continue - - logger.info(f"Processing: {name}#{version}"); processed_lookup.add(package_id_tuple) - - save_path, dl_error = download_package(name, version) - - if dl_error: - error_msg = f"Download failed for {name}#{version}: {dl_error}" - results['errors'].append(error_msg) - if package_id_tuple == results['requested']: - logger.error("Aborting import: Initial package download failed.") - break - else: - continue - else: - results['downloaded'][package_id_tuple] = save_path - dependencies, dep_error = extract_dependencies(save_path) - - if 
dep_error: - results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}") - elif dependencies is not None: - results['all_dependencies'][package_id_tuple] = dependencies - results['processed'].add(package_id_tuple) - logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}") - for dep_name, dep_version in dependencies.items(): - if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version: - dep_tuple = (dep_name, dep_version) - if dep_tuple not in processed_lookup: - if dep_tuple not in pending_queue: - pending_queue.append(dep_tuple) - logger.debug(f"Added to queue: {dep_name}#{dep_version}") - else: - logger.warning(f"Skipping invalid dependency entry '{dep_name}': '{dep_version}' in {name}#{version}") - - proc_count=len(results['processed']); dl_count=len(results['downloaded']); err_count=len(results['errors']) - logger.info(f"Import finished. Processed: {proc_count}, Downloaded/Verified: {dl_count}, Errors: {err_count}") - return results - -def process_package_file(tgz_path): - logger = logging.getLogger(__name__) - logger.info(f"Processing package file details: {tgz_path}") - results = {'resource_types_info': [], 'must_support_elements': {}, 'examples': {}, 'errors': [] } - resource_info = {} - - if not os.path.exists(tgz_path): - results['errors'].append(f"Package file not found: {tgz_path}") - return results - - try: - with tarfile.open(tgz_path, "r:gz") as tar: - for member in tar: - if not member.isfile(): - continue - member_name_lower = member.name.lower() - base_filename_lower = os.path.basename(member_name_lower) - fileobj = None - - if member.name.startswith('package/') and member_name_lower.endswith('.json') and \ - base_filename_lower not in ['package.json', '.index.json', 'validation-summary.json']: - try: - fileobj = tar.extractfile(member) - if not fileobj: - continue - content_string = fileobj.read().decode('utf-8-sig') - data = json.loads(content_string) - - if isinstance(data, dict) and data.get('resourceType'): - resource_type = data['resourceType'] - - if member.name.startswith('package/example/'): - ex_type = resource_type - entry = resource_info.setdefault(ex_type, { - 'base_type': ex_type, - 'ms_flag': False, - 'ms_paths': [], - 'examples': [], - 'sd_processed': False - }) - entry['examples'].append(member.name) - continue - - if resource_type == 'StructureDefinition': - profile_id = data.get('id') or data.get('name') - fhir_type = data.get('type') - - if not profile_id: - logger.warning(f"StructureDefinition missing id or name: {member.name}") - continue - - entry = resource_info.setdefault(profile_id, { - 'base_type': fhir_type, - 'ms_flag': False, - 'ms_paths': [], - 'examples': [], - 'sd_processed': False - }) - - if entry['sd_processed']: - continue - - ms_paths = [] - has_ms = False - for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]: - for element in element_list: - if not isinstance(element, dict): - continue - if element.get('mustSupport') is True: - path = element.get('path') - if path: - ms_paths.append(path) - has_ms = True - for t in element.get('type', []): - for ext in t.get('extension', []): - ext_url = ext.get('url') - if ext_url: - ms_paths.append(f"{path}.type.extension[{ext_url}]") - has_ms = True - for ext in element.get('extension', []): - ext_url = ext.get('url') - if ext_url: - ms_paths.append(f"{path}.extension[{ext_url}]") - has_ms = True - if ms_paths: - entry['ms_paths'] = 
sorted(set(ms_paths)) - if has_ms: - entry['ms_flag'] = True - entry['sd_processed'] = True - - except Exception as e: - logger.warning(f"Could not read/parse member {member.name}: {e}") - finally: - if fileobj: - fileobj.close() - - elif (member.name.startswith('package/example/') or ('example' in base_filename_lower and member.name.startswith('package/'))) \ - and (member_name_lower.endswith('.xml') or member_name_lower.endswith('.html')): - guessed_type = base_filename_lower.split('-', 1)[0].capitalize() - if guessed_type in resource_info: - resource_info[guessed_type]['examples'].append(member.name) - - except Exception as e: - err_msg = f"Error processing package file {tgz_path}: {e}" - logger.error(err_msg, exc_info=True) - results['errors'].append(err_msg) - - # --- New logic: merge profiles of same base_type --- - merged_info = {} - grouped_by_type = defaultdict(list) - - for profile_id, entry in resource_info.items(): - base_type = entry['base_type'] or profile_id - grouped_by_type[base_type].append((profile_id, entry)) - - for base_type, profiles in grouped_by_type.items(): - merged_paths = set() - merged_examples = [] - has_ms = False - - for _, profile_entry in profiles: - merged_paths.update(profile_entry.get('ms_paths', [])) - merged_examples.extend(profile_entry.get('examples', [])) - if profile_entry.get('ms_flag'): - has_ms = True - - merged_info[base_type] = { - 'base_type': base_type, - 'ms_flag': has_ms, - 'ms_paths': sorted(merged_paths), - 'examples': sorted(merged_examples), - } - - results['resource_types_info'] = sorted([ - {'name': k, 'base_type': v.get('base_type'), 'must_support': v['ms_flag']} - for k, v in merged_info.items() - ], key=lambda x: x['name']) - - results['must_support_elements'] = { - k: v['ms_paths'] for k, v in merged_info.items() if v['ms_paths'] - } - - results['examples'] = { - k: v['examples'] for k, v in merged_info.items() if v['examples'] - } - - logger.info(f"Extracted {len(results['resource_types_info'])} profiles " - f"({sum(1 for r in results['resource_types_info'] if r['must_support'])} with MS; " - f"{sum(len(v) for v in results['must_support_elements'].values())} MS paths; " - f"{sum(len(v) for v in results['examples'].values())} examples) from {tgz_path}") - - return results - -# --- Remove or Comment Out old/unused functions --- -# def _fetch_package_metadata(package_name, package_version): ... (REMOVED) -# def resolve_all_dependencies(initial_package_name, initial_package_version): ... (REMOVED) -# def process_ig_import(package_name, package_version): ... 
(OLD orchestrator - REMOVED) \ No newline at end of file diff --git a/app/fhir_ig_importer/services.py.old b/app/fhir_ig_importer/services.py.old deleted file mode 100644 index ce60a85..0000000 --- a/app/fhir_ig_importer/services.py.old +++ /dev/null @@ -1,315 +0,0 @@ -# app/modules/fhir_ig_importer/services.py - -import requests -import os -import tarfile -import gzip -import json -import io -import re -import logging -from flask import current_app - -# Constants -FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org" -DOWNLOAD_DIR_NAME = "fhir_packages" - -# --- Helper Functions --- - -def _get_download_dir(): - """Gets the absolute path to the download directory, creating it if needed.""" - logger = logging.getLogger(__name__) - try: - instance_path = current_app.instance_path - # logger.debug(f"Using instance path from current_app: {instance_path}") # Can be noisy - except RuntimeError: - logger.warning("No app context for instance_path, constructing relative path.") - instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'instance')) - logger.debug(f"Constructed instance path: {instance_path}") - download_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME) - try: - os.makedirs(download_dir, exist_ok=True) - return download_dir - except OSError as e: - logger.error(f"Fatal Error: Could not create dir {download_dir}: {e}", exc_info=True) - return None - -def sanitize_filename_part(text): - """Basic sanitization for creating filenames.""" - # Replace common invalid chars, keep ., - - safe_text = "".join(c if c.isalnum() or c in ['.', '-'] else '_' for c in text) - # Replace multiple underscores with single one - safe_text = re.sub(r'_+', '_', safe_text) - # Remove leading/trailing underscores/hyphens/periods - safe_text = safe_text.strip('_-.') - return safe_text if safe_text else "invalid_name" # Ensure not empty - -def _construct_tgz_filename(name, version): - """Constructs the standard filename for the package.""" - return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.tgz" - -# --- Helper to Find/Extract SD --- -def _find_and_extract_sd(tgz_path, resource_type_to_find): - """Helper to find and extract SD json from a given tgz path.""" - sd_data = None - found_path = None - logger = current_app.logger if current_app else logging.getLogger(__name__) # Use app logger if possible - try: - with tarfile.open(tgz_path, "r:gz") as tar: - logger.debug(f"Searching for SD type '{resource_type_to_find}' in {tgz_path}") - # Prioritize paths like 'package/StructureDefinition-[Type].json' - potential_paths = [ - f'package/StructureDefinition-{resource_type_to_find.lower()}.json', - f'package/StructureDefinition-{resource_type_to_find}.json' - ] - member_found = None - for potential_path in potential_paths: - try: - member_found = tar.getmember(potential_path) - if member_found: break - except KeyError: - pass - - # If specific paths failed, iterate - if not member_found: - for member in tar: - if member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json'): - filename_lower = os.path.basename(member.name).lower() - if filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: - continue - sd_fileobj = None - try: - sd_fileobj = tar.extractfile(member) - if sd_fileobj: - content_bytes = sd_fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) - if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition' 
and data.get('type') == resource_type_to_find: - member_found = member - break - except Exception: - pass - finally: - if sd_fileobj: sd_fileobj.close() - - if member_found: - sd_fileobj = None - try: - sd_fileobj = tar.extractfile(member_found) - if sd_fileobj: - content_bytes = sd_fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); sd_data = json.loads(content_string) - found_path = member_found.name; logger.info(f"Found matching SD at path: {found_path}") - except Exception as e: - logger.warning(f"Could not read/parse member {member_found.name} after finding it: {e}") - sd_data = None; found_path = None - finally: - if sd_fileobj: sd_fileobj.close() - - except tarfile.TarError as e: - logger.error(f"TarError reading {tgz_path}: {e}") - raise - except FileNotFoundError: - logger.error(f"FileNotFoundError reading {tgz_path}") - raise - except Exception as e: - logger.error(f"Unexpected error in _find_and_extract_sd for {tgz_path}: {e}", exc_info=True) - raise - return sd_data, found_path -# --- End Helper --- - -# --- Core Service Functions --- - -def download_package(name, version): - """ Downloads a single FHIR package. Returns (save_path, error_message) """ - logger = logging.getLogger(__name__) - download_dir = _get_download_dir() - if not download_dir: return None, "Could not get/create download directory." - - package_id = f"{name}#{version}" - package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}" - filename = _construct_tgz_filename(name, version) - save_path = os.path.join(download_dir, filename) - - if os.path.exists(save_path): - logger.info(f"Package already exists: {filename}") - return save_path, None - - logger.info(f"Downloading: {package_id} -> {filename}") - try: - with requests.get(package_url, stream=True, timeout=90) as r: - r.raise_for_status() - with open(save_path, 'wb') as f: - for chunk in r.iter_content(chunk_size=8192): f.write(chunk) - logger.info(f"Success: Downloaded {filename}") - return save_path, None - except requests.exceptions.RequestException as e: err_msg = f"Download error for {package_id}: {e}"; logger.error(err_msg); return None, err_msg - except OSError as e: err_msg = f"File save error for {filename}: {e}"; logger.error(err_msg); return None, err_msg - except Exception as e: err_msg = f"Unexpected download error for {package_id}: {e}"; logger.error(err_msg, exc_info=True); return None, err_msg - -def extract_dependencies(tgz_path): - """ Extracts dependencies dict from package.json. 
Returns (dep_dict, error_message) """ - logger = logging.getLogger(__name__) - package_json_path = "package/package.json" - dependencies = {} - error_message = None - if not tgz_path or not os.path.exists(tgz_path): return None, f"File not found at {tgz_path}" - try: - with tarfile.open(tgz_path, "r:gz") as tar: - package_json_member = tar.getmember(package_json_path) # Raises KeyError if not found - package_json_fileobj = tar.extractfile(package_json_member) - if package_json_fileobj: - try: - package_data = json.loads(package_json_fileobj.read().decode('utf-8-sig')) - dependencies = package_data.get('dependencies', {}) - finally: package_json_fileobj.close() - else: raise FileNotFoundError(f"Could not extract {package_json_path}") - except KeyError: error_message = f"'{package_json_path}' not found in {os.path.basename(tgz_path)}."; logger.warning(error_message) # No deps is okay - except (json.JSONDecodeError, UnicodeDecodeError) as e: error_message = f"Parse error in {package_json_path} from {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None # Parsing failed - except (tarfile.TarError, FileNotFoundError) as e: error_message = f"Archive error {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None # Archive read failed - except Exception as e: error_message = f"Unexpected error extracting deps: {e}"; logger.error(error_message, exc_info=True); dependencies = None - return dependencies, error_message - -# --- Recursive Import Orchestrator (Corrected Indentation) --- -def import_package_and_dependencies(initial_name, initial_version): - """Orchestrates recursive download and dependency extraction.""" - logger = logging.getLogger(__name__) - logger.info(f"Starting recursive import for {initial_name}#{initial_version}") - results = {'requested': (initial_name, initial_version), 'processed': set(), 'downloaded': {}, 'all_dependencies': {}, 'errors': [] } - pending_queue = [(initial_name, initial_version)]; processed_lookup = set() - - while pending_queue: - name, version = pending_queue.pop(0) - package_id_tuple = (name, version) - - if package_id_tuple in processed_lookup: continue - - logger.info(f"Processing: {name}#{version}"); processed_lookup.add(package_id_tuple) - - # 1. Download - save_path, dl_error = download_package(name, version) - - if dl_error: - error_msg = f"Download failed for {name}#{version}: {dl_error}" - results['errors'].append(error_msg) - if package_id_tuple == results['requested']: - logger.error("Aborting import: Initial package download failed.") - break # Stop processing queue if initial download fails - else: - continue # Skip dependency extraction for this failed download - else: - # Download succeeded (or file existed) - results['downloaded'][package_id_tuple] = save_path - - # --- FIX: Indent dependency extraction under the else block --- - # 2. Extract Dependencies from downloaded file - dependencies, dep_error = extract_dependencies(save_path) - - if dep_error: - results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}") - # Still mark as 'downloaded', but maybe not 'processed'? Let's allow queue processing. 
- elif dependencies is not None: # Not None means extraction attempt happened (even if deps is {}) - results['all_dependencies'][package_id_tuple] = dependencies - results['processed'].add(package_id_tuple) # Mark as successfully processed (downloaded + deps extracted) - logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}") - # Add new dependencies to queue - for dep_name, dep_version in dependencies.items(): - # Basic validation of dependency entry format - if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version: - dep_tuple = (dep_name, dep_version) - if dep_tuple not in processed_lookup: # Check processed_lookup to prevent re-queueing - if dep_tuple not in pending_queue: # Avoid duplicate queue entries - pending_queue.append(dep_tuple) - logger.debug(f"Added to queue: {dep_name}#{dep_version}") - else: - logger.warning(f"Skipping invalid dependency entry '{dep_name}': '{dep_version}' in {name}#{version}") - # --- End Indentation Fix --- - - # Final Summary Log - proc_count=len(results['processed']); dl_count=len(results['downloaded']); err_count=len(results['errors']) - logger.info(f"Import finished. Processed: {proc_count}, Downloaded/Verified: {dl_count}, Errors: {err_count}") - return results - -# --- Package File Content Processor --- -def process_package_file(tgz_path): - """ Extracts types, MS elements, and examples from a downloaded .tgz package (Single Pass). """ - logger = logging.getLogger(__name__) - logger.info(f"Processing package file details: {tgz_path}") - results = {'resource_types_info': [], 'must_support_elements': {}, 'examples': {}, 'errors': [] } - resource_info = {} # Temp dict: {'Type': {'ms_flag': False, 'ms_paths': [], 'examples': [], 'sd_processed': False}} - - if not os.path.exists(tgz_path): errors.append(f"Package file not found: {tgz_path}"); return results - - try: - with tarfile.open(tgz_path, "r:gz") as tar: - for member in tar: - if not member.isfile(): continue - member_name_lower = member.name.lower() - base_filename_lower = os.path.basename(member_name_lower) - fileobj = None - - # Check if JSON file inside package/ (excluding known non-resources) - if member.name.startswith('package/') and member_name_lower.endswith('.json') and \ - base_filename_lower not in ['package.json', '.index.json', 'validation-summary.json']: - try: - fileobj = tar.extractfile(member) - if not fileobj: continue - content_bytes = fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) - - if isinstance(data, dict) and 'resourceType' in data: - resource_type = data['resourceType'] - # Ensure entry exists for this type - type_entry = resource_info.setdefault(resource_type, {'ms_flag': False, 'ms_paths': [], 'examples': [], 'sd_processed': False}) - - # If it's an example file, record it - if member.name.startswith('package/example/'): - type_entry['examples'].append(member.name) - logger.debug(f"Found example for {resource_type}: {member.name}") - - # If it's a StructureDefinition, process MS flags (only once per type) - if resource_type == 'StructureDefinition' and not type_entry['sd_processed']: - sd_type = data.get('type') - if sd_type and sd_type in resource_info: # Check against types we've already seen - ms_paths_for_type = []; has_ms = False - for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]: - for element in element_list: - if isinstance(element, dict) and element.get('mustSupport') is True: - 
element_path = element.get('path') - if element_path: - ms_paths_for_type.append(element_path); has_ms = True - if ms_paths_for_type: - resource_info[sd_type]['ms_paths'] = sorted(list(set(ms_paths_for_type))) # Store unique sorted paths - if has_ms: - resource_info[sd_type]['ms_flag'] = True - resource_info[sd_type]['sd_processed'] = True # Mark as processed for MS - logger.debug(f"Processed SD for {sd_type}, MS found: {has_ms}") - else: - logger.warning(f"SD {member.name} defines type '{sd_type}' which wasn't found as a resource type key.") - - except Exception as e: logger.warning(f"Could not read/parse member {member.name}: {e}") - finally: - if fileobj: fileobj.close() - - # Also find XML and HTML examples - elif (member.name.startswith('package/example/') or ('example' in base_filename_lower and member.name.startswith('package/'))) \ - and (member_name_lower.endswith('.xml') or member_name_lower.endswith('.html')): - # Try to guess type (imperfect) - guessed_type = base_filename_lower.split('-', 1)[0].capitalize() - if guessed_type in resource_info: # Only add if type is known - resource_info[guessed_type]['examples'].append(member.name) - logger.debug(f"Found non-JSON example for {guessed_type}: {member.name}") - - except Exception as e: err_msg = f"Error processing package file {tgz_path}: {e}"; logger.error(err_msg, exc_info=True); results['errors'].append(err_msg) - - # Format final results - results['resource_types_info'] = sorted([{'name': rt, 'must_support': info['ms_flag']} for rt, info in resource_info.items()], key=lambda x: x['name']) - results['must_support_elements'] = {rt: info['ms_paths'] for rt, info in resource_info.items() if info['ms_paths']} - results['examples'] = {rt: sorted(info['examples']) for rt, info in resource_info.items() if info['examples']} - - # Logging final counts - final_types_count = len(results['resource_types_info']); ms_count = sum(1 for r in results['resource_types_info'] if r['must_support']); total_ms_paths = sum(len(v) for v in results['must_support_elements'].values()); total_examples = sum(len(v) for v in results['examples'].values()) - logger.info(f"Extracted {final_types_count} types ({ms_count} with MS; {total_ms_paths} MS paths; {total_examples} examples) from {tgz_path}") - - return results - -# --- Remove or Comment Out old/unused functions --- -# def _fetch_package_metadata(package_name, package_version): ... (REMOVED) -# def resolve_all_dependencies(initial_package_name, initial_package_version): ... (REMOVED) -# def process_ig_import(package_name, package_version): ... (OLD orchestrator - REMOVED) \ No newline at end of file diff --git a/app/fhir_ig_importer/templates/fhir_ig_importer/import_ig_page.html b/app/fhir_ig_importer/templates/fhir_ig_importer/import_ig_page.html deleted file mode 100644 index 1fd8d11..0000000 --- a/app/fhir_ig_importer/templates/fhir_ig_importer/import_ig_page.html +++ /dev/null @@ -1,124 +0,0 @@ -{# app/modules/fhir_ig_importer/templates/fhir_ig_importer/import_ig_page.html #} -{% extends "base.html" %} {# Or your control panel base template "cp_base.html" #} -{% from "_form_helpers.html" import render_field %} {# Assumes you have this macro #} - -{% block content %} {# Or your specific CP content block #} -
-

Import FHIR Implementation Guide

- -
-
- {# --- Import Form --- #} -
-
-
Enter Package Details
-
{# Use relative endpoint for blueprint #} - {{ form.hidden_tag() }} {# CSRF token #} -
- {{ render_field(form.package_name, class="form-control" + (" is-invalid" if form.package_name.errors else ""), placeholder="e.g., hl7.fhir.au.base") }} -
-
- {{ render_field(form.package_version, class="form-control" + (" is-invalid" if form.package_version.errors else ""), placeholder="e.g., 4.1.0 or latest") }} -
-
- {{ form.submit(class="btn btn-primary", value="Fetch & Download IG") }} -
-
-
-
- - {# --- Results Section --- #} -
- {# Display Fatal Error if passed #} - {% if fatal_error %} - - {% endif %} - - {# Display results if the results dictionary exists (meaning POST happened) #} - {% if results %} -
-
- {# Use results.requested for package info #} -
Import Results for: {{ results.requested[0] }}#{{ results.requested[1] }}
-
- - {# --- Process Errors --- #} - {% if results.errors %} -
Errors Encountered ({{ results.errors|length }}):
-
    - {% for error in results.errors %} -
  • {{ error }}
  • - {% endfor %} -
- {% endif %} - - {# --- Downloaded Files --- #} -
Downloaded Packages ({{ results.downloaded|length }} / {{ results.processed|length }} processed):
- {# Check results.downloaded dictionary #} - {% if results.downloaded %} -
    - {# Iterate through results.downloaded items #} - {% for (name, version), path in results.downloaded.items()|sort %} -
  • - {{ name }}#{{ version }} - {# Display relative path cleanly #} - {{ path | replace('/app/', '') | replace('\\', '/') }} - Downloaded {# Moved badge to end #} -
  • - {% endfor %} -
-

Files are saved in the server's `instance/fhir_packages` directory.

- {% else %} -

No packages were successfully downloaded.

- {% endif %} - - - {# --- All Dependencies Found --- #} -
Consolidated Dependencies Found:
- {# Calculate unique_deps based on results.all_dependencies #} - {% set unique_deps = {} %} - {% for pkg_id, deps_dict in results.all_dependencies.items() %} - {% for dep_name, dep_version in deps_dict.items() %} - {% set _ = unique_deps.update({(dep_name, dep_version): true}) %} - {% endfor %} - {% endfor %} - - {% if unique_deps %} -

Unique direct dependencies found across all processed packages:

-
- {% for (name, version), _ in unique_deps.items()|sort %} -
{{ name }}
-
{{ version }}
- {% endfor %} -
- {% else %} - - {% endif %} - {# --- End Dependency Result --- #} - -
{# End card-body #} -
{# End card #} - {% endif %} {# End if results #} -
{# End results #} - {# --- End Results Section --- #} - -
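For reference, the `results` context rendered above is the dictionary returned by `import_package_and_dependencies()` in the importer service deleted earlier in this patch. A minimal sketch of its shape (the package names, versions, and paths below are invented examples):

```python
# Shape of the `results` dict this template consumes; the keys match
# import_package_and_dependencies() in the deleted services.py, while
# the values here are made-up examples.
results = {
    'requested': ('hl7.fhir.au.base', '4.1.0'),
    'processed': {('hl7.fhir.au.base', '4.1.0')},   # set of (name, version)
    'downloaded': {                                 # (name, version) -> saved path
        ('hl7.fhir.au.base', '4.1.0'):
            '/app/instance/fhir_packages/hl7.fhir.au.base-4.1.0.tgz',
    },
    'all_dependencies': {                           # (name, version) -> direct deps
        ('hl7.fhir.au.base', '4.1.0'): {'hl7.fhir.r4.core': '4.0.1'},
    },
    'errors': [],
}
```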
-
- {# --- Instructions Panel --- #} -
-
-
Instructions
-

Enter the official FHIR package name (e.g., hl7.fhir.au.base) and the desired version (e.g., 4.1.0, latest).

-

The system will download the package and attempt to list its direct dependencies (see the service-level sketch after this panel).

-

Downloads are saved to the server's `instance/fhir_packages` folder.

-
-
-
-
{# End row #} -
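The same flow can also be exercised at the service level, outside the form. A minimal sketch using the two functions from the deleted importer service (the import path is assumed from this patch's layout; the package coordinates are invented):

```python
# Hedged sketch: calling the importer service directly. The functions and
# their (value, error) return shapes come from the deleted
# app/fhir_ig_importer/services.py; the package below is only an example.
from app.fhir_ig_importer.services import download_package, extract_dependencies

save_path, err = download_package('hl7.fhir.au.base', '4.1.0')
if err is None:
    deps, dep_err = extract_dependencies(save_path)
    if dep_err is None:
        print(deps)  # e.g. {'hl7.fhir.r4.core': '4.0.1'}
```

Note that `download_package()` resolves its target directory via the Flask instance path when an app context is available, so in a standalone script this would normally run inside `app.app_context()`.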
{# End container #} -{% endblock %} \ No newline at end of file diff --git a/app/models.py b/app/models.py deleted file mode 100644 index 023b1b9..0000000 --- a/app/models.py +++ /dev/null @@ -1,86 +0,0 @@ -# app/models.py -from app import db -from flask_login import UserMixin -from werkzeug.security import generate_password_hash, check_password_hash -from datetime import datetime -import json - -# --- ProcessedIg Model (MODIFIED for Examples) --- -class ProcessedIg(db.Model): - id = db.Column(db.Integer, primary_key=True) - package_name = db.Column(db.String(150), nullable=False, index=True) - package_version = db.Column(db.String(50), nullable=False, index=True) - processed_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) - status = db.Column(db.String(50), default='processed', nullable=True) - # Stores list of dicts: [{'name': 'Type', 'must_support': bool}, ...] - resource_types_info_json = db.Column(db.Text, nullable=True) - # Stores dict: {'TypeName': ['path1', 'path2'], ...} - must_support_elements_json = db.Column(db.Text, nullable=True) - # --- ADDED: Store example files found, grouped by type --- - # Structure: {'TypeName': ['example1.json', 'example1.xml'], ...} - examples_json = db.Column(db.Text, nullable=True) - # --- End Add --- - - __table_args__ = (db.UniqueConstraint('package_name', 'package_version', name='uq_processed_ig_name_version'),) - - # Property for resource_types_info - @property - def resource_types_info(self): - # ... (getter as before) ... - if self.resource_types_info_json: - try: return json.loads(self.resource_types_info_json) - except json.JSONDecodeError: return [] - return [] - - @resource_types_info.setter - def resource_types_info(self, types_info_list): - # ... (setter as before) ... - if types_info_list and isinstance(types_info_list, list): - sorted_list = sorted(types_info_list, key=lambda x: x.get('name', '')) - self.resource_types_info_json = json.dumps(sorted_list) - else: self.resource_types_info_json = None - - # Property for must_support_elements - @property - def must_support_elements(self): - # ... (getter as before) ... - if self.must_support_elements_json: - try: return json.loads(self.must_support_elements_json) - except json.JSONDecodeError: return {} - return {} - - @must_support_elements.setter - def must_support_elements(self, ms_elements_dict): - # ... (setter as before) ... - if ms_elements_dict and isinstance(ms_elements_dict, dict): - self.must_support_elements_json = json.dumps(ms_elements_dict) - else: self.must_support_elements_json = None - - # --- ADDED: Property for examples --- - @property - def examples(self): - """Returns the stored example filenames as a Python dict.""" - if self.examples_json: - try: - # Return dict {'TypeName': ['file1.json', 'file2.xml'], ...} - return json.loads(self.examples_json) - except json.JSONDecodeError: - return {} # Return empty dict on parse error - return {} - - @examples.setter - def examples(self, examples_dict): - """Stores a Python dict of example filenames as a JSON string.""" - if examples_dict and isinstance(examples_dict, dict): - # Sort filenames within each list? Optional. 
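        # Sketch (hypothetical values): round-tripping through this property
        # pair serializes to JSON on the way in and parses on the way out:
        #   ig.examples = {'Patient': ['package/example/Patient-1.json']}
        #   ig.examples_json -> '{"Patient": ["package/example/Patient-1.json"]}'
        #   ig.examples      -> {'Patient': ['package/example/Patient-1.json']}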
- # for key in examples_dict: examples_dict[key].sort() - self.examples_json = json.dumps(examples_dict) - else: - self.examples_json = None - # --- End Add --- - - def __repr__(self): - count = len(self.resource_types_info) - ms_count = sum(1 for item in self.resource_types_info if item.get('must_support')) - ex_count = sum(len(v) for v in self.examples.values()) # Count total example files - return f"" diff --git a/app/static/css/custom.css b/app/static/css/custom.css deleted file mode 100644 index 8422d2e..0000000 --- a/app/static/css/custom.css +++ /dev/null @@ -1,14 +0,0 @@ -/* app/static/css/custom.css */ -/* Add custom styles here to override or supplement Bootstrap */ - -body { - /* Example: Add a subtle background pattern or color */ - /* background-color: #f8f9fa; */ -} - -.footer { - /* Ensure footer stays at the bottom if needed, though */ - /* d-flex flex-column min-vh-100 on body usually handles this */ -} - -/* Add more custom styles as needed */ diff --git a/app/static/js/main.js b/app/static/js/main.js deleted file mode 100644 index 71c1266..0000000 --- a/app/static/js/main.js +++ /dev/null @@ -1,12 +0,0 @@ -/* app/static/js/main.js */ -// Add global JavaScript functions here - -// Example: Initialize Bootstrap tooltips or popovers if used -// document.addEventListener('DOMContentLoaded', function () { -// var tooltipTriggerList = [].slice.call(document.querySelectorAll('[data-bs-toggle="tooltip"]')) -// var tooltipList = tooltipTriggerList.map(function (tooltipTriggerEl) { -// return new bootstrap.Tooltip(tooltipTriggerEl) -// }) -// }); - -console.log("Custom main.js loaded."); diff --git a/app/templates/404.html b/app/templates/404.html deleted file mode 100644 index ae6523e..0000000 --- a/app/templates/404.html +++ /dev/null @@ -1,12 +0,0 @@ -{% extends "base.html" %} - -{% block content %} -
-

404

-

Page Not Found

-

- Sorry, we couldn't find the page you were looking for. -

- Go Back Home -
-{% endblock %} diff --git a/app/templates/500.html b/app/templates/500.html deleted file mode 100644 index 5741e51..0000000 --- a/app/templates/500.html +++ /dev/null @@ -1,12 +0,0 @@ -{% extends "base.html" %} - -{% block content %} -
-

500

-

Internal Server Error

-

- Sorry, something went wrong on our end. We are looking into it. -

- Go Back Home -
-{% endblock %} diff --git a/app/templates/_form_helpers.html b/app/templates/_form_helpers.html deleted file mode 100644 index 113287d..0000000 --- a/app/templates/_form_helpers.html +++ /dev/null @@ -1,26 +0,0 @@ -{# app/templates/_form_helpers.html #} -{% macro render_field(field, label_visible=true) %} -
{# Add margin bottom for spacing #} - {% if label_visible and field.label %} - {{ field.label(class="form-label") }} {# Render label with Bootstrap class #} - {% endif %} - - {# Add is-invalid class if errors exist #} - {% set css_class = 'form-control ' + kwargs.pop('class', '') %} - {% if field.errors %} - {% set css_class = css_class + ' is-invalid' %} - {% endif %} - - {# Render the field itself, passing any extra attributes #} - {{ field(class=css_class, **kwargs) }} - - {# Display validation errors #} - {% if field.errors %} -
- {% for error in field.errors %} - {{ error }}
- {% endfor %} -
- {% endif %} -
-{% endmacro %} \ No newline at end of file diff --git a/app/templates/base.html b/app/templates/base.html deleted file mode 100644 index 84328ac..0000000 --- a/app/templates/base.html +++ /dev/null @@ -1,91 +0,0 @@ - - - - - - - - - {% if title %}{{ title }} - {% endif %}{{ site_name }} - - - - -
- {% with messages = get_flashed_messages(with_categories=true) %} - {% if messages %} - {% for category, message in messages %} - - {% endfor %} - {% endif %} - {% endwith %} - {% block content %}{% endblock %} -
- -
-
- {% set current_year = now.year %} © {{ current_year }} {{ site_name }}. All rights reserved. -
-
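The footer above assumes `site_name` and `now` are injected into every template. Since the app factory is deleted elsewhere in this patch, the exact mechanism is not visible here; a minimal sketch of what such a context processor could look like (the function name and registration point are assumptions):

```python
# Hypothetical context processor backing base.html's `site_name` and
# `now.year`; SITE_NAME and its fallback appear in the deleted config/tests.
from datetime import datetime, timezone

def register_template_globals(app):
    @app.context_processor
    def inject_globals():
        return {
            'site_name': app.config.get('SITE_NAME', 'PAS Framework'),
            'now': datetime.now(timezone.utc),
        }
```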
- - - - - {% block scripts %} - {# Tooltip Initialization Script #} - - {% endblock %} - - \ No newline at end of file diff --git a/app/templates/downloaded_igs.html b/app/templates/downloaded_igs.html deleted file mode 100644 index 49f8207..0000000 --- a/app/templates/downloaded_igs.html +++ /dev/null @@ -1,135 +0,0 @@ -{# app/control_panel/templates/cp_downloaded_igs.html #} -{% extends "base.html" %} - -{% block content %} -
-
-

Manage FHIR Packages

- -
- - {% if error_message %} - - {% endif %} - - {# NOTE: The block calculating processed_ids set using {% set %} was REMOVED from here #} - - {# --- Start Two Column Layout --- #} -
- - {# --- Left Column: Downloaded Packages (Horizontal Buttons) --- #} -
-
-
Downloaded Packages ({{ packages|length }})
-
- {% if packages %} -
- - -

Risk = Duplicate dependency with different versions

- - - - {% for pkg in packages %} - {% set is_processed = (pkg.name, pkg.version) in processed_ids %} - {# --- ADDED: Check for duplicate name --- #} - {% set is_duplicate = pkg.name in duplicate_names %} - {# --- ADDED: Assign row class based on duplicate group --- #} - - - - - - {% endfor %} - -
Package NameVersionActions
- {# --- ADDED: Risk Badge for duplicates --- #} - {% if is_duplicate %} - Duplicate - {% endif %} - {# --- End Add --- #} - {{ pkg.name }} - {{ pkg.version }} {# Actions #} -
- {% if is_processed %} - Processed - {% else %} -
- - - -
- {% endif %} -
- - -
-
-
-
- {% elif not error_message %}

No downloaded FHIR packages found.

{% endif %} -
-
-
{# --- End Left Column --- #} - - - {# --- Right Column: Processed Packages (Vertical Buttons) --- #} -
-
-
Processed Packages ({{ processed_list|length }})
-
- {% if processed_list %} -

MS = Contains Must Support Elements

-
- - - - - - {% for processed_ig in processed_list %} - - - - - - - {% endfor %} - -
Package NameVersionResource TypesActions
{# Tooltip for Processed At / Status #} - {% set tooltip_title_parts = [] %} - {% if processed_ig.processed_at %}{% set _ = tooltip_title_parts.append("Processed: " + processed_ig.processed_at.strftime('%Y-%m-%d %H:%M')) %}{% endif %} - {% if processed_ig.status %}{% set _ = tooltip_title_parts.append("Status: " + processed_ig.status) %}{% endif %} - {% set tooltip_text = tooltip_title_parts | join('\n') %} - {{ processed_ig.package_name }} - {{ processed_ig.package_version }} {# Resource Types Cell w/ Badges #} - {% set types_info = processed_ig.resource_types_info %} - {% if types_info %}
{% for type_info in types_info %}{% if type_info.must_support %}{{ type_info.name }}{% else %}{{ type_info.name }}{% endif %}{% endfor %}
{% else %}N/A{% endif %} -
- {# Vertical Button Group #} -
- View -
- - -
-
-
-
- {% elif not error_message %}

No packages recorded as processed yet.

{% endif %} -
-
-
{# --- End Right Column --- #} - -
{# --- End Row --- #} - -
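Both columns above lean on two pieces of route-side context: `processed_ids` (which packages already have a ProcessedIg row) and `duplicate_names` (package names downloaded in more than one version). A minimal sketch of that preparation, assuming `packages` objects expose `.name`/`.version` as the template does (the function name and inputs are assumptions, since the route is not part of this file):

```python
# Sketch of the context prep this template assumes.
from collections import Counter

def build_package_context(packages, processed_list):
    # (name, version) pairs already processed -> "Processed" badge.
    processed_ids = {(p.package_name, p.package_version) for p in processed_list}
    # Names present in more than one version -> "Duplicate" risk badge.
    name_counts = Counter(pkg.name for pkg in packages)
    duplicate_names = {name for name, count in name_counts.items() if count > 1}
    return processed_ids, duplicate_names
```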
{# End container #} -{% endblock %} - -{# Tooltip JS Initializer should be in base.html #} -{% block scripts %}{{ super() }}{% endblock %} \ No newline at end of file diff --git a/app/templates/index.html b/app/templates/index.html deleted file mode 100644 index c0b0407..0000000 --- a/app/templates/index.html +++ /dev/null @@ -1,15 +0,0 @@ -{% extends "base.html" %} {% block content %}
- PAS Logo Placeholder - -

Welcome to FLARE FHIR IG TOOLKIT

-
-

- This is the starting point for a journey into FHIR IGs from an implementer's point of view! -

- -
-
-{% endblock %} diff --git a/app/templates/view_processed_ig.html b/app/templates/view_processed_ig.html deleted file mode 100644 index e8cf5f9..0000000 --- a/app/templates/view_processed_ig.html +++ /dev/null @@ -1,405 +0,0 @@ -{% extends "base.html" %} - -{% from "_form_helpers.html" import render_field %} - -{% block content %} -
- - - {% if processed_ig %} -
-
Package Details
-
-
-
Package Name
-
{{ processed_ig.package_name }}
-
Package Version
-
{{ processed_ig.package_version }}
-
Processed At
-
{{ processed_ig.processed_at.strftime('%Y-%m-%d %H:%M:%S UTC') if processed_ig.processed_at else 'N/A' }}
-
Processing Status
-
- {{ processed_ig.status or 'N/A' }} -
-
-
-
- -
-
Resource Types Found / Defined
-
- {% if profile_list or base_list %} -

MS = Contains Must Support Elements

- {% if profile_list %} -
Profiles Defined ({{ profile_list|length }}):
- - {% endif %} - {% if base_list %} -

Examples = - Examples will be displayed when selecting a Base Type, if any are contained in the IG

-
Base Resource Types Referenced ({{ base_list|length }}):
- - {% endif %} - {% else %} -

No resource type information extracted or stored.

- {% endif %} -
-
- - - - - - - - {% else %} - - {% endif %} -
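This page renders two lists, `profile_list` and `base_list`, split out of `ProcessedIg.resource_types_info`. The route that performs the split is not part of this file; a minimal sketch of one plausible approach (the base-type heuristic below is an assumption, and the set is abbreviated):

```python
# Hypothetical split of resource_types_info into profiles vs. referenced
# base resource types; the membership test is an assumed heuristic.
FHIR_BASE_TYPES = {'Patient', 'Observation', 'Condition', 'Encounter'}  # abbreviated

def split_resource_types(processed_ig):
    types_info = processed_ig.resource_types_info  # [{'name': ..., 'must_support': ...}]
    base_list = [t for t in types_info if t['name'] in FHIR_BASE_TYPES]
    profile_list = [t for t in types_info if t['name'] not in FHIR_BASE_TYPES]
    return profile_list, base_list
```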
- -{% block scripts %} -{{ super() }} - - -{% endblock scripts %} -{% endblock content %} {# Main content block END #} \ No newline at end of file diff --git a/config.py b/config.py deleted file mode 100644 index 77d1543..0000000 --- a/config.py +++ /dev/null @@ -1,52 +0,0 @@ -# config.py -# Basic configuration settings for the Flask application - -import os - -# Determine the base directory of the application (where config.py lives) -basedir = os.path.abspath(os.path.dirname(__file__)) - -# Define the instance path relative to the base directory -# This seems correct if instance folder is at the same level as config.py/run.py -instance_path = os.path.join(basedir, 'instance') - -class Config: - """Base configuration class.""" - SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess' - - # Database configuration (Development/Production) - # Points to 'instance/app.db' relative to config.py location - SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \ - 'sqlite:///' + os.path.join(instance_path, 'app.db') - SQLALCHEMY_TRACK_MODIFICATIONS = False # Disable modification tracking - - # Add other global configuration variables here - SITE_NAME = "Modular PAS Framework" - # Add any other default settings your app needs - - -# --- ADDED Testing Configuration --- -class TestingConfig(Config): - """Configuration specific to testing.""" - TESTING = True - - # Use a separate database file for tests inside the instance folder - # Ensures tests don't interfere with development data - SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(instance_path, 'test.db') - - # Disable CSRF protection during tests for simplicity - WTF_CSRF_ENABLED = False - - # Ensure Flask-Login works normally during tests (not disabled) - LOGIN_DISABLED = False - - # Use a fixed, predictable secret key for testing sessions - SECRET_KEY = 'testing-secret-key' - - # Inside class TestingConfig(Config): - SERVER_NAME = 'localhost.test' # Or just 'localhost' is usually fine - -# --- You could add other configurations like ProductionConfig(Config) later --- -# class ProductionConfig(Config): -# SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') # Should be set in prod env -# # etc... \ No newline at end of file diff --git a/instance/app.db b/instance/app.db deleted file mode 100644 index 907fa844a37c497dda3cda360f942dc75975a2eb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 167936 zcmeI5TW=gkcE@K#M-pXAV|%S-w43#|Csr&%>e(3%heHY*#I(%y5|%`ol-D*4V^Gsw zGt*Yz+!s=m;Q+LemwbSI4e}N8vPs_Z4Fc@bKE%mWfMku`JS71F1Uc20>FS;yUd)l; z*nd(@PS>gHuTGshRn<-HzH`?IxH51&iv>!%@_eOQt-P)%l}dHKQmHJ`^*mkI=z4*! zXXr}5s_BR6zbTJ~R=s@bpDWK@c&SpmT>HB#&t3lfk~05~^M5(ty71Do|MKj67ysts zmlwaBo(Ci#00JNY0wC~o2z>Fw%NG`|U$6efLBRSZcRferzR&fZF_b@FxwEse{$N9S zu>Qu~4W%xN>dK8v^Mib~x-bOAbKVU)V%Ng8Ktk4g7e(1W67xcKvEp7+CTuuInS*}U-&sBYUXY=0r&Iiid z8y_e)41G~F=g!vs-3L4CghB~F>B*F$7WUExynBE1op(2s8)?Ht^KU8XT@q{Y(&A^& zUs%|^;H{Mr{N~LVwSLBrW8ZEx}-1!CdYW35! 
zc9&?$tOg^Rz($U#pCVnfD|N#7rr{d8I?~Nk#WznGKX3wOsv5Q9RK*0IF*FY}+7HzE z2@^V*0ah*MKjiv}`kx#Q0a_fCXkxeUgr&KmNy|^Nf+vihIwL2Ho||9PcvC2d9^vBN ziJB!D^|VMa7RRSZr6X}HRkerH=}EEDL$u?7XGv`5Xq%G73si^mOL$Z8oJYTDvU1Zur`WdBdZ+F^es{Od6hMFv@BsZ=97@PL6P_isd3drc*rbskBoV zdDvN*>84m|d8ttCJAKkRkwd&kLBo)i*+=NlIHg6KkgRV6>d5h3g9h+al^Z8X{B}w~ zE5~S-5#5&UPLx(P6RRz;eBI+JSw32jNBaTO<|--G(nKnMYE`9VzG3Y#;W)|DR3=Y0 z^^U1>5{r&kX{kwPh!&YR;jPf7>Iv$Qrlysk8?fnf>_n~9V=BL!Zjv2S{c$`sVc0i( z$EB6KGrhO7%4DgYyn`bQqFe`6XP(KqO;a6t+td3qm&Rx03e%(4 zbb7!&+K@~p)u=zEx`+*`W7{Yq<=d$a-4iu6N~tj~Rz-S3Tb)v`lBrT|PLz+$-H|6$ zTIBOJm_(V-v&2L0*yI=H`yd7*Z@U$z?(xDti2jn~+p+EW5iHeiX(ex&1!fyJQhFE+T+nWkin~uT#eU>QqN1x65 zWdh^}0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY z0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4ea zAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd& z00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY z0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4ea zAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHeo G1pW`vH8B2Cux*&PSdhXDGj6kyI!nC)QCPKe;&T9!!%y!!SjPr{BZG_#kIJ0 zZ?E}fuU7;J5FkK+009C&2-NOBY_f$w)dcdnYA62N^XALnUd_b_5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB=E&3UuS>z391i iqIT4Zno%RFN43ZxK!5-N0t5&UAV7cs0RjXnDDVJuf)C^X diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index 18b3697..0000000 --- a/pytest.ini +++ /dev/null @@ -1,9 +0,0 @@ -[pytest] -# Look for tests in the tests directory -testpaths = tests -# Set environment variables for testing -# Ensures Flask uses the testing config and finds the app -#env = -# # Use D: prefix for default values if not set externally -# D:FLASK_APP=run.py # Or app:create_app - CHECK THIS -# D:FLASK_ENV=testing \ No newline at end of file diff --git a/rebuildDB.md b/rebuildDB.md deleted file mode 100644 index a4082a9..0000000 --- a/rebuildDB.md +++ /dev/null @@ -1,82 +0,0 @@ -tep 2: Stop Container(s) - -Bash - -docker stop -# or if using docker compose: -# docker compose down -Step 3: Delete Local Database and Migrations - -On your local Windows machine: - -Delete the database file: C:\GIT\SudoPas_demo\instance\app.db -Delete the test database file (if it exists): C:\GIT\SudoPas_demo\instance\test.db -Delete the entire migrations folder: C:\GIT\SudoPas_demo\migrations\ -Step 4: Start a Temporary Container - -Bash - -docker compose up -d -# OR if not using compose: -# docker run ... your-image-name ... -(Get the new container ID/name) - -Step 5: Initialize Migrations Inside Container - -Bash - -docker exec -w /app flask db init -Step 6: Copy New migrations Folder Out to Local - -Run this in your local PowerShell or Command Prompt: - -PowerShell - -docker cp :/app/migrations C:\GIT\SudoPas_demo\migrations -Verify the migrations folder now exists locally again, containing alembic.ini, env.py, etc., but the versions subfolder should be empty. - -Step 7: Stop Temporary Container - -Bash - -docker stop -# or if using docker compose: -# docker compose down -Step 8: Rebuild Docker Image - -Crucially, rebuild the image to include the latest models.py and the new empty migrations folder: - -Bash - -docker compose build -# OR -# docker build -t your-image-name . -Step 9: Start Final Container - -Bash - -docker compose up -d -# OR -# docker run ... your-image-name ... 
-(Get the final container ID/name) - -Step 10: Create Initial Migration Script - -Now, generate the first migration script based on your current models: - -Bash - -docker exec -w /app flask db migrate -m "Initial migration with User, ModuleRegistry, ProcessedIg" -Check that a new script appeared in your local migrations/versions/ folder. - -Step 11: Apply Migration (Create DB & Tables) - -Run upgrade to create the new app.db and apply the initial schema: - -Bash - -docker exec -w /app flask db upgrade -After this, you should have a completely fresh database matching your latest models. You will need to: - -Create your admin user again via signup or a dedicated command (if you create one). -Re-import any FHIR IGs via the UI if you want data to test with. \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 5ee6edb..2e1cf5f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,17 +1,3 @@ -# requirements.txt -# List of Python packages required for the project - -Flask -Flask-SQLAlchemy -Flask-Migrate -python-dotenv -Flask-WTF -email-validator -requests>=2.20 # Or later version -Flask-Login # <-- ADD THIS LINE -pytest>=7.0 # Or a specific later version -# Optional, but helpful for Flask fixtures later: -# pytest-flask>=1.0 - - -# We will add Flask-SQLAlchemy later when we set up the database +Flask==2.3.3 +Flask-SQLAlchemy==3.0.5 +Werkzeug==2.3.7 \ No newline at end of file diff --git a/run.py b/run.py deleted file mode 100644 index 781ccb1..0000000 --- a/run.py +++ /dev/null @@ -1,45 +0,0 @@ -# run.py -# Main entry point to start the Flask development server. - -import os -from dotenv import load_dotenv - -# Load environment variables from .env file, if it exists -# Useful for storing sensitive info like SECRET_KEY or DATABASE_URL locally -dotenv_path = os.path.join(os.path.dirname(__file__), '.env') -if os.path.exists(dotenv_path): - load_dotenv(dotenv_path) - print("Loaded environment variables from .env file.") -else: - print(".env file not found, using default config or environment variables.") - - -# Import the application factory function (from app/__init__.py, which we'll create content for next) -# We assume the 'app' package exists with an __init__.py containing create_app() -try: - from app import create_app -except ImportError as e: - # Provide a helpful message if the app structure isn't ready yet - print(f"Error importing create_app: {e}") - print("Please ensure the 'app' directory and 'app/__init__.py' exist and define the create_app function.") - # Exit or raise the error depending on desired behavior during setup - raise - -# Create the application instance using the factory -# This allows for different configurations (e.g., testing) if needed later -# We pass the configuration object from config.py -# from config import Config # Assuming Config class is defined in config.py -# flask_app = create_app(Config) -# Simpler approach if create_app handles config loading internally: -flask_app = create_app() - - -if __name__ == '__main__': - # Run the Flask development server - # debug=True enables auto-reloading and detailed error pages (DO NOT use in production) - # host='0.0.0.0' makes the server accessible on your local network - print("Starting Flask development server...") - # Port can be configured via environment variable or default to 5000 - port = int(os.environ.get('PORT', 5000)) - flask_app.run(host='0.0.0.0', port=port, debug=True) - diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff 
--git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index 9bab21f..0000000 --- a/tests/conftest.py +++ /dev/null @@ -1,110 +0,0 @@ -# tests/conftest.py - -import pytest -import os -from app import create_app, db, discover_and_register_modules # Keep import -from config import TestingConfig -from app.models import User -from tests.test_control_panel import create_test_user, login # Or move helpers to a shared file - - -# Determine the instance path for removing the test DB later -instance_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'instance') -TEST_DB_PATH = os.path.join(instance_path, 'test.db') - -# --- Use 'function' scope for better isolation --- -@pytest.fixture(scope='function') -def app(): - """ - Function-scoped test Flask application. Configured for testing. - Handles creation and teardown of the test database FOR EACH TEST. - Initializes modules AFTER database creation. - """ - # Create app with TestingConfig, skip module init initially - app = create_app(TestingConfig, init_modules=False) - - # Establish an application context before accessing db - with app.app_context(): - print(f"\n--- Setting up test database for function at {app.config['SQLALCHEMY_DATABASE_URI']} ---") - # Ensure instance folder exists - try: - os.makedirs(app.instance_path) - except OSError: - pass # Already exists - - # Remove old test database file if it exists (paranoid check) - if os.path.exists(TEST_DB_PATH): - # print(f"--- Removing old test database file: {TEST_DB_PATH} ---") # Can be noisy - os.remove(TEST_DB_PATH) - - # Create tables based on models - db.create_all() - print("--- Test database tables created ---") # Keep this print for confirmation - - # --- FIX: Run discovery AFTER DB setup --- - print("--- Initializing modules after DB setup ---") # Keep this print - discover_and_register_modules(app) # <-- UNCOMMENTED / ADDED THIS LINE - # --- End Module Discovery --- - - # Yield the app instance for use in the single test function - yield app - - # --- Teardown (runs after each test function) --- - # print("\n--- Tearing down test database for function ---") # Can be noisy - db.session.remove() - db.drop_all() - # Optional: Remove the test database file after test run - # if os.path.exists(TEST_DB_PATH): - # os.remove(TEST_DB_PATH) - -# --- Use 'function' scope --- -@pytest.fixture(scope='function') -def client(app): - """ - Provides a Flask test client for the function-scoped app. - """ - return app.test_client() - -# --- Use 'function' scope --- -@pytest.fixture(scope='function') -def runner(app): - """ - Provides a Flask test CLI runner for the function-scoped app. - """ - return app.test_cli_runner() - -# --- ADDED Fixture for Logged-In Admin Client --- -@pytest.fixture(scope='function') -def admin_client(client, app): - """ - Provides a test client already logged in as a pre-created admin user. - Uses the function-scoped 'client' and 'app' fixtures. 
- """ - admin_username = "fixture_admin" - admin_email = "fixture_admin@example.com" # Unique email - admin_password = "password" - - # Create admin user within the app context provided by the 'app' fixture - with app.app_context(): - create_test_user( - username=admin_username, - email=admin_email, - password=admin_password, - role="admin" - ) - - # Log the admin user in using the 'client' fixture - login_res = login(client, admin_username, admin_password) - # Basic check to ensure login likely succeeded (redirect expected) - if login_res.status_code != 302: - pytest.fail("Admin login failed during fixture setup.") - - # Yield the already-logged-in client for the test - yield client - - # Teardown (logout) is optional as function scope cleans up, - # but can be added for explicit cleanup if needed. - # client.get(url_for('auth.logout')) - -# --- Potential Future Fixtures --- -# (Keep commented out potential session fixture) \ No newline at end of file diff --git a/tests/test_auth.py b/tests/test_auth.py deleted file mode 100644 index 3d1c20b..0000000 --- a/tests/test_auth.py +++ /dev/null @@ -1,165 +0,0 @@ -# tests/test_auth.py - -import pytest -from flask import url_for, session, request # Keep request import -from app.models import User -from app import db -from urllib.parse import urlparse, parse_qs # Keep URL parsing tools - -# --- Helper to create a user --- -# (Using the version that defaults email based on username) -def create_test_user(username="testuser", email=None, password="password", role="user"): - """Helper function to add a user to the test database.""" - if email is None: - email = f"{username}@example.test" # Default email based on username - # Check if user already exists by username or email - user = User.query.filter((User.username == username) | (User.email == email)).first() - if user: - print(f"\nDEBUG: Found existing test user '{user.username}' with ID {user.id}") - return user - user = User(username=username, email=email, role=role) - user.set_password(password) - db.session.add(user) - db.session.commit() - print(f"\nDEBUG: Created test user '{username}' (Role: {role}) with ID {user.id}") - return user - -# --- Tests --- - -def test_login_page_loads(client): - """Test that the login page loads correctly.""" - response = client.get(url_for('auth.login')) - assert response.status_code == 200 - assert b"Login" in response.data - assert b"Username" in response.data - assert b"Password" in response.data - -def test_successful_login_as_admin(client, app): - """Test logging in with correct ADMIN credentials.""" - with app.app_context(): - admin_user = create_test_user( - username="test_admin", - email="admin@example.test", - password="password", - role="admin" - ) - assert admin_user.id is not None - assert admin_user.role == 'admin' - - response = client.post(url_for('auth.login'), data={ - 'username': 'test_admin', - 'password': 'password' - }, follow_redirects=True) - - assert response.status_code == 200 - assert b"Logout" in response.data - assert bytes(f"{admin_user.username}", 'utf-8') in response.data - assert b"Control Panel" in response.data - # --- CORRECTED ASSERTION --- - assert b"Manage Modules" in response.data # Check for the button/link text - - -def test_login_wrong_password(client, app): - """Test logging in with incorrect password.""" - with app.app_context(): - create_test_user(username="wrong_pass_user", password="password") - response = client.post(url_for('auth.login'), data={ - 'username': 'wrong_pass_user', - 'password': 'wrongpassword' - }, 
follow_redirects=True) - assert response.status_code == 200 - assert b"Invalid username or password" in response.data - assert b"Logout" not in response.data - -def test_login_wrong_username(client): - """Test logging in with non-existent username.""" - response = client.post(url_for('auth.login'), data={ - 'username': 'nosuchuser', - 'password': 'password' - }, follow_redirects=True) - assert response.status_code == 200 - assert b"Invalid username or password" in response.data - assert b"Logout" not in response.data - -def test_successful_login_as_user(client, app): - """Test logging in with correct USER credentials.""" - with app.app_context(): - test_user = create_test_user( - username="test_user", - email="user@example.test", - password="password", - role="user" - ) - assert test_user.id is not None - assert test_user.role == 'user' - response = client.post(url_for('auth.login'), data={ - 'username': 'test_user', - 'password': 'password' - }, follow_redirects=True) - assert response.status_code == 200 - assert b"Logout" in response.data - assert bytes(f"{test_user.username}", 'utf-8') in response.data - assert b"Control Panel" not in response.data - site_name = app.config.get('SITE_NAME', 'PAS Framework') - assert bytes(site_name, 'utf-8') in response.data - - -# --- Replace the existing test_logout function with this: --- -def test_logout(client, app): - """Test logging out.""" - with app.app_context(): - user = create_test_user(username='logout_user', password='password') - login_res = client.post(url_for('auth.login'), data={'username': 'logout_user', 'password': 'password'}) - assert login_res.status_code == 302 - - logout_response = client.get(url_for('auth.logout'), follow_redirects=True) - assert logout_response.status_code == 200 - assert b"You have been logged out." 
in logout_response.data - assert b"Login" in logout_response.data - assert b"Logout" not in logout_response.data - - # Assert: Accessing protected page redirects to login - protected_response = client.get(url_for('control_panel.index'), follow_redirects=False) - assert protected_response.status_code == 302 - - # --- Use Manual Path Comparison --- - redirect_location = protected_response.headers.get('Location', '') - parsed_location = urlparse(redirect_location) - query_params = parse_qs(parsed_location.query) - - # Manually define the expected RELATIVE paths - expected_login_path_manual = '/auth/login' - expected_next_path_manual = '/control-panel/' # Includes trailing slash from previous logs - - # Compare the path from the header with the known relative string - assert parsed_location.path == expected_login_path_manual - - # Check the 'next' parameter - assert 'next' in query_params - assert query_params['next'][0] == expected_next_path_manual - - -# --- Replace the existing test_login_required_redirect function with this: --- -def test_login_required_redirect(client, app): - """Test that accessing a protected page redirects to login when logged out.""" - # Act: Attempt to access control panel index - response = client.get(url_for('control_panel.index'), follow_redirects=False) - - # Assert: Check for redirect status code (302) - assert response.status_code == 302 - - # --- Use Manual Path Comparison --- - redirect_location = response.headers.get('Location', '') - parsed_location = urlparse(redirect_location) - query_params = parse_qs(parsed_location.query) - - # Manually define the expected RELATIVE paths - expected_login_path_manual = '/auth/login' - expected_next_path_manual = '/control-panel/' # Includes trailing slash - - # Compare the path from the header with the known relative string - assert parsed_location.path == expected_login_path_manual - - # Check the 'next' query parameter exists and has the correct value - assert 'next' in query_params - assert query_params['next'][0] == expected_next_path_manual \ No newline at end of file diff --git a/tests/test_control_panel.py b/tests/test_control_panel.py deleted file mode 100644 index db07e93..0000000 --- a/tests/test_control_panel.py +++ /dev/null @@ -1,222 +0,0 @@ -# tests/test_control_panel.py - -import pytest -from flask import url_for -from app.models import User, ModuleRegistry # Import models -from app import db -from urllib.parse import urlparse, parse_qs # Make sure this is imported - -# --- Test Helpers --- -def create_test_user(username="testuser", email=None, password="password", role="user"): - if email is None: email = f"{username}@example.test" - # Use SQLAlchemy 2.0 style query - user = db.session.scalar(db.select(User).filter((User.username == username) | (User.email == email))) - if user: print(f"\nDEBUG: Found existing test user '{user.username}' with ID {user.id}"); return user - user = User(username=username, email=email, role=role); user.set_password(password) - db.session.add(user); db.session.commit() - print(f"\nDEBUG: Created test user '{username}' (Role: {role}) with ID {user.id}"); return user - -def login(client, username, password): - return client.post(url_for('auth.login'), data=dict(username=username, password=password), follow_redirects=False) - -# --- Access Control Tests --- -def test_cp_access_admin(client, app): # PASSED - with app.app_context(): create_test_user(username="cp_admin", password="password", role="admin") - login_res = login(client, "cp_admin", "password"); assert 
login_res.status_code == 302 - cp_index_res = client.get(url_for('control_panel.index')); assert cp_index_res.status_code == 200; assert b"Control Panel" in cp_index_res.data - module_res = client.get(url_for('control_panel.manage_modules')); assert module_res.status_code == 200; assert b"Module Management" in module_res.data - -def test_cp_access_user(client, app): # PASSED - with app.app_context(): create_test_user(username="cp_user", password="password", role="user") - login_res = login(client, "cp_user", "password"); assert login_res.status_code == 302 - cp_index_res = client.get(url_for('control_panel.index')); assert cp_index_res.status_code == 403 - module_res = client.get(url_for('control_panel.manage_modules')); assert module_res.status_code == 403 - -def test_cp_access_logged_out(client, app): # PASSED - cp_index_res = client.get(url_for('control_panel.index'), follow_redirects=False); assert cp_index_res.status_code == 302 - module_res = client.get(url_for('control_panel.manage_modules'), follow_redirects=False); assert module_res.status_code == 302 - - -# --- Module Management Tests --- -def test_module_manager_list(client, app): # PASSED - with app.app_context(): create_test_user(username="module_admin", password="password", role="admin") - login(client, "module_admin", "password") - response = client.get(url_for('control_panel.manage_modules')) - assert response.status_code == 200; assert b"Module Management" in response.data - assert b"example_module" in response.data; assert b"Disabled" in response.data - -def test_module_manager_toggle(client, app): # PASSED - with app.app_context(): - admin = create_test_user(username="toggle_admin", password="password", role="admin") - # Use SQLAlchemy 2.0 style query - module_entry = db.session.scalar(db.select(ModuleRegistry).filter_by(module_id='example_module')) - assert module_entry is not None, "Check conftest.py discovery call." 
- module_entry.is_enabled = False; db.session.commit() - login(client, "toggle_admin", "password") - # Enable - enable_url = url_for('control_panel.toggle_module_status', module_id='example_module') - response_enable = client.post(enable_url, follow_redirects=False) - assert response_enable.status_code == 302; redirect_location_enable = response_enable.headers.get('Location', ''); parsed_location_enable = urlparse(redirect_location_enable); expected_path_manual = '/control-panel/modules'; assert parsed_location_enable.path == expected_path_manual - with client.session_transaction() as sess: assert '_flashes' in sess; assert "has been enabled" in sess['_flashes'][-1][1] - with app.app_context(): module_entry_after_enable = db.session.scalar(db.select(ModuleRegistry).filter_by(module_id='example_module')); assert module_entry_after_enable is not None and module_entry_after_enable.is_enabled is True - # Disable - disable_url = url_for('control_panel.toggle_module_status', module_id='example_module') - response_disable = client.post(disable_url, follow_redirects=False) - assert response_disable.status_code == 302; redirect_location_disable = response_disable.headers.get('Location', ''); parsed_location_disable = urlparse(redirect_location_disable); assert parsed_location_disable.path == expected_path_manual - with client.session_transaction() as sess: assert '_flashes' in sess; assert "has been disabled" in sess['_flashes'][-1][1] - with app.app_context(): module_entry_after_disable = db.session.scalar(db.select(ModuleRegistry).filter_by(module_id='example_module')); assert module_entry_after_disable is not None and module_entry_after_disable.is_enabled is False - - -# --- User CRUD Tests --- - -def test_add_user_page_loads(client, app): # PASSED - with app.app_context(): create_test_user(username="crud_admin", password="password", role="admin") - login(client, "crud_admin", "password") - response = client.get(url_for('control_panel.add_user')) - assert response.status_code == 200; assert b"Add New User" in response.data; assert b"Username" in response.data - assert b"Email" in response.data; assert b"Password" in response.data; assert b"Repeat Password" in response.data - assert b"Role" in response.data; assert b"Add User" in response.data - -def test_add_user_success(client, app): # PASSED - with app.app_context(): create_test_user(username="crud_admin_adder", password="password", role="admin") - login(client, "crud_admin_adder", "password") - new_username = "new_test_user"; new_email = "new@example.com"; new_password = "new_password"; new_role = "user" - response = client.post(url_for('control_panel.add_user'), data={'username': new_username, 'email': new_email, 'password': new_password,'password2': new_password,'role': new_role,'submit': 'Add User'}, follow_redirects=True) - assert response.status_code == 200; assert b"User new_test_user (user) added successfully!" 
in response.data - assert bytes(new_username, 'utf-8') in response.data; assert bytes(new_email, 'utf-8') in response.data - with app.app_context(): - newly_added_user = db.session.scalar(db.select(User).filter_by(username=new_username)) # Use 2.0 style - assert newly_added_user is not None; assert newly_added_user.email == new_email; assert newly_added_user.role == new_role; assert newly_added_user.check_password(new_password) is True - -# --- Edit User Tests --- - -# --- MODIFIED: Use admin_client fixture --- -def test_edit_user_page_loads(admin_client, app): # Use admin_client instead of client - """Test the 'Edit User' page loads correctly with user data.""" - # Arrange: Create ONLY the target user - with app.app_context(): - target_user = create_test_user(username="edit_target", email="edit@target.com", role="user") - target_user_id = target_user.id # Get ID - target_user_username = target_user.username # Store username if needed for assert - target_user_email = target_user.email - target_user_role = target_user.role - - # Act: Get the 'Edit User' page using the logged-in admin client - # Pass target user's ID - response = admin_client.get(url_for('control_panel.edit_user', user_id=target_user_id)) - - # Assert: Check page loads and contains correct pre-filled data - assert response.status_code == 200 - assert bytes(f"Edit User", 'utf-8') in response.data # Use simpler title from route - # Check if form fields (rendered by template using data from route) are present - # These assertions rely on how edit_user.html renders the form passed by the route - assert bytes(f'value="{target_user_username}"', 'utf-8') in response.data - assert bytes(f'value="{target_user_email}"', 'utf-8') in response.data - assert bytes(f'