From c2e805b29a1bac91b19baecd53ea16a6a131332e Mon Sep 17 00:00:00 2001 From: Sudo-JHare Date: Mon, 14 Apr 2025 16:21:12 +1000 Subject: [PATCH] incremental update Updated, cliboard copy buttons on examples, added new UI elements, updated and added more validation for samples, and switched validator to ajax --- app.py | 41 +- instance/fhir_ig.db | Bin 28672 -> 28672 bytes services.py | 282 ++++++++-- templates/cp_view_processed_ig.html | 842 ++++++++++++++++++++++++++++ templates/validate_sample.html | 190 +++++-- 5 files changed, 1233 insertions(+), 122 deletions(-) create mode 100644 templates/cp_view_processed_ig.html diff --git a/app.py b/app.py index 58d84b0..6ffac30 100644 --- a/app.py +++ b/app.py @@ -1,4 +1,3 @@ -# app.py import sys import os sys.path.append(os.path.abspath(os.path.dirname(__file__))) @@ -12,7 +11,8 @@ import json import logging import requests import re -import services +import services # Restore full module import +from services import services_bp # Keep Blueprint import from forms import IgImportForm, ValidationForm # Set up logging @@ -47,6 +47,9 @@ except Exception as e: db = SQLAlchemy(app) csrf = CSRFProtect(app) +# Register Blueprint +app.register_blueprint(services_bp, url_prefix='/api') + class ProcessedIg(db.Model): id = db.Column(db.Integer, primary_key=True) package_name = db.Column(db.String(128), nullable=False) @@ -786,10 +789,9 @@ def api_push_ig(): logger.error(f"[API Push] Error yielding final error message: {yield_e}") return Response(generate_stream(), mimetype='application/x-ndjson') -@app.route('/validate-sample', methods=['GET', 'POST']) +@app.route('/validate-sample', methods=['GET']) def validate_sample(): form = ValidationForm() - validation_report = None packages = [] packages_dir = app.config['FHIR_PACKAGES_DIR'] if os.path.exists(packages_dir): @@ -807,40 +809,11 @@ def validate_sample(): except Exception as e: logger.warning(f"Error reading package {filename}: {e}") continue - if form.validate_on_submit(): - package_name = form.package_name.data - version = form.version.data - include_dependencies = form.include_dependencies.data - mode = form.mode.data - try: - sample_input = json.loads(form.sample_input.data) - if mode == 'single': - validation_report = services.validate_resource_against_profile( - package_name, version, sample_input, include_dependencies - ) - else: - validation_report = services.validate_bundle_against_profile( - package_name, version, sample_input, include_dependencies - ) - flash("Validation completed.", 'success') - except json.JSONDecodeError: - flash("Invalid JSON format in sample input.", 'error') - validation_report = {'valid': False, 'errors': ['Invalid JSON format provided.'], 'warnings': [], 'results': {}} - except Exception as e: - logger.error(f"Error validating sample: {e}") - flash(f"Error validating sample: {str(e)}", 'error') - validation_report = {'valid': False, 'errors': [str(e)], 'warnings': [], 'results': {}} - else: - for field, errors in form.errors.items(): - field_obj = getattr(form, field, None) - field_label = field_obj.label.text if field_obj and hasattr(field_obj, 'label') else field - for error in errors: - flash(f"Error in field '{field_label}': {error}", "danger") return render_template( 'validate_sample.html', form=form, packages=packages, - validation_report=validation_report, + validation_report=None, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now() ) diff --git a/instance/fhir_ig.db b/instance/fhir_ig.db index be0ca841c884245d65815c03df393b55d3da1974..654bd6c4cc1a5b174008681eb95616103b142167 100644 GIT binary patch delta 53 zcmZp8z}WDBae_3X(?l6(MyHJl9%2$^RtDx)hK7137KWyl2GP|@d5O8HN>&P!A7;sH Jz9Xg*2>`R*5O@Fp delta 53 zcmZp8z}WDBae_3X)kGO*Myrhp9%2#(Rt5%EM&^2ErY06&P!A7;sH Jz9Xg*2>`Ig5M}@X diff --git a/services.py b/services.py index 0e5b943..e48b324 100644 --- a/services.py +++ b/services.py @@ -4,11 +4,14 @@ import tarfile import json import re import logging -from flask import current_app +from flask import current_app, Blueprint, request, jsonify from collections import defaultdict from pathlib import Path import datetime +# Define Blueprint +services_bp = Blueprint('services', __name__) + # Configure logging if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') @@ -53,7 +56,6 @@ FHIR_R4_BASE_TYPES = { "SubstanceSourceMaterial", "SubstanceSpecification", "SupplyDelivery", "SupplyRequest", "Task", "TerminologyCapabilities", "TestReport", "TestScript", "ValueSet", "VerificationResult", "VisionPrescription" } - # --- Helper Functions --- def _get_download_dir(): @@ -132,8 +134,8 @@ def parse_package_filename(filename): version = "" return name, version -def find_and_extract_sd(tgz_path, resource_identifier): - """Helper to find and extract StructureDefinition json from a tgz path.""" +def find_and_extract_sd(tgz_path, resource_identifier, profile_url=None): + """Helper to find and extract StructureDefinition json from a tgz path, prioritizing profile match.""" sd_data = None found_path = None if not tgz_path or not os.path.exists(tgz_path): @@ -159,17 +161,23 @@ def find_and_extract_sd(tgz_path, resource_identifier): sd_name = data.get('name') sd_type = data.get('type') sd_url = data.get('url') - if resource_identifier and ( + # Prioritize match with profile_url if provided + if profile_url and sd_url == profile_url: + sd_data = data + found_path = member.name + logger.info(f"Found SD matching profile '{profile_url}' at path: {found_path}") + break + # Fallback to resource type or identifier match + elif resource_identifier and ( (sd_id and resource_identifier.lower() == sd_id.lower()) or (sd_name and resource_identifier.lower() == sd_name.lower()) or (sd_type and resource_identifier.lower() == sd_type.lower()) or - (sd_url and resource_identifier.lower() == sd_url.lower()) or (os.path.splitext(os.path.basename(member.name))[0].lower() == resource_identifier.lower()) ): sd_data = data found_path = member.name logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}") - break + # Continue searching in case a profile match is found except json.JSONDecodeError as e: logger.debug(f"Could not parse JSON in {member.name}, skipping: {e}") except UnicodeDecodeError as e: @@ -182,7 +190,7 @@ def find_and_extract_sd(tgz_path, resource_identifier): if fileobj: fileobj.close() if sd_data is None: - logger.info(f"SD matching identifier '{resource_identifier}' not found within archive {os.path.basename(tgz_path)}") + logger.info(f"SD matching identifier '{resource_identifier}' or profile '{profile_url}' not found within archive {os.path.basename(tgz_path)}") except tarfile.ReadError as e: logger.error(f"Tar ReadError reading {tgz_path}: {e}") return None, None @@ -196,7 +204,7 @@ def find_and_extract_sd(tgz_path, resource_identifier): logger.error(f"Unexpected error in find_and_extract_sd for {tgz_path}: {e}", exc_info=True) raise return sd_data, found_path - + # --- Metadata Saving/Loading --- def save_package_metadata(name, version, dependency_mode, dependencies, complies_with_profiles=None, imposed_profiles=None): """Saves dependency mode, imported dependencies, and profile relationships as metadata.""" @@ -250,7 +258,6 @@ def get_package_metadata(name, version): else: logger.debug(f"Metadata file not found: {metadata_path}") return None - # --- Package Processing --- def process_package_file(tgz_path): @@ -393,7 +400,7 @@ def process_package_file(tgz_path): finally: if fileobj: fileobj.close() - # --- Pass 2: Process Examples --- + # Pass 2: Process Examples logger.debug("Processing Examples...") example_members = [m for m in members if m.isfile() and m.name.startswith('package/') and 'example' in m.name.lower()] @@ -433,7 +440,7 @@ def process_package_file(tgz_path): break elif profile_url in resource_info: associated_key = profile_url - personally_match = True + found_profile_match = True logger.debug(f"Example {member.name} associated with profile {associated_key} via meta.profile") break @@ -475,7 +482,7 @@ def process_package_file(tgz_path): finally: if fileobj: fileobj.close() - # --- Pass 3: Ensure Relevant Base Types --- + # Pass 3: Ensure Relevant Base Types logger.debug("Ensuring relevant FHIR R4 base types...") essential_types = {'CapabilityStatement'} for type_name in referenced_types | essential_types: @@ -485,7 +492,7 @@ def process_package_file(tgz_path): resource_info[type_name]['is_profile'] = False logger.debug(f"Added base type entry for {type_name}") - # --- Final Consolidation --- + # Final Consolidation final_list = [] final_ms_elements = {} final_examples = {} @@ -559,12 +566,18 @@ def process_package_file(tgz_path): # --- Validation Functions --- def navigate_fhir_path(resource, path, extension_url=None): - """Navigates a FHIR resource using a FHIRPath-like expression.""" + """Navigates a FHIR resource using a FHIRPath-like expression, handling nested structures.""" + logger.debug(f"Navigating FHIR path: {path}") if not resource or not path: return None parts = path.split('.') current = resource - for part in parts: + resource_type = resource.get('resourceType') + for i, part in enumerate(parts): + # Skip resource type prefix (e.g., Patient) + if i == 0 and part == resource_type: + continue + # Handle array indexing (e.g., name[0]) match = re.match(r'^(\w+)\[(\d+)\]$', part) if match: key, index = match.groups() @@ -573,17 +586,60 @@ def navigate_fhir_path(resource, path, extension_url=None): if isinstance(current[key], list) and index < len(current[key]): current = current[key][index] else: + logger.debug(f"Path {part} invalid: key={key}, index={index}, current={current.get(key)}") return None + elif isinstance(current, list) and index < len(current): + current = current[index] else: + logger.debug(f"Path {part} not found in current={current}") return None else: - if isinstance(current, dict) and part in current: - current = current[part] - else: - return None + # Handle choice types (e.g., onset[x]) + if '[x]' in part: + part = part.replace('[x]', '') + # Try common choice type suffixes + for suffix in ['', 'DateTime', 'Age', 'Period', 'Range', 'String', 'CodeableConcept']: + test_key = part + suffix + if isinstance(current, dict) and test_key in current: + current = current[test_key] + break + else: + logger.debug(f"Choice type {part}[x] not found in current={current}") + return None + elif isinstance(current, dict): + if part in current: + current = current[part] + else: + # Handle FHIR complex types + if part == 'code' and 'coding' in current and isinstance(current['coding'], list) and current['coding']: + current = current['coding'] + elif part == 'patient' and 'reference' in current and current['reference']: + current = current['reference'] + elif part == 'manifestation' and isinstance(current, list) and current and 'coding' in current[0] and current[0]['coding']: + current = current[0]['coding'] + elif part == 'clinicalStatus' and 'coding' in current and isinstance(current['coding'], list) and current['coding']: + current = current['coding'] + else: + logger.debug(f"Path {part} not found in current={current}") + return None + elif isinstance(current, list) and len(current) > 0: + # Try to find the part in list items + found = False + for item in current: + if isinstance(item, dict) and part in item: + current = item[part] + found = True + break + if not found: + # For nested paths like communication.language, return None only if the parent is absent + logger.debug(f"Path {part} not found in list items: {current}") + return None if extension_url and isinstance(current, list): current = [item for item in current if item.get('url') == extension_url] - return current + # Return non-None/non-empty values as present + result = current if (current is not None and (not isinstance(current, list) or current)) else None + logger.debug(f"Path {path} resolved to: {result}") + return result def validate_resource_against_profile(package_name, version, resource, include_dependencies=True): """Validates a FHIR resource against a StructureDefinition in the specified package.""" @@ -593,29 +649,93 @@ def validate_resource_against_profile(package_name, version, resource, include_d if not download_dir: result['valid'] = False result['errors'].append("Could not access download directory") + logger.error("Validation failed: Could not access download directory") return result tgz_path = os.path.join(download_dir, construct_tgz_filename(package_name, version)) - sd_data, _ = find_and_extract_sd(tgz_path, resource.get('resourceType')) - if not sd_data: + logger.debug(f"Checking for package file: {tgz_path}") + if not os.path.exists(tgz_path): result['valid'] = False - result['errors'].append(f"No StructureDefinition found for {resource.get('resourceType')} in {package_name}#{version}") + result['errors'].append(f"Package file not found: {package_name}#{version}") + logger.error(f"Validation failed: Package file not found at {tgz_path}") return result - # Basic validation: check required elements and Must Support + # Use profile from meta.profile if available + profile_url = None + meta = resource.get('meta', {}) + profiles = meta.get('profile', []) + if profiles: + profile_url = profiles[0] # Use first profile + logger.debug(f"Using profile from meta.profile: {profile_url}") + + sd_data, sd_path = find_and_extract_sd(tgz_path, resource.get('resourceType'), profile_url) + if not sd_data: + result['valid'] = False + result['errors'].append(f"No StructureDefinition found for {resource.get('resourceType')} with profile {profile_url or 'any'} in {package_name}#{version}") + logger.error(f"Validation failed: No SD for {resource.get('resourceType')} in {tgz_path}") + return result + logger.debug(f"Found SD at {sd_path}") + + # Validate required elements (min=1) + errors = [] elements = sd_data.get('snapshot', {}).get('element', []) for element in elements: path = element.get('path') - if element.get('min', 0) > 0: + min_val = element.get('min', 0) + # Only check top-level required elements + if min_val > 0 and not '.' in path[1 + path.find('.'):]: value = navigate_fhir_path(resource, path) - if value is None or (isinstance(value, list) and not value): - result['valid'] = False - result['errors'].append(f"Required element {path} missing") - if element.get('mustSupport', False): - value = navigate_fhir_path(resource, path) - if value is None or (isinstance(value, list) and not value): - result['warnings'].append(f"Must Support element {path} missing or empty") + if value is None or (isinstance(value, list) and not any(value)): + errors.append(f"Required element {path} missing") + logger.info(f"Validation error: Required element {path} missing") + # Validate must-support elements + warnings = [] + for element in elements: + path = element.get('path') + # Only check top-level must-support elements + if element.get('mustSupport', False) and not '.' in path[1 + path.find('.'):]: + # Handle choice types (e.g., value[x], effective[x]) + if '[x]' in path: + base_path = path.replace('[x]', '') + found = False + # Try common choice type suffixes + for suffix in ['Quantity', 'CodeableConcept', 'String', 'DateTime', 'Period', 'Range']: + test_path = f"{base_path}{suffix}" + value = navigate_fhir_path(resource, test_path) + if value is not None and (not isinstance(value, list) or any(value)): + found = True + break + if not found: + warnings.append(f"Must Support element {path} missing or empty") + logger.info(f"Validation warning: Must Support element {path} missing or empty") + else: + value = navigate_fhir_path(resource, path) + if value is None or (isinstance(value, list) and not any(value)): + # Only warn for non-required must-support elements + if element.get('min', 0) == 0: + warnings.append(f"Must Support element {path} missing or empty") + logger.info(f"Validation warning: Must Support element {path} missing or empty") + # Handle dataAbsentReason only if value[x] is absent + if path.endswith('dataAbsentReason') and element.get('mustSupport', False): + value_x_path = path.replace('dataAbsentReason', 'value[x]') + value_found = False + for suffix in ['Quantity', 'CodeableConcept', 'String', 'DateTime', 'Period', 'Range']: + test_path = path.replace('dataAbsentReason', f'value{suffix}') + value = navigate_fhir_path(resource, test_path) + if value is not None and (not isinstance(value, list) or any(value)): + value_found = True + break + if not value_found: + value = navigate_fhir_path(resource, path) + if value is None or (isinstance(value, list) and not any(value)): + warnings.append(f"Must Support element {path} missing or empty") + logger.info(f"Validation warning: Must Support element {path} missing or empty") + + result['valid'] = len(errors) == 0 + result['errors'] = errors + result['warnings'] = warnings + logger.debug(f"Validation result: valid={result['valid']}, errors={result['errors']}, warnings={result['warnings']}") return result def validate_bundle_against_profile(package_name, version, bundle, include_dependencies=True): @@ -630,6 +750,7 @@ def validate_bundle_against_profile(package_name, version, bundle, include_depen if not bundle.get('resourceType') == 'Bundle': result['valid'] = False result['errors'].append("Resource is not a Bundle") + logger.error("Validation failed: Resource is not a Bundle") return result for entry in bundle.get('entry', []): @@ -645,6 +766,7 @@ def validate_bundle_against_profile(package_name, version, bundle, include_depen result['errors'].extend(validation_result['errors']) result['warnings'].extend(validation_result['warnings']) + logger.debug(f"Bundle validation result: valid={result['valid']}, errors={len(result['errors'])}, warnings={len(result['warnings'])}") return result # --- Structure Definition Retrieval --- @@ -1030,6 +1152,97 @@ def import_package_and_dependencies(initial_name, initial_version, dependency_mo logger.info(f"Import finished for {initial_name}#{initial_version}. Processed: {len(results['processed'])}, Downloaded: {len(results['downloaded'])}, Errors: {len(results['errors'])}") return results +# --- Validation Route --- +@services_bp.route('/validate-sample', methods=['POST']) +def validate_sample(): + """Validates a FHIR sample against a package profile.""" + logger.debug("Received validate-sample request") + data = request.get_json(silent=True) + if not data: + logger.error("No JSON data provided or invalid JSON in validate-sample request") + return jsonify({ + 'valid': False, + 'errors': ["No JSON data provided or invalid JSON"], + 'warnings': [], + 'results': {} + }), 400 + + package_name = data.get('package_name') + version = data.get('version') + sample_data = data.get('sample_data') + + logger.debug(f"Request params: package_name={package_name}, version={version}, sample_data_length={len(sample_data) if sample_data else 0}") + if not package_name or not version or not sample_data: + logger.error(f"Missing required fields: package_name={package_name}, version={version}, sample_data={'provided' if sample_data else 'missing'}") + return jsonify({ + 'valid': False, + 'errors': ["Missing required fields: package_name, version, or sample_data"], + 'warnings': [], + 'results': {} + }), 400 + + # Verify download directory access + download_dir = _get_download_dir() + if not download_dir: + logger.error("Cannot access download directory") + return jsonify({ + 'valid': False, + 'errors': ["Server configuration error: cannot access package directory"], + 'warnings': [], + 'results': {} + }), 500 + + # Verify package file exists + tgz_filename = construct_tgz_filename(package_name, version) + tgz_path = os.path.join(download_dir, tgz_filename) + logger.debug(f"Checking package file: {tgz_path}") + if not os.path.exists(tgz_path): + logger.error(f"Package file not found: {tgz_path}") + return jsonify({ + 'valid': False, + 'errors': [f"Package not found: {package_name}#{version}. Please import the package first."], + 'warnings': [], + 'results': {} + }), 400 + + try: + # Parse JSON sample + sample = json.loads(sample_data) + resource_type = sample.get('resourceType') + if not resource_type: + logger.error("Sample JSON missing resourceType") + return jsonify({ + 'valid': False, + 'errors': ["Sample JSON missing resourceType"], + 'warnings': [], + 'results': {} + }), 400 + + logger.debug(f"Validating {resource_type} against {package_name}#{version}") + # Validate resource or bundle + if resource_type == 'Bundle': + result = validate_bundle_against_profile(package_name, version, sample) + else: + result = validate_resource_against_profile(package_name, version, sample) + + logger.info(f"Validation result for {resource_type} against {package_name}#{version}: valid={result['valid']}, errors={len(result['errors'])}, warnings={len(result['warnings'])}") + return jsonify(result) + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in sample_data: {e}") + return jsonify({ + 'valid': False, + 'errors': [f"Invalid JSON: {str(e)}"], + 'warnings': [], + 'results': {} + }), 400 + except Exception as e: + logger.error(f"Validation failed: {e}", exc_info=True) + return jsonify({ + 'valid': False, + 'errors': [f"Validation failed: {str(e)}"], + 'warnings': [], + 'results': {} + }), 500 # --- Standalone Test --- if __name__ == '__main__': logger.info("Running services.py directly for testing.") @@ -1082,4 +1295,5 @@ if __name__ == '__main__': print(f" Imposed Profiles: {processing_results.get('imposed_profiles', [])}") print(f" Processing Errors: {processing_results.get('errors', [])}") else: - print(f"\n--- Skipping Processing Test (Import failed for {pkg_name_to_test}#{pkg_version_to_test}) ---") \ No newline at end of file + print(f"\n--- Skipping Processing Test (Import failed for {pkg_name_to_test}#{pkg_version_to_test}) ---") + diff --git a/templates/cp_view_processed_ig.html b/templates/cp_view_processed_ig.html new file mode 100644 index 0000000..d1aaf99 --- /dev/null +++ b/templates/cp_view_processed_ig.html @@ -0,0 +1,842 @@ +{% extends "base.html" %} + +{% block content %} +
+ FHIRFLARE Ig Toolkit +

