Python_scripts/1_SICEN/SERENA_siege/recovery_rhomeo.py

465 lines
17 KiB
Python

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from os import path

import geopandas as gpd
import numpy as np
import pandas as pd
import pandas_access as mdb
from pycen import con_sicen, update_to_sql
# Schema name substituted into the SERENA SQL queries built in this file.
postgis_sch = 'serenabase'
# Directory and file name of the RhoMeO Access database; they are joined and
# read by get_serena_obs(source='access').
ACCESS_PATH = '/media/colas/SRV/FICHIERS/OUTILS/BASES DE DONNEES/RHOMEO/2012/CEN-38_RHOMEO_BDD_2012'
ACCESS_FILE = 'BD_Rhomeo_data.mdb'
# SERENA -> SICEN column renaming.
# NOTE(review): this module-level mapping is not referenced in the visible part
# of the file (format_odat defines its own local DICT_COLS) - confirm it is used.
DICT_COLS = {
    'obse_id':'id_origine',
}
def get_columns_oftable(tab, sch, con):
    """Return the column names of table ``tab`` in schema ``sch``.

    The description is obtained from ``con.dialect.get_columns``; only the
    ``'name'`` entry of each returned column description is kept, in the
    order the dialect reports them.
    """
    names = []
    for column in con.dialect.get_columns(con, tab, sch):
        names.append(column['name'])
    return names
def get_columns_dtypes(tab, sch, con, df: pd.DataFrame = None):
    """Map column name -> column type for table ``tab`` in schema ``sch``.

    The description comes from ``con.dialect.get_columns``. When ``df`` is a
    DataFrame, only the table columns that are also columns of ``df`` are
    kept; otherwise every table column is returned.
    """
    described = con.dialect.get_columns(con, tab, sch)
    dtypes = {column['name']: column['type'] for column in described}
    if not isinstance(df, pd.DataFrame):
        return dtypes
    return {name: dtype for name, dtype in dtypes.items() if name in df.columns}
def match_columns(df, tab, sch, con):
    """Return the columns of ``df`` that also exist in table ``sch.tab``.

    The result is the subset of ``df.columns`` (an Index, in ``df`` order)
    whose names are reported by ``get_columns_oftable`` for the target table.
    """
    table_columns = get_columns_oftable(tab, sch, con)
    present = df.columns.isin(table_columns)
    return df.columns[present]
def lower(df):
    """Lower-case, in place, every column of ``df`` that holds only ``str`` values.

    A column is converted only when each of its values is exactly of type
    ``str``; any other column (numeric, or containing a missing value) is
    left as it is. The same DataFrame object is returned.
    """
    for name in df.columns:
        column = df[name]
        only_strings = column.apply(type).eq(str).all()
        df[name] = column.str.lower() if only_strings else column
    return df
def get_odat_obse(id_obse: np.ndarray | list | pd.Series | int = None):
    """Read the SERENA ``rnf_odat`` rows, optionally restricted to some observations.

    Parameters
    ----------
    id_obse : int, list, Series or ndarray, optional
        Observation id(s) matched against ``odat_obse_id``. ``None`` (or any
        other type) applies no filter.

    Returns
    -------
    DataFrame
        The query result, cleaned of the ``' (RhoMeO)'``, ``'?'`` and ``'.'``
        fragments, lower-cased, pivoted with ``choi_nom`` as columns and
        ``odat_nom`` as values, index reset, then passed through
        ``format_comportement``.
    """
    sql = '''
        SELECT
            o.odat_obse_id,
            c.choi_nom,
            l.list_nom,
            o.odat_nom
        FROM {sch}.rnf_odat o
        LEFT JOIN {sch}.rnf_choi c ON c.choi_id = o.odat_choi_id
        LEFT JOIN {sch}.rnf_list l ON l.list_id = o.odat_c_list_id
    '''.format(sch=postgis_sch)
    if isinstance(id_obse, (int, np.integer)):
        sql += ' WHERE odat_obse_id = %i' % id_obse
    elif isinstance(id_obse, (list, pd.Series, np.ndarray)):
        # Build the IN list explicitly: formatting a tuple gives "(5,)" for a
        # single id, "()" for none and "np.int64(5)" items under numpy >= 2,
        # all of which are invalid SQL. "IN (NULL)" matches no row.
        ids = ', '.join(str(int(i)) for i in id_obse if pd.notna(i))
        sql += ' WHERE odat_obse_id IN ({})'.format(ids or 'NULL')
    result = pd.read_sql_query(sql, con_sicen, index_col=['odat_obse_id', 'list_nom'])
    result = result.replace({r' \(RhoMeO\)': '', r'\?': '', r'[.]': ''}, regex=True)
    df = lower(result).pivot(columns='choi_nom', values='odat_nom').reset_index(drop=False)
    return format_comportement(df)
