Clean dépôt

This commit is contained in:
Colas Geier 2025-01-31 11:52:33 +01:00
parent 85b1c5ee87
commit 2630b7fc4b
2 changed files with 0 additions and 578 deletions

View File

@ -1,289 +0,0 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
def test_data(con, tab, col, status):
    """Count rows of taxonomie.<tab> whose column <col> equals <status>.

    NOTE(review): query built with str.format — only safe for trusted,
    internal identifiers/values (SQL-injection risk otherwise).
    """
    query = '''
    SELECT count({col}) FROM {sch}.{tab} WHERE {col} = '{status}'
    ;'''.format(sch='taxonomie', tab=tab, col=col, status=status)
    with con.begin() as cnx:
        row = cnx.execute(query).one()
        return row[0]
def test_cor_values(con, vals):
    """Count bdc_statut_cor_text_values rows matching an (id_value,id_text) pair.

    Args:
        con: SQLAlchemy engine/connection.
        vals: 2-tuple (id_value, id_text); its repr is interpolated into the SQL.
    """
    query = '''
    SELECT count(id_value_text) FROM {sch}.{tab} WHERE (id_value,id_text) = {vals}
    ;'''.format(sch='taxonomie', tab='bdc_statut_cor_text_values', vals=vals)
    with con.begin() as cnx:
        row = cnx.execute(query).one()
        return row[0]
def test_status_type(con, col, status):
    """Count bdc_statut_type rows whose <col> equals <status>.

    Specialised variant of test_data for the bdc_statut_type table.
    """
    query = '''
    SELECT count({col}) FROM {sch}.{tab} WHERE {col} = '{status}'
    ;'''.format(sch='taxonomie', tab='bdc_statut_type', col=col, status=status)
    with con.begin() as cnx:
        row = cnx.execute(query).one()
        return row[0]
def insert_status_alerte(con):
    """Ensure the 'AL' (departmental alert list) status type exists.

    Inserts the row into taxonomie.bdc_statut_type only when no row with
    cd_type_statut = 'AL' is present yet; otherwise prints a notice.

    Args:
        con: SQLAlchemy engine/connection to the GeoNature database.
    """
    if test_data(con, 'bdc_statut_type', 'cd_type_statut', 'AL') > 0:
        print('ALERTE STATUS ALREADY EXISTS')
    else:
        sql = '''
        INSERT INTO {sch}.{tab} (cd_type_statut,lb_type_statut,regroupement_type,thematique,type_value) VALUES
        ('AL','Liste d''alerte départementale','Alerte','STATUTS','VALUE')
        ;'''.format(sch='taxonomie', tab='bdc_statut_type')
        # BUG FIX: use the connection passed as argument instead of the
        # module-level global `con_gn` (only defined when run as a script).
        with con.begin() as cnx:
            cnx.execute(sql)
def insert_status_values(con):
    """Insert the predefined alert status values when they are missing.

    Each entry of ``vals`` is a (code_statut, label_statut) pair describing a
    departmental extinction-risk / conservation level. A pair is inserted
    into taxonomie.bdc_statut_values only when no row with the same label
    exists yet; otherwise a notice is printed.

    Args:
        con: SQLAlchemy engine/connection to the GeoNature database.
    """
    vals = [
        ['RE','Disparue au niveau départemental'],
        ['AS-1','Quasi menacée (localisées sans signe de déclin)'],
        ['AS-2','Quasi menacée (répandues mais en déclin)'],
        ['AS-3','Quasi menacée (répandues, déclin à confirmer)']
    ]
    for val in vals:
        if test_data(con, 'bdc_statut_values', 'label_statut', val[1]) > 0:
            print('ALERTE VALUE STATUS ALREADY EXISTS : ', val[1])
        else:
            sql = '''
            INSERT INTO {sch}.{tab} (code_statut,label_statut) VALUES
            ('{val0}','{val1}')
            '''.format(sch='taxonomie', tab='bdc_statut_values', val0=val[0], val1=val[1])
            # BUG FIX: use the connection passed as argument instead of the
            # module-level global `con_gn`.
            with con.begin() as cnx:
                cnx.execute(sql)
