"""Dock file."""
__copyright__ = 'Copyright 2020, 3Liz'
__license__ = 'GPL version 3'
__email__ = 'info@3liz.org'
import logging
import os
from collections import namedtuple
from enum import Enum
from functools import partial
from pathlib import Path
from xml.dom.minidom import parseString
from qgis.core import (
NULL,
QgsApplication,
QgsDataSourceUri,
QgsProject,
QgsProviderConnectionException,
QgsProviderRegistry,
QgsRasterLayer,
QgsSettings,
QgsVectorLayer,
)
from qgis.PyQt.QtCore import QLocale, QUrl
from qgis.PyQt.QtGui import QDesktopServices, QIcon
from qgis.PyQt.QtPrintSupport import QPrinter
from qgis.PyQt.QtWebKitWidgets import QWebPage
from qgis.PyQt.QtWidgets import (
QDialog,
QAction,
QDockWidget,
QFileDialog,
QInputDialog,
QMenu,
QToolButton,
)
from qgis.gui import *
from qgis.utils import iface
import qgis
'''
from pg_metadata.connection_manager import (
check_pgmetadata_is_installed,
connections_list,
settings_connections_names,
)
'''
from CenRa_METABASE.tools.PythonSQL import login_base
from .tools.resources import (
load_ui,
resources_path,
)
DOCK_CLASS = load_ui('CenRa_Metabase_dockwidget_base.ui')
LOGGER = logging.getLogger('CenRa_Metabase')
class Format(namedtuple('Format', ['label', 'ext'])):
""" Format available for exporting metadata. """
pass
class OutputFormats(Format, Enum):
""" Output format for a metadata sheet. """
PDF = Format(label='PDF', ext='pdf')
HTML = Format(label='HTML', ext='html')
DCAT = Format(label='DCAT', ext='xml')
class CenRa_Metabase(QDockWidget, DOCK_CLASS):
def __init__(self, parent=None):
_ = parent
super().__init__()
self.setupUi(self)
self.settings = QgsSettings()
self.current_datasource_uri = None
self.current_connection = None
self.viewer.page().setLinkDelegationPolicy(QWebPage.DelegateAllLinks)
self.viewer.page().linkClicked.connect(self.open_link)
# Help button
self.external_help.setText('')
self.external_help.setIcon(QIcon(QgsApplication.iconPath('mActionHelpContents.svg')))
self.external_help.clicked.connect(self.open_external_help)
# Flat table button
self.flatten_dataset_table.setText('')
#self.flatten_dataset_table.setToolTip(tr("Add the catalog table"))
self.flatten_dataset_table.setIcon(QgsApplication.getThemeIcon("/mActionAddHtml.svg"))
#self.flatten_dataset_table.clicked.connect(self.add_flatten_dataset_table)
# Settings menu
self.config.setAutoRaise(True)
#self.config.setToolTip(tr("Settings"))
self.config.setPopupMode(QToolButton.InstantPopup)
self.config.setIcon(QgsApplication.getThemeIcon("/mActionOptions.svg"))
self.auto_open_dock_action = QAction(
("Dommage, cette option n'existe pas encore."),
iface.mainWindow())
self.auto_open_dock_action.setCheckable(True)
self.auto_open_dock_action.setChecked(
self.settings.value("pgmetadata/auto_open_dock", True, type=bool)
)
self.auto_open_dock_action.triggered.connect(self.save_auto_open_dock)
menu = QMenu()
menu.addAction(self.auto_open_dock_action)
self.config.setMenu(menu)
# Setting PDF/HTML menu
self.save_button.setAutoRaise(True)
#self.save_button.setToolTip(tr("Save metadata"))
self.save_button.setPopupMode(QToolButton.InstantPopup)
self.save_button.setIcon(QIcon(QgsApplication.iconPath('mActionFileSave.svg')))
self.save_as_pdf = QAction(
('Enregistrer en PDF') + '…',
iface.mainWindow())
self.save_as_pdf.triggered.connect(partial(self.export_dock_content, OutputFormats.PDF))
self.save_as_html = QAction(
('Enregistrer en HTML') + '…',
iface.mainWindow())
self.save_as_html.triggered.connect(partial(self.export_dock_content, OutputFormats.HTML))
self.save_as_dcat = QAction(
('Enregistrer en DCAT') + '…',
iface.mainWindow())
self.save_as_dcat.triggered.connect(partial(self.export_dock_content, OutputFormats.DCAT))
self.menu_save = QMenu()
self.menu_save.addAction(self.save_as_pdf)
self.menu_save.addAction(self.save_as_html)
self.menu_save.addAction(self.save_as_dcat)
self.save_button.setMenu(self.menu_save)
self.save_button.setEnabled(False)
self.metadata = QgsProviderRegistry.instance().providerMetadata('postgres')
# Display message in the dock
#if not settings_connections_names():
#self.default_html_content_not_installed()
#else:
self.default_html_content_not_pg_layer()
try:
login_base()
iface.layerTreeView().currentLayerChanged.connect(self.layer_changed)
except:
#qgis.utils.plugins['CenRa_METABASE'].initGui()
qgis.utils.plugins['CenRa_METABASE'].unload()
#self.default_html_content_not_pg_layer()
if iface.activeLayer():
layer=iface.activeLayer()
iface.layerTreeView().setCurrentLayer(None)
iface.layerTreeView().setCurrentLayer(layer)
def export_dock_content(self, output_format: OutputFormats):
""" Export the current displayed metadata sheet to the given format. """
layer_name = iface.activeLayer().name()
file_path = os.path.join(
os.environ['USERPROFILE'],
'Desktop\\{name}.{ext}'.format(name=layer_name, ext=output_format.ext)
)
output_file = QFileDialog.getSaveFileName(
self,
("Enregistrer en {format}").format(format=output_format.label),
file_path,
"{label} (*.{ext})".format(
label=output_format.label,
ext=output_format.ext,
)
)
if output_file[0] == '':
return
self.settings.setValue("UI/lastFileNameWidgetDir", os.path.dirname(output_file[0]))
output_file_path = output_file[0]
parent_folder = str(Path(output_file_path).parent)
if output_format == OutputFormats.PDF:
printer = QPrinter()
printer.setOutputFormat(QPrinter.PdfFormat)
printer.setPageMargins(20, 20, 20, 20, QPrinter.Millimeter)
printer.setOutputFileName(output_file_path)
self.viewer.print(printer)
iface.messageBar().pushSuccess(
("Export PDF"),
(
"The metadata has been exported as PDF successfully in "
"{}").format(parent_folder, output_file_path)
)
elif output_format in [OutputFormats.HTML,OutputFormats.DCAT]:
if output_format == OutputFormats.HTML:
data_str = self.viewer.page().currentFrame().toHtml()
else:
layer = iface.activeLayer()
uri = layer.dataProvider().uri()
dataall = self.sql_info(uri)
data = self.sql_to_xml(dataall)
with open(resources_path('xml', 'dcat.xml'), encoding='utf8') as xml_file:
xml_template = xml_file.read()
xml = parseString(xml_template.format(language=data[0][0], content=data[0][1]))
data_str = xml.toprettyxml()
with open(output_file[0], "w", encoding='utf8') as file_writer:
file_writer.write(data_str)
iface.messageBar().pushSuccess(
("Export") + ' ' + output_format.label,
(
"The metadata has been exported as {format} successfully in "
"{path}").format(
format=output_format.label, folder=parent_folder, path=output_file_path)
)
def save_auto_open_dock(self):
""" Save settings about the dock. """
self.settings.setValue("pgmetadata/auto_open_dock", self.auto_open_dock_action.isChecked())
def sql_to_xml(self,dataall):
distribution=''
for y in dataall[1]:
distribution = distribution + (
''+
''+
'{data}'.format(data=y[0])+
'{data}'.format(data=y[1])+
'{data}'.format(data=y[2])+
'{data}'.format(data=y[3])+
'{data}'.format(data=y[4])+
''+
'')
publisher=''
for z in dataall[2]:
publisher = publisher + (
''+
''+
'{data}'.format(data=z[1])+
'{data}'.format(data=z[3])+
''+
'')
data_str = [[dataall[0][26],
'{data}'.format(data=dataall[0][1])+
'{data}'.format(data=dataall[0][4])+
'{data}'.format(data=dataall[0][5])+
'{data}'.format(data=dataall[0][26])+
'{data}'.format(data=dataall[0][28])+
'{data}'.format(data=dataall[0][20])+
'{data}'.format(data=dataall[0][11])+
'{data}'.format(data=dataall[0][21])+
'{data}'.format(data=dataall[0][13])+
distribution+
publisher+
'{data}'.format(data=", ".join(str(x) for x in dataall[0][24]))+
'{data}'.format(data=", ".join(str(x) for x in dataall[0][6]))+
'{data}'.format(data=dataall[0][12])]]
return data_str
@staticmethod
def sql_for_layer(uri, output_format: OutputFormats):
""" Get the SQL query for a given layer and output format. """
locale = QgsSettings().value("locale/userLocale", QLocale().name())
locale = locale.split('_')[0].lower()
if output_format == [OutputFormats.HTML,OutputFormats.DCAT]:
sql = (
"SELECT pgmetadata.get_dataset_item_html_content('{schema}', '{table}', '{locale}');"
).format(schema=uri.schema(), table=uri.table(), locale=locale)
else:
raise NotImplementedError('Output format is not yet implemented.')
return sql
def layer_changed(self, layer):
""" When the layer has changed in the legend, we must check this new layer. """
self.current_datasource_uri = None
self.current_connection = None
self.ce_trouve_dans_psql(layer)
def add_flatten_dataset_table(self):
""" Add a flatten dataset table with all links and contacts. """
'''
connections, message = connections_list()
if not connections:
LOGGER.critical(message)
self.set_html_content('PgMetadata', message)
return
if len(connections) > 1:
dialog = QInputDialog()
dialog.setComboBoxItems(connections)
dialog.setWindowTitle(tr("Database"))
dialog.setLabelText(tr("Choose the database to add the catalog"))
if not dialog.exec_():
return
connection_name = dialog.textValue()
else:
connection_name = connections[0]
metadata = QgsProviderRegistry.instance().providerMetadata('postgres')
connection = metadata.findConnection(connection_name)
locale = QgsSettings().value("locale/userLocale", QLocale().name())
locale = locale.split('_')[0].lower()
uri = QgsDataSourceUri(connection.uri())
uri.setTable(f'(SELECT * FROM pgmetadata.export_datasets_as_flat_table(\'{locale}\'))')
uri.setKeyColumn('uid')
layer = QgsVectorLayer(uri.uri(), '{} - {}'.format(tr("Catalog"), connection_name), 'postgres')
QgsProject.instance().addMapLayer(layer)
'''
@staticmethod
def open_external_help():
QDesktopServices.openUrl(QUrl('https://plateformesig.cenra-outils.org/'))
@staticmethod
def open_link(url):
QDesktopServices.openUrl(url)
def set_html_content(self, title=None, body=None):
""" Set the content in the dock. """
#link_logo=resources_path('icons', 'CEN_RA.png')
css_file = resources_path('css', 'dock.css')
with open(css_file, 'r', encoding='utf8') as f:
css = f.read()
html = '
'
#html += ''
html += ''.format(css=css)
#html += ''
#html += ''
#html += ''
if title:
html += '
{title}
'.format(title=title)
if body:
html += body
html += ''
# It must be a file, even if it does not exist on the file system.
base_url = QUrl.fromLocalFile(resources_path('images', 'must_be_a_file.png'))
self.viewer.setHtml(html, base_url)
def ce_trouve_dans_psql(self,layer):
try:
uri = layer.dataProvider().uri()
except:
self.default_html_content_not_pg_layer()
self.save_button.setEnabled(False)
uri=''
if uri != '':
if not uri.table():
layertype = layer.providerType().lower()
if layertype == 'wms' or layertype == 'wfs':
self.set_html_to_wms(layer)
else:
self.default_html_content_not_pg_layer()
self.save_button.setEnabled(False)
else:
data_count = self.sql_check(uri)
#print(data_count)
if data_count == 0:
self.default_html_content_not_metadata()
self.save_button.setEnabled(False)
else:
self.build_html_content(layer,uri)
self.save_button.setEnabled(True)
def build_html_content(self,layer,uri):
body = ''
dataall=self.sql_info(uri)
data=dataall[0]
data_url=dataall[1]
data_contact=dataall[2]
#print(len(data_url))
data_collonne=[field.name() for field in layer.dataProvider().fields()]
body += '
Identification
'
body += '
Titre
{data[4]}
'.format(data=data)
body += '
Description
{data[5]}
'.format(data=data)
body += '
Categories
{data}
'.format(data=(", ".join(str(x) for x in data[6])))
body += '
Thèmes
{data}
'.format(data=(", ".join(str(x) for x in data[24])))
body += '
Mots-clés
{data[7]}
'.format(data=data)
body += '
Dernier mise à jour
{data[23]}
'.format(data=data)
body += '
Langue
{data[26]}
'.format(data=data)
body += '
'
body += '
Properties spatial
'
body += '
Niveau
{data[8]}
'.format(data=data)
body += '
Echelle minimum
{data[9]}
'.format(data=data)
body += '
Echelle maximum
{data[10]}
'.format(data=data)
body += '
Nombre d\'entités
{data[15]}
'.format(data=data)
body += '
Type de géométrie
{data[16]}
'.format(data=data)
body += '
Nom de projection
{data[17]}
'.format(data=data)
body += '
ID de projection
{data[18]}
'.format(data=data)
body += '
Emprise
{data[28]}
'.format(data=data)
body += '
'
#body += ''
body += '
Publication
'
body += '
Date
{data[11]}
'.format(data=data)
body += '
Fréquence de mise à jour
{data[12]}
'.format(data=data)
body += '
Licence
{data[13]}
'.format(data=data)
body += '
Licence attribué
{data[25]}
'.format(data=data)
body += '
Restriction
{data[14]}
'.format(data=data)
body += '
'
body += '
Lien
'
body += '
Type
URL
Type MIME
Format
Taille
'
for value_url in data_url:
body += '
{value_url[0]}
{value_url[1]}
{value_url[2]}
{value_url[3]}
{value_url[4]}
'.format(value_url=value_url)
body += '
'
'''
body += '
Liste des champs
'
for collonne in data_collonne:
body += '
{collonne}
{defini}
'.format(collonne=collonne,defini='')
body += '
'
'''
body += '
Contacts
'
body += '
Rôle
Nom
Organisation
Email
Télèphone
'
for value_contact in data_contact:
body += '
{value_contact[0]}
{value_contact[1]}
{value_contact[2]}
{value_contact[3]}
{value_contact[4]}
'.format(value_contact=value_contact)
body += '
'
body += '
Metadata
'
body += '
Table
{data[2]}
'.format(data=data)
body += '
Schema
{data[3]}
'.format(data=data)
body += '
Date de création
{data[20]}
'.format(data=data)
body += '
Date de modification
{data[21]}
'.format(data=data)
body += '
Encodage
{data[27]}
'.format(data=data)
body += '
UUID
{data[1]}
'.format(data=data)
body += '
'
self.set_html_content(
layer.name(), body)
def set_html_to_wms(self,layer):
self.set_html_content(
'CenRa Metadata',(layer.htmlMetadata()))
def default_html_content_not_pg_layer(self):
""" When it's not a PostgreSQL layer. """
self.set_html_content(
'CenRa Metadata', ('Vous devez cliquer sur une couche dans la légende qui est stockée dans PostgreSQL.'))
def default_html_content_not_metadata(self):
self.set_html_content(
'CenRa Metadata', ('La couche ne contien pas de métadonnée.'))
def sql_check(self,uri):
cur=login_base()
table = uri.table()
schema = uri.schema()
sql_count = """SELECT count(uid) FROM metadata.dataset
WHERE schema_name LIKE '"""+schema+"""' AND table_name LIKE '"""+table+"""';"""
cur.execute(sql_count)
data_count = cur.fetchall()
cur.close()
return data_count[0][0]
def sql_info(self,uri):
cur=login_base()
table = uri.table()
schema = uri.schema()
#[s for s in iface.activeLayer().source().split(" ") if "dbname" in s][0].split("'")[1]
sql_find = """SELECT *,right(left(st_astext(geom,2),-2),-9) FROM metadata.dataset
WHERE schema_name LIKE '"""+schema+"""' AND table_name LIKE '"""+table+"""';"""
cur.execute(sql_find)
data_general = cur.fetchall()
sql_findurl = """SELECT type,url,mime,format,taille FROM metadata.dataurl WHERE schema_name LIKE '"""+schema+"""' AND table_name LIKE '"""+table+"""';"""
cur.execute(sql_findurl)
data_url = cur.fetchall()
sql_findcontact = """SELECT role,nom,organisation,email,telephone FROM metadata.datacontact WHERE schema_name LIKE '"""+schema+"""' AND table_name LIKE '"""+table+"""';"""
cur.execute(sql_findcontact)
data_contact = cur.fetchall()
cur.close()
return data_general[0],data_url,data_contact