refs/refs/HealthcareLakeETL/mappings/observation.py

lines 31–31 45 lines · py
1import pyspark.sql.functions as F
def map_observation(df):
    """Observation -> Observation (FHIR -> OMOP).

    Keeps only FHIR ``Observation`` resources that carry a
    ``valueCodeableConcept`` and projects them onto OMOP ``observation``
    column names.

    Output columns, in this order:

    * ``observation_id`` -- the resource ``id``
    * ``observation_concept_id`` -- ``code`` of the first ``code.coding`` entry
    * ``provider_id`` -- the ``performer`` column, passed through unchanged
    * ``observation_datetime`` -- ``effectiveDateTime``, unchanged
    * ``observation_type_concept_id`` -- first ``code`` in the ``coding`` list
      of the first ``category`` entry
    * ``observation_date`` -- the text of ``effectiveDateTime`` before the
      first ``'T'`` (a string, not a parsed date)
    * ``person_id`` -- ``subject.reference``
    * ``value_as_string`` -- ``valueCodeableConcept.text``
    * ``visit_occurrence_id`` -- ``encounter.reference``

    The body uses the Spark SQL ``DataFrame``/``Column`` API, so a Glue
    ``DynamicFrame`` has to be converted to a ``DataFrame`` before calling.

    :param df: Input frame of FHIR records (any resource types); must have
        ``resourceType``, ``valueCodeableConcept``, ``id``, ``code``,
        ``performer``, ``effectiveDateTime``, ``category``, ``subject`` and
        ``encounter`` columns.
    :type df: pyspark.sql.DataFrame
    :return: Output frame of the OMOP transform
    :rtype: pyspark.sql.DataFrame
    """
    # One filter with both predicates: a row survives only when both are
    # true, which is exactly what two consecutive filters would keep.
    observations = df.filter(
        (F.col('resourceType') == 'Observation')
        & F.col('valueCodeableConcept').isNotNull()
    )

    effective = F.col('effectiveDateTime')

    # A single aliased projection: every expression resolves against
    # `observations` itself, and the column order matches the docstring.
    return observations.select(
        F.col('id').alias('observation_id'),
        (F.col('code').getField('coding').getItem(0).getField('code')
         .alias('observation_concept_id')),
        # NOTE(review): unlike subject/encounter, performer is not reduced to
        # its `.reference`; in FHIR, Observation.performer is a list of
        # References, so this is presumably an array of structs -- confirm.
        F.col('performer').alias('provider_id'),
        effective.alias('observation_datetime'),
        # category and its coding are both lists: take the first category,
        # then the first code among that category's codings.
        (F.col('category').getField('coding').getItem(0).getField('code').getItem(0)
         .alias('observation_type_concept_id')),
        # Text before the first 'T', kept as a string.
        # NOTE(review): OMOP observation_date is presumably a DATE -- confirm
        # whether a cast (e.g. F.to_date) is wanted downstream.
        F.split(effective, 'T').getItem(0).alias('observation_date'),
        F.col('subject').getField('reference').alias('person_id'),
        F.col('valueCodeableConcept').getField('text').alias('value_as_string'),
        F.col('encounter').getField('reference').alias('visit_occurrence_id'),
    )