"""Dock file.""" __copyright__ = 'Copyright 2020, 3Liz' __license__ = 'GPL version 3' __email__ = 'info@3liz.org' import logging import os from collections import namedtuple from enum import Enum from functools import partial from pathlib import Path from xml.dom.minidom import parseString from qgis.core import ( NULL, QgsApplication, QgsDataSourceUri, QgsProject, QgsProviderConnectionException, QgsProviderRegistry, QgsRasterLayer, QgsSettings, QgsVectorLayer, ) from qgis.PyQt.QtCore import QLocale, QUrl from qgis.PyQt.QtGui import QDesktopServices, QIcon from qgis.PyQt.QtPrintSupport import QPrinter #from qgis.PyQt.QtWebKitWidgets import QWebPage from qgis.PyQt.QtWidgets import ( QDialog, QAction, QDockWidget, QFileDialog, QInputDialog, QMenu, QToolButton, ) from qgis.gui import * from qgis.utils import iface import qgis ''' from pg_metadata.connection_manager import ( check_pgmetadata_is_installed, connections_list, settings_connections_names, ) ''' try: from .tools.PythonSQL import login_base except: print('Pas de fichier PythonSQL') from .tools.resources import ( load_ui, resources_path, ) DOCK_CLASS = load_ui('CenRa_Metabase_dockwidget_base.ui') LOGGER = logging.getLogger('CenRa_Metabase') class Format(namedtuple('Format', ['label', 'ext'])): """ Format available for exporting metadata. """ pass class OutputFormats(Format, Enum): """ Output format for a metadata sheet. """ PDF = Format(label='PDF', ext='pdf') HTML = Format(label='HTML', ext='html') DCAT = Format(label='DCAT', ext='xml') class CenRa_Metabase(QDockWidget, DOCK_CLASS): def __init__(self, parent=None): _ = parent super().__init__() self.setupUi(self) self.settings = QgsSettings() self.current_datasource_uri = None self.current_connection = None #self.viewer.page().setLinkDelegationPolicy(QWebPage.DelegateAllLinks) #self.viewer.page().linkClicked.connect(self.open_link) # Help button self.external_help.setText('') self.external_help.setIcon(QIcon(QgsApplication.iconPath('mActionHelpContents.svg'))) self.external_help.clicked.connect(self.open_external_help) # Flat table button self.flatten_dataset_table.setText('') #self.flatten_dataset_table.setToolTip(tr("Add the catalog table")) self.flatten_dataset_table.setIcon(QgsApplication.getThemeIcon("/mActionAddHtml.svg")) #self.flatten_dataset_table.clicked.connect(self.add_flatten_dataset_table) # Settings menu self.config.setAutoRaise(True) #self.config.setToolTip(tr("Settings")) self.config.setPopupMode(QToolButton.ToolButtonPopupMode(2)) self.config.setIcon(QgsApplication.getThemeIcon("/mActionOptions.svg")) self.auto_open_dock_action = QAction( ("Dommage, cette option n'existe pas encore."), iface.mainWindow()) self.auto_open_dock_action.setCheckable(True) self.auto_open_dock_action.setChecked( self.settings.value("pgmetadata/auto_open_dock", True, type=bool) ) self.auto_open_dock_action.triggered.connect(self.save_auto_open_dock) menu = QMenu() menu.addAction(self.auto_open_dock_action) self.config.setMenu(menu) # Setting PDF/HTML menu self.save_button.setAutoRaise(True) #self.save_button.setToolTip(tr("Save metadata")) self.save_button.setPopupMode(QToolButton.ToolButtonPopupMode(2)) self.save_button.setIcon(QIcon(QgsApplication.iconPath('mActionFileSave.svg'))) self.save_as_pdf = QAction( ('Enregistrer en PDF') + '…', iface.mainWindow()) self.save_as_pdf.triggered.connect(partial(self.export_dock_content, OutputFormats.PDF)) self.save_as_html = QAction( ('Enregistrer en HTML') + '…', iface.mainWindow()) self.save_as_html.triggered.connect(partial(self.export_dock_content, OutputFormats.HTML)) self.save_as_dcat = QAction( ('Enregistrer en DCAT') + '…', iface.mainWindow()) self.save_as_dcat.triggered.connect(partial(self.export_dock_content, OutputFormats.DCAT)) self.menu_save = QMenu() self.menu_save.addAction(self.save_as_pdf) self.menu_save.addAction(self.save_as_html) self.menu_save.addAction(self.save_as_dcat) self.save_button.setMenu(self.menu_save) self.save_button.setEnabled(False) self.metadata = QgsProviderRegistry.instance().providerMetadata('postgres') # Display message in the dock #if not settings_connections_names(): #self.default_html_content_not_installed() #else: self.default_html_content_not_pg_layer() try: login_base() iface.layerTreeView().currentLayerChanged.connect(self.layer_changed) except: #qgis.utils.plugins['CenRa_METABASE'].initGui() qgis.utils.plugins['CenRa_METABASE'].unload() #self.default_html_content_not_pg_layer() if iface.activeLayer(): layer=iface.activeLayer() iface.layerTreeView().setCurrentLayer(None) iface.layerTreeView().setCurrentLayer(layer) def export_dock_content(self, output_format: OutputFormats): """ Export the current displayed metadata sheet to the given format. """ layer_name = iface.activeLayer().name() file_path = os.path.join( os.environ['USERPROFILE'], 'Desktop\\{name}.{ext}'.format(name=layer_name, ext=output_format.ext) ) output_file = QFileDialog.getSaveFileName( self, ("Enregistrer en {format}").format(format=output_format.label), file_path, "{label} (*.{ext})".format( label=output_format.label, ext=output_format.ext, ) ) if output_file[0] == '': return self.settings.setValue("UI/lastFileNameWidgetDir", os.path.dirname(output_file[0])) output_file_path = output_file[0] parent_folder = str(Path(output_file_path).parent) if output_format == OutputFormats.PDF: printer = QPrinter() printer.setOutputFormat(QPrinter.OutputFormat(1)) #printer.setPageMargins(20,20,20,20,QPrinter.Unit(0)) printer.setOutputFileName(output_file_path) self.viewer.print(printer) iface.messageBar().pushSuccess( ("Export PDF"), ( "The metadata has been exported as PDF successfully in " "{}").format(parent_folder, output_file_path) ) elif output_format in [OutputFormats.HTML,OutputFormats.DCAT]: if output_format == OutputFormats.HTML: data_str = self.viewer.page().currentFrame().toHtml() else: layer = iface.activeLayer() uri = layer.dataProvider().uri() dataall = self.sql_info(uri) data = self.sql_to_xml(dataall) with open(resources_path('xml', 'dcat.xml'), encoding='utf8') as xml_file: xml_template = xml_file.read() xml = parseString(xml_template.format(language=data[0][0], content=data[0][1])) data_str = xml.toprettyxml() with open(output_file[0], "w", encoding='utf8') as file_writer: file_writer.write(data_str) iface.messageBar().pushSuccess( ("Export") + ' ' + output_format.label, ( "The metadata has been exported as {format} successfully in " "{path}").format( format=output_format.label, folder=parent_folder, path=output_file_path) ) def save_auto_open_dock(self): """ Save settings about the dock. """ self.settings.setValue("pgmetadata/auto_open_dock", self.auto_open_dock_action.isChecked()) def sql_to_xml(self,dataall): distribution='' for y in dataall[1]: distribution = distribution + ( ''+ ''+ '{data}'.format(data=y[0])+ '{data}'.format(data=y[1])+ '{data}'.format(data=y[2])+ '{data}'.format(data=y[3])+ '{data}'.format(data=y[4])+ ''+ '') publisher='' for z in dataall[2]: publisher = publisher + ( ''+ ''+ '{data}'.format(data=z[1])+ '{data}'.format(data=z[3])+ ''+ '') data_str = [[dataall[0][26], '{data}'.format(data=dataall[0][1])+ '{data}'.format(data=dataall[0][4])+ '{data}'.format(data=dataall[0][5])+ '{data}'.format(data=dataall[0][26])+ '{data}'.format(data=dataall[0][28])+ '{data}'.format(data=dataall[0][20])+ '{data}'.format(data=dataall[0][11])+ '{data}'.format(data=dataall[0][21])+ '{data}'.format(data=dataall[0][13])+ distribution+ publisher+ '{data}'.format(data=", ".join(str(x) for x in dataall[0][24]))+ '{data}'.format(data=", ".join(str(x) for x in dataall[0][6]))+ '{data}'.format(data=dataall[0][12])]] return data_str @staticmethod def sql_for_layer(uri, output_format: OutputFormats): """ Get the SQL query for a given layer and output format. """ locale = QgsSettings().value("locale/userLocale", QLocale().name()) locale = locale.split('_')[0].lower() if output_format == [OutputFormats.HTML,OutputFormats.DCAT]: sql = ( "SELECT pgmetadata.get_dataset_item_html_content('{schema}', '{table}', '{locale}');" ).format(schema=uri.schema(), table=uri.table(), locale=locale) else: raise NotImplementedError('Output format is not yet implemented.') return sql def layer_changed(self, layer): """ When the layer has changed in the legend, we must check this new layer. """ self.current_datasource_uri = None self.current_connection = None self.ce_trouve_dans_psql(layer) def add_flatten_dataset_table(self): """ Add a flatten dataset table with all links and contacts. """ ''' connections, message = connections_list() if not connections: LOGGER.critical(message) self.set_html_content('PgMetadata', message) return if len(connections) > 1: dialog = QInputDialog() dialog.setComboBoxItems(connections) dialog.setWindowTitle(tr("Database")) dialog.setLabelText(tr("Choose the database to add the catalog")) if not dialog.exec_(): return connection_name = dialog.textValue() else: connection_name = connections[0] metadata = QgsProviderRegistry.instance().providerMetadata('postgres') connection = metadata.findConnection(connection_name) locale = QgsSettings().value("locale/userLocale", QLocale().name()) locale = locale.split('_')[0].lower() uri = QgsDataSourceUri(connection.uri()) uri.setTable(f'(SELECT * FROM pgmetadata.export_datasets_as_flat_table(\'{locale}\'))') uri.setKeyColumn('uid') layer = QgsVectorLayer(uri.uri(), '{} - {}'.format(tr("Catalog"), connection_name), 'postgres') QgsProject.instance().addMapLayer(layer) ''' @staticmethod def open_external_help(): QDesktopServices.openUrl(QUrl('https://plateformesig.cenra-outils.org/')) @staticmethod def open_link(url): QDesktopServices.openUrl(url) def set_html_content(self, title=None, body=None): """ Set the content in the dock. """ #link_logo=resources_path('icons', 'CEN_RA.png') css_file = resources_path('css', 'dock.css') with open(css_file, 'r', encoding='utf8') as f: css = f.read() html = '' #html += '' html += ''.format(css=css) #html += '' #html += '' #html += '' if title: html += '

