procedure_occurrence.py · fhir2omop
refs/refs/fhir-x-omop/fhir_x_omop/to_omop/procedure_occurrence.py
34 lines · py
from fhir.resources.procedure import Procedure
from omop_pydantic import ProcedureOccurrence

from chidian import DataMapping, Mapper
import chidian.partials as p


def _reference_id(reference):
    """Return the integer id carried by a FHIR reference string, or None.

    Handles relative references ("Patient/123"), absolute URLs
    (".../Patient/123") and versioned references
    ("Patient/123/_history/2"). References that are empty or whose id is
    not an integer (e.g. "urn:uuid:..." or "#contained") yield None instead
    of raising, so one odd reference does not abort the whole mapping.
    """
    if not reference:
        return None
    # Drop a version suffix so the logical id is the last path segment.
    unversioned = reference.split("/_history/", 1)[0]
    id_segment = unversioned.rstrip("/").rsplit("/", 1)[-1]
    try:
        return int(id_segment)
    except ValueError:
        return None


def _date_part(timestamp):
    """Return the text before the first "T" in *timestamp*, or None if empty."""
    return timestamp.split('T')[0] if timestamp else None


# Builds the field dict for one procedure_occurrence row from a FHIR
# Procedure-shaped source.
# NOTE(review): this file was recovered from an extraction that dropped its
# embedded lines 8, 18, 20, 27, 28 and 34. Only the structural tokens needed
# to close the visible code were restored; confirm against the upstream file.
procedure_occurrence_mapper = Mapper(
    # NOTE(review): line 8 (whatever binds `src`) is reconstructed as a lambda.
    lambda src: {
        "procedure_occurrence_id": (p.get("id") | p.int())(src),
        "person_id": p.get("subject.reference", getter=_reference_id)(src),
        "procedure_concept_id": 0,  # Would need concept mapping in production
        "procedure_date": p.get("performedDateTime", getter=_date_part)(src),
        "procedure_datetime": p.get("performedDateTime")(src),
        # NOTE(review): these look like type concept ids selected by the
        # coding system of the first coding — confirm the intended semantics.
        "procedure_type_concept_id": p.case(p.get("code.coding[0].system")(src), {
            "http://www.ama-assn.org/go/cpt": 32817,
            "http://hl7.org/fhir/sid/icd-10-pcs": 32818,
            "http://hl7.org/fhir/sid/icd-9-cm": 32819,
        # NOTE(review): line 18 closed this call and may have passed a
        # default; none is visible, so none is supplied here.
        }),
        "modifier_concept_id": 0,
        # NOTE(review): line 20 (an entry between modifier_concept_id and
        # provider_id) is not visible in the extracted source; restore it.
        "provider_id": p.get("performer[0].actor.reference", getter=_reference_id)(src),
        "visit_occurrence_id": p.get("encounter.reference", getter=_reference_id)(src),
        # NOTE(review): same source path as visit_occurrence_id — confirm
        # that the encounter id is really meant to populate both columns.
        "visit_detail_id": p.get("encounter.reference", getter=_reference_id)(src),
        "procedure_source_value": p.get("code.coding[0].code")(src),
        "procedure_source_concept_id": 0,
        # NOTE(review): populated from the procedure code's display text,
        # not from a modifier — confirm this is intentional.
        "modifier_source_value": p.get("code.coding[0].display")(src),
    }
)

# Pairs the mapper with its input (FHIR Procedure) and output
# (ProcedureOccurrence) schemas.
to_omop_procedure_occurrence = DataMapping(
    mapper=procedure_occurrence_mapper,
    input_schema=Procedure,
    output_schema=ProcedureOccurrence,
)