Python 实现 SOAP 客户端/服务端
服务端
使用 spyne 模块
import logging
from spyne import Application, rpc, ServiceBase, String, Integer, ComplexModel, Long
from spyne.protocol.soap import Soap11
from spyne.protocol.xml import XmlDocument
# Target namespace: passed to the spyne Application and stripped from responses.
TNS = 'TNS'
# Keyword arguments shared by every ``.customize()`` call in this module
# (per spyne's model options: element is mandatory and may not be null).
_CUSTOMIZE = dict(min_occurs=1, nullable=False)
class Int(Integer):
    """Spyne ``Integer`` subclass whose type name is overridden to ``int``.

    NOTE(review): presumably done so the generated schema advertises ``int``
    instead of the default integer type name -- confirm against the WSDL.
    """

    __type_name__ = 'int'
# Input types for ``user_info``, unpacked positionally into ``@rpc``:
# name (UTF-8 string), age (Int), id_crd (Long).
PARAM = (
    String.customize(**_CUSTOMIZE, encoding='utf-8'),
    Int.customize(**_CUSTOMIZE),
    Long.customize(**_CUSTOMIZE),
)
# Alternative (unused): the same three inputs declared as a single ComplexModel.
# class RequestParam(ComplexModel):
#     __namespace__ = TNS
#     name = String.customize(encoding='utf-8', **_CUSTOMIZE)
#     age = Int.customize(**_CUSTOMIZE)
#     id_crd = Long.customize(**_CUSTOMIZE)
# Output types for ``user_info``: an integer followed by a UTF-8 string.
RESPONSE = [
    Integer.customize(**_CUSTOMIZE),
    String.customize(**_CUSTOMIZE, encoding='utf-8'),
]
class WebService(ServiceBase):
    """SOAP service exposing a single ``user_info`` operation."""

    @rpc(
        *PARAM,
        _returns=RESPONSE,
        _out_message_name='return',
        _out_variable_names=['code', 'msg'],
    )
    def user_info(self, name, age, id_crd):
        """Print the received fields and reply with a fixed success result.

        NOTE(review): spyne normally passes its method context as the first
        positional argument of an ``@rpc`` method; it is named ``self`` here
        and is not used -- confirm.

        Returns:
            The ``(code, msg)`` pair, always ``(0, 'OK')``.
        """
        summary = f'name:{name},age:{age},id_crd:{id_crd}'
        print(summary)
        code, msg = 0, 'OK'
        return code, msg
def remove_response_namespace(ctx):
    """Strip the ``ns0`` prefix and its ``xmlns`` declaration from the reply.

    Only the first chunk of ``ctx.out_string`` is rewritten, in place.

    Args:
        ctx: Object whose ``out_string`` is an indexable sequence of bytes.
    """
    declaration = f' xmlns:ns0="{TNS}"'.encode('utf-8')
    payload = ctx.out_string[0]
    # Drop the element prefixes first, then the namespace declaration itself.
    payload = payload.replace(b'ns0:', b'')
    ctx.out_string[0] = payload.replace(declaration, b'')
# Register the namespace-stripping hook with the service's event manager
# for the 'method_return_string' event.
WebService.event_manager.add_listener(
    'method_return_string',
    remove_response_namespace,
)
# Requests are read as SOAP 1.1 (soft validation); replies are written
# with the plain XmlDocument protocol.
application = Application(
    [WebService],
    tns=TNS,
    in_protocol=Soap11(validator='soft'),
    out_protocol=XmlDocument(),
)
if __name__ == '__main__':
    # Script entry point: serve ``application`` over HTTP with wsgiref.
    from spyne.server.wsgi import WsgiApplication
    from wsgiref.simple_server import make_server

    # Defined once so the log messages and the bound socket cannot drift apart.
    host, port = '0.0.0.0', 80

    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)
    logging.info("listening to http://%s:%s", host, port)
    logging.info("wsdl is at: http://%s:%s?wsdl", host, port)

    wsgi_application = WsgiApplication(application)
    # The server object is a context manager: leaving the block (including
    # via Ctrl-C or an error) closes the listening socket.
    with make_server(host, port, wsgi_application) as server:
        server.serve_forever()
可使用SoapUI软件发送请求
客户端
使用 zeep 模块;4.0.0版本后支持异步
from xml.dom import minidom
from zeep import Client, Settings
import urllib3
from requests import Session
# Imported here so this setup block is self-contained.
from zeep.transports import Transport

# WARNING: TLS certificate verification is disabled and the related urllib3
# warnings are silenced. Acceptable for a local demo only.
urllib3.disable_warnings()
session = Session()
session.verify = False

# raw_response=True: service calls return the HTTP response object itself
# (the script reads its ``status_code`` and ``content``).
soap_settings = Settings(raw_response=True)

# Hand the configured session to zeep through a Transport so that it is used
# for every request, including the initial WSDL download. Previously the
# session was created but never passed to the client, and verification was
# only switched off on the default transport after the WSDL had been fetched.
client = Client(
    'http://localhost:80/?wsdl',
    transport=Transport(session=session),
    settings=soap_settings,
)
def parse_response(response_content):
    """Extract the ``code`` and ``msg`` values from a response XML document.

    Args:
        response_content: XML text or bytes accepted by
            ``minidom.parseString``.

    Returns:
        ``(int(code), msg)`` when both elements are present and ``code`` has
        text; an empty ``<msg/>`` yields ``''``. ``(None, None)`` when either
        element is missing or ``code`` is empty.

    Raises:
        xml.parsers.expat.ExpatError: If the content is not well-formed XML.
        ValueError: If the ``code`` text is not an integer.
    """
    def _text(node):
        # An empty element has no child nodes, so ``firstChild`` would be
        # None; joining the text children handles that case and also text
        # split across several nodes.
        return ''.join(
            child.data
            for child in node.childNodes
            if child.nodeType in (child.TEXT_NODE, child.CDATA_SECTION_NODE)
        )

    root = minidom.parseString(response_content).documentElement
    code_nodes = root.getElementsByTagName('code')
    msg_nodes = root.getElementsByTagName('msg')
    if not (code_nodes and msg_nodes):
        return None, None

    code_text = _text(code_nodes[0]).strip()
    if not code_text:
        # Treat an empty <code/> the same as a missing one.
        return None, None
    return int(code_text), _text(msg_nodes[0])
# Call the remote ``user_info`` operation with the three expected arguments.
response = client.service.user_info(
    name='wenxi',
    age=25,
    id_crd=123456789012345678,
)
print(response.status_code)
# Decode the body to text, then pull out the (code, msg) pair.
content = response.content.decode()
code, msg = parse_response(content)
print(code, msg)