def get_text_id(con, cd_doc):
    """Return the id_text of the bdc_statut_text row with the given cd_doc.

    Result.one() raises when zero or several rows match.
    """
    query = '''
    SELECT id_text FROM {sch}.{tab} WHERE cd_doc = '{cd_doc}'
    ;'''.format(sch='taxonomie', tab='bdc_statut_text', cd_doc=cd_doc)
    with con.begin() as cnx:
        row = cnx.execute(query).one()
        return row[0]
def get_area_id(con, area):
    """Return the id_area of the ref_geo.l_areas row named *area*.

    Result.one() raises when zero or several areas share that name.
    """
    query = '''
    SELECT id_area FROM {sch}.{tab} WHERE area_name = '{area}'
    ;'''.format(sch='ref_geo', tab='l_areas', area=area)
    with con.begin() as cnx:
        row = cnx.execute(query).one()
        return row[0]
def get_values_id(con, col, terme):
    """Return the id_value rows of taxonomie.bdc_statut_values matching *terme*.

    Args:
        con: SQLAlchemy engine/connection.
        col: column to filter on (e.g. 'code_statut').
        terme: a single value (int or str) or an iterable of values.

    Returns:
        List of one-column rows (id_value).
    """
    # Normalise the input to a list so the IN-tuple below always works.
    if isinstance(terme, int | str):
        terme = [terme]
    else:
        terme = list(terme)
    # .replace(',)',')') strips the trailing comma a one-element tuple leaves
    # in the SQL ("IN ('x',)" would be invalid).
    # NOTE(review): the label filter skips 'Disparue au niveau régional' —
    # presumably its code collides with another status row; confirm.
    sql = '''
    SELECT id_value FROM {sch}.{tab} WHERE {col} IN {terme} AND label_statut <> 'Disparue au niveau régional'
    ;'''.format(sch='taxonomie', tab='bdc_statut_values', col=col, terme=tuple(terme)).replace(',)', ')')
    with con.begin() as cnx:
        return cnx.execute(sql).all()
def insert_status_cor_text_area(con, id_doc):
    """Link the text identified by cd_doc *id_doc* to the 'Isère' ref_geo area.

    Inserts the (id_text,id_area) pair into bdc_statut_cor_text_area unless
    the pair is already present.
    """
    id_text = get_text_id(con, id_doc)
    id_area = get_area_id(con, 'Isère')
    check_sql = '''
    SELECT count(id_text) FROM {sch}.{tab} WHERE (id_text,id_area) = ({id_text},{id_area})
    ;'''.format(sch='taxonomie', tab='bdc_statut_cor_text_area', id_text=id_text, id_area=id_area)
    insert_sql = '''
    INSERT INTO {sch}.{tab} (id_text,id_area) VALUES ({id_text},{id_area})
    ;'''.format(sch='taxonomie', tab='bdc_statut_cor_text_area', id_text=id_text, id_area=id_area)
    with con.begin() as cnx:
        already_linked = cnx.execute(check_sql).one()[0] > 0
        if not already_linked:
            cnx.execute(insert_sql)
def insert_status_cor_text_values(con, id_doc, values):
    """Link status values to a text in taxonomie.bdc_statut_cor_text_values.

    Ensures the value rows exist, then inserts every (id_value,id_text)
    pair that is not already present.

    Args:
        con: SQLAlchemy engine/connection.
        id_doc: cd_doc identifying the text in bdc_statut_text.
        values: iterable of code_statut codes to link.
    """
    insert_status_values(con)
    id_text = get_text_id(con, id_doc)
    id_vals = [x[0] for x in get_values_id(con, 'code_statut', values)]
    zip_vals = tuple(zip(id_vals, [id_text] * len(id_vals)))
    cor_vals = [x for x in zip_vals if test_cor_values(con, x) == 0]
    # BUG FIX: an empty list produced "VALUES ;", which is invalid SQL.
    if not cor_vals:
        return
    sql = ('''
    INSERT INTO {sch}.{tab} (id_value,id_text) VALUES {values}
    ;'''
           .format(sch='taxonomie', tab='bdc_statut_cor_text_values', values=cor_vals)
           # strip the list brackets so the pairs render as SQL tuples
           .replace(r'[', '')
           .replace(r']', ''))
    # BUG FIX: use `con`, not the module-level global `con_gn`.
    with con.begin() as cnx:
        cnx.execute(sql)
