#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Insert wetland records and their correspondence rows into the `pr_zh` schema.

The functions below write to the tables `pr_zh.t_zh`, `pr_zh.cor_zh_area`,
`pr_zh.cor_zh_hydro`, `pr_zh.cor_zh_rb` and `pr_zh.bib_cb`, and read reference
areas from `ref_geo.l_areas` / `ref_geo.bib_areas_types`.
"""
from math import ceil

import geopandas as gpd
import pandas as pd
from geoalchemy2 import Geometry


# Area type codes linked by `to_tzh`, in processing order, paired with the
# `cover` flag forwarded to `_cor_zh_area`.
_AREA_TYPES = (
    ('ZNIEFF1', False),
    ('ZNIEFF2', False),
    ('DEP', False),
    ('COM', True),
    ('ZPS', False),
    ('SIC', False),
    ('SRAM', False),
    ('ZICO', False),
)

# `_cor_zh_` targets: typ -> (source table, identifier column).
_COR_ZH_SOURCES = {
    'hydro': ('t_hydro_area', 'id_hydro'),
    'rb': ('t_river_basin', 'id_rb'),
}


def _sql_literal(value):
    """Return `value` as a single-quoted SQL string literal.

    Embedded single quotes are doubled so the value cannot terminate the
    literal early.
    """
    return "'%s'" % str(value).replace("'", "''")


def _sql_in_list(values):
    """Return `values` rendered as a parenthesised list for an SQL `IN` test.

    Formatting a Python tuple directly yields `('a',)` for a single element
    and `()` for none, neither of which is valid SQL. An empty input is
    rendered as `(NULL)`, which matches no row.
    """
    literals = [_sql_literal(value) for value in values]
    if not literals:
        return '(NULL)'
    return '(%s)' % ', '.join(literals)


def _repair_invalid_geoms(gdf):
    """Replace the invalid geometries of column `geom` by their `buffer(0)`, in place."""
    invalid = ~gdf['geom'].values.is_valid
    if invalid.any():
        gdf.loc[invalid, 'geom'] = gdf.loc[invalid, 'geom'].buffer(0)


def _calc_recouvrmt(df1, df2, how='inner'):
    """
    Compute how much of each df1 geometry is covered by the df2 geometries
    it intersects.

    Parameters
    ----------
    df1 : GeoDataFrame.
        Its first column is used as identifier; the geometry column must be
        named `geom`.
    df2 : GeoDataFrame.
        Same layout as df1.
    how : str.
        Join type forwarded to `geopandas.sjoin`.

    Returns
    -------
    DataFrame with one row per intersecting pair and three columns: the
    identifier of df1, the identifier of df2 and `perc_rcvmt`, the
    intersection area expressed as a percentage of the df1 geometry area.
    """
    iddf1 = df1.columns[0]
    iddf2 = df2.columns[0]

    # Spatial join: one row per (df1 geometry, intersecting df2 geometry).
    tmp = gpd.sjoin(
        df1, df2[['geom']],
        predicate='intersects',
        how=how)

    # Rows without a match (possible with how='left') carry no right index.
    tmp.dropna(subset=['index_right'], inplace=True)
    tmp['index_right'] = tmp['index_right'].astype(int)
    # A fresh index keeps the two frames built below aligned row by row.
    tmp.reset_index(inplace=True)

    # Bring back the df2 geometry and identifier of each matched row.
    tmp = tmp.join(
        df2[['geom', iddf2]].rename(columns={'geom': 'right_geom'}),
        on=['index_right'],
        how='left')

    right = (
        tmp[['index_right', 'right_geom', iddf2]].copy()
        .rename(columns={'right_geom': 'geom'})
        .set_geometry('geom')
    )
    left = tmp[[iddf1, 'geom']].copy().set_geometry('geom')

    _repair_invalid_geoms(left)
    _repair_invalid_geoms(right)

    tmp['perc_rcvmt'] = (left.intersection(right).area / left.area) * 100
    return tmp[[iddf1, iddf2, 'perc_rcvmt']]


def _cor_zh_area(con, tzh_code, typ, cover=False):
    """
    Append to `pr_zh.cor_zh_area` the (id_area, id_zh) pairs whose geometries
    intersect.

    @con : database connection/engine handed to geopandas and pandas.
    @tzh_code : iterable of values matching the column pr_zh.t_zh."code".
    @typ : str. Value of ref_geo.bib_areas_types.type_code, e.g.
        COM, DEP, ...
    @cover : bool. When True the overlap percentage is stored, rounded up,
        in a `cover` column; otherwise that column is dropped.
    """
    table = 'cor_zh_area'

    sqltzh = """
        SELECT zh.id_zh, zh.geom FROM pr_zh.t_zh zh WHERE zh."code" in {tzh_code}
    """.format(tzh_code=_sql_in_list(tzh_code))
    tzh = gpd.read_postgis(sqltzh, con, crs=4326)
    # Compare the numeric EPSG code rather than the `srs` text, whose exact
    # spelling should not be relied upon.
    if tzh.crs.to_epsg() == 4326:
        tzh.to_crs(2154, inplace=True)

    # NOTE(review): an earlier, now removed, query did the intersection in SQL
    # and skipped pairs already present in pr_zh.cor_zh_area. The current code
    # applies no such filter before appending - confirm this is intended.
    sqllarea = """
        SELECT l.id_area, l.geom FROM ref_geo.l_areas l
        JOIN ref_geo.bib_areas_types bib USING (id_type)
        WHERE bib.type_code={typ} and l."enable"
    """.format(typ=_sql_literal(typ))
    larea = gpd.read_postgis(sqllarea, con, crs=2154)

    df = _calc_recouvrmt(larea, tzh).rename(columns={'perc_rcvmt': 'cover'})

    if cover:
        df['cover'] = [ceil(x) for x in df.cover]
    else:
        df.drop(columns=['cover'], inplace=True)

    if not df.empty:
        df.to_sql(
            name=table, con=con, schema='pr_zh',
            if_exists='append', index=False
        )
        print('INSERT %i correspondances'%df.shape[0])
        return

    # Nothing to insert: tell apart "no intersection" from "no reference
    # geometry at all for this type".
    sql = '''
        SELECT l.id_area
        FROM ref_geo.l_areas l
        JOIN ref_geo.bib_areas_types bib USING (id_type)
        WHERE bib.type_code={typ}
    '''.format(typ=_sql_literal(typ))
    res = pd.read_sql_query(sql, con)
    if not res.empty:
        print('AUCUNE nouvelles correspondances identifiées')
    else:
        print('AUCUNE geometrie dans la table `ref_geo.l_areas` pour le `type_code` %s'%typ)


def _cor_zh_(con, tzh_code, typ):
    """
    Append to `pr_zh.cor_zh_<typ>` the intersecting pairs not stored yet.

    @con : database connection/engine handed to pandas.
    @tzh_code : iterable of values matching the column pr_zh.t_zh."code".
    @typ : str. One of [hydro, rb] (case-insensitive).

    Raises ValueError when `typ` is neither `hydro` nor `rb`.
    """
    typ = typ.lower()
    if typ not in _COR_ZH_SOURCES:
        # Previously any other value silently read the river-basin table
        # while targeting a table named after the unknown value.
        raise ValueError(
            "typ must be one of %s, got %r" % (sorted(_COR_ZH_SOURCES), typ))
    tab_typ, id_typ = _COR_ZH_SOURCES[typ]
    table = 'cor_zh_%s'%typ

    sql = '''
        SELECT h.{id_typ},zh.id_zh
        FROM pr_zh.{tab_typ} h, pr_zh.t_zh zh
        WHERE zh."code" in {tzh_code}
        AND ST_INTERSECTS( ST_SetSRID(h.geom,4326),ST_MakeValid(ST_SetSRID(zh.geom,4326)))
        AND (h.{id_typ},zh.id_zh) NOT IN (SELECT {id_typ},id_zh FROM pr_zh.{tab_to})
    ;'''.format(
        tzh_code=_sql_in_list(tzh_code),
        id_typ=id_typ,
        tab_typ=tab_typ,
        tab_to=table)
    df = pd.read_sql_query(sql, con)

    if df.empty:
        print('AUCUNE nouvelles correspondances identifiées')
        return

    df.to_sql(
        name=table, con=con, schema='pr_zh',
        if_exists='append', index=False
    )
    print('INSERT %i correspondances'%df.shape[0])


def get_id_t_zh(con, code=None):
    """
    Read the `id_zh` / `code` pairs of `pr_zh.t_zh`.

    @con : database connection/engine handed to pandas.
    @code : str, list, Series, Index. Wetland code(s), 12 characters
        maximum, used to filter the rows. Any other value (including the
        default None) returns every row.

    Returns a DataFrame with the columns `id_zh` and `code`.
    """
    sql = "SELECT id_zh,code FROM pr_zh.t_zh"
    if isinstance(code, str):
        sql += " WHERE code=%s" % _sql_literal(code)
    elif isinstance(code, (list, pd.Series, pd.Index)):
        sql += " WHERE code IN %s" % _sql_in_list(code)
    return pd.read_sql_query(sql, con)


def to_tzh(df, con):
    """
    Append `df` to `pr_zh.t_zh`, then fill every correspondence table.

    @df : GeoDataFrame with a known EPSG CRS, a `geom` geometry column and
        a `code` column. Other columns are written as they are.
    @con : database connection/engine handed to pandas and geopandas.

    Raises TypeError when `df` is not a GeoDataFrame (the SRID of the
    geometry column cannot be determined) and ValueError when its CRS has
    no EPSG code.
    """
    table = 't_zh'

    if not isinstance(df, gpd.GeoDataFrame):
        raise TypeError('to_tzh expects a GeoDataFrame, got %s' % type(df).__name__)
    epsg = df.crs.to_epsg() if df.crs is not None else None
    if epsg is None:
        raise ValueError('to_tzh needs a GeoDataFrame whose CRS has an EPSG code')

    # Geometries are sent as WKT text; the Geometry dtype carries the SRID.
    df.to_wkt().to_sql(
        name=table, con=con, schema='pr_zh',
        if_exists='append', index=False,
        dtype={
            'geom': Geometry(srid=epsg)
        }
    )

    codes = df['code']
    for typ, with_cover in _AREA_TYPES:
        _cor_zh_area(con, tzh_code=codes, typ=typ, cover=with_cover)
        print('INSERT cor_zh_area %s OK !' % typ)
    for typ in ('hydro', 'rb'):
        _cor_zh_(con, tzh_code=codes, typ=typ)
        print('INSERT cor_zh_%s OK !' % typ)


def to_cor_(con, df, table):
    """Append `df` as it is to the table `pr_zh.<table>`."""
    df.to_sql(
        name=table, con=con, schema='pr_zh', if_exists='append', index=False
    )


def to_t_(con, df, table):
    """
    Append `df` to `pr_zh.<table>` after swapping its `SITE_COD` column for
    the matching `id_zh`.

    Rows whose `SITE_COD` has no match in `pr_zh.t_zh` are left out by the
    (inner) merge.
    """
    ids = get_id_t_zh(con, df.SITE_COD)
    res = pd.merge(df, ids, left_on='SITE_COD', right_on='code') \
        .drop(columns=['SITE_COD', 'code'])
    res.to_sql(
        name=table, con=con, schema='pr_zh', if_exists='append', index=False
    )


def add_bib_cb(df, con, humidity=None, is_ch=None):
    """
    Append to `pr_zh.bib_cb` the rows of `df` whose `lb_code` is not stored yet.

    @df : pd.Dataframe. Table with 3 columns |lb_code|humidity|is_ch|.
        The default values below are written into this frame in place.
    @con : engine. PostgreSQL connection to the target database.
    @humidity : str. Humidity flag given by default to every lb_code whose
        `humidity` is null; it is upper-cased before use.
        Default : None (disabled)
        Values : P (potentially), H (humid).
    @is_ch : bool. Habitat-mapping presence flag given by default to every
        lb_code whose `is_ch` is null.
        Default : None (disabled)
        Values : True (exists), False (does not exist).
    """
    table = 'bib_cb'

    if humidity is not None:
        df.loc[df.humidity.isna(), 'humidity'] = humidity.upper()
    if is_ch is not None:
        df.loc[df.is_ch.isna(), 'is_ch'] = is_ch

    bib_cb = pd.read_sql_table(table, con, 'pr_zh')
    df = df[~df.lb_code.isin(bib_cb.lb_code)]

    if df.empty:
        print('NO news data to insert !')
    else:
        df.to_sql(
            name=table, con=con, schema='pr_zh', if_exists='append', index=False
        )
        print('INSERT %s news rows !'%df.shape[0])


if __name__ == '__main__':
    # NOTE(review): a relative import only resolves when this file is run as
    # a module of its package (python -m ...), not as a plain script.
    from ..MEDWET2Geonat import get_t_zh
    zh, _cor_lim_list = get_t_zh()