429 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import geopandas as gpd
from shapely import wkt
from shapely.ops import nearest_points,Polygon,Point,LineString
from os import path,listdir
PATH = '/home/colas/Documents/9_PROJETS/6_GEONATURE/MIGRATION/PLATIERE/4_JDD_Faune_Mailles_A_Importer'
FILE_DATA = '(CNPE) Chrono-inventaires entomologiques Orthopteres.csv'
FILE_DATA = '(CNPE) Observations Opportunistes Orthopteres.csv'
FILE_EF_0 = 'Ensembles_Fonctionnels_Platière_Lambert93.shp'
FILE_GRID = 'grillegographiquecomplte/Grille_Geographique_Lignes_Complete_WGS84.shp'
dictef_cols = {
'code_ensemble':'code_ensem',
**dict.fromkeys(['ensemble_fonctionnel','nom_ensemble'],'nom_ensemb'),
}
ef0 = gpd.read_file(path.join(PATH,'..',FILE_EF_0),encoding='UTF-8')
ef1 = gpd.read_file(path.join(PATH,'..','ensemblefonctionnelscomplte','ensemble fonc baix.TAB'),encoding='UTF-8')
ef2 = gpd.read_file(path.join(PATH,'..','ensemblefonctionnelscomplte','ensemble fonc donzère.TAB'),encoding='UTF-8')
ef3 = gpd.read_file(path.join(PATH,'..','ensemblefonctionnelscomplte','Ensembles fonctionnels montélimar.TAB'),encoding='UTF-8')
ef4 = gpd.read_file(path.join(PATH,'..','ensemblefonctionnelscomplte','Secteurs obs oiseaux montélimar.TAB'),encoding='UTF-8')
for e in [ef1,ef2,ef3,ef4]:
e.columns = e.columns.str.lower()
e.rename(columns=dictef_cols,inplace=True)
if e.crs.srs.lower() != 'epsg:2154':
e.to_crs('epsg:2154',inplace=True)
ef = (gpd.pd.concat([ef0,ef1,ef2,ef3,ef4])
.astype({'code_ensem':str})
.sort_values(['nom_ensemb'],ignore_index=True)
.drop_duplicates(ignore_index=True))
ef.to_file(path.join(PATH,'..','ensemblefonctionnelscomplte','Ensembles_Fonctionnels_Platière_Lambert93_ALL.geojson'),driver='GeoJSON')
data = gpd.pd.read_csv(path.join(PATH,FILE_DATA),sep=";",encoding='Windows 1252') # ['ASCII','Windows 1252']
dict_col = {
**dict.fromkeys(['longitude autre'],'Long autre'),
**dict.fromkeys(['latitude autre'],'Lat autre'),
**dict.fromkeys([' Longitude grade ','LONGITUDE'],'Longitude grade'),
**dict.fromkeys([' Latitude grade ','LATITUDE'],'Latitude grade'),
**dict.fromkeys(['latitude autre'],'Lat autre'),
**dict.fromkeys(['Numéro relevé','Numéro de relevé'],'Numéro'),
}
dict_ef = {
'Prairie ancienne pépinière CNR':'Ancienne pépinière CNR',
'Marais méandre des Oves':'Marais des Oves',
'Prairie des Iles (Péage)':'Prairie des Iles',
'Vieux Rhône Péage amont seuil':'Vieux Rhône amont seuil Sablons',
'Lône de la Platière':'Lone Platière',
'Banc Arcoules (banc1)':'Banc Arcoule (banc 1)',
'Plaine St Marice rive gauche canal':'Plaine St Maurice rive gauche canal',
'Casiers île des Graviers':'Casiers aval observatoire',
'Prairie violettes (vers Marais des Oves)':'Prairie Violettes',
'Forêt nord des Oves':'Forêt Nord Oves',
'Digue neuve et entonnement de la lône':'Digue neuve et entonnement',
'Marais méandre des Oves':'Marais des Oves',
'Banc et ilots aval seuil Peyraud':'banc et ilots aval seuil Peyraud',
'Casiers banc 3':'casiers banc 3',
'Casiers RD Limony/Serrières':'Casiers RD Limony Serrieres',
'Forêt Pointe sud':'Forêt pointe Sud',
'Gravière des Rotissots (CORA)':'Gravière des Rotissots(CORA)',
'Lone des îles':'Boisements MICRONAT',
"Lône de l'Ilon":'Lône Ilon',
'Lône du Bayard (Donzère)':'Lône du Bayard',
'Lône du Noyer nord':'Lône Noyer Nord',
'Lône du Noyer sud':'Lône Noyer Sud',
'Lone de la Roussette':'Lône de la Roussette',
"Plan d'eau St Pierre de Bœuf":"Plan d'eau St Pierre de Boeuf",
'Prairie SW +extS+ zone embroussaillée':'Prairie SW + ext Sud + zone embrousaillée',
'Remblais Ecluse Sablons':'Remblais usine ecluse',
'Roselière et marais St-Maurice':'Roselière et marais St Maurice',
'Vieux Rhône Péage aval seuil Sablons':'Vieux Rhône aval Sablons',
'Zone cultivée nord':'Zone cultivée Nord',
'Contre canal RD canal Péage':'Contre-canal RD Péage',
'Aval Ruisseau de Limony':'Foret Limony',
'Forêts des grandes Oves':'Forêt des Grandes Oves',
'Ile du Noyer nord':'Ile Noyer Nord',
"Lone de l'ile de Peyraud":"Lône de l'ile de Peyraud",
'Casiers banc 3':'casiers banc 3',
'Foret Ile Bugnon':'Forêt Ile Bugnon',
'Graviere des Rotissots (Commune)':'Gravière Rotissots (Commune)',
'Plaine de St Pierre de Bœuf':'Plaine St Pierre de Boeuf',
'Plaine nord Serrières':'Plaine Nord Serrières',
'Zone cultivée sud':'Zone cultivée Sud',
'Casiers Boussarde':'casiers Boussarde',
'Casiers aval seuil Peyraud RG':'Casiers aval seil Peyraud RG',
'Contre-canal rive gauche Péage':'Contre-canal RG Péage',
'Ile du Noyer sud + boisement ouest':'Ile Noyer Sud + boisement ouest',
'Lone de la Barcasse':'Lône de la Barcasse',
'Ruisseau Dolon':'Ruisseau Sanne Dolon', # c'est bon !?
'la Payre, cours aval + cc RD amont':'La Payre, cours aval + CC RD amont',
'Lône de la quarantaine':'Lône de la Quarantaine',
"Plan d'eau des Blaches":"plan d'eau des Blaches",
'Lone du Gouvernement':'Lône du Gouvernement',
'Anc. Gravière CNR Amont RCC Donzère':'Anc Gravière CNR Amont RCC', # ?
'Zone réhabilitée traitement granulat RCC Baix':'Zone réhabilitée traitement granulat', # ?
# 'Banc 1 RCC Donzère',
# 'Butte (Arras sur Rhône)',
# 'Canaux et ruisseaux Ancone',
# 'Canaux et ruisseaux Petits Robins',
# 'Casiers Ile du Chambon',
# 'Etang de la Gare',
# 'Forêt Limony (hors RN)',
# 'Graviere Ancone',
# 'Gravières Cruas',
# 'Le Petit-Rhône de la Voulte/Livron',
# 'Lone Ancone',
# 'Prairie ile du Chambon',
# 'Prairie sèche "Picard", St Gervais/Roubion',
# 'Rivière Roubion',
# 'Ruisseau de Limony (gorges)',
# 'Ruisseau de la Riaille',
# 'Ruisseau du Lumas',
# 'Ruisseau ile du Chambon',
# 'Casiers RCC Gervans':,
# 'Forêt Limony (hors RN)':,
# 'coteaux rive droite de St Pierre à Champagne':,
# 'Butte (Arras sur Rhône)':,
# 'Prairie ile du Chambon':,
# 'Ravin de Serrières':,
# 'ruisseau de Peyraud (gorges)':,
# 'Rivière Roubion':'Roubion',
# 'Banc 2 RCC Donzère',
# 'Canaux et ruisseaux Petits Robins',
# 'Contre canal RG Planaris',
# 'Contre canaux Cruas',
# 'Etang Chanson (Chavanay)',
# 'Etangs Verlieux',
# 'Forêt alluviale Roubion',
# 'Grand val (Chavanay)',
# 'Graviere Ancone',
# 'Lac des Sauzets',
# 'Le Petit-Rhône de la Voulte/Livron',
# 'Lone des Petits Robins',
# 'Marais "de Printegarde"',
# "Plan d'eau loisir Le Pouzin",
# 'Prairie sèche Pierrelatte',
# 'Retenue de Baix-Logis Neuf',
# 'Rivière Drôme aval ramières',
# 'Rivière Roubion',
# 'Ruisseau de Limony (gorges)',
# 'Ruisseau ile du Chambon',
# 'Vieux rhone de baix',
# 'Vieux-Rhône Donzère --> confluent Ardèche',
# 'ruisseau de Peyraud (gorges)',
}
def remove_special_char(obj,space=False):
dict_char = {
r'[]':"'",
r'[àáâãäå]':'.',
r'[èéêë]':'.',
r'[ìíîï]':'.',
r'[òóôõö]':'.',
r'[ùúûü]':'.',
r'[ -]':"",
r'[?|!^]':"."
}
if space:
dict_char = {**dict_char, **{r'[ ]':""}}
return obj.replace(dict_char,regex=True)
def exend_lines(line,geom):
if line.geom_type == 'MultiLineString':
mini = gpd.GeoSeries(list(line.geoms),crs=2154).distance(geom).min()
line = [l for l in line.geoms if l.distance(geom) == mini][0]
x,y = line.coords.xy
ysup = y[-1] - y[0] if y[0] < y[-1] else y[0] - y[-1]
xsup = x[-1] - x[0] if x[0] < x[-1] else x[0] - x[-1]
x[0] = x[0] + xsup if x[0] > x[-1] else x[0] - xsup
x[-1] = x[-1] - xsup if x[0] > x[-1] else x[-1] + xsup
y[0] = y[0] + ysup if y[0] > y[-1] else y[0] - ysup
y[-1] = y[-1] - ysup if y[0] > y[-1] else y[-1] + ysup
return LineString(zip(x,y))
def test_data_ok(df,envelop):
test_data_ok = gpd.GeoDataFrame(
df,
geometry=gpd.points_from_xy(df['Long autre'],df['Lat autre']),
crs=27572
).to_crs(epsg=2154)
test_enveloppe = test_data_ok.intersects(envelop)
is_ok = test_data_ok[test_enveloppe].copy()
not_ok = test_data_ok[~test_enveloppe].copy()
not_ok['Lat autre'] = None
not_ok['Long autre'] = None
return is_ok.drop('geometry',axis=1),not_ok.drop('geometry',axis=1)
def get_nearest_values(row, other_gdf, grid, point_column='geometry'):
"""Find the nearest point and return the corresponding value from specified value column."""
# Recréation de la maille
lonG = row['Latitude grade']
latG = row['Longitude grade']
long = grid[grid.Champ1 == lonG].unary_union
lat = grid[grid.Champ1 == latG].unary_union
longSup = grid[grid.Champ1 == round((lonG+0.0025),4)].unary_union
latSup = grid[grid.Champ1 == round((latG+0.0025),4)].unary_union
if longSup is None:
longInf = grid[grid.Champ1 == round((lonG-0.0025),4)].unary_union
d = long.distance(longInf)
x,y = long.xy
y[0] = y[0] + d
y[-1] = y[-1] + d
longSup = LineString(zip(x,y))
p1 = long.intersection(lat)
while p1.is_empty:
long = exend_lines(long,lat)
lat = exend_lines(lat,long)
p1 = long.intersection(lat)
p2 = longSup.intersection(lat)
while p2.is_empty:
longSup = exend_lines(longSup,lat)
lat = exend_lines(lat,longSup)
p2 = longSup.intersection(lat)
p3 = longSup.intersection(latSup)
while p3.is_empty:
longSup = exend_lines(longSup,latSup)
latSup = exend_lines(latSup,longSup)
p3 = longSup.intersection(latSup)
p4 = long.intersection(latSup)
while p4.is_empty:
long = exend_lines(long,latSup)
latSup = exend_lines(latSup,long)
p4 = long.intersection(latSup)
pointList = [p1, p2, p3, p4, p1]
for i,p in enumerate(pointList):
if p.geom_type == 'MultiPoint':
for pp in list(p.geoms):
if p.distance(row.geom) == row.geom.distance(pp):
pointList[i] = pp
poly = gpd.GeoSeries(Polygon([[p.x, p.y] for p in pointList]).buffer(0),crs=2154)
# Filtre de l'ensemble fonctionnel
other_geom = other_gdf.geometry.name
other_gdf_filter = other_gdf[other_gdf.nom_ensemb == row.esmb].copy()
if other_gdf_filter.contains(row.geom).any():
# pass
return row.geom
elif other_gdf_filter.intersection(poly.unary_union).is_empty.all():
pass
else:
other_gdf_filter[other_geom] = other_gdf_filter.intersection(poly.unary_union)
# Create an union of the other GeoDataFrame's geometries:
other_points = other_gdf_filter[other_geom].unary_union
# Find the nearest points
nearest_geoms = nearest_points(row[point_column], other_points)
# Get corresponding values from the other df
# nearest_data = other_gdf.loc[other_gdf[other_geom] == nearest_geoms[1]]
# nearest_value = nearest_data[value_column].get_values()[0]
return nearest_geoms[1]
def create_grid():
grid = gpd.read_file(path.join(PATH,'..',FILE_GRID))
grid.rename(columns={'Coordonné':'Champ1'},inplace=True)
if grid.crs.srs.lower()!='epsg:2154':
grid.to_crs(epsg=2154,inplace=True)
if grid['Champ1'].dtype == object:
grid['Champ1'] = grid['Champ1'].str.replace(',','.').astype(float)
grid = grid.dissolve(by='Champ1',as_index=False)
# Filtre Lat/Long
long = grid[grid.Champ1 < 25].copy()
lat = grid[grid.Champ1 > 25].copy()
# Identification du point d'intersection
grd = gpd.GeoSeries(list(long.unary_union.intersection(lat.unary_union).geoms),crs=2154)\
.to_frame()\
.rename_geometry('geom')
# Récuperation des coordonnées x et y
grd['x'] = grd.geom.x.copy()
grd['y'] = grd.geom.y.copy()
grd = grd.sort_values(['y','x'],ascending=[True,False]).reset_index(drop=True)
x0 = (grd.x[1]-grd.x[0])/2
y0 = (grd.y[1]-grd.y[0])/2
grd = grd.sort_values(['x','y'],ascending=[True,False]).reset_index(drop=True)
x1 = (grd.x[1]-grd.x[0])/2
y1 = (grd.y[0]-grd.y[1])/2
X = x0+x1
Y = y0+y1
return grid,X,Y
if __name__ == "__main__":
lst_file = listdir(PATH)
lst_file.sort()
for FILE_DATA in lst_file[22:]:
grid,X,Y = create_grid()
envlop = grid.geometry.unary_union.envelope.buffer(10000)
data = gpd.pd.read_csv(path.join(PATH,FILE_DATA),sep=";",encoding='Windows 1252') # ['ASCII','Windows 1252']
data.dropna(how='all',inplace=True)
data.rename(columns=dict_col,inplace=True)
print('File :',FILE_DATA)
# Si Long/Lat est un champ text, transformation des colonnes en nombre.
if ('Long autre' in data.columns) :
if (data['Long autre'].dtype == object):
data['Long autre'] = data['Long autre'].str.replace(',','.').astype(float)
if ('Lat autre' in data.columns) :
if data['Lat autre'].dtype == object:
data['Lat autre'] = data['Lat autre'].str.replace(',','.').astype(float)
if data['Longitude grade'].dtype == object:
data['Longitude grade'] = data['Longitude grade'].str.replace(',','.').astype(float)
if data['Latitude grade'].dtype == object:
data['Latitude grade'] = data['Latitude grade'].str.replace(',','.').astype(float)
data.loc[data['Lat autre']==0,'Lat autre'] = None
data.loc[data['Long autre']==0,'Long autre'] = None
if not ef.is_valid.all():
ef.geometry = ef.make_valid()
# Isolement des données précises
data_ok = data.loc[
data['Long autre'].notna() & data['Lat autre'].notna()
].copy()
if data.shape[0] == data_ok.shape[0]:
continue
data_ok, data_not_ok = test_data_ok(data_ok,envlop)
df = gpd.pd.concat([
data.loc[
data['Long autre'].isna()&data['Lat autre'].isna()
].copy(),
data_not_ok]
).sort_values('Numéro').reset_index() # 'Numero'
if data.shape[0] == data_ok.shape[0]:
continue
# Data to GéoData
gdf_ok = gpd.GeoDataFrame(
data_ok,
geometry=gpd.points_from_xy(data_ok['Long autre'],data_ok['Lat autre']),
crs=27572
).to_crs(epsg=2154)
# Correspondance [ Grades - Grid ]
DICT_GRID = dict(zip(grid.Champ1,grid.geometry.to_wkt()))
# Replace Grade by L93
df[['Lat93','Long93']] = df[['Latitude grade','Longitude grade']].replace(DICT_GRID)
# Reconstruction du GéoDataframe
gdf = gpd.GeoDataFrame(
df.copy(),
geometry=gpd.GeoSeries.from_wkt(df['Lat93'])\
.intersection(gpd.GeoSeries.from_wkt(df['Long93'])),
crs=2154
)
geom_empty = gdf.geometry.is_empty
if geom_empty.any():
for i,row in gdf.loc[geom_empty].iterrows():
while wkt.loads(row['Lat93']).intersection(wkt.loads(row['Long93'])).is_empty:
row['Lat93'] = exend_lines(
wkt.loads(row['Lat93']),
wkt.loads(row['Long93'])
).wkt
row['Long93'] = exend_lines(
wkt.loads(row['Long93']),
wkt.loads(row['Lat93'])
).wkt
gdf.loc[i,'geometry'] = wkt.loads(row['Lat93']).intersection(wkt.loads(row['Long93']))
gdf.rename_geometry('geom',inplace=True)
gdf['x'] = gdf.geom.x + X
gdf['y'] = gdf.geom.y + Y
gdf.set_geometry(gpd.points_from_xy(gdf.x,gdf.y),inplace=True)
# Test
ef_exp = ef.explode(index_parts=True)
# [
# near(geom,ldt,ldt_union,'id_lieu_dit')
# for geom in df.geometrie
# ]
# gdf['Code ensemble fonct']
gdf['esmb'] = remove_special_char(
gdf['Ensemble fonctionnel'].replace(dict_ef)
)
ef_exp.nom_ensemb = remove_special_char(ef_exp.nom_ensemb)
gdf['nearest_point'] = gdf.apply(get_nearest_values, other_gdf=ef_exp, grid=grid, point_column="geom", axis=1)
gdf.drop(['geom','esmb'],axis=1,inplace=True)
gdf.set_geometry('nearest_point',crs=2154,inplace=True)
gdf.rename_geometry('geometry',inplace=True)
if not gdf_ok.empty:
gdf = gpd.pd.concat([gdf,gdf_ok])
FILE_OUT = FILE_DATA.replace('.csv','_reprojected.csv')
gdf.to_csv(path.join(PATH,'..','RESULT',FILE_OUT),sep=';',encoding='Windows 1252',index=False)
FILE_OUT = FILE_OUT.replace('-','_').replace(' ','_')
FILE_OUT = FILE_OUT.replace('(','').replace(')','')
(gdf
.to_file(path.join(PATH,'..','RESULT','result_2.gpkg'),driver='GPKG',layer=FILE_OUT.replace('-','_')))