Python_scripts/2_MEDWET/get_zh_cen.py

512 lines
20 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Name        : get_zh_cen.py
# Description : query helpers for the CEN38 wetlands (zones humides) database
# Copyright   : 2021, CEN38
# Author      : Colas Geier
# Version     : 1.0
import re
import pandas as pd
import pandas_access as mdb
import numpy as np
from sqlalchemy.sql.expression import column
from sqlalchemy import create_engine
from geoalchemy2 import Geometry
# NOTE(review): re, mdb, np, column and Geometry appear unused in this file —
# confirm against the rest of the project before removing.
isin_bdd = True
# Output database (bdd OUT) connection parameters.
# SECURITY NOTE(review): database credentials are hard-coded in source; move
# them to environment variables or a config file kept out of version control.
user_zh = 'postgres'
pwd_zh = 'tutu'
adr_zh = '192.168.60.10'
base_zh = 'bd_cen'
con_zh = create_engine('postgresql+psycopg2://{0}:{1}@{2}/{3}'.format(user_zh,pwd_zh,adr_zh,base_zh), echo=False)
#####################################
### Fonctions générales ###
#####################################
def _aggr_cols(df, lst_col, sep=''):
df['aggreg'] = ''
for c,col in enumerate(lst_col):
add = ''
if c > 0:
add = sep
df.loc[~df[col].isna(),'aggreg'] = df.loc[~df[col].isna(),'aggreg'] + add + df.loc[~df[col].isna(),col]
return df
def to_tuple(obj):
    """Coerce *obj* to a tuple for use as a SQL IN-clause parameter.

    list -> tuple, single int/str -> one-element tuple; anything else
    (including None and tuples) is returned unchanged.
    """
    if isinstance(obj, list):
        return tuple(obj)
    if isinstance(obj, (int, str)):
        return (obj,)
    return obj
def to_colStringSQL(obj):
    """Normalise *obj* into a comma-separated SQL column/value string.

    int/str -> str; list -> comma-joined string (items are str()-ed, so a
    list of integer ids no longer raises TypeError as the old
    ``",".join(obj)`` did); anything else passes through unchanged.
    """
    if isinstance(obj, (int, str)):
        return str(obj)
    if isinstance(obj, list):
        return ",".join(map(str, obj))
    return obj
def to_upper(obj):
    """Upper-case a str, or every element of a list/tuple of str; other
    types (e.g. None) are returned unchanged."""
    if isinstance(obj, tuple):
        return tuple(item.upper() for item in obj)
    if isinstance(obj, list):
        return [item.upper() for item in obj]
    if isinstance(obj, str):
        return obj.upper()
    return obj
def to_upperfirst(obj):
    """Capitalise: first character upper-cased, the rest lower-cased.

    Applies to a str or to every element of a list/tuple of str; other types
    are returned unchanged. Empty strings are returned as-is — the original
    expression ``o.upper()[0]`` raised IndexError on ``''``.
    """
    def _cap(s):
        # Guard against the empty string before indexing.
        return s.upper()[0] + s.lower()[1:] if s else s
    if isinstance(obj, tuple):
        return tuple(_cap(o) for o in obj)
    if isinstance(obj, list):
        return [_cap(o) for o in obj]
    if isinstance(obj, str):
        return _cap(obj)
    return obj
def _get_table(con, schema, table, ids=None, nom=None, cols=None, params_col=None):
    """Read ``schema.table`` into a DataFrame, with optional filters.

    :param con: SQLAlchemy engine/connection.
    :param schema: schema name.
    :param table: table name.
    :param ids: value(s) filtering the ``id`` column (IN clause).
    :param nom: value(s) filtering the ``nom`` column (IN clause).
    :param cols: column name(s) to select instead of ``*``.
    :param params_col: extra ``{column: value(s)}`` IN filters. Default was a
        mutable ``{}``; ``None`` is now accepted and normalised.
    :return: pandas.DataFrame of the selected rows.
    """
    params_col = dict(params_col) if params_col else {}
    sql = 'SELECT {cols} FROM {sch}.{tab}'.format(
        cols=to_colStringSQL(cols) if cols else '*', sch=schema, tab=table)
    # Assemble the WHERE clause from whichever filters were supplied.
    clauses = []
    if ids:
        clauses.append('id IN %(ids)s')
    if nom:
        clauses.append('nom IN %(nom)s')
    clauses += [k + ' IN %({})s'.format(k) for k in params_col]
    if clauses:
        sql = sql + ' WHERE ' + ' AND '.join(clauses)
    # psycopg2 expects tuples for IN parameters.
    params_col = {key: to_tuple(val) for key, val in params_col.items()}
    df = pd.read_sql(
        sql=sql,
        con=con,
        params={'ids': to_tuple(ids), 'nom': to_tuple(nom), **params_col})
    return df
