#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Name        : foncier_insert_table.py
# Description : Inserts the cadastral data into the database once it has been created.
# Copyright   : 2021, CEN38
# Author      : Colas Geier
# Version     : 1.0
import pandas as pd
# import numpy as np
from sqlalchemy import create_engine, text, inspect
from geoalchemy2 import Geometry
import gc
import sys
# import time
import datetime as dt
# from pycen import bdd
# from shapely.geometry.multipolygon import MultiPolygon

# geopandas setup
import geopandas as gpd
import warnings
warnings.filterwarnings('ignore', 'GeoSeries.isna', UserWarning)
# import shapely
# shapely.speedups.disable()
# gpd.options.use_pygeos = True

# start_time = dt.datetime.today()
# tmp = dt.datetime.today() - start_time
check_duplicates = False

# CADASTRE database parameters (input)
# Output data of the qgis "Cadastre" plugin
user_cad = 'postgres'
pwd_cad = 'foncier_test1'
adr_cad = '172.17.0.2'
port_cad = '5432'
base_cad = 'postgres'
schema_cad = '202007'

# FONCIER database parameters (output)
user_fon = 'postgres'
pwd_fon = 'tutu'
adr_fon = '192.168.60.9'
port_fon = '5432'
base_fon = 'bd_cen'
schema_fon = 'cadastre'

# Mapping between the tables
crs = 'EPSG:2154'
dpt_nom_tab = '_73'
chunk = 100000
list_dep = ['07', '26', '42', '38']

# Tables/columns to scan for near-duplicate spellings (see the duplicates
# correction step in Main)
FIND_DOUBLON = [{
    'tab_in': 'proprietaire',
    'on_col': ['ddenom', 'dprnlp', 'dldnss', 'jdatnss', 'ccogrm', 'dsglpm', 'dnatpr']
}]
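# DICT_TAB drives the whole transfer below. Each entry maps one source table of
# the qgis "Cadastre" plugin output onto one or more destination tables:
#   table_in / index_tab  -> source table and its primary key
#   columns_in            -> source columns to read
#   table_out             -> list of destination tables, each described by:
#     name        -> destination table name (suffixed with dpt_nom_tab)
#     geom        -> None, or the geometry table/index to merge in
#     drop_escape -> strip stray blanks inside character strings
#     columns_in  -> source columns kept for this destination
#     columns_add -> composite fields built by concatenating source columns
#     unique      -> de-duplication key and which duplicate to keep
#     dict        -> column renaming map {'old_name': 'new_name', ...}
#     join        -> extra joins ('isin', 'notin', 'merge', 'concat') against
#                    the input ('in') or output ('out') database, see join_data()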
DICT_TAB = [{
    'table_in' : 'proprietaire',   # Source table coming from the qgis cadastre plugin output
    'index_tab': 'proprietaire',   # Pkey of the source table
    'columns_in': ['ccodep', 'ccocom', 'dnupro', 'dnuper', 'ccoqua', 'ddenom', 'jdatnss', 'dldnss', 'dsglpm',
                   'dlign3', 'dlign4', 'dlign5', 'dlign6', 'dnatpr', 'gtoper', 'ccogrm'],
    'table_out': [{
        'name': 'cptprop{}'.format(dpt_nom_tab),
        'geom': None,
        'drop_escape': False,                          # Strip blanks inside the character strings
        'columns_in': ['ccodep', 'ccocom', 'dnupro'],  # Input columns to keep
        'columns_add': {'dnupro': ['ccodep', 'ccocom', 'dnupro']},  # Composite fields to build
        'unique': {'cols': ['dnupro'], 'keep': 'first'},  # Fields that must be unique in the output table
        'dict': None,  # Renaming dictionary {'old_name1': 'new_name1', 'old_name2': 'new_name2', ...}
        'join': [{
            'bdd': 'in', 'table': 'suf', 'on': ['ccodep', 'ccocom', 'dnupro'], 'type': 'concat',
            'select_cols': ['ccodep', 'ccocom', 'dnupro']}, {
            'bdd': 'in', 'table': 'lots', 'on': ['ccodep', 'ccocom', 'dnupro'], 'type': 'concat',
            'select_cols': ['ccodep', 'ccocom', 'dnuprol'], 'dict': {'dnuprol': 'dnupro'}}, {
            'bdd': 'in', 'table': 'parcelle', 'on': ['ccodep', 'ccocom', 'dnupro'], 'type': 'concat',
            'select_cols': ['ccodep', 'ccocom', 'dnupro']},
        ]
    }, {
        'name': 'proprios{}'.format(dpt_nom_tab),
        'geom': None,
        'drop_escape': True,
        'columns_in': ['ccodep', 'dnuper', 'ccoqua', 'ddenom', 'jdatnss', 'dldnss', 'dsglpm',
                       'dlign3', 'dlign4', 'dlign5', 'dlign6', 'dnatpr', 'gtoper', 'ccogrm'],
        'columns_add': {'dnuper': ['ccodep', 'dnuper']},
        'unique': {'cols': ['dnuper'], 'keep': 'first'},
        'dict': None,
        'join': False
    }, {
        'name': 'r_prop_cptprop{}'.format(dpt_nom_tab),
        'geom': None,
        'drop_escape': True,
        'columns_in': ['ccodep', 'dnuper', 'ccocom', 'dnupro', 'dnomlp', 'dprnlp', 'epxnee',
                       'dnomcp', 'dprncp', 'ccodro', 'ccodem'],
        'columns_add': {
            'dnuper': ['ccodep', 'dnuper'],
            'dnupro': ['ccodep', 'ccocom', 'dnupro']},
        'unique': {'cols': ['dnupro', 'dnuper'], 'keep': 'first'},
        'dict': None,
        'join': False
    }]
}, {
    'table_in' : 'parcelle',
    'index_tab': 'parcelle',
    'columns_in': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'ccovoi', 'dparpi', 'dcntpa',
                   'ccocomm', 'ccoprem', 'ccosecm', 'dnuplam', 'dvoilib', 'type_filiation', 'dnupro'],
    'table_out': [{
        'name': 'vl{}'.format(dpt_nom_tab),
        'geom': None,
        'drop_escape': False,
        'columns_in': ['ccodep', 'ccocom', 'ccovoi', 'dvoilib'],
        'columns_add': {
            'vl_id': ['ccodep', 'ccocom', 'ccovoi'],
            'geom': None},
        'unique': {'cols': ['vl_id'], 'keep': 'first'},
        'dict': {'dvoilib': 'libelle'},
        'join': [{
            'bdd': 'in', 'table': 'voie', 'on': ['ccodep', 'ccocom', 'ccovoi'], 'type': 'concat',
            'select_cols': ['ccodep', 'ccocom', 'codvoi', 'libvoi'],
            'dict': {'libvoi': 'libelle', 'codvoi': 'ccovoi'},
        }]
    }, {
        'name': 'parcelles{}'.format(dpt_nom_tab),
        'geom': {
            'table_geom_in': 'geo_parcelle',
            'index_geom': 'geo_parcelle'
        },
        'drop_escape': True,
        'columns_in': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'ccovoi', 'dparpi', 'dcntpa',
                       'ccocomm', 'ccoprem', 'ccosecm', 'dnuplam', 'type_filiation'],
        'columns_add': {
            'par_id': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla'],
            'codcom': ['ccodep', 'ccocom'],
            'vl_id': ['ccodep', 'ccocom', 'ccovoi'],
            'typprop_id': None
        },
        'unique': False,
        'dict': {'type_filiation': 'type'},
        'join': False
    }, {
        'name': 'lots{}'.format(dpt_nom_tab),  # !!!!!! Does not find parcels without lots (ex: 38357000AE0526)
        'geom': None,
        'drop_escape': True,
        'columns_in': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dcntpa'],
        'columns_add': {
            'lot_id': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla'],
            'par_id': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla'],
            'dnulot': None,
        },
        'unique': False,
        'dict': {'dcntpa': 'dcntlo'},
        'join': [{'bdd': 'out', 'table': 'parcelles{}'.format(dpt_nom_tab), 'on': ['par_id'], 'type': 'isin',
                  'select_cols': ['par_id']}]
    # }, {
    #     'name': 'cptprop{}'.format(dpt_nom_tab),  # !!!!!! Does not find parcels without lots (ex: 38357000AE0526)
    #     'geom': None,
    #     'drop_escape': True,
    #     'columns_in': ['ccodep', 'ccocom', 'dnupro'],
    #     'columns_add': {
    #         'dnupro': ['ccodep', 'ccocom', 'dnupro'],
    #     },
    #     'unique': {'cols': ['dnupro'], 'keep': 'first'},
    #     'dict': None,
    #     'join': [{'bdd': 'out', 'table': 'cptprop{}'.format(dpt_nom_tab), 'on': ['dnupro'], 'type': 'notin',
    #               'select_cols': ['dnupro']}]
    }]
# }, {
#     'table_in' : 'suf',
#     'index_tab': 'suf',
#     'columns_in': ['ccodep', 'ccocom', 'dnupro'],
#     'table_out': [{
#         'name': 'cptprop{}'.format(dpt_nom_tab),  # !!!!!! Does not find parcels without lots (ex: 38357000AE0526)
#         'geom': None,
#         'drop_escape': True,
#         'columns_in': ['ccodep', 'ccocom', 'dnupro'],
#         'columns_add': {
#             'dnupro': ['ccodep', 'ccocom', 'dnupro'],
#         },
#         'unique': {'cols': ['dnupro'], 'keep': 'first'},
#         'dict': None,
#         'join': [{'bdd': 'out', 'table': 'cptprop{}'.format(dpt_nom_tab), 'on': ['dnupro'], 'type': 'notin',
#                   'select_cols': ['dnupro']}]
#     }]
}, {
    'table_in' : 'lots',
    'index_tab': 'lots',
    'columns_in': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot', 'dnupdl', 'dcntlo', 'dnuprol'],
    'table_out': [{
    #     'name': 'cptprop{}'.format(dpt_nom_tab),  # !!!!!! Does not find parcels without lots (ex: 38357000AE0526)
    #     'geom': None,
    #     'drop_escape': True,
    #     'columns_in': ['ccodep', 'ccocom', 'dnuprol'],
    #     'columns_add': {
    #         'dnupro': ['ccodep', 'ccocom', 'dnuprol'],
    #     },
    #     'unique': {'cols': ['dnupro'], 'keep': 'first'},
    #     'dict': None,
    #     'join': [{'bdd': 'out', 'table': 'cptprop{}'.format(dpt_nom_tab), 'on': ['dnupro'], 'type': 'notin',
    #               'select_cols': ['dnupro']}]
    # }, {
        'name': 'lots{}'.format(dpt_nom_tab),  # !!!!!! parcels with lots: some par_id NOT IN parcelles_73
        'geom': None,
        'drop_escape': True,
        'columns_in': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot', 'dnupdl', 'dcntlo'],
        'columns_add': {
            'lot_id': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot'],
            'par_id': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla'],},
        'unique': {'cols': ['lot_id'], 'keep': 'first'},
        'dict': None,
        'join': [{'bdd': 'out', 'table': 'parcelles{}'.format(dpt_nom_tab), 'on': ['par_id'], 'type': 'isin',
                  'select_cols': ['par_id']}]
    }, {
        'name': 'lots_natcult{}'.format(dpt_nom_tab),
        'geom': None,
        'drop_escape': True,
        'columns_in': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot'],
        'columns_add': {
            'lot_id': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot'],},
        'unique': {'cols': ['lot_id'], 'keep': 'first'},
        'dict': None,
        'join': [{  # ERROR ! 2 dclssf for 1 lot_id
            'bdd': 'in', 'table': 'suf', 'on': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot'],
            'type': 'merge',
            'select_cols': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot',
                            'dsgrpf', 'cnatsp', 'dclssf', 'ccosub', 'dcntsf'],
        }, {
            'bdd': 'out', 'table': 'lots{}'.format(dpt_nom_tab), 'on': ['lot_id'], 'type': 'isin',
            'select_cols': ['lot_id']}]
    }, {
        'name': 'cadastre{}'.format(dpt_nom_tab),
        'geom': None,
        'drop_escape': True,
        'columns_in': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot', 'dnuprol'],
        'columns_add': {
            'lot_id': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot'],
            'dnupro': ['ccodep', 'ccocom', 'dnuprol'],},
        'unique': {'cols': ['lot_id', 'dnupro'], 'keep': 'first'},
        'dict': None,
        'join': [{  # ERROR ! 2 dclssf for 1 lot_id
            'bdd': 'in', 'table': 'suf',
            'on': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot', 'dnuprol'], 'type': 'concat',
            'select_cols': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnulot', 'dnupro'],
            'dict': {'dnupro': 'dnuprol'}}, {
            'bdd': 'in', 'table': 'parcelle',
            'on': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnuprol'], 'type': 'concat',
            'select_cols': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnupro'],
            'dict': {'dnupro': 'dnuprol'}}, {
            'bdd': 'out', 'table': 'lots{}'.format(dpt_nom_tab), 'on': ['lot_id'], 'type': 'isin',
            'select_cols': ['lot_id']}, {
            'bdd': 'out', 'table': 'cptprop{}'.format(dpt_nom_tab), 'on': ['dnupro'], 'type': 'isin',
            'select_cols': ['dnupro']},
        ]
    }]
# }, {
#     'table_in' : 'proprietaire',  # Source table coming from the qgis cadastre plugin output
#     'index_tab': 'proprietaire',  # Pkey of the source table
#     'columns_in': ['ccodep', 'dnuper', 'ccoqua', 'ddenom', 'jdatnss', 'dldnss', 'dsglpm', 'dlign3',
#                    'dlign4', 'dlign5', 'dlign6', 'dnatpr', 'gtoper', 'ccogrm',
#                    'ccocom', 'dnupro', 'dnomlp', 'dprnlp', 'epxnee', 'dnomcp', 'dprncp', 'ccodro', 'ccodem'],
#     'table_out': []
# }, {
#     'table_in' : 'parcelle',
#     'index_tab': 'parcelle',
#     'columns_in': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnupro'],
#     'table_out': [{
#         'name': 'cadastre{}'.format(dpt_nom_tab),
#         'geom': None,
#         'drop_escape': True,
#         'columns_in': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla', 'dnupro'],
#         'columns_add': {
#             'lot_id': ['ccodep', 'ccocom', 'ccopre', 'ccosec', 'dnupla'],
#             'dnupro': ['ccodep', 'ccocom', 'dnupro'],},
#         'unique': {'cols': ['lot_id', 'dnupro'], 'keep': 'first'},
#         'dict': None,
#         'join': [{
#             'bdd': 'out', 'table': 'lots{}'.format(dpt_nom_tab), 'on': ['lot_id'], 'type': 'isin',
#             'select_cols': ['lot_id'], 'where': {'dnulot': None}}, {
#             'bdd': 'out', 'table': 'cptprop{}'.format(dpt_nom_tab), 'on': ['dnupro'], 'type': 'isin',
#             'select_cols': ['dnupro']}]
#     }]
}]

# # Database connection (pycen helper, kept for reference)
# bd_cad = bdd.CEN(
#     user = user_cad,
#     pwd = pwd_cad,
#     adr = adr_cad,
#     base = base_cad
#     # schema = schema
# )

################################
##########  Functions  #########
################################
start_time = dt.datetime.today()

def time_exec(init_time):
    # Elapsed time since init_time, as a printable string
    time = dt.datetime.today() - init_time
    return str(time)

def replace_escape_by_0(df):
    # Replace blanks inside the string codes with 0
    # if 'ccopre' in df.columns:
    #     df['ccopre'].replace([None, '', ' '], '000', inplace=True)
    cols = ['ccopre', 'ccosec', 'dnupla', 'dparpi', 'dnuplam', 'dclssf', 'ccovoi']
    for col in cols:
        if col in df.columns:
            df[col].replace([' '], '0', regex=True, inplace=True)
    return df
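# Illustrative example (not executed): for the columns listed in
# replace_escape_by_0, every blank inside a value becomes a zero, e.g.
#   pd.DataFrame({'ccosec': [' A', 'B ']})  ->  ccosec becomes ['0A', 'B0']
# presumably to keep the fixed-width cadastral codes at their expected length.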
def join_data(df, join, schema_in):
    # Join the data with another table, read from either database
    table = join['table']
    bdd = join['bdd']
    typ = join['type']
    on = join['on']
    if bdd == 'out':
        con = engine_fon
        sch = schema_fon
    if bdd == 'in':
        con = engine_cad
        sch = schema_in
    select_col = []
    if 'select_cols' in join.keys():
        select_col.extend(join['select_cols'])
    if 'where' in join.keys():
        select_col.extend(join['where'].keys())
    tmp = pd.read_sql_table(
        table_name = table,
        con = con,
        schema = sch,
        columns = select_col
    )
    tmp = replace_escape_by_0(tmp)
    if 'dict' in join.keys():
        tmp.rename(columns=join['dict'], inplace=True)
    if 'where' in join.keys():
        where = join['where']
        for key in where.keys():
            tmp = tmp[tmp[key] == where[key]]
    if typ in ['isin', 'notin']:
        # on = on[0]
        # Build a single concatenated key to compare the rows of both frames
        for d in [df, tmp]:
            d['on'] = ''
            for col in on:
                d['on'] += d[col].astype(str)
        if typ == 'isin':
            df = df[df['on'].isin(tmp['on'])]
        if typ == 'notin':
            df = df[~df['on'].isin(tmp['on'])]
        df.drop(columns='on', inplace=True)
        # if typ == 'notin':
        #     on = on[0]
        #     df = df[~df[on].isin(tmp[on])]
        #     df = pd.concat([df, tmp]).drop_duplicates(on, keep=False)
    if typ == 'merge':
        df = df.merge(tmp, on=on, how='left')
    if typ == 'concat':
        df = pd.concat([df, tmp], ignore_index=True).drop_duplicates()
    return df
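# Illustrative example (not executed), with the configuration used above:
#   join_data(df, {'bdd': 'out', 'table': 'parcelles_73', 'on': ['par_id'],
#                  'type': 'isin', 'select_cols': ['par_id']}, schema_in)
# keeps only the rows of df whose concatenated 'on' key exists in parcelles_73;
# 'notin' keeps the complement, 'merge' left-joins the selected columns onto df,
# and 'concat' appends the selected rows to df before dropping duplicates.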
def get_geom_parcelle(df, get_geo, schema):
    # Fetch the parcel geometries, merge them onto the attribute dataframe,
    # then export the result. Relies on the module-level ind_in set in Main.
    print('INIT import geodata ........... %s sec' % (time_exec(start_time)))
    # Geometry variables
    ind_geo = get_geo['index_geom']
    tab_geo = get_geo['table_geom_in']
    sql = """select distinct on (t2.{0})
                 t2.{0},
                 t1.geom,
                 t1.supf::integer as dcntpa -- also fetch the associated cadastral area: some geometries are not referenced in the "parcelles" table
             FROM "{1}".{2} t1
             INNER JOIN (select distinct on ({0}) {0}, max(creat_date) creat_date, max(update_dat) update_dat
                         FROM "{1}".{2} GROUP BY ({0})) t2
             USING ({0}, creat_date, update_dat)""".format(ind_geo, schema, tab_geo)
    tmp = gpd.read_postgis(
        sql = sql,
        con = engine_cad,
        geom_col = 'geom',
        crs = crs,
        chunksize = chunk,
    )
    if chunk:
        gdf = gpd.GeoDataFrame(pd.concat(tmp, ignore_index=True))
    else:
        gdf = tmp.copy()
    del tmp
    gdf.set_index(ind_geo, inplace=True)
    gdf.index.name = ind_in
    print('END import geodata ........... %s sec' % (time_exec(start_time)))
    print('INIT merge data - geodata ........... %s sec' % (time_exec(start_time)))
    if not gdf[gdf.dcntpa.isna()].empty:
        gdf.dcntpa.fillna(0, inplace=True)
    gdf['dcntpa'] = gdf['dcntpa'].astype(df.dtypes['dcntpa'].type)
    # gdf = gdf.merge(df, on=[ind_in, 'dcntpa'], how='left')
    tmp = gdf.merge(df, on=[ind_in, 'dcntpa'], how='right')
    tmp = tmp.set_geometry('geom', drop=True, crs=crs)
    tmp.rename(columns={'geometry': 'geom'}, inplace=True)
    if not tmp[tmp.geom.isna()].empty:
        # Backfill the geometries lost by the merge on (id, dcntpa)
        lst_ind_df = tmp[tmp.geom.isna()].index.tolist()
        lst_ind_gdf = gdf.loc[gdf.index.isin(lst_ind_df)].index.tolist()
        tmp.loc[tmp.index.isin(lst_ind_gdf), 'geom'] = gdf.loc[gdf.index.isin(lst_ind_gdf), 'geom']
    del gdf, df
    gdf = tmp.copy()
    del tmp
    export_data(gdf)
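# Note on get_geom_parcelle: the inner SELECT keeps, for each parcel id, the most
# recent (creat_date, update_dat) pair, and the outer DISTINCT ON keeps a single
# geometry per id. The how='right' merge preserves every attribute row even when
# no geometry matches on (id, dcntpa); the isna() block then tries to backfill
# those missing geometries from gdf by parcel id.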
def export_data(df):
    # Write df to the current destination table in slices of chunk*5 rows.
    # Relies on the module-level tab_out, engine_fon and schema_fon set in Main.
    print('INIT export data TO {0}, {1} ........... {2} sec'.format(tab_out, df.shape[0], time_exec(start_time)))
    rang = [e for e in range(0, df.shape[0], chunk*5)]
    for i, j in enumerate(rang):
        if j == max(rang):
            jj = df.shape[0]
        else:
            jj = rang[i+1]
        df_imp = df[j:jj].copy()
        print('INIT export data TO {0} ..... {1}/{2} ...... {3} sec'.format(tab_out, jj, df.shape[0], time_exec(start_time)))
        if 'geom' in df.columns and not df[~df['geom'].isna()].empty:
            df_imp = df_imp.set_geometry('geom', drop=True, crs=crs)
            df_imp.rename(columns={'geometry': 'geom'}, inplace=True)
            df_imp.to_postgis(
                name = tab_out,
                con = engine_fon,
                schema = schema_fon,
                index = False,
                if_exists = 'append',
                chunksize = chunk,
            )
        else:
            df_imp.to_sql(
                name = tab_out,
                con = engine_fon,
                schema = schema_fon,
                index = False,
                if_exists = 'append',
                chunksize = chunk,
                method = 'multi',
            )
    print('END export data TO {0} ........... {1} sec'.format(tab_out, time_exec(start_time)))

def optimize_data_frame(df):
    # Cast low-cardinality columns to 'category' to reduce memory usage
    columns = df.columns
    for col in columns:
        dtype = df[col].dtypes
        # if dtype == 'int64' or dtype == 'int32':
        len_col = len(df[col].unique())
        if len_col <= df.shape[0]*0.8:
            df[col] = df[col].astype('category')
    return df

# Initialise the database connections
engine_cad = create_engine('postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format(user_cad, pwd_cad, adr_cad, port_cad, base_cad), echo=False)
engine_fon = create_engine('postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format(user_fon, pwd_fon, adr_fon, port_fon, base_fon), echo=False)
con_cad = engine_cad.connect()
con_fon = engine_fon.connect()
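# Optional sanity check (a minimal sketch, commented out): uncomment to verify
# that both connections respond before any table is truncated.
# for con in (con_cad, con_fon):
#     con.execute(text('SELECT 1'))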
################################
##########    Main    ##########
################################
if __name__ == "__main__":

    ################
    # CORRECTION OF DUPLICATES IN TABLE_IN
    if check_duplicates:
        for DOUBLON in FIND_DOUBLON:
            tab = DOUBLON['tab_in']
            on_col = DOUBLON['on_col']
            for col in on_col:
                for dep in list_dep:
                    schema_in = dep + '_' + schema_cad
                    sql = '''
                    -- duplicates exist because of similar spellings:
                    -- look for them
                    SELECT DISTINCT '{0}' as insee_dep, dnuper,
                           string_agg(DISTINCT {1}, ' / ') as orthographes_voisines
                    FROM "{2}".{3}
                    GROUP BY dnuper
                    HAVING count(DISTINCT {1}) > 1'''.format(dep, col, schema_in, tab)
                    df = pd.read_sql(
                        sql = sql,
                        con = engine_cad,
                    )
                    if df.empty:
                        print('No duplicate value dep {0} table {1} column {2} ====> next request'.format(dep, tab, col))
                        continue
                    for i, row in df.iterrows():
                        dnuper = row.dnuper
                        choix = row.orthographes_voisines.split(' / ')
                        choix = [c.strip() for c in choix]
                        Question = input("""Similar spellings exist for identifier {0} in column {1}.
The neighbouring values are: {2}
Type the value of field {1} to store (c to cancel): """.format(dnuper, col, choix))
                        if Question.lower() in ('c', 'cancel'):
                            continue
                        # Parentheses around the OR chain so the dnuper filter
                        # applies to every LIKE branch
                        update = '''UPDATE "{0}".{1} SET {2} = '{3}'
                                    WHERE ({2} like '{4}%') AND dnuper = '{5}';'''.format(
                            schema_in, tab, col, Question,
                            "%' OR {} like '".format(col).join(map(str, choix)), dnuper)
                        try:
                            con_cad.execute(text(update))
                            print('Update OK !')
                        except Exception as exept:
                            print('ERROR : {0}'.format(update))
                            print(exept)
                            sys.exit()

    ################
    # TRUNCATE TABLE OUT
    for i, DICT in enumerate(DICT_TAB):
        # continue
        # i = 1
        # if i != 2:
        #     continue
        tab_in = DICT_TAB[i]['table_in']
        col_in = DICT_TAB[i]['columns_in']
        ind_in = DICT_TAB[i]['index_tab']
        tabs_out = DICT_TAB[i]['table_out']
        for tab_out in reversed(tabs_out):
            # continue
            sql = "TRUNCATE TABLE {0}.{1} CASCADE".format(schema_fon, tab_out['name'])
            print(sql)
            con_fon.execute(text(sql))
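    # The destination tables are truncated in reverse declaration order, which
    # presumably clears dependent tables before the ones they reference; CASCADE
    # handles any remaining foreign-key links.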
    for dep in list_dep:
        schema_in = dep + '_' + schema_cad
        print('''
        INIT import data FROM {}
        '''.format(schema_in))

        ################
        # IMPORT INTO THE TABLES OUT
        for i, DICT in enumerate(DICT_TAB):
            # i = 1
            # if i != 1:
            #     continue
            tab_in = DICT_TAB[i]['table_in']
            col_in = DICT_TAB[i]['columns_in']
            ind_in = DICT_TAB[i]['index_tab']
            tabs_out = DICT_TAB[i]['table_out']

            # Import the data
            print('''
            INIT import data FROM {0}........... {1} sec'''.format(tab_in, time_exec(start_time)))
            tmp = pd.read_sql_table(
                table_name = tab_in,
                con = engine_cad,
                schema = schema_in,
                columns = col_in + [ind_in],
                chunksize = chunk,
            )
            # Shape the data
            # start_time = dt.datetime.today()
            if chunk:
                DF = pd.concat(tmp, ignore_index=True)
            else:
                DF = tmp.copy()
            DF.drop_duplicates(inplace=True)
            del tmp
            # DF = optimize_data_frame(DF)
            DF.set_index(ind_in, inplace=True)
            print('END import data ........... %s sec' % (time_exec(start_time)))

            for tab in tabs_out:
                tab_out = tab['name']
                dictio = tab['dict']
                col_df = tab['columns_in']
                col_ad = tab['columns_add']
                get_geo = tab['geom']
                drp_esc = tab['drop_escape']
                unique = tab['unique']
                join = tab['join']
                # if tab_out == 'parcelles_73':
                #     break
                #     continue
                print('INIT TABLE {0} ........... {1} sec'.format(tab_out, time_exec(start_time)))
                df = DF[DF.columns.intersection(col_df)].copy()
                # df = optimize_data_frame(df)
                # del DF; gc.collect()
                # Replace blanks inside the string codes with 0
                df = replace_escape_by_0(df)
                if drp_esc:
                    df_obj = df.select_dtypes(['object'])
                    df[df_obj.columns] = df_obj.apply(lambda x: x.str.strip())
                    # df.replace([' '], '', regex=True, inplace=True)
                if dictio:
                    df.rename(columns=dictio, inplace=True)
                # Joins against the input database
                if join:
                    for j in join:
                        if j['bdd'] == 'in':
                            # sys.exit()
                            df = join_data(df, j, schema_in)
                    if df.empty:
                        print('df EMPTY ====> next table')
                        continue
                # Add the composite fields
                if col_ad:
                    print('INIT addition columns ........... %s sec' % (time_exec(start_time)))
                    for key in col_ad.keys():
                        if key in df.columns:
                            # The key also feeds its own composite: keep a copy first
                            df[key + '_tmp'] = df[key].copy()
                            col_ad[key] = [x if x != key else key + '_tmp' for x in col_ad[key]]
                        aggreg = col_ad[key]
                        if aggreg:
                            df[key] = ''
                            for col in aggreg:
                                df[key] += df[col].fillna('')
                            # df[key] = df[aggreg].agg(''.join, axis=1)
                        else:
                            df[key] = aggreg
                        print('ADD column {0} : {1} ........... {2} sec'.format(key, aggreg, time_exec(start_time)))
                # Joins against the output database
                if join:
                    for j in join:
                        if j['bdd'] == 'out':
                            # break
                            # sys.exit()
                            df = join_data(df, j, schema_in)
                    if df.empty:
                        print('df EMPTY ====> next table')
                        continue
                if unique:
                    df.drop_duplicates(unique['cols'], keep=unique['keep'], inplace=True)
                # Keep only the columns expected by the destination table
                name_col_out = inspect(engine_fon).get_columns(tab_out, schema=schema_fon)
                name_col_out = [sub['name'] for sub in name_col_out]
                if 'geom' in name_col_out and 'geom' not in df.columns:
                    name_col_out.remove('geom')
                df = df[df.columns.intersection(name_col_out)]

                ####################
                # Read geodataframe
                # When a geometry field is needed.
                if get_geo:
                    get_geom_parcelle(df=df, get_geo=get_geo, schema=schema_in)
                    # retrieve the list of geometries whose id is absent from the parcelles table
                    # lst = gdf[gdf.par_id.isna()].index.tolist()
                    # # Rebuild the main identifiers
                    # par_id = [l.replace('0', '', 1) for l in lst]
                    # gdf.loc[gdf.index.isin(lst), 'par_id'] = par_id
                    # gdf.loc[gdf.index.isin(lst), 'codcom'] = [l[:5] for l in par_id]
                    # gdf.loc[gdf.index.isin(lst), 'ccopre'] = [l[5:8] for l in par_id]
                    # gdf.loc[gdf.index.isin(lst), 'ccosec'] = [l[8:10] for l in par_id]
                    # gdf.loc[gdf.index.isin(lst), 'dnupla'] = [l[10:14] for l in par_id]
                    # # gdf.loc[gdf.index.isin(lst), 'vl_id'] = [l[:8] for l in par_id]
                    # # gdf = gdf[gdf.vl_id.str.len() == 10]
                else:
                    export_data(df)
                del df
            del DF  # ; gc.collect()
        print('END transfert data FROM department {0} ........... {1} sec'.format(dep, time_exec(start_time)))

    print('END SCRIPT')
    sys.exit()
    print('NOT EXIT')