def format_comportement(oda: pd.DataFrame):
    """Replace the lower-cased SERENA behaviour labels found in ``oda``.

    Every cell equal to one of the keys below is replaced by the paired
    ``ODO_*`` code (``'aucun'`` is replaced by ``None``); other cells are
    untouched. A new DataFrame is returned.
    """
    pairs = (
        ('exuvie/émergent', 'ODO_Exuvie/émergence'),
        ('mâles+femelles', 'ODO_Mâles+Femelles'),
        ('autre', 'ODO_Autre'),
        ('territoriale', 'ODO_Territorial'),
        ('ponte', 'ODO_Ponte'),
        ('accouplement', 'ODO_Tandem'),
        ('tandem', 'ODO_Tandem'),
        ('aucun', None),
    )
    return oda.replace(dict(pairs))
def format_odat(df, odat):
    """Join observations with their ``odat`` attributes and build a remark column.

    Parameters
    ----------
    df : DataFrame
        Observations; must provide ``obse_id``, ``obse_nombre``, ``obse_nom``,
        ``cd_nom`` and ``ordre``.
    odat : DataFrame
        Attributes keyed by ``odat_obse_id``; must provide ``comportement`` and
        ``strate``. Every column whose name starts with ``'nb'`` is compared
        with ``obse_nombre``.

    Returns
    -------
    DataFrame
        Columns ``odat_obse_id``, ``reprostatut``, ``type_effectif`` and
        ``remarque_obs``. ``remarque_obs`` lists, as ``'<column>:<value>'``
        joined by ``';'``, each non-null ``nb*`` value that differs from
        ``obse_nombre``; it stays ``None`` when there is none.
    """
    DICT_COLS = {
        'comportement': 'reprostatut',
        'strate': 'type_effectif',
        'rmq_odat': 'remarque_obs',
    }
    df_odat = df[['obse_id', 'obse_nombre', 'obse_nom', 'cd_nom', 'ordre']].merge(
        odat,
        left_on='obse_id', right_on='odat_obse_id'
    )
    nb_cols = odat.columns[odat.columns.str.startswith('nb')]
    df_odat['rmq_odat'] = None
    for c in nb_cols:
        differs = (df_odat.obse_nombre != df_odat[c]) & df_odat[c].notna()
        has_remark = df_odat.rmq_odat.notna()
        # Both masks are computed before any write so that a row initialised
        # for this column is not appended to in the same iteration.
        to_append = differs & has_remark
        to_start = differs & ~has_remark
        if to_append.any():
            df_odat.loc[to_append, 'rmq_odat'] = (
                df_odat.loc[to_append, 'rmq_odat'] + ';' + c + ':' + df_odat.loc[to_append, c]
            )
        if to_start.any():
            df_odat.loc[to_start, 'rmq_odat'] = c + ':' + df_odat.loc[to_start, c]
    return df_odat.rename(columns=DICT_COLS)[['odat_obse_id', *DICT_COLS.values()]]