#####################################
### schema personnes ###
#####################################
class pers:
    """Accessors for the 'personnes' schema (authors and organisations)."""

    def __init__(self):
        self.schema = 'personnes'
        self.con = con_zh

    def get_auteur(self, nom=None, prenom=None):
        """Return rows of ``personnes.personne``, optionally filtered.

        *nom* is upper-cased and *prenom* capitalised before querying, to
        match the storage convention of the table.
        """
        clauses = []
        if nom:
            clauses.append('nom IN %(nom)s')
            nom = to_upper(nom)
        if prenom:
            clauses.append('prenom IN %(prenom)s')
            prenom = to_upperfirst(prenom)
        sql = 'SELECT * FROM %s.personne' % self.schema
        if clauses:
            sql = sql + ' WHERE ' + ' AND '.join(clauses)
        return pd.read_sql(
            sql=sql,
            con=self.con,
            params={'nom': to_tuple(nom), 'prenom': to_tuple(prenom)})

    def get_organisme(self, ids=None, nom=None):
        """Return rows of ``personnes.organisme``, optionally filtered."""
        return _get_table(self.con, self.schema, 'organisme', ids=ids, nom=nom)
#####################################
### schema sites ###
#####################################
class sites:
    """Accessors for the 'sites' schema: site records, geometries and the
    associated lookup tables (milieu types, SDAGE typology, site types)."""

    def __init__(self):
        self.schema = 'sites'
        self.con = con_zh
        # Lookup tables are loaded once at construction time (each hits the DB).
        self.typ_milieux = self._get_typ_milieux()
        self.typo_sdage = self._get_typo_sdage()
        self.typ_site = self._get_typ_site()
        self.auteur = pers().get_auteur()
        self.organisme = pers().get_organisme()

    def _get_typ_milieux(self, ids=None, nom=None):
        # Lookup table of milieu (environment) types.
        table = 'type_milieu'
        df = _get_table(self.con, self.schema, table, ids=ids, nom=nom)
        return df

    def _get_typo_sdage(self, ids=None, nom=None):
        # Lookup table of the SDAGE wetland typology.
        table = 'typo_sdage'
        df = _get_table(self.con, self.schema, table, ids=ids, nom=nom)
        return df

    def _get_typ_site(self, ids=None, nom=None):
        # Lookup table of site types.
        table = 'type_site'
        df = _get_table(self.con, self.schema, table, ids=ids, nom=nom)
        return df

    def _merge_orga(self, df, split_cols):
        """Map author ids held in *split_cols* to organisation names and
        aggregate them into a single 'organisme' column; *split_cols* are
        dropped from the returned copy."""
        org = self.organisme
        aut = self.auteur
        df = df.copy()
        for c in split_cols:
            # NOTE(review): df[c] is a Series, never an int, so this cast
            # always runs — confirm the original intent.
            if not isinstance(df[c], int): df[c] = df[c].astype(float)
            # author id -> organisation id -> organisation name
            df[c].replace(aut.id.tolist(), aut.id_organisme.tolist(), inplace=True)
            df[c].replace(org.id.tolist(), org.nom.tolist(), inplace=True)
        df['organisme'] = None
        # First non-null split column provides the initial organisation value.
        for c in split_cols:
            df.loc[df.organisme.isna(), 'organisme'] = df.loc[df['organisme'].isna(), c]
        # Append any organisation name that differs from (and is not already
        # contained in) the accumulated value, joined with ' & '.
        for c in split_cols:
            comp = df.loc[~df[c].isna(),'organisme'].compare(df.loc[~df[c].isna(), c])
            if not comp.empty:
                comp['test'] = comp.apply(lambda x: x['other'] in x['self'], axis=1)
                comp = comp[~comp.test]
                if not comp.empty:
                    df.loc[comp.index,'organisme'] = comp.self + ' & ' + comp.other
        df.drop(columns=split_cols, inplace=True)
        return df

    def _merge_author(self, df, col_aut, orga=False):
        """Replace the id column *col_aut* by a readable 'auteur' column
        ("NOM Prenom", multiple authors joined with ' & '); when *orga* is
        True an 'organisme' column is added as well."""
        # fetch the authors
        aut = self.auteur.fillna('')
        aut['nom_prenom'] = (aut['nom'] + ' ' + aut['prenom']).str.strip()
        aut['id'] = aut['id'].astype(str)
        # merge the authors — col_aut may hold several ids separated by ' & '
        r_id = df[['id', col_aut]].copy()
        r_idSplit = r_id[col_aut].str.split(' & ', expand=True)
        r_id = r_id.join(r_idSplit)
        cSplit = r_idSplit.shape[1]
        cSplit = list(range(cSplit))
        if orga:
            # fetch the organisations
            org = self._merge_orga(r_id, cSplit)
        r_id[cSplit] = r_id[cSplit].replace(aut['id'].tolist(),aut['nom_prenom'].tolist())
        r_id = _aggr_cols(r_id,cSplit,' & ') \
            .rename(columns={'aggreg': 'auteur'}) \
            .drop(columns=cSplit)
        if orga:
            # merge the organisations
            r_id = pd.merge(r_id,org, on=['id', col_aut])
        df = pd.merge(df,r_id, on=['id', col_aut]) \
            .drop(columns=[col_aut])
        return df

    def get_sitesInfos(self, ids=None, nom=None, columns=None, with_nameOrga=False, details=False):
        """Return site descriptions with lookup ids resolved to names.

        :param ids: site id filter.
        :param nom: site name filter.
        :param columns: restrict the selected columns.
        :param with_nameOrga: also resolve the authors' organisations.
        :param details: keep the description columns of the lookup tables.
        :return: DataFrame sorted by site id.
        """
        drop = []
        table = 'sites'
        df = _get_table(self.con, self.schema, table, ids=ids, nom=nom, cols=columns)
        # resolve the authors
        if 'id_auteur' in df.columns:
            df = self._merge_author(df=df, col_aut='id_auteur', orga=with_nameOrga)
        # merge type_site
        if 'id_type_site' in df.columns:
            df = pd.merge(df, self.typ_site, left_on='id_type_site', right_on='id', suffixes=('','_y') ) \
                .drop(columns=['id_type_site', 'id_y']) \
                .rename(columns={'nom_y': 'type_site', 'description': 'desc_type_site'})
            drop += ['desc_type_site']
        # merge typo_sdage
        if 'id_typo_sdage' in df.columns:
            df = pd.merge(df, self.typo_sdage, left_on='id_typo_sdage', right_on='id', suffixes=('','_y') ) \
                .drop(columns=['id_typo_sdage', 'id_y']) \
                .rename(columns={'nom_y': 'typo_sdage', 'description': 'desc_typo_sdage'})
            drop += ['desc_typo_sdage']
        # merge type_milieu
        if 'id_type_milieu' in df.columns:
            df = pd.merge(df, self.typ_milieux, left_on='id_type_milieu', right_on='id', suffixes=('','_y') ) \
                .drop(columns=['id_type_milieu', 'id_y']) \
                .rename(columns={'nom_y': 'type_milieu', 'description': 'desc_type_milieu', 'nom_court': 'nom_court_milieu'})
            drop += ['desc_type_milieu', 'nom_court_milieu']
        if not details:
            df.drop(columns=drop, inplace=True)
        return df.sort_values('id')

    def get_sitesGeom(self, id_site=None, nom_site=None, columns=None, last_update=False, with_nameOrga=False):
        """Return site geometries from ``r_sites_geom``.

        Geometries are decoded from WKB hex and exposed as shapely objects in
        Lambert-93 (EPSG:2154). With *last_update* only the most recent
        geometry per site is kept.
        """
        from shapely.wkb import loads
        import geopandas as gpd # needed for set_geometry
        if columns:
            # 'id', 'id_site' and 'geom' are always required downstream.
            if not isinstance(columns, list): columns = [columns]
            if 'id' not in columns: columns.insert(0,'id')
            if 'id_site' not in columns: columns.insert(1,'id_site')
            if 'geom' not in columns: columns.insert(2,'geom')
        table = 'sites'
        df = _get_table(self.con, self.schema, table, ids=id_site, nom=nom_site, cols='id')
        idSite = df.id.tolist()
        table = 'r_sites_geom'
        df = _get_table(self.con, self.schema, table, params_col={'id_site':idSite}, cols=columns)
        if last_update:
            # rows are assumed ordered so that the last one is the newest —
            # NOTE(review): no ORDER BY is issued; confirm table ordering.
            df.drop_duplicates(subset=['id_site'], keep='last', inplace=True)
            df.reset_index(inplace=True, drop=True)
        df['geom'] = [(loads(geom, hex=True)) for geom in df['geom']]
        # NOTE(review): a plain DataFrame has no set_geometry method; this
        # presumably relies on geopandas being imported — verify it still
        # works with the project's geopandas version.
        df = df.set_geometry('geom', crs='EPSG:2154')
        # resolve the authors
        if 'id_auteur' in df.columns:
            df = self._merge_author(df=df, col_aut='id_auteur', orga=with_nameOrga)
        return df
