pycen/pycen/update.py

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Name        : update.py
# Description :
# Copyright   : 2021, CEN38
# Author      : Colas Geier
# Version     : 1.0
"""
Module containing tools for processing files into DataFrames or other objects.
"""
import geopandas as gpd
import pandas as pd
from .params import DIC_REF_HAB, DIC_UNIQUE_KEY
from .tools import _get_table
#####################################
### Update ###
#####################################
def __get_pkey__(engine, table_name, schema):
    # Return the primary key constraint of the given table.
    pk = engine.dialect.get_pk_constraint(engine, table_name=table_name, schema=schema)
    return pk

def __get_dtype__(engine, table_name, schema):
    # Return a {column name: SQL type} mapping for the given table.
    cols = engine.dialect.get_columns(engine, table_name=table_name, schema=schema)
    type_cols = {i['name']: i['type'] for i in cols}
    return type_cols
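
# Illustrative shapes of the reflection helpers above (values are hypothetical,
# the exact dict keys come from SQLAlchemy's inspection API):
#   __get_pkey__(engine, 'habitats', 'ref_habitats')
#       -> {'constrained_columns': ['id'], 'name': 'habitats_pkey'}
#   __get_dtype__(engine, 'habitats', 'ref_habitats')
#       -> {'id': INTEGER(), 'nom': VARCHAR(), 'geom': Geometry(...)}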
def _get_dic(self):
    # Column-mapping dictionary for the current reference table.
    if self._table:
        select_cols = DIC_REF_HAB[self._table]
        return select_cols

def _get_schema_name(self):
    # Look up which schema contains the table; warn if the name is ambiguous.
    engine = self.con
    lst_sch = engine.dialect.get_schema_names(engine)
    tab = []
    for sch in lst_sch:
        lst_tab = engine.dialect.get_table_names(engine, schema=sch)
        if self._table in lst_tab:
            tab += [sch]
    if len(tab) > 1:
        print("Table %s exists in several schemas! Please specify the 'schema' argument." % self._table)
    elif len(tab) == 0:
        print("Table %s does not exist ..." % self._table)
    else:
        return tab[0]

def _check_data_exist(self, df):
    # Split df into rows already in the database (identical / different)
    # and rows that are not in the database yet.
    engine = self.con
    pk = __get_pkey__(engine, table_name=self._table, schema=self._schema)
    self._pkey = pk['constrained_columns'][0]
    pkey = self._pkey
    if self._table in DIC_UNIQUE_KEY.keys():
        pkey = DIC_UNIQUE_KEY[self._table]
    data_indb = _get_table(self.con, self._schema, self._table, cols=[pkey])
    data_exist_not = df[~df[pkey].isin(data_indb[pkey])]
    data_exist = df[df[pkey].isin(data_indb[pkey])]
    data_exist_eq, data_exist_ne = _check_eq_df(self, data_exist)
    return [data_exist_eq, data_exist_ne, data_exist_not]
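
# Example of the split performed by _check_data_exist (hypothetical key values):
#   df keys {1, 2, 3}, keys already in the table {1, 2}
#   -> data_exist_eq : rows 1/2 whose stored values are identical
#   -> data_exist_ne : rows 1/2 whose stored values differ (candidates for UPDATE)
#   -> data_exist_not: row 3, not present in the table yet (candidate for INSERT)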
def _check_eq_df(self, df):
    # Compare df with the rows already stored in the table and split it into
    # identical rows (eq) and rows whose values differ (ne).
    from .zh import _get_table
    pkey = self._pkey
    params = {}
    if isinstance(pkey, list):
        for key in pkey:
            p = {key: df[key]}
            params = {**params, **p}
    else:
        params = {pkey: df[pkey]}
    data = _get_table(self.con, self._schema, self._table, cols=df.columns.tolist(), params_col=params)
    if self._table in DIC_UNIQUE_KEY.keys():
        if DIC_UNIQUE_KEY[self._table] != pkey:
            df = df.sort_values(DIC_UNIQUE_KEY[self._table])
            data = data.sort_values(DIC_UNIQUE_KEY[self._table])
        else:
            df = df.sort_values(pkey)
            data = data.sort_values(pkey)
    else:
        df = df.sort_values(pkey)
        data = data.sort_values(pkey)
    if self._table in DIC_UNIQUE_KEY.keys():
        if DIC_UNIQUE_KEY[self._table] != pkey:
            df.set_index(pkey, inplace=True)
            data.set_index(pkey, inplace=True)
            eq = df[df.eq(data).all(axis=1)]
            ne = df[df.ne(data).any(axis=1)]
            eq.reset_index(drop=False, inplace=True)
            ne.reset_index(drop=False, inplace=True)
        else:
            eq = df[df.eq(data).all(axis=1)]
            ne = df[df.ne(data).any(axis=1)]
    else:
        eq = df[df.eq(data).all(axis=1)]
        ne = df[df.ne(data).any(axis=1)]
    return [eq, ne]

def __openFile__(file):
    # Read a tabular file into a DataFrame (only str paths are handled here).
    if isinstance(file, str):
        df = pd.read_table(file)
        return df

class update_ref_table:
    '''
    Update of the reference tables located in the schemas:
        ref_habitats, ref_hydro

    Parameters:
    ------------
    file : str. Path of the vector file.
    table : str. Name of the table to update.
    schema : str. Name of the schema.
    update : bool. If True, update the fields that differ.
        Default False.
    '''
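    # Usage sketch (illustrative only; the file path and table name below are
    # hypothetical, the actual column mappings come from DIC_REF_HAB):
    #   update_ref_table('vegetation_2021.gpkg', table='habitats',
    #                    schema='ref_habitats', update=True)
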
    def __init__(self, file, table, schema=None, update=False):
        from .params import con
        self.con = con
        self._file = file
        self._table = table
        self._updt = update
        self._schema = schema
        if not self._schema:
            self._schema = _get_schema_name(self)
        self._pkey = __get_pkey__(
            self.con, table_name=self._table, schema=self._schema
        )['constrained_columns'][0]
        self._update_table_with_geom()

    def _update_table_with_geom(self):
        dic = _get_dic(self)
        select_cols = list(dic.values())
        # Load the source data, either from a file path or from a GeoDataFrame.
        if isinstance(self._file, str):
            df = gpd.read_file(self._file)
        elif isinstance(self._file, gpd.GeoDataFrame):
            df = self._file.copy()
        else:
            raise TypeError("Argument 'file' must be a file path or a GeoDataFrame")
        if 'id' not in df.columns:
            df.index.name = 'id'
            df.reset_index(drop=False, inplace=True)
        # Make sure the data is in Lambert-93 (EPSG:2154).
        if not df.crs:
            df.set_crs(epsg=2154, inplace=True)
        if df.crs.srs != 'epsg:2154':
            df.to_crs(epsg=2154, inplace=True)
        df.rename(columns=dic, inplace=True)
        df = df[select_cols]
        df = df.set_geometry('geom')
        # Promote single-part geometries to multi-part when both types are mixed.
        if 'Polygon' in df.geom_type.unique() and 'MultiPolygon' in df.geom_type.unique():
            from shapely.geometry.multipolygon import MultiPolygon
            tmp = df.loc[df.geom_type == 'Polygon'].copy()
            geom = [MultiPolygon([x]) for x in tmp.loc[tmp.geom_type == 'Polygon', 'geom']]
            tmp = tmp.set_geometry(geom)
            df = pd.concat([df.drop(tmp.index), tmp]).sort_values('id')
        if 'LineString' in df.geom_type.unique() and 'MultiLineString' in df.geom_type.unique():
            from shapely.geometry.multilinestring import MultiLineString
            tmp = df.loc[df.geom_type == 'LineString'].copy()
            geom = [MultiLineString([x]) for x in tmp.loc[tmp.geom_type == 'LineString', 'geom']]
            tmp = tmp.set_geometry(geom)
            df = pd.concat([df.drop(tmp.index), tmp]).sort_values('id')
        # Compare with the database: identical rows, rows to update, rows to import.
        df_exst, df_updt, df_imp = _check_data_exist(self, df)
        len_imp = len(df_imp)
        len_updt = len(df_updt)
        len_exst = len(df_exst)
        if len_exst > 0:
            print('DATA {0}/{1} already exist'.format(len_exst, len(df)))
        else:
            print('NO DATA EXIST ...')
        if df_imp.empty:
            print('NO DATA TO IMPORT ...')
        else:
            Q = input('IMPORT {0}/{1} ... Yes(y)/No(n)/View(v) ?'.format(len_imp, len(df)))
            while Q.lower() in ['view', 'v']:
                print(df_imp)
                Q = input('IMPORT {0}/{1} ... Yes(y)/No(n)/View(v) ?'.format(len_imp, len(df)))
            if Q.lower() in ['yes', 'y']:
                print('IMPORT {0}/{1} ...'.format(len_imp, len(df)))
                # The geometry column is taken from the active geometry ('geom').
                df_imp.to_postgis(
                    name=self._table,
                    con=self.con,
                    schema=self._schema,
                    if_exists='append'
                )
            elif Q.lower() in ['no', 'n']:
                pass
        if df_updt.empty:
            print('NO DATA TO UPDATE ...')
        else:
            Q = input('UPDATE {0}/{1} ... Yes(y)/No(n)/View(v) ?'.format(len_updt, len(df)))
            while Q.lower() in ['view', 'v']:
                print(df_updt)
                Q = input('UPDATE {0}/{1} ... Yes(y)/No(n)/View(v) ?'.format(len_updt, len(df)))
            if Q.lower() in ['yes', 'y']:
                print('UPDATE {0}/{1} ...'.format(len_updt, len(df)))
                update.update_to_sql(
                    df=df_updt,
                    con=self.con,
                    table_name=self._table,
                    schema_name=self._schema,
                    key_name=self._pkey
                )
            elif Q.lower() in ['no', 'n']:
                pass

class update:

    @staticmethod
    def update_ref(file, table, schema=None, update=False):
        update_ref_table(file, table, schema, update)

    @staticmethod
    def update_to_sql(df, con, table_name, schema_name, key_name):
        # Push df to a temporary table, then update the target table from it.
        a = []
        b = []
        table = table_name
        schema = schema_name
        primary_key = key_name
        pkey = __get_pkey__(
            con, table_name=table, schema=schema)
        type_cols = __get_dtype__(
            con, table_name=table, schema=schema)
        # Warn if the primary key column(s) are missing from the DataFrame.
        if not all(item in df.columns for item in pkey['constrained_columns']):
            print('The primary key field(s) "%s" are missing from the DataFrame' % pkey['constrained_columns'])
            Q = input('Do you want to continue the update? (y/n) ')
            if Q.lower() == 'y':
                pass
            else:
                return print('Data not updated')
        if isinstance(primary_key, str):
            primary_key = [primary_key]
        # Build the SET and WHERE clauses column by column.
        for col in df.columns:
            if col in primary_key:
                b.append("t.{col}=f.{col}".format(col=col))
            else:
                dtype = type_cols[col]
                if hasattr(dtype, 'enums'):
                    # Enum columns need an explicit cast to the schema-qualified type.
                    dty = '.'.join([dtype.schema, dtype.name])
                    a.append("{col}=t.{col}::{typ}".format(col=col, typ=dty))
                else:
                    a.append("{col}=t.{col}".format(col=col))
        # Write the new values to a temporary table in the same schema.
        if isinstance(df, gpd.GeoDataFrame):
            df.to_postgis(
                name='temp_table',
                con=con,
                schema=schema,
                if_exists='replace'
            )
        else:
            df.to_sql(
                name='temp_table',
                con=con,
                schema=schema,
                if_exists='replace',
                index=False,
                method='multi'
            )
        update_stmt_1 = "UPDATE {sch}.{final_table} f".format(sch=schema, final_table=table)
        update_stmt_2 = " FROM {sch}.temp_table t".format(sch=schema)
        update_stmt_6 = " WHERE %s" % ' AND '.join(b)
        update_stmt_3 = " SET "
        update_stmt_4 = ", ".join(a)
        update_stmt_5 = update_stmt_1 + update_stmt_3 + update_stmt_4 + update_stmt_2 + update_stmt_6 + ";"
        drop_stmt = "DROP TABLE {sch}.temp_table ;".format(sch=schema)
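        # For illustration only (hypothetical schema/table/columns: schema
        # 'ref_habitats', table 'habitats', key 'id', columns 'nom' and 'geom'),
        # the assembled statements look like:
        #   UPDATE ref_habitats.habitats f SET nom=t.nom, geom=t.geom
        #       FROM ref_habitats.temp_table t WHERE t.id=f.id;
        #   DROP TABLE ref_habitats.temp_table ;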
        # text() keeps the raw SQL compatible with both SQLAlchemy 1.4 and 2.0.
        from sqlalchemy import text
        with con.begin() as cnx:
            cnx.execute(text(update_stmt_5))
            cnx.execute(text(drop_stmt))
        return print('END update')