def get_obs_serena(insicen: bool = None):
    """Read the SERENA observations with site, taxon and coordinate information.

    Parameters
    ----------
    insicen : bool, optional
        ``True`` keeps only observations whose ``obse_id`` is among the
        ``id_origine`` values of ``saisie.saisie_observation`` with
        ``id_lot=3``; ``False`` keeps only those that are not; ``None``
        applies no filter.

    Returns
    -------
    DataFrame
        The query result without the columns that are entirely null.
    """
    sql = '''
        SELECT o.*,
            s.site_nom,
            s.site_ref_sig,
            CASE WHEN t.taxo_id = 203471 THEN (SELECT cd_nom FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT cd_nom FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.cd_nom IS NULL THEN ta.cd_nom::text
                ELSE tax.cd_nom
            END cd_nom,
            CASE WHEN t.taxo_id = 203471 THEN (SELECT phylum FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT phylum FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.phylum IS NULL THEN ta.phylum
                ELSE tax.phylum
            END phylum,
            CASE WHEN t.taxo_id = 203471 THEN (SELECT classe FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT classe FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.classe IS NULL THEN ta.classe
                ELSE tax.classe
            END classe,
            CASE WHEN t.taxo_id = 203471 THEN (SELECT ordre FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT ordre FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.ordre IS NULL THEN ta.ordre
                ELSE tax.ordre
            END ordre,
            CASE WHEN t.taxo_id = 203471 THEN (SELECT famille FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT famille FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.famille IS NULL THEN ta.famille
                ELSE tax.famille
            END famille,
            CASE WHEN t.taxo_id = 203471 THEN (SELECT nom_complet FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT nom_complet FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.nom_complet IS NULL THEN ta.nom_complet
                ELSE tax.nom_complet
            END nom_complet,
            CASE WHEN t.taxo_id = 203471 THEN (SELECT nom_vern FROM inpn.taxref WHERE cd_nom = '54213' )
                WHEN t.taxo_id = 203491 THEN (SELECT nom_vern FROM inpn.taxref WHERE cd_nom = '521494' )
                WHEN tax.nom_vern IS NULL THEN ta.nom_vern
                ELSE tax.nom_vern
            END nom_vern,
            CASE WHEN tmpo.ogll_lon = '999' THEN tmps.sgll_lon::float
                ELSE tmpo.ogll_lon::float
            END lon,
            CASE WHEN tmpo.ogll_lat = '999' THEN tmps.sgll_lat::float
                ELSE tmpo.ogll_lat::float
            END lat
        FROM {sch}.rnf_obse o
        LEFT JOIN {sch}.rnf_relv r ON r.relv_id = o.obse_relv_id
        LEFT JOIN {sch}.rnf_site s ON s.site_id = o.obse_site_id
        LEFT JOIN {sch}.tmp_sgll tmps ON tmps.sgll_site_id = o.obse_site_id
        LEFT JOIN {sch}.tmp_ogll tmpo ON tmpo.ogll_obse_id = o.obse_id
        LEFT JOIN serenarefe.rnf_taxo t ON t.taxo_id = o.obse_taxo_id
        LEFT JOIN inpn.taxons_isere_absents_taxref ta ON ta.id_taxon = o.obse_taxo_id
        LEFT JOIN inpn.taxref tax ON t.taxo_mnhn_id = tax.cd_nom::int
        --LEFT JOIN {sch}.rnf_odat od ON od.odat_obse_id = o.obse_id
    '''.format(sch=postgis_sch)
    if insicen is not None:
        sql_sicen = 'SELECT DISTINCT id_origine FROM saisie.saisie_observation WHERE id_lot=3'
        sicen_obs = pd.read_sql_query(sql_sicen, con_sicen)
        # Explicit id list: formatting a tuple yields "(5,)" for one id and
        # "()" for none, both rejected by the SQL parser.
        ids = ', '.join(str(int(i)) for i in sicen_obs.id_origine.dropna())
        if insicen is True:
            # No known id means no observation can be "in SICEN".
            sql += 'WHERE obse_id IN ({})'.format(ids) if ids else 'WHERE FALSE'
        if insicen is False and ids:
            # With no known id every observation is "not in SICEN": no filter.
            sql += 'WHERE obse_id NOT IN ({})'.format(ids)
    return pd.read_sql_query(sql, con_sicen).dropna(axis=1, how='all')
