#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""Recover SERENA (RhoMeO 2012) observations and load them into SICEN.

The script reads observations from the ``serenabase`` PostGIS schema, keeps
those whose ``obse_nom`` mentions "rhoméo", converts them to the column layout
of ``saisie.saisie_observation`` and appends them to that table.
"""
from os import path

import geopandas as gpd
import numpy as np
import pandas as pd
import pandas_access as mdb

from pycen import con_sicen, update_to_sql

postgis_sch = 'serenabase'
ACCESS_PATH = '/media/colas/SRV/FICHIERS/OUTILS/BASES DE DONNEES/RHOMEO/2012/CEN-38_RHOMEO_BDD_2012'
ACCESS_FILE = 'BD_Rhomeo_data.mdb'
DICT_COLS = {
    'obse_id': 'id_origine',
}

# Collection types accepted wherever a function takes "one id or many ids".
_ID_COLLECTIONS = (list, tuple, pd.Series, pd.Index, np.ndarray)


def _where_ids(column, ids, negate=False):
    """Build a ``WHERE`` fragment restricting *column* to integer *ids*.

    Args:
        column: Name of the SQL column to filter on.
        ids: ``None``, a single integer, or a collection of identifiers.
            Collection items are coerced with ``int()``; missing values
            (``None``/NaN) are skipped.
        negate: When true, build the complementary filter.

    Returns:
        A string starting with a space (``' WHERE ...'``), or ``''`` when no
        restriction applies (``ids`` is ``None`` or of an unsupported type).
    """
    if isinstance(ids, (int, np.integer)):
        return ' WHERE {col} {op} {val:d}'.format(
            col=column, op='<>' if negate else '=', val=int(ids))
    if not isinstance(ids, _ID_COLLECTIONS):
        return ''

    values = [int(v) for v in ids if not pd.isna(v)]
    if not values:
        # "IN ()" is not valid SQL: an empty inclusion list matches nothing,
        # an empty exclusion list excludes nothing.
        return '' if negate else ' WHERE FALSE'
    return ' WHERE {col} {kw} ({lst})'.format(
        col=column,
        kw='NOT IN' if negate else 'IN',
        lst=', '.join(str(v) for v in values))


def get_columns_oftable(tab, sch, con):
    """Return the column names of table *sch*.*tab* as seen through *con*."""
    res = con.dialect.get_columns(con, tab, sch)
    return [col['name'] for col in res]


def get_columns_dtypes(tab, sch, con, df: pd.DataFrame | None = None):
    """Return ``{column name: SQL type}`` for table *sch*.*tab*.

    When *df* is a DataFrame, only the columns also present in *df* are kept.
    """
    res = con.dialect.get_columns(con, tab, sch)
    dtypes = {col['name']: col['type'] for col in res}
    if isinstance(df, pd.DataFrame):
        dtypes = {name: typ for name, typ in dtypes.items() if name in df.columns}
    return dtypes


def match_columns(df, tab, sch, con):
    """Return the columns of *df* that also exist in table *sch*.*tab*."""
    table_cols = get_columns_oftable(tab, sch, con)
    return df.columns[df.columns.isin(table_cols)]


def lower(df):
    """Lower-case, in place, every column of *df* that holds only strings.

    Missing values are ignored when deciding whether a column is textual, so
    a text column containing NULLs is lower-cased too. Columns with no
    non-null value, or with any non-string value, are left untouched.

    Returns:
        The same DataFrame object, for chaining.
    """
    for c in df.columns:
        values = df[c].dropna()
        if len(values) and values.map(type).eq(str).all():
            df[c] = df[c].str.lower()
    return df


def get_odat_obse(id_obse: np.ndarray | list | pd.Series | int | None = None):
    """Fetch the additional data (``rnf_odat``) attached to observations.

    Args:
        id_obse: Observation id(s) to restrict the query to; ``None`` reads
            the whole table.

    Returns:
        One row per (observation, list) pair with one column per ``choi_nom``
        value, passed through :func:`format_comportement`.
    """
    sql = '''
        SELECT
            o.odat_obse_id,
            c.choi_nom,
            l.list_nom,
            o.odat_nom
        FROM {sch}.rnf_odat o
        LEFT JOIN {sch}.rnf_choi c ON c.choi_id = o.odat_choi_id
        LEFT JOIN {sch}.rnf_list l ON l.list_id = o.odat_c_list_id
    '''.format(sch=postgis_sch)
    sql += _where_ids('odat_obse_id', id_obse)

    result = pd.read_sql_query(
        sql, con_sicen, index_col=['odat_obse_id', 'list_nom']
    ).replace({r' \(RhoMeO\)': '', r'\?': '', r'[.]': ''}, regex=True)

    df = lower(result)\
        .pivot(columns='choi_nom', values='odat_nom')\
        .reset_index(drop=False)
    return format_comportement(df)


def format_comportement(oda: pd.DataFrame):
    """Replace SERENA behaviour labels by their ``ODO_*`` counterparts."""
    DICT = {
        'exuvie/émergent': 'ODO_Exuvie/émergence',
        'mâles+femelles': 'ODO_Mâles+Femelles',
        'autre': 'ODO_Autre',
        'territoriale': 'ODO_Territorial',
        'ponte': 'ODO_Ponte',
        'accouplement': 'ODO_Tandem',
        'tandem': 'ODO_Tandem',
        'aucun': None,
    }
    return oda.replace(DICT)


def format_odat(df, odat):
    """Join observations with their additional data and build a remark.

    For every *odat* column whose name starts with ``nb``, values that are
    present and differ from ``obse_nombre`` are appended to a free-text
    remark as ``<column>:<value>``, separated by ``;``.

    Args:
        df: Observations; must hold ``obse_id``, ``obse_nombre``,
            ``obse_nom``, ``cd_nom`` and ``ordre``.
        odat: Output of :func:`get_odat_obse`; must hold ``odat_obse_id``,
            ``comportement`` and ``strate``.

    Returns:
        DataFrame with ``odat_obse_id``, ``reprostatut``, ``type_effectif``
        and ``remarque_obs``.
    """
    DICT_COLS = {
        'comportement': 'reprostatut',
        'strate': 'type_effectif',
        'rmq_odat': 'remarque_obs',
    }

    df_odat = df[['obse_id', 'obse_nombre', 'obse_nom', 'cd_nom', 'ordre']].merge(
        odat, left_on='obse_id', right_on='odat_obse_id'
    )
    # na=False: pivoted column labels may contain a missing value.
    nb_cols = odat.columns[odat.columns.str.startswith('nb', na=False)]

    df_odat['rmq_odat'] = None
    for c in nb_cols:
        differs = df_odat['obse_nombre'].ne(df_odat[c]) & df_odat[c].notna()
        has_rmq = df_odat['rmq_odat'].notna()
        to_append = differs & has_rmq
        to_create = differs & ~has_rmq

        df_odat.loc[to_append, 'rmq_odat'] = (
            df_odat.loc[to_append, 'rmq_odat'] + ';' + c + ':' + df_odat.loc[to_append, c]
        )
        df_odat.loc[to_create, 'rmq_odat'] = c + ':' + df_odat.loc[to_create, c]

    return df_odat.rename(columns=DICT_COLS)[['odat_obse_id', *DICT_COLS.values()]]


def get_obs_serena(insicen: bool = None):
    """Read SERENA observations with site, taxonomy and coordinates.

    Args:
        insicen: ``None`` returns every observation. ``True`` keeps only the
            observations whose id is already stored in
            ``saisie.saisie_observation`` with ``id_lot = 3``; ``False``
            keeps only those that are not.

    Returns:
        DataFrame of observations; columns that are entirely NULL are dropped.
    """
    # NOTE: the trailing "--LEFT JOIN" SQL comment must stay alone on its
    # line, otherwise it would comment out the WHERE clause appended below.
    sql = '''
        SELECT
            o.*,
            s.site_nom,
            s.site_ref_sig,
            CASE
                WHEN t.taxo_id = 203471 THEN (SELECT cd_nom FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT cd_nom FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.cd_nom IS NULL THEN ta.cd_nom::text
                ELSE tax.cd_nom
            END cd_nom,
            CASE
                WHEN t.taxo_id = 203471 THEN (SELECT phylum FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT phylum FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.phylum IS NULL THEN ta.phylum
                ELSE tax.phylum
            END phylum,
            CASE
                WHEN t.taxo_id = 203471 THEN (SELECT classe FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT classe FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.classe IS NULL THEN ta.classe
                ELSE tax.classe
            END classe,
            CASE
                WHEN t.taxo_id = 203471 THEN (SELECT ordre FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT ordre FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.ordre IS NULL THEN ta.ordre
                ELSE tax.ordre
            END ordre,
            CASE
                WHEN t.taxo_id = 203471 THEN (SELECT famille FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT famille FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.famille IS NULL THEN ta.famille
                ELSE tax.famille
            END famille,
            CASE
                WHEN t.taxo_id = 203471 THEN (SELECT nom_complet FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT nom_complet FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.nom_complet IS NULL THEN ta.nom_complet
                ELSE tax.nom_complet
            END nom_complet,
            CASE
                WHEN t.taxo_id = 203471 THEN (SELECT nom_vern FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT nom_vern FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.nom_vern IS NULL THEN ta.nom_vern
                ELSE tax.nom_vern
            END nom_vern,
            CASE
                WHEN tmpo.ogll_lon = '999' THEN tmps.sgll_lon::float
                ELSE tmpo.ogll_lon::float
            END lon,
            CASE
                WHEN tmpo.ogll_lat = '999' THEN tmps.sgll_lat::float
                ELSE tmpo.ogll_lat::float
            END lat
        FROM {sch}.rnf_obse o
        LEFT JOIN {sch}.rnf_relv r ON r.relv_id = o.obse_relv_id
        LEFT JOIN {sch}.rnf_site s ON s.site_id = o.obse_site_id
        LEFT JOIN {sch}.tmp_sgll tmps ON tmps.sgll_site_id = o.obse_site_id
        LEFT JOIN {sch}.tmp_ogll tmpo ON tmpo.ogll_obse_id = o.obse_id
        LEFT JOIN serenarefe.rnf_taxo t ON t.taxo_id = o.obse_taxo_id
        LEFT JOIN inpn.taxons_isere_absents_taxref ta ON ta.id_taxon = o.obse_taxo_id
        LEFT JOIN inpn.taxref tax ON t.taxo_mnhn_id = tax.cd_nom::int
        --LEFT JOIN {sch}.rnf_odat od ON od.odat_obse_id = o.obse_id
    '''.format(sch=postgis_sch)

    if insicen is not None:
        sql_sicen = 'SELECT DISTINCT id_origine FROM saisie.saisie_observation WHERE id_lot=3'
        sicen_obs = pd.read_sql_query(sql_sicen, con_sicen)
        known_ids = sicen_obs.id_origine.dropna().astype(int)
        sql += _where_ids('obse_id', known_ids, negate=not insicen)

    return pd.read_sql_query(sql, con_sicen)\
        .dropna(axis=1, how='all')


def list_releve(id_relv=None, source='postgis'):
    """List SERENA surveys (``rnf_relv``) with their category label.

    Args:
        id_relv: Survey id(s) to keep; ``None`` returns every survey.
        source: Only ``'postgis'`` is supported.

    Returns:
        DataFrame of surveys with ``choi_nom`` joined in and the columns
        whose name contains ``date`` converted to datetimes.

    Raises:
        ValueError: If *source* is not ``'postgis'``.
        TypeError: If *id_relv* is neither an integer nor a collection.
    """
    if source != 'postgis':
        raise ValueError("list_releve: unsupported source %r" % (source,))

    df_choi = pd.read_sql_table('rnf_choi', con_sicen, postgis_sch)\
        .dropna(axis=1, how='all')
    df_relv = pd.read_sql_table('rnf_relv', con_sicen, postgis_sch)\
        .dropna(axis=1, how='all')

    # Fix a typo present in the source data.
    df_relv.loc[df_relv.relv_categ_choi_id == 100800, 'relv_categ_choi_id'] = 100008

    # Join each survey with its category.
    df = df_relv.merge(
        df_choi[['choi_id', 'choi_nom']],
        how='inner', left_on='relv_categ_choi_id', right_on='choi_id'
    ).drop(columns=['relv_categ_choi_id'])

    # Normalise the date columns.
    date_cols = df.columns[df.columns.str.contains('date')]
    for col in date_cols:
        df.loc[df[col] == '2000 à 2001', col] = '2000' if '1date' in col else '2001'
        # Assign the result back: an in-place replace on df[col] is not
        # guaranteed to modify df.
        df[col] = pd.to_datetime(df[col].replace({'VIDE': None}))

    if id_relv is None:
        return df
    if isinstance(id_relv, (int, np.integer)):
        filtre = df.relv_id == id_relv
    elif isinstance(id_relv, _ID_COLLECTIONS):
        filtre = df.relv_id.isin(list(id_relv))
    else:
        raise TypeError(
            "list_releve: unsupported id_relv type %s" % type(id_relv).__name__)
    return df[filtre]


def get_serena_obs(srce_id=None, source: str = 'access'):
    """Read SERENA observers from PostGIS or from the Access database.

    Args:
        srce_id: Observer id(s) to keep; ``None`` returns everyone.
        source: ``'postgis'`` reads ``rnf_user``; ``'access'`` reads the
            ``rev_obs`` table of the ``.mdb`` file and splits
            ``nom_observateur`` into ``nom`` and ``prenom``.

    Returns:
        Lower-cased DataFrame of observers.

    Raises:
        ValueError: If *source* is neither ``'postgis'`` nor ``'access'``.
    """
    DICT_STR = {
        'avenir': 'cen isère',
        'be': 'personnel',
    }

    if source == 'postgis':
        sql = 'SELECT user_id,user_srce_id FROM {sch}.rnf_user'.format(sch=postgis_sch)
        sql += _where_ids('user_srce_id', srce_id)
        return lower(pd.read_sql_query(sql, con_sicen))

    if source == 'access':
        df = mdb.read_table(path.join(ACCESS_PATH, ACCESS_FILE), 'rev_obs')
        if isinstance(srce_id, (int, np.integer)):
            df = df[df.rev_observateur == srce_id]
        elif isinstance(srce_id, _ID_COLLECTIONS):
            df = df[df.rev_observateur.isin(list(srce_id))]
        # `n` is keyword-only in pandas >= 2.0.
        df[['nom', 'prenom']] = df.nom_observateur.str.split(' ', n=1, expand=True)
        return lower(df).replace(DICT_STR)

    raise ValueError("get_serena_obs: unsupported source %r" % (source,))


def get_sicen_obs(personne_id=None):
    """Read SICEN people (``md.personne``) together with their structure.

    Args:
        personne_id: Person id(s) to keep (an ``int``, a numeric ``str`` or
            a collection); ``None`` returns everyone.

    Returns:
        Lower-cased DataFrame with ``id_personne``, ``prenom``, ``nom``,
        ``id_structure`` and ``nom_structure``.
    """
    sql = ('SELECT id_personne,prenom,nom,id_structure,nom_structure '
           'FROM md.personne JOIN md.structure USING (id_structure)')
    if isinstance(personne_id, str):
        personne_id = int(personne_id)
    # The table column is `id_personne`, as in the SELECT list above.
    sql += _where_ids('id_personne', personne_id)
    return lower(pd.read_sql_query(sql, con_sicen))


def get_sicen_etude():
    """Read the whole ``md.etude`` table."""
    sql = 'SELECT * FROM md.etude'
    return pd.read_sql_query(sql, con_sicen)


def get_serena_choi(choi_id: np.ndarray | list | pd.Series | int | None = None):
    """Read ``choi_id``/``choi_nom`` pairs from ``rnf_choi``, lower-cased."""
    sql = 'SELECT choi_id,choi_nom FROM {sch}.rnf_choi'.format(sch=postgis_sch)
    sql += _where_ids('choi_id', choi_id)
    return lower(pd.read_sql_query(sql, con_sicen))


def get_serena_site(site_id: np.ndarray | list | pd.Series | int | None = None):
    """Read ``site_id``/``site_nom`` pairs from ``rnf_site``."""
    sql = 'SELECT site_id,site_nom FROM {sch}.rnf_site'.format(sch=postgis_sch)
    sql += _where_ids('site_id', site_id)
    return pd.read_sql_query(sql, con_sicen)


def get_sicen_pcol():
    """Read the whole ``md.protocole`` table, lower-cased."""
    sql = '''SELECT * FROM "md"."protocole";'''
    return lower(pd.read_sql_query(sql, con_sicen))


def _crsp(left: pd.DataFrame, right: pd.DataFrame, lefton, righton, idleft, idright):
    """Inner-join *left* and *right*, return ``{left id: right id}``."""
    df = left.merge(right, left_on=lefton, right_on=righton)
    return dict(zip(df[idleft], df[idright]))


def crsp_relv(relv: pd.Series):
    """Map SERENA survey ids to SICEN study ids by matching names.

    Ids without a name match are left unchanged.
    """
    seren_relv = lower(list_releve(relv.unique()))
    sicen_relv = lower(get_sicen_etude())
    DICT_RELV_ID = _crsp(
        left=seren_relv, right=sicen_relv,
        lefton='relv_nom', righton='nom_etude',
        idleft='relv_id', idright='id_etude')
    return relv.replace(DICT_RELV_ID)


def crsp_obs(obs: pd.Series):
    """Map SERENA observer ids to SICEN person ids (name + structure match).

    Ids without a match are left unchanged.
    """
    obs_seren = get_serena_obs(obs.unique())
    obs_sicen = get_sicen_obs()
    DICT_OBS_ID = _crsp(
        left=obs_seren, right=obs_sicen,
        lefton=['nom', 'structure_obs'], righton=['nom', 'nom_structure'],
        idleft='rev_observateur', idright='id_personne')
    return obs.replace(DICT_OBS_ID)


def crsp_pcol(pcol: pd.Series):
    """Map SERENA protocol choice ids to SICEN protocol ids by label.

    Ids without a label match are left unchanged.
    """
    pcol_seren = get_serena_choi(pcol.unique())
    pcol_sicen = get_sicen_pcol()
    DICT_PCOL_ID = _crsp(
        left=pcol_seren, right=pcol_sicen,
        lefton='choi_nom', righton='libelle',
        idleft='choi_id', idright='id_protocole')
    return pcol.replace(DICT_PCOL_ID)


def crsp_valid(valid: pd.Series):
    """Map validation choice ids through the SICEN protocol table.

    NOTE(review): this performs exactly the same lookup as :func:`crsp_pcol`
    (labels matched against ``md.protocole``), which looks like a copy-paste
    leftover for a validation status. Confirm the intended target table.
    """
    return crsp_pcol(valid)


def crsp_abond(abond: pd.Series):
    """Replace abundance choice ids by their (lower-cased) label."""
    ab_seren = get_serena_choi(abond.dropna().unique())
    DICT_ABOND_ID = dict(zip(ab_seren.choi_id, ab_seren.choi_nom))
    return abond.replace(DICT_ABOND_ID)


def crsp_site(site: pd.Series):
    """Replace site ids by their name."""
    sit_seren = get_serena_site(site.dropna().unique())
    DICT_SITE_ID = dict(zip(sit_seren.site_id, sit_seren.site_nom))
    return site.replace(DICT_SITE_ID)


def get_structure_id(id_pers: np.ndarray | list | pd.Series | int | None = None):
    """Read ``id_personne``/``id_structure`` pairs from ``md.personne``."""
    sql = 'SELECT id_personne, id_structure FROM md.personne'
    sql += _where_ids('id_personne', id_pers)
    return pd.read_sql_query(sql, con_sicen)


def format_effectif(DF):
    """Split the raw count held in ``effectif_min`` into the SICEN columns.

    * purely numeric text goes to ``effectif`` (``effectif_min`` is emptied);
    * any other text (e.g. ``'5-10'``, ``'10+'``) is copied to
      ``effectif_textuel`` and split on ``-``/``+`` into ``effectif_min``
      and ``effectif_max``;
    * rows with no count are left untouched.

    Args:
        DF: Frame with a text ``effectif_min`` column; it is not modified.

    Returns:
        A copy of *DF* with the columns above filled in and empty strings
        replaced by ``None``.
    """
    df = DF.copy()
    raw = df['effectif_min'].copy()

    # .eq(True) turns the NaN produced by missing counts into False.
    is_num = raw.str.isnumeric().eq(True)
    is_txt = raw.notna() & ~is_num

    # reindex guarantees both bounds exist even when nothing was split.
    bounds = raw[is_txt].str.split(r'[-+]', expand=True).reindex(columns=[0, 1])

    df.loc[is_num, 'effectif'] = raw[is_num]
    df.loc[is_num, 'effectif_min'] = None
    df.loc[is_txt, 'effectif_textuel'] = raw[is_txt]
    df.loc[is_txt, 'effectif_min'] = bounds[0]
    df.loc[is_txt, 'effectif_max'] = bounds[1]
    return df.replace({'': None})


def to_sicen(DF):
    """Convert SERENA observations to the ``saisie_observation`` layout.

    Args:
        DF: GeoDataFrame of observations as produced in the ``__main__``
            section (output of :func:`get_obs_serena` with a geometry).

    Returns:
        GeoDataFrame restricted to the columns that exist in
        ``saisie.saisie_observation``, with counts formatted by
        :func:`format_effectif`.
    """
    CRSP_COLUMNS = {
        'obse_id': 'id_origine',
        'obse_relv_id': 'id_etude',
        'obse_obsv_id': 'observateur',
        'obse_detm_id': 'validateur',
        'obse_date': 'date_obs',
        'obse_pcole_choi_id': 'id_protocole',
        'obse_validat_choi_id': 'statut_validation',
        'obse_confid_choi_id': 'diffusable',
        'obse_abond_choi_id': 'effectif_textuel',
        'obse_nombre': 'effectif_min',
        'site_nom': 'localisation',
        'site_ref_sig': 'id_waypoint',
        'lat': 'latitude',
        'lon': 'longitude',
    }

    df = DF.copy()
    if df.geometry.name != 'geometrie':
        df.rename_geometry('geometrie', inplace=True)

    df.obse_relv_id = crsp_relv(df.obse_relv_id)
    df.obse_obsv_id = crsp_obs(df.obse_obsv_id)
    df.obse_detm_id = crsp_obs(df.obse_detm_id).astype(int)
    df.obse_date = pd.to_datetime(df.obse_date)
    df.obse_pcole_choi_id = crsp_pcol(df.obse_pcole_choi_id)
    df.loc[df.obse_validat_choi_id == 100373, 'obse_validat_choi_id'] = 'validée'
    df.loc[df.obse_confid_choi_id == 100473, 'obse_confid_choi_id'] = True
    df.obse_abond_choi_id = crsp_abond(df.obse_abond_choi_id)

    df.rename(columns=CRSP_COLUMNS, inplace=True)

    # Tag the rows as belonging to the SERENA lot.
    df['id_lot'] = 3

    # Attach the structure of each observer.
    struct = get_structure_id(df.observateur.unique())
    df = df.merge(struct, how='left', left_on='observateur', right_on='id_personne')\
        .rename(columns={'id_structure': 'structure'})

    cols = match_columns(df, 'saisie_observation', 'saisie', con_sicen)

    # Format the counts.
    return format_effectif(df[cols])


def drop_cdnom_missing(df):
    """Return *df* without the rows whose ``cd_nom`` is missing."""
    return df.dropna(subset=['cd_nom'])


if __name__ == "__main__":

    obs_serena = get_obs_serena(insicen=True)
    # Not used in the visible part of the script; kept for inspection.
    isna = obs_serena.cd_nom.isna()

    # Rebuild the geometries (WGS84 lon/lat, reprojected to Lambert-93).
    # get_obs_serena returns a plain DataFrame, which has no set_geometry().
    gdf_obs = gpd.GeoDataFrame(
        obs_serena,
        geometry=gpd.points_from_xy(obs_serena.lon, obs_serena.lat),
        crs=4326,
    )
    gdf_obs.to_crs(2154, inplace=True)

    # Identify the RhoMeO data.
    is_rhomeo = gdf_obs.obse_nom.str.contains('rhoméo', na=False, case=False)
    # errors='ignore': obse_habi_id may already be gone if it was all-NULL.
    obs_rhomeo = gdf_obs[is_rhomeo]\
        .dropna(axis=1, how='all')\
        .drop(columns=['obse_habi_id'], errors='ignore')
    # Not used in the visible part of the script; kept for inspection.
    rhoisna = obs_rhomeo.cd_nom.isna()

    obs_rhomeo = drop_cdnom_missing(obs_rhomeo)
    odat = get_odat_obse(obs_rhomeo.obse_id)

    OBS_RHOMEO = obs_rhomeo.merge(
        format_odat(obs_rhomeo, odat),
        left_on='obse_id', right_on='odat_obse_id', how='left'
    ).drop(columns='odat_obse_id')

    OBS_RHOMEO.to_file('/home/colas/Documents/9_PROJETS/4_SICEN/RECOVERY/rhomeo_data_notin_sicen.gpkg')
    RES_OBS = to_sicen(OBS_RHOMEO)
    RES_OBS.to_postgis(
        'saisie_observation',
        con_sicen,
        'saisie',
        if_exists='append',
        index=False,
        dtype=get_columns_dtypes(
            'saisie_observation',
            'saisie',
            con_sicen,
            RES_OBS
        )
    )

    update_cols = ['id_lot', 'id_origine', 'effectif', 'effectif_min']
    update_to_sql(
        RES_OBS[update_cols],
        con_sicen,
        'saisie_observation',
        'saisie',
        ['id_lot', 'id_origine'],
        dtype=get_columns_dtypes(
            'saisie_observation',
            'saisie',
            con_sicen,
            RES_OBS[update_cols]
        )
    )

    # Fetch the surveys the RhoMeO observations belong to.
    id_relvrho = gdf_obs[is_rhomeo].obse_relv_id.unique()
    relv = list_releve(id_relv=id_relvrho)
    relv_rh = relv.relv_nom