condition_occurrence.py · fhir2omop
refs/refs/fhir-x-omop/fhir_x_omop/to_omop/condition_occurrence.py
lines 28–32
47 lines · py
# Flattened excerpt of a 47-line module that maps a FHIR `Condition` resource
# onto the fields of an OMOP `ConditionOccurrence` record using chidian's
# `Mapper` / `DataMapping` and the `chidian.partials` helpers (imported as `p`).
#
# The integers fused into the text below (1, 2, 4, 5, 7, 9, 14, ...) are the line
# numbers of the original file, not code. Original lines 3, 6, 8, 10-13, 15, 27,
# 29-32, 40-42 and 47 are NOT present in this excerpt, so:
#   * `get_first_existing(src, paths)` shows only its signature and the line that
#     evaluates `p.get(path)(src)`; its loop and return logic are not visible.
#   * `condition_occurrence_mapper` is missing the line that opens the mapping
#     body, the closing lines of both `p.case(...)` lookup tables (including all
#     clinical-status concept ids) and the closing of the `Mapper(...)` call.
#   * `to_omop_condition_occurrence` is missing the closing of `DataMapping(...)`.
# The line is therefore not valid Python as it stands; do not reconstruct the
# missing parts from this excerpt alone.
#
# What the visible text does establish:
#   * Date fields are derived by splitting an ISO-style string on 'T' and keeping
#     the first part; the start date/datetime fall back from `onsetDateTime` to
#     `recordedDate` via `get_first_existing`.
#   * `condition_type_concept_id` is looked up from `category[0].coding[0].code`
#     with the three visible code -> concept-id pairs.
#   * `condition_concept_id` and `condition_source_concept_id` are hard-coded to 0;
#     the inline comment says concept mapping would be needed in production.
#
# NOTE(review): `person_id`, `provider_id`, `visit_occurrence_id` and
#   `visit_detail_id` all parse a reference with `int(x.split('/')[1])`. That
#   expression raises IndexError for a reference with no '/' and ValueError when
#   the segment after the first '/' is not numeric; it also takes the second path
#   segment rather than the last. Presumably the `getter` receives the resolved
#   reference string - confirm against chidian's `p.get` before relying on this.
# NOTE(review): `visit_detail_id` uses exactly the same `encounter.reference`
#   expression as `visit_occurrence_id`, so both fields always carry the same
#   value - confirm this is intended.
# NOTE(review): `stop_reason` is populated from `note[0].text` - confirm that a
#   free-text note is the intended source for this field.
1from fhir.resources.condition import Condition 2from omop_pydantic import ConditionOccurrence 4from chidian import DataMapping, Mapper 5import chidian.partials as p 7def get_first_existing(src, paths): 9 value = p.get(path)(src) 14condition_occurrence_mapper = Mapper( 16 "condition_occurrence_id": (p.get("id") | p.int())(src), 17 "person_id": p.get("subject.reference", getter=lambda x: int(x.split('/')[1]) if x else None)(src), 18 "condition_concept_id": 0, # Would need concept mapping in production 19 "condition_start_date": p.get(getter=lambda s: (get_first_existing(s, ['onsetDateTime', 'recordedDate']) or '').split('T')[0] or None)(src), 20 "condition_start_datetime": get_first_existing(src, ['onsetDateTime', 'recordedDate']), 21 "condition_end_date": p.get("abatementDateTime", getter=lambda x: x.split('T')[0] if x else None)(src), 22 "condition_end_datetime": p.get("abatementDateTime")(src), 23 "condition_type_concept_id": p.case(p.get("category[0].coding[0].code")(src), { 24 "encounter-diagnosis": 32817, 25 "problem-list-item": 32818, 26 "health-concern": 32819, 28 "condition_status_concept_id": p.case(p.get("clinicalStatus.coding[0].code")(src), { 33 "stop_reason": p.get("note[0].text")(src), 34 "provider_id": p.get("recorder.reference", getter=lambda x: int(x.split('/')[1]) if x else None)(src), 35 "visit_occurrence_id": p.get("encounter.reference", getter=lambda x: int(x.split('/')[1]) if x else None)(src), 36 "visit_detail_id": p.get("encounter.reference", getter=lambda x: int(x.split('/')[1]) if x else None)(src), 37 "condition_source_value": p.get("code.coding[0].code")(src), 38 "condition_source_concept_id": 0, 39 "condition_status_source_value": p.get("clinicalStatus.coding[0].code")(src), 43to_omop_condition_occurrence = DataMapping( 44 mapper=condition_occurrence_mapper, 45 input_schema=Condition, 46 output_schema=ConditionOccurrence,