def list_releve(id_relv=None, source='postgis'):
    """Return the SERENA surveys (``rnf_relv``) joined with their category label.

    Parameters
    ----------
    id_relv : int, list, Series or ndarray, optional
        Survey id(s) to keep (matched against ``relv_id``). ``None`` returns
        every survey.
    source : str
        Only ``'postgis'`` is implemented.

    Returns
    -------
    DataFrame
        ``rnf_relv`` rows inner-joined with ``rnf_choi`` on the category id,
        with every column whose name contains ``'date'`` converted to
        datetime.

    Raises
    ------
    ValueError
        If ``source`` is not ``'postgis'``.
    TypeError
        If ``id_relv`` is neither ``None`` nor one of the supported types.
    """
    if source != 'postgis':
        raise ValueError("list_releve: unsupported source %r (expected 'postgis')" % (source,))
    df_choi = pd.read_sql_table('rnf_choi', con_sicen, postgis_sch).dropna(axis=1, how='all')
    df_relv = pd.read_sql_table('rnf_relv', con_sicen, postgis_sch).dropna(axis=1, how='all')
    # Typo fix in the source data: category 100800 is rewritten to 100008.
    df_relv.loc[df_relv.relv_categ_choi_id == 100800, 'relv_categ_choi_id'] = 100008
    # Join survey / category
    df = df_relv.merge(
        df_choi[['choi_id', 'choi_nom']],
        how='inner', left_on='relv_categ_choi_id', right_on='choi_id'
    ).drop(columns=['relv_categ_choi_id'])
    # Date formatting
    for lcd in df.columns[df.columns.str.contains('date')]:
        df.loc[df[lcd] == '2000 à 2001', lcd] = '2000' if '1date' in lcd else '2001'
        # Blank out the 'VIDE' placeholder by assigning the result back: a
        # chained ``df[lcd].replace(..., inplace=True)`` works on a temporary
        # object under pandas Copy-on-Write and leaves ``df`` unchanged.
        df[lcd] = pd.to_datetime(df[lcd].mask(df[lcd] == 'VIDE'))
    if id_relv is None:
        return df
    if isinstance(id_relv, (int, np.integer)):
        filtre = df.relv_id == id_relv
    elif isinstance(id_relv, (list, pd.Series, np.ndarray)):
        filtre = df.relv_id.isin([*id_relv])
    else:
        raise TypeError('list_releve: unsupported id_relv type %s' % type(id_relv).__name__)
    return df[filtre]
def get_serena_obs(srce_id=None, source: str = 'access'):
    """Return the SERENA observers, from PostGIS or from the Access database.

    Parameters
    ----------
    srce_id : int, list, Series or ndarray, optional
        Id(s) to keep: matched against ``user_srce_id`` for ``'postgis'`` and
        against ``rev_observateur`` for ``'access'``. ``None`` keeps all rows.
    source : str
        ``'postgis'`` reads ``user_id, user_srce_id`` from ``rnf_user``;
        ``'access'`` reads the ``rev_obs`` table of the Access file.

    Returns
    -------
    DataFrame
        Lower-cased rows. For ``'access'``, ``nom_observateur`` is split on
        its first space into ``nom`` / ``prenom`` and the values ``'avenir'``
        and ``'be'`` are replaced by ``'cen isère'`` and ``'personnel'``.

    Raises
    ------
    ValueError
        If ``source`` is neither ``'postgis'`` nor ``'access'``.
    """
    DICT_STR = {
        'avenir': 'cen isère',
        'be': 'personnel',
    }
    is_scalar = isinstance(srce_id, (int, np.integer))
    is_listlike = isinstance(srce_id, (list, pd.Series, np.ndarray))
    if source == 'postgis':
        sql = 'SELECT user_id,user_srce_id FROM {sch}.rnf_user'.format(sch=postgis_sch)
        if is_scalar:
            sql += ' WHERE user_srce_id = %i' % srce_id
        elif is_listlike:
            # Explicit id list: a formatted tuple is invalid SQL for zero or
            # one element and for numpy scalars under numpy >= 2.
            ids = ', '.join(str(int(i)) for i in srce_id if pd.notna(i))
            sql += ' WHERE user_srce_id IN ({})'.format(ids or 'NULL')
        return lower(pd.read_sql_query(sql, con_sicen))
    if source == 'access':
        df = mdb.read_table(path.join(ACCESS_PATH, ACCESS_FILE), 'rev_obs')
        if is_scalar:
            df = df[df.rev_observateur == srce_id]
        elif is_listlike:
            df = df[df.rev_observateur.isin([*srce_id])]
        # Work on an independent copy: ``df`` may be a filtered slice.
        df = df.copy()
        # ``n`` must be passed by keyword (keyword-only in pandas >= 2);
        # reindex guarantees two columns even when no name contains a space.
        names = df.nom_observateur.str.split(' ', n=1, expand=True).reindex(columns=[0, 1])
        df['nom'] = names[0]
        df['prenom'] = names[1]
        return lower(df).replace(DICT_STR)
    raise ValueError("get_serena_obs: unsupported source %r (expected 'postgis' or 'access')" % (source,))
