import unittest import os import sys import json import tarfile import shutil import io import requests import time import subprocess from unittest.mock import patch, MagicMock, mock_open, call from flask import Flask, session from flask.testing import FlaskClient from datetime import datetime, timezone # Add the parent directory (/app) to sys.path APP_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) if APP_DIR not in sys.path: sys.path.insert(0, APP_DIR) from app import app, db, ProcessedIg import services # Helper function to parse NDJSON stream def parse_ndjson(byte_stream): decoded_stream = byte_stream.decode('utf-8').strip() if not decoded_stream: return [] lines = decoded_stream.split('\n') return [json.loads(line) for line in lines if line.strip()] class DockerComposeContainer: """ A class that follows the Testcontainers pattern for managing Docker Compose environments. This implementation uses subprocess to call docker-compose directly since we're not installing the testcontainers-python package. """ def __init__(self, compose_file_path): """ Initialize with the path to the docker-compose.yml file Args: compose_file_path: Path to the docker-compose.yml file """ self.compose_file = compose_file_path self.compose_dir = os.path.dirname(os.path.abspath(compose_file_path)) self.containers_up = False self.service_ports = {} self._container_ids = {} def __enter__(self): """Start containers when entering context""" self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): """Stop containers when exiting context""" self.stop() def with_service_port(self, service_name, port): """ Map a service port (following the testcontainers builder pattern) Args: service_name: Name of the service in docker-compose.yml port: Port number to expose Returns: self for chaining """ self.service_ports[service_name] = port return self def start(self): """Start the Docker Compose environment""" if self.containers_up: return self print("Starting Docker Compose environment...") result = subprocess.run( ['docker-compose', '-f', self.compose_file, 'up', '-d'], cwd=self.compose_dir, capture_output=True, text=True ) if result.returncode != 0: error_msg = f"Failed to start Docker Compose environment: {result.stderr}" print(error_msg) raise RuntimeError(error_msg) # Store container IDs for later use self._get_container_ids() self.containers_up = True self._wait_for_services() return self def _get_container_ids(self): """Get the container IDs for all services""" result = subprocess.run( ['docker-compose', '-f', self.compose_file, 'ps', '-q'], cwd=self.compose_dir, capture_output=True, text=True ) if result.returncode != 0: return container_ids = result.stdout.strip().split('\n') if not container_ids: return # Get service names for each container for container_id in container_ids: if not container_id: continue inspect_result = subprocess.run( ['docker', 'inspect', '--format', '{{index .Config.Labels "com.docker.compose.service"}}', container_id], capture_output=True, text=True ) if inspect_result.returncode == 0: service_name = inspect_result.stdout.strip() self._container_ids[service_name] = container_id def get_container_id(self, service_name): """ Get the container ID for a specific service Args: service_name: Name of the service in docker-compose.yml Returns: Container ID as string or None if not found """ return self._container_ids.get(service_name) def get_service_host(self, service_name): """ Get the host for a specific service - for Docker Compose we just use localhost Args: service_name: Name of the service in docker-compose.yml Returns: Host as string (usually localhost) """ return "localhost" def get_service_url(self, service_name, path=""): """ Get the URL for a specific service Args: service_name: Name of the service in docker-compose.yml path: Optional path to append to the URL Returns: URL as string """ port = self.service_ports.get(service_name) if not port: raise ValueError(f"No port mapping defined for service {service_name}") url = f"http://{self.get_service_host(service_name)}:{port}" if path: # Ensure path starts with / if not path.startswith('/'): path = f"/{path}" url = f"{url}{path}" return url def get_logs(self, service_name): """ Get logs for a specific service Args: service_name: Name of the service in docker-compose.yml Returns: Logs as string """ container_id = self.get_container_id(service_name) if not container_id: return f"No container found for service {service_name}" result = subprocess.run( ['docker', 'logs', container_id], capture_output=True, text=True ) return result.stdout def stop(self): """Stop the Docker Compose environment""" if not self.containers_up: return print("Stopping Docker Compose environment...") result = subprocess.run( ['docker-compose', '-f', self.compose_file, 'down'], cwd=self.compose_dir, capture_output=True, text=True ) if result.returncode != 0: print(f"Warning: Error stopping Docker Compose: {result.stderr}") self.containers_up = False def _wait_for_services(self): """Wait for all services to be ready""" print("Waiting for services to be ready...") # Wait for HAPI FHIR server if 'fhir' in self.service_ports: self._wait_for_http_service( self.get_service_url('fhir', 'fhir/metadata'), "HAPI FHIR server" ) # Wait for FHIRFLARE application if 'fhirflare' in self.service_ports: self._wait_for_http_service( self.get_service_url('fhirflare'), "FHIRFLARE application" ) # Give additional time for services to stabilize time.sleep(5) def _wait_for_http_service(self, url, service_name, max_retries=30, retry_interval=2): """ Wait for an HTTP service to be ready Args: url: URL to check service_name: Name of the service for logging max_retries: Maximum number of retries retry_interval: Interval between retries in seconds """ for attempt in range(max_retries): try: response = requests.get(url, timeout=5) if response.status_code == 200: print(f"{service_name} is ready after {attempt + 1} attempts") return True except requests.RequestException: pass print(f"Waiting for {service_name} (attempt {attempt + 1}/{max_retries})...") time.sleep(retry_interval) print(f"Warning: {service_name} did not become ready in time") return False class TestFHIRFlareIGToolkit(unittest.TestCase): @classmethod def setUpClass(cls): # Define the Docker Compose container compose_file_path = os.path.join(os.path.dirname(__file__), 'docker-compose.yml') cls.container = DockerComposeContainer(compose_file_path) \ .with_service_port('fhir', 8080) \ .with_service_port('fhirflare', 5000) # Start the containers cls.container.start() # Configure app for testing app.config['TESTING'] = True app.config['WTF_CSRF_ENABLED'] = False app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:' cls.test_packages_dir = os.path.join(os.path.dirname(__file__), 'test_fhir_packages_temp') app.config['FHIR_PACKAGES_DIR'] = cls.test_packages_dir app.config['SECRET_KEY'] = 'test-secret-key' app.config['API_KEY'] = 'test-api-key' app.config['VALIDATE_IMPOSED_PROFILES'] = True app.config['DISPLAY_PROFILE_RELATIONSHIPS'] = True app.config['HAPI_FHIR_URL'] = cls.container.get_service_url('fhir', 'fhir') # Point to containerized HAPI FHIR cls.app_context = app.app_context() cls.app_context.push() db.create_all() cls.client = app.test_client() @classmethod def tearDownClass(cls): cls.app_context.pop() if os.path.exists(cls.test_packages_dir): shutil.rmtree(cls.test_packages_dir) # Stop Docker Compose environment cls.container.stop() def setUp(self): if os.path.exists(self.test_packages_dir): shutil.rmtree(self.test_packages_dir) os.makedirs(self.test_packages_dir, exist_ok=True) with self.app_context: for item in db.session.query(ProcessedIg).all(): db.session.delete(item) db.session.commit() def tearDown(self): pass # Helper Method def create_mock_tgz(self, filename, files_content): tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename) with tarfile.open(tgz_path, "w:gz") as tar: for name, content in files_content.items(): if isinstance(content, (dict, list)): data_bytes = json.dumps(content).encode('utf-8') elif isinstance(content, str): data_bytes = content.encode('utf-8') else: raise TypeError(f"Unsupported type for mock file '{name}': {type(content)}") file_io = io.BytesIO(data_bytes) tarinfo = tarfile.TarInfo(name=name) tarinfo.size = len(data_bytes) tarinfo.mtime = int(datetime.now(timezone.utc).timestamp()) tar.addfile(tarinfo, file_io) return tgz_path # --- Phase 1 Tests --- def test_01_navigate_fhir_path(self): resource = { "resourceType": "Patient", "name": [{"given": ["John"]}], "identifier": [{"system": "http://hl7.org/fhir/sid/us-ssn", "sliceName": "us-ssn"}], "extension": [{"url": "http://hl7.org/fhir/StructureDefinition/patient-birthPlace", "valueAddress": {"city": "Boston"}}] } self.assertEqual(services.navigate_fhir_path(resource, "Patient.name[0].given"), ["John"]) self.assertEqual(services.navigate_fhir_path(resource, "Patient.identifier:us-ssn.system"), "http://hl7.org/fhir/sid/us-ssn") self.assertEqual(services.navigate_fhir_path(resource, "Patient.extension", extension_url="http://hl7.org/fhir/StructureDefinition/patient-birthPlace")["valueAddress"]["city"], "Boston") with patch('fhirpath.evaluate', side_effect=Exception("fhirpath error")): self.assertEqual(services.navigate_fhir_path(resource, "Patient.name[0].given"), ["John"]) # --- Basic Page Rendering Tests --- def test_03_homepage(self): # Connect to the containerized application response = requests.get(self.container.get_service_url('fhirflare')) self.assertEqual(response.status_code, 200) self.assertIn('FHIRFLARE IG Toolkit', response.text) def test_04_import_ig_page(self): response = requests.get(self.container.get_service_url('fhirflare', 'import-ig')) self.assertEqual(response.status_code, 200) self.assertIn('Import IG', response.text) self.assertIn('Package Name', response.text) self.assertIn('Package Version', response.text) self.assertIn('name="dependency_mode"', response.text) # --- API Integration Tests --- def test_30_load_ig_to_hapi_integration(self): """Test loading an IG to the containerized HAPI FHIR server""" pkg_name = 'hl7.fhir.us.core' pkg_version = '6.1.0' filename = f'{pkg_name}-{pkg_version}.tgz' self.create_mock_tgz(filename, { 'package/package.json': {'name': pkg_name, 'version': pkg_version}, 'package/StructureDefinition-us-core-patient.json': { 'resourceType': 'StructureDefinition', 'id': 'us-core-patient', 'url': 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient', 'name': 'USCorePatientProfile', 'type': 'Patient', 'status': 'active' } }) # Load IG to HAPI response = self.client.post( '/api/load-ig-to-hapi', data=json.dumps({'package_name': pkg_name, 'version': pkg_version}), content_type='application/json', headers={'X-API-Key': 'test-api-key'} ) self.assertEqual(response.status_code, 200) data = json.loads(response.data) self.assertEqual(data['status'], 'success') # Verify the resource was loaded by querying the HAPI FHIR server directly hapi_response = requests.get(self.container.get_service_url('fhir', 'fhir/StructureDefinition/us-core-patient')) self.assertEqual(hapi_response.status_code, 200) resource = hapi_response.json() self.assertEqual(resource['resourceType'], 'StructureDefinition') self.assertEqual(resource['id'], 'us-core-patient') def test_31_validate_sample_with_hapi_integration(self): """Test validating a sample against the containerized HAPI FHIR server""" # First, load the necessary StructureDefinition pkg_name = 'hl7.fhir.us.core' pkg_version = '6.1.0' filename = f'{pkg_name}-{pkg_version}.tgz' self.create_mock_tgz(filename, { 'package/package.json': {'name': pkg_name, 'version': pkg_version}, 'package/StructureDefinition-us-core-patient.json': { 'resourceType': 'StructureDefinition', 'id': 'us-core-patient', 'url': 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient', 'name': 'USCorePatientProfile', 'type': 'Patient', 'status': 'active', 'snapshot': { 'element': [ {'path': 'Patient', 'min': 1, 'max': '1'}, {'path': 'Patient.name', 'min': 1, 'max': '*'}, {'path': 'Patient.identifier', 'min': 0, 'max': '*', 'mustSupport': True} ] } } }) # Load IG to HAPI self.client.post( '/api/load-ig-to-hapi', data=json.dumps({'package_name': pkg_name, 'version': pkg_version}), content_type='application/json', headers={'X-API-Key': 'test-api-key'} ) # Validate a sample that's missing a required element sample_resource = { 'resourceType': 'Patient', 'id': 'test-patient', 'meta': {'profile': ['http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient']} # Missing required 'name' element } response = self.client.post( '/api/validate-sample', data=json.dumps({ 'package_name': pkg_name, 'version': pkg_version, 'sample_data': json.dumps(sample_resource), 'mode': 'single', 'include_dependencies': True }), content_type='application/json', headers={'X-API-Key': 'test-api-key'} ) self.assertEqual(response.status_code, 200) data = json.loads(response.data) self.assertFalse(data['valid']) # Check for validation error related to missing name found_name_error = any('name' in error for error in data['errors']) self.assertTrue(found_name_error, f"Expected error about missing name element, got: {data['errors']}") def test_32_push_ig_to_hapi_integration(self): """Test pushing multiple resources from an IG to the containerized HAPI FHIR server""" pkg_name = 'test.push.pkg' pkg_version = '1.0.0' filename = f'{pkg_name}-{pkg_version}.tgz' # Create a test package with multiple resources self.create_mock_tgz(filename, { 'package/package.json': {'name': pkg_name, 'version': pkg_version}, 'package/Patient-test1.json': { 'resourceType': 'Patient', 'id': 'test1', 'name': [{'family': 'Test', 'given': ['Patient']}] }, 'package/Observation-test1.json': { 'resourceType': 'Observation', 'id': 'test1', 'status': 'final', 'code': {'coding': [{'system': 'http://loinc.org', 'code': '12345-6'}]} } }) # Push the IG to HAPI response = self.client.post( '/api/push-ig', data=json.dumps({ 'package_name': pkg_name, 'version': pkg_version, 'fhir_server_url': self.container.get_service_url('fhir', 'fhir'), 'include_dependencies': False }), content_type='application/json', headers={'X-API-Key': 'test-api-key', 'Accept': 'application/x-ndjson'} ) self.assertEqual(response.status_code, 200) streamed_data = parse_ndjson(response.data) complete_msg = next((item for item in streamed_data if item.get('type') == 'complete'), None) self.assertIsNotNone(complete_msg, "Complete message not found in streamed response") summary = complete_msg.get('data', {}) self.assertTrue(summary.get('success_count') >= 2, f"Expected at least 2 successful resources, got {summary.get('success_count')}") # Verify resources were loaded by querying the HAPI FHIR server directly patient_response = requests.get(self.container.get_service_url('fhir', 'fhir/Patient/test1')) self.assertEqual(patient_response.status_code, 200) patient = patient_response.json() self.assertEqual(patient['resourceType'], 'Patient') self.assertEqual(patient['id'], 'test1') observation_response = requests.get(self.container.get_service_url('fhir', 'fhir/Observation/test1')) self.assertEqual(observation_response.status_code, 200) observation = observation_response.json() self.assertEqual(observation['resourceType'], 'Observation') self.assertEqual(observation['id'], 'test1') # --- Existing API Tests --- @patch('app.list_downloaded_packages') @patch('app.services.process_package_file') @patch('app.services.import_package_and_dependencies') @patch('os.path.exists') def test_40_api_import_ig_success(self, mock_os_exists, mock_import, mock_process, mock_list_pkgs): pkg_name = 'api.test.pkg' pkg_version = '1.2.3' filename = f'{pkg_name}-{pkg_version}.tgz' pkg_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename) mock_import.return_value = {'requested': (pkg_name, pkg_version), 'processed': {(pkg_name, pkg_version)}, 'downloaded': {(pkg_name, pkg_version): pkg_path}, 'all_dependencies': {}, 'dependencies': [], 'errors': []} mock_process.return_value = {'resource_types_info': [], 'must_support_elements': {}, 'examples': {}, 'complies_with_profiles': ['http://prof.com/a'], 'imposed_profiles': [], 'errors': []} mock_os_exists.return_value = True mock_list_pkgs.return_value = ([{'name': pkg_name, 'version': pkg_version, 'filename': filename}], [], {}) response = self.client.post( '/api/import-ig', data=json.dumps({'package_name': pkg_name, 'version': pkg_version, 'dependency_mode': 'direct', 'api_key': 'test-api-key'}), content_type='application/json' ) self.assertEqual(response.status_code, 200) data = json.loads(response.data) self.assertEqual(data['status'], 'success') self.assertEqual(data['complies_with_profiles'], ['http://prof.com/a']) @patch('app.services.import_package_and_dependencies') def test_41_api_import_ig_failure(self, mock_import): mock_import.return_value = {'requested': ('bad.pkg', '1.0'), 'processed': set(), 'downloaded': {}, 'all_dependencies': {}, 'dependencies': [], 'errors': ['HTTP error: 404 Not Found']} response = self.client.post( '/api/import-ig', data=json.dumps({'package_name': 'bad.pkg', 'version': '1.0', 'api_key': 'test-api-key'}), content_type='application/json' ) self.assertEqual(response.status_code, 404) data = json.loads(response.data) self.assertIn('Failed to import bad.pkg#1.0: HTTP error: 404 Not Found', data['message']) def test_42_api_import_ig_invalid_key(self): response = self.client.post( '/api/import-ig', data=json.dumps({'package_name': 'a', 'version': '1', 'api_key': 'wrong'}), content_type='application/json' ) self.assertEqual(response.status_code, 401) def test_43_api_import_ig_missing_key(self): response = self.client.post( '/api/import-ig', data=json.dumps({'package_name': 'a', 'version': '1'}), content_type='application/json' ) self.assertEqual(response.status_code, 401) # --- API Push Tests --- @patch('os.path.exists', return_value=True) @patch('app.services.get_package_metadata') @patch('tarfile.open') @patch('requests.Session') def test_50_api_push_ig_success(self, mock_session, mock_tarfile_open, mock_get_metadata, mock_os_exists): pkg_name = 'push.test.pkg' pkg_version = '1.0.0' filename = f'{pkg_name}-{pkg_version}.tgz' fhir_server_url = self.container.get_service_url('fhir', 'fhir') mock_get_metadata.return_value = {'imported_dependencies': []} mock_tar = MagicMock() mock_patient = {'resourceType': 'Patient', 'id': 'pat1'} mock_obs = {'resourceType': 'Observation', 'id': 'obs1', 'status': 'final'} patient_member = MagicMock(spec=tarfile.TarInfo) patient_member.name = 'package/Patient-pat1.json' patient_member.isfile.return_value = True obs_member = MagicMock(spec=tarfile.TarInfo) obs_member.name = 'package/Observation-obs1.json' obs_member.isfile.return_value = True mock_tar.getmembers.return_value = [patient_member, obs_member] def mock_extractfile(member): if member.name == 'package/Patient-pat1.json': return io.BytesIO(json.dumps(mock_patient).encode('utf-8')) if member.name == 'package/Observation-obs1.json': return io.BytesIO(json.dumps(mock_obs).encode('utf-8')) return None mock_tar.extractfile.side_effect = mock_extractfile mock_tarfile_open.return_value.__enter__.return_value = mock_tar mock_session_instance = MagicMock() mock_put_response = MagicMock(status_code=200) mock_put_response.raise_for_status.return_value = None mock_session_instance.put.return_value = mock_put_response mock_session.return_value = mock_session_instance self.create_mock_tgz(filename, {'package/dummy.txt': 'content'}) response = self.client.post( '/api/push-ig', data=json.dumps({ 'package_name': pkg_name, 'version': pkg_version, 'fhir_server_url': fhir_server_url, 'include_dependencies': False, 'api_key': 'test-api-key' }), content_type='application/json', headers={'X-API-Key': 'test-api-key', 'Accept': 'application/x-ndjson'} ) self.assertEqual(response.status_code, 200) self.assertEqual(response.mimetype, 'application/x-ndjson') streamed_data = parse_ndjson(response.data) complete_msg = next((item for item in streamed_data if item.get('type') == 'complete'), None) self.assertIsNotNone(complete_msg) summary = complete_msg.get('data', {}) self.assertEqual(summary.get('status'), 'success') self.assertEqual(summary.get('success_count'), 2) self.assertEqual(len(summary.get('failed_details')), 0) mock_os_exists.assert_called_with(os.path.join(self.test_packages_dir, filename)) # --- Helper method to debug container issues --- def test_99_print_container_logs_on_failure(self): """Helper test that prints container logs in case of failures""" # This test should always pass but will print logs if other tests fail try: if hasattr(self, 'container') and self.container.containers_up: for service_name in ['fhir', 'db', 'fhirflare']: if service_name in self.container._container_ids: print(f"\n=== Logs for {service_name} ===") print(self.container.get_logs(service_name)) except Exception as e: print(f"Error getting container logs: {e}") # This assertion always passes - this test is just for debug info self.assertTrue(True) if __name__ == '__main__': unittest.main()