#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Name        : tools.py
# Description :
# Copyright   : 2021, CEN38
# Author      : Colas Geier
# Version     : 1.0

from pandas import Series, Index, read_sql, merge

#####################################
###       General functions       ###
#####################################


def _aggr_cols(df, lst_col, sep=''):
    '''
    Concatenate several columns of `df` into a new 'aggreg' column.

    Parameters
    ----------
    df : DataFrame. Modified in place (an 'aggreg' column is added or
        overwritten) and also returned.
    lst_col : list. Names of the columns to concatenate, in order.
    sep : str. Separator inserted before the value of every column except
        the first one. It is added even when earlier columns were missing
        for that row.

    Return
    ----------
    df
    '''
    df['aggreg'] = ''
    for position, col in enumerate(lst_col):
        prefix = sep if position > 0 else ''
        # Rows where the column is missing keep their current aggregate.
        filled = df[col].notna()
        df.loc[filled, 'aggreg'] = (
            df.loc[filled, 'aggreg'] + prefix + df.loc[filled, col])
    return df


def to_tuple(obj):
    '''
    Convert a list, Series, int or str into a tuple.

    Lists and Series become a tuple of their items; a single int or str is
    wrapped into a 1-tuple. Any other object (None included) is returned
    unchanged.
    '''
    if isinstance(obj, (list, Series)):
        return tuple(obj)
    if isinstance(obj, (int, str)):
        return (obj,)
    return obj


def to_colStringSQL(obj):
    '''
    Build a comma-separated column list usable in a SQL SELECT.

    An int is converted to its string form; a list or pandas Index is joined
    with ','. Items are passed through `str` so that non-string labels do
    not break the join. Any other object is returned unchanged.
    '''
    if isinstance(obj, int):
        return str(obj)
    if isinstance(obj, (list, Index)):
        return ",".join(str(item) for item in obj)
    return obj


def to_upper(obj):
    '''
    Upper-case a str, or every item of a Series, tuple or list.

    A Series input yields a new Series with a fresh default index. Any other
    object is returned unchanged.
    '''
    if isinstance(obj, Series):
        return Series([item.upper() for item in list(obj)])
    if isinstance(obj, tuple):
        return tuple(item.upper() for item in obj)
    if isinstance(obj, list):
        return [item.upper() for item in obj]
    if isinstance(obj, str):
        return obj.upper()
    return obj


def _upperfirst_str(text):
    '''Return `text` with its first character upper-cased and the rest lower-cased.'''
    # Slicing (instead of indexing) keeps the empty string from raising.
    return text.upper()[:1] + text.lower()[1:]


def to_upperfirst(obj):
    '''
    Capitalise a str, or every item of a Series, tuple or list.

    The first character is upper-cased and the remaining ones lower-cased.
    Empty strings are returned as empty strings. A Series input yields a new
    Series with a fresh default index. Any other object is returned
    unchanged.
    '''
    if isinstance(obj, Series):
        return Series([_upperfirst_str(item) for item in list(obj)])
    if isinstance(obj, tuple):
        return tuple(_upperfirst_str(item) for item in obj)
    if isinstance(obj, list):
        return [_upperfirst_str(item) for item in obj]
    if isinstance(obj, str):
        return _upperfirst_str(obj)
    return obj


def dropZ(df, geom_col='geom'):
    '''
    Return a copy of `df` where geometries carrying a Z coordinate are
    flattened to 2D.

    Parameters
    ----------
    df : GeoDataFrame. Not modified; a copy is returned.
    geom_col : str. Name of the geometry column (must expose `has_z`).
    '''
    from shapely import wkb

    df = df.copy()
    has_z = df[geom_col].has_z
    if has_z.any():
        # A WKB round trip with output_dimension=2 drops the Z coordinate.
        df.loc[has_z, geom_col] = [
            wkb.loads(wkb.dumps(geom, output_dimension=2))
            for geom in df.loc[has_z, geom_col]
        ]
    return df


def remove_empty_keys(d):
    '''
    Remove, in place, every key of `d` whose value is falsy.

    Iterates over a snapshot of the keys: deleting from a dict while
    iterating over it raises RuntimeError.
    '''
    for key in list(d):
        if not d[key]:
            del d[key]


