2026-01-09 11:16:56 +01:00

454 lines
21 KiB
Python

"""Dock file."""
__copyright__ = 'Copyright 2020, 3Liz'
__license__ = 'GPL version 3'
__email__ = 'info@3liz.org'
import logging
import os
from collections import namedtuple
from enum import Enum
from functools import partial
from pathlib import Path
from xml.dom.minidom import parseString
from qgis.core import (
QgsApplication,
QgsProviderRegistry,
QgsSettings,
)
from qgis.PyQt.QtCore import QLocale, QUrl
from qgis.PyQt.QtGui import QDesktopServices, QIcon
from qgis.PyQt.QtPrintSupport import QPrinter
# from qgis.PyQt.QtWebKitWidgets import QWebPage
from qgis.PyQt.QtWidgets import (
QAction,
QDockWidget,
QFileDialog,
QMenu,
QToolButton,
)
from qgis.utils import iface
import qgis
from .tools.resources import (
load_ui,
resources_path,
)
try:
from .tools.PythonSQL import login_base
except ValueError:
print('Pas de fichier PythonSQL')
DOCK_CLASS = load_ui('CenRa_Metabase_dockwidget_base.ui')
LOGGER = logging.getLogger('CenRa_Metabase')
class Format(namedtuple('Format', ['label', 'ext'])):
""" Format available for exporting metadata. """
pass
class OutputFormats(Format, Enum):
""" Output format for a metadata sheet. """
PDF = Format(label='PDF', ext='pdf')
HTML = Format(label='HTML', ext='html')
DCAT = Format(label='DCAT', ext='xml')
class CenRa_Metabase(QDockWidget, DOCK_CLASS):
def __init__(self, parent=None):
_ = parent
super().__init__()
self.setupUi(self)
self.settings = QgsSettings()
self.current_datasource_uri = None
self.current_connection = None
# self.viewer.page().setLinkDelegationPolicy(QWebPage.DelegateAllLinks)
# self.viewer.page().linkClicked.connect(self.open_link)
# Help button
self.external_help.setText('')
self.external_help.setIcon(QIcon(QgsApplication.iconPath('mActionHelpContents.svg')))
self.external_help.clicked.connect(self.open_external_help)
# Flat table button
self.flatten_dataset_table.setText('')
# self.flatten_dataset_table.setToolTip(tr("Add the catalog table"))
self.flatten_dataset_table.setIcon(QgsApplication.getThemeIcon("/mActionAddHtml.svg"))
# self.flatten_dataset_table.clicked.connect(self.add_flatten_dataset_table)
# Settings menu
self.config.setAutoRaise(True)
# self.config.setToolTip(tr("Settings"))
self.config.setPopupMode(QToolButton.ToolButtonPopupMode(2))
self.config.setIcon(QgsApplication.getThemeIcon("/mActionOptions.svg"))
self.auto_open_dock_action = QAction(
("Dommage, cette option n'existe pas encore."),
iface.mainWindow())
self.auto_open_dock_action.setCheckable(True)
self.auto_open_dock_action.setChecked(
self.settings.value("pgmetadata/auto_open_dock", True, type=bool)
)
self.auto_open_dock_action.triggered.connect(self.save_auto_open_dock)
menu = QMenu()
menu.addAction(self.auto_open_dock_action)
self.config.setMenu(menu)
# Setting PDF/HTML menu
self.save_button.setAutoRaise(True)
# self.save_button.setToolTip(tr("Save metadata"))
self.save_button.setPopupMode(QToolButton.ToolButtonPopupMode(2))
self.save_button.setIcon(QIcon(QgsApplication.iconPath('mActionFileSave.svg')))
self.save_as_pdf = QAction(
('Enregistrer en PDF') + '',
iface.mainWindow())
self.save_as_pdf.triggered.connect(partial(self.export_dock_content, OutputFormats.PDF))
self.save_as_html = QAction(
('Enregistrer en HTML') + '',
iface.mainWindow())
self.save_as_html.triggered.connect(partial(self.export_dock_content, OutputFormats.HTML))
self.save_as_dcat = QAction(
('Enregistrer en DCAT') + '',
iface.mainWindow())
self.save_as_dcat.triggered.connect(partial(self.export_dock_content, OutputFormats.DCAT))
self.menu_save = QMenu()
self.menu_save.addAction(self.save_as_pdf)
self.menu_save.addAction(self.save_as_html)
self.menu_save.addAction(self.save_as_dcat)
self.save_button.setMenu(self.menu_save)
self.save_button.setEnabled(False)
self.metadata = QgsProviderRegistry.instance().providerMetadata('postgres')
# Display message in the dock
# if not settings_connections_names():
# self.default_html_content_not_installed()
# else:
self.default_html_content_not_pg_layer()
iface.layerTreeView().currentLayerChanged.connect(self.layer_changed)
try:
login_base()
iface.layerTreeView().currentLayerChanged.connect(self.layer_changed)
except ValueError:
# qgis.utils.plugins['CenRa_METABASE'].initGui()
qgis.utils.plugins['CenRa_METABASE'].unload()
# self.default_html_content_not_pg_layer()
if iface.activeLayer():
layer = iface.activeLayer()
iface.layerTreeView().setCurrentLayer(None)
iface.layerTreeView().setCurrentLayer(layer)
def export_dock_content(self, output_format: OutputFormats):
""" Export the current displayed metadata sheet to the given format. """
layer_name = iface.activeLayer().name()
file_path = os.path.join(
os.environ['USERPROFILE'],
'Desktop\\{name}.{ext}'.format(name=layer_name, ext=output_format.ext)
)
output_file = QFileDialog.getSaveFileName(
self,
("Enregistrer en {format}").format(format=output_format.label),
file_path,
"{label} (*.{ext})".format(
label=output_format.label,
ext=output_format.ext,
)
)
if output_file[0] == '':
return
self.settings.setValue("UI/lastFileNameWidgetDir", os.path.dirname(output_file[0]))
output_file_path = output_file[0]
parent_folder = str(Path(output_file_path).parent)
if output_format == OutputFormats.PDF:
printer = QPrinter()
printer.setOutputFormat(QPrinter.OutputFormat(1))
# printer.setPageMargins(20,20,20,20,QPrinter.Unit(0))
printer.setOutputFileName(output_file_path)
self.viewer.print(printer)
iface.messageBar().pushSuccess(
("Export PDF"),
(
"The metadata has been exported as PDF successfully in "
"<a href=\"{}\">{}</a>").format(parent_folder, output_file_path)
)
elif output_format in [OutputFormats.HTML, OutputFormats.DCAT]:
if output_format == OutputFormats.HTML:
data_str = self.viewer.page().currentFrame().toHtml()
else:
layer = iface.activeLayer()
uri = layer.dataProvider().uri()
dataall = self.sql_info(uri)
data = self.sql_to_xml(dataall)
with open(resources_path('xml', 'dcat.xml'), encoding='utf8') as xml_file:
xml_template = xml_file.read()
xml = parseString(xml_template.format(language=data[0][0], content=data[0][1]))
data_str = xml.toprettyxml()
with open(output_file[0], "w", encoding='utf8') as file_writer:
file_writer.write(data_str)
iface.messageBar().pushSuccess(
("Export") + ' ' + output_format.label,
(
"The metadata has been exported as {format} successfully in "
"<a href=\"{folder}\">{path}</a>").format(
format=output_format.label, folder=parent_folder, path=output_file_path)
)
def save_auto_open_dock(self):
""" Save settings about the dock. """
self.settings.setValue("pgmetadata/auto_open_dock", self.auto_open_dock_action.isChecked())
def sql_to_xml(self, dataall):
distribution = ''
for y in dataall[1]:
distribution = distribution + ('<dcat:distribution>' + '<dcat:Distribution>' + '<dct:title>{data}</dct:title>'.format(data=y[0]) + '<dcat:downloadURL>{data}</dcat:downloadURL>'.format(data=y[1]) + '<dcat:mediaType>{data}</dcat:mediaType>'.format(data=y[2]) + '<dct:format>{data}</dct:format>'.format(data=y[3]) + '<dct:bytesize>{data}</dct:bytesize>'.format(data=y[4]) + '</dcat:Distribution>' + '</dcat:distribution>')
publisher = ''
for z in dataall[2]:
publisher = publisher + ('<dct:publisher>' + '<foaf:Organization>' + '<foaf:name>{data}</foaf:name>'.format(data=z[1]) + '<foaf:mbox>{data}</foaf:mbox>'.format(data=z[3]) + '</foaf:Organization>' + '</dct:publisher>')
data_str = [[dataall[0][26], '<dct:identifier>{data}</dct:identifier>'.format(data=dataall[0][1]) + '<dct:title>{data}</dct:title>'.format(data=dataall[0][4]) + '<dct:description>{data}</dct:description>'.format(data=dataall[0][5]) + '<dct:language>{data}</dct:language>'.format(data=dataall[0][26]) + '<dct:spatial>{data}</dct:spatial>'.format(data=dataall[0][28]) + '<dct:created rdf:datatype="http://www.w3.org/2001/XMLSchema#dateTime">{data}</dct:created>'.format(data=dataall[0][20]) + '<dct:issued rdf:datatype="http://www.w3.org/2001/XMLSchema#dateTime">{data}</dct:issued>'.format(data=dataall[0][11]) + '<dct:modified rdf:datatype="http://www.w3.org/2001/XMLSchema#dateTime">{data}</dct:modified>'.format(data=dataall[0][21]) + '<dct:license>{data}</dct:license>'.format(data=dataall[0][13]) + distribution + publisher + '<dcat:theme>{data}</dcat:theme>'.format(data=", ".join(str(x) for x in dataall[0][24])) + '<dcat:keyword>{data}</dcat:keyword>'.format(data=", ".join(str(x) for x in dataall[0][6])) + '<dct:accrualPeriodicity>{data}</dct:accrualPeriodicity>'.format(data=dataall[0][12])]]
return data_str
@staticmethod
def sql_for_layer(uri, output_format: OutputFormats):
""" Get the SQL query for a given layer and output format. """
locale = QgsSettings().value("locale/userLocale", QLocale().name())
locale = locale.split('_')[0].lower()
if output_format == [OutputFormats.HTML, OutputFormats.DCAT]:
sql = (
"SELECT pgmetadata.get_dataset_item_html_content('{schema}', '{table}', '{locale}');"
).format(schema=uri.schema(), table=uri.table(), locale=locale)
else:
raise NotImplementedError('Output format is not yet implemented.')
return sql
def layer_changed(self, layer):
""" When the layer has changed in the legend, we must check this new layer. """
self.current_datasource_uri = None
self.current_connection = None
self.ce_trouve_dans_psql(layer)
def add_flatten_dataset_table(self):
""" Add a flatten dataset table with all links and contacts. """
'''
connections, message = connections_list()
if not connections:
LOGGER.critical(message)
self.set_html_content('PgMetadata', message)
return
if len(connections) > 1:
dialog = QInputDialog()
dialog.setComboBoxItems(connections)
dialog.setWindowTitle(tr("Database"))
dialog.setLabelText(tr("Choose the database to add the catalog"))
if not dialog.exec_():
return
connection_name = dialog.textValue()
else:
connection_name = connections[0]
metadata = QgsProviderRegistry.instance().providerMetadata('postgres')
connection = metadata.findConnection(connection_name)
locale = QgsSettings().value("locale/userLocale", QLocale().name())
locale = locale.split('_')[0].lower()
uri = QgsDataSourceUri(connection.uri())
uri.setTable(f'(SELECT * FROM pgmetadata.export_datasets_as_flat_table(\'{locale}\'))')
uri.setKeyColumn('uid')
layer = QgsVectorLayer(uri.uri(), '{} - {}'.format(tr("Catalog"), connection_name), 'postgres')
QgsProject.instance().addMapLayer(layer)
'''
@staticmethod
def open_external_help():
QDesktopServices.openUrl(QUrl('https://plateformesig.cenra-outils.org/'))
@staticmethod
def open_link(url):
QDesktopServices.openUrl(url)
def set_html_content(self, title=None, body=None):
""" Set the content in the dock. """
# ink_logo=resources_path('icons', 'CEN_RA.png')
css_file = resources_path('css', 'dock.css')
with open(css_file, 'r', encoding='utf8') as f:
css = f.read()
html = '<html><head>'
# html += '<script src="http://ignf.github.io/geoportal-sdk/latest/dist/2d/GpSDK2D.js" defer ></script>'
html += '<style>{css}</style></head><body>'.format(css=css)
# html += '<link rel="stylesheet" href="http://ignf.github.io/geoportal-sdk/latest/dist/2d/GpSDK2D.css" >'
# html += '<script src="file:///C:/Users/tlaveille/Desktop/maps.js" defer></script>'
# html += '<noscript>Your browser does not support JavaScript!</noscript>'
if title:
html += '<h2>{title} <img class=logo src=https://i2.wp.com/www.cen-rhonealpes.fr/wp-content/uploads/2013/04/cen-rhonealpes-couleurs1.jpg?w=340&ssl=1></h2>'.format(title=title)
if body:
html += body
html += '</body></html>'
# It must be a file, even if it does not exist on the file system.
# base_url = QUrl.fromLocalFile(resources_path('images', 'must_be_a_file.png'))
self.viewer.setHtml(html)
def ce_trouve_dans_psql(self, layer):
try:
uri = layer.dataProvider().uri()
except AttributeError:
self.default_html_content_not_pg_layer()
self.save_button.setEnabled(False)
uri = ''
if uri != '':
if not uri.table():
layertype = layer.providerType().lower()
if layertype == 'wms' or layertype == 'wfs':
self.set_html_to_wms(layer)
else:
self.default_html_content_not_pg_layer()
self.save_button.setEnabled(False)
else:
data_count = self.sql_check(uri)
# print(data_count)
if data_count == 0:
self.default_html_content_not_metadata()
self.save_button.setEnabled(False)
else:
self.build_html_content(layer, uri)
self.save_button.setEnabled(True)
def build_html_content(self, layer, uri):
body = ''
dataall = self.sql_info(uri)
data = dataall[0]
data_url = dataall[1]
data_contact = dataall[2]
# print(len(data_url))
# data_collonne = [field.name() for field in layer.dataProvider().fields()]
body += '<div><h3>Identification</h3><table class="table table-condensed">'
body += '<tr><th>Titre</th><td>{data[4]}</td></tr>'.format(data=data)
body += '<tr><th>Description</th><td>{data[5]}</td></tr>'.format(data=data)
body += '<tr><th>Categories</th><td>{data}</td></tr>'.format(data=(", ".join(str(x) for x in data[6])))
body += '<tr><th>Thèmes</th><td>{data}</td></tr>'.format(data=(", ".join(str(x) for x in data[24])))
body += '<tr><th>Mots-clés</th><td>{data[7]}</td></tr>'.format(data=data)
body += '<tr><th>Dernier mise à jour</th><td>{data[23]}</td></tr>'.format(data=data)
body += '<tr><th>Langue</th><td>{data[26]}</td></tr>'.format(data=data)
body += '</table></div>'
body += '<div><h3>Properties spatial</h3><table class="table table-condensed">'
body += '<tr><th>Niveau</th><td>{data[8]}</td></tr>'.format(data=data)
body += '<tr><th>Echelle minimum</th><td>{data[9]}</td></tr>'.format(data=data)
body += '<tr><th>Echelle maximum</th><td>{data[10]}</td></tr>'.format(data=data)
body += '<tr><th>Nombre d\'entités </th><td>{data[15]}</td></tr>'.format(data=data)
body += '<tr><th>Type de géométrie</th><td>{data[16]}</td></tr>'.format(data=data)
body += '<tr><th>Nom de projection</th><td>{data[17]}</td></tr>'.format(data=data)
body += '<tr><th>ID de projection</th><td>{data[18]}</td></tr>'.format(data=data)
body += '<tr><th>Emprise</th><td>{data[28]}</td></tr>'.format(data=data)
body += '</table></div>'
# body += '<div id="map"></div>'
body += '<div><h3>Publication</h3><table class="table table-condensed">'
body += '<tr><th>Date</th><td>{data[11]}</td></tr>'.format(data=data)
body += '<tr><th>Fréquence de mise à jour</th><td>{data[12]}</td></tr>'.format(data=data)
body += '<tr><th>Licence</th><td>{data[13]}</td></tr>'.format(data=data)
body += '<tr><th>Licence attribué</th><td>{data[25]}</td></tr>'.format(data=data)
body += '<tr><th>Restriction</th><td>{data[14]}</td></tr>'.format(data=data)
body += '</table></div>'
body += '<div><h3>Lien</h3><table class="table table-condensed table-striped table-bordered">'
body += '<tr><th>Type</th><th>URL</th><th>Type MIME</th><th>Format</th><th>Taille</th></tr>'
for value_url in data_url:
body += '<tr><td>{value_url[0]}</td><td>{value_url[1]}</td><td>{value_url[2]}</td><td>{value_url[3]}</td><td>{value_url[4]}</td></tr>'.format(value_url=value_url)
body += '</table></div>'
'''
body += '<div><h3>Liste des champs</h3><table class="table table-condensed table-striped table-bordered">'
for collonne in data_collonne:
body += '<tr><th>{collonne}</th><td>{defini}</td></tr>'.format(collonne=collonne,defini='')
body += '</table></div>'
'''
body += '<div><h3>Contacts</h3><table class="table table-condensed table-striped table-bordered">'
body += '<tr><th>Rôle</th><th>Nom</th><th>Organisation</th><th>Email</th><th>Télèphone</th></tr>'
for value_contact in data_contact:
body += '<tr><td>{value_contact[0]}</td><td>{value_contact[1]}</td><td>{value_contact[2]}</td><td>{value_contact[3]}</td><td>{value_contact[4]}</td></tr>'.format(value_contact=value_contact)
body += '</table></div>'
body += '<div><h3>Metadata</h3><table class="table table-condensed">'
body += '<tr><th>Table</th><td>{data[2]}</td></tr>'.format(data=data)
body += '<tr><th>Schema</th><td>{data[3]}</td></tr>'.format(data=data)
body += '<tr><th>Date de création</th><td>{data[20]}</td></tr>'.format(data=data)
body += '<tr><th>Date de modification</th><td>{data[21]}</td></tr>'.format(data=data)
body += '<tr><th>Encodage</th><td>{data[27]}</td></tr>'.format(data=data)
body += '<tr><th>UUID</th><td>{data[1]}</td></tr>'.format(data=data)
body += '</table></div>'
self.set_html_content(
layer.name(), body)
def set_html_to_wms(self, layer):
self.set_html_content(
'CenRa Metadata', (layer.htmlMetadata()))
def default_html_content_not_pg_layer(self):
""" When it's not a PostgreSQL layer. """
self.set_html_content(
'CenRa Metadata', ('Vous devez cliquer sur une couche dans la légende qui est stockée dans PostgreSQL.'))
def default_html_content_not_metadata(self):
self.set_html_content(
'CenRa Metadata', ('La couche ne contien pas de métadonnée.'))
def sql_check(self, uri):
cur = login_base()
table = uri.table()
schema = uri.schema()
sql_count = """SELECT count(uid) FROM metadata.dataset
WHERE schema_name LIKE '""" + schema + """' AND table_name LIKE '""" + table + """';"""
cur.execute(sql_count)
data_count = cur.fetchall()
cur.close()
return data_count[0][0]
def sql_info(self, uri):
cur = login_base()
table = uri.table()
schema = uri.schema()
# [s for s in iface.activeLayer().source().split(" ") if "dbname" in s][0].split("'")[1]
sql_find = """SELECT *,right(left(st_astext(geom,2),-2),-9) FROM metadata.dataset
WHERE schema_name LIKE '""" + schema + """' AND table_name LIKE '""" + table + """';"""
cur.execute(sql_find)
data_general = cur.fetchall()
sql_findurl = """SELECT type,url,mime,format,taille FROM metadata.dataurl WHERE schema_name LIKE '""" + schema + """' AND table_name LIKE '""" + table + """';"""
cur.execute(sql_findurl)
data_url = cur.fetchall()
sql_findcontact = """SELECT role,nom,organisation,email,telephone FROM metadata.datacontact WHERE schema_name LIKE '""" + schema + """' AND table_name LIKE '""" + table + """';"""
cur.execute(sql_findcontact)
data_contact = cur.fetchall()
cur.close()
return data_general[0], data_url, data_contact