forked from CEN-RA/Plugin_QGIS
362 lines
16 KiB
Python
362 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import absolute_import
|
|
# Import the PyQt and QGIS libraries
|
|
from builtins import next
|
|
from builtins import str
|
|
from builtins import object
|
|
import qgis
|
|
from qgis.PyQt import QtCore
|
|
from qgis.PyQt.QtCore import QSettings
|
|
from qgis.PyQt import QtWidgets
|
|
from qgis.PyQt.QtWidgets import QAction, QMenu, QDialog
|
|
from qgis.PyQt.QtGui import QIcon
|
|
from PyQt5.QtCore import *
|
|
from PyQt5.QtGui import *
|
|
from PyQt5 import QtGui
|
|
from qgis.core import *
|
|
from qgis.core import QgsDataSourceUri
|
|
from qgis.PyQt.QtWidgets import (
|
|
QDialog,
|
|
QAction,
|
|
QDockWidget,
|
|
QFileDialog,
|
|
QInputDialog,
|
|
QMenu,
|
|
QToolButton,
|
|
QTableWidget,
|
|
QTableWidgetItem,
|
|
QMessageBox,
|
|
QVBoxLayout,
|
|
)
|
|
from .tools.PythonSQL import *
|
|
from .tools.resources import (
|
|
load_ui,
|
|
resources_path,
|
|
login_base,
|
|
send_issues,
|
|
)
|
|
from .issues import CenRa_Issues
|
|
|
|
from qgis.utils import iface
|
|
import os.path
|
|
import webbrowser, os
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import base64
|
|
|
|
first_conn = psycopg2.connect("host=" + host + " port=" + port + " dbname="+dbname+" user=first_cnx password=" + password)
|
|
first_cur = first_conn.cursor(cursor_factory = psycopg2.extras.DictCursor)
|
|
first_cur.execute("SELECT mdp_w, login_w FROM pg_catalog.pg_user t1, admin_sig.vm_users_sig t2 WHERE t2.oid = t1.usesysid AND (login_w = '" + os_user + "' OR login_w = '" + os_user + "')")
|
|
res_ident = first_cur.fetchone()
|
|
mdp = base64.b64decode(str(res_ident[0])).decode('utf-8')
|
|
user = res_ident[1]
|
|
first_conn.close()
|
|
|
|
EDITOR_CLASS = load_ui('CenRa_Flux_base.ui')
|
|
|
|
class Flux_Editor(QDialog, EDITOR_CLASS):
|
|
|
|
def __init__(self, parent=None):
|
|
_ = parent
|
|
super().__init__()
|
|
self.setupUi(self)
|
|
self.settings = QgsSettings()
|
|
self.s = QSettings()
|
|
self.setWindowIcon(QtGui.QIcon(resources_path('icons','icon.png')))
|
|
self.first_start = None
|
|
self.iface = iface
|
|
self.label_3.setPixmap(QtGui.QPixmap(resources_path('ui','logo.png')))
|
|
self.commandLinkButton.setIcon(QtGui.QIcon(resources_path('ui','arrow-bottom.png')))
|
|
self.commandLinkButton_2.setIcon(QtGui.QIcon(resources_path('ui','arrow-up.png')))
|
|
|
|
self.tableWidget.setSelectionBehavior(QTableWidget.SelectRows)
|
|
self.tableWidget.setEditTriggers(QtWidgets.QAbstractItemView.NoEditTriggers)
|
|
self.comboBox_2.addItem("SIG")
|
|
self.comboBox_2.addItem('REF')
|
|
self.comboBox.currentIndexChanged.connect(self.initialisation_flux)
|
|
self.commandLinkButton.clicked.connect(self.selection_flux)
|
|
self.tableWidget.itemDoubleClicked.connect(self.selection_flux)
|
|
self.pushButton_2.clicked.connect(self.limite_flux)
|
|
self.commandLinkButton_2.clicked.connect(self.suppression_flux)
|
|
self.tableWidget_2.itemDoubleClicked.connect(self.suppression_flux)
|
|
self.comboBox_2.currentIndexChanged.connect(self.bd_source)
|
|
layout = QVBoxLayout()
|
|
self.lineEdit.textChanged.connect(self.filtre_dynamique)
|
|
layout.addWidget(self.lineEdit)
|
|
#self.lineEdit.mousePressEvent = self._mousePressEvent
|
|
|
|
#metadonnees_plugin = open(self.plugin_path + '/metadata.txt')
|
|
#infos_metadonnees = metadonnees_plugin.readlines()
|
|
|
|
def raise_(self):
|
|
"""Run method that performs all the real work"""
|
|
self.bd_source()
|
|
|
|
def bd_source(self):
|
|
self.activateWindow()
|
|
bd_origine=self.comboBox_2.currentText()
|
|
if bd_origine == 'REF':
|
|
self.run_ref()
|
|
if bd_origine == 'SIG':
|
|
self.run_sig()
|
|
|
|
def run_ref(self):
|
|
"""Run method that performs all the real work"""
|
|
while self.tableWidget_2.rowCount() > 0:
|
|
self.tableWidget_2.removeRow(self.tableWidget_2.rowCount()-1)
|
|
# print(self.tableWidget_2.rowCount())
|
|
global cur,con,dbtype
|
|
dbtype=refdb
|
|
con = psycopg2.connect("host=" + host + " port=" + port + " dbname="+dbtype+" user=" + user + " password=" + mdp)
|
|
cur = con.cursor(cursor_factory = psycopg2.extras.DictCursor)
|
|
self.initialisation_flux()
|
|
self.combobox_custom()
|
|
# Create the dialog with elements (after translation) and keep reference
|
|
# Only create GUI ONCE in callback, so that it will only load when the plugin is started
|
|
if self.first_start == True:
|
|
self.first_start = False
|
|
# show the dialog
|
|
self.show()
|
|
# Run the dialog event loop
|
|
result = self.exec_()
|
|
# See if OK was pressed
|
|
if result:
|
|
# Do something useful here - delete the line containing pass and
|
|
# substitute with your code.
|
|
pass
|
|
|
|
def run_sig(self):
|
|
"""Run method that performs all the real work"""
|
|
while self.tableWidget_2.rowCount() > 0:
|
|
self.tableWidget_2.removeRow(self.tableWidget_2.rowCount()-1)
|
|
global cur,con,dbtype
|
|
dbtype=sigdb
|
|
con = psycopg2.connect("host=" + host + " port=" + port + " dbname="+dbtype+" user=" + user + " password=" + mdp)
|
|
cur = con.cursor(cursor_factory = psycopg2.extras.DictCursor)
|
|
self.initialisation_flux()
|
|
self.combobox_custom()
|
|
# Create the dialog with elements (after translation) and keep reference
|
|
# Only create GUI ONCE in callback, so that it will only load when the plugin is started
|
|
if self.first_start == True:
|
|
self.first_start = False
|
|
# show the dialog
|
|
self.show()
|
|
# Run the dialog event loop
|
|
result = self.exec_()
|
|
# See if OK was pressed
|
|
if result:
|
|
# Do something useful here - delete the line containing pass and
|
|
# substitute with your code.
|
|
pass
|
|
|
|
def suppression_flux(self):
|
|
self.tableWidget_2.removeRow(self.tableWidget_2.currentRow())
|
|
|
|
def open_url(self, item):
|
|
url = item.data(Qt.UserRole)
|
|
|
|
def initialisation_flux(self):
|
|
if dbtype == sigdb:
|
|
if self.comboBox.currentText() == 'toutes les catégories':
|
|
custom_list=schemaname_list
|
|
elif self.comboBox.currentText() == 'travaux':
|
|
custom_list="""(SELECT schemaname,tablename from pg_catalog.pg_tables
|
|
where schemaname like '"""+ str(self.comboBox.currentText()) +"""%' order by schemaname,tablename) UNION (SELECT schemaname,matviewname AS tablename FROM pg_catalog.pg_matviews where schemaname like '"""+ str(self.comboBox.currentText()) +"""%' order by schemaname,tablename) order by schemaname,tablename;"""
|
|
else:
|
|
custom_list="""(SELECT schemaname,tablename from pg_catalog.pg_tables
|
|
where schemaname like '\_"""+ str(self.comboBox.currentText()) +"""%' order by schemaname,tablename) UNION (SELECT schemaname,matviewname AS tablename FROM pg_catalog.pg_matviews where schemaname like '\_"""+ str(self.comboBox.currentText()) +"""%' order by schemaname,tablename) order by schemaname,tablename;"""
|
|
else:
|
|
if self.comboBox.currentText() == 'toutes les catégories':
|
|
custom_list=schemaname_list_ref
|
|
else:
|
|
custom_list="""SELECT schemaname,tablename from pg_catalog.pg_tables
|
|
where schemaname like '"""+ str(self.comboBox.currentText()) +"""' order by schemaname,tablename;"""
|
|
cur.execute(custom_list)
|
|
list_schema = cur.fetchall()
|
|
|
|
self.tableWidget.setRowCount(len(list_schema))
|
|
self.tableWidget.setColumnCount(3)
|
|
i=0
|
|
for value in list_schema:
|
|
if dbtype == sigdb:
|
|
type_val = str(value[0])[1:3]
|
|
schema_name=str(value[0])[4:]
|
|
table_name=str(value[1])
|
|
else:
|
|
type_val = ''
|
|
schema_name=str(value[0])
|
|
table_name=str(value[1])
|
|
if type_val == 'fo':
|
|
type_val=str(value[0])[1:5]
|
|
schema_name=str(value[0])[6:]
|
|
table_name=str(value[1])
|
|
elif type_val == 'ra':
|
|
type_val='travaux'
|
|
schema_name=str(value[0])
|
|
table_name=str(value[1])
|
|
elif type_val != '00' and type_val != '01' and type_val != '07' and type_val != '26' and type_val != '42' and type_val != '69' and type_val != 'ra':
|
|
type_val='agregation'
|
|
schema_name=str(value[0])
|
|
table_name=str(value[1])
|
|
|
|
item = QTableWidgetItem(type_val)
|
|
self.tableWidget.setItem(i,0,item)
|
|
item = QTableWidgetItem(schema_name)
|
|
self.tableWidget.setItem(i,1,item)
|
|
item = QTableWidgetItem(table_name)
|
|
self.tableWidget.setItem(i,2,item)
|
|
i=i+1
|
|
self.tableWidget.setColumnWidth(0, 20)
|
|
self.tableWidget.setColumnWidth(1, 300)
|
|
self.tableWidget.setColumnWidth(2, 300)
|
|
self.tableWidget.setHorizontalHeaderLabels(["Code","Schema","Table"])
|
|
def selection_flux(self):
|
|
selected_row = 0
|
|
selected_items = self.tableWidget.selectedItems()
|
|
|
|
# Assuming you want to compare items in the first column for uniqueness
|
|
new_item_text = selected_items[2].text()
|
|
|
|
if not self.item_already_exists(new_item_text):
|
|
self.tableWidget_2.insertRow(selected_row)
|
|
|
|
for column in range(self.tableWidget.columnCount()):
|
|
cloned_item = selected_items[column].clone()
|
|
self.tableWidget_2.setHorizontalHeaderLabels(["Code","Schema", "Table"])
|
|
self.tableWidget_2.setColumnCount(3)
|
|
self.tableWidget_2.setItem(selected_row, column, cloned_item)
|
|
|
|
self.tableWidget_2.setColumnWidth(0, 50)
|
|
self.tableWidget_2.setColumnWidth(1, 300)
|
|
self.tableWidget_2.setColumnWidth(2, 300)
|
|
|
|
|
|
def item_already_exists(self, new_item_text):
|
|
# Assuming you want to compare items in the first column for uniqueness
|
|
existing_items = self.tableWidget_2.findItems(new_item_text, QtCore.Qt.MatchExactly)
|
|
|
|
# Check if there are any existing items with the same text in the first column
|
|
return len(existing_items) > 0
|
|
|
|
|
|
|
|
def limite_flux(self):
|
|
|
|
if self.tableWidget_2.rowCount() > 5:
|
|
self.QMBquestion = QMessageBox.question(iface.mainWindow(), u"Attention !",
|
|
"Le nombre de flux à charger en une seule fois est limité à 5 pour des questions de performances. Souhaitez vous tout de même charger les " + str(
|
|
self.tableWidget_2.rowCount()) + " flux sélectionnés ? (risque de plantage de QGIS)",
|
|
QMessageBox.Yes | QMessageBox.No)
|
|
if self.QMBquestion == QMessageBox.Yes:
|
|
self.chargement_flux()
|
|
|
|
if self.QMBquestion == QMessageBox.No:
|
|
print("Annulation du chargement des couches")
|
|
|
|
if self.tableWidget_2.rowCount() <= 5:
|
|
self.chargement_flux()
|
|
|
|
def chargement_flux(self):
|
|
|
|
managerAU = QgsApplication.authManager()
|
|
k = managerAU.availableAuthMethodConfigs().keys()
|
|
|
|
def REQUEST(type):
|
|
switcher = {
|
|
'WFS': "GetFeature",
|
|
'WMS': "GetMap",
|
|
'WMS+Vecteur': "GetMap",
|
|
'WMS+Raster': "GetMap",
|
|
'WMTS': "GetMap"
|
|
}
|
|
return switcher.get(type, "nothing")
|
|
|
|
|
|
def displayOnWindows(type, uri, name):
|
|
p = []
|
|
|
|
for row in range(0, self.tableWidget_2.rowCount()):
|
|
## supression de la partie de l'url après le point d'interrogation
|
|
if dbtype == sigdb:
|
|
code = self.tableWidget_2.item(row,0).text()
|
|
schema = '_'+code+'_'+self.tableWidget_2.item(row,1).text()
|
|
if code == 'travaux' or code == 'agregation':
|
|
schema = self.tableWidget_2.item(row,1).text()
|
|
table = self.tableWidget_2.item(row,2).text()#.split("?", 1)[0]
|
|
if dbtype == refdb:
|
|
# code = self.tableWidget_2.item(row,0).text()
|
|
schema = self.tableWidget_2.item(row,1).text()
|
|
table = self.tableWidget_2.item(row,2).text()#.split("?", 1)[0]
|
|
uri = QgsDataSourceUri()
|
|
uri.setConnection(host ,port ,dbtype ,user ,mdp)
|
|
|
|
# nom du schéma à remplacer: "hydrographie" à supprimer et mettre "couches_collaboratives" lorsqu'on aura regroupé les couches à modifier dans un même schéma
|
|
|
|
uri.setDataSource(schema, table, "geom")
|
|
uri.setKeyColumn('gid')
|
|
|
|
# Chargement de la couche PostGIS
|
|
geom_type ='SELECT right(st_geometrytype(geom),-3) as a FROM '+schema+'.'+table+' GROUP BY a'
|
|
cur.execute(geom_type)
|
|
list_typegeom = cur.fetchall()
|
|
print(len(list_typegeom))
|
|
if len(list_typegeom) > 1:
|
|
for typegeom in list_typegeom:
|
|
if typegeom[0] == 'MultiPolygon':
|
|
uri.setWkbType(QgsWkbTypes.MultiPolygon)
|
|
elif typegeom[0] == 'MultiLineString':
|
|
uri.setWkbType(QgsWkbTypes.MultiLineString)
|
|
elif typegeom[0] == 'Point':
|
|
uri.setWkbType(QgsWkbTypes.Point)
|
|
if typegeom[0] == 'Polygon':
|
|
uri.setWkbType(QgsWkbTypes.Polygon)
|
|
elif typegeom[0] == 'LineString':
|
|
uri.setWkbType(QgsWkbTypes.LineString)
|
|
elif typegeom[0] == 'MultiPoint':
|
|
uri.setWkbType(QgsWkbTypes.MultiPoint)
|
|
if (typegeom[0] != None and typegeom[0] != 'Polygon'):
|
|
uri.setSrid('2154')
|
|
layer = QgsVectorLayer(uri.uri(), table, "postgres")
|
|
# Ajout de la couche au canevas QGIS
|
|
QgsProject.instance().addMapLayer(layer)
|
|
else:
|
|
layer = QgsVectorLayer(uri.uri(), table, "postgres")
|
|
# Ajout de la couche au canevas QGIS
|
|
QgsProject.instance().addMapLayer(layer)
|
|
|
|
def combobox_custom(self):
|
|
if dbtype == sigdb:
|
|
self.comboBox.clear()
|
|
self.comboBox.addItem("toutes les catégories")
|
|
self.comboBox.addItem('00')
|
|
self.comboBox.addItem('01')
|
|
self.comboBox.addItem('07')
|
|
self.comboBox.addItem('26')
|
|
self.comboBox.addItem('42')
|
|
self.comboBox.addItem('69')
|
|
self.comboBox.addItem('agregation')
|
|
self.comboBox.addItem('travaux')
|
|
self.comboBox.addItem('form')
|
|
if dbtype == refdb:
|
|
custom_list=schemaname_distinct
|
|
cur.execute(custom_list)
|
|
list_schema = cur.fetchall()
|
|
self.comboBox.clear()
|
|
self.comboBox.addItem("toutes les catégories")
|
|
for baxval in list_schema:
|
|
self.comboBox.addItem(baxval[0])
|
|
def filtre_dynamique(self, filter_text):
|
|
|
|
for i in range(self.tableWidget.rowCount()):
|
|
for j in range(self.tableWidget.columnCount()):
|
|
item = self.tableWidget.item(i, j)
|
|
match = filter_text.lower() not in item.text().lower()
|
|
self.tableWidget.setRowHidden(i, match)
|
|
if not match:
|
|
break
|
|
|
|
def popup(self):
|
|
|
|
self.dialog = Popup() # +++ - self
|
|
self.dialog.text_edit.show() |