Python_scripts/tmp_save/pgszh_SudGres.py

2374 lines
78 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# from binascii import Incomplete
from lib2to3.pgen2 import driver
from warnings import filterwarnings
import geopandas as gpd
# from shapely import geometry
from shapely.geometry import MultiPolygon, MultiLineString, MultiPoint, Polygon, LineString #, collection
from shapely import ops
import pycen
filterwarnings("ignore",category=RuntimeWarning)
zh = pycen.zh()
path0 = '/home/colas/Documents/9_PROJETS/3_PGZH/'
PATH_OUT = path0 + 'RESULTATS/Résultats_etude_PGSZH_SudGresivaudan.xlsx'
Path = path0 + 'SIG/'
Path_tmp = Path + 'tmp/'
p_bio_eco = 'biologie_ecologie/'
p_hydro = 'hydraulique_hydrologique/'
p_phybio = 'physique_biochimique/'
p_mltifct = 'multi_fonctions/IGN - BD Alti 25M/'
p_press = 'pressions/'
P_expert = path0 + 'DIRE_EXPERT/'
# Couche des fonctions biologiques et écologiques
c_znieff = 'PGZSH_znieff1.gpkg'
c_zico = 'PGZSH_zico.gpkg'
c_redi = 'PGZSH_axefauneredi.gpkg'
# Couche des fonctions hydrauliques et hydrologiques
c_reso_hydro = 'PGSZH_SG_cours eau_DREAL.gpkg'
c_alea_inond = 'alea_inondation/utsg_gpu.gpkg'
c_ebf_crseau = 'EBF_cours_eau.shp'
c_connex_molasse = 'ZH_CONNECT_ESO.shp'
c_idpr = 'BRGM_IDPR/IDPR_2154_CLIP.tif'
c_idpr2 = 'BRGM_IDPR/IDPR_2154_CLIP.gpkg'
c_piezo = 'Piezo_SAGE BDPV/carte_piézo_HE_2021.shp'
c_piezo_interp = 'Piezo_SAGE BDPV/piezo_interpoler.tif'
# Couche des fonctions physiques et biochimiques
c_artif = 'AERMC-Grésivaudan/pressions_vecteur/PRESSIONS_ARTIFICIALISATION_2020.shp'
c_smvic_PPR1 = 'Captage/smvic_PPR1_SMVI.shp'
c_smvic_PPR2 = 'Captage/smvic_PPR2_SMVI.shp'
c_smvic_PPi = 'Captage/smvic_PPi_SMVI.shp'
c_smvic_PPe = 'Captage/smvic_PPe_SMVI.shp'
c_rpg = 'RPG/RPG_2017.shp'
c_zse = 'ZSE.shp'
c_zsea = 'ZSNEA.shp'
c_occupsol = 'PGSZH_oscom.gpkg'
# Couche des critères « multi-fonctions »
c_alti = 'BDALTIV2_25M_FXX_'
c_mnt = 'BDALTIV2_25M.tif'
# Couche des pressions
c_rhomeo = 'RESULTATS/RHOMEO/sig_indicateurs_2021_simby_202110081733.csv'
# c_artif = 'AERMC-Grésivaudan/pressions_vecteur/PRESSIONS_ARTIFICIALISATION_2020.shp'
c_captag = 'AERMC-Grésivaudan/Captage_tout_usage_Sud_Gresivaudan_2019_AERMC.csv'
c_agric = 'AERMC-Grésivaudan/pressions_vecteur/PRESSIONS_AGRICOLES_2019.shp'
c_urba_plu1 = 'SYMBHI/PLU/Zonage urba Cras/38137_ZONE_URBA_20210930.shp'
c_urba_plu2 = 'SYMBHI/PLU/Zonage urba NDO/38278_ZONE_URBA_20210930.shp'
c_urba_plu3 = 'SYMBHI/PLU/Zonage urba SMVIC/sirap_std_cnig_plu.v_geo_zone_urba.shp'
c_urba_scot = 'SYMBHI/SCOT/PGSZH_ref_scot_esp_pot_dev.shp'
c_iClass = 'InstallationsClassees_France.shp'
c_Purb = 'tache_urbaine.shp'
c_lign_confliredi = 'PGSZH_lignes_conflits_REDI.gpkg'
c_poin_confliredi = 'PGSZH_points_conflits_REDI.gpkg'
c_ouvrag = 'SYMBHI/ouvrage.TAB'
c_depot = 'SYMBHI/PlagesdeDepot.TAB'
c_invas = 'SYMBHI/TerrainInvasives2020.tab'
c_fallo = 'SYMBHI/fallopia.TAB'
c_cd38_eee = 'CD38/PGSZH_EEE_CDIsere.gpkg'
# c_vulner = 'SAGE/VULNERABILITE.shp'
# Couche des dire d'expert
c_expert = 'PGSZH_dire_expert_compilation_17032022.csv'
class check_dtypeInList(list):
def __contains__(self, typ):
return any(isinstance(val, typ) for val in self)
# get_flux
def open_gpkg(Path, layer=None,bbox=None):
'''
Ouverture des couches Shapefile et Geopackages et
mise au formt du script:
Parameters
----------
Path : str. Chemain/fichier.
layer : str. Si Geopackage, nom de la couche dans le
cas où il y en a plusieurs.
'''
df = gpd.read_file(Path,layer=layer, bbox=bbox)
if df.geometry.name != 'geom':
df.rename_geometry('geom',inplace=True)
if 'Lambert' in df.crs.name \
and '93' in df.crs.name \
and df.crs.srs.upper() != 'EPSG:2154':
print('Projection : %s - %s'%(df.crs.name,df.crs.srs) )
print('Modification de la projection ...')
df.to_crs(epsg=2154,inplace=True)
if df.crs.srs.upper() != 'EPSG:2154':
print('Projection : %s - %s'%(df.crs.name,df.crs.srs) )
print('Modification de la projection ...')
df.to_crs(epsg=2154,inplace=True)
return df
def to_geoms(geometries):
for geometry in geometries:
if isinstance(geometry, (Polygon,LineString)):
yield geometry
else:
yield from geometry
def _union_polygons_geometry(df):
'''
Transforme un GeoDataFrame de Polygons
et/ou MultiPolygons en un MultiPolygon unique:
Parameters
----------
df : GeoDataFrame.
'''
df = df.copy()
name_geom = df.geometry.name
# poly = df[df.geom_type=='Polygon'][name_geom]
poly = df.loc[df.geom_type=='Polygon',name_geom]
multipoly = df.loc[df.geom_type=='MultiPolygon',name_geom]
poly = [*poly]
multipoly = [*multipoly]
if poly:
mp2 = MultiPolygon(poly)
if poly and multipoly:
res = MultiPolygon(to_geoms([*mp2, *multipoly]))
elif not poly and multipoly:
res = MultiPolygon(to_geoms(multipoly))
elif not multipoly and poly:
res = MultiPolygon(poly)
return res
def _union_lines_geometry(df):
name_geom = df.geometry.name
line = df.loc[df.geom_type=='LineString',name_geom].tolist()
multiline = df.loc[df.geom_type=='MultiLineString',name_geom].tolist()
if line:
mp2 = MultiLineString(line)
if line and multiline:
res = MultiLineString(to_geoms([*mp2, *multiline]))
elif not line and multiline:
res = MultiLineString(to_geoms([*multiline]))
elif not multiline and line:
res = MultiLineString(line)
return res
def _calc_recouvrmt(df1,df2):
'''
Calcule le recouvrement de df2 sur df1
pour chaque géométrie de df1:
Parameters
----------
df1 : GeoDataFrame.
df2 : GeoDataFrame.
'''
tmp = gpd.sjoin(
df1,
df2[['geom']],
op = 'intersects',
how = 'left')
tmp.dropna(subset=['index_right'],inplace=True)
tmp.index_right = tmp.index_right.astype(int)
tmp.reset_index(inplace=True)
tmp = tmp.join(
df2[['geom']].rename(columns={'geom': 'right_geom'}),
on=['index_right'], how='left')
tmp2 = tmp[['index_right','right_geom']].copy() \
.rename(columns={'right_geom': 'geom'}) \
.set_geometry('geom')
tmp1 = tmp[['id_site','geom']].copy() \
.set_geometry('geom')
if not tmp1.geom.values.is_valid.all():
tmp1.loc[~tmp1.geom.values.is_valid,'geom'] = tmp1.loc[~tmp1.geom.values.is_valid,'geom'].buffer(0)
if not tmp2.geom.values.is_valid.all():
tmp2.loc[~tmp2.geom.values.is_valid,'geom'] = tmp2.loc[~tmp2.geom.values.is_valid,'geom'].buffer(0)
tmp['perc_rcvmt'] = (tmp1.intersection(tmp2).area/tmp1.area)*100
tmp = tmp.groupby(['id_site']).sum().reset_index()
df1 = df1.merge(tmp[['id_site','perc_rcvmt']], on=['id_site'], how='left')
df1.perc_rcvmt.fillna(0, inplace=True)
df1.perc_rcvmt = df1.perc_rcvmt.round(2)
return df1
def jenks(data,col,labels):
import jenkspy
data = data.copy()
c = col
tmp = data[c].unique()
tmp = gpd.pd.DataFrame({'val':tmp})
ddf = gpd.pd.DataFrame()
# Si str in labels
labs_copy = None
if str in check_dtypeInList(labels):
labs_copy = labels.copy()
labels = range(len(labels))
labels = list(labels)
tmp['jenks'] = gpd.pd.cut(tmp['val'],
bins=jenkspy.jenks_breaks(tmp['val'], nb_class=len(labels)),
labels=labels,
include_lowest=True)
ddf[c] = data[c].copy()
ddf[c] = ddf[c].replace([*tmp.val],[*tmp.jenks])
ddf[c+'1'] = data[c]
# ddf[c+'1'] = ddf[c+'1'].replace([*tmp.val],[*tmp.jenks])
# ddf[c] = ddf[c].astype(float)
if labs_copy:
ddf[c] = ddf[c].replace([*labels],[*labs_copy])
# ddf[c+'1'] = ddf[c+'1'].replace([*labels],[*labs_copy])
return ddf[c]
class fct_bio_eco:
'''
Calcule l'indice et les sous-indices des fonctions biologiques
et écologiques des zones humides:
Liste des fonctions
----------
znieff_1 : Calcul la présence/absence
des zones humides sur des ZNIEFF1 par intersection.
redi : Calcul la capacité de déplacement de la faune
du réseau écologique départemental de l'Isère (REDI)
par intersection. Utilisation d'une couche SIG REDI
(PolyLignes) avec un buffer de 100m.
fct_bio : Dans le cas où un zone humide n'intersecte pas
une ZNIEFF1, attribution d'un poids à chaque zone humide
pour ses fonctions Biologiques et Ecologiques.
Calcul dépendant du nombre de fonctions recensées en BDD.
fct_hab : Attribution d'un poids à chaque zone humide
en fonction des types d'habitat présents sur site.
fct_esppatri : Attribution d'un poids à chaque zone humide
en fonction des espèces protégées présentes sur site.
bilan : Somme des sous-indices des fonctions biologiques
et écologiques des zones humides.
'''
def znieff_1(df):
'''
Calcul la présence/absence des zones
humides sur des ZNIEFF 1 par intersection:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_znief' : Présence = 2 / Absence = 0
'''
print('INIT : Localisation de la zone en territoire ZNIEFF 1 ...')
df = df.copy()
data = open_gpkg(Path+p_bio_eco+c_znieff,bbox=df)
geom = _union_polygons_geometry(data)
df['ssind_znief'] = df.geom.intersects(geom) \
.astype(int) \
.replace(1,2)
return df
def redi(df):
'''
Calcul la capacité de déplacement de la faune
dans le réseau écologique départemental de l'Isère (REDI)
par intersection. Utilisation d'une couche SIG REDI
(PolyLignes) avec un buffer de 100m:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_redi' : Présence = 2 / Absence = 0
'''
print('INIT : Axe de déplacement de la faune du REDI ...')
df = df.copy()
data = open_gpkg(Path+p_bio_eco+c_redi,bbox=df)
data.geometry = data.geometry.map(ops.linemerge)
geom = _union_lines_geometry(data).buffer(100)
df['ssind_redi'] = df.geom.intersects(geom) \
.astype(int) \
.replace(1,2)
return df
def fct_bio(df):
'''
Dans le cas où un zone humide n'intersecte pas
une ZNIEFF 1 (ssind_znief = 0), attribution d'un poids
à chaque zone humide pour ses fonctions Biologiques et
Ecologiques. Calcul dépendant du nombre de fonctions
recensées en BDD:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_znief' : Présence = 2 / Absence = 0
'ssind_fctbio' : si (ssind_znief = 2) = 0 /
1 fonction = 1 / multiple fonctions = 2 /
'''
print('INIT : biologiques et écologiques ...')
df = df.copy()
data = zh.get_fct(id_site=df.id_site.tolist())
data = data[(data.type == 'fct_bio') & (data.nom_fct != 'non documenté')]
df['ssind_fctbio'] = df.apply(
lambda x: data[data.id_site == x['id_site']].shape[0],
axis=1)
if 'ssind_znief' not in df.columns:
df = fct_bio_eco.znieff_1(df)
df.loc[df.ssind_znief==2, 'ssind_fctbio'] = 0
# df.loc[(df.ssind_znief==0) & (df.ssind_fctbio==1), 'ssind_fctbio'] = 1
df.loc[(df.ssind_znief==0) & (df.ssind_fctbio > 1), 'ssind_fctbio'] = 2
return df
def fct_hab(df):
'''
Attribution d'un poids à chaque zone humide
en fonction des types d'habitat et des espèces protégées
présents sur site:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_hab' :
Habitat 'prioritaire|très rare' = 2
'''
print('INIT : Habitats (prioritaire|très rare) ...')
df = df.copy()
data = zh.get_fct(id_site=df.id_site.tolist())
data = data[(data.type == 'int_patri') & (data.nom_fct != 'non documenté')]
# 2 pt si habitat prioritaire
lst_termep = 'prioritaire|communautaire|DH|très rare'
lst_termeC = 'communautaire'
lst_termeP = 'prioritaire|très rare'
datap = data[data.nom_fct == 'habitats']
lst_siteh = datap.loc[datap.description.str.contains(lst_termep), 'id_site']
lst_sitehC = datap.loc[datap.description.str.contains(lst_termeC), 'id_site']
lst_sitehP = datap.loc[datap.description.str.contains(lst_termeP), 'id_site']
df['ssind_hab'] = 0
df.loc[df.id_site.isin(lst_sitehP),'ssind_hab'] = 2
return df
def fct_esppatri(df):
'''
Attribution d'un poids à chaque zone humide
en fonction des espèces protégées présentes
sur site:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_esppatri' :
Faune/Flore nb(1 à 2) 'DH|PR|DO|LR|PN|LRR|LRN|LRM|LRF|LRD|LRE' = 0.5
Faune/Flore nb(< 3) 'DH|PR|DO|LR|PN|LRR|LRN|LRM|LRF|LRD|LRE' = 1
'''
print('INIT : Faune - Flore (PN PR P38) ...')
df = df.copy()
data = zh.get_fct(id_site=df.id_site.tolist())
data = data[(data.type == 'int_patri') & (data.nom_fct != 'non documenté')]
# 1 pt si liste terme
lst_terme = 'DH|PR|DO|LR|PN|LRR|LRN|LRM|LRF|LRD|LRE'
datat = data[data.nom_fct != 'habitats'].copy()
datat.quantite = datat.quantite.astype(float)
lst_sitet = datat.loc[datat.description.str.contains(lst_terme), 'id_site']
lst_site1 = datat.loc[(datat.id_site.isin(lst_sitet))&(datat.quantite < 3), 'id_site']
lst_site2 = datat.loc[(datat.id_site.isin(lst_sitet))&(datat.quantite >= 3),'id_site']
lst_site3 = datat.loc[
(datat.id_site.isin(lst_sitet)) &
(datat.quantite == 0) &
(datat.description.str.contains('nombreuses|plusieurs')),'id_site']
df['ssind_esppatri'] = 0
df.loc[df.id_site.isin(lst_site1),'ssind_esppatri'] = 0.5
df.loc[df.id_site.isin(lst_site2),'ssind_esppatri'] = 1
df.loc[df.id_site.isin(lst_site3),'ssind_esppatri'] = 1
return df
def bilan(df):
'''
Somme des sous-indices des fonctions biologiques
et écologiques des zones humides:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ind_bioeco' :
sum(
ssind_znief + ssind_redi +
ssind_fctbio + ssind_hab + ssind_esppatri
)
'''
df = fct_bio_eco.znieff_1(df)
df = fct_bio_eco.redi(df)
df = fct_bio_eco.fct_bio(df)
df = fct_bio_eco.fct_hab(df)
df = fct_bio_eco.fct_esppatri(df)
ssind = df.columns[df.columns.str.contains('ssind_')]
df['ind_bioeco'] = df[
['ssind_znief','ssind_redi','ssind_fctbio','ssind_hab','ssind_esppatri']
# ssind
].sum(axis=1)
print(df['ind_bioeco'].dtype)
df.name = 'Fct_bio_eco'
return df
class fct_hyd:
'''
Calcule l'indice et les sous-indices des fonctions hydrauliques
et hydrologiques des zones humides:
Liste des fonctions
----------
zone_inond : Calcul la présence/absence des zones humides
sur des zones inondables par intersection.
eabf : Calcul de l'espace alluvial de bon fonctionnement (EABF)
ou de fond de vallée par intersection.
dist_reso_hydro : Si la zone humide ne possède pas d'espace
alluviale de bon fonctionnement d'après la fonction "eabf()",
calcul la distance au réseau hydrographique linéaire
(le plus proche). Attribution d'un poids en fonction de
la distance. Si la zone ne possède pas d'eabf et ne semble
pas à proximité d'un réseau hydrique, recherche de la
présence d'un cours d'eau dans la base de données zones humes.
reghydro_out: Pour chaque zone humide, en cas de distance
au réseau hydrographique linéaire > 50 et d'absence
d'espace alluviale de bon fonctionnement, recherche
dans la base de données des zones humides si une sortie
d'eau "Cours d'eau"est définie. Attribution d'un poids
en fonction.
connex_molasse : Attribution d'un poids à chaque zone humide
en fonction de sa connexion avérée à la molasse ou non.
idpr : Calcul de l'Indice de Développement et
de Persistance des Réseaux. Calcul réalisé dans le cas où
connex_molasse = 0 .
fct_hydro : Attribution d'un poids à chaque zone humide
en fonction du nombre de rôles hydro-biologiques à caractères
hydrauliques et hydrologiques qu'elle remplie.
zse_zsnea : Attribution d'un poids à chaque zone humide
en fonction de sont appartenance à une zone de sauvegarde
exploitée actuelle (zse) ou future (zsnea).
bilan : Somme des sous-indices des fonctions hydrauliques
et hydrologiques des zones humides.
'''
def zone_inond(df):
'''
Calcul la présence/absence des zones humides
sur des zones inondables par intersection:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_zoneinond' : None
'''
print('INIT : Zone inondable ...')
df = df.copy()
data = open_gpkg(
Path+p_hydro+c_alea_inond,bbox=df,
layer='prescription_surf')
data = data[data['PGSZH_alea_inondation']==1]
tmp = gpd.sjoin(df,data[['geom']],op='intersects', how='left')
lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist()
df['ssind_zoneinond'] = None
df.loc[df.id_site.isin(lst),'ssind_zoneinond'] = 1
return df
def eabf(df):
'''
Si la zone humide n'est pas en zone inondable d'après
la fonction "zone_inond()", calcul de l'espace alluvial de bon
fonctionnement (EABF) ou de fond de vallée par intersection:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_eabf' : None
'''
print('INIT : Espace alluvial de bon fonctionnement (EABF) ou fond de vallée ...')
df = df.copy()
data = open_gpkg(Path+p_hydro+c_ebf_crseau,bbox=df)
tmp = gpd.sjoin(df,data[['geom']],op='intersects', how='left')
lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist()
df['ssind_eabf'] = 0
df.loc[df.id_site.isin(lst),'ssind_eabf'] = 1
return df
def dist_reso_hydro(df):
'''
Si la zone humide ne possède pas d'espace alluviale
de bon fonctionnement d'après la fonction "eabf()",
calcul de la distance au réseau hydrographique
linéaire (le plus proche).Attribution d'un poids
en fonction de la distance.
Si la zone ne possède pas d'eabf et ne semble pas
à proximité d'un réseau hydrique, recherche de la
présence d'un cours d'eau dans la base de données
zones humes:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_distHydro' :
Si > 50m = 0
Si ]10 m 50 m] = 0.5
Si <= 10m = 1
'''
print('INIT : Distance au réseau hydrographique linéaire (le plus proche) ...')
df = df.copy()
# data = open_gpkg(Path+p_hydro+c_reso_hydro,bbox=df)
data = pycen.ref_hydro().get_troncon()
if 'ssind_eabf' not in df.columns:
df = fct_hyd.eabf(df)
# if 'MultiLineString' in data.geom_type:
# data.loc[data.geom_type=='MultiLineString'] = data.loc[data.geom_type=='MultiLineString'].geometry.map(ops.linemerge)
# if True in data.has_z.unique():
# import shapely.wkb
# data.loc[data.has_z,'geom']
# data.geom = [shapely.wkb.loads(shapely.wkb.dumps(g, output_dimension=2)) for g in data.geom]
df10 = df[['id_site','geom']].copy()
df50 = df[['id_site','geom']].copy()
df10.geom = df10.buffer(10)
df50.geom = df50.buffer(50)
df10 = gpd.sjoin(df10,data[['geom']],op='intersects', how='left')
df50 = gpd.sjoin(df50,data[['geom']],op='intersects', how='left')
lst10 = df10.loc[~df10.index_right.isna(),'id_site'].tolist()
lst50 = df50.loc[~df50.index_right.isna(),'id_site'].tolist()
df['ssind_distHydro'] = 0
df.loc[(df.ssind_eabf==0)&(df.id_site.isin(lst50)),'ssind_distHydro'] = 0.5
df.loc[(df.ssind_eabf==0)&(df.id_site.isin(lst10)),'ssind_distHydro'] = 1
# df.loc[df.dist_min <= 10, 'ssind_distHydro'] = 2
# Si 0, check entree/sortie regime hydro.
# Si cours d'eau ou eaux de crues ==> 1
# union = data.geometry.unary_union
# df['buff10'] = df.buffer(10).intersects(union).astype(int)
# df['buff50'] = df.buffer(50).intersects(union).astype(int)
# df['ssind_distHydro'] = None
# df.loc[df.buff50 == 0, 'ssind_distHydro'] = 0
# df.loc[df.buff50 == 1, 'ssind_distHydro'] = 1
# df.loc[df.buff10 == 1, 'ssind_distHydro'] = 2
# df.drop(columns=['buff10', 'buff50'], inplace=True)
return df
def reghydro_out(df):
'''
Pour chaque zone humide, en cas de distance
au réseau hydrographique linéaire > 50 et d'absence
d'espace alluviale de bon fonctionnement, recherche
dans la base de données des zones humides si une sortie
d'eau "Cours d'eau"est définie. Attribution d'un poids
en fonction:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_hydrout' :
Si ssind_distHydro = 0
& ssind_eabf = 0
& regime_hydri_out = "Cours d'eau" :
'ssind_distHydro' = 1
'''
df = df.copy()
if 'ssind_distHydro' not in df.columns:
df = fct_hyd.dist_reso_hydro(df)
df['ssind_hydrout'] = 0
if not df.loc[df.ssind_distHydro == 0].empty :
lst_zh = df.loc[df.ssind_distHydro == 0].id_site.tolist()
tmp = zh.get_regHydro(id_site=lst_zh)
# tmp = tmp.loc[tmp.regime_hydri.isin(["Cours d'eau", "Eaux de crues"])]
tmp = tmp.loc[(tmp.in_out=='sortie')&(tmp.regime_hydri=="Cours d'eau")]
# in_out ???????????
# permanance ???????
lsttmp = tmp.id_site
df.loc[(df.ssind_eabf==0)&(df.id_site.isin(lsttmp)),'ssind_hydrout'] = 1
return df
def connex_molasse(df):
'''
Attribution d'un poids à chaque zone humide
en fonction de sa connexion avérée
à la molasse ou non :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_molasse' :
(intersects == False) = 0
(intersects == True) = 1
'''
print('INIT : Connexion à la molasse ...')
df = df.copy()
data = open_gpkg(Path+p_hydro+c_connex_molasse,bbox=df)
tmp = gpd.sjoin(df,data[['geom']],op='intersects', how='left')
tmp = tmp[~tmp.index_right.isna()]
df['ssind_molasse'] = 0
df.loc[df.id_site.isin(tmp.id_site),'ssind_molasse'] = 1
return df
def piezo(df):
import rasterio as rio
from os import system
print('INIT : Cross piézométrie ...')
df = df.copy()
# Polygoniser (raster vers vecteur)
# """gdal_polygonize.py
# "/home/colas/Documents/9_PROJETS/3_PGZH/SIG/multi_fonctions/IGN - BD Alti 25M/BDALTIV2_25M.tif"
# -b 1 -f "GPKG"
# /tmp/processing_sSlfcG/96533f9ad23e4c10992caa807da01bf1/OUTPUT.gpkg
# OUTPUT alti
# """
mnt_in = ''
piezo_in = 'my_interpolate_piezo.tif'
piezo_out = 'piezo.tif'
gpkg_out = 'out.gpkg'
mnt_out = 'mnt.tif'
piezoVSmnt = 'piezoVSmnt_out.tif'
poly_connect_nape = 'poly_connect_nape.gpkg'
# Découper un raster selon une couche de masque
# Découpage du MNT par les polygones d'étude
op = '''
gdalwarp -overwrite -s_srs EPSG:2154 -t_srs EPSG:2154 -co FORMAT=GPKG -of GTiff -tr 25.0 -25.0 -tap -cutline \
"PG:dbname='azalee' host=91.134.194.221 port=5432 sslmode=disable user='cgeier' password='adm1n*bdCen'" \
-csql "SELECT site_code, geom FROM zones_humides.v_zoneshumides WHERE site_code in ('{liste_site}')" \
"{mnt}" {out}
'''.format(vector = Path_tmp+gpkg_out, mnt=Path+p_mltifct+c_mnt, out=Path_tmp+mnt_out, liste_site="','".join(df.id_site.tolist()))
system(op)
mnt = rio.open(Path_tmp+mnt_out)
xmin, ymin, xmax, ymax = mnt.bounds
# Découpage du PIEZO rasterizé interpolé par les polygones d'étude
op = '''
gdalwarp -overwrite -s_srs EPSG:2154 -t_srs EPSG:2154 -co FORMAT=GPKG -of GTiff -tr 25.0 -25.0 \
-te {xmin} {ymin} {xmax} {ymax} \
-tap -cutline "PG:dbname='azalee' host=91.134.194.221 port=5432 sslmode=disable user='cgeier' password='adm1n*bdCen'" \
-csql "SELECT site_code, geom FROM zones_humides.v_zoneshumides WHERE site_code in ('{liste_site}')" \
"{mnt}" {out}
'''.format(
xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax,
mnt=Path_tmp+piezo_in,
out=Path_tmp+piezo_out,
liste_site="','".join(df.id_site.tolist())
)
system(op)
# Soustraction de la valeur des mailles
op = '''
gdal_calc.py --overwrite -A {mnt} -B {piezo} --outfile={out} --calc="(A-B)<=0"
'''.format(mnt=Path_tmp+mnt_out, piezo=Path_tmp+piezo_out, out=Path_tmp+piezoVSmnt)
system(op)
# polygonisation du raster
op = '''
gdal_polygonize.py {mnt} -b 1 -f "GPKG" {out}
'''.format(mnt=Path_tmp+piezoVSmnt, out=Path_tmp+poly_connect_nape)
system(op)
# data = rio.open(Path_tmp+piezoVSmnt)
data = open_gpkg(Path_tmp+poly_connect_nape)
data.rename(columns={'DN':'connect_nappe'}, inplace=True)
data = data[data.connect_nappe > 0].copy()
# IDEM :
# gpd.sjoin(df,data).sort_values('id_site').id_site.unique() == \
# df[df.intersects(data.unary_union)].sort_values('id_site').id_site.tolist()
tmp = gpd.sjoin(df,data,how='left')
del tmp['index_right']
tmp.drop_duplicates(inplace=True)
df = tmp.copy()
df.to_file(Path_tmp+'zh_connect_nappe.gpkg',driver='GPKG')
# Import des courbe des niveau rasteriser par interpolation
# by QGIS : Outils de traitements > Interpolation > Interpolation TIN
# piezo = rio.open(Path+p_hydro+c_piezo_interp)
# from geocube.api.core import make_geocube
# piézo = open_gpkg(Path+p_hydro+c_piezo)
# piézo = piézo[~piézo.geom.isna()]
# piézo.rename_geometry('geometry', inplace=True)
# out_grid = make_geocube(
# vector_data=piézo,
# measurements=["id"],
# resolution=(-25, 25)
# )
# out_grid["id"].rio.to_raster(Path_tmp+"my_rasterized_column.tif")
# import xarray
# xds = xarray.open_dataarray(Path_tmp+"my_rasterized_column.tif")
# filled = xds.rio.interpolate_na(method='linear')
# filled.rio.to_raster(Path_tmp+"my_interpolate_raster.tif")
return df
def idpr(df):
'''
Calcul réalisé dans le cas où connex_molasse = 0.
Calcul de l'Indice de Développement et
de Persistance des Réseaux :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_idpr' :
if [ %_recouvrement(idpr < 1000) > 25% ] :
ssind_idpr = 1
else :
ssind_idpr = 0
'''
print('INIT : IDPR ...')
df = df.copy()
if 'ssind_molasse' not in df.columns:
df = fct_hyd.connex_molasse(df)
import rasterio
from rasterio.features import shapes
mask = None
with rasterio.Env():
with rasterio.open(Path+p_hydro+c_idpr2) as src:
image = src.read(1) # first band
image[(image < 1000) & (image > -1)] = 1
image[image >= 1000] = 0
data = gpd.GeoDataFrame.from_features(
features = [{'properties': {'raster_val': v}, 'geometry': s}
for i, (s, v)
in enumerate(
shapes(image, mask=mask, transform=src.transform)) if v >= 0],
crs = 'EPSG:2154')
data.rename_geometry('geom', inplace=True)
lst_data = []
if not df[df.ssind_molasse == 0].empty:
perc = _calc_recouvrmt(
df[df.ssind_molasse == 0],
data[data.raster_val == 1] )
lst_data = perc.loc[perc.perc_rcvmt.round(2) > 25,'id_site']
df['ssind_idpr'] = 0
df.loc[df.id_site.isin(lst_data), 'ssind_idpr'] = 1
return df
def fct_hydro(df):
'''
Attribution d'un poids à chaque zone humide
en fonction du nombre de rôles hydro-biologiques à caractères
hydrauliques et hydrologiques qu'elle remplie :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_fcthydro' :
0 fonction = 0
1 fonction = 0.5
+ fonctions = 1
'''
print('INIT : Fonctions hydro-biologiques à caractères hydrauliques et hydrologiques ...')
df = df.copy()
data = zh.get_fct(id_site=df.id_site.tolist())
data = data[(data.type == 'fct_hydro') & (data.nom_fct != 'non documenté')]
lst_terme = ["soutien naturel d'étiage",'ralentissement du ruissellement','expansion naturelle des crues']
d = data.loc[data.nom_fct.isin(lst_terme),['id_site','nom_fct']]
d['nb_fct'] = 1
d = d.groupby(['id_site']).sum().reset_index()
df['ssind_fcthydro'] = 0
lst_data1 = d.loc[d.nb_fct == 1, 'id_site']
lst_dataSup = d.loc[d.nb_fct > 1,'id_site']
df.loc[df.id_site.isin(lst_data1), 'ssind_fcthydro'] = 0.5
df.loc[df.id_site.isin(lst_dataSup), 'ssind_fcthydro'] = 1
return df
def zse_zsnea(df):
'''
Attribution d'un poids à chaque zone humide
en fonction de sont appartenance à une zone de sauvegarde
exploitée actuelle (zse) ou future (zsnea) :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_fcthydro' :
if ZH.intersects(ZSNEA|ZSE):
'ssind_fcthydro' = 1
else :
'ssind_fcthydro' = 0
'''
print('INIT : Zones de sauvegardes actuelles et futures (ZSE / ZSNEA) ...')
df = df.copy()
data1 = open_gpkg(Path+p_phybio+c_zse,bbox=df)
data2 = open_gpkg(Path+p_phybio+c_zsea,bbox=df)
data1 = _union_polygons_geometry(data1)
data2 = _union_polygons_geometry(data2)
if not data1.is_valid:
data1 = data1.buffer(0)
if not data2.is_valid:
data2 = data2.buffer(0)
df['zse'] = df.intersects(data1).astype(int) #.replace(1,2)
df['zsnea'] = df.intersects(data2).astype(int)
df['ssind_zse_zsnea'] = df[['zse', 'zsnea']].max(axis=1)
df.drop(columns=['zse','zsnea'], inplace=True)
return df
def bilan(df):
'''
Somme des sous-indices des fonctions hydrauliques
et hydrologiques des zones humides :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ind_hydro' :
sum(
ssind_zoneinond + ssind_eabf + ssind_distHydro +
ssind_molasse + ssind_idpr + ssind_fcthydro +
ssind_zse_zsnea
)
'''
df = fct_hyd.zone_inond(df)
df = fct_hyd.eabf(df)
df = fct_hyd.dist_reso_hydro(df)
df = fct_hyd.reghydro_out(df)
# df = fct_hyd.connex_molasse(df)
# df = fct_hyd.idpr(df)
df = fct_hyd.fct_hydro(df)
df = fct_hyd.zse_zsnea(df)
df['ind_hydro'] = df[
['ssind_zoneinond','ssind_eabf','ssind_distHydro',
'ssind_hydrout',#'ssind_molasse','ssind_idpr',
'ssind_fcthydro','ssind_zse_zsnea'
]
].sum(axis=1)
df.name = 'Fct_hyd'
return df
class fct_phy_bio:
'''
Calcule l'indice et les sous-indices des fonctions physiques
et biochimiques des zones humides :
Liste des fonctions
----------
perim_captage : Identification de la présence/absence
de zones de captages à proximité des zones humides
par intersection.
fct_hydrobio : Attribution d'un poids à chaque zone humide
en fonction du nombre de rôles hydro-biologiques à caractères
physiques et biochimiques qu'elle remplie.
occup_sol : Pour chaque zone humide, identification de la nature
d'occupation du sol et de sa surface de recouvrement. Déduction
de la surface d'espace naturel concernée par les zonnages.
Attribution d'un poids en fonction de la surface de recouverte.
bilan : Somme des sous-indices des fonctions physiques
et biochimiques des zones humides.
'''
def perim_captage(df):
'''
Identification de la présence/absence
de zones de captages à proximité des zones humides
par intersection. Ne considère pas les captages 'ABA|HS' :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_perimcaptage' :
if ZH.intersects(captage):
'ssind_perimcaptage' = 2
else :
'ssind_perimcaptage' = 0
'''
print('INIT : Périmètre de protection des captages AEP ...')
from pandas import concat
df = df.copy()
data1 = open_gpkg(Path+p_phybio+c_smvic_PPR1,bbox=df)
data2 = open_gpkg(Path+p_phybio+c_smvic_PPR2,bbox=df)
data3 = open_gpkg(Path+p_phybio+c_smvic_PPi,bbox=df)
data4 = open_gpkg(Path+p_phybio+c_smvic_PPe,bbox=df)
data = concat([data1,data2,data3,data4])
tmp = gpd.sjoin(
df,
data.loc[
~data.N_INS___NO.str.contains('ABA|HS'),
['geom']],
op = 'intersects',
how = 'left')
lst_site = tmp[~tmp.index_right.isna()].id_site
df['ssind_perimcaptage'] = 0
df.loc[df.id_site.isin(lst_site),'ssind_perimcaptage'] = 2
return df
# def zse_zsnea(df):
# print('INIT : Zones de sauvegardes actuelles et futures (ZSE / ZSNEA) ...')
# df = df.copy()
# data1 = open_gpkg(Path+p_phybio+c_zse,bbox=df)
# data2 = open_gpkg(Path+p_phybio+c_zsea,bbox=df)
# data1 = _union_polygons_geometry(data1)
# data2 = _union_polygons_geometry(data2)
# if not data1.is_valid:
# data1 = data1.buffer(0)
# if not data2.is_valid:
# data2 = data2.buffer(0)
# df['zse'] = df.intersects(data1).astype(int).replace(1,2)
# df['zsnea'] = df.intersects(data2).astype(int)
# df['ssind_zse_zsnea'] = df[['zse', 'zsnea']].max(axis=1)
# df.drop(columns=['zse','zsnea'], inplace=True)
# return df
def fct_hydrobio(df):
'''
Attribution d'un poids à chaque zone humide
en fonction du nombre de rôles hydro-biologiques à caractères
physiques et biochimiques qu'elle remplie :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_fcthydrobio' :
0 fonction = 0
1 fonction = 1
+ fonctions = 2
'''
print('INIT : Fonctions hydro-biologiques à caractères physiques et biochimiques ...')
df = df.copy()
data = zh.get_fct(id_site=df.id_site.tolist())
data = data[(data.type == 'fct_hydro') & (data.nom_fct != 'non documenté')]
lst_terme = ["fonctions d'épuration","rôle naturel de protection contre l'érosion"]
d = data.loc[data.nom_fct.isin(lst_terme),['id_site','nom_fct']]
d['nb_fct'] = 1
d = d.groupby(['id_site']).sum().reset_index()
df['ssind_fcthydrobio'] = 0
lst_data1 = d.loc[d.nb_fct == 1, 'id_site']
lst_dataSup = d.loc[d.nb_fct > 1,'id_site']
df.loc[df.id_site.isin(lst_data1), 'ssind_fcthydrobio'] = 1
df.loc[df.id_site.isin(lst_dataSup), 'ssind_fcthydrobio'] = 2
return df
def occup_sol(df):
'''
Pour chaque zone humide, identification de la nature
d'occupation du sol et de sa surface de recouvrement.
Déduction de la surface d'espace naturel concernée par les zonnages.
Type d'intérêt :
'Forêts' / 'Milieux à végétation arbustive et/ou herbacée' / 'Prairies'
Attribution d'un poids en fonction de la surface de recouverte :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_occupsol' :
if surf_recouverte < 25% :
ssind = 0
elif surf_recouverte in [ 25% ; 50% [ :
ssind = 1
elif surf_recouverte in [ 50% ; 75% [ :
ssind = 1.5
elif surf_recouverte in [ 75% ; 100% ] :
ssind = 2
'''
init = dt.now()
print('INIT : Occupation du sol ...')
df = df.copy()
ddf = df.copy()
ddf['area_init'] = ddf.area
# data = open_gpkg(Path+p_phybio+c_occupsol, bbox=df)
print('IMPORT DATA ...')
artif = open_gpkg(Path+p_press+c_artif, bbox=df)
artif = artif[['ID','geom']]
artif1 = artif.iloc[0,:]
artif1 = gpd.GeoDataFrame(artif1).T
artif1.set_geometry('geom', inplace=True)
artif1.set_crs(crs=artif.crs.srs, inplace=True)
# artif1 = gpd.GeoDataFrame(artif.iloc[0].copy(),geometry=artif.iloc[0].geom,crs=artif.crs.srs)
artif1 = gpd.overlay(artif1,ddf, how='intersection')
artif1.rename_geometry('geom', inplace=True)
artif2 = gpd.GeoDataFrame(artif.iloc[1:,:])
artif2.set_geometry('geom', inplace=True)
artif2.set_crs(crs=artif.crs.srs, inplace=True)
rpg = open_gpkg(Path+p_phybio+c_rpg, bbox=df)
lst_code = ['BOP','SPH','SPL','PPH','PRL','J6P','J6S','BTA','ROS','SBO']
rpg = rpg[~rpg.code_cultu.isin(lst_code)]
print((dt.now() - init).total_seconds())
print('CORRECTION GEOMETRY ...')
if not artif1.geom.is_valid.all() :
artif1.loc[~artif1.geom.is_valid,'geom'] = artif1.loc[~artif1.geom.is_valid,'geom'].buffer(0)
if not artif2.geom.is_valid.all() :
artif2.loc[~artif2.geom.is_valid,'geom'] = artif2.loc[~artif2.geom.is_valid,'geom'].buffer(0)
if not rpg.geom.is_valid.all() :
rpg.loc[~rpg.geom.is_valid,'geom'] = rpg.loc[~rpg.geom.is_valid,'geom'].buffer(0)
print((dt.now() - init).total_seconds())
print('DATA READY ...')
print('INIT OVERLAY ...')
ddf = gpd.overlay(ddf,artif1, how='difference')
if 'GeometryCollection' in ddf.geom_type.unique():
ddf.geom = ddf.geom.buffer(0)
ddf = gpd.overlay(ddf,rpg, how='difference')
if 'GeometryCollection' in ddf.geom_type.unique():
ddf.geom = ddf.geom.buffer(0)
ddf = gpd.overlay(ddf,artif2, how='difference')
if 'GeometryCollection' in ddf.geom_type.unique():
ddf.geom = ddf.geom.buffer(0)
print('END OVERLAY ...')
print((dt.now() - init).total_seconds())
ddf['area_end'] = ddf.area
ddf['perc_rcvmt'] = 100 - (100*ddf.area_end/ddf.area_init)
df = df.merge(ddf[['id_site','perc_rcvmt']], on='id_site', how='left')
df.perc_rcvmt.fillna(0,inplace=True)
df['perc_surfNat'] = 100 - df['perc_rcvmt']
# lst_terme = ['Forêts','Milieux à végétation arbustive et/ou herbacée','Prairies']
# d = data.loc[data.libelle_02.isin(lst_terme),]
# print(
# ('INIT : Calcul du recouvrement de l\'occupation des sols sur les zones humides :'),
# ('"Forêts","Milieux à végétation arbustive et/ou herbacée","Prairies"'))
# print(('ATTENTION : Les géometries de l\'occupation des sols étant complexes,'),
# ('le calcul peut prendre un certain temps ...'))
# df = _calc_recouvrmt(df,d)
# print('END : Calcul du recouvrement de l\'occupation des sols sur les zones humides')
if 'ssind_fcthydrobio' not in df.columns:
df = fct_phy_bio.fct_hydrobio(df)
df['ssind_occupsol'] = 0
df.loc[(df.ssind_fcthydrobio == 0) & (df.perc_surfNat.between(25,50,inclusive=True)),'ssind_occupsol'] = 1
df.loc[(df.ssind_fcthydrobio == 0) & (df.perc_surfNat.between(50,75,inclusive=True)),'ssind_occupsol'] = 1.5
df.loc[(df.ssind_fcthydrobio == 0) & (df.perc_surfNat >= 75),'ssind_occupsol'] = 2
df.drop(columns=['perc_rcvmt','perc_surfNat'], inplace=True)
print('END ssind_occupsol ...')
return df
def bilan(df):
'''
Somme des sous-indices des fonctions physiques
et biochimiques des zones humides :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ind_phybio' :
sum(
ssind_perimcaptage +
ssind_fcthydrobio +
ssind_occupsol
)
'''
df = fct_phy_bio.perim_captage(df)
# df = fct_phy_bio.zse_zsnea(df)
df = fct_phy_bio.fct_hydrobio(df)
df = fct_phy_bio.occup_sol(df)
df['ind_phybio'] = df[
['ssind_perimcaptage',
# 'ssind_zse_zsnea',
'ssind_fcthydrobio','ssind_occupsol']
].sum(axis=1)
df.name = 'Fct_phy_bio'
return df
class crit_multi_fct:
'''
Calcule l'indice et les sous-indices des criètes
« multi-fonctions » des zones humides :
Liste des fonctions
----------
surface : Calcul de la surface totale des zones humides.
Attribution d'un poid en fonction du résultat.
pente : Calcul de la pente moyenne des zones humides
via le MNT. Attribution d'un poid en fonction du résultat.
dir_exp : Ajout d'un champ dir_exp dans le tableau de sortie
qui sera à remplir manuellement par celui-ci.
bilan : Rassemblement des sous-indices des criètes
« multi-fonctions » dans un même tableau. L'indice pourra
être calculé lorque le champs dir_exp sera remplie.
'''
def surface(df):
'''
Calcul de la surface totale des zones humides.
Attribution d'un poid en fonction du résultat :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_surf' :
if ZH < 1ha :
ssind = 0
elif ZH in [ 1ha ; 20ha [ :
ssind = 0.5
elif ZH in [ 20ha ; 100ha [ :
ssind = 1
elif ZH >= 100ha :
ssind = 1.5
'''
print('INIT : Calcul de la surface ...')
df = df.copy()
df['ssind_surf'] = 0
df.loc[(df.area/10000).between(1,20,inclusive=True),'ssind_surf'] = 0.5
df.loc[(df.area/10000).between(20,100,inclusive=True),'ssind_surf'] = 1
df.loc[df.area/10000 >= 100,'ssind_surf'] = 1.5
return df
def pente(df):
'''
Calcul de la pente moyenne des zones humides
via le MNT. Attribution d'un poid en fonction du résultat :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_surf' :
if ZH > 5% :
ssind = 0
else:
ssind = 1
'''
print('INIT : Calcul de la pente moyenne ...')
dfP = df.copy()
# df.geom = df.buffer(0)
# poly=df.geom[0]
# from shapely.geometry import Point
# tutu = gpd.pd.DataFrame(
# [{'index':g,'coords':list(x.exterior.coords)} for g,geom in enumerate(df.geom) for x in geom.geoms]
# )
# tutu = tutu.explode('coords')
# tutu['geom'] = [Point([x]) for x in tutu.coords]
# tutu = tutu.set_geometry('geom')
# tutu.set_crs(epsg=2154, inplace=True)
# chdir(Path+p_mltifct)
# Dir = listdir()
# Dir = [x for x in Dir if '.zip' in x]
# for i, d in enumerate(Dir):
# zip = zipfile.ZipFile(d).namelist()
# z = [z for z in zip if 'MNT' in z][0]
# stats = rasterstats.zonal_stats(tutu.geometry,'zip:{0}/{1}'.format(d,z))
# stats = pd.DataFrame(stats)
# if i == 0 :
# tutu[stats.columns] = stats
# else:
# tmp = pd.DataFrame({'tutu':tutu['mean']*tutu['count'], 'stats':stats['mean']*stats['count']})
# tutu['mean'] = tmp.sum(axis=1)/(tutu['count']+stats['count'])
# tmp = pd.DataFrame({'tutu':tutu['count'], 'stats':stats['count']})
# tutu['count'] = tmp.sum(axis=1)
# tmp = pd.DataFrame({'tutu':tutu['min'], 'stats':stats['min']})
# tutu['min'] = tmp.min(axis=1)
# tmp = pd.DataFrame({'tutu':tutu['max'], 'stats':stats['max']})
# tutu['max'] = tmp.max(axis=1)
# transformation altitude to pente .
# gdaldem slope
# "/home/colas/Documents/9_PROJETS/3_PGZH/SIG/multi_fonctions/IGN - BD Alti 25M/BDALTIV2_25M_FXX_0900_6475/BDALTIV2_25M_FXX_0900_6475_MNT_LAMB93_IGN69.asc"
# /tmp/processing_nooAPj/4392eb6b07804db4b4350433cc6db54a/OUTPUT.tif -of GTiff -b 1 -s 1.0 -p
from os import listdir, chdir, system
from pathlib import Path as Pathlib
from zipfile import ZipFile
from rasterstats import zonal_stats
home = str(Pathlib.home())
chdir(Path+p_mltifct)
Dir = listdir()
Dir = [x for x in Dir if '.zip' in x]
slope = 'temp_slop.tif'
for i, d in enumerate(Dir):
zip = ZipFile(d).namelist()
z = [z for z in zip if 'MNT' in z][0]
system("gdaldem slope '/vsizip/{zip}/{mnt}' '{slope}' -of GTiff -b 1 -s 1.0 -p".format(zip=d,mnt=z,slope=slope))
stats = zonal_stats(dfP.geom,slope)
stats = gpd.pd.DataFrame(stats)
if i == 0 :
dfP[stats.columns] = stats
else:
tmp = gpd.pd.DataFrame({'dfP':dfP['mean']*dfP['count'], 'stats':stats['mean']*stats['count']})
dfP['mean'] = tmp.sum(axis=1)/(dfP['count']+stats['count'])
tmp = gpd.pd.DataFrame({'dfP':dfP['count'], 'stats':stats['count']})
dfP['count'] = tmp.sum(axis=1)
tmp = gpd.pd.DataFrame({'dfP':dfP['min'], 'stats':stats['min']})
dfP['min'] = tmp.min(axis=1)
tmp = gpd.pd.DataFrame({'dfP':dfP['max'], 'stats':stats['max']})
dfP['max'] = tmp.max(axis=1)
system('rm {slope}'.format(slope=slope))
chdir(home)
dfP['ssind_pente'] = 0
dfP.loc[dfP['mean'] < 5, 'ssind_pente'] = 1
df = df.merge(dfP[['id_site','ssind_pente']], on=['id_site'], how='left')
# dst = [z for z in zip if 'DST' in z][0]
# src = [z for z in zip if 'SRC' in z][0]
# dst = rasterio.open('zip:{0}/{1}'.format(d,dst))
# src = rasterio.open('zip:{0}/{1}'.format(d,src))
# mnt = rasterio.open(
# 'zip:{0}/{1}'.format(d,z),'w+',
# width=dst.width,
# height=dst.height,
# count=1,
# crs=dst.crs.data,
# transform=dst.transform,
# dtype=dst.dtypes[0])
# stats = rasterstats.zonal_stats(df.geom,'zip:{0}/{1}'.format(d,z))
# stats = pd.DataFrame(stats)
# if i == 0 :
# df[stats.columns] = stats
# else:
# tmp = pd.DataFrame({'df':df['mean']*df['count'], 'stats':stats['mean']*stats['count']})
# df['mean'] = tmp.sum(axis=1)/(df['count']+stats['count'])
# tmp = pd.DataFrame({'df':df['count'], 'stats':stats['count']})
# df['count'] = tmp.sum(axis=1)
# tmp = pd.DataFrame({'df':df['min'], 'stats':stats['min']})
# df['min'] = tmp.min(axis=1)
# tmp = pd.DataFrame({'df':df['max'], 'stats':stats['max']})
# df['max'] = tmp.max(axis=1)
# https://stackoverflow.com/questions/8844781/get-file-list-of-files-contained-in-a-zip-file
# res.append(rasterio.open(
# 'zip:{0}/{1}'.format(d,z)))
# continue
# df['ssind_pente'] = 0
return df
def dir_exp(df):
'''
Ajout d'un champ dir_exp dans le tableau de sortie
qui sera à remplir manuellement par celui-ci :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_direxp' : None
'''
df = df.copy()
data = gpd.pd.read_csv(P_expert + c_expert,sep=';')
data = data[['site_code','Note_DE_fonction']]
data.columns = ['site_code','ssind_direxp']
data.ssind_direxp = data.ssind_direxp.astype(float)
df = df.merge(data,right_on='site_code',left_on='id_site')
del df['site_code']
# df['ssind_direxp'] = None
return df
def bilan(df):
'''
Rassemblement des sous-indices des criètes
« multi-fonctions » dans un même tableau. L'indice pourra
être calculé lorque le champs dir_exp sera remplie :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ind_multifct' :
sum(
ssind_surf +
ssind_pente + 'ssind_direxp'
)
'''
# df = crit_multi_fct.surface(df)
df = crit_multi_fct.pente(df)
# df['ssind_total'] = df[
# ['ssind_surf','ssind_pente']
# ].sum(axis=1)
df = crit_multi_fct.dir_exp(df)
df['ind_multifct'] = df[
[#'ssind_surf',
'ssind_pente','ssind_direxp']
].sum(axis=1)
df.name = 'Crit_multi_fct'
return df
class pression:
'''
Calcule l'indice et les sous-indices des pressions
exercées sur les zones humides :
Liste des fonctions
----------
artif_directe : Récupération des résultats des pressions directes
d'artificialisation Rhoméo I12. Application de la discrimination
de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1].
artif_indir : Récupération des résultats des pressions indirectes
d'artificialisation Rhoméo I12. Application de la discrimination
de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1].
urbani_directe : Récupération des résultats des pressions directes
d'urbanisation Rhoméo I12. Application de la discrimination
de Jenks pour catégoriser les résultats en 4 classes [0, 0.5, 1, 1.5].
urbani_indir : Récupération des résultats des pressions indirectes
d'urbanisation Rhoméo I12. Application de la discrimination
de Jenks pour catégoriser les résultats en 4 classes [0, 0.5, 1, 1.5].
pressAgri_directe : Récupération des résultats des pressions directes
agricoles Rhoméo I13. Application de la discrimination
de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1].
pressAgri_indir : Récupération des résultats des pressions indirectes
agricoles Rhoméo I13. Application de la discrimination
de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1].
projet_plu_U : Intersections des zones relevant du projet d'Urbanisme (PLU)
avec les polygones de l'étude. Considération du champs
Typezone == 'U'. Attribution des points en cas d'intersections.
projet_plu_AU : Intersections des zones relevant du projet d'Urbanisme (PLU)
avec les polygones de l'étude. Considération du champs
Typezone == 'AU'. Attribution des points en cas d'intersections.
projet_scot : En cas d'absence de PLU, recherche d'espaces de
développements potentiels au alentours des sites (SCOT).
Attribution des points en cas d'intersections.
conflit_redi : Intersections des zones de conflits redi
(Points, Lignes) avec les polygones de l'étude. Utilistaion
d'un buffer de 100m.
prelev_eau : Identification da la proximité des zones humides
avec des sources de captages. Application d'un buffer de 50m.
Identification par intersection.
icpe : Identification da la proximité des zones humides
avec des installations classés. Application d'un buffer de 500m.
Identification par intersection.
ouvrage : Identification da la présence d'ouvrages et de dépôts
au sein des zones humides. Identification par intersection.
vulnerabilite : Identification da la proximité des zones humides
avec des espèces exotiques envahissantes. Application d'un buffer
de 100m. Identification par intersection.
press_dir : Calcul de l'indice de pression directe (artif_directe,
urbani_directe, pressAgri_directe, projet_plu_U, conflit_redi,
icpe, prelev_eau, ouvrage, vulnerabilite).
press_ind : Calcul de l'indice de pression indirecte (artif_indir;
urbani_indir, pressAgri_indir, projet_plu_AU, projet_scot).
bilan : Rassemblement des sous-indices des pressions directes
et indirectes dans un même tableau.
'''
# indesirable : Identification da la présence d'espèces indésirables
# au sein des zones humides. Identification par intersection
# à partir d'une couche fournie par le commanditaire
def artif_directe(df):
'''
Récupération des résultats des pressions directes
d'artificialisation Rhoméo I12. Application de
la discrimination de Jenks pour catégoriser les résultats
en 3 classes [0, 0.5, 1]:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_artifDir' = 0|0.5|1
'''
# from datetime import datetime as dt
print('INIT : Artificialisation directe ...')
df = df.copy()
# data = open_gpkg(Path+p_press+c_artif, bbox=df)
# data = data[['ID','geom']]
# # recouvrement de la surface
# init = dt.now()
# data_surf = gpd.GeoDataFrame(data.iloc[0].to_dict(), crs=data.crs.srs,geometry='geom')
# tmp_surf = _calc_recouvrmt(df,data_surf)
# print((dt.now() - init).total_seconds())
# # recouvrement des routes sur les zones humides
# init = dt.now()
# tmp_route = _calc_recouvrmt(df,data.iloc[1:])
# print((dt.now() - init).total_seconds())
# # Calcul des sous-indices
# tmp = tmp_surf.merge(tmp_route, on=['id_site'])
# tmp['perc_rcvmt'] = tmp[['perc_rcvmt_x','perc_rcvmt_y']].sum(axis=1)
# tmp['ssind_artif'] = 0
# tmp.loc[tmp.perc_rcvmt > 5,'ssind_artif'] = 1
# tmp.loc[tmp.perc_rcvmt > 10,'ssind_artif'] = 2
# df = df.merge(tmp[['id_site','ssind_artif']], on=['id_site'],how='left')
data = gpd.pd.read_csv(path0 + c_rhomeo)
data.set_index('site_code', inplace=True)
data.presdirect_artif = data.presdirect_artif.round()
tmp = jenks(
data=data[['presdirect_artif']],
col='presdirect_artif',
labels=[0, 0.5, 1])
df = df.merge(tmp,how='left',left_on='id_site',right_index=True)
df.rename(columns={'presdirect_artif':'ssind_artifDir'}, inplace=True)
return df
def artif_indir(df):
'''
Récupération des résultats des pressions indirectes
d'artificialisation Rhoméo I12. Application de
la discrimination de Jenks pour catégoriser les résultats
en 3 classes [0, 0.5, 1]:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_artifIndir' = 0|0.5|1
'''
# from datetime import datetime as dt
print('INIT : Artificialisation indirecte ...')
df = df.copy()
data = gpd.pd.read_csv(path0 + c_rhomeo)
data.set_index('site_code', inplace=True)
data.presindir_artif = data.presindir_artif.round()
tmp = jenks(
data=data[['presindir_artif']],
col='presindir_artif',
labels=[0, 0.5, 1])
df = df.merge(tmp,how='left',left_on='id_site',right_index=True)
df.rename(columns={'presindir_artif':'ssind_artifIndir'}, inplace=True)
return df
def urbani_directe(df):
'''
Récupération des résultats des pressions directes
d'urbanisation Rhoméo I12. Application de
la discrimination de Jenks pour catégoriser les résultats
en 4 classes [0, 0.5, 1, 1.5]:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_urbaDir' = 0|0.5|1|1.5
'''
print('INIT : Urbanisation indirecte ...')
df = df.copy()
data = gpd.pd.read_csv(path0 + c_rhomeo)
data.set_index('site_code', inplace=True)
data.presdirect_urba = data.presdirect_urba.round()
tmp = jenks(
data=data[['presdirect_urba']],
col='presdirect_urba',
labels=[0, 0.5, 1, 1.5])
df = df.merge(tmp,how='left',left_on='id_site',right_index=True)
df.rename(columns={'presdirect_urba':'ssind_urbaDir'}, inplace=True)
# df['ssind_urbani'] = None
return df
def urbani_indir(df):
'''
Récupération des résultats des pressions indirectes
d'urbanisation Rhoméo I12. Application de
la discrimination de Jenks pour catégoriser les résultats
en 4 classes [0, 0.5, 1, 1.5]:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_urbaIndir' = 0|0.5|1|1.5
'''
print('INIT : Urbanisation indirecte ...')
df = df.copy()
data = gpd.pd.read_csv(path0 + c_rhomeo)
data.set_index('site_code', inplace=True)
data.presindir_urba = data.presindir_urba.round()
tmp = jenks(
data=data[['presindir_urba']],
col='presindir_urba',
labels=[0, 0.5, 1, 1.5])
df = df.merge(tmp,how='left',left_on='id_site',right_index=True)
df.rename(columns={'presindir_urba':'ssind_urbaIndir'}, inplace=True)
# df['ssind_urbani'] = None
return df
def pressAgri_directe(df):
'''
Récupération des résultats des pressions directes
agricoles Rhoméo I13. Application de la discrimination
de Jenks pour catégoriser les résultats en 3 classes
[0, 0.5, 1]:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_agriDir' = 0|0.5|1
'''
print('INIT : Pressions agricoles directes...')
df = df.copy()
# data = open_gpkg(Path+p_press+c_agric, bbox=df)
# tmp = _calc_recouvrmt(df,data)
# tmp['ssind_agri'] = 0
# tmp.loc[tmp.perc_rcvmt > 5,'ssind_agri'] = 0.5
# tmp.loc[tmp.perc_rcvmt > 10,'ssind_agri'] = 1
# df = df.merge(tmp[['id_site','ssind_agri']], on=['id_site'],how='left')
data = gpd.pd.read_csv(path0 + c_rhomeo)
data.set_index('site_code', inplace=True)
data.presdirect_agri = data.presdirect_agri.round()
tmp = jenks(
data=data[['presdirect_agri']],
col='presdirect_agri',
labels=[0, 0.5, 1])
df = df.merge(tmp,how='left',left_on='id_site',right_index=True)
df.rename(columns={'presdirect_agri':'ssind_agriDir'}, inplace=True)
return df
def pressAgri_indir(df):
'''
Récupération des résultats des pressions indirectes
agricoles Rhoméo I13. Application de la discrimination
de Jenks pour catégoriser les résultats en 3 classes
[0, 0.5, 1]:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_agriIndir' = 0|0.5|1
'''
print('INIT : Pressions agricoles indirectes...')
df = df.copy()
data = gpd.pd.read_csv(path0 + c_rhomeo)
data.set_index('site_code', inplace=True)
data.presindir_agri = data.presindir_agri.round()
tmp = jenks(
data=data[['presindir_agri']],
col='presindir_agri',
labels=[0, 0.5, 1])
df = df.merge(tmp,how='left',left_on='id_site',right_index=True)
df.rename(columns={'presindir_agri':'ssind_agriIndir'}, inplace=True)
return df
def projet_plu_U(df):
'''
Intersections des zones relevant du projet d'Urbanisme (PLU)
avec les polygones de l'étude. Considération du champs
Typezone == 'U'. Attribution des points en cas d'intersections:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_pluU' = 0|1
'''
df = df.copy()
d1 = open_gpkg(Path+p_press+c_urba_plu1, bbox=df)
d2 = open_gpkg(Path+p_press+c_urba_plu2, bbox=df)
d3 = open_gpkg(Path+p_press+c_urba_plu3, bbox=df)
d3.rename(columns={'typezone': 'Typezone'}, inplace=True)
d1 = d1[d1.Typezone =='U']
d2 = d2[d2.Typezone =='U']
d3 = d3[d3.Typezone =='U']
data = gpd.pd.concat(
[d1[['Typezone','geom']],d2[['Typezone','geom']],d3[['Typezone','geom']]],
ignore_index=True)
tmp = gpd.sjoin(df,data[['geom']],op='intersects', how='left')
lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist()
df['ssind_pluU'] = 0
df.loc[df.id_site.isin(lst),'ssind_pluU'] = 1
return df
def projet_plu_AU(df):
'''
Intersections des zones relevant du projet d'Urbanisme (PLU)
avec les polygones de l'étude. Considération du champs
Typezone == 'AU'. Attribution des points en cas d'intersections:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_pluAU' = 0|1
'''
df = df.copy()
d1 = open_gpkg(Path+p_press+c_urba_plu1, bbox=df)
d2 = open_gpkg(Path+p_press+c_urba_plu2, bbox=df)
d3 = open_gpkg(Path+p_press+c_urba_plu3, bbox=df)
d3.rename(columns={'typezone': 'Typezone'}, inplace=True)
d1 = d1[d1.Typezone.str.contains('AU')]
d2 = d2[d2.Typezone.str.contains('AU')]
d3 = d3[d3.Typezone.str.contains('AU',na=False)]
data = gpd.pd.concat(
[d1[['Typezone','geom']],d2[['Typezone','geom']],d3[['Typezone','geom']]],
ignore_index=True)
tmp = gpd.sjoin(df,data[['geom']],op='intersects', how='left')
lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist()
df['ssind_pluAU'] = 0
df.loc[df.id_site.isin(lst),'ssind_pluAU'] = 1
return df
def projet_scot(df):
'''
En cas d'absence de PLU, recherche d'espaces de
développements potentiels au alentours des sites
(SCOT). Attribution des points en cas d'intersections:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_scot' = 0|1
'''
df = df.copy()
# Ouverture des couches PLU
d1 = open_gpkg(Path+p_press+c_urba_plu1, bbox=df)
d2 = open_gpkg(Path+p_press+c_urba_plu2, bbox=df)
d3 = open_gpkg(Path+p_press+c_urba_plu3, bbox=df)
d3.rename(columns={'typezone': 'Typezone'}, inplace=True)
dataPLU = gpd.pd.concat(
[d1[['Typezone','geom']],d2[['Typezone','geom']],d3[['Typezone','geom']]],
ignore_index=True)
tmpPLU = gpd.sjoin(df,dataPLU[['geom']],op='intersects', how='left')
lstPLU = tmpPLU.loc[~tmpPLU.index_right.isna(),'id_site'].tolist()
# if 'ssind_pluAU' not in df.columns:
# df = pression.projet_plu_AU(df)
# Si pas de PLU concerner par les sites, intersection des SCOT
dataSCOT = open_gpkg(Path+p_press+c_urba_scot, bbox=df)
tmp = gpd.sjoin(df,dataSCOT[['geom']],op='intersects', how='left')
lstSCOT = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist()
df['ssind_scot'] = 0
df.loc[(~df.id_site.isin(lstPLU))&(df.id_site.isin(lstSCOT)),'ssind_scot'] = 1
return df
def conflit_redi(df):
'''
Intersections des zones de conflits redi (Points, Lignes)
avec les polygones de l'étude. Utilistaion d'un buffer
de 100m:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_confli' = 0
if conflit :
'ssind_confli' = 2
'''
df = df.copy()
lign = open_gpkg(Path+p_press+c_lign_confliredi, bbox=df)
poin = open_gpkg(Path+p_press+c_poin_confliredi, bbox=df)
lign.columns = lign.columns.str.lower()
poin.columns = poin.columns.str.lower()
lign.geom = lign.geom.buffer(50)
poin.geom = poin.geom.buffer(50)
data = gpd.pd.concat([
lign[['id','geom']],
poin[['id','geom']]
])
data = gpd.GeoDataFrame(data,geometry='geom',crs=lign.crs.srs)
geom = _union_polygons_geometry(data)
if not geom.is_valid:
geom = geom.buffer(0)
df['ssind_confli'] = df.intersects(geom).astype(int).replace(1,2)
return df
def prelev_eau(df):
'''
Identification da la proximité des zones humides
avec des sources de captages.
Application d'un buffer de 50m.
Identification par intersection :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_prlvmteau' :
ZH.geom = ZH.buffer(50)
if ZH.intersects(prelev_eau):
ssind = 1
else :
ssind = 0
'''
print("INIT : Prélevement d'eau ...")
df_buf = df.copy()
df_buf.geom = df.buffer(50)
data = gpd.pd.read_csv(Path+p_press+c_captag, sep=';')
data = data.loc[data['Qualit de localisation']==1]
data = gpd.GeoDataFrame(
data,
crs=df_buf.crs.srs,
geometry=gpd.points_from_xy(data['Coordonnes lambert X'],data['Coordonnes lambert Y'])
)
data.rename_geometry('geom', inplace=True)
data.reset_index(drop=True, inplace=True)
mp = MultiPoint(data.geom)
df_buf['ssind_prlvmteau'] = df_buf.intersects(mp).astype(int)
df = df.merge(df_buf[['id_site','ssind_prlvmteau']], on=['id_site'])
return df
def icpe(df):
'''
Identification da la proximité des zones humides
avec des installations classés.
Application d'un buffer de 500m.
Identification par intersection :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_icpe' :
ZH.geom = ZH.buffer(500)
if ZH.intersects(install_classee):
ssind = 1
else :
ssind = 0
'''
print('INIT : ICPE ...')
tmp = df.copy()
tmp.geom = tmp.buffer(500)
data = open_gpkg(Path+p_press+c_iClass)
data = MultiPoint(data.geom)
tmp['ssind_icpe'] = tmp.intersects(data).astype(int)
df = df.merge(tmp[['id_site','ssind_icpe']], on=['id_site'],how='left')
return df
# def indesirable(df):
# '''
# Identification da la présence d'espèces indésirables
# au sein des zones humides. Identification par intersection
# à partir d'une couche fournie par le commanditaire :
# Parameters
# ----------
# df : GeoDataFrame. GeoDataFrame des zones humides
# de l'étude.
# Return
# ----------
# 'ssind_indesi' :
# if ZH.intersects(esp_indesir):
# ssind_indesir = 1
# else :
# ssind_indesir = 0
# '''
# print('INIT : Indésirable ...')
# df = df.copy()
# d1 = open_gpkg(Path+p_press+c_indesi, bbox=df, layer='Ambroisie')
# d2 = open_gpkg(Path+p_press+c_indesi, bbox=df, layer='Renouée')
# d1.rename(columns={'ID_SIR_GL_AMBROISIE':'ID'}, inplace=True)
# d2.rename(columns={'ID_SIR_GL_RENOUEE':'ID'}, inplace=True)
# dftmp = df.copy()
# dftmp.geom = dftmp.buffer(100)
# data = gpd.pd.concat([d1[['ID','geom']],d2[['ID','geom']]])
# tmp = gpd.sjoin(dftmp,data[['geom']],op='intersects', how='left')
# lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist()
# df['ssind_indesir'] = 0
# df.loc[df.id_site.isin(lst),'ssind_indesir'] = 1
# return df
# data.reset_index(inplace=True)
# data.surface_m2 = data.surface_m2.str.replace('m2','', regex=True)
# MinMax = data.loc[data.surface_m2.str.contains('-'),'surface_m2'].str.split('-',1, expand=True)
# MinMax.columns = ['min_surf', 'max_surf']
# Min = data.loc[data.surface_m2.str.contains('>'),'surface_m2'].str.replace('>','', regex=True)
# MinMax = gpd.pd.concat([MinMax,Min])
# MinMax.loc[~MinMax[0].isna(),'min_surf'] = MinMax.loc[~MinMax[0].isna(),0]
# MinMax.drop(columns=0, inplace=True)
# Max = data.loc[data.surface_m2.str.contains('<'),'surface_m2'].str.replace('<','', regex=True)
# MinMax = gpd.pd.concat([MinMax,Max])
# MinMax.loc[~MinMax[0].isna(),'max_surf'] = MinMax.loc[~MinMax[0].isna(),0]
# MinMax.drop(columns=0, inplace=True)
# data = data.join(MinMax)
# from shapely.ops import nearest_points
# # unary union of the gpd2 geomtries
# pts3 = data.geom.unary_union
# def near(point, pts=pts3):
# # find the nearest point and return the corresponding Place value
# nearest = data.geom == nearest_points(point, pts)[1]
# return data[nearest].index[0]
# df_temp['Nearest'] = df_temp.apply(lambda row: near(row.geom), axis=1)
# data.rename(columns={'index':'Nearest'}, inplace=True)
# data2 = df_temp[['Nearest']].copy()
# data2 = data2.join(data[['geom', 'surface_m2','min_surf', 'max_surf']], how='left', on=['Nearest'])
# data2 = data2.set_geometry('geom')
# data3 = data2.drop_duplicates()
# df_temp['dist'] = df_temp.distance(data2)
# df_temp = df_temp.merge(data3[['Nearest','min_surf','max_surf']], on=['Nearest'], how='left')
# df_temp.min_surf.fillna(0, inplace=True)
# df_temp['min_surf'] = gpd.pd.to_numeric(df_temp['min_surf'], errors='coerce')
# df_temp['max_surf'] = gpd.pd.to_numeric(df_temp['max_surf'], errors='coerce')
# df_temp.loc[df_temp.max_surf.isna(),'max_surf'] = df_temp.loc[df_temp.max_surf.isna(),'min_surf']
# from math import pi, sqrt
# df_temp['rayon'] = df_temp.max_surf.apply(lambda x: sqrt(x*pi))
# df_temp['ssind_indesi'] = 0
# df_temp.loc[df_temp.dist <= df_temp.rayon,'ssind_indesi'] = 1
# df = df.merge(df_temp[['id_site','ssind_indesi']], on=['id_site'],how='left')
# # geom_data = data.unary_union
# # df['ssind_indesi'] = df.intersects(geom_data).astype(int)
def ouvrage(df):
'''
Identification da la présence d'ouvrages et de dépôts
au sein des zones humides. Identification par intersection :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_ouvrag' :
if ZH.intersects(ouvrages):
ssind = 1
else :
ssind = 0
'''
print('INIT : Ouvrage ...')
df = df.copy()
data = open_gpkg(Path+p_press+c_ouvrag, bbox=df)
data = data.unary_union
# data = MultiPoint(data.geom)
df['ssind_ouvrag'] = df.intersects(data).astype(int)
data2 = open_gpkg(Path+p_press+c_depot, bbox=df)
data2 = data2.unary_union
df.loc[df.ssind_ouvrag == 0, 'ssind_ouvrag'] = df[df.ssind_ouvrag == 0].intersects(data2).astype(int)
return df
def vulnerabilite(df):
'''
Identification da la proximité des zones humides
avec des espèces exotiques envahissantes.
Application d'un buffer de 100m.
Identification par intersection :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_vulnerab' :
ZH.geom = ZH.buffer(100)
if ZH.intersects(esp_exo_envahi):
ssind = 1
else :
ssind = 0
'''
print('INIT : Vulnérabilité ...')
df_buf = df.copy()
df_buf.geom = df_buf.buffer(100)
lst_term = ['Buddleia','Renouee','Solidage']
data1 = open_gpkg(Path+p_press+c_invas, bbox=df_buf)
data1 = data1[data1.Espece.isin(lst_term)]
data2 = open_gpkg(Path+p_press+c_fallo, bbox=df_buf)
data3 = open_gpkg(Path+p_press+c_cd38_eee, bbox=df_buf, layer='Renouée')
data4 = open_gpkg(Path+p_press+c_cd38_eee, bbox=df_buf, layer='Ambroisie')
data = gpd.pd.concat([data1[['geom']],data2[['geom']],data3[['geom']],data4[['geom']]],ignore_index=True)
data.reset_index(inplace=True)
tmp = gpd.sjoin(df,data[['geom']],op='intersects', how='left')
lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist()
df_buf['ssind_vulnerab'] = 0
df_buf.loc[df_buf.id_site.isin(lst),'ssind_vulnerab'] = 1
# get_sicen2 get observation in bbox of df_buff who contains lst_term
lst_term = [
'Solidago gigantea','Reynoutria','Buddleja davidii',
'Impatiens glandulifera','Ambrosia artemisiifolia']
from shapely.geometry import box
from geopandas import read_postgis
from pycen.params import sicen_con as con
bbox = box(*df_buf.total_bounds)
geom_col = 'geom'
schema_sicen = 'saisie'
table_sicen = 'vm_synthese_observations'
sql = """SELECT id_obs, date_obs, regne, classe, ordre, nom_latin, nom_vern, geom FROM {sch}.{tab}
WHERE ST_Intersects ({geom_col}, 'SRID={epsg};{poly}')
AND nom_latin LIKE ANY (array{array});""".format(
sch=schema_sicen,
tab=table_sicen,
array = ['%%{}%%'.format(t) for t in lst_term],
geom_col=geom_col,
epsg=df_buf.crs.srs.split(':')[1],
poly=bbox
)
sicen = read_postgis(
sql = sql,
con = con)
data3 = sicen.unary_union
df_buf.loc[df_buf.ssind_vulnerab == 0, 'ssind_vulnerab'] = df_buf[df_buf.ssind_vulnerab == 0].intersects(data3).astype(int)
df = df.merge(df_buf[['id_site','ssind_vulnerab']], on=['id_site'],how='left')
return df
def press_dir(df):
'''
Calcul de l'indice de pression directe:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ind_pressDir' :
sum(
ssind_artifDir + ssind_urbaDir +
ssind_agriDir + ssind_pluU +
ssind_confli + ssind_prlvmteau +
ssind_icpe + ssind_ouvrag +
ssind_vulnerab
)
'''
df = pression.artif_directe(df)
df = pression.urbani_directe(df)
df = pression.pressAgri_directe(df)
df = pression.projet_plu_U(df)
df = pression.conflit_redi(df)
df = pression.icpe(df)
df = pression.prelev_eau(df)
df = pression.ouvrage(df)
df = pression.vulnerabilite(df)
df['ind_pressDir'] = df[
['ssind_artifDir','ssind_urbaDir','ssind_agriDir',
'ssind_pluU','ssind_confli','ssind_prlvmteau','ssind_icpe',
'ssind_ouvrag','ssind_vulnerab']].sum(axis=1)
df.name = 'Pression_direct'
return df
def press_ind(df):
'''
Calcul de l'indice de pression indirecte:
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ind_pressInd' :
sum(
ssind_artifIndir + ssind_urbaIndir +
ssind_agriIndir + ssind_pluAU +
ssind_scot
)
'''
df = pression.artif_indir(df)
df = pression.urbani_indir(df)
df = pression.pressAgri_indir(df)
df = pression.projet_plu_AU(df)
df = pression.projet_scot(df)
# df = pression.dir_exp(df)
df['ind_pressInd'] = df[
['ssind_artifIndir','ssind_urbaIndir','ssind_agriIndir',
'ssind_pluAU', 'ssind_scot']].sum(axis=1)
df.name = 'Pression_indir'
return df
def dir_exp(df):
'''
Ajout d'un champ dir_exp dans le tableau de sortie
qui sera à remplir manuellement par celui-ci :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ssind_direxp' : None
'''
df = df.copy()
data = gpd.pd.read_csv(P_expert + c_expert,sep=';')
data = data[['site_code','Note_DE_pression']]
data.columns = ['site_code','ssind_direxp']
data.ssind_direxp = data.ssind_direxp.astype(float)
df = df.merge(data,right_on='site_code',left_on='id_site')
del df['site_code']
# df['ssind_direxp'] = None
return df
def bilan(df):
'''
Rassemblement des sous-indices des criètes
« pression » dans un même tableau. L'indice pourra
être calculé lorque le champs dir_exp sera remplie :
Parameters
----------
df : GeoDataFrame. GeoDataFrame des zones humides
de l'étude.
Return
----------
'ind_pression' :
sum(
ind_pressDir + ind_pressInd
)
'''
df = pression.press_dir(df)
df = pression.press_ind(df)
df = pression.dir_exp(df)
# df = pression.indesirable(df)
# df['ssind_total'] = df[
# ['ssind_artifDir','ssind_artifIndir','ssind_urbaDir',
# 'ssind_urbaIndir','ssind_agriDir','ssind_agriIndir',
# 'ssind_pluU','ssind_confli','ssind_prlvmteau','ssind_icpe',
# 'ssind_ouvrag','ssind_vulnerab']
# ].sum(axis=1)
df['ind_pression'] = df[
['ind_pressDir','ind_pressInd','ssind_direxp']
].sum(axis=1)
df.name = 'Pression'
return df
def priorisation(data,titre,col1,col2):
data = data.copy()
data[titre] = None
data.loc[(data[col1]=='fort')&(data[col2]=='fort'),titre] = 'P1'
data.loc[(data[col1]=='fort')&(data[col2]=='moyen'),titre] = 'P2'
data.loc[(data[col1]=='moyen')&(data[col2]=='fort'),titre] = 'P2'
data.loc[(data[col1]=='moyen')&(data[col2]=='moyen'),titre] = 'P2'
data.loc[(data[col1]=='faible')&(data[col2]=='faible'),titre] = 'P4'
data.loc[data[titre].isna(),titre] = 'P3'
return data[titre]
if __name__ == '__main__':
from datetime import datetime as dt
from pandas import read_table
# Récupération de la liste des zones concernées
init = dt.now()
liste_zh = read_table(path0+'PGSZH_liste_ZH.csv',sep=';')
lst_idsite = liste_zh.site_code.tolist()
sit = zh.get_sitesGeom(id_site=lst_idsite, last_update=True)
# sit = zh.get_sitesGeom(last_update=True)
if not sit.is_valid.all():
sit.loc[~sit.is_valid, 'geom'] = sit.loc[~sit.is_valid].buffer(0)
df = sit[['id_site', 'geom']].copy()
# Définition des pressions et fonctions de l'étude
bilan = sit[['id_site']].copy()
df_bio = fct_bio_eco.bilan(df)
print((dt.now() - init).total_seconds())
df_hyd = fct_hyd.bilan(df)
print((dt.now() - init).total_seconds())
df_phy = fct_phy_bio.bilan(df)
print((dt.now() - init).total_seconds())
df_mlt = crit_multi_fct.bilan(df)
print((dt.now() - init).total_seconds())
df_pre = pression.bilan(df)
print((dt.now() - init).total_seconds())
lst_df = [df_bio,df_hyd,df_phy,df_mlt,df_pre]
for d in lst_df:
ind_col = d.columns[d.columns.str.startswith('ind')]
bilan = bilan.merge(d[['id_site', *ind_col]], on=['id_site'])
cols_ind = bilan.columns[bilan.columns.str.startswith('ind')]
ind_pres = df_pre.columns[df_pre.columns.str.startswith('ind')]
bilan['ind_fct'] = bilan[cols_ind.drop(ind_pres)].sum(axis=1)
bilan['indice'] = bilan[['ind_fct','ind_pression']].sum(axis=1)
bilan.name = 'Bilan'
print((dt.now() - init).total_seconds())
# jenks(data, col, labels)
# Normalisation des notes via la méthode de classification de jenks
enjeux = bilan[['id_site']].copy()
enjeux['clss_bioeco'] = jenks(bilan,'ind_bioeco',['faible','moyen','fort'])
enjeux['clss_hydro'] = jenks(bilan,'ind_hydro',['faible','moyen','fort'])
enjeux['clss_phybio'] = jenks(bilan,'ind_phybio',['faible','moyen','fort'])
enjeux['clss_multifct'] = jenks(bilan,'ind_multifct',['faible','moyen','fort'])
enjeux['clss_pressDir'] = jenks(bilan,'ind_pressDir',['faible','moyen','fort'])
enjeux['clss_pressInd'] = jenks(bilan,'ind_pressInd',['faible','moyen','fort'])
enjeux['clss_fct'] = jenks(bilan,'ind_fct',['faible','moyen','fort'])
enjeux['clss_pression'] = jenks(bilan,'ind_pression',['faible','moyen','fort'])
# Priorisation des enjeux
enjeux['enjeu_bioeco'] = priorisation(data=enjeux,titre='enjeu_bioeco',col1='clss_bioeco',col2='clss_pression')
enjeux['enjeu_hydro'] = priorisation(data=enjeux,titre='enjeu_hydro',col1='clss_hydro',col2='clss_pression')
enjeux['enjeu_phybio'] = priorisation(data=enjeux,titre='enjeu_phybio',col1='clss_phybio',col2='clss_pression')
enjeux['enjeu_bilan'] = priorisation(data=enjeux,titre='enjeu_bilan',col1='clss_fct',col2='clss_pression')
enjeux.name = 'Enjeux'
# Récupération des bornes de chaques classes
cols_enjeu = enjeux.columns
cols_enjeu = cols_enjeu[cols_enjeu.str.contains('enjeu')]
borne = gpd.pd.DataFrame(
columns=enjeux.columns.drop(['id_site',*cols_enjeu]), # ,'enjeu_bilan'
index=['faible','moyen','fort'])
for col in borne.columns:
sfx = col.split('_')[1]
tmp = gpd.pd.merge(
bilan[['id_site','ind_'+sfx]],
enjeux[['id_site','clss_'+sfx]],
on = 'id_site'
)
for idx in borne.index:
borne.loc[borne.index==idx,col] = str([
tmp[tmp[col]==idx]['ind_'+sfx].min(),
tmp[tmp[col]==idx]['ind_'+sfx].max()
])
borne.index.name = 'classe'
borne.reset_index(inplace=True, drop=False)
borne.name = 'Borne des classes enjeux'
# Constitution d'un dictionnaire de tableau
dict_dfs = {}
for ddf in [borne, enjeux, bilan, *lst_df]:
dict_dfs[ddf.name] = ddf
# Ecriture du multi-tableau des résultats
print((dt.now() - init).total_seconds())
pycen.write_bilan(dict_dfs, PATH_OUT)
# Jointure des géometries sur Bilan et Enjeux
bilan = bilan.merge(df,how='left',on='id_site')
bilan = bilan.set_geometry('geom')
bilan.rename(columns={'id_site':'site_code'}, inplace=True)
bilan.name = 'Bilan'
enjeux = enjeux.merge(df,how='left',on='id_site')
enjeux = enjeux.set_geometry('geom')
enjeux.rename(columns={'id_site':'site_code'}, inplace=True)
enjeux.name = 'Enjeux'
# Ecriture du géopackage
df_bio.to_file(PATH_OUT[:-4]+'gpkg', layer=df_bio.name,driver='GPKG')
df_hyd.to_file(PATH_OUT[:-4]+'gpkg', layer=df_hyd.name,driver='GPKG')
df_phy.to_file(PATH_OUT[:-4]+'gpkg', layer=df_phy.name,driver='GPKG')
df_mlt.to_file(PATH_OUT[:-4]+'gpkg', layer=df_mlt.name,driver='GPKG')
df_pre.to_file(PATH_OUT[:-4]+'gpkg', layer=df_pre.name,driver='GPKG')
enjeux.to_file(PATH_OUT[:-4]+'gpkg', layer=enjeux.name,driver='GPKG')
bilan.to_file( PATH_OUT[:-4]+'gpkg', layer=bilan.name, driver='GPKG')
from sys import exit
print('')
print((dt.now() - init).total_seconds())
exit('END PGZH')