Python_scripts/0_FONCIER/foncier_insert_administratif_V2.py

574 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
#Nom : : foncier_insert_table.py
#Description : Insertion/MAJ des données administratives et territoriales à la base <foncier> lors de sa création.
#Copyright : Mai 2021, CEN38
#Auteur : Colas Geier
#Version : 2.0
import pandas as pd
import geopandas as gpd
from sqlalchemy import create_engine, text
from geoalchemy2 import Geometry
from shapely.geometry.multipolygon import MultiPolygon
from pydate import cdate
from pathlib import Path
from pycen import con_fon as con
import sys
import os
# from pycen import bdd
# import psycopg2
####################################
####################################
####################################
# PARAMETRES
# Liste des tables à mettre à jour. ATTENTION : le respect des nomenclatures est importante
# Liste dispo : ['com', 'dpt', 'ter', 'ter_com'] Cette liste doit être identique aux index de l'object : Table
# Correspondance tables BD_FONCIER : ['communes', 'departements', 'territoires', 'r_histo_com', 'r_ter_com']
run = ['com']
# INSERT, TRUNCATE, UPDATE
action = 'UPDATE'
# Parametres bdd
# user = 'remi_clement'
# pwd = 'adm1n*CEN'
# adr = '192.168.60.9'
user = 'colas_geier'
pwd = 'adm1n*fOncier'
adr = '91.134.194.221'
port = '5432'
base = 'bd_cen'
epsg = '2154'
crs = 'EPSG:%s'%epsg
# PATH
INDEX_FOLDER = '/home/colas' # "/home/USER" (Unix), "C:" ou autre (Windows)
MAIN_FOLDER = os.path.join(INDEX_FOLDER,'Documents') # Dossier principale
SUB_FOLDERS = os.path.join('5_BDD','1_QGIS') # Sous dossier (facultatif: os.path.join('subFolder1','subFolder2'), None si non précisé)
FILE_COM = ['COMMUNE.shp']
FILE_DPT = ['DEPARTEMENT.shp']
FILE_TER = [
'EPCI.shp',
'PARC_OU_RESERVE.shp',
'BASSIN_VERSANT_TOPOGRAPHIQUE.shp',
'Contours_GRPT_AURA.shp' ]
administratif = ['epci'] # liste des couches administratives listé dans PATH_TER
mask_Mfolder = os.path.join(INDEX_FOLDER,'Documents')
mask_Sfolder = os.path.join('5_BDD','1_QGIS')
mask_file = 'mask_parcelles_cadastre.shp'
# DICT FILE / TABLE
IN_COM = [{
'id': 'id',
'insee_com': 'code_insee',
'nom': 'nom',
None: 'prec_plani', # Plus fournis par la bd_topo
None: 'statut', # Plus fournis par la bd_topo
None: 'canton', # Plus fournis par la bd_topo
'insee_arr': 'arrondisst',
'insee_dep': 'depart',
'insee_reg': 'region',
'population': 'popul',
None: 'multican', # Plus fournis par la bd_topo
'actif': 'actif',
None: 'epfl', # Plus fournis par la bd_topo
}]
IN_DPT = [{
'id': 'id',
'nom': 'nom',
'insee_dep': 'insee_dep',
'insee_reg': 'insee_reg',
'date_creat': 'date_creat',
'date_maj': 'date_maj',
'actif': 'actif',
}]
IN_TER = [{ # DICT epci
'': 'territoire_id',
'code_siren': 'siren',
'nom': 'territoire_lib',
'': 'territoire_sigle',
'nature': 'typterritoire_id',
'': 'administratif',
'': 'prefixe',
},{ # DICT parc_ou_reserve
'': 'territoire_id',
'id': 'siren', # absence de code siren ==> récup des 10 derniers charactères du champs ID
'toponyme': 'territoire_lib',
'': 'territoire_sigle',
'nature': 'typterritoire_id',
'': 'administratif',
'': 'prefixe',
},{ # DICT bassin_versant
'': 'territoire_id',
'code_hydro': 'siren', # absence de code siren ==> récup des 10 derniers charactères du champs ID
'toponyme': 'territoire_lib',
'': 'territoire_sigle',
'id': 'typterritoire_id',
'': 'administratif',
'': 'prefixe',
},{ # DICT epci
'': 'territoire_id',
'sirengrpt': 'siren',
'grpt': 'territoire_lib',
'': 'territoire_sigle',
'nature': 'typterritoire_id',
'': 'administratif',
'': 'prefixe',
},]
# DICT typterritoire_id lorsqu'une correspondance
# n'est pas possible avec un champs de la table attributaire.
# Relation typterritoire_id / ID (de la table attributaire).
typterid = {
'bassvers' : 'bv'
}
Table = {
'dpt': {'schema':'administratif', 'table':'departements', 'name_file': FILE_DPT, 'geom': True, 'dict':IN_DPT, 'unique': 'insee_dep'},
'com' : {'schema':'administratif', 'table':'communes', 'name_file': FILE_COM, 'geom': True, 'dict':IN_COM, 'unique': 'code_insee'},
# 'histo_com': {'schema':'administratif', 'table':'r_histo_com', 'name_file':None, 'geom': False, 'dict':None},
'ter': {'schema':'territoires', 'table':'territoires', 'name_file': FILE_TER, 'geom': True, 'dict':IN_TER, 'unique': 'siren', 'join': {
'schema':'territoires', 'table': 'd_typterritoire', 'id': 'typterritoire_id', 'on': {'x': 'typterritoire_id', 'y': 'typterritoire_lib'}}},
'ter_com': {'schema':'territoires', 'table':'r_ter_com', 'name_file':None, 'geom': False, 'dict':None, 'unique': ['code_insee', 'territoire_id']}
}
####################################
####################################
####################################
# FONCTIONS
def join_typterritoire(df, join):
'''
@df : dataframe
@join : dict
'''
tab = join['table']
sch = join['schema']
ind = join['id']
on = join['on']
tmp = pd.read_sql_table(
table_name = tab,
con = con,
schema = sch,
index_col = ind,
)
df[on['x']] = df[on['x']].str.lower()
df[on['x']] = df[on['x']].replace(tmp[on['y']].str.lower().to_list(),tmp.index.to_list())
df = df[ df[on['x']].isin(tmp.index.to_list()) ]
return df
def find_files(File, main_path='Documents', sub_path=None):
'''
@File : list
@main_path : str
@sub_path : str
'''
sub = ''
if sub_path:
sub = sub_path
path = os.path.join(main_path, sub)
list_path = []
nb_path = []
for F in File :
matches = [str(path) for path in Path(path).rglob(F)]
list_path += matches
nb_path += str(len(matches))
return list_path, nb_path
def tab_has_data(con, schema, table):
'''
@con : connection sqlalchemy create_engine
@schema : str
@table : str
'''
has_sch = con.dialect.has_schema(con, schema=schema)
if has_sch :
has_table = con.dialect.has_table(con, table_name=table, schema=schema)
if has_table:
sql = 'SELECT * FROM {sch}.{tab} LIMIT 1'.format(sch=schema,tab=table)
df = pd.read_sql_query(
sql = sql,
con = con
)
return not df.empty
else:
return '''TABLE %s don't exist in SCHEMA %s''' %(table,schema)
else :
return '''SCHEMA %s don't exist'''%schema
def update_data(df, con, sch, tab, epsg=None):
columns = df.columns.to_list()
frame = df.copy()
frame.replace("'","''", regex=True, inplace=True)
pkey = con.dialect.get_pk_constraint(con, table_name=tab, schema=sch)['constrained_columns']
if 'geom' in columns or 'geometry' in columns:
if epsg or df.crs:
if not epsg:
epsg = df.crs.to_epsg()
name_geom = df.geometry.name
frame[name_geom] = 'SRID={epsg};'.format(epsg=epsg) + df[name_geom].map(str)
# else: return 'No crs define in update_data or in gdf'
for c, col in enumerate(columns):
if c == 0:
frame['insert'] = "('" + frame[col].map(str)
# break
else:
frame['insert'] = frame['insert'] + "','" + frame[col].map(str)
if c == len(columns)-1:
frame['insert'] = frame['insert'] + "')"
# if c == 0:
# frame['insert'] = '("' + frame[col].map(str)
# # break
# else:
# frame['insert'] = frame['insert'] + '","' + frame[col].map(str)
# if c == len(columns)-1:
# frame['insert'] = frame['insert'] + '")'
lst_cols = ', '.join(columns)
lst_vals = ','.join(frame['insert'])
lst_dupKey = ', '.join([col + '=EXCLUDED.' + col for col in columns])
lst_pkey = ','.join(pkey)
sql = '''INSERT INTO {sch}.{tab} ({lst_cols}) VALUES {lst_vals} ON CONFLICT ({lst_pkey}) DO UPDATE SET {lst_dupKey} ;'''.format(
sch=sch, tab=tab, lst_cols=lst_cols, lst_vals=lst_vals, lst_dupKey=lst_dupKey, lst_pkey=lst_pkey)
# sql = '''INSERT INTO {sch}.{tab} ({lst_cols})
# VALUES {lst_vals}
# ON CONFLICT DO NOTHING;
# '''.format(sch=sch, tab=tab, lst_cols=lst_cols, lst_vals=lst_vals)
try:
con.execute(sql)
# con.execute(text(sql))
print('''
Update OK !''')
except Exception as exept:
print(exept)
# con = create_engine('postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format(user,pwd,adr,port,base), echo=False)
con_open = con.connect()
# conn = psycopg2.connect(
# user=user,
# password=pwd,
# host=adr,
# port=port,
# database=base)
####################################
####################################
####################################
# MAIN
if __name__ == "__main__":
mask_f = find_files(File=[mask_file], sub_path=mask_Sfolder, main_path=mask_Mfolder)[0]
if mask_f:
mask = gpd.read_file(mask_f[0], crs=crs)
else :
sys.exit('NO MASK FILE')
not_run = [k for k in Table.keys() if k not in run]
for r in not_run:
del Table[r]
for tab in Table :
if Table[tab]['name_file']:
Table[tab]['file'], Table[tab]['nb_typ_file'] = find_files(File=Table[tab]['name_file'], sub_path=SUB_FOLDERS, main_path=MAIN_FOLDER)
d = []
for i, nb in enumerate(Table[tab]['nb_typ_file']):
d += [ Table[tab]['dict'][i] ] * int(nb)
Table[tab]['dict'] = d
else:
Table[tab]['file'] = None
if action == 'TRUNCATE':
for tab in reversed(Table):
# continue
sql = "TRUNCATE TABLE {0}.{1} CASCADE".format(Table[tab]['schema'], Table[tab]['table'])
print(sql)
con_open.execute(sql)
sys.exit('END TRUNCATE')
else:
# filter1 = lambda x: x not in ['ter_com']
# for key in filter(filter1, Table.keys()):
for key in Table.keys():
# Test existance de données dans la table en bdd
has_data = tab_has_data(con, schema=Table[key]['schema'], table=Table[key]['table'])
# Si la table existe
if type(has_data) == bool :
# récupération des données présentent en bdd
if type(Table[key]['unique']) == str:
Table[key]['unique'] = [ Table[key]['unique'] ]
df_exist = False
if has_data :
df_exist = pd.read_sql_table(
con = con,
schema = Table[key]['schema'],
table_name = Table[key]['table'],
columns = Table[key]['unique']
)
DICT = Table[key]['dict']
# Test présence d'un champ 'geom' ou 'geometry' dans la table d'export
# Si géometrie, ajout du champs au dictionnaire
geom = False
col_tab = con.dialect.get_columns(con, Table[key]['table'], schema=Table[key]['schema'])
for o, obj in enumerate(col_tab):
if 'geom' in obj['name']:
geom = True
geom_name = obj['name']
geom_type = obj['type'].geometry_type
if DICT:
for D, tmp in enumerate(DICT):
DICT[D]['geometry'] = geom_name
# Récupération des données existantes dans la base de données
# Suppression des champs non utiles
if DICT:
for D, tmp in enumerate(DICT):
if DICT[D] and None in DICT[D].keys():
del DICT[D][None]
if Table[key]['file']:
for f, i_file in enumerate(Table[key]['file']):
# Test existance de données dans la table en bdd
has_data = tab_has_data(con, schema=Table[key]['schema'], table=Table[key]['table'])
# Si la table existe
if type(has_data) == bool :
# récupération des données présentent en bdd
if type(Table[key]['unique']) == str:
Table[key]['unique'] = [ Table[key]['unique'] ]
df_exist = False
if has_data :
pkey = con.dialect.get_pk_constraint(con, table_name=Table[key]['table'], schema=Table[key]['schema'])['constrained_columns']
df_exist = pd.read_sql_table(
con = con,
schema = Table[key]['schema'],
table_name = Table[key]['table'],
columns = Table[key]['unique'],
index_col = pkey
)
if df_exist.shape[1] == 0 :
df_exist[Table[key]['unique']] = df_exist.index
print('''
IMPORT {1} for table {0}'''.format(Table[key]['table'], i_file))
# Si présence d'une géometrie dans la table à insérer
if geom:
# if Table[key]['geom']:
# Read new table
df = gpd.read_file(filename=i_file)
if not df.crs:
df.set_crs(crs=crs, inplace=True)
df = gpd.sjoin(df, mask, how='inner', op='intersects', rsuffix='right')
del_cols = [col for col in df.columns if col.endswith('right')] + ['FID']
df.drop(columns=del_cols, inplace=True)
df['actif'] = True
geom_df = df.geometry.geom_type.unique().tolist()
geom_df = [x.upper() for x in geom_df]
if [geom_type] != geom_df:
if geom_type == 'MULTIPOLYGON' and 'POLYGON' in geom_df:
print('CORRECTION des géometries POLYGON ==> MULTIPOLYGON')
lst = []
for o, obj in enumerate(df.geometry):
if obj.geom_type == 'Polygon':
obj = MultiPolygon([obj])
lst.append(obj)
df['geometry'] = lst
# elif geom_type == 'POLYGON' and 'MULTIPOLYGON' in geom_df:
# df[df.geom.geom_type == 'MultiPolygon']
# pass
else:
print('ERROR : conflit entre la géometrie du df {0} et de la table postgis {1}'.format(geom_df,geom_type))
else:
# Si le fichier à importer ne possède pas de géometrie
print('NO geom !')
print('IMPORT data without geom : No-config !')
# Conservation des lignes appartenant au département
print('CONSERVATION des données départementales')
df.columns = df.columns.str.lower()
# if 'insee_dep' in df.columns:
# df = df.loc[df.insee_dep == dep]
# Formatage des champs pour insertion
print('FORMATAGE des données')
df.rename(columns=DICT[f], inplace=True)
rm_col = df.columns.difference(DICT[f].values())
df.drop(columns=rm_col, inplace=True)
# Identification du champs 'administratif' pour la table territoire
couche = i_file.split('/')
couche = couche[len(couche)-1]
couche = couche.split('.')[0].lower()
if 'ter' == key and couche in administratif:
df['administratif'] = True
elif 'ter' == key and couche not in administratif:
df['administratif'] = False
if df['siren'].dtypes == float:
df['siren'] = df['siren'].astype(int)
if df['siren'].dtypes == int:
df['siren'] = df['siren'].astype(str)
df['siren'] = [siren[-10:] for siren in df['siren']]
if 'typterritoire_id' in DICT[f].values():
key_typterr = [k for (k, v) in DICT[f].items() if v == 'typterritoire_id'][0]
if 'join' in Table[key].keys() and key_typterr != 'id':
df = join_typterritoire(df, Table[key]['join'])
if key == 'ter' and key_typterr == 'id':
df['typterritoire_id'] = [typter[:8] for typter in df['typterritoire_id']]
df['typterritoire_id'] = df['typterritoire_id'].str.lower()
df['typterritoire_id'] = df['typterritoire_id'].replace(typterid)
# if key == 'com':
# sys.exit()
# Suppression des lignes existantes en bdd
if action == 'INSERT' and has_data:
unique_cols = df_exist.columns.to_list()
for d in [df, df_exist]:
d['exist'] = ''
for col in unique_cols:
d['exist'] += d[col].astype(str)
df = df[~df['exist'].isin(df_exist['exist'])]
df.drop(columns='exist', inplace=True)
if action == 'UPDATE':
unique_cols = df_exist.columns.to_list()
for d in [df, df_exist]:
d['exist'] = ''
for col in unique_cols:
d['exist'] += d[col].astype(str)
df = df[df['exist'].isin(df_exist['exist'])].sort_values(unique_cols)
ind = df_exist[df_exist['exist'].isin(df['exist'])].sort_values(unique_cols).index
df.set_index(ind, inplace=True)
df.drop(columns='exist', inplace=True)
# Si présence d'une géometrie dans la table à insérer
if geom and not df.empty and action == 'INSERT' :
if not isinstance(df, gpd.GeoDataFrame):
df = df.set_geometry('geom', drop=False, crs=crs)
# df.rename(columns={'geometry': 'geom'}, inplace=True)
df.to_postgis(
name = Table[key]['table'],
con = con,
schema = Table[key]['schema'],
index = False,
if_exists = 'append',
geom_col = geom_name,
)
elif geom and df.empty and action == 'INSERT' :
print('NO NEWS data insert !')
# Si présence d'une géometrie dans la table à updater
elif geom and not df.empty and action == 'UPDATE' :
if not isinstance(df, gpd.GeoDataFrame):
df = df.set_geometry('geom', drop=False, crs=crs)
df.reset_index(inplace=True)
update_data(df, con, sch=Table[key]['schema'], tab=Table[key]['table'])
print('NO NEWS data update !')
sys.exit()
elif geom and df.empty and action == 'UPDATE' :
print('NO NEWS data update !')
else:
# Si les données à importer sont issues d'un fichier sans géometrie
print('FILE WITHOUT GEOM !')
else:
# Si il n'y a pas de données à importer
# Création des liens relationnelles Communes/Territoires.
print('NO IMPORT FILE !')
tab = Table[key]
print('IMPORT tables for table {0}'.format(tab['table']))
# SELECT data territoires
ter_sql = 'SELECT * FROM {sch}.{tab}'.format(sch='territoires', tab='territoires' )
ter = gpd.read_postgis(
sql = ter_sql,
con = con,
geom_col = 'geom',
crs = crs,
)
# SELECT DATA communes
com_sql = 'SELECT * FROM {sch}.{tab} WHERE actif = true'.format(sch='administratif', tab='communes' )
com = gpd.read_postgis(
sql = com_sql,
con = con,
geom_col = 'geom',
crs = crs,
)
# Conservation des données utiles
tab['unique'] += ['geom']
for df in [ter, com]:
rm_col = [ col for col in df.columns[~df.columns.isin(tab['unique'])] ]
df.drop(columns=rm_col, inplace=True)
# Jointure territoires VS communes
if not ter.empty and not com.empty:
print('JOIN tables "territoires" & "communes"')
df = gpd.sjoin(ter, com, op='intersects')
rm_col = [ col for col in df.columns[~df.columns.isin(tab['unique'])] ]
rm_col.append('geom')
df.drop(columns=rm_col, inplace=True)
df = pd.DataFrame(df)
# Récupération des données déjà présentent en bdd
# r_sql = 'SELECT code_insee, territoire_id FROM {sch}.{tab}'.format(sch=tab['schema'], tab=tab['table'] )
# r_tab = pd.read_sql(
# sql = r_sql,
# con = con,
# )
# if not r_tab.empty:
# print('DROP lignes présentes dans la table {}'.format(tab['table']))
# df = pd.concat([df,r_tab]).drop_duplicates(keep=False)
if action == 'INSERT' and has_data:
df = pd.concat([df,df_exist]).drop_duplicates(keep=False)
if action == 'INSERT' and not df.empty:
users = pd.read_sql_table(
table_name = 'utilisateurs',
con = con,
schema = 'admin_sig'
)
date_now = cdate.today()
df['actif'] = True
df['date_maj'] = date_now
df['utilisateur_id'] = users[users.utilisateur_id == user].iloc[0]['individu_id']
df.to_sql(
name = tab['table'],
con = con,
schema = tab['schema'],
index = False,
if_exists = 'append'
)
else:
print('''
TOUTES les relations "communes" / "territoires" existent déjà !
''')
elif ter.empty and not com.empty:
print('''
NO DATA dans la table "territoires" !
''')
elif not ter.empty and com.empty:
print('''
NO DATA dans la table "communes" !
''')
else:
print('''
NO DATA dans la table "communes" et dans la table "territoires" !
''')
sys.exit()