最近項目需求,基於geoserver開發了一套服務發布相關api,CSDN記錄撸碼生活。
不廢話上代碼:
1、通過PG的jdbc連接參數,生成geoserver rest創建數據存儲需要的xml
def gen_conn_xml(self,jdbc_url):
'''
通過PG的jdbc連接參數,生成geoserver rest創建數據存儲需要的xml
'''
#jdbc:postgresql://192.168.10.28:5432/rmbs?user=a&password=b
_host = jdbc_url.split('//')[1].split(':')[0]
_port = jdbc_url.split('//')[1].split(':')[1].split('/')[0]
_database = jdbc_url.split('//')[1].split(':')[1].split('/')[1].split('?')[0]
res = {}
for kv in jdbc_url.split('//')[1].split(':')[1].split('/')[1].split('?')[1].split('&'):
res[kv.split('=')[0]]= kv.split('=')[1]
#print(host,port,db,res)
_user = res['user'] if 'user' in res else ''
_password = res['password'] if 'password' in res else ''
_data = [
'<connectionParameters>',
'<host>'+_host+'</host>',
'<port>'+_port+'</port>',
'<database>'+_database+'</database>',
'<user>'+_user+'</user>',
'<passwd>'+_password+'</passwd>',
'<dbtype>postgis</dbtype>',
'</connectionParameters>'
]
return ''.join(_data)
2、獲取所有工作區,返回json數據
def get_workspaces(self):
"""
獲取所有工作區,返回json數據
"""
_curl = self.gen_url('workspaces')
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
3、 獲取指定工作區,返回json數據
def get_workspace(self,ws_name):
"""
獲取指定工作區,返回json數據
"""
_curl = self.gen_url('workspaces/%s' % (ws_name))
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
4、 創建指定工作區,返回true/false
def create_workspace(self,ws_name):
"""
創建指定工作區,返回true/false
"""
_data = '<workspace><name>' + ws_name + '</name></workspace>'
_curl = self.gen_url('workspaces')
_headers = {
'Content-Type':'application/xml'
}
_res = self.post(_curl,_headers,_data)
if _res==ws_name:
return True
else:
print(_res)
return False
5、根據工作區名稱,獲取數據存儲
def get_datastores(self,ws_name):
_curl = self.gen_url('workspaces/%s/datastores' % (ws_name))
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
6、創建新的數據存儲
def create_datastore(self,ws_name,ds_name,jdbc_url):
_data = ''.join([
'<dataStore>',
'<name>'+ds_name+'</name>',
self.gen_conn_xml(jdbc_url),
'</dataStore>'])
_curl = self.gen_url('workspaces/%s/datastores' % (ws_name))
_headers = {
'Content-Type':'application/xml'
}
_res = self.post(_curl,_headers,_data)
if _res==ds_name:
return True
else:
print(_res)
return False
7、根據工作區獲取服務圖層
def get_layers(self,ws_name):
_curl = self.gen_url('workspaces/%s/layers' % (ws_name))
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
8、獲取指定的工作區和服務名稱的圖層
def get_layer(self,ws_name,layerName):
_curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name,layerName))
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
if not 'No such layer' in _res:
print(_res)
return None
9、獲取已發布的服務
def get_featureType(self,ws_name,ds_name,featureTypeName):
_curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes/%s' % (ws_name,ds_name,featureTypeName))
_headers = {
'Content-Type':'application/json',
'Accept':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
if not 'No such feature type' in _res:
print(_res)
return None
10、發布圖層服務
def publish_layer(self,ws_name,ds_name,layerName):
_data = ''.join([
'<featureType>',
'<name>'+layerName+'</name>',
'<nativeName>'+layerName+'</nativeName>',
'</featureType>'])
_curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes' % (ws_name,ds_name))
_headers = {
'Content-Type':'application/xml'
}
_res = self.post(_curl,_headers,_data)
if _res=='':
return True
else:
print(_res)
return False
11、更新服務樣式
def update_layer_style(self,ws_name,layerName,style_name):
_data = ''.join([
'<layer>',
'<defaultStyle>',
'<name>' + style_name + '</name>',
'</defaultStyle>',
'<styles>',
'<style><name>polygon</name></style>',
'<style><name>' + style_name + '</name></style>',
'</styles>',
'</layer>'])
_curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name,layerName))
_headers = {
'Content-Type':'application/xml'
}
_res = self.put(_curl,_headers,_data)
if _res=='':
return True
else:
print(_res)
return False
12、刪除圖層
def delete_layer(self,ws_name,layerName):
_curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name,layerName))
_headers = {
'Content-Type':'application/json'
}
_res = self.delete(_curl,_headers,{"recurse":True})
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
13、刪除服務
def delete_featureType(self,ws_name,ds_name,featureTypeName):
'''取消發布'''
_curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes/%s' % (ws_name,ds_name,featureTypeName))
_headers = {
'Content-Type':'application/json'
}
_res = self.delete(_curl,_headers,{"recurse":True})
if len(_res) == 0:
return True
else:
print('delete_featureType->',_res)
return None
14、按名稱獲取樣式
def get_style(self, style_name, workspace=None):
try:
url = "{}/styles/{}.json".format(self.rest_root, style_name)
if workspace is not None:
url = "{}/workspaces/{}/styles/{}.json".format(self.rest_root, workspace, style_name)
headers = {
"Content-type": "application/json",
"Accept": "*/*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
}
r = requests.get(url, auth=self.auth, headers=headers)
if r.ok and int(r.status_code) == 200:
return True
else:
return False
except Exception as e:
return False
15、獲取所有樣式
def get_styles(self, workspace=None):
try:
url = "{}/styles.json".format(self.rest_root)
if workspace is not None:
url = "{}/workspaces/{}/styles.json".format(self.rest_root, workspace)
r = requests.get(url, auth=self.auth)
return r.json()
except Exception as e:
return "get_styles error: {}".format(e)
16、添加一個新樣式
def add_new_style(self, path, name=None, workspace=None, sld_version="1.1.0"):
if name is None:
name = os.path.basename(path)
f = name.split(".")
name = f[0] if len(f) > 0 else name
headers = {"content-type": "text/xml"}
url = "{}/workspaces/{}/styles".format(self.rest_root, workspace)
sld_content_type = "application/vnd.ogc.sld+xml"
if sld_version == "1.1.0" or sld_version == "1.1":
sld_content_type = "application/vnd.ogc.se+xml"
header_sld = {"content-type": sld_content_type}
if workspace is None:
# workspace = "default"
url = "{}/styles".format(self.rest_root)
style_xml = "<style><name>{0}</name><filename>{0}.sld</filename></style>".format(name)
try:
r = requests.post(url,data=style_xml,auth=self.auth,headers=headers)
with open(path, "rb") as f:
r_sld = requests.put(url + "/" + name,data=f.read(),auth=self.auth,headers=header_sld)
if r_sld.status_code not in [200, 201]:
return r.ok, r.status_code, r.content
return r_sld.ok, r_sld.status_code
except Exception as e:
return False,str(e)
17、按名稱刪除樣式
def delete_style(self, style_name, workspace = None):
try:
payload = {"recurse": "true"}
url = "{}/workspaces/{}/styles/{}".format(self.rest_root, workspace, style_name)
if workspace is None:
url = "{}/styles/{}".format(self.rest_root, style_name)
r = requests.delete(url, auth=self.auth, params=payload)
if r.status_code == 200:
return True
else:
raise False
except Exception as e:
return False
代碼匯總:
調用示例:
# -*-coding:utf-8-*-
import sys,os
_root_path = os.path.abspath(os.path.join(os.path.dirname(__file__),".."))
print(_root_path)
sys.path.append(_root_path)
import time
import json
import requests
#定義全局配置信息
from Cfg import Cfg
cfg = Cfg()
class GeoServerUtil():
def __init__(self,root=None,auth=None) -> None:
self.rest_root = root if root != None else cfg.geoserver_root + '/geoserver/rest'
self.auth = auth if auth != None else cfg.geoserver_auth # ('admin','geoserver')
def gen_url(self,_api=''):
return '%s/%s' % (self.rest_root,_api)
def gen_conn_xml(self,jdbc_url):
'''
通過PG的jdbc連接參數,生成geoserver rest創建數據存儲需要的xml
'''
#jdbc:postgresql://192.168.10.28:5432/rmbs?user=a&password=b
_host = jdbc_url.split('//')[1].split(':')[0]
_port = jdbc_url.split('//')[1].split(':')[1].split('/')[0]
_database = jdbc_url.split('//')[1].split(':')[1].split('/')[1].split('?')[0]
res = {}
for kv in jdbc_url.split('//')[1].split(':')[1].split('/')[1].split('?')[1].split('&'):
res[kv.split('=')[0]]= kv.split('=')[1]
#print(host,port,db,res)
_user = res['user'] if 'user' in res else ''
_password = res['password'] if 'password' in res else ''
_data = [
'<connectionParameters>',
'<host>'+_host+'</host>',
'<port>'+_port+'</port>',
'<database>'+_database+'</database>',
'<user>'+_user+'</user>',
'<passwd>'+_password+'</passwd>',
'<dbtype>postgis</dbtype>',
'</connectionParameters>'
]
return ''.join(_data)
def get(self,_url,_headers):
print(_url)
_res = requests.get(_url,auth=self.auth,headers=_headers)
#print(_res.text)
return _res.text
def post(self,_url,_headers,_data):
print(_url)
print(_data)
_res = requests.post(_url,auth=self.auth,headers=_headers,data=_data)
#print(_res.text)
return _res.text
def put(self,_url,_headers,_data):
print(_url)
print(_data)
_res = requests.put(_url,auth=self.auth,headers=_headers,data=_data)
#print(_res.text)
return _res.text
def delete(self,_url,_headers,_data):
print(_url)
print(_data)
_res = requests.delete(_url,auth=self.auth,headers=_headers,data=_data)
#print(_res.text)
return _res.text
def get_workspaces(self):
"""
獲取所有工作區,返回json數據
"""
_curl = self.gen_url('workspaces')
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
def get_workspace(self,ws_name):
"""
獲取指定工作區,返回json數據
"""
_curl = self.gen_url('workspaces/%s' % (ws_name))
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
def create_workspace(self,ws_name):
"""
創建指定工作區,返回true/false
"""
_data = '<workspace><name>' + ws_name + '</name></workspace>'
_curl = self.gen_url('workspaces')
_headers = {
'Content-Type':'application/xml'
}
_res = self.post(_curl,_headers,_data)
if _res==ws_name:
return True
else:
print(_res)
return False
def get_datastores(self,ws_name):
_curl = self.gen_url('workspaces/%s/datastores' % (ws_name))
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
def get_store(self,ws_name,ds_name):
_curl = self.gen_url('workspaces/%s/datastores/%s' % (ws_name,ds_name))
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
def create_datastore(self,ws_name,ds_name,jdbc_url):
_data = ''.join([
'<dataStore>',
'<name>'+ds_name+'</name>',
self.gen_conn_xml(jdbc_url),
'</dataStore>'])
_curl = self.gen_url('workspaces/%s/datastores' % (ws_name))
_headers = {
'Content-Type':'application/xml'
}
_res = self.post(_curl,_headers,_data)
if _res==ds_name:
return True
else:
print(_res)
return False
def get_layers(self,ws_name):
_curl = self.gen_url('workspaces/%s/layers' % (ws_name))
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
def get_layer(self,ws_name,layerName):
_curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name,layerName))
_headers = {
'Content-Type':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
if not 'No such layer' in _res:
print(_res)
return None
def get_featureType(self,ws_name,ds_name,featureTypeName):
_curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes/%s' % (ws_name,ds_name,featureTypeName))
_headers = {
'Content-Type':'application/json',
'Accept':'application/json'
}
_res = self.get(_curl,_headers)
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
if not 'No such feature type' in _res:
print(_res)
return None
def publish_layer(self,ws_name,ds_name,layerName):
_data = ''.join([
'<featureType>',
'<name>'+layerName+'</name>',
'<nativeName>'+layerName+'</nativeName>',
'</featureType>'])
_curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes' % (ws_name,ds_name))
_headers = {
'Content-Type':'application/xml'
}
_res = self.post(_curl,_headers,_data)
if _res=='':
return True
else:
print(_res)
return False
def update_layer_style(self,ws_name,layerName,style_name):
_data = ''.join([
'<layer>',
'<defaultStyle>',
'<name>' + style_name + '</name>',
'</defaultStyle>',
'<styles>',
'<style><name>polygon</name></style>',
'<style><name>' + style_name + '</name></style>',
'</styles>',
'</layer>'])
_curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name,layerName))
_headers = {
'Content-Type':'application/xml'
}
_res = self.put(_curl,_headers,_data)
if _res=='':
return True
else:
print(_res)
return False
def delete_layer(self,ws_name,layerName):
_curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name,layerName))
_headers = {
'Content-Type':'application/json'
}
_res = self.delete(_curl,_headers,{"recurse":True})
if len(_res)>0 and _res[0]=='{':
return json.loads(_res)
else:
print(_res)
return None
def delete_featureType(self,ws_name,ds_name,featureTypeName):
'''取消發布'''
_curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes/%s' % (ws_name,ds_name,featureTypeName))
_headers = {
'Content-Type':'application/json'
}
_res = self.delete(_curl,_headers,{"recurse":True})
if len(_res) == 0:
return True
else:
print('delete_featureType->',_res)
return None
# 按名稱獲取樣式
def get_style(self, style_name, workspace=None):
try:
url = "{}/styles/{}.json".format(self.rest_root, style_name)
if workspace is not None:
url = "{}/workspaces/{}/styles/{}.json".format(self.rest_root, workspace, style_name)
headers = {
"Content-type": "application/json",
"Accept": "*/*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
}
r = requests.get(url, auth=self.auth, headers=headers)
if r.ok and int(r.status_code) == 200:
return True
else:
return False
except Exception as e:
return False
def get_styles(self, workspace=None):
try:
url = "{}/styles.json".format(self.rest_root)
if workspace is not None:
url = "{}/workspaces/{}/styles.json".format(self.rest_root, workspace)
r = requests.get(url, auth=self.auth)
return r.json()
except Exception as e:
return "get_styles error: {}".format(e)
# 添加一個新樣式
def add_new_style(self, path, name=None, workspace=None, sld_version="1.1.0"):
if name is None:
name = os.path.basename(path)
f = name.split(".")
name = f[0] if len(f) > 0 else name
headers = {"content-type": "text/xml"}
url = "{}/workspaces/{}/styles".format(self.rest_root, workspace)
sld_content_type = "application/vnd.ogc.sld+xml"
if sld_version == "1.1.0" or sld_version == "1.1":
sld_content_type = "application/vnd.ogc.se+xml"
header_sld = {"content-type": sld_content_type}
if workspace is None:
# workspace = "default"
url = "{}/styles".format(self.rest_root)
style_xml = "<style><name>{0}</name><filename>{0}.sld</filename></style>".format(name)
try:
r = requests.post(url,data=style_xml,auth=self.auth,headers=headers)
with open(path, "rb") as f:
r_sld = requests.put(url + "/" + name,data=f.read(),auth=self.auth,headers=header_sld)
if r_sld.status_code not in [200, 201]:
return r.ok, r.status_code, r.content
return r_sld.ok, r_sld.status_code
except Exception as e:
return False,str(e)
# 按名稱刪除樣式
def delete_style(self, style_name, workspace = None):
try:
payload = {"recurse": "true"}
url = "{}/workspaces/{}/styles/{}".format(self.rest_root, workspace, style_name)
if workspace is None:
url = "{}/styles/{}".format(self.rest_root, style_name)
r = requests.delete(url, auth=self.auth, params=payload)
if r.status_code == 200:
return True
else:
raise False
except Exception as e:
return False
class Pg2GeoServer_publishmap():
def get_pg_cfg(self,jdbc_url):
#jdbc:postgresql://192.168.10.28:5432/rmbs?user=a&password=b
_host = jdbc_url.split('//')[1].split(':')[0]
_port = jdbc_url.split('//')[1].split(':')[1].split('/')[0]
_database = jdbc_url.split('//')[1].split(':')[1].split('/')[1].split('?')[0]
res = {}
for kv in jdbc_url.split('//')[1].split(':')[1].split('/')[1].split('?')[1].split('&'):
res[kv.split('=')[0]]= kv.split('=')[1]
#print(host,port,db,res)
res['host']=_host
res['port']=_port
res['database']=_database
return res
def publish(self,rid,ws_name,ds_name,layer_name,layer_name2,style_name,group_name):
"""
發布服務,當layer_name2為空時使用layer_name進行發布\n
layer_name用於方案配置時確定數據源
"""
print('正在進行服務發布...')
cfg.WriteLog(rid,"正在進行服務發布...%s" % (''))
url_1 = layer_name.split('|')[0]
dbtable_1 = layer_name.split('|')[1]
schema_1 = 'public'
if layer_name2!=None and len(layer_name2)>0:
dbtable_1 = layer_name2
#如果圖層帶有架構,則去掉架構名
if '.' in dbtable_1:
schema_1 = dbtable_1.split('.')[0]
dbtable_1 = dbtable_1.split('.')[1]
print(url_1,dbtable_1)
#print('連接到GeoServer...')
#cfg.WriteLog(rid,"連接到GeoServer...%s" % (''))
cat = GeoServerUtil() #Catalog(geoserver_rest, username = user, password = passwd)
ws = cat.get_workspace (ws_name)
if ws is None:
print('創建工作區...')
cfg.WriteLog(rid,"創建工作區...%s" % (ws_name))
ws = cat.create_workspace(ws_name)
ds = cat.get_store (ws_name, ds_name)
if ds is None:
print('創建數據存儲...')
cfg.WriteLog(rid,"創建數據存儲...%s" % (ds_name))
ds = cat.create_datastore(ws_name,ds_name,url_1)
#pg_cfg = self.get_pg_cfg(url_1)
#ds.connection_parameters.update (host=pg_cfg['host'], port=pg_cfg['port'], database=pg_cfg['database'], user=pg_cfg['user'], passwd=pg_cfg['password'], dbtype='postgis', schema=schema_1)
#cat.save(ds)
b = False
layer = cat.get_featureType(ws_name,ds_name,dbtable_1)
while(not layer is None):
print('圖層服務已經存在,准備刪除...')
cfg.WriteLog(rid,"圖層服務已經存在,准備刪除...%s" % (dbtable_1))
k = cat.delete_featureType(ws_name,ds_name,dbtable_1)
print('k->',k)
k = cat.delete_layer(ws_name,dbtable_1)
print('k->',k)
time.sleep(1)
layer = cat.get_featureType(ws_name,ds_name,dbtable_1)
print("===",layer)
if layer is None:
#print('獲取圖層坐標系...')
#cfg.WriteLog(rid,"獲取圖層坐標系...%s" % (''))
#res_json = PostgresqlUtilClass(url_1).get_geometry_columns(dbtable_1)
#print(res_json)
print('發布服務...')
cfg.WriteLog(rid,"發布服務...%s" % (dbtable_1))
#_srs = 'EPSG:%s' % (res_json['srid'])
#ft = cat.publish_featuretype(dbtable_1, ds, _srs, srs=_srs)
b = cat.publish_layer(ws_name,ds_name,dbtable_1)
if style_name != '':
print("更新服務樣式...")
cfg.WriteLog(rid,"更新服務樣式...%s" % (style_name))
b = cat.update_layer_style(ws_name,dbtable_1,style_name)
print("服務發布完畢!%s" % (''))
cfg.WriteLog(rid,"服務發布完畢!%s" % (''))
return b
def autoPush_tif(self,geoserverWorkeName, fileName):
geoserver = GeoServerUtil()
geocat = Catalog(geoserver.rest_root,username=geoserver.auth[0], password=geoserver.auth[-1])
namespace = geoserverWorkeName
# 要發布的柵格數據
train_path = fileName
ws = geocat.get_workspace(geoserverWorkeName)
if ws is None and geoserverWorkeName:
geocat.create_workspace(geoserverWorkeName,'http://{0}'.format(geoserverWorkeName))
store_name = train_path.replace('\\','/').split('/')[-1].split('.')[0]
geostore = geocat.create_coveragestore3(store_name,train_path,namespace)
print(store_name)
print(geostore)
print(1)
if __name__ == '__main__':
print('Begin...')
b = Pg2GeoServer_publishmap().autoPush_tif('tifserver', r'file:D:\Data\idata-gisserver\SGis-Srv\tmp_dir\8407dc28-86b7-4adb-24ac-a434c03423af\sample_downFiles\JUU2JTk1JUIwJUU2JThEJUFFJUU2JTk2JTg3JUU0JUJCJUI2\c2FtcGxl\img_05.tif')
b = Pg2GeoServer_publishmap().autoPush_tif('ws_sgis1234','D:/data/tif/sfdem.tif')
'''k = GeoServerUtil().get_featureType('sde','rmbs','pmml_35272_110_35273_0')
print(k)'''
print('Ok!')
if b == False:
exit(1)