diff --git a/5_GEONATURE/GN_ZH/ins_zh_from_api.py b/5_GEONATURE/GN_ZH/ins_zh_from_api.py new file mode 100644 index 0000000..24965d2 --- /dev/null +++ b/5_GEONATURE/GN_ZH/ins_zh_from_api.py @@ -0,0 +1,431 @@ +import geopandas as gpd +from uuid import uuid4 +import json,urllib.request +from numpy import ndarray + +dict_columns = { + 'code_zh': 'code', + 'nom_zh': 'main_name', + 'date_visite':'create_date', +} + + +def get_nomenclature_id(con,cd,typ): + """ + Get the id_nomenclature for a given cd and typ. + + Parameters + ---------- + con : sqlalchemy.engine.Engine + The database connection. + cd : str + The code of the nomenclature. + typ : str + The type of the nomenclature. + + Returns + ------- + int or None + The id of the nomenclature or None if the nomenclature does not exist. + """ + sql = """ + SELECT ref_nomenclatures.get_id_nomenclature('{typ}', '{cd}') + """.format(cd=cd,typ=typ) + with con.begin() as cnx: + res = cnx.execute(sql) + + return res.one()[0] if res else None + +def to_tzh(df,con): + tab = 't_zh' + sch = 'pr_zh' + lst_columns = [ + x['name'] for x in con.dialect.get_columns(con,tab,sch) if x['name'] in df.columns + ] + + to_ins = df[lst_columns].copy() + try: + to_ins.to_postgis( + name=tab, + con=con, + schema=sch, + index=False, + if_exists='append', + method='multi' + ) + except Exception as e: + print(e) + finally: + print("INSERT data TO t_zh OK !") + + return gpd.pd.read_sql('SELECT id_zh,code FROM pr_zh.t_zh',con=con) + # id_zh = df[['code','pk']].rename(columns={'pk':'id_zh'}) + +def to_cor_lim_list(cor_lim,con): + """ + Insert into cor_lim_list the list of delimitation for each zh area + :param cor_lim: a dataframe with columns id_lim_list,cd_nomenclature_delimitation + :param con: a sqlalchemy connection + :return: None + """ + _cor_lim = (cor_lim + .set_index('id_lim_list') + .cd_nomenclature_delimitation.str.split(',',expand=True) + .stack() + .str.strip() + .to_frame('cd_lim') + .droplevel(-1) + ) + _cor_lim['id_lim'] = [get_nomenclature_id(con,x,'CRIT_DELIM') for x in _cor_lim.cd_lim] + _cor_lim.drop('cd_lim',axis=1,inplace=True) + try: + _cor_lim.to_sql( + name='cor_lim_list', + con=con, + schema='pr_zh', + index=True, + if_exists='append', + method='multi' + ) + except Exception as e: + print(e) + finally: + print("INSERT Délimitation TO cor_lim_list OK !") + +def to_cor_zh_cb(id_zh,df,con): + """ + Insert habitats to cor_zh_cb table. + + Parameters + ---------- + id_zh : pd.DataFrame + DataFrame containing 'code' and 'id_zh' columns. + df : pd.DataFrame + DataFrame containing 'code' and 'habitat_corine_biotope' columns. + con : sqlalchemy.engine.Engine + Connection to the geonature database. + + Returns + ------- + None + """ + cor_zh_cb = (df + .merge(id_zh,how='left',on='code') + .set_index('id_zh') + .habitat_corine_biotope.str.split(',') + .explode() + .str.strip() + .to_frame('lb_code') + .droplevel(-1) + ) + try: + cor_zh_cb.to_sql( + name='cor_zh_cb', + con=con, + schema='pr_zh', + index=True, + if_exists='append', + method='multi' + ) + except Exception as e: + print(e) + finally: + print("INSERT habitats to cor_zh_cb OK !") + +def check_cd_nomenclature_impact(cd): + dizaine = range(0,100,10) + _cd = [x+'.0' if float(x) not in dizaine and not x.endswith('.0') else x for x in cd] + return _cd + +def to_t_activity(id_zh,actv,con): + _activ = (actv + .merge(id_zh,how='left',on='code') + .drop('code',axis=1)) + + t_activ = gpd.pd.DataFrame() + for i,x in _activ.iterrows(): + res = gpd.pd.DataFrame(json.loads(x['acti_impact'])) + res['id_zh'] = x['id_zh'] + t_activ = gpd.pd.concat([t_activ,res]) + t_activ.set_index('id_zh',inplace=True) + t_activ.dropna(inplace=True,axis=1,how='all') + t_activ.dropna(inplace=True,axis=0,how='all') + + t_activ['id_activity'] = [get_nomenclature_id(con,x,'ACTIV_HUM') for x in t_activ.cd_activite_humaine] + t_activ['id_position'] = [get_nomenclature_id(con,x,'LOCALISATION') for x in t_activ.localisation] + t_activ['id_impact_list'] = [uuid4() for _ in range(len(t_activ))] + + impact_list = (t_activ[['id_impact_list','impact']] + .set_index('id_impact_list') + .impact.explode() + .to_frame('cd_impact') + .reset_index() + ) + impact_list['_cd_impact'] = check_cd_nomenclature_impact(impact_list.cd_impact) + impact_list['id_impact'] = [get_nomenclature_id(con,x,'IMPACTS') for x in impact_list._cd_impact] + + t_activ.drop(['cd_activite_humaine','localisation','impact'],axis=1,inplace=True) + impact_list.drop(['cd_impact','_cd_impact'],axis=1,inplace=True) + + try: + t_activ.to_sql( + name='t_activity', + con=con, + schema='pr_zh', + index=True, + if_exists='append', + method='multi' + ) + except Exception as e: + print(e) + finally: + print("INSERT activity to t_activity OK !") + + try: + impact_list.to_sql( + name='cor_impact_list', + con=con, + schema='pr_zh', + index=True, + if_exists='append', + method='multi' + ) + except Exception as e: + print(e) + finally: + print("INSERT impact to cor_impact_list OK !") + + +def check_habitat(habitat,con): + sql = ''' + SELECT lb_code FROM pr_zh.bib_cb + WHERE lb_code {symbol} {code} + '''.format( + symbol='IN' if isinstance(habitat,(list,gpd.pd.Series,ndarray)) else '=', + code=tuple(habitat) if isinstance(habitat,(list,gpd.pd.Series,ndarray)) else f"'{habitat}'" + ) + with con.begin() as cnx: + res = cnx.execute(sql).all() + return [x[0] for x in res if x] + + +def filter_habitat(habitat,con): + iscd_zh = check_habitat(habitat,con) + _cd_zh = habitat[habitat.isin(iscd_zh)] + _cd_notzh = habitat[~habitat.isin(iscd_zh)] + cd_zh = _cd_zh.groupby(_cd_zh.index).aggregate(', '.join) + cd_notzh = _cd_notzh.groupby(_cd_notzh.index).aggregate(', '.join) + return cd_zh,cd_notzh + + +def check_observ(obs,org,con): + _obs = normalize_observers(obs) + sql = ''' + SELECT + r.id_role AS ids_observers, + CONCAT(r.nom_role, ' ', prenom_role) AS observers + FROM utilisateurs.t_roles r + JOIN utilisateurs.bib_organismes USING (id_organisme) + WHERE CONCAT(UPPER(r.nom_role), ' ', INITCAP(prenom_role)) {symbol} {code} + AND nom_organisme = '{org}' + '''.format( + symbol='IN' if isinstance(_obs,(list,gpd.pd.Series,ndarray)) else '=', + code=tuple(_obs) if isinstance(_obs,(list,gpd.pd.Series,ndarray)) else f"'{_obs}'", + org=org + ) + with con.begin() as cnx: + res = cnx.execute(sql).all() + return [x[0] for x in res if x] + +def insert_observ(obs,org,con): + check_observ(obs,org,con) + pass + +def normalize_observers(obs): + _obs = obs.str.split(' ',expand=True) + _obs[0] = _obs[0].str.upper() + _obs[1] = _obs[1].str.title() + _obs_stack = _obs.stack().droplevel(-1) + return _obs_stack.groupby(_obs_stack.index).aggregate(' '.join) + +def insert_orga(org,con): + pass +def select_orga_user(org,con): + sql =''' + SELECT id_organisme FROM utilisateurs.bib_organismes + WHERE nom_organisme = '{nom}' + '''.format(nom=org[0]) + with con.begin() as cnx: + _res = cnx.execute(sql) + res = _res.one_or_none() + return res + + +def insert_orga_user(org,con): + res = select_orga_user(org,con) + if not res : + Q = input("Organisme `{}` non trouvé dans le schéma `utilisateurs`, voulez-vous l'ajouter ? (y/n) ".format(org[0])) + if Q.lower() != 'y': + return 'No' + sql =''' + INSERT INTO utilisateurs.bib_organismes (nom_organisme) + VALUES ('{nom}') + --ON CONFLICT (nom_organisme) DO NOTHING + RETURNING id_organisme; + '''.format(nom=org[0]) + with con.begin() as cnx: + _res = cnx.execute(sql) + res = _res.one_or_none() + else: + print("Organisme `{}` existant dans le schéma `utilisateurs`".format(org[0])) + return res[0] + +def select_orga_przh(org,con): + sql =''' + SELECT id_org FROM pr_zh.bib_organismes + WHERE name = '{nom}' + '''.format(nom=org[0]) + with con.begin() as cnx: + _res = cnx.execute(sql) + res = _res.one_or_none() + return res + +def insert_orga_przh(org,con): + res = select_orga_przh(org,con) + if not res : + Q = input("Organisme `{}` non trouvé dans le schéma `pr_zh`, voulez-vous l'ajouter ? (y/n) ".format(org[0])) + if Q.lower() != 'y': + return 'No' + sql =''' + INSERT INTO pr_zh.bib_organismes (name,abbrevation,is_op_org) + VALUES ('{nom}', {abbrev}, TRUE) + --ON CONFLICT (name,abbrevation) DO NOTHING + RETURNING id_org; + '''.format(nom=org[0],abbrev=f"'{org[1]}'" if org[1] else 'NULL') + with con.begin() as cnx: + _res = cnx.execute(sql) + res = _res.fetchall() + else: + print("Organisme `{}` existant dans le schéma `pr_zh`".format(org[0])) + return res[0] + +def insert_users_missing(user,org,con): + id_org = insert_orga_przh(org,con) + id_user_orga = insert_orga_user(org,con) + obsv_missing = insert_observ(user,id_user_orga,con) + + +def insert_zh_fromapi(url,con,dep_filter,orga,prefix_hab_rq=''): + """ + Insert data from API into geonature database + + Parameters + ---------- + url : str + url of the API + con : sqlalchemy.engine.Engine + connection to the geonature database + dep_filter : str + filter code_zh by this department code + prefix_hab_rq : str + prefix to add to remark_pres field for habitats not in geonature database + + Returns + ------- + None + """ + api = gpd.read_file(url) + df = (api#[api.code_zh.str.startswith(dep_filter)] + .rename(columns=dict_columns) + .rename_geometry('geom') + .merge(load_missing_propertie(url,'cd_nomenclature_delimitation',dep_filter),on='code') + ) + + insert_users_missing(df.observers,orga,con) + + df['zh_uuid'] = [uuid4() for _ in range(len(df))] + df['id_lim_list'] = [uuid4() for _ in range(len(df))] + df['id_sdage'] = [get_nomenclature_id(con,x,'SDAGE') for x in df.cd_typo_sdage] + + cd_hab = (df + .set_index('code') + .habitat_corine_biotope + .str.split(',').explode() + .str.strip()) + + hab_zh,hab_notzh = filter_habitat(cd_hab,con) + _df = (df + .drop('habitat_corine_biotope',axis=1) + .merge(hab_zh,how='left',right_index=True,left_on='code')) + _df = _df.merge( + prefix_hab_rq + hab_notzh.rename('remark_pres'), + how='left',right_index=True,left_on='code' + ) + + c = _df[_df.action=="Créer"].copy() + u = _df[_df.action=="Modifier"].copy() + + if not c.empty: + id_zh = to_tzh(c,con) + to_cor_zh_cb(id_zh,c,con) + to_cor_lim_list(c[['id_lim_list','cd_nomenclature_delimitation']],con) + to_t_activity(id_zh,c[['code','acti_impact']],con) + if not u.empty: + raise('Script à coder !') + + +def load_missing_propertie(url,propertie,dep_filter='38'): + """ + Load from api the missing properties for the given url, propertie and dep_filter. + + Parameters + ---------- + url : str + url of the api + propertie : str + name of the propertie to retrieve + dep_filter : str + filter for the department, default is '38' + + Returns + ------- + pd.DataFrame + DataFrame with columns 'code' and propertie + """ + + data = urllib.request.urlopen(url).read() + output = json.loads(data) + features = output['items']['features'] + + res = { + 'code':[x['properties']['code_zh'] for x in features if x['properties']['code_zh'].startswith(dep_filter)], + propertie:[ + ','.join(x['properties'][propertie]) + for x in features + if x['properties']['code_zh'].startswith(dep_filter)], + } + return gpd.pd.DataFrame(res) + +if __name__ == "__main__": + + from sqlalchemy import create_engine + from sqlalchemy.engine import URL + # Parametres bdd + user = '' + pwd = "" + adr = '' + base = '' + url = URL.create("postgresql+psycopg2", username=user, password=pwd, host=adr, database=base) + con_gn = create_engine(url) + + from pycen import con_gn + # Numéro de département permettant d'identifier les zones humides concernées par le territoire + # ['38', '05'], default : 38 + dep_filter = '38' + # Préfixe ajouté dans le champs remark_pres lorsque des habitats décrits ne sont pas des habitats dit "humides" + prefix_hab_rmk = 'Autre(s) habitat(s) décrit(s) :\n' + # [Nom de l'organisme, Abbreviation] + organisme = ['Parc national des Écrins','PNE'] + + api = 'https://geonature.ecrins-parcnational.fr/api/exports/api/21?departement=38' + # insert_zh_fromapi(api,con_gn,dep_filter,organisme,prefix_hab_rq=prefix_hab_rmk)