def _get_table(con, schema, table, ids=None, nom=None, cols=None,
               params_col=None, statut='actif'):
    '''
    Select rows of a table.

    `schema`, `table`, `cols` and the keys of `params_col` are inserted into
    the SQL text as-is: they must come from trusted code, never from user
    input. Filter values are passed as bound parameters.

    Parameters
    ----------
    con : sqlalchemy Connection.
    schema : str. Name of the PostgreSQL schema.
    table : str. Name of the PostgreSQL table.
    ids : list. Identifiers to keep. The table must have an 'id' field.
    nom : list. Names to keep. The table must have a 'nom' field.
    cols : list. Columns of the table to select. Default: all ('*').
    params_col : dict. IN-selection parameters.
        query : {'column': [list]}
        sql   : 'column IN (list)'
        Entries with an empty value are ignored.
    statut : str. 'actif', 'history', 'all'. Only used when
        `table` == 'sites'.
        'actif'   : date_fin IS NULL
        'history' : date_fin IS NOT NULL
        'all'     : no filter on date_fin

    Return
    ----------
    DataFrame, turned into a geometry-aware frame by `_set_geom` when the
    result has a 'geom' column.

    Raises
    ----------
    ValueError : `table` is 'sites' and `statut` is not one of the three
        supported values (this used to produce a malformed query).
    '''
    if table == 'sites' and statut not in ('actif', 'history', 'all'):
        raise ValueError(
            "statut must be 'actif', 'history' or 'all', got {!r}".format(statut))

    # Drop filters with an empty value so they do not generate 'IN ()'.
    filters = {k: v for k, v in (params_col or {}).items() if v}

    select_list = to_colStringSQL(cols) if cols else '*'
    sql = 'SELECT {sel} FROM {sch}.{tab}'.format(
        sel=select_list, sch=schema, tab=table)

    # Collect the conditions first, then join them: no dangling WHERE / AND.
    conditions = []
    if ids:
        conditions.append('id IN %(ids)s')
    if nom:
        conditions.append('nom IN %(nom)s')
    if table == 'sites':
        if statut == 'actif':
            conditions.append('date_fin IS NULL')
        elif statut == 'history':
            conditions.append('date_fin IS NOT NULL')
    conditions.extend('{0} IN %({0})s'.format(key) for key in filters)
    if conditions:
        sql += ' WHERE ' + ' AND '.join(conditions)

    # NOTE(review): a params_col key named 'ids' or 'nom' overrides the
    # matching argument below, as it did before.
    params = {'ids': to_tuple(ids), 'nom': to_tuple(nom)}
    params.update({key: to_tuple(value) for key, value in filters.items()})

    df = read_sql(sql=sql, con=con, params=params)
    if 'geom' in df.columns:
        df = _set_geom(df)
    return df


def _set_geom(df, hex=True):
    '''
    Turn the 'geom' column of `df` into the active geometry (EPSG:2154).

    Parameters
    ----------
    df : DataFrame with a 'geom' column. The column is overwritten in place
        when `hex` is True.
    hex : bool. If True, 'geom' holds hex-encoded WKB that is decoded first.
        Missing (None) values are kept as None.
    '''
    from shapely.wkb import loads
    # Kept for its side effect: importing geopandas is what makes
    # `set_geometry` available on a plain pandas DataFrame.
    import geopandas as gpd  # noqa: F401

    if hex:
        df['geom'] = [
            loads(geom, hex=True) if geom is not None else None
            for geom in df['geom']
        ]
    df = df.set_geometry('geom', crs='EPSG:2154')
    return df


def _get_param(schema, param_table, type_table=None, type_court=True):
    '''
    Read a parameter table, optionally joined with its type table.

    Parameters
    ----------
    schema : str. Name of the PostgreSQL schema.
    param_table : str. Name of the parameter table.
    type_table : str. Default : None. Name of the type table. When given,
        the parameters are left-joined to it on id_type = id and the type
        name is exposed in a 'type' column.
    type_court : bool. Default : True. If True, 'type' comes from the short
        name of the type ('nom_court'); otherwise from its full name
        ('nom').

    Return
    ----------
    df
    '''
    from .params import con

    if not type_table:
        return _get_table(con, schema, table=param_table)

    typ = _get_table(con, schema, table=type_table)
    # NOTE(review): when the type table is empty the id list is empty, the
    # filter is ignored by _get_table and every parameter row is returned.
    par = _get_table(con, schema, table=param_table,
                     params_col={'id_type': typ.id.tolist()})
    df = merge(par, typ, left_on='id_type', right_on='id', how='left',
               suffixes=(None, '_typ'))
    df = df.drop(columns=['id_type', 'id_typ'])
    if 'description_typ' in df.columns:
        del df['description_typ']

    if type_court:
        df = df.drop(columns=['nom_typ']) \
               .rename(columns={'nom_court_typ': 'type'})
    else:
        df = df.drop(columns=['nom_court_typ'], errors='ignore') \
               .rename(columns={'nom_typ': 'type'})

    # set_index + reset_index moves 'id' (and 'type') to the first columns.
    index = ['id']
    if 'type' in df.columns:
        index += ['type']
    return df.set_index(index).reset_index()


