observation.py · fhir2omop
refs/refs/HealthcareLakeETL/mappings/observation.py
lines 31–31
45 lines · py
1import pyspark.sql.functions as F 4def map_observation(df): 5 """ Observation -> Observation (FHIR -> OMOP) 7 :param df: Input frame of FHIR records 9 :return: Output frame of OMOP transform 13 filtered = df.filter(df['resourceType'] == 'Observation') 14 Observation = filtered.filter(filtered.valueCodeableConcept.isNotNull()) 15 Observation = Observation.select(['id', 22 'valueCodeableConcept', 25 split_dates = F.split(Observation["effectiveDateTime"], 'T') 27 Observation = Observation.withColumnRenamed("id", "observation_id")\ 28 .withColumn("observation_type_concept_id", Observation.category.coding.getItem(0).code.getItem(0))\ 29 .withColumn("observation_date", split_dates.getItem(0))\ 30 .withColumn("person_id", Observation.subject.reference)\ 31 .withColumn("value_as_string", Observation.valueCodeableConcept.text)\ 32 .withColumnRenamed("code", "observation_concept_id")\ 33 .withColumnRenamed("effectiveDateTime", "observation_datetime")\ 34 .drop("valueCodeableConcept")\ 35 .withColumn("visit_occurrence_id", Observation.encounter.reference)\ 36 .withColumnRenamed("performer", "provider_id")\ 42 Observation = Observation.withColumn( 43 "observation_concept_id", Observation.observation_concept_id.coding.getItem(0).code)