149 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import geopandas as gpd
from shapely import wkt
from shapely.ops import nearest_points
from os import path
PATH = '/home/colas/Documents/9_PROJETS/6_GEONATURE/MIGRATION/PLATIERE/Envoi Colas Correction Données Mailles Par Les Ensembles Fonctionnels'
FILE_DATA = 'Lépido_Mailles_2.csv'
FILE_EF = 'Ensembles_Fonctionnels_Platière_Lambert93.shp'
FILE_GRID = 'Mailles_Platière_Lambert93.shp'
ef = gpd.read_file(path.join(PATH,FILE_EF),encoding='UTF-8')
grid = gpd.read_file(path.join(PATH,FILE_GRID))
data = gpd.pd.read_csv(path.join(PATH,FILE_DATA),sep=";",encoding='Windows 1252') # ['ASCII','Windows 1252']
def remove_special_char(obj,space=False):
dict_char = {
r'[]':"'",
r'[àáâãäå]':'.',
r'[èéêë]':'.',
r'[ìíîï]':'.',
r'[òóôõö]':'.',
r'[ùúûü]':'.',
r'[ ]':"",
r'[?|!^]':"."
}
if space:
dict_char = {**dict_char, **{r'[ ]':""}}
return obj.replace(dict_char,regex=True)
def nearest(row, geom_union, df1, df2, geom1_col='geometry', geom2_col='geometry', src_column=None):
"""Find the nearest point and return the corresponding value from specified column."""
# Find the geometry that is closest
nearest = df2[geom2_col] == nearest_points(row[geom1_col], geom_union)[1]
# Get the corresponding value from df2 (matching is based on the geometry)
value = df2[nearest][src_column].get_values()[0]
return value
def near(point,df2,geom_union,src_column):
# find the nearest point and return the corresponding Place value
geom2_col = df2.geometry.name
nearest = df2[geom2_col] == nearest_points(point, geom_union)[1]
# print(nearest)
# print(df2[nearest][src_column])
return df2[nearest][src_column].values[0]
if __name__ == "__main__":
# Si Long/Lat est un champ text, transformation des colonnes en nombre.
if data['Long autre'].dtype == object:
data['Long autre'] = data['Long autre'].str.replace(',','.').astype(float)
if data['Lat autre'].dtype == object:
data['Lat autre'] = data['Lat autre'].str.replace(',','.').astype(float)
if data['Longitude grade'].dtype == object:
data['Longitude grade'] = data['Longitude grade'].str.replace(',','.').astype(float)
if data['Latitude grade'].dtype == object:
data['Latitude grade'] = data['Latitude grade'].str.replace(',','.').astype(float)
if grid['Champ1'].dtype == object:
grid['Champ1'] = grid['Champ1'].str.replace(',','.').astype(float)
# Isolement des données précises
data_ok = data.loc[
data['Long autre'].notna() & data['Lat autre'].notna()
].copy()
df = data.loc[
data['Long autre'].isna()&data['Lat autre'].isna()
].copy().sort_values('Numéro').reset_index() # 'Numero'
# Data to GéoData
gdf_ok = gpd.GeoDataFrame(
data_ok,
geometry=gpd.points_from_xy(data_ok['Long autre'],data_ok['Lat autre']),
crs=2154
)
## Traitement des données Non-Précises
# Re-construction de la grille
long = grid[grid.Champ1 < 25].copy()
lat = grid[grid.Champ1 > 25].copy()
grd = gpd.GeoSeries(list(long.unary_union.intersection(lat.unary_union).geoms),crs=2154)\
.to_frame()\
.rename_geometry('geom')
grd['x'] = grd.geom.x.copy()
grd['y'] = grd.geom.y.copy()
grd = grd.sort_values(['y','x'],ascending=[True,False]).reset_index(drop=True)
x0 = (grd.x[1]-grd.x[0])/2
y0 = (grd.y[1]-grd.y[0])/2
grd = grd.sort_values(['x','y'],ascending=[True,False]).reset_index(drop=True)
x1 = (grd.x[1]-grd.x[0])/2
y1 = (grd.y[0]-grd.y[1])/2
X = x0+x1
Y = y0+y1
# test résultats sur grd
# grd = grd.sort_values(['y','x'],ascending=[True,False]).reset_index(drop=True)
# grd.x = grd.x + X
# grd.y = grd.y + Y
# grd.set_geometry(gpd.points_from_xy(grd.x,grd.y),inplace=True)
# grd.to_file(path.join(PATH,'result.gpkg'),driver='GPKG',layer='test_grid')
# Correspondance [ Grades - Grid ]
DICT_GRID = dict(zip(grid.Champ1,grid.geometry.to_wkt()))
# Replace Grade by L93
df[['Latitude grade','Longitude grade']] = df[['Latitude grade','Longitude grade']].replace(DICT_GRID)
# Reconstruction du GéoDataframe
gdf = gpd.GeoDataFrame(
df.copy(),
geometry=gpd.GeoSeries.from_wkt(df['Latitude grade'])\
.intersection(gpd.GeoSeries.from_wkt(df['Longitude grade'])),
crs=2154
)
gdf.rename_geometry('geom',inplace=True)
gdf['x'] = gdf.geom.x + X
gdf['y'] = gdf.geom.y + Y
gdf.set_geometry(gpd.points_from_xy(gdf.x,gdf.y),inplace=True)
#
# Test
# ef_exp = ef.explode(index_parts=True)
# [
# near(geom,ldt,ldt_union,'id_lieu_dit')
# for geom in df.geometrie
# ]
# gdf['Code ensemble fonct']
gdf['esmb'] = remove_special_char(gdf['Ensemble fonctionnel'])
ef.nom_ensemb = remove_special_char(ef.nom_ensemb)
ef[ef.nom_ensemb.isin(gdf.esmb)].shape
# gdf[~gdf.esmb.isin(ef.nom_ensemb)]
# gdf['Code secteur fonctionnel']
# df1.apply(nearest, geom_union=unary_union, df1=df1, df2=df2, geom1_col='centroid', src_column='id', axis=1)
# gdf.apply(nearest, geom_union=ef_exp.unary_union, df1=gdf, df2=ef_exp, geom1_col='centroid', src_column='code_ensem', axis=1)
gdf.to_file(path.join(PATH,'result_2.gpkg'),driver='GPKG',layer='test_data')