{title}

'.format(title=title) if body: html += body html += '' # It must be a file, even if it does not exist on the file system. base_url = QUrl.fromLocalFile(resources_path('images', 'must_be_a_file.png')) self.viewer.setHtml(html)#, base_url) def ce_trouve_dans_psql(self,layer): try: uri = layer.dataProvider().uri() except: self.default_html_content_not_pg_layer() self.save_button.setEnabled(False) uri='' if uri != '': if not uri.table(): layertype = layer.providerType().lower() if layertype == 'wms' or layertype == 'wfs': self.set_html_to_wms(layer) else: self.default_html_content_not_pg_layer() self.save_button.setEnabled(False) else: data_count = self.sql_check(uri) #print(data_count) if data_count == 0: self.default_html_content_not_metadata() self.save_button.setEnabled(False) else: self.build_html_content(layer,uri) self.save_button.setEnabled(True) def build_html_content(self,layer,uri): body = '' dataall=self.sql_info(uri) data=dataall[0] data_url=dataall[1] data_contact=dataall[2] #print(len(data_url)) data_collonne=[field.name() for field in layer.dataProvider().fields()] body += '

Identification

' body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=(", ".join(str(x) for x in data[6]))) body += ''.format(data=(", ".join(str(x) for x in data[24]))) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += '
Titre{data[4]}
Description{data[5]}
Categories{data}
Thèmes{data}
Mots-clés{data[7]}
Dernier mise à jour{data[23]}
Langue{data[26]}
' body += '