def _get_relation_tab(schema, tab, id_site=None, nom_site=None,
                      last_update=False, geom=False, params_col=None,
                      milieu=None, statut='actif'):
    '''
    Read a relation table for the selected site geometries.

    Parameters
    ----------
    schema : str. Schema of the database holding `tab`.
    tab : str. Name of the relation table. It is filtered on its
        'id_geom_site' column.
    id_site : str,list. Default : None. Identifiers of the sites present in
        the 'sites' table.
    nom_site : str,list. Names of the sites present in the 'sites' table.
    last_update : bool. Default : False.
        If True, keep only the most recent valid data.
        If False, keep all the data.
    geom : bool. Default : False. Return the geometries of the sites.
    params_col : dict. Default : None (no condition). Selection conditions
        given as a dictionary {'column_name': conditions}; forwarded to
        `get_sitesGeom`.
    milieu : str. Default : None. Name of a habitat type referenced in the
        table `sites.type_milieu`. The list can be obtained with the
        function `pyzh.sites._get_typ_milieux()`.
    statut : str. 'actif', 'history', 'all'. Status of the sites to fetch.
        'actif'   : date_fin IS NULL
        'history' : has a date_fin
        'all'     : all sites

    Return
    ----------
    df, or None (after printing a message) when no site geometry matches.
    '''
    from .params import con
    from .sites.sites import get_sitesGeom

    params_col = {} if params_col is None else params_col
    dfSG = get_sitesGeom(columns='date', id_site=id_site, nom_site=nom_site,
                         last_update=last_update, params_col=params_col,
                         milieu=milieu, statut=statut)

    if not geom and not dfSG.empty:
        # `columns=` instead of the positional axis removed in pandas 2.
        dfSG.drop(columns='geom', inplace=True)

    ids = dfSG.id.tolist()
    if not ids:
        print('PAS de géometries de sites sélectionnées ...')
        return None

    table = tab
    df = _get_table(con, schema, table, params_col={'id_geom_site': ids})

    if last_update:
        # Rows identical on everything but id/date/valid are duplicates;
        # only the last one of each group is kept, then invalid rows are
        # dropped.
        # NOTE(review): rows are sorted on the compared columns, not on
        # 'date', so "last" relies on the order returned by the query.
        ignored = ['id', 'date', 'valid']
        col = [*df.columns[~df.columns.isin(ignored)]]
        df = df.sort_values(col).reset_index(drop=True)
        df.drop_duplicates(subset=col, keep='last', inplace=True)
        df.reset_index(drop=True, inplace=True)
        df = df[df.valid].copy()

    if 'date' in dfSG.columns and 'date' in df.columns:
        # Disambiguate both dates before merging. [-1] (not [1]) so that a
        # table name without '_' does not raise IndexError.
        dfSG.rename(columns={'date': 'date_geom'}, inplace=True)
        df.rename(
            columns={'date': 'date_' + table.rsplit('_', 1)[-1][:5]},
            inplace=True)

    if not df.empty:
        df = merge(dfSG, df, how='left', left_on='id',
                   right_on='id_geom_site', suffixes=('_x', None)) \
            .drop(columns=['id_x', 'id_geom_site']) \
            .set_index('id').reset_index()
    return df


def _get_relation_autor(df, relation_tab, schema, id_df, id_relation,
                        id_rela_auth='id_auteur'):
    '''
    Attach the authors found through a relation table to `df`.

    Parameters
    ----------
    df : DataFrame to enrich.
    relation_tab : str. Name of the relation table. When it contains
        'site', the text following 'site' in its second '_'-separated part
        is used as suffix of the resulting author column
        ('auteur_<suffix>'); otherwise the column stays 'auteur'.
    schema : str. Schema of the relation table.
    id_df : column of `df` used as left key.
    id_relation : column of the relation table used as right key.
    id_rela_auth : str. Default : 'id_auteur'. Author identifier column.

    Return
    ----------
    df
    '''
    from .pers.pers import _merge_relation, _merge_author

    # Default suffix: the name used to be unbound (UnboundLocalError) when
    # 'site' was not part of the table name.
    suffixe = ''
    if 'site' in relation_tab:
        suffixe = '_' + relation_tab.split('_')[1].split('site')[1]

    df = _merge_relation(df=df, table=relation_tab, schema=schema,
                         left_id=id_df, right_id=id_relation)
    df = _merge_author(df=df, col_aut=id_rela_auth, on_index=True)
    df.rename(columns={'auteur': 'auteur' + suffixe}, inplace=True)
    return df


def to_geoms(geometries):
    '''
    Flatten an iterable of geometries into single-part geometries.

    Polygons and LineStrings are yielded as they are. Anything else is
    expanded: through its `.geoms` attribute when it has one (multi-part
    geometries are no longer directly iterable in Shapely 2), otherwise by
    plain iteration.
    '''
    from shapely.geometry import Polygon, LineString

    for geometry in geometries:
        if isinstance(geometry, (Polygon, LineString)):
            yield geometry
        else:
            yield from getattr(geometry, 'geoms', geometry)


