#!/usr/bin/env python # -*- coding: UTF-8 -*- # from binascii import Incomplete from lib2to3.pgen2 import driver from warnings import filterwarnings import geopandas as gpd # from shapely import geometry from shapely.geometry import MultiPolygon, MultiLineString, MultiPoint, Polygon, LineString #, collection from shapely import ops import pycen filterwarnings("ignore",category=RuntimeWarning) zh = pycen.zh() # Chemin path0 = '/home/colas/Documents/9_PROJETS/3_PGZH/' PATH_OUT = path0 + 'RESULTATS/Résultats_etude_PGSZH_Belledonne.xlsx' Path0 = '/media/colas/SRV/FICHIERS/' Path_tmp = 'OUTILS/CARTOGRAPHIE/ESPACE DE TRAVAIL/ETUDES/PGSZH_Belledonne/Fonctions/' p_bio_eco = Path_tmp+'Biologique/' p_hydro = Path_tmp+'Hydrologique/' p_phybio = Path_tmp+'Physico-chimique/' p_mltifct = Path_tmp+'multi_fonctions/IGN - BD Alti 25M/' p_press = Path_tmp+'../Pressions/' P_expert = path0 + 'DIRE_EXPERT/' Path_alti = '/home/colas/Documents/8_SIG/MNT' p_mltifct = '/IGN - BD Alti 25M/' # Couche des fonctions biologiques et écologiques c_znieff = 'PGSZH_Bell_ZNIEFF-I.gpkg' # c_zico = 'PGZSH_zico.gpkg' c_redi = 'PGSZH_Bell_axe_faune_REDI.gpkg' # Couche des fonctions hydrauliques et hydrologiques c_alea_inond = 'alea_inondation/utsg_gpu.gpkg' c_ebf_crseau = ['Breda_EBF_Optimal.shp','Salin_EBF_Optimal_2021.shp','Sonnant_EBF_Optimal.shp'] c_connex_molasse = 'ZH_CONNECT_ESO.shp' c_idpr = 'BRGM_IDPR/IDPR_2154_CLIP.tif' c_idpr2 = 'BRGM_IDPR/IDPR_2154_CLIP.gpkg' c_piezo = 'Piezo_SAGE BDPV/carte_piézo_HE_2021.shp' c_piezo_interp = 'Piezo_SAGE BDPV/piezo_interpoler.tif' c_captage_ppi = [ 'captage PPI/38350_INFO_SURF_20200219.shp', 'captage PPI/38501_INFO_SURF_20200311.shp', 'captage PPI/38181_PRESCRIPTION_SURF_20180220.shp', 'captage PPI/38206_PRESCRIPTION_SURF_20200218.shp', 'captage PPI/38314_PRESCRIPTION_SURF_20220922.shp', 'captage PPI/38439_PRESCRIPTION_SURF_20180628.shp', 'captage PPI/38567_PRESCRIPTION_SURF_20191125.shp' ] # Couche des fonctions physiques et biochimiques c_artif = 'indicateurs rhomeo_pressions_vecteur/PRESSIONS_ARTIFICIALISATION_2020.shp' c_captage_aep = [ 'captage PPR/38303_INFO_SURF_20100308.shp', 'captage PPR/38350_INFO_SURF_20200219.shp', 'captage PPR/38501_INFO_SURF_20200311.shp', 'captage PPR/38181_PRESCRIPTION_SURF_20180220.shp', 'captage PPR/38206_PRESCRIPTION_SURF_20200218.shp', 'captage PPR/38314_PRESCRIPTION_SURF_20220922.shp', 'captage PPR/38439_PRESCRIPTION_SURF_20180628.shp', 'captage PPR/38567_PRESCRIPTION_SURF_20191125.shp' ] # c_smvic_PPR1 = 'Captage/smvic_PPR1_SMVI.shp' # c_smvic_PPR2 = 'Captage/smvic_PPR2_SMVI.shp' # c_smvic_PPi = 'Captage/smvic_PPi_SMVI.shp' # c_smvic_PPe = 'Captage/smvic_PPe_SMVI.shp' c_rpg = 'RPG/RPG_2017.shp' c_zse = 'ZSE.shp' c_zsea = 'ZSNEA.shp' c_occupsol = 'PGSZH_oscom.gpkg' # Couche des critères « multi-fonctions » c_alti = 'BDALTIV2_25M_FXX_' c_mnt = 'BDALTIV2_25M.tif' # Couche des pressions c_rhomeo = 'sig_indicateurs_2021_simby' c_artif = 'indicateurs rhomeo_pressions_vecteur/PRESSIONS_ARTIFICIALISATION_2020.shp' c_captag = 'PGSZH_Bell_OuvragePrel_sandre.gpkg' c_agric = 'indicateurs rhomeo_pressions_vecteur/PRESSIONS_AGRICOLES_2019.shp' c_urba_plu = 'PLU/COUCHES_AGREGEES/FILTREZONE_URBA_U&AU.gpkg' # c_urba_scot = 'SYMBHI/SCOT/PGSZH_ref_scot_esp_pot_dev.shp' c_iClass = 'PGSZH_Bell_ICPE.gpkg' # c_Purb = 'tache_urbaine.shp' c_lign_confliredi = 'PGSZH_Bell_conflits_ligne_REDI.gpkg' c_poin_confliredi = 'PGSZH_Bell_conflits_point_REDI.gpkg' c_ouvrag = 'PGSZH_Bell_ouvrages_symbhi.shp' c_barage = 'PGSZH_Bell_ROE.shp' # c_depot = 'PGSZH_Bell_ouvrages_symbhi.shp' c_invas = 'SYMBHI/TerrainInvasives2020.tab' c_fallo = 'SYMBHI/fallopia.TAB' c_cd38_eee = 'CD38/PGSZH_EEE_CDIsere.gpkg' # c_vulner = 'SAGE/VULNERABILITE.shp' # Couche des dire d'expert c_expert = 'PGSZH_dire_expert_compilation_17032022.csv' class check_dtypeInList(list): def __contains__(self, typ): return any(isinstance(val, typ) for val in self) # get_flux def open_gpkg(Path0, layer=None,bbox=None): ''' Ouverture des couches Shapefile et Geopackages et mise au formt du script: Parameters ---------- Path0 : str. Chemain/fichier. layer : str. Si Geopackage, nom de la couche dans le cas où il y en a plusieurs. ''' df = gpd.read_file(Path0,layer=layer, bbox=bbox) if df.geometry.name != 'geom': df.rename_geometry('geom',inplace=True) if 'Lambert' in df.crs.name \ and '93' in df.crs.name \ and df.crs.srs.upper() != 'EPSG:2154': print('Projection : %s - %s'%(df.crs.name,df.crs.srs[:20]) ) print('Modification de la projection ...') df.to_crs(epsg=2154,inplace=True) if df.crs.srs.upper() != 'EPSG:2154': print('Projection : %s - %s'%(df.crs.name,df.crs.srs[:20]) ) print('Modification de la projection ...') df.to_crs(epsg=2154,inplace=True) return df def to_geoms(geometries): for geometry in geometries: if isinstance(geometry, (Polygon,LineString)): yield geometry else: yield from geometry def _union_polygons_geometry(df): ''' Transforme un GeoDataFrame de Polygons et/ou MultiPolygons en un MultiPolygon unique: Parameters ---------- df : GeoDataFrame. ''' df = df.copy() name_geom = df.geometry.name # poly = df[df.geom_type=='Polygon'][name_geom] poly = df.loc[df.geom_type=='Polygon',name_geom] multipoly = df.loc[df.geom_type=='MultiPolygon',name_geom] poly = [*poly] multipoly = [*multipoly] if poly: mp2 = MultiPolygon(poly) if poly and multipoly: res = MultiPolygon(to_geoms([*mp2, *multipoly])) elif not poly and multipoly: res = MultiPolygon(to_geoms(multipoly)) elif not multipoly and poly: res = MultiPolygon(poly) return res def _union_lines_geometry(df): name_geom = df.geometry.name line = df.loc[df.geom_type=='LineString',name_geom].tolist() multiline = df.loc[df.geom_type=='MultiLineString',name_geom].tolist() if line: mp2 = MultiLineString(line) if line and multiline: res = MultiLineString(to_geoms([*mp2, *multiline])) elif not line and multiline: res = MultiLineString(to_geoms([*multiline])) elif not multiline and line: res = MultiLineString(line) return res def _calc_recouvrmt(df1,df2): ''' Calcule le recouvrement de df2 sur df1 pour chaque géométrie de df1: Parameters ---------- df1 : GeoDataFrame. df2 : GeoDataFrame. ''' tmp = gpd.sjoin( df1, df2[['geom']], predicate = 'intersects', how = 'left') tmp.dropna(subset=['index_right'],inplace=True) tmp.index_right = tmp.index_right.astype(int) tmp.reset_index(inplace=True) tmp = tmp.join( df2[['geom']].rename(columns={'geom': 'right_geom'}), on=['index_right'], how='left') tmp2 = tmp[['index_right','right_geom']].copy() \ .rename(columns={'right_geom': 'geom'}) \ .set_geometry('geom') tmp1 = tmp[['id_site','geom']].copy() \ .set_geometry('geom') if not tmp1.geom.values.is_valid.all(): tmp1.loc[~tmp1.geom.values.is_valid,'geom'] = tmp1.loc[~tmp1.geom.values.is_valid,'geom'].buffer(0) if not tmp2.geom.values.is_valid.all(): tmp2.loc[~tmp2.geom.values.is_valid,'geom'] = tmp2.loc[~tmp2.geom.values.is_valid,'geom'].buffer(0) tmp['perc_rcvmt'] = (tmp1.intersection(tmp2).area/tmp1.area)*100 tmp = tmp.groupby(['id_site']).sum().reset_index() df1 = df1.merge(tmp[['id_site','perc_rcvmt']], on=['id_site'], how='left') df1.perc_rcvmt.fillna(0, inplace=True) df1.perc_rcvmt = df1.perc_rcvmt.round(2) return df1 def jenks(data,col,labels): import jenkspy data = data.copy() c = col tmp = data[c].unique() tmp = gpd.pd.DataFrame({'val':tmp}) ddf = gpd.pd.DataFrame() # Si str in labels labs_copy = None if str in check_dtypeInList(labels): labs_copy = labels.copy() labels = range(len(labels)) labels = list(labels) tmp['jenks'] = gpd.pd.cut(tmp['val'], bins=jenkspy.jenks_breaks(tmp['val'], n_classes=len(labels)), # bins=list(set(jenkspy.jenks_breaks(tmp['val'], n_classes=len(labels)))), labels=labels, include_lowest=False) ddf[c] = data[c].copy() ddf[c] = ddf[c].replace([*tmp.val],[*tmp.jenks]) ddf[c+'1'] = data[c] # ddf[c+'1'] = ddf[c+'1'].replace([*tmp.val],[*tmp.jenks]) # ddf[c] = ddf[c].astype(float) if labs_copy: ddf[c] = ddf[c].replace([*labels],[*labs_copy]) # ddf[c+'1'] = ddf[c+'1'].replace([*labels],[*labs_copy]) return ddf[c] def get_rhomeo_indicateur(table,cols): return gpd.pd.read_sql_table(table,pycen.con,'zh_analyse',columns=cols) def intersects_rpg(df,code_cultu:list=None): # RPG from pycen.ref import territoire as ter # rpg = open_gpkg(Path0+p_phybio+c_rpg, bbox=df) lst_code = ['BOP','SPH','SPL'] if code_cultu: code_cultu = {'code_cultu':code_cultu} elif lst_code: code_cultu = {'code_cultu':lst_code} rpg = ter.rpg2021_dep_parc(args=code_cultu,bbox=df.unary_union) q = df.intersects(rpg.unary_union) df['rpg'] = q.astype(int) return df class fct_bio_eco: ''' Calcule l'indice et les sous-indices des fonctions biologiques et écologiques des zones humides: Liste des fonctions ---------- znieff_1 : Calcul la présence/absence des zones humides sur des ZNIEFF1 par intersection. redi : Calcul la capacité de déplacement de la faune du réseau écologique départemental de l'Isère (REDI) par intersection. Utilisation d'une couche SIG REDI (PolyLignes) avec un buffer de 100m. fct_bio : Dans le cas où un zone humide n'intersecte pas une ZNIEFF1, attribution d'un poids à chaque zone humide pour ses fonctions Biologiques et Ecologiques. Calcul dépendant du nombre de fonctions recensées en BDD. fct_hab : Attribution d'un poids à chaque zone humide en fonction des types d'habitat présents sur site. fct_intpatri : Attribution d'un poids à chaque zone humide en fonction des espèces protégées présentes sur site. bilan : Somme des sous-indices des fonctions biologiques et écologiques des zones humides. ''' def znieff_1(df): ''' Calcul la présence/absence des zones humides sur des ZNIEFF 1 par intersection: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_znieff1' : Présence = 2 / Absence = 0 ''' print('INIT : Localisation de la zone en territoire ZNIEFF 1 ...') df = df.copy() data = open_gpkg(Path0+p_bio_eco+c_znieff,bbox=df) geom = _union_polygons_geometry(data) df['ssind_znieff1'] = df.geom.intersects(geom) \ .astype(int) \ .replace(1,2) return df def redi(df,buffer:int=100): ''' Calcul la capacité de déplacement de la faune dans le réseau écologique départemental de l'Isère (REDI) par intersection. Utilisation d'une couche SIG REDI (PolyLignes) avec un buffer de 100m: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_redi' : Présence = 2 / Absence = 0 ''' print('INIT : Axe de déplacement de la faune du REDI ...') df = df.copy() data = open_gpkg(Path0+p_bio_eco+c_redi,bbox=df) data.geometry = data.geometry.map(ops.linemerge) geom = _union_lines_geometry(data).buffer(buffer) df['ssind_redi'] = df.geom.intersects(geom) \ .astype(int) \ .replace(1,2) return df def fct_bio(df): ''' Dans le cas où un zone humide n'intersecte pas une ZNIEFF 1 (ssind_znieff1 = 0), attribution d'un poids à chaque zone humide pour ses fonctions Biologiques et Ecologiques. Calcul dépendant du nombre de fonctions recensées en BDD: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_znieff1' : Présence = 2 / Absence = 0 'ssind_fctbio' : si (ssind_znieff1 = 2) = 0 / 1 fonction = 1 / multiple fonctions = 2 / ''' print('INIT : biologiques et écologiques ...') df = df.copy() data = zh.get_fct(id_site=df.id_site.tolist()) data = data[(data.type == 'fct_bio') & (data.nom_fct != 'non documenté')] df['ssind_fctbio'] = df.apply( lambda x: data[data.id_site == x['id_site']].shape[0], axis=1) if 'ssind_znieff1' not in df.columns: df = fct_bio_eco.znieff_1(df) df.loc[df.ssind_znieff1==2, 'ssind_fctbio'] = 0 # df.loc[(df.ssind_znieff1==0) & (df.ssind_fctbio==1), 'ssind_fctbio'] = 1 df.loc[(df.ssind_znieff1==0) & (df.ssind_fctbio > 1), 'ssind_fctbio'] = 2 return df def fct_hab(df): ''' Attribution d'un poids à chaque zone humide en fonction des types d'habitat et des espèces protégées présents sur site: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_hab' : Habitat 'prioritaire|très rare' = 2 ''' print('INIT : Habitats (prioritaire|très rare) ...') df = df.copy() data = zh.get_fct(id_site=df.id_site.tolist()) data = data[(data.type == 'int_patri') & (data.nom_fct != 'non documenté')] # 2 pt si habitat prioritaire lst_termep = 'prioritaire|communautaire|DH|très rare' lst_termeC = 'communautaire' lst_termeP = 'prioritaire|très rare' datap = data[data.nom_fct == 'habitats'] lst_siteh = datap.loc[datap.description.str.contains(lst_termep,na=False), 'id_site'] lst_sitehC = datap.loc[datap.description.str.contains(lst_termeC,na=False), 'id_site'] lst_sitehP = datap.loc[datap.description.str.contains(lst_termeP,na=False), 'id_site'] df['ssind_hab'] = 0 df.loc[df.id_site.isin(lst_sitehP),'ssind_hab'] = 2 return df def readGN_espPatri(df:gpd.GeoDataFrame=None): from pycen import con_gn sql = ''' with r1 as ( SELECT id_synthese, date_debut, cd_ref, ST_GeomFromText(geometrie_wkt_4326,4326) geom, json_build_object(cd_type_statut,json_agg(code_statut)) statut_code, array_agg(distinct cd_type_statut)::text[] type_statut FROM gn_synthese.v_synthese_for_export JOIN taxonomie.bdc_statut USING (cd_ref) WHERE (regroupement_type = 'Liste rouge' AND code_statut in ('VU','CR','EN')) OR (regroupement_type = 'Protection' AND (cd_type_statut in ('PN','PNA') OR (cd_type_statut = 'PD' AND lb_adm_tr = 'Isère') OR (cd_type_statut = 'PR' AND lb_adm_tr IN ('Auvergne-Rhône-Alpes','Rhône-Alpes')) ) ) OR regroupement_type = 'Directives européennes' GROUP BY 1,2,3,4,"cd_type_statut") SELECT id_synthese, date_debut, cd_ref, geom, json_agg(type_statut) type_statut, json_agg(statut_code) statut_code FROM r1 ''' if not df.empty : sql += "WHERE ST_INTERSECTS(geom,'SRID={epsg};{poly}')".format( epsg=4326, poly = df.to_crs(4326).unary_union ) sql += 'GROUP BY 1,2,3,4' return gpd.read_postgis(sql,con_gn) def esp_patrim(df): gn_sp = fct_bio_eco.readGN_espPatri(df) if not gn_sp.empty: gn_sp.to_crs(2154,inplace=True) gn_sp['type_statut'] = [[item for sublist in TUTU for item in sublist] for TUTU in gn_sp.type_statut] gn_sp['typ_count'] = gn_sp.type_statut.str.len() spa_join = (gpd.sjoin( df.set_index('id_site')[['geom']], gn_sp, predicate='intersects', how = 'left' )) df_test = (spa_join .groupby("id_site")['type_statut'] .apply(lambda x: [*x]) .reset_index() ) df_test['patri'] = [ set([item for sublist in TUTU for item in sublist]) if gpd.pd.notna(TUTU).all() else None for TUTU in df_test['type_statut'] ] df_test['count_typ'] = df_test.patri.str.len() df.merge(df_test[['id_site','count_typ']],on='id_site',how='left') return df def fct_intpatri(df): ''' Attribution d'un poids à chaque zone humide en fonction des espèces protégées présentes sur site: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_esppatri' : Faune/Flore nb(1 à 2) 'DH|PR|DO|LR|PN|LRR|LRN|LRM|LRF|LRD|LRE' = 0.5 Faune/Flore nb(< 3) 'DH|PR|DO|LR|PN|LRR|LRN|LRM|LRF|LRD|LRE' = 1 ''' print('INIT : Faune - Flore (PN – PR – P38) ...') df = df.copy() data = zh.get_fct(id_site=df.id_site.tolist()) data = data[(data.type == 'int_patri') & (data.nom_fct != 'non documenté')] # 1 pt si liste terme lst_terme = 'DH|PR|DO|LR|PN|LRR|LRN|LRM|LRF|LRD|LRE' datat = data[data.nom_fct != 'habitats'].copy() datat.quantite = datat.quantite.astype(float) lst_sitet = datat.loc[datat.description.str.contains(lst_terme,na=False), 'id_site'] lst_site1 = datat.loc[(datat.id_site.isin(lst_sitet))&(datat.quantite < 3), 'id_site'] lst_site2 = datat.loc[(datat.id_site.isin(lst_sitet))&(datat.quantite >= 3),'id_site'] lst_site3 = datat.loc[ (datat.id_site.isin(lst_sitet)) & (datat.quantite == 0) & (datat.description.str.contains('nombreuses|plusieurs')),'id_site'] df['ssind_esppatri'] = 0 df.loc[df.id_site.isin(lst_site1),'ssind_esppatri'] = 0.5 df.loc[df.id_site.isin(lst_site2),'ssind_esppatri'] = 1 df.loc[df.id_site.isin(lst_site3),'ssind_esppatri'] = 1 return df def bilan(df): ''' Somme des sous-indices des fonctions biologiques et écologiques des zones humides: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ind_bioeco' : sum( ssind_znieff1 + ssind_redi + ssind_fctbio + ssind_hab + ssind_esppatri ) ''' df = fct_bio_eco.znieff_1(df) df = fct_bio_eco.redi(df) df = fct_bio_eco.fct_bio(df) df = fct_bio_eco.fct_hab(df) df = fct_bio_eco.fct_intpatri(df) ssind = df.columns[df.columns.str.contains('ssind_')] df['ind_bioeco'] = df[ ['ssind_znieff1','ssind_redi','ssind_fctbio','ssind_hab','ssind_esppatri'] # ssind ].sum(axis=1) print(df['ind_bioeco'].dtype) df.name = 'Fct_bio_eco' return df class fct_hyd: ''' Calcule l'indice et les sous-indices des fonctions hydrauliques et hydrologiques des zones humides: Liste des fonctions ---------- zone_inond : Calcul la présence/absence des zones humides sur des zones inondables par intersection. eabf : Calcul de l'espace alluvial de bon fonctionnement (EABF) ou de fond de vallée par intersection. dist_reso_hydro : Si la zone humide ne possède pas d'espace alluviale de bon fonctionnement d'après la fonction "eabf()", calcul la distance au réseau hydrographique linéaire (le plus proche). Attribution d'un poids en fonction de la distance. Si la zone ne possède pas d'eabf et ne semble pas à proximité d'un réseau hydrique, recherche de la présence d'un cours d'eau dans la base de données zones humes. reghydro_out: Pour chaque zone humide, en cas de distance au réseau hydrographique linéaire > 50 et d'absence d'espace alluviale de bon fonctionnement, recherche dans la base de données des zones humides si une sortie d'eau "Cours d'eau"est définie. Attribution d'un poids en fonction. connex_molasse : Attribution d'un poids à chaque zone humide en fonction de sa connexion avérée à la molasse ou non. idpr : Calcul de l'Indice de Développement et de Persistance des Réseaux. Calcul réalisé dans le cas où connex_molasse = 0 . fct_hydro : Attribution d'un poids à chaque zone humide en fonction du nombre de rôles hydro-biologiques à caractères hydrauliques et hydrologiques qu'elle remplie. zse_zsnea : Attribution d'un poids à chaque zone humide en fonction de sont appartenance à une zone de sauvegarde exploitée actuelle (zse) ou future (zsnea). bilan : Somme des sous-indices des fonctions hydrauliques et hydrologiques des zones humides. ''' def zone_inond(df): ''' Calcul la présence/absence des zones humides sur des zones inondables par intersection: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_zoneinond' : None ''' print('INIT : Zone inondable ...') df = df.copy() data = open_gpkg( Path0+p_hydro+c_alea_inond,bbox=df, layer='prescription_surf') data = data[data['PGSZH_alea_inondation']==1] tmp = gpd.sjoin(df,data[['geom']],predicate='intersects', how='left') lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist() df['ssind_zoneinond'] = None df.loc[df.id_site.isin(lst),'ssind_zoneinond'] = 1 return df def eabf(df): ''' Si la zone humide n'est pas en zone inondable d'après la fonction "zone_inond()", calcul de l'espace alluvial de bon fonctionnement (EABF) ou de fond de vallée par intersection: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_eabf' : if ZH.intersects(EABF): 'ssind_eabf' = 2 else : 'ssind_eabf' = 0 ''' print('INIT : Espace alluvial de bon fonctionnement (EABF) ou fond de vallée ...') df = df.copy() if isinstance(c_ebf_crseau,str): data = open_gpkg(Path0+p_hydro+c_ebf_crseau,bbox=df) else : data = gpd.GeoDataFrame() for c in c_ebf_crseau: data = gpd.pd.concat([ data, open_gpkg(Path0+p_hydro+c,bbox=df)[['geom']] ], ignore_index=True) tmp = gpd.sjoin(df,data[['geom']],predicate='intersects', how='left') lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist() df['ssind_eabf'] = 0 df.loc[df.id_site.isin(lst),'ssind_eabf'] = 2 return df def dist_reso_hydro(df): ''' Si la zone humide ne possède pas d'espace alluviale de bon fonctionnement d'après la fonction "eabf()", calcul de la distance au réseau hydrographique linéaire (le plus proche).Attribution d'un poids en fonction de la distance. Si la zone ne possède pas d'eabf et ne semble pas à proximité d'un réseau hydrique, recherche de la présence d'un cours d'eau dans la base de données zones humes: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_distHydro' : Si > 50m = 0 Si ]10 m – 50 m] = 1 Si <= 10m = 2 ''' from pycen import wfs print('INIT : Distance au réseau hydrographique linéaire (le plus proche) ...') df = df.copy() # data = pycen.ref_hydro().get_troncon() list_layer = wfs.list_layer('https://datacarto.datara.gouv.fr/wfs','1.1.0') layer = 'l_inventaire_cours_eau_l_v3_038' if layer in list_layer: data = wfs.get_wfs( url='https://datacarto.datara.gouv.fr/wfs', layer=layer, version='1.1.0', bbox=df.unary_union) else : sql = "SELECT gid,geom FROM ref_hydro.l_inventaire_cours_eau_l_v3_038 WHERE ST_INTERSECTS(geom,'SRID={epsg};{poly}')".format( epsg = df.crs.srs.split(':')[1], poly = df.unary_union ) data = gpd.read_postgis(sql,con=pycen.con) # elif 'cours_d_eau' in list_layer: # print('\n\tUTILISATION DE LA COUCHE IN BDD : l_inventaire_cours_eau_l_v3_038') # layer = 'cours_d_eau' # else : # raise("Couche des cours d'eau non disponible ... ") if 'ssind_eabf' not in df.columns: df = fct_hyd.eabf(df) # if 'MultiLineString' in data.geom_type: # data.loc[data.geom_type=='MultiLineString'] = data.loc[data.geom_type=='MultiLineString'].geometry.map(ops.linemerge) # if True in data.has_z.unique(): # import shapely.wkb # data.loc[data.has_z,'geom'] # data.geom = [shapely.wkb.loads(shapely.wkb.dumps(g, output_dimension=2)) for g in data.geom] # df10 = df[['id_site','geom']].copy() # df50 = df[['id_site','geom']].copy() # df10.geom = df10.buffer(10) # df50.geom = df50.buffer(50) isin10 = df.buffer(10).intersects(data) isin50 = df.buffer(50).intersects(data) noteabf = df.ssind_eabf == 0 # df10 = gpd.sjoin(df10,data[['geom']],predicate='intersects', how='left') # df50 = gpd.sjoin(df50,data[['geom']],predicate='intersects', how='left') # lst10 = df10.loc[~df10.index_right.isna(),'id_site'].tolist() # lst50 = df50.loc[~df50.index_right.isna(),'id_site'].tolist() df['ssind_distHydro'] = 0 df.loc[noteabf & isin50,'ssind_distHydro'] = 1 df.loc[noteabf & isin10,'ssind_distHydro'] = 2 # df.loc[df.dist_min <= 10, 'ssind_distHydro'] = 2 # Si 0, check entree/sortie regime hydro. # Si cours d'eau ou eaux de crues ==> 1 # union = data.geometry.unary_union # df['buff10'] = df.buffer(10).intersects(union).astype(int) # df['buff50'] = df.buffer(50).intersects(union).astype(int) # df['ssind_distHydro'] = None # df.loc[df.buff50 == 0, 'ssind_distHydro'] = 0 # df.loc[df.buff50 == 1, 'ssind_distHydro'] = 1 # df.loc[df.buff10 == 1, 'ssind_distHydro'] = 2 # df.drop(columns=['buff10', 'buff50'], inplace=True) return df def reghydro_out(df): ''' Pour chaque zone humide, en cas de distance au réseau hydrographique linéaire > 50 et d'absence d'espace alluviale de bon fonctionnement, recherche dans la base de données des zones humides si une sortie d'eau "Cours d'eau"est définie. Attribution d'un poids en fonction: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_hydrout' : Si ssind_distHydro = 0 & ssind_eabf = 0 & regime_hydri_out = "Cours d'eau" : 'ssind_distHydro' = 2 ''' df = df.copy() if 'ssind_distHydro' not in df.columns: df = fct_hyd.dist_reso_hydro(df) df['ssind_hydrout'] = 0 if not df.loc[df.ssind_distHydro == 0].empty : lst_zh = df.loc[df.ssind_distHydro == 0].id_site.tolist() tmp = zh.get_regHydro(id_site=lst_zh) # tmp = tmp.loc[tmp.regime_hydri.isin(["Cours d'eau", "Eaux de crues"])] tmp = tmp.loc[(tmp.in_out=='sortie')&(tmp.regime_hydri=="Cours d'eau")] # in_out ??????????? # permanance ??????? lsttmp = tmp.id_site df.loc[(df.ssind_eabf==0)&(df.id_site.isin(lsttmp)),'ssind_hydrout'] = 2 return df def connex_molasse(df): ''' Attribution d'un poids à chaque zone humide en fonction de sa connexion avérée à la molasse ou non : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_molasse' : (intersects == False) = 0 (intersects == True) = 1 ''' print('INIT : Connexion à la molasse ...') df = df.copy() data = open_gpkg(Path0+p_hydro+c_connex_molasse,bbox=df) tmp = gpd.sjoin(df,data[['geom']],predicate='intersects', how='left') tmp = tmp[~tmp.index_right.isna()] df['ssind_molasse'] = 0 df.loc[df.id_site.isin(tmp.id_site),'ssind_molasse'] = 1 return df def piezo(df): import rasterio as rio from os import system print('INIT : Cross piézométrie ...') df = df.copy() # Polygoniser (raster vers vecteur) # """gdal_polygonize.py # "/home/colas/Documents/9_PROJETS/3_PGZH/SIG/multi_fonctions/IGN - BD Alti 25M/BDALTIV2_25M.tif" # -b 1 -f "GPKG" # /tmp/processing_sSlfcG/96533f9ad23e4c10992caa807da01bf1/OUTPUT.gpkg # OUTPUT alti # """ mnt_in = '' piezo_in = 'my_interpolate_piezo.tif' piezo_out = 'piezo.tif' gpkg_out = 'out.gpkg' mnt_out = 'mnt.tif' piezoVSmnt = 'piezoVSmnt_out.tif' poly_connect_nape = 'poly_connect_nape.gpkg' # Découper un raster selon une couche de masque # Découpage du MNT par les polygones d'étude op = ''' gdalwarp -overwrite -s_srs EPSG:2154 -t_srs EPSG:2154 -co FORMAT=GPKG -of GTiff -tr 25.0 -25.0 -tap -cutline \ "PG:dbname='azalee' host=91.134.194.221 port=5432 sslmode=disable user='cgeier' password='adm1n*bdCen'" \ -csql "SELECT site_code, geom FROM zones_humides.v_zoneshumides WHERE site_code in ('{liste_site}')" \ "{mnt}" {out} '''.format(vector = Path_tmp+gpkg_out, mnt=Path0+p_mltifct+c_mnt, out=Path_tmp+mnt_out, liste_site="','".join(df.id_site.tolist())) system(op) mnt = rio.open(Path_tmp+mnt_out) xmin, ymin, xmax, ymax = mnt.bounds # Découpage du PIEZO rasterizé interpolé par les polygones d'étude op = ''' gdalwarp -overwrite -s_srs EPSG:2154 -t_srs EPSG:2154 -co FORMAT=GPKG -of GTiff -tr 25.0 -25.0 \ -te {xmin} {ymin} {xmax} {ymax} \ -tap -cutline "PG:dbname='azalee' host=91.134.194.221 port=5432 sslmode=disable user='cgeier' password='adm1n*bdCen'" \ -csql "SELECT site_code, geom FROM zones_humides.v_zoneshumides WHERE site_code in ('{liste_site}')" \ "{mnt}" {out} '''.format( xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax, mnt=Path_tmp+piezo_in, out=Path_tmp+piezo_out, liste_site="','".join(df.id_site.tolist()) ) system(op) # Soustraction de la valeur des mailles op = ''' gdal_calc.py --overwrite -A {mnt} -B {piezo} --outfile={out} --calc="(A-B)<=0" '''.format(mnt=Path_tmp+mnt_out, piezo=Path_tmp+piezo_out, out=Path_tmp+piezoVSmnt) system(op) # polygonisation du raster op = ''' gdal_polygonize.py {mnt} -b 1 -f "GPKG" {out} '''.format(mnt=Path_tmp+piezoVSmnt, out=Path_tmp+poly_connect_nape) system(op) # data = rio.open(Path_tmp+piezoVSmnt) data = open_gpkg(Path_tmp+poly_connect_nape) data.rename(columns={'DN':'connect_nappe'}, inplace=True) data = data[data.connect_nappe > 0].copy() # IDEM : # gpd.sjoin(df,data).sort_values('id_site').id_site.unique() == \ # df[df.intersects(data.unary_union)].sort_values('id_site').id_site.tolist() tmp = gpd.sjoin(df,data,how='left') del tmp['index_right'] tmp.drop_duplicates(inplace=True) df = tmp.copy() df.to_file(Path_tmp+'zh_connect_nappe.gpkg',driver='GPKG') # Import des courbe des niveau rasteriser par interpolation # by QGIS : Outils de traitements > Interpolation > Interpolation TIN # piezo = rio.open(Path0+p_hydro+c_piezo_interp) # from geocube.api.core import make_geocube # piézo = open_gpkg(Path0+p_hydro+c_piezo) # piézo = piézo[~piézo.geom.isna()] # piézo.rename_geometry('geometry', inplace=True) # out_grid = make_geocube( # vector_data=piézo, # measurements=["id"], # resolution=(-25, 25) # ) # out_grid["id"].rio.to_raster(Path_tmp+"my_rasterized_column.tif") # import xarray # xds = xarray.open_dataarray(Path_tmp+"my_rasterized_column.tif") # filled = xds.rio.interpolate_na(method='linear') # filled.rio.to_raster(Path_tmp+"my_interpolate_raster.tif") return df def idpr(df): ''' Calcul réalisé dans le cas où connex_molasse = 0. Calcul de l'Indice de Développement et de Persistance des Réseaux : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_idpr' : if [ %_recouvrement(idpr < 1000) > 25% ] : ssind_idpr = 1 else : ssind_idpr = 0 ''' print('INIT : IDPR ...') df = df.copy() if 'ssind_molasse' not in df.columns: df = fct_hyd.connex_molasse(df) import rasterio from rasterio.features import shapes mask = None with rasterio.Env(): with rasterio.open(Path0+p_hydro+c_idpr2) as src: image = src.read(1) # first band image[(image < 1000) & (image > -1)] = 1 image[image >= 1000] = 0 data = gpd.GeoDataFrame.from_features( features = [{'properties': {'raster_val': v}, 'geometry': s} for i, (s, v) in enumerate( shapes(image, mask=mask, transform=src.transform)) if v >= 0], crs = 'EPSG:2154') data.rename_geometry('geom', inplace=True) lst_data = [] if not df[df.ssind_molasse == 0].empty: perc = _calc_recouvrmt( df[df.ssind_molasse == 0], data[data.raster_val == 1] ) lst_data = perc.loc[perc.perc_rcvmt.round(2) > 25,'id_site'] df['ssind_idpr'] = 0 df.loc[df.id_site.isin(lst_data), 'ssind_idpr'] = 1 return df def fct_hydro(df): ''' Attribution d'un poids à chaque zone humide en fonction du nombre de rôles hydro-biologiques à caractères hydrauliques et hydrologiques qu'elle remplie : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_fcthydro' : 0 fonction = 0 1 fonction = 0.5 2 fonctions = 1 3 fonctions = 2 ''' print('INIT : Fonctions hydro-biologiques à caractères hydrauliques et hydrologiques ...') df = df.copy() data = zh.get_fct(id_site=df.id_site.tolist()) data = data[(data.type == 'fct_hydro') & (data.nom_fct != 'non documenté')] lst_terme = ["soutien naturel d'étiage",'ralentissement du ruissellement','expansion naturelle des crues'] d = data.loc[data.nom_fct.isin(lst_terme),['id_site','nom_fct']] d['nb_fct'] = 1 d = d.groupby(['id_site']).sum().reset_index() df['ssind_fcthydro'] = 0 lst_data1 = d.loc[d.nb_fct == 1, 'id_site'] lst_data2 = d.loc[d.nb_fct == 2, 'id_site'] lst_dataSup = d.loc[d.nb_fct > 2,'id_site'] df.loc[df.id_site.isin(lst_data1), 'ssind_fcthydro'] = 0.5 df.loc[df.id_site.isin(lst_data2), 'ssind_fcthydro'] = 1 df.loc[df.id_site.isin(lst_dataSup), 'ssind_fcthydro'] = 2 return df def zse_zsnea(df): ''' Attribution d'un poids à chaque zone humide en fonction de sont appartenance à une zone de sauvegarde exploitée actuelle (zse) ou future (zsnea) : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_fcthydro' : if ZH.intersects(ZSNEA|ZSE): 'ssind_fcthydro' = 1 else : 'ssind_fcthydro' = 0 ''' print('INIT : Zones de sauvegardes actuelles et futures (ZSE / ZSNEA) ...') df = df.copy() data1 = open_gpkg(Path0+p_phybio+c_zse,bbox=df) data2 = open_gpkg(Path0+p_phybio+c_zsea,bbox=df) data1 = _union_polygons_geometry(data1) data2 = _union_polygons_geometry(data2) if not data1.is_valid: data1 = data1.buffer(0) if not data2.is_valid: data2 = data2.buffer(0) df['zse'] = df.intersects(data1).astype(int) #.replace(1,2) df['zsnea'] = df.intersects(data2).astype(int) df['ssind_zse_zsnea'] = df[['zse', 'zsnea']].max(axis=1) df.drop(columns=['zse','zsnea'], inplace=True) return df def captage_ppi(df): ''' Attribution d'un poids à chaque zone humide en fonction de sa présence ou non au sein d'un périmêtre de prélèvement d'eau potable : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_capt_ppi' : if ZH.intersects(Captage_ppi): 'ssind_capt_ppi' = 1 else : 'ssind_capt_ppi' = 0 ''' if isinstance(c_captage_ppi,str): data = open_gpkg(Path0+p_hydro+c_captage_ppi,bbox=df) else : data = gpd.GeoDataFrame() for c in c_captage_ppi: data = gpd.pd.concat([ data, open_gpkg(Path0+p_hydro+c,bbox=df)[['geom']] ],ignore_index=True) df['ssind_capt_ppi'] = (df.intersects(data.unary_union) .astype(int)) return df def bilan(df): ''' Somme des sous-indices des fonctions hydrauliques et hydrologiques des zones humides : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ind_hydro' : sum( ssind_eabf + ssind_distHydro + ssind_hydrout ssind_fcthydro + ssind_pente + ssind_capt_ppi ) ''' # df = fct_hyd.zone_inond(df) df = fct_hyd.eabf(df) df = fct_hyd.dist_reso_hydro(df) df = fct_hyd.reghydro_out(df) # df = fct_hyd.connex_molasse(df) # df = fct_hyd.idpr(df) df = fct_hyd.fct_hydro(df) df = crit_multi_fct.pente2(df) df = fct_hyd.captage_ppi(df) # df = fct_hyd.zse_zsnea(df) df['ind_hydro'] = df[ [#'ssind_zoneinond', 'ssind_eabf','ssind_distHydro', 'ssind_hydrout', #'ssind_molasse','ssind_idpr', 'ssind_fcthydro', #'ssind_zse_zsnea' 'ssind_pente','ssind_capt_ppi' ] ].sum(axis=1) df.name = 'Fct_hyd' return df class fct_phy_bio: ''' Calcule l'indice et les sous-indices des fonctions physiques et biochimiques des zones humides : Liste des fonctions ---------- captage_aep : Identification de la présence/absence de zones de captages AEP à proximité des zones humides par intersection. fct_hydrobio : Attribution d'un poids à chaque zone humide en fonction du nombre de rôles hydro-biologiques à caractères physiques et biochimiques qu'elle remplie. occup_sol : Pour chaque zone humide, identification de la nature d'occupation du sol et de sa surface de recouvrement. Déduction de la surface d'espace naturel concernée par les zonnages. Attribution d'un poids en fonction de la surface de recouverte. bilan : Somme des sous-indices des fonctions physiques et biochimiques des zones humides. ''' def captage_aep(df): ''' Identification de la présence/absence de zones de captages à proximité des zones humides par intersection. Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_perimcaptage' : if ZH.intersects(captage): 'ssind_perimcaptage' = 1 else : 'ssind_perimcaptage' = 0 ''' print('INIT : Périmètre de protection des captages AEP ...') from pandas import concat df = df.copy() if isinstance(c_captage_aep,str): data = open_gpkg(Path0+p_phybio+c_captage_aep,bbox=df) else : data = gpd.GeoDataFrame() for c in c_captage_aep: data = gpd.pd.concat([ data, open_gpkg(Path0+p_phybio+c,bbox=df)[['geom']] ],ignore_index=True) tmp = gpd.sjoin( df, data.loc[ # ~data.N_INS___NO.str.contains('ABA|HS'), :, ['geom']], predicate = 'intersects', how = 'left') lst_site = tmp[~tmp.index_right.isna()].id_site df['ssind_perimcaptage'] = 0 df.loc[df.id_site.isin(lst_site),'ssind_perimcaptage'] = 1 return df # def zse_zsnea(df): # print('INIT : Zones de sauvegardes actuelles et futures (ZSE / ZSNEA) ...') # df = df.copy() # data1 = open_gpkg(Path0+p_phybio+c_zse,bbox=df) # data2 = open_gpkg(Path0+p_phybio+c_zsea,bbox=df) # data1 = _union_polygons_geometry(data1) # data2 = _union_polygons_geometry(data2) # if not data1.is_valid: # data1 = data1.buffer(0) # if not data2.is_valid: # data2 = data2.buffer(0) # df['zse'] = df.intersects(data1).astype(int).replace(1,2) # df['zsnea'] = df.intersects(data2).astype(int) # df['ssind_zse_zsnea'] = df[['zse', 'zsnea']].max(axis=1) # df.drop(columns=['zse','zsnea'], inplace=True) # return df def fct_hydrobio(df): ''' Attribution d'un poids à chaque zone humide en fonction du nombre de rôles hydro-biologiques à caractères physiques et biochimiques qu'elle remplie : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_fcthydrobio' : 0 fonction = 0 1 fonction = 1 + fonctions = 2 ''' print('INIT : Fonctions hydro-biologiques à caractères physiques et biochimiques ...') df = df.copy() data = zh.get_fct(id_site=df.id_site.tolist()) data = data[(data.type == 'fct_hydro') & (data.nom_fct != 'non documenté')] lst_terme = ["fonctions d'épuration","rôle naturel de protection contre l'érosion"] d = data.loc[data.nom_fct.isin(lst_terme),['id_site','nom_fct']] d['nb_fct'] = 1 d = d.groupby(['id_site']).sum().reset_index() df['ssind_fcthydrobio'] = 0 lst_data1 = d.loc[d.nb_fct == 1, 'id_site'] lst_dataSup = d.loc[d.nb_fct > 1,'id_site'] df.loc[df.id_site.isin(lst_data1), 'ssind_fcthydrobio'] = 1 df.loc[df.id_site.isin(lst_dataSup), 'ssind_fcthydrobio'] = 2 return df def occup_sol(df): ''' Pour chaque zone humide, identification de la nature d'occupation du sol et de sa surface de recouvrement. Déduction de la surface d'espace naturel concernée par les zonnages. Type d'intérêt : 'Forêts' / 'Milieux à végétation arbustive et/ou herbacée' / 'Prairies' Attribution d'un poids en fonction de la surface de recouverte : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_occupsol' : if surf_recouverte < 25% : ssind = 0 elif surf_recouverte in [ 25% ; 50% [ : ssind = 0.5 elif surf_recouverte in [ 50% ; 75% [ : ssind = 1.5 elif surf_recouverte in [ 75% ; 100% ] : ssind = 2 ''' init = dt.now() print('INIT : Occupation du sol ...') df = df.copy() ddf = df.copy() ddf['area_init'] = ddf.area # data = open_gpkg(Path0+p_phybio+c_occupsol, bbox=df) print('IMPORT DATA ...') artif = open_gpkg(Path0+p_press+c_artif, bbox=df) artif = artif[['ID','geom']] artif1 = artif.iloc[0,:] artif1 = (gpd.GeoDataFrame(artif1).T .set_geometry('geom') .set_crs(crs=artif.crs.srs)) # artif1 = gpd.GeoDataFrame(artif.iloc[0].copy(),geometry=artif.iloc[0].geom,crs=artif.crs.srs) artif1 = (gpd.overlay(artif1,ddf, how='intersection') .rename_geometry('geom')) artif2 = (gpd.GeoDataFrame(artif.iloc[1:,:]) .set_geometry('geom') .set_crs(crs=artif.crs.srs)) # RPG from pycen.ref import territoire as ter # rpg = open_gpkg(Path0+p_phybio+c_rpg, bbox=df) rpg = ter.rpg2021_dep_parc(bbox=df.unary_union) lst_code = ['BOP','SPH','SPL','PPH','PRL','J6P','J6S','BTA','ROS','SBO'] rpg = rpg[~rpg.code_cultu.isin(lst_code)] print((dt.now() - init).total_seconds()) print('CORRECTION GEOMETRY ...') if not artif1.geom.is_valid.all() : artif1.loc[~artif1.geom.is_valid,'geom'] = artif1.loc[~artif1.geom.is_valid,'geom'].buffer(0) if not artif2.geom.is_valid.all() : artif2.loc[~artif2.geom.is_valid,'geom'] = artif2.loc[~artif2.geom.is_valid,'geom'].buffer(0) if not rpg.geom.is_valid.all() : rpg.loc[~rpg.geom.is_valid,'geom'] = rpg.loc[~rpg.geom.is_valid,'geom'].buffer(0) print((dt.now() - init).total_seconds()) print('DATA READY ...') print('INIT OVERLAY ...') ddf = gpd.overlay(ddf,artif1, how='difference') if 'GeometryCollection' in ddf.geom_type.unique(): ddf.geom = ddf.geom.buffer(0) ddf = gpd.overlay(ddf,rpg, how='difference') if 'GeometryCollection' in ddf.geom_type.unique(): ddf.geom = ddf.geom.buffer(0) ddf = gpd.overlay(ddf,artif2, how='difference') if 'GeometryCollection' in ddf.geom_type.unique(): ddf.geom = ddf.geom.buffer(0) print('END OVERLAY ...') print((dt.now() - init).total_seconds()) ddf['area_end'] = ddf.area ddf['perc_rcvmt'] = 100 - (100*ddf.area_end/ddf.area_init) df = df.merge(ddf[['id_site','perc_rcvmt']], on='id_site', how='left') df.perc_rcvmt.fillna(0,inplace=True) df['perc_surfNat'] = 100 - df['perc_rcvmt'] # lst_terme = ['Forêts','Milieux à végétation arbustive et/ou herbacée','Prairies'] # d = data.loc[data.libelle_02.isin(lst_terme),] # print( # ('INIT : Calcul du recouvrement de l\'occupation des sols sur les zones humides :'), # ('"Forêts","Milieux à végétation arbustive et/ou herbacée","Prairies"')) # print(('ATTENTION : Les géometries de l\'occupation des sols étant complexes,'), # ('le calcul peut prendre un certain temps ...')) # df = _calc_recouvrmt(df,d) # print('END : Calcul du recouvrement de l\'occupation des sols sur les zones humides') if 'ssind_fcthydrobio' not in df.columns: df = fct_phy_bio.fct_hydrobio(df) df['ssind_occupsol'] = 0 df.loc[(df.ssind_fcthydrobio == 0) & (df.perc_surfNat.between(25,50,inclusive=True)),'ssind_occupsol'] = 0.5 df.loc[(df.ssind_fcthydrobio == 0) & (df.perc_surfNat.between(50,75,inclusive=True)),'ssind_occupsol'] = 1.5 df.loc[(df.ssind_fcthydrobio == 0) & (df.perc_surfNat >= 75),'ssind_occupsol'] = 2 df.drop(columns=['perc_rcvmt','perc_surfNat'], inplace=True) print('END ssind_occupsol ...') return df def bilan(df): ''' Somme des sous-indices des fonctions physiques et biochimiques des zones humides : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ind_phybio' : sum( ssind_perimcaptage + ssind_fcthydrobio + ssind_occupsol + ssind_pente ) ''' df = fct_phy_bio.captage_aep(df) # df = fct_phy_bio.zse_zsnea(df) df = fct_phy_bio.fct_hydrobio(df) df = fct_phy_bio.occup_sol(df) df = crit_multi_fct.pente2(df) df['ind_phybio'] = df[ ['ssind_perimcaptage', # 'ssind_zse_zsnea', 'ssind_fcthydrobio','ssind_occupsol', 'ssind_pente' ] ].sum(axis=1) df.name = 'Fct_phy_bio' return df class crit_multi_fct: ''' Calcule l'indice et les sous-indices des criètes « multi-fonctions » des zones humides : Liste des fonctions ---------- surface : Calcul de la surface totale des zones humides. Attribution d'un poid en fonction du résultat. pente : Calcul de la pente moyenne des zones humides via le MNT. Attribution d'un poid en fonction du résultat. dir_exp : Ajout d'un champ dir_exp dans le tableau de sortie qui sera à remplir manuellement par celui-ci. bilan : Rassemblement des sous-indices des criètes « multi-fonctions » dans un même tableau. L'indice pourra être calculé lorque le champs dir_exp sera remplie. ''' def surface(df): ''' Calcul de la surface totale des zones humides. Attribution d'un poid en fonction du résultat : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_surf' : if ZH < 1ha : ssind = 0 elif ZH in [ 1ha ; 20ha [ : ssind = 0.5 elif ZH in [ 20ha ; 100ha [ : ssind = 1 elif ZH >= 100ha : ssind = 1.5 ''' print('INIT : Calcul de la surface ...') df = df.copy() df['ssind_surf'] = 0 df.loc[(df.area/10000).between(1,20,inclusive=True),'ssind_surf'] = 0.5 df.loc[(df.area/10000).between(20,100,inclusive=True),'ssind_surf'] = 1 df.loc[df.area/10000 >= 100,'ssind_surf'] = 1.5 return df def pente(df,seuil:int=5): ''' Calcul de la pente moyenne des zones humides via le MNT. Attribution d'un poid en fonction du résultat : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_surf' : if ZH > 5% : ssind = 0 else: ssind = 1 ''' print('INIT : Calcul de la pente moyenne ...') dfP = df.copy() from os import listdir, chdir, system from pathlib import Path as Pathlib from zipfile import ZipFile from rasterstats import zonal_stats home = str(Pathlib.home()) chdir(Path_alti+p_mltifct) Dir = listdir() Dir = [x for x in Dir if '.zip' in x] slope = 'temp_slop.tif' for i, d in enumerate(Dir): zip = ZipFile(d).namelist() z = [z for z in zip if 'MNT' in z][0] system("gdaldem slope '/vsizip/{zip}/{mnt}' '{slope}' -of GTiff -b 1 -s 1.0 -p".format(zip=d,mnt=z,slope=slope)) stats = zonal_stats(dfP.geom,slope) stats = gpd.pd.DataFrame(stats) if i == 0 : dfP[stats.columns] = stats else: tmp = gpd.pd.DataFrame({'dfP':dfP['mean']*dfP['count'], 'stats':stats['mean']*stats['count']}) dfP['mean'] = tmp.sum(axis=1)/(dfP['count']+stats['count']) tmp = gpd.pd.DataFrame({'dfP':dfP['count'], 'stats':stats['count']}) dfP['count'] = tmp.sum(axis=1) tmp = gpd.pd.DataFrame({'dfP':dfP['min'], 'stats':stats['min']}) dfP['min'] = tmp.min(axis=1) tmp = gpd.pd.DataFrame({'dfP':dfP['max'], 'stats':stats['max']}) dfP['max'] = tmp.max(axis=1) system('rm {slope}'.format(slope=slope)) chdir(home) dfP['ssind_pente'] = 0 dfP.loc[dfP['mean'] < seuil, 'ssind_pente'] = 1 df = df.merge(dfP[['id_site','ssind_pente']], on=['id_site'], how='left') # dst = [z for z in zip if 'DST' in z][0] # src = [z for z in zip if 'SRC' in z][0] # dst = rasterio.open('zip:{0}/{1}'.format(d,dst)) # src = rasterio.open('zip:{0}/{1}'.format(d,src)) # mnt = rasterio.open( # 'zip:{0}/{1}'.format(d,z),'w+', # width=dst.width, # height=dst.height, # count=1, # crs=dst.crs.data, # transform=dst.transform, # dtype=dst.dtypes[0]) # stats = rasterstats.zonal_stats(df.geom,'zip:{0}/{1}'.format(d,z)) # stats = pd.DataFrame(stats) # if i == 0 : # df[stats.columns] = stats # else: # tmp = pd.DataFrame({'df':df['mean']*df['count'], 'stats':stats['mean']*stats['count']}) # df['mean'] = tmp.sum(axis=1)/(df['count']+stats['count']) # tmp = pd.DataFrame({'df':df['count'], 'stats':stats['count']}) # df['count'] = tmp.sum(axis=1) # tmp = pd.DataFrame({'df':df['min'], 'stats':stats['min']}) # df['min'] = tmp.min(axis=1) # tmp = pd.DataFrame({'df':df['max'], 'stats':stats['max']}) # df['max'] = tmp.max(axis=1) # https://stackoverflow.com/questions/8844781/get-file-list-of-files-contained-in-a-zip-file # res.append(rasterio.open( # 'zip:{0}/{1}'.format(d,z))) # continue # df['ssind_pente'] = 0 return df def pente2(df,seuil:int=5): sql = """ SELECT site_code, geom, (slope).count slope_count, (slope).sum slope_sum, (slope).mean slope_mean, (slope).stddev slope_stddev, (slope).min slope_min, (slope).max slope_max FROM ( SELECT v.site_code, v.geom, ST_SummaryStats(ST_Slope(ST_Union(mm.rast),1,'32BF','DEGREES')) slope --,ST_SummaryStats(ST_Union(mm.rast)) sum2 --,(ST_Intersection(mm.rast,1,v.geom)) rast_geom FROM ref_territoire.mnt_5m mm CROSS JOIN (SELECT site_code, geom FROM zones_humides.v_zoneshumides vz WHERE site_code in {list_zh} ) v WHERE ST_Intersects(mm.rast,1,v.geom) GROUP BY 1,2 ) t; """.format(list_zh=tuple(df.id_site)) data = (gpd.read_postgis(sql,pycen.con). rename(columns={'site_code':'id_site'})) data['ssind_pente'] = 0 data.loc[data['slope_mean'] < seuil, 'ssind_pente'] = 1 return df.merge(data[['id_site','ssind_pente']], on=['id_site'], how='left') def dir_exp(df): ''' Ajout d'un champ dir_exp dans le tableau de sortie qui sera à remplir manuellement par celui-ci : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_direxp' : None ''' df = df.copy() data = gpd.pd.read_csv(P_expert + c_expert,sep=';') data = data[['site_code','Note_DE_fonction']] data.columns = ['site_code','ssind_direxp'] data.ssind_direxp = data.ssind_direxp.astype(float) df = df.merge(data,right_on='site_code',left_on='id_site') del df['site_code'] # df['ssind_direxp'] = None return df def bilan(df): ''' Rassemblement des sous-indices des criètes « multi-fonctions » dans un même tableau. L'indice pourra être calculé lorque le champs dir_exp sera remplie : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ind_multifct' : sum( ssind_surf + ssind_pente + 'ssind_direxp' ) ''' # df = crit_multi_fct.surface(df) df = crit_multi_fct.pente2(df) # df['ssind_total'] = df[ # ['ssind_surf','ssind_pente'] # ].sum(axis=1) df = crit_multi_fct.dir_exp(df) df['ind_multifct'] = df[ [#'ssind_surf', 'ssind_pente','ssind_direxp'] ].sum(axis=1) df.name = 'Crit_multi_fct' return df class pression: ''' Calcule l'indice et les sous-indices des pressions exercées sur les zones humides : Liste des fonctions ---------- artificialisation : Récupération des résultats des pressions directes d'artificialisation Rhoméo I12. Application de la discrimination de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1]. artif_indir : Récupération des résultats des pressions indirectes d'artificialisation Rhoméo I12. Application de la discrimination de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1]. urbanisation : Récupération des résultats des pressions directes d'urbanisation Rhoméo I12. Application de la discrimination de Jenks pour catégoriser les résultats en 4 classes [0, 0.5, 1, 1.5]. urbani_indir : Récupération des résultats des pressions indirectes d'urbanisation Rhoméo I12. Application de la discrimination de Jenks pour catégoriser les résultats en 4 classes [0, 0.5, 1, 1.5]. pressAgricole : Récupération des résultats des pressions directes agricoles Rhoméo I13. Application de la discrimination de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1]. pressAgri_indir : Récupération des résultats des pressions indirectes agricoles Rhoméo I13. Application de la discrimination de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1]. projet_plu_U : Intersections des zones relevant du projet d'Urbanisme (PLU) avec les polygones de l'étude. Considération du champs Typezone == 'U'. Attribution des points en cas d'intersections. projet_scot : En cas d'absence de PLU, recherche d'espaces de développements potentiels au alentours des sites (SCOT). Attribution des points en cas d'intersections. conflit_redi : Intersections des zones de conflits redi (Points, Lignes) avec les polygones de l'étude. Utilistaion d'un buffer de 100m. prelev_eau : Identification da la proximité des zones humides avec des sources de captages. Application d'un buffer de 50m. Identification par intersection. icpe : Identification da la proximité des zones humides avec des installations classés. Application d'un buffer de 500m. Identification par intersection. ouvrage : Identification da la présence d'ouvrages et de dépôts au sein des zones humides. Identification par intersection. vulnerabilite : Identification da la proximité des zones humides avec des espèces exotiques envahissantes. Application d'un buffer de 100m. Identification par intersection. press_urba : Calcul de l'indice de pression directe (artificialisation, urbanisation, pressAgricole, projet_plu_U, conflit_redi, icpe, prelev_eau, ouvrage, vulnerabilite). press_agri : Calcul de l'indice de pression indirecte (artif_indir; urbani_indir, pressAgri_indir, projet_plu_AU, projet_scot). bilan : Rassemblement des sous-indices des pressions directes et indirectes dans un même tableau. ''' # indesirable : Identification da la présence d'espèces indésirables # au sein des zones humides. Identification par intersection # à partir d'une couche fournie par le commanditaire def artificialisation(df,pres_type:str='both'): ''' Récupération des résultats des pressions directes d'artificialisation Rhoméo I12. Application de la discrimination de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1]: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_artif' = 0|0.5|1 ''' ind_col = ['site_code'] if pres_type == 'direct': ind_col += ['presdirect_artif'] elif pres_type == 'indirect': ind_col += ['presindir_artif'] elif pres_type == 'both': ind_col += ['presdirect_artif','presindir_artif'] else: raise('Argument `pres_type` invalid !') # from datetime import datetime as dt print('INIT : Artificialisation ...') df = df.copy() # data = gpd.pd.read_csv(path0 + c_rhomeo) # data = open_gpkg(Path0+p_press+c_artif, bbox=df) # data.set_index('site_code', inplace=True) # data.presdirect_artif = data.presdirect_artif.round() data = (get_rhomeo_indicateur(c_rhomeo,ind_col) .set_index('site_code') .round() .sum(axis=1) .to_frame('pres_artif')) tmp = jenks( data=data[['pres_artif']], col='pres_artif', labels=[0, 0.5, 1]) df = df.merge(tmp,how='left',left_on='id_site',right_index=True) df.rename(columns={'pres_artif':'ssind_artif'}, inplace=True) return df def urbanisation(df,pres_type:str='both'): ''' Récupération des résultats des pressions directes d'urbanisation Rhoméo I12. Application de la discrimination de Jenks pour catégoriser les résultats en 4 classes [0, 0.5, 1, 1.5]: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. pres_type : str. Type de pression calculer `direct`, `indirect` ou `both`. Default : `both` Return ---------- 'ssind_urban' = 0|0.5|1|2 ''' ind_col = ['site_code'] if pres_type == 'direct': ind_col += ['presdirect_urba'] elif pres_type == 'indirect': ind_col += ['presindir_urba'] elif pres_type == 'both': ind_col += ['presdirect_urba','presindir_urba'] else: raise('Argument `pres_type` invalid !') print('INIT : Urbanisation indirecte ...') df = df.copy() # data = gpd.pd.read_csv(path0 + c_rhomeo) data = (get_rhomeo_indicateur(c_rhomeo,ind_col) .set_index('site_code') .round() .sum(axis=1) .to_frame('pres_urba')) # data.presdirect_urba = data.presdirect_urba.round() tmp = jenks( data=data[['pres_urba']], col='pres_urba', labels=[0, 0.5, 1, 2]) df = df.merge(tmp,how='left',left_on='id_site',right_index=True) df.rename(columns={'pres_urba':'ssind_urban'}, inplace=True) # df['ssind_urbani'] = None return df def pressAgricole(df,pres_type:str='both'): ''' Récupération des résultats des pressions directes agricoles Rhoméo I13. Application de la discrimination de Jenks pour catégoriser les résultats en 3 classes [0, 0.5, 1]: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_agrico' = 0|0.5|1 ''' ind_col = ['site_code'] if pres_type == 'direct': ind_col += ['presdirect_agri'] elif pres_type == 'indirect': ind_col += ['presindir_agri'] elif pres_type == 'both': ind_col += ['presdirect_agri','presindir_agri'] else: raise('Argument `pres_type` invalid !') print('INIT : Pressions agricoles directes...') df = df.copy() # data = open_gpkg(Path0+p_press+c_agric, bbox=df) # tmp = _calc_recouvrmt(df,data) # tmp['ssind_agri'] = 0 # tmp.loc[tmp.perc_rcvmt > 5,'ssind_agri'] = 0.5 # tmp.loc[tmp.perc_rcvmt > 10,'ssind_agri'] = 1 # df = df.merge(tmp[['id_site','ssind_agri']], on=['id_site'],how='left') # data = gpd.pd.read_csv(path0 + c_rhomeo) # data.set_index('site_code', inplace=True) # data.presdirect_agri = data.presdirect_agri.round() data = (get_rhomeo_indicateur(c_rhomeo,ind_col) .set_index('site_code') .round() .sum(axis=1) .to_frame('pres_agri')) tmp = jenks( data=data[['pres_agri']], col='pres_agri', labels=[0, 0.5, 1]) df = df.merge(tmp,how='left',left_on='id_site',right_index=True) df.rename(columns={'pres_agri':'ssind_agrico'}, inplace=True) return df def projet_plu(df): ''' Intersections des zones relevant du projet d'Urbanisme (PLU) avec les polygones de l'étude. Considération du champs Typezone == 'U'. Attribution des points en cas d'intersections: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_plu' = 0|1 ''' Typezone = ['U','AU'] df = df.copy() if isinstance(c_urba_plu,str): data = (open_gpkg(Path0+p_press+c_urba_plu,bbox=df) .rename(columns={'typezone': 'Typezone'})) else : data = gpd.GeoDataFrame() for c in c_urba_plu: data = gpd.pd.concat([ data, (open_gpkg(Path0+p_press+c,bbox=df)[['geom']] .rename(columns={'typezone': 'Typezone'})) ],ignore_index=True) data.columns = data.columns.str.lower() d = data.typezone.str.contains('|'.join(Typezone)) tmp = gpd.sjoin( df, data.loc[d,['geom']], predicate='intersects', how='left') lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist() df['ssind_plu'] = 0 df.loc[df.id_site.isin(lst),'ssind_plu'] = 1 return df def projet_scot(df): ''' En cas d'absence de PLU, recherche d'espaces de développements potentiels au alentours des sites (SCOT). Attribution des points en cas d'intersections: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_scot' = 0|1 ''' df = df.copy() # Ouverture des couches PLU d = open_gpkg(Path0+p_press+c_urba_plu, bbox=df) d.rename(columns={'typezone': 'Typezone'}, inplace=True) dataPLU = d[['Typezone','geom']] tmpPLU = gpd.sjoin(df,dataPLU[['geom']],predicate='intersects', how='left') lstPLU = tmpPLU.loc[~tmpPLU.index_right.isna(),'id_site'].tolist() # if 'ssind_pluAU' not in df.columns: # df = pression.projet_plu_AU(df) # Si pas de PLU concerner par les sites, intersection des SCOT dataSCOT = open_gpkg(Path0+p_press+c_urba_scot, bbox=df) tmp = gpd.sjoin(df,dataSCOT[['geom']],predicate='intersects', how='left') lstSCOT = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist() df['ssind_scot'] = 0 df.loc[(~df.id_site.isin(lstPLU))&(df.id_site.isin(lstSCOT)),'ssind_scot'] = 1 return df def alpage(df): ''' Intersections des zones d'alpage au RPG: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_alpage' = 0 if ZH.intersects(RPG) : 'ssind_alpage' = 2 ''' lst_code = ['BOP','SPH','SPL'] return (intersects_rpg(df,lst_code) .rename(columns={'rpg':'ssind_alpage'})) def conflit_redi(df): ''' Intersections des zones de conflits redi (Points, Lignes) avec les polygones de l'étude. Utilistaion d'un buffer de 100m: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_confli' = 0 if conflit : 'ssind_confli' = 0.5 ''' df = df.copy() lign = open_gpkg(Path0+p_press+c_lign_confliredi, bbox=df) poin = open_gpkg(Path0+p_press+c_poin_confliredi, bbox=df) lign.columns = lign.columns.str.lower() poin.columns = poin.columns.str.lower() lign.geom = lign.geom.buffer(50) poin.geom = poin.geom.buffer(50) data = gpd.pd.concat([ lign[['id','geom']], poin[['id','geom']] ]) data = gpd.GeoDataFrame(data,geometry='geom',crs=lign.crs.srs) geom = _union_polygons_geometry(data) if not geom.is_valid: geom = geom.buffer(0) df['ssind_confli'] = df.intersects(geom).astype(int).replace(1,0.5) return df def prelev_eau(df): ''' Identification da la proximité des zones humides avec des sources de captages. Application d'un buffer de 50m. Identification par intersection : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_prlvmteau' : ZH.geom = ZH.buffer(50) if ZH.intersects(prelev_eau): ssind = 1 else : ssind = 0 ''' print("INIT : Prélevement d'eau ...") df_buf = df.copy() df_buf.geom = df.buffer(50) data = open_gpkg(Path0+p_press+c_captag,bbox=df) if data.geometry.name != 'geom': data.rename_geometry('geom', inplace=True) data.reset_index(drop=True, inplace=True) mp = MultiPoint(data.geom) df_buf['ssind_prlvmteau'] = df_buf.intersects(mp).astype(int) df = df.merge(df_buf[['id_site','ssind_prlvmteau']], on=['id_site']) return df def icpe(df): ''' Identification da la proximité des zones humides avec des installations classés. Application d'un buffer de 500m. Identification par intersection : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_icpe' : ZH.geom = ZH.buffer(500) if ZH.intersects(install_classee): ssind = 0.5 else : ssind = 0 ''' print('INIT : ICPE ...') tmp = df.copy() tmp.geom = tmp.buffer(500) data = open_gpkg(Path0+p_press+c_iClass) data = MultiPoint(data.geom) tmp['ssind_icpe'] = tmp.intersects(data).astype(int).replace(1,0.5) df = df.merge(tmp[['id_site','ssind_icpe']], on=['id_site'],how='left') return df def barrage(df): ''' Identification da la présence de Barrage, seuil (ROE) au sein des zones humides. Identification par intersection : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_ouvrag' : if ZH.intersects(ouvrages): ssind = 0.5 else : ssind = 0 ''' print('INIT : Ouvrage ...') df = df.copy() data = open_gpkg(Path0+p_press+c_barage, bbox=df) data = data.unary_union # data = MultiPoint(data.geom) df['ssind_barage'] = df.intersects(data).astype(int).replace(1,0.5) return df def ouvrage(df): ''' Identification da la présence d'ouvrages et de dépôts au sein des zones humides. Identification par intersection : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_ouvrag' : if ZH.intersects(ouvrages): ssind = 0.5 else : ssind = 0 ''' print('INIT : Ouvrage ...') df = df.copy() data = open_gpkg(Path0+p_press+c_ouvrag, bbox=df) data = data.unary_union # data = MultiPoint(data.geom) df['ssind_ouvrag'] = df.intersects(data).astype(int).replace(1,0.5) return df def vulnerabilite(df): ''' Identification da la proximité des zones humides avec des espèces exotiques envahissantes. Application d'un buffer de 100m. Identification par intersection : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_vulnerab' : ZH.geom = ZH.buffer(100) if ZH.intersects(esp_exo_envahi): ssind = 1 else : ssind = 0 ''' print('INIT : Vulnérabilité ...') df_buf = df.copy() df_buf.geom = df_buf.buffer(100) lst_term = ['Buddleia','Renouee','Solidage'] data1 = open_gpkg(Path0+p_press+c_invas, bbox=df_buf) data1 = data1[data1.Espece.isin(lst_term)] data2 = open_gpkg(Path0+p_press+c_fallo, bbox=df_buf) data3 = open_gpkg(Path0+p_press+c_cd38_eee, bbox=df_buf, layer='Renouée') data4 = open_gpkg(Path0+p_press+c_cd38_eee, bbox=df_buf, layer='Ambroisie') data = gpd.pd.concat([data1[['geom']],data2[['geom']],data3[['geom']],data4[['geom']]],ignore_index=True) data.reset_index(inplace=True) tmp = gpd.sjoin(df,data[['geom']],predicate='intersects', how='left') lst = tmp.loc[~tmp.index_right.isna(),'id_site'].tolist() df_buf['ssind_vulnerab'] = 0 df_buf.loc[df_buf.id_site.isin(lst),'ssind_vulnerab'] = 1 # get_sicen2 get observation in bbox of df_buff who contains lst_term lst_term = [ 'Solidago gigantea','Reynoutria','Buddleja davidii', 'Impatiens glandulifera','Ambrosia artemisiifolia'] from shapely.geometry import box from geopandas import read_postgis from pycen.params import sicen_con as con bbox = box(*df_buf.total_bounds) geom_col = 'geom' schema_sicen = 'saisie' table_sicen = 'vm_synthese_observations' sql = """SELECT id_obs, date_obs, regne, classe, ordre, nom_latin, nom_vern, geom FROM {sch}.{tab} WHERE ST_Intersects ({geom_col}, 'SRID={epsg};{poly}') AND nom_latin LIKE ANY (array{array});""".format( sch=schema_sicen, tab=table_sicen, array = ['%%{}%%'.format(t) for t in lst_term], geom_col=geom_col, epsg=df_buf.crs.srs.split(':')[1], poly=bbox ) sicen = read_postgis( sql = sql, con = con) data3 = sicen.unary_union df_buf.loc[df_buf.ssind_vulnerab == 0, 'ssind_vulnerab'] = df_buf[df_buf.ssind_vulnerab == 0].intersects(data3).astype(int) df = df.merge(df_buf[['id_site','ssind_vulnerab']], on=['id_site'],how='left') return df def press_urba(df): ''' Calcul de l'indice de pression directe: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ind_pressURBAN' : sum( ssind_artif + ssind_urban + ssind_agrico + ssind_plu + ssind_confli + ssind_prlvmteau + ssind_icpe + ssind_ouvrag + ssind_vulnerab ) ''' df = pression.artificialisation(df,pres_type='both') df = pression.urbanisation(df,pres_type='both') df = pression.projet_plu(df) # df = pression.conflit_redi(df) # df = pression.icpe(df) # df = pression.prelev_eau(df) # df = pression.ouvrage(df) # df = pression.vulnerabilite(df) df['ind_pressURBAN'] = df[ ['ssind_artif','ssind_urban', 'ssind_plu']].sum(axis=1) df.name = 'Pression_urbanisation' return df def press_agri(df): ''' Calcul de l'indice de pression indirecte: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ind_pressAGRI' : sum( ssind_agrico + ssind_alpage ) ''' df = pression.pressAgricole(df,pres_type='both') df = pression.alpage(df) # df = pression.dir_exp(df) df['ind_pressAGRI'] = df[ ['ssind_agrico','ssind_alpage' '']].sum(axis=1) df.name = 'Pression_agricole' return df def press_autre(df): ''' Calcul de l'indice d'autres pressions: Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ind_pressAGRI' : sum( ssind_ouvrag + ssind_barage + ssind_confli + ssind_prlvmteau + ssind_scot + ssind_icpe + ssind_direxp ) ''' df = pression.conflit_redi(df) df = pression.barrage(df) df = pression.ouvrage(df) df = pression.prelev_eau(df) df = pression.icpe(df) try: df = pression.dir_exp(df) except : print('No dir_exp file') calc_ind = [] if 'ssind_confli' in df.columns: calc_ind += ['ssind_confli'] if 'ssind_barage' in df.columns: calc_ind += ['ssind_barage'] if 'ssind_ouvrag' in df.columns: calc_ind += ['ssind_ouvrag'] if 'ssind_prlvmteau' in df.columns: calc_ind += ['ssind_prlvmteau'] if 'ssind_icpe' in df.columns: calc_ind += ['ssind_icpe'] if 'ssind_direxp' in df.columns: calc_ind += ['ssind_direxp'] # df = pression.dir_exp(df) df['ind_pressAutr'] = df[ calc_ind].sum(axis=1) df.name = 'Pression_autre' return df def dir_exp(df): ''' Ajout d'un champ dir_exp dans le tableau de sortie qui sera à remplir manuellement par celui-ci : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ssind_direxp' : None ''' df = df.copy() data = gpd.pd.read_csv(P_expert + c_expert,sep=';') data = data[['site_code','Note_DE_pression']] data.columns = ['site_code','ssind_direxp'] data.ssind_direxp = data.ssind_direxp.astype(float) df = df.merge(data,right_on='site_code',left_on='id_site') del df['site_code'] # df['ssind_direxp'] = None return df def bilan(df): ''' Rassemblement des sous-indices des criètes « pression » dans un même tableau. L'indice pourra être calculé lorque le champs dir_exp sera remplie : Parameters ---------- df : GeoDataFrame. GeoDataFrame des zones humides de l'étude. Return ---------- 'ind_pression' : sum( ind_pressURBAN + ind_pressAGRI ) ''' df = pression.press_urba(df) df = pression.press_agri(df) df = pression.press_autre(df) # df['ssind_total'] = df[ # ['ssind_artif','ssind_urban','ssind_plu' # 'ssind_agrico','ssind_alpage', # 'ssind_confli','ssind_barage','ssind_ouvrag', # 'ssind_prlvmteau','ssind_icpe''ssind_direxp'] # ].sum(axis=1) df['ind_pression'] = df[ ['ind_pressURBAN','ind_pressAGRI','ind_pressAutr'] ].sum(axis=1) df.name = 'Pression' return df def priorisation(data,titre,fct,pss): data = data.copy() data[titre] = None # data.loc[(data[fct]=='fort')&(data[pss]=='fort'),titre] = 'P1' # data.loc[(data[fct]=='fort')&(data[pss]=='moyen'),titre] = 'P2' # data.loc[(data[fct]=='moyen')&(data[pss]=='fort'),titre] = 'P2' # data.loc[(data[fct]=='moyen')&(data[pss]=='moyen'),titre] = 'P2' # data.loc[(data[fct]=='faible')&(data[pss]=='faible'),titre] = 'P4' data.loc[(data[fct]=='faible')&(data[pss]=='fort'),titre] = 'P1' data.loc[(data[fct]=='moyen')&(data[pss]=='fort'),titre] = 'P1' data.loc[(data[fct]=='fort')&(data[pss]=='fort'),titre] = 'P3' data.loc[(data[fct]=='faible')&(data[pss]=='moyen'),titre] = 'P2' data.loc[(data[fct]=='moyen')&(data[pss]=='moyen'),titre] = 'P2' data.loc[(data[fct]=='fort')&(data[pss]=='moyen'),titre] = 'P4' data.loc[(data[fct]=='moyen')&(data[pss]=='faible'),titre] = 'P4' data.loc[data[titre].isna(),titre] = 'P5' return data[titre] if __name__ == '__main__': from datetime import datetime as dt from pandas import read_table from pycen import zh from os import listdir zh = zh() # Récupération de la liste des zones concernées init = dt.now() zone = gpd.read_file(Path0+Path_tmp+"../Zone d'étude/Perimetre etude/PGSZH_bell_zonage.shp") v_zh = zh.v_zoneshumides() is_etude = v_zh.intersects(zone.unary_union) lst_idsite = v_zh[is_etude].site_code.tolist() # lst_idsite = zone.site_code.tolist() sit = zh.get_sitesGeom(id_site=lst_idsite, last_update=True) # sit = zh.get_sitesGeom(last_update=True) if not sit.is_valid.all(): sit.loc[~sit.is_valid, 'geom'] = sit.loc[~sit.is_valid].buffer(0) df = sit[['id_site', 'geom']].copy() # Définition des pressions et fonctions de l'étude df_bio = fct_bio_eco.bilan(df) print((dt.now() - init).total_seconds()) df_hyd = fct_hyd.bilan(df) print((dt.now() - init).total_seconds()) df_phy = fct_phy_bio.bilan(df) print((dt.now() - init).total_seconds()) # df_mlt = crit_multi_fct.bilan(df) # print((dt.now() - init).total_seconds()) df_pre = pression.bilan(df) print((dt.now() - init).total_seconds()) lst_df = [df_bio,df_hyd,df_phy,df_pre] bilan = sit[['id_site']].copy() for d in lst_df: ind_col = d.columns[d.columns.str.startswith('ind')] bilan = bilan.merge(d[['id_site', *ind_col]], on=['id_site']) cols_ind = bilan.columns[bilan.columns.str.startswith('ind')] ind_pres = df_pre.columns[df_pre.columns.str.startswith('ind')] bilan['ind_fct'] = bilan[cols_ind.drop(ind_pres)].sum(axis=1) bilan['indice'] = bilan[['ind_fct','ind_pression']].sum(axis=1) bilan.name = 'Bilan' print((dt.now() - init).total_seconds()) # jenks(data, col, labels) # Normalisation des notes via la méthode de classification de jenks enjeux = bilan[['id_site']].copy() enjeux['clss_bioeco'] = jenks(bilan,'ind_bioeco',['faible','moyen','fort']) enjeux['clss_hydro'] = jenks(bilan,'ind_hydro',['faible','moyen','fort']) enjeux['clss_phybio'] = jenks(bilan,'ind_phybio',['faible','moyen','fort']) # enjeux['clss_multifct'] = jenks(bilan,'ind_multifct',['faible','moyen','fort']) # enjeux['clss_pressURBAN'] = jenks(bilan,'ind_pressURBAN',['faible','moyen','fort']) # enjeux['clss_pressAGRI'] = jenks(bilan,'ind_pressAGRI',['faible','moyen','fort']) # enjeux['clss_pressAutr'] = jenks(bilan,'ind_pressAutr',['faible','moyen','fort']) enjeux['clss_fct'] = jenks(bilan,'ind_fct',['faible','moyen','fort']) enjeux['clss_pression'] = jenks(bilan,'ind_pression',['faible','moyen','fort']) # Priorisation des enjeux enjeux['enjeu_bioeco'] = priorisation(data=enjeux,titre='enjeu_bioeco',fct='clss_bioeco',pss='clss_pression') enjeux['enjeu_hydro'] = priorisation(data=enjeux,titre='enjeu_hydro',fct='clss_hydro',pss='clss_pression') enjeux['enjeu_phybio'] = priorisation(data=enjeux,titre='enjeu_phybio',fct='clss_phybio',pss='clss_pression') enjeux['enjeu_bilan'] = priorisation(data=enjeux,titre='enjeu_bilan',fct='clss_fct',pss='clss_pression') enjeux.name = 'Enjeux' # Récupération des bornes de chaques classes cols_enjeu = enjeux.columns cols_enjeu = cols_enjeu[cols_enjeu.str.contains('enjeu')] borne = gpd.pd.DataFrame( columns=enjeux.columns.drop(['id_site',*cols_enjeu]), # ,'enjeu_bilan' index=['faible','moyen','fort']) for col in borne.columns: sfx = col.split('_')[1] tmp = gpd.pd.merge( bilan[['id_site','ind_'+sfx]], enjeux[['id_site','clss_'+sfx]], on = 'id_site' ) for idx in borne.index: borne.loc[borne.index==idx,col] = str([ tmp[tmp[col]==idx]['ind_'+sfx].min(), tmp[tmp[col]==idx]['ind_'+sfx].max() ]) borne.index.name = 'classe' borne.reset_index(inplace=True, drop=False) borne.name = 'Borne des classes enjeux' # Constitution d'un dictionnaire de tableau dict_dfs = {} for ddf in [borne, enjeux, bilan, *lst_df]: dict_dfs[ddf.name] = ddf # Ecriture du multi-tableau des résultats print((dt.now() - init).total_seconds()) pycen.write_bilan(dict_dfs, PATH_OUT) # Jointure des géometries sur Bilan et Enjeux bilan = bilan.merge(df,how='left',on='id_site') bilan = bilan.set_geometry('geom') bilan.rename(columns={'id_site':'site_code'}, inplace=True) bilan.name = 'Bilan' enjeux = enjeux.merge(df,how='left',on='id_site') enjeux = enjeux.set_geometry('geom') enjeux.rename(columns={'id_site':'site_code'}, inplace=True) enjeux.name = 'Enjeux' # Ecriture du géopackage df_bio.to_file(PATH_OUT[:-4]+'gpkg', layer=df_bio.name,driver='GPKG') df_hyd.to_file(PATH_OUT[:-4]+'gpkg', layer=df_hyd.name,driver='GPKG') df_phy.to_file(PATH_OUT[:-4]+'gpkg', layer=df_phy.name,driver='GPKG') # df_mlt.to_file(PATH_OUT[:-4]+'gpkg', layer=df_mlt.name,driver='GPKG') df_pre.to_file(PATH_OUT[:-4]+'gpkg', layer=df_pre.name,driver='GPKG') enjeux.to_file(PATH_OUT[:-4]+'gpkg', layer=enjeux.name,driver='GPKG') bilan.to_file( PATH_OUT[:-4]+'gpkg', layer=bilan.name, driver='GPKG') from sys import exit print('') print((dt.now() - init).total_seconds()) exit('END PGZH')