device_exposure.py · fhir2omop
refs/refs/HealthcareLakeETL/mappings/device_exposure.py
55 lines · py
1import pyspark.sql.functions as F
4def map_device_exposure(df):
5 """ DEVICE_EXPOSURE -> DEVICE_EXPOSURE (FHIR -> OMOP)
7 :param df: Input frame of FHIR records
9 :return: Output frame of OMOP transform
12 # Filter By procedure resource type
14 filtered = df.filter(df['resourceType'] == 'Procedure')
16 # Selects relevant fields. Using alias so we don't have columns with the same name
18 Procedure = filtered.select("id",
19 F.col("encounter.reference").alias(
20 "visit_occurrence_id"),
21 F.col("subject.reference").alias("person_id"),
26 "extension.valueCodeableConcept")
28 # Extract the start date and time from the period field.
29 # splits the date and time
30 split_start = F.split(Procedure['performedPeriod.start'], 'T')
31 split_end = F.split(Procedure['performedPeriod.end'], 'T')
33 # assigns each to a column
34 procedure_date_and_time = Procedure\
35 .withColumn("device_exposure_start_date", split_start.getItem(0))\
36 .withColumn("device_exposure_start_datetime", split_start.getItem(1))\
37 .withColumn("device_exposure_end_date", split_end.getItem(0))\
38 .withColumn("device_exposure_end_datetime", split_end.getItem(1))
39 # Drop columns no longer needed
40 dropped = procedure_date_and_time.drop("performedPeriod")
42 device_exposure = dropped\
43 .withColumnRenamed("id", "device_exposure_id")\
44 .withColumnRenamed("valueCodeableConcept", "device_type_concept_id")\
45 .withColumnRenamed("performer", "provider_id")
49 device_exposure = dropped\
50 .withColumnRenamed("id", "device_exposure_id")\
51 .withColumnRenamed("valueCodeableConcept", "device_type_concept_id")\
52 .withColumnRenamed("performer", "provider_id")
54 return device_exposure