def get_sicen_obs(personne_id=None):
    """Return the SICEN persons joined with their structure, lower-cased.

    Parameters
    ----------
    personne_id : int, str, list, Series or ndarray, optional
        Person id(s) to keep; a ``str`` is converted with ``int``. ``None``
        returns every person.

    Returns
    -------
    DataFrame
        Columns ``id_personne, prenom, nom, id_structure, nom_structure``.
    """
    sql = 'SELECT id_personne,prenom,nom,id_structure,nom_structure FROM md.personne JOIN md.structure USING (id_structure)'
    # The filter uses ``id_personne``, the column selected above; the former
    # ``personne_id`` is not a column named anywhere else in this file.
    if isinstance(personne_id, (int, np.integer)):
        sql += ' WHERE id_personne = %i' % personne_id
    elif isinstance(personne_id, str):
        sql += ' WHERE id_personne = %i' % int(personne_id)
    elif isinstance(personne_id, (list, pd.Series, np.ndarray)):
        # Explicit id list: a formatted tuple is invalid SQL for zero or one
        # element and for numpy scalars under numpy >= 2.
        ids = ', '.join(str(int(i)) for i in personne_id if pd.notna(i))
        sql += ' WHERE id_personne IN ({})'.format(ids or 'NULL')
    return lower(pd.read_sql_query(sql, con_sicen))
def get_sicen_etude():
    """Return every row of the ``md.etude`` table read through ``con_sicen``."""
    return pd.read_sql_query('SELECT * FROM md.etude', con_sicen)
def get_serena_choi(choi_id: np.ndarray | list | pd.Series | int = None):
    """Return ``choi_id, choi_nom`` from the SERENA ``rnf_choi`` table, lower-cased.

    Parameters
    ----------
    choi_id : int, list, Series or ndarray, optional
        Id(s) to keep. ``None`` (or any other type) applies no filter.
    """
    sql = 'SELECT choi_id,choi_nom FROM {sch}.rnf_choi'.format(sch=postgis_sch)
    if isinstance(choi_id, (int, np.integer)):
        sql += ' WHERE choi_id = %i' % choi_id
    elif isinstance(choi_id, (list, pd.Series, np.ndarray)):
        # Explicit id list: a formatted tuple is invalid SQL for zero or one
        # element and for numpy scalars under numpy >= 2; null ids are skipped.
        ids = ', '.join(str(int(i)) for i in choi_id if pd.notna(i))
        sql += ' WHERE choi_id IN ({})'.format(ids or 'NULL')
    return lower(pd.read_sql_query(sql, con_sicen))
def get_serena_site(site_id: np.ndarray | list | pd.Series | int = None):
    """Return ``site_id, site_nom`` from the SERENA ``rnf_site`` table.

    Parameters
    ----------
    site_id : int, list, Series or ndarray, optional
        Id(s) to keep. ``None`` (or any other type) applies no filter.
    """
    sql = 'SELECT site_id,site_nom FROM {sch}.rnf_site'.format(sch=postgis_sch)
    if isinstance(site_id, (int, np.integer)):
        sql += ' WHERE site_id = %i' % site_id
    elif isinstance(site_id, (list, pd.Series, np.ndarray)):
        # Explicit id list: a formatted tuple is invalid SQL for zero or one
        # element and for numpy scalars under numpy >= 2; null ids are skipped.
        ids = ', '.join(str(int(i)) for i in site_id if pd.notna(i))
        sql += ' WHERE site_id IN ({})'.format(ids or 'NULL')
    return pd.read_sql_query(sql, con_sicen)
def get_sicen_pcol():
    """Return every row of ``md.protocole``, passed through ``lower``."""
    protocols = pd.read_sql_query('''SELECT * FROM "md"."protocole";''', con_sicen)
    return lower(protocols)
def _crsp(left: pd.DataFrame, right: pd.DataFrame, lefton, righton, idleft, idright):
    """Build a ``{left id: right id}`` dictionary from two tables.

    ``left`` and ``right`` are merged (default inner join) on
    ``lefton`` / ``righton``; each merged row contributes the pair
    ``(row[idleft], row[idright])``. A later row overrides an earlier one
    with the same left id.
    """
    matched = left.merge(right, left_on=lefton, right_on=righton)
    keys = matched[idleft]
    values = matched[idright]
    return dict(zip(keys, values))
