import sys
import os
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
import datetime
import shutil
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, Response, current_app, session, send_file, make_response
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_wtf import FlaskForm
from flask_wtf.csrf import CSRFProtect
from werkzeug.utils import secure_filename
from werkzeug.formparser import FormDataParser
from werkzeug.exceptions import RequestEntityTooLarge
import tarfile
import json
import logging
import requests
import re
import services  # Full module import so helpers can be called as services.<fn>
from services import services_bp, construct_tgz_filename, parse_package_filename  # Blueprint and filename helpers
from forms import IgImportForm, ValidationForm, FSHConverterForm, TestDataUploadForm
from wtforms import SubmitField
import tempfile
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-fallback-secret-key-here')
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:////app/instance/fhir_ig.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['FHIR_PACKAGES_DIR'] = '/app/instance/fhir_packages'
app.config['API_KEY'] = os.environ.get('API_KEY', 'your-fallback-api-key-here')
app.config['VALIDATE_IMPOSED_PROFILES'] = True
app.config['DISPLAY_PROFILE_RELATIONSHIPS'] = True
app.config['UPLOAD_FOLDER'] = '/app/static/uploads' # For GoFSH output
# Set max upload size (currently 6 MB; adjust as needed)
app.config['MAX_CONTENT_LENGTH'] = 6 * 1024 * 1024
# Werkzeug also limits the number of multipart form parts (default is often 1000);
# the custom parser below raises that limit instead of relying on this config key.
#app.config['MAX_FORM_PARTS'] = 1000
# --- Custom form parser with a higher part limit ---
class CustomFormDataParser(FormDataParser):
    """Subclass to increase the maximum number of form parts."""
    def __init__(self, *args, **kwargs):
        # Override the default part limit checked by Werkzeug's parser.
        # Set this high enough for the expected maximum file count.
        super().__init__(*args, max_form_parts=2000, **kwargs)  # Example: allow 2000 parts
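# A minimal wiring sketch (assumption: the parser is attached to the request
# class elsewhere in this file; Werkzeug only invokes it if the request class
# points at it). One standard approach is to subclass Flask's Request:
#
#   from flask import Request
#
#   class CustomRequest(Request):
#       form_data_parser_class = CustomFormDataParser
#
#   app.request_class = CustomRequest  # must be assigned before handling requests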
@app.context_processor
def inject_app_mode():
    """Injects the app_mode into template contexts."""
    return dict(app_mode=app.config.get('APP_MODE', 'standalone'))
# Read application mode from environment variable, default to 'standalone'
app.config['APP_MODE'] = os.environ.get('APP_MODE', 'standalone').lower()
logger.info(f"Application running in mode: {app.config['APP_MODE']}")
# --- END mode check ---
# Ensure directories exist and are writable
instance_path = '/app/instance'
packages_path = app.config['FHIR_PACKAGES_DIR']
logger.debug(f"Instance path configuration: {instance_path}")
logger.debug(f"Database URI: {app.config['SQLALCHEMY_DATABASE_URI']}")
logger.debug(f"Packages path: {packages_path}")
try:
instance_folder_path = app.instance_path
logger.debug(f"Flask instance folder path: {instance_folder_path}")
os.makedirs(instance_folder_path, exist_ok=True)
os.makedirs(packages_path, exist_ok=True)
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
logger.debug(f"Directories created/verified: Instance: {instance_folder_path}, Packages: {packages_path}")
except Exception as e:
logger.error(f"Failed to create/verify directories: {e}", exc_info=True)
db = SQLAlchemy(app)
csrf = CSRFProtect(app)
migrate = Migrate(app, db)
# Register Blueprint
app.register_blueprint(services_bp, url_prefix='/api')
class ProcessedIg(db.Model):
id = db.Column(db.Integer, primary_key=True)
package_name = db.Column(db.String(128), nullable=False)
version = db.Column(db.String(64), nullable=False)
processed_date = db.Column(db.DateTime, nullable=False)
resource_types_info = db.Column(db.JSON, nullable=False)
must_support_elements = db.Column(db.JSON, nullable=True)
examples = db.Column(db.JSON, nullable=True)
complies_with_profiles = db.Column(db.JSON, nullable=True)
imposed_profiles = db.Column(db.JSON, nullable=True)
optional_usage_elements = db.Column(db.JSON, nullable=True)
    search_param_conformance = db.Column(db.JSON, nullable=True)  # Stores the extracted conformance map
__table_args__ = (db.UniqueConstraint('package_name', 'version', name='uq_package_version'),)
# --- Make sure to handle database migration if you use Flask-Migrate ---
# (e.g., flask db migrate -m "Add search_param_conformance to ProcessedIg", flask db upgrade)
# If not using migrations, you might need to drop and recreate the table (losing existing processed data)
# or manually alter the table using SQLite tools.
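# A minimal sketch of the manual SQLite route (assumption: SQLAlchemy's default
# snake_case table name 'processed_ig' is in effect):
#
#   sqlite3 /app/instance/fhir_ig.db \
#     "ALTER TABLE processed_ig ADD COLUMN search_param_conformance JSON;"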
def check_api_key():
api_key = request.headers.get('X-API-Key')
if not api_key and request.is_json:
api_key = request.json.get('api_key')
if not api_key:
logger.error("API key missing in request")
return jsonify({"status": "error", "message": "API key missing"}), 401
if api_key != app.config['API_KEY']:
logger.error("Invalid API key provided.")
return jsonify({"status": "error", "message": "Invalid API key"}), 401
logger.debug("API key validated successfully")
return None
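# Usage sketch: clients authenticate with the 'X-API-Key' header or, for JSON
# bodies, an 'api_key' field (examples assume the fallback key configured above):
#
#   curl -H "X-API-Key: your-fallback-api-key-here" ...
#   curl -H "Content-Type: application/json" -d '{"api_key": "your-fallback-api-key-here", ...}' ...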
def list_downloaded_packages(packages_dir):
packages = []
errors = []
duplicate_groups = {}
logger.debug(f"Scanning packages directory: {packages_dir}")
if not os.path.exists(packages_dir):
logger.warning(f"Packages directory not found: {packages_dir}")
return packages, errors, duplicate_groups
for filename in os.listdir(packages_dir):
if filename.endswith('.tgz'):
full_path = os.path.join(packages_dir, filename)
name = filename[:-4]
version = ''
parsed_name, parsed_version = services.parse_package_filename(filename)
if parsed_name:
name = parsed_name
version = parsed_version
else:
logger.warning(f"Could not parse version from {filename}, using default name.")
errors.append(f"Could not parse {filename}")
try:
with tarfile.open(full_path, "r:gz") as tar:
pkg_json_member = tar.getmember("package/package.json")
fileobj = tar.extractfile(pkg_json_member)
if fileobj:
pkg_data = json.loads(fileobj.read().decode('utf-8-sig'))
name = pkg_data.get('name', name)
version = pkg_data.get('version', version)
fileobj.close()
except (KeyError, tarfile.TarError, json.JSONDecodeError, UnicodeDecodeError) as e:
logger.warning(f"Could not read package.json from {filename}: {e}")
errors.append(f"Error reading {filename}: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error reading package.json from {filename}: {e}", exc_info=True)
errors.append(f"Unexpected error for {filename}: {str(e)}")
if name and version:
packages.append({'name': name, 'version': version, 'filename': filename})
else:
logger.warning(f"Skipping package {filename} due to invalid name ('{name}') or version ('{version}')")
errors.append(f"Invalid package {filename}: name='{name}', version='{version}'")
name_counts = {}
for pkg in packages:
name = pkg['name']
name_counts[name] = name_counts.get(name, 0) + 1
for name, count in name_counts.items():
if count > 1:
duplicate_groups[name] = sorted([p['version'] for p in packages if p['name'] == name])
logger.debug(f"Found packages: {packages}")
logger.debug(f"Errors during package listing: {errors}")
logger.debug(f"Duplicate groups: {duplicate_groups}")
return packages, errors, duplicate_groups
@app.route('/')
def index():
return render_template('index.html', site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now())
@app.route('/import-ig', methods=['GET', 'POST'])
def import_ig():
form = IgImportForm()
if form.validate_on_submit():
name = form.package_name.data
version = form.package_version.data
dependency_mode = form.dependency_mode.data
try:
result = services.import_package_and_dependencies(name, version, dependency_mode=dependency_mode)
if result['errors'] and not result['downloaded']:
error_msg = result['errors'][0]
simplified_msg = error_msg
if "HTTP error" in error_msg and "404" in error_msg:
simplified_msg = "Package not found on registry (404). Check name and version."
elif "HTTP error" in error_msg:
simplified_msg = f"Registry error: {error_msg.split(': ', 1)[-1]}"
elif "Connection error" in error_msg:
simplified_msg = "Could not connect to the FHIR package registry."
flash(f"Failed to import {name}#{version}: {simplified_msg}", "error")
logger.error(f"Import failed critically for {name}#{version}: {error_msg}")
else:
if result['errors']:
flash(f"Partially imported {name}#{version} with errors during dependency processing. Check logs.", "warning")
for err in result['errors']:
logger.warning(f"Import warning for {name}#{version}: {err}")
else:
flash(f"Successfully downloaded {name}#{version} and dependencies! Mode: {dependency_mode}", "success")
return redirect(url_for('view_igs'))
except Exception as e:
logger.error(f"Unexpected error during IG import: {str(e)}", exc_info=True)
flash(f"An unexpected error occurred downloading the IG: {str(e)}", "error")
else:
for field, errors in form.errors.items():
for error in errors:
flash(f"Error in {getattr(form, field).label.text}: {error}", "danger")
return render_template('import_ig.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now())
@app.route('/view-igs')
def view_igs():
form = FlaskForm()
processed_igs = ProcessedIg.query.order_by(ProcessedIg.package_name, ProcessedIg.version).all()
processed_ids = {(ig.package_name, ig.version) for ig in processed_igs}
packages_dir = app.config['FHIR_PACKAGES_DIR']
packages, errors, duplicate_groups = list_downloaded_packages(packages_dir)
if errors:
flash(f"Warning: Errors encountered while listing packages: {', '.join(errors)}", "warning")
colors = ['bg-warning', 'bg-info', 'bg-success', 'bg-danger', 'bg-secondary']
group_colors = {}
for i, name in enumerate(duplicate_groups.keys()):
group_colors[name] = colors[i % len(colors)]
return render_template('cp_downloaded_igs.html', form=form, packages=packages,
processed_list=processed_igs, processed_ids=processed_ids,
duplicate_groups=duplicate_groups, group_colors=group_colors,
site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now(),
config=app.config)
@app.route('/about')
def about():
"""Renders the about page."""
# The app_mode is automatically injected by the context processor
return render_template('about.html',
title="About", # Optional title for the page
site_name='FHIRFLARE IG Toolkit') # Or get from config
@app.route('/push-igs', methods=['GET'])
def push_igs():
    form = IgImportForm()  # Real form class with built-in CSRF handling
processed_igs = ProcessedIg.query.order_by(ProcessedIg.package_name, ProcessedIg.version).all()
processed_ids = {(ig.package_name, ig.version) for ig in processed_igs}
packages_dir = app.config['FHIR_PACKAGES_DIR']
packages, errors, duplicate_groups = list_downloaded_packages(packages_dir)
if errors:
flash(f"Warning: Errors encountered while listing packages: {', '.join(errors)}", "warning")
colors = ['bg-warning', 'bg-info', 'bg-success', 'bg-danger', 'bg-secondary']
group_colors = {}
for i, name in enumerate(duplicate_groups.keys()):
group_colors[name] = colors[i % len(colors)]
return render_template('cp_push_igs.html', form=form, packages=packages, # Pass the form instance
processed_list=processed_igs, processed_ids=processed_ids,
duplicate_groups=duplicate_groups, group_colors=group_colors,
site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now(),
api_key=app.config['API_KEY'], config=app.config)
@app.route('/process-igs', methods=['POST'])
def process_ig():
form = FlaskForm() # Assuming a basic FlaskForm for CSRF protection
if form.validate_on_submit():
filename = request.form.get('filename')
        # Validate filename and resolve the package path
if not filename or not filename.endswith('.tgz'):
flash("Invalid package file selected.", "error")
return redirect(url_for('view_igs'))
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename)
if not os.path.exists(tgz_path):
flash(f"Package file not found: {filename}", "error")
return redirect(url_for('view_igs'))
name, version = services.parse_package_filename(filename)
if not name: # Add fallback naming if parse fails
name = filename[:-4].replace('_', '.') # Basic guess
version = 'unknown'
logger.warning(f"Using fallback naming for {filename} -> {name}#{version}")
try:
logger.info(f"Starting processing for {name}#{version} from file {filename}")
# This now returns the conformance map too
package_info = services.process_package_file(tgz_path)
if package_info.get('errors'):
flash(f"Processing completed with errors for {name}#{version}: {', '.join(package_info['errors'])}", "warning")
            # Build a map of resource types flagged as optional usage
optional_usage_dict = {
info['name']: True
for info in package_info.get('resource_types_info', [])
if info.get('optional_usage')
}
logger.debug(f"Optional usage elements identified: {optional_usage_dict}")
# Find existing or create new DB record
existing_ig = ProcessedIg.query.filter_by(package_name=name, version=version).first()
if existing_ig:
logger.info(f"Updating existing processed record for {name}#{version}")
processed_ig = existing_ig
else:
logger.info(f"Creating new processed record for {name}#{version}")
processed_ig = ProcessedIg(package_name=name, version=version)
db.session.add(processed_ig)
# Update all fields
processed_ig.processed_date = datetime.datetime.now(tz=datetime.timezone.utc)
processed_ig.resource_types_info = package_info.get('resource_types_info', [])
processed_ig.must_support_elements = package_info.get('must_support_elements')
processed_ig.examples = package_info.get('examples')
processed_ig.complies_with_profiles = package_info.get('complies_with_profiles', [])
processed_ig.imposed_profiles = package_info.get('imposed_profiles', [])
processed_ig.optional_usage_elements = optional_usage_dict
            # Save the extracted conformance map returned by process_package_file
            processed_ig.search_param_conformance = package_info.get('search_param_conformance')
db.session.commit() # Commit all changes
flash(f"Successfully processed {name}#{version}!", "success")
except Exception as e:
db.session.rollback() # Rollback on error
logger.error(f"Error processing IG {filename}: {str(e)}", exc_info=True)
flash(f"Error processing IG '{filename}': {str(e)}", "error")
else:
# Handle CSRF or other form validation errors
logger.warning(f"Form validation failed for process-igs: {form.errors}")
flash("CSRF token missing or invalid, or other form error.", "error")
return redirect(url_for('view_igs'))
# --- End of /process-igs Function ---
@app.route('/delete-ig', methods=['POST'])
def delete_ig():
form = FlaskForm()
if form.validate_on_submit():
filename = request.form.get('filename')
if not filename or not filename.endswith('.tgz'):
flash("Invalid package file specified.", "error")
return redirect(url_for('view_igs'))
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename)
metadata_path = tgz_path.replace('.tgz', '.metadata.json')
deleted_files = []
errors = []
if os.path.exists(tgz_path):
try:
os.remove(tgz_path)
deleted_files.append(filename)
logger.info(f"Deleted package file: {tgz_path}")
except OSError as e:
errors.append(f"Could not delete {filename}: {e}")
logger.error(f"Error deleting {tgz_path}: {e}")
else:
flash(f"Package file not found: {filename}", "warning")
if os.path.exists(metadata_path):
try:
os.remove(metadata_path)
deleted_files.append(os.path.basename(metadata_path))
logger.info(f"Deleted metadata file: {metadata_path}")
except OSError as e:
errors.append(f"Could not delete metadata for {filename}: {e}")
logger.error(f"Error deleting {metadata_path}: {e}")
if errors:
for error in errors:
flash(error, "error")
elif deleted_files:
flash(f"Deleted: {', '.join(deleted_files)}", "success")
else:
flash("No files found to delete.", "info")
else:
logger.warning(f"Form validation failed for delete-ig: {form.errors}")
flash("CSRF token missing or invalid.", "error")
return redirect(url_for('view_igs'))
@app.route('/unload-ig', methods=['POST'])
def unload_ig():
form = FlaskForm()
if form.validate_on_submit():
ig_id = request.form.get('ig_id')
try:
ig_id_int = int(ig_id)
processed_ig = db.session.get(ProcessedIg, ig_id_int)
if processed_ig:
try:
pkg_name = processed_ig.package_name
pkg_version = processed_ig.version
db.session.delete(processed_ig)
db.session.commit()
flash(f"Unloaded processed data for {pkg_name}#{pkg_version}", "success")
logger.info(f"Unloaded DB record for {pkg_name}#{pkg_version} (ID: {ig_id_int})")
except Exception as e:
db.session.rollback()
flash(f"Error unloading package data: {str(e)}", "error")
logger.error(f"Error deleting ProcessedIg record ID {ig_id_int}: {e}", exc_info=True)
else:
flash(f"Processed package data not found with ID: {ig_id}", "error")
logger.warning(f"Attempted to unload non-existent ProcessedIg record ID: {ig_id}")
except ValueError:
flash("Invalid package ID provided.", "error")
logger.warning(f"Invalid ID format received for unload-ig: {ig_id}")
except Exception as e:
flash(f"An unexpected error occurred during unload: {str(e)}", "error")
logger.error(f"Unexpected error in unload_ig for ID {ig_id}: {e}", exc_info=True)
else:
logger.warning(f"Form validation failed for unload-ig: {form.errors}")
flash("CSRF token missing or invalid.", "error")
return redirect(url_for('view_igs'))
@app.route('/view-ig/<int:processed_ig_id>')
def view_ig(processed_ig_id):
processed_ig = db.session.get(ProcessedIg, processed_ig_id)
if not processed_ig:
flash(f"Processed IG with ID {processed_ig_id} not found.", "error")
return redirect(url_for('view_igs'))
profile_list = [t for t in processed_ig.resource_types_info if t.get('is_profile')]
base_list = [t for t in processed_ig.resource_types_info if not t.get('is_profile')]
examples_by_type = processed_ig.examples or {}
optional_usage_elements = processed_ig.optional_usage_elements or {}
complies_with_profiles = processed_ig.complies_with_profiles or []
imposed_profiles = processed_ig.imposed_profiles or []
logger.debug(f"Viewing IG {processed_ig.package_name}#{processed_ig.version}: "
f"{len(profile_list)} profiles, {len(base_list)} base resources, "
f"{len(optional_usage_elements)} optional elements")
return render_template('cp_view_processed_ig.html',
title=f"View {processed_ig.package_name}#{processed_ig.version}",
processed_ig=processed_ig,
profile_list=profile_list,
base_list=base_list,
examples_by_type=examples_by_type,
site_name='FHIRFLARE IG Toolkit',
now=datetime.datetime.now(),
complies_with_profiles=complies_with_profiles,
imposed_profiles=imposed_profiles,
optional_usage_elements=optional_usage_elements,
config=current_app.config)
# --- Full /get-structure Function ---
@app.route('/get-structure')
def get_structure():
package_name = request.args.get('package_name')
package_version = request.args.get('package_version')
# This is the StructureDefinition ID/Name or base ResourceType
resource_type = request.args.get('resource_type')
view = request.args.get('view', 'snapshot') # Keep for potential future use
# --- Parameter Validation ---
if not all([package_name, package_version, resource_type]):
logger.warning("get_structure: Missing query parameters: package_name=%s, package_version=%s, resource_type=%s", package_name, package_version, resource_type)
return jsonify({"error": "Missing required query parameters: package_name, package_version, resource_type"}), 400
# --- Package Directory Setup ---
packages_dir = current_app.config.get('FHIR_PACKAGES_DIR')
if not packages_dir:
logger.error("FHIR_PACKAGES_DIR not configured.")
return jsonify({"error": "Server configuration error: Package directory not set."}), 500
# --- Paths setup ---
tgz_filename = services.construct_tgz_filename(package_name, package_version)
tgz_path = os.path.join(packages_dir, tgz_filename)
# Assuming CANONICAL_PACKAGE is defined in services (e.g., ('hl7.fhir.r4.core', '4.0.1'))
core_package_name, core_package_version = services.CANONICAL_PACKAGE
core_tgz_filename = services.construct_tgz_filename(core_package_name, core_package_version)
core_tgz_path = os.path.join(packages_dir, core_tgz_filename)
sd_data = None
search_params_data = [] # Initialize search params list
fallback_used = False
source_package_id = f"{package_name}#{package_version}"
base_resource_type_for_sp = None # Variable to store the base type for SP search
logger.debug(f"Attempting to find SD for '{resource_type}' in {tgz_filename}")
# --- Fetch SD Data (Primary Package) ---
primary_package_exists = os.path.exists(tgz_path)
core_package_exists = os.path.exists(core_tgz_path)
if primary_package_exists:
try:
# Assuming find_and_extract_sd handles narrative removal
sd_data, _ = services.find_and_extract_sd(tgz_path, resource_type)
if sd_data:
# Determine the base resource type from the fetched SD
base_resource_type_for_sp = sd_data.get('type')
logger.debug(f"Determined base resource type '{base_resource_type_for_sp}' from primary SD '{resource_type}'")
except Exception as e:
logger.error(f"Unexpected error extracting SD '{resource_type}' from primary package {tgz_path}: {e}", exc_info=True)
sd_data = None # Ensure sd_data is None if extraction failed
# --- Fallback SD Check (if primary failed or file didn't exist) ---
if sd_data is None:
logger.info(f"SD for '{resource_type}' not found or failed to load from {source_package_id}. Attempting fallback to {services.CANONICAL_PACKAGE_ID}.")
if not core_package_exists:
logger.error(f"Core package {services.CANONICAL_PACKAGE_ID} not found locally at {core_tgz_path}.")
error_message = f"SD for '{resource_type}' not found in primary package, and core package is missing." if primary_package_exists else f"Primary package {package_name}#{package_version} and core package are missing."
return jsonify({"error": error_message}), 500 if primary_package_exists else 404
try:
sd_data, _ = services.find_and_extract_sd(core_tgz_path, resource_type)
if sd_data is not None:
fallback_used = True
source_package_id = services.CANONICAL_PACKAGE_ID
base_resource_type_for_sp = sd_data.get('type') # Store base type from fallback SD
logger.info(f"Found SD for '{resource_type}' in fallback package {source_package_id}. Base type: '{base_resource_type_for_sp}'")
except Exception as e:
logger.error(f"Unexpected error extracting SD '{resource_type}' from fallback {core_tgz_path}: {e}", exc_info=True)
return jsonify({"error": f"Unexpected error reading fallback StructureDefinition: {str(e)}"}), 500
# --- Check if SD data was ultimately found ---
if not sd_data:
logger.error(f"SD for '{resource_type}' could not be found in primary or fallback packages.")
return jsonify({"error": f"StructureDefinition for '{resource_type}' not found."}), 404
# --- Fetch Search Parameters (Primary Package First) ---
# find_and_extract_search_params returns a list of dicts with basic SP info
if base_resource_type_for_sp and primary_package_exists:
try:
logger.info(f"Fetching SearchParameters for base type '{base_resource_type_for_sp}' from primary package {tgz_path}")
search_params_data = services.find_and_extract_search_params(tgz_path, base_resource_type_for_sp)
except Exception as e:
logger.error(f"Error extracting SearchParameters for '{base_resource_type_for_sp}' from primary package {tgz_path}: {e}", exc_info=True)
search_params_data = [] # Continue with empty list on error
elif not primary_package_exists:
logger.warning(f"Original package {tgz_path} not found, cannot search it for specific SearchParameters.")
elif not base_resource_type_for_sp:
logger.warning(f"Base resource type could not be determined for '{resource_type}', cannot search for SearchParameters.")
# --- Fetch Search Parameters (Fallback to Core Package if needed) ---
if not search_params_data and base_resource_type_for_sp and core_package_exists:
logger.info(f"No relevant SearchParameters found in primary package for '{base_resource_type_for_sp}'. Searching core package {core_tgz_path}.")
try:
search_params_data = services.find_and_extract_search_params(core_tgz_path, base_resource_type_for_sp)
if search_params_data:
logger.info(f"Found {len(search_params_data)} SearchParameters for '{base_resource_type_for_sp}' in core package.")
except Exception as e:
logger.error(f"Error extracting SearchParameters for '{base_resource_type_for_sp}' from core package {core_tgz_path}: {e}", exc_info=True)
search_params_data = [] # Continue with empty list on error
elif not search_params_data and not core_package_exists:
logger.warning(f"Core package {core_tgz_path} not found, cannot perform fallback search for SearchParameters.")
# --- Prepare Snapshot/Differential Elements ---
snapshot_elements = sd_data.get('snapshot', {}).get('element', [])
differential_elements = sd_data.get('differential', {}).get('element', [])
# Create set of IDs from differential elements for efficient lookup
differential_ids = {el.get('id') for el in differential_elements if el.get('id')}
logger.debug(f"Found {len(differential_ids)} unique IDs in differential.")
enriched_elements = []
if snapshot_elements:
logger.debug(f"Processing {len(snapshot_elements)} snapshot elements to add isInDifferential flag.")
for element in snapshot_elements:
element_id = element.get('id')
# Add the isInDifferential flag based on presence in differential_ids set
element['isInDifferential'] = bool(element_id and element_id in differential_ids)
enriched_elements.append(element)
# remove_narrative should ideally be handled within find_and_extract_sd,
# but applying it again here ensures it's done if the service function missed it.
enriched_elements = [services.remove_narrative(el) for el in enriched_elements]
else:
# If no snapshot, log warning. Front-end might need adjustment if only differential is sent.
logger.warning(f"No snapshot found for {resource_type} in {source_package_id}. Returning empty element list.")
enriched_elements = [] # Or consider returning differential and handle in JS
# --- Retrieve Must Support Paths from DB ---
must_support_paths = []
# Query DB once for the ProcessedIg record
processed_ig_record = ProcessedIg.query.filter_by(package_name=package_name, version=package_version).first()
    if processed_ig_record and processed_ig_record.must_support_elements:
        ms_elements_dict = processed_ig_record.must_support_elements
        # Try the profile ID/name as the key first, then fall back to the base type
        must_support_paths = ms_elements_dict.get(resource_type, [])
        if must_support_paths:
            logger.debug(f"Retrieved {len(must_support_paths)} MS paths using profile key '{resource_type}' from DB.")
        elif base_resource_type_for_sp:
            must_support_paths = ms_elements_dict.get(base_resource_type_for_sp, [])
            if must_support_paths:
                logger.debug(f"Retrieved {len(must_support_paths)} MS paths using base type key '{base_resource_type_for_sp}' from DB.")
        if not must_support_paths:
            logger.debug(f"No specific MS paths found for keys '{resource_type}' or '{base_resource_type_for_sp}' in DB.")
else:
logger.debug(f"No processed IG record or no must_support_elements found in DB for {package_name}#{package_version}")
# --- Fetch and Merge Conformance Data ---
search_param_conformance_rules = {}
if base_resource_type_for_sp: # Only proceed if we identified the base type
# Reuse the DB record queried for Must Support if available
if processed_ig_record:
# Check if the record has the conformance data attribute and it's not None/empty
# **IMPORTANT**: This assumes 'search_param_conformance' column was added to the model
if hasattr(processed_ig_record, 'search_param_conformance') and processed_ig_record.search_param_conformance:
all_conformance_data = processed_ig_record.search_param_conformance
# Get the specific rules map for the current base resource type
search_param_conformance_rules = all_conformance_data.get(base_resource_type_for_sp, {})
logger.debug(f"Retrieved conformance rules for {base_resource_type_for_sp} from DB: {search_param_conformance_rules}")
else:
logger.warning(f"ProcessedIg record found, but 'search_param_conformance' attribute/data is missing or empty for {package_name}#{package_version}.")
else:
# This case should be rare if MS check already happened, but handles it
logger.warning(f"No ProcessedIg record found for {package_name}#{package_version} to get conformance rules.")
# Merge the retrieved conformance rules into the search_params_data list
if search_params_data:
logger.debug(f"Merging conformance data into {len(search_params_data)} search parameters.")
for param in search_params_data:
param_code = param.get('code')
if param_code:
# Lookup the code in the rules; default to 'Optional' if not found
conformance_level = search_param_conformance_rules.get(param_code, 'Optional')
param['conformance'] = conformance_level # Update the dictionary
else:
# Handle cases where SearchParameter might lack a 'code' (should be rare)
param['conformance'] = 'Unknown'
logger.debug("Finished merging conformance data.")
else:
logger.debug(f"No search parameters found for {base_resource_type_for_sp} to merge conformance data into.")
else:
logger.warning(f"Cannot fetch conformance data because base resource type (e.g., Patient) for '{resource_type}' could not be determined.")
# Ensure existing search params still have a default conformance
for param in search_params_data:
if 'conformance' not in param or param['conformance'] == 'N/A':
param['conformance'] = 'Optional'
# --- Construct the final response ---
response_data = {
'elements': enriched_elements,
'must_support_paths': must_support_paths,
# This list now includes the 'conformance' field with actual values (or 'Optional'/'Unknown')
'search_parameters': search_params_data,
'fallback_used': fallback_used,
'source_package': source_package_id
# Consider explicitly including the raw sd_data['differential'] if needed by JS,
# otherwise keep it excluded to reduce payload size.
# 'differential_elements': differential_elements
}
# Use Response object for consistent JSON formatting and smaller payload
# indent=None, separators=(',', ':') creates the most compact JSON
return Response(json.dumps(response_data, indent=None, separators=(',', ':')), mimetype='application/json')
# --- End of /get-structure Function ---
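# Usage sketch for /get-structure (assumption: app served locally on port 5000):
#
#   curl "http://localhost:5000/get-structure?package_name=hl7.fhir.r4.core&package_version=4.0.1&resource_type=Patient"
#
# The compact JSON response is shaped like:
#   {"elements": [...], "must_support_paths": [...], "search_parameters": [...],
#    "fallback_used": false, "source_package": "hl7.fhir.r4.core#4.0.1"}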
@app.route('/get-example')
def get_example():
package_name = request.args.get('package_name')
version = request.args.get('package_version')
filename = request.args.get('filename')
if not all([package_name, version, filename]):
logger.warning("get_example: Missing query parameters: package_name=%s, version=%s, filename=%s", package_name, version, filename)
return jsonify({"error": "Missing required query parameters: package_name, package_version, filename"}), 400
if not filename.startswith('package/') or '..' in filename:
logger.warning(f"Invalid example file path requested: {filename}")
return jsonify({"error": "Invalid example file path."}), 400
packages_dir = current_app.config.get('FHIR_PACKAGES_DIR')
if not packages_dir:
logger.error("FHIR_PACKAGES_DIR not configured.")
return jsonify({"error": "Server configuration error: Package directory not set."}), 500
tgz_filename = services.construct_tgz_filename(package_name, version)
tgz_path = os.path.join(packages_dir, tgz_filename)
if not os.path.exists(tgz_path):
logger.error(f"Package file not found: {tgz_path}")
return jsonify({"error": f"Package {package_name}#{version} not found"}), 404
try:
with tarfile.open(tgz_path, "r:gz") as tar:
try:
example_member = tar.getmember(filename)
with tar.extractfile(example_member) as example_fileobj:
content_bytes = example_fileobj.read()
content_string = content_bytes.decode('utf-8-sig')
# Parse JSON to remove narrative
content = json.loads(content_string)
if 'text' in content:
logger.debug(f"Removing narrative text from example '{filename}'")
del content['text']
# Return filtered JSON content as a compact string
filtered_content_string = json.dumps(content, separators=(',', ':'), sort_keys=False)
return Response(filtered_content_string, mimetype='application/json')
except KeyError:
logger.error(f"Example file '{filename}' not found within {tgz_filename}")
return jsonify({"error": f"Example file '{os.path.basename(filename)}' not found in package."}), 404
except json.JSONDecodeError as e:
logger.error(f"JSON parsing error for example '{filename}' in {tgz_filename}: {e}")
return jsonify({"error": f"Invalid JSON in example file: {str(e)}"}), 500
except UnicodeDecodeError as e:
logger.error(f"Encoding error reading example '{filename}' from {tgz_filename}: {e}")
return jsonify({"error": f"Error decoding example file (invalid UTF-8?): {str(e)}"}), 500
except tarfile.TarError as e:
logger.error(f"TarError reading example '{filename}' from {tgz_filename}: {e}")
return jsonify({"error": f"Error reading package archive: {str(e)}"}), 500
except tarfile.TarError as e:
logger.error(f"Error opening package file {tgz_path}: {e}")
return jsonify({"error": f"Error reading package archive: {str(e)}"}), 500
except FileNotFoundError:
logger.error(f"Package file disappeared: {tgz_path}")
return jsonify({"error": f"Package file not found: {package_name}#{version}"}), 404
except Exception as e:
logger.error(f"Unexpected error getting example '{filename}' from {tgz_filename}: {e}", exc_info=True)
return jsonify({"error": f"Unexpected error: {str(e)}"}), 500
@app.route('/get-package-metadata')
def get_package_metadata():
package_name = request.args.get('package_name')
version = request.args.get('version')
if not package_name or not version:
return jsonify({'error': 'Missing package_name or version parameter'}), 400
try:
metadata = services.get_package_metadata(package_name, version)
if metadata:
return jsonify({
'package_name': metadata.get('package_name'),
'version': metadata.get('version'),
'dependency_mode': metadata.get('dependency_mode'),
'imported_dependencies': metadata.get('imported_dependencies', []),
'complies_with_profiles': metadata.get('complies_with_profiles', []),
'imposed_profiles': metadata.get('imposed_profiles', [])
})
else:
return jsonify({'error': 'Metadata file not found for this package version.'}), 404
except Exception as e:
logger.error(f"Error retrieving metadata for {package_name}#{version}: {e}", exc_info=True)
return jsonify({'error': f'Error retrieving metadata: {str(e)}'}), 500
@app.route('/api/import-ig', methods=['POST'])
def api_import_ig():
auth_error = check_api_key()
if auth_error:
return auth_error
if not request.is_json:
return jsonify({"status": "error", "message": "Request must be JSON"}), 400
data = request.get_json()
package_name = data.get('package_name')
version = data.get('version')
dependency_mode = data.get('dependency_mode', 'recursive')
if not package_name or not version:
return jsonify({"status": "error", "message": "Missing package_name or version"}), 400
if not (isinstance(package_name, str) and isinstance(version, str) and
re.match(r'^[a-zA-Z0-9\-\.]+$', package_name) and
re.match(r'^[a-zA-Z0-9\.\-\+]+$', version)):
return jsonify({"status": "error", "message": "Invalid characters in package name or version"}), 400
valid_modes = ['recursive', 'patch-canonical', 'tree-shaking', 'direct']
if dependency_mode not in valid_modes:
return jsonify({"status": "error", "message": f"Invalid dependency mode: {dependency_mode}. Must be one of {valid_modes}"}), 400
try:
result = services.import_package_and_dependencies(package_name, version, dependency_mode=dependency_mode)
if result['errors'] and not result['downloaded']:
error_msg = f"Failed to import {package_name}#{version}: {result['errors'][0]}"
logger.error(f"[API] Import failed: {error_msg}")
status_code = 404 if "404" in result['errors'][0] else 500
return jsonify({"status": "error", "message": error_msg}), status_code
package_filename = services.construct_tgz_filename(package_name, version)
packages_dir = current_app.config.get('FHIR_PACKAGES_DIR', '/app/instance/fhir_packages')
package_path = os.path.join(packages_dir, package_filename)
complies_with_profiles = []
imposed_profiles = []
processing_errors = []
if os.path.exists(package_path):
logger.info(f"[API] Processing downloaded package {package_path} for metadata.")
process_result = services.process_package_file(package_path)
complies_with_profiles = process_result.get('complies_with_profiles', [])
imposed_profiles = process_result.get('imposed_profiles', [])
if process_result.get('errors'):
processing_errors.extend(process_result['errors'])
logger.warning(f"[API] Errors during post-import processing of {package_name}#{version}: {processing_errors}")
else:
logger.warning(f"[API] Package file {package_path} not found after reported successful download.")
processing_errors.append("Package file disappeared after download.")
all_packages, errors, duplicate_groups_after = list_downloaded_packages(packages_dir)
duplicates_found = []
for name, versions in duplicate_groups_after.items():
duplicates_found.append(f"{name} (Versions present: {', '.join(versions)})")
response_status = "success"
response_message = "Package imported successfully."
if result['errors'] or processing_errors:
response_status = "warning"
response_message = "Package imported, but some errors occurred during processing or dependency handling."
all_issues = result.get('errors', []) + processing_errors
logger.warning(f"[API] Import for {package_name}#{version} completed with warnings/errors: {all_issues}")
response = {
"status": response_status,
"message": response_message,
"package_name": package_name,
"version": version,
"dependency_mode": dependency_mode,
"dependencies_processed": result.get('dependencies', []),
"complies_with_profiles": complies_with_profiles,
"imposed_profiles": imposed_profiles,
"processing_issues": result.get('errors', []) + processing_errors,
"duplicate_packages_present": duplicates_found
}
return jsonify(response), 200
except Exception as e:
logger.error(f"[API] Unexpected error in api_import_ig for {package_name}#{version}: {str(e)}", exc_info=True)
return jsonify({"status": "error", "message": f"Unexpected server error during import: {str(e)}"}), 500
@app.route('/api/push-ig', methods=['POST'])
def api_push_ig():
auth_error = check_api_key()
if auth_error: return auth_error
if not request.is_json: return jsonify({"status": "error", "message": "Request must be JSON"}), 400
data = request.get_json()
package_name = data.get('package_name')
version = data.get('version')
fhir_server_url = data.get('fhir_server_url')
include_dependencies = data.get('include_dependencies', True)
auth_type = data.get('auth_type', 'none')
auth_token = data.get('auth_token')
resource_types_filter_raw = data.get('resource_types_filter')
skip_files_raw = data.get('skip_files')
dry_run = data.get('dry_run', False)
verbose = data.get('verbose', False)
    force_upload = data.get('force_upload', False)  # Optional force-upload flag
    # --- Input Validation ---
    if not all([package_name, version, fhir_server_url]): return jsonify({"status": "error", "message": "Missing required fields"}), 400
    valid_auth_types = ['apiKey', 'bearerToken', 'none']
    if auth_type not in valid_auth_types: return jsonify({"status": "error", "message": "Invalid auth_type."}), 400
    if auth_type == 'bearerToken' and not auth_token: return jsonify({"status": "error", "message": "auth_token required for bearerToken."}), 400
# Parse filters (same as before)
resource_types_filter = None
if resource_types_filter_raw:
if isinstance(resource_types_filter_raw, list): resource_types_filter = [s for s in resource_types_filter_raw if isinstance(s, str)]
elif isinstance(resource_types_filter_raw, str): resource_types_filter = [s.strip() for s in resource_types_filter_raw.split(',') if s.strip()]
else: return jsonify({"status": "error", "message": "Invalid resource_types_filter format."}), 400
skip_files = None
if skip_files_raw:
if isinstance(skip_files_raw, list): skip_files = [s.strip().replace('\\', '/') for s in skip_files_raw if isinstance(s, str) and s.strip()]
elif isinstance(skip_files_raw, str): skip_files = [s.strip().replace('\\', '/') for s in re.split(r'[,\n]', skip_files_raw) if s.strip()]
else: return jsonify({"status": "error", "message": "Invalid skip_files format."}), 400
    # --- File Path Setup ---
packages_dir = current_app.config.get('FHIR_PACKAGES_DIR')
if not packages_dir: return jsonify({"status": "error", "message": "Server config error: Package dir missing."}), 500
tgz_filename = services.construct_tgz_filename(package_name, version)
tgz_path = os.path.join(packages_dir, tgz_filename)
if not os.path.exists(tgz_path): return jsonify({"status": "error", "message": f"Package not found locally: {package_name}#{version}"}), 404
# --- Streaming Response ---
def generate_stream_wrapper():
yield from services.generate_push_stream(
package_name=package_name, version=version, fhir_server_url=fhir_server_url,
include_dependencies=include_dependencies, auth_type=auth_type,
auth_token=auth_token, resource_types_filter=resource_types_filter,
skip_files=skip_files, dry_run=dry_run, verbose=verbose,
            force_upload=force_upload,
packages_dir=packages_dir
)
return Response(generate_stream_wrapper(), mimetype='application/x-ndjson')
# Note: this endpoint is CSRF-exempt via csrf.exempt(api_push_ig) below.
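# A client sketch for consuming the NDJSON push stream (assumptions: app on
# port 5000, fallback API key, HAPI at localhost:8080):
#
#   import json, requests
#   resp = requests.post(
#       "http://localhost:5000/api/push-ig",
#       headers={"X-API-Key": "your-fallback-api-key-here"},
#       json={"package_name": "hl7.fhir.r4.core", "version": "4.0.1",
#             "fhir_server_url": "http://localhost:8080/fhir", "dry_run": True},
#       stream=True,
#   )
#   for line in resp.iter_lines():
#       if line:
#           print(json.loads(line))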
@app.route('/validate-sample', methods=['GET'])
def validate_sample():
form = ValidationForm()
packages = []
packages_dir = app.config['FHIR_PACKAGES_DIR']
if os.path.exists(packages_dir):
for filename in os.listdir(packages_dir):
if filename.endswith('.tgz'):
try:
with tarfile.open(os.path.join(packages_dir, filename), 'r:gz') as tar:
package_json = tar.extractfile('package/package.json')
if package_json:
pkg_info = json.load(package_json)
name = pkg_info.get('name')
version = pkg_info.get('version')
if name and version:
packages.append({'name': name, 'version': version})
except Exception as e:
logger.warning(f"Error reading package {filename}: {e}")
continue
return render_template(
'validate_sample.html',
form=form,
packages=packages,
validation_report=None,
site_name='FHIRFLARE IG Toolkit',
now=datetime.datetime.now(), app_mode=app.config['APP_MODE']
)
# Exempt specific API views defined directly on 'app'
csrf.exempt(api_import_ig)
csrf.exempt(api_push_ig)
# Exempt the entire API blueprint (for routes defined in services.py, such as /api/validate-sample)
csrf.exempt(services_bp)
def create_db():
logger.debug(f"Attempting to create database tables for URI: {app.config['SQLALCHEMY_DATABASE_URI']}")
try:
db.create_all()
logger.info("Database tables created successfully (if they didn't exist).")
except Exception as e:
logger.error(f"Failed to initialize database tables: {e}", exc_info=True)
raise
with app.app_context():
create_db()
class FhirRequestForm(FlaskForm):
submit = SubmitField('Send Request')
@app.route('/fhir')
def fhir_ui():
form = FhirRequestForm()
return render_template('fhir_ui.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now(), app_mode=app.config['APP_MODE'])
@app.route('/fhir-ui-operations')
def fhir_ui_operations():
form = FhirRequestForm()
return render_template('fhir_ui_operations.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now(), app_mode=app.config['APP_MODE'])
@app.route('/fhir/<path:subpath>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy_hapi(subpath):
# Clean subpath to remove r4/, fhir/, leading/trailing slashes
clean_subpath = subpath.replace('r4/', '').replace('fhir/', '').strip('/')
hapi_url = f"http://localhost:8080/fhir/{clean_subpath}" if clean_subpath else "http://localhost:8080/fhir"
headers = {k: v for k, v in request.headers.items() if k != 'Host'}
logger.debug(f"Proxying request: {request.method} {hapi_url}")
try:
response = requests.request(
method=request.method,
url=hapi_url,
headers=headers,
data=request.get_data(),
cookies=request.cookies,
allow_redirects=False,
timeout=5
)
        # No raise_for_status() here: FHIR servers legitimately return 4xx/5xx
        # bodies (e.g. an OperationOutcome), which should pass through to the
        # client rather than be masked as a 503 proxy error.
# Strip hop-by-hop headers to avoid chunked encoding issues
response_headers = {
k: v for k, v in response.headers.items()
if k.lower() not in (
'transfer-encoding', 'connection', 'content-encoding',
'content-length', 'keep-alive', 'proxy-authenticate',
'proxy-authorization', 'te', 'trailers', 'upgrade'
)
}
response_headers['Content-Length'] = str(len(response.content))
logger.debug(f"HAPI response: {response.status_code} {response.reason}")
return response.content, response.status_code, response_headers.items()
except requests.RequestException as e:
logger.error(f"HAPI proxy error for {subpath}: {str(e)}")
error_message = "HAPI FHIR server is unavailable. Please check server status."
if clean_subpath == 'metadata':
error_message = "Unable to connect to HAPI FHIR server for status check. Local validation will be used."
return jsonify({'error': error_message, 'details': str(e)}), 503
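# Mapping sketch: a request to /fhir/r4/Patient/123 or /fhir/Patient/123 is
# forwarded to http://localhost:8080/fhir/Patient/123, with hop-by-hop headers
# stripped from the proxied response.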
@app.route('/api/load-ig-to-hapi', methods=['POST'])
def load_ig_to_hapi():
data = request.get_json()
package_name = data.get('package_name')
version = data.get('version')
tgz_path = os.path.join(current_app.config['FHIR_PACKAGES_DIR'], construct_tgz_filename(package_name, version))
if not os.path.exists(tgz_path):
return jsonify({"error": "Package not found"}), 404
try:
with tarfile.open(tgz_path, "r:gz") as tar:
for member in tar.getmembers():
if member.name.endswith('.json') and member.name not in ['package/package.json', 'package/.index.json']:
resource = json.load(tar.extractfile(member))
resource_type = resource.get('resourceType')
resource_id = resource.get('id')
if resource_type and resource_id:
response = requests.put(
f"http://localhost:8080/fhir/{resource_type}/{resource_id}",
json=resource,
headers={'Content-Type': 'application/fhir+json'}
)
response.raise_for_status()
return jsonify({"status": "success", "message": f"Loaded {package_name}#{version} to HAPI"})
except Exception as e:
logger.error(f"Failed to load IG to HAPI: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/fsh-converter', methods=['GET', 'POST'])
def fsh_converter():
form = FSHConverterForm()
fsh_output = None
error = None
comparison_report = None
# --- Populate package choices ---
packages = []
    packages_dir = app.config.get('FHIR_PACKAGES_DIR', '/app/instance/fhir_packages')  # Fallback default if unset
logger.debug(f"Scanning packages directory: {packages_dir}")
if os.path.exists(packages_dir):
tgz_files = [f for f in os.listdir(packages_dir) if f.endswith('.tgz')]
logger.debug(f"Found {len(tgz_files)} .tgz files: {tgz_files}")
for filename in tgz_files:
package_file_path = os.path.join(packages_dir, filename)
try:
# Check if it's a valid tar.gz file before opening
if not tarfile.is_tarfile(package_file_path):
logger.warning(f"Skipping non-tarfile or corrupted file: {filename}")
continue
with tarfile.open(package_file_path, 'r:gz') as tar:
# Find package.json case-insensitively and handle potential path variations
                    # Locate the top-level package.json (case-insensitive),
                    # skipping nested copies such as package/examples/package.json
                    package_json_path = next(
                        (m for m in tar.getmembers()
                         if m.name.lower().endswith('package.json') and m.isfile()
                         and '/' not in m.name.replace('package/', '', 1).lower()),
                        None
                    )
if package_json_path:
package_json_stream = tar.extractfile(package_json_path)
if package_json_stream:
try:
pkg_info = json.load(package_json_stream)
name = pkg_info.get('name')
version = pkg_info.get('version')
if name and version:
package_id = f"{name}#{version}"
packages.append((package_id, package_id))
logger.debug(f"Added package: {package_id}")
else:
logger.warning(f"Missing name or version in {filename}/package.json: name={name}, version={version}")
except json.JSONDecodeError as json_e:
logger.warning(f"Error decoding package.json from {filename}: {json_e}")
except Exception as read_e:
logger.warning(f"Error reading stream from package.json in {filename}: {read_e}")
finally:
package_json_stream.close() # Ensure stream is closed
else:
logger.warning(f"Could not extract package.json stream from {filename} (path: {package_json_path.name})")
else:
logger.warning(f"No suitable package.json found in {filename}")
except tarfile.ReadError as tar_e:
logger.warning(f"Tarfile read error for {filename}: {tar_e}")
except Exception as e:
logger.warning(f"Error processing package {filename}: {str(e)}")
continue # Continue to next file
else:
logger.warning(f"Packages directory does not exist: {packages_dir}")
unique_packages = sorted(list(set(packages)), key=lambda x: x[0])
form.package.choices = [('', 'None')] + unique_packages
logger.debug(f"Set package choices: {form.package.choices}")
# --- End package choices ---
if form.validate_on_submit(): # This block handles POST requests
input_mode = form.input_mode.data
# Use request.files.get to safely access file data
fhir_file_storage = request.files.get(form.fhir_file.name)
fhir_file = fhir_file_storage if fhir_file_storage and fhir_file_storage.filename != '' else None
fhir_text = form.fhir_text.data
alias_file_storage = request.files.get(form.alias_file.name)
alias_file = alias_file_storage if alias_file_storage and alias_file_storage.filename != '' else None
output_style = form.output_style.data
log_level = form.log_level.data
fhir_version = form.fhir_version.data if form.fhir_version.data != 'auto' else None
fishing_trip = form.fishing_trip.data
dependencies = [dep.strip() for dep in form.dependencies.data.splitlines() if dep.strip()] if form.dependencies.data else None # Use splitlines()
indent_rules = form.indent_rules.data
meta_profile = form.meta_profile.data
no_alias = form.no_alias.data
logger.debug(f"Processing input: mode={input_mode}, has_file={bool(fhir_file)}, has_text={bool(fhir_text)}, has_alias={bool(alias_file)}")
# Pass the FileStorage object directly if needed by process_fhir_input
input_file, temp_dir, alias_path, input_error = services.process_fhir_input(input_mode, fhir_file, fhir_text, alias_file)
if input_error:
error = input_error
flash(error, 'error')
logger.error(f"Input processing error: {error}")
if temp_dir and os.path.exists(temp_dir):
                try:
                    shutil.rmtree(temp_dir, ignore_errors=True)
                except Exception as cleanup_e:
                    logger.warning(f"Error removing temp dir after input error {temp_dir}: {cleanup_e}")
else:
# Proceed only if input processing was successful
output_dir = os.path.join(app.config.get('UPLOAD_FOLDER', '/app/static/uploads'), 'fsh_output') # Use .get
os.makedirs(output_dir, exist_ok=True)
logger.debug(f"Running GoFSH with input: {input_file}, output_dir: {output_dir}")
# Pass form data directly to run_gofsh
fsh_output, comparison_report, gofsh_error = services.run_gofsh(
input_file, output_dir, output_style, log_level, fhir_version,
fishing_trip, dependencies, indent_rules, meta_profile, alias_path, no_alias
)
# Clean up temp dir after GoFSH run
if temp_dir and os.path.exists(temp_dir):
try:
shutil.rmtree(temp_dir, ignore_errors=True)
logger.debug(f"Successfully removed temp directory: {temp_dir}")
except Exception as cleanup_e:
logger.warning(f"Error removing temp directory {temp_dir}: {cleanup_e}")
if gofsh_error:
error = gofsh_error
flash(error, 'error')
logger.error(f"GoFSH error: {error}")
else:
# Store potentially large output carefully - session might have limits
session['fsh_output'] = fsh_output
flash('Conversion successful!', 'success')
logger.info("FSH conversion successful")
# Return response for POST (AJAX or full page)
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
logger.debug("Returning partial HTML for AJAX POST request.")
return render_template('_fsh_output.html', form=form, error=error, fsh_output=fsh_output, comparison_report=comparison_report)
else:
# For standard POST, re-render the full page with results/errors
logger.debug("Handling standard POST request, rendering full page.")
return render_template('fsh_converter.html', form=form, error=error, fsh_output=fsh_output, comparison_report=comparison_report, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now())
# --- Handle GET request (Initial Page Load or Failed POST Validation) ---
else:
if request.method == 'POST': # POST but validation failed
logger.warning("POST request failed form validation.")
# Render the full page, WTForms errors will be displayed by render_field
return render_template('fsh_converter.html', form=form, error="Form validation failed. Please check fields.", fsh_output=None, comparison_report=None, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now())
else:
# This is the initial GET request
logger.debug("Handling GET request for FSH converter page.")
            # Build the response explicitly so cache-control headers can be attached below.
response = make_response(render_template(
'fsh_converter.html',
form=form, # Pass the empty form
error=None,
fsh_output=None,
comparison_report=None,
site_name='FHIRFLARE IG Toolkit',
now=datetime.datetime.now()
))
# Add headers to prevent caching
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
            return response
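# Illustrative AJAX-style call to /fsh-converter above (not part of the app).
# With the 'X-Requested-With' header set, the route returns the
# '_fsh_output.html' fragment instead of the full page. A real call must also
# satisfy CSRF (e.g. by submitting the token rendered into the form); the
# values below are placeholders:
#
#   import requests
#   resp = requests.post(
#       'http://localhost:5000/fsh-converter',
#       data={'input_mode': 'text', 'fhir_text': '{"resourceType": "Patient"}'},
#       headers={'X-Requested-With': 'XMLHttpRequest'},
#   )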
@app.route('/download-fsh')
def download_fsh():
fsh_output = session.get('fsh_output')
if not fsh_output:
flash('No FSH output available for download.', 'error')
return redirect(url_for('fsh_converter'))
    # Write to a per-request temp file so concurrent downloads do not clobber a
    # shared 'output.fsh' in the upload folder.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.fsh', encoding='utf-8',
                                     dir=app.config['UPLOAD_FOLDER'], delete=False) as f:
        f.write(fsh_output)
        temp_file = f.name
    return send_file(temp_file, as_attachment=True, download_name='output.fsh')
@app.route('/upload-test-data', methods=['GET'])
def upload_test_data():
"""Renders the page for uploading test data."""
form = TestDataUploadForm()
try:
processed_igs = ProcessedIg.query.order_by(ProcessedIg.package_name, ProcessedIg.version).all()
        form.validation_package_id.choices = [('', '-- Select Package for Validation --')] + [
            (f"{ig.package_name}#{ig.version}", f"{ig.package_name}#{ig.version}") for ig in processed_igs
        ]
except Exception as e:
logger.error(f"Error fetching processed IGs: {e}")
flash("Could not load processed packages for validation.", "warning")
form.validation_package_id.choices = [('', '-- Error Loading Packages --')]
api_key = current_app.config.get('API_KEY', '')
return render_template('upload_test_data.html', title="Upload Test Data", form=form, api_key=api_key)
@app.route('/api/upload-test-data', methods=['POST'])
@csrf.exempt
def api_upload_test_data():
"""API endpoint to handle test data upload and processing, using custom parser."""
    auth_error = check_api_key()
    if auth_error:
        return auth_error
temp_dir = None # Initialize temp_dir to ensure cleanup happens
try:
# --- Use Custom Form Parser ---
# Instantiate the custom parser with the desired limit
parser = CustomFormDataParser()
#parser = CustomFormDataParser(max_form_parts=2000) # Match the class definition or set higher if needed
# Parse the request using the custom parser
# We need the stream, mimetype, content_length, and options from the request
# Note: Accessing request.stream consumes it, do this first.
stream = request.stream
mimetype = request.mimetype
content_length = request.content_length
options = request.mimetype_params
# The parse method returns (stream, form_dict, files_dict)
# stream: A wrapper around the original stream
# form_dict: A MultiDict containing non-file form fields
# files_dict: A MultiDict containing FileStorage objects for uploaded files
_, form_data, files_data = parser.parse(stream, mimetype, content_length, options)
logger.debug(f"Form parsed using CustomFormDataParser. Form fields: {len(form_data)}, Files: {len(files_data)}")
# --- END Custom Form Parser Usage ---
# --- Extract Form Data (using parsed data) ---
fhir_server_url = form_data.get('fhir_server_url')
auth_type = form_data.get('auth_type', 'none')
auth_token = form_data.get('auth_token')
upload_mode = form_data.get('upload_mode', 'individual')
error_handling = form_data.get('error_handling', 'stop')
validate_before_upload_str = form_data.get('validate_before_upload', 'false')
validate_before_upload = validate_before_upload_str.lower() == 'true'
validation_package_id = form_data.get('validation_package_id') if validate_before_upload else None
use_conditional_uploads_str = form_data.get('use_conditional_uploads', 'false')
use_conditional_uploads = use_conditional_uploads_str.lower() == 'true'
logger.debug(f"API Upload Request Params: validate={validate_before_upload}, pkg_id={validation_package_id}, conditional={use_conditional_uploads}")
# --- Basic Validation (using parsed data) ---
if not fhir_server_url or not fhir_server_url.startswith(('http://', 'https://')): return jsonify({"status": "error", "message": "Invalid Target FHIR Server URL."}), 400
if auth_type not in ['none', 'bearerToken']: return jsonify({"status": "error", "message": "Invalid Authentication Type."}), 400
if auth_type == 'bearerToken' and not auth_token: return jsonify({"status": "error", "message": "Bearer Token required."}), 400
if upload_mode not in ['individual', 'transaction']: return jsonify({"status": "error", "message": "Invalid Upload Mode."}), 400
if error_handling not in ['stop', 'continue']: return jsonify({"status": "error", "message": "Invalid Error Handling mode."}), 400
if validate_before_upload and not validation_package_id: return jsonify({"status": "error", "message": "Validation Package ID required."}), 400
# --- Handle File Uploads (using parsed data) ---
# Use files_data obtained from the custom parser
uploaded_files = files_data.getlist('test_data_files')
if not uploaded_files or all(f.filename == '' for f in uploaded_files): return jsonify({"status": "error", "message": "No files selected."}), 400
temp_dir = tempfile.mkdtemp(prefix='fhirflare_upload_')
saved_file_paths = []
allowed_extensions = {'.json', '.xml', '.zip'}
try:
for file_storage in uploaded_files: # Iterate through FileStorage objects
if file_storage and file_storage.filename:
filename = secure_filename(file_storage.filename)
file_ext = os.path.splitext(filename)[1].lower()
if file_ext not in allowed_extensions: raise ValueError(f"Invalid file type: '{filename}'. Only JSON, XML, ZIP allowed.")
save_path = os.path.join(temp_dir, filename)
file_storage.save(save_path) # Use the save method of FileStorage
saved_file_paths.append(save_path)
if not saved_file_paths: raise ValueError("No valid files saved.")
logger.debug(f"Saved {len(saved_file_paths)} files to {temp_dir}")
        except ValueError as ve:
            if temp_dir and os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
            logger.warning(f"Upload rejected: {ve}")
            return jsonify({"status": "error", "message": str(ve)}), 400
        except Exception as file_err:
            if temp_dir and os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
            logger.error(f"Error saving uploaded files: {file_err}", exc_info=True)
            return jsonify({"status": "error", "message": "Error saving uploaded files."}), 500
# --- Prepare Server Info and Options ---
server_info = {'url': fhir_server_url, 'auth_type': auth_type, 'auth_token': auth_token}
options = {
'upload_mode': upload_mode,
'error_handling': error_handling,
'validate_before_upload': validate_before_upload,
'validation_package_id': validation_package_id,
'use_conditional_uploads': use_conditional_uploads
}
# --- Call Service Function (Streaming Response) ---
def generate_stream_wrapper():
try:
with app.app_context():
yield from services.process_and_upload_test_data(server_info, options, temp_dir)
finally:
try: logger.debug(f"Cleaning up temp dir: {temp_dir}"); shutil.rmtree(temp_dir)
except Exception as cleanup_e: logger.error(f"Error cleaning up temp dir {temp_dir}: {cleanup_e}")
return Response(generate_stream_wrapper(), mimetype='application/x-ndjson')
except RequestEntityTooLarge as e:
# Catch the specific exception if the custom parser still fails (e.g., limit too low)
logger.error(f"RequestEntityTooLarge error in /api/upload-test-data despite custom parser: {e}", exc_info=True)
if temp_dir and os.path.exists(temp_dir):
            try:
                shutil.rmtree(temp_dir)
            except Exception as cleanup_e:
                logger.error(f"Error cleaning up temp dir during exception: {cleanup_e}")
return jsonify({"status": "error", "message": f"Upload failed: Request entity too large. Try increasing parser limit or reducing files/size. ({str(e)})"}), 413
except Exception as e:
# Catch other potential errors during parsing or setup
logger.error(f"Error in /api/upload-test-data: {e}", exc_info=True)
if temp_dir and os.path.exists(temp_dir):
            try:
                shutil.rmtree(temp_dir)
            except Exception as cleanup_e:
                logger.error(f"Error cleaning up temp dir during exception: {cleanup_e}")
return jsonify({"status": "error", "message": f"Unexpected server error: {str(e)}"}), 500
@app.route('/favicon.ico')
def favicon():
return send_file(os.path.join(app.static_folder, 'favicon.ico'), mimetype='image/x-icon')
if __name__ == '__main__':
with app.app_context():
logger.debug(f"Instance path configuration: {app.instance_path}")
logger.debug(f"Database URI: {app.config['SQLALCHEMY_DATABASE_URI']}")
logger.debug(f"Packages path: {app.config['FHIR_PACKAGES_DIR']}")
logger.debug(f"Flask instance folder path: {app.instance_path}")
logger.debug(f"Directories created/verified: Instance: {app.instance_path}, Packages: {app.config['FHIR_PACKAGES_DIR']}")
logger.debug(f"Attempting to create database tables for URI: {app.config['SQLALCHEMY_DATABASE_URI']}")
db.create_all()
logger.info("Database tables created successfully (if they didn't exist).")
app.run(host='0.0.0.0', port=5000, debug=False)