mirror of
https://github.com/Sudo-JHare/FHIRFLARE-IG-Toolkit.git
synced 2025-06-15 17:20:00 +00:00
588 lines
28 KiB
Python
588 lines
28 KiB
Python
from flask import Flask, render_template, render_template_string, request, redirect, url_for, flash, jsonify, Response
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_wtf import FlaskForm
|
|
from wtforms import StringField, SubmitField
|
|
from wtforms.validators import DataRequired, Regexp
|
|
import os
|
|
import tarfile
|
|
import json
|
|
from datetime import datetime
|
|
import services
|
|
import logging
|
|
import requests
|
|
import re # Added for regex validation
|
|
|
|
# Set up logging
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
app = Flask(__name__)
|
|
app.config['SECRET_KEY'] = 'your-secret-key-here'
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////app/instance/fhir_ig.db'
|
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
|
app.config['FHIR_PACKAGES_DIR'] = os.path.join(app.instance_path, 'fhir_packages')
|
|
app.config['API_KEY'] = 'your-api-key-here' # Hardcoded API key for now; replace with a secure solution
|
|
|
|
# Ensure directories exist and are writable
|
|
instance_path = '/app/instance'
|
|
db_path = os.path.join(instance_path, 'fhir_ig.db')
|
|
packages_path = app.config['FHIR_PACKAGES_DIR']
|
|
|
|
logger.debug(f"Instance path: {instance_path}")
|
|
logger.debug(f"Database path: {db_path}")
|
|
logger.debug(f"Packages path: {packages_path}")
|
|
|
|
try:
|
|
os.makedirs(instance_path, exist_ok=True)
|
|
os.makedirs(packages_path, exist_ok=True)
|
|
os.chmod(instance_path, 0o777)
|
|
os.chmod(packages_path, 0o777)
|
|
logger.debug(f"Directories created: {os.listdir('/app')}")
|
|
logger.debug(f"Instance contents: {os.listdir(instance_path)}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to create directories: {e}")
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/fhir_ig.db'
|
|
logger.warning("Falling back to /tmp/fhir_ig.db")
|
|
|
|
db = SQLAlchemy(app)
|
|
|
|
class IgImportForm(FlaskForm):
|
|
package_name = StringField('Package Name (e.g., hl7.fhir.us.core)', validators=[
|
|
DataRequired(),
|
|
Regexp(r'^[a-zAZ0-9]+(\.[a-zA-Z0-9]+)+$', message='Invalid package name format.')
|
|
])
|
|
package_version = StringField('Package Version (e.g., 1.0.0 or current)', validators=[
|
|
DataRequired(),
|
|
Regexp(r'^[a-zA-Z0-9\.\-]+$', message='Invalid version format.')
|
|
])
|
|
submit = SubmitField('Fetch & Download IG')
|
|
|
|
class ProcessedIg(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
package_name = db.Column(db.String(128), nullable=False)
|
|
version = db.Column(db.String(32), nullable=False)
|
|
processed_date = db.Column(db.DateTime, nullable=False)
|
|
resource_types_info = db.Column(db.JSON, nullable=False)
|
|
must_support_elements = db.Column(db.JSON, nullable=True)
|
|
examples = db.Column(db.JSON, nullable=True)
|
|
|
|
# Middleware to check API key
|
|
def check_api_key():
|
|
api_key = request.json.get('api_key') if request.is_json else None
|
|
if not api_key:
|
|
logger.error("API key missing in request")
|
|
return jsonify({"status": "error", "message": "API key missing"}), 401
|
|
if api_key != app.config['API_KEY']:
|
|
logger.error(f"Invalid API key provided: {api_key}")
|
|
return jsonify({"status": "error", "message": "Invalid API key"}), 401
|
|
logger.debug("API key validated successfully")
|
|
return None
|
|
|
|
@app.route('/')
|
|
def index():
|
|
return render_template('index.html', site_name='FHIRFLARE IG Toolkit', now=datetime.now())
|
|
|
|
@app.route('/import-ig', methods=['GET', 'POST'])
|
|
def import_ig():
|
|
form = IgImportForm()
|
|
if form.validate_on_submit():
|
|
name = form.package_name.data
|
|
version = form.package_version.data
|
|
try:
|
|
result = services.import_package_and_dependencies(name, version)
|
|
if result['errors'] and not result['downloaded']:
|
|
flash(f"Failed to import {name}#{version}: {result['errors'][0]}", "error")
|
|
return redirect(url_for('import_ig'))
|
|
flash(f"Successfully downloaded {name}#{version} and dependencies!", "success")
|
|
return redirect(url_for('view_igs'))
|
|
except Exception as e:
|
|
flash(f"Error downloading IG: {str(e)}", "error")
|
|
return render_template('import_ig.html', form=form, site_name='FLARE FHIR IG Toolkit', now=datetime.now())
|
|
|
|
@app.route('/view-igs')
|
|
def view_igs():
|
|
igs = ProcessedIg.query.all()
|
|
processed_ids = {(ig.package_name, ig.version) for ig in igs}
|
|
|
|
packages = []
|
|
packages_dir = app.config['FHIR_PACKAGES_DIR']
|
|
logger.debug(f"Scanning packages directory: {packages_dir}")
|
|
if os.path.exists(packages_dir):
|
|
for filename in os.listdir(packages_dir):
|
|
if filename.endswith('.tgz'):
|
|
# Split on the last hyphen to separate name and version
|
|
last_hyphen_index = filename.rfind('-')
|
|
if last_hyphen_index != -1 and filename.endswith('.tgz'):
|
|
name = filename[:last_hyphen_index]
|
|
version = filename[last_hyphen_index + 1:-4] # Remove .tgz
|
|
# Validate that the version looks reasonable (e.g., starts with a digit or is a known keyword)
|
|
if version[0].isdigit() or version in ('preview', 'current', 'latest'):
|
|
# Replace underscores with dots to match FHIR package naming convention
|
|
name = name.replace('_', '.')
|
|
packages.append({'name': name, 'version': version, 'filename': filename})
|
|
else:
|
|
# Fallback: treat as name only, log warning
|
|
name = filename[:-4]
|
|
version = ''
|
|
logger.warning(f"Could not parse version from {filename}, treating as name only")
|
|
packages.append({'name': name, 'version': version, 'filename': filename})
|
|
else:
|
|
# Fallback: treat as name only, log warning
|
|
name = filename[:-4]
|
|
version = ''
|
|
logger.warning(f"Could not parse version from {filename}, treating as name only")
|
|
packages.append({'name': name, 'version': version, 'filename': filename})
|
|
logger.debug(f"Found packages: {packages}")
|
|
else:
|
|
logger.warning(f"Packages directory not found: {packages_dir}")
|
|
|
|
# Calculate duplicate_names
|
|
duplicate_names = {}
|
|
for pkg in packages:
|
|
name = pkg['name']
|
|
if name not in duplicate_names:
|
|
duplicate_names[name] = []
|
|
duplicate_names[name].append(pkg)
|
|
|
|
# Calculate duplicate_groups
|
|
duplicate_groups = {}
|
|
for name, pkgs in duplicate_names.items():
|
|
if len(pkgs) > 1: # Only include packages with multiple versions
|
|
duplicate_groups[name] = [pkg['version'] for pkg in pkgs]
|
|
|
|
# Precompute group colors
|
|
colors = ['bg-warning', 'bg-info', 'bg-success', 'bg-danger']
|
|
group_colors = {}
|
|
for i, name in enumerate(duplicate_groups.keys()):
|
|
group_colors[name] = colors[i % len(colors)]
|
|
|
|
return render_template('cp_downloaded_igs.html', packages=packages, processed_list=igs,
|
|
processed_ids=processed_ids, duplicate_names=duplicate_names,
|
|
duplicate_groups=duplicate_groups, group_colors=group_colors,
|
|
site_name='FLARE FHIR IG Toolkit', now=datetime.now())
|
|
|
|
@app.route('/push-igs', methods=['GET', 'POST'])
|
|
def push_igs():
|
|
igs = ProcessedIg.query.all()
|
|
processed_ids = {(ig.package_name, ig.version) for ig in igs}
|
|
|
|
packages = []
|
|
packages_dir = app.config['FHIR_PACKAGES_DIR']
|
|
logger.debug(f"Scanning packages directory: {packages_dir}")
|
|
if os.path.exists(packages_dir):
|
|
for filename in os.listdir(packages_dir):
|
|
if filename.endswith('.tgz'):
|
|
# Split on the last hyphen to separate name and version
|
|
last_hyphen_index = filename.rfind('-')
|
|
if last_hyphen_index != -1 and filename.endswith('.tgz'):
|
|
name = filename[:last_hyphen_index]
|
|
version = filename[last_hyphen_index + 1:-4] # Remove .tgz
|
|
# Validate that the version looks reasonable (e.g., starts with a digit or is a known keyword)
|
|
if version[0].isdigit() or version in ('preview', 'current', 'latest'):
|
|
# Replace underscores with dots to match FHIR package naming convention
|
|
name = name.replace('_', '.')
|
|
packages.append({'name': name, 'version': version, 'filename': filename})
|
|
else:
|
|
# Fallback: treat as name only, log warning
|
|
name = filename[:-4]
|
|
version = ''
|
|
logger.warning(f"Could not parse version from {filename}, treating as name only")
|
|
packages.append({'name': name, 'version': version, 'filename': filename})
|
|
else:
|
|
# Fallback: treat as name only, log warning
|
|
name = filename[:-4]
|
|
version = ''
|
|
logger.warning(f"Could not parse version from {filename}, treating as name only")
|
|
packages.append({'name': name, 'version': version, 'filename': filename})
|
|
logger.debug(f"Found packages: {packages}")
|
|
else:
|
|
logger.warning(f"Packages directory not found: {packages_dir}")
|
|
|
|
# Calculate duplicate_names
|
|
duplicate_names = {}
|
|
for pkg in packages:
|
|
name = pkg['name']
|
|
if name not in duplicate_names:
|
|
duplicate_names[name] = []
|
|
duplicate_names[name].append(pkg)
|
|
|
|
# Calculate duplicate_groups
|
|
duplicate_groups = {}
|
|
for name, pkgs in duplicate_names.items():
|
|
if len(pkgs) > 1: # Only include packages with multiple versions
|
|
duplicate_groups[name] = [pkg['version'] for pkg in pkgs]
|
|
|
|
# Precompute group colors
|
|
colors = ['bg-warning', 'bg-info', 'bg-success', 'bg-danger']
|
|
group_colors = {}
|
|
for i, name in enumerate(duplicate_groups.keys()):
|
|
group_colors[name] = colors[i % len(colors)]
|
|
|
|
return render_template('cp_push_igs.html', packages=packages, processed_list=igs,
|
|
processed_ids=processed_ids, duplicate_names=duplicate_names,
|
|
duplicate_groups=duplicate_groups, group_colors=group_colors,
|
|
site_name='FLARE FHIR IG Toolkit', now=datetime.now(),
|
|
api_key=app.config['API_KEY']) # Pass the API key to the template
|
|
|
|
@app.route('/process-igs', methods=['POST'])
|
|
def process_ig():
|
|
filename = request.form.get('filename')
|
|
if not filename or not filename.endswith('.tgz'):
|
|
flash("Invalid package file.", "error")
|
|
return redirect(url_for('view_igs'))
|
|
|
|
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename)
|
|
if not os.path.exists(tgz_path):
|
|
flash(f"Package file not found: {filename}", "error")
|
|
return redirect(url_for('view_igs'))
|
|
|
|
try:
|
|
# Parse name and version from filename
|
|
last_hyphen_index = filename.rfind('-')
|
|
if last_hyphen_index != -1 and filename.endswith('.tgz'):
|
|
name = filename[:last_hyphen_index]
|
|
version = filename[last_hyphen_index + 1:-4]
|
|
# Replace underscores with dots to match FHIR package naming convention
|
|
name = name.replace('_', '.')
|
|
else:
|
|
name = filename[:-4]
|
|
version = ''
|
|
logger.warning(f"Could not parse version from {filename} during processing")
|
|
package_info = services.process_package_file(tgz_path)
|
|
processed_ig = ProcessedIg(
|
|
package_name=name,
|
|
version=version,
|
|
processed_date=datetime.now(),
|
|
resource_types_info=package_info['resource_types_info'],
|
|
must_support_elements=package_info.get('must_support_elements'),
|
|
examples=package_info.get('examples')
|
|
)
|
|
db.session.add(processed_ig)
|
|
db.session.commit()
|
|
flash(f"Successfully processed {name}#{version}!", "success")
|
|
except Exception as e:
|
|
flash(f"Error processing IG: {str(e)}", "error")
|
|
return redirect(url_for('view_igs'))
|
|
|
|
@app.route('/delete-ig', methods=['POST'])
|
|
def delete_ig():
|
|
filename = request.form.get('filename')
|
|
if not filename or not filename.endswith('.tgz'):
|
|
flash("Invalid package file.", "error")
|
|
return redirect(url_for('view_igs'))
|
|
|
|
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename)
|
|
if os.path.exists(tgz_path):
|
|
try:
|
|
os.remove(tgz_path)
|
|
flash(f"Deleted {filename}", "success")
|
|
except Exception as e:
|
|
flash(f"Error deleting {filename}: {str(e)}", "error")
|
|
else:
|
|
flash(f"File not found: {filename}", "error")
|
|
return redirect(url_for('view_igs'))
|
|
|
|
@app.route('/unload-ig', methods=['POST'])
|
|
def unload_ig():
|
|
ig_id = request.form.get('ig_id')
|
|
if not ig_id:
|
|
flash("Invalid package ID.", "error")
|
|
return redirect(url_for('view_igs'))
|
|
|
|
processed_ig = ProcessedIg.query.get(ig_id)
|
|
if processed_ig:
|
|
try:
|
|
db.session.delete(processed_ig)
|
|
db.session.commit()
|
|
flash(f"Unloaded {processed_ig.package_name}#{processed_ig.version}", "success")
|
|
except Exception as e:
|
|
flash(f"Error unloading package: {str(e)}", "error")
|
|
else:
|
|
flash(f"Package not found with ID: {ig_id}", "error")
|
|
return redirect(url_for('view_igs'))
|
|
|
|
@app.route('/view-ig/<int:processed_ig_id>')
|
|
def view_ig(processed_ig_id):
|
|
processed_ig = ProcessedIg.query.get_or_404(processed_ig_id)
|
|
profile_list = [t for t in processed_ig.resource_types_info if t.get('is_profile')]
|
|
base_list = [t for t in processed_ig.resource_types_info if not t.get('is_profile')]
|
|
examples_by_type = processed_ig.examples or {}
|
|
return render_template('cp_view_processed_ig.html', title=f"View {processed_ig.package_name}#{processed_ig.version}",
|
|
processed_ig=processed_ig, profile_list=profile_list, base_list=base_list,
|
|
examples_by_type=examples_by_type, site_name='FLARE FHIR IG Toolkit', now=datetime.now())
|
|
|
|
@app.route('/get-structure')
|
|
def get_structure_definition():
|
|
package_name = request.args.get('package_name')
|
|
package_version = request.args.get('package_version')
|
|
resource_identifier = request.args.get('resource_type')
|
|
if not all([package_name, package_version, resource_identifier]):
|
|
return jsonify({"error": "Missing query parameters"}), 400
|
|
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], services._construct_tgz_filename(package_name, package_version))
|
|
if not os.path.exists(tgz_path):
|
|
return jsonify({"error": f"Package file not found: {tgz_path}"}), 404
|
|
sd_data, _ = services.find_and_extract_sd(tgz_path, resource_identifier)
|
|
if sd_data is None:
|
|
return jsonify({"error": f"SD for '{resource_identifier}' not found."}), 404
|
|
elements = sd_data.get('snapshot', {}).get('element', []) or sd_data.get('differential', {}).get('element', [])
|
|
processed_ig = ProcessedIg.query.filter_by(package_name=package_name, version=package_version).first()
|
|
must_support_paths = processed_ig.must_support_elements.get(resource_identifier, []) if processed_ig else []
|
|
return jsonify({"elements": elements, "must_support_paths": must_support_paths})
|
|
|
|
@app.route('/get-example')
|
|
def get_example_content():
|
|
package_name = request.args.get('package_name')
|
|
package_version = request.args.get('package_version')
|
|
example_member_path = request.args.get('filename')
|
|
if not all([package_name, package_version, example_member_path]):
|
|
return jsonify({"error": "Missing query parameters"}), 400
|
|
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], services._construct_tgz_filename(package_name, package_version))
|
|
if not os.path.exists(tgz_path):
|
|
return jsonify({"error": f"Package file not found: {tgz_path}"}), 404
|
|
if not example_member_path.startswith('package/') or '..' in example_member_path:
|
|
return jsonify({"error": "Invalid example file path."}), 400
|
|
try:
|
|
with tarfile.open(tgz_path, "r:gz") as tar:
|
|
try:
|
|
example_member = tar.getmember(example_member_path)
|
|
except KeyError:
|
|
return jsonify({"error": f"Example file '{example_member_path}' not found."}), 404
|
|
with tar.extractfile(example_member) as example_fileobj:
|
|
content_bytes = example_fileobj.read()
|
|
return content_bytes.decode('utf-8-sig')
|
|
except tarfile.TarError as e:
|
|
return jsonify({"error": f"Error reading {tgz_path}: {e}"}), 500
|
|
|
|
# API Endpoint: Import IG Package
|
|
@app.route('/api/import-ig', methods=['POST'])
|
|
def api_import_ig():
|
|
# Check API key
|
|
auth_error = check_api_key()
|
|
if auth_error:
|
|
return auth_error
|
|
|
|
# Validate request
|
|
if not request.is_json:
|
|
return jsonify({"status": "error", "message": "Request must be JSON"}), 400
|
|
|
|
data = request.get_json()
|
|
package_name = data.get('package_name')
|
|
version = data.get('version')
|
|
|
|
if not package_name or not version:
|
|
return jsonify({"status": "error", "message": "Missing package_name or version"}), 400
|
|
|
|
# Validate package name and version format using re
|
|
if not (isinstance(package_name, str) and isinstance(version, str) and
|
|
re.match(r'^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+$', package_name) and
|
|
re.match(r'^[a-zA-Z0-9\.\-]+$', version)):
|
|
return jsonify({"status": "error", "message": "Invalid package name or version format"}), 400
|
|
|
|
try:
|
|
# Import package and dependencies
|
|
result = services.import_package_and_dependencies(package_name, version)
|
|
if result['errors'] and not result['downloaded']:
|
|
return jsonify({"status": "error", "message": f"Failed to import {package_name}#{version}: {result['errors'][0]}"}), 500
|
|
|
|
# Check for duplicates
|
|
packages = []
|
|
packages_dir = app.config['FHIR_PACKAGES_DIR']
|
|
if os.path.exists(packages_dir):
|
|
for filename in os.listdir(packages_dir):
|
|
if filename.endswith('.tgz'):
|
|
# Split on the last hyphen to separate name and version
|
|
last_hyphen_index = filename.rfind('-')
|
|
if last_hyphen_index != -1 and filename.endswith('.tgz'):
|
|
name = filename[:last_hyphen_index]
|
|
version = filename[last_hyphen_index + 1:-4]
|
|
# Replace underscores with dots to match FHIR package naming convention
|
|
name = name.replace('_', '.')
|
|
if version[0].isdigit() or version in ('preview', 'current', 'latest'):
|
|
packages.append({'name': name, 'version': version, 'filename': filename})
|
|
else:
|
|
name = filename[:-4]
|
|
version = ''
|
|
packages.append({'name': name, 'version': version, 'filename': filename})
|
|
else:
|
|
name = filename[:-4]
|
|
version = ''
|
|
packages.append({'name': name, 'version': version, 'filename': filename})
|
|
|
|
# Calculate duplicates
|
|
duplicate_names = {}
|
|
for pkg in packages:
|
|
name = pkg['name']
|
|
if name not in duplicate_names:
|
|
duplicate_names[name] = []
|
|
duplicate_names[name].append(pkg)
|
|
|
|
duplicates = []
|
|
for name, pkgs in duplicate_names.items():
|
|
if len(pkgs) > 1:
|
|
versions = [pkg['version'] for pkg in pkgs]
|
|
duplicates.append(f"{name} (exists as {', '.join(versions)})")
|
|
|
|
# Deduplicate dependencies
|
|
seen = set()
|
|
unique_dependencies = []
|
|
for dep in result.get('dependencies', []):
|
|
dep_str = f"{dep['name']}#{dep['version']}"
|
|
if dep_str not in seen:
|
|
seen.add(dep_str)
|
|
unique_dependencies.append(dep_str)
|
|
|
|
# Prepare response
|
|
response = {
|
|
"status": "success",
|
|
"message": "Package imported successfully",
|
|
"package_name": package_name,
|
|
"version": version,
|
|
"dependencies": unique_dependencies,
|
|
"duplicates": duplicates
|
|
}
|
|
return jsonify(response), 200
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in api_import_ig: {str(e)}")
|
|
return jsonify({"status": "error", "message": f"Error importing package: {str(e)}"}), 500
|
|
|
|
# API Endpoint: Push IG to FHIR Server with Streaming
|
|
@app.route('/api/push-ig', methods=['POST'])
|
|
def api_push_ig():
|
|
# Check API key
|
|
auth_error = check_api_key()
|
|
if auth_error:
|
|
return auth_error
|
|
|
|
# Validate request
|
|
if not request.is_json:
|
|
return jsonify({"status": "error", "message": "Request must be JSON"}), 400
|
|
|
|
data = request.get_json()
|
|
package_name = data.get('package_name')
|
|
version = data.get('version')
|
|
fhir_server_url = data.get('fhir_server_url')
|
|
include_dependencies = data.get('include_dependencies', True)
|
|
|
|
if not all([package_name, version, fhir_server_url]):
|
|
return jsonify({"status": "error", "message": "Missing package_name, version, or fhir_server_url"}), 400
|
|
|
|
# Validate package name and version format using re
|
|
if not (isinstance(package_name, str) and isinstance(version, str) and
|
|
re.match(r'^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+$', package_name) and
|
|
re.match(r'^[a-zA-Z0-9\.\-]+$', version)):
|
|
return jsonify({"status": "error", "message": "Invalid package name or version format"}), 400
|
|
|
|
# Check if package exists
|
|
tgz_filename = services._construct_tgz_filename(package_name, version)
|
|
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], tgz_filename)
|
|
if not os.path.exists(tgz_path):
|
|
return jsonify({"status": "error", "message": f"Package not found: {package_name}#{version}"}), 404
|
|
|
|
def generate_stream():
|
|
try:
|
|
# Start message
|
|
yield json.dumps({"type": "start", "message": f"Starting push for {package_name}#{version}..."}) + "\n"
|
|
|
|
# Extract resources from the package
|
|
resources = []
|
|
with tarfile.open(tgz_path, "r:gz") as tar:
|
|
for member in tar.getmembers():
|
|
if member.name.startswith('package/') and member.name.endswith('.json'):
|
|
with tar.extractfile(member) as f:
|
|
resource_data = json.load(f)
|
|
if 'resourceType' in resource_data:
|
|
resources.append(resource_data)
|
|
|
|
# If include_dependencies is True, find and include dependencies
|
|
pushed_packages = [f"{package_name}#{version}"]
|
|
if include_dependencies:
|
|
yield json.dumps({"type": "progress", "message": "Processing dependencies..."}) + "\n"
|
|
# Re-import to get dependencies (simulating dependency resolution)
|
|
import_result = services.import_package_and_dependencies(package_name, version)
|
|
dependencies = import_result.get('dependencies', [])
|
|
for dep in dependencies:
|
|
dep_name = dep['name']
|
|
dep_version = dep['version']
|
|
dep_tgz_filename = services._construct_tgz_filename(dep_name, dep_version)
|
|
dep_tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], dep_tgz_filename)
|
|
if os.path.exists(dep_tgz_path):
|
|
with tarfile.open(dep_tgz_path, "r:gz") as tar:
|
|
for member in tar.getmembers():
|
|
if member.name.startswith('package/') and member.name.endswith('.json'):
|
|
with tar.extractfile(member) as f:
|
|
resource_data = json.load(f)
|
|
if 'resourceType' in resource_data:
|
|
resources.append(resource_data)
|
|
pushed_packages.append(f"{dep_name}#{dep_version}")
|
|
yield json.dumps({"type": "progress", "message": f"Added dependency {dep_name}#{dep_version}"}) + "\n"
|
|
else:
|
|
yield json.dumps({"type": "warning", "message": f"Dependency {dep_name}#{dep_version} not found, skipping"}) + "\n"
|
|
|
|
# Push resources to FHIR server
|
|
server_response = []
|
|
success_count = 0
|
|
failure_count = 0
|
|
total_resources = len(resources)
|
|
yield json.dumps({"type": "progress", "message": f"Found {total_resources} resources to upload"}) + "\n"
|
|
|
|
for i, resource in enumerate(resources, 1):
|
|
resource_type = resource.get('resourceType')
|
|
resource_id = resource.get('id')
|
|
if not resource_type or not resource_id:
|
|
yield json.dumps({"type": "warning", "message": f"Skipping invalid resource at index {i}"}) + "\n"
|
|
failure_count += 1
|
|
continue
|
|
|
|
# Construct the FHIR server URL for the resource
|
|
resource_url = f"{fhir_server_url.rstrip('/')}/{resource_type}/{resource_id}"
|
|
yield json.dumps({"type": "progress", "message": f"Uploading {resource_type}/{resource_id} ({i}/{total_resources})..."}) + "\n"
|
|
|
|
try:
|
|
response = requests.put(resource_url, json=resource, headers={'Content-Type': 'application/fhir+json'})
|
|
response.raise_for_status()
|
|
server_response.append(f"Uploaded {resource_type}/{resource_id} successfully")
|
|
yield json.dumps({"type": "success", "message": f"Uploaded {resource_type}/{resource_id} successfully"}) + "\n"
|
|
success_count += 1
|
|
except requests.exceptions.RequestException as e:
|
|
error_msg = f"Failed to upload {resource_type}/{resource_id}: {str(e)}"
|
|
server_response.append(error_msg)
|
|
yield json.dumps({"type": "error", "message": error_msg}) + "\n"
|
|
failure_count += 1
|
|
|
|
# Final summary
|
|
summary = {
|
|
"status": "success" if failure_count == 0 else "partial",
|
|
"message": f"Push completed: {success_count} resources uploaded, {failure_count} failed",
|
|
"package_name": package_name,
|
|
"version": version,
|
|
"pushed_packages": pushed_packages,
|
|
"server_response": "; ".join(server_response) if server_response else "No resources uploaded",
|
|
"success_count": success_count,
|
|
"failure_count": failure_count
|
|
}
|
|
yield json.dumps({"type": "complete", "data": summary}) + "\n"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in api_push_ig: {str(e)}")
|
|
error_response = {
|
|
"status": "error",
|
|
"message": f"Error pushing package: {str(e)}"
|
|
}
|
|
yield json.dumps({"type": "error", "message": error_response["message"]}) + "\n"
|
|
yield json.dumps({"type": "complete", "data": error_response}) + "\n"
|
|
|
|
return Response(generate_stream(), mimetype='application/x-ndjson')
|
|
|
|
with app.app_context():
|
|
logger.debug(f"Creating database at: {app.config['SQLALCHEMY_DATABASE_URI']}")
|
|
db.create_all()
|
|
logger.debug("Database initialization complete")
|
|
|
|
# Add route to serve favicon
|
|
@app.route('/favicon.ico')
|
|
def favicon():
|
|
return send_from_directory(os.path.join(app.root_path, 'static'), 'favicon.ico', mimetype='image/vnd.microsoft.icon')
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True) |