def get_id_status_cor_text_values(con, id_doc, values):
    """Return id_value_text rows linking *values* to the text with cd_doc *id_doc*.

    Only pairs that actually exist in bdc_statut_cor_text_values are queried.

    Args:
        con: SQLAlchemy engine/connection.
        id_doc: cd_doc of the text.
        values: code_statut code(s).

    Returns:
        List of one-column rows (id_value_text); empty when no pair exists.
    """
    id_text = get_text_id(con, id_doc)
    id_vals = [x[0] for x in get_values_id(con, 'code_statut', values)]
    zip_vals = tuple(zip(id_vals, [id_text] * len(id_vals)))
    cor_vals = tuple(x for x in zip_vals if test_cor_values(con, x) > 0)
    # BUG FIX: an empty tuple produced "IN ()", which is invalid SQL.
    if not cor_vals:
        return []
    sql = ('''
    SELECT id_value_text FROM {sch}.{tab} WHERE (id_value,id_text) IN {cor_vals}
    ;'''
           .format(sch='taxonomie', tab='bdc_statut_cor_text_values', cor_vals=cor_vals)
           # drop the trailing comma a one-pair tuple leaves in the SQL
           .replace('),)', '))'))
    with con.begin() as cnx:
        return cnx.execute(sql).all()
def test_status_text(con, col, cd_doc):
    """Count bdc_statut_text rows whose <col> equals <cd_doc>.

    Specialised variant of test_data for the bdc_statut_text table.
    """
    query = '''
    SELECT count({col}) FROM {sch}.{tab} WHERE {col} = '{cd_doc}'
    ;'''.format(sch='taxonomie', tab='bdc_statut_text', col=col, cd_doc=cd_doc)
    with con.begin() as cnx:
        row = cnx.execute(query).one()
        return row[0]
def insert_statut_text(con, cd_doc, doc):
    """Insert an alert text into bdc_statut_text and link its area and values.

    Args:
        con: SQLAlchemy engine/connection.
        cd_doc: despite its name, a dict with keys 'id_doc' (the cd_doc
            number) and 'id_values' (list of code_statut codes for the list).
        doc: URL of the source document, stored in doc_url.
    """
    if test_data(con, 'bdc_statut_text', 'cd_doc', cd_doc['id_doc']) > 0:
        print('ALERTE TEXT STATUS ALREADY EXISTS : ', doc)
    else:
        sql = '''
        INSERT INTO {sch}.{tab} (cd_type_statut,cd_doc,cd_sig,niveau_admin,lb_adm_tr,doc_url,enable) VALUES
        ('AL',{cd_doc},'INSEED38','Département','Isère','{doc}',TRUE)
        ;'''.format(sch='taxonomie', tab='bdc_statut_text', cd_doc=cd_doc['id_doc'], doc=doc)
        # BUG FIX: use the connection passed as argument instead of the
        # module-level global `con_gn`.
        with con.begin() as cnx:
            cnx.execute(sql)
        # Link the newly inserted text to its area and its status values.
        insert_status_cor_text_area(con, cd_doc['id_doc'])
        insert_status_cor_text_values(con, cd_doc['id_doc'], cd_doc['id_values'])
def get_cd_ref(con, cd_nom):
    """Return the cd_ref of the taxref row with the given cd_nom.

    Result.one() raises when zero or several rows match.
    """
    query = '''
    SELECT cd_ref FROM {sch}.{tab} WHERE cd_nom = '{cd_nom}'
    ;'''.format(sch='taxonomie', tab='taxref', cd_nom=cd_nom)
    with con.begin() as cnx:
        row = cnx.execute(query).one()
        return row[0]