{{ title }}

+
+

+ View details of the processed FHIR Implementation Guide. +

+ +
+
+ +
+
+

{{ title }}

+ Back to Package List +
+ + {% if processed_ig %} +
+
Package Details
+
+
+
Package Name
+
{{ processed_ig.package_name }}
+
Package Version
+
{{ processed_ig.version }}
+
Processed At
+
{{ processed_ig.processed_date.strftime('%Y-%m-%d %H:%M:%S UTC') }}
+
+
+
+ + {% if config.DISPLAY_PROFILE_RELATIONSHIPS %} +
+
Profile Relationships
+
+
Complies With
+ {% if complies_with_profiles %} +
    + {% for profile in complies_with_profiles %} +
  • {{ profile }}
  • + {% endfor %} +
+ {% else %} +

No profiles declared as compatible.

+ {% endif %} + +
Required Dependent Profiles (Must Also Validate Against)
+ {% if imposed_profiles %} +
    + {% for profile in imposed_profiles %} +
  • {{ profile }}
  • + {% endfor %} +
+ {% else %} +

No imposed profiles.

+ {% endif %} +
+
+ {% endif %} + +
+
Resource Types Found / Defined
+
+ {% if profile_list or base_list %} +

+ + MS = Contains Must Support Elements
+ Optional MS Ext = Optional Extension with Must Support Sub-Elements +
+

