Python_scripts/0_FONCIER/foncier_insert_administratif.py
2024-05-15 17:04:52 +02:00

438 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
#Nom : : foncier_insert_table.py
#Description : Insertion/MAJ des données administratives et territoriales à la base <foncier> lors de sa création.
#Copyright : 2021, CEN38
#Auteur : Colas Geier
#Version : 1.0
import pandas as pd
import geopandas as gpd
from sqlalchemy import create_engine
from geoalchemy2 import Geometry
from pycen import bdd
from shapely.geometry.multipolygon import MultiPolygon
from pydate import cdate
import sys
import os
####################################
####################################
####################################
# PARAMETRES
# Liste des tables à mettre à jour. ATTENTION : le respect des nomenclatures est importante
# Liste dispo : ['com', 'dpt', 'ter', 'histo_com', 'ter_com'] Cette liste doit être identique aux index de l'object : Table
# Correspondance tables BD_FONCIER : ['communes', 'departements', 'territoires', 'r_histo_com', 'r_ter_com']
run = ['com', 'dpt', 'ter', 'histo_com', 'ter_com']
user = 'colas_geier'
pwd = 'adm1n*fOncier'
# Parametres bdd
# user = 'postgres'
# pwd = 'tutu'
# adr = '192.168.60.9'
adr = '91.134.194.221'
port = '5432'
base = 'bd_cen'
# Connexion bdd
# bd = bdd.CEN(
# user = user,
# pwd = pwd,
# adr = adr,
# base = base
# schema = schema
# )
crs = 'EPSG:2154'
con = create_engine('postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format(user,pwd,adr,port,base), echo=False)
con_open = con.connect()
# create_engine('postgresql+psycopg2://cen_admin:#CEN38@venir@172.17.0.2:5432/bd_cen', echo=True)
# PATH
PATHIN = '/home/colas/'
LIST_DEPT = ['07', '26', '42', '38']
#
MAIN_FOLDER = 'Documents'
SUB_FOLDERS = os.path.join('5_BDD','1_QGIS')
FILE_COM = ['COMMUNE.shp']
FILE_DPT = ['DEPARTEMENT.shp']
FILE_TER = [
'EPCI.shp',
'PARC_OU_RESERVE.shp',
'BASSIN_VERSANT_TOPOGRAPHIQUE.shp',
'Contours_GRPT_AURA.shp' ]
# Dict table
IN_COM = [{
'id': 'id',
'insee_com': 'code_insee',
'nom': 'nom',
None: 'prec_plani',
None: 'statut',
None: 'canton',
'insee_arr': 'arrondisst',
'insee_dep': 'depart',
'insee_reg': 'region',
'population': 'popul',
None: 'multican',
'actif': 'actif',
None: 'epfl',
# 'geometry': 'geom',
}]
IN_DPT = [{
'id': 'id',
'nom': 'nom',
'insee_dep': 'insee_dep',
'insee_reg': 'insee_reg',
'date_creat': 'date_creat',
'date_maj': 'date_maj',
'actif': 'actif',
# 'geometry': 'geom',
}]
IN_TER = [{ # DICT epci
'': 'territoire_id',
'code_siren': 'siren',
'nom': 'territoire_lib',
'': 'territoire_sigle',
'nature': 'typterritoire_id',
'': 'administratif',
'': 'prefixe',
},{ # DICT parc_ou_reserve
'': 'territoire_id',
'id': 'siren', # absence de code siren ==> récup des 10 derniers charactères du champs ID
'toponyme': 'territoire_lib',
'': 'territoire_sigle',
'nature': 'typterritoire_id',
'': 'administratif',
'': 'prefixe',
},{ # DICT bassin_versant
'': 'territoire_id',
'code_hydro': 'siren',
'toponyme': 'territoire_lib',
'': 'territoire_sigle',
'id': 'typterritoire_id',
'': 'administratif',
'': 'prefixe',
},{ # DICT epci
'': 'territoire_id',
'sirengrpt': 'siren',
'grpt': 'territoire_lib',
'': 'territoire_sigle',
'nature': 'typterritoire_id',
'': 'administratif',
'': 'prefixe',
},]
# liste des couches administratives listé dans PATH_TER
administratif = ['epci']
# DICT typterritoire_id lorsqu'une correspondance
# n'est pas possible avec un champs de la table attributaire.
# Relation typterritoire_id / ID (de la table attributaire).
typterid = {
'bassvers' : 'bv'
}
mask_path = 'Documents/5_BDD/1_QGIS/'
mask_file = 'mask_parcelles_cadastre.shp'
####################################
####################################
####################################
# FONCTIONS
def join_typterritoire(df, join):
tab = join['table']
sch = join['schema']
ind = join['id']
on = join['on']
tmp = pd.read_sql_table(
table_name = tab,
con = con,
schema = sch,
index_col = ind,
)
df[on['x']] = df[on['x']].str.lower()
df[on['x']] = df[on['x']].replace(tmp[on['y']].str.lower().to_list(),tmp.index.to_list())
df = df[ df[on['x']].isin(tmp.index.to_list()) ]
return df
def find_files(File, main_path='Documents', sub_path=None):
'''
@File : list
@main_path : str
@sub_path : str
'''
sub = ''
if sub_path:
sub = sub_path
path = os.path.join(main_path, sub)
list_path = []
for F in File :
matches = [str(path) for path in Path(path).rglob(F)]
list_path += matches
return list_path
####################################
####################################
####################################
# MAIN
if os.path.exists(PATHIN + mask_path + mask_file):
mask = gpd.read_file(PATHIN + mask_path + mask_file, crs=crs)
else :
sys.exit()
Table = {
'dpt': {'schema':'administratif', 'table':'departements', 'file': FILE_DPT, 'geom': True, 'dict':IN_DPT},
'com' : {'schema':'administratif', 'table':'communes', 'file': FILE_COM, 'geom': True, 'dict':IN_COM},
# 'histo_com': {'schema':'administratif', 'table':'r_histo_com', 'file':None, 'geom': False, 'dict':None},
'ter': {'schema':'territoires', 'table':'territoires', 'file': FILE_TER, 'geom': True, 'dict':IN_TER, 'join': {
'schema':'territoires', 'table': 'd_typterritoire', 'id': 'typterritoire_id', 'on': {'x': 'typterritoire_id', 'y': 'typterritoire_lib'}}},
'ter_com': {'schema':'territoires', 'table':'r_ter_com', 'file':None, 'geom': False, 'dict':None}
}
for d, dep in enumerate(LIST_DEPT):
PATH = 'Documents/5_BDD/1_QGIS/202103_BDTOPO/bdtopo_dep{0}_202103_shapefile/BDT_3-0_SHP_LAMB93_D0{0}-ED2021-03-15/'.format(dep)
PATH_COM = [PATH + 'ADMINISTRATIF/COMMUNE.shp']
PATH_DPT = [PATH + 'ADMINISTRATIF/DEPARTEMENT.shp']
PATH_TER = [
PATH + 'ADMINISTRATIF/EPCI.shp',
PATH + 'ZONES_REGLEMENTEES/PARC_OU_RESERVE.shp',
PATH + 'HYDROGRAPHIE/BASSIN_VERSANT_TOPOGRAPHIQUE.shp',
'' ]
Table = {
'dpt': {'schema':'administratif', 'table':'departements', 'file': [PATHIN + path for path in PATH_DPT], 'geom': True, 'dict':IN_DPT},
'com' : {'schema':'administratif', 'table':'communes', 'file': [PATHIN + path for path in PATH_COM], 'geom': True, 'dict':IN_COM},
# 'histo_com': {'schema':'administratif', 'table':'r_histo_com', 'file':None, 'geom': False, 'dict':None},
'ter': {'schema':'territoires', 'table':'territoires', 'file': [PATHIN + path for path in PATH_TER], 'geom': True, 'dict':IN_TER, 'join': {
'schema':'territoires', 'table': 'd_typterritoire', 'id': 'typterritoire_id', 'on': {'x': 'typterritoire_id', 'y': 'typterritoire_lib'}}},
'ter_com': {'schema':'territoires', 'table':'r_ter_com', 'file':None, 'geom': False, 'dict':None}
}
not_run = [k for k in Table.keys() if k not in run]
for r in not_run:
del Table[r]
for tab in Table:
Table['file'] = find_files(File= Table['file'], sub_path=SUB_FOLDERS, main_path=MAIN_FOLDER)
if d == 0:
for tab in reversed(Table):
# continue
sql = "TRUNCATE TABLE {0}.{1} CASCADE".format(Table[tab]['schema'], Table[tab]['table'])
print(sql)
con_open.execute(sql)
no_r_tab = lambda x: x not in ['ter_com']
for key in filter(no_r_tab, Table.keys()):
# Test existance de la table en bdd
lst_tab = con.dialect.get_table_names(con, schema=Table[key]['schema'])
test = Table[key]['table'] in lst_tab
# Si la table existe
if test:
DICT = Table[key]['dict']
# Test présence d'un champ 'geom' ou 'geometry' dans la table d'export
geom = False
col_tab = con.dialect.get_columns(con, Table[key]['table'], schema=Table[key]['schema'])
for o, obj in enumerate(col_tab):
if 'geom' in obj['name']:
geom = True
geom_name = obj['name']
geom_type = obj['type'].geometry_type
if DICT:
for D, tmp in enumerate(DICT):
DICT[D]['geometry'] = geom_name
# Suppression des champs non utiles
if DICT:
for D, tmp in enumerate(DICT):
if DICT[D] and None in DICT[D].keys():
del DICT[D][None]
if Table[key]['file']:
for f, i_file in enumerate(Table[key]['file']):
# Si présence d'une géometrie dans la table à insérer
if geom:
# if Table[key]['geom']:
# Read new table
print('IMPORT shape for table {0}'.format(Table[key]['table']))
df = gpd.read_file(filename=i_file)
df = gpd.sjoin(df, mask, how='inner', op='intersects', rsuffix='right')
del_cols = [col for col in tutu.columns if col.endswith('right')] + ['FID']
df.drop(columns=del_cols, inplace=True)
df['actif'] = True
# if 'ID' in df.columns:
# df.set_index('ID', inplace=True)
# typ_geom_out = con.dialect.get_columns(con, Table[key]['table'], schema=Table[key]['schema'])
# Harmonisation des géometries
# Transformation des géometries POLYGON ==> MULTIPOLIGON
geom_df = df.geometry.geom_type.unique().tolist()
geom_df = [x.upper() for x in geom_df]
if [geom_type] != geom_df:
if geom_type == 'MULTIPOLYGON' and 'POLYGON' in geom_df:
print('CORRECTION des géometries POLYGON ==> MULTIPOLYGON')
lst = []
for o, obj in enumerate(df.geometry):
if obj.geom_type == 'Polygon':
obj = MultiPolygon([obj])
lst.append(obj)
df['geometry'] = lst
# elif geom_type == 'POLYGON' and 'MULTIPOLYGON' in geom_df:
# df[df.geom.geom_type == 'MultiPolygon']
# pass
else:
print('ERROR : conflit entre la géometrie du df {0} et de la table postgis {1}'.format(geom_df,geom_type))
# Conservation des lignes appartenant au département
print('CONSERVATION des données départementales')
df.columns = df.columns.str.lower()
if 'insee_dep' in df.columns:
df = df.loc[df.insee_dep == dep]
# Formatage des champs pour insertion
print('FORMATAGE des données')
df.rename(columns=DICT[f], inplace=True)
rm_col = df.columns.difference(DICT[f].values())
df.drop(columns=rm_col, inplace=True)
# break
# Identification du champs 'administratif' pour la table territoire
couche = i_file.split('/')
couche = couche[len(couche)-1]
couche = couche.split('.')[0].lower()
if 'ter' == key and couche in administratif:
df['administratif'] = True
elif 'ter' == key and couche not in administratif:
df['administratif'] = False
df['siren'] = [siren[-10:] for siren in df['siren']]
if 'typterritoire_id' in DICT[f].values():
key_typterr = [k for (k, v) in DICT[f].items() if v == 'typterritoire_id'][0]
if 'join' in Table[key].keys() and key_typterr != 'id':
df = join_typterritoire(df, Table[key]['join'])
if key == 'ter' and key_typterr == 'id':
# df['typterritoire_id'] = df.index.to_list()
df['typterritoire_id'] = [typter[:8] for typter in df['typterritoire_id']]
df['typterritoire_id'] = df['typterritoire_id'].str.lower()
df['typterritoire_id'] = df['typterritoire_id'].replace(typterid)
# df[on['x']].replace(tmp[on['y']].str.lower().to_list(),tmp.index.to_list())
# Si présence d'une géometrie dans la table à insérer
if geom:
if not isinstance(df, gpd.GeoDataFrame):
df = df.set_geometry('geom', drop=True, crs=crs)
df.rename(columns={'geometry': 'geom'}, inplace=True)
# if Table[key]['geom']:
df.to_postgis(
name = Table[key]['table'],
con = con,
schema = Table[key]['schema'],
index = False,
if_exists = 'append',
geom_col = geom_name,
# dtype={'geom': Geometry(geometry_type='MULTIPOLYGON', srid=df.crs.to_epsg())}
)
print('''INSERT TABLE OK for DEPT {}
'''.format(dep))
else:
None
# Si la table existe pas
else:
print('ERROR : La table {0} n\'existe pas dans le schéma {1} !'.format(
Table[key]['table'].upper(),
Table[key]['schema'].upper()
))
if 'ter_com' in Table.keys() and d == 0:
tab = Table['ter_com']
print('IMPORT tables for table {0}'.format(tab['table']))
ter_sql = 'SELECT * FROM {sch}.{tab}'.format(sch='territoires', tab='territoires' )
ter = gpd.read_postgis(
sql = ter_sql,
con = con,
geom_col = 'geom',
crs = crs,
)
com_sql = 'SELECT * FROM {sch}.{tab} WHERE actif = true'.format(sch='administratif', tab='communes' )
com = gpd.read_postgis(
sql = com_sql,
con = con,
geom_col = 'geom',
crs = crs,
)
col_id = ['territoire_id', 'code_insee', 'geom']
for df in [ter, com]:
rm_col = [ col for col in df.columns[~df.columns.isin(col_id)] ]
df.drop(columns=rm_col, inplace=True)
print('JOIN tables "territoires" & "communes"')
df = gpd.sjoin(ter, com, op='intersects')
rm_col = [ col for col in df.columns[~df.columns.isin(col_id)] ]
rm_col.append('geom')
df.drop(columns=rm_col, inplace=True)
df = pd.DataFrame(df)
r_sql = 'SELECT code_insee, territoire_id FROM {sch}.{tab}'.format(sch=tab['schema'], tab=tab['table'] )
r_tab = pd.read_sql(
sql = r_sql,
con = con,
)
if not r_tab.empty:
print('DROP lignes présentes dans la table {}'.format(tab['table']))
df = pd.concat([df,r_tab]).drop_duplicates(keep=False)
if not df.empty:
users = pd.read_sql_table(
table_name = 'utilisateurs',
con = con,
schema = 'admin_sig'
)
date_now = cdate.today()
df['actif'] = True
df['date_maj'] = date_now
df['utilisateur_id'] = users[users.utilisateur_id == user].iloc[0]['individu_id']
df.to_sql(
name = tab['table'],
con = con,
schema = tab['schema'],
index = False,
if_exists = 'append'
)
else:
print('''
TOUTES les relations "communes" / "territoires" existent déjà !
''')
# gdf.set_index('id', drop=True, inplace=True)
# for key in Table.keys():
# # schema = Table[key]['schema']
# # table = Table[key]['table']
# # query = 'SELECT * FROM {0}.{1}'.format(schema,table)
# # gdf = gpd.read_postgis(sql=query, con=con)
# df = bd.get_table(
# schema=Table[key]['schema'],
# table=Table[key]['table'])
# print(df)