mirror of
https://github.com/Sudo-JHare/FHIRFLARE-IG-Toolkit.git
synced 2025-06-15 13:09:59 +00:00
Slight regression in validation - alpha package, so no concern. slicing is greatly improved with colors and highlighting and nodes being more accurate and nesting.
1085 lines
57 KiB
Python
1085 lines
57 KiB
Python
import requests
|
|
import os
|
|
import tarfile
|
|
import json
|
|
import re
|
|
import logging
|
|
from flask import current_app
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
import datetime
|
|
|
|
# Configure logging
|
|
if __name__ == '__main__':
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
else:
|
|
pass
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Constants ---
|
|
FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org"
|
|
DOWNLOAD_DIR_NAME = "fhir_packages"
|
|
CANONICAL_PACKAGE = ("hl7.fhir.r4.core", "4.0.1")
|
|
CANONICAL_PACKAGE_ID = f"{CANONICAL_PACKAGE[0]}#{CANONICAL_PACKAGE[1]}"
|
|
|
|
# Define standard FHIR R4 base types
|
|
FHIR_R4_BASE_TYPES = {
|
|
"Account", "ActivityDefinition", "AdministrableProductDefinition", "AdverseEvent", "AllergyIntolerance",
|
|
"Appointment", "AppointmentResponse", "AuditEvent", "Basic", "Binary", "BiologicallyDerivedProduct",
|
|
"BodyStructure", "Bundle", "CapabilityStatement", "CarePlan", "CareTeam", "CatalogEntry", "ChargeItem",
|
|
"ChargeItemDefinition", "Claim", "ClaimResponse", "ClinicalImpression", "CodeSystem", "Communication",
|
|
"CommunicationRequest", "CompartmentDefinition", "Composition", "ConceptMap", "Condition", "Consent",
|
|
"Contract", "Coverage", "CoverageEligibilityRequest", "CoverageEligibilityResponse", "DetectedIssue",
|
|
"Device", "DeviceDefinition", "DeviceMetric", "DeviceRequest", "DeviceUseStatement", "DiagnosticReport",
|
|
"DocumentManifest", "DocumentReference", "DomainResource", "EffectEvidenceSynthesis", "Encounter",
|
|
"Endpoint", "EnrollmentRequest", "EnrollmentResponse", "EpisodeOfCare", "EventDefinition", "Evidence",
|
|
"EvidenceVariable", "ExampleScenario", "ExplanationOfBenefit", "FamilyMemberHistory", "Flag", "Goal",
|
|
"GraphDefinition", "Group", "GuidanceResponse", "HealthcareService", "ImagingStudy", "Immunization",
|
|
"ImmunizationEvaluation", "ImmunizationRecommendation", "ImplementationGuide", "InsurancePlan",
|
|
"Invoice", "Library", "Linkage", "List", "Location", "Measure", "MeasureReport", "Media", "Medication",
|
|
"MedicationAdministration", "MedicationDispense", "MedicationKnowledge", "MedicationRequest",
|
|
"MedicationStatement", "MedicinalProduct", "MedicinalProductAuthorization", "MedicinalProductContraindication",
|
|
"MedicinalProductIndication", "MedicinalProductIngredient", "MedicinalProductInteraction",
|
|
"MedicinalProductManufactured", "MedicinalProductPackaged", "MedicinalProductPharmaceutical",
|
|
"MedicinalProductUndesirableEffect", "MessageDefinition", "MessageHeader", "MolecularSequence",
|
|
"NamingSystem", "NutritionOrder", "Observation", "ObservationDefinition", "OperationDefinition",
|
|
"OperationOutcome", "Organization", "OrganizationAffiliation", "Patient", "PaymentNotice",
|
|
"PaymentReconciliation", "Person", "PlanDefinition", "Practitioner", "PractitionerRole", "Procedure",
|
|
"Provenance", "Questionnaire", "QuestionnaireResponse", "RelatedPerson", "RequestGroup", "ResearchDefinition",
|
|
"ResearchElementDefinition", "ResearchStudy", "ResearchSubject", "Resource", "RiskAssessment",
|
|
"RiskEvidenceSynthesis", "Schedule", "SearchParameter", "ServiceRequest", "Slot", "Specimen",
|
|
"SpecimenDefinition", "StructureDefinition", "StructureMap", "Subscription", "Substance",
|
|
"SubstanceNucleicAcid", "SubstancePolymer", "SubstanceProtein", "SubstanceReferenceInformation",
|
|
"SubstanceSourceMaterial", "SubstanceSpecification", "SupplyDelivery", "SupplyRequest", "Task",
|
|
"TerminologyCapabilities", "TestReport", "TestScript", "ValueSet", "VerificationResult", "VisionPrescription"
|
|
}
|
|
|
|
# --- Helper Functions ---
|
|
|
|
def _get_download_dir():
|
|
"""Gets the absolute path to the configured FHIR package download directory."""
|
|
packages_dir = None
|
|
try:
|
|
packages_dir = current_app.config.get('FHIR_PACKAGES_DIR')
|
|
if packages_dir:
|
|
logger.debug(f"Using FHIR_PACKAGES_DIR from current_app config: {packages_dir}")
|
|
else:
|
|
logger.warning("FHIR_PACKAGES_DIR not found in current_app config.")
|
|
instance_path = current_app.instance_path
|
|
packages_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME)
|
|
logger.warning(f"Falling back to default packages path: {packages_dir}")
|
|
except RuntimeError:
|
|
logger.warning("No app context found. Constructing default relative path for packages.")
|
|
script_dir = os.path.dirname(__file__)
|
|
instance_path_fallback = os.path.abspath(os.path.join(script_dir, '..', 'instance'))
|
|
packages_dir = os.path.join(instance_path_fallback, DOWNLOAD_DIR_NAME)
|
|
logger.debug(f"Constructed fallback packages path: {packages_dir}")
|
|
if not packages_dir:
|
|
logger.error("Fatal Error: Could not determine FHIR packages directory.")
|
|
return None
|
|
try:
|
|
os.makedirs(packages_dir, exist_ok=True)
|
|
return packages_dir
|
|
except OSError as e:
|
|
logger.error(f"Fatal Error creating packages directory {packages_dir}: {e}", exc_info=True)
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error getting/creating packages directory {packages_dir}: {e}", exc_info=True)
|
|
return None
|
|
|
|
def sanitize_filename_part(text):
|
|
"""Basic sanitization for name/version parts of filename."""
|
|
if not isinstance(text, str):
|
|
text = str(text)
|
|
safe_text = "".join(c if c.isalnum() or c in ['.', '-'] else '_' for c in text)
|
|
safe_text = re.sub(r'_+', '_', safe_text)
|
|
safe_text = safe_text.strip('_-.')
|
|
return safe_text if safe_text else "invalid_name"
|
|
|
|
def construct_tgz_filename(name, version):
|
|
"""Constructs the standard FHIR package filename using sanitized parts."""
|
|
if not name or not version:
|
|
logger.error(f"Cannot construct filename with missing name ('{name}') or version ('{version}')")
|
|
return None
|
|
return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.tgz"
|
|
|
|
def construct_metadata_filename(name, version):
|
|
"""Constructs the standard metadata filename."""
|
|
if not name or not version:
|
|
logger.error(f"Cannot construct metadata filename with missing name ('{name}') or version ('{version}')")
|
|
return None
|
|
return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.metadata.json"
|
|
|
|
def parse_package_filename(filename):
|
|
"""Parses a standard FHIR package filename into name and version."""
|
|
if not filename or not filename.endswith('.tgz'):
|
|
logger.debug(f"Filename '{filename}' does not end with .tgz")
|
|
return None, None
|
|
base_name = filename[:-4]
|
|
last_hyphen_index = base_name.rfind('-')
|
|
while last_hyphen_index != -1:
|
|
potential_name = base_name[:last_hyphen_index]
|
|
potential_version = base_name[last_hyphen_index + 1:]
|
|
if potential_version and (potential_version[0].isdigit() or any(potential_version.startswith(kw) for kw in ['v', 'dev', 'draft', 'preview', 'release', 'alpha', 'beta'])):
|
|
name = potential_name.replace('_', '.')
|
|
version = potential_version
|
|
logger.debug(f"Parsed '{filename}' -> name='{name}', version='{version}'")
|
|
return name, version
|
|
else:
|
|
last_hyphen_index = base_name.rfind('-', 0, last_hyphen_index)
|
|
logger.warning(f"Could not parse version from '{filename}'. Treating '{base_name}' as name.")
|
|
name = base_name.replace('_', '.')
|
|
version = ""
|
|
return name, version
|
|
|
|
def find_and_extract_sd(tgz_path, resource_identifier):
|
|
"""Helper to find and extract StructureDefinition json from a tgz path."""
|
|
sd_data = None
|
|
found_path = None
|
|
if not tgz_path or not os.path.exists(tgz_path):
|
|
logger.error(f"File not found in find_and_extract_sd: {tgz_path}")
|
|
return None, None
|
|
try:
|
|
with tarfile.open(tgz_path, "r:gz") as tar:
|
|
logger.debug(f"Searching for SD matching '{resource_identifier}' in {os.path.basename(tgz_path)}")
|
|
for member in tar:
|
|
if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')):
|
|
continue
|
|
if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']:
|
|
continue
|
|
fileobj = None
|
|
try:
|
|
fileobj = tar.extractfile(member)
|
|
if fileobj:
|
|
content_bytes = fileobj.read()
|
|
content_string = content_bytes.decode('utf-8-sig')
|
|
data = json.loads(content_string)
|
|
if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition':
|
|
sd_id = data.get('id')
|
|
sd_name = data.get('name')
|
|
sd_type = data.get('type')
|
|
sd_url = data.get('url')
|
|
if resource_identifier and (
|
|
(sd_id and resource_identifier.lower() == sd_id.lower()) or
|
|
(sd_name and resource_identifier.lower() == sd_name.lower()) or
|
|
(sd_type and resource_identifier.lower() == sd_type.lower()) or
|
|
(sd_url and resource_identifier.lower() == sd_url.lower()) or
|
|
(os.path.splitext(os.path.basename(member.name))[0].lower() == resource_identifier.lower())
|
|
):
|
|
sd_data = data
|
|
found_path = member.name
|
|
logger.info(f"Found matching SD for '{resource_identifier}' at path: {found_path}")
|
|
break
|
|
except json.JSONDecodeError as e:
|
|
logger.debug(f"Could not parse JSON in {member.name}, skipping: {e}")
|
|
except UnicodeDecodeError as e:
|
|
logger.warning(f"Could not decode UTF-8 in {member.name}, skipping: {e}")
|
|
except tarfile.TarError as e:
|
|
logger.warning(f"Tar error reading member {member.name}, skipping: {e}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not read/parse potential SD {member.name}, skipping: {e}", exc_info=False)
|
|
finally:
|
|
if fileobj:
|
|
fileobj.close()
|
|
if sd_data is None:
|
|
logger.info(f"SD matching identifier '{resource_identifier}' not found within archive {os.path.basename(tgz_path)}")
|
|
except tarfile.ReadError as e:
|
|
logger.error(f"Tar ReadError reading {tgz_path}: {e}")
|
|
return None, None
|
|
except tarfile.TarError as e:
|
|
logger.error(f"TarError reading {tgz_path} in find_and_extract_sd: {e}")
|
|
raise
|
|
except FileNotFoundError:
|
|
logger.error(f"FileNotFoundError reading {tgz_path} in find_and_extract_sd.")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error in find_and_extract_sd for {tgz_path}: {e}", exc_info=True)
|
|
raise
|
|
return sd_data, found_path
|
|
|
|
# --- Metadata Saving/Loading ---
|
|
def save_package_metadata(name, version, dependency_mode, dependencies, complies_with_profiles=None, imposed_profiles=None):
|
|
"""Saves dependency mode, imported dependencies, and profile relationships as metadata."""
|
|
download_dir = _get_download_dir()
|
|
if not download_dir:
|
|
logger.error("Could not get download directory for metadata saving.")
|
|
return False
|
|
metadata_filename = construct_metadata_filename(name, version)
|
|
if not metadata_filename: return False
|
|
metadata_path = os.path.join(download_dir, metadata_filename)
|
|
metadata = {
|
|
'package_name': name,
|
|
'version': version,
|
|
'dependency_mode': dependency_mode,
|
|
'imported_dependencies': dependencies or [],
|
|
'complies_with_profiles': complies_with_profiles or [],
|
|
'imposed_profiles': imposed_profiles or [],
|
|
'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat()
|
|
}
|
|
try:
|
|
with open(metadata_path, 'w', encoding='utf-8') as f:
|
|
json.dump(metadata, f, indent=2)
|
|
logger.info(f"Saved metadata for {name}#{version} at {metadata_path}")
|
|
return True
|
|
except IOError as e:
|
|
logger.error(f"Failed to write metadata file {metadata_path}: {e}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error saving metadata for {name}#{version}: {e}", exc_info=True)
|
|
return False
|
|
|
|
def get_package_metadata(name, version):
|
|
"""Retrieves the metadata for a given package."""
|
|
download_dir = _get_download_dir()
|
|
if not download_dir:
|
|
logger.error("Could not get download directory for metadata retrieval.")
|
|
return None
|
|
metadata_filename = construct_metadata_filename(name, version)
|
|
if not metadata_filename: return None
|
|
metadata_path = os.path.join(download_dir, metadata_filename)
|
|
if os.path.exists(metadata_path):
|
|
try:
|
|
with open(metadata_path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except (IOError, json.JSONDecodeError) as e:
|
|
logger.error(f"Failed to read or parse metadata file {metadata_path}: {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error reading metadata for {name}#{version}: {e}", exc_info=True)
|
|
return None
|
|
else:
|
|
logger.debug(f"Metadata file not found: {metadata_path}")
|
|
return None
|
|
|
|
# --- Package Processing ---
|
|
|
|
def process_package_file(tgz_path):
|
|
"""Extracts types, profile status, MS elements, examples, and profile relationships from a downloaded .tgz package."""
|
|
if not tgz_path or not os.path.exists(tgz_path):
|
|
logger.error(f"Package file not found for processing: {tgz_path}")
|
|
return {'errors': [f"Package file not found: {tgz_path}"], 'resource_types_info': []}
|
|
|
|
pkg_basename = os.path.basename(tgz_path)
|
|
logger.info(f"Processing package file details: {pkg_basename}")
|
|
|
|
results = {
|
|
'resource_types_info': [],
|
|
'must_support_elements': {},
|
|
'examples': {},
|
|
'complies_with_profiles': [],
|
|
'imposed_profiles': [],
|
|
'errors': []
|
|
}
|
|
resource_info = defaultdict(lambda: {
|
|
'name': None,
|
|
'type': None,
|
|
'is_profile': False,
|
|
'ms_flag': False,
|
|
'ms_paths': set(),
|
|
'examples': set(),
|
|
'sd_processed': False,
|
|
'optional_usage': False
|
|
})
|
|
referenced_types = set()
|
|
|
|
try:
|
|
with tarfile.open(tgz_path, "r:gz") as tar:
|
|
members = tar.getmembers()
|
|
logger.debug(f"Found {len(members)} members in {pkg_basename}")
|
|
|
|
# Pass 1: Process StructureDefinitions
|
|
logger.debug("Processing StructureDefinitions...")
|
|
sd_members = [m for m in members if m.isfile() and m.name.startswith('package/') and m.name.lower().endswith('.json')]
|
|
|
|
for member in sd_members:
|
|
base_filename_lower = os.path.basename(member.name).lower()
|
|
if base_filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']:
|
|
continue
|
|
|
|
fileobj = None
|
|
try:
|
|
fileobj = tar.extractfile(member)
|
|
if not fileobj: continue
|
|
|
|
content_bytes = fileobj.read()
|
|
content_string = content_bytes.decode('utf-8-sig')
|
|
data = json.loads(content_string)
|
|
|
|
if not isinstance(data, dict) or data.get('resourceType') != 'StructureDefinition':
|
|
continue
|
|
|
|
profile_id = data.get('id') or data.get('name')
|
|
sd_type = data.get('type')
|
|
sd_base = data.get('baseDefinition')
|
|
is_profile_sd = bool(sd_base)
|
|
|
|
if not profile_id or not sd_type:
|
|
logger.warning(f"Skipping StructureDefinition in {member.name} due to missing ID ('{profile_id}') or Type ('{sd_type}').")
|
|
continue
|
|
|
|
entry_key = profile_id
|
|
entry = resource_info[entry_key]
|
|
|
|
if entry.get('sd_processed'): continue
|
|
|
|
logger.debug(f"Processing StructureDefinition: {entry_key} (type={sd_type}, is_profile={is_profile_sd})")
|
|
|
|
entry['name'] = entry_key
|
|
entry['type'] = sd_type
|
|
entry['is_profile'] = is_profile_sd
|
|
entry['sd_processed'] = True
|
|
referenced_types.add(sd_type)
|
|
|
|
complies_with = []
|
|
imposed = []
|
|
for ext in data.get('extension', []):
|
|
ext_url = ext.get('url')
|
|
value = ext.get('valueCanonical')
|
|
if value:
|
|
if ext_url == 'http://hl7.org/fhir/StructureDefinition/structuredefinition-compliesWithProfile':
|
|
complies_with.append(value)
|
|
elif ext_url == 'http://hl7.org/fhir/StructureDefinition/structuredefinition-imposeProfile':
|
|
imposed.append(value)
|
|
results['complies_with_profiles'].extend(c for c in complies_with if c not in results['complies_with_profiles'])
|
|
results['imposed_profiles'].extend(i for i in imposed if i not in results['imposed_profiles'])
|
|
|
|
# Must Support and Optional Usage Logic
|
|
has_ms_in_this_sd = False
|
|
ms_paths_in_this_sd = set()
|
|
elements = data.get('snapshot', {}).get('element', []) or data.get('differential', {}).get('element', [])
|
|
|
|
for element in elements:
|
|
if not isinstance(element, dict):
|
|
continue
|
|
must_support = element.get('mustSupport')
|
|
element_id = element.get('id')
|
|
element_path = element.get('path')
|
|
slice_name = element.get('sliceName')
|
|
|
|
logger.debug(f"Checking element in {entry_key}: id={element_id}, path={element_path}, sliceName={slice_name}, mustSupport={must_support}")
|
|
|
|
if must_support is True:
|
|
if element_id and element_path:
|
|
ms_path = element_id if slice_name else element_path
|
|
ms_paths_in_this_sd.add(ms_path)
|
|
has_ms_in_this_sd = True
|
|
logger.info(f"Found MS element in {entry_key}: path={element_path}, id={element_id}, sliceName={slice_name}, ms_path={ms_path}")
|
|
else:
|
|
logger.warning(f"Found mustSupport=true without path/id in element of {entry_key} ({member.name})")
|
|
has_ms_in_this_sd = True
|
|
|
|
if has_ms_in_this_sd:
|
|
entry['ms_paths'].update(ms_paths_in_this_sd)
|
|
entry['ms_flag'] = True
|
|
logger.debug(f"Profile {entry_key} has MS flag set. MS paths: {entry['ms_paths']}")
|
|
|
|
if sd_type == 'Extension' and has_ms_in_this_sd:
|
|
internal_ms_exists = any(p.startswith('Extension.') or ':' in p for p in entry['ms_paths'])
|
|
if internal_ms_exists:
|
|
entry['optional_usage'] = True
|
|
logger.info(f"Marked Extension {entry_key} as optional_usage: MS paths={entry['ms_paths']}")
|
|
else:
|
|
logger.debug(f"Extension {entry_key} has MS flag but no internal MS elements: {entry['ms_paths']}")
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"Could not parse JSON SD in {member.name}: {e}")
|
|
results['errors'].append(f"JSON parse error in {member.name}")
|
|
except UnicodeDecodeError as e:
|
|
logger.warning(f"Could not decode SD in {member.name}: {e}")
|
|
results['errors'].append(f"Encoding error in {member.name}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not process SD member {member.name}: {e}", exc_info=False)
|
|
results['errors'].append(f"Processing error in {member.name}: {e}")
|
|
finally:
|
|
if fileobj: fileobj.close()
|
|
|
|
# --- Pass 2: Process Examples ---
|
|
logger.debug("Processing Examples...")
|
|
example_members = [m for m in members if m.isfile() and m.name.startswith('package/') and 'example' in m.name.lower()]
|
|
|
|
for member in example_members:
|
|
base_filename_lower = os.path.basename(member.name).lower()
|
|
if base_filename_lower in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']:
|
|
continue
|
|
|
|
logger.debug(f"Processing potential example file: {member.name}")
|
|
is_json = member.name.lower().endswith('.json')
|
|
fileobj = None
|
|
associated_key = None
|
|
|
|
try:
|
|
fileobj = tar.extractfile(member)
|
|
if not fileobj: continue
|
|
|
|
if is_json:
|
|
content_bytes = fileobj.read()
|
|
content_string = content_bytes.decode('utf-8-sig')
|
|
data = json.loads(content_string)
|
|
|
|
if not isinstance(data, dict): continue
|
|
resource_type = data.get('resourceType')
|
|
if not resource_type: continue
|
|
|
|
profile_meta = data.get('meta', {}).get('profile', [])
|
|
found_profile_match = False
|
|
if profile_meta and isinstance(profile_meta, list):
|
|
for profile_url in profile_meta:
|
|
if profile_url and isinstance(profile_url, str):
|
|
profile_id_from_meta = profile_url.split('/')[-1]
|
|
if profile_id_from_meta in resource_info:
|
|
associated_key = profile_id_from_meta
|
|
found_profile_match = True
|
|
logger.debug(f"Example {member.name} associated with profile {associated_key} via meta.profile")
|
|
break
|
|
elif profile_url in resource_info:
|
|
associated_key = profile_url
|
|
personally_match = True
|
|
logger.debug(f"Example {member.name} associated with profile {associated_key} via meta.profile")
|
|
break
|
|
|
|
if not found_profile_match:
|
|
key_to_use = resource_type
|
|
if key_to_use not in resource_info:
|
|
resource_info[key_to_use].update({'name': key_to_use, 'type': resource_type, 'is_profile': False})
|
|
associated_key = key_to_use
|
|
logger.debug(f"Example {member.name} associated with resource type {associated_key}")
|
|
referenced_types.add(resource_type)
|
|
else:
|
|
guessed_type = base_filename_lower.split('-')[0].capitalize()
|
|
guessed_profile_id = base_filename_lower.split('-')[0]
|
|
key_to_use = None
|
|
if guessed_profile_id in resource_info:
|
|
key_to_use = guessed_profile_id
|
|
elif guessed_type in resource_info:
|
|
key_to_use = guessed_type
|
|
else:
|
|
key_to_use = guessed_type
|
|
resource_info[key_to_use].update({'name': key_to_use, 'type': key_to_use, 'is_profile': False})
|
|
associated_key = key_to_use
|
|
logger.debug(f"Non-JSON Example {member.name} associated with profile/type {associated_key}")
|
|
referenced_types.add(guessed_type)
|
|
|
|
if associated_key:
|
|
resource_info[associated_key]['examples'].add(member.name)
|
|
else:
|
|
logger.warning(f"Could not associate example {member.name} with any known resource or profile.")
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"Could not parse JSON example in {member.name}: {e}")
|
|
except UnicodeDecodeError as e:
|
|
logger.warning(f"Could not decode example in {member.name}: {e}")
|
|
except tarfile.TarError as e:
|
|
logger.warning(f"TarError reading example {member.name}: {e}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not process example member {member.name}: {e}", exc_info=False)
|
|
finally:
|
|
if fileobj: fileobj.close()
|
|
|
|
# --- Pass 3: Ensure Relevant Base Types ---
|
|
logger.debug("Ensuring relevant FHIR R4 base types...")
|
|
essential_types = {'CapabilityStatement'}
|
|
for type_name in referenced_types | essential_types:
|
|
if type_name in FHIR_R4_BASE_TYPES and type_name not in resource_info:
|
|
resource_info[type_name]['name'] = type_name
|
|
resource_info[type_name]['type'] = type_name
|
|
resource_info[type_name]['is_profile'] = False
|
|
logger.debug(f"Added base type entry for {type_name}")
|
|
|
|
# --- Final Consolidation ---
|
|
final_list = []
|
|
final_ms_elements = {}
|
|
final_examples = {}
|
|
logger.debug(f"Finalizing results from resource_info keys: {list(resource_info.keys())}")
|
|
|
|
for key, info in resource_info.items():
|
|
display_name = info.get('name') or key
|
|
base_type = info.get('type')
|
|
|
|
if not display_name or not base_type:
|
|
logger.warning(f"Skipping final formatting for incomplete key: {key} - Info: {info}")
|
|
continue
|
|
|
|
logger.debug(f"Finalizing item '{display_name}': type='{base_type}', is_profile='{info.get('is_profile', False)}', ms_flag='{info.get('ms_flag', False)}', optional_usage='{info.get('optional_usage', False)}'")
|
|
final_list.append({
|
|
'name': display_name,
|
|
'type': base_type,
|
|
'is_profile': info.get('is_profile', False),
|
|
'must_support': info.get('ms_flag', False),
|
|
'optional_usage': info.get('optional_usage', False)
|
|
})
|
|
if info['ms_paths']:
|
|
final_paths = []
|
|
for path in info['ms_paths']:
|
|
if ':' in path and info['type'] == 'Extension':
|
|
final_paths.append(path)
|
|
else:
|
|
final_paths.append(path)
|
|
final_ms_elements[display_name] = sorted(final_paths)
|
|
if info['examples']:
|
|
final_examples[display_name] = sorted(list(info['examples']))
|
|
|
|
results['resource_types_info'] = sorted(final_list, key=lambda x: (not x.get('is_profile', False), x.get('name', '')))
|
|
results['must_support_elements'] = final_ms_elements
|
|
results['examples'] = final_examples
|
|
|
|
logger.debug(f"Final must_support_elements: {json.dumps(final_ms_elements, indent=2)}")
|
|
logger.debug(f"Final examples: {json.dumps(final_examples, indent=2)}")
|
|
|
|
except tarfile.ReadError as e:
|
|
err_msg = f"Tar ReadError processing package file {pkg_basename}: {e}"
|
|
logger.error(err_msg)
|
|
results['errors'].append(err_msg)
|
|
except tarfile.TarError as e:
|
|
err_msg = f"TarError processing package file {pkg_basename}: {e}"
|
|
logger.error(err_msg)
|
|
results['errors'].append(err_msg)
|
|
except FileNotFoundError:
|
|
err_msg = f"Package file not found during processing: {tgz_path}"
|
|
logger.error(err_msg)
|
|
results['errors'].append(err_msg)
|
|
except Exception as e:
|
|
err_msg = f"Unexpected error processing package file {pkg_basename}: {e}"
|
|
logger.error(err_msg, exc_info=True)
|
|
results['errors'].append(err_msg)
|
|
|
|
final_types_count = len(results['resource_types_info'])
|
|
ms_count = sum(1 for r in results['resource_types_info'] if r.get('must_support'))
|
|
optional_ms_count = sum(1 for r in results['resource_types_info'] if r.get('optional_usage'))
|
|
total_ms_paths = sum(len(v) for v in results['must_support_elements'].values())
|
|
total_examples = sum(len(v) for v in results['examples'].values())
|
|
logger.info(f"Package processing finished for {pkg_basename}: "
|
|
f"{final_types_count} Resources/Profiles identified; "
|
|
f"{ms_count} have MS elements ({optional_ms_count} are Optional Extensions w/ MS); "
|
|
f"{total_ms_paths} total unique MS paths found across all profiles; "
|
|
f"{total_examples} examples associated. "
|
|
f"CompliesWith: {len(results['complies_with_profiles'])}, Imposed: {len(results['imposed_profiles'])}. "
|
|
f"Errors during processing: {len(results['errors'])}")
|
|
|
|
return results
|
|
|
|
# --- Validation Functions ---
|
|
def navigate_fhir_path(resource, path, extension_url=None):
|
|
"""Navigates a FHIR resource using a FHIRPath-like expression."""
|
|
if not resource or not path:
|
|
return None
|
|
parts = path.split('.')
|
|
current = resource
|
|
for part in parts:
|
|
match = re.match(r'^(\w+)\[(\d+)\]$', part)
|
|
if match:
|
|
key, index = match.groups()
|
|
index = int(index)
|
|
if isinstance(current, dict) and key in current:
|
|
if isinstance(current[key], list) and index < len(current[key]):
|
|
current = current[key][index]
|
|
else:
|
|
return None
|
|
else:
|
|
return None
|
|
else:
|
|
if isinstance(current, dict) and part in current:
|
|
current = current[part]
|
|
else:
|
|
return None
|
|
if extension_url and isinstance(current, list):
|
|
current = [item for item in current if item.get('url') == extension_url]
|
|
return current
|
|
|
|
def validate_resource_against_profile(package_name, version, resource, include_dependencies=True):
|
|
"""Validates a FHIR resource against a StructureDefinition in the specified package."""
|
|
logger.debug(f"Validating resource {resource.get('resourceType')} against {package_name}#{version}")
|
|
result = {'valid': True, 'errors': [], 'warnings': []}
|
|
download_dir = _get_download_dir()
|
|
if not download_dir:
|
|
result['valid'] = False
|
|
result['errors'].append("Could not access download directory")
|
|
return result
|
|
|
|
tgz_path = os.path.join(download_dir, construct_tgz_filename(package_name, version))
|
|
sd_data, _ = find_and_extract_sd(tgz_path, resource.get('resourceType'))
|
|
if not sd_data:
|
|
result['valid'] = False
|
|
result['errors'].append(f"No StructureDefinition found for {resource.get('resourceType')} in {package_name}#{version}")
|
|
return result
|
|
|
|
# Basic validation: check required elements and Must Support
|
|
elements = sd_data.get('snapshot', {}).get('element', [])
|
|
for element in elements:
|
|
path = element.get('path')
|
|
if element.get('min', 0) > 0:
|
|
value = navigate_fhir_path(resource, path)
|
|
if value is None or (isinstance(value, list) and not value):
|
|
result['valid'] = False
|
|
result['errors'].append(f"Required element {path} missing")
|
|
if element.get('mustSupport', False):
|
|
value = navigate_fhir_path(resource, path)
|
|
if value is None or (isinstance(value, list) and not value):
|
|
result['warnings'].append(f"Must Support element {path} missing or empty")
|
|
|
|
return result
|
|
|
|
def validate_bundle_against_profile(package_name, version, bundle, include_dependencies=True):
|
|
"""Validates a FHIR Bundle against profiles in the specified package."""
|
|
logger.debug(f"Validating bundle against {package_name}#{version}")
|
|
result = {
|
|
'valid': True,
|
|
'errors': [],
|
|
'warnings': [],
|
|
'results': {}
|
|
}
|
|
if not bundle.get('resourceType') == 'Bundle':
|
|
result['valid'] = False
|
|
result['errors'].append("Resource is not a Bundle")
|
|
return result
|
|
|
|
for entry in bundle.get('entry', []):
|
|
resource = entry.get('resource')
|
|
if not resource:
|
|
continue
|
|
resource_type = resource.get('resourceType')
|
|
resource_id = resource.get('id', 'unknown')
|
|
validation_result = validate_resource_against_profile(package_name, version, resource, include_dependencies)
|
|
result['results'][f"{resource_type}/{resource_id}"] = validation_result
|
|
if not validation_result['valid']:
|
|
result['valid'] = False
|
|
result['errors'].extend(validation_result['errors'])
|
|
result['warnings'].extend(validation_result['warnings'])
|
|
|
|
return result
|
|
|
|
# --- Structure Definition Retrieval ---
|
|
def get_structure_definition(package_name, version, resource_type):
|
|
"""Fetches StructureDefinition with slicing support."""
|
|
download_dir = _get_download_dir()
|
|
if not download_dir:
|
|
logger.error("Could not get download directory.")
|
|
return {'error': 'Download directory not accessible'}
|
|
|
|
tgz_filename = construct_tgz_filename(package_name, version)
|
|
tgz_path = os.path.join(download_dir, tgz_filename)
|
|
sd_data, sd_path = find_and_extract_sd(tgz_path, resource_type)
|
|
|
|
if not sd_data:
|
|
# Fallback to canonical package
|
|
canonical_tgz = construct_tgz_filename(*CANONICAL_PACKAGE)
|
|
canonical_path = os.path.join(download_dir, canonical_tgz)
|
|
sd_data, sd_path = find_and_extract_sd(canonical_path, resource_type)
|
|
if sd_data:
|
|
logger.info(f"Using canonical SD for {resource_type} from {canonical_path}")
|
|
elements = sd_data.get('snapshot', {}).get('element', [])
|
|
return {
|
|
'elements': elements,
|
|
'must_support_paths': [el['path'] for el in elements if el.get('mustSupport', False)],
|
|
'slices': [],
|
|
'fallback_used': True,
|
|
'source_package': f"{CANONICAL_PACKAGE[0]}#{CANONICAL_PACKAGE[1]}"
|
|
}
|
|
logger.error(f"No StructureDefinition found for {resource_type} in {package_name}#{version} or canonical package")
|
|
return {'error': f"No StructureDefinition for {resource_type}"}
|
|
|
|
elements = sd_data.get('snapshot', {}).get('element', [])
|
|
must_support_paths = []
|
|
slices = []
|
|
|
|
# Process elements for must-support and slicing
|
|
for element in elements:
|
|
path = element.get('path', '')
|
|
element_id = element.get('id', '')
|
|
slice_name = element.get('sliceName')
|
|
if element.get('mustSupport', False):
|
|
ms_path = element_id if slice_name else path
|
|
must_support_paths.append(ms_path)
|
|
if 'slicing' in element:
|
|
slice_info = {
|
|
'path': path,
|
|
'sliceName': slice_name,
|
|
'discriminator': element.get('slicing', {}).get('discriminator', []),
|
|
'nested_slices': []
|
|
}
|
|
# Find nested slices
|
|
for sub_element in elements:
|
|
if sub_element['path'].startswith(path + '.') and 'slicing' in sub_element:
|
|
sub_slice_name = sub_element.get('sliceName')
|
|
slice_info['nested_slices'].append({
|
|
'path': sub_element['path'],
|
|
'sliceName': sub_slice_name,
|
|
'discriminator': sub_element.get('slicing', {}).get('discriminator', [])
|
|
})
|
|
slices.append(slice_info)
|
|
|
|
logger.debug(f"StructureDefinition for {resource_type}: {len(elements)} elements, {len(must_support_paths)} must-support paths, {len(slices)} slices")
|
|
return {
|
|
'elements': elements,
|
|
'must_support_paths': sorted(list(set(must_support_paths))),
|
|
'slices': slices,
|
|
'fallback_used': False
|
|
}
|
|
|
|
# --- Other Service Functions ---
|
|
def _build_package_index(download_dir):
|
|
"""Builds an index of canonical URLs to package details from .index.json files."""
|
|
index = {}
|
|
try:
|
|
for tgz_file in os.listdir(download_dir):
|
|
if not tgz_file.endswith('.tgz'):
|
|
continue
|
|
tgz_path = os.path.join(download_dir, tgz_file)
|
|
try:
|
|
with tarfile.open(tgz_path, "r:gz") as tar:
|
|
index_file = next((m for m in tar.getmembers() if m.name == 'package/.index.json'), None)
|
|
if index_file:
|
|
fileobj = tar.extractfile(index_file)
|
|
if fileobj:
|
|
content = json.loads(fileobj.read().decode('utf-8-sig'))
|
|
package_name = content.get('package-id', '')
|
|
package_version = content.get('version', '')
|
|
for file_entry in content.get('files', []):
|
|
canonical = file_entry.get('canonical')
|
|
filename = file_entry.get('filename')
|
|
if canonical and filename:
|
|
index[canonical] = {
|
|
'package_name': package_name,
|
|
'package_version': package_version,
|
|
'filename': filename
|
|
}
|
|
fileobj.close()
|
|
except Exception as e:
|
|
logger.warning(f"Failed to index {tgz_file}: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Error building package index: {e}")
|
|
return index
|
|
|
|
def _find_definition_details(url, download_dir):
|
|
"""Finds package details for a canonical URL."""
|
|
index = current_app.config.get('PACKAGE_INDEX')
|
|
if index is None:
|
|
index = _build_package_index(download_dir)
|
|
current_app.config['PACKAGE_INDEX'] = index
|
|
return index.get(url)
|
|
|
|
def _load_definition(details, download_dir):
|
|
"""Loads a StructureDefinition from package details."""
|
|
if not details:
|
|
return None
|
|
tgz_path = os.path.join(download_dir, construct_tgz_filename(details['package_name'], details['package_version']))
|
|
try:
|
|
with tarfile.open(tgz_path, "r:gz") as tar:
|
|
member_path = f"package/{details['filename']}"
|
|
member = next((m for m in tar.getmembers() if m.name == member_path), None)
|
|
if member:
|
|
fileobj = tar.extractfile(member)
|
|
if fileobj:
|
|
data = json.loads(fileobj.read().decode('utf-8-sig'))
|
|
fileobj.close()
|
|
return data
|
|
except Exception as e:
|
|
logger.error(f"Failed to load definition {details['filename']} from {tgz_path}: {e}")
|
|
return None
|
|
|
|
def download_package(name, version):
|
|
"""Downloads a single FHIR package."""
|
|
download_dir = _get_download_dir()
|
|
if not download_dir: return None, "Download dir error"
|
|
filename = construct_tgz_filename(name, version)
|
|
if not filename: return None, "Filename construction error"
|
|
save_path = os.path.join(download_dir, filename)
|
|
if os.path.exists(save_path):
|
|
logger.info(f"Package already exists: {save_path}")
|
|
return save_path, None
|
|
package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}"
|
|
try:
|
|
with requests.get(package_url, stream=True, timeout=60) as r:
|
|
r.raise_for_status()
|
|
with open(save_path, 'wb') as f:
|
|
for chunk in r.iter_content(chunk_size=8192): f.write(chunk)
|
|
logger.info(f"Downloaded {filename}")
|
|
return save_path, None
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Download failed for {name}#{version}: {e}")
|
|
return None, f"Download error: {e}"
|
|
except IOError as e:
|
|
logger.error(f"File write error for {save_path}: {e}")
|
|
return None, f"File write error: {e}"
|
|
|
|
def extract_dependencies(tgz_path):
|
|
"""Extracts dependencies from package.json."""
|
|
package_json_path = "package/package.json"
|
|
dependencies = {}
|
|
error_message = None
|
|
if not tgz_path or not os.path.exists(tgz_path): return None, "File not found"
|
|
try:
|
|
with tarfile.open(tgz_path, "r:gz") as tar:
|
|
try:
|
|
pkg_member = tar.getmember(package_json_path)
|
|
with tar.extractfile(pkg_member) as f:
|
|
pkg_data = json.load(f)
|
|
dependencies = pkg_data.get('dependencies', {})
|
|
except KeyError: error_message = "package.json not found"
|
|
except (json.JSONDecodeError, tarfile.TarError) as e: error_message = f"Error reading package.json: {e}"
|
|
except tarfile.TarError as e: error_message = f"Error opening tarfile: {e}"
|
|
except Exception as e: error_message = f"Unexpected error: {e}"
|
|
return dependencies, error_message
|
|
|
|
def extract_used_types(tgz_path):
|
|
"""Extracts all resource types and referenced types from the package resources."""
|
|
used_types = set()
|
|
if not tgz_path or not os.path.exists(tgz_path):
|
|
logger.error(f"Cannot extract used types: File not found at {tgz_path}")
|
|
return used_types
|
|
try:
|
|
with tarfile.open(tgz_path, "r:gz") as tar:
|
|
for member in tar:
|
|
if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')):
|
|
continue
|
|
if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']:
|
|
continue
|
|
fileobj = None
|
|
try:
|
|
fileobj = tar.extractfile(member)
|
|
if fileobj:
|
|
content_bytes = fileobj.read()
|
|
content_string = content_bytes.decode('utf-8-sig')
|
|
data = json.loads(content_string)
|
|
if not isinstance(data, dict): continue
|
|
resource_type = data.get('resourceType')
|
|
if not resource_type: continue
|
|
used_types.add(resource_type)
|
|
if resource_type == 'StructureDefinition':
|
|
sd_type = data.get('type')
|
|
if sd_type: used_types.add(sd_type)
|
|
base_def = data.get('baseDefinition')
|
|
if base_def:
|
|
base_type = base_def.split('/')[-1]
|
|
if base_type and base_type[0].isupper(): used_types.add(base_type)
|
|
elements = data.get('snapshot', {}).get('element', []) or data.get('differential', {}).get('element', [])
|
|
for element in elements:
|
|
if isinstance(element, dict) and 'type' in element:
|
|
for t in element.get('type', []):
|
|
code = t.get('code')
|
|
if code and code[0].isupper(): used_types.add(code)
|
|
for profile_uri in t.get('targetProfile', []):
|
|
if profile_uri:
|
|
profile_type = profile_uri.split('/')[-1]
|
|
if profile_type and profile_type[0].isupper(): used_types.add(profile_type)
|
|
else:
|
|
profiles = data.get('meta', {}).get('profile', [])
|
|
for profile_uri in profiles:
|
|
if profile_uri:
|
|
profile_type = profile_uri.split('/')[-1]
|
|
if profile_type and profile_type[0].isupper(): used_types.add(profile_type)
|
|
if resource_type == 'ValueSet':
|
|
for include in data.get('compose', {}).get('include', []):
|
|
system = include.get('system')
|
|
if system and system.startswith('http://hl7.org/fhir/'):
|
|
type_name = system.split('/')[-1]
|
|
if type_name and type_name[0].isupper() and not type_name.startswith('sid'):
|
|
used_types.add(type_name)
|
|
if resource_type == 'CapabilityStatement':
|
|
for rest_item in data.get('rest', []):
|
|
for resource_item in rest_item.get('resource', []):
|
|
res_type = resource_item.get('type')
|
|
if res_type and res_type[0].isupper(): used_types.add(res_type)
|
|
profile_uri = resource_item.get('profile')
|
|
if profile_uri:
|
|
profile_type = profile_uri.split('/')[-1]
|
|
if profile_type and profile_type[0].isupper(): used_types.add(profile_type)
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"Could not parse JSON in {member.name}: {e}")
|
|
except UnicodeDecodeError as e:
|
|
logger.warning(f"Could not decode {member.name}: {e}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not process member {member.name}: {e}")
|
|
finally:
|
|
if fileobj:
|
|
fileobj.close()
|
|
except tarfile.ReadError as e:
|
|
logger.error(f"Tar ReadError extracting used types from {tgz_path}: {e}")
|
|
except tarfile.TarError as e:
|
|
logger.error(f"TarError extracting used types from {tgz_path}: {e}")
|
|
except FileNotFoundError:
|
|
logger.error(f"Package file not found: {tgz_path}")
|
|
except Exception as e:
|
|
logger.error(f"Error extracting used types from {tgz_path}: {e}", exc_info=True)
|
|
core_non_resource_types = {
|
|
'string', 'boolean', 'integer', 'decimal', 'uri', 'url', 'canonical', 'base64Binary', 'instant',
|
|
'date', 'dateTime', 'time', 'code', 'oid', 'id', 'markdown', 'unsignedInt', 'positiveInt', 'xhtml',
|
|
'Element', 'BackboneElement', 'Resource', 'DomainResource', 'DataType'
|
|
}
|
|
final_used_types = {t for t in used_types if t not in core_non_resource_types and t[0].isupper()}
|
|
logger.debug(f"Extracted used types from {os.path.basename(tgz_path)}: {final_used_types}")
|
|
return final_used_types
|
|
|
|
def map_types_to_packages(used_types, all_dependencies, download_dir):
|
|
"""Maps used types to packages by checking .index.json files."""
|
|
type_to_package = {}
|
|
processed_types = set()
|
|
for (pkg_name, pkg_version), _ in all_dependencies.items():
|
|
tgz_filename = construct_tgz_filename(pkg_name, pkg_version)
|
|
tgz_path = os.path.join(download_dir, tgz_filename)
|
|
if not os.path.exists(tgz_path):
|
|
logger.warning(f"Package {tgz_filename} not found for type mapping")
|
|
continue
|
|
try:
|
|
with tarfile.open(tgz_path, "r:gz") as tar:
|
|
index_file = next((m for m in tar.getmembers() if m.name == 'package/.index.json'), None)
|
|
if index_file:
|
|
fileobj = tar.extractfile(index_file)
|
|
if fileobj:
|
|
content = json.loads(fileobj.read().decode('utf-8-sig'))
|
|
for file_entry in content.get('files', []):
|
|
resource_type = file_entry.get('resourceType')
|
|
filename = file_entry.get('filename')
|
|
if resource_type == 'StructureDefinition' and filename.endswith('.json'):
|
|
sd_name = os.path.splitext(os.path.basename(filename))[0]
|
|
if sd_name in used_types:
|
|
type_to_package[sd_name] = (pkg_name, pkg_version)
|
|
processed_types.add(sd_name)
|
|
logger.debug(f"Mapped type '{sd_name}' to package '{pkg_name}#{pkg_version}'")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to process .index.json for {pkg_name}#{pkg_version}: {e}")
|
|
for t in used_types - processed_types:
|
|
for (pkg_name, pkg_version), _ in all_dependencies.items():
|
|
if t.lower() in pkg_name.lower():
|
|
type_to_package[t] = (pkg_name, pkg_version)
|
|
processed_types.add(t)
|
|
logger.debug(f"Fallback: Mapped type '{t}' to package '{pkg_name}#{pkg_version}'")
|
|
break
|
|
canonical_name, canonical_version = CANONICAL_PACKAGE
|
|
for t in used_types - processed_types:
|
|
type_to_package[t] = CANONICAL_PACKAGE
|
|
logger.debug(f"Fallback: Mapped type '{t}' to canonical package {canonical_name}#{canonical_version}")
|
|
logger.debug(f"Final type-to-package mapping: {type_to_package}")
|
|
return type_to_package
|
|
|
|
def import_package_and_dependencies(initial_name, initial_version, dependency_mode='recursive'):
|
|
"""Orchestrates recursive download and dependency extraction."""
|
|
logger.info(f"Starting import for {initial_name}#{initial_version} with dependency_mode={dependency_mode}")
|
|
download_dir = _get_download_dir()
|
|
if not download_dir:
|
|
return {'requested': (initial_name, initial_version), 'processed': set(), 'downloaded': {}, 'all_dependencies': {}, 'dependencies': [], 'errors': ['Download directory not accessible']}
|
|
|
|
results = {
|
|
'requested': (initial_name, initial_version),
|
|
'processed': set(),
|
|
'downloaded': {},
|
|
'all_dependencies': {},
|
|
'dependencies': [],
|
|
'errors': []
|
|
}
|
|
pending_queue = [(initial_name, initial_version)]
|
|
queued_or_processed_lookup = set([(initial_name, initial_version)])
|
|
all_found_dependencies = set()
|
|
|
|
while pending_queue:
|
|
name, version = pending_queue.pop(0)
|
|
package_id_tuple = (name, version)
|
|
if package_id_tuple in results['processed']:
|
|
logger.debug(f"Skipping already processed package: {name}#{version}")
|
|
continue
|
|
logger.info(f"Processing package from queue: {name}#{version}")
|
|
save_path, dl_error = download_package(name, version)
|
|
if dl_error:
|
|
results['errors'].append(f"Download failed for {name}#{version}: {dl_error}")
|
|
continue
|
|
results['downloaded'][package_id_tuple] = save_path
|
|
dependencies, dep_error = extract_dependencies(save_path)
|
|
if dep_error:
|
|
results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}")
|
|
results['processed'].add(package_id_tuple)
|
|
continue
|
|
elif dependencies is None:
|
|
results['errors'].append(f"Dependency extraction returned critical error for {name}#{version}.")
|
|
results['processed'].add(package_id_tuple)
|
|
continue
|
|
results['all_dependencies'][package_id_tuple] = dependencies
|
|
results['processed'].add(package_id_tuple)
|
|
current_package_deps = []
|
|
for dep_name, dep_version in dependencies.items():
|
|
if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version:
|
|
dep_tuple = (dep_name, dep_version)
|
|
current_package_deps.append({"name": dep_name, "version": dep_version})
|
|
if dep_tuple not in all_found_dependencies:
|
|
all_found_dependencies.add(dep_tuple)
|
|
results['dependencies'].append({"name": dep_name, "version": dep_version})
|
|
if dep_tuple not in queued_or_processed_lookup:
|
|
should_queue = False
|
|
if dependency_mode == 'recursive':
|
|
should_queue = True
|
|
elif dependency_mode == 'patch-canonical' and dep_tuple == CANONICAL_PACKAGE:
|
|
should_queue = True
|
|
if should_queue:
|
|
logger.debug(f"Adding dependency to queue ({dependency_mode}): {dep_name}#{dep_version}")
|
|
pending_queue.append(dep_tuple)
|
|
queued_or_processed_lookup.add(dep_tuple)
|
|
save_package_metadata(name, version, dependency_mode, current_package_deps)
|
|
if dependency_mode == 'tree-shaking' and package_id_tuple == (initial_name, initial_version):
|
|
logger.info(f"Performing tree-shaking for {initial_name}#{initial_version}")
|
|
used_types = extract_used_types(save_path)
|
|
if used_types:
|
|
type_to_package = map_types_to_packages(used_types, results['all_dependencies'], download_dir)
|
|
tree_shaken_deps = set(type_to_package.values()) - {package_id_tuple}
|
|
if CANONICAL_PACKAGE not in tree_shaken_deps:
|
|
tree_shaken_deps.add(CANONICAL_PACKAGE)
|
|
logger.debug(f"Ensuring canonical package {CANONICAL_PACKAGE} for tree-shaking")
|
|
for dep_tuple in tree_shaken_deps:
|
|
if dep_tuple not in queued_or_processed_lookup:
|
|
logger.info(f"Queueing tree-shaken dependency: {dep_tuple[0]}#{dep_tuple[1]}")
|
|
pending_queue.append(dep_tuple)
|
|
queued_or_processed_lookup.add(dep_tuple)
|
|
results['dependencies'] = [{"name": d[0], "version": d[1]} for d in all_found_dependencies]
|
|
logger.info(f"Import finished for {initial_name}#{initial_version}. Processed: {len(results['processed'])}, Downloaded: {len(results['downloaded'])}, Errors: {len(results['errors'])}")
|
|
return results
|
|
|
|
# --- Standalone Test ---
|
|
if __name__ == '__main__':
|
|
logger.info("Running services.py directly for testing.")
|
|
class MockFlask:
|
|
class Config(dict):
|
|
pass
|
|
config = Config()
|
|
instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'instance'))
|
|
mock_app = MockFlask()
|
|
test_download_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'instance', DOWNLOAD_DIR_NAME))
|
|
mock_app.config['FHIR_PACKAGES_DIR'] = test_download_dir
|
|
os.makedirs(test_download_dir, exist_ok=True)
|
|
logger.info(f"Using test download directory: {test_download_dir}")
|
|
print("\n--- Testing Filename Parsing ---")
|
|
test_files = [
|
|
"hl7.fhir.r4.core-4.0.1.tgz",
|
|
"hl7.fhir.us.core-6.1.0.tgz",
|
|
"fhir.myig.patient-1.2.3-beta.tgz",
|
|
"my.company.fhir.Terminologies-0.1.0.tgz",
|
|
"package-with-hyphens-in-name-1.0.tgz",
|
|
"noversion.tgz",
|
|
"badformat-1.0",
|
|
"hl7.fhir.au.core-1.1.0-preview.tgz",
|
|
]
|
|
for tf in test_files:
|
|
p_name, p_ver = parse_package_filename(tf)
|
|
print(f"'{tf}' -> Name: '{p_name}', Version: '{p_ver}'")
|
|
pkg_name_to_test = "hl7.fhir.au.core"
|
|
pkg_version_to_test = "1.1.0-preview"
|
|
print(f"\n--- Testing Import: {pkg_name_to_test}#{pkg_version_to_test} ---")
|
|
import_results = import_package_and_dependencies(pkg_name_to_test, pkg_version_to_test, dependency_mode='recursive')
|
|
print("\nImport Results:")
|
|
print(f" Requested: {import_results['requested']}")
|
|
print(f" Downloaded Count: {len(import_results['downloaded'])}")
|
|
print(f" Unique Dependencies Found: {len(import_results['dependencies'])}")
|
|
print(f" Errors: {len(import_results['errors'])}")
|
|
for error in import_results['errors']:
|
|
print(f" - {error}")
|
|
if (pkg_name_to_test, pkg_version_to_test) in import_results['downloaded']:
|
|
test_tgz_path = import_results['downloaded'][(pkg_name_to_test, pkg_version_to_test)]
|
|
print(f"\n--- Testing Processing: {test_tgz_path} ---")
|
|
processing_results = process_package_file(test_tgz_path)
|
|
print("\nProcessing Results:")
|
|
print(f" Resource Types Info Count: {len(processing_results.get('resource_types_info', []))}")
|
|
print(f" Profiles with MS Elements: {sum(1 for r in processing_results.get('resource_types_info', []) if r.get('must_support'))}")
|
|
print(f" Optional Extensions w/ MS: {sum(1 for r in processing_results.get('resource_types_info', []) if r.get('optional_usage'))}")
|
|
print(f" Must Support Elements Dict Count: {len(processing_results.get('must_support_elements', {}))}")
|
|
print(f" Examples Dict Count: {len(processing_results.get('examples', {}))}")
|
|
print(f" Complies With Profiles: {processing_results.get('complies_with_profiles', [])}")
|
|
print(f" Imposed Profiles: {processing_results.get('imposed_profiles', [])}")
|
|
print(f" Processing Errors: {processing_results.get('errors', [])}")
|
|
else:
|
|
print(f"\n--- Skipping Processing Test (Import failed for {pkg_name_to_test}#{pkg_version_to_test}) ---") |