def crsp_relv(relv: pd.Series):
    """Translate SERENA survey ids into SICEN study ids.

    The surveys listed in ``relv`` are matched to the SICEN studies on
    lower-cased ``relv_nom`` == ``nom_etude``; the ids found are replaced in
    ``relv``, the others are returned unchanged.
    """
    serena_surveys = lower(list_releve(relv.unique()))
    sicen_studies = lower(get_sicen_etude())
    relv_to_etude = _crsp(
        serena_surveys, sicen_studies,
        lefton='relv_nom', righton='nom_etude',
        idleft='relv_id', idright='id_etude',
    )
    return relv.replace(relv_to_etude)
def crsp_obs(obs: pd.Series):
    """Translate SERENA observer ids into SICEN person ids.

    Observers returned by ``get_serena_obs`` for the ids in ``obs`` are
    matched to SICEN persons on (``nom``, ``structure_obs``) ==
    (``nom``, ``nom_structure``); matched ``rev_observateur`` values are
    replaced by ``id_personne``, the others are returned unchanged.
    """
    serena_people = get_serena_obs(obs.unique())
    sicen_people = get_sicen_obs()
    obs_to_person = _crsp(
        serena_people, sicen_people,
        lefton=['nom', 'structure_obs'], righton=['nom', 'nom_structure'],
        idleft='rev_observateur', idright='id_personne',
    )
    return obs.replace(obs_to_person)
def crsp_pcol(pcol: pd.Series):
    """Translate SERENA protocol choice ids into SICEN protocol ids.

    SERENA choices are matched to SICEN protocols on ``choi_nom`` ==
    ``libelle``; matched ``choi_id`` values are replaced by ``id_protocole``,
    the others are returned unchanged.
    """
    serena_choices = get_serena_choi(pcol.unique())
    sicen_protocols = get_sicen_pcol()
    choi_to_protocol = _crsp(
        serena_choices, sicen_protocols,
        lefton='choi_nom', righton='libelle',
        idleft='choi_id', idright='id_protocole',
    )
    return pcol.replace(choi_to_protocol)
def crsp_valid(valid: pd.Series):
    """Replace the SERENA choice ids of ``valid`` using the SICEN protocol table.

    SERENA choices are matched to SICEN protocols on ``choi_nom`` ==
    ``libelle``; matched ``choi_id`` values are replaced by ``id_protocole``.

    NOTE(review): the lookup is the protocol table, exactly as in
    ``crsp_pcol``; for a validation status this looks like a copy-paste -
    confirm the intended target table.
    """
    serena_choices = get_serena_choi(valid.unique())
    sicen_protocols = get_sicen_pcol()
    choi_to_protocol = _crsp(
        serena_choices, sicen_protocols,
        lefton='choi_nom', righton='libelle',
        idleft='choi_id', idright='id_protocole',
    )
    return valid.replace(choi_to_protocol)
def crsp_abond(abond: pd.Series):
    """Replace SERENA abundance choice ids by their (lower-cased) label.

    Only the non-null ids of ``abond`` are looked up; ids without a match in
    ``rnf_choi`` are returned unchanged.
    """
    known_ids = abond.dropna().unique()
    choices = get_serena_choi(known_ids)
    id_to_label = {cid: label for cid, label in zip(choices.choi_id, choices.choi_nom)}
    return abond.replace(id_to_label)
def crsp_site(site: pd.Series):
    """Replace SERENA site ids by the site name.

    Only the non-null ids of ``site`` are looked up; ids without a match in
    ``rnf_site`` are returned unchanged.
    """
    known_ids = site.dropna().unique()
    sites = get_serena_site(known_ids)
    id_to_name = {sid: name for sid, name in zip(sites.site_id, sites.site_nom)}
    return site.replace(id_to_name)
