Joshua Hare 2025-04-26 22:51:52 +10:00
parent 02d58facbb
commit cfe4f3dd4f
19 changed files with 16589 additions and 2470 deletions

app.py (365 changed lines)

@@ -587,219 +587,15 @@ def view_ig(processed_ig_id):
# return Response(json.dumps(response_data, indent=None, separators=(',', ':')), mimetype='application/json')
#-----------------------------------------------------------------------------------------------------------------------------------
# --- Full /get-structure Function ---
@app.route('/get-structure')
def get_structure():
package_name = request.args.get('package_name')
package_version = request.args.get('package_version')
# This is the StructureDefinition ID/Name or base ResourceType
resource_type = request.args.get('resource_type')
view = request.args.get('view', 'snapshot') # Keep for potential future use
# --- Parameter Validation ---
if not all([package_name, package_version, resource_type]):
logger.warning("get_structure: Missing query parameters: package_name=%s, package_version=%s, resource_type=%s", package_name, package_version, resource_type)
return jsonify({"error": "Missing required query parameters: package_name, package_version, resource_type"}), 400
# --- Package Directory Setup ---
packages_dir = current_app.config.get('FHIR_PACKAGES_DIR')
if not packages_dir:
logger.error("FHIR_PACKAGES_DIR not configured.")
return jsonify({"error": "Server configuration error: Package directory not set."}), 500
# --- Paths setup ---
tgz_filename = services.construct_tgz_filename(package_name, package_version)
tgz_path = os.path.join(packages_dir, tgz_filename)
# Assuming CANONICAL_PACKAGE is defined in services (e.g., ('hl7.fhir.r4.core', '4.0.1'))
core_package_name, core_package_version = services.CANONICAL_PACKAGE
core_tgz_filename = services.construct_tgz_filename(core_package_name, core_package_version)
core_tgz_path = os.path.join(packages_dir, core_tgz_filename)
sd_data = None
search_params_data = [] # Initialize search params list
fallback_used = False
source_package_id = f"{package_name}#{package_version}"
base_resource_type_for_sp = None # Variable to store the base type for SP search
logger.debug(f"Attempting to find SD for '{resource_type}' in {tgz_filename}")
# --- Fetch SD Data (Primary Package) ---
primary_package_exists = os.path.exists(tgz_path)
core_package_exists = os.path.exists(core_tgz_path)
if primary_package_exists:
try:
# Assuming find_and_extract_sd handles narrative removal
sd_data, _ = services.find_and_extract_sd(tgz_path, resource_type)
if sd_data:
# Determine the base resource type from the fetched SD
base_resource_type_for_sp = sd_data.get('type')
logger.debug(f"Determined base resource type '{base_resource_type_for_sp}' from primary SD '{resource_type}'")
except Exception as e:
logger.error(f"Unexpected error extracting SD '{resource_type}' from primary package {tgz_path}: {e}", exc_info=True)
sd_data = None # Ensure sd_data is None if extraction failed
# --- Fallback SD Check (if primary failed or file didn't exist) ---
if sd_data is None:
logger.info(f"SD for '{resource_type}' not found or failed to load from {source_package_id}. Attempting fallback to {services.CANONICAL_PACKAGE_ID}.")
if not core_package_exists:
logger.error(f"Core package {services.CANONICAL_PACKAGE_ID} not found locally at {core_tgz_path}.")
error_message = f"SD for '{resource_type}' not found in primary package, and core package is missing." if primary_package_exists else f"Primary package {package_name}#{package_version} and core package are missing."
return jsonify({"error": error_message}), 500 if primary_package_exists else 404
try:
sd_data, _ = services.find_and_extract_sd(core_tgz_path, resource_type)
if sd_data is not None:
fallback_used = True
source_package_id = services.CANONICAL_PACKAGE_ID
base_resource_type_for_sp = sd_data.get('type') # Store base type from fallback SD
logger.info(f"Found SD for '{resource_type}' in fallback package {source_package_id}. Base type: '{base_resource_type_for_sp}'")
except Exception as e:
logger.error(f"Unexpected error extracting SD '{resource_type}' from fallback {core_tgz_path}: {e}", exc_info=True)
return jsonify({"error": f"Unexpected error reading fallback StructureDefinition: {str(e)}"}), 500
# --- Check if SD data was ultimately found ---
if not sd_data:
logger.error(f"SD for '{resource_type}' could not be found in primary or fallback packages.")
return jsonify({"error": f"StructureDefinition for '{resource_type}' not found."}), 404
# --- Fetch Search Parameters (Primary Package First) ---
# find_and_extract_search_params returns a list of dicts with basic SP info
if base_resource_type_for_sp and primary_package_exists:
try:
logger.info(f"Fetching SearchParameters for base type '{base_resource_type_for_sp}' from primary package {tgz_path}")
search_params_data = services.find_and_extract_search_params(tgz_path, base_resource_type_for_sp)
except Exception as e:
logger.error(f"Error extracting SearchParameters for '{base_resource_type_for_sp}' from primary package {tgz_path}: {e}", exc_info=True)
search_params_data = [] # Continue with empty list on error
elif not primary_package_exists:
logger.warning(f"Original package {tgz_path} not found, cannot search it for specific SearchParameters.")
elif not base_resource_type_for_sp:
logger.warning(f"Base resource type could not be determined for '{resource_type}', cannot search for SearchParameters.")
# --- Fetch Search Parameters (Fallback to Core Package if needed) ---
if not search_params_data and base_resource_type_for_sp and core_package_exists:
logger.info(f"No relevant SearchParameters found in primary package for '{base_resource_type_for_sp}'. Searching core package {core_tgz_path}.")
try:
search_params_data = services.find_and_extract_search_params(core_tgz_path, base_resource_type_for_sp)
if search_params_data:
logger.info(f"Found {len(search_params_data)} SearchParameters for '{base_resource_type_for_sp}' in core package.")
except Exception as e:
logger.error(f"Error extracting SearchParameters for '{base_resource_type_for_sp}' from core package {core_tgz_path}: {e}", exc_info=True)
search_params_data = [] # Continue with empty list on error
elif not search_params_data and not core_package_exists:
logger.warning(f"Core package {core_tgz_path} not found, cannot perform fallback search for SearchParameters.")
# --- Prepare Snapshot/Differential Elements ---
snapshot_elements = sd_data.get('snapshot', {}).get('element', [])
differential_elements = sd_data.get('differential', {}).get('element', [])
# Create set of IDs from differential elements for efficient lookup
differential_ids = {el.get('id') for el in differential_elements if el.get('id')}
logger.debug(f"Found {len(differential_ids)} unique IDs in differential.")
enriched_elements = []
if snapshot_elements:
logger.debug(f"Processing {len(snapshot_elements)} snapshot elements to add isInDifferential flag.")
for element in snapshot_elements:
element_id = element.get('id')
# Add the isInDifferential flag based on presence in differential_ids set
element['isInDifferential'] = bool(element_id and element_id in differential_ids)
enriched_elements.append(element)
# remove_narrative should ideally be handled within find_and_extract_sd,
# but applying it again here ensures it's done if the service function missed it.
enriched_elements = [services.remove_narrative(el) for el in enriched_elements]
else:
# If no snapshot, log warning. Front-end might need adjustment if only differential is sent.
logger.warning(f"No snapshot found for {resource_type} in {source_package_id}. Returning empty element list.")
enriched_elements = [] # Or consider returning differential and handle in JS
# --- Retrieve Must Support Paths from DB ---
must_support_paths = []
# Query DB once for the ProcessedIg record
processed_ig_record = ProcessedIg.query.filter_by(package_name=package_name, version=package_version).first()
if processed_ig_record and processed_ig_record.must_support_elements:
ms_elements_dict = processed_ig_record.must_support_elements
# Try getting MS paths using the profile ID/name first, fallback to base type
must_support_paths = ms_elements_dict.get(resource_type, [])
if not must_support_paths and base_resource_type_for_sp:
must_support_paths = ms_elements_dict.get(base_resource_type_for_sp, [])
if must_support_paths:
logger.debug(f"Retrieved {len(must_support_paths)} MS paths using base type key '{base_resource_type_for_sp}' from DB.")
elif must_support_paths:
logger.debug(f"Retrieved {len(must_support_paths)} MS paths using profile key '{resource_type}' from DB.")
else:
logger.debug(f"No specific MS paths found for keys '{resource_type}' or '{base_resource_type_for_sp}' in DB.")
else:
logger.debug(f"No processed IG record or no must_support_elements found in DB for {package_name}#{package_version}")
# --- Fetch and Merge Conformance Data ---
search_param_conformance_rules = {}
if base_resource_type_for_sp: # Only proceed if we identified the base type
# Reuse the DB record queried for Must Support if available
if processed_ig_record:
# Check if the record has the conformance data attribute and it's not None/empty
# **IMPORTANT**: This assumes the 'search_param_conformance' column was added to the model
if hasattr(processed_ig_record, 'search_param_conformance') and processed_ig_record.search_param_conformance:
all_conformance_data = processed_ig_record.search_param_conformance
# Get the specific rules map for the current base resource type
search_param_conformance_rules = all_conformance_data.get(base_resource_type_for_sp, {})
logger.debug(f"Retrieved conformance rules for {base_resource_type_for_sp} from DB: {search_param_conformance_rules}")
else:
logger.warning(f"ProcessedIg record found, but 'search_param_conformance' attribute/data is missing or empty for {package_name}#{package_version}.")
else:
# This case should be rare if the MS check already happened, but handle it anyway
logger.warning(f"No ProcessedIg record found for {package_name}#{package_version} to get conformance rules.")
# Merge the retrieved conformance rules into the search_params_data list
if search_params_data:
logger.debug(f"Merging conformance data into {len(search_params_data)} search parameters.")
for param in search_params_data:
param_code = param.get('code')
if param_code:
# Lookup the code in the rules; default to 'Optional' if not found
conformance_level = search_param_conformance_rules.get(param_code, 'Optional')
param['conformance'] = conformance_level # Update the dictionary
else:
# Handle cases where SearchParameter might lack a 'code' (should be rare)
param['conformance'] = 'Unknown'
logger.debug("Finished merging conformance data.")
else:
logger.debug(f"No search parameters found for {base_resource_type_for_sp} to merge conformance data into.")
else:
logger.warning(f"Cannot fetch conformance data because base resource type (e.g., Patient) for '{resource_type}' could not be determined.")
# Ensure existing search params still have a default conformance
for param in search_params_data:
if 'conformance' not in param or param['conformance'] == 'N/A':
param['conformance'] = 'Optional'
# --- Construct the final response ---
response_data = {
'elements': enriched_elements,
'must_support_paths': must_support_paths,
# This list now includes the 'conformance' field with actual values (or 'Optional'/'Unknown')
'search_parameters': search_params_data,
'fallback_used': fallback_used,
'source_package': source_package_id
# Consider explicitly including the raw sd_data['differential'] if needed by JS,
# otherwise keep it excluded to reduce payload size.
# 'differential_elements': differential_elements
}
# Use Response object for consistent JSON formatting and smaller payload
# indent=None, separators=(',', ':') creates the most compact JSON
return Response(json.dumps(response_data, indent=None, separators=(',', ':')), mimetype='application/json')
# --- End of /get-structure Function ---
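The snapshot/differential enrichment above boils down to a set-membership test. A minimal standalone sketch, with invented elements:

snapshot_elements = [
    {"id": "Patient.name", "path": "Patient.name"},
    {"id": "Patient.gender", "path": "Patient.gender"},
]
differential_elements = [{"id": "Patient.name", "mustSupport": True}]

# A set of differential ids gives O(1) lookups while flagging snapshot elements.
differential_ids = {el.get("id") for el in differential_elements if el.get("id")}
for element in snapshot_elements:
    element["isInDifferential"] = element["id"] in differential_ids

print([(e["id"], e["isInDifferential"]) for e in snapshot_elements])
# [('Patient.name', True), ('Patient.gender', False)]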
@app.route('/get-example')
def get_example():
package_name = request.args.get('package_name')
version = request.args.get('package_version')
version = request.args.get('version')
filename = request.args.get('filename')
include_narrative = request.args.get('include_narrative', 'false').lower() == 'true'
if not all([package_name, version, filename]):
logger.warning("get_example: Missing query parameters: package_name=%s, version=%s, filename=%s", package_name, version, filename)
return jsonify({"error": "Missing required query parameters: package_name, package_version, filename"}), 400
return jsonify({"error": "Missing required query parameters: package_name, version, filename"}), 400
if not filename.startswith('package/') or '..' in filename:
logger.warning(f"Invalid example file path requested: {filename}")
return jsonify({"error": "Invalid example file path."}), 400
@@ -819,12 +615,9 @@ def get_example():
with tar.extractfile(example_member) as example_fileobj:
content_bytes = example_fileobj.read()
content_string = content_bytes.decode('utf-8-sig')
# Parse JSON to remove narrative
content = json.loads(content_string)
if 'text' in content:
logger.debug(f"Removing narrative text from example '{filename}'")
del content['text']
# Return filtered JSON content as a compact string
if not include_narrative:
content = services.remove_narrative(content, include_narrative=False)
filtered_content_string = json.dumps(content, separators=(',', ':'), sort_keys=False)
return Response(filtered_content_string, mimetype='application/json')
except KeyError:
@@ -849,6 +642,154 @@ def get_example():
logger.error(f"Unexpected error getting example '{filename}' from {tgz_filename}: {e}", exc_info=True)
return jsonify({"error": f"Unexpected error: {str(e)}"}), 500
@app.route('/get-structure')
def get_structure():
package_name = request.args.get('package_name')
version = request.args.get('version')
resource_type = request.args.get('resource_type')
view = request.args.get('view', 'snapshot')
include_narrative = request.args.get('include_narrative', 'false').lower() == 'true'
raw = request.args.get('raw', 'false').lower() == 'true'
profile_url = request.args.get('profile_url')
if not all([package_name, version, resource_type]):
logger.warning("get_structure: Missing query parameters: package_name=%s, version=%s, resource_type=%s", package_name, version, resource_type)
return jsonify({"error": "Missing required query parameters: package_name, version, resource_type"}), 400
packages_dir = current_app.config.get('FHIR_PACKAGES_DIR')
if not packages_dir:
logger.error("FHIR_PACKAGES_DIR not configured.")
return jsonify({"error": "Server configuration error: Package directory not set."}), 500
tgz_filename = services.construct_tgz_filename(package_name, version)
tgz_path = os.path.join(packages_dir, tgz_filename)
core_package_name, core_package_version = services.CANONICAL_PACKAGE
core_tgz_filename = services.construct_tgz_filename(core_package_name, core_package_version)
core_tgz_path = os.path.join(packages_dir, core_tgz_filename)
sd_data = None
search_params_data = []
fallback_used = False
source_package_id = f"{package_name}#{version}"
base_resource_type_for_sp = None
logger.debug(f"Attempting to find SD for '{resource_type}' in {tgz_filename}")
primary_package_exists = os.path.exists(tgz_path)
core_package_exists = os.path.exists(core_tgz_path)
if primary_package_exists:
try:
sd_data, _ = services.find_and_extract_sd(tgz_path, resource_type, profile_url=profile_url, include_narrative=include_narrative, raw=raw)
if sd_data:
base_resource_type_for_sp = sd_data.get('type')
logger.debug(f"Determined base resource type '{base_resource_type_for_sp}' from primary SD '{resource_type}'")
except Exception as e:
logger.error(f"Unexpected error extracting SD '{resource_type}' from primary package {tgz_path}: {e}", exc_info=True)
sd_data = None
if sd_data is None:
logger.info(f"SD for '{resource_type}' not found or failed to load from {source_package_id}. Attempting fallback to {services.CANONICAL_PACKAGE_ID}.")
if not core_package_exists:
logger.error(f"Core package {services.CANONICAL_PACKAGE_ID} not found locally at {core_tgz_path}.")
error_message = f"SD for '{resource_type}' not found in primary package, and core package is missing." if primary_package_exists else f"Primary package {package_name}#{version} and core package are missing."
return jsonify({"error": error_message}), 500 if primary_package_exists else 404
try:
sd_data, _ = services.find_and_extract_sd(core_tgz_path, resource_type, profile_url=profile_url, include_narrative=include_narrative, raw=raw)
if sd_data is not None:
fallback_used = True
source_package_id = services.CANONICAL_PACKAGE_ID
base_resource_type_for_sp = sd_data.get('type')
logger.info(f"Found SD for '{resource_type}' in fallback package {source_package_id}. Base type: '{base_resource_type_for_sp}'")
except Exception as e:
logger.error(f"Unexpected error extracting SD '{resource_type}' from fallback {core_tgz_path}: {e}", exc_info=True)
return jsonify({"error": f"Unexpected error reading fallback StructureDefinition: {str(e)}"}), 500
if not sd_data:
logger.error(f"SD for '{resource_type}' could not be found in primary or fallback packages.")
return jsonify({"error": f"StructureDefinition for '{resource_type}' not found."}), 404
if raw:
return Response(json.dumps(sd_data, indent=None, separators=(',', ':')), mimetype='application/json')
snapshot_elements = sd_data.get('snapshot', {}).get('element', [])
differential_elements = sd_data.get('differential', {}).get('element', [])
differential_ids = {el.get('id') for el in differential_elements if el.get('id')}
logger.debug(f"Found {len(differential_ids)} unique IDs in differential.")
enriched_elements = []
if snapshot_elements:
logger.debug(f"Processing {len(snapshot_elements)} snapshot elements to add isInDifferential flag.")
for element in snapshot_elements:
element_id = element.get('id')
element['isInDifferential'] = bool(element_id and element_id in differential_ids)
enriched_elements.append(element)
enriched_elements = [services.remove_narrative(el, include_narrative=include_narrative) for el in enriched_elements]
else:
logger.warning(f"No snapshot found for {resource_type} in {source_package_id}. Returning empty element list.")
enriched_elements = []
must_support_paths = []
processed_ig_record = ProcessedIg.query.filter_by(package_name=package_name, version=version).first()
if processed_ig_record and processed_ig_record.must_support_elements:
ms_elements_dict = processed_ig_record.must_support_elements
must_support_paths = ms_elements_dict.get(resource_type, [])
if not must_support_paths and base_resource_type_for_sp:
must_support_paths = ms_elements_dict.get(base_resource_type_for_sp, [])
if must_support_paths:
logger.debug(f"Retrieved {len(must_support_paths)} MS paths using base type key '{base_resource_type_for_sp}' from DB.")
elif must_support_paths:
logger.debug(f"Retrieved {len(must_support_paths)} MS paths using profile key '{resource_type}' from DB.")
else:
logger.debug(f"No specific MS paths found for keys '{resource_type}' or '{base_resource_type_for_sp}' in DB.")
else:
logger.debug(f"No processed IG record or no must_support_elements found in DB for {package_name}#{version}")
if base_resource_type_for_sp and primary_package_exists:
try:
logger.info(f"Fetching SearchParameters for base type '{base_resource_type_for_sp}' from primary package {tgz_path}")
search_params_data = services.find_and_extract_search_params(tgz_path, base_resource_type_for_sp)
except Exception as e:
logger.error(f"Error extracting SearchParameters for '{base_resource_type_for_sp}' from primary package {tgz_path}: {e}", exc_info=True)
search_params_data = []
elif not primary_package_exists:
logger.warning(f"Original package {tgz_path} not found, cannot search it for specific SearchParameters.")
elif not base_resource_type_for_sp:
logger.warning(f"Base resource type could not be determined for '{resource_type}', cannot search for SearchParameters.")
if not search_params_data and base_resource_type_for_sp and core_package_exists:
logger.info(f"No relevant SearchParameters found in primary package for '{base_resource_type_for_sp}'. Searching core package {core_tgz_path}.")
try:
search_params_data = services.find_and_extract_search_params(core_tgz_path, base_resource_type_for_sp)
if search_params_data:
logger.info(f"Found {len(search_params_data)} SearchParameters for '{base_resource_type_for_sp}' in core package.")
except Exception as e:
logger.error(f"Error extracting SearchParameters for '{base_resource_type_for_sp}' from core package {core_tgz_path}: {e}", exc_info=True)
search_params_data = []
elif not search_params_data and not core_package_exists:
logger.warning(f"Core package {core_tgz_path} not found, cannot perform fallback search for SearchParameters.")
search_param_conformance_rules = {}
if base_resource_type_for_sp:
if processed_ig_record:
if hasattr(processed_ig_record, 'search_param_conformance') and processed_ig_record.search_param_conformance:
all_conformance_data = processed_ig_record.search_param_conformance
search_param_conformance_rules = all_conformance_data.get(base_resource_type_for_sp, {})
logger.debug(f"Retrieved conformance rules for {base_resource_type_for_sp} from DB: {search_param_conformance_rules}")
else:
logger.warning(f"ProcessedIg record found, but 'search_param_conformance' attribute/data is missing or empty for {package_name}#{version}.")
else:
logger.warning(f"No ProcessedIg record found for {package_name}#{version} to get conformance rules.")
if search_params_data:
logger.debug(f"Merging conformance data into {len(search_params_data)} search parameters.")
for param in search_params_data:
param_code = param.get('code')
if param_code:
conformance_level = search_param_conformance_rules.get(param_code, 'Optional')
param['conformance'] = conformance_level
else:
param['conformance'] = 'Unknown'
logger.debug("Finished merging conformance data.")
else:
logger.debug(f"No search parameters found for {base_resource_type_for_sp} to merge conformance data into.")
else:
logger.warning(f"Cannot fetch conformance data because base resource type (e.g., Patient) for '{resource_type}' could not be determined.")
for param in search_params_data:
if 'conformance' not in param or param['conformance'] == 'N/A':
param['conformance'] = 'Optional'
response_data = {
'elements': enriched_elements,
'must_support_paths': must_support_paths,
'search_parameters': search_params_data,
'fallback_used': fallback_used,
'source_package': source_package_id
}
return Response(json.dumps(response_data, indent=None, separators=(',', ':')), mimetype='application/json')
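The conformance merge in this route reduces to a dictionary lookup with a default. A self-contained sketch; the rules map and parameter list are invented:

search_param_conformance_rules = {"identifier": "SHALL", "birthdate": "SHOULD"}
search_params_data = [
    {"code": "identifier", "type": "token"},
    {"code": "birthdate", "type": "date"},
    {"type": "string"},  # no 'code': conformance cannot be resolved
]

for param in search_params_data:
    code = param.get("code")
    param["conformance"] = (
        search_param_conformance_rules.get(code, "Optional") if code else "Unknown"
    )

print([p["conformance"] for p in search_params_data])
# ['SHALL', 'SHOULD', 'Unknown']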
@app.route('/get-package-metadata')
def get_package_metadata():
package_name = request.args.get('package_name')

Binary file not shown.


@@ -18,5 +18,5 @@
],
"complies_with_profiles": [],
"imposed_profiles": [],
"timestamp": "2025-04-17T04:04:45.070781+00:00"
"timestamp": "2025-04-26T06:09:44.307543+00:00"
}


@@ -30,5 +30,5 @@
],
"complies_with_profiles": [],
"imposed_profiles": [],
"timestamp": "2025-04-17T04:04:20.523471+00:00"
"timestamp": "2025-04-26T06:09:01.788251+00:00"
}


@@ -5,5 +5,5 @@
"imported_dependencies": [],
"complies_with_profiles": [],
"imposed_profiles": [],
"timestamp": "2025-04-17T04:04:29.230227+00:00"
"timestamp": "2025-04-26T06:09:30.920844+00:00"
}


@@ -10,5 +10,5 @@
],
"complies_with_profiles": [],
"imposed_profiles": [],
"timestamp": "2025-04-17T04:04:41.588025+00:00"
"timestamp": "2025-04-26T06:09:40.785525+00:00"
}


@@ -18,5 +18,5 @@
],
"complies_with_profiles": [],
"imposed_profiles": [],
"timestamp": "2025-04-17T04:04:49.395594+00:00"
"timestamp": "2025-04-26T06:09:48.451981+00:00"
}


@@ -10,5 +10,5 @@
],
"complies_with_profiles": [],
"imposed_profiles": [],
"timestamp": "2025-04-17T04:04:56.492512+00:00"
"timestamp": "2025-04-26T06:09:54.450933+00:00"
}


@@ -14,5 +14,5 @@
],
"complies_with_profiles": [],
"imposed_profiles": [],
"timestamp": "2025-04-17T04:04:46.943079+00:00"
"timestamp": "2025-04-26T06:09:46.089555+00:00"
}


@@ -10,5 +10,5 @@
],
"complies_with_profiles": [],
"imposed_profiles": [],
"timestamp": "2025-04-17T04:04:54.857273+00:00"
"timestamp": "2025-04-26T06:09:52.965617+00:00"
}


@@ -10,5 +10,5 @@
],
"complies_with_profiles": [],
"imposed_profiles": [],
"timestamp": "2025-04-17T04:04:37.703082+00:00"
"timestamp": "2025-04-26T06:09:36.852059+00:00"
}


@@ -1,6 +1,6 @@
#FileLock
#Fri Apr 25 13:11:59 UTC 2025
server=172.19.0.2\:35493
hostName=499bb2429005
#Sat Apr 26 12:19:30 UTC 2025
server=172.18.0.2\:38385
hostName=18113a0f20a7
method=file
id=1966d138790b0be6a4873c639ee5ac2e23787fd766d
id=1967209d290a29cebe723a4809b58c3cf6c80af4585

Binary file not shown.

File diff suppressed because it is too large.

logs/supervisord.pid Normal file (1 line added)

@@ -0,0 +1 @@
1

logs/tomcat.log Normal file (10720 changed lines)

File diff suppressed because it is too large.


@@ -244,14 +244,18 @@ def parse_package_filename(filename):
version = ""
return name, version
def remove_narrative(resource):
"""Remove narrative text element from a FHIR resource."""
if isinstance(resource, dict):
def remove_narrative(resource, include_narrative=False):
"""Remove narrative text element from a FHIR resource if not including narrative."""
if isinstance(resource, dict) and not include_narrative:
if 'text' in resource:
logger.debug(f"Removing narrative text from resource: {resource.get('resourceType', 'unknown')}")
del resource['text']
if resource.get('resourceType') == 'Bundle' and 'entry' in resource:
resource['entry'] = [dict(entry, resource=remove_narrative(entry.get('resource'))) if entry.get('resource') else entry for entry in resource['entry']]
resource['entry'] = [
dict(entry, resource=remove_narrative(entry.get('resource'), include_narrative))
if entry.get('resource') else entry
for entry in resource['entry']
]
return resource
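A quick exercise of the recursion above on an invented Bundle; note that remove_narrative mutates its argument in place as well as returning it:

import copy

bundle = {
    "resourceType": "Bundle",
    "text": {"status": "generated", "div": "<div>...</div>"},
    "entry": [
        {"resource": {"resourceType": "Patient", "id": "p1",
                      "text": {"status": "generated", "div": "<div>...</div>"}}},
        {"fullUrl": "urn:uuid:entry-without-resource"},  # no resource: kept as-is
    ],
}

kept = remove_narrative(copy.deepcopy(bundle), include_narrative=True)
assert "text" in kept  # passthrough when narrative is requested

cleaned = remove_narrative(bundle)  # default strips narrative recursively
assert "text" not in cleaned
assert "text" not in cleaned["entry"][0]["resource"]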
def get_cached_structure(package_name, package_version, resource_type, view):
@@ -300,7 +304,7 @@ def cache_structure(package_name, package_version, resource_type, view, structur
except Exception as e:
logger.error(f"Error caching structure: {e}", exc_info=True)
def find_and_extract_sd(tgz_path, resource_identifier, profile_url=None):
def find_and_extract_sd(tgz_path, resource_identifier, profile_url=None, include_narrative=False, raw=False):
"""Helper to find and extract StructureDefinition json from a tgz path, prioritizing profile match."""
sd_data = None
found_path = None
@@ -310,25 +314,19 @@ def find_and_extract_sd(tgz_path, resource_identifier, profile_url=None):
try:
with tarfile.open(tgz_path, "r:gz") as tar:
logger.debug(f"Searching for SD matching '{resource_identifier}' with profile '{profile_url}' in {os.path.basename(tgz_path)}")
# Store potential matches to evaluate the best one at the end
potential_matches = [] # Store tuples of (precision_score, data, member_name)
potential_matches = []
for member in tar:
if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')):
continue
# Skip common metadata files
if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']:
continue
fileobj = None
try:
fileobj = tar.extractfile(member)
if fileobj:
content_bytes = fileobj.read()
# Handle potential BOM (Byte Order Mark)
content_string = content_bytes.decode('utf-8-sig')
data = json.loads(content_string)
if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition':
sd_id = data.get('id')
sd_name = data.get('name')
@@ -337,57 +335,32 @@ def find_and_extract_sd(tgz_path, resource_identifier, profile_url=None):
sd_filename_base = os.path.splitext(os.path.basename(member.name))[0]
sd_filename_lower = sd_filename_base.lower()
resource_identifier_lower = resource_identifier.lower() if resource_identifier else None
# logger.debug(f"Checking SD: id={sd_id}, name={sd_name}, type={sd_type}, url={sd_url}, file={sd_filename_lower} against identifier='{resource_identifier}'")
match_score = 0 # Higher score means more precise match
# Highest precision: Exact match on profile_url
match_score = 0
if profile_url and sd_url == profile_url:
match_score = 5
logger.debug(f"Exact match found based on profile_url: {profile_url}")
# If we find the exact profile URL, this is the best possible match.
sd_data = remove_narrative(data)
sd_data = remove_narrative(data, include_narrative)
found_path = member.name
logger.info(f"Found definitive SD matching profile '{profile_url}' at path: {found_path}. Stopping search.")
break # Stop searching immediately
# Next highest precision: Exact match on id or name
logger.info(f"Found definitive SD matching profile '{profile_url}' at path: {found_path}")
break
elif resource_identifier_lower:
if sd_id and resource_identifier_lower == sd_id.lower():
match_score = 4
logger.debug(f"Match found based on exact sd_id: {sd_id}")
elif sd_name and resource_identifier_lower == sd_name.lower():
match_score = 4
logger.debug(f"Match found based on exact sd_name: {sd_name}")
# Next: Match filename pattern "StructureDefinition-{identifier}.json"
match_score = 4
elif sd_filename_lower == f"structuredefinition-{resource_identifier_lower}":
match_score = 3
logger.debug(f"Match found based on exact filename pattern: {member.name}")
# Next: Match on type ONLY if the identifier looks like a base type (no hyphens/dots)
match_score = 3
elif sd_type and resource_identifier_lower == sd_type.lower() and not re.search(r'[-.]', resource_identifier):
match_score = 2
logger.debug(f"Match found based on sd_type (simple identifier): {sd_type}")
# Lower precision: Check if identifier is IN the filename
match_score = 2
elif resource_identifier_lower in sd_filename_lower:
match_score = 1
logger.debug(f"Potential match based on identifier in filename: {member.name}")
# Lowest precision: Check if identifier is IN the URL
match_score = 1
elif sd_url and resource_identifier_lower in sd_url.lower():
match_score = 1
logger.debug(f"Potential match based on identifier in url: {sd_url}")
match_score = 1
if match_score > 0:
potential_matches.append((match_score, remove_narrative(data), member.name))
# If it's a very high precision match, we can potentially break early
if match_score >= 3: # Exact ID, Name, or Filename pattern
logger.info(f"Found high-confidence match for '{resource_identifier}' ({member.name}), stopping search.")
# Set sd_data here and break
sd_data = remove_narrative(data)
found_path = member.name
break
potential_matches.append((match_score, remove_narrative(data, include_narrative), member.name))
if match_score >= 3:
sd_data = remove_narrative(data, include_narrative)
found_path = member.name
break
except json.JSONDecodeError as e:
logger.debug(f"Could not parse JSON in {member.name}, skipping: {e}")
except UnicodeDecodeError as e:
@@ -395,36 +368,38 @@ def find_and_extract_sd(tgz_path, resource_identifier, profile_url=None):
except tarfile.TarError as e:
logger.warning(f"Tar error reading member {member.name}, skipping: {e}")
except Exception as e:
logger.warning(f"Could not read/parse potential SD {member.name}, skipping: {e}", exc_info=False)
logger.warning(f"Could not read/parse potential SD {member.name}, skipping: {e}")
finally:
if fileobj:
fileobj.close()
# If the loop finished without finding an exact profile_url or high-confidence match (score >= 3)
if not sd_data and potential_matches:
# Sort potential matches by score (highest first)
potential_matches.sort(key=lambda x: x[0], reverse=True)
best_match = potential_matches[0]
sd_data = best_match[1]
found_path = best_match[2]
logger.info(f"Selected best match for '{resource_identifier}' from potential matches (Score: {best_match[0]}): {found_path}")
if sd_data is None:
logger.info(f"SD matching identifier '{resource_identifier}' or profile '{profile_url}' not found within archive {os.path.basename(tgz_path)}")
elif raw:
# Return the full, unprocessed StructureDefinition JSON
with tarfile.open(tgz_path, "r:gz") as tar:
fileobj = tar.extractfile(found_path)
content_bytes = fileobj.read()
content_string = content_bytes.decode('utf-8-sig')
raw_data = json.loads(content_string)
return remove_narrative(raw_data, include_narrative), found_path
except tarfile.ReadError as e:
logger.error(f"Tar ReadError reading {tgz_path}: {e}")
return None, None
except tarfile.TarError as e:
logger.error(f"TarError reading {tgz_path} in find_and_extract_sd: {e}")
raise # Re-raise critical tar errors
raise
except FileNotFoundError:
logger.error(f"FileNotFoundError reading {tgz_path} in find_and_extract_sd.")
raise # Re-raise critical file errors
raise
except Exception as e:
logger.error(f"Unexpected error in find_and_extract_sd for {tgz_path}: {e}", exc_info=True)
raise # Re-raise unexpected errors
raise
return sd_data, found_path
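The best-match selection at the end of the search, in miniature: candidates accumulate as (score, data, path) tuples, and the highest score wins whenever no score >= 3 match broke out of the loop early. Candidate data here is invented:

potential_matches = [
    (1, {"id": "au-core-patient"}, "package/StructureDefinition-au-core-patient.json"),
    (2, {"id": "Patient", "type": "Patient"}, "package/StructureDefinition-Patient.json"),
]
potential_matches.sort(key=lambda m: m[0], reverse=True)  # highest score first
best_score, sd_data, found_path = potential_matches[0]
print(best_score, found_path)
# 2 package/StructureDefinition-Patient.json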
# --- Metadata Saving/Loading ---
@@ -2056,25 +2031,19 @@ def process_fhir_input(input_mode, fhir_file, fhir_text, alias_file=None):
# --- ADD THIS NEW FUNCTION TO services.py ---
def find_and_extract_search_params(tgz_path, base_resource_type):
"""
Finds and extracts SearchParameter resources relevant to a given base resource type
from a FHIR package tgz file.
"""
"""Finds and extracts SearchParameter resources relevant to a given base resource type from a FHIR package tgz file."""
search_params = []
if not tgz_path or not os.path.exists(tgz_path):
logger.error(f"Package file not found for SearchParameter extraction: {tgz_path}")
return search_params # Return empty list on error
return search_params
logger.debug(f"Searching for SearchParameters based on '{base_resource_type}' in {os.path.basename(tgz_path)}")
try:
with tarfile.open(tgz_path, "r:gz") as tar:
for member in tar:
# Basic filtering for JSON files in package directory, excluding common metadata
if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')):
continue
if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']:
continue
fileobj = None
try:
fileobj = tar.extractfile(member)
@@ -2082,31 +2051,23 @@ def find_and_extract_search_params(tgz_path, base_resource_type):
content_bytes = fileobj.read()
content_string = content_bytes.decode('utf-8-sig')
data = json.loads(content_string)
# Check if it's a SearchParameter resource
if isinstance(data, dict) and data.get('resourceType') == 'SearchParameter':
# Check if the SearchParameter applies to the requested base resource type
sp_bases = data.get('base', []) # 'base' is a list of applicable resource types
sp_bases = data.get('base', [])
if base_resource_type in sp_bases:
# Extract relevant information
param_info = {
'id': data.get('id'),
'url': data.get('url'),
'name': data.get('name'),
'description': data.get('description'),
'code': data.get('code'), # The actual parameter name used in searches
'type': data.get('type'), # e.g., token, reference, date
'expression': data.get('expression'), # FHIRPath expression
'code': data.get('code'),
'type': data.get('type'),
'expression': data.get('expression'),
'base': sp_bases,
# NOTE: Conformance (mandatory/optional) usually comes from CapabilityStatement,
# which is not processed here. Add placeholders or leave out for now.
'conformance': 'N/A', # Placeholder
'is_mandatory': False # Placeholder
'conformance': 'N/A',
'is_mandatory': False
}
search_params.append(param_info)
logger.debug(f"Found relevant SearchParameter: {param_info.get('name')} (ID: {param_info.get('id')}) for base {base_resource_type}")
# --- Error handling for individual file processing ---
except json.JSONDecodeError as e:
logger.debug(f"Could not parse JSON for SearchParameter in {member.name}, skipping: {e}")
except UnicodeDecodeError as e:
@@ -2118,8 +2079,6 @@ def find_and_extract_search_params(tgz_path, base_resource_type):
finally:
if fileobj:
fileobj.close()
# --- Error handling for opening/reading the tgz file ---
except tarfile.ReadError as e:
logger.error(f"Tar ReadError extracting SearchParameters from {tgz_path}: {e}")
except tarfile.TarError as e:
@@ -2128,7 +2087,6 @@ def find_and_extract_search_params(tgz_path, base_resource_type):
logger.error(f"Package file not found during SearchParameter extraction: {tgz_path}")
except Exception as e:
logger.error(f"Unexpected error extracting SearchParameters from {tgz_path}: {e}", exc_info=True)
logger.info(f"Found {len(search_params)} SearchParameters relevant to '{base_resource_type}' in {os.path.basename(tgz_path)}")
return search_params
# --- END OF NEW FUNCTION ---
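The filter at the heart of the function, extracted: a SearchParameter applies to a resource type when that type appears in its 'base' list. The resources below are invented:

resources = [
    {"resourceType": "SearchParameter", "code": "patient",
     "base": ["Observation", "Condition"], "type": "reference"},
    {"resourceType": "SearchParameter", "code": "category",
     "base": ["Observation"], "type": "token"},
    {"resourceType": "ValueSet", "id": "not-a-search-param"},
]

def relevant_search_params(resources, base_resource_type):
    return [r for r in resources
            if r.get("resourceType") == "SearchParameter"
            and base_resource_type in r.get("base", [])]

print([p["code"] for p in relevant_search_params(resources, "Condition")])
# ['patient']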

File diff suppressed because it is too large.

File diff suppressed because it is too large.