#!/usr/bin/env python3 # -*- coding: UTF-8 -*- #Nom : : foncier_insert_table.py #Description : Insertion/MAJ des données administratives et territoriales à la base lors de sa création. #Copyright : Mai 2021, CEN38 #Auteur : Colas Geier #Version : 2.0 import pandas as pd import geopandas as gpd from sqlalchemy import create_engine, text from geoalchemy2 import Geometry from shapely.geometry.multipolygon import MultiPolygon from pydate import cdate from pathlib import Path from pycen import con_fon as con import sys import os # from pycen import bdd # import psycopg2 #################################### #################################### #################################### # PARAMETRES # Liste des tables à mettre à jour. ATTENTION : le respect des nomenclatures est importante # Liste dispo : ['com', 'dpt', 'ter', 'ter_com'] Cette liste doit être identique aux index de l'object : Table # Correspondance tables BD_FONCIER : ['communes', 'departements', 'territoires', 'r_histo_com', 'r_ter_com'] run = ['com'] # INSERT, TRUNCATE, UPDATE action = 'UPDATE' # Parametres bdd # user = 'remi_clement' # pwd = 'adm1n*CEN' # adr = '192.168.60.9' user = 'colas_geier' pwd = 'adm1n*fOncier' adr = '91.134.194.221' port = '5432' base = 'bd_cen' epsg = '2154' crs = 'EPSG:%s'%epsg # PATH INDEX_FOLDER = '/home/colas' # "/home/USER" (Unix), "C:" ou autre (Windows) MAIN_FOLDER = os.path.join(INDEX_FOLDER,'Documents') # Dossier principale SUB_FOLDERS = os.path.join('5_BDD','1_QGIS') # Sous dossier (facultatif: os.path.join('subFolder1','subFolder2'), None si non précisé) FILE_COM = ['COMMUNE.shp'] FILE_DPT = ['DEPARTEMENT.shp'] FILE_TER = [ 'EPCI.shp', 'PARC_OU_RESERVE.shp', 'BASSIN_VERSANT_TOPOGRAPHIQUE.shp', 'Contours_GRPT_AURA.shp' ] administratif = ['epci'] # liste des couches administratives listé dans PATH_TER mask_Mfolder = os.path.join(INDEX_FOLDER,'Documents') mask_Sfolder = os.path.join('5_BDD','1_QGIS') mask_file = 'mask_parcelles_cadastre.shp' # DICT FILE / TABLE IN_COM = [{ 'id': 'id', 'insee_com': 'code_insee', 'nom': 'nom', None: 'prec_plani', # Plus fournis par la bd_topo None: 'statut', # Plus fournis par la bd_topo None: 'canton', # Plus fournis par la bd_topo 'insee_arr': 'arrondisst', 'insee_dep': 'depart', 'insee_reg': 'region', 'population': 'popul', None: 'multican', # Plus fournis par la bd_topo 'actif': 'actif', None: 'epfl', # Plus fournis par la bd_topo }] IN_DPT = [{ 'id': 'id', 'nom': 'nom', 'insee_dep': 'insee_dep', 'insee_reg': 'insee_reg', 'date_creat': 'date_creat', 'date_maj': 'date_maj', 'actif': 'actif', }] IN_TER = [{ # DICT epci '': 'territoire_id', 'code_siren': 'siren', 'nom': 'territoire_lib', '': 'territoire_sigle', 'nature': 'typterritoire_id', '': 'administratif', '': 'prefixe', },{ # DICT parc_ou_reserve '': 'territoire_id', 'id': 'siren', # absence de code siren ==> récup des 10 derniers charactères du champs ID 'toponyme': 'territoire_lib', '': 'territoire_sigle', 'nature': 'typterritoire_id', '': 'administratif', '': 'prefixe', },{ # DICT bassin_versant '': 'territoire_id', 'code_hydro': 'siren', # absence de code siren ==> récup des 10 derniers charactères du champs ID 'toponyme': 'territoire_lib', '': 'territoire_sigle', 'id': 'typterritoire_id', '': 'administratif', '': 'prefixe', },{ # DICT epci '': 'territoire_id', 'sirengrpt': 'siren', 'grpt': 'territoire_lib', '': 'territoire_sigle', 'nature': 'typterritoire_id', '': 'administratif', '': 'prefixe', },] # DICT typterritoire_id lorsqu'une correspondance # n'est pas possible avec un champs de la table attributaire. # Relation typterritoire_id / ID (de la table attributaire). typterid = { 'bassvers' : 'bv' } Table = { 'dpt': {'schema':'administratif', 'table':'departements', 'name_file': FILE_DPT, 'geom': True, 'dict':IN_DPT, 'unique': 'insee_dep'}, 'com' : {'schema':'administratif', 'table':'communes', 'name_file': FILE_COM, 'geom': True, 'dict':IN_COM, 'unique': 'code_insee'}, # 'histo_com': {'schema':'administratif', 'table':'r_histo_com', 'name_file':None, 'geom': False, 'dict':None}, 'ter': {'schema':'territoires', 'table':'territoires', 'name_file': FILE_TER, 'geom': True, 'dict':IN_TER, 'unique': 'siren', 'join': { 'schema':'territoires', 'table': 'd_typterritoire', 'id': 'typterritoire_id', 'on': {'x': 'typterritoire_id', 'y': 'typterritoire_lib'}}}, 'ter_com': {'schema':'territoires', 'table':'r_ter_com', 'name_file':None, 'geom': False, 'dict':None, 'unique': ['code_insee', 'territoire_id']} } #################################### #################################### #################################### # FONCTIONS def join_typterritoire(df, join): ''' @df : dataframe @join : dict ''' tab = join['table'] sch = join['schema'] ind = join['id'] on = join['on'] tmp = pd.read_sql_table( table_name = tab, con = con, schema = sch, index_col = ind, ) df[on['x']] = df[on['x']].str.lower() df[on['x']] = df[on['x']].replace(tmp[on['y']].str.lower().to_list(),tmp.index.to_list()) df = df[ df[on['x']].isin(tmp.index.to_list()) ] return df def find_files(File, main_path='Documents', sub_path=None): ''' @File : list @main_path : str @sub_path : str ''' sub = '' if sub_path: sub = sub_path path = os.path.join(main_path, sub) list_path = [] nb_path = [] for F in File : matches = [str(path) for path in Path(path).rglob(F)] list_path += matches nb_path += str(len(matches)) return list_path, nb_path def tab_has_data(con, schema, table): ''' @con : connection sqlalchemy create_engine @schema : str @table : str ''' has_sch = con.dialect.has_schema(con, schema=schema) if has_sch : has_table = con.dialect.has_table(con, table_name=table, schema=schema) if has_table: sql = 'SELECT * FROM {sch}.{tab} LIMIT 1'.format(sch=schema,tab=table) df = pd.read_sql_query( sql = sql, con = con ) return not df.empty else: return '''TABLE %s don't exist in SCHEMA %s''' %(table,schema) else : return '''SCHEMA %s don't exist'''%schema def update_data(df, con, sch, tab, epsg=None): columns = df.columns.to_list() frame = df.copy() frame.replace("'","''", regex=True, inplace=True) pkey = con.dialect.get_pk_constraint(con, table_name=tab, schema=sch)['constrained_columns'] if 'geom' in columns or 'geometry' in columns: if epsg or df.crs: if not epsg: epsg = df.crs.to_epsg() name_geom = df.geometry.name frame[name_geom] = 'SRID={epsg};'.format(epsg=epsg) + df[name_geom].map(str) # else: return 'No crs define in update_data or in gdf' for c, col in enumerate(columns): if c == 0: frame['insert'] = "('" + frame[col].map(str) # break else: frame['insert'] = frame['insert'] + "','" + frame[col].map(str) if c == len(columns)-1: frame['insert'] = frame['insert'] + "')" # if c == 0: # frame['insert'] = '("' + frame[col].map(str) # # break # else: # frame['insert'] = frame['insert'] + '","' + frame[col].map(str) # if c == len(columns)-1: # frame['insert'] = frame['insert'] + '")' lst_cols = ', '.join(columns) lst_vals = ','.join(frame['insert']) lst_dupKey = ', '.join([col + '=EXCLUDED.' + col for col in columns]) lst_pkey = ','.join(pkey) sql = '''INSERT INTO {sch}.{tab} ({lst_cols}) VALUES {lst_vals} ON CONFLICT ({lst_pkey}) DO UPDATE SET {lst_dupKey} ;'''.format( sch=sch, tab=tab, lst_cols=lst_cols, lst_vals=lst_vals, lst_dupKey=lst_dupKey, lst_pkey=lst_pkey) # sql = '''INSERT INTO {sch}.{tab} ({lst_cols}) # VALUES {lst_vals} # ON CONFLICT DO NOTHING; # '''.format(sch=sch, tab=tab, lst_cols=lst_cols, lst_vals=lst_vals) try: con.execute(sql) # con.execute(text(sql)) print(''' Update OK !''') except Exception as exept: print(exept) # con = create_engine('postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format(user,pwd,adr,port,base), echo=False) con_open = con.connect() # conn = psycopg2.connect( # user=user, # password=pwd, # host=adr, # port=port, # database=base) #################################### #################################### #################################### # MAIN if __name__ == "__main__": mask_f = find_files(File=[mask_file], sub_path=mask_Sfolder, main_path=mask_Mfolder)[0] if mask_f: mask = gpd.read_file(mask_f[0], crs=crs) else : sys.exit('NO MASK FILE') not_run = [k for k in Table.keys() if k not in run] for r in not_run: del Table[r] for tab in Table : if Table[tab]['name_file']: Table[tab]['file'], Table[tab]['nb_typ_file'] = find_files(File=Table[tab]['name_file'], sub_path=SUB_FOLDERS, main_path=MAIN_FOLDER) d = [] for i, nb in enumerate(Table[tab]['nb_typ_file']): d += [ Table[tab]['dict'][i] ] * int(nb) Table[tab]['dict'] = d else: Table[tab]['file'] = None if action == 'TRUNCATE': for tab in reversed(Table): # continue sql = "TRUNCATE TABLE {0}.{1} CASCADE".format(Table[tab]['schema'], Table[tab]['table']) print(sql) con_open.execute(sql) sys.exit('END TRUNCATE') else: # filter1 = lambda x: x not in ['ter_com'] # for key in filter(filter1, Table.keys()): for key in Table.keys(): # Test existance de données dans la table en bdd has_data = tab_has_data(con, schema=Table[key]['schema'], table=Table[key]['table']) # Si la table existe if type(has_data) == bool : # récupération des données présentent en bdd if type(Table[key]['unique']) == str: Table[key]['unique'] = [ Table[key]['unique'] ] df_exist = False if has_data : df_exist = pd.read_sql_table( con = con, schema = Table[key]['schema'], table_name = Table[key]['table'], columns = Table[key]['unique'] ) DICT = Table[key]['dict'] # Test présence d'un champ 'geom' ou 'geometry' dans la table d'export # Si géometrie, ajout du champs au dictionnaire geom = False col_tab = con.dialect.get_columns(con, Table[key]['table'], schema=Table[key]['schema']) for o, obj in enumerate(col_tab): if 'geom' in obj['name']: geom = True geom_name = obj['name'] geom_type = obj['type'].geometry_type if DICT: for D, tmp in enumerate(DICT): DICT[D]['geometry'] = geom_name # Récupération des données existantes dans la base de données # Suppression des champs non utiles if DICT: for D, tmp in enumerate(DICT): if DICT[D] and None in DICT[D].keys(): del DICT[D][None] if Table[key]['file']: for f, i_file in enumerate(Table[key]['file']): # Test existance de données dans la table en bdd has_data = tab_has_data(con, schema=Table[key]['schema'], table=Table[key]['table']) # Si la table existe if type(has_data) == bool : # récupération des données présentent en bdd if type(Table[key]['unique']) == str: Table[key]['unique'] = [ Table[key]['unique'] ] df_exist = False if has_data : pkey = con.dialect.get_pk_constraint(con, table_name=Table[key]['table'], schema=Table[key]['schema'])['constrained_columns'] df_exist = pd.read_sql_table( con = con, schema = Table[key]['schema'], table_name = Table[key]['table'], columns = Table[key]['unique'], index_col = pkey ) if df_exist.shape[1] == 0 : df_exist[Table[key]['unique']] = df_exist.index print(''' IMPORT {1} for table {0}'''.format(Table[key]['table'], i_file)) # Si présence d'une géometrie dans la table à insérer if geom: # if Table[key]['geom']: # Read new table df = gpd.read_file(filename=i_file) if not df.crs: df.set_crs(crs=crs, inplace=True) df = gpd.sjoin(df, mask, how='inner', op='intersects', rsuffix='right') del_cols = [col for col in df.columns if col.endswith('right')] + ['FID'] df.drop(columns=del_cols, inplace=True) df['actif'] = True geom_df = df.geometry.geom_type.unique().tolist() geom_df = [x.upper() for x in geom_df] if [geom_type] != geom_df: if geom_type == 'MULTIPOLYGON' and 'POLYGON' in geom_df: print('CORRECTION des géometries POLYGON ==> MULTIPOLYGON') lst = [] for o, obj in enumerate(df.geometry): if obj.geom_type == 'Polygon': obj = MultiPolygon([obj]) lst.append(obj) df['geometry'] = lst # elif geom_type == 'POLYGON' and 'MULTIPOLYGON' in geom_df: # df[df.geom.geom_type == 'MultiPolygon'] # pass else: print('ERROR : conflit entre la géometrie du df {0} et de la table postgis {1}'.format(geom_df,geom_type)) else: # Si le fichier à importer ne possède pas de géometrie print('NO geom !') print('IMPORT data without geom : No-config !') # Conservation des lignes appartenant au département print('CONSERVATION des données départementales') df.columns = df.columns.str.lower() # if 'insee_dep' in df.columns: # df = df.loc[df.insee_dep == dep] # Formatage des champs pour insertion print('FORMATAGE des données') df.rename(columns=DICT[f], inplace=True) rm_col = df.columns.difference(DICT[f].values()) df.drop(columns=rm_col, inplace=True) # Identification du champs 'administratif' pour la table territoire couche = i_file.split('/') couche = couche[len(couche)-1] couche = couche.split('.')[0].lower() if 'ter' == key and couche in administratif: df['administratif'] = True elif 'ter' == key and couche not in administratif: df['administratif'] = False if df['siren'].dtypes == float: df['siren'] = df['siren'].astype(int) if df['siren'].dtypes == int: df['siren'] = df['siren'].astype(str) df['siren'] = [siren[-10:] for siren in df['siren']] if 'typterritoire_id' in DICT[f].values(): key_typterr = [k for (k, v) in DICT[f].items() if v == 'typterritoire_id'][0] if 'join' in Table[key].keys() and key_typterr != 'id': df = join_typterritoire(df, Table[key]['join']) if key == 'ter' and key_typterr == 'id': df['typterritoire_id'] = [typter[:8] for typter in df['typterritoire_id']] df['typterritoire_id'] = df['typterritoire_id'].str.lower() df['typterritoire_id'] = df['typterritoire_id'].replace(typterid) # if key == 'com': # sys.exit() # Suppression des lignes existantes en bdd if action == 'INSERT' and has_data: unique_cols = df_exist.columns.to_list() for d in [df, df_exist]: d['exist'] = '' for col in unique_cols: d['exist'] += d[col].astype(str) df = df[~df['exist'].isin(df_exist['exist'])] df.drop(columns='exist', inplace=True) if action == 'UPDATE': unique_cols = df_exist.columns.to_list() for d in [df, df_exist]: d['exist'] = '' for col in unique_cols: d['exist'] += d[col].astype(str) df = df[df['exist'].isin(df_exist['exist'])].sort_values(unique_cols) ind = df_exist[df_exist['exist'].isin(df['exist'])].sort_values(unique_cols).index df.set_index(ind, inplace=True) df.drop(columns='exist', inplace=True) # Si présence d'une géometrie dans la table à insérer if geom and not df.empty and action == 'INSERT' : if not isinstance(df, gpd.GeoDataFrame): df = df.set_geometry('geom', drop=False, crs=crs) # df.rename(columns={'geometry': 'geom'}, inplace=True) df.to_postgis( name = Table[key]['table'], con = con, schema = Table[key]['schema'], index = False, if_exists = 'append', geom_col = geom_name, ) elif geom and df.empty and action == 'INSERT' : print('NO NEWS data insert !') # Si présence d'une géometrie dans la table à updater elif geom and not df.empty and action == 'UPDATE' : if not isinstance(df, gpd.GeoDataFrame): df = df.set_geometry('geom', drop=False, crs=crs) df.reset_index(inplace=True) update_data(df, con, sch=Table[key]['schema'], tab=Table[key]['table']) print('NO NEWS data update !') sys.exit() elif geom and df.empty and action == 'UPDATE' : print('NO NEWS data update !') else: # Si les données à importer sont issues d'un fichier sans géometrie print('FILE WITHOUT GEOM !') else: # Si il n'y a pas de données à importer # Création des liens relationnelles Communes/Territoires. print('NO IMPORT FILE !') tab = Table[key] print('IMPORT tables for table {0}'.format(tab['table'])) # SELECT data territoires ter_sql = 'SELECT * FROM {sch}.{tab}'.format(sch='territoires', tab='territoires' ) ter = gpd.read_postgis( sql = ter_sql, con = con, geom_col = 'geom', crs = crs, ) # SELECT DATA communes com_sql = 'SELECT * FROM {sch}.{tab} WHERE actif = true'.format(sch='administratif', tab='communes' ) com = gpd.read_postgis( sql = com_sql, con = con, geom_col = 'geom', crs = crs, ) # Conservation des données utiles tab['unique'] += ['geom'] for df in [ter, com]: rm_col = [ col for col in df.columns[~df.columns.isin(tab['unique'])] ] df.drop(columns=rm_col, inplace=True) # Jointure territoires VS communes if not ter.empty and not com.empty: print('JOIN tables "territoires" & "communes"') df = gpd.sjoin(ter, com, op='intersects') rm_col = [ col for col in df.columns[~df.columns.isin(tab['unique'])] ] rm_col.append('geom') df.drop(columns=rm_col, inplace=True) df = pd.DataFrame(df) # Récupération des données déjà présentent en bdd # r_sql = 'SELECT code_insee, territoire_id FROM {sch}.{tab}'.format(sch=tab['schema'], tab=tab['table'] ) # r_tab = pd.read_sql( # sql = r_sql, # con = con, # ) # if not r_tab.empty: # print('DROP lignes présentes dans la table {}'.format(tab['table'])) # df = pd.concat([df,r_tab]).drop_duplicates(keep=False) if action == 'INSERT' and has_data: df = pd.concat([df,df_exist]).drop_duplicates(keep=False) if action == 'INSERT' and not df.empty: users = pd.read_sql_table( table_name = 'utilisateurs', con = con, schema = 'admin_sig' ) date_now = cdate.today() df['actif'] = True df['date_maj'] = date_now df['utilisateur_id'] = users[users.utilisateur_id == user].iloc[0]['individu_id'] df.to_sql( name = tab['table'], con = con, schema = tab['schema'], index = False, if_exists = 'append' ) else: print(''' TOUTES les relations "communes" / "territoires" existent déjà ! ''') elif ter.empty and not com.empty: print(''' NO DATA dans la table "territoires" ! ''') elif not ter.empty and com.empty: print(''' NO DATA dans la table "communes" ! ''') else: print(''' NO DATA dans la table "communes" et dans la table "territoires" ! ''') sys.exit()