2026-03-19 14:06:08 +01:00

441 lines
14 KiB
Python

# Standard
import json
# QGIS / PyQt
from qgis.core import (
QgsAuthMethodConfig,
QgsDataSourceUri,QgsVectorLayer,
QgsJsonUtils,
QgsProject,
QgsCoordinateReferenceSystem,
QgsCoordinateTransform
)
from qgis.PyQt.QtCore import (
QByteArray,
QJsonDocument,
QObject,
QUrl,
pyqtSignal,
QTextCodec,
Qt
)
from qgis.PyQt.QtNetwork import QNetworkReply, QNetworkRequest
from qgis.PyQt.QtWidgets import QAction, QMessageBox, QDialog, QApplication
# from PyQt5.QtCore import Qt
# from PyQt5.QtWidgets import QApplication
# from PyQt6.QtCore import Qt
# from PyQt6.QtWidgets import QApplication
# Project
from gn_tools.__about__ import (
__api_sn_web__,
__api_sn_exp__,
__api_lst_ex__,
__exept_export__,
)
# from gn_tools.processing import ImportDataGbif
from .gbif import ImportDataGbif
# Module-level list (despite the "dict" in its name). The classes visible in
# this file keep their export metadata on ``dlg.dict_exp`` and never read this
# variable.
dict_exp = []
class ParamWidget(QDialog):
    """Fill the ``source_import`` combo box of the parent dialog.

    For the ``'GBIF'`` source a single fixed entry is offered. For every other
    source the list of exports is requested from ``__api_lst_ex__`` and the
    combo box is filled when the reply arrives (see :meth:`import_finished`).

    Signals:
        finished_dl: emitted by :meth:`import_finished` once no request
            created through :meth:`get_api` is pending any more.
    """

    finished_dl = pyqtSignal()

    def __init__(
        self,
        parent=None,
        network_manager=None
    ):
        """Store the context of *parent* and start filling the combo box.

        Args:
            parent: object exposing ``dlg``, ``src_name`` and ``domain_url``.
                It is required in practice even though it defaults to
                ``None``: its attributes are read unconditionally.
            network_manager: network access manager used to send the request
                and to look up the cookie stored for ``domain_url``.
        """
        super().__init__()
        print(parent.dlg.tab)
        self.dlg = parent.dlg
        self.src_name = parent.src_name
        self.domain_url = parent.domain_url
        self.network_manager = network_manager
        self._pending_downloads = 0
        # No cookie is needed (nor looked up) for GBIF.
        self.cookie = None
        if self.src_name != 'GBIF':
            cookies = network_manager.cookieJar().cookiesForUrl(
                QUrl(self.domain_url)
            )
            # An empty cookie jar must not raise here: the request is then
            # sent without cookie and the HTTP status is reported by
            # import_finished.
            self.cookie = cookies[0] if cookies else None
            # Restored in import_finished.
            QApplication.setOverrideCursor(Qt.WaitCursor)
        self.send_tab_import()

    @property
    def pending_downloads(self):
        """Number of requests built by :meth:`get_api` and not yet finished."""
        return self._pending_downloads

    def send_tab_import(self):
        """Fill the combo box directly (GBIF) or request the export list."""
        if self.src_name == 'GBIF':
            self.dlg.source_import.addItems(['Synthèse GBIF'])
            self.dlg.source_import.currentTextChanged.connect(
                self.print_change
            )
            return
        self.dlg.source_import.clear()
        request = self.get_api(api=__api_lst_ex__)
        reply = self.network_manager.get(request)
        reply.finished.connect(lambda: self.import_finished(reply))

    def import_finished(self, reply):
        """Handle the export-list reply and fill the combo box.

        Args:
            reply: the finished network reply.
        """
        self._pending_downloads -= 1
        QApplication.restoreOverrideCursor()
        status = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
        print(status)
        if status != 200:
            print(f"code: {reply.error()} message: {reply.errorString()}")
        else:
            self._populate_sources(reply)
        # Finished replies are not released automatically.
        reply.deleteLater()
        if self.pending_downloads == 0:
            self.finished_dl.emit()

    def _populate_sources(self, reply):
        """Build ``lst_exp`` from a successful *reply* and show it."""
        self.lst_exp = ['Synthèse taxons']
        if self.src_name == 'GéoNature':
            self.dlg.dict_exp = self._read_exports(reply)
            self.lst_exp += [export['label'] for export in self.dlg.dict_exp]
        self.dlg.source_import.addItems(self.lst_exp)
        self.dlg.source_import.currentTextChanged.connect(
            self.print_change
        )

    @staticmethod
    def _read_exports(reply):
        """Return the usable exports listed in the JSON body of *reply*.

        An export is kept when its ``geometry_srid`` is set and its
        lower-cased label contains none of the ``__exept_export__`` entries.
        An unreadable payload is reported and yields an empty list.
        """
        try:
            items = json.loads(reply.readAll().data().decode())['items']
        except (ValueError, KeyError, TypeError) as err:
            print(f"Unreadable export list: {err!r}")
            return []
        return [
            export
            for export in items
            if export.get('geometry_srid')
            and not any(
                word in export['label'].lower() for word in __exept_export__
            )
        ]

    def print_change(self):
        """Print the index of the entry currently selected in the combo box."""
        print(self.dlg.source_import.currentIndex())

    def get_api(self, api=None, apiFilter=None):
        """Build the request for ``domain_url`` + *api*.

        The pending-download counter is incremented on every call.

        Args:
            api: path appended to ``domain_url`` when truthy.
            apiFilter: accepted for backward compatibility; it has no effect
                on the returned request.

        Returns:
            QNetworkRequest: request with a JSON content type and, when a
            cookie is known, the cookie header.
        """
        self._pending_downloads += 1
        url_api = self.domain_url
        if api:
            url_api += str(api)
        url = QUrl(url_api)
        print(url)
        # Define request properties (cookie, header)
        request = QNetworkRequest(url)
        request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
        if self.cookie is not None:
            request.setHeader(QNetworkRequest.CookieHeader, self.cookie)
        return request
