forked from CEN38/plugin_gn_tools
470 lines
16 KiB
Python
470 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: UTF-8 -*-
|
|
|
|
# import requests
|
|
# import numpy as np
|
|
# import pandas as pd
|
|
# import os
|
|
# Standard
|
|
import json
|
|
|
|
from qgis.core import QgsGeometry, QgsProject, QgsJsonUtils, QgsVectorLayer
|
|
# PyQt
|
|
from qgis.PyQt.QtCore import (
|
|
QUrl,
|
|
pyqtSignal,
|
|
QJsonDocument,
|
|
Qt,
|
|
# QTextCodec
|
|
)
|
|
from qgis.PyQt.QtNetwork import QNetworkReply, QNetworkRequest
|
|
from qgis.PyQt.QtWidgets import (
|
|
QAction,
|
|
QMessageBox,
|
|
QDialog,
|
|
QApplication
|
|
)
|
|
from gn_tools.__about__ import __api_cd_sta__
|
|
|
|
def get_status(lst, con):
    """Fetch taxonomy and status rows for a list of ``cd_nom`` identifiers.

    Runs a query joining ``taxonomie.taxref`` with ``taxonomie.v_bdc_status``
    on ``cd_nom`` and keeps the rows whose ``cd_nom`` is listed in ``lst``.

    Args:
        lst: Iterable of ``cd_nom`` values; each one is converted with ``int()``.
        con: Connection object handed unchanged to ``pandas.read_sql_query``.

    Returns:
        A ``pandas.DataFrame`` holding the selected taxonomy and status columns.
        An empty ``lst`` yields a frame without rows.

    Raises:
        ValueError, TypeError: If an element of ``lst`` is not integer-like.
    """
    # Function-scope import: the module-level pandas import is commented out.
    import pandas as pd

    # Formatting ``tuple(lst)`` straight into the query produced invalid SQL
    # for a single element ("(1,)"), for no element ("()") and for numpy
    # scalars.  Casting to int also keeps arbitrary text out of the statement.
    # ``IN (NULL)`` matches nothing, which is what an empty list should give.
    cd_nom_list = ', '.join(str(int(cd)) for cd in lst) or 'NULL'

    sql = """
    SELECT
        t.cd_nom,
        t.cd_ref,
        t.regne,
        t.phylum,
        t.classe,
        t.ordre,
        t.famille,
        t.group1_inpn,
        t.group2_inpn,
        t.group3_inpn,
        t.nom_vern,
        t.nom_complet,
        t.nom_valide,
        t.lb_nom,
        --s.*
        s.cd_sig,
        s.rq_statut,
        s.code_statut,
        s.cd_type_statut,
        s.label_statut,
        s.niveau_admin,
        s.full_citation,
        s.doc_url
    FROM taxonomie.taxref t
    JOIN taxonomie.v_bdc_status s USING (cd_nom)
    WHERE t.cd_nom IN ({cd_nom})
    ;""".format(cd_nom=cd_nom_list)
    return pd.read_sql_query(sql, con)
|
|
|
|
def get_type_status(con):
    """Return every row of ``taxonomie.bdc_statut_type`` as a DataFrame.

    Args:
        con: Connection object handed unchanged to ``pandas.read_sql_query``.

    Returns:
        A ``pandas.DataFrame`` with all columns of the table.
    """
    # Function-scope import: the module-level pandas import is commented out,
    # so the bare ``pd`` name used before raised NameError.
    import pandas as pd

    sql = """
    SELECT * FROM taxonomie.bdc_statut_type
    ;"""
    return pd.read_sql_query(sql, con)
