#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Name        : update.py
# Description :
# Copyright   : 2021, CEN38
# Author      : Colas Geier
# Version     : 1.0
"""
Module contains tools for processing files into DataFrames or other objects
"""
import geopandas as gpd
import pandas as pd

from .params import DIC_REF_HAB, DIC_UNIQUE_KEY
from .tools import _get_table

#####################################
###             Update            ###
#####################################


def __get_pkey__(engine, table_name, schema):
    # Ask the SQLAlchemy dialect for the primary-key constraint of the table.
    pk = engine.dialect.get_pk_constraint(engine, table_name=table_name, schema=schema)
    return pk


def _get_dic(self):
    # Mapping between source columns and database columns for the current table.
    select_cols = None
    if self._table:
        select_cols = DIC_REF_HAB[self._table]
    return select_cols


def _get_schema_name(self):
    # Scan every schema of the database and return the one containing the table.
    engine = self.con
    lst_sch = engine.dialect.get_schema_names(engine)
    tab = []
    for sch in lst_sch:
        lst_tab = engine.dialect.get_table_names(engine, schema=sch)
        if self._table in lst_tab:
            tab += [sch]
    if len(tab) > 1:
        print("Table %s exists in several schemas! Please specify the 'schema' argument." % self._table)
    elif len(tab) == 0:
        print("Table %s does not exist ..." % self._table)
    else:
        return tab[0]


def _check_data_exist(self, df):
    # Split the incoming rows into three groups: already in the database and
    # identical, already in the database but different, and not yet imported.
    engine = self.con
    pk = __get_pkey__(engine, table_name=self._table, schema=self._schema)
    self._pkey = pk['constrained_columns'][0]
    pkey = self._pkey
    if self._table in DIC_UNIQUE_KEY.keys():
        pkey = DIC_UNIQUE_KEY[self._table]
    data_indb = _get_table(self.con, self._schema, self._table,
                           cols=pkey if isinstance(pkey, list) else [pkey])
    if isinstance(pkey, list):
        # Compare composite keys as tuples so the row filter also works for
        # multi-column keys.
        in_db = df[pkey].apply(tuple, axis=1).isin(data_indb[pkey].apply(tuple, axis=1))
    else:
        in_db = df[pkey].isin(data_indb[pkey])
    data_exist_not = df[~in_db]
    data_exist = df[in_db]
    data_exist_eq, data_exist_ne = _check_eq_df(self, data_exist)
    return [data_exist_eq, data_exist_ne, data_exist_not]


def _check_eq_df(self, df):
    # Compare the incoming rows with their counterparts already stored in the
    # database, and split them into equal and not-equal subsets.
    from .zh import _get_table  # shadows the module-level _get_table from .tools
    pkey = self._pkey
    params = {}
    if isinstance(pkey, list):
        for key in pkey:
            params[key] = df[key]
    else:
        params = {pkey: df[pkey]}
    data = _get_table(self.con, self._schema, self._table,
                      cols=df.columns.tolist(), params_col=params)
    sort_key = pkey
    if self._table in DIC_UNIQUE_KEY.keys() and DIC_UNIQUE_KEY[self._table] != pkey:
        sort_key = DIC_UNIQUE_KEY[self._table]
    # Sort both frames on the same key and realign their indices so the
    # row-wise comparison below is positional rather than label-based.
    df = df.sort_values(sort_key).reset_index(drop=True)
    data = data.sort_values(sort_key).reset_index(drop=True)
    if self._table in DIC_UNIQUE_KEY.keys() and DIC_UNIQUE_KEY[self._table] != pkey:
        # The primary key may differ between file and database: compare on the
        # remaining columns and restore the key column afterwards.
        df.set_index(pkey, inplace=True)
        data.set_index(pkey, inplace=True)
        eq = df[df.eq(data).all(axis=1)]
        ne = df[df.ne(data).any(axis=1)]
        eq.reset_index(drop=False, inplace=True)
        ne.reset_index(drop=False, inplace=True)
    else:
        eq = df[df.eq(data).all(axis=1)]
        ne = df[df.ne(data).any(axis=1)]
    return [eq, ne]


def __openFile__(file):
    # Read a tabular file into a DataFrame.
    if isinstance(file, str):
        df = pd.read_table(file)
        return df
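
# A minimal sketch of the eq/ne split performed by _check_eq_df above, shown
# on toy pandas DataFrames (illustration only; the names below are not part of
# this module's API). Identical rows are left alone, differing rows become
# UPDATE candidates:
#
#     import pandas as pd
#     df   = pd.DataFrame({'id': [1, 2], 'name': ['a', 'b']})   # incoming file
#     data = pd.DataFrame({'id': [1, 2], 'name': ['a', 'x']})   # database copy
#     eq = df[df.eq(data).all(axis=1)]   # -> row with id 1 (unchanged)
#     ne = df[df.ne(data).any(axis=1)]   # -> row with id 2 (to update)
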
class update_ref_table:
    '''
    Update the reference tables located in the following schemas:
    ref_habitats, ref_hydro

    Parameters:
    ------------
    file : str. Path of the vector file.
    table : str. Name of the table to update.
    schema : str. Name of the schema.
    update : bool. If True, update the fields whose values differ. Default False.
    '''
    def __init__(self, file, table, schema=None, update=False):
        from .params import con
        self.con = con
        self._file = file
        self._table = table
        self._updt = update
        self._schema = schema
        if not self._schema:
            self._schema = _get_schema_name(self)
        self._pkey = __get_pkey__(
            self.con, table_name=self._table, schema=self._schema
        )['constrained_columns'][0]
        self._update_table_with_geom()

    def _update_table_with_geom(self):
        dic = _get_dic(self)
        select_cols = list(dic.values())
        if isinstance(self._file, str):
            df = gpd.read_file(self._file)
        elif isinstance(self._file, gpd.GeoDataFrame):
            df = self._file.copy()
        else:
            raise TypeError("Invalid 'file' argument: expected a path or a GeoDataFrame.")
        if 'id' not in df.columns:
            df.index.name = 'id'
            df.reset_index(drop=False, inplace=True)
        # Reference tables are stored in Lambert 93 (EPSG:2154).
        if not df.crs:
            df.set_crs(epsg=2154, inplace=True)
        if df.crs.srs != 'epsg:2154':
            df = df.to_crs(epsg=2154)
        df.rename(columns=dic, inplace=True)
        df = df[select_cols]
        df = df.set_geometry('geom')
        # Promote single-part geometries to multi-part when both types coexist,
        # so every row matches the geometry type of the target table.
        if 'Polygon' in df.geom_type.unique() and 'MultiPolygon' in df.geom_type.unique():
            from shapely.geometry.multipolygon import MultiPolygon
            tmp = df.loc[df.geom_type == 'Polygon'].copy()
            geom = [MultiPolygon([x]) for x in tmp.loc[tmp.geom_type == 'Polygon', 'geom']]
            tmp = tmp.set_geometry(geom)
            df = pd.concat([df.drop(tmp.index), tmp]).sort_values('id')
        if 'LineString' in df.geom_type.unique() and 'MultiLineString' in df.geom_type.unique():
            from shapely.geometry.multilinestring import MultiLineString
            tmp = df.loc[df.geom_type == 'LineString'].copy()
            geom = [MultiLineString([x]) for x in tmp.loc[tmp.geom_type == 'LineString', 'geom']]
            tmp = tmp.set_geometry(geom)
            df = pd.concat([df.drop(tmp.index), tmp]).sort_values('id')
        df_exst, df_updt, df_imp = _check_data_exist(self, df)
        len_imp = len(df_imp)
        len_updt = len(df_updt)
        len_exst = len(df_exst)
        if len_exst > 0:
            print('DATA {0}/{1} already exists'.format(len_exst, len(df)))
        else:
            print('NO DATA EXISTS ...')
        if df_imp.empty:
            print('NO DATA TO IMPORT ...')
        else:
            Q = input('IMPORT {0}/{1} ... Yes(y)/No(n)/View(v) ?'.format(len_imp, len(df)))
            while Q.lower() in ['view', 'v']:
                print(df_imp)
                Q = input('IMPORT {0}/{1} ... Yes(y)/No(n)/View(v) ?'.format(len_imp, len(df)))
            if Q.lower() in ['yes', 'y']:
                print('IMPORT {0}/{1} ...'.format(len_imp, len(df)))
                df_imp.to_postgis(
                    name=self._table,
                    con=self.con,
                    schema=self._schema,
                    if_exists='append'
                )
            elif Q.lower() in ['no', 'n']:
                pass
        if df_updt.empty:
            print('NO DATA TO UPDATE ...')
        else:
            Q = input('UPDATE {0}/{1} ... Yes(y)/No(n)/View(v) ?'.format(len_updt, len(df)))
            while Q.lower() in ['view', 'v']:
                print(df_updt)
                Q = input('UPDATE {0}/{1} ... Yes(y)/No(n)/View(v) ?'.format(len_updt, len(df)))
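
# A minimal usage sketch (hypothetical path and table name, for illustration
# only), going through the update.update_ref wrapper defined below and
# answering the interactive prompts when asked:
#
#     from .update import update
#     update.update_ref(
#         file='path/to/habitats.shp',   # hypothetical path
#         table='habitats',              # hypothetical table name
#         schema='ref_habitats',
#         update=True,
#     )
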
            if Q.lower() in ['yes', 'y']:
                print('UPDATE {0}/{1} ...'.format(len_updt, len(df)))
                update.update_to_sql(
                    df=df_updt,
                    con=self.con,
                    table_name=self._table,
                    schema_name=self._schema,
                    key_name=self._pkey
                )
            elif Q.lower() in ['no', 'n']:
                pass


class update:

    @staticmethod
    def update_ref(file, table, schema=None, update=False):
        # Thin wrapper: build the update_ref_table object, whose constructor
        # runs the whole import/update workflow.
        update_ref_table(file, table, schema, update)

    @staticmethod
    def update_to_sql(df, con, table_name, schema_name, key_name):
        from sys import exit
        a = []
        b = []
        table = table_name
        schema = schema_name
        primary_key = key_name
        pkey = __get_pkey__(con, table_name=table, schema=schema)['constrained_columns'][0]
        if pkey not in df.columns:
            exit('The primary key column "%s" is not present in the DataFrame' % pkey)
        if isinstance(primary_key, str):
            primary_key = [primary_key]
        for col in df.columns:
            if col in primary_key:
                # Join condition between the final table (f) and the temp table (t).
                b.append("t.{col}=f.{col}".format(col=col))
            else:
                # Column to overwrite with the value coming from the temp table.
                a.append("{col}=t.{col}".format(col=col))
        # Stage the new values in a temporary table next to the target table.
        if isinstance(df, gpd.GeoDataFrame):
            df.to_postgis(
                name='temp_table',
                con=con,
                schema=schema,
                if_exists='replace'
            )
        else:
            df.to_sql(
                name='temp_table',
                con=con,
                schema=schema,
                if_exists='replace',
                index=False,
                method='multi'
            )
        update_stmt = (
            "UPDATE {sch}.{final_table} f SET {set_clause}"
            " FROM {sch}.temp_table t WHERE {join_clause};"
        ).format(sch=schema, final_table=table,
                 set_clause=", ".join(a), join_clause=" AND ".join(b))
        drop_stmt = "DROP TABLE {sch}.temp_table ;".format(sch=schema)
        # Run the UPDATE and drop the staging table in a single transaction.
        with con.begin() as cnx:
            cnx.execute(update_stmt)
            cnx.execute(drop_stmt)
        print('END update')
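
# For reference, the statement built by update_to_sql has the following shape
# (toy column names, assuming a single-column key 'id' on a hypothetical
# table ref_habitats.habitats):
#
#     UPDATE ref_habitats.habitats f
#     SET name=t.name, geom=t.geom
#     FROM ref_habitats.temp_table t
#     WHERE t.id=f.id;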