#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Merge the ``ZHP*`` vector layers found in ``PATH`` and publish them.

When run as a script, every ``.shp`` / ``.gpkg`` file of ``PATH`` whose name
starts with ``ZHP`` is read, the layers are concatenated, duplicated
geometries are dropped (keeping the most recently modified record), the
free-text ``descripti2`` column is split into ``statut``, ``source``,
``type`` and ``comment`` columns, and the result replaces the
``zones_humides.zh_ponctuelles`` PostGIS table.
"""

import geopandas as gpd
from pycen import con
from os import path, listdir
from re import findall

PATH = '/media/colas/SRV/FICHIERS/OUTILS/CARTOGRAPHIE/ESPACE DE TRAVAIL/ETUDES/Zones humides'

# Names of the layers to merge: shapefiles or GeoPackages prefixed with "ZHP".
lst_layer = [
    x for x in listdir(PATH)
    if (x.endswith('.shp') or x.endswith('.gpkg')) and x.startswith('ZHP')
]


def upper_first_letter(s):
    """Return *s* with its first character upper-cased.

    ``None`` / NaN are returned unchanged, and so is the empty string
    (``s[:1]`` avoids the IndexError that ``s[0]`` raises on ``''``).
    """
    if s is None or gpd.pd.isna(s):
        return s
    return s[:1].upper() + s[1:]


def manual_replace(s, old, new, index):
    """Replace the character at position *index* of *s* by *new* if it is *old*.

    Parameters
    ----------
    s : str or None or NaN
        Input string; missing values are returned unchanged.
    old : str
        Character expected at ``s[index]``.
    new : str
        Replacement text (may be empty to delete the character).
    index : int
        Position to test; negative values count from the end.

    Returns
    -------
    The modified string, or *s* unchanged when it is missing, when *index*
    falls outside the string (e.g. empty string) or when the character at
    *index* is not *old*.
    """
    if s is None or gpd.pd.isna(s):
        return s
    size = len(s)
    if not -size <= index < size:
        # Out of range (always the case for ''): nothing to replace.
        return s
    if index < 0:
        # Normalise so that the slices below are correct for negative indices.
        index += size
    if s[index] != old:
        return s
    return s[:index] + new + s[index + 1:]


def _replace_last(s, old, new):
    """Apply ``manual_replace`` to the last character; missing values pass through."""
    if s is None or gpd.pd.isna(s):
        return s
    return manual_replace(s, old, new, len(s) - 1)


def extract_info(df, value):
    """Pick, for each row, the cell of columns 0-4 matching the regex *value*.

    The match is done on the lower-cased cell content. Columns are scanned
    in increasing order and a later matching column overwrites an earlier
    one. The result is stored in (and overwrites) column ``5`` of *df*,
    which is therefore modified in place.

    Columns among 0-4 that are absent from *df* are skipped.

    Returns
    -------
    pandas.Series
        Column ``5`` with surrounding whitespace stripped; rows without any
        match hold ``None``.
    """
    matches = [
        (col, df[col].str.lower().str.contains(value, na=False))
        for col in range(5)
        if col in df.columns
    ]
    df[5] = None
    for col, mask in matches:
        if mask.any():
            df.loc[mask, 5] = df.loc[mask, col]
    return df[5].str.strip()


def extract_descripti2(df):
    """Add ``statut`` and ``source`` columns extracted from the split text.

    *df* is expected to be the frame produced by splitting ``descripti2``
    into integer-labelled columns; it is modified in place and returned.
    """
    df['statut'] = extract_info(df, 'isit|possible ?|absence zh|non cartographier')
    df['source'] = extract_info(df, 'photointe|ortho|scan|information communale')
    return df


if __name__ == "__main__":

    # Read every layer first and concatenate once, instead of growing an
    # initially empty GeoDataFrame inside the loop.
    frames = []
    for layer in lst_layer:
        print(layer)
        frames.append(gpd.read_file(path.join(PATH, layer)))
    if not frames:
        raise FileNotFoundError("No 'ZHP*' .shp/.gpkg layer found in %s" % PATH)
    gdf = gpd.pd.concat(frames)

    gdf['DATEMODIF'] = gpd.pd.to_datetime(gdf['DATEMODIF'], format='mixed')
    gdf.columns = gdf.columns.str.lower()
    # Most recent modification first, so that drop_duplicates keeps it.
    gdf.sort_values(['datemodif'], inplace=True, ignore_index=True, ascending=False)
    # Column names were lower-cased just above, so the coordinate columns are
    # 'x'/'y' at this point (dropping 'X'/'Y' could never match anything).
    gdf.drop(columns=['x', 'y'], inplace=True, errors='ignore')
    gdf.rename_geometry('geom', inplace=True)
    gdf.drop_duplicates(subset='geom', inplace=True, ignore_index=True, keep='first')

    # NOTE(review): the str.replace patterns below contain '|' and '?' and
    # are presumably meant literally (default regex=False of recent pandas);
    # confirm against the installed pandas version.
    # '/' is turned into the field separator '|', then the two known dates
    # are restored, before splitting on '|' or ' - '.
    tmp = (gdf.descripti2
        .str.replace('/','|')
        .str.replace('06|19','06/19')
        .str.replace('04|12|2019','04/12/2019')
        .str.replace('vistier','visiter')
        .str.replace('AbsenceZH','Absence ZH')
        .str.replace('Zone humide non cartographiée et','non cartographier | ZH')
        .str.replace('continuité zh de la gère, à visiter','continuité zh de la gère | à visiter')
        .str.split('\\|| - ',expand=True)
        .replace('',None))

    res = extract_descripti2(tmp)
    gdf[['statut','source']] = res[['statut','source']]

    # Remaining text once the statut / source fragments (used as regex
    # patterns) are removed, followed by clean-up of the leading and
    # trailing separators. The order of the steps matters.
    gdf['comment'] = (gdf.descripti2
        .str.replace('vistier','visiter')
        .str.replace('AbsenceZH','Absence ZH')
        .str.replace('Zone humide non cartographiée et','non cartographier | ZH')
        .str.replace('continuité zh de la gère, à visiter','continuité zh de la gère | à visiter')
        .replace([x for x in gdf.statut.unique() if x is not None],'',regex=True)
        .replace([x for x in gdf.source.unique() if x is not None],'',regex=True)
        .str.strip()
        .replace('',None)
        .apply(lambda x: manual_replace(x,'|','',0)).str.strip()
        .apply(lambda x: manual_replace(x,'|','',0)).str.strip()
        .replace('',None)
        .apply(lambda x: manual_replace(x,'/','',0)).str.strip()
        .apply(lambda x: _replace_last(x,'|',''))
        .apply(lambda x: _replace_last(x,',',''))
        .replace('',None)
        .apply(lambda x: _replace_last(x,'/',''))
        .str.strip()
        .str.replace('| | ','|')
        .replace('',None)
        .apply(lambda x: _replace_last(x,'?',''))
        .str.strip()
        .apply(lambda x: _replace_last(x,'|',''))
        .str.strip()
        .str.replace('Extension zh amont - peupleraie','Peupleraie - extension zh amont')
        .str.replace("Zone d'effondrement - source","Source - zone d'effondrement")
    )

    gdf['statut'] = (gdf['statut']
        .apply(upper_first_letter)
        # Normalise to exactly one space before the question mark.
        .str.replace(' ?','?')
        .str.replace('?',' ?')
        .replace('À visiter','A visiter'))

    gdf['source'] = (gdf['source']
        .apply(upper_first_letter)
        .str.replace('Interprétation','Photointerprétation'))

    # First word of the comment; 'Prairie humide' is temporarily glued
    # together so that it survives the split on the first space.
    gdf['type'] = (gdf.comment
        .str.replace('Prairie humide','Prairiehumide')
        .str.split('\\|| ',n=1)
        .str[0]
        .str.replace('Prairiehumide','Prairie humide')
        .str.strip()
        .replace('Roselières','Roselière')
        .apply(upper_first_letter)
    )

    # The former filter read `x is not None or x.contains('INVZH')`: its
    # right-hand side could only run for x None and then raised
    # AttributeError, so only the None check is kept.
    # NOTE(review): `gdf.type` may resolve to geopandas' geometry-type
    # accessor rather than the 'type' column created above; left untouched
    # to preserve current results. Confirm which one is intended.
    gdf['comment'] = (gdf.comment
        .replace([x for x in gdf.type.unique() if x is not None],'',regex=True)
        .str.replace('Prairie humide','Prairiehumide')
        .str.split('\\|| ',n=1)
        .str[1]
        .str.replace('Prairiehumide','Prairie humide')
        .str.strip()
        .replace('',None)
        .apply(lambda x: manual_replace(x,'-','',0)).str.strip()
    )

    # Drop a leading 'et ' / 'de ' left over by the split.
    et = gdf.comment.str.startswith('et ',na=False)
    de = gdf.comment.str.startswith('de ',na=False)
    gdf.loc[et,'comment'] = gdf.loc[et,'comment'].str.replace('et ','',n=1)
    gdf.loc[de,'comment'] = gdf.loc[de,'comment'].str.replace('de ','',n=1)

    # Values containing 'INVZH' are moved from 'type' to 'comment'.
    invzh = gdf['type'].str.contains('INVZH',na=False)
    gdf.loc[invzh,'comment'] = gdf.loc[invzh,'type']
    gdf.loc[invzh,'type'] = None

    gdf.comment = (gdf.comment
        .str.strip()
        .replace('',None)
        .apply(upper_first_letter))

    gdf.to_postgis('zh_ponctuelles',con,'zones_humides',if_exists='replace',index=True,index_label='gid')

    sql = """
    ALTER TABLE zones_humides.zh_ponctuelles ADD PRIMARY KEY (gid);
    GRANT ALL ON TABLE zones_humides.zh_ponctuelles TO grp_admin;
    GRANT SELECT ON TABLE zones_humides.zh_ponctuelles TO grp_consult;
    """
    # NOTE(review): if `con` is a SQLAlchemy 2.x engine, a plain string may
    # need to be wrapped (sqlalchemy.text) before execute(); confirm.
    with con.begin() as cnx:
        cnx.execute(sql)

    # Read the table back from the database.
    test = """
    SELECT * FROM zones_humides.zh_ponctuelles;
    """
    data = gpd.pd.read_sql_query(
        sql = test,
        con = con)