# Plugin_QGIS/CenRa_FLUX/flux_editor.py
#
# 362 lines
# 16 KiB
# Python

# -*- coding: utf-8 -*-
from __future__ import absolute_import
# Import the PyQt and QGIS libraries
from builtins import next
from builtins import str
from builtins import object
import qgis
from qgis.PyQt import QtCore
from qgis.PyQt.QtCore import QSettings
from qgis.PyQt import QtWidgets
from qgis.PyQt.QtWidgets import QAction, QMenu, QDialog
from qgis.PyQt.QtGui import QIcon
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5 import QtGui
from qgis.core import *
from qgis.core import QgsDataSourceUri
from qgis.PyQt.QtWidgets import (
QDialog,
QAction,
QDockWidget,
QFileDialog,
QInputDialog,
QMenu,
QToolButton,
QTableWidget,
QTableWidgetItem,
QMessageBox,
QVBoxLayout,
)
from .tools.PythonSQL import *
from .tools.resources import (
load_ui,
resources_path,
login_base,
send_issues,
)
from .issues import CenRa_Issues
from qgis.utils import iface
import os.path
import webbrowser, os
import psycopg2
import psycopg2.extras
import base64
# Bootstrap step executed at import time: connect with the shared "first_cnx"
# account, look up the personal database credentials stored for the current
# OS user, then close that connection. The resulting ``user`` / ``mdp``
# module globals are used for every later connection made by this module.
#
# Keyword arguments are used instead of a hand-built DSN string so that
# values containing spaces or quotes are passed to libpq correctly.
first_conn = psycopg2.connect(
    host=host,
    port=port,
    dbname=dbname,
    user="first_cnx",
    password=password,
)
try:
    first_cur = first_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    # The login is bound as a query parameter rather than concatenated into
    # the SQL text.
    first_cur.execute(
        "SELECT mdp_w, login_w "
        "FROM pg_catalog.pg_user t1, admin_sig.vm_users_sig t2 "
        "WHERE t2.oid = t1.usesysid AND login_w = %s",
        (os_user,),
    )
    res_ident = first_cur.fetchone()
finally:
    # Always release the bootstrap connection, even when the lookup fails.
    first_conn.close()

if res_ident is None:
    # Without this check the failure would surface as an opaque
    # "'NoneType' object is not subscriptable" TypeError.
    raise RuntimeError(
        "No database credentials found in admin_sig.vm_users_sig for login "
        + repr(os_user)
    )

# The stored password is base64-encoded text.
mdp = base64.b64decode(str(res_ident[0])).decode('utf-8')
user = res_ident[1]

