From b767faafa25fff83aa509c816ad2123dcdeb7010 Mon Sep 17 00:00:00 2001 From: Thomas PETIT-JEAN Date: Mon, 5 Dec 2022 19:24:09 +0100 Subject: [PATCH 01/25] chores: add zenodo DOI --- README.md | 14 ++++++++++++++ docs/index.md | 13 +++++++++++++ 2 files changed, 27 insertions(+) diff --git a/README.md b/README.md index 596498cf..ee90eefe 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Supported Python versions +DOI

@@ -80,6 +81,19 @@ import eds_scikit Please check our [contributing guidelines](https://aphp.github.io/eds-scikit/contributing/). +### Citation + +If you use `eds-scikit`, please cite us as below. + +```bibtex +@misc{eds-scikit, + author = {Petit-Jean, Thomas and Remaki, Adam and Maladière, Vincent and Varoquaux, Gaël and Bey, Romain}, + doi = {10.5281/zenodo.7401549}, + title = {eds-scikit: data analysis on OMOP databases}, + url = {https://github.com/aphp/eds-scikit} +} +``` + ### Acknowledgment We would like to thank the following funders: diff --git a/docs/index.md b/docs/index.md index d82733ec..d40fb2c8 100644 --- a/docs/index.md +++ b/docs/index.md @@ -224,3 +224,16 @@ Also, a rule-based NLP library ([EDS-NLP](https://github.com/aphp/edsnlp)) desig ## Contributing to eds-scikit We welcome contributions! Fork the project and create a pull request. Take a look at the [dedicated page](contributing.md) for details. + +## Citation + +If you use `eds-scikit`, please cite us as below. + +```bibtex +@misc{eds-scikit, + author = {Petit-Jean, Thomas and Remaki, Adam and Maladière, Vincent and Varoquaux, Gaël and Bey, Romain}, + doi = {10.5281/zenodo.7401549}, + title = {eds-scikit: data analysis on OMOP databases}, + url = {https://github.com/aphp/eds-scikit} +} +``` From 4612e5ef7fb3421c95bcf3cb4ad3b1b401b7bcee Mon Sep 17 00:00:00 2001 From: Thomas PETIT-JEAN Date: Thu, 8 Dec 2022 18:48:51 +0100 Subject: [PATCH 02/25] let: it snow --- docs/index.md | 4 ++++ mkdocs.yml | 8 ++++---- pyproject.toml | 3 ++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/index.md b/docs/index.md index d40fb2c8..ca3b91ef 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,3 +1,7 @@ +--- +ᴴₒᴴₒᴴₒ: true +--- +

eds-scikit

diff --git a/mkdocs.yml b/mkdocs.yml index 9219e346..a6f901de 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -7,14 +7,14 @@ theme: name: material custom_dir: mkdocs_theme palette: - - scheme: default - toggle: - icon: material/brightness-4 - name: Switch to dark mode - scheme: slate toggle: icon: material/brightness-7 name: Switch to light mode + - scheme: default + toggle: + icon: material/brightness-4 + name: Switch to dark mode logo: _static/scikit_logo.svg favicon: _static/scikit_logo.svg diff --git a/pyproject.toml b/pyproject.toml index b8b31779..3757972b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,7 +89,7 @@ doc = [ "mkdocs-gen-files==0.3.5", "mkdocs-img2fig-plugin==0.9.3", "mkdocs-literate-nav==0.4.1", - "mkdocs-material==8.5.10", + "mkdocs-material==8.5.11", "mkdocs-section-index==0.3.4", "mkdocstrings==0.19.0", "mkdocstrings-python==0.7.1", @@ -115,6 +115,7 @@ authorized_licenses = [ "mpl", "mozilla public license", "public domain", + "unlicense", ] unauthorized_licenses = [ '\bgpl', From f10f7f0719b5aff44889339382c9a1cea4112d64 Mon Sep 17 00:00:00 2001 From: Thomas PETIT-JEAN Date: Thu, 8 Dec 2022 18:48:51 +0100 Subject: [PATCH 03/25] let: it snow --- docs/index.md | 4 ++++ mkdocs.yml | 8 ++++---- pyproject.toml | 3 ++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/index.md b/docs/index.md index d40fb2c8..ca3b91ef 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,3 +1,7 @@ +--- +ᴴₒᴴₒᴴₒ: true +--- +

eds-scikit

diff --git a/mkdocs.yml b/mkdocs.yml index 9219e346..a6f901de 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -7,14 +7,14 @@ theme: name: material custom_dir: mkdocs_theme palette: - - scheme: default - toggle: - icon: material/brightness-4 - name: Switch to dark mode - scheme: slate toggle: icon: material/brightness-7 name: Switch to light mode + - scheme: default + toggle: + icon: material/brightness-4 + name: Switch to dark mode logo: _static/scikit_logo.svg favicon: _static/scikit_logo.svg diff --git a/pyproject.toml b/pyproject.toml index b8b31779..3757972b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,7 +89,7 @@ doc = [ "mkdocs-gen-files==0.3.5", "mkdocs-img2fig-plugin==0.9.3", "mkdocs-literate-nav==0.4.1", - "mkdocs-material==8.5.10", + "mkdocs-material==8.5.11", "mkdocs-section-index==0.3.4", "mkdocstrings==0.19.0", "mkdocstrings-python==0.7.1", @@ -115,6 +115,7 @@ authorized_licenses = [ "mpl", "mozilla public license", "public domain", + "unlicense", ] unauthorized_licenses = [ '\bgpl', From 13fb203b8d2bfec271e02b146b7f9bd72720ebd9 Mon Sep 17 00:00:00 2001 From: Vincent M Date: Thu, 29 Dec 2022 20:44:08 +0100 Subject: [PATCH 04/25] fist commit with bulk copy from EDS-Teva --- eds_scikit/io/hive.py | 77 ++++++++++---- eds_scikit/io/i2b2_mapping.py | 184 ++++++++++++++++++++++++++++++++++ eds_scikit/io/settings.py | 161 +++++++++++++++++++++++++++++ 3 files changed, 401 insertions(+), 21 deletions(-) create mode 100644 eds_scikit/io/i2b2_mapping.py diff --git a/eds_scikit/io/hive.py b/eds_scikit/io/hive.py index 273e0c90..01e10f4b 100644 --- a/eds_scikit/io/hive.py +++ b/eds_scikit/io/hive.py @@ -1,8 +1,10 @@ import os +from collections import defaultdict from typing import Dict, Iterable, List, Optional, Union import pandas from databricks import koalas +from i2b2_mapping import get_i2b2_table from loguru import logger from pyspark.sql import DataFrame as SparkDataFrame from pyspark.sql import SparkSession @@ -25,6 +27,7 @@ def __init__( columns_to_load: Optional[ Union[Dict[str, Optional[List[str]]], List[str]] ] = None, + database_type: Optional[str] = "OMOP", ): """Spark interface for OMOP data stored in a Hive database. @@ -50,6 +53,8 @@ def __init__( A list of the tables names can also be provided to load all columns of each table. columns_to_load : Optional[Union[Dict[str, Optional[List[str]]], List[str]]] *deprecated* + database_type: Optional[str] = 'OMOP'. Must be 'OMOP' or 'I2B2' + Whether to use the native OMOP schema or to convert I2B2 inputs to OMOP. Attributes ---------- @@ -60,7 +65,6 @@ def __init__( names of OMOP tables that can be accessed as attributes with this HiveData object. - Examples -------- @@ -99,18 +103,29 @@ def __init__( ``` """ - if columns_to_load and not tables_to_load: tables_to_load = columns_to_load + # TODO: Deprecated since which version? Will be removed in which version? logger.warning( - "columns_to_load is a deprecated argument. Please use 'tables_to_load' instead." + "'columns_to_load' is a deprecated argument. Please use 'tables_to_load' instead." ) self.spark_session = ( - spark_session - if spark_session is not None - else SparkSession.builder.enableHiveSupport().getOrCreate() + spark_session or SparkSession.builder.enableHiveSupport().getOrCreate() ) self.database_name = database_name + if database_type not in ["I2B2", "OMOP"]: + raise ValueError( + f"`database_type` must be either 'I2B2' or 'OMOP'. 
Got {database_type}" + ) + self.database_type = database_type + + if self.database_type == "I2B2": + self.database_source = "cse" if "cse" in self.database_name else "edsprod" + self.omop_to_i2b2 = settings.i2b2_tables[self.database_source] + self.i2b2_to_omop = defaultdict(list) + for omop_col, i2b2_col in self.omop_to_i2b2.items(): + self.i2b2_to_omop[i2b2_col].append(omop_col) + self.person_ids = self._prepare_person_ids(person_ids) tmp_tables_to_load = settings.tables_to_load @@ -134,14 +149,25 @@ def list_available_tables(self) -> List[str]: tables_df = self.spark_session.sql( f"SHOW TABLES IN {self.database_name}" ).toPandas() - available_tables = [ - table_name - for table_name in tables_df["tableName"].drop_duplicates().to_list() - if table_name in self.tables_to_load.keys() - ] + available_tables = set() + session_tables = tables_df["tableName"].drop_duplicates().to_list() + for table_name in session_tables: + if ( + self.database_type == "OMOP" + and table_name in self.tables_to_load.keys() + ): + available_tables.add(table_name) + elif ( + self.database_type == "I2B2" and table_name in self.i2b2_to_omop.keys() + ): + for omop_table in self.i2b2_to_omop[table_name]: + if omop_table in self.tables_to_load.keys(): + available_tables.add(omop_table) + available_tables = list(available_tables) return available_tables def rename_table(self, old_table_name: str, new_table_name: str) -> None: + # TODO: use _tables dict instead of self to store tables? if old_table_name in self.available_tables: setattr(self, new_table_name, getattr(self, old_table_name)) self.available_tables.remove(old_table_name) @@ -180,20 +206,29 @@ def _prepare_person_ids(self, list_of_person_ids) -> Optional[SparkDataFrame]: return filtering_df def _read_table(self, table_name, person_ids=None) -> DataFrame: - assert table_name in self.available_tables - if person_ids is None and self.person_ids is not None: - person_ids = self.person_ids + if table_name not in self.available_tables: + raise ValueError( + f"{table_name} is not available. 
" + f"Available tables are: {self.available_tables}" + ) - df = self.spark_session.sql(f"select * from {self.database_name}.{table_name}") + if self.database_type == "OMOP": + df = self.spark_session.sql( + f"select * from {self.database_name}.{table_name}" + ) + else: + df = get_i2b2_table( + spark_session=self.spark_session, + db_name=self.database_name, + db_source=self.database_source, + table=table_name, + ) - desired_columns = self.tables_to_load[table_name] - selected_columns = ( - df.columns - if desired_columns is None - else [col for col in df.columns if col in desired_columns] - ) + desired_columns = self.tables_to_load[table_name] or df.columns + selected_columns = list(set(df.columns) & set(desired_columns)) df = df.select(*selected_columns) + person_ids = person_ids or self.person_ids if "person_id" in df.columns and person_ids is not None: df = df.join(person_ids, on="person_id", how="inner") diff --git a/eds_scikit/io/i2b2_mapping.py b/eds_scikit/io/i2b2_mapping.py new file mode 100644 index 00000000..0b680f33 --- /dev/null +++ b/eds_scikit/io/i2b2_mapping.py @@ -0,0 +1,184 @@ +from typing import Dict + +from pyspark.sql import DataFrame as SparkDataFrame +from pyspark.sql import SparkSession +from pyspark.sql import functions as F +from pyspark.sql.functions import udf as FunctionUDF + +from .settings import ( + dict_code_UFR, + i2b2_renaming, + i2b2_tables, + sex_cd_mapping, + visit_type_mapping, +) + + +def get_i2b2_table( + spark_session: SparkSession, db_name: str, db_source: str, table: str +) -> SparkDataFrame: + """ + Convert a Spark table from i2b2 to OMOP format. + + Parameters + ---------- + db_name: str + Name of the database where the data is stored. + table: str + Name of the table to extract. + + Returns + ------- + df: Spark DataFrame + Spark DataFrame extracted from the i2b2 database given and converted to OMOP standard. 
+ """ + + table_name = i2b2_tables[db_source][table] + columns = i2b2_renaming[table] + if db_source == "cse": + columns.pop("i2b2_action", None) + query = ",".join([f"{k} AS {v}" for k, v in columns.items()]) + + df = spark_session.sql(f"""SELECT {query} FROM {db_name}.{table_name}""") + + # Special mapping for i2b2 : + + # CIM10 + if table == "condition_occurrence": + df = df.withColumn( + "condition_source_value", + F.substring(F.col("condition_source_value"), 7, 20), + ) + + # CCAM + elif table == "procedure_occurrence": + df = df.withColumn( + "procedure_source_value", + F.substring(F.col("procedure_source_value"), 6, 20), + ) + + # Visits + elif table == "visit_occurrence": + df = df.withColumn( + "visit_source_value", + mapping_dict(visit_type_mapping, "Non Renseigné")( + F.col("visit_source_value") + ), + ) + if db_source == "cse": + df = df.withColumn("row_status_source_value", F.lit("Actif")) + df = df.withColumn( + "visit_occurrence_source_value", df["visit_occurrence_id"] + ) + else: + df = df.withColumn( + "row_status_source_value", + F.when( + F.col("row_status_source_value").isin([-1, -2]), "supprimé" + ).otherwise("Actif"), + ) + # Retrieve Hospital trigram + ufr = spark_session.sql( + f"SELECT * FROM {db_name}.{i2b2_tables[db_source]['visit_detail']}" + ) + ufr = ufr.withColumn( + "care_site_id", + F.substring(F.split(F.col("concept_cd"), ":").getItem(1), 1, 3), + ) + ufr = ufr.withColumnRenamed("encounter_num", "visit_occurrence_id") + ufr = ufr.drop_duplicates(subset=["visit_occurrence_id"]) + ufr = ufr.select(["visit_occurrence_id", "care_site_id"]) + df = df.join(ufr, how="inner", on=["visit_occurrence_id"]) + + # Patients + elif table == "person": + df = df.withColumn( + "gender_source_value", + mapping_dict(sex_cd_mapping, "Non Renseigné")(F.col("gender_source_value")), + ) + + # Documents + elif table == "note": + df = df.withColumn( + "note_class_source_value", + F.substring(F.col("note_class_source_value"), 4, 100), + ) + if db_source == "cse": + df = df.withColumn("row_status_source_value", F.lit("Actif")) + else: + df = df.withColumn( + "row_status_source_value", + F.when(F.col("row_status_source_value") < 0, "SUPP").otherwise("Actif"), + ) + + # Hospital trigrams + elif table == "care_site": + df = df.withColumn("care_site_type_source_value", F.lit("Hôpital")) + df = df.withColumn( + "care_site_source_value", + F.split(F.col("care_site_source_value"), ":").getItem(1), + ) + df = df.withColumn( + "care_site_id", F.substring(F.col("care_site_source_value"), 1, 3) + ) + df = df.drop_duplicates(subset=["care_site_id"]) + df = df.withColumn( + "care_site_short_name", + mapping_dict(dict_code_UFR, "Non Renseigné")(F.col("care_site_id")), + ) + + # UFR + elif table == "visit_detail": + df = df.withColumn( + "care_site_id", F.split(F.col("care_site_id"), ":").getItem(1) + ) + df = df.withColumn("visit_detail_type_source_value", F.lit("PASS")) + df = df.withColumn("row_status_source_value", F.lit("Actif")) + + # biology + elif table == "biology": + df = df.withColumn( + "biology_source_value", F.substring(F.col("biology_source_value"), 5, 20) + ) + + # fact_relationship + elif table == "fact_relationship": + # Retrieve UF information + df = df.withColumn( + "fact_id_1", + F.split(F.col("care_site_source_value"), ":").getItem(1), + ) + df = df.withColumn("domain_concept_id_1", F.lit(57)) # Care_site domain + + # Retrieve hospital information + df = df.withColumn("fact_id_2", F.substring(F.col("fact_id_1"), 1, 3)) + df = df.withColumn("domain_concept_id_2", F.lit(57)) # 
Care_site domain + df = df.drop_duplicates(subset=["fact_id_1", "fact_id_2"]) + + # Only UF-Hospital relationships in i2b2 + df = df.withColumn("relationship_concept_id", F.lit(46233688)) # Included in + + return df + + +def mapping_dict(mapping: Dict[str, str], default: str) -> FunctionUDF: + """ + Returns a function that maps data according to a mapping dictionnary in a Spark DataFrame. + + Parameters + ---------- + mapping: Dict + Mapping dictionnary + default: str + Value to return if the function input is not find in the mapping dictionnary. + + Returns + ------- + Callable + Function that maps the values of Spark DataFrame column. + """ + + def f(x): + return mapping.get(x, default) + + return F.udf(f) diff --git a/eds_scikit/io/settings.py b/eds_scikit/io/settings.py index f368a8d9..1fa9c353 100644 --- a/eds_scikit/io/settings.py +++ b/eds_scikit/io/settings.py @@ -205,3 +205,164 @@ # make sure we know how to load the tables we want to save assert all(table in tables_to_load.keys() for table in default_tables_to_save) + + +# Tables for each base +i2b2_tables = { + "edsprod": { + "visit_occurrence": "i2b2_orbis_visit_dim", + "note": "i2b2_observation_fact_document", + "person": "i2b2_patient_dim", + "condition_occurrence": "i2b2_arem_observation_fact_cim10", + "procedure_occurrence": "i2b2_arem_observation_fact_ccam", + "care_site": "i2b2_observation_fact_ufr", + "visit_detail": "i2b2_observation_fact_ufr", + "biology": "i2b2_observation_fact_lab", + "fact_relationship": "i2b2_observation_fact_ufr", + "concept": "orbis_form_ref_concept_list", + }, + "cse": { + "visit_occurrence": "i2b2_visit", + "note": "i2b2_observation_doc", + "person": "i2b2_patient", + "condition_occurrence": "i2b2_observation_cim10", + "procedure_occurrence": "i2b2_observation_ccam", + "care_site": "i2b2_observation_ufr", + "visit_detail": "i2b2_observation_ufr", + "biology": "i2b2_observation_lab", + "fact_relationship": "i2b2_observation_ufr", + "concept": "i2b2_concept", + }, +} + + +# Mapping between i2b2 and OMOP +i2b2_renaming = { + "note": { + "instance_num": "note_id", + "start_date": "note_datetime", + "concept_cd": "note_class_source_value", + "encounter_num": "visit_occurrence_id", + "patient_num": "person_id", + "observation_blob": "note_text", + "sourcesystem_cd": "cdm_source", + "i2b2_action": "row_status_source_value", + }, + "person": { + "patient_num": "person_id", + "birth_date": "birth_datetime", + "death_date": "death_datetime", + "sex_cd": "gender_source_value", + "sourcesystem_cd": "cdm_source", + }, + "visit_occurrence": { + "patient_num": "person_id", + "encounter_num": "visit_occurrence_id", + "visit_blob": "visit_occurrence_source_value", + "start_date": "visit_start_datetime", + "end_date": "visit_end_datetime", + "sourcesystem_cd": "cdm_source", + "type_visite": "visit_source_value", + "mode_entree": "admitted_from_source_value", + "mode_sortie": "discharge_to_source_value", + "i2b2_action": "row_status_source_value", + }, + "condition_occurrence": { + "patient_num": "person_id", + "encounter_num": "visit_occurrence_id", + "instance_num": "condition_occurrence_id", + "start_date": "condition_start_datetime", + "concept_cd": "condition_source_value", + "tval_char": "condition_status_source_value", + "sourcesystem_cd": "cdm_source", + }, + "procedure_occurrence": { + "patient_num": "person_id", + "encounter_num": "visit_occurrence_id", + "instance_num": "procedure_occurrence_id", + "start_date": "procedure_start_datetime", + "concept_cd": "procedure_source_value", + "sourcesystem_cd": 
"cdm_source", + }, + "care_site": {"location_cd": "care_site_source_value"}, + "visit_detail": { + "instance_num": "visit_detail_id", + "encounter_num": "visit_occurrence_id", + "location_cd": "care_site_id", + "patient_num": "person_id", + "start_date": "visit_detail_start_datetime", + "end_date": "visit_detail_end_datetime", + "sourcesystem_cd": "cdm_source", + "i2b2_action": "row_status_source_value", + }, + "biology": { + "patient_num": "person_id", + "encounter_num": "visit_occurrence_id", + "instance_num": "biology_occurrence_id", + "start_date": "biology_start_datetime", + "concept_cd": "biology_source_value", + "sourcesystem_cd": "cdm_source", + "nval_num": "biology_status_source_value", + }, + "fact_relationship": { + "location_cd": "care_site_source_value", + "sourcesystem_cd": "cdm_source", + }, + "concept": {"cd_concept": "concept_id", "lib_concept": "concept_name"}, +} + + +sex_cd_mapping = {"W": "f", "M": "m"} + + +dict_code_UFR = { + "014": "APR", + "028": "ABC", + "095": "AVC", + "005": "BJN", + "009": "BRK", + "010": "BCT", + "011": "BCH", + "033": "BRT", + "016": "BRC", + "042": "CFX", + "019": "CRC", + "021": "CCH", + "022": "CCL", + "029": "ERX", + "036": "GCL", + "075": "EGP", + "038": "HND", + "026": "HMN", + "099": "HAD", + "041": "HTD", + "032": "JVR", + "044": "JFR", + "047": "LRB", + "049": "LRG", + "053": "LMR", + "061": "NCK", + "096": "PBR", + "066": "PSL", + "068": "RPC", + "069": "RMB", + "070": "RDB", + "072": "RTH", + "073": "SAT", + "079": "SPR", + "076": "SLS", + "084": "SSL", + "087": "TNN", + "088": "TRS", + "090": "VGR", + "064": "VPD", + "INC": "INCONNU", +} + + +visit_type_mapping = { + "I": "hospitalisés", + "II": "hospitalisation incomplète", + "U": "urgence", + "O": "consultation externe", +} From 21c1025e5753eb82b8f9ea8145a10e8e04ad3bcb Mon Sep 17 00:00:00 2001 From: Vincent M Date: Thu, 5 Jan 2023 15:35:47 +0100 Subject: [PATCH 05/25] hf1 --- eds_scikit/io/hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eds_scikit/io/hive.py b/eds_scikit/io/hive.py index 01e10f4b..7a1dfa45 100644 --- a/eds_scikit/io/hive.py +++ b/eds_scikit/io/hive.py @@ -4,13 +4,13 @@ import pandas from databricks import koalas -from i2b2_mapping import get_i2b2_table from loguru import logger from pyspark.sql import DataFrame as SparkDataFrame from pyspark.sql import SparkSession from pyspark.sql.types import LongType, StructField, StructType from . import settings +from .i2b2_mapping import get_i2b2_table DataFrame = Union[koalas.DataFrame, pandas.DataFrame] From b4b8f582da2f536be9f954846ae3b4b8ecb9caeb Mon Sep 17 00:00:00 2001 From: Vincent M Date: Thu, 5 Jan 2023 17:52:17 +0100 Subject: [PATCH 06/25] hf2 --- eds_scikit/io/hive.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/eds_scikit/io/hive.py b/eds_scikit/io/hive.py index 7a1dfa45..da6f16aa 100644 --- a/eds_scikit/io/hive.py +++ b/eds_scikit/io/hive.py @@ -163,8 +163,7 @@ def list_available_tables(self) -> List[str]: for omop_table in self.i2b2_to_omop[table_name]: if omop_table in self.tables_to_load.keys(): available_tables.add(omop_table) - available_tables = list(available_tables) - return available_tables + return list(available_tables) def rename_table(self, old_table_name: str, new_table_name: str) -> None: # TODO: use _tables dict instead of self to store tables? 
From 2ad6ec3cbf20d13052216124af7f0d79ea14116d Mon Sep 17 00:00:00 2001 From: Vincent M Date: Mon, 9 Jan 2023 09:22:11 +0100 Subject: [PATCH 07/25] feat: Add the BackendDispatcher object (#20) * add framework object * implement cut for koalas and polish the BackendDispatcher object * chore: flake8 no check E731 * add tests for backend_dispatcher * hotfix framework * add doc and contributing * revert a change on contributing * neat * improve coverage and simplify logic of the framework * update changelog * skip cov checks for custom implem cut Co-authored-by: Thomas PETIT-JEAN --- .flake8 | 2 +- changelog.md | 1 + contributing.md | 18 +- eds_scikit/__init__.py | 8 +- eds_scikit/event/consultations.py | 4 +- eds_scikit/plot/data_quality.py | 40 +- eds_scikit/utils/custom_implem/__init__.py | 0 .../utils/custom_implem/custom_implem.py | 65 +++ eds_scikit/utils/custom_implem/cut.py | 441 ++++++++++++++++++ eds_scikit/utils/framework.py | 314 +++++++++++-- tests/test_age_pyramid.py | 36 +- tests/test_backend_dispatcher.py | 137 ++++++ 12 files changed, 1004 insertions(+), 62 deletions(-) create mode 100644 eds_scikit/utils/custom_implem/__init__.py create mode 100644 eds_scikit/utils/custom_implem/custom_implem.py create mode 100644 eds_scikit/utils/custom_implem/cut.py create mode 100644 tests/test_backend_dispatcher.py diff --git a/.flake8 b/.flake8 index 08333cc0..10fdf64e 100644 --- a/.flake8 +++ b/.flake8 @@ -5,5 +5,5 @@ exclude = __pycache__, *.ipynb, *.ipynb_checkpoints -ignore = E501 W503 +ignore = E501 W503 E731 per-file-ignores = __init__.py:F401 diff --git a/changelog.md b/changelog.md index 8b72c9d4..581e6fbe 100644 --- a/changelog.md +++ b/changelog.md @@ -18,6 +18,7 @@ - Various project metadata - Full CI pipeline - License checker in CI +- BackendDispatcher object to help with pandas / koalas manipulation ### Fixed diff --git a/contributing.md b/contributing.md index 1fdc32e2..b6267ddc 100644 --- a/contributing.md +++ b/contributing.md @@ -30,16 +30,9 @@ $ python -m venv venv $ source venv/bin/activate # Install dependencies and build resources -$ pip install -r requirements.txt +$ pip install -e ".[dev, doc]" -# Install development dependencies -$ pip install -r requirements_dev.txt -$ pip install -r requirements_docs.txt - -# Finally, install the package in editable mode... -$ pip install -e . - -# And switch to a new branch to begin developping +# And switch to a new branch to begin developing $ git switch -c "name_of_my_new_branch" ``` @@ -158,3 +151,10 @@ and automatically reloads the page. !!! warning MkDocs will automaticaly build code documentation by going through every `.py` file located in the `eds_scikit` directory (and sub-arborescence). It expects to find a `__init__.py` file in each directory, so make sure to create one if needed. + + +### Developing your own methods + +Even though the koalas project aims at covering most pandas functions for Spark, there are some discrepancies. For instance, the `pd.cut()` method has no koalas alternative. + +To ease development and switch gears efficiently between the two backends, we advise you to use the [`BackendDispatcher`](../reference/utils/framework) class and its collection of custom methods, as sketched below.
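For instance, a minimal sketch of the dispatched `cut` call introduced by this patch; the toy `person` DataFrame and its `age` column are assumed here for illustration and are not part of the patch:

```python
import numpy as np
import pandas as pd

from eds_scikit.utils.framework import bd

# Toy data, assumed for this example: any DataFrame with an "age" column.
person = pd.DataFrame({"age": [3, 27, 42, 88]})

# Same binning as used in plot_age_pyramid in this patch.
bins = np.arange(0, 100, 10)
labels = [f"{left}-{right}" for left, right in zip(bins[:-1], bins[1:])]

# The dispatcher infers the backend from its inputs:
# pd.cut() is used for a pandas Series, the custom implementation otherwise.
person["age_bins"] = bd.cut(person["age"], bins=bins, labels=labels)
```

With a koalas Series instead, the same `bd.cut(...)` call falls back to the implementation vendored in `eds_scikit/utils/custom_implem/cut.py`, since koalas exposes no native `cut`.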
diff --git a/eds_scikit/__init__.py b/eds_scikit/__init__.py index dd4f6c11..3e5c80e4 100644 --- a/eds_scikit/__init__.py +++ b/eds_scikit/__init__.py @@ -7,7 +7,7 @@ import os import sys import time -from distutils.version import LooseVersion +from packaging import version from typing import List, Tuple from pathlib import Path @@ -61,11 +61,11 @@ def koalas_options() -> None: def set_env_variables() -> None: # From https://github.com/databricks/koalas/blob/master/databricks/koalas/__init__.py - if LooseVersion(pyspark.__version__) < LooseVersion("3.0"): - if LooseVersion(pyarrow.__version__) >= LooseVersion("0.15"): + if version.parse(pyspark.__version__) < version.parse("3.0"): + if version.parse(pyarrow.__version__) >= version.parse("0.15"): os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1" - if LooseVersion(pyarrow.__version__) >= LooseVersion("2.0.0"): + if version.parse(pyarrow.__version__) >= version.parse("2.0.0"): os.environ["PYARROW_IGNORE_TIMEZONE"] = "0" diff --git a/eds_scikit/event/consultations.py b/eds_scikit/event/consultations.py index 8693c097..cc839af8 100644 --- a/eds_scikit/event/consultations.py +++ b/eds_scikit/event/consultations.py @@ -3,7 +3,7 @@ from eds_scikit.utils.checks import concept_checker from eds_scikit.utils.datetime_helpers import substract_datetime -from eds_scikit.utils.framework import add_unique_id, get_framework +from eds_scikit.utils.framework import bd, get_framework from eds_scikit.utils.typing import DataFrame @@ -104,7 +104,7 @@ def get_consultation_dates( dates_per_visit.name = "CONSULTATION_DATE_EXTRACTION" - dates_per_visit = add_unique_id( + dates_per_visit = bd.add_unique_id( dates_per_visit.reset_index(), col_name="TMP_CONSULTATION_ID" ) diff --git a/eds_scikit/plot/data_quality.py b/eds_scikit/plot/data_quality.py index ac8b7268..9ae9ae1c 100644 --- a/eds_scikit/plot/data_quality.py +++ b/eds_scikit/plot/data_quality.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Optional, Tuple +from typing import Tuple import altair as alt import numpy as np @@ -8,13 +8,15 @@ from pandas.core.series import Series from ..utils.checks import check_columns +from ..utils.framework import bd def plot_age_pyramid( person: DataFrame, datetime_ref: datetime = None, + filename: str = None, savefig: bool = False, - filename: Optional[str] = None, + return_vector: bool = False, ) -> Tuple[alt.Chart, Series]: """Plot an age pyramid from a 'person' pandas DataFrame. 
@@ -55,23 +57,36 @@ def plot_age_pyramid( person_ = person.copy() - if datetime_ref: - today = pd.to_datetime(datetime_ref) - else: + if datetime_ref is None: today = datetime.today() - person_["age"] = (today - person_["birth_datetime"]).dt.total_seconds() - person_["age"] /= 365 * 24 * 3600 + else: + today = pd.to_datetime(datetime_ref) + + # TODO: replace with from ..utils.datetime_helpers.substract_datetime + deltas = today - person_["birth_datetime"] + if bd.is_pandas(person_): + deltas = deltas.dt.total_seconds() + + person_["age"] = deltas / (365 * 24 * 3600) + person_ = person_.query("age > 0.0") bins = np.arange(0, 100, 10) labels = [f"{left}-{right}" for left, right in zip(bins[:-1], bins[1:])] - person_["age_bins"] = pd.cut(person_["age"], bins=bins, labels=labels) - person_["age_bins"] = person_["age_bins"].astype(str).str.replace("nan", "90+") + person_["age_bins"] = bd.cut(person_["age"], bins=bins, labels=labels) + + person_["age_bins"] = ( + person_["age_bins"].astype(str).str.lower().str.replace("nan", "90+") + ) person_ = person_.loc[person_["gender_source_value"].isin(["m", "f"])] group_gender_age = person_.groupby(["gender_source_value", "age_bins"])[ "person_id" ].count() + # Convert to pandas to ease plotting. + # Since we have aggregated the data, this operation won't crash. + group_gender_age = bd.to_pandas(group_gender_age) + male = group_gender_age["m"].reset_index() female = group_gender_age["f"].reset_index() @@ -108,5 +123,10 @@ def plot_age_pyramid( if savefig: chart.save(filename) - else: + if return_vector: + return group_gender_age + + if return_vector: return chart, group_gender_age + + return chart diff --git a/eds_scikit/utils/custom_implem/__init__.py b/eds_scikit/utils/custom_implem/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/eds_scikit/utils/custom_implem/custom_implem.py b/eds_scikit/utils/custom_implem/custom_implem.py new file mode 100644 index 00000000..81c5ae2a --- /dev/null +++ b/eds_scikit/utils/custom_implem/custom_implem.py @@ -0,0 +1,65 @@ +import pandas as pd +from databricks import koalas as ks + +from eds_scikit.utils.typing import DataFrame + +from .cut import cut + + +class CustomImplem: + """ + A collection of custom pandas and koalas methods. + + All public facing methods must be stateless and defined as classmethods. + """ + + @classmethod + def add_unique_id( + cls, + obj: DataFrame, + col_name: str = "id", + backend=None, + ) -> DataFrame: + """Add an ID column for koalas or pandas.""" + if backend is pd: + obj[col_name] = range(obj.shape[0]) + return obj + elif backend is ks: + return obj.koalas.attach_id_column(id_type="distributed", column=col_name) + else: + raise NotImplementedError( + f"No method 'add_unique_id' is available for backend '{backend}'." 
+ ) + + @classmethod + def cut( + cls, + x, + bins, + right: bool = True, + labels=None, + retbins: bool = False, + precision: int = 3, + include_lowest: bool = False, + duplicates: str = "raise", + ordered: bool = True, + backend=None, # unused because koalas only + ): + """koalas version of pd.cut + + Notes + ----- + Simplified vendoring from: + https://github.com/pandas-dev/pandas/blob/v1.5.2/pandas/core/reshape/tile.py#L50-L305 + """ + return cut( + x, + bins, + right, + labels, + retbins, + precision, + include_lowest, + duplicates, + ordered, + ) diff --git a/eds_scikit/utils/custom_implem/cut.py b/eds_scikit/utils/custom_implem/cut.py new file mode 100644 index 00000000..85dbc2b5 --- /dev/null +++ b/eds_scikit/utils/custom_implem/cut.py @@ -0,0 +1,441 @@ +from typing import Any, Callable + +import numpy as np +import pandas as pd +import pandas.core.algorithms as algos +from pandas import IntervalIndex, to_datetime, to_timedelta +from pandas._libs import Timedelta, Timestamp +from pandas._libs.lib import infer_dtype +from pandas.core.dtypes.common import ( + DT64NS_DTYPE, + is_categorical_dtype, + is_datetime64_dtype, + is_datetime64tz_dtype, + is_list_like, + is_scalar, + is_timedelta64_dtype, +) + + +def cut( + x, + bins, + right: bool = True, + labels=None, + retbins: bool = False, + precision: int = 3, + include_lowest: bool = False, + duplicates: str = "raise", + ordered: bool = True, +): # pragma: no cover + """ + Bin values into discrete intervals. + + Use `cut` when you need to segment and sort data values into bins. This + function is also useful for going from a continuous variable to a + categorical variable. For example, `cut` could convert ages to groups of + age ranges. Supports binning into an equal number of bins, or a + pre-specified array of bins. + + See original function at: https://github.com/pandas-dev/pandas/blob/v1.5.2/pandas/core/reshape/tile.py#L50-L305 # noqa + + Parameters + ---------- + x : koalas Series. + The input array to be binned. Must be 1-dimensional. + bins : int, sequence of scalars, or IntervalIndex + The criteria to bin by. + * int : Defines the number of equal-width bins in the range of `x`. The + range of `x` is extended by .1% on each side to include the minimum + and maximum values of `x`. + * sequence of scalars : Defines the bin edges allowing for non-uniform + width. No extension of the range of `x` is done. + * IntervalIndex : Defines the exact bins to be used. Note that + IntervalIndex for `bins` must be non-overlapping. + right : bool, default True + Indicates whether `bins` includes the rightmost edge or not. If + ``right == True`` (the default), then the `bins` ``[1, 2, 3, 4]`` + indicate (1,2], (2,3], (3,4]. This argument is ignored when + `bins` is an IntervalIndex. + labels : array or False, default None + Specifies the labels for the returned bins. Must be the same length as + the resulting bins. If False, returns only integer indicators of the + bins. This affects the type of the output container (see below). + This argument is ignored when `bins` is an IntervalIndex. If True, + raises an error. When `ordered=False`, labels must be provided. + retbins : bool, default False + Whether to return the bins or not. Useful when bins is provided + as a scalar. + precision : int, default 3 + The precision at which to store and display the bins labels. + include_lowest : bool, default False + Whether the first interval should be left-inclusive or not. 
+ duplicates : {default 'raise', 'drop'}, optional + If bin edges are not unique, raise ValueError or drop non-uniques. + ordered : bool, default True + Whether the labels are ordered or not. Applies to returned types + Categorical and Series (with Categorical dtype). If True, + the resulting categorical will be ordered. If False, the resulting + categorical will be unordered (labels must be provided). + .. versionadded:: 1.1.0 + Returns + ------- + out : Categorical, Series, or ndarray + An array-like object representing the respective bin for each value + of `x`. The type depends on the value of `labels`. + * None (default) : returns a Series for Series `x` or a + Categorical for all other inputs. The values stored within + are Interval dtype. + * sequence of scalars : returns a Series for Series `x` or a + Categorical for all other inputs. The values stored within + are whatever the type in the sequence is. + * False : returns an ndarray of integers. + bins : numpy.ndarray or IntervalIndex. + The computed or specified bins. Only returned when `retbins=True`. + For scalar or sequence `bins`, this is an ndarray with the computed + bins. If set `duplicates=drop`, `bins` will drop non-unique bin. For + an IntervalIndex `bins`, this is equal to `bins`. + + See Also + -------- + qcut : Discretize variable into equal-sized buckets based on rank + or based on sample quantiles. + Categorical : Array type for storing data that come from a + fixed set of values. + Series : One-dimensional array with axis labels (including time series). + IntervalIndex : Immutable Index implementing an ordered, sliceable set. + + Notes + ----- + Any NA values will be NA in the result. Out of bounds values will be NA in + the resulting Series or Categorical object. + Reference :ref:`the user guide ` for more examples. + + Examples + -------- + Discretize into three equal-sized bins. + + >>> from eds_scikit.utils.framework import bd + >>> bd.cut(ks.Series(np.array([1, 7, 5, 4, 6, 3])), 3) + ... # doctest: +ELLIPSIS + [(0.994, 3.0], (5.0, 7.0], (3.0, 5.0], (3.0, 5.0], (5.0, 7.0], ... + Categories (3, interval[float64, right]): [(0.994, 3.0] < (3.0, 5.0] ... + + >>> bd.cut(ks.Series(np.array([1, 7, 5, 4, 6, 3])), 3, retbins=True) + ... # doctest: +ELLIPSIS + ([(0.994, 3.0], (5.0, 7.0], (3.0, 5.0], (3.0, 5.0], (5.0, 7.0], ... + Categories (3, interval[float64, right]): [(0.994, 3.0] < (3.0, 5.0] ... + array([0.994, 3. , 5. , 7. ])) + + Discovers the same bins, but assign them specific labels. Notice that + the returned Categorical's categories are `labels` and is ordered. + + >>> bd.cut(ks.Series(np.array([1, 7, 5, 4, 6, 3])), + ... 3, labels=["bad", "medium", "good"]) + ['bad', 'good', 'medium', 'medium', 'good', 'bad'] + Categories (3, object): ['bad' < 'medium' < 'good'] + + ``ordered=False`` will result in unordered categories when labels are passed. + This parameter can be used to allow non-unique labels: + + >>> bd.cut(ks.Series(np.array([1, 7, 5, 4, 6, 3])), 3, + ... labels=["B", "A", "B"], ordered=False) + ['B', 'B', 'A', 'A', 'B', 'B'] + Categories (2, object): ['A', 'B'] + + ``labels=False`` implies you just want the bins back. + + >>> bd.cut(ks.Series([0, 1, 1, 2]), bins=4, labels=False) + array([0, 1, 1, 3]) + + Passing a Series as an input returns a Series with categorical dtype: + + >>> s = ks.Series(np.array([2, 4, 6, 8, 10]), + ... index=['a', 'b', 'c', 'd', 'e']) + + >>> bd.cut(s, 3) + ... 
# doctest: +ELLIPSIS + a (1.992, 4.667] + b (1.992, 4.667] + c (4.667, 7.333] + d (7.333, 10.0] + e (7.333, 10.0] + dtype: category + Categories (3, interval[float64, right]): [(1.992, 4.667] < (4.667, ... + + Passing a Series as an input returns a Series with mapping value. + It is used to map numerically to intervals based on bins. + + >>> s = ks.Series(np.array([2, 4, 6, 8, 10]), + ... index=['a', 'b', 'c', 'd', 'e']) + + >>> bd.cut(s, [0, 2, 4, 6, 8, 10], labels=False, retbins=True, right=False) + ... # doctest: +ELLIPSIS + (a 1.0 + b 2.0 + c 3.0 + d 4.0 + e NaN + dtype: float64, + array([ 0, 2, 4, 6, 8, 10])) + + Use `drop` optional when bins is not unique + + >>> bd.cut(s, [0, 2, 4, 6, 10, 10], labels=False, retbins=True, + ... right=False, duplicates='drop') + ... # doctest: +ELLIPSIS + (a 1.0 + b 2.0 + c 3.0 + d 3.0 + e NaN + dtype: float64, + array([ 0, 2, 4, 6, 10])) + + Passing an IntervalIndex for `bins` results in those categories exactly. + Notice that values not covered by the IntervalIndex are set to NaN. 0 + is to the left of the first bin (which is closed on the right), and 1.5 + falls between two bins. + + >>> bins = pd.IntervalIndex.from_tuples([(0, 1), (2, 3), (4, 5)]) + + >>> bd.cut(ks.Series([0, 0.5, 1.5, 2.5, 4.5]), bins) + [NaN, (0.0, 1.0], NaN, (2.0, 3.0], (4.0, 5.0]] + Categories (3, interval[int64, right]): [(0, 1] < (2, 3] < (4, 5]] + """ + if x.ndim != 1: + raise ValueError("x must be 1D") + + x, dtype = x.astype(np.int64), x.dtype + + if not np.iterable(bins): + if is_scalar(bins) and bins < 1: + raise ValueError("`bins` should be a positive integer.") + + try: # for array-like + sz = x.size + except AttributeError: + x = np.asarray(x) + sz = x.size + + if sz == 0: + raise ValueError("Cannot cut empty array") + + mn, mx = x.min(), x.max() + + if np.isinf(mn) or np.isinf(mx): + raise ValueError( + "cannot specify integer `bins` when input data contains infinity" + ) + elif mn == mx: # adjust end points before binning + mn -= 0.001 * abs(mn) if mn != 0 else 0.001 + mx += 0.001 * abs(mx) if mx != 0 else 0.001 + bins = np.linspace(mn, mx, bins + 1, endpoint=True) + else: # adjust end points after binning + bins = np.linspace(mn, mx, bins + 1, endpoint=True) + adj = (mx - mn) * 0.001 # 0.1% of the range + if right: + bins[0] -= adj + else: + bins[-1] += adj + + elif isinstance(bins, IntervalIndex): + if bins.is_overlapping: + raise ValueError("Overlapping IntervalIndex is not accepted.") + + else: + if is_datetime64tz_dtype(bins): + bins = np.asarray(bins, dtype=DT64NS_DTYPE) + else: + bins = np.asarray(bins) + bins = _convert_bin_to_numeric_type(bins, dtype) + + # GH 26045: cast to float64 to avoid an overflow + if (np.diff(bins.astype("float64")) < 0).any(): + raise ValueError("bins must increase monotonically.") + + fac, bins = _bins_to_cuts( + x, + bins, + right=right, + labels=labels, + precision=precision, + include_lowest=include_lowest, + dtype=dtype, + duplicates=duplicates, + ordered=ordered, + ) + + if not retbins: + return fac + + return fac, bins + + +def _convert_bin_to_numeric_type(bins, dtype): # pragma: no cover + """ + if the passed bin is of datetime/timedelta type, + this method converts it to integer + Parameters + ---------- + bins : list-like of bins + dtype : dtype of data + Raises + ------ + ValueError if bins are not of a compat dtype to dtype + """ + bins_dtype = infer_dtype(bins, skipna=False) + if is_timedelta64_dtype(dtype): + if bins_dtype in ["timedelta", "timedelta64"]: + bins = to_timedelta(bins).view(np.int64) + else: + raise 
ValueError("bins must be of timedelta64 dtype") + elif is_datetime64_dtype(dtype) or is_datetime64tz_dtype(dtype): + if bins_dtype in ["datetime", "datetime64"]: + bins = to_datetime(bins).view(np.int64) + else: + raise ValueError("bins must be of datetime64 dtype") + + return bins + + +def _format_labels( + bins, precision: int, right: bool = True, include_lowest: bool = False, dtype=None +): # pragma: no cover + """based on the dtype, return our labels""" + closed = "right" if right else "left" + + formatter: Callable[[Any], Timestamp] | Callable[[Any], Timedelta] + + if is_datetime64tz_dtype(dtype): + formatter = lambda x: Timestamp(x, tz=dtype.tz) + adjust = lambda x: x - Timedelta("1ns") + elif is_datetime64_dtype(dtype): + formatter = Timestamp + adjust = lambda x: x - Timedelta("1ns") + elif is_timedelta64_dtype(dtype): + formatter = Timedelta + adjust = lambda x: x - Timedelta("1ns") + else: + precision = _infer_precision(precision, bins) + formatter = lambda x: _round_frac(x, precision) + adjust = lambda x: x - 10 ** (-precision) + + breaks = [formatter(b) for b in bins] + if right and include_lowest: + # adjust lhs of first interval by precision to account for being right closed + breaks[0] = adjust(breaks[0]) + + return IntervalIndex.from_breaks(breaks, closed=closed) + + +def _searchsorted(x, bins, right): # pragma: no cover + """ + koalas version of np.searchsorted + """ + bins = sorted(bins) + d = dict(zip(bins, range(len(bins)))) + + x = x.to_frame() + col = x.columns[0] + x["idx_bins"] = len(bins) + + for _bin in bins[::-1]: + mask = x[col] <= _bin.item() if right else x[col] < _bin.item() + x.loc[mask, "idx_bins"] = d[_bin] + + ids = x.pop("idx_bins") + x = x[col] + + return ids + + +def _bins_to_cuts( + x, + bins: np.ndarray, + right: bool = True, + labels=None, + precision: int = 3, + include_lowest: bool = False, + dtype=None, + duplicates: str = "raise", + ordered: bool = True, +): # pragma: no cover + """ + koalas version of pandas.core.reshape.tile._bins_to_cuts + """ + bins = np.unique(bins) + ids = _searchsorted(x, bins, right) + na_mask = x.isna() | (ids == len(bins)) | (ids == 0) + + # hack to bypass "TypeError: 'Series' object does not support item assignment" + ids = ids.to_frame() + ids.loc[na_mask] = 0 + ids = ids[ids.columns[0]] + + if labels: + if not (labels is None or is_list_like(labels)): + raise ValueError( + "Bin labels must either be False, None or passed in as a " + "list-like argument" + ) + elif labels is None: + labels = _format_labels( + bins, precision, right=right, include_lowest=include_lowest, dtype=dtype + ) + elif ordered and len(set(labels)) != len(labels): + raise ValueError( + "labels must be unique if ordered=True; pass ordered=False " + "for duplicate labels" + ) + else: + if len(labels) != len(bins) - 1: + raise ValueError( + "Bin labels must be one fewer than the number of bin edges" + ) + if not is_categorical_dtype(labels): + labels = pd.Categorical( + labels, + categories=labels if len(set(labels)) == len(labels) else None, + ordered=ordered, + ) + + label_mapping = dict(zip(range(len(labels)), labels)) + # x values outside of bins edges (i.e. 
when ids = 0) are mapped to NaN + result = (ids - 1).map(label_mapping) + result.fillna(np.nan, inplace=True) + + else: + result = ids - 1 + # hack to bypass "TypeError: 'Series' object does not support item assignment" + result = result.to_frame() + result.loc[na_mask] = np.nan + result = result[result.columns[0]] + + return result, bins + + +def _infer_precision(base_precision: int, bins) -> int: # pragma: no cover + """ + Infer an appropriate precision for _round_frac + """ + for precision in range(base_precision, 20): + levels = [_round_frac(b, precision) for b in bins] + if algos.unique(levels).size == bins.size: + return precision + return base_precision # default + + +def _round_frac(x, precision: int): # pragma: no cover + """ + Round the fractional part of the given number + """ + if not np.isfinite(x) or x == 0: + return x + else: + frac, whole = np.modf(x) + if whole == 0: + digits = -int(np.floor(np.log10(abs(frac)))) - 1 + precision + else: + digits = precision + return np.around(x, digits) diff --git a/eds_scikit/utils/framework.py b/eds_scikit/utils/framework.py index ecb8c299..af8b6d3b 100644 --- a/eds_scikit/utils/framework.py +++ b/eds_scikit/utils/framework.py @@ -1,45 +1,50 @@ +from collections import Counter +from functools import partial from types import ModuleType from typing import Dict, Optional -import pandas as _pandas -from databricks import koalas as _koalas +import pandas as pd +from databricks import koalas as ks +from loguru import logger -from eds_scikit.utils.typing import DataFrame, DataObject +from eds_scikit.utils.typing import DataObject -VALID_FRAMEWORKS = { - "pandas": _pandas, - "koalas": _koalas, -} +from .custom_implem.custom_implem import CustomImplem +VALID_FRAMEWORKS = [pd, ks] -def get_framework(obj: DataObject) -> Optional[ModuleType]: - for _, framework in VALID_FRAMEWORKS.items(): + +# TODO: All non class-methods functions below need to be remove + + +def get_framework(obj: DataObject) -> Optional[ModuleType]: # pragma: no cover + for framework in VALID_FRAMEWORKS: if obj.__class__.__module__.startswith(framework.__name__): return framework # raise ValueError(f"Object from unknown framework: {obj}") return None -def is_pandas(obj: DataObject) -> bool: - return get_framework(obj) == _pandas +def is_pandas(obj: DataObject) -> bool: # pragma: no cover + return get_framework(obj) == pd -def is_koalas(obj: DataObject) -> bool: - return get_framework(obj) == _koalas +def is_koalas(obj: DataObject) -> bool: # pragma: no cover + return get_framework(obj) == ks -def to(framework: str, obj: DataObject) -> DataObject: - possible_values = set(VALID_FRAMEWORKS.keys()).union(VALID_FRAMEWORKS.values()) - assert framework in possible_values - if framework == "koalas" or framework is _koalas: +def to(framework: str, obj: DataObject) -> DataObject: # pragma: no cover + if framework == "koalas" or framework is ks: return koalas(obj) - elif framework == "pandas" or framework is _pandas: + elif framework == "pandas" or framework is pd: return pandas(obj) else: raise ValueError(f"Unknown framework: {framework}") -def dict_to(framework: str, d: Dict[str, DataObject]) -> Dict[str, DataObject]: +def dict_to( + framework: str, d: Dict[str, DataObject] +) -> Dict[str, DataObject]: # pragma: no cover d_converted = dict() for k, v in d.items(): if is_pandas(v) or is_koalas(v): @@ -49,8 +54,8 @@ def dict_to(framework: str, d: Dict[str, DataObject]) -> Dict[str, DataObject]: return d_converted -def pandas(obj: DataObject) -> DataObject: - if get_framework(obj) is 
_pandas: +def pandas(obj: DataObject) -> DataObject: # pragma: no cover + if get_framework(obj) is pd: return obj try: return obj.to_pandas() @@ -59,8 +64,8 @@ def pandas(obj: DataObject) -> DataObject: raise ValueError("Could not convert object to pandas.") -def koalas(obj: DataObject) -> DataObject: - if get_framework(obj) is _koalas: +def koalas(obj: DataObject) -> DataObject: # pragma: no cover + if get_framework(obj) is ks: return obj try: return obj.to_koalas() @@ -68,13 +73,260 @@ def koalas(obj: DataObject) -> DataObject: pass # will raise ValueError if impossible - return _koalas.from_pandas(obj) + return ks.from_pandas(obj) -def add_unique_id(obj: DataFrame, col_name: str = "id") -> DataFrame: - fw = get_framework(obj) - if fw == _pandas: - obj[col_name] = range(len(obj)) - return obj - else: - return obj.koalas.attach_id_column(id_type="distributed", column=col_name) +class BackendDispatcher: + """Dispatcher between pandas, koalas and custom methods. + + In addition to the methods below, use the `BackendDispatcher` class + to access the custom functions defined in [`CustomImplem`](../custom_implem/custom_implem). + + Examples + -------- + + Use a dispatcher function + + >>> from eds_scikit.utils.framework import bd + >>> bd.is_pandas(pd.DataFrame()) + True + + Use a custom implemented function + + >>> df = pd.DataFrame({"categ": ["a", "b", "c"]}) + >>> bd.add_unique_id(df, col_name="id") + categ id + 0 a 0 + 1 b 1 + 2 c 2 + """ + + def get_backend(self, obj) -> Optional[ModuleType]: + """Return the backend of a given object. + + Parameters + ---------- + obj: DataFrame or backend module among pandas or koalas. + + Returns + ------- + backend: a backend among {pd, ks} or None + + Examples + -------- + + Get the backend from a DataFrame and create another DataFrame from it. + This is especially useful at runtime, when you need to infer the + backend of the input. + + >>> backend = bd.get_backend(pd.DataFrame()) + >>> backend + + >>> df = backend.DataFrame() + + >>> bd.get_backend(ks.DataFrame()) + + + For demo purposes, return the backend when provided directly + + >>> bd.get_backend(ks) + + >>> bd.get_backend(spark) + None + """ + if isinstance(obj, str): + return { + "pd": pd, + "pandas": pd, + "ks": ks, + "koalas": ks, + }.get(obj) + + for backend in VALID_FRAMEWORKS: + if ( + obj.__class__.__module__.startswith(backend.__name__) # DataFrame() + or getattr(obj, "__name__", None) == backend.__name__ # pd or ks + ): + return backend + return None + + def is_pandas(self, obj) -> bool: + """Return True when the obj is either a pd.DataFrame or the pandas module.""" + return self.get_backend(obj) is pd + + def is_koalas(self, obj: DataObject) -> bool: + """Return True when the obj is either a ks.DataFrame or the koalas module.""" + return self.get_backend(obj) is ks + + def to(self, obj, backend): + """Convert a dataframe to the provided backend. + + Parameters + ---------- + obj: DataFrame or iterable of DataFrame (list, tuple, dict) + The object(s) to convert to the provided backend + + backend: str, DataFrame or pandas, koalas module + The desired output backend. + + Returns + ------- + out: DataFrame or iterabel of DataFrame (list, tuple, dict) + The converted object, in the same format as provided in input. 
+ + Examples + -------- + + Convert a single DataFrame + + >>> df = pd.DataFrame({"a": [1, 2]}) + >>> kdf = bd.to(df, backend="koalas") + >>> type(kdf) + databricks.koalas.frame.DataFrame + + Convert a list of DataFrame + + >>> extra_kdf = ks.DataFrame({"b": [0, 1]}) + >>> another_kdf = ks.DataFrame({"c": [2, 3]}) + >>> kdf_list = [kdf, extra_kdf, another_kdf] + >>> df_list = bd.to(kdf_list, backend="pandas") + >>> type(df_list) + list + >>> len(df_list) + 3 + >>> type(df_list[0]) + pandas.core.frame.DataFrame + + Convert a dictionnary of DataFrame + + >>> df_dict = {"df_1": pd.DataFrame({"a": [1, 2]}), "df_2": pd.DataFrame({"a": [2, 3]})} + >>> kdf_dict = bd.to(df_dict, backend="koalas") + >>> type(kdf_dict) + dict + >>> kdf_dict.keys() + dict_keys(["df_1", "df_2"]) + >>> type(kdf_dict["df_1"]) + databricks.koalas.frame.DataFrame + """ + if isinstance(obj, (list, tuple)): + results = [] + for _obj in obj: + results.append(self.to(_obj, backend)) + return results + + if isinstance(obj, dict): + results = {} + for k, _obj in obj.items(): + results[k] = self.to(_obj, backend) + return results + + backend = self.get_backend(backend) + + if self.is_pandas(backend): + return self.to_pandas(obj) + elif self.is_koalas(backend): + return self.to_koalas(obj) + else: + raise ValueError("Unknown backend") + + def to_pandas(self, obj: DataObject) -> DataObject: + if self.get_backend(obj) is pd: + return obj + return obj.to_pandas() + + def to_koalas(self, obj: DataObject) -> DataObject: + if self.get_backend(obj) is ks: + return obj + try: + return obj.to_koalas() + except AttributeError: + pass + # Will raise ValueError if impossible + return ks.from_pandas(obj) + + def __getattr__(self, method): + # Any method that doesn't belong directly to `BackendDispatcher` will + # be picked by __getattr__. + # + # `__getattr__` returns the self.get_params function, wrapped with + # the queried `method` as a parameter. + # + # This way, `get_param` will be able to call `method`, dispatched between + # the desired backend, with the args and kwargs initially provided. + return partial(self.get_params, method) + + def get_params(self, method, *args, backend=None, **kwargs): + # This method should only be called by `__getattr__`. + # + # `get_params` dispatches the call to a backend (pandas or koalas) + # chosen with simple heuristics. + if backend is not None: + backend = self.get_backend(backend) + if backend is None: + raise ValueError("Unknown backend") + else: + backend = self._get_backend_from_params( + *args, **kwargs + ) or self._get_backend_from_method(method) + + if method in dir(backend): + # Use the native method + return getattr(backend, method)(*args, **kwargs) + elif method in dir(CustomImplem): + # Use our implementation + return getattr(CustomImplem, method)(*args, backend=backend, **kwargs) + else: + raise NotImplementedError( + f"Method '{method}' doesn't belong to pandas or koalas " + f"and is not implemented in eds_scikit yet." 
+ ) + + def _get_backend_from_params(self, *args, **kwargs): + counter = Counter() + all_args = [*args, *kwargs.values()] + self._count_backend_from_args(all_args, counter) + if bool(counter["ks"]) and bool(counter["pd"]): + raise ValueError( + "Inputs have mixed types of Koalas and Pandas dataframes," + "which is not supported.\n" + "Please convert your dataframes using fw.to_pandas(df) or fw.to_koalas(df)" + ) + elif counter["pd"]: + return pd + elif counter["ks"]: + return ks + else: + return None + + def _count_backend_from_args(self, all_args, counter): + for arg in all_args: + if isinstance(arg, (list, set, tuple)): + self._count_backend_from_args(arg, counter) + counter["pd"] += self.is_pandas(arg) + counter["ks"] += self.is_koalas(arg) + + def _get_backend_from_method(self, method): + backends = [] + for backend in VALID_FRAMEWORKS: + methods = [ + d for d in dir(backend) if ("__" not in d) and (not d.startswith("_")) + ] + if method in methods: + backends.append(backend) + + if len(backends) == 0: + return None + + if len(backends) > 1: + logger.warning( + f"Both Pandas and Koalas have method {method}." + "Pandas will be used by default." + "You can change this behaviour by setting 'backend':\n\n" + f" fw.{method}(..., backend='koalas', ...)" + ) + return pd + + return backends[0] + + +bd = BackendDispatcher() diff --git a/tests/test_age_pyramid.py b/tests/test_age_pyramid.py index 6edb233b..98b6c7f0 100644 --- a/tests/test_age_pyramid.py +++ b/tests/test_age_pyramid.py @@ -2,6 +2,7 @@ from pathlib import Path import altair as alt +import numpy as np import pytest from pandas.core.series import Series from pandas.testing import assert_frame_equal @@ -12,23 +13,48 @@ data = load_person() -def test_plot_age_pyramid(): +@pytest.mark.parametrize( + "datetime_ref", + [ + None, + datetime(2020, 1, 1), + np.full(data.person.shape[0], datetime(2020, 1, 1)), + ], +) +def test_age_pyramid_datetime_ref_format(datetime_ref): original_person = data.person.copy() - datetime_ref = datetime(2020, 1, 1) - chart, group_gender_age = plot_age_pyramid(data.person, datetime_ref, savefig=False) + chart = plot_age_pyramid( + data.person, datetime_ref, savefig=False, return_vector=False + ) assert isinstance(chart, alt.vegalite.v4.api.ConcatChart) - assert isinstance(group_gender_age, Series) # Check that the data is unchanged assert_frame_equal(original_person, data.person) + +def test_age_pyramid_output(): + filename = "test.html" - _ = plot_age_pyramid(data.person, savefig=True, filename=filename) + plot_age_pyramid(data.person, savefig=True, filename=filename) path = Path(filename) assert path.exists() path.unlink() + group_gender_age = plot_age_pyramid( + data.person, savefig=True, return_vector=True, filename=filename + ) + assert isinstance(group_gender_age, Series) + + chart, group_gender_age = plot_age_pyramid( + data.person, savefig=False, return_vector=True + ) + assert isinstance(chart, alt.vegalite.v4.api.ConcatChart) + assert isinstance(group_gender_age, Series) + + chart = plot_age_pyramid(data.person, savefig=False, return_vector=False) + assert isinstance(chart, alt.vegalite.v4.api.ConcatChart) + with pytest.raises(ValueError, match="You have to set a filename"): _ = plot_age_pyramid(data.person, savefig=True, filename=None) diff --git a/tests/test_backend_dispatcher.py b/tests/test_backend_dispatcher.py new file mode 100644 index 00000000..64fa678c --- /dev/null +++ b/tests/test_backend_dispatcher.py @@ -0,0 +1,137 @@ +import logging +from datetime import datetime + +import numpy as np 
+import pandas as pd +import pyspark +import pytest +from databricks import koalas as ks +from numpy.testing import assert_array_equal +from pandas.testing import assert_frame_equal + +import eds_scikit +from eds_scikit.datasets.synthetic.person import load_person +from eds_scikit.utils.framework import bd + +spark, sc, sql = eds_scikit.improve_performances() + +data = load_person() +df = data.person + + +def assert_df_is_pandas(df): + assert df.__class__.__module__.startswith(pd.__name__) + + +def assert_df_is_koalas(df): + assert df.__class__.__module__.startswith(ks.__name__) + + +def test_get_backend(): + assert bd.get_backend(pd) is pd + assert bd.get_backend(ks) is ks + + assert bd.get_backend(pd.DataFrame()) is pd + assert bd.get_backend(ks.DataFrame()) is ks + + assert bd.get_backend(pyspark) is None + assert bd.get_backend(np.array([])) is None + assert bd.get_backend([pd.DataFrame()]) is None + assert bd.get_backend([ks.DataFrame()]) is None + + +def test_is_pandas_is_koalas(): + assert bd.is_pandas(pd) + assert not bd.is_pandas(ks) + + assert bd.is_koalas(ks) + assert not bd.is_koalas(pd) + + +def test_to(): + kdf = ks.from_pandas(df) + + assert_df_is_pandas(bd.to(df, backend=pd)) + assert_df_is_koalas(bd.to(df, backend=ks)) + + assert_df_is_pandas(bd.to(kdf, backend="pandas")) + assert_df_is_koalas(bd.to(kdf, backend="koalas")) + + kdf_list = bd.to([df, df, df], backend="koalas") + assert all([bd.is_koalas(_kdf) for _kdf in kdf_list]) + + df_tuple = bd.to((kdf, kdf, kdf), backend="pandas") + assert all([bd.is_pandas(_df) for _df in df_tuple]) + + df_dict = bd.to({"df_1": kdf, "df_2": kdf}, backend="pandas") + assert df_dict.keys() == {"df_1", "df_2"} + assert all([bd.is_pandas(el) for el in df_dict.values()]) + + with pytest.raises(ValueError, match="Unknown backend"): + bd.to(df, backend="spark") + + +@pytest.mark.parametrize("backend", ["pd", pd, df]) +def test_get_params_pd_backend(backend): + + today = datetime.today() + deltas = today - df["birth_datetime"] + deltas = deltas.dt.total_seconds() + df["age"] = deltas / (365 * 24 * 3600) + + bins = np.arange(0, 100, 10) + labels = [f"{left}-{right}" for left, right in zip(bins[:-1], bins[1:])] + age_bins_ref = pd.cut(df["age"], bins=bins, labels=labels) + + age_bins = bd.cut(df["age"], bins=bins, labels=labels, backend=backend) + assert_array_equal(age_bins_ref.astype(str), age_bins.astype(str)) + + +@pytest.mark.parametrize("backend", ["ks", ks]) +def test_get_params_ks_backend(backend): + + df = load_person().person + + kdf = ks.from_pandas(df) + kdf = bd.add_unique_id(kdf, backend=backend) + + df = bd.add_unique_id(df, backend=pd) + assert_array_equal(kdf["id"].to_pandas(), df["id"]) + + with pytest.raises(ValueError, match="Unknown backend"): + bd.add_unique_id(kdf, backend="spark") + + +def test_get_params_from_params(): + expected_result = pd.concat([df, df], axis=0) + result = bd.concat([df, df], axis=0) + assert_frame_equal(expected_result, result) + + kdf = ks.from_pandas(df) + expected_result = ks.concat([kdf, kdf], axis=0).to_pandas() + result = bd.concat([kdf, kdf], axis=0).to_pandas() + assert_frame_equal(expected_result, result) + + with pytest.raises(ValueError): + # mixing backend is not supported + bd.concat([df, kdf], axis=0) + + +def test_get_params_from_method(caplog): + + with caplog.at_level(logging.WARNING): + # Both pd.isna and ks.isna exist, + # so not providing a "backend" argument will + # raise a warning. 
+ bd.isna(1) + + assert bd.isna(1, backend="pandas") == bd.isna(1, backend="koalas") + + msg = ( + "Method 'optimize' doesn't belong to pandas or koalas " + "and is not implemented in eds_scikit yet." + ) + with pytest.raises(NotImplementedError, match=msg): + bd.optimize() + + bd.value_counts(list("scikit")) From 46286b3024bf390c49b8111a0e5a54d97d933849 Mon Sep 17 00:00:00 2001 From: Vincent M Date: Mon, 9 Jan 2023 11:09:47 +0100 Subject: [PATCH 08/25] fix i2b2_renaming --- eds_scikit/io/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eds_scikit/io/settings.py b/eds_scikit/io/settings.py index 1fa9c353..56d7e146 100644 --- a/eds_scikit/io/settings.py +++ b/eds_scikit/io/settings.py @@ -308,7 +308,7 @@ "location_cd": "care_site_source_value", "sourcesystem_cd": "cdm_source", }, - "concept": {"cd_concept": "concept_id", "lib_concept": "concept_name"}, + "concept": {"concept_cd": "concept_id", "lib_concept": "concept_name"}, } From e3c863ea70e06ad665565248c2481a9511df0cba Mon Sep 17 00:00:00 2001 From: Vincent M Date: Mon, 9 Jan 2023 11:09:47 +0100 Subject: [PATCH 09/25] fix i2b2_renaming --- eds_scikit/io/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eds_scikit/io/settings.py b/eds_scikit/io/settings.py index 56d7e146..277a6812 100644 --- a/eds_scikit/io/settings.py +++ b/eds_scikit/io/settings.py @@ -308,7 +308,7 @@ "location_cd": "care_site_source_value", "sourcesystem_cd": "cdm_source", }, - "concept": {"concept_cd": "concept_id", "lib_concept": "concept_name"}, + "concept": {"concept_cd": "concept_id", "name_char": "concept_name"}, } From 4215299bcc576a1c08561c2f0ac3511396e1df52 Mon Sep 17 00:00:00 2001 From: Vincent M Date: Mon, 9 Jan 2023 17:02:43 +0100 Subject: [PATCH 10/25] i2b2_renaming --- eds_scikit/io/settings.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/eds_scikit/io/settings.py b/eds_scikit/io/settings.py index 277a6812..7a0ace5e 100644 --- a/eds_scikit/io/settings.py +++ b/eds_scikit/io/settings.py @@ -211,25 +211,25 @@ i2b2_tables = { "edsprod": { "visit_occurrence": "i2b2_orbis_visit_dim", - "note": "i2b2_observation_fact_document", + "note_deid": "i2b2_observation_fact_document", "person": "i2b2_patient_dim", "condition_occurrence": "i2b2_arem_observation_fact_cim10", "procedure_occurrence": "i2b2_arem_observation_fact_ccam", "care_site": "i2b2_observation_fact_ufr", "visit_detail": "i2b2_observation_fact_ufr", - "biology": "i2b2_observation_fact_lab", + "measurement": "i2b2_observation_fact_lab", "fact_relationship": "i2b2_observation_fact_ufr", "concept": "orbis_form_ref_concept_list", }, "cse": { "visit_occurrence": "i2b2_visit", - "note": "i2b2_observation_doc", + "note_deid": "i2b2_observation_doc", "person": "i2b2_patient", "condition_occurrence": "i2b2_observation_cim10", "procedure_occurrence": "i2b2_observation_ccam", "care_site": "i2b2_observation_ufr", "visit_detail": "i2b2_observation_ufr", - "biology": "i2b2_observation_lab", + "measurement": "i2b2_observation_lab", "fact_relationship": "i2b2_observation_ufr", "concept": "i2b2_concept", }, @@ -238,7 +238,7 @@ # Mapping between i2b2 and OMOP i2b2_renaming = { - "note": { + "note_deid": { "instance_num": "note_id", "start_date": "note_datetime", "concept_cd": "note_class_source_value", @@ -295,7 +295,7 @@ "sourcesystem_cd": "cdm_source", "i2b2_action": "row_status_source_value", }, - "biology": { + "measurement": { "patient_num": "person_id", "encounter_num": "visit_occurrence_id", "instance_num": 
"biology_occurrence_id", From 8ac0fe3eba4c6b50399614e41bb74accbce38ffc Mon Sep 17 00:00:00 2001 From: Matthieu Doutreligne Date: Sun, 22 Jan 2023 18:00:29 +0000 Subject: [PATCH 11/25] udated mapping for i2b2 --- eds_scikit/io/hive.py | 6 +- eds_scikit/io/i2b2_mapping.py | 50 ++++++++++--- eds_scikit/io/settings.py | 132 ++++++++++++++++++---------------- 3 files changed, 113 insertions(+), 75 deletions(-) diff --git a/eds_scikit/io/hive.py b/eds_scikit/io/hive.py index da6f16aa..1694de22 100644 --- a/eds_scikit/io/hive.py +++ b/eds_scikit/io/hive.py @@ -123,8 +123,8 @@ def __init__( self.database_source = "cse" if "cse" in self.database_name else "edsprod" self.omop_to_i2b2 = settings.i2b2_tables[self.database_source] self.i2b2_to_omop = defaultdict(list) - for omop_col, i2b2_col in self.omop_to_i2b2.items(): - self.i2b2_to_omop[i2b2_col].append(omop_col) + for omop_table, i2b2_table in self.omop_to_i2b2.items(): + self.i2b2_to_omop[i2b2_table].append(omop_table) self.person_ids = self._prepare_person_ids(person_ids) @@ -163,6 +163,8 @@ def list_available_tables(self) -> List[str]: for omop_table in self.i2b2_to_omop[table_name]: if omop_table in self.tables_to_load.keys(): available_tables.add(omop_table) + if self.database_type=="I2B2": + available_tables = available_tables | set([omop for omop,i2b2 in self.omop_to_i2b2.items() if omop is not None]) return list(available_tables) def rename_table(self, old_table_name: str, new_table_name: str) -> None: diff --git a/eds_scikit/io/i2b2_mapping.py b/eds_scikit/io/i2b2_mapping.py index 0b680f33..dc566fca 100644 --- a/eds_scikit/io/i2b2_mapping.py +++ b/eds_scikit/io/i2b2_mapping.py @@ -1,9 +1,12 @@ from typing import Dict +import pandas as pd + from pyspark.sql import DataFrame as SparkDataFrame from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.functions import udf as FunctionUDF +import pyspark.sql.types as T from .settings import ( dict_code_UFR, @@ -33,13 +36,16 @@ def get_i2b2_table( Spark DataFrame extracted from the i2b2 database given and converted to OMOP standard. """ - table_name = i2b2_tables[db_source][table] - columns = i2b2_renaming[table] - if db_source == "cse": - columns.pop("i2b2_action", None) - query = ",".join([f"{k} AS {v}" for k, v in columns.items()]) + table_name = i2b2_tables[db_source][table] # I2B2 table + columns = i2b2_renaming.get(table) # Dictionary of omop_col -> i2b2_col + + if columns is not None: # Can be None if creating a table from scratch (e.g. 
concept_relationship + available_columns = set(spark_session.sql(f"SELECT * FROM {db_name}.{table_name}").columns) + if db_source == "cse": + columns.pop("i2b2_action", None) + query = ",".join([f"{i2b2} AS {omop}" for omop, i2b2 in columns.items() if i2b2 in available_columns]) - df = spark_session.sql(f"""SELECT {query} FROM {db_name}.{table_name}""") + df = spark_session.sql(f"""SELECT {query} FROM {db_name}.{table_name}""") # Special mapping for i2b2 : @@ -135,12 +141,28 @@ def get_i2b2_table( df = df.withColumn("visit_detail_type_source_value", F.lit("PASS")) df = df.withColumn("row_status_source_value", F.lit("Actif")) - # biology - elif table == "biology": + # measurement + elif table == "measurement": df = df.withColumn( - "biology_source_value", F.substring(F.col("biology_source_value"), 5, 20) + "measurement_source_concept_id", F.substring(F.col("measurement_source_concept_id"), 5, 20) + ).withColumn("row_status_source_value", F.lit("Validé")) + + # concept + elif table == "concept": + df = df.withColumn( + "concept_id", F.substring(F.col("concept_source_value"), 5, 20) # TODO: use regexp_extract to take substring after ':' + ).withColumn("concept_code", F.col("concept_id")).withColumn( + "vocabulary_id", F.lit("ANABIO") ) - + + # Adding LOINC + loinc_pd = pd.read_csv("~/Thomas/concept_loinc.csv") + assert len(loinc_pd.columns) == len(df.columns) + loinc_pd = loinc_pd[df.columns] # for columns ordering + df = df.union( + spark_session.createDataFrame(loinc_pd, df.schema, verifySchema=False) + ).cache() + # fact_relationship elif table == "fact_relationship": # Retrieve UF information @@ -158,6 +180,14 @@ def get_i2b2_table( # Only UF-Hospital relationships in i2b2 df = df.withColumn("relationship_concept_id", F.lit(46233688)) # Included in + elif table == "concept_relationship": + df_pd = pd.read_csv("~/Thomas/concept_relationship.csv") + schema = T.StructType([ + T.StructField('concept_id_1', T.StringType(), True), + T.StructField('concept_id_2', T.StringType(), True), + T.StructField('relationship_id', T.StringType(), True) + ]) + df = spark_session.createDataFrame(df_pd,schema).cache() return df diff --git a/eds_scikit/io/settings.py b/eds_scikit/io/settings.py index 7a0ace5e..8e1d7bdb 100644 --- a/eds_scikit/io/settings.py +++ b/eds_scikit/io/settings.py @@ -232,86 +232,92 @@ "measurement": "i2b2_observation_lab", "fact_relationship": "i2b2_observation_ufr", "concept": "i2b2_concept", + "concept_relationship": None, }, } # Mapping between i2b2 and OMOP i2b2_renaming = { - "note_deid": { - "instance_num": "note_id", - "start_date": "note_datetime", - "concept_cd": "note_class_source_value", - "encounter_num": "visit_occurrence_id", - "patient_num": "person_id", - "observation_blob": "note_text", - "sourcesystem_cd": "cdm_source", - "i2b2_action": "row_status_source_value", + "care_site": {"care_site_source_value": "location_cd"}, + "concept": { + "concept_id": "concept_cd", + "concept_name": "name_char", + "concept_source_value": "concept_cd", }, - "person": { - "patient_num": "person_id", - "birth_date": "birth_datetime", - "death_date": "death_datetime", - "sex_cd": "gender_source_value", - "sourcesystem_cd": "cdm_source", + "condition_occurrence": { + "cdm_source": "sourcesystem_cd", + "condition_occurrence_id": "instance_num", + "condition_source_value": "concept_cd", + "condition_start_datetime": "start_date", + "condition_status_source_value": "tval_char", + "person_id": "patient_num", + "visit_occurrence_id": "encounter_num", }, - "visit_occurrence": { - "patient_num": 
"person_id", - "encounter_num": "visit_occurrence_id", - "visit_blob": "visit_occurrence_source_value", - "start_date": "visit_start_datetime", - "end_date": "visit_end_datetime", - "sourcesystem_cd": "cdm_source", - "type_visite": "visit_source_value", - "mode_entree": "admitted_from_source_value", - "mode_sortie": "discharge_to_source_value", - "i2b2_action": "row_status_source_value", + "fact_relationship": { + "care_site_source_value": "location_cd", + "cdm_source": "sourcesystem_cd", }, - "condition_occurrence": { - "patient_num": "person_id", - "encounter_num": "visit_occurrence_id", - "instance_num": "condition_occurrence_id", - "start_date": "condition_start_datetime", - "concept_cd": "condition_source_value", - "tval_char": "condition_status_source_value", - "sourcesystem_cd": "cdm_source", + "measurement": { + "cdm_source": "sourcesystem_cd", + "measurement_date": "start_date", + "measurement_datetime": "start_date", + "measurement_id": "instance_num", + "measurement_source_concept_id": "concept_cd", + "person_id": "patient_num", + "unit_source_value": "units_cd", + "value_as_number": "nval_num", + "visit_occurrence_id": "encounter_num", + }, + "note_deid": { + "cdm_source": "sourcesystem_cd", + "note_class_source_value": "concept_cd", + "note_datetime": "start_date", + "note_id": "instance_num", + "note_text": "observation_blob", + "person_id": "patient_num", + "row_status_source_value": "i2b2_action", + "visit_occurrence_id": "encounter_num", + }, + "person": { + "birth_datetime": "birth_date", + "cdm_source": "sourcesystem_cd", + "death_datetime": "death_date", + "gender_source_value": "sex_cd", + "person_id": "patient_num", }, "procedure_occurrence": { - "patient_num": "person_id", - "encounter_num": "visit_occurrence_id", - "instance_num": "procedure_occurrence_id", - "start_date": "procedure_start_datetime", - "concept_cd": "procedure_source_value", - "sourcesystem_cd": "cdm_source", + "cdm_source": "sourcesystem_cd", + "person_id": "patient_num", + "procedure_occurrence_id": "instance_num", + "procedure_source_value": "concept_cd", + "procedure_start_datetime": "start_date", + "visit_occurrence_id": "encounter_num", }, - "care_site": {"location_cd": "care_site_source_value"}, "visit_detail": { - "instance_num": "visit_detail_id", - "encounter_num": "visit_occurrence_id", - "location_cd": "care_site_id", - "patient_num": "person_id", - "start_date": "visit_detail_start_datetime", - "end_date": "visit_detail_end_datetime", - "sourcesystem_cd": "cdm_source", - "i2b2_action": "row_status_source_value", - }, - "measurement": { - "patient_num": "person_id", - "encounter_num": "visit_occurrence_id", - "instance_num": "biology_occurrence_id", - "start_date": "biology_start_datetime", - "concept_cd": "biology_source_value", - "sourcesystem_cd": "cdm_source", - "nval_num": "biology_status_source_value", + "care_site_id": "location_cd", + "cdm_source": "sourcesystem_cd", + "person_id": "patient_num", + "row_status_source_value": "i2b2_action", + "visit_detail_end_datetime": "end_date", + "visit_detail_id": "instance_num", + "visit_detail_start_datetime": "start_date", + "visit_occurrence_id": "encounter_num", }, - "fact_relationship": { - "location_cd": "care_site_source_value", - "sourcesystem_cd": "cdm_source", + "visit_occurrence": { + "admitted_from_source_value": "mode_entree", + "cdm_source": "sourcesystem_cd", + "discharge_to_source_value": "mode_sortie", + "person_id": "patient_num", + "row_status_source_value": "i2b2_action", + "visit_end_datetime": "end_date", + 
"visit_occurrence_id": "encounter_num", + "visit_occurrence_source_value": "visit_blob", + "visit_source_value": "type_visite", + "visit_start_datetime": "start_date", }, - "concept": {"concept_cd": "concept_id", "name_char": "concept_name"}, } - sex_cd_mapping = {"W": "f", "M": "m"} From e00f7e50038b49bd657c2a714b9d5e2b440079f8 Mon Sep 17 00:00:00 2001 From: Matthieu Doutreligne Date: Sun, 22 Jan 2023 18:19:04 +0000 Subject: [PATCH 12/25] fix concept_source_value in i2b2 mapping --- eds_scikit/io/i2b2_mapping.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/eds_scikit/io/i2b2_mapping.py b/eds_scikit/io/i2b2_mapping.py index dc566fca..47300f9c 100644 --- a/eds_scikit/io/i2b2_mapping.py +++ b/eds_scikit/io/i2b2_mapping.py @@ -150,8 +150,12 @@ def get_i2b2_table( # concept elif table == "concept": df = df.withColumn( - "concept_id", F.substring(F.col("concept_source_value"), 5, 20) # TODO: use regexp_extract to take substring after ':' - ).withColumn("concept_code", F.col("concept_id")).withColumn( + "concept_source", F.substring(F.col("concept_source_value"), 5, 20) # TODO: use regexp_extract to take substring after ':' + ).withColumn( + "concept_id", "concept_source_value" + ).withColumn( + "concept_code", F.col("concept_id") + ).withColumn( "vocabulary_id", F.lit("ANABIO") ) From 53f6e7658f9b0d261db961594c08df655a575519 Mon Sep 17 00:00:00 2001 From: Matthieu Doutreligne Date: Sun, 22 Jan 2023 18:27:20 +0000 Subject: [PATCH 13/25] typo --- eds_scikit/io/i2b2_mapping.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eds_scikit/io/i2b2_mapping.py b/eds_scikit/io/i2b2_mapping.py index 47300f9c..380cca82 100644 --- a/eds_scikit/io/i2b2_mapping.py +++ b/eds_scikit/io/i2b2_mapping.py @@ -150,9 +150,9 @@ def get_i2b2_table( # concept elif table == "concept": df = df.withColumn( - "concept_source", F.substring(F.col("concept_source_value"), 5, 20) # TODO: use regexp_extract to take substring after ':' + "concept_source_value", F.substring(F.col("concept_source_value"), 5, 20) # TODO: use regexp_extract to take substring after ':' ).withColumn( - "concept_id", "concept_source_value" + "concept_id", F.col("concept_source_value") ).withColumn( "concept_code", F.col("concept_id") ).withColumn( From 798e9daac77d624019fcd5e2b9459acbd5318c4b Mon Sep 17 00:00:00 2001 From: Vincent M Date: Mon, 23 Jan 2023 14:33:05 +0100 Subject: [PATCH 14/25] clean code --- eds_scikit/io/hive.py | 19 ++++----- eds_scikit/io/i2b2_mapping.py | 72 ++++++++++++++++++++--------------- eds_scikit/io/settings.py | 7 +++- 3 files changed, 54 insertions(+), 44 deletions(-) diff --git a/eds_scikit/io/hive.py b/eds_scikit/io/hive.py index 1694de22..fa2a3bae 100644 --- a/eds_scikit/io/hive.py +++ b/eds_scikit/io/hive.py @@ -151,20 +151,15 @@ def list_available_tables(self) -> List[str]: ).toPandas() available_tables = set() session_tables = tables_df["tableName"].drop_duplicates().to_list() + session_tables = list(set(session_tables) - set(self.tables_to_load)) for table_name in session_tables: - if ( - self.database_type == "OMOP" - and table_name in self.tables_to_load.keys() - ): + if self.database_type == "OMOP": available_tables.add(table_name) - elif ( - self.database_type == "I2B2" and table_name in self.i2b2_to_omop.keys() - ): - for omop_table in self.i2b2_to_omop[table_name]: - if omop_table in self.tables_to_load.keys(): - available_tables.add(omop_table) - if self.database_type=="I2B2": - available_tables = available_tables | set([omop for omop,i2b2 in 
self.omop_to_i2b2.items() if omop is not None]) + elif self.database_type == "I2B2": + for omop_table in self.i2b2_to_omop.get(table_name, []): + available_tables.add(omop_table) + if self.database_type == "I2B2": + available_tables |= set(self.omop_to_i2b2) return list(available_tables) def rename_table(self, old_table_name: str, new_table_name: str) -> None: diff --git a/eds_scikit/io/i2b2_mapping.py b/eds_scikit/io/i2b2_mapping.py index 380cca82..8c150c54 100644 --- a/eds_scikit/io/i2b2_mapping.py +++ b/eds_scikit/io/i2b2_mapping.py @@ -1,12 +1,11 @@ from typing import Dict import pandas as pd - +import pyspark.sql.types as T from pyspark.sql import DataFrame as SparkDataFrame from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.functions import udf as FunctionUDF -import pyspark.sql.types as T from .settings import ( dict_code_UFR, @@ -36,19 +35,27 @@ def get_i2b2_table( Spark DataFrame extracted from the i2b2 database given and converted to OMOP standard. """ - table_name = i2b2_tables[db_source][table] # I2B2 table - columns = i2b2_renaming.get(table) # Dictionary of omop_col -> i2b2_col - - if columns is not None: # Can be None if creating a table from scratch (e.g. concept_relationship - available_columns = set(spark_session.sql(f"SELECT * FROM {db_name}.{table_name}").columns) + i2b2_table_name = i2b2_tables[db_source][table] + # Dictionary of omop_col -> i2b2_col + columns = i2b2_renaming.get(table) + + # Can be None if creating a table from scratch (e.g. concept_relationship + if columns is not None: + query = f"describe {db_name}.{i2b2_table_name}" + available_columns = set(spark_session.sql(query).toPandas().col_name.tolist()) if db_source == "cse": columns.pop("i2b2_action", None) - query = ",".join([f"{i2b2} AS {omop}" for omop, i2b2 in columns.items() if i2b2 in available_columns]) - - df = spark_session.sql(f"""SELECT {query} FROM {db_name}.{table_name}""") + cols = ", ".join( + [ + f"{i2b2} AS {omop}" + for omop, i2b2 in columns.items() + if i2b2 in available_columns + ] + ) + query = f"SELECT {cols} FROM {db_name}.{i2b2_table_name}" + df = spark_session.sql(query) # Special mapping for i2b2 : - # CIM10 if table == "condition_occurrence": df = df.withColumn( @@ -144,29 +151,32 @@ def get_i2b2_table( # measurement elif table == "measurement": df = df.withColumn( - "measurement_source_concept_id", F.substring(F.col("measurement_source_concept_id"), 5, 20) + "measurement_source_concept_id", + F.substring(F.col("measurement_source_concept_id"), 5, 20), ).withColumn("row_status_source_value", F.lit("Validé")) - + # concept elif table == "concept": - df = df.withColumn( - "concept_source_value", F.substring(F.col("concept_source_value"), 5, 20) # TODO: use regexp_extract to take substring after ':' - ).withColumn( - "concept_id", F.col("concept_source_value") - ).withColumn( - "concept_code", F.col("concept_id") - ).withColumn( - "vocabulary_id", F.lit("ANABIO") + df = ( + df.withColumn( + "concept_source_value", + F.substring( + F.col("concept_source_value"), 5, 20 + ), # TODO: use regexp_extract to take substring after ':' + ) + .withColumn("concept_id", F.col("concept_source_value")) + .withColumn("concept_code", F.col("concept_id")) + .withColumn("vocabulary_id", F.lit("ANABIO")) ) - + # Adding LOINC loinc_pd = pd.read_csv("~/Thomas/concept_loinc.csv") assert len(loinc_pd.columns) == len(df.columns) - loinc_pd = loinc_pd[df.columns] # for columns ordering + loinc_pd = loinc_pd[df.columns] # for columns ordering df = df.union( 
spark_session.createDataFrame(loinc_pd, df.schema, verifySchema=False) ).cache() - + # fact_relationship elif table == "fact_relationship": # Retrieve UF information @@ -186,12 +196,14 @@ def get_i2b2_table( elif table == "concept_relationship": df_pd = pd.read_csv("~/Thomas/concept_relationship.csv") - schema = T.StructType([ - T.StructField('concept_id_1', T.StringType(), True), - T.StructField('concept_id_2', T.StringType(), True), - T.StructField('relationship_id', T.StringType(), True) - ]) - df = spark_session.createDataFrame(df_pd,schema).cache() + schema = T.StructType( + [ + T.StructField("concept_id_1", T.StringType(), True), + T.StructField("concept_id_2", T.StringType(), True), + T.StructField("relationship_id", T.StringType(), True), + ] + ) + df = spark_session.createDataFrame(df_pd, schema).cache() return df diff --git a/eds_scikit/io/settings.py b/eds_scikit/io/settings.py index 8e1d7bdb..e99dee0d 100644 --- a/eds_scikit/io/settings.py +++ b/eds_scikit/io/settings.py @@ -239,9 +239,12 @@ # Mapping between i2b2 and OMOP i2b2_renaming = { - "care_site": {"care_site_source_value": "location_cd"}, + "care_site": { + "care_site_source_value": "location_cd", + "care_site_short_name": "care_site_name", + }, "concept": { - "concept_id": "concept_cd", + "concept_id": "concept_cd", "concept_name": "name_char", "concept_source_value": "concept_cd", }, From 83c4804b4f82a25adb544b12b710c947e5f0fbd9 Mon Sep 17 00:00:00 2001 From: Vincent M Date: Mon, 23 Jan 2023 15:10:47 +0100 Subject: [PATCH 15/25] fix set operator --- eds_scikit/io/hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eds_scikit/io/hive.py b/eds_scikit/io/hive.py index fa2a3bae..f98a25c1 100644 --- a/eds_scikit/io/hive.py +++ b/eds_scikit/io/hive.py @@ -151,7 +151,7 @@ def list_available_tables(self) -> List[str]: ).toPandas() available_tables = set() session_tables = tables_df["tableName"].drop_duplicates().to_list() - session_tables = list(set(session_tables) - set(self.tables_to_load)) + session_tables = list(set(session_tables) & set(self.tables_to_load)) for table_name in session_tables: if self.database_type == "OMOP": available_tables.add(table_name) From 12040ff6cca113057372a0f125ab7c66748dc5f3 Mon Sep 17 00:00:00 2001 From: Vincent M Date: Mon, 23 Jan 2023 19:21:30 +0100 Subject: [PATCH 16/25] hotfix --- eds_scikit/io/hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eds_scikit/io/hive.py b/eds_scikit/io/hive.py index f98a25c1..b20c7913 100644 --- a/eds_scikit/io/hive.py +++ b/eds_scikit/io/hive.py @@ -159,7 +159,7 @@ def list_available_tables(self) -> List[str]: for omop_table in self.i2b2_to_omop.get(table_name, []): available_tables.add(omop_table) if self.database_type == "I2B2": - available_tables |= set(self.omop_to_i2b2) + available_tables |= set(self.omop_to_i2b2) - {None} return list(available_tables) def rename_table(self, old_table_name: str, new_table_name: str) -> None: From 3aebc53fd49fe63bb28b5438f11833e705b4e3bf Mon Sep 17 00:00:00 2001 From: Thomas PETIT-JEAN Date: Fri, 27 Jan 2023 17:03:54 +0100 Subject: [PATCH 17/25] feat: use i2b2 CSVs from registry --- eds_scikit/io/i2b2_mapping.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/eds_scikit/io/i2b2_mapping.py b/eds_scikit/io/i2b2_mapping.py index 8c150c54..feff034e 100644 --- a/eds_scikit/io/i2b2_mapping.py +++ b/eds_scikit/io/i2b2_mapping.py @@ -1,12 +1,13 @@ from typing import Dict -import pandas as pd import pyspark.sql.types as T from pyspark.sql 
import DataFrame as SparkDataFrame from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.functions import udf as FunctionUDF +from eds_scikit.resources import registry + from .settings import ( dict_code_UFR, i2b2_renaming, @@ -170,12 +171,13 @@ def get_i2b2_table( ) # Adding LOINC - loinc_pd = pd.read_csv("~/Thomas/concept_loinc.csv") - assert len(loinc_pd.columns) == len(df.columns) - loinc_pd = loinc_pd[df.columns] # for columns ordering - df = df.union( - spark_session.createDataFrame(loinc_pd, df.schema, verifySchema=False) - ).cache() + if "get_additional_i2b2_concept" in registry.data.get_all(): + loinc_pd = registry.get("data", "get_additional_i2b2_concept")() + assert len(loinc_pd.columns) == len(df.columns) + loinc_pd = loinc_pd[df.columns] # for columns ordering + df = df.union( + spark_session.createDataFrame(loinc_pd, df.schema, verifySchema=False) + ).cache() # fact_relationship elif table == "fact_relationship": @@ -195,7 +197,7 @@ def get_i2b2_table( df = df.withColumn("relationship_concept_id", F.lit(46233688)) # Included in elif table == "concept_relationship": - df_pd = pd.read_csv("~/Thomas/concept_relationship.csv") + data = [] schema = T.StructType( [ T.StructField("concept_id_1", T.StringType(), True), @@ -203,7 +205,9 @@ def get_i2b2_table( T.StructField("relationship_id", T.StringType(), True), ] ) - df = spark_session.createDataFrame(df_pd, schema).cache() + if "get_additional_i2b2_concept_relationship" in registry.data.get_all(): + data = registry.get("data", "get_additional_i2b2_concept_relationship")() + df = spark_session.createDataFrame(data, schema).cache() return df From 0ee1a062ece1641d9a47baa350804798cc0c0666 Mon Sep 17 00:00:00 2001 From: Thomas PETIT-JEAN Date: Fri, 27 Jan 2023 17:09:11 +0100 Subject: [PATCH 18/25] fix: logging level from DEBUG to INFO --- eds_scikit/biology/cleaning/cohort.py | 2 +- eds_scikit/biology/cleaning/transform.py | 6 ++--- eds_scikit/biology/utils/process_concepts.py | 4 +-- .../biology/utils/process_measurement.py | 6 ++--- eds_scikit/biology/viz/aggregate.py | 26 +++++++++---------- eds_scikit/biology/viz/wrapper.py | 6 ++--- 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/eds_scikit/biology/cleaning/cohort.py b/eds_scikit/biology/cleaning/cohort.py index d8ebbb74..1692411f 100644 --- a/eds_scikit/biology/cleaning/cohort.py +++ b/eds_scikit/biology/cleaning/cohort.py @@ -23,7 +23,7 @@ def select_cohort( DataFrame Filtered DataFrame with selected patients """ - logger.debug("Selecting cohort...") + logger.info("Selecting cohort...") if isinstance(studied_pop, DataFrame.__args__): filtered_measures = measurement.merge( diff --git a/eds_scikit/biology/cleaning/transform.py b/eds_scikit/biology/cleaning/transform.py index a602b526..c1e4090d 100644 --- a/eds_scikit/biology/cleaning/transform.py +++ b/eds_scikit/biology/cleaning/transform.py @@ -46,7 +46,7 @@ def transform_measurement( config = to(get_framework(measurement), config) - logger.debug("Normalizing units...") + logger.info("Normalizing units...") clean_measurement = normalize_unit(measurement) clean_measurement = clean_measurement.merge( config, on=concept_code_cols + ["unit_source_value"] @@ -66,7 +66,7 @@ def transform_measurement( ) clean_measurement = clean_measurement.drop(columns=["Action", "Coefficient"]) - logger.debug("Flagging outliers...") + logger.info("Flagging outliers...") clean_measurement["outlier"] = False clean_measurement["outlier"] = clean_measurement.outlier.mask( 
(clean_measurement["transformed_value"] > clean_measurement["max_threshold"]) @@ -75,7 +75,7 @@ def transform_measurement( ) if clip: - logger.debug("Clipping extreme values...") + logger.info("Clipping extreme values...") clean_measurement[ "transformed_value" ] = clean_measurement.transformed_value.mask( diff --git a/eds_scikit/biology/utils/process_concepts.py b/eds_scikit/biology/utils/process_concepts.py index 84a5ae72..9947d751 100644 --- a/eds_scikit/biology/utils/process_concepts.py +++ b/eds_scikit/biology/utils/process_concepts.py @@ -98,7 +98,7 @@ def fetch_all_concepts_set( default_concepts_sets = getattr(datasets, concepts_sets_table_name) for concepts_set_name in default_concepts_sets.concepts_set_name: concepts_sets.append(ConceptsSet(concepts_set_name)) - logger.debug("Fetch all concepts-sets from table {}", concepts_sets_table_name) + logger.info("Fetch all concepts-sets from table {}", concepts_sets_table_name) return concepts_sets @@ -329,7 +329,7 @@ def _check_regex( def _override_name_code_with_itm(wide_src_to_std): - logger.debug( + logger.info( "ITM mapper has been identified in your data and will be prioritized for concept names." ) # Get LOINC NAME and code from ITM diff --git a/eds_scikit/biology/utils/process_measurement.py b/eds_scikit/biology/utils/process_measurement.py index db936c4f..22fd32dd 100644 --- a/eds_scikit/biology/utils/process_measurement.py +++ b/eds_scikit/biology/utils/process_measurement.py @@ -28,7 +28,7 @@ def get_valid_measurement(measurement: DataFrame) -> DataFrame: ) measurement_valid = measurement[measurement["row_status_source_value"] == "Validé"] measurement_valid = measurement_valid.drop(columns=["row_status_source_value"]) - logger.debug("Valid measurements have been selected") + logger.info("Valid measurements have been selected") return measurement_valid @@ -85,10 +85,10 @@ def filter_measurement_by_date( if start_date: measurement = measurement[measurement["measurement_date"] >= start_date] - logger.debug("Measurements conducted after {} have been selected", start_date) + logger.info("Measurements conducted after {} have been selected", start_date) if end_date: measurement = measurement[measurement["measurement_date"] <= end_date] - logger.debug("Measurements conducted before {} have been selected", end_date) + logger.info("Measurements conducted before {} have been selected", end_date) return measurement diff --git a/eds_scikit/biology/viz/aggregate.py b/eds_scikit/biology/viz/aggregate.py index 8381648d..8ba490e2 100644 --- a/eds_scikit/biology/viz/aggregate.py +++ b/eds_scikit/biology/viz/aggregate.py @@ -299,12 +299,12 @@ def aggregate_measurement( # Convert DF to Pandas if small enough if is_koalas(measurement): measurement.spark.cache() - logger.debug( + logger.info( "Checking if the Koalas DataFrame is small enough to be converted into Pandas DataFrame" ) size = measurement.shape[0] if size < pd_limit_size: - logger.debug( + logger.info( "The number of measurements identified is {} < {}. 
DataFrame is converting to Pandas...", size, pd_limit_size, @@ -313,7 +313,7 @@ def aggregate_measurement( if measurement.empty: return {"measurement": measurement} else: - logger.debug( + logger.info( "The number of measurements identified is {}.", size, ) @@ -427,9 +427,9 @@ def _describe_measurement_by_code( on=concept_cols + ["unit_source_value"], ) - logger.debug("The overall statistics of measurements by code are computing...") + logger.info("The overall statistics of measurements by code are computing...") measurement_stats_overall = to("pandas", measurement_stats_overall) - logger.debug("The overall statistics of measurements are computed...") + logger.info("The overall statistics of measurements are computed...") measurement_stats_overall["MAD"] = 1.48 * measurement_stats_overall["MAD"] @@ -476,9 +476,9 @@ def _describe_measurement_by_code( measurement_stats["max_threshold"] = None measurement_stats["min_threshold"] = None - logger.debug("The statistics of measurements by care site are computing...") + logger.info("The statistics of measurements by care site are computing...") measurement_stats = to("pandas", measurement_stats) - logger.debug("The statistics of measurements by care site are computed...") + logger.info("The statistics of measurements by care site are computed...") measurement_stats = pd.concat([measurement_stats_overall, measurement_stats]) @@ -567,17 +567,17 @@ def _count_measurement_by_care_site_and_code_per_month( ["measurement_month"] ].fillna("Unknown") - logger.debug( + logger.info( "The counting of measurements by care site and code for each month is processing..." ) measurement_count = to("pandas", measurement_count) - logger.debug("The counting of measurements is finished...") + logger.info("The counting of measurements is finished...") - logger.debug( + logger.info( "The counting of missing values by care site and code for each month is processing..." 
) missing_value_count = to("pandas", missing_value_count) - logger.debug("The counting of missing values is finished...") + logger.info("The counting of missing values is finished...") measurement_volumetry = measurement_count.merge( missing_value_count, @@ -776,7 +776,7 @@ def _bin_measurement_value_by_care_site_and_code( .rename(columns={"measurement_id": "frequency"}) ) - logger.debug("The binning of measurements' values is processing...") + logger.info("The binning of measurements' values is processing...") measurement_distribution = to("pandas", measurement_distribution) - logger.debug("The binning of measurements' values is finished...") + logger.info("The binning of measurements' values is finished...") return measurement_distribution diff --git a/eds_scikit/biology/viz/wrapper.py b/eds_scikit/biology/viz/wrapper.py index 8ede6841..f243134c 100644 --- a/eds_scikit/biology/viz/wrapper.py +++ b/eds_scikit/biology/viz/wrapper.py @@ -76,7 +76,7 @@ def plot_biology_summary( if not os.path.isdir(save_folder_path): os.mkdir(save_folder_path) - logger.debug("{} folder has been created.", save_folder_path) + logger.info("{} folder has been created.", save_folder_path) if isinstance(concepts_sets, list) and all( isinstance(concepts_set, ConceptsSet) for concepts_set in concepts_sets @@ -85,7 +85,7 @@ def plot_biology_summary( concepts_set_path = "{}/{}".format(save_folder_path, concepts_set.name) rmtree(concepts_set_path, ignore_errors=True) os.mkdir(concepts_set_path) - logger.debug( + logger.info( "{}/{} folder has been created.", save_folder_path, concepts_set.name, @@ -111,7 +111,7 @@ def plot_biology_summary( ) ) - logger.debug( + logger.info( "{} has been processed and saved in {}/{} folder.", concepts_set.name, save_folder_path, From 660a9cd7e42f088fc1544e9684ac14837b85d4b8 Mon Sep 17 00:00:00 2001 From: Thomas PETIT-JEAN Date: Fri, 27 Jan 2023 18:20:12 +0100 Subject: [PATCH 19/25] doc: i2b2 --- changelog.md | 7 + docs/functionalities/generic/io.ipynb | 199 +++++++++++++------------- docs/index.md | 8 +- 3 files changed, 115 insertions(+), 99 deletions(-) diff --git a/changelog.md b/changelog.md index 581e6fbe..c7ddd93d 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## Pending + +### Added + +- New BackendDispatcher to handle framework-specific functions +- I2B2 to OMOP connector + ## v0.1.2 (2022-12-05) ### Added diff --git a/docs/functionalities/generic/io.ipynb b/docs/functionalities/generic/io.ipynb index cf152064..48a66dc3 100644 --- a/docs/functionalities/generic/io.ipynb +++ b/docs/functionalities/generic/io.ipynb @@ -2,34 +2,33 @@ "cells": [ { "cell_type": "markdown", - "metadata": {}, "source": [ "# IO: Getting Data" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "3 classes are available to facilitate data access:\n", "\n", "- `HiveData`: Getting data from a Hive cluster, returning `Koalas` DataFrames\n", "- `PandasData`: Getting data from tables saved on disk, returning `Pandas` DataFrames\n", "- `PostgresData`: Getting data from a PostGreSQL DB, returning `Pandas` DataFrames\n" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 2, - "metadata": {}, - "outputs": [], "source": [ "from eds_scikit.io import HiveData, PandasData, PostgresData" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Loading from Hive: `HiveData`\n", "\n", @@ -37,11 +36,11 @@ "\n", "- A `SparkSession` variable\n", "- The name of the Database to connect to\n" - ] 
+ ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "!!! aphp \"Using **Spark** kernels\"\n", " All kernels designed to use Spark are configured to expose 3 variables at startup: \n", @@ -50,21 +49,23 @@ " - `sc`, the current SparkContext\n", " - `sql`, a function to execute SQL code on the Hive Database. \n", "\n", - " In this case you can just provide the `spark` variable to `HiveData` !" - ] + " In this case you can just provide the `spark` variable to `HiveData` !\n", + "\n", + "!!! tip \"Working with an I2B2 database\"\n", + " To use a built-in *I2B2 to OMOP* connector, specify `database_type=\"I2b2\"` when instantiating `HiveData`" + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "If needed, the following snippet allows to create the necessary variables:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "from pyspark import SparkConf, SparkContext\n", "from pyspark.sql.session import SparkSession\n", @@ -75,42 +76,47 @@ " .enableHiveSupport() \\\n", " .getOrCreate()\n", "sql = spark.sql" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "The class `HiveData` provides a convenient interface to OMOP data stored in Hive. \n", "The OMOP tables can be accessed as attribute and they are represented as [Koalas DataFrames](https://koalas.readthedocs.io/en/latest/getting_started/10min.html#10-minutes-to-Koalas).\n", "You simply need to mention your Hive database name." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 3, - "metadata": {}, - "outputs": [], "source": [ "data = HiveData(\n", " spark, \n", " DB_NAME\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "By default, only a subset of tables are added as attributes:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 4, - "metadata": {}, + "source": [ + "data.available_tables" + ], "outputs": [ { + "output_type": "execute_result", "data": { "text/plain": [ "['care_site',\n", @@ -122,29 +128,30 @@ " 'visit_occurrence']" ] }, - "execution_count": 4, "metadata": {}, - "output_type": "execute_result" + "execution_count": 4 } ], - "source": [ - "data.available_tables" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "`Koalas` DataFrames, like `Spark` DataFrames, rely on a *lazy* execution plan: As long as no data needs to be specifically collected, saved or displayed, no code is executed. It is simply saved for a later execution. \n", "The main interest of Koalas DataFrames is that you can use (most of) the Pandas API:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 5, - "metadata": {}, + "source": [ + "person = data.person\n", + "person.drop(columns = ['person_id']).head()" + ], "outputs": [ { + "output_type": "execute_result", "data": { "text/html": [ "
\n", @@ -250,21 +257,15 @@ "4 3347197472 1990 2 2 1990-02-02 NaT m 2008119900 ORBIS" ] }, - "execution_count": 5, "metadata": {}, - "output_type": "execute_result" + "execution_count": 5 } ], - "source": [ - "person = data.person\n", - "person.drop(columns = ['person_id']).head()" - ] + "metadata": {} }, { "cell_type": "code", "execution_count": 6, - "metadata": {}, - "outputs": [], "source": [ "from datetime import datetime\n", "\n", @@ -276,71 +277,71 @@ " .person_id\n", " .count()\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Once data has been sufficiently aggregated, it can be converted back to Pandas, e.g. for plotting." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 7, - "metadata": {}, - "outputs": [], "source": [ "stats_pd = stats.to_pandas()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Similarily, if you want to work on the `Spark` DataFrame instead, a similar method is available:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 8, - "metadata": {}, - "outputs": [], "source": [ "person_spark = person.to_spark()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Persisting/Reading a sample to/from disk: `PandasData`\n", "\n", "Working with Pandas DataFrame is, when possible, more convenient. \n", "You have the possibility to save your database or at least a subset of it. \n", "Doing so allows you to work on it later without having to go through `Spark` again. " - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "!!! warning \"Careful with cohort size\"\n", " Do not save it if your cohort is **big**: This saves **all** available tables on disk." - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "For instance, let us define a dummy subset of 1000 patients:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 10, - "metadata": {}, - "outputs": [], "source": [ "visits = data.visit_occurrence\n", "\n", @@ -355,20 +356,20 @@ " .head(1000)\n", " .to_list()\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "And save every table restricted to this small cohort as a `parquet` file:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "import os\n", "\n", @@ -382,11 +383,12 @@ " tables=tables_to_save,\n", " person_ids=sample_patients\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Once you saved some data to disk, a dedicated class can be used to access it: \n", "The class `PandasData` can be used to load OMOP data from a folder containing several parquet files. The tables\n", @@ -394,74 +396,84 @@ "\n", "**Warning**: in this case, the whole table will be loaded into memory on a single jupyter server. Consequently it is advised\n", "to only use this for small datasets." 
- ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 5, - "metadata": {}, - "outputs": [], "source": [ "data = PandasData(folder)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": 6, - "metadata": {}, + "source": [ + "data.available_tables" + ], "outputs": [ { + "output_type": "execute_result", "data": { "text/plain": [ "['visit_occurrence', 'visit_detail', 'person']" ] }, - "execution_count": 6, "metadata": {}, - "output_type": "execute_result" + "execution_count": 6 } ], - "source": [ - "data.available_tables" - ] + "metadata": {} }, { "cell_type": "code", "execution_count": 7, - "metadata": {}, + "source": [ + "person = data.person\n", + "print(f\"type: {type(person)}\")\n", + "print(f\"shape: {person.shape}\")" + ], "outputs": [ { - "name": "stdout", "output_type": "stream", + "name": "stdout", "text": [ "type: \n", "shape: (1000, 10)\n" ] } ], - "source": [ - "person = data.person\n", - "print(f\"type: {type(person)}\")\n", - "print(f\"shape: {person.shape}\")" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Loading from PostGres: `PostgresData`\n", "\n", "OMOP data can be stored in a PostgreSQL database. The `PostgresData` class provides a convinient interface to it.\n", "\n", "**Note :** this class relies on the file `~/.pgpass` that contains your identifiers for several databases." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 15, - "metadata": {}, + "source": [ + "data = PostgresData(\n", + " dbname=DB, \n", + " schema=\"omop\", \n", + " user=USER,\n", + ")\n", + "\n", + "data.read_sql(\"select count(*) from person\")" + ], "outputs": [ { + "output_type": "execute_result", "data": { "text/html": [ "
\n", @@ -499,20 +511,11 @@ "0 12688670" ] }, - "execution_count": 15, "metadata": {}, - "output_type": "execute_result" + "execution_count": 15 } ], - "source": [ - "data = PostgresData(\n", - " dbname=DB, \n", - " schema=\"omop\", \n", - " user=USER,\n", - ")\n", - "\n", - "data.read_sql(\"select count(*) from person\")" - ] + "metadata": {} } ], "metadata": { diff --git a/docs/index.md b/docs/index.md index ca3b91ef..8a0e6a64 100644 --- a/docs/index.md +++ b/docs/index.md @@ -26,6 +26,9 @@ As an example, the following figure was obtained using various functionalities f !!! question "How was it done ?" Click on the figure above to jump to the tutorial using various functionalities from eds-scikit, or continue reading the introduction! +!!! tip "Using `eds-scikit` with I2B2" + Although designed for OMOP databases, `eds-scikit` provides a connector for I2B2 databases is available. We don't guarantee its exhaustivity, but it should allow you to use functionnalities of the library seamlessly. + ## Quick start ### Installation @@ -90,7 +93,7 @@ color:green Successfully installed eds_scikit ! ### A first example: Merging visits together Let's tackle a common problem when dealing with clinical data: Merging close/consecutive visits into **stays**. -As detailled in [the dedicated section](), eds-scikit is expecting to work with [Pandas](https://pandas.pydata.org/) or [Koalas](https://koalas.readthedocs.io/en/latest/) DataFrames. We provide various connectors to facilitate data fetching, namely a [Hive]() connector and a [Postgres]() connector +As detailled in [the dedicated section](), eds-scikit is expecting to work with [Pandas](https://pandas.pydata.org/) or [Koalas](https://koalas.readthedocs.io/en/latest/) DataFrames. We provide various connectors to facilitate data fetching, namely a [Hive][loading-from-hive-hivedata] connector and a [Postgres][loading-from-postgres-postgresdata] connector === "Using a Hive DataBase" @@ -104,6 +107,9 @@ As detailled in [the dedicated section](), eds-scikit is expecting to work with 1. With this connector, `visit_occurrence` will be a *Pandas* DataFrame + !!! tip "I2B2" + If `DB_NAME` points to an I2B2 database, use `data = HiveData(DB_NAME, database_type="I2B2")` + === "Using a Postgres DataBase" ```python From 7eb9e73cb167fa3e20047ec32dcf8596aece7907 Mon Sep 17 00:00:00 2001 From: Matthieu Doutreligne Date: Thu, 2 Feb 2023 10:31:56 +0000 Subject: [PATCH 20/25] hotfix: isort version --- .pre-commit-config.yaml | 2 +- eds_scikit/io/files.py | 5 +---- eds_scikit/io/hive.py | 9 +++++++-- pyproject.toml | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e38d3199..3055b3f4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,7 +10,7 @@ repos: - id: check-added-large-files args: ["--maxkb", "5000"] - repo: https://github.com/pycqa/isort - rev: 5.10.1 + rev: 5.11.5 hooks: - id: isort name: isort (python) diff --git a/eds_scikit/io/files.py b/eds_scikit/io/files.py index 2e95aab3..98f0a434 100644 --- a/eds_scikit/io/files.py +++ b/eds_scikit/io/files.py @@ -3,8 +3,6 @@ import pandas as pd -from . 
import settings - class PandasData: # pragma: no cover def __init__( @@ -36,10 +34,9 @@ def __init__( def list_available_tables(folder: str) -> Tuple[List[str], List[str]]: available_tables = [] tables_paths = {} - known_omop_tables = settings.tables_to_load.keys() for filename in os.listdir(folder): table_name, extension = os.path.splitext(filename) - if extension == ".parquet" and table_name in known_omop_tables: + if extension == ".parquet": abspath = os.path.abspath(os.path.join(folder, filename)) tables_paths[table_name] = abspath available_tables.append(table_name) diff --git a/eds_scikit/io/hive.py b/eds_scikit/io/hive.py index b20c7913..4eaf1e2b 100644 --- a/eds_scikit/io/hive.py +++ b/eds_scikit/io/hive.py @@ -193,12 +193,14 @@ def _prepare_person_ids(self, list_of_person_ids) -> Optional[SparkDataFrame]: else: unique_ids = set(list_of_person_ids) - print(f"Number of unique patients: {len(unique_ids)}") schema = StructType([StructField("person_id", LongType(), True)]) filtering_df = self.spark_session.createDataFrame( [(int(p),) for p in unique_ids], schema=schema - ) + ).cache() + + print(f"Number of unique patients: {filtering_df.count()}") + return filtering_df def _read_table(self, table_name, person_ids=None) -> DataFrame: @@ -261,6 +263,9 @@ def persist_tables_to_folder( ) folder = os.path.abspath(folder) + + os.makedirs(folder, mode=0o766, exist_ok=False) + assert os.path.exists(folder) and os.path.isdir( folder ), f"Folder {folder} not found." diff --git a/pyproject.toml b/pyproject.toml index 3757972b..7323069c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ dependencies = [ "loguru>=0.6.0, <0.7.0", "pypandoc==1.7.5", "pyspark==2.4.3", - "pyarrow>=0.10, <0.17.0", + "pyarrow==0.17.0", #"pyarrow>=0.10, <0.17.0", "pretty-html-table>=0.9.15, <0.10.0", "catalogue", "schemdraw>=0.15.0, <1.0.0", From 3fc77faa78b8079e51dc6e6b76c17803f665f0d3 Mon Sep 17 00:00:00 2001 From: Matthieu Doutreligne Date: Thu, 2 Feb 2023 12:39:18 +0000 Subject: [PATCH 21/25] fix emergency mapping from registry --- docs/recipes/small-cohorts.ipynb | 90 +++++++++++++-------- eds_scikit/emergency/emergency_care_site.py | 1 + eds_scikit/utils/checks.py | 2 +- tests/emergency/test_emergency_care_site.py | 2 +- 4 files changed, 58 insertions(+), 37 deletions(-) diff --git a/docs/recipes/small-cohorts.ipynb b/docs/recipes/small-cohorts.ipynb index 1c7efd29..66dafc71 100644 --- a/docs/recipes/small-cohorts.ipynb +++ b/docs/recipes/small-cohorts.ipynb @@ -40,7 +40,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "2022-05-19 07:59:01.372 | WARNING | eds_scikit::25 - \n", + "2023-02-02 11:04:54.866 | WARNING | eds_scikit::31 - \n", " To improve performances when using Spark and Koalas, please call `eds_scikit.improve_performances()`\n", " This function optimally configures Spark. 
Use it as:\n", " `spark, sc, sql = eds_scikit.improve_performances()`\n", @@ -67,7 +67,17 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 3, + "id": "779c6ef7-5839-4fcd-9971-a5e6fd804124", + "metadata": {}, + "outputs": [], + "source": [ + "DBNAME=\"cse210038_20220921_160214312112\"" + ] + }, + { + "cell_type": "code", + "execution_count": 4, "id": "27086763-abf5-43ec-9d22-7fea742ef4e6", "metadata": {}, "outputs": [ @@ -75,7 +85,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "/export/home/tpetitjean/.user_conda/miniconda/envs/scikit/lib/python3.7/site-packages/pyarrow/util.py:43: FutureWarning: pyarrow.open_stream is deprecated as of 0.17.0, please use pyarrow.ipc.open_stream instead.\n", + "/export/home/cse210038/Thomas/scikitenv/lib/python3.7/site-packages/pyarrow/util.py:39: FutureWarning: pyarrow.open_stream is deprecated as of 0.17.0, please use pyarrow.ipc.open_stream instead\n", " warnings.warn(msg, FutureWarning)\n", " \r" ] @@ -98,7 +108,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 5, "id": "71a75e1c-5abc-4471-aa92-41185a95b261", "metadata": {}, "outputs": [], @@ -109,7 +119,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": 6, "id": "11794b94-7736-4468-a262-7ad2d36a7232", "metadata": {}, "outputs": [], @@ -146,7 +156,7 @@ }, { "cell_type": "code", - "execution_count": 22, + "execution_count": 7, "id": "4e0c42dc-6333-4b02-b159-33d5341db558", "metadata": {}, "outputs": [ @@ -154,6 +164,10 @@ "name": "stderr", "output_type": "stream", "text": [ + "[Stage 2:> (0 + 2) / 200]/export/home/cse210038/Thomas/scikitenv/lib/python3.7/site-packages/pyarrow/util.py:39: FutureWarning: pyarrow.open_stream is deprecated as of 0.17.0, please use pyarrow.ipc.open_stream instead\n", + " warnings.warn(msg, FutureWarning)\n", + "/export/home/cse210038/Thomas/scikitenv/lib/python3.7/site-packages/pyarrow/util.py:39: FutureWarning: pyarrow.open_stream is deprecated as of 0.17.0, please use pyarrow.ipc.open_stream instead\n", + " warnings.warn(msg, FutureWarning)\n", " \r" ] }, @@ -161,11 +175,11 @@ "data": { "text/plain": [ "concept value \n", - "HEART_TRANSPLANT DZEA002 27\n", + "HEART_TRANSPLANT DZEA002 39\n", "dtype: int64" ] }, - "execution_count": 22, + "execution_count": 7, "metadata": {}, "output_type": "execute_result" } @@ -176,7 +190,7 @@ }, { "cell_type": "code", - "execution_count": 23, + "execution_count": 8, "id": "897f1753-6f6d-4633-a8f7-135d9ccb01ad", "metadata": {}, "outputs": [ @@ -191,11 +205,11 @@ "data": { "text/plain": [ "concept value\n", - "HEART_TRANSPLANT Z941 135\n", + "HEART_TRANSPLANT Z941 602\n", "dtype: int64" ] }, - "execution_count": 23, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" } @@ -214,7 +228,7 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": 9, "id": "f388afd0-4373-4170-af2e-203adb84e9a2", "metadata": {}, "outputs": [ @@ -242,17 +256,17 @@ }, { "cell_type": "code", - "execution_count": 25, + "execution_count": 10, "id": "b394ce04-fd97-4d48-8700-1321de4f0d17", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "30" + "53" ] }, - "execution_count": 25, + "execution_count": 10, "metadata": {}, "output_type": "execute_result" } @@ -271,7 +285,7 @@ }, { "cell_type": "code", - "execution_count": 32, + "execution_count": 11, "id": "d5124d4a-8f6d-4433-a93f-99cc6fa660cc", "metadata": {}, "outputs": [ @@ -285,10 +299,10 @@ { "data": { "text/plain": [ - "'0.08898 %'" + "'0.06849 %'" ] }, - "execution_count": 32, + 
"execution_count": 11, "metadata": {}, "output_type": "execute_result" } @@ -308,16 +322,23 @@ }, { "cell_type": "code", - "execution_count": 35, + "execution_count": 14, "id": "e279c8d8-f489-479d-aa7c-886863718491", "metadata": {}, "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, { "name": "stdout", "output_type": "stream", "text": [ - "Number of unique patients: 30\n", - "writing ./heart_transplant_cohort/person.parquet\n" + "Number of unique patients: 53\n", + "writing /export/home/cse210038/Thomas/eds-scikit/docs/recipes/heart_transplant_cohort/person.parquet\n" ] }, { @@ -331,7 +352,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "writing ./heart_transplant_cohort/visit_detail.parquet\n" + "writing /export/home/cse210038/Thomas/eds-scikit/docs/recipes/heart_transplant_cohort/visit_detail.parquet\n" ] }, { @@ -345,7 +366,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "writing ./heart_transplant_cohort/visit_occurrence.parquet\n" + "writing /export/home/cse210038/Thomas/eds-scikit/docs/recipes/heart_transplant_cohort/visit_occurrence.parquet\n" ] }, { @@ -359,7 +380,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "writing ./heart_transplant_cohort/procedure_occurrence.parquet\n" + "writing /export/home/cse210038/Thomas/eds-scikit/docs/recipes/heart_transplant_cohort/procedure_occurrence.parquet\n" ] }, { @@ -373,7 +394,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "writing ./heart_transplant_cohort/condition_occurrence.parquet\n" + "writing /export/home/cse210038/Thomas/eds-scikit/docs/recipes/heart_transplant_cohort/condition_occurrence.parquet\n" ] }, { @@ -388,7 +409,6 @@ "import os\n", "\n", "folder = os.path.abspath(\"./heart_transplant_cohort\")\n", - "os.makedirs(folder, mode=777, exist_ok=True)\n", "\n", "tables_to_save = [\n", " \"person\",\n", @@ -418,7 +438,7 @@ }, { "cell_type": "code", - "execution_count": 36, + "execution_count": 15, "id": "4938e306-4178-46f4-a8f6-3da42e3bbfb6", "metadata": {}, "outputs": [], @@ -438,17 +458,17 @@ }, { "cell_type": "code", - "execution_count": 39, + "execution_count": 16, "id": "900b09bc-40c1-4222-8931-a63b13d78433", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "30" + "53" ] }, - "execution_count": 39, + "execution_count": 16, "metadata": {}, "output_type": "execute_result" } @@ -468,7 +488,7 @@ }, { "cell_type": "code", - "execution_count": 40, + "execution_count": 17, "id": "f8f3f466-0b59-4ae1-900a-f7a93972daa6", "metadata": {}, "outputs": [ @@ -478,7 +498,7 @@ "'100.00000 %'" ] }, - "execution_count": 40, + "execution_count": 17, "metadata": {}, "output_type": "execute_result" } @@ -490,9 +510,9 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "scikit", "language": "python", - "name": "python3" + "name": "scikit" }, "language_info": { "codemirror_mode": { @@ -504,7 +524,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.11" + "version": "3.7.8" } }, "nbformat": 4, diff --git a/eds_scikit/emergency/emergency_care_site.py b/eds_scikit/emergency/emergency_care_site.py index 5f372ba2..dbc3fbd0 100644 --- a/eds_scikit/emergency/emergency_care_site.py +++ b/eds_scikit/emergency/emergency_care_site.py @@ -103,6 +103,7 @@ def from_mapping( if version is not None: function_name += f".{version}" mapping = registry.get("data", function_name=function_name)() + print(mapping) # Getting the right framework fw = framework.get_framework(care_site) 
diff --git a/eds_scikit/utils/checks.py b/eds_scikit/utils/checks.py index ef3ff172..c31b763e 100644 --- a/eds_scikit/utils/checks.py +++ b/eds_scikit/utils/checks.py @@ -96,7 +96,7 @@ def algo_checker( algo = _get_arg_value(function, "algo", args, kwargs) # Stripping eventual version suffix - algo = algo.split(".")[-1] + algo = algo.split(".")[0] if algo not in algos: raise ValueError( diff --git a/tests/emergency/test_emergency_care_site.py b/tests/emergency/test_emergency_care_site.py index a734cc7d..5bb5d9c8 100644 --- a/tests/emergency/test_emergency_care_site.py +++ b/tests/emergency/test_emergency_care_site.py @@ -145,6 +145,6 @@ def test_tagging(module, algo): converted_input_df = framework.to(module, input_df) - output = tag_emergency_care_site(converted_input_df, algo=algo) + output = tag_emergency_care_site(converted_input_df, algo=f"{algo}.test") assert_equal_no_order(framework.pandas(output), expected_result, check_like=True) From 35a662e486bf4cda5748e27874e46e59ec63d0a7 Mon Sep 17 00:00:00 2001 From: Matthieu Doutreligne Date: Thu, 2 Feb 2023 14:37:32 +0000 Subject: [PATCH 22/25] fix emergency test --- eds_scikit/emergency/emergency_care_site.py | 2 +- tests/emergency/test_emergency_care_site.py | 10 +++++----- tests/emergency/test_emergency_visits.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/eds_scikit/emergency/emergency_care_site.py b/eds_scikit/emergency/emergency_care_site.py index dbc3fbd0..6549c9de 100644 --- a/eds_scikit/emergency/emergency_care_site.py +++ b/eds_scikit/emergency/emergency_care_site.py @@ -102,8 +102,8 @@ def from_mapping( function_name = "get_care_site_emergency_mapping" if version is not None: function_name += f".{version}" + mapping = registry.get("data", function_name=function_name)() - print(mapping) # Getting the right framework fw = framework.get_framework(care_site) diff --git a/tests/emergency/test_emergency_care_site.py b/tests/emergency/test_emergency_care_site.py index 5bb5d9c8..077800f2 100644 --- a/tests/emergency/test_emergency_care_site.py +++ b/tests/emergency/test_emergency_care_site.py @@ -5,8 +5,8 @@ from eds_scikit.utils.test_utils import assert_equal_no_order, make_df # Dictionnary of the form {algo_name : [input_df, expected_output_df]} -algos = dict( - from_mapping=[ +algos = { + "from_mapping.test": [ make_df( """ care_site_id,care_site_source_value @@ -36,7 +36,7 @@ """ ), ], - from_regex_on_care_site_description=[ + "from_regex_on_care_site_description": [ make_df( """ care_site_name @@ -134,7 +134,7 @@ """ ), ], -) +} @pytest.mark.parametrize("module", ["pandas", "koalas"]) @@ -145,6 +145,6 @@ def test_tagging(module, algo): converted_input_df = framework.to(module, input_df) - output = tag_emergency_care_site(converted_input_df, algo=f"{algo}.test") + output = tag_emergency_care_site(converted_input_df, algo=algo) assert_equal_no_order(framework.pandas(output), expected_result, check_like=True) diff --git a/tests/emergency/test_emergency_visits.py b/tests/emergency/test_emergency_visits.py index 6be75747..8fb3b10c 100644 --- a/tests/emergency/test_emergency_visits.py +++ b/tests/emergency/test_emergency_visits.py @@ -52,7 +52,7 @@ visit_detail=visit_detail, care_site=care_site, visit_occurrence=None, - algo="from_mapping", + algo="from_mapping.test", ), dict( visit_detail=visit_detail, From bb51963d5ffecc35d0fbe48565db71e01bc5040b Mon Sep 17 00:00:00 2001 From: Matthieu Doutreligne Date: Thu, 2 Feb 2023 14:39:29 +0000 Subject: [PATCH 23/25] fix: remove unwanted cell in notebook --- 
docs/recipes/small-cohorts.ipynb | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/docs/recipes/small-cohorts.ipynb b/docs/recipes/small-cohorts.ipynb index 66dafc71..fbd2ad1a 100644 --- a/docs/recipes/small-cohorts.ipynb +++ b/docs/recipes/small-cohorts.ipynb @@ -65,16 +65,6 @@ "DBNAME=\"YOUR_DATABASE_NAME\"" ] }, - { - "cell_type": "code", - "execution_count": 3, - "id": "779c6ef7-5839-4fcd-9971-a5e6fd804124", - "metadata": {}, - "outputs": [], - "source": [ - "DBNAME=\"cse210038_20220921_160214312112\"" - ] - }, { "cell_type": "code", "execution_count": 4, From 117083762cc2d14e60b51a84842a11c521e6a20c Mon Sep 17 00:00:00 2001 From: Adam REMAKI Date: Thu, 2 Feb 2023 16:35:49 +0100 Subject: [PATCH 24/25] docs: :lipstick: Update bioclean table for clarity (#23) * docs: :lipstick: Update bioclean table for clarity * new line * Hot fix n the doc * doc: explicit configuration in biology tutorial --------- Co-authored-by: Thomas Petit-Jean --- docs/functionalities/biology/index.md | 14 ++--- docs/functionalities/biology/tutorial.ipynb | 65 ++++++++++++++------ docs/functionalities/biology/vocabulary.md | 2 +- eds_scikit/biology/utils/process_concepts.py | 4 +- 4 files changed, 57 insertions(+), 28 deletions(-) diff --git a/docs/functionalities/biology/index.md b/docs/functionalities/biology/index.md index 09c7260f..2a3e51af 100644 --- a/docs/functionalities/biology/index.md +++ b/docs/functionalities/biology/index.md @@ -73,13 +73,13 @@ bioclean(data, start_date="2020-01-01", end_date="2021-12-31") data.bioclean.head() ``` -| concepts_set | transformed_unit | transformed_value | max_threshold | min_threshold | outlier | .... | -| :----------- | :--------------- | :---------------- | :------------ | :------------ | :------ | :--- | -| Entity A | x10*9/l | 115 | 190 | 0 | False | .... | -| Entity A | x10*9/l | 220 | 190 | 0 | True | .... | -| Entity B | mmol | 0.45 | 8.548 | 0.542 | True | .... | -| Entity B | mmol | 4.52 | 8.548 | 0.542 | False | .... | -| Entity B | mmol | 9.58 | 8.548 | 0.542 | True | .... | +| concepts_set | LOINC_concept_code | LOINC_concept_name | AnaBio_concept_code | AnaBio_concept_name | transformed_unit | transformed_value | max_threshold | min_threshold | outlier | value_source_value | unit_source_value | +| :------------------------- | :----------------- | :----------------- | :------------------ | :------------------- | :--------------- | :---------------- | :------------ | :------------ | :------ | :----------------- | :---------------- | +| EntityA_Blood_Quantitative | 000-0 | EntityA #Bld | A0000 | EntityA_Blood | x10*9/l | 115 | 190 | 0 | False | 115 x10*9/l | x10*9/l | +| EntityA_Blood_Quantitative | 000-1 | EntityA_Blood_Vol | A0001 | EntityA_Blood_g/l | x10*9/l | 220 | 190 | 0 | True | 560 g/l | g/l | +| EntityB_Blood_Quantitative | 001-0 | EntityB_Blood | B0000 | EntityB_Blood_artery | mmol | 0.45 | 8.548 | 0.542 | True | 0.45 mmol | mmol | +| EntityB_Blood_Quantitative | 001-0 | EntityB_Blood | B0001 | EntityB_Blood_vein | mmol | 4.52 | 8.548 | 0.542 | False | 4.52 mmol | mmol | +| EntityB_Blood_Quantitative | 000-1 | EntityB Bld Auto | B0002 | EntityB_Blood_µg/l | mmol | 9.58 | 8.548 | 0.542 | True | 3587 µg/l | µg/l | For more details, have a look on [the dedicated section](cleaning). 
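The `outlier` column shown above is derived from the per-concept `min_threshold` and `max_threshold` bounds. Below is a small, self-contained sketch of that flag using values taken from the documentation table; the actual logic lives inside `bioclean()` and may differ in detail.

```python
import pandas as pd

# Toy reproduction of the flag: a measurement is an outlier when its
# normalized value falls outside [min_threshold, max_threshold].
bioclean_df = pd.DataFrame(
    {
        "concepts_set": ["EntityA_Blood_Quantitative", "EntityB_Blood_Quantitative"],
        "transformed_value": [115.0, 9.58],
        "min_threshold": [0.0, 0.542],
        "max_threshold": [190.0, 8.548],
    }
)

bioclean_df["outlier"] = ~bioclean_df["transformed_value"].between(
    bioclean_df["min_threshold"], bioclean_df["max_threshold"]
)
print(bioclean_df)  # outlier -> [False, True], matching the table above
```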
diff --git a/docs/functionalities/biology/tutorial.ipynb b/docs/functionalities/biology/tutorial.ipynb index 59a224dc..bd6a87d5 100644 --- a/docs/functionalities/biology/tutorial.ipynb +++ b/docs/functionalities/biology/tutorial.ipynb @@ -143,13 +143,42 @@ }, { "cell_type": "markdown", - "id": "e67de6c8", + "id": "9ac6f5a6-bdff-4826-8b3f-314856f2e1d9", + "metadata": {}, + "source": [ + "## 3. Define the configuration\n", + "\n", + "The configuration files does 3 things:\n", + "\n", + "- Remove outliers\n", + "- Remove unwanted codes\n", + "- Normalize units\n", + "\n", + "### 3.1 The default configuration\n", + "\n", + "A **default configuration** is available when working on APHP's CDW. You can access it via:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5051cb27-ada6-49dd-9026-0c6489b29bbd", "metadata": {}, + "outputs": [], "source": [ - "## 3. Create your own configuration (**OPTIONAL**)\n", + "from eds_scikit.resources import registry\n", "\n", - "If the [default configuration](../../datasets/biology-config.md) file based on the AP-HP's Data Warehouse does not meet your requirements, you can follow this tutorial to create your own configuration file.\n", + "biology_config = registry.get(\"data\", \"get_biology_config.all_aphp\")()" + ] + }, + { + "cell_type": "markdown", + "id": "e67de6c8", + "metadata": {}, + "source": [ + "### 3.2 Create your own configuration (**OPTIONAL**)\n", "\n", + "If this default configuration file does not meet your requirements, you can follow this tutorial to create your own configuration file. \n", "As a reminder, a configuration file is a csv table where each row corresponds to a given standard concept_code and a given unit. For each row, it gives a maximum threshold and a minimum threshold to flag outliers and a unit conversion coefficient to normalize units if needed." ] }, @@ -158,7 +187,7 @@ "id": "ed657ed4", "metadata": {}, "source": [ - "### 3.1 Plot statistical summary\n", + "#### 3.2.1 Plot statistical summary\n", "\n", "The first step is to compute the statistical summary of each concepts-set with the function ``plot_biology_summary(stats_only=True)``. " ] @@ -602,7 +631,7 @@ "id": "70ce6f91", "metadata": {}, "source": [ - "### 3.2 Create configuration from statistical summary\n", + "#### 3.2.2 Create configuration from statistical summary\n", "\n", "Then, you can use the function ``create_config_from_stats()`` to pre-fill the configuration file with ``max_threshold`` and ``min_threshold``. The thresholds computation is based on the Median Absolute Deviation (MAD) Methodology[@madmethodology]." 
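As a rough illustration of the MAD-based thresholds mentioned above: the scaling constant and the number of deviations in the sketch below are assumptions for illustration, not necessarily the parameters used by `create_config_from_stats()`.

```python
import numpy as np

# Sketch of MAD-based bounds: median +/- k * scaled MAD.
def mad_thresholds(values, k=3.0):
    values = np.asarray(values, dtype=float)
    median = np.median(values)
    mad = np.median(np.abs(values - median))
    scaled_mad = 1.4826 * mad  # consistency constant under a normality assumption
    return median - k * scaled_mad, median + k * scaled_mad

min_threshold, max_threshold = mad_thresholds([4.2, 4.5, 4.8, 5.0, 5.1, 12.0])
```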
] @@ -629,7 +658,7 @@ "id": "7eb98862", "metadata": {}, "source": [ - "### 3.3 Edit units manually" + "#### 3.2.3 Edit units manually" ] }, { @@ -649,7 +678,7 @@ "id": "8c634329", "metadata": {}, "source": [ - "### 3.4 Use your custom configuration\n", + "#### 3.2.4 Use your custom configuration\n", "\n", "Once you created your configuration (for instance under the name `config_name=\"my_custom_config\"`), you can use provide it to the relevant functions (see below).\n", "\n", @@ -689,7 +718,7 @@ "bioclean(\n", " data,\n", " concepts_sets=concepts_sets,\n", - " config_name=config_name,\n", + " config_name=config_name, # use config_name=\"all_aphp\" for APHP's default configuration\n", " start_date=start_date,\n", " end_date=end_date,\n", ")" @@ -702,13 +731,13 @@ "source": [ "See below the columns created by the ``bioclean()`` function:\n", "\n", - "| concepts_set | transformed_unit | transformed_value | max_threshold | min_threshold | outlier | .... |\n", - "| :----------- | :--------------- | :---------------- | :------------ | :------------ | :------ | :--- |\n", - "| Entity A | x10*9/l | 115 | 190 | 0 | False | .... |\n", - "| Entity A | x10*9/l | 220 | 190 | 0 | True | .... |\n", - "| Entity B | mmol | 0.45 | 8.548 | 0.542 | True | .... |\n", - "| Entity B | mmol | 4.52 | 8.548 | 0.542 | False | .... |\n", - "| Entity B | mmol | 9.58 | 8.548 | 0.542 | True | .... |" + "| concepts_set | LOINC_concept_code | LOINC_concept_name | AnaBio_concept_code | AnaBio_concept_name | transformed_unit | transformed_value | max_threshold | min_threshold | outlier | value_source_value | unit_source_value |\n", + "| :------------------------- | :----------------- | :----------------- | :------------------ | :------------------- | :--------------- | :---------------- | :------------ | :------------ | :------ | :----------------- | :---------------- |\n", + "| EntityA_Blood_Quantitative | 000-0 | EntityA #Bld | A0000 | EntityA_Blood | x10*9/l | 115 | 190 | 0 | False | 115 x10*9/l | x10*9/l |\n", + "| EntityA_Blood_Quantitative | 000-1 | EntityA_Blood_Vol | A0001 | EntityA_Blood_g/l | x10*9/l | 220 | 190 | 0 | True | 560 g/l | g/l |\n", + "| EntityB_Blood_Quantitative | 001-0 | EntityB_Blood | B0000 | EntityB_Blood_artery | mmol | 0.45 | 8.548 | 0.542 | True | 0.45 mmol | mmol |\n", + "| EntityB_Blood_Quantitative | 001-0 | EntityB_Blood | B0001 | EntityB_Blood_vein | mmol | 4.52 | 8.548 | 0.542 | False | 4.52 mmol | mmol |\n", + "| EntityB_Blood_Quantitative | 000-1 | EntityB Bld Auto | B0002 | EntityB_Blood_µg/l | mmol | 9.58 | 8.548 | 0.542 | True | 3587 µg/l | µg/l |" ] }, { @@ -748,9 +777,9 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "scikit", "language": "python", - "name": "python3" + "name": "scikit" }, "language_info": { "codemirror_mode": { @@ -762,7 +791,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.12" + "version": "3.7.8" }, "vscode": { "interpreter": { diff --git a/docs/functionalities/biology/vocabulary.md b/docs/functionalities/biology/vocabulary.md index 40a3c5c5..b08813f5 100644 --- a/docs/functionalities/biology/vocabulary.md +++ b/docs/functionalities/biology/vocabulary.md @@ -15,4 +15,4 @@ The standard vocabulary is a unified vocabulary that allows data analysis on a l ## Vocabulary flowchart in OMOP -![Image title](../../_static/biology/config_map_units.svg) +![Image title](../../_static/biology/vocabulary_flowchart.svg) diff --git 
a/eds_scikit/biology/utils/process_concepts.py b/eds_scikit/biology/utils/process_concepts.py index 9947d751..cfd96398 100644 --- a/eds_scikit/biology/utils/process_concepts.py +++ b/eds_scikit/biology/utils/process_concepts.py @@ -120,9 +120,9 @@ def get_concept_src_to_std( concepts_sets : List[ConceptsSet] List of concepts-sets to select standard_concept_regex : dict, optional - **EXAMPLE**: `["LOINC", "AnaBio"]` - standard_terminologies : List[str], optional **EXAMPLE**: `{"LOINC": "[0-9]{2,5}[-][0-9]","AnaBio": "[A-Z][0-9]{4}"}` + standard_terminologies : List[str], optional + **EXAMPLE**: `["LOINC", "AnaBio"]` Returns From e29f4a58e0c4d01e990496e20b61b333ec686ede Mon Sep 17 00:00:00 2001 From: Thomas Petit-Jean <30775613+Thomzoy@users.noreply.github.com> Date: Thu, 2 Feb 2023 16:47:18 +0100 Subject: [PATCH 25/25] V0.1.3 (#28) * chore: bump to v0.1.3 * chore: changelog --- README.md | 6 +++--- changelog.md | 2 +- eds_scikit/__init__.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index ee90eefe..057c014c 100644 --- a/README.md +++ b/README.md @@ -57,13 +57,13 @@ eds-scikit stands on the shoulders of [Spark 2.4](https://spark.apache.org/docs/ You can install eds-scikit via `pip`: ```bash -pip install eds-scikit +pip install "eds-scikit[aphp]" ``` -:warning: If you work in AP-HP's ecosystem (EDS), please install additionnal features via: +:warning: If you don't work in AP-HP's ecosystem (EDS), please install via: ```bash -pip install "eds-scikit[aphp]" +pip install eds-scikit ``` You can now import the library via diff --git a/changelog.md b/changelog.md index c7ddd93d..5b6401e8 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,6 @@ # Changelog -## Pending +## v0.1.3 (2023-02-02) ### Added diff --git a/eds_scikit/__init__.py b/eds_scikit/__init__.py index 3e5c80e4..9cfd7c6d 100644 --- a/eds_scikit/__init__.py +++ b/eds_scikit/__init__.py @@ -1,7 +1,7 @@ """Top-level package for eds_scikit.""" __author__ = """eds_scikit""" -__version__ = "0.1.2" +__version__ = "0.1.3" import importlib import os
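After the install-instruction swap and the version bump above, a quick sanity check of which release is installed (a minimal sketch; `0.1.3` is the version set by this patch):

```python
import eds_scikit

# The __version__ attribute is set in eds_scikit/__init__.py, as shown above.
print(eds_scikit.__version__)  # expected: "0.1.3"
```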