From 86fa94538a4ce4a20ed0c2ec2c9fcefaf59ad0dd Mon Sep 17 00:00:00 2001
From: Sudo-JHare
Date: Fri, 11 Apr 2025 13:36:22 +1000
Subject: [PATCH] Add future-proofing for profile relationships: handle the
 compliesWithProfile and imposeProfile extensions
 (https://hl7.org/fhir/extensions/StructureDefinition-structuredefinition-compliesWithProfile.html)

---
 README.md                                    |  47 ++-
 app.py                                       | 143 +++----
 instance/fhir_ig.db                          | Bin 180224 -> 180224 bytes
 .../hl7.fhir.r4.core-4.0.1.metadata.json     |   4 +-
 services.py                                  | 372 +++++++++++++-----
 templates/cp_view_processed_ig.html          |  29 ++
 6 files changed, 416 insertions(+), 179 deletions(-)

diff --git a/README.md b/README.md
index 80f3a38..ec7c505 100644
--- a/README.md
+++ b/README.md
@@ -8,10 +8,12 @@ The FHIRFLARE IG Toolkit is a Flask-based web application designed to simplify t
 
 - **Import IGs**: Download FHIR IG packages and their dependencies from a package registry.
 - **Manage IGs**: View, process, and delete downloaded IGs, with duplicate detection.
-- **Process IGs**: Extract resource types, profiles, must-support elements, and examples from IGs.
-- **Push IGs**: Upload IG resources to a FHIR server with real-time console output.
-- **API Support**: Provides RESTful API endpoints for importing and pushing IGs.
+- **Process IGs**: Extract resource types, profiles, must-support elements, examples, and profile relationships (`compliesWithProfile` and `imposeProfile`) from IGs.
+- **Push IGs**: Upload IG resources to a FHIR server with real-time console output, including validation against imposed profiles.
+- **Profile Relationships**: Support for `structuredefinition-compliesWithProfile` and `structuredefinition-imposeProfile` extensions, with validation and UI display.
+- **API Support**: Provides RESTful API endpoints for importing and pushing IGs, including profile relationship metadata.
 - **Live Console**: Displays real-time logs during push operations.
+- **Configurable Behavior**: Options to enable/disable imposed profile validation and UI display of profile relationships.
 
 ## Technology Stack
 
@@ -85,18 +87,23 @@ The FHIRFLARE IG Toolkit is built using the following technologies:
   - Go to the "Manage FHIR Packages" tab to view downloaded IGs.
   - Process, delete, or view details of IGs. Duplicates are highlighted for resolution.
 
-3. **Push IGs to a FHIR Server**:
+3. **View Processed IGs**:
+   - After processing an IG, view its details, including resource types, profiles, must-support elements, examples, and profile relationships (`compliesWithProfile` and `imposeProfile`).
+   - Profile relationships are displayed if enabled via the `DISPLAY_PROFILE_RELATIONSHIPS` configuration.
+
+4. **Push IGs to a FHIR Server**:
    - Navigate to the "Push IGs" tab.
    - Select a package, enter a FHIR server URL (e.g., `http://hapi.fhir.org/baseR4`), and choose whether to include dependencies.
-   - Click "Push to FHIR Server" to upload resources, with progress shown in the live console.
+   - Click "Push to FHIR Server" to upload resources, with validation against imposed profiles (if enabled via `VALIDATE_IMPOSED_PROFILES`) and progress shown in the live console.
 
-4. **API Usage**:
+5. **API Usage**:
   - **Import IG**: `POST /api/import-ig`
     ```bash
    curl -X POST http://localhost:5000/api/import-ig \
    -H "Content-Type: application/json" \
    -d '{"package_name": "hl7.fhir.us.core", "version": "3.1.1", "api_key": "your-api-key"}'
    ```
+    Response includes `complies_with_profiles` and `imposed_profiles` if present.
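+    A sample response (illustrative values; the exact lists depend on the imported package):
+    ```json
+    {
+      "status": "success",
+      "message": "Package imported successfully",
+      "package_name": "hl7.fhir.us.core",
+      "version": "3.1.1",
+      "dependency_mode": "recursive",
+      "dependencies": ["hl7.fhir.r4.core#4.0.1"],
+      "complies_with_profiles": [],
+      "imposed_profiles": ["http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient|3.1.1"],
+      "duplicates": []
+    }
+    ```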
  - **Push IG**: `POST /api/push-ig`
    ```bash
   curl -X POST http://localhost:5000/api/push-ig \
   -H "Content-Type: application/json" \
   -H "Accept: application/x-ndjson" \
   -d '{"package_name": "hl7.fhir.us.core", "version": "3.1.1", "fhir_server_url": "http://hapi.fhir.org/baseR4", "include_dependencies": true, "api_key": "your-api-key"}'
   ```
+    Resources are validated against imposed profiles before pushing.
+
+## Configuration Options
+
+- **`VALIDATE_IMPOSED_PROFILES`**: Set to `True` (default) to validate resources against imposed profiles during the push operation. Set to `False` to skip this validation.
+  ```python
+  app.config['VALIDATE_IMPOSED_PROFILES'] = False
+  ```
+- **`DISPLAY_PROFILE_RELATIONSHIPS`**: Set to `True` (default) to display `compliesWithProfile` and `imposeProfile` relationships in the UI. Set to `False` to hide them.
+  ```python
+  app.config['DISPLAY_PROFILE_RELATIONSHIPS'] = False
+  ```
 
 ## Testing
 
@@ -147,11 +166,11 @@ The test suite includes 27 test cases covering the following areas:
   - Import IG page (`/import-ig`): Form rendering and submission (success, failure, invalid input).
   - Manage IGs page (`/view-igs`): Rendering with and without packages.
   - Push IGs page (`/push-igs`): Rendering and live console.
-  - View Processed IG page (`/view-ig/`): Rendering processed IG details.
+  - View Processed IG page (`/view-ig/`): Rendering processed IG details, including profile relationships.
 
 - **API Endpoints**:
-  - `POST /api/import-ig`: Success, invalid API key, missing parameters.
-  - `POST /api/push-ig`: Success, invalid API key, package not found.
+  - `POST /api/import-ig`: Success, invalid API key, missing parameters, profile relationships in response.
+  - `POST /api/push-ig`: Success, invalid API key, package not found, imposed profile validation.
   - `GET /get-structure`: Fetching structure definitions (success, not found).
   - `GET /get-example`: Fetching example content (success, invalid path).
 
@@ -161,7 +180,7 @@
   - Viewing processed IGs: Retrieving and displaying processed IG data.
 
 - **File Operations**:
-  - Processing IG packages: Extracting data from `.tgz` files.
+  - Processing IG packages: Extracting data from `.tgz` files, including profile relationships.
   - Deleting IG packages: Removing `.tgz` files from the filesystem.
 
 - **Security**:
@@ -219,7 +238,7 @@ test_app.py::TestFHIRFlareIGToolkit::test_get_example_content_invalid_path PASSE
 
 ### Background
 
-The FHIRFLARE IG Toolkit was developed to address the need for a user-friendly tool to manage FHIR Implementation Guides. The project focuses on providing a seamless experience for importing, processing, and analyzing FHIR packages, with a particular emphasis on handling duplicate dependencies—a common challenge in FHIR development.
+The FHIRFLARE IG Toolkit was developed to address the need for a user-friendly tool to manage FHIR Implementation Guides. The project focuses on providing a seamless experience for importing, processing, and analyzing FHIR packages, with a particular emphasis on handling duplicate dependencies and profile relationships—a common challenge in FHIR development.
 
 ### Technical Decisions
 
@@ -227,18 +246,21 @@ The FHIRFLARE IG Toolkit was developed to address the need for a user-friendly t
 
 - **SQLite**: Used as the database for simplicity and ease of setup. For production use, consider switching to a more robust database like PostgreSQL.
- **Bootstrap**: Integrated for a responsive and professional UI, with custom CSS to handle duplicate package highlighting. - **Docker Support**: Added to simplify deployment and ensure consistency across development and production environments. +- **Profile Validation**: Added support for `structuredefinition-compliesWithProfile` and `structuredefinition-imposeProfile` to ensure resources comply with required profiles during push operations. ### Known Issues and Workarounds - **Bootstrap CSS Conflicts**: Early versions of the application had issues with Bootstrap’s table background styles (`--bs-table-bg`) overriding custom row colors for duplicate packages. This was resolved by setting `--bs-table-bg` to `transparent` for the affected table (see `templates/cp_downloaded_igs.html`). - **Database Permissions**: The `instance` directory must be writable by the application. If you encounter permission errors, ensure the directory has the correct permissions (`chmod -R 777 instance`). - **Package Parsing**: Some FHIR package filenames may not follow the expected `name-version.tgz` format, leading to parsing issues. The application includes a fallback to treat such files as name-only packages, but this may need further refinement. +- **Profile Validation Overhead**: Validating against imposed profiles can increase processing time during push operations. This can be disabled via the `VALIDATE_IMPOSED_PROFILES` configuration if performance is a concern. ### Future Improvements - [ ] **Sorting Versions**: Add sorting for package versions in the "Manage FHIR Packages" view to display them in a consistent order (e.g., ascending or descending). - [ ] **Advanced Duplicate Handling**: Implement options to resolve duplicates (e.g., keep the latest version, merge resources). - [ ] **Production Database**: Support for PostgreSQL or MySQL for better scalability in production environments. +- [ ] **Profile Validation Enhancements**: Add more detailed validation reports for imposed profiles, including specific element mismatches. **Completed Items** (Removed from the list as they are done): - ~~Testing: Add unit tests using pytest to cover core functionality, especially package processing and database operations.~~ (Implemented in `tests/test_app.py` with 27 test cases covering UI, API, database, and file operations.) @@ -267,11 +289,12 @@ Please ensure your code follows the project’s coding style and includes approp - **Database Issues**: If the SQLite database (`instance/fhir_ig.db`) cannot be created, ensure the `instance` directory is writable. You may need to adjust permissions (`chmod -R 777 instance`). - **Package Download Fails**: Verify your internet connection and ensure the package name and version are correct. - **Colors Not Displaying**: If table row colors for duplicates are not showing, inspect the page with browser developer tools (F12) to check for CSS conflicts with Bootstrap. +- **Profile Relationships Not Displaying**: Ensure `DISPLAY_PROFILE_RELATIONSHIPS` is set to `True` in the application configuration. ## Directory Structure - `app.py`: Main Flask application file. -- `services.py`: Business logic for importing, processing, and pushing IGs. +- `services.py`: Business logic for importing, processing, and pushing IGs, including profile relationship handling. - `templates/`: HTML templates for the UI. - `instance/`: Directory for SQLite database and downloaded packages. - `tests/`: Directory for test files. 
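+
+## Example: Profile Relationship Extensions
+
+The relationships above are declared as extensions on a `StructureDefinition`. A minimal, illustrative fragment (the profile URLs are placeholders, not taken from a real IG):
+
+```json
+{
+  "resourceType": "StructureDefinition",
+  "id": "example-profile",
+  "extension": [
+    {
+      "url": "http://hl7.org/fhir/StructureDefinition/structuredefinition-compliesWithProfile",
+      "valueCanonical": "http://example.org/fhir/StructureDefinition/other-profile|1.0.0"
+    },
+    {
+      "url": "http://hl7.org/fhir/StructureDefinition/structuredefinition-imposeProfile",
+      "valueCanonical": "http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient|3.1.1"
+    }
+  ]
+}
+```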
diff --git a/app.py b/app.py
index de4b6ad..7e559f4 100644
--- a/app.py
+++ b/app.py
@@ -10,7 +10,7 @@ from datetime import datetime
 import services
 import logging
 import requests
-import re # Added for regex validation
+import re
 
 # Set up logging
 logging.basicConfig(level=logging.DEBUG)
@@ -21,7 +21,9 @@ app.config['SECRET_KEY'] = 'your-secret-key-here'
 app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////app/instance/fhir_ig.db'
 app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
 app.config['FHIR_PACKAGES_DIR'] = os.path.join(app.instance_path, 'fhir_packages')
-app.config['API_KEY'] = 'your-api-key-here' # Hardcoded API key for now; replace with a secure solution
+app.config['API_KEY'] = 'your-api-key-here'
+app.config['VALIDATE_IMPOSED_PROFILES'] = True  # Enable/disable imposed profile validation
+app.config['DISPLAY_PROFILE_RELATIONSHIPS'] = True  # Enable/disable UI display of relationships
 
 # Ensure directories exist and are writable
 instance_path = '/app/instance'
@@ -98,7 +100,6 @@ def import_ig():
         result = services.import_package_and_dependencies(name, version, dependency_mode=dependency_mode)
         if result['errors'] and not result['downloaded']:
             error_msg = result['errors'][0]
-            # Simplify the error message by taking the last part after the last colon
             simplified_msg = error_msg.split(": ")[-1] if ": " in error_msg else error_msg
             flash(f"Failed to import {name}#{version}: {simplified_msg}", "error - check the name and version!")
             return redirect(url_for('import_ig'))
@@ -119,24 +120,19 @@ def view_igs():
     if os.path.exists(packages_dir):
         for filename in os.listdir(packages_dir):
             if filename.endswith('.tgz'):
-                # Split on the last hyphen to separate name and version
                 last_hyphen_index = filename.rfind('-')
                 if last_hyphen_index != -1 and filename.endswith('.tgz'):
                     name = filename[:last_hyphen_index]
-                    version = filename[last_hyphen_index + 1:-4] # Remove .tgz
-                    # Validate that the version looks reasonable (e.g., starts with a digit or is a known keyword)
+                    version = filename[last_hyphen_index + 1:-4]
                     if version[0].isdigit() or version in ('preview', 'current', 'latest'):
-                        # Replace underscores with dots to match FHIR package naming convention
                         name = name.replace('_', '.')
                         packages.append({'name': name, 'version': version, 'filename': filename})
                     else:
-                        # Fallback: treat as name only, log warning
                         name = filename[:-4]
                         version = ''
                         logger.warning(f"Could not parse version from {filename}, treating as name only")
                         packages.append({'name': name, 'version': version, 'filename': filename})
                 else:
-                    # Fallback: treat as name only, log warning
                     name = filename[:-4]
                     version = ''
                     logger.warning(f"Could not parse version from {filename}, treating as name only")
@@ -156,7 +152,7 @@ def view_igs():
     # Calculate duplicate_groups
     duplicate_groups = {}
     for name, pkgs in duplicate_names.items():
-        if len(pkgs) > 1: # Only include packages with multiple versions
+        if len(pkgs) > 1:
             duplicate_groups[name] = [pkg['version'] for pkg in pkgs]
 
     # Precompute group colors
@@ -165,10 +161,11 @@ def view_igs():
     for i, name in enumerate(duplicate_groups.keys()):
         group_colors[name] = colors[i % len(colors)]
 
-    return render_template('cp_downloaded_igs.html', packages=packages, processed_list=igs, 
-                           processed_ids=processed_ids, duplicate_names=duplicate_names, 
+    return render_template('cp_downloaded_igs.html', packages=packages, processed_list=igs,
+                           processed_ids=processed_ids, duplicate_names=duplicate_names,
                            duplicate_groups=duplicate_groups, group_colors=group_colors,
-                           site_name='FLARE FHIR IG Toolkit', 
now=datetime.now()) + site_name='FLARE FHIR IG Toolkit', now=datetime.now(), + config=app.config) @app.route('/push-igs', methods=['GET', 'POST']) def push_igs(): @@ -181,24 +178,19 @@ def push_igs(): if os.path.exists(packages_dir): for filename in os.listdir(packages_dir): if filename.endswith('.tgz'): - # Split on the last hyphen to separate name and version last_hyphen_index = filename.rfind('-') if last_hyphen_index != -1 and filename.endswith('.tgz'): name = filename[:last_hyphen_index] - version = filename[last_hyphen_index + 1:-4] # Remove .tgz - # Validate that the version looks reasonable (e.g., starts with a digit or is a known keyword) + version = filename[last_hyphen_index + 1:-4] if version[0].isdigit() or version in ('preview', 'current', 'latest'): - # Replace underscores with dots to match FHIR package naming convention name = name.replace('_', '.') packages.append({'name': name, 'version': version, 'filename': filename}) else: - # Fallback: treat as name only, log warning name = filename[:-4] version = '' logger.warning(f"Could not parse version from {filename}, treating as name only") packages.append({'name': name, 'version': version, 'filename': filename}) else: - # Fallback: treat as name only, log warning name = filename[:-4] version = '' logger.warning(f"Could not parse version from {filename}, treating as name only") @@ -218,7 +210,7 @@ def push_igs(): # Calculate duplicate_groups duplicate_groups = {} for name, pkgs in duplicate_names.items(): - if len(pkgs) > 1: # Only include packages with multiple versions + if len(pkgs) > 1: duplicate_groups[name] = [pkg['version'] for pkg in pkgs] # Precompute group colors @@ -227,11 +219,12 @@ def push_igs(): for i, name in enumerate(duplicate_groups.keys()): group_colors[name] = colors[i % len(colors)] - return render_template('cp_push_igs.html', packages=packages, processed_list=igs, - processed_ids=processed_ids, duplicate_names=duplicate_names, + return render_template('cp_push_igs.html', packages=packages, processed_list=igs, + processed_ids=processed_ids, duplicate_names=duplicate_names, duplicate_groups=duplicate_groups, group_colors=group_colors, site_name='FLARE FHIR IG Toolkit', now=datetime.now(), - api_key=app.config['API_KEY']) # Pass the API key to the template + api_key=app.config['API_KEY'], + config=app.config) @app.route('/process-igs', methods=['POST']) def process_ig(): @@ -246,12 +239,10 @@ def process_ig(): return redirect(url_for('view_igs')) try: - # Parse name and version from filename last_hyphen_index = filename.rfind('-') if last_hyphen_index != -1 and filename.endswith('.tgz'): name = filename[:last_hyphen_index] version = filename[last_hyphen_index + 1:-4] - # Replace underscores with dots to match FHIR package naming convention name = name.replace('_', '.') else: name = filename[:-4] @@ -320,9 +311,25 @@ def view_ig(processed_ig_id): profile_list = [t for t in processed_ig.resource_types_info if t.get('is_profile')] base_list = [t for t in processed_ig.resource_types_info if not t.get('is_profile')] examples_by_type = processed_ig.examples or {} + + # Load metadata to get profile relationships + package_name = processed_ig.package_name + version = processed_ig.version + metadata_filename = f"{services.sanitize_filename_part(package_name)}-{services.sanitize_filename_part(version)}.metadata.json" + metadata_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], metadata_filename) + complies_with_profiles = [] + imposed_profiles = [] + if os.path.exists(metadata_path): + with open(metadata_path, 
'r') as f: + metadata = json.load(f) + complies_with_profiles = metadata.get('complies_with_profiles', []) + imposed_profiles = metadata.get('imposed_profiles', []) + return render_template('cp_view_processed_ig.html', title=f"View {processed_ig.package_name}#{processed_ig.version}", processed_ig=processed_ig, profile_list=profile_list, base_list=base_list, - examples_by_type=examples_by_type, site_name='FLARE FHIR IG Toolkit', now=datetime.now()) + examples_by_type=examples_by_type, site_name='FLARE FHIR IG Toolkit', now=datetime.now(), + complies_with_profiles=complies_with_profiles, imposed_profiles=imposed_profiles, + config=app.config) @app.route('/get-structure') def get_structure_definition(): @@ -332,7 +339,6 @@ def get_structure_definition(): if not all([package_name, package_version, resource_identifier]): return jsonify({"error": "Missing query parameters"}), 400 - # First, try to get the structure definition from the specified package tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], services._construct_tgz_filename(package_name, package_version)) sd_data = None fallback_used = False @@ -340,13 +346,11 @@ def get_structure_definition(): if os.path.exists(tgz_path): sd_data, _ = services.find_and_extract_sd(tgz_path, resource_identifier) - # If not found, fall back to the core FHIR package (hl7.fhir.r4.core#4.0.1) if sd_data is None: logger.debug(f"Structure definition for '{resource_identifier}' not found in {package_name}#{package_version}, attempting fallback to hl7.fhir.r4.core#4.0.1") core_package_name = "hl7.fhir.r4.core" core_package_version = "4.0.1" - # Ensure the core package is downloaded core_tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], services._construct_tgz_filename(core_package_name, core_package_version)) if not os.path.exists(core_tgz_path): logger.debug(f"Core package {core_package_name}#{core_package_version} not found, attempting to download") @@ -359,7 +363,6 @@ def get_structure_definition(): logger.error(f"Error downloading core package: {str(e)}") return jsonify({"error": f"SD for '{resource_identifier}' not found in {package_name}#{package_version}, and error downloading core package: {str(e)}"}), 500 - # Try to extract the structure definition from the core package if os.path.exists(core_tgz_path): sd_data, _ = services.find_and_extract_sd(core_tgz_path, resource_identifier) if sd_data is None: @@ -418,52 +421,57 @@ def get_package_metadata(): # API Endpoint: Import IG Package @app.route('/api/import-ig', methods=['POST']) def api_import_ig(): - # Check API key auth_error = check_api_key() if auth_error: return auth_error - # Validate request if not request.is_json: return jsonify({"status": "error", "message": "Request must be JSON"}), 400 data = request.get_json() package_name = data.get('package_name') version = data.get('version') - dependency_mode = data.get('dependency_mode', 'recursive') # Default to recursive + dependency_mode = data.get('dependency_mode', 'recursive') if not package_name or not version: return jsonify({"status": "error", "message": "Missing package_name or version"}), 400 - # Validate package name and version format using re if not (isinstance(package_name, str) and isinstance(version, str) and re.match(r'^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+$', package_name) and re.match(r'^[a-zA-Z0-9\.\-]+$', version)): return jsonify({"status": "error", "message": "Invalid package name or version format"}), 400 - # Validate dependency mode valid_modes = ['recursive', 'patch-canonical', 'tree-shaking'] if dependency_mode not in 
valid_modes: return jsonify({"status": "error", "message": f"Invalid dependency mode: {dependency_mode}. Must be one of {valid_modes}"}), 400 try: - # Import package and dependencies result = services.import_package_and_dependencies(package_name, version, dependency_mode=dependency_mode) if result['errors'] and not result['downloaded']: return jsonify({"status": "error", "message": f"Failed to import {package_name}#{version}: {result['errors'][0]}"}), 500 + # Process the package to get compliesWithProfile and imposeProfile + package_filename = f"{services.sanitize_filename_part(package_name)}-{services.sanitize_filename_part(version)}.tgz" + package_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], package_filename) + complies_with_profiles = [] + imposed_profiles = [] + if os.path.exists(package_path): + process_result = services.process_package_file(package_path) + complies_with_profiles = process_result.get('complies_with_profiles', []) + imposed_profiles = process_result.get('imposed_profiles', []) + else: + logger.warning(f"Package file not found after import: {package_path}") + # Check for duplicates packages = [] packages_dir = app.config['FHIR_PACKAGES_DIR'] if os.path.exists(packages_dir): for filename in os.listdir(packages_dir): if filename.endswith('.tgz'): - # Split on the last hyphen to separate name and version last_hyphen_index = filename.rfind('-') if last_hyphen_index != -1 and filename.endswith('.tgz'): name = filename[:last_hyphen_index] version = filename[last_hyphen_index + 1:-4] - # Replace underscores with dots to match FHIR package naming convention name = name.replace('_', '.') if version[0].isdigit() or version in ('preview', 'current', 'latest'): packages.append({'name': name, 'version': version, 'filename': filename}) @@ -476,7 +484,6 @@ def api_import_ig(): version = '' packages.append({'name': name, 'version': version, 'filename': filename}) - # Calculate duplicates duplicate_names = {} for pkg in packages: name = pkg['name'] @@ -490,7 +497,6 @@ def api_import_ig(): versions = [pkg['version'] for pkg in pkgs] duplicates.append(f"{name} (exists as {', '.join(versions)})") - # Deduplicate dependencies seen = set() unique_dependencies = [] for dep in result.get('dependencies', []): @@ -499,7 +505,6 @@ def api_import_ig(): seen.add(dep_str) unique_dependencies.append(dep_str) - # Prepare response response = { "status": "success", "message": "Package imported successfully", @@ -507,6 +512,8 @@ def api_import_ig(): "version": version, "dependency_mode": dependency_mode, "dependencies": unique_dependencies, + "complies_with_profiles": complies_with_profiles, + "imposed_profiles": imposed_profiles, "duplicates": duplicates } return jsonify(response), 200 @@ -518,12 +525,10 @@ def api_import_ig(): # API Endpoint: Push IG to FHIR Server with Streaming @app.route('/api/push-ig', methods=['POST']) def api_push_ig(): - # Check API key auth_error = check_api_key() if auth_error: return auth_error - # Validate request if not request.is_json: return jsonify({"status": "error", "message": "Request must be JSON"}), 400 @@ -536,13 +541,11 @@ def api_push_ig(): if not all([package_name, version, fhir_server_url]): return jsonify({"status": "error", "message": "Missing package_name, version, or fhir_server_url"}), 400 - # Validate package name and version format using re if not (isinstance(package_name, str) and isinstance(version, str) and re.match(r'^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+$', package_name) and re.match(r'^[a-zA-Z0-9\.\-]+$', version)): return jsonify({"status": 
"error", "message": "Invalid package name or version format"}), 400 - # Check if package exists tgz_filename = services._construct_tgz_filename(package_name, version) tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], tgz_filename) if not os.path.exists(tgz_path): @@ -550,21 +553,10 @@ def api_push_ig(): def generate_stream(): try: - # Start message yield json.dumps({"type": "start", "message": f"Starting push for {package_name}#{version}..."}) + "\n" - # Extract resources from the main package resources = [] - with tarfile.open(tgz_path, "r:gz") as tar: - for member in tar.getmembers(): - if member.name.startswith('package/') and member.name.endswith('.json') and not member.name.endswith('package.json'): - with tar.extractfile(member) as f: - resource_data = json.load(f) - if 'resourceType' in resource_data: - resources.append(resource_data) - - # If include_dependencies is True, fetch dependencies from metadata - pushed_packages = [f"{package_name}#{version}"] + packages_to_push = [(package_name, version, tgz_path)] if include_dependencies: yield json.dumps({"type": "progress", "message": "Processing dependencies..."}) + "\n" metadata = services.get_package_metadata(package_name, version) @@ -575,36 +567,44 @@ def api_push_ig(): dep_tgz_filename = services._construct_tgz_filename(dep_name, dep_version) dep_tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], dep_tgz_filename) if os.path.exists(dep_tgz_path): - with tarfile.open(dep_tgz_path, "r:gz") as tar: - for member in tar.getmembers(): - if member.name.startswith('package/') and member.name.endswith('.json') and not member.name.endswith('package.json'): - with tar.extractfile(member) as f: - resource_data = json.load(f) - if 'resourceType' in resource_data: - resources.append(resource_data) - pushed_packages.append(f"{dep_name}#{dep_version}") + packages_to_push.append((dep_name, dep_version, dep_tgz_path)) yield json.dumps({"type": "progress", "message": f"Added dependency {dep_name}#{dep_version}"}) + "\n" else: yield json.dumps({"type": "warning", "message": f"Dependency {dep_name}#{dep_version} not found, skipping"}) + "\n" - # Push resources to FHIR server + for pkg_name, pkg_version, pkg_path in packages_to_push: + with tarfile.open(pkg_path, "r:gz") as tar: + for member in tar.getmembers(): + if member.name.startswith('package/') and member.name.endswith('.json') and not member.name.endswith('package.json'): + with tar.extractfile(member) as f: + resource_data = json.load(f) + if 'resourceType' in resource_data: + resources.append((resource_data, pkg_name, pkg_version)) + server_response = [] success_count = 0 failure_count = 0 total_resources = len(resources) yield json.dumps({"type": "progress", "message": f"Found {total_resources} resources to upload"}) + "\n" - for i, resource in enumerate(resources, 1): + pushed_packages = [] + for i, (resource, pkg_name, pkg_version) in enumerate(resources, 1): resource_type = resource.get('resourceType') resource_id = resource.get('id') if not resource_type or not resource_id: - yield json.dumps({"type": "warning", "message": f"Skipping invalid resource at index {i}"}) + "\n" + yield json.dumps({"type": "warning", "message": f"Skipping invalid resource at index {i} from {pkg_name}#{pkg_version}"}) + "\n" + failure_count += 1 + continue + + # Validate against the profile and imposed profiles + validation_result = services.validate_resource_against_profile(resource, pkg_name, pkg_version, resource_type) + if not validation_result['valid']: + yield json.dumps({"type": "error", 
"message": f"Validation failed for {resource_type}/{resource_id} in {pkg_name}#{pkg_version}: {', '.join(validation_result['errors'])}"}) + "\n" failure_count += 1 continue - # Construct the FHIR server URL for the resource resource_url = f"{fhir_server_url.rstrip('/')}/{resource_type}/{resource_id}" - yield json.dumps({"type": "progress", "message": f"Uploading {resource_type}/{resource_id} ({i}/{total_resources})..."}) + "\n" + yield json.dumps({"type": "progress", "message": f"Uploading {resource_type}/{resource_id} ({i}/{total_resources}) from {pkg_name}#{pkg_version}..."}) + "\n" try: response = requests.put(resource_url, json=resource, headers={'Content-Type': 'application/fhir+json'}) @@ -612,13 +612,14 @@ def api_push_ig(): server_response.append(f"Uploaded {resource_type}/{resource_id} successfully") yield json.dumps({"type": "success", "message": f"Uploaded {resource_type}/{resource_id} successfully"}) + "\n" success_count += 1 + if f"{pkg_name}#{pkg_version}" not in pushed_packages: + pushed_packages.append(f"{pkg_name}#{pkg_version}") except requests.exceptions.RequestException as e: error_msg = f"Failed to upload {resource_type}/{resource_id}: {str(e)}" server_response.append(error_msg) yield json.dumps({"type": "error", "message": error_msg}) + "\n" failure_count += 1 - # Final summary summary = { "status": "success" if failure_count == 0 else "partial", "message": f"Push completed: {success_count} resources uploaded, {failure_count} failed", diff --git a/instance/fhir_ig.db b/instance/fhir_ig.db index e9b848412dd5dbadcd5abde11b450006c109de39..fb27c588a8d1a9d40cb50a265c55a3778fe20865 100644 GIT binary patch delta 278 zcmZo@;BIK(o**swmVtpm2Z))0Set=?am7R(W5%}|6IRPhasY+cfmj@f#ei58h*^P{ iZL*3&J+Bs!1=Il6%(VF-pPJz24}9W7m)O{FfCT`qmsHyT delta 278 zcmZo@;BIK(o**qakAZa!cG2^_A39IELwID(a;y_vqh(&>z6^L~v ht0>g-vH@A_K+FNeOq(C_sR?fWz$ZR*iH!{hSOCxyQ}X}- diff --git a/instance/fhir_packages/hl7.fhir.r4.core-4.0.1.metadata.json b/instance/fhir_packages/hl7.fhir.r4.core-4.0.1.metadata.json index 61e2718..d1f669e 100644 --- a/instance/fhir_packages/hl7.fhir.r4.core-4.0.1.metadata.json +++ b/instance/fhir_packages/hl7.fhir.r4.core-4.0.1.metadata.json @@ -2,5 +2,7 @@ "package_name": "hl7.fhir.r4.core", "version": "4.0.1", "dependency_mode": "recursive", - "imported_dependencies": [] + "imported_dependencies": [], + "complies_with_profiles": [], + "imposed_profiles": [] } \ No newline at end of file diff --git a/services.py b/services.py index 776bc95..6d7431e 100644 --- a/services.py +++ b/services.py @@ -21,7 +21,7 @@ CANONICAL_PACKAGE = ("hl7.fhir.r4.core", "4.0.1") # Define the canonical FHIR p def _get_download_dir(): """Gets the absolute path to the download directory, creating it if needed.""" logger = logging.getLogger(__name__) - instance_path = None # Initialize + instance_path = None # Initialize try: instance_path = current_app.instance_path logger.debug(f"Using instance path from current_app: {instance_path}") @@ -31,8 +31,8 @@ def _get_download_dir(): logger.debug(f"Constructed instance path: {instance_path}") if not instance_path: - logger.error("Fatal Error: Could not determine instance path.") - return None + logger.error("Fatal Error: Could not determine instance path.") + return None download_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME) try: @@ -42,10 +42,10 @@ def _get_download_dir(): logger.error(f"Fatal Error creating dir {download_dir}: {e}", exc_info=True) return None -def sanitize_filename_part(text): # Public version +def sanitize_filename_part(text): """Basic 
sanitization for name/version parts of filename.""" safe_text = "".join(c if c.isalnum() or c in ['.', '-'] else '_' for c in text) - safe_text = re.sub(r'_+', '_', safe_text) # Uses re + safe_text = re.sub(r'_+', '_', safe_text) safe_text = safe_text.strip('_-.') return safe_text if safe_text else "invalid_name" @@ -53,7 +53,7 @@ def _construct_tgz_filename(name, version): """Constructs the standard filename using the sanitized parts.""" return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.tgz" -def find_and_extract_sd(tgz_path, resource_identifier): # Public version +def find_and_extract_sd(tgz_path, resource_identifier): """Helper to find and extract SD json from a given tgz path by ID, Name, or Type.""" sd_data = None found_path = None @@ -83,15 +83,15 @@ def find_and_extract_sd(tgz_path, resource_identifier): # Public version sd_type = data.get('type') # Match if requested identifier matches ID, Name, or Base Type if resource_identifier == sd_type or resource_identifier == sd_id or resource_identifier == sd_name: - sd_data = data - found_path = member.name - logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}") - break # Stop searching once found + sd_data = data + found_path = member.name + logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}") + break # Stop searching once found except Exception as e: - # Log issues reading/parsing individual files but continue search logger.warning(f"Could not read/parse potential SD {member.name}: {e}") finally: - if fileobj: fileobj.close() # Ensure resource cleanup + if fileobj: + fileobj.close() if sd_data is None: logger.info(f"SD matching '{resource_identifier}' not found within archive {os.path.basename(tgz_path)} - caller may attempt fallback") @@ -106,8 +106,8 @@ def find_and_extract_sd(tgz_path, resource_identifier): # Public version raise return sd_data, found_path -def save_package_metadata(name, version, dependency_mode, dependencies): - """Saves the dependency mode and imported dependencies as metadata alongside the package.""" +def save_package_metadata(name, version, dependency_mode, dependencies, complies_with_profiles=None, imposed_profiles=None): + """Saves the dependency mode, imported dependencies, and profile relationships as metadata alongside the package.""" logger = logging.getLogger(__name__) download_dir = _get_download_dir() if not download_dir: @@ -118,7 +118,9 @@ def save_package_metadata(name, version, dependency_mode, dependencies): 'package_name': name, 'version': version, 'dependency_mode': dependency_mode, - 'imported_dependencies': dependencies # List of {'name': ..., 'version': ...} + 'imported_dependencies': dependencies, + 'complies_with_profiles': complies_with_profiles or [], + 'imposed_profiles': imposed_profiles or [] } metadata_filename = f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.metadata.json" metadata_path = os.path.join(download_dir, metadata_filename) @@ -150,6 +152,108 @@ def get_package_metadata(name, version): return None return None +def validate_resource_against_profile(resource, package_name, package_version, resource_type): + """ + Validate a FHIR resource against a profile and its imposed profiles. + Returns a dictionary with validation results. 
+ """ + logger = logging.getLogger(__name__) + result = { + 'valid': True, + 'errors': [], + 'imposed_profile_results': {} + } + + # Load the primary profile + package_filename = f"{sanitize_filename_part(package_name)}-{sanitize_filename_part(package_version)}.tgz" + package_path = os.path.join(_get_download_dir(), package_filename) + if not os.path.exists(package_path): + result['valid'] = False + result['errors'].append(f"Package not found: {package_name}#{package_version}") + return result + + # Find the StructureDefinition for the resource type + sd_filename = f"package/StructureDefinition-{resource_type.lower()}.json" + if package_name == 'hl7.fhir.us.core': + sd_filename = f"package/StructureDefinition-us-core-{resource_type.lower()}.json" + + primary_profile_valid = True + primary_errors = [] + with tarfile.open(package_path, "r:gz") as tar: + try: + file_obj = tar.extractfile(sd_filename) + if file_obj is None: + raise KeyError(f"StructureDefinition not found: {sd_filename}") + sd_data = json.load(file_obj) + # Simplified validation: Check required elements + snapshot = sd_data.get('snapshot', {}) + for element in snapshot.get('element', []): + if element.get('min', 0) > 0: # Required element + path = element.get('path') + # Check if the path exists in the resource + keys = path.split('.') + current = resource + for key in keys[1:]: # Skip the resourceType + current = current.get(key) + if current is None: + primary_profile_valid = False + primary_errors.append(f"Missing required element {path} in {package_name}#{package_version}") + break + except (KeyError, json.JSONDecodeError) as e: + primary_profile_valid = False + primary_errors.append(f"Error loading StructureDefinition: {str(e)}") + + if not primary_profile_valid: + result['valid'] = False + result['errors'].extend(primary_errors) + + # Check imposed profiles if validation is enabled + if not current_app.config.get('VALIDATE_IMPOSED_PROFILES', True): + logger.info("Imposed profile validation is disabled via configuration.") + return result + + metadata_filename = f"{sanitize_filename_part(package_name)}-{sanitize_filename_part(package_version)}.metadata.json" + metadata_path = os.path.join(_get_download_dir(), metadata_filename) + if not os.path.exists(metadata_path): + logger.warning(f"Metadata not found for {package_name}#{package_version}, skipping imposed profile validation.") + return result + + with open(metadata_path, 'r') as f: + metadata = json.load(f) + imposed_profiles = metadata.get('imposed_profiles', []) + + for imposed_url in imposed_profiles: + # Parse the canonical URL to get package name and version + # Example: http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient|3.1.1 + try: + imposed_package, imposed_version = parse_canonical_url(imposed_url) + except ValueError as e: + result['errors'].append(f"Invalid canonical URL for imposed profile: {imposed_url} - {str(e)}") + continue + + imposed_result = validate_resource_against_profile(resource, imposed_package, imposed_version, resource_type) + result['imposed_profile_results'][imposed_url] = imposed_result + if not imposed_result['valid']: + result['valid'] = False + result['errors'].extend([f"Failed imposed profile {imposed_url}: {err}" for err in imposed_result['errors']]) + + return result + +def parse_canonical_url(canonical_url): + """ + Parse a canonical URL to extract package name and version. 
+    Example: http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient|3.1.1
+    Returns (package_name, version)
+    """
+    parts = canonical_url.split('|')
+    if len(parts) != 2:
+        raise ValueError("Canonical URL must include version after '|'")
+    version = parts[1]
+    path_parts = parts[0].split('/')
+    # Heuristic for hl7.org-style canonicals: the package name is the first host
+    # label plus the path segments before 'StructureDefinition', e.g.
+    # http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient -> hl7.fhir.us.core
+    if 'StructureDefinition' not in path_parts:
+        raise ValueError("Canonical URL must contain 'StructureDefinition'")
+    sd_index = path_parts.index('StructureDefinition')
+    package_name = '.'.join([path_parts[2].split('.')[0]] + path_parts[3:sd_index])
+    return package_name, version
+
 # --- Core Service Functions ---
 
 def download_package(name, version):
@@ -161,7 +265,7 @@ def download_package(name, version):
     package_id = f"{name}#{version}"
     package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}"
-    filename = _construct_tgz_filename(name, version) # Uses public sanitize via helper
+    filename = _construct_tgz_filename(name, version)
     save_path = os.path.join(download_dir, filename)
 
     if os.path.exists(save_path):
@@ -208,11 +312,11 @@ def extract_dependencies(tgz_path):
                 raise FileNotFoundError(f"Could not extract {package_json_path}")
     except KeyError:
         error_message = f"'{package_json_path}' not found in {os.path.basename(tgz_path)}.";
-        logger.warning(error_message) # OK if missing
+        logger.warning(error_message)
     except (json.JSONDecodeError, UnicodeDecodeError) as e:
-        error_message = f"Parse error in {package_json_path}: {e}"; logger.error(error_message); dependencies = None # Parsing failed
+        error_message = f"Parse error in {package_json_path}: {e}"; logger.error(error_message); dependencies = None
     except (tarfile.TarError, FileNotFoundError) as e:
-        error_message = f"Archive error {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None # Archive read failed
+        error_message = f"Archive error {os.path.basename(tgz_path)}: {e}"; logger.error(error_message); dependencies = None
     except Exception as e:
         error_message = f"Unexpected error extracting deps: {e}"; logger.error(error_message, exc_info=True); dependencies = None
     return dependencies, error_message
@@ -291,14 +395,10 @@ def map_types_to_packages(used_types, all_dependencies):
     logger = logging.getLogger(__name__)
     type_to_package = {}
     for (pkg_name, pkg_version), deps in all_dependencies.items():
-        # Simplified mapping: assume package names indicate the types they provide
-        # In a real implementation, you'd need to inspect each package's contents
         for dep_name, dep_version in deps.items():
-            # Heuristic: map types to packages based on package name
             for t in used_types:
                 if t.lower() in dep_name.lower():
                     type_to_package[t] = (dep_name, dep_version)
-        # Special case for the package itself
         for t in used_types:
             if t.lower() in pkg_name.lower():
                 type_to_package[t] = (pkg_name, pkg_version)
@@ -320,7 +420,7 @@ def import_package_and_dependencies(initial_name, initial_version, dependency_mo
         'processed': set(),
         'downloaded': {},
         'all_dependencies': {},
-        'dependencies': [], # Store dependencies as a list
+        'dependencies': [],
         'errors': []
     }
     pending_queue = [(initial_name, initial_version)]
@@ -351,12 +451,18 @@ def import_package_and_dependencies(initial_name, initial_version, dependency_mo
             if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version:
                 results['dependencies'].append({"name": dep_name, "version": dep_version})
 
-        # Save metadata for the initial package
-        save_package_metadata(initial_name, initial_version, dependency_mode, results['dependencies'])
+        # Process the package to extract compliesWithProfile and imposeProfile
+        package_info = process_package_file(save_path)
+        complies_with_profiles = 
package_info.get('complies_with_profiles', []) + imposed_profiles = package_info.get('imposed_profiles', []) + + # Save metadata for the initial package with profile relationships + save_package_metadata(initial_name, initial_version, dependency_mode, results['dependencies'], + complies_with_profiles=complies_with_profiles, + imposed_profiles=imposed_profiles) # Handle dependency pulling based on mode if dependency_mode == 'recursive': - # Current behavior: recursively download all dependencies for dep in results['dependencies']: dep_name, dep_version = dep['name'], dep['version'] dep_tuple = (dep_name, dep_version) @@ -365,7 +471,6 @@ def import_package_and_dependencies(initial_name, initial_version, dependency_mo logger.debug(f"Added to queue (recursive): {dep_name}#{dep_version}") elif dependency_mode == 'patch-canonical': - # Patch Canonical: Only download the canonical package if needed canonical_name, canonical_version = CANONICAL_PACKAGE canonical_tuple = (canonical_name, canonical_version) if canonical_tuple not in processed_lookup: @@ -373,15 +478,10 @@ def import_package_and_dependencies(initial_name, initial_version, dependency_mo logger.debug(f"Added canonical package to queue: {canonical_name}#{canonical_version}") elif dependency_mode == 'tree-shaking': - # Tree Shaking: Analyze the initial package to determine used types used_types = extract_used_types(save_path) logger.debug(f"Used types in {initial_name}#{initial_version}: {used_types}") - - # Map used types to packages type_to_package = map_types_to_packages(used_types, results['all_dependencies']) logger.debug(f"Type to package mapping: {type_to_package}") - - # Add only the necessary packages to the queue for t, (dep_name, dep_version) in type_to_package.items(): dep_tuple = (dep_name, dep_version) if dep_tuple not in processed_lookup and dep_tuple != package_id_tuple: @@ -414,121 +514,203 @@ def import_package_and_dependencies(initial_name, initial_version, dependency_mo results['all_dependencies'][package_id_tuple] = dependencies results['processed'].add(package_id_tuple) logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}") - # Add dependencies to the list for dep_name, dep_version in dependencies.items(): if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version: dep_tuple = (dep_name, dep_version) results['dependencies'].append({"name": dep_name, "version": dep_version}) - # For recursive mode, add to queue if dependency_mode == 'recursive' and dep_tuple not in processed_lookup: pending_queue.append(dep_tuple) logger.debug(f"Added to queue: {dep_name}#{dep_version}") - proc_count=len(results['processed']); dl_count=len(results['downloaded']); err_count=len(results['errors']) + proc_count = len(results['processed']) + dl_count = len(results['downloaded']) + err_count = len(results['errors']) logger.info(f"Import finished. Processed: {proc_count}, Downloaded/Verified: {dl_count}, Errors: {err_count}") return results -# --- Package File Content Processor (V6.2 - Fixed MS path handling) --- +# --- Package File Content Processor --- def process_package_file(tgz_path): - """ Extracts types, profile status, MS elements, and examples from a downloaded .tgz package (Single Pass). """ + """ Extracts types, profile status, MS elements, examples, and profile relationships from a downloaded .tgz package. 
""" logger = logging.getLogger(__name__) - logger.info(f"Processing package file details (V6.2 Logic): {tgz_path}") + logger.info(f"Processing package file details: {tgz_path}") - results = {'resource_types_info': [], 'must_support_elements': {}, 'examples': {}, 'errors': [] } - resource_info = defaultdict(lambda: {'name': None, 'type': None, 'is_profile': False, 'ms_flag': False, 'ms_paths': set(), 'examples': set()}) + results = { + 'resource_types_info': [], + 'must_support_elements': {}, + 'examples': {}, + 'complies_with_profiles': [], + 'imposed_profiles': [], + 'errors': [] + } + resource_info = defaultdict(lambda: { + 'name': None, + 'type': None, + 'is_profile': False, + 'ms_flag': False, + 'ms_paths': set(), + 'examples': set() + }) if not tgz_path or not os.path.exists(tgz_path): - results['errors'].append(f"Package file not found: {tgz_path}"); return results + results['errors'].append(f"Package file not found: {tgz_path}") + return results try: with tarfile.open(tgz_path, "r:gz") as tar: for member in tar: - if not member.isfile() or not member.name.startswith('package/') or not member.name.lower().endswith(('.json', '.xml', '.html')): continue - member_name_lower = member.name.lower(); base_filename_lower = os.path.basename(member_name_lower); fileobj = None - if base_filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: continue + if not member.isfile() or not member.name.startswith('package/') or not member.name.lower().endswith(('.json', '.xml', '.html')): + continue + member_name_lower = member.name.lower() + base_filename_lower = os.path.basename(member_name_lower) + fileobj = None + if base_filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']: + continue is_example = member.name.startswith('package/example/') or 'example' in base_filename_lower is_json = member_name_lower.endswith('.json') - try: # Process individual member + try: if is_json: - fileobj = tar.extractfile(member); - if not fileobj: continue - content_bytes = fileobj.read(); content_string = content_bytes.decode('utf-8-sig'); data = json.loads(content_string) - if not isinstance(data, dict) or 'resourceType' not in data: continue + fileobj = tar.extractfile(member) + if not fileobj: + continue + content_bytes = fileobj.read() + content_string = content_bytes.decode('utf-8-sig') + data = json.loads(content_string) + if not isinstance(data, dict) or 'resourceType' not in data: + continue - resource_type = data['resourceType']; entry_key = resource_type; is_sd = False + resource_type = data['resourceType'] + entry_key = resource_type + is_sd = False if resource_type == 'StructureDefinition': - is_sd = True; profile_id = data.get('id') or data.get('name'); sd_type = data.get('type'); sd_base = data.get('baseDefinition'); is_profile_sd = bool(sd_base); - if not profile_id or not sd_type: logger.warning(f"SD missing ID or Type: {member.name}"); continue + is_sd = True + profile_id = data.get('id') or data.get('name') + sd_type = data.get('type') + sd_base = data.get('baseDefinition') + is_profile_sd = bool(sd_base) + if not profile_id or not sd_type: + logger.warning(f"SD missing ID or Type: {member.name}") + continue entry_key = profile_id - - entry = resource_info[entry_key]; entry.setdefault('type', resource_type) # Ensure type exists + + # Extract compliesWithProfile and imposeProfile extensions + complies_with = [] + imposed_profiles = [] + for ext in data.get('extension', []): + if ext.get('url') == 
'http://hl7.org/fhir/StructureDefinition/structuredefinition-compliesWithProfile': + value = ext.get('valueCanonical') + if value: + complies_with.append(value) + elif ext.get('url') == 'http://hl7.org/fhir/StructureDefinition/structuredefinition-imposeProfile': + value = ext.get('valueCanonical') + if value: + imposed_profiles.append(value) + + # Store the relationships + if complies_with: + results['complies_with_profiles'].extend(complies_with) + if imposed_profiles: + results['imposed_profiles'].extend(imposed_profiles) + + entry = resource_info[entry_key] + entry.setdefault('type', resource_type) if is_sd: - entry['name'] = entry_key; entry['type'] = sd_type; entry['is_profile'] = is_profile_sd; + entry['name'] = entry_key + entry['type'] = sd_type + entry['is_profile'] = is_profile_sd if not entry.get('sd_processed'): - has_ms = False; ms_paths_for_sd = set() + has_ms = False + ms_paths_for_sd = set() for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]: for element in element_list: if isinstance(element, dict) and element.get('mustSupport') is True: - # --- FIX: Check path safely --- - element_path = element.get('path') - if element_path: # Only add if path exists + element_path = element.get('path') + if element_path: ms_paths_for_sd.add(element_path) - has_ms = True # Mark MS found if we added a path + has_ms = True else: - logger.warning(f"Found mustSupport=true without path in element of {entry_key}") - # --- End FIX --- - if ms_paths_for_sd: entry['ms_paths'] = ms_paths_for_sd # Store the set of paths - if has_ms: entry['ms_flag'] = True; logger.debug(f" Found MS elements in {entry_key}") # Use boolean flag - entry['sd_processed'] = True # Mark MS check done + logger.warning(f"Found mustSupport=true without path in element of {entry_key}") + if ms_paths_for_sd: + entry['ms_paths'] = ms_paths_for_sd + if has_ms: + entry['ms_flag'] = True + logger.debug(f" Found MS elements in {entry_key}") + entry['sd_processed'] = True - elif is_example: # JSON Example - key_to_use = None; profile_meta = data.get('meta', {}).get('profile', []) - if profile_meta and isinstance(profile_meta, list): - for profile_url in profile_meta: profile_id_from_meta = profile_url.split('/')[-1]; - if profile_id_from_meta in resource_info: key_to_use = profile_id_from_meta; break - if not key_to_use: key_to_use = resource_type - if key_to_use not in resource_info: resource_info[key_to_use].update({'name': key_to_use, 'type': resource_type}) - resource_info[key_to_use]['examples'].add(member.name) + elif is_example: + key_to_use = None + profile_meta = data.get('meta', {}).get('profile', []) + if profile_meta and isinstance(profile_meta, list): + for profile_url in profile_meta: + profile_id_from_meta = profile_url.split('/')[-1] + if profile_id_from_meta in resource_info: + key_to_use = profile_id_from_meta + break + if not key_to_use: + key_to_use = resource_type + if key_to_use not in resource_info: + resource_info[key_to_use].update({'name': key_to_use, 'type': resource_type}) + resource_info[key_to_use]['examples'].add(member.name) - elif is_example: # XML/HTML examples - # ... (XML/HTML example association logic) ... 
- guessed_type = base_filename_lower.split('-')[0].capitalize(); guessed_profile_id = base_filename_lower.split('-')[0]; key_to_use = None - if guessed_profile_id in resource_info: key_to_use = guessed_profile_id - elif guessed_type in resource_info: key_to_use = guessed_type - if key_to_use: resource_info[key_to_use]['examples'].add(member.name) - else: logger.warning(f"Could not associate non-JSON example {member.name}") + elif is_example: + guessed_type = base_filename_lower.split('-')[0].capitalize() + guessed_profile_id = base_filename_lower.split('-')[0] + key_to_use = None + if guessed_profile_id in resource_info: + key_to_use = guessed_profile_id + elif guessed_type in resource_info: + key_to_use = guessed_type + if key_to_use: + resource_info[key_to_use]['examples'].add(member.name) + else: + logger.warning(f"Could not associate non-JSON example {member.name}") - except Exception as e: logger.warning(f"Could not process member {member.name}: {e}", exc_info=False) + except Exception as e: + logger.warning(f"Could not process member {member.name}: {e}", exc_info=False) finally: - if fileobj: fileobj.close() - # -- End Member Loop -- + if fileobj: + fileobj.close() - # --- Final formatting moved INSIDE the main try block --- - final_list = []; final_ms_elements = {}; final_examples = {} + # Final formatting + final_list = [] + final_ms_elements = {} + final_examples = {} logger.debug(f"Formatting results from resource_info keys: {list(resource_info.keys())}") for key, info in resource_info.items(): - display_name = info.get('name') or key; base_type = info.get('type') + display_name = info.get('name') or key + base_type = info.get('type') if display_name or base_type: logger.debug(f" Formatting item '{display_name}': type='{base_type}', profile='{info.get('is_profile', False)}', ms_flag='{info.get('ms_flag', False)}'") - final_list.append({'name': display_name, 'type': base_type, 'is_profile': info.get('is_profile', False), 'must_support': info.get('ms_flag', False)}) # Ensure 'must_support' key uses 'ms_flag' - if info['ms_paths']: final_ms_elements[display_name] = sorted(list(info['ms_paths'])) - if info['examples']: final_examples[display_name] = sorted(list(info['examples'])) - else: logger.warning(f"Skipping formatting for key: {key}") + final_list.append({ + 'name': display_name, + 'type': base_type, + 'is_profile': info.get('is_profile', False), + 'must_support': info.get('ms_flag', False) + }) + if info['ms_paths']: + final_ms_elements[display_name] = sorted(list(info['ms_paths'])) + if info['examples']: + final_examples[display_name] = sorted(list(info['examples'])) + else: + logger.warning(f"Skipping formatting for key: {key}") results['resource_types_info'] = sorted(final_list, key=lambda x: (not x.get('is_profile', False), x.get('name', ''))) results['must_support_elements'] = final_ms_elements results['examples'] = final_examples - # --- End formatting moved inside --- except Exception as e: - err_msg = f"Error processing package file {tgz_path}: {e}"; logger.error(err_msg, exc_info=True); results['errors'].append(err_msg) + err_msg = f"Error processing package file {tgz_path}: {e}" + logger.error(err_msg, exc_info=True) + results['errors'].append(err_msg) # Logging counts - final_types_count = len(results['resource_types_info']); ms_count = sum(1 for r in results['resource_types_info'] if r['must_support']); total_ms_paths = sum(len(v) for v in results['must_support_elements'].values()); total_examples = sum(len(v) for v in results['examples'].values()) - 
logger.info(f"V6.2 Extraction: {final_types_count} items ({ms_count} MS; {total_ms_paths} MS paths; {total_examples} examples) from {os.path.basename(tgz_path)}")
+    final_types_count = len(results['resource_types_info'])
+    ms_count = sum(1 for r in results['resource_types_info'] if r['must_support'])
+    total_ms_paths = sum(len(v) for v in results['must_support_elements'].values())
+    total_examples = sum(len(v) for v in results['examples'].values())
+    logger.info(f"Extraction: {final_types_count} items ({ms_count} MS; {total_ms_paths} MS paths; {total_examples} examples) from {os.path.basename(tgz_path)}")
 
     return results
\ No newline at end of file
diff --git a/templates/cp_view_processed_ig.html b/templates/cp_view_processed_ig.html
index b04a38a..8a9dcc3 100644
--- a/templates/cp_view_processed_ig.html
+++ b/templates/cp_view_processed_ig.html
@@ -37,6 +37,35 @@
 
+                {% if config.DISPLAY_PROFILE_RELATIONSHIPS %}
+                <div class="card mb-4">
+                    <div class="card-header">Profile Relationships</div>
+                    <div class="card-body">
+                        <h6>Complies With</h6>
+                        {% if complies_with_profiles %}
+                        <ul>
+                            {% for profile in complies_with_profiles %}
+                            <li>{{ profile }}</li>
+                            {% endfor %}
+                        </ul>
+                        {% else %}
+                        <p class="text-muted">No profiles declared as compatible.</p>
+                        {% endif %}
+                        <h6>Required Dependent Profiles (Must Also Validate Against)</h6>
+                        {% if imposed_profiles %}
+                        <ul>
+                            {% for profile in imposed_profiles %}
+                            <li>{{ profile }}</li>
+                            {% endfor %}
+                        </ul>
+                        {% else %}
+                        <p class="text-muted">No imposed profiles.</p>
+                        {% endif %}
+                    </div>
+                </div>
+                {% endif %}
+
                 <h5>Resource Types Found / Defined</h5>
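
A minimal sketch of how the new validation helper in services.py can be exercised on its own (assumes the package and its metadata have already been imported via the UI or `/api/import-ig`; the resource content is illustrative):

```python
# Illustrative usage of services.validate_resource_against_profile.
# An application context is required because the helper reads
# current_app.config (e.g., VALIDATE_IMPOSED_PROFILES) and the instance path.
from app import app
import services

resource = {
    "resourceType": "Patient",
    "id": "example",
    "name": [{"family": "Chalmers", "given": ["Peter"]}],
}

with app.app_context():
    result = services.validate_resource_against_profile(
        resource, "hl7.fhir.us.core", "3.1.1", "Patient"
    )
    print(result["valid"])                    # overall outcome
    print(result["errors"])                   # flattened list of error messages
    print(result["imposed_profile_results"])  # per-imposed-profile results, keyed by canonical URL
```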