+ {% if profile_list %} +

Examples = Examples will be displayed when selecting profile Types if contained in the IG

+
Profiles Defined ({{ profile_list|length }}):
+ + {% else %} +

No profiles defined.

+ {% endif %} + {% if base_list %} +
Base Resource Types Referenced ({{ base_list|length }}):
+ + {% else %} +

No base resource types referenced.

+ {% endif %} + {% else %} +

No resource type information extracted or stored.

+ {% endif %} +
+
+ + + + + + + {% else %} + + {% endif %} +
+ + + + +{% endblock %} \ No newline at end of file diff --git a/templates/validate_sample.html b/templates/validate_sample.html index f9b31f3..666a172 100644 --- a/templates/validate_sample.html +++ b/templates/validate_sample.html @@ -20,14 +20,14 @@
Validation Form
-
+ {{ form.hidden_tag() }}
Select a package from the list below or enter a new one (e.g., hl7.fhir.us.core).
- {% if packages %} {% for pkg in packages %} @@ -69,58 +69,17 @@
{{ error }}
{% endfor %}
- {{ form.submit(class="btn btn-primary") }} +
- {% if validation_report %} -
-
Validation Report
-
-
Summary
-

Valid: {{ 'Yes' if validation_report.valid else 'No' }}

- {% if validation_report.errors %} -
Errors
-
    - {% for error in validation_report.errors %} -
  • {{ error }}
  • - {% endfor %} -
- {% endif %} - {% if validation_report.warnings %} -
Warnings
-
    - {% for warning in validation_report.warnings %} -
  • {{ warning }}
  • - {% endfor %} -
- {% endif %} - - {% if validation_report.results %} -
Detailed Results
- {% for resource_id, result in validation_report.results.items() %} -
{{ resource_id }}
-

Valid: {{ 'Yes' if result.valid else 'No' }}

- {% if result.errors %} -
    - {% for error in result.errors %} -
  • {{ error }}
  • - {% endfor %} -
- {% endif %} - {% if result.warnings %} -
    - {% for warning in result.warnings %} -
  • {{ warning }}
  • - {% endfor %} -
- {% endif %} - {% endfor %} - {% endif %} -
+
{% endblock %} \ No newline at end of file