refs/refs/HealthcareLakeETL/mappings/procedure_occurrence.py

45 lines · py
1import pyspark.sql.functions as F
def map_procedure_occurrence(df):
    """Map FHIR ``Procedure`` records onto OMOP PROCEDURE_OCCURRENCE columns.

    Rows whose ``resourceType`` is ``'Procedure'`` are kept, the fields of
    interest are selected, ``performedPeriod.start`` is split on ``'T'`` into
    two new columns, and the remaining columns are renamed.

    Resulting columns, in order:

    * ``procedure_occurrence_id``   <- ``id``
    * ``visit_occurrence_id``       <- ``encounter.reference``
    * ``person_id``                 <- ``subject.reference``
    * ``procedure_concept_id``      <- ``code.coding``
    * ``provider_id``               <- ``performer``
    * ``procedure_type_concept_id`` <- ``extension``
    * ``procedure_date``            <- text before ``'T'`` in the start value
    * ``procedure_datetime``        <- text after ``'T'`` in the start value

    :param df: Input frame of FHIR records
    :type df: Spark DataFrame (NOTE(review): earlier docs said DynamicFrame,
        but the calls below are DataFrame-style -- confirm against callers)
    :return: Output frame of OMOP transform
    :rtype: same frame type as produced by ``df.filter``/``select``
    """
    # Keep only the Procedure resources.
    procedure_rows = df.filter(df["resourceType"] == "Procedure")

    # Pick the fields we need; the two references are aliased up front so
    # they do not end up as two columns both called "reference".
    selected = procedure_rows.select(
        "id",
        F.col("encounter.reference").alias("visit_occurrence_id"),
        F.col("subject.reference").alias("person_id"),
        "code.coding",
        "performedPeriod",
        "performer",
        "extension",
    )

    # "<date>T<time>" -> [<date>, <time>]
    start_parts = F.split(selected["performedPeriod.start"], "T")

    # NOTE(review): procedure_datetime receives only the portion after 'T'
    # (the time), not a full timestamp -- confirm this is what downstream
    # OMOP consumers expect.
    result = (
        selected
        .withColumn("procedure_date", start_parts.getItem(0))
        .withColumn("procedure_datetime", start_parts.getItem(1))
        .drop("performedPeriod")  # the raw period is no longer needed
    )

    # Source column -> OMOP column. Selecting "code.coding" yields a column
    # named "coding", hence that key.
    renames = (
        ("id", "procedure_occurrence_id"),
        ("coding", "procedure_concept_id"),
        ("extension", "procedure_type_concept_id"),
        ("performer", "provider_id"),
    )
    for source_name, omop_name in renames:
        result = result.withColumnRenamed(source_name, omop_name)

    return result