def get_max_idstatuttaxo(con):
    """Return the largest id in taxonomie.bdc_statut_taxons (0 when empty).

    BUG FIX: on an empty table max(id) is SQL NULL, which came back as None
    and crashed the caller's `+ 1`; that case is normalised to 0.
    """
    sql = '''
    SELECT max(id) FROM {sch}.{tab}
    ;'''.format(sch='taxonomie', tab='bdc_statut_taxons')
    with con.begin() as cnx:
        max_id = cnx.execute(sql).one()[0]
    return max_id if max_id is not None else 0
def insert_status_taxo(con, cd_nom, cd_doc, status):
    """Attach a status to a taxon in taxonomie.bdc_statut_taxons.

    Resolves the (value,text) link id and the cd_ref of the taxon, then
    inserts a new row unless the same (id_value_text, cd_nom, cd_ref)
    association already exists.

    Args:
        con: SQLAlchemy engine/connection.
        cd_nom: taxref cd_nom of the taxon.
        cd_doc: cd_doc of the status text.
        status: code_statut to attach.
    """
    # Raises IndexError when the (status, text) link does not exist —
    # presumably guaranteed by a prior insert_status_cor_text_values call.
    id_statut_cor = get_id_status_cor_text_values(con, cd_doc, status)[0][0]
    cd_ref = get_cd_ref(con, cd_nom)
    # NOTE: not concurrency-safe — max(id)+1 can collide with another writer.
    id_statut_taxo = get_max_idstatuttaxo(con) + 1
    sql_check = '''
    SELECT count(id) FROM {sch}.{tab}
    WHERE id_value_text = '{id_statut_cor}' AND cd_nom = '{cd_nom}' AND cd_ref = '{cd_ref}'
    ;'''.format(sch='taxonomie', tab='bdc_statut_taxons', cd_ref=cd_ref, cd_nom=cd_nom, id_statut_cor=id_statut_cor)
    with con.begin() as cnx:
        check = cnx.execute(sql_check).one()[0]
        if check == 0:
            sql = '''
            INSERT INTO {sch}.{tab} (id,id_value_text,cd_nom,cd_ref) VALUES
            ('{id_statut_taxo}','{id_statut_cor}','{cd_nom}','{cd_ref}')
            ;'''.format(sch='taxonomie', tab='bdc_statut_taxons', id_statut_taxo=id_statut_taxo, cd_ref=cd_ref, cd_nom=cd_nom, id_statut_cor=id_statut_cor)
            # BUG FIX: reuse the surrounding transaction on `con` instead of
            # opening a nested one on the module-level global `con_gn`.
            cnx.execute(sql)
def get_status_type(con, col, status):
    """Return all bdc_statut_type rows where <col> equals <status>, as a DataFrame."""
    # BUG FIX: import pandas locally — the module-level `pd` only exists
    # when this file is executed as a script (imported under __main__).
    import pandas as pd
    sql = '''
    SELECT * FROM {sch}.{tab} WHERE {col} = '{status}'
    ;'''.format(sch='taxonomie', tab='bdc_statut_type', col=col, status=status)
    return pd.read_sql(sql, con)
def get_taxonomie(con, cd_nom):
    """Return taxonomy columns from taxonomie.taxref for the given cd_nom(s).

    Args:
        con: SQLAlchemy engine/connection.
        cd_nom: a single int or an iterable of cd_nom values.

    Returns:
        pandas.DataFrame with one row per matching cd_nom.
    """
    # BUG FIX: import pandas locally — the module-level `pd` only exists
    # when this file is executed as a script.
    import pandas as pd
    # Normalise to a list so the IN-tuple below always works.
    if isinstance(cd_nom, int):
        cd_nom = [cd_nom]
    else:
        cd_nom = list(cd_nom)
    # .replace(",)", ")") strips the trailing comma of one-element tuples.
    sql = '''
    SELECT cd_nom,cd_ref,cd_sup,lb_nom,lb_auteur,nom_complet_html,nom_valide,regne,phylum,classe,ordre,famille,group1_inpn,group2_inpn FROM {sch}.{tab} WHERE cd_nom IN {cd_nom}
    ;'''.format(sch='taxonomie', tab='taxref', cd_nom=tuple(cd_nom)).replace(",)", ")")
    return pd.read_sql(sql, con)