Properties spatial

' body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += '
Niveau{data[8]}
Echelle minimum{data[9]}
Echelle maximum{data[10]}
Nombre d\'entités {data[15]}
Type de géométrie{data[16]}
Nom de projection{data[17]}
ID de projection{data[18]}
Emprise{data[28]}
' #body += '
' body += '

Publication

' body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += '
Date{data[11]}
Fréquence de mise à jour{data[12]}
Licence{data[13]}
Licence attribué{data[25]}
Restriction{data[14]}
' body += '

Lien

' body += '' for value_url in data_url: body += ''.format(value_url=value_url) body += '
TypeURLType MIMEFormatTaille
{value_url[0]}{value_url[1]}{value_url[2]}{value_url[3]}{value_url[4]}
' ''' body += '

Liste des champs

' for collonne in data_collonne: body += ''.format(collonne=collonne,defini='') body += '
{collonne}{defini}
' ''' body += '

Contacts

' body += '' for value_contact in data_contact: body += ''.format(value_contact=value_contact) body += '
RôleNomOrganisationEmailTélèphone
{value_contact[0]}{value_contact[1]}{value_contact[2]}{value_contact[3]}{value_contact[4]}
' body += '

Metadata

' body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += ''.format(data=data) body += '
Table{data[2]}
Schema{data[3]}
Date de création{data[20]}
Date de modification{data[21]}
Encodage{data[27]}
UUID{data[1]}
' self.set_html_content( layer.name(), body) def set_html_to_wms(self,layer): self.set_html_content( 'CenRa Metadata',(layer.htmlMetadata())) def default_html_content_not_pg_layer(self): """ When it's not a PostgreSQL layer. """ self.set_html_content( 'CenRa Metadata', ('Vous devez cliquer sur une couche dans la légende qui est stockée dans PostgreSQL.')) def default_html_content_not_metadata(self): self.set_html_content( 'CenRa Metadata', ('La couche ne contien pas de métadonnée.')) def sql_check(self,uri): cur=login_base() table = uri.table() schema = uri.schema() sql_count = """SELECT count(uid) FROM metadata.dataset WHERE schema_name LIKE '"""+schema+"""' AND table_name LIKE '"""+table+"""';""" cur.execute(sql_count) data_count = cur.fetchall() cur.close() return data_count[0][0] def sql_info(self,uri): cur=login_base() table = uri.table() schema = uri.schema() #[s for s in iface.activeLayer().source().split(" ") if "dbname" in s][0].split("'")[1] sql_find = """SELECT *,right(left(st_astext(geom,2),-2),-9) FROM metadata.dataset WHERE schema_name LIKE '"""+schema+"""' AND table_name LIKE '"""+table+"""';""" cur.execute(sql_find) data_general = cur.fetchall() sql_findurl = """SELECT type,url,mime,format,taille FROM metadata.dataurl WHERE schema_name LIKE '"""+schema+"""' AND table_name LIKE '"""+table+"""';""" cur.execute(sql_findurl) data_url = cur.fetchall() sql_findcontact = """SELECT role,nom,organisation,email,telephone FROM metadata.datacontact WHERE schema_name LIKE '"""+schema+"""' AND table_name LIKE '"""+table+"""';""" cur.execute(sql_findcontact) data_contact = cur.fetchall() cur.close() return data_general[0],data_url,data_contact