def get_structure_id(id_pers: np.ndarray | list | pd.Series | int = None):
    """Return ``id_personne, id_structure`` from ``md.personne``.

    Parameters
    ----------
    id_pers : int, list, Series or ndarray, optional
        Person id(s) to keep. ``None`` (or any other type) applies no filter.
    """
    sql = 'SELECT id_personne, id_structure FROM md.personne'
    if isinstance(id_pers, (int, np.integer)):
        sql += ' WHERE id_personne = %i' % id_pers
    elif isinstance(id_pers, (list, pd.Series, np.ndarray)):
        # Explicit id list: a formatted tuple is invalid SQL for zero or one
        # element and for numpy scalars under numpy >= 2; null ids are skipped.
        ids = ', '.join(str(int(i)) for i in id_pers if pd.notna(i))
        sql += ' WHERE id_personne IN ({})'.format(ids or 'NULL')
    return pd.read_sql_query(sql, con_sicen)
def format_effectif(DF):
    """Split the raw ``effectif_min`` strings into the SICEN count columns.

    Works on a copy of ``DF``; ``effectif_min`` must hold strings (or nulls).

    * purely numeric value  -> moved to ``effectif``; ``effectif_min`` emptied;
    * any other text        -> copied to ``effectif_textuel``, then split on
      ``-`` or ``+``: first part to ``effectif_min``, second part to
      ``effectif_max``;
    * null value            -> row left untouched.

    The columns ``effectif``, ``effectif_textuel`` and ``effectif_max`` are
    created when missing. Empty strings are finally replaced by ``None``.
    """
    df = DF.copy()
    for col in ('effectif', 'effectif_textuel', 'effectif_max'):
        if col not in df.columns:
            df[col] = None
    # Independent copy: the column is overwritten below while still needed.
    raw = df['effectif_min'].copy()
    # ``eq(True)`` turns the null produced for missing values into False so
    # the result is a usable boolean mask.
    is_num = raw.str.isnumeric().eq(True)
    is_text = raw.notna() & ~is_num
    if is_num.any():
        df.loc[is_num, 'effectif'] = raw[is_num]
        df.loc[is_num, 'effectif_min'] = None
    if is_text.any():
        # reindex guarantees both parts exist even when no value contains a
        # separator (the split then yields a single column).
        bounds = raw[is_text].str.split(r'[-+]', expand=True).reindex(columns=[0, 1])
        df.loc[is_text, 'effectif_textuel'] = raw[is_text]
        df.loc[is_text, 'effectif_min'] = bounds[0]
        df.loc[is_text, 'effectif_max'] = bounds[1]
    return df.replace({'': None})
def to_sicen(DF):
    """Convert SERENA observations into rows shaped for ``saisie.saisie_observation``.

    Works on a copy of ``DF`` (which must expose a ``geometry``): the geometry
    is renamed ``geometrie``, SERENA ids are translated through the ``crsp_*``
    helpers, columns are renamed to their SICEN names, the rows are tagged
    with ``id_lot = 3``, the observer's structure is joined, only the columns
    that exist in the target table are kept and the counts are finally
    reshaped by ``format_effectif``.
    """
    # SERENA column -> SICEN column ('obse_site_id' is deliberately not
    # mapped: 'localisation' is fed from 'site_nom').
    sicen_names = {
        'obse_id': 'id_origine',
        'obse_relv_id': 'id_etude',
        'obse_obsv_id': 'observateur',
        'obse_detm_id': 'validateur',
        'obse_date': 'date_obs',
        'obse_pcole_choi_id': 'id_protocole',
        'obse_validat_choi_id': 'statut_validation',
        'obse_confid_choi_id': 'diffusable',
        'obse_abond_choi_id': 'effectif_textuel',
        'obse_nombre': 'effectif_min',
        'site_nom': 'localisation',
        'site_ref_sig': 'id_waypoint',
        'lat': 'latitude',
        'lon': 'longitude',
    }
    df = DF.copy()
    if df.geometry.name != 'geometrie':
        df.rename_geometry('geometrie', inplace=True)

    # Id translation, in the same order as the lookups were originally issued.
    df['obse_relv_id'] = crsp_relv(df['obse_relv_id'])
    df['obse_obsv_id'] = crsp_obs(df['obse_obsv_id'])
    df['obse_detm_id'] = crsp_obs(df['obse_detm_id']).astype(int)
    df['obse_date'] = pd.to_datetime(df['obse_date'])
    df['obse_pcole_choi_id'] = crsp_pcol(df['obse_pcole_choi_id'])
    validated = df['obse_validat_choi_id'] == 100373
    df.loc[validated, 'obse_validat_choi_id'] = 'validée'
    public = df['obse_confid_choi_id'] == 100473
    df.loc[public, 'obse_confid_choi_id'] = True
    df['obse_abond_choi_id'] = crsp_abond(df['obse_abond_choi_id'])

    df.rename(columns=sicen_names, inplace=True)
    # Tag rows with the SERENA lot
    df['id_lot'] = 3
    # Join the structure id of each observer
    struct = get_structure_id(df.observateur.unique())
    df = df.merge(struct, how='left', left_on='observateur', right_on='id_personne')
    df = df.rename(columns={'id_structure': 'structure'})
    cols = match_columns(df, 'saisie_observation', 'saisie', con_sicen)
    # Count formatting
    return format_effectif(df[cols])