|
|
|
|
def get_api_status(api, cd_nom: int):
    """Request ``<api>/<cd_nom>`` and return the decoded JSON payload.

    Args:
        api: Base URL of the endpoint, without trailing slash.
        cd_nom: Integer identifier appended to ``api`` as the last path segment.

    Returns:
        The object produced by ``res.json()`` when the HTTP status is 200.

    Raises:
        RuntimeError: When the response status is not 200.  The previous
            ``raise('...')`` raised a plain string, which Python rejects with
            a ``TypeError`` that hid the status code.
    """
    # NOTE(review): ``requests`` is only present as a commented-out import at
    # the top of the file; it has to be importable at module level for this
    # function to run.
    res = requests.api.get('%s/%i' % (api, cd_nom))
    if res.status_code == 200:
        return res.json()
    raise RuntimeError('Error : %i\tcd_nom : %i' % (res.status_code, cd_nom))
|
|
|
|
def get_taxon_status(lst,api):
|
|
from datetime import datetime as dt
|
|
init = dt.now()
|
|
st = [get_api_status(api,x) for x in lst] # TOO LONG
|
|
print(dt.now()-init)
|
|
phylo = {
|
|
'cd_ref':[x['cd_ref'] for x in st],
|
|
'nom_valide':[x['nom_valide'] if 'nom_valide' in x.keys() else None for x in st],
|
|
'nom_vernac':[x['nom_vern'] if 'nom_vern' in x.keys() else None for x in st],
|
|
'regne':[x['regne'] if 'regne' in x.keys() else None for x in st],
|
|
'group1_inp':[x['group1_inpn'] if 'group1_inpn' in x.keys() else None for x in st],
|
|
'group2_inp':[x['group2_inp'] if 'group2_inp' in x.keys() else None for x in st],
|
|
'group3_inpn':[x['group3_inpn'] for x in st],
|
|
'classe':[x['classe'] if 'classe' in x.keys() else None for x in st],
|
|
'ordre':[x['ordre'] if 'ordre' in x.keys() else None for x in st],
|
|
'famille':[x['famille'] if 'famille' in x.keys() else None for x in st]}
|
|
cd_status = {
|
|
'AL':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['AL']['text'].values() for v in val['values'] ]
|
|
if 'AL' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'BERN':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['BERN']['text'].values() for v in val['values'] ]
|
|
if 'BERN' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'BONN':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['BONN']['text'].values() for v in val['values'] ]
|
|
if 'BONN' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'DH':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['DH']['text'].values() for v in val['values'] ]
|
|
if 'DH' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'DO':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['DO']['text'].values() for v in val['values'] ]
|
|
if 'DO' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'LRE':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['LRE']['text'].values() for v in val['values'] ]
|
|
if 'LRE' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'LRM':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['LRM']['text'].values() for v in val['values'] ]
|
|
if 'LRM' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'LRN':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['LRN']['text'].values() for v in val['values'] ]
|
|
if 'LRN' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'LRR':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['LRR']['text'].values() for v in val['values'] ]
|
|
if 'LRR' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'PAPNAT':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['PAPNAT']['text'].values() for v in val['values'] ]
|
|
if 'PAPNAT' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'PD':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['PD']['text'].values() for v in val['values'] ]
|
|
if 'PD' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'PNA':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['PNA']['text'].values() for v in val['values'] ]
|
|
if 'PNA' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'PR':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['PR']['text'].values() for v in val['values'] ]
|
|
if 'PR' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'REGL':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['REGL']['text'].values() for v in val['values'] ]
|
|
if 'REGL' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'REGLII':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['REGLII']['text'].values() for v in val['values'] ]
|
|
if 'REGLII' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'REGLLUTTE':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['REGLLUTTE']['text'].values() for v in val['values'] ]
|
|
if 'REGLLUTTE' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'REGLSO':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['REGLSO']['text'].values() for v in val['values'] ]
|
|
if 'REGLSO' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'SCAP NAT':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['SCAP NAT']['text'].values() for v in val['values'] ]
|
|
if 'SCAP NAT' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'SCAP REG':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['SCAP REG']['text'].values() for v in val['values'] ]
|
|
if 'SCAP REG' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'SENSNAT':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['SENSNAT']['text'].values() for v in val['values'] ]
|
|
if 'SENSNAT' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'ZDET':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['ZDET']['text'].values() for v in val['values'] ]
|
|
if 'ZDET' in x['status'].keys() else None
|
|
for x in st
|
|
],
|
|
'exPNA':[
|
|
[val['values'][v]['code_statut']
|
|
for val in x['status']['exPNA']['text'].values() for v in val['values'] ]
|
|
if 'exPNA' in x['status'].keys() else None
|
|
for x in st
|
|
]
|
|
}
|
|
return pd.DataFrame({**phylo,**cd_status})
|
|
|
|
|
|
def filter_bio_geo(df, zone_bio):
    """Drop the rows flagged as non-determinant for a biogeographic zone.

    The flag is looked up in the free-text column ``rq_statut``.

    Args:
        df: DataFrame with an ``rq_statut`` text column.  It is modified in
            place.
        zone_bio: ``'rhod'`` drops rows containing
            'rhodanienne : Non déterminante', ``'alpi'`` drops rows containing
            'Alpine : Non déterminante', ``'all'`` drops rows containing both.
            Any other value leaves ``df`` untouched.

    Returns:
        The same ``df`` object, after the in-place drop.
    """
    # na=False: rows without remark can never match; without it a NaN in the
    # mask makes the boolean indexing below fail.
    test_rhod = df.rq_statut.str.contains('rhodanienne : Non déterminante', na=False)
    test_alpi = df.rq_statut.str.contains('Alpine : Non déterminante', na=False)

    masks = {
        'rhod': test_rhod,
        'alpi': test_alpi,
        'all': test_rhod & test_alpi,
    }
    mask = masks.get(zone_bio)
    if mask is None:
        # Unknown zone: nothing to filter.  The former code called ``.empty``
        # on a plain list here and raised AttributeError.
        return df

    idNotBioGeo = df[mask].index
    if not idNotBioGeo.empty:
        df.drop(idNotBioGeo, inplace=True)

    return df