if __name__ == "__main__":
    import pandas as pd
    # Connection to the GeoNature database.
    from pycen import con_gn
    # First sheet of the Excel file.
    # Minimal columns: [CD_NOM, Statut, Source, Source_url]
    # WARNING:
    # - CD_NOM must match the cd_nom values of the taxref table
    # - taxa whose Source_url is None or NA are ignored
    file = '/home/colas/Documents/9_PROJETS/6_GEONATURE/listes_alertes_isère.xlsx'
    # Administrative level of the lists to load
    niveau_admin = 'Département'
    # Name of the administrative unit
    lb_adm_tr = 'Isère'
    # SIG code of the administrative unit
    cd_sig = 'INSEED38'
    insert_status_alerte(con_gn)
    # Alert lists to load: document identifier and the status codes each uses.
    cd_doc = {
        'Statut_de_conservation_des_poissons_et_écrevisses_en_Isère_2015':{'id_doc':999990,'id_values':['RE','CR','EN','VU','NT','LC','DD','NA',]},
        'Liste_dalerte_sur_les_orthoptères_menacés_en_Isère_2014':{'id_doc':999991,'id_values':['RE','CR','EN','VU','AS-1','AS-2','AS-3','LC','DD','NA']},
        'Statuts_de_conservation_de_la_faune_sauvage_en_isere_2016':{'id_doc':999992,'id_values':['RE','CR','EN','VU','NT','LC','DD','NA','NE',]},
        'Liste_rouge_des_Odonates_de_lIsère_2013':{'id_doc':999993,'id_values':['RE','CR','EN','VU','NT','LC','DD','NA',]},
        'Liste_rouge_des_lépidoprere_rhopaloceres_et_zygenes_de_lIsère_2015':{'id_doc':999994,'id_values':['RE','CR','EN','VU','NT','LC','DD','NA','NE','EX']},
    }
    df = (pd.read_excel(file, keep_default_na=False)
          .rename(columns={
              'Statut': 'code_statut',
              'CD_NOM': 'cd_nom',
              'Source': 'source',
              'Source_url': 'doc_url',
          }))
    # Drop taxa without a source document.
    df = df.loc[df.doc_url != '']
    # One bdc_statut_text row (plus area/value links) per source document.
    for d in df.source.unique():
        doc_url = df.loc[df.source == d, 'doc_url'].unique()[0]
        insert_statut_text(con_gn, cd_doc[d], doc_url)
    # INSERT into bdc_statut_taxons, one taxon at a time (may take minutes).
    # IDIOM FIX: plain loop instead of a list comprehension used only for
    # its side effects.
    for row in df.itertuples():
        insert_status_taxo(
            con=con_gn,
            cd_nom=row.cd_nom,
            cd_doc=cd_doc[row.source]['id_doc'],
            status=row.code_statut)
    # Replicate the 'AL' status-type attributes onto every row.
    st = get_status_type(con_gn, 'cd_type_statut', 'AL')
    for c in st:
        df[c] = st[c][0]
    tax = get_taxonomie(con_gn, df['cd_nom'])
    # Drop columns that would clash with (or are superseded by) taxref data.
    del tax['nom_valide']
    del df['source']
    del df['nom_français']
    del df['nom_latin']
    df = df.merge(tax, how='inner', on='cd_nom')
    df['cd_sig'] = cd_sig
    df['lb_adm_tr'] = lb_adm_tr
    df['niveau_admin'] = niveau_admin
    df.to_sql('bdc_statut', con_gn, schema='taxonomie', if_exists='append', index=False)

View File