# Form class generated from the Qt Designer file; mixed into Flux_Editor.
EDITOR_CLASS = load_ui('CenRa_Flux_base.ui')
class Flux_Editor(QDialog, EDITOR_CLASS):
    """Dialog to browse PostgreSQL tables and load them as QGIS layers.

    The upper table (``tableWidget``) lists the tables of the database chosen
    in ``comboBox_2`` ("SIG" or "REF"), optionally restricted to the category
    chosen in ``comboBox`` and to the text typed in ``lineEdit``. Rows picked
    by the user are copied to the lower table (``tableWidget_2``), and
    ``pushButton_2`` loads every row of that lower table into the current
    QGIS project through the "postgres" provider.

    The active connection, cursor and database name are kept in the module
    globals ``con``, ``cur`` and ``dbtype``; they are (re)assigned every time
    ``run_sig`` or ``run_ref`` is called.
    """

    # Label of the combo-box entry that disables category filtering.
    _ALL_CATEGORIES = "toutes les catégories"
    # Two-character schema codes offered as categories for the SIG database.
    _SIG_CODES = ('00', '01', '07', '26', '42', '69')
    # Codes whose schema name is stored unchanged in the "Schema" column.
    _UNPREFIXED_CODES = ('travaux', 'agregation')
    # Maximum number of layers loaded without asking for confirmation.
    _MAX_FLUX = 5
    _HEADERS = ["Code", "Schema", "Table"]

    # Tables and materialized views whose schema matches a LIKE pattern.
    _SIG_CATEGORY_QUERY = (
        "(SELECT schemaname, tablename FROM pg_catalog.pg_tables"
        " WHERE schemaname LIKE %s)"
        " UNION"
        " (SELECT schemaname, matviewname AS tablename"
        " FROM pg_catalog.pg_matviews WHERE schemaname LIKE %s)"
        " ORDER BY schemaname, tablename;"
    )
    # Tables whose schema matches a LIKE pattern (no wildcard is appended).
    _REF_CATEGORY_QUERY = (
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables"
        " WHERE schemaname LIKE %s ORDER BY schemaname, tablename;"
    )

    def __init__(self, parent=None):
        """Build the dialog from the .ui form and wire up its signals.

        :param parent: accepted for API compatibility; it is intentionally
            not forwarded to ``QDialog``.
        """
        _ = parent
        super().__init__()
        self.setupUi(self)
        self.settings = QgsSettings()
        self.s = QSettings()
        self.first_start = None
        self.iface = iface

        # Icons and logo.
        self.setWindowIcon(QtGui.QIcon(resources_path('icons', 'icon.png')))
        self.label_3.setPixmap(QtGui.QPixmap(resources_path('ui', 'logo.png')))
        self.commandLinkButton.setIcon(
            QtGui.QIcon(resources_path('ui', 'arrow-bottom.png')))
        self.commandLinkButton_2.setIcon(
            QtGui.QIcon(resources_path('ui', 'arrow-up.png')))

        # The source table is read-only and selected row by row.
        self.tableWidget.setSelectionBehavior(QTableWidget.SelectRows)
        self.tableWidget.setEditTriggers(
            QtWidgets.QAbstractItemView.NoEditTriggers)

        # The database choices are added before ``bd_source`` is connected,
        # so filling the combo box does not open a connection yet.
        self.comboBox_2.addItem("SIG")
        self.comboBox_2.addItem('REF')

        self.comboBox.currentIndexChanged.connect(self.initialisation_flux)
        self.commandLinkButton.clicked.connect(self.selection_flux)
        self.tableWidget.itemDoubleClicked.connect(self.selection_flux)
        self.pushButton_2.clicked.connect(self.limite_flux)
        self.commandLinkButton_2.clicked.connect(self.suppression_flux)
        self.tableWidget_2.itemDoubleClicked.connect(self.suppression_flux)
        self.comboBox_2.currentIndexChanged.connect(self.bd_source)

        # NOTE(review): this layout is never installed on a widget; it is
        # kept as-is because its effect on the .ui layout cannot be checked
        # from this file.
        layout = QVBoxLayout()
        self.lineEdit.textChanged.connect(self.filtre_dynamique)
        layout.addWidget(self.lineEdit)

    def raise_(self):
        """Open the dialog on the database currently chosen in comboBox_2."""
        self.bd_source()

    def bd_source(self):
        """Activate the window and (re)connect to the chosen database."""
        self.activateWindow()
        bd_origine = self.comboBox_2.currentText()
        if bd_origine == 'REF':
            self.run_ref()
        elif bd_origine == 'SIG':
            self.run_sig()

    def run_ref(self):
        """Connect to the REF database, refresh the lists, run the dialog."""
        self._open_database(refdb)

    def run_sig(self):
        """Connect to the SIG database, refresh the lists, run the dialog."""
        self._open_database(sigdb)

    def _open_database(self, database_name):
        """Switch the module-level connection to *database_name* and show.

        Clears the selection table, replaces the ``con`` / ``cur`` /
        ``dbtype`` module globals, repopulates the category combo box (which
        also reloads the table list), then shows the dialog and runs its
        event loop.
        """
        global cur, con, dbtype

        self.tableWidget_2.setRowCount(0)

        new_connection = psycopg2.connect(
            host=host, port=port, dbname=database_name, user=user,
            password=mdp)
        # Only SELECT statements are issued on this connection by this class;
        # autocommit prevents a failed query from leaving the session in an
        # aborted transaction that would reject every later query.
        new_connection.autocommit = True

        # Release the connection opened by a previous call, if any, so that
        # switching databases does not leak server sessions.
        previous_connection = globals().get('con')
        if previous_connection is not None:
            try:
                previous_connection.close()
            except psycopg2.Error:
                # Best effort only: the old connection is discarded anyway.
                pass

        dbtype = database_name
        con = new_connection
        cur = con.cursor(cursor_factory=psycopg2.extras.DictCursor)

        # Repopulating the categories also reloads the table list.
        self.combobox_custom()

        if self.first_start:
            self.first_start = False

        # NOTE(review): this method is also reached from comboBox_2 while
        # the dialog is already executing, so exec_() is then called
        # re-entrantly; behaviour kept unchanged.
        self.show()
        self.exec_()

    def suppression_flux(self):
        """Remove the current row from the selection table."""
        self.tableWidget_2.removeRow(self.tableWidget_2.currentRow())

    def open_url(self, item):
        """Read the user-role data stored on *item*.

        NOTE(review): the value is read but never used; presumably the URL
        was meant to be opened in a browser - confirm before changing.
        """
        url = item.data(Qt.UserRole)

    def initialisation_flux(self):
        """Fill the source table with the tables of the current category."""
        category = str(self.comboBox.currentText())
        is_sig = dbtype == sigdb

        # ``params`` stays None for the predefined queries so that psycopg2
        # performs no placeholder substitution on them.
        params = None
        if category == self._ALL_CATEGORIES:
            query = schemaname_list if is_sig else schemaname_list_ref
        elif is_sig:
            # Every category except 'travaux' is matched with a leading,
            # escaped underscore ("\_" is a literal "_" in a LIKE pattern).
            prefix = category if category == 'travaux' else '\\_' + category
            pattern = prefix + '%'
            query = self._SIG_CATEGORY_QUERY
            params = (pattern, pattern)
        else:
            query = self._REF_CATEGORY_QUERY
            params = (category,)

        cur.execute(query, params)
        records = cur.fetchall()

        self.tableWidget.setRowCount(len(records))
        self.tableWidget.setColumnCount(3)
        for row, record in enumerate(records):
            code, schema_name = self._describe_schema(str(record[0]), is_sig)
            cells = (code, schema_name, str(record[1]))
            for column, text in enumerate(cells):
                self.tableWidget.setItem(row, column, QTableWidgetItem(text))

        self.tableWidget.setColumnWidth(0, 20)
        self.tableWidget.setColumnWidth(1, 300)
        self.tableWidget.setColumnWidth(2, 300)
        self.tableWidget.setHorizontalHeaderLabels(self._HEADERS)

        # Re-apply the text filter: the hidden state set for the previous
        # content must not carry over to the freshly loaded rows.
        self.filtre_dynamique(self.lineEdit.text())

    @classmethod
    def _describe_schema(cls, schema, is_sig):
        """Return the ``(code, displayed schema name)`` pair for *schema*.

        For the SIG database the code is read from characters 1-2 of the
        schema name: 'fo' yields the 4-character code at positions 1-4 and
        the name after position 6, 'ra' yields 'travaux', a value listed in
        ``_SIG_CODES`` is kept with the name after position 4, and anything
        else yields 'agregation'. In the last two of those fallbacks the
        schema name is returned unchanged.

        NOTE(review): for the REF database the code is always 'agregation',
        which matches the previous behaviour; confirm this is intended.
        """
        code = schema[1:3] if is_sig else ''
        if code == 'fo':
            return schema[1:5], schema[6:]
        if code == 'ra':
            return 'travaux', schema
        if code not in cls._SIG_CODES:
            return 'agregation', schema
        return code, schema[4:]

    def selection_flux(self):
        """Copy the selected source rows to the top of the selection table.

        Does nothing when no row is selected. A row is skipped when an
        identical (code, schema, table) row is already selected.
        """
        selected_rows = sorted(
            {item.row() for item in self.tableWidget.selectedItems()})
        if not selected_rows:
            return

        self.tableWidget_2.setColumnCount(3)
        self.tableWidget_2.setHorizontalHeaderLabels(self._HEADERS)

        column_count = self.tableWidget.columnCount()
        # Rows are inserted at index 0, so walking the selection backwards
        # keeps the source order in the selection table.
        for source_row in reversed(selected_rows):
            items = [self.tableWidget.item(source_row, column)
                     for column in range(column_count)]
            if any(item is None for item in items):
                continue
            if self._row_already_selected([item.text() for item in items]):
                continue
            self.tableWidget_2.insertRow(0)
            for column, item in enumerate(items):
                self.tableWidget_2.setItem(0, column, item.clone())

        self.tableWidget_2.setColumnWidth(0, 50)
        self.tableWidget_2.setColumnWidth(1, 300)
        self.tableWidget_2.setColumnWidth(2, 300)

    def _row_already_selected(self, texts):
        """Return True when the selection table has a row equal to *texts*.

        Comparing the whole row (rather than the table name alone) lets
        tables that share a name but live in different schemas be selected
        together.
        """
        table = self.tableWidget_2
        for row in range(table.rowCount()):
            cells = [table.item(row, column) for column in range(len(texts))]
            if all(cell is not None and cell.text() == text
                   for cell, text in zip(cells, texts)):
                return True
        return False

    def item_already_exists(self, new_item_text):
        """Return True when any cell of the selection table equals the text.

        The search is an exact match over every column of ``tableWidget_2``.
        """
        existing_items = self.tableWidget_2.findItems(
            new_item_text, QtCore.Qt.MatchExactly)
        return len(existing_items) > 0

    def limite_flux(self):
        """Load the selected layers, asking first above ``_MAX_FLUX`` rows."""
        flux_count = self.tableWidget_2.rowCount()
        if flux_count <= self._MAX_FLUX:
            self.chargement_flux()
            return

        self.QMBquestion = QMessageBox.question(
            iface.mainWindow(), u"Attention !",
            "Le nombre de flux à charger en une seule fois est limité à 5 pour des questions de performances. Souhaitez vous tout de même charger les " + str(
                flux_count) + " flux sélectionnés ? (risque de plantage de QGIS)",
            QMessageBox.Yes | QMessageBox.No)
        if self.QMBquestion == QMessageBox.Yes:
            self.chargement_flux()
        elif self.QMBquestion == QMessageBox.No:
            print("Annulation du chargement des couches")

    def chargement_flux(self):
        """Add one PostGIS layer per row of the selection table.

        Each layer reads the ``geom`` column with ``gid`` as key column. When
        a table holds more than one geometry type, one layer is added per
        type (with SRID 2154), except for NULL and 'Polygon' types.
        """
        # Imported locally: only this method needs identifier quoting.
        from psycopg2 import sql

        wkb_types = {
            'MultiPolygon': QgsWkbTypes.MultiPolygon,
            'MultiLineString': QgsWkbTypes.MultiLineString,
            'Point': QgsWkbTypes.Point,
            'Polygon': QgsWkbTypes.Polygon,
            'LineString': QgsWkbTypes.LineString,
            'MultiPoint': QgsWkbTypes.MultiPoint,
        }
        is_sig = dbtype == sigdb

        for row in range(self.tableWidget_2.rowCount()):
            code = self.tableWidget_2.item(row, 0).text()
            schema = self.tableWidget_2.item(row, 1).text()
            table = self.tableWidget_2.item(row, 2).text()
            if is_sig and code not in self._UNPREFIXED_CODES:
                # Rebuild the real schema name that was split for display.
                schema = '_' + code + '_' + schema

            uri = QgsDataSourceUri()
            uri.setConnection(host, port, dbtype, user, mdp)
            uri.setDataSource(schema, table, "geom")
            uri.setKeyColumn('gid')

            # Distinct geometry types of the table, without the 'ST_' prefix.
            # Identifiers are quoted so that any schema/table name is valid.
            cur.execute(
                sql.SQL(
                    "SELECT right(st_geometrytype(geom), -3) AS a "
                    "FROM {}.{} GROUP BY a"
                ).format(sql.Identifier(schema), sql.Identifier(table)))
            geometry_types = [record[0] for record in cur.fetchall()]

            if len(geometry_types) <= 1:
                self._add_postgis_layer(uri, table)
                continue

            for geometry_type in geometry_types:
                wkb_type = wkb_types.get(geometry_type)
                if wkb_type is not None:
                    uri.setWkbType(wkb_type)
                # NOTE(review): 'Polygon' is skipped even when the table has
                # no 'MultiPolygon' rows; kept as in the previous version -
                # confirm this is intended.
                if geometry_type is None or geometry_type == 'Polygon':
                    continue
                uri.setSrid('2154')
                self._add_postgis_layer(uri, table)

    @staticmethod
    def _add_postgis_layer(uri, name):
        """Create a "postgres" vector layer from *uri* and add it to QGIS."""
        layer = QgsVectorLayer(uri.uri(), name, "postgres")
        QgsProject.instance().addMapLayer(layer)

    def combobox_custom(self):
        """Repopulate the category combo box for the current database.

        The SIG database gets a fixed list of categories; the REF database
        gets the schemas returned by the ``schemaname_distinct`` query. The
        table list is then reloaded once for the first entry.
        """
        if dbtype == sigdb:
            categories = [self._ALL_CATEGORIES]
            categories.extend(self._SIG_CODES)
            categories.extend(['agregation', 'travaux', 'form'])
        elif dbtype == refdb:
            cur.execute(schemaname_distinct)
            categories = [self._ALL_CATEGORIES]
            categories.extend(record[0] for record in cur.fetchall())
        else:
            return

        # Signals are blocked while the entries change; otherwise
        # currentIndexChanged would reload the table list several times.
        was_blocked = self.comboBox.blockSignals(True)
        try:
            self.comboBox.clear()
            for category in categories:
                self.comboBox.addItem(category)
        finally:
            self.comboBox.blockSignals(was_blocked)
        self.initialisation_flux()

    def filtre_dynamique(self, filter_text):
        """Hide the source rows in which no cell contains *filter_text*.

        The comparison is case-insensitive; empty cells never match.
        """
        needle = filter_text.lower()
        table = self.tableWidget
        column_count = table.columnCount()
        for row in range(table.rowCount()):
            cells = (table.item(row, column) for column in range(column_count))
            visible = any(
                cell is not None and needle in cell.text().lower()
                for cell in cells)
            table.setRowHidden(row, not visible)

    def popup(self):
        """Create a ``Popup`` and show its text editor.

        NOTE(review): ``Popup`` is not defined or imported by name in this
        file; confirm where it comes from.
        """
        self.dialog = Popup()
        self.dialog.text_edit.show()