def drop_cdnom_missing(df):
    """Return ``df`` without the rows whose ``cd_nom`` is null.

    Rows are removed by index label; the input frame is not modified.
    """
    missing_labels = df.index[df.cd_nom.isna()]
    return df.drop(missing_labels)
if __name__ == "__main__":
    # Script: select the SERENA RhoMeO observations, attach their odat
    # attributes, export them to a GeoPackage, push them into
    # saisie.saisie_observation, then list the related surveys.
    obs_serena = get_obs_serena(insicen=True)
    isna = obs_serena.cd_nom.isna()
    # Rebuild the geometries. pd.read_sql_query returns a plain DataFrame,
    # which has no ``set_geometry``; a GeoDataFrame is built explicitly.
    gdf_obs = gpd.GeoDataFrame(
        obs_serena,
        geometry=gpd.points_from_xy(obs_serena.lon, obs_serena.lat)
    )
    gdf_obs.set_crs(4326, inplace=True)
    gdf_obs.to_crs(2154, inplace=True)
    # Identify the RhoMeO data
    is_rhomeo = gdf_obs.obse_nom.str.contains('rhoméo', na=False, case=False)
    # Bare expression kept from the original: it has no effect when the file
    # is run as a script (interactive inspection only).
    gdf_obs.loc[is_rhomeo, ['obse_id', 'obse_nom', 'obse_date', 'cd_nom']]
    obs_rhomeo = gdf_obs[is_rhomeo]\
        .dropna(axis=1, how='all')\
        .drop(columns=['obse_habi_id'])
    rhoisna = obs_rhomeo.cd_nom.isna()
    obs_rhomeo = drop_cdnom_missing(obs_rhomeo)
    odat = get_odat_obse(obs_rhomeo.obse_id)
    OBS_RHOMEO = obs_rhomeo.merge(
        format_odat(obs_rhomeo, odat),
        left_on='obse_id', right_on='odat_obse_id', how='left'
    ).drop(columns='odat_obse_id')
    # NOTE(review): the file name says "notin_sicen" while the observations
    # were selected with insicen=True - confirm which one is intended.
    OBS_RHOMEO.to_file('/home/colas/Documents/9_PROJETS/4_SICEN/RECOVERY/rhomeo_data_notin_sicen.gpkg')
    RES_OBS = to_sicen(OBS_RHOMEO)
    RES_OBS.to_postgis(
        'saisie_observation',
        con_sicen,
        'saisie',
        if_exists='append',
        index=False,
        dtype=get_columns_dtypes(
            'saisie_observation',
            'saisie',
            con_sicen,
            RES_OBS
        )
    )
    # .to_file('/home/colas/Documents/9_PROJETS/4_SICEN/RECOVERY/rhomeo_data_notin_sicen.gpkg')
    update_to_sql(
        RES_OBS[['id_lot', 'id_origine', 'effectif', 'effectif_min']],
        con_sicen,
        'saisie_observation',
        'saisie',
        ['id_lot', 'id_origine'],
        dtype=get_columns_dtypes(
            'saisie_observation',
            'saisie',
            con_sicen,
            RES_OBS[['id_lot', 'id_origine', 'effectif', 'effectif_min']]
        )
    )
    # Retrieve the surveys
    id_relvrho = gdf_obs[is_rhomeo].obse_relv_id.unique()
    relv = list_releve(id_relv=id_relvrho)
    relv_rh = relv.relv_nom