#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Export the BDC statuses of the taxa listed in an observation spreadsheet.

Run as a script, this module reads a spreadsheet of observations, fetches the
matching rows of ``taxonomie.v_bdc_status`` from the database, filters them by
biogeographic zone and territory, and writes the raw statuses plus two pivot
tables (status codes and status labels) to an Excel workbook.
"""
import os
from datetime import datetime

import numpy as np  # noqa: F401 -- no longer referenced; kept so the module namespace is unchanged
import pandas as pd
import requests


# Status types extracted from the TaxRef API answer by ``get_taxon_status``;
# each one becomes a column of the returned frame, in this order.
STATUS_TYPES = (
    'AL', 'BERN', 'BONN', 'DH', 'DO', 'LRE', 'LRM', 'LRN', 'LRR',
    'PAPNAT', 'PD', 'PNA', 'PR', 'REGL', 'REGLII', 'REGLLUTTE', 'REGLSO',
    'SCAP NAT', 'SCAP REG', 'SENSNAT', 'ZDET', 'exPNA',
)


def _sql_int_list(values):
    """Render *values* as the inside of a SQL ``IN (...)`` clause.

    Every value is converted with ``int`` so only integer literals can reach
    the query text; a non-numeric value raises ``ValueError``. An empty input
    yields ``NULL`` so that ``IN (NULL)`` stays valid SQL and matches no row.
    """
    unique_ids = sorted({int(v) for v in values})
    if not unique_ids:
        return 'NULL'
    return ', '.join(str(v) for v in unique_ids)


def get_status(lst, con):
    """Return the taxonomy and status rows of the given ``cd_nom`` values.

    Parameters
    ----------
    lst : iterable
        ``cd_nom`` identifiers (anything ``int`` accepts, e.g. ints or
        numeric strings). May be empty or hold a single element.
    con
        Database connection handed to ``pandas.read_sql_query``.

    Returns
    -------
    pandas.DataFrame
        One row per (taxon, status) pair from ``taxonomie.taxref`` joined to
        ``taxonomie.v_bdc_status``.

    Raises
    ------
    ValueError
        If a value of *lst* cannot be converted to an integer.
    """
    sql = """
    SELECT
        t.cd_nom, t.cd_ref, t.regne, t.phylum, t.classe, t.ordre, t.famille,
        t.group1_inpn, t.group2_inpn, t.group3_inpn,
        t.nom_vern, t.nom_complet, t.nom_valide, t.lb_nom,
        --s.*
        s.cd_sig, s.rq_statut, s.code_statut, s.cd_type_statut,
        s.label_statut, s.niveau_admin, s.full_citation, s.doc_url
    FROM taxonomie.taxref t
    JOIN taxonomie.v_bdc_status s USING (cd_nom)
    WHERE t.cd_nom IN ({cd_nom})
    ;""".format(cd_nom=_sql_int_list(lst))
    return pd.read_sql_query(sql, con)


def get_type_status(con):
    """Return every row of ``taxonomie.bdc_statut_type`` as a DataFrame."""
    sql = """
    SELECT * FROM taxonomie.bdc_statut_type
    ;"""
    return pd.read_sql_query(sql, con)


def get_api_status(api, cd_nom: int):
    """Fetch the JSON record of one taxon from the API.

    Parameters
    ----------
    api : str
        Base URL; the request is sent to ``<api>/<cd_nom>``.
    cd_nom : int
        Taxon identifier appended to the URL.

    Returns
    -------
    The decoded JSON body of the response.

    Raises
    ------
    requests.HTTPError
        If the response status code is anything other than 200.
    """
    res = requests.get('%s/%i' % (api, cd_nom))
    if res.status_code == 200:
        return res.json()
    raise requests.HTTPError(
        'Error : %i\tcd_nom : %i' % (res.status_code, cd_nom),
        response=res,
    )


def _status_codes(taxon, status_type):
    """Return the ``code_statut`` values of *status_type* for one API record.

    Returns ``None`` when the record has no entry for *status_type*.
    """
    statuses = taxon['status']
    if status_type not in statuses:
        return None
    return [
        text['values'][key]['code_statut']
        for text in statuses[status_type]['text'].values()
        for key in text['values']
    ]


def get_taxon_status(lst, api):
    """Query the API for each taxon and tabulate taxonomy and status codes.

    Parameters
    ----------
    lst : iterable of int
        ``cd_nom`` identifiers, requested one at a time (this is slow).
    api : str
        Base URL passed to ``get_api_status``.

    Returns
    -------
    pandas.DataFrame
        One row per taxon: taxonomy columns followed by one column per entry
        of ``STATUS_TYPES`` holding the list of status codes, or ``None``
        when the taxon has no status of that type.
    """
    started = datetime.now()
    records = [get_api_status(api, cd_nom) for cd_nom in lst]  # TOO LONG
    print(datetime.now() - started)

    # Output column name -> key in the API record. Only ``cd_ref`` is
    # mandatory; every other field falls back to None when absent.
    optional_fields = {
        'nom_valide': 'nom_valide',
        'nom_vernac': 'nom_vern',
        'regne': 'regne',
        'group1_inp': 'group1_inpn',
        'group2_inp': 'group2_inpn',
        'group3_inpn': 'group3_inpn',
        'classe': 'classe',
        'ordre': 'ordre',
        'famille': 'famille',
    }
    phylo = {'cd_ref': [rec['cd_ref'] for rec in records]}
    for column, key in optional_fields.items():
        phylo[column] = [rec.get(key) for rec in records]

    cd_status = {
        status_type: [_status_codes(rec, status_type) for rec in records]
        for status_type in STATUS_TYPES
    }
    return pd.DataFrame({**phylo, **cd_status})


def filter_bio_geo(df, zone_bio):
    """Drop the statuses flagged as non-determinant for a biogeographic zone.

    Parameters
    ----------
    df : pandas.DataFrame
        Status rows with a ``rq_statut`` text column. Modified in place.
    zone_bio : str
        ``'rhod'`` drops rows non-determinant for the Rhodanian zone,
        ``'alpi'`` for the Alpine zone, ``'all'`` only rows non-determinant
        for both. Any other value leaves *df* untouched.

    Returns
    -------
    pandas.DataFrame
        The same *df* object.
    """
    not_rhod = df.rq_statut.str.contains('rhodanienne : Non déterminante', na=False)
    not_alpi = df.rq_statut.str.contains('Alpine : Non déterminante', na=False)
    masks = {
        'rhod': not_rhod,
        'alpi': not_alpi,
        'all': not_rhod & not_alpi,
    }
    mask = masks.get(zone_bio)
    if mask is not None and mask.any():
        df.drop(df[mask].index, inplace=True)
    return df


def form_territoire(df, terr):
    """Keep and label the departmental statuses relevant to a territory.

    Rows whose ``cd_sig`` contains ``'INSEED'`` are treated as departmental;
    the last two characters of ``cd_sig`` are compared to the departments of
    the territory.

    Parameters
    ----------
    df : pandas.DataFrame
        Status rows with ``cd_sig`` and ``cd_type_statut`` columns. Modified
        in place.
    terr : str
        ``'platiere'`` keeps departments 38, 42, 07 and 26; ``'negria'``
        keeps 38, 69 and 01; anything else keeps 38 only. For ``'isere'``
        the status types are left as they are; otherwise each departmental
        status type gets a ``_<department>`` suffix.

    Returns
    -------
    pandas.DataFrame
        The same *df* object.
    """
    dep = (
        ['38', '42', '07', '26'] if terr == 'platiere' else
        ['38', '69', '01'] if terr == 'negria' else
        ['38']
    )

    # First pass: remove departmental statuses outside the territory.
    is_dep = df.cd_sig.str.contains('INSEED', na=False)
    outside = is_dep & ~df.cd_sig.str[-2:].isin(dep)
    df.drop(df[outside].index, inplace=True)

    # Masks must be rebuilt because rows were just dropped.
    is_dep = df.cd_sig.str.contains('INSEED', na=False)
    is_38 = df.cd_sig == 'INSEED38'
    other_dep = is_dep & ~is_38

    if terr == 'isere':
        df.drop(df[other_dep].index, inplace=True)
    else:
        # NOTE: str.strip removes any leading/trailing characters found in
        # the set "INSEED", which leaves the numeric department code here.
        df.loc[other_dep, 'cd_type_statut'] = (
            df.loc[other_dep, 'cd_type_statut']
            + '_'
            + df.loc[other_dep, 'cd_sig'].str.strip('INSEED')
        )
        df.loc[is_38, 'cd_type_statut'] = df.loc[is_38, 'cd_type_statut'] + '_38'
    return df


def _pivot_status(df, value_col):
    """Pivot *value_col* of *df* into one column per ``cd_type_statut``.

    Each cell aggregates the values of a taxon into a list; single-element
    lists are unwrapped to their element. In the ``DH`` column, two-element
    lists are joined with a comma and the ``CDH`` substring is removed.
    """
    table = pd.pivot_table(
        df,
        values=value_col,
        index=['cd_nom', 'cd_ref', 'lb_nom', 'nom_vern'],
        columns=['cd_type_statut'],
        aggfunc=list,
        fill_value=None,
    )
    # Test for ``list`` rather than comparing to NaN: it also protects
    # already-unwrapped strings from being treated as sequences below.
    for col in table.columns:
        table[col] = [
            cell[0] if isinstance(cell, list) and len(cell) == 1 else cell
            for cell in table[col]
        ]
    if 'DH' in table.columns:
        table['DH'] = [
            ','.join(cell) if isinstance(cell, list) and len(cell) == 2 else cell
            for cell in table['DH']
        ]
        # Explicit assignment: an in-place replace on an attribute-accessed
        # column is not guaranteed to write back into the frame.
        table['DH'] = table['DH'].replace({'CDH': ''}, regex=True)
    return table


dict_dep = {
    '38': 'Isère',
    '42': 'Loire',
    '07': 'Ardèche',
    '26': 'Drôme',
}

cols_rename = {
    'nom_vernaculaire': 'nom_vern'
}


if __name__ == "__main__":
    # Connection to the GeoNature database
    from pycen import con_gn
    # NOT USED FOR NOW - TaxRef API
    api_taxref = 'https://geonature.cen-isere.fr/taxhub/api/taxref'

    # Parameters for loading the taxa file
    PATH0 = '/home/cgeier/Téléchargements'
    PATH = ''
    file = 'synthese_observations_2026-04-23.xlsx'
    sheet = 'observations'
    zone_bio_znieff = 'rhod'  # ['rhod', 'alpi', 'all']
    territoire = 'negria'     # ['isere', 'platiere','negria']

    # [GEOMETRY PARAMS]
    keep_geomtype = None  # ['polygon', 'point', 'ligne', None]
    geom_col = 'geometrie_wkt_4326'

    # Input CD_NOM list
    cd_col = 'cd_nom'  # Name of the column to use in the ``sheet`` worksheet

    # Read the data (once; the taxon list is derived from the index)
    tab_sp = pd.read_excel(
        os.path.join(PATH0, PATH, file), sheet_name=sheet, index_col=cd_col
    )
    tab_sp.rename(columns=cols_rename, inplace=True)

    # Keep a single geometry type, when requested
    if keep_geomtype is not None:
        as_geom = tab_sp[geom_col].str.contains(keep_geomtype.upper(), na=False)
        tab_sp = tab_sp[as_geom]

    # Distinct taxa still present after the optional geometry filter
    cd_noms = tab_sp.index.dropna().unique()

    # Fetch the statuses
    df = get_status(cd_noms, con_gn)
    typ = get_type_status(con_gn)
    typ = typ[typ.cd_type_statut.isin(df.cd_type_statut.unique())]

    # Tell apart the LRR of the [old vs new] region
    is_lrr = df.cd_type_statut == 'LRR'
    df.loc[is_lrr & (df.niveau_admin == 'Région'), 'cd_type_statut'] = 'LRR_AURA'
    df.loc[is_lrr & (df.niveau_admin == 'Ancienne région'), 'cd_type_statut'] = 'LRR_RA'
    del df['niveau_admin']

    # Filter on the Rhodanian-plain and Alpine biogeographic domains
    df = filter_bio_geo(df, zone_bio_znieff)

    # Filter the statuses according to the territory of interest
    # (Ardèche, Drôme and Loire statuses are kept for the Platière)
    df = form_territoire(df, territoire)

    # Columns shared by the status table and the observation table, without
    # the count columns, which are aggregated separately below.
    flat_sp = tab_sp.reset_index(drop=False)
    same_col = [
        col for col in df.columns
        if col in flat_sp.columns and col not in ('nombre_min', 'nombre_max')
    ]

    piv = _pivot_status(df, 'code_statut')

    # One row per taxon with summed counts and the observation date range
    counts = (
        flat_sp[[cd_col, 'nombre_min', 'nombre_max', 'date_debut', 'date_fin']]
        .groupby([cd_col])
        .agg({
            'nombre_min': 'sum',
            'nombre_max': 'sum',
            'date_debut': 'min',
            'date_fin': 'max',
        })
    )
    tabsp_nb = flat_sp[same_col].drop_duplicates().merge(counts, on=cd_col)
    tabsp_nb.set_index(cd_col, inplace=True)
    tabsp_nb.sort_index(inplace=True)

    pivot = tabsp_nb.merge(piv, on=[cd_col], how='left')

    pivlib = _pivot_status(df, 'label_statut')
    pivotlib = tabsp_nb.merge(pivlib, on=[cd_col], how='left')

    tx_nb_missing = df[~df.cd_nom.isin(tabsp_nb.index)].shape[0]
    if tx_nb_missing > 0:
        print('WARNING : %i taxon(s) is MISING !! \n' % tx_nb_missing)

    print('INIT writer')
    NAME_OUT = os.path.join('~/', sheet + '_status.xlsx')
    if keep_geomtype is not None:
        NAME_OUT = NAME_OUT[:-5] + ' (%s only).xlsx' % keep_geomtype.lower()

    with pd.ExcelWriter(NAME_OUT) as writer:
        df.to_excel(writer, sheet_name='v_bdc_status', index=False)
        print('v_bdc_status OK !')
        pivot.to_excel(writer, sheet_name='pivot_table')
        print('pivot_table OK !')
        pivotlib.to_excel(writer, sheet_name='pivot_libel')
        print('pivot_libel OK !')
        typ.to_excel(writer, sheet_name='dic_type_statut', index=False)
        print('dic_type_statut OK !')

    print('END writing %s' % NAME_OUT)