@ -1,289 +0,0 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import requests
import numpy as np
import pandas as pd
import os
def get_status(lst, con):
    """Return taxonomy and protection-status rows for the given cd_nom list.

    Args:
        lst: iterable of cd_nom values.
        con: SQLAlchemy engine/connection to the GeoNature database.

    Returns:
        pandas.DataFrame joining taxonomie.taxref with taxonomie.v_bdc_status.
    """
    sql = """
    SELECT
    t.cd_nom,
    t.cd_ref,
    t.regne,
    t.phylum,
    t.classe,
    t.ordre,
    t.famille,
    t.group1_inpn,
    t.group2_inpn,
    t.group3_inpn,
    t.nom_vern,
    t.nom_complet,
    t.nom_valide,
    t.lb_nom,
    --s.*
    s.rq_statut,
    s.code_statut,
    s.cd_type_statut,
    s.label_statut,
    s.niveau_admin,
    s.full_citation,
    s.doc_url
    FROM taxonomie.taxref t
    JOIN taxonomie.v_bdc_status s USING (cd_nom)
    WHERE t.cd_nom IN {cd_nom}
    ;""".format(cd_nom=tuple(lst)).replace(",)", ")")
    # BUG FIX: a single-element list rendered "IN ('x',)", which is invalid
    # SQL; stripping the trailing comma matches the idiom used elsewhere.
    return pd.read_sql_query(sql, con)
def get_api_status(api, cd_nom: int):
    """Fetch the status JSON for one taxon from the TaxHub/Taxref API.

    Args:
        api: base URL of the status endpoint.
        cd_nom: taxon identifier appended to the URL.

    Returns:
        Decoded JSON payload (dict).

    Raises:
        RuntimeError: when the HTTP status is not 200.
    """
    res = requests.get('%s/%i' % (api, cd_nom))
    if res.status_code == 200:
        return res.json()
    # BUG FIX: `raise('...')` raised a TypeError at runtime (exceptions must
    # derive from BaseException); raise a real exception with the message.
    raise RuntimeError('Error : %i\tcd_nom : %i' % (res.status_code, cd_nom))
def get_taxon_status(lst, api):
    """Query the status API for every cd_nom in *lst* and build a DataFrame.

    Makes one API call per taxon (slow on long lists; the elapsed time is
    printed). Output columns are the taxonomy fields followed by one column
    per status type, each cell holding the list of code_statut values found
    for that taxon, or None when the taxon has no status of that type.

    Args:
        lst: iterable of cd_nom values.
        api: base URL of the status endpoint (see get_api_status).

    Returns:
        pandas.DataFrame, one row per taxon.
    """
    from datetime import datetime as dt
    init = dt.now()
    st = [get_api_status(api, x) for x in lst]  # TOO LONG: one HTTP call per taxon
    print(dt.now() - init)

    def _field(key):
        # Taxonomy attribute for every taxon; None when the API omits the key.
        return [x.get(key) for x in st]

    phylo = {
        'cd_ref': [x['cd_ref'] for x in st],
        'nom_valide': _field('nom_valide'),
        'nom_vernac': _field('nom_vern'),
        'regne': _field('regne'),
        'group1_inp': _field('group1_inpn'),
        # BUG FIX: the original tested/read the key 'group2_inp', which the
        # API does not return, so this column was always None — presumably
        # a typo for 'group2_inpn'; TODO confirm against the API payload.
        'group2_inp': _field('group2_inpn'),
        'group3_inpn': [x['group3_inpn'] for x in st],
        'classe': _field('classe'),
        'ordre': _field('ordre'),
        'famille': _field('famille'),
    }

    def _codes(x, typ):
        # All code_statut values attached to status type *typ*, or None
        # when the taxon carries no status of that type.
        if typ not in x['status']:
            return None
        return [val['values'][v]['code_statut']
                for val in x['status'][typ]['text'].values()
                for v in val['values']]

    # Same status types, same column order, as the original hand-expanded dict.
    status_types = ['AL', 'BERN', 'BONN', 'DH', 'DO', 'LRE', 'LRM', 'LRN',
                    'LRR', 'PAPNAT', 'PD', 'PNA', 'PR', 'REGL', 'REGLII',
                    'REGLLUTTE', 'REGLSO', 'SCAP NAT', 'SCAP REG',
                    'SENSNAT', 'ZDET', 'exPNA']
    cd_status = {typ: [_codes(x, typ) for x in st] for typ in status_types}

    return pd.DataFrame({**phylo, **cd_status})
