服务端

使用 spyne 模块

import logging

from spyne import Application, rpc, ServiceBase, String, Integer, ComplexModel, Long
from spyne.protocol.soap import Soap11
from spyne.protocol.xml import XmlDocument

TNS = 'TNS'
_CUSTOMIZE = {"min_occurs": 1, "nullable": False}


class Int(Integer):
    __type_name__ = 'int'


PARAM = (
    String.customize(encoding='utf-8', **_CUSTOMIZE),
    Int.customize(**_CUSTOMIZE),
    Long.customize(**_CUSTOMIZE)
)

# class RequestParam(ComplexModel):
#     __namespace__ = TNS
#     name = String.customize(encoding='utf-8', **_CUSTOMIZE),
#     age = Int.customize(**_CUSTOMIZE)
#     id_crd = Long.customize(**_CUSTOMIZE)


RESPONSE = [
    Integer.customize(**_CUSTOMIZE),
    String.customize(encoding='utf-8', **_CUSTOMIZE)
]


class WebService(ServiceBase):
    @rpc(*PARAM,
         _returns=RESPONSE,
         _out_message_name='return',
         _out_variable_names=["code", 'msg'])
    def user_info(self, name, age, id_crd):
        print(f'name:{name},age:{age},id_crd:{id_crd}')
        return 0, 'OK'  # code, msg


def remove_response_namespace(ctx):
    # 取消返回xml里面的namespace
    namespace = f' xmlns:ns0="{TNS}"'
    ctx.out_string[0] = ctx.out_string[0].replace(b'ns0:', b'')
    ctx.out_string[0] = ctx.out_string[0].replace(namespace.encode('utf-8'), b'')


# 加入事件管理器
WebService.event_manager.add_listener('method_return_string', remove_response_namespace)

application = Application([WebService],
                          tns=TNS,
                          in_protocol=Soap11(validator='soft'),
                          out_protocol=XmlDocument())
if __name__ == '__main__':
    from spyne.server.wsgi import WsgiApplication
    from wsgiref.simple_server import make_server

    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)

    logging.info("listening to http://0.0.0.0:80")
    logging.info("wsdl is at: http://0.0.0.0:80?wsdl")

    wsgi_application = WsgiApplication(application)
    server = make_server('0.0.0.0', 80, wsgi_application)
    server.serve_forever()

可使用SoapUI软件发送请求

客户端

使用 zeep 模块;4.0.0版本后支持异步

from xml.dom import minidom

from zeep import Client, Settings
import urllib3
from requests import Session

urllib3.disable_warnings()
session = Session()
session.verify = False
soap_settings = Settings(raw_response=True)
client = Client('http://localhost:80/?wsdl', settings=soap_settings)
client.transport.session.verify = False


def parse_response(response_content):
    root = minidom.parseString(response_content)
    element = root.documentElement
    code_element = element.getElementsByTagName('code')
    msg_element = element.getElementsByTagName('msg')
    if code_element and msg_element:
        code = code_element[0].firstChild.data
        msg = msg_element[0].firstChild.data
        return int(code), msg
    return None, None


response = client.service.user_info(**{"name": 'wenxi',
                                       "age": 25,
                                       "id_crd": 123456789012345678
                                       })
print(response.status_code)
content = response.content.decode()
code, msg = parse_response(content)
print(code, msg)