diff --git a/Dockerfile b/Dockerfile index fb23bf8..f87b276 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,6 +10,8 @@ RUN pip install --no-cache-dir -r requirements.txt COPY app.py . COPY services.py . +COPY forms.py . +COPY routes.py . COPY templates/ templates/ COPY static/ static/ COPY tests/ tests/ diff --git a/app.py b/app.py index 7e559f4..16afcdf 100644 --- a/app.py +++ b/app.py @@ -1,9 +1,11 @@ +import sys +import os +sys.path.append(os.path.abspath(os.path.dirname(__file__))) + from flask import Flask, render_template, render_template_string, request, redirect, url_for, flash, jsonify, Response from flask_sqlalchemy import SQLAlchemy from flask_wtf import FlaskForm -from wtforms import StringField, SubmitField, SelectField -from wtforms.validators import DataRequired, Regexp -import os +from flask_wtf.csrf import CSRFProtect import tarfile import json from datetime import datetime @@ -11,6 +13,7 @@ import services import logging import requests import re +from forms import IgImportForm, ValidationForm # Set up logging logging.basicConfig(level=logging.DEBUG) @@ -20,14 +23,14 @@ app = Flask(__name__) app.config['SECRET_KEY'] = 'your-secret-key-here' app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////app/instance/fhir_ig.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False -app.config['FHIR_PACKAGES_DIR'] = os.path.join(app.instance_path, 'fhir_packages') +app.config['FHIR_PACKAGES_DIR'] = '/app/instance/fhir_packages' app.config['API_KEY'] = 'your-api-key-here' -app.config['VALIDATE_IMPOSED_PROFILES'] = True # Enable/disable imposed profile validation -app.config['DISPLAY_PROFILE_RELATIONSHIPS'] = True # Enable/disable UI display of relationships +app.config['VALIDATE_IMPOSED_PROFILES'] = True +app.config['DISPLAY_PROFILE_RELATIONSHIPS'] = True # Ensure directories exist and are writable instance_path = '/app/instance' -db_path = os.path.join(instance_path, 'fhir_ig.db') +db_path = '/app/instance/fhir_ig.db' packages_path = app.config['FHIR_PACKAGES_DIR'] logger.debug(f"Instance path: {instance_path}") @@ -37,9 +40,9 @@ logger.debug(f"Packages path: {packages_path}") try: os.makedirs(instance_path, exist_ok=True) os.makedirs(packages_path, exist_ok=True) - os.chmod(instance_path, 0o777) - os.chmod(packages_path, 0o777) - logger.debug(f"Directories created: {os.listdir('/app')}") + os.chmod(instance_path, 0o755) + os.chmod(packages_path, 0o755) + logger.debug(f"Directories created: {os.listdir(os.path.dirname(__file__))}") logger.debug(f"Instance contents: {os.listdir(instance_path)}") except Exception as e: logger.error(f"Failed to create directories: {e}") @@ -47,22 +50,7 @@ except Exception as e: logger.warning("Falling back to /tmp/fhir_ig.db") db = SQLAlchemy(app) - -class IgImportForm(FlaskForm): - package_name = StringField('Package Name (e.g., hl7.fhir.us.core)', validators=[ - DataRequired(), - Regexp(r'^[a-zAZ0-9]+(\.[a-zA-Z0-9]+)+$', message='Invalid package name format.') - ]) - package_version = StringField('Package Version (e.g., 1.0.0 or current)', validators=[ - DataRequired(), - Regexp(r'^[a-zA-Z0-9\.\-]+$', message='Invalid version format.') - ]) - dependency_mode = SelectField('Dependency Pulling Mode', choices=[ - ('recursive', 'Current Recursive'), - ('patch-canonical', 'Patch Canonical Versions'), - ('tree-shaking', 'Tree Shaking (Only Used Dependencies)') - ], default='recursive') - submit = SubmitField('Fetch & Download IG') +csrf = CSRFProtect(app) class ProcessedIg(db.Model): id = db.Column(db.Integer, primary_key=True) @@ -107,10 +95,11 @@ def 
import_ig(): return redirect(url_for('view_igs')) except Exception as e: flash(f"Error downloading IG: {str(e)}", "error") - return render_template('import_ig.html', form=form, site_name='FLARE FHIR IG Toolkit', now=datetime.now()) + return render_template('import_ig.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.now()) @app.route('/view-igs') def view_igs(): + form = FlaskForm() igs = ProcessedIg.query.all() processed_ids = {(ig.package_name, ig.version) for ig in igs} @@ -161,14 +150,15 @@ def view_igs(): for i, name in enumerate(duplicate_groups.keys()): group_colors[name] = colors[i % len(colors)] - return render_template('cp_downloaded_igs.html', packages=packages, processed_list=igs, + return render_template('cp_downloaded_igs.html', form=form, packages=packages, processed_list=igs, processed_ids=processed_ids, duplicate_names=duplicate_names, duplicate_groups=duplicate_groups, group_colors=group_colors, - site_name='FLARE FHIR IG Toolkit', now=datetime.now(), + site_name='FHIRFLARE IG Toolkit', now=datetime.now(), config=app.config) @app.route('/push-igs', methods=['GET', 'POST']) def push_igs(): + form = FlaskForm() igs = ProcessedIg.query.all() processed_ids = {(ig.package_name, ig.version) for ig in igs} @@ -219,90 +209,102 @@ def push_igs(): for i, name in enumerate(duplicate_groups.keys()): group_colors[name] = colors[i % len(colors)] - return render_template('cp_push_igs.html', packages=packages, processed_list=igs, + return render_template('cp_push_igs.html', form=form, packages=packages, processed_list=igs, processed_ids=processed_ids, duplicate_names=duplicate_names, duplicate_groups=duplicate_groups, group_colors=group_colors, - site_name='FLARE FHIR IG Toolkit', now=datetime.now(), + site_name='FHIRFLARE IG Toolkit', now=datetime.now(), api_key=app.config['API_KEY'], config=app.config) @app.route('/process-igs', methods=['POST']) def process_ig(): - filename = request.form.get('filename') - if not filename or not filename.endswith('.tgz'): - flash("Invalid package file.", "error") - return redirect(url_for('view_igs')) - - tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename) - if not os.path.exists(tgz_path): - flash(f"Package file not found: {filename}", "error") - return redirect(url_for('view_igs')) - - try: - last_hyphen_index = filename.rfind('-') - if last_hyphen_index != -1 and filename.endswith('.tgz'): - name = filename[:last_hyphen_index] - version = filename[last_hyphen_index + 1:-4] - name = name.replace('_', '.') - else: - name = filename[:-4] - version = '' - logger.warning(f"Could not parse version from {filename} during processing") - package_info = services.process_package_file(tgz_path) - processed_ig = ProcessedIg( - package_name=name, - version=version, - processed_date=datetime.now(), - resource_types_info=package_info['resource_types_info'], - must_support_elements=package_info.get('must_support_elements'), - examples=package_info.get('examples') - ) - db.session.add(processed_ig) - db.session.commit() - flash(f"Successfully processed {name}#{version}!", "success") - except Exception as e: - flash(f"Error processing IG: {str(e)}", "error") + form = FlaskForm() + if form.validate_on_submit(): + filename = request.form.get('filename') + if not filename or not filename.endswith('.tgz'): + flash("Invalid package file.", "error") + return redirect(url_for('view_igs')) + + tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename) + if not os.path.exists(tgz_path): + flash(f"Package file not found: {filename}", 
"error") + return redirect(url_for('view_igs')) + + try: + last_hyphen_index = filename.rfind('-') + if last_hyphen_index != -1 and filename.endswith('.tgz'): + name = filename[:last_hyphen_index] + version = filename[last_hyphen_index + 1:-4] + name = name.replace('_', '.') + else: + name = filename[:-4] + version = '' + logger.warning(f"Could not parse version from {filename} during processing") + package_info = services.process_package_file(tgz_path) + processed_ig = ProcessedIg( + package_name=name, + version=version, + processed_date=datetime.now(), + resource_types_info=package_info['resource_types_info'], + must_support_elements=package_info.get('must_support_elements'), + examples=package_info.get('examples') + ) + db.session.add(processed_ig) + db.session.commit() + flash(f"Successfully processed {name}#{version}!", "success") + except Exception as e: + flash(f"Error processing IG: {str(e)}", "error") + else: + flash("CSRF token missing or invalid.", "error") return redirect(url_for('view_igs')) @app.route('/delete-ig', methods=['POST']) def delete_ig(): - filename = request.form.get('filename') - if not filename or not filename.endswith('.tgz'): - flash("Invalid package file.", "error") - return redirect(url_for('view_igs')) - - tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename) - metadata_path = tgz_path.replace('.tgz', '.metadata.json') - if os.path.exists(tgz_path): - try: - os.remove(tgz_path) - if os.path.exists(metadata_path): - os.remove(metadata_path) - logger.debug(f"Deleted metadata file: {metadata_path}") - flash(f"Deleted {filename}", "success") - except Exception as e: - flash(f"Error deleting {filename}: {str(e)}", "error") + form = FlaskForm() + if form.validate_on_submit(): + filename = request.form.get('filename') + if not filename or not filename.endswith('.tgz'): + flash("Invalid package file.", "error") + return redirect(url_for('view_igs')) + + tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename) + metadata_path = tgz_path.replace('.tgz', '.metadata.json') + if os.path.exists(tgz_path): + try: + os.remove(tgz_path) + if os.path.exists(metadata_path): + os.remove(metadata_path) + logger.debug(f"Deleted metadata file: {metadata_path}") + flash(f"Deleted {filename}", "success") + except Exception as e: + flash(f"Error deleting {filename}: {str(e)}", "error") + else: + flash(f"File not found: {filename}", "error") else: - flash(f"File not found: {filename}", "error") + flash("CSRF token missing or invalid.", "error") return redirect(url_for('view_igs')) @app.route('/unload-ig', methods=['POST']) def unload_ig(): - ig_id = request.form.get('ig_id') - if not ig_id: - flash("Invalid package ID.", "error") - return redirect(url_for('view_igs')) - - processed_ig = db.session.get(ProcessedIg, ig_id) - if processed_ig: - try: - db.session.delete(processed_ig) - db.session.commit() - flash(f"Unloaded {processed_ig.package_name}#{processed_ig.version}", "success") - except Exception as e: - flash(f"Error unloading package: {str(e)}", "error") + form = FlaskForm() + if form.validate_on_submit(): + ig_id = request.form.get('ig_id') + if not ig_id: + flash("Invalid package ID.", "error") + return redirect(url_for('view_igs')) + + processed_ig = db.session.get(ProcessedIg, ig_id) + if processed_ig: + try: + db.session.delete(processed_ig) + db.session.commit() + flash(f"Unloaded {processed_ig.package_name}#{processed_ig.version}", "success") + except Exception as e: + flash(f"Error unloading package: {str(e)}", "error") + else: + flash(f"Package 
not found with ID: {ig_id}", "error") else: - flash(f"Package not found with ID: {ig_id}", "error") + flash("CSRF token missing or invalid.", "error") return redirect(url_for('view_igs')) @app.route('/view-ig/') @@ -327,7 +329,7 @@ def view_ig(processed_ig_id): return render_template('cp_view_processed_ig.html', title=f"View {processed_ig.package_name}#{processed_ig.version}", processed_ig=processed_ig, profile_list=profile_list, base_list=base_list, - examples_by_type=examples_by_type, site_name='FLARE FHIR IG Toolkit', now=datetime.now(), + examples_by_type=examples_by_type, site_name='FHIRFLARE IG Toolkit', now=datetime.now(), complies_with_profiles=complies_with_profiles, imposed_profiles=imposed_profiles, config=app.config) @@ -418,7 +420,6 @@ def get_package_metadata(): return jsonify({'dependency_mode': metadata['dependency_mode']}) return jsonify({'error': 'Metadata not found'}), 404 -# API Endpoint: Import IG Package @app.route('/api/import-ig', methods=['POST']) def api_import_ig(): auth_error = check_api_key() @@ -522,7 +523,6 @@ def api_import_ig(): logger.error(f"Error in api_import_ig: {str(e)}") return jsonify({"status": "error", "message": f"Error importing package: {str(e)}"}), 500 -# API Endpoint: Push IG to FHIR Server with Streaming @app.route('/api/push-ig', methods=['POST']) def api_push_ig(): auth_error = check_api_key() @@ -597,7 +597,7 @@ def api_push_ig(): continue # Validate against the profile and imposed profiles - validation_result = services.validate_resource_against_profile(resource, pkg_name, pkg_version, resource_type) + validation_result = services.validate_resource_against_profile(pkg_name, pkg_version, resource, include_dependencies=False) if not validation_result['valid']: yield json.dumps({"type": "error", "message": f"Validation failed for {resource_type}/{resource_id} in {pkg_name}#{pkg_version}: {', '.join(validation_result['errors'])}"}) + "\n" failure_count += 1 @@ -643,10 +643,71 @@ def api_push_ig(): return Response(generate_stream(), mimetype='application/x-ndjson') +@app.route('/validate-sample', methods=['GET', 'POST']) +def validate_sample(): + form = ValidationForm() + validation_report = None + packages = [] + + # Load available packages + packages_dir = app.config['FHIR_PACKAGES_DIR'] + if os.path.exists(packages_dir): + for filename in os.listdir(packages_dir): + if filename.endswith('.tgz'): + last_hyphen_index = filename.rfind('-') + if last_hyphen_index != -1 and filename.endswith('.tgz'): + name = filename[:last_hyphen_index] + version = filename[last_hyphen_index + 1:-4] + name = name.replace('_', '.') + try: + with tarfile.open(os.path.join(packages_dir, filename), 'r:gz') as tar: + package_json = tar.extractfile('package/package.json') + pkg_info = json.load(package_json) + packages.append({'name': pkg_info['name'], 'version': pkg_info['version']}) + except Exception as e: + logger.warning(f"Error reading package {filename}: {e}") + continue + + if form.validate_on_submit(): + package_name = form.package_name.data + version = form.version.data + include_dependencies = form.include_dependencies.data + mode = form.mode.data + try: + sample_input = json.loads(form.sample_input.data) + if mode == 'single': + validation_report = services.validate_resource_against_profile( + package_name, version, sample_input, include_dependencies + ) + else: # mode == 'bundle' + validation_report = services.validate_bundle_against_profile( + package_name, version, sample_input, include_dependencies + ) + flash("Validation completed.", 'success') + 
except json.JSONDecodeError: + flash("Invalid JSON format in sample input.", 'error') + except Exception as e: + logger.error(f"Error validating sample: {e}") + flash(f"Error validating sample: {str(e)}", 'error') + validation_report = {'valid': False, 'errors': [str(e)], 'warnings': [], 'results': {}} + + return render_template( + 'validate_sample.html', + form=form, + packages=packages, + validation_report=validation_report, + site_name='FHIRFLARE IG Toolkit', + now=datetime.now() + ) + with app.app_context(): logger.debug(f"Creating database at: {app.config['SQLALCHEMY_DATABASE_URI']}") - db.create_all() - logger.debug("Database initialization complete") + try: + db.create_all() + logger.debug("Database initialization complete") + except Exception as e: + logger.error(f"Failed to initialize database: {e}") + raise if __name__ == '__main__': app.run(debug=True) \ No newline at end of file diff --git a/forms.py b/forms.py index b6a97d9..38a9946 100644 --- a/forms.py +++ b/forms.py @@ -1,19 +1,44 @@ -# app/modules/fhir_ig_importer/forms.py - from flask_wtf import FlaskForm -from wtforms import StringField, SubmitField -from wtforms.validators import DataRequired, Regexp +from wtforms import StringField, SelectField, TextAreaField, BooleanField, SubmitField +from wtforms.validators import DataRequired, Regexp, Optional class IgImportForm(FlaskForm): - """Form for specifying an IG package to import.""" - # Basic validation for FHIR package names (e.g., hl7.fhir.r4.core) - package_name = StringField('Package Name (e.g., hl7.fhir.au.base)', validators=[ + package_name = StringField('Package Name', validators=[ DataRequired(), - Regexp(r'^[a-zA-Z0-9]+(\.[a-zA-Z0-9]+)+$', message='Invalid package name format.') + Regexp(r'^[a-zA-Z0-9][a-zA-Z0-9\-\.]*[a-zA-Z0-9]$', message="Invalid package name format.") ]) - # Basic validation for version (e.g., 4.1.0, current) - package_version = StringField('Package Version (e.g., 4.1.0 or current)', validators=[ + package_version = StringField('Package Version', validators=[ DataRequired(), - Regexp(r'^[a-zA-Z0-9\.\-]+$', message='Invalid version format.') + Regexp(r'^[a-zA-Z0-9\.\-]+$', message="Invalid version format. 
Use alphanumeric characters, dots, or hyphens (e.g., 1.2.3, 1.1.0-preview, current).") ]) - submit = SubmitField('Fetch & Download IG') \ No newline at end of file + dependency_mode = SelectField('Dependency Mode', choices=[ + ('recursive', 'Current Recursive'), + ('patch-canonical', 'Patch Canonical Versions'), + ('tree-shaking', 'Tree Shaking (Only Used Dependencies)') + ], default='recursive') + submit = SubmitField('Import') + +class ValidationForm(FlaskForm): + package_name = StringField('Package Name', validators=[DataRequired()]) + version = StringField('Package Version', validators=[DataRequired()]) + include_dependencies = BooleanField('Include Dependencies', default=True) + mode = SelectField('Validation Mode', choices=[ + ('single', 'Single Resource'), + ('bundle', 'Bundle') + ], default='single') + sample_input = TextAreaField('Sample Input', validators=[DataRequired()]) + submit = SubmitField('Validate') + +def validate_json(field, mode): + """Custom validator to ensure input is valid JSON and matches the selected mode.""" + import json + try: + data = json.loads(field) + if mode == 'single' and not isinstance(data, dict): + raise ValueError("Single resource mode requires a JSON object.") + if mode == 'bundle' and (not isinstance(data, dict) or data.get('resourceType') != 'Bundle'): + raise ValueError("Bundle mode requires a JSON object with resourceType 'Bundle'.") + except json.JSONDecodeError: + raise ValueError("Invalid JSON format.") + except ValueError as e: + raise ValueError(str(e)) \ No newline at end of file diff --git a/instance/fhir_ig.db b/instance/fhir_ig.db index 5e97b78..c5fa7b8 100644 Binary files a/instance/fhir_ig.db and b/instance/fhir_ig.db differ diff --git a/instance/fhir_packages/hl7.fhir.au.base-5.1.0-preview.metadata.json b/instance/fhir_packages/hl7.fhir.au.base-5.1.0-preview.metadata.json new file mode 100644 index 0000000..54b0001 --- /dev/null +++ b/instance/fhir_packages/hl7.fhir.au.base-5.1.0-preview.metadata.json @@ -0,0 +1,21 @@ +{ + "package_name": "hl7.fhir.au.base", + "version": "5.1.0-preview", + "dependency_mode": "recursive", + "imported_dependencies": [ + { + "name": "hl7.fhir.r4.core", + "version": "4.0.1" + }, + { + "name": "hl7.terminology.r4", + "version": "6.2.0" + }, + { + "name": "hl7.fhir.uv.extensions.r4", + "version": "5.2.0" + } + ], + "complies_with_profiles": [], + "imposed_profiles": [] +} \ No newline at end of file diff --git a/instance/fhir_packages/hl7.fhir.au.base-5.1.0-preview.tgz b/instance/fhir_packages/hl7.fhir.au.base-5.1.0-preview.tgz new file mode 100644 index 0000000..12b9c12 Binary files /dev/null and b/instance/fhir_packages/hl7.fhir.au.base-5.1.0-preview.tgz differ diff --git a/instance/fhir_packages/hl7.fhir.au.core-1.1.0-preview.metadata.json b/instance/fhir_packages/hl7.fhir.au.core-1.1.0-preview.metadata.json index de8c7f3..0b89e9e 100644 --- a/instance/fhir_packages/hl7.fhir.au.core-1.1.0-preview.metadata.json +++ b/instance/fhir_packages/hl7.fhir.au.core-1.1.0-preview.metadata.json @@ -1,7 +1,7 @@ { "package_name": "hl7.fhir.au.core", "version": "1.1.0-preview", - "dependency_mode": "tree-shaking", + "dependency_mode": "recursive", "imported_dependencies": [ { "name": "hl7.fhir.r4.core", diff --git a/instance/fhir_packages/hl7.fhir.r4.core-4.0.1.metadata.json b/instance/fhir_packages/hl7.fhir.r4.core-4.0.1.metadata.json new file mode 100644 index 0000000..d1f669e --- /dev/null +++ b/instance/fhir_packages/hl7.fhir.r4.core-4.0.1.metadata.json @@ -0,0 +1,8 @@ +{ + "package_name": "hl7.fhir.r4.core", + 
"version": "4.0.1", + "dependency_mode": "recursive", + "imported_dependencies": [], + "complies_with_profiles": [], + "imposed_profiles": [] +} \ No newline at end of file diff --git a/instance/fhir_packages/hl7.fhir.uv.extensions.r4-5.2.0.metadata.json b/instance/fhir_packages/hl7.fhir.uv.extensions.r4-5.2.0.metadata.json new file mode 100644 index 0000000..3544d6d --- /dev/null +++ b/instance/fhir_packages/hl7.fhir.uv.extensions.r4-5.2.0.metadata.json @@ -0,0 +1,13 @@ +{ + "package_name": "hl7.fhir.uv.extensions.r4", + "version": "5.2.0", + "dependency_mode": "recursive", + "imported_dependencies": [ + { + "name": "hl7.fhir.r4.core", + "version": "4.0.1" + } + ], + "complies_with_profiles": [], + "imposed_profiles": [] +} \ No newline at end of file diff --git a/instance/fhir_packages/hl7.fhir.uv.ipa-1.0.0.metadata.json b/instance/fhir_packages/hl7.fhir.uv.ipa-1.0.0.metadata.json new file mode 100644 index 0000000..cff87e4 --- /dev/null +++ b/instance/fhir_packages/hl7.fhir.uv.ipa-1.0.0.metadata.json @@ -0,0 +1,21 @@ +{ + "package_name": "hl7.fhir.uv.ipa", + "version": "1.0.0", + "dependency_mode": "recursive", + "imported_dependencies": [ + { + "name": "hl7.fhir.r4.core", + "version": "4.0.1" + }, + { + "name": "hl7.terminology.r4", + "version": "5.0.0" + }, + { + "name": "hl7.fhir.uv.smart-app-launch", + "version": "2.0.0" + } + ], + "complies_with_profiles": [], + "imposed_profiles": [] +} \ No newline at end of file diff --git a/instance/fhir_packages/hl7.fhir.uv.ipa-1.0.0.tgz b/instance/fhir_packages/hl7.fhir.uv.ipa-1.0.0.tgz new file mode 100644 index 0000000..79f0584 Binary files /dev/null and b/instance/fhir_packages/hl7.fhir.uv.ipa-1.0.0.tgz differ diff --git a/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.0.0.metadata.json b/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.0.0.metadata.json new file mode 100644 index 0000000..e2ffb4e --- /dev/null +++ b/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.0.0.metadata.json @@ -0,0 +1,13 @@ +{ + "package_name": "hl7.fhir.uv.smart-app-launch", + "version": "2.0.0", + "dependency_mode": "recursive", + "imported_dependencies": [ + { + "name": "hl7.fhir.r4.core", + "version": "4.0.1" + } + ], + "complies_with_profiles": [], + "imposed_profiles": [] +} \ No newline at end of file diff --git a/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.0.0.tgz b/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.0.0.tgz new file mode 100644 index 0000000..767a6df Binary files /dev/null and b/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.0.0.tgz differ diff --git a/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.1.0.metadata.json b/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.1.0.metadata.json new file mode 100644 index 0000000..d5b71d3 --- /dev/null +++ b/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.1.0.metadata.json @@ -0,0 +1,17 @@ +{ + "package_name": "hl7.fhir.uv.smart-app-launch", + "version": "2.1.0", + "dependency_mode": "recursive", + "imported_dependencies": [ + { + "name": "hl7.fhir.r4.core", + "version": "4.0.1" + }, + { + "name": "hl7.terminology.r4", + "version": "5.0.0" + } + ], + "complies_with_profiles": [], + "imposed_profiles": [] +} \ No newline at end of file diff --git a/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.1.0.tgz b/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.1.0.tgz new file mode 100644 index 0000000..326c413 Binary files /dev/null and b/instance/fhir_packages/hl7.fhir.uv.smart-app-launch-2.1.0.tgz differ diff --git 
a/instance/fhir_packages/hl7.terminology.r4-5.0.0.metadata.json b/instance/fhir_packages/hl7.terminology.r4-5.0.0.metadata.json new file mode 100644 index 0000000..4d7bc33 --- /dev/null +++ b/instance/fhir_packages/hl7.terminology.r4-5.0.0.metadata.json @@ -0,0 +1,13 @@ +{ + "package_name": "hl7.terminology.r4", + "version": "5.0.0", + "dependency_mode": "recursive", + "imported_dependencies": [ + { + "name": "hl7.fhir.r4.core", + "version": "4.0.1" + } + ], + "complies_with_profiles": [], + "imposed_profiles": [] +} \ No newline at end of file diff --git a/instance/fhir_packages/hl7.terminology.r4-5.0.0.tgz b/instance/fhir_packages/hl7.terminology.r4-5.0.0.tgz new file mode 100644 index 0000000..91a9345 Binary files /dev/null and b/instance/fhir_packages/hl7.terminology.r4-5.0.0.tgz differ diff --git a/instance/fhir_packages/hl7.terminology.r4-6.2.0.metadata.json b/instance/fhir_packages/hl7.terminology.r4-6.2.0.metadata.json new file mode 100644 index 0000000..fd28a4e --- /dev/null +++ b/instance/fhir_packages/hl7.terminology.r4-6.2.0.metadata.json @@ -0,0 +1,13 @@ +{ + "package_name": "hl7.terminology.r4", + "version": "6.2.0", + "dependency_mode": "recursive", + "imported_dependencies": [ + { + "name": "hl7.fhir.r4.core", + "version": "4.0.1" + } + ], + "complies_with_profiles": [], + "imposed_profiles": [] +} \ No newline at end of file diff --git a/instance/fhir_packages/hl7.terminology.r4-6.2.0.tgz b/instance/fhir_packages/hl7.terminology.r4-6.2.0.tgz new file mode 100644 index 0000000..5f0aef5 Binary files /dev/null and b/instance/fhir_packages/hl7.terminology.r4-6.2.0.tgz differ diff --git a/services.py b/services.py index 6d7431e..60bb79f 100644 --- a/services.py +++ b/services.py @@ -1,15 +1,18 @@ -# app/modules/fhir_ig_importer/services.py +# app/services.py import requests import os import tarfile -import gzip import json -import io import re import logging from flask import current_app from collections import defaultdict +from pathlib import Path + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) # Constants FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org" @@ -20,14 +23,16 @@ CANONICAL_PACKAGE = ("hl7.fhir.r4.core", "4.0.1") # Define the canonical FHIR p def _get_download_dir(): """Gets the absolute path to the download directory, creating it if needed.""" - logger = logging.getLogger(__name__) - instance_path = None # Initialize + instance_path = None try: + # Try to get instance_path from Flask app context if available instance_path = current_app.instance_path logger.debug(f"Using instance path from current_app: {instance_path}") except RuntimeError: + # Fallback if no app context (e.g., running script directly) logger.warning("No app context for instance_path, constructing relative path.") - instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'instance')) + # Assume services.py is in /app, instance folder sibling to /app + instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'instance')) logger.debug(f"Constructed instance path: {instance_path}") if not instance_path: @@ -37,10 +42,24 @@ def _get_download_dir(): download_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME) try: os.makedirs(download_dir, exist_ok=True) + # Add check for flask config path + if 'FHIR_PACKAGES_DIR' not in current_app.config: + current_app.config['FHIR_PACKAGES_DIR'] = download_dir + logger.info(f"Set current_app.config['FHIR_PACKAGES_DIR'] to {download_dir}") return 
download_dir except OSError as e: logger.error(f"Fatal Error creating dir {download_dir}: {e}", exc_info=True) return None + except RuntimeError: # Catch if current_app doesn't exist here either + logger.warning("No app context available to set FHIR_PACKAGES_DIR config.") + # Still attempt to create and return the path for non-Flask use cases + try: + os.makedirs(download_dir, exist_ok=True) + return download_dir + except OSError as e: + logger.error(f"Fatal Error creating dir {download_dir}: {e}", exc_info=True) + return None + def sanitize_filename_part(text): """Basic sanitization for name/version parts of filename.""" @@ -57,7 +76,6 @@ def find_and_extract_sd(tgz_path, resource_identifier): """Helper to find and extract SD json from a given tgz path by ID, Name, or Type.""" sd_data = None found_path = None - logger = logging.getLogger(__name__) if not tgz_path or not os.path.exists(tgz_path): logger.error(f"File not found in find_and_extract_sd: {tgz_path}") return None, None @@ -75,18 +93,29 @@ def find_and_extract_sd(tgz_path, resource_identifier): fileobj = tar.extractfile(member) if fileobj: content_bytes = fileobj.read() + # Handle potential BOM (Byte Order Mark) content_string = content_bytes.decode('utf-8-sig') data = json.loads(content_string) if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition': sd_id = data.get('id') sd_name = data.get('name') - sd_type = data.get('type') - # Match if requested identifier matches ID, Name, or Base Type - if resource_identifier == sd_type or resource_identifier == sd_id or resource_identifier == sd_name: + sd_type = data.get('type') # The type the SD describes (e.g., Patient) + # Match if requested identifier matches ID, Name, or the Base Type the SD describes + # Case-insensitive matching might be safer for identifiers + if resource_identifier and (resource_identifier.lower() == str(sd_type).lower() or + resource_identifier.lower() == str(sd_id).lower() or + resource_identifier.lower() == str(sd_name).lower()): sd_data = data found_path = member.name - logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}") + logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path} (Matched on Type/ID/Name)") break # Stop searching once found + except json.JSONDecodeError as e: + logger.warning(f"Could not parse JSON in {member.name}: {e}") + except UnicodeDecodeError as e: + logger.warning(f"Could not decode UTF-8 in {member.name}: {e}") + except tarfile.TarError as e: + logger.warning(f"Tar error reading member {member.name}: {e}") + # Potentially break or continue depending on severity preference except Exception as e: logger.warning(f"Could not read/parse potential SD {member.name}: {e}") finally: @@ -95,6 +124,10 @@ def find_and_extract_sd(tgz_path, resource_identifier): if sd_data is None: logger.info(f"SD matching '{resource_identifier}' not found within archive {os.path.basename(tgz_path)} - caller may attempt fallback") + except tarfile.ReadError as e: + logger.error(f"Tar ReadError (possibly corrupted file) reading {tgz_path}: {e}") + # Decide if this should raise or return None + return None, None # Or raise custom error except tarfile.TarError as e: logger.error(f"TarError reading {tgz_path} in find_and_extract_sd: {e}") raise tarfile.TarError(f"Error reading package archive: {e}") from e @@ -106,9 +139,9 @@ def find_and_extract_sd(tgz_path, resource_identifier): raise return sd_data, found_path + def save_package_metadata(name, version, dependency_mode, dependencies, 
complies_with_profiles=None, imposed_profiles=None): """Saves the dependency mode, imported dependencies, and profile relationships as metadata alongside the package.""" - logger = logging.getLogger(__name__) download_dir = _get_download_dir() if not download_dir: logger.error("Could not get download directory for metadata saving.") @@ -125,7 +158,7 @@ def save_package_metadata(name, version, dependency_mode, dependencies, complies metadata_filename = f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.metadata.json" metadata_path = os.path.join(download_dir, metadata_filename) try: - with open(metadata_path, 'w') as f: + with open(metadata_path, 'w', encoding='utf-8') as f: # Specify encoding json.dump(metadata, f, indent=2) logger.info(f"Saved metadata for {name}#{version} at {metadata_path}") return True @@ -135,7 +168,6 @@ def save_package_metadata(name, version, dependency_mode, dependencies, complies def get_package_metadata(name, version): """Retrieves the metadata for a given package.""" - logger = logging.getLogger(__name__) download_dir = _get_download_dir() if not download_dir: logger.error("Could not get download directory for metadata retrieval.") @@ -145,120 +177,395 @@ def get_package_metadata(name, version): metadata_path = os.path.join(download_dir, metadata_filename) if os.path.exists(metadata_path): try: - with open(metadata_path, 'r') as f: + with open(metadata_path, 'r', encoding='utf-8') as f: # Specify encoding return json.load(f) except Exception as e: logger.error(f"Failed to read metadata for {name}#{version}: {e}") return None return None -def validate_resource_against_profile(resource, package_name, package_version, resource_type): - """ - Validate a FHIR resource against a profile and its imposed profiles. - Returns a dictionary with validation results. 
- """ - logger = logging.getLogger(__name__) - result = { - 'valid': True, - 'errors': [], - 'imposed_profile_results': {} - } +# --- New navigate_fhir_path --- +def navigate_fhir_path(resource, path): + """Navigate a FHIR resource path, handling arrays, nested structures, and choice types.""" + keys = path.split('.') + # Remove the root resource type if present (e.g., Patient.name -> name) + if keys and resource and isinstance(resource, dict) and keys[0] == resource.get('resourceType'): + keys = keys[1:] - # Load the primary profile - package_filename = f"{sanitize_filename_part(package_name)}-{sanitize_filename_part(package_version)}.tgz" - package_path = os.path.join(_get_download_dir(), package_filename) - if not os.path.exists(package_path): - result['valid'] = False - result['errors'].append(f"Package not found: {package_name}#{package_version}") - return result + current = resource - # Find the StructureDefinition for the resource type - sd_filename = f"package/StructureDefinition-{resource_type.lower()}.json" - if package_name == 'hl7.fhir.us.core': - sd_filename = f"package/StructureDefinition-us-core-{resource_type.lower()}.json" + for i, key in enumerate(keys): + is_last_key = (i == len(keys) - 1) + # logger.debug(f"Navigating: key='{key}', is_last={is_last_key}, current_type={type(current)}") # Uncomment for debug - primary_profile_valid = True - primary_errors = [] - with tarfile.open(package_path, "r:gz") as tar: - try: - file_obj = tar.extractfile(sd_filename) - if file_obj is None: - raise KeyError(f"StructureDefinition not found: {sd_filename}") - sd_data = json.load(file_obj) - # Simplified validation: Check required elements - snapshot = sd_data.get('snapshot', {}) - for element in snapshot.get('element', []): - if element.get('min', 0) > 0: # Required element - path = element.get('path') - # Check if the path exists in the resource - keys = path.split('.') - current = resource - for key in keys[1:]: # Skip the resourceType - current = current.get(key) - if current is None: - primary_profile_valid = False - primary_errors.append(f"Missing required element {path} in {package_name}#{package_version}") - break - except (KeyError, json.JSONDecodeError) as e: - primary_profile_valid = False - primary_errors.append(f"Error loading StructureDefinition: {str(e)}") + if current is None: + # logger.debug(f"Navigation stopped, current became None before processing key '{key}'.") + return None - if not primary_profile_valid: - result['valid'] = False - result['errors'].extend(primary_errors) + if isinstance(current, dict): + # Handle direct key access + if key in current: + current = current.get(key) # Use .get() for safety + # Handle choice type e.g., value[x] + elif '[x]' in key: + base_key = key.replace('[x]', '') + found_choice = False + for k, v in current.items(): + if k.startswith(base_key): + current = v + found_choice = True + break + if not found_choice: + # logger.debug(f"Choice key '{key}' (base: {base_key}) not found in dict keys: {list(current.keys())}") + return None + else: + # logger.debug(f"Key '{key}' not found in dict keys: {list(current.keys())}") + return None - # Check imposed profiles if validation is enabled - if not current_app.config.get('VALIDATE_IMPOSED_PROFILES', True): - logger.info("Imposed profile validation is disabled via configuration.") - return result + elif isinstance(current, list): + # If it's the last key, the path refers to the list itself. + # The validation logic needs to handle checking the list. 
+ if is_last_key: + # logger.debug(f"Path ends on a list for key '{key}'. Returning list: {current}") + return current # Return the list itself for the validator to check - metadata_filename = f"{sanitize_filename_part(package_name)}-{sanitize_filename_part(package_version)}.metadata.json" - metadata_path = os.path.join(_get_download_dir(), metadata_filename) - if not os.path.exists(metadata_path): - logger.warning(f"Metadata not found for {package_name}#{package_version}, skipping imposed profile validation.") - return result + # --- If not the last key, we need to look inside list elements --- + # This is tricky. FHIRPath has complex list navigation. + # For simple validation (does element X exist?), we might assume + # we just need to find *one* item in the list that has the subsequent path. + # Let's try finding the first match within the list. + found_in_list = False + results_from_list = [] + remaining_path = '.'.join(keys[i:]) # The rest of the path including current key + # logger.debug(f"List encountered for key '{key}'. Searching elements for remaining path: '{remaining_path}'") - with open(metadata_path, 'r') as f: - metadata = json.load(f) - imposed_profiles = metadata.get('imposed_profiles', []) + for item in current: + # Recursively navigate into the item using the *remaining* path + sub_result = navigate_fhir_path(item, remaining_path) + if sub_result is not None: + # Collect all non-None results if validating cardinality or specific values later + if isinstance(sub_result, list): + results_from_list.extend(sub_result) + else: + results_from_list.append(sub_result) + # For basic existence check, finding one is enough, but let's collect all + # found_in_list = True + # break # Or collect all? Let's collect for now. - for imposed_url in imposed_profiles: - # Parse the canonical URL to get package name and version - # Example: http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient|3.1.1 - try: - imposed_package, imposed_version = parse_canonical_url(imposed_url) - except ValueError as e: - result['errors'].append(f"Invalid canonical URL for imposed profile: {imposed_url} - {str(e)}") - continue + if not results_from_list: + # logger.debug(f"Remaining path '{remaining_path}' not found in any list items.") + return None # Path not found in any list element - imposed_result = validate_resource_against_profile(resource, imposed_package, imposed_version, resource_type) - result['imposed_profile_results'][imposed_url] = imposed_result - if not imposed_result['valid']: - result['valid'] = False - result['errors'].extend([f"Failed imposed profile {imposed_url}: {err}" for err in imposed_result['errors']]) + # What to return? The first result? All results? + # If the final part of the path should be a single value, return first. + # If it could be multiple (e.g., Patient.name.given returns multiple strings), return list. + # Let's return the list of found items. The validator can check if it's non-empty. + # logger.debug(f"Found results in list for '{remaining_path}': {results_from_list}") + return results_from_list # Return list of found values/sub-structures - return result -def parse_canonical_url(canonical_url): - """ - Parse a canonical URL to extract package name and version. 
- Example: http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient|3.1.1 - Returns (package_name, version) - """ - parts = canonical_url.split('|') - if len(parts) != 2: - raise ValueError("Canonical URL must include version after '|'") - version = parts[1] - path_parts = parts[0].split('/') - # Extract package name (e.g., hl7.fhir.us.core) - package_name = '.'.join(path_parts[3:5]) # Adjust based on URL structure - return package_name, version + else: + # Current is not a dict or list, cannot navigate further + # logger.debug(f"Cannot navigate further, current is not dict/list (key='{key}').") + return None + + # logger.debug(f"Final result for path '{path}': {current}") + return current + +# --- End New navigate_fhir_path --- + + +def validate_resource_against_profile(package_name, version, resource, include_dependencies=True): + """Validate a single FHIR resource against a package's StructureDefinitions.""" + logger.debug(f"Starting validation for resource: {resource.get('resourceType')}/{resource.get('id')} against {package_name}#{version}") + try: + # Find the resource's type + resource_type = resource.get('resourceType') + if not resource_type: + return {'valid': False, 'errors': ['Resource is missing resourceType.'], 'warnings': []} + + # Get StructureDefinition + # Ensure download dir is fetched and config potentially set + download_dir = _get_download_dir() + if not download_dir: + return {'valid': False, 'errors': ['Could not determine FHIR package directory.'], 'warnings': []} + + # Construct path using helper for consistency + tgz_filename = _construct_tgz_filename(package_name, version) + # Use absolute path from download_dir + tgz_path = os.path.join(download_dir, tgz_filename) + + logger.debug(f"Attempting to load SD for type '{resource_type}' from tgz: {tgz_path}") + sd_data, sd_path_in_tar = find_and_extract_sd(tgz_path, resource_type) + if not sd_data: + logger.error(f"No StructureDefinition found for type '{resource_type}' in package {package_name}#{version} at {tgz_path}") + # Try falling back to canonical package if not the one requested? Maybe not here. 
+ return {'valid': False, 'errors': [f"StructureDefinition for resource type '{resource_type}' not found in package {package_name}#{version}."], 'warnings': []} + logger.debug(f"Found SD for '{resource_type}' in tar at '{sd_path_in_tar}'") + + # Prefer snapshot if available, otherwise use differential + elements = sd_data.get('snapshot', {}).get('element', []) + if not elements: + elements = sd_data.get('differential', {}).get('element', []) + logger.debug("Using differential elements for validation (snapshot missing).") + if not elements: + logger.error(f"StructureDefinition {sd_data.get('id', resource_type)} has no snapshot or differential elements.") + return {'valid': False, 'errors': [f"StructureDefinition '{sd_data.get('id', resource_type)}' is invalid (no elements)."], 'warnings': []} + + must_support_paths = [] + for element in elements: + if element.get('mustSupport', False): + path = element.get('path', '') + if path: + must_support_paths.append(path) + + errors = [] + warnings = [] + + # --- Revised Required Field Validation (min >= 1) --- + logger.debug(f"Checking required fields for {resource_type} based on SD {sd_data.get('id')}...") + element_definitions = {e.get('path'): e for e in elements if e.get('path')} # Cache elements by path + + for element in elements: + path = element.get('path', '') + min_val = element.get('min', 0) + # Skip base element (e.g., "Patient") as it's always present if resourceType matches + if '.' not in path: + continue + + if min_val >= 1: + logger.debug(f"Checking required path: {path} (min={min_val})") + + # --- START: Parent Presence Check --- + parent_path = '.'.join(path.split('.')[:-1]) + parent_is_present_or_not_applicable = True # Assume true unless parent is optional AND absent + + # Check only if parent_path is a valid element path (not just the root type) + if '.' in parent_path: + parent_element_def = element_definitions.get(parent_path) + if parent_element_def: + parent_min_val = parent_element_def.get('min', 0) + # If the parent element itself is optional (min: 0)... + if parent_min_val == 0: + # ...check if the parent element actually exists in the instance data + parent_value = navigate_fhir_path(resource, parent_path) + if parent_value is None or (isinstance(parent_value, (list, str, dict)) and not parent_value): + # Optional parent is missing, so child cannot be required. Skip the check for this element. + parent_is_present_or_not_applicable = False + logger.debug(f"-> Requirement check for '{path}' skipped: Optional parent '{parent_path}' is absent.") + else: + # This case indicates an issue with the SD structure or path generation, but we'll be lenient + logger.warning(f"Could not find definition for parent path '{parent_path}' while checking requirement for '{path}'. Proceeding with check.") + # --- END: Parent Presence Check --- + + # Only proceed with checking the element itself if its optional parent is present, + # or if the parent is required, or if it's a top-level element. + if parent_is_present_or_not_applicable: + value = navigate_fhir_path(resource, path) + + # 1. Check for presence (is it None or an empty container?) 
+ is_missing_or_empty = False + if value is None: + is_missing_or_empty = True + logger.debug(f"-> Path '{path}' value is None.") + elif isinstance(value, (list, str, dict)) and not value: + is_missing_or_empty = True + logger.debug(f"-> Path '{path}' value is an empty {type(value).__name__}.") + elif isinstance(value, bool) and value is False: pass # Valid presence + elif isinstance(value, (int, float)) and value == 0: pass # Valid presence + + if is_missing_or_empty: + # Log the error only if the parent context allowed the check + errors.append(f"Required field '{path}' is missing or empty.") + logger.warning(f"Validation Error: Required field '{path}' missing or empty (Context: Parent '{parent_path}' required or present).") + continue # Skip further checks for this element if missing + + # 2. Check specific FHIR types if present (value is not None/empty) + # (This part of the logic remains the same as before) + element_types = element.get('type', []) + type_codes = {t.get('code') for t in element_types if t.get('code')} + is_codeable_concept = 'CodeableConcept' in type_codes + is_reference = 'Reference' in type_codes + is_coding = 'Coding' in type_codes + + if is_codeable_concept and isinstance(value, dict): + codings = value.get('coding') + if not value.get('text'): + if not isinstance(codings, list) or not any(isinstance(c, dict) and c.get('code') and c.get('system') for c in codings): + errors.append(f"Required CodeableConcept '{path}' lacks text or a valid coding (must include system and code).") + logger.warning(f"Validation Error: Required CC '{path}' invalid structure.") + elif is_coding and isinstance(value, dict): + if not value.get('code') or not value.get('system'): + errors.append(f"Required Coding '{path}' lacks a system or code.") + logger.warning(f"Validation Error: Required Coding '{path}' invalid structure.") + elif is_reference and isinstance(value, dict): + if not value.get('reference') and not value.get('identifier'): + errors.append(f"Required Reference '{path}' lacks a reference or identifier.") + logger.warning(f"Validation Error: Required Reference '{path}' invalid structure.") + + # --- Revised Must-Support Field Validation --- + logger.debug(f"Checking must-support fields for {resource_type}...") + unique_must_support_paths = sorted(list(set(must_support_paths))) # Avoid duplicate checks if in both snapshot/diff + for path in unique_must_support_paths: + # Skip base element + if '.' not in path: + continue + + logger.debug(f"Checking must-support path: {path}") + value = navigate_fhir_path(resource, path) + + # 1. Check for presence + is_missing_or_empty = False + if value is None: + is_missing_or_empty = True + logger.debug(f"-> Path '{path}' value is None.") + elif isinstance(value, (list, str, dict)) and not value: + is_missing_or_empty = True + logger.debug(f"-> Path '{path}' value is an empty {type(value).__name__}.") + elif isinstance(value, bool) and value is False: + pass + elif isinstance(value, (int, float)) and value == 0: + pass + + if is_missing_or_empty: + warnings.append(f"Must-support field '{path}' is missing or empty.") + logger.info(f"Validation Warning: Must-support field '{path}' missing or empty.") # Use INFO for MS warnings + continue + + # 2. 
Check specific FHIR types (similar logic to required checks) + element_def = next((e for e in elements if e.get('path') == path), None) + if element_def: + element_types = element_def.get('type', []) + type_codes = {t.get('code') for t in element_types if t.get('code')} + + is_codeable_concept = 'CodeableConcept' in type_codes + is_reference = 'Reference' in type_codes + is_coding = 'Coding' in type_codes + + if is_codeable_concept and isinstance(value, dict): + codings = value.get('coding') + if not value.get('text'): + if not isinstance(codings, list) or not any(isinstance(c, dict) and c.get('code') and c.get('system') for c in codings): + warnings.append(f"Must-support CodeableConcept '{path}' lacks text or a valid coding (must include system and code).") + logger.info(f"Validation Warning: Must-support CC '{path}' invalid structure.") + elif is_coding and isinstance(value, dict): + if not value.get('code') or not value.get('system'): + warnings.append(f"Must-support Coding '{path}' lacks a system or code.") + logger.info(f"Validation Warning: Must-support Coding '{path}' invalid structure.") + elif is_reference and isinstance(value, dict): + if not value.get('reference') and not value.get('identifier'): + warnings.append(f"Must-support Reference '{path}' lacks a reference or identifier.") + logger.info(f"Validation Warning: Must-support Reference '{path}' invalid structure.") + + + # --- Dependency Validation --- + if include_dependencies: + logger.debug("Checking dependencies...") + metadata_path = Path(download_dir) / f"{sanitize_filename_part(package_name)}-{sanitize_filename_part(version)}.metadata.json" + if metadata_path.exists(): + try: + with open(metadata_path, 'r', encoding='utf-8') as f: + metadata = json.load(f) + for dep in metadata.get('imported_dependencies', []): + dep_name = dep.get('name') + dep_version = dep.get('version') + if not dep_name or not dep_version: + logger.warning(f"Skipping invalid dependency entry: {dep}") + continue + logger.debug(f"Recursively validating against dependency: {dep_name}#{dep_version}") + # Pass include_dependencies=False to prevent infinite loops + dep_result = validate_resource_against_profile(dep_name, dep_version, resource, include_dependencies=False) + if not dep_result['valid']: + errors.extend([f"(Dependency {dep_name}#{dep_version}): {e}" for e in dep_result['errors']]) + # Carry over warnings from dependencies as well + warnings.extend([f"(Dependency {dep_name}#{dep_version}): {w}" for w in dep_result['warnings']]) + except Exception as e: + logger.error(f"Failed to load or process metadata {metadata_path} for dependencies: {e}") + errors.append(f"Failed to process dependency metadata for {package_name}#{version}.") + else: + logger.warning(f"Metadata file not found, cannot validate dependencies: {metadata_path}") + + + final_valid_state = len(errors) == 0 + logger.info(f"Validation result for {resource_type}/{resource.get('id')} against {package_name}#{version}: Valid={final_valid_state}, Errors={len(errors)}, Warnings={len(warnings)}") + + return { + 'valid': final_valid_state, + 'errors': errors, + 'warnings': warnings + } + except FileNotFoundError: + # Specific handling if the tgz file itself wasn't found earlier + logger.error(f"Validation failed: Package file not found for {package_name}#{version}") + return {'valid': False, 'errors': [f"Package file for {package_name}#{version} not found."], 'warnings': []} + except tarfile.TarError as e: + logger.error(f"Validation failed due to TarError for {package_name}#{version}: 
{e}") + return {'valid': False, 'errors': [f"Error reading package archive for {package_name}#{version}: {e}"], 'warnings': []} + except Exception as e: + logger.error(f"Unexpected error during validation of {resource.get('resourceType')}/{resource.get('id')} against {package_name}#{version}: {e}", exc_info=True) + return {'valid': False, 'errors': [f'Unexpected validation error: {str(e)}'], 'warnings': []} + + +def validate_bundle_against_profile(package_name, version, bundle, include_dependencies=True): + """Validate a FHIR Bundle against a package's StructureDefinitions.""" + try: + if not isinstance(bundle, dict) or bundle.get('resourceType') != 'Bundle': + return {'valid': False, 'errors': ['Not a valid Bundle resource.'], 'warnings': [], 'results': {}} + + results = {} + all_errors = [] + all_warnings = [] + bundle_valid = True + + # Validate each entry's resource + logger.info(f"Validating Bundle/{bundle.get('id', 'N/A')} against {package_name}#{version}. Entries: {len(bundle.get('entry', []))}") + for i, entry in enumerate(bundle.get('entry', [])): + resource = entry.get('resource') + entry_id = f"Entry {i}" + resource_id_str = None + + if not resource: + all_errors.append(f"{entry_id}: Missing 'resource' key in entry.") + bundle_valid = False + continue + + if not isinstance(resource, dict): + all_errors.append(f"{entry_id}: 'resource' key does not contain a valid FHIR resource (must be a dictionary).") + bundle_valid = False + continue + + resource_type = resource.get('resourceType') + resource_id = resource.get('id') + resource_id_str = f"{resource_type}/{resource_id}" if resource_type and resource_id else resource_type or f"Unnamed Resource in {entry_id}" + entry_id = f"Entry {i} ({resource_id_str})" # More descriptive ID + + logger.debug(f"Validating {entry_id}...") + result = validate_resource_against_profile(package_name, version, resource, include_dependencies) + results[entry_id] = result # Store result keyed by descriptive entry ID + if not result['valid']: + bundle_valid = False + all_errors.extend([f"{entry_id}: {e}" for e in result['errors']]) + all_warnings.extend([f"{entry_id}: {w}" for w in result['warnings']]) + + # Validate Bundle structure itself (can add more checks based on profile if needed) + if not bundle.get('type'): + all_errors.append("Bundle resource itself is missing the required 'type' field.") + bundle_valid = False + + logger.info(f"Bundle validation finished. Overall Valid: {bundle_valid}, Total Errors: {len(all_errors)}, Total Warnings: {len(all_warnings)}") + return { + 'valid': bundle_valid, + 'errors': all_errors, + 'warnings': all_warnings, + 'results': results # Contains individual resource validation results + } + except Exception as e: + logger.error(f"Unexpected error during bundle validation: {str(e)}", exc_info=True) + return {'valid': False, 'errors': [f'Unexpected bundle validation error: {str(e)}'], 'warnings': [], 'results': {}} -# --- Core Service Functions --- def download_package(name, version): - """ Downloads a single FHIR package. Returns (save_path, error_message) """ - logger = logging.getLogger(__name__) + """Downloads a single FHIR package. Returns (save_path, error_message)""" download_dir = _get_download_dir() if not download_dir: return None, "Could not get/create download directory." @@ -269,67 +576,159 @@ def download_package(name, version): save_path = os.path.join(download_dir, filename) if os.path.exists(save_path): - logger.info(f"Exists: {filename}") + # Optional: Add size check or hash check for existing files? 
+ logger.info(f"Package already exists locally: {filename}") return save_path, None - logger.info(f"Downloading: {package_id} -> {filename}") + logger.info(f"Downloading: {package_id} from {package_url} -> {filename}") try: - with requests.get(package_url, stream=True, timeout=90) as r: - r.raise_for_status() - with open(save_path, 'wb') as f: - logger.debug(f"Opened {save_path} for writing.") - for chunk in r.iter_content(chunk_size=8192): - if chunk: - f.write(chunk) + # Use a session for potential keep-alive benefits + with requests.Session() as session: + with session.get(package_url, stream=True, timeout=90) as r: + r.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx) + # Check content type? Should be application/gzip or similar + content_type = r.headers.get('Content-Type', '').lower() + if 'gzip' not in content_type and 'tar' not in content_type: + logger.warning(f"Unexpected Content-Type '{content_type}' for {package_url}") + + # Write to temp file first? Prevents partial downloads being seen as complete. + # temp_save_path = save_path + ".part" + with open(save_path, 'wb') as f: + logger.debug(f"Opened {save_path} for writing.") + bytes_downloaded = 0 + for chunk in r.iter_content(chunk_size=8192): + # filter out keep-alive new chunks + if chunk: + f.write(chunk) + bytes_downloaded += len(chunk) + logger.debug(f"Finished writing {bytes_downloaded} bytes to {save_path}") + # os.rename(temp_save_path, save_path) # Move temp file to final location + + # Basic check after download + if not os.path.exists(save_path) or os.path.getsize(save_path) == 0: + err_msg = f"Download failed for {package_id}: Saved file is missing or empty." + logger.error(err_msg) + # Clean up empty file? + try: os.remove(save_path) + except OSError: pass + return None, err_msg + logger.info(f"Success: Downloaded {filename}") return save_path, None + + except requests.exceptions.HTTPError as e: + # Handle specific HTTP errors like 404 Not Found + err_msg = f"HTTP error downloading {package_id}: {e}" + logger.error(err_msg) + return None, err_msg + except requests.exceptions.ConnectionError as e: + err_msg = f"Connection error downloading {package_id}: {e}" + logger.error(err_msg) + return None, err_msg + except requests.exceptions.Timeout as e: + err_msg = f"Timeout downloading {package_id}: {e}" + logger.error(err_msg) + return None, err_msg except requests.exceptions.RequestException as e: - err_msg = f"Download error for {package_id}: {e}"; logger.error(err_msg); return None, err_msg + err_msg = f"General download error for {package_id}: {e}" + logger.error(err_msg) + return None, err_msg except OSError as e: - err_msg = f"File save error for {filename}: {e}"; logger.error(err_msg); return None, err_msg + err_msg = f"File save error for {filename}: {e}" + logger.error(err_msg) + # Clean up partial file if it exists + if os.path.exists(save_path): + try: os.remove(save_path) + except OSError: pass + return None, err_msg except Exception as e: - err_msg = f"Unexpected download error for {package_id}: {e}"; logger.error(err_msg, exc_info=True); return None, err_msg + err_msg = f"Unexpected download error for {package_id}: {e}" + logger.error(err_msg, exc_info=True) + # Clean up partial file + if os.path.exists(save_path): + try: os.remove(save_path) + except OSError: pass + return None, err_msg def extract_dependencies(tgz_path): - """ Extracts dependencies dict from package.json. 
Returns (dep_dict or None on error, error_message) """ - logger = logging.getLogger(__name__) + """Extracts dependencies dict from package.json. Returns (dep_dict or None on error, error_message)""" package_json_path = "package/package.json" - dependencies = {} + dependencies = None # Default to None error_message = None if not tgz_path or not os.path.exists(tgz_path): return None, f"File not found at {tgz_path}" try: with tarfile.open(tgz_path, "r:gz") as tar: - package_json_member = tar.getmember(package_json_path) + # Check if package.json exists before trying to extract + try: + package_json_member = tar.getmember(package_json_path) + except KeyError: + # This is common for core packages like hl7.fhir.r4.core + logger.info(f"'{package_json_path}' not found in {os.path.basename(tgz_path)}. Assuming no dependencies.") + return {}, None # Return empty dict, no error + package_json_fileobj = tar.extractfile(package_json_member) if package_json_fileobj: try: - package_data = json.load(package_json_fileobj) + # Read bytes and decode carefully + content_bytes = package_json_fileobj.read() + content_string = content_bytes.decode('utf-8-sig') + package_data = json.loads(content_string) dependencies = package_data.get('dependencies', {}) + if not isinstance(dependencies, dict): + logger.error(f"Invalid 'dependencies' format in {package_json_path} (expected dict, got {type(dependencies)}).") + dependencies = None + error_message = f"Invalid 'dependencies' format in {package_json_path}." + except json.JSONDecodeError as e: + error_message = f"JSON parse error in {package_json_path}: {e}" + logger.error(error_message) + dependencies = None + except UnicodeDecodeError as e: + error_message = f"Encoding error reading {package_json_path}: {e}" + logger.error(error_message) + dependencies = None finally: package_json_fileobj.close() else: - raise FileNotFoundError(f"Could not extract {package_json_path}") - except KeyError: - error_message = f"'{package_json_path}' not found in {os.path.basename(tgz_path)}."; - logger.warning(error_message) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - error_message = f"Parse error in {package_json_path}: {e}"; logger.error(error_message); dependencies = None - except (tarfile.TarError, FileNotFoundError) as e: - error_message = f"Archive error {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None + # Should not happen if getmember succeeded, but handle defensively + error_message = f"Could not extract {package_json_path} despite being listed in tar." 
+ logger.error(error_message) + dependencies = None + + except tarfile.ReadError as e: # Often indicates corrupted file + error_message = f"Tar ReadError (possibly corrupted) for {os.path.basename(tgz_path)}: {e}" + logger.error(error_message) + dependencies = None + except tarfile.TarError as e: + error_message = f"TarError processing {os.path.basename(tgz_path)}: {e}" + logger.error(error_message) + dependencies = None + except FileNotFoundError: # Should be caught by initial check, but include + error_message = f"Package file not found during dependency extraction: {tgz_path}" + logger.error(error_message) + dependencies = None except Exception as e: - error_message = f"Unexpected error extracting deps: {e}"; logger.error(error_message, exc_info=True); dependencies = None + error_message = f"Unexpected error extracting deps from {os.path.basename(tgz_path)}: {e}" + logger.error(error_message, exc_info=True) + dependencies = None + return dependencies, error_message + def extract_used_types(tgz_path): - """ Extracts all resource types and referenced types from the package to determine used dependencies. """ - logger = logging.getLogger(__name__) + """Extracts all resource types and referenced types from the package resources.""" used_types = set() + if not tgz_path or not os.path.exists(tgz_path): + logger.error(f"Cannot extract used types: File not found at {tgz_path}") + return used_types # Return empty set + try: with tarfile.open(tgz_path, "r:gz") as tar: for member in tar: + # Process only JSON files within the 'package/' directory if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')): continue + # Skip metadata files if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: continue @@ -340,377 +739,742 @@ def extract_used_types(tgz_path): content_bytes = fileobj.read() content_string = content_bytes.decode('utf-8-sig') data = json.loads(content_string) + + if not isinstance(data, dict): continue # Skip if not a valid JSON object + resource_type = data.get('resourceType') + if not resource_type: continue # Skip if no resourceType # Add the resource type itself - if resource_type: - used_types.add(resource_type) + used_types.add(resource_type) - # If this is a StructureDefinition, extract referenced types + # --- StructureDefinition Specific Extraction --- if resource_type == 'StructureDefinition': + # Add the type this SD defines/constrains sd_type = data.get('type') - if sd_type: - used_types.add(sd_type) + if sd_type: used_types.add(sd_type) + # Add the base definition type if it's a profile + base_def = data.get('baseDefinition') + if base_def: + base_type = base_def.split('/')[-1] + # Avoid adding primitive types like 'Element', 'Resource' etc. 
if not needed + if base_type and base_type[0].isupper(): + used_types.add(base_type) - # Extract types from elements - for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]: - for element in element_list: - if 'type' in element: - for t in element['type']: - if 'code' in t: - used_types.add(t['code']) - if 'targetProfile' in t: - for profile in t['targetProfile']: - type_name = profile.split('/')[-1] - used_types.add(type_name) + # Extract types from elements (snapshot or differential) + elements = data.get('snapshot', {}).get('element', []) or data.get('differential', {}).get('element', []) + for element in elements: + if isinstance(element, dict) and 'type' in element: + for t in element.get('type', []): + # Add code (element type) + code = t.get('code') + if code and code[0].isupper(): used_types.add(code) + # Add targetProfile types (Reference targets) + for profile_uri in t.get('targetProfile', []): + if profile_uri: + profile_type = profile_uri.split('/')[-1] + if profile_type and profile_type[0].isupper(): used_types.add(profile_type) + # Add types from contentReference + content_ref = element.get('contentReference') + if content_ref and content_ref.startswith('#'): + # This usually points to another element path within the same SD + # Trying to resolve this fully can be complex. + # We might infer types based on the path referenced if needed. + pass - # If this is another resource (e.g., ValueSet, CodeSystem), extract referenced types + # --- General Resource Type Extraction --- else: - # Look for meta.profile for referenced profiles + # Look for meta.profile for referenced profiles -> add profile type profiles = data.get('meta', {}).get('profile', []) - for profile in profiles: - type_name = profile.split('/')[-1] - used_types.add(type_name) + for profile_uri in profiles: + if profile_uri: + profile_type = profile_uri.split('/')[-1] + if profile_type and profile_type[0].isupper(): used_types.add(profile_type) - # For ValueSet, check compose.include.system + # ValueSet: Check compose.include.system (often points to CodeSystem) if resource_type == 'ValueSet': for include in data.get('compose', {}).get('include', []): system = include.get('system') + # Heuristic: If it looks like a FHIR core codesystem URL, extract type if system and system.startswith('http://hl7.org/fhir/'): type_name = system.split('/')[-1] - used_types.add(type_name) + # Check if it looks like a ResourceType + if type_name and type_name[0].isupper() and not type_name.startswith('sid'): # Avoid things like sid/us-ssn + used_types.add(type_name) + # Could add more heuristics for other terminology servers + # CapabilityStatement: Check rest.resource.type and rest.resource.profile + if resource_type == 'CapabilityStatement': + for rest_item in data.get('rest', []): + for resource_item in rest_item.get('resource', []): + res_type = resource_item.get('type') + if res_type and res_type[0].isupper(): used_types.add(res_type) + profile_uri = resource_item.get('profile') + if profile_uri: + profile_type = profile_uri.split('/')[-1] + if profile_type and profile_type[0].isupper(): used_types.add(profile_type) + + + # --- Generic recursive search for 'reference' fields? --- + # This could be expensive. Let's rely on SDs for now. 
+ # def find_references(obj): + # if isinstance(obj, dict): + # for k, v in obj.items(): + # if k == 'reference' and isinstance(v, str): + # ref_type = v.split('/')[0] + # if ref_type and ref_type[0].isupper(): used_types.add(ref_type) + # else: + # find_references(v) + # elif isinstance(obj, list): + # for item in obj: + # find_references(item) + # find_references(data) + + except json.JSONDecodeError as e: + logger.warning(f"Could not parse JSON in {member.name} for used types: {e}") + except UnicodeDecodeError as e: + logger.warning(f"Could not decode {member.name} for used types: {e}") except Exception as e: logger.warning(f"Could not process member {member.name} for used types: {e}") finally: if fileobj: fileobj.close() + except tarfile.ReadError as e: + logger.error(f"Tar ReadError extracting used types from {tgz_path}: {e}") + except tarfile.TarError as e: + logger.error(f"TarError extracting used types from {tgz_path}: {e}") + except FileNotFoundError: + logger.error(f"Package file not found for used type extraction: {tgz_path}") except Exception as e: - logger.error(f"Error extracting used types from {tgz_path}: {e}") - return used_types + logger.error(f"Error extracting used types from {tgz_path}: {e}", exc_info=True) + + # Filter out potential primitives or base types that aren't resources? + # E.g., 'string', 'boolean', 'Element', 'BackboneElement', 'Resource' + core_non_resource_types = {'string', 'boolean', 'integer', 'decimal', 'uri', 'url', 'canonical', + 'base64Binary', 'instant', 'date', 'dateTime', 'time', 'code', 'oid', 'id', + 'markdown', 'unsignedInt', 'positiveInt', 'xhtml', + 'Element', 'BackboneElement', 'Resource', 'DomainResource', 'DataType'} + final_used_types = {t for t in used_types if t not in core_non_resource_types and t[0].isupper()} + + logger.debug(f"Extracted used types from {os.path.basename(tgz_path)}: {final_used_types}") + return final_used_types + def map_types_to_packages(used_types, all_dependencies): - """ Maps used types to the packages that provide them based on dependency lists. """ - logger = logging.getLogger(__name__) + """Maps used types to the packages that provide them based on dependency lists.""" type_to_package = {} + processed_types = set() + + # Pass 1: Exact matches in dependencies for (pkg_name, pkg_version), deps in all_dependencies.items(): for dep_name, dep_version in deps.items(): - for t in used_types: - if t.lower() in dep_name.lower(): - type_to_package[t] = (dep_name, dep_version) - for t in used_types: - if t.lower() in pkg_name.lower(): - type_to_package[t] = (pkg_name, pkg_version) + # Simple heuristic: if type name is in dependency package name + # This is weak, needs improvement. Ideally, packages declare exported types. + for t in used_types: + # Exact match or common pattern (e.g., USCorePatient -> us.core) + # Need a better mapping strategy - this is very basic. + # Example: If 'USCorePatient' is used, and 'us.core' is a dependency. + # A more robust approach would involve loading the .index.json from dependency packages. 
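# A rough sketch of the .index.json idea mentioned above, written as a
# module-level helper (assumes the dependency .tgz is already downloaded
# locally; the 'files'/'type'/'resourceType' field names follow the usual FHIR
# npm package index layout and should be verified against real packages):
import json
import tarfile

def exported_types_from_index(tgz_path):
    """Collect the types listed in package/.index.json of a package archive."""
    types = set()
    try:
        with tarfile.open(tgz_path, "r:gz") as tar:
            fileobj = tar.extractfile("package/.index.json")
            if fileobj:
                index = json.loads(fileobj.read().decode("utf-8-sig"))
                for entry in index.get("files", []):
                    declared_type = entry.get("type") or entry.get("resourceType")
                    if declared_type:
                        types.add(declared_type)
    except (KeyError, OSError, tarfile.TarError, json.JSONDecodeError, UnicodeDecodeError):
        pass  # fall back to the name-based heuristic below
    return types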
+ # For now, let's just use a simplified direct check: + # If a dependency name contains the type name (lowercase) + if t not in type_to_package and t.lower() in dep_name.lower(): + type_to_package[t] = (dep_name, dep_version) + processed_types.add(t) + logger.debug(f"Mapped type '{t}' to dependency package '{dep_name}' based on name heuristic.") - # Fallback: map remaining types to the canonical package - for t in used_types: - if t not in type_to_package: - type_to_package[t] = CANONICAL_PACKAGE + # Pass 2: Check the package itself + for (pkg_name, pkg_version), deps in all_dependencies.items(): + for t in used_types: + if t not in type_to_package and t.lower() in pkg_name.lower(): + type_to_package[t] = (pkg_name, pkg_version) + processed_types.add(t) + logger.debug(f"Mapped type '{t}' to source package '{pkg_name}' based on name heuristic.") + + # Fallback: map remaining types to the canonical package if not already mapped + canonical_name, canonical_version = CANONICAL_PACKAGE + unmapped_types = used_types - processed_types + if unmapped_types: + logger.info(f"Using canonical package {canonical_name}#{canonical_version} as fallback for unmapped types: {unmapped_types}") + for t in unmapped_types: + type_to_package[t] = CANONICAL_PACKAGE + + logger.debug(f"Final type-to-package mapping: {type_to_package}") return type_to_package -# --- Recursive Import Orchestrator --- def import_package_and_dependencies(initial_name, initial_version, dependency_mode='recursive'): """Orchestrates recursive download and dependency extraction based on the dependency mode.""" - logger = logging.getLogger(__name__) logger.info(f"Starting import for {initial_name}#{initial_version} with dependency_mode={dependency_mode}") results = { 'requested': (initial_name, initial_version), - 'processed': set(), - 'downloaded': {}, - 'all_dependencies': {}, - 'dependencies': [], - 'errors': [] + 'processed': set(), # Tuples (name, version) successfully processed (downloaded + deps extracted) + 'downloaded': {}, # Dict {(name, version): save_path} for successfully downloaded + 'all_dependencies': {}, # Dict {(name, version): {dep_name: dep_ver}} stores extracted deps for each processed pkg + 'dependencies': [], # List of unique {"name": X, "version": Y} across all processed packages + 'errors': [] # List of error messages encountered } + # Queue stores (name, version) tuples to process pending_queue = [(initial_name, initial_version)] - processed_lookup = set() + # Lookup stores (name, version) tuples that have been added to queue or processed, prevents cycles/re-queuing + queued_or_processed_lookup = set([(initial_name, initial_version)]) + all_found_dependencies = set() # Store unique dep tuples {(name, version)} found - # Always download the initial package - name, version = initial_name, initial_version - package_id_tuple = (name, version) - logger.info(f"Processing initial package: {name}#{version}") - processed_lookup.add(package_id_tuple) - save_path, dl_error = download_package(name, version) - if dl_error: - error_msg = f"Download failed for {name}#{version}: {dl_error}" - results['errors'].append(error_msg) - logger.error("Aborting import: Initial package download failed.") - return results - else: - results['downloaded'][package_id_tuple] = save_path - dependencies, dep_error = extract_dependencies(save_path) - if dep_error: - results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}") - elif dependencies is not None: - results['all_dependencies'][package_id_tuple] = 
dependencies - results['processed'].add(package_id_tuple) - logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}") - for dep_name, dep_version in dependencies.items(): - if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version: - results['dependencies'].append({"name": dep_name, "version": dep_version}) - - # Process the package to extract compliesWithProfile and imposeProfile - package_info = process_package_file(save_path) - complies_with_profiles = package_info.get('complies_with_profiles', []) - imposed_profiles = package_info.get('imposed_profiles', []) - - # Save metadata for the initial package with profile relationships - save_package_metadata(initial_name, initial_version, dependency_mode, results['dependencies'], - complies_with_profiles=complies_with_profiles, - imposed_profiles=imposed_profiles) - - # Handle dependency pulling based on mode - if dependency_mode == 'recursive': - for dep in results['dependencies']: - dep_name, dep_version = dep['name'], dep['version'] - dep_tuple = (dep_name, dep_version) - if dep_tuple not in processed_lookup: - pending_queue.append(dep_tuple) - logger.debug(f"Added to queue (recursive): {dep_name}#{dep_version}") - - elif dependency_mode == 'patch-canonical': - canonical_name, canonical_version = CANONICAL_PACKAGE - canonical_tuple = (canonical_name, canonical_version) - if canonical_tuple not in processed_lookup: - pending_queue.append(canonical_tuple) - logger.debug(f"Added canonical package to queue: {canonical_name}#{canonical_version}") - - elif dependency_mode == 'tree-shaking': - used_types = extract_used_types(save_path) - logger.debug(f"Used types in {initial_name}#{initial_version}: {used_types}") - type_to_package = map_types_to_packages(used_types, results['all_dependencies']) - logger.debug(f"Type to package mapping: {type_to_package}") - for t, (dep_name, dep_version) in type_to_package.items(): - dep_tuple = (dep_name, dep_version) - if dep_tuple not in processed_lookup and dep_tuple != package_id_tuple: - pending_queue.append(dep_tuple) - logger.debug(f"Added to queue (tree-shaking): {dep_name}#{dep_version}") - - # Process the queue + # --- Main Processing Loop --- while pending_queue: name, version = pending_queue.pop(0) package_id_tuple = (name, version) - if package_id_tuple in processed_lookup: + # Already successfully processed? Skip. 
(Shouldn't happen with lookup check before queueing, but safety) + if package_id_tuple in results['processed']: + logger.debug(f"Skipping already processed package: {name}#{version}") continue - logger.info(f"Processing: {name}#{version}") - processed_lookup.add(package_id_tuple) + logger.info(f"Processing package from queue: {name}#{version}") + # --- Download --- save_path, dl_error = download_package(name, version) - if dl_error: error_msg = f"Download failed for {name}#{version}: {dl_error}" results['errors'].append(error_msg) - continue + logger.error(error_msg) + # Do not add to processed, leave in lookup to prevent re-queueing a known failure + continue # Move to next item in queue else: results['downloaded'][package_id_tuple] = save_path - dependencies, dep_error = extract_dependencies(save_path) - if dep_error: - results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}") - elif dependencies is not None: - results['all_dependencies'][package_id_tuple] = dependencies - results['processed'].add(package_id_tuple) - logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}") - for dep_name, dep_version in dependencies.items(): - if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version: - dep_tuple = (dep_name, dep_version) - results['dependencies'].append({"name": dep_name, "version": dep_version}) - if dependency_mode == 'recursive' and dep_tuple not in processed_lookup: - pending_queue.append(dep_tuple) - logger.debug(f"Added to queue: {dep_name}#{dep_version}") + logger.info(f"Successfully downloaded/verified {name}#{version} at {save_path}") + + # --- Extract Dependencies --- + dependencies, dep_error = extract_dependencies(save_path) + if dep_error: + # Log error but potentially continue processing other packages if deps are just missing + error_msg = f"Dependency extraction failed for {name}#{version}: {dep_error}" + results['errors'].append(error_msg) + logger.error(error_msg) + # Mark as processed even if dep extraction fails, as download succeeded + results['processed'].add(package_id_tuple) + # Don't queue dependencies if extraction failed + continue + elif dependencies is None: + # This indicates a more severe error during extraction (e.g., corrupted tar) + error_msg = f"Dependency extraction returned critical error for {name}#{version}. Aborting dependency processing for this package." + results['errors'].append(error_msg) + logger.error(error_msg) + results['processed'].add(package_id_tuple) # Mark processed + continue + + + # Store extracted dependencies for this package + results['all_dependencies'][package_id_tuple] = dependencies + results['processed'].add(package_id_tuple) # Mark as successfully processed + logger.debug(f"Successfully processed {name}#{version}. 
Dependencies found: {list(dependencies.keys())}") + + # Add unique dependencies to the overall list and potentially the queue + current_package_deps = [] + for dep_name, dep_version in dependencies.items(): + if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version: + dep_tuple = (dep_name, dep_version) + current_package_deps.append({"name": dep_name, "version": dep_version}) # For metadata + if dep_tuple not in all_found_dependencies: + all_found_dependencies.add(dep_tuple) + results['dependencies'].append({"name": dep_name, "version": dep_version}) # Add to overall unique list + + # --- Queue Dependencies Based on Mode --- + # Check if not already queued or processed + if dep_tuple not in queued_or_processed_lookup: + should_queue = False + if dependency_mode == 'recursive': + should_queue = True + elif dependency_mode == 'patch-canonical' and dep_tuple == CANONICAL_PACKAGE: + should_queue = True + elif dependency_mode == 'tree-shaking': + # Tree shaking requires calculating used types *after* initial pkg is processed + # This logic needs adjustment - calculate used types only once for the root package. + # Let's defer full tree-shaking queuing logic for now, treat as 'none'. + # TODO: Implement tree-shaking queuing properly outside the loop based on initial package's used types. + pass + + if should_queue: + logger.debug(f"Adding dependency to queue ({dependency_mode}): {dep_name}#{dep_version}") + pending_queue.append(dep_tuple) + queued_or_processed_lookup.add(dep_tuple) + else: + logger.warning(f"Skipping invalid dependency entry in {name}#{version}: name='{dep_name}', version='{dep_version}'") + + # --- Save Metadata (after successful download and dep extraction) --- + # We need profile relationship info which comes from process_package_file + # Let's call it here if needed for metadata, though it duplicates effort if called later. + # Alternative: Save basic metadata first, update later? + # Let's just save what we have now. Profile relations can be added by a separate process. + save_package_metadata(name, version, dependency_mode, current_package_deps) + # TODO: Rework metadata saving if compliesWith/imposedBy is needed during import. 
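# One possible shape for the metadata TODO above, as a module-level helper
# (a sketch, not wired in): after the import loop finishes, re-open each
# downloaded archive and enrich its metadata with the profile relationships,
# reusing the keyword arguments the earlier inline call passed to
# save_package_metadata.
def backfill_profile_relationships(results, dependency_mode):
    """Add compliesWith/imposed profile info to metadata after an import run."""
    for (pkg_name, pkg_version), save_path in results['downloaded'].items():
        info = process_package_file(save_path)
        deps = [{"name": n, "version": v}
                for n, v in results['all_dependencies'].get((pkg_name, pkg_version), {}).items()]
        save_package_metadata(pkg_name, pkg_version, dependency_mode, deps,
                              complies_with_profiles=info.get('complies_with_profiles', []),
                              imposed_profiles=info.get('imposed_profiles', []))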
+ + + # --- Post-Loop Processing (e.g., for Tree Shaking) --- + if dependency_mode == 'tree-shaking' and (initial_name, initial_version) in results['downloaded']: + logger.info("Performing tree-shaking dependency analysis...") + root_save_path = results['downloaded'][(initial_name, initial_version)] + used_types = extract_used_types(root_save_path) + if used_types: + type_to_package = map_types_to_packages(used_types, results['all_dependencies']) + logger.debug(f"Tree-shaking mapping: {type_to_package}") + tree_shaken_deps_to_ensure = set(type_to_package.values()) + + # Ensure canonical package is included if tree-shaking mode implies it + if CANONICAL_PACKAGE not in tree_shaken_deps_to_ensure: + logger.debug(f"Adding canonical package {CANONICAL_PACKAGE} to tree-shaking set.") + tree_shaken_deps_to_ensure.add(CANONICAL_PACKAGE) + + initial_package_tuple = (initial_name, initial_version) + if initial_package_tuple in tree_shaken_deps_to_ensure: + tree_shaken_deps_to_ensure.remove(initial_package_tuple) # Don't queue self + + additional_processing_needed = False + for dep_tuple in tree_shaken_deps_to_ensure: + if dep_tuple not in results['processed'] and dep_tuple not in queued_or_processed_lookup: + logger.info(f"Queueing missing tree-shaken dependency: {dep_tuple[0]}#{dep_tuple[1]}") + pending_queue.append(dep_tuple) + queued_or_processed_lookup.add(dep_tuple) + additional_processing_needed = True + + # If tree-shaking added new packages, re-run the processing loop + if additional_processing_needed: + logger.info("Re-running processing loop for tree-shaken dependencies...") + # This recursive call structure isn't ideal, better to refactor loop. + # For now, let's just run the loop again conceptually. + # This requires refactoring the main loop logic to be callable. + # --- TEMPORARY WORKAROUND: Just log and state limitation --- + logger.warning("Tree-shaking identified additional dependencies. Manual re-run or refactoring needed to process them.") + results['errors'].append("Tree-shaking identified further dependencies; re-run required for full processing.") + # TODO: Refactor the while loop into a callable function to handle recursive/iterative processing. proc_count = len(results['processed']) dl_count = len(results['downloaded']) err_count = len(results['errors']) - logger.info(f"Import finished. Processed: {proc_count}, Downloaded/Verified: {dl_count}, Errors: {err_count}") + logger.info(f"Import finished for {initial_name}#{initial_version}. Processed: {proc_count}, Downloaded: {dl_count}, Errors: {err_count}") + # Make sure unique list of deps is accurate + results['dependencies'] = [ {"name": d[0], "version": d[1]} for d in all_found_dependencies] return results -# --- Package File Content Processor --- + + def process_package_file(tgz_path): - """ Extracts types, profile status, MS elements, examples, and profile relationships from a downloaded .tgz package. 
""" - logger = logging.getLogger(__name__) + """Extracts types, profile status, MS elements, examples, and profile relationships from a downloaded .tgz package.""" logger.info(f"Processing package file details: {tgz_path}") results = { - 'resource_types_info': [], - 'must_support_elements': {}, - 'examples': {}, - 'complies_with_profiles': [], - 'imposed_profiles': [], + 'resource_types_info': [], # List of dicts about each Resource/Profile + 'must_support_elements': {}, # Dict: { 'ResourceName/ProfileId': ['path1', 'path2'] } + 'examples': {}, # Dict: { 'ResourceName/ProfileId': ['example_path1'] } + 'complies_with_profiles': [], # List of canonical URLs + 'imposed_profiles': [], # List of canonical URLs 'errors': [] } + # Use defaultdict for easier aggregation + # Key: SD ID if profile, otherwise ResourceType. Value: dict with info. resource_info = defaultdict(lambda: { - 'name': None, - 'type': None, + 'name': None, # The key (SD ID or ResourceType) + 'type': None, # Base FHIR type (e.g., Patient) 'is_profile': False, - 'ms_flag': False, - 'ms_paths': set(), - 'examples': set() + 'ms_flag': False, # Does this SD define *any* MS elements? + 'ms_paths': set(), # Specific MS element paths defined *in this SD* + 'examples': set(), # Paths to example files linked to this type/profile + 'sd_processed': False # Flag to avoid reprocessing MS flags for the same SD key }) if not tgz_path or not os.path.exists(tgz_path): results['errors'].append(f"Package file not found: {tgz_path}") + logger.error(f"Package file not found during processing: {tgz_path}") return results try: with tarfile.open(tgz_path, "r:gz") as tar: - for member in tar: - if not member.isfile() or not member.name.startswith('package/') or not member.name.lower().endswith(('.json', '.xml', '.html')): + members = tar.getmembers() # Get all members once + logger.debug(f"Found {len(members)} members in {os.path.basename(tgz_path)}") + + # --- Pass 1: Process StructureDefinitions --- + logger.debug("Processing StructureDefinitions...") + for member in members: + # Basic filtering + if not member.isfile() or not member.name.startswith('package/') or not member.name.lower().endswith('.json'): continue - member_name_lower = member.name.lower() - base_filename_lower = os.path.basename(member_name_lower) - fileobj = None + base_filename_lower = os.path.basename(member.name).lower() if base_filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: continue - is_example = member.name.startswith('package/example/') or 'example' in base_filename_lower + fileobj = None + try: + fileobj = tar.extractfile(member) + if not fileobj: continue + + content_bytes = fileobj.read() + content_string = content_bytes.decode('utf-8-sig') + data = json.loads(content_string) + + if not isinstance(data, dict) or data.get('resourceType') != 'StructureDefinition': + continue # Only interested in SDs in this pass + + # --- Process the StructureDefinition --- + profile_id = data.get('id') or data.get('name') # Use ID, fallback to name + sd_type = data.get('type') # The base FHIR type (e.g., Patient) + sd_base = data.get('baseDefinition') + is_profile_sd = bool(sd_base) # It's a profile if it has a baseDefinition + + if not profile_id: + logger.warning(f"StructureDefinition in {member.name} missing 'id' and 'name', skipping.") + continue + if not sd_type: + logger.warning(f"StructureDefinition '{profile_id}' in {member.name} missing 'type', skipping.") + continue + + entry_key = profile_id # Use the SD's ID as the key + 
entry = resource_info[entry_key] + + # Only process once per entry_key + if entry.get('sd_processed'): continue + + entry['name'] = entry_key + entry['type'] = sd_type + entry['is_profile'] = is_profile_sd + + # Extract compliesWithProfile and imposeProfile extensions + complies_with = [] + imposed = [] + for ext in data.get('extension', []): + ext_url = ext.get('url') + value = ext.get('valueCanonical') + if value: + if ext_url == 'http://hl7.org/fhir/StructureDefinition/structuredefinition-compliesWithProfile': + complies_with.append(value) + elif ext_url == 'http://hl7.org/fhir/StructureDefinition/structuredefinition-imposeProfile': + imposed.append(value) + + # Add to overall results (unique) + results['complies_with_profiles'].extend(c for c in complies_with if c not in results['complies_with_profiles']) + results['imposed_profiles'].extend(i for i in imposed if i not in results['imposed_profiles']) + + # Find Must Support elements defined *in this specific SD* + has_ms_in_this_sd = False + ms_paths_in_this_sd = set() + # Check differential first, then snapshot if needed? Or combine? Let's combine. + elements = data.get('snapshot', {}).get('element', []) + data.get('differential', {}).get('element', []) + # De-duplicate elements based on path if combining snapshot and differential (though usually only one is primary) + processed_element_paths = set() + unique_elements = [] + for el in elements: + el_path = el.get('path') + if el_path and el_path not in processed_element_paths: + unique_elements.append(el) + processed_element_paths.add(el_path) + elif not el_path: # Include elements without paths? Maybe not. + pass + + for element in unique_elements: + if isinstance(element, dict) and element.get('mustSupport') is True: + element_path = element.get('path') + if element_path: + ms_paths_in_this_sd.add(element_path) + has_ms_in_this_sd = True + else: + logger.warning(f"Found mustSupport=true without path in element of {entry_key} ({member.name})") + + if ms_paths_in_this_sd: + entry['ms_paths'] = ms_paths_in_this_sd + entry['ms_flag'] = True # Set flag if this SD defines MS elements + logger.debug(f"Found {len(ms_paths_in_this_sd)} MS elements defined in SD {entry_key}") + + entry['sd_processed'] = True # Mark this SD as processed + + except json.JSONDecodeError as e: + logger.warning(f"Could not parse JSON SD in {member.name}: {e}") + except UnicodeDecodeError as e: + logger.warning(f"Could not decode SD in {member.name}: {e}") + except Exception as e: + logger.warning(f"Could not process SD member {member.name}: {e}", exc_info=False) # Keep log cleaner + finally: + if fileobj: fileobj.close() + + # --- Pass 2: Process Examples --- + logger.debug("Processing Examples...") + for member in members: + # Basic filtering + if not member.isfile() or not member.name.startswith('package/'): # Allow non-JSON examples too + continue + member_name_lower = member.name.lower() + base_filename_lower = os.path.basename(member_name_lower) + if base_filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: + continue + + # Heuristic for identifying examples + # Check directory name or filename conventions + is_example = 'example' in member.name.split('/') or 'example' in base_filename_lower.split('-') or 'example' in base_filename_lower.split('.') + + if not is_example: continue + + logger.debug(f"Processing potential example file: {member.name}") is_json = member_name_lower.endswith('.json') + fileobj = None + associated_key = None try: if is_json: fileobj = 
tar.extractfile(member) - if not fileobj: - continue + if not fileobj: continue content_bytes = fileobj.read() content_string = content_bytes.decode('utf-8-sig') data = json.loads(content_string) - if not isinstance(data, dict) or 'resourceType' not in data: - continue - resource_type = data['resourceType'] - entry_key = resource_type - is_sd = False + if not isinstance(data, dict): continue + resource_type = data.get('resourceType') + if not resource_type: continue - if resource_type == 'StructureDefinition': - is_sd = True - profile_id = data.get('id') or data.get('name') - sd_type = data.get('type') - sd_base = data.get('baseDefinition') - is_profile_sd = bool(sd_base) - if not profile_id or not sd_type: - logger.warning(f"SD missing ID or Type: {member.name}") - continue - entry_key = profile_id + # Try to associate example with a profile using meta.profile + profile_meta = data.get('meta', {}).get('profile', []) + found_profile_match = False + if profile_meta and isinstance(profile_meta, list): + for profile_url in profile_meta: + # Extract profile ID from canonical URL + profile_id_from_meta = profile_url.split('/')[-1] + if profile_id_from_meta in resource_info: + associated_key = profile_id_from_meta + found_profile_match = True + logger.debug(f"Example {member.name} associated with profile {associated_key} via meta.profile") + break # Use first match - # Extract compliesWithProfile and imposeProfile extensions - complies_with = [] - imposed_profiles = [] - for ext in data.get('extension', []): - if ext.get('url') == 'http://hl7.org/fhir/StructureDefinition/structuredefinition-compliesWithProfile': - value = ext.get('valueCanonical') - if value: - complies_with.append(value) - elif ext.get('url') == 'http://hl7.org/fhir/StructureDefinition/structuredefinition-imposeProfile': - value = ext.get('valueCanonical') - if value: - imposed_profiles.append(value) + # If no profile match, associate with the base resource type SD (if any) + if not found_profile_match: + # Find SD where type matches the example's resourceType and is_profile is False + matching_base_sd_keys = [k for k, v in resource_info.items() if v.get('type') == resource_type and not v.get('is_profile') and v.get('sd_processed')] + if matching_base_sd_keys: + associated_key = matching_base_sd_keys[0] # Use the first matching base SD key + logger.debug(f"Example {member.name} associated with base type SD {associated_key}") + else: + # Fallback: If no SD processed for this base type yet, use the type itself as key + associated_key = resource_type + logger.debug(f"Example {member.name} associated with resource type {associated_key} (no specific SD found/processed yet)") - # Store the relationships - if complies_with: - results['complies_with_profiles'].extend(complies_with) - if imposed_profiles: - results['imposed_profiles'].extend(imposed_profiles) + else: + # For non-JSON examples, try to guess based on filename + # e.g., patient-example.xml -> Patient + # e.g., us-core-patient-example.xml -> us-core-patient (if profile exists) + guessed_profile_id = None + if '-' in base_filename_lower: + # Try matching parts against known profile IDs + parts = base_filename_lower.split('-') + potential_id = parts[0] + if potential_id in resource_info: + guessed_profile_id = potential_id + else: # Try combining parts? 
e.g., us-core + if len(parts) > 1: + potential_id_2 = f"{parts[0]}-{parts[1]}" + if potential_id_2 in resource_info: + guessed_profile_id = potential_id_2 - entry = resource_info[entry_key] - entry.setdefault('type', resource_type) + if guessed_profile_id: + associated_key = guessed_profile_id + logger.debug(f"Non-JSON Example {member.name} associated with profile {associated_key} via filename heuristic") + else: + # Fallback to guessing base type + guessed_type = base_filename_lower.split('-')[0].split('.')[0].capitalize() + matching_base_sd_keys = [k for k, v in resource_info.items() if v.get('type') == guessed_type and not v.get('is_profile') and v.get('sd_processed')] + if matching_base_sd_keys: + associated_key = matching_base_sd_keys[0] + logger.debug(f"Non-JSON Example {member.name} associated with base type SD {associated_key} via filename heuristic") + elif guessed_type: + associated_key = guessed_type + logger.debug(f"Non-JSON Example {member.name} associated with resource type {associated_key} via filename heuristic (no specific SD found/processed yet)") - if is_sd: - entry['name'] = entry_key - entry['type'] = sd_type - entry['is_profile'] = is_profile_sd - if not entry.get('sd_processed'): - has_ms = False - ms_paths_for_sd = set() - for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]: - for element in element_list: - if isinstance(element, dict) and element.get('mustSupport') is True: - element_path = element.get('path') - if element_path: - ms_paths_for_sd.add(element_path) - has_ms = True - else: - logger.warning(f"Found mustSupport=true without path in element of {entry_key}") - if ms_paths_for_sd: - entry['ms_paths'] = ms_paths_for_sd - if has_ms: - entry['ms_flag'] = True - logger.debug(f" Found MS elements in {entry_key}") - entry['sd_processed'] = True - elif is_example: - key_to_use = None - profile_meta = data.get('meta', {}).get('profile', []) - if profile_meta and isinstance(profile_meta, list): - for profile_url in profile_meta: - profile_id_from_meta = profile_url.split('/')[-1] - if profile_id_from_meta in resource_info: - key_to_use = profile_id_from_meta - break - if not key_to_use: - key_to_use = resource_type - if key_to_use not in resource_info: - resource_info[key_to_use].update({'name': key_to_use, 'type': resource_type}) - resource_info[key_to_use]['examples'].add(member.name) + # Add example path to the associated resource/profile info + if associated_key: + # Ensure the entry exists even if no SD was processed (for base types) + if associated_key not in resource_info: + resource_info[associated_key]['name'] = associated_key + # Try to infer type if possible (might be None) + resource_info[associated_key]['type'] = data.get('resourceType') if is_json else associated_key - elif is_example: - guessed_type = base_filename_lower.split('-')[0].capitalize() - guessed_profile_id = base_filename_lower.split('-')[0] - key_to_use = None - if guessed_profile_id in resource_info: - key_to_use = guessed_profile_id - elif guessed_type in resource_info: - key_to_use = guessed_type - if key_to_use: - resource_info[key_to_use]['examples'].add(member.name) - else: - logger.warning(f"Could not associate non-JSON example {member.name}") + resource_info[associated_key]['examples'].add(member.name) + else: + logger.warning(f"Could not associate example {member.name} with any known resource or profile.") + + except json.JSONDecodeError as e: + logger.warning(f"Could not parse JSON example in {member.name}: {e}") + 
except UnicodeDecodeError as e: + logger.warning(f"Could not decode example in {member.name}: {e}") except Exception as e: - logger.warning(f"Could not process member {member.name}: {e}", exc_info=False) + logger.warning(f"Could not process example member {member.name}: {e}", exc_info=False) finally: - if fileobj: - fileobj.close() + if fileobj: fileobj.close() - # Final formatting + + # --- Final Formatting --- final_list = [] final_ms_elements = {} final_examples = {} - logger.debug(f"Formatting results from resource_info keys: {list(resource_info.keys())}") + logger.debug(f"Finalizing results from resource_info keys: {list(resource_info.keys())}") + + # Make sure all base resource types mentioned (even without explicit SDs) are included + all_types_mentioned = set(v['type'] for v in resource_info.values() if v.get('type')) + for type_name in all_types_mentioned: + if type_name not in resource_info: + # Add a basic entry if a type was mentioned (e.g., by an example) but had no SD + if type_name and type_name[0].isupper(): # Basic check it looks like a resource type + logger.debug(f"Adding basic entry for resource type '{type_name}' mentioned but without processed SD.") + resource_info[type_name]['name'] = type_name + resource_info[type_name]['type'] = type_name + resource_info[type_name]['is_profile'] = False + + for key, info in resource_info.items(): display_name = info.get('name') or key base_type = info.get('type') - if display_name or base_type: - logger.debug(f" Formatting item '{display_name}': type='{base_type}', profile='{info.get('is_profile', False)}', ms_flag='{info.get('ms_flag', False)}'") - final_list.append({ - 'name': display_name, - 'type': base_type, - 'is_profile': info.get('is_profile', False), - 'must_support': info.get('ms_flag', False) - }) - if info['ms_paths']: - final_ms_elements[display_name] = sorted(list(info['ms_paths'])) - if info['examples']: - final_examples[display_name] = sorted(list(info['examples'])) - else: - logger.warning(f"Skipping formatting for key: {key}") - results['resource_types_info'] = sorted(final_list, key=lambda x: (not x.get('is_profile', False), x.get('name', ''))) + # Skip if essential info is missing (shouldn't happen with defaultdict + population) + if not display_name or not base_type: + logger.warning(f"Skipping formatting for incomplete key: {key} - Info: {info}") + continue + + logger.debug(f"Formatting item '{display_name}': type='{base_type}', profile='{info.get('is_profile', False)}', ms_flag='{info.get('ms_flag', False)}'") + final_list.append({ + 'name': display_name, # This is the SD ID or ResourceType + 'type': base_type, # The base FHIR resource type + 'is_profile': info.get('is_profile', False), + 'must_support': info.get('ms_flag', False) # Does this SD *define* MS elements? 
+ }) + if info['ms_paths']: + final_ms_elements[display_name] = sorted(list(info['ms_paths'])) + if info['examples']: + final_examples[display_name] = sorted(list(info['examples'])) + + + # Sort profiles after base types, then alphabetically + results['resource_types_info'] = sorted(final_list, key=lambda x: (x.get('is_profile', False), x.get('name', ''))) results['must_support_elements'] = final_ms_elements results['examples'] = final_examples + # Ensure relationship lists are unique (done during addition now) + # results['complies_with_profiles'] = sorted(list(set(results['complies_with_profiles']))) + # results['imposed_profiles'] = sorted(list(set(results['imposed_profiles']))) + except tarfile.ReadError as e: + err_msg = f"Tar ReadError processing package file {tgz_path}: {e}" + logger.error(err_msg) + results['errors'].append(err_msg) + except tarfile.TarError as e: + err_msg = f"TarError processing package file {tgz_path}: {e}" + logger.error(err_msg) + results['errors'].append(err_msg) + except FileNotFoundError: + err_msg = f"Package file not found during processing: {tgz_path}" + logger.error(err_msg) + results['errors'].append(err_msg) except Exception as e: - err_msg = f"Error processing package file {tgz_path}: {e}" + err_msg = f"Unexpected error processing package file {tgz_path}: {e}" logger.error(err_msg, exc_info=True) results['errors'].append(err_msg) # Logging counts final_types_count = len(results['resource_types_info']) - ms_count = sum(1 for r in results['resource_types_info'] if r['must_support']) + ms_defining_count = sum(1 for r in results['resource_types_info'] if r['must_support']) # Count SDs defining MS total_ms_paths = sum(len(v) for v in results['must_support_elements'].values()) total_examples = sum(len(v) for v in results['examples'].values()) - logger.info(f"Extraction: {final_types_count} items ({ms_count} MS; {total_ms_paths} MS paths; {total_examples} examples) from {os.path.basename(tgz_path)}") + logger.info(f"Package processing finished for {os.path.basename(tgz_path)}: " + f"{final_types_count} Resources/Profiles identified; " + f"{ms_defining_count} define MS elements ({total_ms_paths} total MS paths); " + f"{total_examples} examples found. " + f"CompliesWith: {len(results['complies_with_profiles'])}, Imposed: {len(results['imposed_profiles'])}") - return results \ No newline at end of file + return results + +# --- Example Usage (if running script directly) --- +if __name__ == '__main__': + # Configure logger for direct script execution + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + logger.info("Running services.py directly for testing.") + + # Mock Flask app context minimally for config/instance path + class MockFlaskConfig(dict): + pass + class MockFlaskCurrentApp: + config = MockFlaskConfig() + # Calculate instance path relative to this file + instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'instance')) + # Need to manually set current_app for testing outside Flask request context + # This is tricky. Let's bypass current_app dependency in _get_download_dir for direct testing. + # OR, provide a mock. Best approach is to structure code to reduce Flask dependency in core logic. 
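# A hedged alternative to the monkey-patching below: push a minimal Flask app
# context so current_app-based config lookups resolve naturally. Kept commented
# out so it does not interfere with the test flow; the path mirrors the test
# directory created below.
# from flask import Flask
# _test_app = Flask(__name__)
# _test_app.config['FHIR_PACKAGES_DIR'] = os.path.abspath(
#     os.path.join(os.path.dirname(__file__), '..', 'instance', DOWNLOAD_DIR_NAME))
# with _test_app.app_context():
#     import_package_and_dependencies("hl7.fhir.au.core", "1.0.1", dependency_mode='recursive')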
+ + # For testing, let's override _get_download_dir or manually create the dir + test_download_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'instance', DOWNLOAD_DIR_NAME)) + os.makedirs(test_download_dir, exist_ok=True) + logger.info(f"Using test download directory: {test_download_dir}") + + # Override the helper function for testing context + original_get_download_dir = _get_download_dir + def mock_get_download_dir(): + # In test, don't rely on current_app if possible + # Ensure config exists if needed by validation code + if not hasattr(mock_get_download_dir, 'config'): + mock_get_download_dir.config = {'FHIR_PACKAGES_DIR': test_download_dir} + return test_download_dir + _get_download_dir = mock_get_download_dir + # Add the FHIR_PACKAGES_DIR to the mock config directly + _get_download_dir.config = {'FHIR_PACKAGES_DIR': test_download_dir} + + # --- Test Case 1: Import AU Core Patient Package --- + pkg_name = "hl7.fhir.au.core" + pkg_version = "1.0.1" # Use a specific version known to exist + logger.info(f"\n--- Testing Import: {pkg_name}#{pkg_version} ---") + import_results = import_package_and_dependencies(pkg_name, pkg_version, dependency_mode='recursive') + # print("Import Results:", json.dumps(import_results, default=lambda o: '', indent=2)) + if not import_results['errors'] and (pkg_name, pkg_version) in import_results['downloaded']: + logger.info(f"Import successful for {pkg_name}#{pkg_version}") + + # --- Test Case 2: Validate Patient Resource --- + logger.info(f"\n--- Testing Validation: Patient Example ---") + patient_resource = { + "resourceType": "Patient", + "id": "banks-mia-leanne", + "meta": { "profile": ["http://hl7.org.au/fhir/core/StructureDefinition/au-core-patient"] }, + "identifier": [{ + "type": {"coding": [{"system": "http://terminology.hl7.org/CodeSystem/v2-0203", "code": "NI"}], "text": "IHI"}, + "system": "http://ns.electronichealth.net.au/id/hi/ihi/1.0", + "value": "8003608333647261" + }], + "name": [{"use": "usual", "family": "Banks", "given": ["Mia", "Leanne"]}], + "telecom": [{"system": "phone", "value": "0491574632", "use": "mobile"}], + "gender": "female", + "birthDate": "1983-08-25", + "address": [{"line": ["50 Sebastien St"], "city": "Minjary", "state": "NSW", "postalCode": "2720", "country": "AU"}] + # Missing communication on purpose to test warnings/errors if required by profile + } + validation_result = validate_resource_against_profile(pkg_name, pkg_version, patient_resource) + print("\nPatient Validation Result:") + print(json.dumps(validation_result, indent=2)) + + # --- Test Case 3: Validate Allergy Resource --- + logger.info(f"\n--- Testing Validation: Allergy Example ---") + allergy_resource = { + "resourceType": "AllergyIntolerance", + "id": "lactose", + "meta": {"profile": ["http://hl7.org.au/fhir/core/StructureDefinition/au-core-allergyintolerance"]}, + "clinicalStatus": {"coding": [{"system": "http://terminology.hl7.org/CodeSystem/allergyintolerance-clinical", "code": "active"}]}, + "verificationStatus": {"coding": [{"system": "http://terminology.hl7.org/CodeSystem/allergyintolerance-verification", "code": "confirmed"}]}, + "code": {"coding": [{"system": "http://snomed.info/sct", "code": "782415009", "display": "Intolerance to lactose"}]}, + "patient": {"reference": "Patient/banks-mia-leanne"}, + "onsetDateTime": "2022", # Example of choice type + "reaction": [{ + "manifestation": [{"coding": [{"system": "http://snomed.info/sct", "code": "21522001", "display": "Abdominal pain"}]}], + "severity": "mild" + }] + } + 
validation_result_allergy = validate_resource_against_profile(pkg_name, pkg_version, allergy_resource) + print("\nAllergy Validation Result:") + print(json.dumps(validation_result_allergy, indent=2)) + + else: + logger.error(f"Import failed for {pkg_name}#{pkg_version}, cannot proceed with validation tests.") + print("Import Errors:", import_results['errors']) + + # Restore original function if necessary + _get_download_dir = original_get_download_dir \ No newline at end of file diff --git a/templates/base.html b/templates/base.html index 86b4443..8494903 100644 --- a/templates/base.html +++ b/templates/base.html @@ -29,6 +29,9 @@ + diff --git a/templates/cp_downloaded_igs.html b/templates/cp_downloaded_igs.html index ddd9d4b..e40b421 100644 --- a/templates/cp_downloaded_igs.html +++ b/templates/cp_downloaded_igs.html @@ -31,7 +31,6 @@ .badge.bg-danger { background-color: #dc3545 !important; } - /* Ensure table rows with custom background colors are not overridden by Bootstrap */ .table tr.bg-warning { background-color: #ffc107 !important; } @@ -44,7 +43,6 @@ .table tr.bg-danger { background-color: #dc3545 !important; } - /* Override Bootstrap's table background to allow custom row colors to show */ .table-custom-bg { --bs-table-bg: transparent !important; } @@ -64,9 +62,7 @@
{% if packages %}
- +

Risk: = Duplicate Dependencies

@@ -90,11 +86,13 @@ Processed
{% else %}
+ {{ form.csrf_token }}
{% endif %}
+ {{ form.csrf_token }}
@@ -106,7 +104,6 @@
Package Name Version Actions
{% if duplicate_groups %} -

Risk: = Duplicate Dependencies

Duplicate dependencies detected: {% for name, versions in duplicate_groups.items() %} {% set group_color = group_colors[name] if name in group_colors else 'bg-warning' %}
@@ -158,6 +155,7 @@

View
+ {{ form.csrf_token }}
diff --git a/templates/cp_push_igs.html b/templates/cp_push_igs.html
index 630382f..f5138aa 100644
--- a/templates/cp_push_igs.html
+++ b/templates/cp_push_igs.html
@@ -12,9 +12,6 @@
Package Name Version -
@@ -27,20 +24,6 @@
{{ name }} {{ version }} - {% endfor %}
@@ -65,6 +48,7 @@

Push IGs to FHIR Server

+ {{ form.csrf_token }}
+ + {% for pkg in packages %} + + {% endfor %} + +
+ + {{ form.package_name(class="form-control") }} + {% for error in form.package_name.errors %} +
{{ error }}
+ {% endfor %} +
+
+ + {{ form.version(class="form-control") }} + {% for error in form.version.errors %} +
{{ error }}
+ {% endfor %} +
+
+ +
+ {{ form.include_dependencies(class="form-check-input") }} + {{ form.include_dependencies.label(class="form-check-label") }} +
+
+
+ + {{ form.mode(class="form-select") }} +
+
+ + {{ form.sample_input(class="form-control", rows=10, placeholder="Paste your FHIR JSON here...") }} + {% for error in form.sample_input.errors %} +
{{ error }}
+ {% endfor %} +
+ {{ form.submit(class="btn btn-primary") }} + +
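The form fields referenced above imply a Flask-WTF ValidationForm roughly like the following sketch; the exact labels, choices, and validators in forms.py are assumptions:

from flask_wtf import FlaskForm
from wtforms import StringField, TextAreaField, BooleanField, SelectField, SubmitField
from wtforms.validators import DataRequired

class ValidationForm(FlaskForm):
    package_name = StringField('Package Name', validators=[DataRequired()])
    version = StringField('Package Version', validators=[DataRequired()])
    include_dependencies = BooleanField('Include Dependencies', default=True)
    mode = SelectField('Validation Mode',
                       choices=[('single', 'Single Resource'), ('bundle', 'Bundle')])
    sample_input = TextAreaField('Sample FHIR Resource / Bundle (JSON)',
                                 validators=[DataRequired()])
    submit = SubmitField('Validate')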
+
+ + {% if validation_report %} +
+
Validation Report
+
+
Summary
+

Valid: {{ 'Yes' if validation_report.valid else 'No' }}

+ {% if validation_report.errors %} +
Errors
+
    + {% for error in validation_report.errors %} +
  • {{ error }}
  • + {% endfor %} +
+ {% endif %} + {% if validation_report.warnings %} +
Warnings
+
    + {% for warning in validation_report.warnings %} +
  • {{ warning }}
  • + {% endfor %} +
+ {% endif %} + + {% if validation_report.results %} +
Detailed Results
+ {% for resource_id, result in validation_report.results.items() %} +
{{ resource_id }}
+

Valid: {{ 'Yes' if result.valid else 'No' }}

+ {% if result.errors %} +
    + {% for error in result.errors %} +
  • {{ error }}
  • + {% endfor %} +
+ {% endif %} + {% if result.warnings %} +
    + {% for warning in result.warnings %} +
  • {{ warning }}
  • + {% endfor %} +
+ {% endif %} + {% endfor %} + {% endif %} +
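For reference, the validation_report object rendered above mirrors the dictionary returned by validate_bundle_against_profile; a minimal illustrative shape (the message strings here are invented):

validation_report = {
    "valid": False,
    "errors": ["Entry 0 (Patient/example): Required element 'identifier' is missing"],
    "warnings": ["Entry 1 (Observation/bp): Must-support element 'performer' is absent"],
    "results": {
        "Entry 0 (Patient/example)": {
            "valid": False,
            "errors": ["Required element 'identifier' is missing"],
            "warnings": [],
        },
        "Entry 1 (Observation/bp)": {
            "valid": True,
            "errors": [],
            "warnings": ["Must-support element 'performer' is absent"],
        },
    },
}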
+
+ {% endif %} + + + +{% endblock %} \ No newline at end of file