# Department INSEE code -> department name.
# NOTE(review): defined at module level but not referenced in this file's
# __main__ block — presumably used by importers; confirm before removing.
dict_dep = {
    '38':'Isère',
    '42':'Loire',
    '07':'Ardèche',
    '26':'Drôme',
}
if __name__ == "__main__":
    # Connection to the GeoNature database.
    from pycen import con_gn
    # NOT USED FOR NOW - Taxref API
    api_taxref = 'https://geonature.cen-isere.fr/taxhub/api/taxref'
    # Input workbook holding the taxa list
    PATH = '/home/colas/Documents/tmp/CHARVAS'
    file = 'liste_sp_CHAR.xlsx'
    sheet = 'liste_sp'
    # Column of ``sheet`` holding the taxon identifiers
    cd_col = 'cd_ref'
    # Load the data
    taxlist = pd.read_excel(os.path.join(PATH, file), sheet, usecols=[cd_col], header=0)
    tab_sp = pd.read_excel(os.path.join(PATH, file), sheet, index_col=cd_col)
    # Fetch the statuses
    df = get_status(taxlist[cd_col].astype(str), con_gn)
    # Distinguish old vs new regional red lists (LRR)
    is_lrr = df.cd_type_statut == 'LRR'
    df.loc[is_lrr & (df.niveau_admin == 'Région'), 'cd_type_statut'] = 'LRR_AURA'
    df.loc[is_lrr & (df.niveau_admin == 'Ancienne région'), 'cd_type_statut'] = 'LRR_RA'
    del df['niveau_admin']
    # Drop key columns from the input sheet so the merge does not duplicate them.
    for c in ['cd_ref', 'cd_nom', 'lb_nom']:
        if c in tab_sp.columns:
            tab_sp.drop(c, axis=1, inplace=True)

    def flatten_status(piv):
        # Unwrap single-element lists into scalars; join two-entry DH lists
        # as "a,b" and strip the 'CDH' prefix; then align on the input sheet.
        # Uses np.nan (np.NaN was removed in NumPy 2.0); the identity test
        # relies on pandas filling missing pivot cells with that singleton.
        for c in piv.columns:
            piv[c] = [x[0] if x is not np.nan and len(x) == 1 else x for x in piv[c]]
        # BUG FIX: the original duplicated this code and tested
        # `'DH' in pivot.columns` while operating on pivlib.
        if 'DH' in piv.columns:
            piv['DH'] = [','.join(x) if (x is not np.nan) and (len(x) == 2) else x for x in piv['DH']]
            piv.DH.replace({'CDH': ''}, regex=True, inplace=True)
        return tab_sp.merge(piv, on=[cd_col], how='left')

    # Status codes per taxon, one column per status type.
    pivot = flatten_status(pd.pivot_table(
        df,
        values='code_statut',
        index=['cd_nom', 'cd_ref', 'lb_nom'],
        columns=['cd_type_statut'],
        aggfunc=list, fill_value=None))
    # Same layout, with status labels instead of codes.
    pivlib = flatten_status(pd.pivot_table(
        df,
        values='label_statut',
        index=['cd_nom', 'cd_ref', 'lb_nom'],
        columns=['cd_type_statut'],
        aggfunc=list, fill_value=None))
    print('INIT writer')
    NAME_OUT = os.path.join(PATH, sheet + '_status.xlsx')
    with pd.ExcelWriter(NAME_OUT) as writer:
        df.to_excel(writer, sheet_name='v_bdc_status', index=False)
        print('v_bdc_status OK !')
        pivot.to_excel(writer, sheet_name='pivot_table')
        print('pivot_table OK !')
        pivlib.to_excel(writer, sheet_name='pivot_libel')
        print('pivot_libel OK !')