|
|
|
|
|
|
def form_territoire(df, terr):
    """Adapt department-level status rows to the requested territory.

    Department-level rows are those whose ``cd_sig`` contains ``'INSEED'``.

    Args:
        df: DataFrame with ``cd_sig`` and ``cd_type_statut`` columns.  It is
            modified in place.
        terr: ``'isere'`` drops the department rows other than ``'INSEED38'``.
            ``'platiere'`` keeps every row and suffixes ``cd_type_statut`` of
            department rows with ``'_'`` plus the department code
            (e.g. ``'_38'``, ``'_42'``).  Any other value leaves ``df`` as is.

    Returns:
        The same ``df`` object.
    """
    # na=False: a missing cd_sig is simply not a department row.
    is_dep = df.cd_sig.str.contains('INSEED', na=False)
    is_38 = df.cd_sig == 'INSEED38'
    other_dep = is_dep & (~is_38)

    if terr == 'isere':
        df.drop(df[other_dep].index, inplace=True)
    elif terr == 'platiere':
        # Remove the literal 'INSEED' marker; ``str.strip('INSEED')`` strips a
        # *set of characters* from both ends, not that prefix.
        dep_code = df.loc[other_dep, 'cd_sig'].str.replace('INSEED', '', regex=False)
        df.loc[other_dep, 'cd_type_statut'] = (
            df.loc[other_dep, 'cd_type_statut'] + '_' + dep_code
        )
        df.loc[is_38, 'cd_type_statut'] = df.loc[is_38, 'cd_type_statut'] + '_38'

    return df
