#!/usr/bin/env python3 # -*- coding: UTF-8 -*- #Nom : : foncier_insert_table.py #Description : Insertion/MAJ des données administratives et territoriales à la base lors de sa création. #Copyright : 2021, CEN38 #Auteur : Colas Geier #Version : 1.0 import pandas as pd import geopandas as gpd from sqlalchemy import create_engine from geoalchemy2 import Geometry from pycen import bdd from shapely.geometry.multipolygon import MultiPolygon from pydate import cdate import sys import os #################################### #################################### #################################### # PARAMETRES # Liste des tables à mettre à jour. ATTENTION : le respect des nomenclatures est importante # Liste dispo : ['com', 'dpt', 'ter', 'histo_com', 'ter_com'] Cette liste doit être identique aux index de l'object : Table # Correspondance tables BD_FONCIER : ['communes', 'departements', 'territoires', 'r_histo_com', 'r_ter_com'] run = ['com', 'dpt', 'ter', 'histo_com', 'ter_com'] user = 'colas_geier' pwd = 'adm1n*fOncier' # Parametres bdd # user = 'postgres' # pwd = 'tutu' # adr = '192.168.60.9' adr = '91.134.194.221' port = '5432' base = 'bd_cen' # Connexion bdd # bd = bdd.CEN( # user = user, # pwd = pwd, # adr = adr, # base = base # schema = schema # ) crs = 'EPSG:2154' con = create_engine('postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format(user,pwd,adr,port,base), echo=False) con_open = con.connect() # create_engine('postgresql+psycopg2://cen_admin:#CEN38@venir@172.17.0.2:5432/bd_cen', echo=True) # PATH PATHIN = '/home/colas/' LIST_DEPT = ['07', '26', '42', '38'] # MAIN_FOLDER = 'Documents' SUB_FOLDERS = os.path.join('5_BDD','1_QGIS') FILE_COM = ['COMMUNE.shp'] FILE_DPT = ['DEPARTEMENT.shp'] FILE_TER = [ 'EPCI.shp', 'PARC_OU_RESERVE.shp', 'BASSIN_VERSANT_TOPOGRAPHIQUE.shp', 'Contours_GRPT_AURA.shp' ] # Dict table IN_COM = [{ 'id': 'id', 'insee_com': 'code_insee', 'nom': 'nom', None: 'prec_plani', None: 'statut', None: 'canton', 'insee_arr': 'arrondisst', 'insee_dep': 'depart', 'insee_reg': 'region', 'population': 'popul', None: 'multican', 'actif': 'actif', None: 'epfl', # 'geometry': 'geom', }] IN_DPT = [{ 'id': 'id', 'nom': 'nom', 'insee_dep': 'insee_dep', 'insee_reg': 'insee_reg', 'date_creat': 'date_creat', 'date_maj': 'date_maj', 'actif': 'actif', # 'geometry': 'geom', }] IN_TER = [{ # DICT epci '': 'territoire_id', 'code_siren': 'siren', 'nom': 'territoire_lib', '': 'territoire_sigle', 'nature': 'typterritoire_id', '': 'administratif', '': 'prefixe', },{ # DICT parc_ou_reserve '': 'territoire_id', 'id': 'siren', # absence de code siren ==> récup des 10 derniers charactères du champs ID 'toponyme': 'territoire_lib', '': 'territoire_sigle', 'nature': 'typterritoire_id', '': 'administratif', '': 'prefixe', },{ # DICT bassin_versant '': 'territoire_id', 'code_hydro': 'siren', 'toponyme': 'territoire_lib', '': 'territoire_sigle', 'id': 'typterritoire_id', '': 'administratif', '': 'prefixe', },{ # DICT epci '': 'territoire_id', 'sirengrpt': 'siren', 'grpt': 'territoire_lib', '': 'territoire_sigle', 'nature': 'typterritoire_id', '': 'administratif', '': 'prefixe', },] # liste des couches administratives listé dans PATH_TER administratif = ['epci'] # DICT typterritoire_id lorsqu'une correspondance # n'est pas possible avec un champs de la table attributaire. # Relation typterritoire_id / ID (de la table attributaire). typterid = { 'bassvers' : 'bv' } mask_path = 'Documents/5_BDD/1_QGIS/' mask_file = 'mask_parcelles_cadastre.shp' #################################### #################################### #################################### # FONCTIONS def join_typterritoire(df, join): tab = join['table'] sch = join['schema'] ind = join['id'] on = join['on'] tmp = pd.read_sql_table( table_name = tab, con = con, schema = sch, index_col = ind, ) df[on['x']] = df[on['x']].str.lower() df[on['x']] = df[on['x']].replace(tmp[on['y']].str.lower().to_list(),tmp.index.to_list()) df = df[ df[on['x']].isin(tmp.index.to_list()) ] return df def find_files(File, main_path='Documents', sub_path=None): ''' @File : list @main_path : str @sub_path : str ''' sub = '' if sub_path: sub = sub_path path = os.path.join(main_path, sub) list_path = [] for F in File : matches = [str(path) for path in Path(path).rglob(F)] list_path += matches return list_path #################################### #################################### #################################### # MAIN if os.path.exists(PATHIN + mask_path + mask_file): mask = gpd.read_file(PATHIN + mask_path + mask_file, crs=crs) else : sys.exit() Table = { 'dpt': {'schema':'administratif', 'table':'departements', 'file': FILE_DPT, 'geom': True, 'dict':IN_DPT}, 'com' : {'schema':'administratif', 'table':'communes', 'file': FILE_COM, 'geom': True, 'dict':IN_COM}, # 'histo_com': {'schema':'administratif', 'table':'r_histo_com', 'file':None, 'geom': False, 'dict':None}, 'ter': {'schema':'territoires', 'table':'territoires', 'file': FILE_TER, 'geom': True, 'dict':IN_TER, 'join': { 'schema':'territoires', 'table': 'd_typterritoire', 'id': 'typterritoire_id', 'on': {'x': 'typterritoire_id', 'y': 'typterritoire_lib'}}}, 'ter_com': {'schema':'territoires', 'table':'r_ter_com', 'file':None, 'geom': False, 'dict':None} } for d, dep in enumerate(LIST_DEPT): PATH = 'Documents/5_BDD/1_QGIS/202103_BDTOPO/bdtopo_dep{0}_202103_shapefile/BDT_3-0_SHP_LAMB93_D0{0}-ED2021-03-15/'.format(dep) PATH_COM = [PATH + 'ADMINISTRATIF/COMMUNE.shp'] PATH_DPT = [PATH + 'ADMINISTRATIF/DEPARTEMENT.shp'] PATH_TER = [ PATH + 'ADMINISTRATIF/EPCI.shp', PATH + 'ZONES_REGLEMENTEES/PARC_OU_RESERVE.shp', PATH + 'HYDROGRAPHIE/BASSIN_VERSANT_TOPOGRAPHIQUE.shp', '' ] Table = { 'dpt': {'schema':'administratif', 'table':'departements', 'file': [PATHIN + path for path in PATH_DPT], 'geom': True, 'dict':IN_DPT}, 'com' : {'schema':'administratif', 'table':'communes', 'file': [PATHIN + path for path in PATH_COM], 'geom': True, 'dict':IN_COM}, # 'histo_com': {'schema':'administratif', 'table':'r_histo_com', 'file':None, 'geom': False, 'dict':None}, 'ter': {'schema':'territoires', 'table':'territoires', 'file': [PATHIN + path for path in PATH_TER], 'geom': True, 'dict':IN_TER, 'join': { 'schema':'territoires', 'table': 'd_typterritoire', 'id': 'typterritoire_id', 'on': {'x': 'typterritoire_id', 'y': 'typterritoire_lib'}}}, 'ter_com': {'schema':'territoires', 'table':'r_ter_com', 'file':None, 'geom': False, 'dict':None} } not_run = [k for k in Table.keys() if k not in run] for r in not_run: del Table[r] for tab in Table: Table['file'] = find_files(File= Table['file'], sub_path=SUB_FOLDERS, main_path=MAIN_FOLDER) if d == 0: for tab in reversed(Table): # continue sql = "TRUNCATE TABLE {0}.{1} CASCADE".format(Table[tab]['schema'], Table[tab]['table']) print(sql) con_open.execute(sql) no_r_tab = lambda x: x not in ['ter_com'] for key in filter(no_r_tab, Table.keys()): # Test existance de la table en bdd lst_tab = con.dialect.get_table_names(con, schema=Table[key]['schema']) test = Table[key]['table'] in lst_tab # Si la table existe if test: DICT = Table[key]['dict'] # Test présence d'un champ 'geom' ou 'geometry' dans la table d'export geom = False col_tab = con.dialect.get_columns(con, Table[key]['table'], schema=Table[key]['schema']) for o, obj in enumerate(col_tab): if 'geom' in obj['name']: geom = True geom_name = obj['name'] geom_type = obj['type'].geometry_type if DICT: for D, tmp in enumerate(DICT): DICT[D]['geometry'] = geom_name # Suppression des champs non utiles if DICT: for D, tmp in enumerate(DICT): if DICT[D] and None in DICT[D].keys(): del DICT[D][None] if Table[key]['file']: for f, i_file in enumerate(Table[key]['file']): # Si présence d'une géometrie dans la table à insérer if geom: # if Table[key]['geom']: # Read new table print('IMPORT shape for table {0}'.format(Table[key]['table'])) df = gpd.read_file(filename=i_file) df = gpd.sjoin(df, mask, how='inner', op='intersects', rsuffix='right') del_cols = [col for col in tutu.columns if col.endswith('right')] + ['FID'] df.drop(columns=del_cols, inplace=True) df['actif'] = True # if 'ID' in df.columns: # df.set_index('ID', inplace=True) # typ_geom_out = con.dialect.get_columns(con, Table[key]['table'], schema=Table[key]['schema']) # Harmonisation des géometries # Transformation des géometries POLYGON ==> MULTIPOLIGON geom_df = df.geometry.geom_type.unique().tolist() geom_df = [x.upper() for x in geom_df] if [geom_type] != geom_df: if geom_type == 'MULTIPOLYGON' and 'POLYGON' in geom_df: print('CORRECTION des géometries POLYGON ==> MULTIPOLYGON') lst = [] for o, obj in enumerate(df.geometry): if obj.geom_type == 'Polygon': obj = MultiPolygon([obj]) lst.append(obj) df['geometry'] = lst # elif geom_type == 'POLYGON' and 'MULTIPOLYGON' in geom_df: # df[df.geom.geom_type == 'MultiPolygon'] # pass else: print('ERROR : conflit entre la géometrie du df {0} et de la table postgis {1}'.format(geom_df,geom_type)) # Conservation des lignes appartenant au département print('CONSERVATION des données départementales') df.columns = df.columns.str.lower() if 'insee_dep' in df.columns: df = df.loc[df.insee_dep == dep] # Formatage des champs pour insertion print('FORMATAGE des données') df.rename(columns=DICT[f], inplace=True) rm_col = df.columns.difference(DICT[f].values()) df.drop(columns=rm_col, inplace=True) # break # Identification du champs 'administratif' pour la table territoire couche = i_file.split('/') couche = couche[len(couche)-1] couche = couche.split('.')[0].lower() if 'ter' == key and couche in administratif: df['administratif'] = True elif 'ter' == key and couche not in administratif: df['administratif'] = False df['siren'] = [siren[-10:] for siren in df['siren']] if 'typterritoire_id' in DICT[f].values(): key_typterr = [k for (k, v) in DICT[f].items() if v == 'typterritoire_id'][0] if 'join' in Table[key].keys() and key_typterr != 'id': df = join_typterritoire(df, Table[key]['join']) if key == 'ter' and key_typterr == 'id': # df['typterritoire_id'] = df.index.to_list() df['typterritoire_id'] = [typter[:8] for typter in df['typterritoire_id']] df['typterritoire_id'] = df['typterritoire_id'].str.lower() df['typterritoire_id'] = df['typterritoire_id'].replace(typterid) # df[on['x']].replace(tmp[on['y']].str.lower().to_list(),tmp.index.to_list()) # Si présence d'une géometrie dans la table à insérer if geom: if not isinstance(df, gpd.GeoDataFrame): df = df.set_geometry('geom', drop=True, crs=crs) df.rename(columns={'geometry': 'geom'}, inplace=True) # if Table[key]['geom']: df.to_postgis( name = Table[key]['table'], con = con, schema = Table[key]['schema'], index = False, if_exists = 'append', geom_col = geom_name, # dtype={'geom': Geometry(geometry_type='MULTIPOLYGON', srid=df.crs.to_epsg())} ) print('''INSERT TABLE OK for DEPT {} '''.format(dep)) else: None # Si la table existe pas else: print('ERROR : La table {0} n\'existe pas dans le schéma {1} !'.format( Table[key]['table'].upper(), Table[key]['schema'].upper() )) if 'ter_com' in Table.keys() and d == 0: tab = Table['ter_com'] print('IMPORT tables for table {0}'.format(tab['table'])) ter_sql = 'SELECT * FROM {sch}.{tab}'.format(sch='territoires', tab='territoires' ) ter = gpd.read_postgis( sql = ter_sql, con = con, geom_col = 'geom', crs = crs, ) com_sql = 'SELECT * FROM {sch}.{tab} WHERE actif = true'.format(sch='administratif', tab='communes' ) com = gpd.read_postgis( sql = com_sql, con = con, geom_col = 'geom', crs = crs, ) col_id = ['territoire_id', 'code_insee', 'geom'] for df in [ter, com]: rm_col = [ col for col in df.columns[~df.columns.isin(col_id)] ] df.drop(columns=rm_col, inplace=True) print('JOIN tables "territoires" & "communes"') df = gpd.sjoin(ter, com, op='intersects') rm_col = [ col for col in df.columns[~df.columns.isin(col_id)] ] rm_col.append('geom') df.drop(columns=rm_col, inplace=True) df = pd.DataFrame(df) r_sql = 'SELECT code_insee, territoire_id FROM {sch}.{tab}'.format(sch=tab['schema'], tab=tab['table'] ) r_tab = pd.read_sql( sql = r_sql, con = con, ) if not r_tab.empty: print('DROP lignes présentes dans la table {}'.format(tab['table'])) df = pd.concat([df,r_tab]).drop_duplicates(keep=False) if not df.empty: users = pd.read_sql_table( table_name = 'utilisateurs', con = con, schema = 'admin_sig' ) date_now = cdate.today() df['actif'] = True df['date_maj'] = date_now df['utilisateur_id'] = users[users.utilisateur_id == user].iloc[0]['individu_id'] df.to_sql( name = tab['table'], con = con, schema = tab['schema'], index = False, if_exists = 'append' ) else: print(''' TOUTES les relations "communes" / "territoires" existent déjà ! ''') # gdf.set_index('id', drop=True, inplace=True) # for key in Table.keys(): # # schema = Table[key]['schema'] # # table = Table[key]['table'] # # query = 'SELECT * FROM {0}.{1}'.format(schema,table) # # gdf = gpd.read_postgis(sql=query, con=con) # df = bd.get_table( # schema=Table[key]['schema'], # table=Table[key]['table']) # print(df)