Significant update

Changed and redefined all slicing and added tabbed models.

TBD - terminology bindings and constraints.
Joshua Hare 2025-04-21 23:32:28 +10:00
parent a28ecec025
commit ec75498b29
7 changed files with 60075 additions and 759 deletions

app.py

@@ -385,95 +385,113 @@ def get_structure():
package_name = request.args.get('package_name')
package_version = request.args.get('package_version')
resource_type = request.args.get('resource_type')
# Keep view parameter for potential future use or caching, though not used directly in this revised logic
view = request.args.get('view', 'snapshot') # Default to snapshot view processing
if not all([package_name, package_version, resource_type]):
logger.warning("get_structure: Missing query parameters: package_name=%s, package_version=%s, resource_type=%s", package_name, package_version, resource_type)
return jsonify({"error": "Missing required query parameters: package_name, package_version, resource_type"}), 400
packages_dir = current_app.config.get('FHIR_PACKAGES_DIR')
if not packages_dir:
logger.error("FHIR_PACKAGES_DIR not configured.")
return jsonify({"error": "Server configuration error: Package directory not set."}), 500
tgz_filename = services.construct_tgz_filename(package_name, package_version)
tgz_path = os.path.join(packages_dir, tgz_filename)
sd_data = None
fallback_used = False
source_package_id = f"{package_name}#{package_version}"
logger.debug(f"Attempting to find SD for '{resource_type}' in {tgz_filename}")
# --- Fetch SD Data (Keep existing logic including fallback) ---
if os.path.exists(tgz_path):
try:
sd_data, _ = services.find_and_extract_sd(tgz_path, resource_type)
except json.JSONDecodeError as e:
logger.error(f"JSON parsing error for SD '{resource_type}' in {tgz_path}: {e}")
return jsonify({"error": f"Invalid JSON in StructureDefinition: {str(e)}"}), 500
except tarfile.TarError as e:
logger.error(f"TarError extracting SD '{resource_type}' from {tgz_path}: {e}")
return jsonify({"error": f"Error reading package archive: {str(e)}"}), 500
# Add error handling as before...
except Exception as e:
logger.error(f"Unexpected error extracting SD '{resource_type}' from {tgz_path}: {e}", exc_info=True)
return jsonify({"error": f"Unexpected error reading StructureDefinition: {str(e)}"}), 500
else:
logger.warning(f"Package file not found: {tgz_path}")
logger.warning(f"Package file not found: {tgz_path}")
# Try fallback... (keep existing fallback logic)
if sd_data is None:
logger.info(f"SD for '{resource_type}' not found in {source_package_id}. Attempting fallback to {services.CANONICAL_PACKAGE_ID}.")
core_package_name, core_package_version = services.CANONICAL_PACKAGE
core_tgz_filename = services.construct_tgz_filename(core_package_name, core_package_version)
core_tgz_path = os.path.join(packages_dir, core_tgz_filename)
if not os.path.exists(core_tgz_path):
logger.warning(f"Core package {services.CANONICAL_PACKAGE_ID} not found locally, attempting download.")
try:
result = services.import_package_and_dependencies(core_package_name, core_package_version, dependency_mode='direct')
if result['errors'] and not result['downloaded']:
logger.error(f"Failed to download fallback core package {services.CANONICAL_PACKAGE_ID}: {result['errors'][0]}")
return jsonify({"error": f"SD for '{resource_type}' not found in primary package, and failed to download core package: {result['errors'][0]}"}), 500
elif not os.path.exists(core_tgz_path):
logger.error(f"Core package download reported success but file {core_tgz_filename} still not found.")
return jsonify({"error": f"SD for '{resource_type}' not found, and core package download failed unexpectedly."}), 500
except Exception as e:
logger.error(f"Error downloading core package {services.CANONICAL_PACKAGE_ID}: {str(e)}", exc_info=True)
return jsonify({"error": f"SD for '{resource_type}' not found, and error downloading core package: {str(e)}"}), 500
# Handle missing core package / download if needed...
logger.error(f"Core package {services.CANONICAL_PACKAGE_ID} not found locally.")
return jsonify({"error": f"SD for '{resource_type}' not found in primary package, and core package is missing."}), 500
# (Add download logic here if desired)
try:
sd_data, _ = services.find_and_extract_sd(core_tgz_path, resource_type)
if sd_data is not None:
fallback_used = True
source_package_id = services.CANONICAL_PACKAGE_ID
logger.info(f"Found SD for '{resource_type}' in fallback package {source_package_id}.")
else:
logger.error(f"SD for '{resource_type}' not found in primary package OR fallback {services.CANONICAL_PACKAGE_ID}.")
return jsonify({"error": f"StructureDefinition for '{resource_type}' not found in {package_name}#{package_version} or in core FHIR package."}), 404
except json.JSONDecodeError as e:
logger.error(f"JSON parsing error for SD '{resource_type}' in fallback {core_tgz_path}: {e}")
return jsonify({"error": f"Invalid JSON in fallback StructureDefinition: {str(e)}"}), 500
except tarfile.TarError as e:
logger.error(f"TarError extracting SD '{resource_type}' from fallback {core_tgz_path}: {e}")
return jsonify({"error": f"Error reading fallback package archive: {str(e)}"}), 500
# Add error handling as before...
except Exception as e:
logger.error(f"Unexpected error extracting SD '{resource_type}' from fallback {core_tgz_path}: {e}", exc_info=True)
return jsonify({"error": f"Unexpected error reading fallback StructureDefinition: {str(e)}"}), 500
# Remove narrative text element (ensure applied after fallback)
if sd_data and 'text' in sd_data:
logger.debug(f"Removing narrative text from SD for '{resource_type}'")
del sd_data['text']
logger.error(f"Unexpected error extracting SD '{resource_type}' from fallback {core_tgz_path}: {e}", exc_info=True)
return jsonify({"error": f"Unexpected error reading fallback StructureDefinition: {str(e)}"}), 500
# --- Check if SD data was found ---
if not sd_data:
logger.error(f"SD for '{resource_type}' not found in primary or fallback package.")
return jsonify({"error": f"StructureDefinition for '{resource_type}' not found."}), 404
elements = sd_data.get('snapshot', {}).get('element', [])
if not elements and 'differential' in sd_data:
logger.debug(f"Using differential elements for {resource_type} as snapshot is missing.")
elements = sd_data.get('differential', {}).get('element', [])
if not elements:
logger.warning(f"No snapshot or differential elements found in the SD for '{resource_type}' from {source_package_id}")
# --- *** START Backend Modification *** ---
# Extract snapshot and differential elements
snapshot_elements = sd_data.get('snapshot', {}).get('element', [])
differential_elements = sd_data.get('differential', {}).get('element', [])
# Create a set of element IDs from the differential for efficient lookup
# Using element 'id' is generally more reliable than 'path' for matching
differential_ids = {el.get('id') for el in differential_elements if el.get('id')}
logger.debug(f"Found {len(differential_ids)} unique IDs in differential.")
enriched_elements = []
if snapshot_elements:
logger.debug(f"Processing {len(snapshot_elements)} snapshot elements to add isInDifferential flag.")
for element in snapshot_elements:
element_id = element.get('id')
# Add the isInDifferential flag
element['isInDifferential'] = bool(element_id and element_id in differential_ids)
enriched_elements.append(element)
# Clean narrative from enriched elements (should have been done in find_and_extract_sd, but double check)
enriched_elements = [services.remove_narrative(el) for el in enriched_elements]
else:
# Fallback: If no snapshot, log warning. Maybe return differential only?
# Returning only differential might break frontend filtering logic.
# For now, return empty, but log clearly.
logger.warning(f"No snapshot found for {resource_type} in {source_package_id}. Returning empty element list.")
enriched_elements = [] # Or consider returning differential and handle in JS
# --- *** END Backend Modification *** ---
# Retrieve must_support_paths from DB (keep existing logic)
must_support_paths = []
processed_ig = ProcessedIg.query.filter_by(package_name=package_name, version=package_version).first()
if processed_ig and processed_ig.must_support_elements:
# Use the profile ID (which is likely the resource_type for profiles) as the key
must_support_paths = processed_ig.must_support_elements.get(resource_type, [])
logger.debug(f"Retrieved {len(must_support_paths)} Must Support paths for '{resource_type}' from processed IG {package_name}#{package_version}")
# Serialize with indent=4 and sort_keys=False for consistent formatting
logger.debug(f"Retrieved {len(must_support_paths)} Must Support paths for '{resource_type}' from processed IG DB record.")
else:
logger.debug(f"No processed IG record or no must_support_elements found in DB for {package_name}#{package_version}, resource {resource_type}")
# Construct the response
response_data = {
'structure_definition': sd_data,
'elements': enriched_elements, # Return the processed list
'must_support_paths': must_support_paths,
'fallback_used': fallback_used,
'source_package': source_package_id
# Removed raw structure_definition to reduce payload size, unless needed elsewhere
}
return Response(json.dumps(response_data, indent=4, sort_keys=False), mimetype='application/json')
# Use Response object for consistent JSON formatting
return Response(json.dumps(response_data, indent=2), mimetype='application/json') # Use indent=2 for readability if debugging
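The heart of this hunk is the enrichment step: each snapshot element gains an isInDifferential flag so the frontend can filter views without a second request. A minimal standalone sketch of that logic (sample elements are made up):

    snapshot_elements = [
        {"id": "Patient.identifier", "path": "Patient.identifier"},
        {"id": "Patient.name", "path": "Patient.name"},
    ]
    differential_elements = [{"id": "Patient.identifier", "path": "Patient.identifier"}]

    differential_ids = {el.get("id") for el in differential_elements if el.get("id")}
    for element in snapshot_elements:
        element["isInDifferential"] = element.get("id") in differential_ids

    print([(e["id"], e["isInDifferential"]) for e in snapshot_elements])
    # [('Patient.identifier', True), ('Patient.name', False)]

And a hypothetical client call; the route path and port are assumptions, since only def get_structure() is visible in this hunk:

    import requests

    resp = requests.get("http://localhost:5000/get-structure", params={
        "package_name": "hl7.fhir.au.core",
        "package_version": "1.1.0-preview",
        "resource_type": "Patient",
        "view": "snapshot",  # accepted but not used directly by the revised logic
    })
    data = resp.json()
    print(data["source_package"], len(data["elements"]), len(data["must_support_paths"]))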
@app.route('/get-example')
def get_example():

Binary file not shown.


@@ -1,6 +1,6 @@
#FileLock
#Sat Apr 19 23:58:47 UTC 2025
server=172.18.0.2\:33791
hostName=edfd6b88589d
#Mon Apr 21 13:28:52 UTC 2025
server=172.18.0.2\:37033
hostName=d3691f0dbfcf
method=file
id=196507d8451106de1bd0820c05fb574d4e048158077
id=196588987143154de87600de82a07a5280026b49718

Binary file not shown.

File diff suppressed because it is too large.

services.py

@@ -1,4 +1,3 @@
# services.py
import requests
import os
import tarfile
@@ -6,6 +5,7 @@ import json
import re
import logging
import shutil
import sqlite3
from flask import current_app, Blueprint, request, jsonify
from fhirpathpy import evaluate
from collections import defaultdict
@@ -62,6 +62,7 @@ FHIR_R4_BASE_TYPES = {
"SubstanceSourceMaterial", "SubstanceSpecification", "SupplyDelivery", "SupplyRequest", "Task",
"TerminologyCapabilities", "TestReport", "TestScript", "ValueSet", "VerificationResult", "VisionPrescription"
}
# --- Helper Functions ---
def _get_download_dir():
@@ -140,6 +141,62 @@ def parse_package_filename(filename):
version = ""
return name, version
def remove_narrative(resource):
"""Remove narrative text element from a FHIR resource."""
if isinstance(resource, dict):
if 'text' in resource:
logger.debug(f"Removing narrative text from resource: {resource.get('resourceType', 'unknown')}")
del resource['text']
if resource.get('resourceType') == 'Bundle' and 'entry' in resource:
resource['entry'] = [dict(entry, resource=remove_narrative(entry.get('resource'))) if entry.get('resource') else entry for entry in resource['entry']]
return resource
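A quick illustration of the new helper with a made-up Bundle; it strips narrative in place and recurses into entries that carry a resource:

    bundle = {
        "resourceType": "Bundle",
        "text": {"status": "generated", "div": "<div>narrative</div>"},
        "entry": [
            {"resource": {"resourceType": "Patient", "text": {"status": "generated"}}},
            {"request": {"method": "POST", "url": "Patient"}},  # no resource: left untouched
        ],
    }
    cleaned = remove_narrative(bundle)
    assert "text" not in cleaned
    assert "text" not in cleaned["entry"][0]["resource"]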
def get_cached_structure(package_name, package_version, resource_type, view):
"""Retrieve cached StructureDefinition from SQLite."""
try:
conn = sqlite3.connect(os.path.join(current_app.instance_path, 'fhir_ig.db'))
cursor = conn.cursor()
cursor.execute("""
SELECT structure_data FROM structure_cache
WHERE package_name = ? AND package_version = ? AND resource_type = ? AND view = ?
""", (package_name, package_version, resource_type, view))
result = cursor.fetchone()
conn.close()
if result:
logger.debug(f"Cache hit for {package_name}#{package_version}:{resource_type}:{view}")
return json.loads(result[0])
logger.debug(f"No cache entry for {package_name}#{package_version}:{resource_type}:{view}")
return None
except Exception as e:
logger.error(f"Error accessing structure cache: {e}", exc_info=True)
return None
def cache_structure(package_name, package_version, resource_type, view, structure_data):
"""Cache StructureDefinition in SQLite."""
try:
conn = sqlite3.connect(os.path.join(current_app.instance_path, 'fhir_ig.db'))
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS structure_cache (
package_name TEXT,
package_version TEXT,
resource_type TEXT,
view TEXT,
structure_data TEXT,
PRIMARY KEY (package_name, package_version, resource_type, view)
)
""")
cursor.execute("""
INSERT OR REPLACE INTO structure_cache
(package_name, package_version, resource_type, view, structure_data)
VALUES (?, ?, ?, ?, ?)
""", (package_name, package_version, resource_type, view, json.dumps(structure_data)))
conn.commit()
conn.close()
logger.debug(f"Cached structure for {package_name}#{package_version}:{resource_type}:{view}")
except Exception as e:
logger.error(f"Error caching structure: {e}", exc_info=True)
def find_and_extract_sd(tgz_path, resource_identifier, profile_url=None):
"""Helper to find and extract StructureDefinition json from a tgz path, prioritizing profile match."""
sd_data = None
@@ -150,45 +207,84 @@ def find_and_extract_sd(tgz_path, resource_identifier, profile_url=None):
try:
with tarfile.open(tgz_path, "r:gz") as tar:
logger.debug(f"Searching for SD matching '{resource_identifier}' with profile '{profile_url}' in {os.path.basename(tgz_path)}")
# Store potential matches to evaluate the best one at the end
potential_matches = [] # Store tuples of (precision_score, data, member_name)
for member in tar:
if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')):
continue
# Skip common metadata files
if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']:
continue
fileobj = None
try:
fileobj = tar.extractfile(member)
if fileobj:
content_bytes = fileobj.read()
# Handle potential BOM (Byte Order Mark)
content_string = content_bytes.decode('utf-8-sig')
data = json.loads(content_string)
if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition':
sd_id = data.get('id')
sd_name = data.get('name')
sd_type = data.get('type')
sd_url = data.get('url')
# Log SD details for debugging
logger.debug(f"Found SD: id={sd_id}, name={sd_name}, type={sd_type}, url={sd_url}, path={member.name}")
# Prioritize match with profile_url if provided
sd_filename_base = os.path.splitext(os.path.basename(member.name))[0]
sd_filename_lower = sd_filename_base.lower()
resource_identifier_lower = resource_identifier.lower() if resource_identifier else None
# logger.debug(f"Checking SD: id={sd_id}, name={sd_name}, type={sd_type}, url={sd_url}, file={sd_filename_lower} against identifier='{resource_identifier}'")
match_score = 0 # Higher score means more precise match
# Highest precision: Exact match on profile_url
if profile_url and sd_url == profile_url:
sd_data = data
match_score = 5
logger.debug(f"Exact match found based on profile_url: {profile_url}")
# If we find the exact profile URL, this is the best possible match.
sd_data = remove_narrative(data)
found_path = member.name
logger.info(f"Found SD matching profile '{profile_url}' at path: {found_path}")
break
# Broader matching for resource_identifier
elif resource_identifier and (
(sd_id and resource_identifier.lower() == sd_id.lower()) or
(sd_name and resource_identifier.lower() == sd_name.lower()) or
(sd_type and resource_identifier.lower() == sd_type.lower()) or
# Add fallback for partial filename match
(resource_identifier.lower() in os.path.splitext(os.path.basename(member.name))[0].lower()) or
# Handle AU Core naming conventions
(sd_url and resource_identifier.lower() in sd_url.lower())
):
sd_data = data
found_path = member.name
logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}")
# Continue searching for a profile match
logger.info(f"Found definitive SD matching profile '{profile_url}' at path: {found_path}. Stopping search.")
break # Stop searching immediately
# Next highest precision: Exact match on id or name
elif resource_identifier_lower:
if sd_id and resource_identifier_lower == sd_id.lower():
match_score = 4
logger.debug(f"Match found based on exact sd_id: {sd_id}")
elif sd_name and resource_identifier_lower == sd_name.lower():
match_score = 4
logger.debug(f"Match found based on exact sd_name: {sd_name}")
# Next: Match filename pattern "StructureDefinition-{identifier}.json"
elif sd_filename_lower == f"structuredefinition-{resource_identifier_lower}":
match_score = 3
logger.debug(f"Match found based on exact filename pattern: {member.name}")
# Next: Match on type ONLY if the identifier looks like a base type (no hyphens/dots)
elif sd_type and resource_identifier_lower == sd_type.lower() and not re.search(r'[-.]', resource_identifier):
match_score = 2
logger.debug(f"Match found based on sd_type (simple identifier): {sd_type}")
# Lower precision: Check if identifier is IN the filename
elif resource_identifier_lower in sd_filename_lower:
match_score = 1
logger.debug(f"Potential match based on identifier in filename: {member.name}")
# Lowest precision: Check if identifier is IN the URL
elif sd_url and resource_identifier_lower in sd_url.lower():
match_score = 1
logger.debug(f"Potential match based on identifier in url: {sd_url}")
if match_score > 0:
potential_matches.append((match_score, remove_narrative(data), member.name))
# If it's a very high precision match, we can potentially break early
if match_score >= 3: # Exact ID, Name, or Filename pattern
logger.info(f"Found high-confidence match for '{resource_identifier}' ({member.name}), stopping search.")
# Set sd_data here and break
sd_data = remove_narrative(data)
found_path = member.name
break
except json.JSONDecodeError as e:
logger.debug(f"Could not parse JSON in {member.name}, skipping: {e}")
except UnicodeDecodeError as e:
@@ -200,20 +296,32 @@ def find_and_extract_sd(tgz_path, resource_identifier, profile_url=None):
finally:
if fileobj:
fileobj.close()
# If the loop finished without finding an exact profile_url or high-confidence match (score >= 3)
if not sd_data and potential_matches:
# Sort potential matches by score (highest first)
potential_matches.sort(key=lambda x: x[0], reverse=True)
best_match = potential_matches[0]
sd_data = best_match[1]
found_path = best_match[2]
logger.info(f"Selected best match for '{resource_identifier}' from potential matches (Score: {best_match[0]}): {found_path}")
if sd_data is None:
logger.info(f"SD matching identifier '{resource_identifier}' or profile '{profile_url}' not found within archive {os.path.basename(tgz_path)}")
except tarfile.ReadError as e:
logger.error(f"Tar ReadError reading {tgz_path}: {e}")
return None, None
except tarfile.TarError as e:
logger.error(f"TarError reading {tgz_path} in find_and_extract_sd: {e}")
raise
raise # Re-raise critical tar errors
except FileNotFoundError:
logger.error(f"FileNotFoundError reading {tgz_path} in find_and_extract_sd.")
raise
raise # Re-raise critical file errors
except Exception as e:
logger.error(f"Unexpected error in find_and_extract_sd for {tgz_path}: {e}", exc_info=True)
raise
raise # Re-raise unexpected errors
return sd_data, found_path
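The matching precedence above, restated as a standalone helper; this is illustrative, not part of the commit, but uses the same scores in the same order:

    import re

    def score_sd_candidate(data, filename_base, identifier, profile_url=None):
        """5 = exact canonical URL, 4 = exact id/name, 3 = conventional filename,
        2 = base type (simple identifiers only), 1 = substring in filename/URL."""
        ident = identifier.lower() if identifier else None
        fname = filename_base.lower()
        if profile_url and data.get("url") == profile_url:
            return 5
        if ident is None:
            return 0
        if (data.get("id") or "").lower() == ident or (data.get("name") or "").lower() == ident:
            return 4
        if fname == f"structuredefinition-{ident}":
            return 3
        if (data.get("type") or "").lower() == ident and not re.search(r"[-.]", identifier):
            return 2
        if ident in fname or ident in (data.get("url") or "").lower():
            return 1
        return 0

Scores of 3 and above end the archive scan immediately; lower scores are collected and the best candidate is selected once the scan completes.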
# --- Metadata Saving/Loading ---
@@ -269,7 +377,6 @@ def get_package_metadata(name, version):
else:
logger.debug(f"Metadata file not found: {metadata_path}")
return None
# --- Package Processing ---
def process_package_file(tgz_path):
"""Extracts types, profile status, MS elements, examples, and profile relationships from a downloaded .tgz package."""
@@ -278,6 +385,7 @@ def process_package_file(tgz_path):
return {'errors': [f"Package file not found: {tgz_path}"], 'resource_types_info': []}
pkg_basename = os.path.basename(tgz_path)
name, version = parse_package_filename(pkg_basename)
logger.info(f"Processing package file details: {pkg_basename}")
results = {
@@ -326,9 +434,8 @@
if not isinstance(data, dict) or data.get('resourceType') != 'StructureDefinition':
continue
# Remove narrative text element from StructureDefinition
if 'text' in data:
del data['text']
# Remove narrative text element
data = remove_narrative(data)
profile_id = data.get('id') or data.get('name')
sd_type = data.get('type')
@@ -352,6 +459,34 @@
entry['sd_processed'] = True
referenced_types.add(sd_type)
# Cache StructureDefinition for all views
views = ['differential', 'snapshot', 'must-support', 'key-elements']
for view in views:
view_elements = data.get('differential', {}).get('element', []) if view == 'differential' else data.get('snapshot', {}).get('element', [])
if view == 'must-support':
view_elements = [e for e in view_elements if e.get('mustSupport')]
elif view == 'key-elements':
key_elements = set()
differential_elements = data.get('differential', {}).get('element', [])
for e in view_elements:
is_in_differential = any(de.get('id') == e.get('id') for de in differential_elements if de.get('id'))
if e.get('mustSupport') or is_in_differential or e.get('min', 0) > 0 or e.get('max') != '*' or e.get('slicing') or e.get('constraint'):
key_elements.add(e['id'])
parent_path = e['path']
while '.' in parent_path:
parent_path = parent_path.rsplit('.', 1)[0]
parent_element = next((el for el in view_elements if el['path'] == parent_path), None)
if parent_element:
key_elements.add(parent_element['id'])
view_elements = [e for e in view_elements if e['id'] in key_elements]
cache_data = {
'structure_definition': data,
'must_support_paths': [],
'fallback_used': False,
'source_package': f"{name}#{version}"
}
cache_structure(name, version, sd_type, view, cache_data)
complies_with = []
imposed = []
for ext in data.get('extension', []):
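The 'key-elements' view above keeps an element when it is must-support, changed in the differential, required, cardinality-constrained, sliced, or carries a constraint, then walks each kept element's ancestors so the rendered tree stays connected. A simplified sketch with made-up elements (the max-cardinality test is omitted here):

    snapshot = [
        {"id": "Patient", "path": "Patient"},
        {"id": "Patient.name", "path": "Patient.name", "min": 1},
        {"id": "Patient.name.family", "path": "Patient.name.family"},
    ]
    differential_ids = {"Patient.name"}

    keep = set()
    for e in snapshot:
        if (e.get("mustSupport") or e["id"] in differential_ids
                or e.get("min", 0) > 0 or e.get("slicing") or e.get("constraint")):
            keep.add(e["id"])
            parent = e["path"]
            while "." in parent:
                parent = parent.rsplit(".", 1)[0]
                match = next((el for el in snapshot if el["path"] == parent), None)
                if match:
                    keep.add(match["id"])

    key_elements = [e for e in snapshot if e["id"] in keep]
    # keeps Patient and Patient.name; drops Patient.name.family

One subtlety in the committed version: e.get('max') != '*' is also true for elements that omit max entirely, so such elements are treated as constrained and kept.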
@@ -382,7 +517,7 @@
if must_support is True:
if element_id and element_path:
ms_path = element_id if slice_name else element_path
ms_path = f"{element_path}[sliceName='{slice_name}']" if slice_name else element_id
ms_paths_in_this_sd.add(ms_path)
has_ms_in_this_sd = True
logger.info(f"Found MS element in {entry_key}: path={element_path}, id={element_id}, sliceName={slice_name}, ms_path={ms_path}")
@@ -442,9 +577,8 @@
resource_type = data.get('resourceType')
if not resource_type: continue
# Remove narrative text element from example
if 'text' in data:
del data['text']
# Remove narrative text element
data = remove_narrative(data)
profile_meta = data.get('meta', {}).get('profile', [])
found_profile_match = False
@@ -583,7 +717,8 @@
return results
# --- Validation Functions ---------------------------------------------------------------------------------------------------------------FHIRPATH CHANGES STARTED
# --- Validation Functions ---
def _legacy_navigate_fhir_path(resource, path, extension_url=None):
"""Navigates a FHIR resource using a FHIRPath-like expression, handling nested structures."""
logger.debug(f"Navigating FHIR path: {path}")
@@ -659,7 +794,6 @@ def _legacy_navigate_fhir_path(resource, path, extension_url=None):
result = current if (current is not None and (not isinstance(current, list) or current)) else None
logger.debug(f"Path {path} resolved to: {result}")
return result
#-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
def navigate_fhir_path(resource, path, extension_url=None):
"""Navigates a FHIR resource using FHIRPath expressions."""
@@ -678,7 +812,6 @@
# Fallback to legacy navigation for compatibility
return _legacy_navigate_fhir_path(resource, path, extension_url)
##------------------------------------------------------------------------------------------------------------------------------------and fhirpath here
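For reference, the non-legacy path above delegates to fhirpathpy, which services.py already imports. A minimal call on a made-up resource (optional environment/model arguments omitted):

    from fhirpathpy import evaluate

    patient = {"resourceType": "Patient", "name": [{"family": "Smith", "given": ["Jan"]}]}
    print(evaluate(patient, "Patient.name.family"))  # ['Smith']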
def _legacy_validate_resource_against_profile(package_name, version, resource, include_dependencies=True):
"""Validates a FHIR resource against a StructureDefinition in the specified package."""
logger.debug(f"Validating resource {resource.get('resourceType')} against {package_name}#{version}, include_dependencies={include_dependencies}")
@@ -783,7 +916,7 @@ def _legacy_validate_resource_against_profile(package_name, version, resource, include_dependencies=True):
definition = element.get('definition', 'No definition provided in StructureDefinition.')
# Check required elements
if min_val > 0 and not '.' in path[1 + path.find('.'):]:
if min_val > 0 and ('.' not in path[path.find('.') + 1:] if '.' in path else True):
value = navigate_fhir_path(resource, path)
if value is None or (isinstance(value, list) and not any(value)):
error_msg = f"{resource.get('resourceType')}/{resource.get('id', 'unknown')}: Required element {path} missing"
@@ -796,7 +929,7 @@ def _legacy_validate_resource_against_profile(package_name, version, resource, include_dependencies=True):
logger.info(f"Validation error: Required element {path} missing")
# Check must-support elements
if must_support and not '.' in path[1 + path.find('.'):]:
if must_support and ('.' not in path[path.find('.') + 1:] if '.' in path else True):
if '[x]' in path:
base_path = path.replace('[x]', '')
found = False
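The guard shared by the two hunks above restricts required and must-support checks to top-level elements. Pulled out as a hypothetical helper with the same logic:

    def is_top_level(path):
        # True when there is no second '.', i.e. the element sits directly on the resource
        return '.' not in path[path.find('.') + 1:] if '.' in path else True

    assert is_top_level("Patient")                   # resource root
    assert is_top_level("Patient.name")              # top-level element
    assert not is_top_level("Patient.name.family")   # nested element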
@@ -859,7 +992,6 @@ def _legacy_validate_resource_against_profile(package_name, version, resource, include_dependencies=True):
}
logger.debug(f"Validation result: valid={result['valid']}, errors={len(result['errors'])}, warnings={len(result['warnings'])}")
return result
##--------------------------------------------------------------------------------------------------------------------------------------------------------
def validate_resource_against_profile(package_name, version, resource, include_dependencies=True):
result = {
@@ -1123,7 +1255,7 @@ def get_structure_definition(package_name, version, resource_type):
element_id = element.get('id', '')
slice_name = element.get('sliceName')
if element.get('mustSupport', False):
ms_path = element_id if slice_name else path
ms_path = f"{path}[sliceName='{slice_name}']" if slice_name else element_id
must_support_paths.append(ms_path)
if 'slicing' in element:
slice_info = {
@@ -1557,61 +1689,7 @@ def validate_sample():
'warnings': [],
'results': {}
}), 500
# --- Standalone Test ---
if __name__ == '__main__':
logger.info("Running services.py directly for testing.")
class MockFlask:
class Config(dict):
pass
config = Config()
instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'instance'))
mock_app = MockFlask()
test_download_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'instance', DOWNLOAD_DIR_NAME))
mock_app.config['FHIR_PACKAGES_DIR'] = test_download_dir
os.makedirs(test_download_dir, exist_ok=True)
logger.info(f"Using test download directory: {test_download_dir}")
print("\n--- Testing Filename Parsing ---")
test_files = [
"hl7.fhir.r4.core-4.0.1.tgz",
"hl7.fhir.us.core-6.1.0.tgz",
"fhir.myig.patient-1.2.3-beta.tgz",
"my.company.fhir.Terminologies-0.1.0.tgz",
"package-with-hyphens-in-name-1.0.tgz",
"noversion.tgz",
"badformat-1.0",
"hl7.fhir.au.core-1.1.0-preview.tgz",
]
for tf in test_files:
p_name, p_ver = parse_package_filename(tf)
print(f"'{tf}' -> Name: '{p_name}', Version: '{p_ver}'")
pkg_name_to_test = "hl7.fhir.au.core"
pkg_version_to_test = "1.1.0-preview"
print(f"\n--- Testing Import: {pkg_name_to_test}#{pkg_version_to_test} ---")
import_results = import_package_and_dependencies(pkg_name_to_test, pkg_version_to_test, dependency_mode='recursive')
print("\nImport Results:")
print(f" Requested: {import_results['requested']}")
print(f" Downloaded Count: {len(import_results['downloaded'])}")
print(f" Unique Dependencies Found: {len(import_results['dependencies'])}")
print(f" Errors: {len(import_results['errors'])}")
for error in import_results['errors']:
print(f" - {error}")
if (pkg_name_to_test, pkg_version_to_test) in import_results['downloaded']:
test_tgz_path = import_results['downloaded'][(pkg_name_to_test, pkg_version_to_test)]
print(f"\n--- Testing Processing: {test_tgz_path} ---")
processing_results = process_package_file(test_tgz_path)
print("\nProcessing Results:")
print(f" Resource Types Info Count: {len(processing_results.get('resource_types_info', []))}")
print(f" Profiles with MS Elements: {sum(1 for r in processing_results.get('resource_types_info', []) if r.get('must_support'))}")
print(f" Optional Extensions w/ MS: {sum(1 for r in processing_results.get('resource_types_info', []) if r.get('optional_usage'))}")
print(f" Must Support Elements Dict Count: {len(processing_results.get('must_support_elements', {}))}")
print(f" Examples Dict Count: {len(processing_results.get('examples', {}))}")
print(f" Complies With Profiles: {processing_results.get('complies_with_profiles', [])}")
print(f" Imposed Profiles: {processing_results.get('imposed_profiles', [])}")
print(f" Processing Errors: {processing_results.get('errors', [])}")
else:
print(f"\n--- Skipping Processing Test (Import failed for {pkg_name_to_test}#{pkg_version_to_test}) ---")
# Add new functions for GoFSH integration
def run_gofsh(input_path, output_dir, output_style, log_level, fhir_version=None, fishing_trip=False, dependencies=None, indent_rules=False, meta_profile='only-one', alias_file=None, no_alias=False):
"""Run GoFSH with advanced options and return FSH output and optional comparison report."""
# Use a temporary output directory for initial GoFSH run
@@ -1853,4 +1931,59 @@ def process_fhir_input(input_mode, fhir_file, fhir_text, alias_file=None):
return input_file, temp_dir, alias_path, None
except Exception as e:
logger.error(f"Error processing input: {str(e)}", exc_info=True)
return None, None, None, f"Error processing input: {str(e)}"
return None, None, None, f"Error processing input: {str(e)}"
# --- Standalone Test ---
if __name__ == '__main__':
logger.info("Running services.py directly for testing.")
class MockFlask:
class Config(dict):
pass
config = Config()
instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'instance'))
mock_app = MockFlask()
test_download_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'instance', DOWNLOAD_DIR_NAME))
mock_app.config['FHIR_PACKAGES_DIR'] = test_download_dir
os.makedirs(test_download_dir, exist_ok=True)
logger.info(f"Using test download directory: {test_download_dir}")
print("\n--- Testing Filename Parsing ---")
test_files = [
"hl7.fhir.r4.core-4.0.1.tgz",
"hl7.fhir.us.core-6.1.0.tgz",
"fhir.myig.patient-1.2.3-beta.tgz",
"my.company.fhir.Terminologies-0.1.0.tgz",
"package-with-hyphens-in-name-1.0.tgz",
"noversion.tgz",
"badformat-1.0",
"hl7.fhir.au.core-1.1.0-preview.tgz",
]
for tf in test_files:
p_name, p_ver = parse_package_filename(tf)
print(f"'{tf}' -> Name: '{p_name}', Version: '{p_ver}'")
pkg_name_to_test = "hl7.fhir.au.core"
pkg_version_to_test = "1.1.0-preview"
print(f"\n--- Testing Import: {pkg_name_to_test}#{pkg_version_to_test} ---")
import_results = import_package_and_dependencies(pkg_name_to_test, pkg_version_to_test, dependency_mode='recursive')
print("\nImport Results:")
print(f" Requested: {import_results['requested']}")
print(f" Downloaded Count: {len(import_results['downloaded'])}")
print(f" Unique Dependencies Found: {len(import_results['dependencies'])}")
print(f" Errors: {len(import_results['errors'])}")
for error in import_results['errors']:
print(f" - {error}")
if (pkg_name_to_test, pkg_version_to_test) in import_results['downloaded']:
test_tgz_path = import_results['downloaded'][(pkg_name_to_test, pkg_version_to_test)]
print(f"\n--- Testing Processing: {test_tgz_path} ---")
processing_results = process_package_file(test_tgz_path)
print("\nProcessing Results:")
print(f" Resource Types Info Count: {len(processing_results.get('resource_types_info', []))}")
print(f" Profiles with MS Elements: {sum(1 for r in processing_results.get('resource_types_info', []) if r.get('must_support'))}")
print(f" Optional Extensions w/ MS: {sum(1 for r in processing_results.get('resource_types_info', []) if r.get('optional_usage'))}")
print(f" Must Support Elements Dict Count: {len(processing_results.get('must_support_elements', {}))}")
print(f" Examples Dict Count: {len(processing_results.get('examples', {}))}")
print(f" Complies With Profiles: {processing_results.get('complies_with_profiles', [])}")
print(f" Imposed Profiles: {processing_results.get('imposed_profiles', [])}")
print(f" Processing Errors: {processing_results.get('errors', [])}")
else:
print(f"\n--- Skipping Processing Test (Import failed for {pkg_name_to_test}#{pkg_version_to_test}) ---")

File diff suppressed because it is too large.