From 2630b7fc4bf6e8f7fd8eaff17c406fb53c3ed36b Mon Sep 17 00:00:00 2001
From: Colas Geier
Date: Fri, 31 Jan 2025 11:52:33 +0100
Subject: [PATCH] =?UTF-8?q?Clean=20d=C3=A9pot?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 insert_liste_alerte.py | 289 -----------------------------------------
 pivot_bdc_status.py    | 289 -----------------------------------------
 2 files changed, 578 deletions(-)
 delete mode 100644 insert_liste_alerte.py
 delete mode 100644 pivot_bdc_status.py

diff --git a/insert_liste_alerte.py b/insert_liste_alerte.py
deleted file mode 100644
index b9201db..0000000
--- a/insert_liste_alerte.py
+++ /dev/null
@@ -1,289 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: UTF-8 -*-
-
-import pandas as pd  # needed at module level by get_status_type() and get_taxonomie()
-
-def test_data(con,tab,col,status):
-    sql = '''
-    SELECT count({col}) FROM {sch}.{tab} WHERE {col} = '{status}'
-    ;'''.format(sch='taxonomie',tab=tab,col=col,status=status)
-    with con.begin() as cnx:
-        return cnx.execute(sql).one()[0]
-
-def test_cor_values(con,vals):
-    sql = '''
-    SELECT count(id_value_text) FROM {sch}.{tab} WHERE (id_value,id_text) = {vals}
-    ;'''.format(sch='taxonomie',tab='bdc_statut_cor_text_values',vals=vals)
-    with con.begin() as cnx:
-        return cnx.execute(sql).one()[0]
-
-def test_status_type(con,col,status):
-    sql = '''
-    SELECT count({col}) FROM {sch}.{tab} WHERE {col} = '{status}'
-    ;'''.format(sch='taxonomie',tab='bdc_statut_type',col=col,status=status)
-    with con.begin() as cnx:
-        return cnx.execute(sql).one()[0]
-
-def insert_status_alerte(con):
-    """
-    Insert the status type used for alert lists.
-
-    If the value 'AL' does not already exist in the bdc_statut_type table, insert it.
-    """
-    if test_data(con,'bdc_statut_type','cd_type_statut','AL') > 0:
-    # if test_status_type(con,'cd_type_statut','AL') > 0:
-        print('ALERTE STATUS ALREADY EXISTS')
-    else:
-        sql = '''
-        INSERT INTO {sch}.{tab} (cd_type_statut,lb_type_statut,regroupement_type,thematique,type_value) VALUES
-        ('AL','Liste d''alerte départementale','Alerte','STATUTS','VALUE')
-        ;'''.format(sch='taxonomie',tab='bdc_statut_type')
-        with con.begin() as cnx:
-            cnx.execute(sql)
-
-def insert_status_values(con):
-    """
-    Inserts predefined status values into the 'bdc_statut_values' table if they do not already exist.
-
-    This function iterates over a list of status values, checking whether each value already exists in the
-    specified database table. If a value does not exist, it inserts the value into the table. Status values
-    include a code and a label, which describe the extinction risk or conservation status of a taxonomic
-    group at the departmental level.
-
-    Args:
-        con: A SQLAlchemy connection object to the database.
-
-    Note:
-        This function assumes the existence of a schema named 'taxonomie' and a table named
-        'bdc_statut_values' in the database connected via 'con'; all SQL statements are executed
-        through 'con'.
-    """
-
-    vals = [
-        ['RE','Disparue au niveau départemental'],
-        ['AS-1','Quasi menacée (localisées sans signe de déclin)'],
-        ['AS-2','Quasi menacée (répandues mais en déclin)'],
-        ['AS-3','Quasi menacée (répandues, déclin à confirmer)']
-    ]
-    for val in vals:
-        if test_data(con,'bdc_statut_values','label_statut',val[1]) > 0:
-            print('ALERTE VALUE STATUS ALREADY EXISTS : ',val[1])
-        else:
-            sql = '''
-            INSERT INTO {sch}.{tab} (code_statut,label_statut) VALUES
-            ('{val0}','{val1}')
-            '''.format(sch='taxonomie',tab='bdc_statut_values',val0=val[0],val1=val[1])
-            with con.begin() as cnx:
-                cnx.execute(sql)
-
-def get_text_id(con,cd_doc):
-    sql = '''
-    SELECT id_text FROM {sch}.{tab} WHERE cd_doc = '{cd_doc}'
-    ;'''.format(sch='taxonomie',tab='bdc_statut_text',cd_doc=cd_doc)
-    with con.begin() as cnx:
-        return cnx.execute(sql).one()[0]
-
-def get_area_id(con,area):
-    sql = '''
-    SELECT id_area FROM {sch}.{tab} WHERE area_name = '{area}'
-    ;'''.format(sch='ref_geo',tab='l_areas',area=area)
-    with con.begin() as cnx:
-        return cnx.execute(sql).one()[0]
-
-def get_values_id(con,col,terme):
-    if isinstance(terme,int|str):
-        terme = [terme]
-    else:
-        terme = list(terme)
-    sql = '''
-    SELECT id_value FROM {sch}.{tab} WHERE {col} IN {terme} AND label_statut <> 'Disparue au niveau régional'
-    ;'''.format(sch='taxonomie',tab='bdc_statut_values',col=col,terme=tuple(terme)).replace(',)',')')
-    with con.begin() as cnx:
-        return cnx.execute(sql).all()
-
-def insert_status_cor_text_area(con,id_doc):
-    id_text = get_text_id(con,id_doc)
-    id_area = get_area_id(con,'Isère')
-
-    check_sql = '''
-    SELECT count(id_text) FROM {sch}.{tab} WHERE (id_text,id_area) = ({id_text},{id_area})
-    ;'''.format(sch='taxonomie',tab='bdc_statut_cor_text_area',id_text=id_text,id_area=id_area)
-    with con.begin() as cnx:
-        if cnx.execute(check_sql).one()[0] == 0:
-            sql = ('''
-            INSERT INTO {sch}.{tab} (id_text,id_area) VALUES ({id_text},{id_area})
-            ;'''.format(sch='taxonomie',tab='bdc_statut_cor_text_area',id_text=id_text,id_area=id_area))
-            cnx.execute(sql)
-
-
-def insert_status_cor_text_values(con,id_doc,values):
-    insert_status_values(con)
-    id_text = get_text_id(con,id_doc)
-    id_vals = [x[0] for x in get_values_id(con,'code_statut',values)]
-
-    zip_vals = tuple(zip(id_vals,[id_text]*len(id_vals)))
-    cor_vals = [x for x in zip_vals if test_cor_values(con,x)==0]
-
-    sql = ('''
-    INSERT INTO {sch}.{tab} (id_value,id_text) VALUES {values}
-    ;'''
-    .format(sch='taxonomie',tab='bdc_statut_cor_text_values',values=cor_vals)
-    .replace(r'[','')
-    .replace(r']',''))
-
-    with con.begin() as cnx:
-        cnx.execute(sql)
-
-def get_id_status_cor_text_values(con,id_doc,values):
-    id_text = get_text_id(con,id_doc)
-    id_vals = [x[0] for x in get_values_id(con,'code_statut',values)]
-
-    zip_vals = tuple(zip(id_vals,[id_text]*len(id_vals)))
-    cor_vals = tuple(x for x in zip_vals if test_cor_values(con,x)>0)
-
-    sql = ('''
-    SELECT id_value_text FROM {sch}.{tab} WHERE (id_value,id_text) IN {cor_vals}
-    ;'''
-    .format(sch='taxonomie',tab='bdc_statut_cor_text_values',cor_vals=cor_vals)
-    .replace('),)','))'))
-
-    with con.begin() as cnx:
-        return cnx.execute(sql).all()
-
-def test_status_text(con,col,cd_doc):
-    sql = '''
-    SELECT count({col}) FROM {sch}.{tab} WHERE {col} = '{cd_doc}'
-    ;'''.format(sch='taxonomie',tab='bdc_statut_text',col=col,cd_doc=cd_doc)
-    with con.begin() as cnx:
-        return cnx.execute(sql).one()[0]
-
-def insert_statut_text(con,cd_doc,doc):
-    if test_data(con,'bdc_statut_text','cd_doc',cd_doc['id_doc']) > 0:
-    # if test_status_text(con,'cd_doc',cd_doc) > 0:
-        print('ALERTE TEXT STATUS ALREADY EXISTS : ',doc)
-    else:
-        sql = '''
-        INSERT INTO {sch}.{tab} (cd_type_statut,cd_doc,cd_sig,niveau_admin,lb_adm_tr,doc_url,enable) VALUES
-        ('AL',{cd_doc},'INSEED38','Département','Isère','{doc}',TRUE)
-        ;'''.format(sch='taxonomie',tab='bdc_statut_text',cd_doc=cd_doc['id_doc'],doc=doc)
-        with con.begin() as cnx:
-            cnx.execute(sql)
-
-    insert_status_cor_text_area(con,cd_doc['id_doc'])
-    insert_status_cor_text_values(con,cd_doc['id_doc'],cd_doc['id_values'])
-
-def get_cd_ref(con,cd_nom):
-    sql = '''
-    SELECT cd_ref FROM {sch}.{tab} WHERE cd_nom = '{cd_nom}'
-    ;'''.format(sch='taxonomie',tab='taxref',cd_nom=cd_nom)
-    with con.begin() as cnx:
-        return cnx.execute(sql).one()[0]
-
-def get_max_idstatuttaxo(con):
-    sql = '''
-    SELECT max(id) FROM {sch}.{tab}
-    ;'''.format(sch='taxonomie',tab='bdc_statut_taxons')
-    with con.begin() as cnx:
-        return cnx.execute(sql).one()[0]
-
-def insert_status_taxo(con,cd_nom,cd_doc,status):
-    id_statut_cor = get_id_status_cor_text_values(con,cd_doc,status)[0][0]
-    cd_ref = get_cd_ref(con,cd_nom)
-    id_statut_taxo = get_max_idstatuttaxo(con) + 1
-
-    sql_check = '''
-    SELECT count(id) FROM {sch}.{tab}
-    WHERE id_value_text = '{id_statut_cor}' AND cd_nom = '{cd_nom}' AND cd_ref = '{cd_ref}'
-    ;'''.format(sch='taxonomie',tab='bdc_statut_taxons',cd_ref=cd_ref,cd_nom=cd_nom,id_statut_cor=id_statut_cor)
-    with con.begin() as cnx:
-        check = cnx.execute(sql_check).one()[0]
-
-    if check == 0:
-        sql = '''
-        INSERT INTO {sch}.{tab} (id,id_value_text,cd_nom,cd_ref) VALUES
-        ('{id_statut_taxo}','{id_statut_cor}','{cd_nom}','{cd_ref}')
-        ;'''.format(sch='taxonomie',tab='bdc_statut_taxons',id_statut_taxo=id_statut_taxo,cd_ref=cd_ref,cd_nom=cd_nom,id_statut_cor=id_statut_cor)
-        with con.begin() as cnx:
-            cnx.execute(sql)
-
-def get_status_type(con,col,status):
-    sql = '''
-    SELECT * FROM {sch}.{tab} WHERE {col} = '{status}'
-    ;'''.format(sch='taxonomie',tab='bdc_statut_type',col=col,status=status)
-    return pd.read_sql(sql,con)
-
-def get_taxonomie(con,cd_nom):
-    if isinstance(cd_nom,int):
-        cd_nom = [cd_nom]
-    else:
-        cd_nom = list(cd_nom)
-    sql = '''
-    SELECT cd_nom,cd_ref,cd_sup,lb_nom,lb_auteur,nom_complet_html,nom_valide,regne,phylum,classe,ordre,famille,group1_inpn,group2_inpn FROM {sch}.{tab} WHERE cd_nom IN {cd_nom}
-    ;'''.format(sch='taxonomie',tab='taxref',cd_nom=tuple(cd_nom)).replace(",)",")")
-    return pd.read_sql(sql,con)
-
-
-if __name__ == "__main__":
-
-    # Connection to the GéoNature database
-    from pycen import con_gn
-
-    # First sheet of the Excel file to be read.
-    # Minimum columns: [CD_NOM, Statut, Source, Source_url]
-    # WARNING:
-    # - CD_NOM must match a CD_NOM in the taxref table
-    # - taxa whose Source_url is None or NA are ignored
-    file = '/home/colas/Documents/9_PROJETS/6_GEONATURE/listes_alertes_isère.xlsx'
-    # Administrative level of the lists to load
-    niveau_admin = 'Département'
-    # Name of the administrative unit
-    lb_adm_tr = 'Isère'
-    # SIG code of the administrative unit
-    cd_sig = 'INSEED38'
-
-    insert_status_alerte(con_gn)
-    # Dictionary of the alert lists to load:
-    # document identifier and the status codes used by each list
-    cd_doc = {
-        'Statut_de_conservation_des_poissons_et_écrevisses_en_Isère_2015':{'id_doc':999990,'id_values':['RE','CR','EN','VU','NT','LC','DD','NA',]},
-        'Liste_d’alerte_sur_les_orthoptères_menacés_en_Isère_2014':{'id_doc':999991,'id_values':['RE','CR','EN','VU','AS-1','AS-2','AS-3','LC','DD','NA']},
-        'Statuts_de_conservation_de_la_faune_sauvage_en_isere_2016':{'id_doc':999992,'id_values':['RE','CR','EN','VU','NT','LC','DD','NA','NE',]},
-        'Liste_rouge_des_Odonates_de_l’Isère_2013':{'id_doc':999993,'id_values':['RE','CR','EN','VU','NT','LC','DD','NA',]},
-        'Liste_rouge_des_lépidoprere_rhopaloceres_et_zygenes_de_l’Isère_2015':{'id_doc':999994,'id_values':['RE','CR','EN','VU','NT','LC','DD','NA','NE','EX']},
-    }
-
-    df = (pd.read_excel(file,keep_default_na=False)
-          .rename(columns={
-              'Statut':'code_statut',
-              'CD_NOM':'cd_nom',
-              'Source':'source',
-              'Source_url':'doc_url',
-          }))
-    df = df.loc[df.doc_url!='']
-    for d in df.source.unique():
-        doc_url = df.loc[df.source==d,'doc_url'].unique()[0]
-        insert_statut_text(con_gn,cd_doc[d],doc_url)
-
-    # INSERT into the bdc_statut_taxons table
-    # Loop over each taxon. Can take a few minutes.
-    [
-        insert_status_taxo(
-            con = con_gn,
-            cd_nom = row.cd_nom,
-            cd_doc = cd_doc[row.source]['id_doc'],
-            status = row.code_statut)
-        for row in df.itertuples()
-    ]
-
-    st = get_status_type(con_gn,'cd_type_statut','AL')
-    for c in st:
-        df[c] = st[c][0]
-    tax = get_taxonomie(con_gn,df['cd_nom'])
-    del tax['nom_valide']
-    del df['source']
-    del df['nom_français']
-    del df['nom_latin']
-    df = df.merge(tax,how='inner',on='cd_nom')
-    df['cd_sig'] = cd_sig
-    df['lb_adm_tr'] = lb_adm_tr
-    df['niveau_admin'] = niveau_admin
-    df.to_sql('bdc_statut',con_gn,schema='taxonomie',if_exists='append',index=False)
diff --git a/pivot_bdc_status.py b/pivot_bdc_status.py
deleted file mode 100644
index 1437d00..0000000
--- a/pivot_bdc_status.py
+++ /dev/null
@@ -1,289 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: UTF-8 -*-
-
-import requests
-import numpy as np
-import pandas as pd
-import os
-
-def get_status(lst,con):
-    sql = """
-    SELECT
-        t.cd_nom,
-        t.cd_ref,
-        t.regne,
-        t.phylum,
-        t.classe,
-        t.ordre,
-        t.famille,
-        t.group1_inpn,
-        t.group2_inpn,
-        t.group3_inpn,
-        t.nom_vern,
-        t.nom_complet,
-        t.nom_valide,
-        t.lb_nom,
-        --s.*
-        s.rq_statut,
-        s.code_statut,
-        s.cd_type_statut,
-        s.label_statut,
-        s.niveau_admin,
-        s.full_citation,
-        s.doc_url
-    FROM taxonomie.taxref t
-    JOIN taxonomie.v_bdc_status s USING (cd_nom)
-    WHERE t.cd_nom IN {cd_nom}
-    ;""".format(cd_nom = tuple(lst))
-    return pd.read_sql_query(sql,con)
-
-def get_api_status(api,cd_nom:int):
-    res = requests.get('%s/%i'%(api,cd_nom))
-    if res.status_code == 200:
-        return res.json()
-    else:
-        raise RuntimeError('Error : %i\tcd_nom : %i'%(res.status_code,cd_nom))
-
-def get_taxon_status(lst,api):
-    from datetime import datetime as dt
-    init = dt.now()
-    st = [get_api_status(api,x) for x in lst] # TOO LONG
-    print(dt.now()-init)
-    phylo = {
-        'cd_ref':[x['cd_ref'] for x in st],
-        'nom_valide':[x['nom_valide'] if 'nom_valide' in x.keys() else None for x in st],
-        'nom_vernac':[x['nom_vern'] if 'nom_vern' in x.keys() else None for x in st],
-        'regne':[x['regne'] if 'regne' in x.keys() else None for x in st],
-        'group1_inp':[x['group1_inpn'] if 'group1_inpn' in x.keys() else None for x in st],
-        'group2_inp':[x['group2_inpn'] if 'group2_inpn' in x.keys() else None for x in st],
-        'group3_inpn':[x['group3_inpn'] if 'group3_inpn' in x.keys() else None for x in st],
-        'classe':[x['classe'] if 'classe' in x.keys() else None for x in st],
-        'ordre':[x['ordre'] if 'ordre' in x.keys() else None for x in st],
-        'famille':[x['famille'] if 'famille' in x.keys() else None for x in st]}
-    # One column per status type: the list of code_statut values found for each taxon,
-    # or None when the taxon has no status of that type.
-    status_types = [
-        'AL','BERN','BONN','DH','DO','LRE','LRM','LRN','LRR','PAPNAT','PD','PNA','PR',
-        'REGL','REGLII','REGLLUTTE','REGLSO','SCAP NAT','SCAP REG','SENSNAT','ZDET','exPNA']
-    cd_status = {
-        cd_type:[
-            [val['values'][v]['code_statut']
-             for val in x['status'][cd_type]['text'].values() for v in val['values']]
-            if cd_type in x['status'].keys() else None
-            for x in st
-        ]
-        for cd_type in status_types
-    }
-    return pd.DataFrame({**phylo,**cd_status})
-
-dict_dep = {
-    '38':'Isère',
-    '42':'Loire',
-    '07':'Ardèche',
-    '26':'Drôme',
-}
-
-
-if __name__ == "__main__":
-    # Connection to the GéoNature database
-    from pycen import con_gn
-    # NOT USED FOR NOW - Taxref API
-    api_taxref = 'https://geonature.cen-isere.fr/taxhub/api/taxref'
-
-    # Parameters for loading the taxa file
-    PATH = '/home/colas/Documents/tmp/CHARVAS'
-    file = 'liste_sp_CHAR.xlsx'
-    sheet = 'liste_sp'
-
-    # Input list of CD_NOMs
-    cd_col = 'cd_ref' # Name of the column to use in sheet ``sheet``
-
-    # Read the data
-    taxlist = pd.read_excel(os.path.join(PATH,file),sheet,usecols=[cd_col],header=0)
-    tab_sp = pd.read_excel(os.path.join(PATH,file),sheet,index_col=cd_col)
-    lst = taxlist[cd_col]
-
-    # Retrieve the statuses
-    df = get_status(taxlist[cd_col].astype(str),con_gn)
-
-    # Distinguish old vs new regional LRR lists
-    is_lrr = df.cd_type_statut == 'LRR'
-    df.loc[is_lrr & (df.niveau_admin == 'Région'),'cd_type_statut'] = 'LRR_AURA'
-    df.loc[is_lrr & (df.niveau_admin == 'Ancienne région'),'cd_type_statut'] = 'LRR_RA'
-    del df['niveau_admin']
-
-    for c in ['cd_ref','cd_nom','lb_nom']:
-        if c in tab_sp.columns:
-            # if 'cd_nom' not in df.columns and c == 'cd_ref': continue
-            tab_sp.drop(c,axis=1,inplace=True)
-
-    pivot = pd.pivot_table(
-        df,
-        values='code_statut',
-        index=['cd_nom', 'cd_ref','lb_nom'#,'niveau_admin','lb_adm_tr'
-        ],
-        columns=['cd_type_statut'],
-        aggfunc=list,fill_value=None)
-
-    for c in pivot.columns:
-        pivot[c] = [x[0] if isinstance(x,list) and len(x)==1 else x for x in pivot[c]]
-    if 'DH' in pivot.columns:
-        pivot['DH'] = [','.join(x) if isinstance(x,list) and len(x)==2 else x for x in pivot['DH']]
-        pivot.DH.replace({'CDH':''},regex=True,inplace=True)
-
-    pivot = tab_sp.merge(pivot,on=[cd_col],how='left')
-
-    pivlib = pd.pivot_table(
-        df,
-        values='label_statut',
-        index=['cd_nom', 'cd_ref','lb_nom'#,'niveau_admin','lb_adm_tr'
-        ],
-        columns=['cd_type_statut'],
-        aggfunc=list,fill_value=None)
-    for c in pivlib.columns:
-        pivlib[c] = [x[0] if isinstance(x,list) and len(x)==1 else x for x in pivlib[c]]
-    if 'DH' in pivlib.columns:
-        pivlib['DH'] = [','.join(x) if isinstance(x,list) and len(x)==2 else x for x in pivlib['DH']]
-        pivlib.DH.replace({'CDH':''},regex=True,inplace=True)
-
-    pivlib = tab_sp.merge(pivlib,on=[cd_col],how='left')
-
-    print('INIT writer')
-    NAME_OUT = os.path.join(PATH,sheet+'_status.xlsx')
-    with pd.ExcelWriter(NAME_OUT) as writer:
-        df.to_excel(
-            writer,sheet_name='v_bdc_status',index=False
-        )
-        # writer.save()
-        print('v_bdc_status OK !')
-        pivot.to_excel(
-            writer,sheet_name='pivot_table'
-        )
-        # writer.save()
-        print('pivot_table OK !')
-        pivlib.to_excel(
-            writer,sheet_name='pivot_libel'
-        )
-        # writer.save()
-        print('pivot_libel OK !')
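
Both deleted scripts interpolate values straight into their SQL strings with str.format, which breaks on labels containing quotes and is open to injection. Should any of these helpers be restored later, binding the value is safer; below is a minimal sketch of what test_data could look like with sqlalchemy.text and a bound parameter. The schema, table, and column names are still formatted into the string and are assumed to come from trusted code, as in the deleted file.

from sqlalchemy import text

def test_data(con, tab, col, status):
    # Count rows of taxonomie.<tab> whose <col> equals the given status code.
    # Only identifiers are formatted into the string; the value is bound.
    sql = text(
        "SELECT count({col}) FROM taxonomie.{tab} WHERE {col} = :status"
        .format(tab=tab, col=col)
    )
    with con.begin() as cnx:
        return cnx.execute(sql, {'status': status}).scalar()

Called like the original, e.g. test_data(con_gn, 'bdc_statut_type', 'cd_type_statut', 'AL'); the same pattern would apply to the other helpers that format a literal into their query.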