import os
import time
import zipfile
from datetime import datetime, timedelta, timezone

import boto3
import functions_framework
import google.auth
from google.auth.transport.requests import AuthorizedSession
from google.cloud import storage
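# Pub/Sub-triggered Cloud Function: when a new Organization resource is created
# in the Cloud Healthcare FHIR store, run an incremental NDJSON export to GCS,
# zip the exported files, and push the archive to S3 for FHIR-to-OMOP processing.

# Source FHIR store, GCS export bucket, and AWS destination configuration.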
PROJECT_ID = "pidtoo-fhir"
LOCATION = "us-east4"
DATASET_ID = "isc"
FHIR_STORE_ID = "fhir-omop"
GCS_EXPORT_BUCKET = "fhir-export-bucket"
AWS_BUCKET = "intersystems-fhir2omop"
AWS_REGION = "us-east-2"
def trigger_incremental_export(export_time_iso):
    """Clear the export bucket, then start an incremental FHIR export of
    resources updated since `export_time_iso`. Returns the GCS URI prefix."""
    # Empty the export bucket so only files from this export are picked up.
    client = storage.Client()
    bucket = client.bucket(GCS_EXPORT_BUCKET)
    for blob in bucket.list_blobs():
        print(f"Deleting: {blob.name}")
        blob.delete()

    # Call the Healthcare API fhirStores.export endpoint with default credentials.
    credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    export_uri = f"gs://{GCS_EXPORT_BUCKET}/"
url = (
f"https://healthcare.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/"
f"datasets/{DATASET_ID}/fhirStores/{FHIR_STORE_ID}:export"
)
    body = {
        "gcsDestination": {"uriPrefix": export_uri},
        # "_since" limits the incremental export to resources updated after
        # this timestamp.
        "_since": export_time_iso
    }
response = authed_session.post(url, json=body)
print(f"Export response: {response.status_code} - {response.text}")
return export_uri if response.ok else None
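
# Optional sketch, not wired into the flow below: the :export call returns a
# long-running operation, so instead of polling GCS one could poll the
# operation itself. This assumes the standard Google Cloud operations GET
# endpoint and that the export response body contains the operation "name".
def wait_for_export_operation(authed_session, operation_name, timeout_s=300):
    """Poll a Healthcare API long-running operation until it reports done."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        op = authed_session.get(f"https://healthcare.googleapis.com/v1/{operation_name}").json()
        if op.get("done"):
            return op
        time.sleep(5)
    raise TimeoutError(f"Export operation {operation_name} did not finish in time")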
def wait_for_ndjson_files(export_uri_prefix):
    """Poll GCS until the exported Organization file appears, or time out."""
    client = storage.Client()
    # Split "gs://bucket/prefix/..." into bucket name and object prefix.
    bucket_name = export_uri_prefix.split("/")[2]
    prefix = "/".join(export_uri_prefix.split("/")[3:])
    print(bucket_name)
    print(prefix)
    bucket = client.bucket(bucket_name)
    # The export writes NDJSON files named after the resource type.
    for _ in range(20):
        blobs = list(bucket.list_blobs(prefix=prefix))
        if any(blob.name.endswith("Organization") for blob in blobs):
            return [blob for blob in blobs if blob.name.endswith("Organization")]
        time.sleep(5)
    raise TimeoutError("Export files did not appear in GCS within the timeout window")
def create_zip_from_blobs(blobs, zip_path):
    """Download each exported blob and pack it into a local zip archive."""
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for blob in blobs:
            data = blob.download_as_bytes()
            fname = os.path.basename(blob.name)
            # Exported files are NDJSON but carry no extension; add one.
            zipf.writestr(fname + ".ndjson", data)
def upload_to_s3(zip_path, s3_key):
    """Upload the zip archive to the AWS landing bucket under from_gcp_to_omop/."""
    s3 = boto3.client('s3', region_name=AWS_REGION)
    destination_key = f"from_gcp_to_omop/{s3_key}"
    s3.upload_file(zip_path, AWS_BUCKET, destination_key)
    print(f"Uploaded {zip_path} to s3://{AWS_BUCKET}/{destination_key}")
@functions_framework.cloud_event
def receive_pubsub(cloud_event):
    """Entry point: handle a Pub/Sub notification from the FHIR store."""
    # The Pub/Sub envelope is available directly on the CloudEvent payload.
    data = cloud_event.data
    print(data)
    if not data:
        # Return values from CloudEvent functions are ignored; they are kept
        # only to make the outcome of each branch explicit.
        return "No data", 400
    # The FHIR store notification carries the action and resource type as
    # message attributes (e.g. "CreateResource" / "Organization").
    method = data['message']['attributes']['action']
    resource_name = data['message']['attributes']['resourceType']
    timestamp = data['message']['publishTime']
    # Back the export window off by five minutes so the triggering resource
    # is safely included in the incremental export.
    dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
    five_minutes_ago = dt - timedelta(minutes=5)
    timestamp = five_minutes_ago.isoformat().replace('+00:00', 'Z')
    print(method, resource_name, timestamp)
if "CreateResource" in method and "Organization" in resource_name:
print(f"New Organization detected at {timestamp}")
export_uri = trigger_incremental_export(timestamp)
if not export_uri:
return "Export failed", 500
blobs = wait_for_ndjson_files(export_uri)
zip_file_path = "/tmp/fhir_export.zip"
create_zip_from_blobs(blobs, zip_file_path)
s3_key = f"/export-{int(time.time())}.zip"
upload_to_s3(zip_file_path, s3_key)
return "Exported and uploaded", 200
return "No relevant event", 204