#####################################
### schema sites ###
#####################################
class zh:
    """Accessors for the 'zones_humides' (wetlands) schema.

    Fixes over the previous revision:
    - ``DataFrame.drop(labels, 1)`` replaced by the ``columns=`` keyword
      (the positional *axis* argument was removed in pandas 2.0);
    - ``_get_relation_tab`` now returns an empty DataFrame instead of an
      implicit ``None`` when no geometry matches, so callers' ``df.empty``
      checks no longer raise AttributeError;
    - ``ref_hydro`` is instantiated before calling ``get_troncon`` (it was
      called on the class, raising TypeError);
    - the final toponymy rename is assigned (its result was discarded).
    """

    def __init__(self):
        self.schema = 'zones_humides'
        self.con = con_zh

    def _get_param(self, param_table, type_table=None, type_court=True):
        """Return a parameter lookup table, optionally joined to its type table.

        :param param_table: parameter table name.
        :param type_table: optional associated type table name.
        :param type_court: keep the short type name ('nom_court') when True,
            otherwise the full name ('nom').
        """
        if not type_table:
            return _get_table(self.con, self.schema, table=param_table)
        typ = _get_table(self.con, self.schema, table=type_table)
        par = _get_table(self.con, self.schema, table=param_table,
                         params_col={'id_type': typ.id.tolist()})
        df = pd.merge(par, typ, left_on='id_type', right_on='id',
                      how='left', suffixes=(None, '_typ')) \
            .drop(columns=['id_type', 'id_typ'])
        if 'description_typ' in df.columns:
            del df['description_typ']
        if type_court:
            df = df.drop(columns=['nom_typ']).rename(columns={'nom_court_typ': 'type'})
        else:
            df = df.drop(columns=['nom_court_typ'], errors='ignore').rename(columns={'nom_typ': 'type'})
        # Reorder so 'id' and 'type' come first.
        df = df.set_index(['id', 'type']).reset_index()
        return df

    def _get_relation_tab(self, tab, id_site=None, nom_site=None, last_update=False, geom=False):
        """Join relation table *tab* with the selected site geometries.

        Returns an empty DataFrame when no site geometry matches, so that
        callers can safely test ``df.empty``.
        """
        dfSG = sites().get_sitesGeom(columns='date', id_site=id_site,
                                     nom_site=nom_site, last_update=last_update)
        if not geom and not dfSG.empty:
            dfSG.drop(columns='geom', inplace=True)
        ids = dfSG.id.tolist()
        if not ids:
            print('PAS de géometries de sites sélectionnées ...')
            return pd.DataFrame()
        df = _get_table(self.con, self.schema, tab, params_col={'id_geom_site': ids})
        if not df.empty:
            df = pd.merge(dfSG, df, left_on='id', right_on='id_geom_site', suffixes=('_x', None)) \
                .drop(columns=['id_x', 'id_geom_site']) \
                .set_index('id').reset_index()
        return df

    def get_delim(self, id_site=None, nom_site=None, last_update=False, geom=False, nom_type_court=True):
        """Delimitation criteria per site."""
        df = self._get_relation_tab(tab='r_site_critdelim', id_site=id_site, nom_site=nom_site,
                                    last_update=last_update, geom=geom)
        dic = self._get_param(type_table='type_param_delim_fct',
                              param_table='param_delim_fct', type_court=nom_type_court)
        if not df.empty:
            df = pd.merge(df, dic, left_on='id_crit_delim', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_crit_delim']) \
                .rename(columns={'description_y': 'desc_param', 'nom_court': 'nom_court_crit', 'nom': 'nom_crit'}) \
                .sort_values('id_site')
            # Drop the short-name column when it is entirely null.
            if df.nom_court_crit.isnull().sum() == df.shape[0]:
                del df['nom_court_crit']
        return df

    def get_fct(self, id_site=None, nom_site=None, last_update=False, geom=False, nom_type_court=True):
        """Ecological / socio-economic / heritage functions per site."""
        df = self._get_relation_tab(tab='r_site_fctecosociopatri', id_site=id_site, nom_site=nom_site,
                                    last_update=last_update, geom=geom)
        dic = self._get_param(type_table='type_param_fct',
                              param_table='param_fct_eco_socio_patri', type_court=nom_type_court)
        if not df.empty:
            df = pd.merge(df, dic, left_on='id_fct', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_fct']) \
                .rename(columns={'description_y': 'desc_param', 'nom_court': 'nom_court_fct', 'nom': 'nom_fct'}) \
                .sort_values('id_site')
            if df.nom_court_fct.isnull().sum() == df.shape[0]:
                del df['nom_court_fct']
        return df

    def get_connex(self, id_site=None, nom_site=None, last_update=False, geom=False):
        """Hydrological connection type per site."""
        df = self._get_relation_tab(tab='r_site_type_connect', id_site=id_site, nom_site=nom_site,
                                    last_update=last_update, geom=geom)
        dic = self._get_param(param_table='param_type_connect')
        if not df.empty:
            df = pd.merge(df, dic, left_on='id_param_connect', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_param_connect']) \
                .rename(columns={'description_y': 'desc_param', 'nom': 'connexion'}) \
                .sort_values('id_site')
        return df

    def get_sub(self, id_site=None, nom_site=None, last_update=False, geom=False):
        """Submersion (extent and frequency) per site."""
        df = self._get_relation_tab(tab='r_site_sub', id_site=id_site, nom_site=nom_site,
                                    last_update=last_update, geom=geom)
        dic = self._get_param(type_table='type_param_sub', param_table='param_sub', type_court=False)
        d1 = dic[dic.type == 'Submersion étendue']
        d2 = dic[dic.type == 'Submersion fréquente']
        if not df.empty:
            df = pd.merge(df, d1, how='left', left_on='id_etendsub', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_etendsub', 'type']) \
                .rename(columns={'description': 'desc_param_etend', 'nom': 'Submersion étendue'})
            df = pd.merge(df, d2, how='left', left_on='id_freqsub', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_freqsub', 'type']) \
                .rename(columns={'description': 'desc_param_freq', 'nom': 'Submersion fréquente'}) \
                .sort_values('id_site')
            df.rename(columns={'id_origsub': 'origine_sub'}, inplace=True)
            # Drop description columns that are entirely null.
            if df['desc_param_etend'].isnull().sum() == df.shape[0]:
                del df['desc_param_etend']
            if df['desc_param_freq'].isnull().sum() == df.shape[0]:
                del df['desc_param_freq']
        return df

    def get_usageprocess(self, id_site=None, nom_site=None, last_update=False, geom=False):
        """Land-use / process parameters per site, with their position."""
        df = self._get_relation_tab(tab='r_site_usageprocess', id_site=id_site, nom_site=nom_site,
                                    last_update=last_update, geom=geom)
        dic1 = self._get_param(param_table='param_usageprocess')
        dic2 = self._get_param(param_table='param_position')
        if not df.empty:
            df = pd.merge(df, dic1, how='left', left_on='id_usageprocess', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_usageprocess']) \
                .rename(columns={'description': 'desc_param_usag', 'nom': 'usageprocess'})
            df = pd.merge(df, dic2, how='left', left_on='id_position', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_position']) \
                .rename(columns={'description': 'desc_param_pos', 'nom': 'position'}) \
                .sort_values('id_site')
        return df

    def _get_r_toponymie(self, ids=None):
        """Resolve toponymy references to readable names from their source tables."""
        df = _get_table(self.con, self.schema, table='r_toponymie', ids=ids)
        dic1 = self._get_param(param_table='liste_table_topohydro')
        if not df.empty:
            df = pd.merge(df, dic1, left_on='id_orig', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_orig'])
            for tab in df.nom_table.unique():
                iids = df.loc[df.nom_table == tab, 'id_topo'].to_list()
                if tab == 'orig_hydro':
                    dic = _get_table(self.con, self.schema, table='orig_hydro', ids=iids)
                elif tab == 'troncon_hydro':
                    # was ref_hydro.get_troncon(...): the class was never
                    # instantiated, which raised TypeError.
                    dic = ref_hydro().get_troncon(cols=['id', 'nom'], ids=iids)
                else:
                    # Unknown source table: skip instead of reusing a stale
                    # (or undefined) lookup from a previous iteration.
                    continue
                df.loc[df.nom_table == tab, 'id_topo'] = \
                    df.loc[df.nom_table == tab, 'id_topo'].replace(dic.id.to_list(), dic.nom.to_list())
                if tab == 'troncon_hydro':
                    df = pd.merge(df, dic, left_on='id_topo', right_on='nom', suffixes=(None, '_y')) \
                        .drop(columns=['id_y', 'nom'])
            # was: rename result discarded — assign it so the column is renamed.
            df = df.rename(columns={'id_topo': 'toponymie'})
        return df

    def get_regHydro(self, id_site=None, nom_site=None, last_update=False, geom=False):
        """Hydrological regime (water inputs/outputs) per site."""
        df = self._get_relation_tab(tab='r_site_reghydro', id_site=id_site, nom_site=nom_site,
                                    last_update=last_update, geom=geom)
        dic1 = self._get_param(param_table='param_reg_hydro')
        dic2 = self._get_param(param_table='param_permanance')
        if not df.empty:
            dic3 = self._get_r_toponymie(ids=df.id_toponymie.unique().tolist())
            # Boolean in/out flag -> readable direction labels.
            df.in_out = df.in_out.replace([True, False], ['entree', 'sortie'])
            df = pd.merge(df, dic1, left_on='id_reg_hydro', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_reg_hydro']) \
                .rename(columns={'description': 'desc_param_regHydri', 'nom': 'regime_hydri'})
            df = pd.merge(df, dic2, left_on='id_permanance', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_permanance']) \
                .rename(columns={'description': 'desc_param_perm', 'nom': 'permanance'})
            df = pd.merge(df, dic3, left_on='id_toponymie', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y', 'id_toponymie']) \
                .rename(columns={'description': 'desc_topo'}) \
                .sort_values('id_site')
        return df

    def get_habitat(self, id_site=None, nom_site=None, last_update=False, geom=False):
        """Corine Biotope habitats per site."""
        df = self._get_relation_tab(tab='r_site_habitat', id_site=id_site, nom_site=nom_site,
                                    last_update=last_update, geom=geom)
        if not df.empty:
            dic = ref_hab().get_CB(ids=df.id_cb.unique().tolist(), cols=['id', 'lb_hab_fr'])
            df = pd.merge(df, dic, left_on='id_cb', right_on='id', suffixes=(None, '_y')) \
                .drop(columns=['id_y']) \
                .rename(columns={'id_cb': 'code_cb'}) \
                .sort_values('id_site')
        return df
#####################################
### schema ref_habitats ###
#####################################
class ref_hab:
    """Accessor for the 'ref_habitats' reference schema."""

    def __init__(self):
        self.schema = 'ref_habitats'
        self.con = con_zh

    def get_CB(self, ids=None, cols=None, params_col={}):
        """Return rows of the Corine Biotope reference table."""
        return _get_table(self.con, self.schema, table='corine_biotope',
                          ids=ids, cols=cols, params_col=params_col)
#####################################
### schema ref_hydro ###
#####################################
class ref_hydro:
    """Accessor for the 'ref_hydro' reference schema."""

    def __init__(self):
        self.schema = 'ref_hydro'
        self.con = con_zh

    def get_troncon(self, ids=None, cols=None, params_col={}):
        """Return rows of the hydrographic section (troncon) reference table."""
        return _get_table(self.con, self.schema, table='troncon_hydro',
                          ids=ids, cols=cols, params_col=params_col)
#####################################
### Bilan ###
#####################################
def get_bilan(code_site=None, nom_site=None):
    '''
    Collect every thematic table for the requested site(s).

    :code_site: list,str. Site id(s).
    :nom_site: list,str. Site name(s).
    :return: dict of DataFrames (and one nested dict for the
        'fonctionnement' section), each tagged with a ``.name`` used by
        write_bilan() as sheet/section title.
    '''
    site_acc = sites()
    zh_acc = zh()
    info = site_acc.get_sitesInfos(ids=code_site, nom=nom_site)
    cb = zh_acc.get_habitat(id_site=code_site, nom_site=nom_site)
    delim = zh_acc.get_delim(id_site=code_site, nom_site=nom_site)
    desc = zh_acc.get_usageprocess(id_site=code_site, nom_site=nom_site)
    rghyd = zh_acc.get_regHydro(id_site=code_site, nom_site=nom_site)
    subm = zh_acc.get_sub(id_site=code_site, nom_site=nom_site)
    conn = zh_acc.get_connex(id_site=code_site, nom_site=nom_site)
    fct = zh_acc.get_fct(id_site=code_site, nom_site=nom_site)
    evall = site_acc.get_sitesGeom().drop(columns=['geom'])
    # Submersion and connection merged side by side on the shared keys.
    sub_con = pd.merge(subm, conn, how='outer', on=['id', 'id_site', 'date']) \
        .rename(columns={'description': 'desc_connex'})
    fctmt = {
        'entree_eau': rghyd[rghyd.in_out == 'entree'].drop(columns=['in_out']),
        'sortie_eau': rghyd[rghyd.in_out == 'sortie'].drop(columns=['in_out']),
        'sub_connex': sub_con,
    }
    bilan = {
        'infos': info,
        'corine_biotope': cb,
        'delimitation': delim,
        'description': desc,
        'fonctionnement': fctmt,
        'fonction': fct,
        'evaluation': evall,
    }
    # Tag every frame with its sheet/section name for write_bilan().
    for key, value in bilan.items():
        if isinstance(value, pd.DataFrame):
            value.name = key
        elif isinstance(value, dict):
            for sub_key in value:
                value[sub_key].name = sub_key
            value['title'] = key
    return bilan
def write_bilan(df, output):
    '''
    Write one worksheet per entry of the dictionary *df*.

    :df: dict. Dictionary of DataFrames (as produced by get_bilan). A plain
        DataFrame becomes one sheet named after its ``.name``; a nested dict
        lays its frames out side by side on the sheet named by its 'title'
        key, each preceded by its own caption.
    :output: str. chemin_vers_mon_fichier/mon_fichier.xlsx

    Fixes: ``writer.save()`` (removed in pandas 2.0 — the ``with`` block
    already closes the writer) is gone; the side-by-side column offset now
    uses the *previous* frame's width instead of the current one, which
    could make frames overlap; ``row``/``col`` can no longer be referenced
    before assignment.
    '''
    with pd.ExcelWriter(output) as writer:
        for entry in df:
            DF = df[entry]
            if isinstance(DF, pd.DataFrame):
                # startrow=1 leaves the first Excel row free for the caption.
                DF.to_excel(writer, sheet_name=DF.name, startrow=1, startcol=0, index=False)
                writer.sheets[DF.name].cell(1, 1, value=DF.name)
            elif isinstance(DF, dict):
                row = 1
                col = 0
                prev_width = None
                for key in DF:
                    if key == 'title':
                        continue
                    if prev_width is not None:
                        # 3 blank columns between consecutive frames.
                        col = col + prev_width + 3
                    DF[key].to_excel(writer, sheet_name=DF['title'],
                                     startrow=row, startcol=col, index=False)
                    writer.sheets[DF['title']].cell(column=col + 1, row=row, value=key)
                    prev_width = DF[key].shape[1]