#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Name        : tools.py
# Description :
# Copyright   : 2021, CEN38
# Author      : Colas Geier
# Version     : 1.0

from pandas import Series, Index, read_sql, merge


#####################################
###       General functions       ###
#####################################

def _aggr_cols(df, lst_col, sep=''):
    '''Concatenate the non-null values of the columns in lst_col into a new 'aggreg' column.'''
    df['aggreg'] = ''
    for c, col in enumerate(lst_col):
        add = sep if c > 0 else ''
        mask = ~df[col].isna()
        df.loc[mask, 'aggreg'] = df.loc[mask, 'aggreg'] + add + df.loc[mask, col]
    return df


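# Illustrative usage (not part of the original module; the frame below is invented for the example):
#   >>> import pandas as pd
#   >>> frame = pd.DataFrame({'a': ['x', None], 'b': ['y', 'z']})
#   >>> _aggr_cols(frame, ['a', 'b'], sep='-')['aggreg'].tolist()
#   ['x-y', '-z']
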
def to_tuple(obj):
    '''Coerce a list/Series or a scalar (int, str) into a tuple.'''
    if isinstance(obj, (list, Series)): obj = tuple(obj)
    if isinstance(obj, (int, str))    : obj = tuple([obj])
    return obj


def to_colStringSQL(obj):
    '''Convert a column specification (int, list or Index) into a comma-separated SQL string.'''
    if isinstance(obj, int)           : obj = str(obj)
    if isinstance(obj, (list, Index)) : obj = ",".join(obj)  # note: does not work with df.columns
    return obj


def to_upper(obj):
    '''Upper-case a str, or every element of a Series, tuple or list.'''
    if isinstance(obj, Series): obj = Series([o.upper() for o in list(obj)])
    if isinstance(obj, tuple) : obj = tuple([o.upper() for o in list(obj)])
    if isinstance(obj, list)  : obj = [o.upper() for o in obj]
    if isinstance(obj, str)   : obj = obj.upper()
    return obj


def to_upperfirst(obj):
    '''Capitalize (first letter upper, rest lower) a str, or every element of a Series, tuple or list.'''
    if isinstance(obj, Series): obj = Series([o.upper()[0] + o.lower()[1:] for o in list(obj)])
    if isinstance(obj, tuple) : obj = tuple([o.upper()[0] + o.lower()[1:] for o in list(obj)])
    if isinstance(obj, list)  : obj = [o.upper()[0] + o.lower()[1:] for o in obj]
    if isinstance(obj, str)   : obj = obj.upper()[0] + obj.lower()[1:]
    return obj


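# Illustrative behaviour of the converters above (values invented for the example):
#   >>> to_tuple('a')
#   ('a',)
#   >>> to_colStringSQL(['id', 'nom'])
#   'id,nom'
#   >>> to_upperfirst('marais')
#   'Marais'
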
def dropZ(df, geom_col='geom'):
    '''Return a copy of df where 3D geometries in geom_col are rewritten without their Z dimension.'''
    from shapely import wkb

    df = df.copy()
    df.loc[df[geom_col].has_z, geom_col] = [
        wkb.loads(wkb.dumps(geom, output_dimension=2)) for geom in df.loc[df[geom_col].has_z, geom_col]
    ]
    # if all(df_sites['geom'].has_z):
    #     # Remove the Z dimension
    #     geom_type = df_sites['geom'].geom_type
    #     df_sites['geom'] = [wkb.loads(wkb.dumps(geom, output_dimension=2)) for geom in df_sites['geom']]
    #     df_sites.set_geometry('geom', drop=False, inplace=True, crs=crs)
    return df


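# Illustrative usage (requires geopandas/shapely; the data below is invented for the example):
#   >>> import geopandas as gpd
#   >>> from shapely.geometry import Point
#   >>> gdf = gpd.GeoDataFrame({'geom': [Point(1, 2, 3)]}, geometry='geom')
#   >>> dropZ(gdf)['geom'].iloc[0].has_z
#   False
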
def remove_empty_keys(d):
    '''Remove, in place, every key of d whose value is falsy.'''
    # Iterate over a copy of the keys: deleting while iterating over d.keys() raises a RuntimeError.
    for k in list(d.keys()):
        if not d[k]:
            del d[k]


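# Illustrative usage (dictionary invented for the example):
#   >>> d = {'ids': [], 'nom': ['Marais']}
#   >>> remove_empty_keys(d)
#   >>> d
#   {'nom': ['Marais']}
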
def _get_table(con, schema, table, ids=None, nom=None, cols=None, params_col={}, statut='actif'):
    '''
    Select rows from a table.

    Parameters
    ----------
    con : SQLAlchemy connection.
    schema : str. Name of the PostgreSQL schema.
    table : str. Name of the PostgreSQL table.
    ids : list. Identifiers to select.
        The table must contain an 'id' column.
    nom : list. Names to select.
        The table must contain a 'nom' column.
    cols : list. Columns of the table to select.
    params_col : dict. IN-selection parameters.
        request         : {'column': [list]}
        SQL translation : 'column IN (list)'
    statut : str. 'actif', 'history', 'all'.
        Status of the sites to retrieve:
        'actif'   : date_fin IS NULL
        'history' : has a date_fin
        'all'     : every site
    '''
    sql = 'SELECT * FROM {sch}.{tab}'.format(sch=schema, tab=table)
    if params_col:
        params_col = {k: v for k, v in params_col.items() if v}

    if cols: sql = sql.replace('*', to_colStringSQL(cols))
    # If ids, nom or params_col is given (or a site status filter applies), add a 'WHERE' clause
    if ids or nom or params_col or (statut != 'all' and table == 'sites'): sql = sql + ' WHERE '
    if ids: sql = sql + 'id IN %(ids)s'
    if ids and (nom or params_col or (statut != 'all' and table == 'sites')): sql = sql + ' AND '
    if nom: sql = sql + 'nom IN %(nom)s'
    if nom and (params_col or (statut != 'all' and table == 'sites')): sql = sql + ' AND '
    if statut == 'actif' and table == 'sites': sql = sql + ' date_fin IS NULL '
    if statut == 'history' and table == 'sites': sql = sql + ' date_fin IS NOT NULL '
    if params_col and (statut != 'all' and table == 'sites'): sql = sql + ' AND '
    if params_col:
        sql = sql + ' AND '.join([k + ' IN %({})s'.format(k) for k in params_col.keys()])
        params_col = {key: to_tuple(params_col[key]) for key in params_col.keys()}

    sql = sql.replace("',)", "')")
    df = read_sql(
        sql    = sql,
        con    = con,
        params = {'ids': to_tuple(ids), 'nom': to_tuple(nom), **params_col})
    if 'geom' in df.columns:
        df = _set_geom(df)
    return df


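# Illustrative call (requires an open connection `con`; shown mainly to document the shape of the
# generated SQL, with the 'sites' schema/table used elsewhere in this module):
#   >>> _get_table(con, 'sites', 'sites', ids=[1, 2], cols=['id', 'nom'])
#   # builds roughly: SELECT id,nom FROM sites.sites WHERE id IN %(ids)s AND date_fin IS NULL
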
def _set_geom(df, hex=True):
    '''Decode a WKB(-hex) 'geom' column and return a GeoDataFrame in Lambert-93 (EPSG:2154).'''
    from shapely.wkb import loads
    import geopandas as gpd

    if hex:
        df['geom'] = [loads(geom, hex=True) for geom in df['geom']]
    # read_sql returns a plain DataFrame, which has no set_geometry method,
    # so build the GeoDataFrame explicitly before assigning the CRS.
    df = gpd.GeoDataFrame(df, geometry='geom', crs='EPSG:2154')

    return df


def _get_param(schema, param_table, type_table=None, type_court=True):
    '''Read a parameter table and, if type_table is given, join the matching type table onto it.'''
    from .params import con

    if type_table:
        typ = _get_table(con, schema, table=type_table)
        par = _get_table(con, schema, table=param_table, params_col={'id_type': typ.id.tolist()})
        df = merge(par, typ, left_on='id_type', right_on='id', how='left', suffixes=(None, '_typ')) \
            .drop(columns=['id_type', 'id_typ'])
        if 'description_typ' in df.columns: del df['description_typ']
        if type_court: df = df.drop(columns=['nom_typ']).rename(columns={'nom_court_typ': 'type'})
        else         : df = df.drop(columns=['nom_court_typ'], errors='ignore').rename(columns={'nom_typ': 'type'})
        index = ['id']
        if 'type' in df.columns:
            index += ['type']
        df = df.set_index(index).reset_index()
    else:
        df = _get_table(con, schema, table=param_table)

    return df


def _get_relation_tab(schema, tab, id_site=None, nom_site=None, last_update=False,
                      geom=False, params_col={}, milieu=None, statut='actif'):
    '''
    Parameters
    ----------
    schema : str. Default : None.
        Schema of the New_cen38 database.
    tab : str. Default : None.
        Name of the relation table to read in that schema.
    id_site : str, list. Default : None.
        Identifiers of the sites present in the 'sites' table.
    nom_site : str, list.
        Names of the sites present in the 'sites' table.
    last_update : bool. Default : False.
        If True, only the most recent up-to-date records are returned.
        If False, every record is returned.
    geom : bool. Default : False.
        Return the geometries of the sites.
    params_col : dict. Default : {}.
        Selection conditions applied to the data,
        as a dictionary {'column_name': conditions}.
    milieu : str. Default : None.
        Name of a habitat type referenced in the `sites.type_milieu` table.
        The list can be retrieved with `pyzh.sites._get_typ_milieux()`.
    statut : str. 'actif', 'history', 'all'.
        Status of the sites to retrieve:
        'actif'   : date_fin IS NULL
        'history' : has a date_fin
        'all'     : every site

    Return
    ----------
    df
    '''
    from .params import con
    from .sites.sites import get_sitesGeom
    # from .pers.pers import _merge_relation, _merge_author

    table = 'sites'
    dfSG = get_sitesGeom(columns='date', id_site=id_site, nom_site=nom_site,
                         last_update=last_update, params_col=params_col, milieu=milieu, statut=statut)

    # Collect the site ids used to filter the relation table; drop the geometries unless requested.
    ids = None
    if not dfSG.empty:
        if not geom:
            dfSG.drop(columns='geom', inplace=True)
        ids = dfSG.id.tolist()
        table = tab

    if ids:
        df = _get_table(con, schema, table, params_col={'id_geom_site': ids})
        if last_update:
            tmp = ['id', 'date', 'valid']
            col = [*df.columns[~df.columns.isin(tmp)]]
            df = df.sort_values(col).reset_index(drop=True)
            df.drop_duplicates(subset=col, keep='last', inplace=True)
            df.reset_index(drop=True, inplace=True)
            df = df[df.valid].copy()

        if 'date' in dfSG.columns and 'date' in df.columns:
            dfSG.rename(columns={'date': 'date_geom'}, inplace=True)
            df.rename(columns={'date': 'date_' + table.rsplit('_', 1)[1][:5]}, inplace=True)
        # if table == 'r_site_sub':
        #     print('DF : {}'.format(df))
        #     print(df.empty)
        #     print('DFSG : {}'.format(dfSG))

        if not df.empty:
            df = merge(dfSG, df, how='left', left_on='id', right_on='id_geom_site', suffixes=('_x', None)) \
                .drop(columns=['id_x', 'id_geom_site']) \
                .set_index('id').reset_index()

        return df
    else:
        print('No site geometries selected ...')


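# Illustrative call (requires the New_cen38 database configured in .params; the schema, table and
# site name below are examples only):
#   >>> _get_relation_tab('sites', 'r_site_sub', nom_site='Nom du site', last_update=True)
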
def _get_relation_autor(df, relation_tab, schema, id_df, id_relation, id_rela_auth='id_auteur'):
    '''Merge the relation table and its author onto df, suffixing the resulting 'auteur' column.'''
    from .pers.pers import _merge_relation, _merge_author

    suffixe = ''  # no suffix unless the relation table name contains 'site'
    if 'site' in relation_tab:
        suffixe = relation_tab.split('_')[1].split('site')[1]
        suffixe = '_' + suffixe
    df = _merge_relation(df=df, table=relation_tab, schema=schema,
                         left_id  = id_df,
                         right_id = id_relation)
    df = _merge_author(df=df, col_aut=id_rela_auth, on_index=True)
    df.rename(columns={'auteur': 'auteur' + suffixe}, inplace=True)
    return df


def to_geoms(geometries):
    '''Yield single-part geometries from an iterable mixing single- and multi-part geometries.'''
    from shapely.geometry import Polygon, LineString
    for geometry in geometries:
        if isinstance(geometry, (Polygon, LineString)):
            yield geometry
        else:
            # .geoms works on multi-part geometries in both shapely 1.8 and 2.x
            # (iterating a multi-part geometry directly was removed in shapely 2.0).
            yield from geometry.geoms


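# Illustrative usage (geometries invented for the example):
#   >>> from shapely.geometry import Polygon, MultiPolygon
#   >>> parts = [Polygon([(0, 0), (1, 0), (1, 1)]),
#   ...          MultiPolygon([Polygon([(2, 0), (3, 0), (3, 1)])])]
#   >>> len(list(to_geoms(parts)))
#   2
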
def union_polygons_geometry(df):
    '''
    Turn a GeoDataFrame of Polygons and/or
    MultiPolygons into a single MultiPolygon.

    Parameters
    ----------
    df : GeoDataFrame.
    '''
    from shapely.geometry import MultiPolygon
    name_geom = df.geometry.name

    poly = df.loc[df.geom_type == 'Polygon', name_geom].tolist()
    multipoly = df.loc[df.geom_type == 'MultiPolygon', name_geom].tolist()

    res = MultiPolygon()  # empty result if the frame holds no (Multi)Polygon geometries
    if poly:
        mp2 = MultiPolygon(poly)
    if poly and multipoly:
        res = MultiPolygon(to_geoms([*mp2.geoms, *multipoly]))
    elif not poly and multipoly:
        res = MultiPolygon(to_geoms(multipoly))
    elif not multipoly and poly:
        res = MultiPolygon(poly)

    return res


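# Illustrative usage (geometries invented for the example):
#   >>> import geopandas as gpd
#   >>> from shapely.geometry import Polygon
#   >>> gdf = gpd.GeoDataFrame(geometry=[Polygon([(0, 0), (1, 0), (1, 1)]),
#   ...                                  Polygon([(2, 0), (3, 0), (3, 1)])])
#   >>> union_polygons_geometry(gdf).geom_type
#   'MultiPolygon'
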
def union_lines_geometry(df):
    '''Turn a GeoDataFrame of LineStrings and/or MultiLineStrings into a single MultiLineString.'''
    from shapely.geometry import MultiLineString
    name_geom = df.geometry.name

    line = df.loc[df.geom_type == 'LineString', name_geom].tolist()
    multiline = df.loc[df.geom_type == 'MultiLineString', name_geom].tolist()

    res = MultiLineString()  # empty result if the frame holds no (Multi)LineString geometries
    if line:
        mp2 = MultiLineString(line)
    if line and multiline:
        res = MultiLineString(to_geoms([*mp2.geoms, *multiline]))
    elif not line and multiline:
        res = MultiLineString(to_geoms([*multiline]))
    elif not multiline and line:
        res = MultiLineString(line)

    return res


def calc_recouvrmt(df1, df2):
    '''
    Compute the coverage of df2 over df1
    for each geometry of df1.

    Parameters
    ----------
    df1 : GeoDataFrame.
    df2 : GeoDataFrame.
    '''
    from geopandas import sjoin
    tmp = sjoin(
        df1,
        df2[['geom']],
        op  = 'intersects',  # 'op' was renamed to 'predicate' in geopandas >= 0.10
        how = 'left')
    tmp.dropna(subset=['index_right'], inplace=True)
    tmp.index_right = tmp.index_right.astype(int)
    tmp.reset_index(inplace=True)
    tmp = tmp.join(
        df2[['geom']].rename(columns={'geom': 'right_geom'}),
        on=['index_right'], how='left')
    tmp2 = tmp[['index_right', 'right_geom']].copy() \
        .rename(columns={'right_geom': 'geom'}) \
        .set_geometry('geom')
    tmp1 = tmp[['id_site', 'geom']].copy() \
        .set_geometry('geom')

    # Repair invalid geometries with a zero-width buffer before intersecting
    if not tmp1.geom.values.is_valid.all():
        tmp1.loc[~tmp1.geom.values.is_valid, 'geom'] = tmp1.loc[~tmp1.geom.values.is_valid, 'geom'].buffer(0)
    if not tmp2.geom.values.is_valid.all():
        tmp2.loc[~tmp2.geom.values.is_valid, 'geom'] = tmp2.loc[~tmp2.geom.values.is_valid, 'geom'].buffer(0)

    tmp['perc_rcvmt'] = (tmp1.intersection(tmp2).area / tmp1.area) * 100
    tmp = tmp.groupby(['id_site']).sum().reset_index()
    df1 = df1.merge(tmp[['id_site', 'perc_rcvmt']], on=['id_site'], how='left')
    df1.perc_rcvmt.fillna(0, inplace=True)
    df1.perc_rcvmt = df1.perc_rcvmt.round(2)
    return df1


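# Illustrative usage (frames invented for the example; both use a 'geom' geometry column and df1
# carries the 'id_site' identifier the function expects):
#   >>> import geopandas as gpd
#   >>> from shapely.geometry import box
#   >>> sites = gpd.GeoDataFrame({'id_site': [1], 'geom': [box(0, 0, 2, 2)]}, geometry='geom')
#   >>> cover = gpd.GeoDataFrame({'geom': [box(1, 0, 2, 2)]}, geometry='geom')
#   >>> calc_recouvrmt(sites, cover).perc_rcvmt.tolist()
#   [50.0]
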
def Polygons_to_MultiPolygon(df):
    '''Return a copy of df where every Polygon in the 'geom' column is wrapped into a MultiPolygon.'''
    from shapely.geometry import MultiPolygon
    from pandas import concat

    df = df.copy()
    multi = df.loc[df.geom_type == 'MultiPolygon'].copy()
    poly = df.loc[df.geom_type == 'Polygon'].copy()
    poly['geom'] = [MultiPolygon([geom]) for geom in df.loc[df.geom_type == 'Polygon', 'geom']]
    df = concat([multi, poly])
    df.sort_index(inplace=True)
    return df

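# Illustrative usage (frame invented for the example):
#   >>> import geopandas as gpd
#   >>> from shapely.geometry import Polygon
#   >>> gdf = gpd.GeoDataFrame({'geom': [Polygon([(0, 0), (1, 0), (1, 1)])]}, geometry='geom')
#   >>> Polygons_to_MultiPolygon(gdf).geom.iloc[0].geom_type
#   'MultiPolygon'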