|
|
|
|
|
|
# Display names keyed by two-character department code strings.
# NOTE(review): not referenced anywhere in the visible part of this file.
dict_dep = {
    '38':'Isère',
    '42':'Loire',
    '07':'Ardèche',
    '26':'Drôme',
}
|
|
|
|
|
|
class PivotStatus(QDialog):
    """Download the status of the taxa of a layer and load it as new layers.

    On construction the unique values of the ``cd_nom`` field of ``layer`` are
    sent to the endpoint ``parent.domain_url + __api_cd_sta__``.  When the
    answer arrives, three memory layers are added to the current project:

    * a table with one row per (taxon, status) pair,
    * a layer with one feature per taxon and one column per status type
      holding the status *code*,
    * the same layer holding the status *label*.
    """

    # Emitted from ``import_finished`` once no request is pending any more.
    finished_dl = pyqtSignal()

    def __init__(
        self,
        parent=None,
        network_manager=None,
        layer=None,
        cd_nom=None
    ):
        """Store the context and immediately start the download.

        Args:
            parent: Object exposing ``dlg``, ``src_name`` and ``domain_url``.
            network_manager: Network access manager used for the request; its
                cookie jar provides the cookie sent with it.
            layer: Vector layer whose taxa are looked up.
            cd_nom: Name of the field of ``layer`` holding the taxon id.
        """
        super().__init__()
        print(parent.dlg.tab)
        self.dlg = parent.dlg
        self.src_name = parent.src_name
        self.domain_url = parent.domain_url
        self.network_manager = network_manager
        self.layer = layer
        self.cd_nom = cd_nom
        self.url_api = self.domain_url + __api_cd_sta__

        # When True, department-level statuses get one column per department.
        self.multi_dep = True

        # First cookie stored for the domain, or None when the jar has none
        # (indexing [0] unconditionally raised IndexError in that case).
        cookies = self.network_manager.cookieJar().cookiesForUrl(
            QUrl(self.domain_url)
        )
        self.cookie = cookies[0] if cookies else None
        self._pending_downloads = 0
        self.import_status()

    @property
    def pending_downloads(self):
        """Number of requests sent and not yet answered."""
        return self._pending_downloads

    def import_status(self):
        """Collect the unique taxon ids of the layer and request their status."""
        idx = self.layer.fields().indexOf(self.cd_nom)
        unique_cd = self.layer.uniqueValues(idx)
        print(len(unique_cd))
        self.res = {}
        self.get_status(unique_cd)

    def get_status(self, cd_nom):
        """Send one GET request asking the status of every id in ``cd_nom``.

        Args:
            cd_nom: Iterable of taxon ids, joined with commas in the query
                string.
        """
        self._pending_downloads += 1
        url = QUrl(
            self.url_api
            + '?fields=status&cd_nom='
            + ','.join(map(str, cd_nom))
        )
        request = QNetworkRequest(url)
        # NOTE(review): header 0 is presumably Content-Type and header 4 the
        # Cookie header - confirm against the QNetworkRequest.KnownHeaders enum.
        request.setHeader(QNetworkRequest.KnownHeaders(0), "application/json")
        if self.cookie is not None:
            request.setHeader(QNetworkRequest.KnownHeaders(4), self.cookie)
        reply = self.network_manager.get(request)
        reply.finished.connect(lambda: self.import_finished(reply))

    def import_finished(self, reply):
        """Handle a finished request: build the layers or print the error.

        Args:
            reply: The finished network reply.
        """
        self._pending_downloads -= 1
        # NOTE(review): attribute 0 is presumably the HTTP status code, since
        # it is compared with 200 - confirm.
        status_code = reply.attribute(QNetworkRequest.Attribute(0))
        print(status_code)

        payload = None
        if status_code != 200:
            print(f"code: {reply.error()} message: {reply.errorString()}")
        else:
            payload = reply.readAll().data().decode()
        # Everything needed has been read: let Qt release the reply, which is
        # otherwise never freed.
        reply.deleteLater()

        if payload is not None:
            data = json.loads(payload)

            self.layer_type = self.layer.wkbType().name
            self.layer_crs = self.layer.crs()

            bdc = self.v_bdc_status(data)
            bdc_cd = self._v_bdc_(data, 'code')
            bdc_lab = self._v_bdc_(data, 'label')

            self.add_new_layer(bdc)
            self.add_new_layer(bdc_cd)
            self.add_new_layer(bdc_lab)

        if self.pending_downloads == 0:
            self.finished_dl.emit()
            QApplication.restoreOverrideCursor()

    def v_bdc_status(self, data):
        """Build a geometry-less collection with one feature per taxon status.

        Args:
            data: Decoded answer; ``data['items']`` is a list of taxon dicts,
                each with a ``status`` list of dicts.

        Returns:
            A dict shaped like a GeoJSON FeatureCollection, extended with the
            ``geom_type`` and ``layer_name`` keys read by ``add_new_layer``.
        """
        features = []
        for item in data['items']:
            taxon = {key: value for key, value in item.items() if key != 'status'}
            for status in item['status']:
                features.append({
                    # Status fields win over taxon fields of the same name.
                    "properties": {**taxon, **status},
                    "type": "Feature"
                })
        return {
            "features": features,
            "type": "FeatureCollection",
            "geom_type": "No geometry",
            "layer_name": 'Status - ' + self.layer.name()
        }

    def _geometries_by_taxon(self):
        """Group the geometries of the layer by the value of the taxon field."""
        grouped = {}
        for feature in self.layer.getFeatures():
            grouped.setdefault(
                feature.attribute(self.cd_nom), []
            ).append(feature.geometry())
        return grouped

    def _status_column(self, status):
        """Return the column name used for one status entry."""
        if self.multi_dep and 'INSEED' in status['cd_sig']:
            # Department-level status: suffix with the department code.  The
            # literal marker is removed with replace(); strip('INSEED') would
            # strip a set of characters, not the marker.
            return (
                status['cd_type_statut'] + '_'
                + status['cd_sig'].replace('INSEED', '')
            )
        return status['cd_type_statut']

    def _v_bdc_(self, data, type_status):
        """Build one feature per taxon with one column per status type.

        Args:
            data: Decoded answer, same shape as for ``v_bdc_status``.
            type_status: ``'code'`` to fill the columns with ``code_statut``;
                any other value uses ``label_statut``.

        Returns:
            A dict shaped like a GeoJSON FeatureCollection, extended with the
            ``geom_type`` and ``layer_name`` keys read by ``add_new_layer``.
            The geometry of a feature is the union of the layer geometries
            whose taxon field equals the ``cd_ref`` of the item.
        """
        dtype = 'code_statut' if type_status == 'code' else 'label_statut'
        # Single pass over the layer instead of one full scan per item.
        geometries = self._geometries_by_taxon()

        features = []
        for item in data['items']:
            properties = {
                key: value for key, value in item.items() if key != 'status'
            }
            for status in item['status']:
                properties[self._status_column(status)] = status[dtype]
            # NOTE(review): the layer field is named after cd_nom but matched
            # against cd_ref - confirm this is intended for synonyms.
            merged = QgsGeometry.unaryUnion(geometries.get(item['cd_ref'], []))
            features.append({
                "geometry": json.loads(merged.asJson()),
                "properties": properties,
                "type": "Feature"
            })

        return {
            "features": features,
            "type": "FeatureCollection",
            "geom_type": self.layer_type if "Multi" in self.layer_type else "Multi" + self.layer_type,
            "layer_name": 'Status %s - ' % type_status + self.layer.name()
        }

    def add_new_layer(self, data):
        """Create a memory layer from a feature collection and add it to the project.

        Args:
            data: Dict produced by ``v_bdc_status`` or ``_v_bdc_``.
        """
        geom_type = data['geom_type']
        layername = data['layer_name']

        fcString = json.dumps(data)
        fields = QgsJsonUtils.stringToFields(fcString)
        feats = QgsJsonUtils.stringToFeatureList(fcString, fields)

        vl = QgsVectorLayer(geom_type, layername, "memory")
        # The new layer takes the CRS of the source layer.
        if vl.sourceCrs() != self.layer_crs:
            print('Changed crs !')
            vl.setCrs(self.layer_crs)

        dp = vl.dataProvider()
        dp.addAttributes(fields)
        vl.updateFields()
        dp.addFeatures(feats)
        vl.updateExtents()

        QgsProject.instance().addMapLayer(vl)
|