def union_polygons_geometry(df):
    '''
    Turn a GeoDataFrame of Polygons and/or MultiPolygons into a single
    MultiPolygon.

    Parts are only collected, not dissolved. Polygons come first, followed
    by the parts of the MultiPolygons. Other geometry types are ignored; an
    empty MultiPolygon is returned when there is nothing to collect.

    Parameters
    ----------
    df : GeoDataFrame.
    '''
    from shapely.geometry import MultiPolygon

    name_geom = df.geometry.name
    poly = df.loc[df.geom_type == 'Polygon', name_geom].tolist()
    multipoly = df.loc[df.geom_type == 'MultiPolygon', name_geom].tolist()
    return MultiPolygon(list(to_geoms(poly + multipoly)))


def union_lines_geometry(df):
    '''
    Turn a GeoDataFrame of LineStrings and/or MultiLineStrings into a single
    MultiLineString.

    Parts are only collected, not merged. LineStrings come first, followed
    by the parts of the MultiLineStrings. Other geometry types are ignored;
    an empty MultiLineString is returned when there is nothing to collect.

    Parameters
    ----------
    df : GeoDataFrame.
    '''
    from shapely.geometry import MultiLineString

    name_geom = df.geometry.name
    line = df.loc[df.geom_type == 'LineString', name_geom].tolist()
    multiline = df.loc[df.geom_type == 'MultiLineString', name_geom].tolist()
    return MultiLineString(list(to_geoms(line + multiline)))


def calc_recouvrmt(df1, df2):
    '''
    Compute how much of each geometry of df1 is covered by df2.

    For every intersecting (df1, df2) pair the share of the df1 geometry
    area covered by the df2 geometry is computed; the shares are then summed
    per 'id_site' and stored, rounded to 2 decimals, in a 'perc_rcvmt'
    column (0 when nothing intersects).

    Parameters
    ----------
    df1 : GeoDataFrame. Must have 'id_site' and 'geom' columns.
    df2 : GeoDataFrame. Must have a 'geom' column.

    Return
    ----------
    df1 merged with the 'perc_rcvmt' column.
    '''
    from geopandas import sjoin

    # 'intersects' is sjoin's default predicate; the keyword is left out
    # because its name differs between geopandas versions (op / predicate).
    tmp = sjoin(df1, df2[['geom']], how='left')
    tmp.dropna(subset=['index_right'], inplace=True)
    tmp.index_right = tmp.index_right.astype(int)
    tmp.reset_index(inplace=True)
    # Bring the matching df2 geometry next to each df1 geometry.
    tmp = tmp.join(
        df2[['geom']].rename(columns={'geom': 'right_geom'}),
        on=['index_right'], how='left')

    tmp2 = tmp[['index_right', 'right_geom']].copy() \
        .rename(columns={'right_geom': 'geom'}) \
        .set_geometry('geom')
    tmp1 = tmp[['id_site', 'geom']].copy() \
        .set_geometry('geom')

    # buffer(0) on invalid geometries so the intersection can be computed.
    for part in (tmp1, tmp2):
        invalid = ~part['geom'].is_valid
        if invalid.any():
            part.loc[invalid, 'geom'] = part.loc[invalid, 'geom'].buffer(0)

    tmp['perc_rcvmt'] = (tmp1.intersection(tmp2).area / tmp1.area) * 100
    # Sum only the percentage: summing every column would also try to add
    # up the geometry columns.
    cover = tmp.groupby('id_site')['perc_rcvmt'].sum().reset_index()

    df1 = df1.merge(cover, on=['id_site'], how='left')
    # Plain assignment: an in-place fillna on `df1.perc_rcvmt` may act on a
    # temporary object and leave df1 untouched.
    df1['perc_rcvmt'] = df1['perc_rcvmt'].fillna(0).round(2)
    return df1


def Polygons_to_MultiPolygon(df):
    '''
    Return a copy of `df` where every Polygon of the 'geom' column is
    wrapped into a single-part MultiPolygon.

    Rows whose geometry is neither a Polygon nor a MultiPolygon are not
    part of the result. Rows are returned sorted by index.

    Parameters
    ----------
    df : GeoDataFrame with a 'geom' column.
    '''
    from shapely.geometry import MultiPolygon
    from pandas import concat

    df = df.copy()
    multi = df.loc[df.geom_type == 'MultiPolygon'].copy()
    poly = df.loc[df.geom_type == 'Polygon'].copy()
    poly['geom'] = [MultiPolygon([geom]) for geom in poly['geom']]
    df = concat([multi, poly])
    df.sort_index(inplace=True)
    return df