class LoadingImport(QDialog):
    """Download the source selected in ``dlg.source_import`` and load it.

    The download starts from the constructor. Features received as GeoJSON
    are split by geometry family and each non-empty family is added to the
    current QGIS project as a memory layer. ``'Synthèse taxons'`` needs two
    chained requests: the filtered query, then the export of the ids found.

    Signals:
        finished_dl: emitted when a reply has been handled and no request is
            pending any more, or when a request could not be built.
    """

    finished_dl = pyqtSignal()

    def __init__(
        self,
        parent=None,
        network_manager=None
    ):
        """Store the context of *parent*, set a wait cursor and call run().

        Args:
            parent: object exposing ``dlg``, ``syn_params``, ``src_name``,
                ``domain_url``, ``rectangle`` and ``project``. It is required
                in practice even though it defaults to ``None``.
            network_manager: network access manager used for the requests and
                to look up the cookie stored for ``domain_url``.
        """
        super().__init__()
        print(parent.dlg.tab)
        self.dlg = parent.dlg
        self.syn_params = parent.syn_params
        self.source = self.dlg.source_import.currentText()
        self.src_name = parent.src_name
        self.domain_url = parent.domain_url
        self.network_manager = network_manager
        # No cookie is looked up for GBIF; an empty cookie jar must not raise.
        self.cookie = None
        if self.src_name != 'GBIF':
            cookies = network_manager.cookieJar().cookiesForUrl(
                QUrl(self.domain_url)
            )
            self.cookie = cookies[0] if cookies else None
        # NOTE: this is the parent's object, not a copy; transform_bouds
        # reprojects it in place.
        self.rectangle = parent.rectangle
        self.project = parent.project
        self.api = None
        self._pending_downloads = 0
        QApplication.setOverrideCursor(Qt.WaitCursor)
        self.run()

    @property
    def pending_downloads(self):
        """Number of requests sent and not yet handled."""
        return self._pending_downloads

    def run(self, items=None):
        """Send the request matching ``self.source``.

        ``'Synthèse GBIF'`` is delegated to ``ImportDataGbif``; no request is
        sent from here and the wait cursor is restored immediately. The two
        ``'Synthèse taxons'`` steps are sent as POST with a JSON body, any
        other export as GET.

        Args:
            items: feature collection returned by the ``'Synthèse taxons'``
                query; only read for ``'Synthèse taxons export'``.
        """
        self.api = None
        if self.source == 'Synthèse GBIF':
            ImportDataGbif(
                self.network_manager, self.project, self.rectangle
            )
            return QApplication.restoreOverrideCursor()
        query = self._build_query(items)
        if query is None:
            return self._abort(
                f"Unsupported source: {self.source!r} ({self.src_name!r})"
            )
        url_api, doc = query
        url = QUrl(url_api)
        print(url)
        # Define request properties (cookie, header)
        request = QNetworkRequest(url)
        request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
        if self.cookie:
            request.setHeader(QNetworkRequest.CookieHeader, self.cookie)
        # Counted only once a request is really sent, so that the counter
        # goes back to zero when the reply is handled.
        self._pending_downloads += 1
        if doc is not None:
            reply = self.network_manager.post(request, doc.toJson())
        else:
            reply = self.network_manager.get(request)
        reply.finished.connect(lambda: self.loading_finished(reply))

    def _abort(self, reason):
        """Report *reason*, restore the cursor and signal completion."""
        print(reason)
        QApplication.restoreOverrideCursor()
        self.finished_dl.emit()

    def _build_query(self, items):
        """Return ``(url, json_document_or_None)``, or ``None`` if unsupported."""
        if self.source == 'Synthèse taxons':
            return self._synthese_query()
        if self.source == 'Synthèse taxons export':
            return self._synthese_export_query(items)
        return self._export_query()

    def _synthese_query(self):
        """Build the filtered synthese query (POST body = filters)."""
        url_api = self.domain_url + __api_sn_web__
        self.transform_bouds(4326)
        url_api += '?limit=800000'
        if self.src_name == 'GéoNature':
            geo_param = {
                "geoIntersection": {
                    "type": "Feature",
                    "properties": {},
                    "geometry": json.loads(self.rectangle.asJson())
                }
            } if self.rectangle else {}
        elif self.src_name == "Biodiv'AuRA":
            geo_param = {
                "geoIntersection": self.rectangle.asWkt(),
                "with_areas": False
            } if self.rectangle else {}
        else:
            return None
        json_param = {
            "modif_since_validation": False,
            **geo_param,
            **self.syn_params
        }
        doc = QJsonDocument(json_param)
        print(doc.toJson())
        return url_api, doc

    def _synthese_export_query(self, items):
        """Build the synthese export query (POST body = list of ids)."""
        url_api = self.domain_url + __api_sn_exp__
        url_api += '?export_format=geojson'
        features = items['features'] if items else []
        if self.src_name == 'GéoNature':
            lst_id_synth = [
                feature['properties']['id_synthese']
                for feature in features
            ]
        elif self.src_name == "Biodiv'AuRA":
            lst_id_synth = [
                observation['id']
                for feature in features
                for observation in feature['properties']['observations']
            ]
        else:
            return None
        return url_api, QJsonDocument(lst_id_synth)

    def _export_query(self):
        """Build the GET query of the export whose label is ``self.source``.

        ``self.api`` is set to the matching entry of ``dlg.dict_exp``.
        """
        exports = getattr(self.dlg, 'dict_exp', None) or []
        self.api = next(
            (export for export in exports if export['label'] == self.source),
            None
        )
        if self.api is None:
            return None
        url_api = self.domain_url + 'api/exports/api/%i'%self.api['id']
        self.transform_bouds(self.api['geometry_srid'])
        url_api += '?limit=0'
        if self.rectangle:
            url_api += "&geometry=" + str(self.rectangle.asWkt())
        return url_api, None

    def loading_finished(self, reply):
        """Handle a finished reply: chain the export or add the layers.

        Args:
            reply: the finished network reply.
        """
        self._pending_downloads -= 1
        status = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
        print(status)
        data = None
        if status != 200:
            print(f"code: {reply.error()} message: {reply.errorString()}")
        else:
            try:
                data = json.loads(reply.readAll().data().decode())
            except ValueError as err:
                print(f"Invalid JSON payload: {err}")
        # The body has been read; finished replies are not released
        # automatically.
        reply.deleteLater()
        if data is not None:
            wrapped = isinstance(data, dict) and 'items' in data
            items = data['items'] if wrapped else data
            if self.source == 'Synthèse taxons':
                # Second step: export the observations that were found.
                self.source = 'Synthèse taxons export'
                self.run(items)
                return
            for collection in self.filter_gn_geom(items):
                if collection['features']:
                    self.addMapLayer(collection, collection['geom_type'])
        if self.pending_downloads == 0:
            self.finished_dl.emit()
            QApplication.restoreOverrideCursor()
        print("END")

    def filter_gn_geom(self, items):
        """Split the features of *items* by geometry family.

        The family is found by looking for ``point``, ``polygon`` then
        ``line`` in the lower-cased GeoJSON geometry type. Features with a
        null geometry or another type are left out.

        Args:
            items: dict with a ``'features'`` list of GeoJSON features.

        Returns:
            tuple: ``(points, polygons, lines)``, three dicts with the keys
            ``'features'``, ``'type'`` and ``'geom_type'``.
        """
        features = items['features']
        print('total_count : ', len(features))
        points, polygons, lines = [], [], []
        for feature in features:
            geometry = feature.get('geometry') or {}
            kind = str(geometry.get('type') or '').lower()
            if 'point' in kind:
                points.append(feature)
            elif 'polygon' in kind:
                polygons.append(feature)
            elif 'line' in kind:
                lines.append(feature)
        items_point = {
            "features": points,
            "type": "FeatureCollection",
            "geom_type": "Point"
        }
        items_poly = {
            "features": polygons,
            "type": "FeatureCollection",
            "geom_type": "MultiPolygon"
        }
        items_line = {
            "features": lines,
            "type": "FeatureCollection",
            "geom_type": "MultiLineString"
        }
        return items_point, items_poly, items_line

    def addMapLayer(self, data, geom_type):
        """Create a memory layer from *data* and add it to the project.

        Args:
            data: feature collection, serialised with ``json.dumps``.
            geom_type: geometry type used as the memory layer definition.
        """
        mon_layer = ' - '.join([self.src_name[:6], self.source])
        fcString = json.dumps(data)
        codec = QTextCodec.codecForName("UTF-8")
        fields = QgsJsonUtils.stringToFields(fcString, codec)
        feats = QgsJsonUtils.stringToFeatureList(fcString, fields, codec)
        # Exports carry their own SRID; the synthese is handled as EPSG:4326.
        srid = self.api['geometry_srid'] if self.api else 4326
        crs_api = self._crs_from_srid(srid)
        vl = QgsVectorLayer(geom_type, mon_layer, "memory")
        if vl.sourceCrs() != crs_api:
            print('Changed crs !')
            vl.setCrs(crs_api)
        dp = vl.dataProvider()
        dp.addAttributes(fields)
        vl.updateFields()
        dp.addFeatures(feats)
        vl.updateExtents()
        if dp.errors():
            # The placeholder is filled after tr() so that the untranslated
            # text stays a constant the translation lookup can match.
            QMessageBox.warning(
                None,
                self.tr("Warning"),
                self.tr(
                    "Houston, we've got a problem ... \n\n %s !"
                ) % str(dp.errors()),
            )
        QgsProject.instance().addMapLayer(vl)

    def transform_bouds(self, crs):
        """Reproject ``self.rectangle`` in place from EPSG:4326 to *crs*.

        Nothing happens when there is no rectangle or when *crs* is already
        EPSG:4326. The rectangle is always taken to be in EPSG:4326, so this
        must not be applied twice with another CRS.

        Args:
            crs: target SRID, or any value accepted by the
                ``QgsCoordinateReferenceSystem`` constructor.
        """
        if self.rectangle is None:
            return
        sourceCrs = self._crs_from_srid(4326)
        destCrs = self._crs_from_srid(crs)
        if sourceCrs != destCrs:
            tr = QgsCoordinateTransform(sourceCrs, destCrs, self.project)
            self.rectangle.transform(tr)

    @staticmethod
    def _crs_from_srid(srid):
        """Return the CRS for *srid* without the deprecated int constructor."""
        if isinstance(srid, int):
            return QgsCoordinateReferenceSystem.fromEpsgId(srid)
        return QgsCoordinateReferenceSystem(srid)