Articles in the Python category

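Given a knapsack of fixed capacity and a set of items, each with a weight and a value, the goal is to choose a combination of items that maximizes the total value in the knapsack without exceeding its capacity.
  1. 0-1 knapsack problem

    • Each item can be chosen at most once
  2. Complete (unbounded) knapsack problem

    • Each item can be chosen an unlimited number of times
  3. Multiple (bounded) knapsack problem

    • Each item can be chosen a limited number of times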

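Solution: Dynamic Programming (DP)

  1. 0-1 knapsack problem

    • Define a two-dimensional array dp[i][w]: the maximum value obtainable from the first i items with total weight not exceeding w
    • State transition: dp[i][w] = max(dp[i-1][w], dp[i-1][w - weight[i]] + value[i]), where the second term applies only when w >= weight[i]
    • Boundary condition: dp[0][w] = 0 for any w

    For the i-th item there are two choices:

    • Skip the item: the value equals the best value of the previous state, dp[i-1][w]
    • Take the item: the value equals the best value of the previous state after reserving the item's weight, plus the item's value, dp[i-1][w - weight[i]] + value[i]

    The snippet below is not the DP described above. It brute-forces a related subset-sum problem, enumerating every combination to find the smallest sum that is at least TARGET.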
    from itertools import combinations
    
    ARRAY = [129, 146, 40, 139, 227, 38, 139, 2, 100, 200, 55]
    TARGET = 1000
    
    
    def closest_sum_greater_than_target(arr):
        closest_sum = float('inf')
        best_combination = []
    
        for r in range(1, len(arr) + 1):
            for combination in combinations(arr, r):
                current_sum = sum(combination)
                if TARGET <= current_sum < closest_sum:
                    closest_sum = current_sum
                    best_combination = combination
        print(closest_sum, best_combination)
    
    
    if __name__ == '__main__':
        closest_sum_greater_than_target(ARRAY)
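
The state transition for the 0-1 knapsack can be written out as a table-filling loop. The following is a minimal sketch rather than the original author's code; the function name zero_one_knapsack and the sample data are assumptions chosen for illustration.

```python
def zero_one_knapsack(weights, values, capacity):
    """0-1 knapsack using the two-dimensional table dp[i][w]."""
    n = len(weights)
    # dp[0][w] = 0 for every w: with no items the best value is 0.
    dp = [[0] * (capacity + 1) for _ in range(n + 1)]

    for i in range(1, n + 1):
        weight, value = weights[i - 1], values[i - 1]  # item i is stored at index i - 1
        for w in range(capacity + 1):
            dp[i][w] = dp[i - 1][w]  # skip item i
            if w >= weight:  # take item i only if it fits
                dp[i][w] = max(dp[i][w], dp[i - 1][w - weight] + value)

    return dp[n][capacity]


# Example (illustrative data)
print(zero_one_knapsack([1, 3, 4], [15, 20, 30], 4))  # Output: 35
```

With capacity 4 the best choice is the first two items (weight 1 + 3, value 15 + 20), which beats taking the third item alone.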
    
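  2. Complete knapsack problem

    • Use a one-dimensional array dp, where dp[j] is the maximum value obtainable with knapsack capacity j
    • Skip item i: dp[j] = dp[j]
    • Take item i: dp[j] = dp[j - weight[i]] + value[i]
    • State transition: dp[j] = max(dp[j], dp[j - weight[i]] + value[i])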
    def complete_knapsack(weights, values, capacity):
        dp = [0] * (capacity + 1)
        
        for i in range(len(weights)):
            for j in range(weights[i], capacity + 1):
                dp[j] = max(dp[j], dp[j - weights[i]] + values[i])
        
        return dp[capacity]
    
    # Example
    weights = [1, 3, 4]
    values = [15, 20, 30]
    capacity = 4
    print(complete_knapsack(weights, values, capacity))  # Output: 60
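  3. Multiple knapsack problem

    • Convert the multiple knapsack problem into several 0-1 knapsack subproblems. For example, if an item can be chosen 3 times, "expand" it into 3 identical but independent items
    • The state transition is similar to the 0-1 knapsack problem, but it has to account for the available quantity of each item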
    def multiple_knapsack(weights, values, quantities, capacity):
        dp = [0] * (capacity + 1)
        
        for i in range(len(weights)):
            for k in range(quantities[i]):
                for j in range(capacity, weights[i] - 1, -1):
                    dp[j] = max(dp[j], dp[j - weights[i]] + values[i])
        
        return dp[capacity]
    
    # Example
    weights = [1, 3, 4]
    values = [15, 20, 30]
    quantities = [2, 3, 1]  # maximum number of times each item can be chosen
    capacity = 7
    print(multiple_knapsack(weights, values, quantities, capacity))  # Output: 60
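
The complete and multiple knapsack functions above share the same update line; what separates them is the direction of the inner loop over capacities. The sketch below puts both directions side by side. The function knapsack_1d and its reuse_items flag are hypothetical names introduced only for this comparison.

```python
def knapsack_1d(weights, values, capacity, reuse_items):
    """One-dimensional knapsack; reuse_items=True gives the complete variant."""
    dp = [0] * (capacity + 1)
    for weight, value in zip(weights, values):
        if reuse_items:
            # Ascending j: dp[j - weight] may already include this item,
            # so the item can be taken again (complete knapsack).
            capacities = range(weight, capacity + 1)
        else:
            # Descending j: dp[j - weight] still holds the result without this item,
            # so each item is taken at most once (0-1 knapsack).
            capacities = range(capacity, weight - 1, -1)
        for j in capacities:
            dp[j] = max(dp[j], dp[j - weight] + value)
    return dp[capacity]


weights, values = [1, 3, 4], [15, 20, 30]
print(knapsack_1d(weights, values, 4, reuse_items=True))   # Output: 60
print(knapsack_1d(weights, values, 4, reuse_items=False))  # Output: 35
```

The multiple knapsack code uses the descending loop once per available copy, which is how it treats each copy as a separate 0-1 item.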

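Python 3 regex: \w unexpectedly matches Chinese characters

# \w is commonly described as equivalent to [A-Za-z0-9_], but Python 3's re module matches str patterns in Unicode mode by default, so \w also matches Chinese characters
# Restricting matching to ASCII characters solves the problem
import re
pattern = re.compile(r'\w+', re.ASCII) # use the ASCII flag
# pattern = re.compile(r'\w+', re.A) # short form of the same flag
text = "hello_世界123"
matches = pattern.findall(text)
print(matches)  # ['hello_', '123']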
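
To make the difference visible, the same pattern can be run with and without the ASCII flag. This is a small sketch using only the standard re module; the inline form (?a) is shown as an alternative way to set the same flag.

```python
import re

text = "hello_世界123"

# Default (Unicode) matching: \w also covers CJK characters.
print(re.findall(r'\w+', text))            # Output: ['hello_世界123']
# ASCII-only matching: the Chinese characters split the match in two.
print(re.findall(r'\w+', text, re.ASCII))  # Output: ['hello_', '123']
# The inline flag (?a) is equivalent to passing re.ASCII.
print(re.findall(r'(?a)\w+', text))        # Output: ['hello_', '123']
```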

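Flags of the re module:

  1. re.ASCII / re.A

    • Makes \w, \W, \b, \B, \d, \D, \s and \S match ASCII characters only instead of Unicode characters.
  2. re.IGNORECASE / re.I

    • Makes matching case-insensitive. For example, [A-Z] also matches lowercase letters and [a-z] also matches uppercase letters.
  3. re.MULTILINE / re.M

    • Multi-line mode. Affects the behavior of ^ and $. ^ matches at the start of the string and at the start of each line (immediately after each newline); $ matches at the end of the string and at the end of each line (immediately before each newline).
  4. re.DOTALL / re.S

    • Makes . (dot) match any character, including a newline. By default, . does not match a newline.
  5. re.VERBOSE / re.X

    • Verbose mode. Lets you lay out a regular expression so that it is easier to read. In this mode whitespace is ignored unless it is inside a character class or escaped, and # can be used to introduce a comment.
  6. re.DEBUG

    • Prints debugging information about the compiled expression.
  7. re.UNICODE / re.U

    • Makes \w, \W, \b, \B, \d, \D, \s and \S perform Unicode matching instead of ASCII matching. This is the default in Python 3, so it usually does not need to be set explicitly.

Flags can also be combined, for example: re.I | re.M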
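
A few of these flags are easier to judge from their effect on a concrete string. The sketch below uses only the standard re module; the sample text and the date pattern are made up for illustration.

```python
import re

text = "Hello\nhello world\nHELLO"

# re.I | re.M: case-insensitive, and ^ matches at the start of every line.
print(re.findall(r'^hello', text, re.I | re.M))  # Output: ['Hello', 'hello', 'HELLO']

# Without re.M, ^ only matches at the very start of the string.
print(re.findall(r'^hello', text, re.I))         # Output: ['Hello']

# Without re.S the dot cannot cross the line break; with re.S it can.
print(re.search(r'Hello.hello', text))               # Output: None
print(bool(re.search(r'Hello.hello', text, re.S)))   # Output: True

# re.X: whitespace and # comments inside the pattern are ignored.
date_pattern = re.compile(r"""
    (\d{4})  # year
    -        # separator
    (\d{2})  # month
""", re.X)
print(date_pattern.match("2020-02").groups())    # Output: ('2020', '02')
```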

  1. Check the Pod's image tag

    kubectl describe pod -n <namespace> <podname>
  2. Build an image with the same tag from a Dockerfile

    FROM <docker_image_name>:<docker_image_tag>
    ADD ./test.txt /opt/test.txt
    docker build . --tag=<docker_image_name>:<docker_image_tag>
  3. Update the Pod through K8s

    kubectl get pod  -n <namespace> <podname> -o yaml | kubectl replace --force -f -

Server

Uses the spyne module

import logging

from spyne import Application, rpc, ServiceBase, String, Integer, ComplexModel, Long
from spyne.protocol.soap import Soap11
from spyne.protocol.xml import XmlDocument

TNS = 'TNS'
_CUSTOMIZE = {"min_occurs": 1, "nullable": False}


class Int(Integer):
    __type_name__ = 'int'


PARAM = (
    String.customize(encoding='utf-8', **_CUSTOMIZE),
    Int.customize(**_CUSTOMIZE),
    Long.customize(**_CUSTOMIZE)
)

# class RequestParam(ComplexModel):
#     __namespace__ = TNS
#     name = String.customize(encoding='utf-8', **_CUSTOMIZE),
#     age = Int.customize(**_CUSTOMIZE)
#     id_crd = Long.customize(**_CUSTOMIZE)


RESPONSE = [
    Integer.customize(**_CUSTOMIZE),
    String.customize(encoding='utf-8', **_CUSTOMIZE)
]


class WebService(ServiceBase):
    @rpc(*PARAM,
         _returns=RESPONSE,
         _out_message_name='return',
         _out_variable_names=["code", 'msg'])
    def user_info(self, name, age, id_crd):
        print(f'name:{name},age:{age},id_crd:{id_crd}')
        return 0, 'OK'  # code, msg


def remove_response_namespace(ctx):
    # Strip the namespace from the returned XML
    namespace = f' xmlns:ns0="{TNS}"'
    ctx.out_string[0] = ctx.out_string[0].replace(b'ns0:', b'')
    ctx.out_string[0] = ctx.out_string[0].replace(namespace.encode('utf-8'), b'')


# Register the listener with the event manager
WebService.event_manager.add_listener('method_return_string', remove_response_namespace)

application = Application([WebService],
                          tns=TNS,
                          in_protocol=Soap11(validator='soft'),
                          out_protocol=XmlDocument())
if __name__ == '__main__':
    from spyne.server.wsgi import WsgiApplication
    from wsgiref.simple_server import make_server

    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)

    logging.info("listening to http://0.0.0.0:80")
    logging.info("wsdl is at: http://0.0.0.0:80?wsdl")

    wsgi_application = WsgiApplication(application)
    server = make_server('0.0.0.0', 80, wsgi_application)
    server.serve_forever()

Requests can be sent with SoapUI

Client

Uses the zeep module; asynchronous calls are supported since version 4.0.0

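from xml.dom import minidom

import urllib3
from requests import Session
from zeep import Client, Settings
from zeep.transports import Transport

urllib3.disable_warnings()
session = Session()
session.verify = False  # skip TLS certificate verification (testing only)
soap_settings = Settings(raw_response=True)  # return the raw HTTP response instead of parsed objects
# Pass the session to zeep through a Transport so that it is actually used
client = Client('http://localhost:80/?wsdl', transport=Transport(session=session), settings=soap_settings)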


def parse_response(response_content):
    root = minidom.parseString(response_content)
    element = root.documentElement
    code_element = element.getElementsByTagName('code')
    msg_element = element.getElementsByTagName('msg')
    if code_element and msg_element:
        code = code_element[0].firstChild.data
        msg = msg_element[0].firstChild.data
        return int(code), msg
    return None, None


response = client.service.user_info(**{"name": 'wenxi',
                                       "age": 25,
                                       "id_crd": 123456789012345678
                                       })
print(response.status_code)
content = response.content.decode()
code, msg = parse_response(content)
print(code, msg)
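
The parsing step can be tried without a running server. The sketch below repeats parse_response in compact form and feeds it a hypothetical payload; the exact XML the server returns is an assumption here, based on the out message name 'return' and the variable names 'code' and 'msg' with the namespace stripped.

```python
from xml.dom import minidom


def parse_response(response_content):
    """Return (code, msg) from the response XML, or (None, None) if a tag is missing."""
    element = minidom.parseString(response_content).documentElement
    code_element = element.getElementsByTagName('code')
    msg_element = element.getElementsByTagName('msg')
    if code_element and msg_element:
        return int(code_element[0].firstChild.data), msg_element[0].firstChild.data
    return None, None


# Hypothetical payload (assumed shape, not captured from a real server)
sample = '<return><code>0</code><msg>OK</msg></return>'
print(parse_response(sample))       # Output: (0, 'OK')
print(parse_response('<return/>'))  # Output: (None, None)
```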

import zipfile
from io import BytesIO


def zip_file(byte_file):
    buf = BytesIO()
    with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr('result', byte_file)
    file = buf.getvalue()
    buf.close()
    return file


def unzip_file(byte_file):
    buf = BytesIO()
    buf.write(byte_file)
    zipfile_obj = zipfile.ZipFile(buf, compression=zipfile.ZIP_DEFLATED)
    file = zipfile_obj.read(zipfile_obj.namelist()[0])
    buf.close()
    zipfile_obj.close()
    return file
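
A quick round trip shows how the two helpers fit together. This sketch uses compact equivalents with hypothetical names (zip_bytes, unzip_bytes) so that it runs on its own; the payload is made up.

```python
import zipfile
from io import BytesIO


def zip_bytes(data, name='result'):
    """Compress bytes into an in-memory zip archive with a single entry."""
    buf = BytesIO()
    with zipfile.ZipFile(buf, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(name, data)
    return buf.getvalue()


def unzip_bytes(data):
    """Return the content of the first entry of an in-memory zip archive."""
    with zipfile.ZipFile(BytesIO(data)) as zf:
        return zf.read(zf.namelist()[0])


payload = b'hello zip' * 100
archive = zip_bytes(payload)
print(unzip_bytes(archive) == payload)  # Output: True
print(len(archive) < len(payload))      # Output: True
```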

from Crypto.Cipher import AES

BLOCK_SIZE = AES.block_size


class AESCipher:
    """AES in CBC mode with PKCS#7 padding."""

    def __init__(self, key, iv):
        # Accept the key either as a hex string or as plain text
        try:
            self.key = bytes.fromhex(key)
        except ValueError:
            self.key = bytes(key, encoding='utf-8')
        self.iv = bytes(iv[0:16], encoding='utf-8')

    def _new_cipher(self):
        # A CBC cipher object is stateful, so one object must not be shared
        # between encrypt() and decrypt(); create a fresh one for every call
        return AES.new(key=self.key, mode=AES.MODE_CBC, IV=self.iv)

    @staticmethod
    def pad(b):
        # PKCS#7: append n bytes, each with the value n
        n = BLOCK_SIZE - len(b) % BLOCK_SIZE
        return b + bytes([n]) * n

    @staticmethod
    def unpad(b):
        # The last byte says how many padding bytes to remove
        return b[:-b[-1]]

    def encrypt(self, text):
        return self._new_cipher().encrypt(self.pad(text))

    def decrypt(self, encrypted_text):
        return self.unpad(self._new_cipher().decrypt(encrypted_text))
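
The pad and unpad helpers implement PKCS#7-style padding, which can be checked without any crypto library. The sketch below is standalone; BLOCK_SIZE is hard-coded to 16 here as an assumption matching the AES block size.

```python
BLOCK_SIZE = 16  # AES block size in bytes (assumed constant for this sketch)


def pad(data):
    """Append n bytes of value n so the length becomes a multiple of BLOCK_SIZE."""
    n = BLOCK_SIZE - len(data) % BLOCK_SIZE
    return data + bytes([n]) * n


def unpad(data):
    """Remove the padding; the last byte holds the padding length."""
    return data[:-data[-1]]


padded = pad(b'hello')
print(len(padded), padded[-1])        # Output: 16 11
print(len(pad(b'0123456789abcdef')))  # Output: 32
print(unpad(padded))                  # Output: b'hello'
```

Input that is already a whole number of blocks still gets a full block of padding, which is what lets unpad work unambiguously.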

import json

import pytz
from django_celery_beat.models import PeriodicTask, PeriodicTasks, CrontabSchedule

from phone.models import PhoneInfo


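def task_add(task_name, task_class, task_queue, interval=None, crontab=None, args=None, kwargs=None):
    """
    Add a Celery periodic task.
    :param kwargs: keyword arguments for the task, defaults to {}
    :param args: positional arguments for the task, defaults to []
    :param task_name: task name
    :param task_class: dotted path of the task to run
    :param task_queue: queue the task is sent to
    :param interval: interval schedule, if any
    :param crontab: crontab schedule, if any
    :return: task id
    """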
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    task = PeriodicTask.objects.create(name=task_name, task=task_class, interval=interval, crontab=crontab)
    task.queue = task_queue
    task.exchange = task_queue
    task.routing_key = 'default'
    task.enabled = True
    task.args = json.dumps(args)
    task.kwargs = json.dumps(kwargs)
    task.save()
    PeriodicTasks.changed(task)
    return task.id


def task_update(task_id, crontab):
    """
    Update a task's schedule.
    :param task_id: id of the task to update
    :param crontab: new crontab schedule
    :return:
    """
    per_task = PeriodicTask.objects.get(id=task_id)
    per_task.crontab = crontab
    per_task.save()
    PeriodicTasks.changed(per_task)


def task_delete(task_id):
    """
    Delete a task.
    :param task_id: id of the task to delete
    :return:
    """
    try:
        task_query = PeriodicTask.objects.get(id=task_id)
    except PeriodicTask.DoesNotExist:
        print(f'Task ID {task_id} does not exist')
    else:
        # Disable the task first, then delete it
        task_query.enabled = False
        task_query.save()
        task_query.delete()
    return


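def task_list(task_name, task_queue):
    """
    Look up a task.
    :param task_name: task name
    :param task_queue: task queue
    :return: dict with task_name, task_queue, task_kwargs (keyword arguments), task_class (task path) and task_cron (crontab schedule)
    """
    # Fetch the single periodic task that matches the name and queue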
    per_task = PeriodicTask.objects.get(name=task_name, queue=task_queue)
    data = {
        "task_name": per_task.name,
        "task_queue": per_task.queue,
        "task_kwargs": per_task.kwargs,
        "task_class": per_task.task,
        "task_cron": per_task.crontab,
    }
    return data


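def queue_update(queue_name_pre, queue_name_cur):
    """
    Move tasks from one queue to another.
    :param queue_name_pre: name of the queue to change
    :param queue_name_cur: new queue name
    :return: queryset of the tasks now in the new queue
    """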
    all_tasks = PeriodicTask.objects.filter(queue=queue_name_pre)
    all_tasks_ids = [per_task.id for per_task in all_tasks]
    for task_id in all_tasks_ids:
        task_query = PeriodicTask.objects.get(id=task_id)
        task_query.queue = queue_name_cur
        task_query.exchange = queue_name_cur
        task_query.routing_key = queue_name_cur
        task_query.save()
    PeriodicTasks.update_changed()
    all_tasks = PeriodicTask.objects.filter(queue=queue_name_cur)
    return all_tasks


def get_cron_obj(plan_time):
    """Get or create the crontab schedule for the given time."""
    cron_obj, flag = CrontabSchedule.objects.get_or_create(minute=plan_time.minute,
                                                           hour=plan_time.hour,
                                                           day_of_week='*',
                                                           day_of_month=plan_time.day,
                                                           month_of_year=plan_time.month,
                                                           timezone=pytz.timezone('Asia/Shanghai'))
    return cron_obj


class PlanTask(object):

    def __init__(self, model, instance):
        if model == "model0":
            # task_add parameters: task_name, task_class, task_queue, interval, crontab, args, kwargs
            self.data = {"task_name": f'model0-计划{instance.pk}',
                         "task_class": "model0.tasks.model_0",
                         }
        elif model == "model1":
            self.data = {"task_name": f'model1-计划{instance.pk}',
                         "task_class": "model1.tasks.model_1",
                         }
        else:
            # Fail early: without this, self.data would be undefined below
            raise ValueError(f'unsupported model: {model}')
        self.data.update({"task_queue": "manager", "interval": None, "crontab": self.get_cron_obj(instance.plan_time),
                          "args": [instance.pk]})
        self.instance = instance

    @staticmethod
    def get_cron_obj(plan_time):
        """Get or create the crontab schedule for the given time."""
        cron_obj, flag = CrontabSchedule.objects.get_or_create(minute=plan_time.minute,
                                                               hour=plan_time.hour,
                                                               day_of_week='*',
                                                               day_of_month=plan_time.day,
                                                               month_of_year=plan_time.month,
                                                               timezone=pytz.timezone('Asia/Shanghai'))
        return cron_obj

    def implement_task_add(self):
        """Add the Celery plan."""
        task_id = task_add(**self.data)
        self.instance.task_id = task_id
        self.instance.save(update_fields=['task_id'])

    def implement_task_update(self, validated_data):
        """Update the Celery plan."""
        if self.instance.is_enable:  # enabled
            if self.instance.task_id is None:  # no Celery periodic task yet, so create one
                self.implement_task_add()
            if self.instance.task_id and 'plan_time' in validated_data:  # task exists and its time has to change
                crontab = get_cron_obj(self.instance.plan_time)
                # Update the task's schedule
                task_update(self.instance.task_id, crontab)
        else:  # not enabled
            if self.instance.task_id:  # a periodic task exists and has to be deleted
                task_delete(self.instance.task_id)
                self.instance.task_id = None
                self.instance.save(update_fields=['task_id'])


if __name__ == '__main__':
    instance = PhoneInfo.objects.first()
    validated_data = {"plan_time": "2020-02-25T16:23:26.091538+08:00"}
    # Add a plan
    PlanTask(model='model1', instance=instance).implement_task_add()
    # Update the plan
    PlanTask(model='model1', instance=instance).implement_task_update(validated_data)
    # Delete the plan
    task_delete(instance.task_id)
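
The mapping that get_cron_obj performs can be inspected without Django or a database. The sketch below only builds the field values; crontab_fields is a hypothetical helper, and the timestamp is the sample plan_time from the code above.

```python
from datetime import datetime


def crontab_fields(plan_time):
    """Map a datetime to the crontab fields passed to CrontabSchedule (hypothetical helper)."""
    return {
        'minute': plan_time.minute,
        'hour': plan_time.hour,
        'day_of_week': '*',
        'day_of_month': plan_time.day,
        'month_of_year': plan_time.month,
    }


plan_time = datetime.fromisoformat('2020-02-25T16:23:26.091538+08:00')
print(crontab_fields(plan_time))
# Output: {'minute': 23, 'hour': 16, 'day_of_week': '*', 'day_of_month': 25, 'month_of_year': 2}
```

A crontab expression has no year field, so a schedule built this way matches the same date and time every year unless the task is disabled or deleted after it runs.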

  1. Supervisor official website
  2. Install: pip install supervisor
  3. Configure

    • Generate the default configuration: echo_supervisord_conf > /etc/supervisord.conf
    • Custom configuration
    ; django-channels configuration
    [fcgi-program:my_channels]
    socket = tcp://0.0.0.0:2020
    directory = /home/my_channels
    numprocs = 8
    process_name = my_channels_2020-%(process_num)d
    command = /root/.virtualenvs/my_channels/bin/daphne -u /home/my_channels/runtime/my_channels_2020-%(process_num)d.sock --fd 0 --access-log - --proxy-headers my_channels.asgi:application
    autostart = true
    autorestart = true
    stdout_logfile = /home/my_channels/logging/my_channels.log
    stdout_logfile_maxbytes = 50MB
    stdout_logfile_backups = 10
    redirect_stderr = true
    priority = 997
    user = root
    killasgroup = true
    stopasgroup = true
    startsecs = 15
    ; Celery worker configuration
    [program:celery-worker-my_channels]
    command = /root/.virtualenvs/my_channels/bin/celery -A my_channels worker -l info -Q default,manager
    directory = /home/my_channels
    stdout_logfile = /home/my_channels/logging/celery-worker-my_channels.log
    stdout_logfile_maxbytes = 50MB
    stdout_logfile_backups = 10
    stderr_logfile = /home/my_channels/logging/celery-worker-my_channels-error.log
    stderr_logfile_maxbytes = 50MB
    stderr_logfile_backups = 10
    autostart = true
    autorestart = true
    priority = 997
    user = root
    killasgroup = true
    stopasgroup = true
    redirect_stderr = true
    startsecs = 15
    ; Celery beat (scheduled tasks) configuration
    [program:celery-beat-my_channels]
    command = /root/.virtualenvs/my_channels/bin/celery -A my_channels beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    directory = /home/my_channels
    stdout_logfile = /home/my_channels/logging/celery-beat-my_channels.log
    stdout_logfile_maxbytes = 50MB
    stdout_logfile_backups = 10
    stderr_logfile = /home/my_channels/logging/celery-beat-my_channels-error.log
    stderr_logfile_maxbytes = 50MB
    stderr_logfile_backups = 10
    autostart = true
    autorestart = true
    priority = 997
    user = root
    killasgroup = true
    stopasgroup = true
    redirect_stderr = true
    startsecs = 15

    Start: supervisord -c /etc/supervisord.conf
    Reload configuration: supervisorctl update

Original page

“Django is a high-level Python Web framework that encourages rapid development and clean, pragmatic design”. If you are building something similar to an e-commerce site, then you should probably go with Django. It will get your work done quickly. You don’t have to worry about too many technology choices. It provides everything you need, from a template engine to an ORM. It is slightly opinionated about the way you structure your app, which is good if you ask me. And it has the strongest community of all these libraries, which means help is easy to find.

“Flask is a microframework for Python based on Werkzeug, Jinja 2 and good intentions”. Beware: “microframework” may be misleading. This does not mean that Flask is a half-baked library. It means the core of Flask is very, very simple. Unlike Django, it will not make any technology decisions for you. You are free to choose any template engine or ORM that pleases you. Even though it comes with the Jinja template engine by default, you are always free to choose your own. As far as I know, Flask comes in handy for writing API endpoints (RESTful services).

“Twisted is an event-driven networking engine written in Python”. This is a high-performance engine. The main reason for its speed is something called a Deferred. Twisted is built on top of Deferreds. For those of you who don’t know about Deferreds, they are the mechanism through which the asynchronous architecture is achieved. Twisted is very fast, but it is not suitable for writing conventional web apps. If you want to do low-level networking, Twisted is your friend.

“Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections, making it ideal for long polling, WebSockets, and other applications that require a long-lived connection to each user”. Tornado stands somewhere between Django and Flask. If you want to write something with Django or Flask but need better performance, you can opt for Tornado. It can handle the C10k problem very well if it is architected right.

“Cyclone is a web server framework for Python that implements the Tornado API as a Twisted protocol”. Now, what if you want something with Twisted’s performance that also makes it easy to write conventional web apps? Say hello to Cyclone. I would prefer Cyclone over Tornado. It has an API that is very similar to Tornado’s. As a matter of fact, it is a fork of Tornado. But the problem is that it has a relatively small community. Alexandre Fiori is the only main committer to the repo.

“Pyramid is a general, open source, Python web application development framework. Its primary goal is to make it easier for a Python developer to create web applications”. I haven’t really used Pyramid, but I went through the documentation. From what I understand, Pyramid is very similar to Flask, and I think you can use Pyramid wherever Flask seems appropriate and vice versa.

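  1. A web server listens on a port and handles the low-level work: HTTP connection management, sending and receiving data, implementing the HTTP protocol, and so on.
  2. A web framework defines the flow for handling a single HTTP request.
  3. Nginx is a reverse proxy server. It is a special kind of web server application and is not a concept on the same level as a web server. Tornado is both a web server and a web framework, and the two roles do not conflict.

    For example, after writing a Tornado application you can run it directly on port 8000 and reach your page at localhost:8000. Two things happen here. Tornado listens for low-level I/O events and receives the data, which is its job as a web server. You then follow the flow defined by the Tornado framework and write a get function in the right place to produce the page content, which is Tornado acting as a web framework.

    So what is Nginx for? It is a reverse proxy: as the name suggests, it forwards the HTTP requests it receives to other backend servers according to a set of rules. Suppose one machine runs three Tornado applications, foo1, foo2 and foo3, on ports 8000, 8001 and 8003, and you want users to reach all of them through port 80. Nginx can do this: run it on port 80, and when a request arrives for /foo1 it forwards it to port 8000, for /foo2 to port 8001, and likewise for foo3.

    So Tornado and Nginx are not directly related. In fact, many frameworks ship a simple web server for debugging. Tornado's web server is asynchronous and is known for handling large numbers of mostly idle long-lived connections.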
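
The forwarding rule in the Nginx example comes down to a lookup from path prefix to backend port. The sketch below models only that decision in plain Python. It is not Nginx configuration and does not proxy anything; pick_backend and ROUTES are hypothetical names, with the ports taken from the example.

```python
# Path prefix -> backend port, taken from the example in the text
ROUTES = {'/foo1': 8000, '/foo2': 8001, '/foo3': 8003}


def pick_backend(path):
    """Return the backend port for a request path, or None if no rule matches."""
    for prefix, port in ROUTES.items():
        if path == prefix or path.startswith(prefix + '/'):
            return port
    return None


print(pick_backend('/foo1'))        # Output: 8000
print(pick_backend('/foo2/users'))  # Output: 8001
print(pick_backend('/bar'))         # Output: None
```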

# API
http://sp0.baidu.com/8aQDcjqpAAV3otqbppnN2DJv/api.php?query=123.123.123.123&co=&resource_id=6006&oe=utf8    # Baidu (somewhat inaccurate; the Baidu Maps API works best)
http://ip-api.com/json/123.123.123.123?lang=zh-CN
http://ip.ws.126.net/ipquery?ip=123.123.123.123
http://whois.pconline.com.cn/jsFunction.jsp?callback=jsShow&ip=123.123.123.123
import json
import re

import requests


def search_sp0_baidu_com(ip):
    """http://sp0.baidu.com/8aQDcjqpAAV3otqbppnN2DJv/api.php?query=123.123.123.123&co=&resource_id=6006&oe=utf8"""
    url = f'http://sp0.baidu.com/8aQDcjqpAAV3otqbppnN2DJv/api.php?query={ip}&co=&resource_id=6006&oe=utf8'
    try:
        response = requests.get(url, timeout=5)
        json_obj = response.json()
        # Sample json_obj
        """{'status': '0', 't': '', 'set_cache_time': '', 'data': [{'location': '北京市北京市 联通', 'titlecont': 
        'IP地址查询', 'origip': '123.123.123.123', 'origipquery': '123.123.123.123', 'showlamp': '1', 'showLikeShare': 1, 
        'shareImage': 1, 'ExtendedLocation': '', 'OriginQuery': '123.123.123.123', 'tplt': 'ip', 'resourceid': 
        '6006', 'fetchkey': '123.123.123.123', 'appinfo': '', 'role_id': 0, 'disp_type': 0}]} """
        ip_address = json_obj['data'][0]["location"]
    except Exception as e:
        print(e)
        return None
    else:
        return ip_address


def search_ip_api_com(ip):
    """http://ip-api.com/json/123.123.123.123?lang=zh-CN"""
    url = f'http://ip-api.com/json/{ip}?lang=zh-CN'
    try:
        response = requests.get(url, timeout=5)
        json_obj = response.json()
        # Sample json_obj
        """{'status': 'success', 'country': '中国', 'countryCode': 'CN', 'region': 'BJ', 'regionName': 
        '北京市', 'city': '朝阳', 'zip': '', 'lat': 39.9771, 'lon': 116.384, 'timezone': 'Asia/Shanghai', 'isp': 'China 
        Unicom Sichuan Province Network', 'org': '', 'as': 'AS4837 CHINA UNICOM China169 Backbone', 
        'query': '123.123.123.123'} """
        ip_address = f"{json_obj['regionName']}{json_obj['city']}"
    except Exception as e:
        print(e)
        return None
    else:
        return ip_address


def search_ip_ws_126_net(ip):
    """http://ip.ws.126.net/ipquery?ip=123.123.123.123"""
    url = f'http://ip.ws.126.net/ipquery?ip={ip}'
    try:
        response = requests.get(url, timeout=5)
        text = response.text
        # Sample text
        """var lo="北京市", lc="北京市";var localAddress={city:"北京市", province:"北京市"}"""
        city = re.match(r".*city:\"([\u4e00-\u9fa5]+)\".*", text, re.S).group(1)
        province = re.match(r".*province:\"([\u4e00-\u9fa5]+)\".*", text, re.S).group(1)
        ip_address = f'{province}{city}'
    except Exception as e:
        print(e)
        return None
    else:
        return ip_address


def search_whois_pconline_com_cn(ip):
    """http://whois.pconline.com.cn/ipJson.jsp?ip=123.123.123.123"""
    url = f'http://whois.pconline.com.cn/ipJson.jsp?ip={ip}'
    try:
        response = requests.get(url, timeout=5)
        # Keep the body as str: re.match with a str pattern raises TypeError on bytes
        text = response.text
        # Sample text
        """if(window.IPCallBack) {IPCallBack({"ip":"123.123.123.123","pro":"北京市","proCode":"110000",
        "city":"北京市","cityCode":"110000","region":"顺义区","regionCode":"110113","addr":"北京市顺义区 联通","regionNames":"",
        "err":""});} """
        json_str = re.match(r".*?{IPCallBack\((.*)\);}.*", text, re.S).group(1)
        json_obj = json.loads(json_str)
        ip_address = f"{json_obj['pro']} {json_obj['city']} {json_obj['region']}"
    except Exception as e:
        print(e)
        return None
    else:
        return ip_address


def search_ip_address(ip):
    ip_address = search_sp0_baidu_com(ip)
    if ip_address is None:
        ip_address = search_ip_api_com(ip)
    if ip_address is None:
        ip_address = search_ip_ws_126_net(ip)
    if ip_address is None:
        ip_address = search_whois_pconline_com_cn(ip)
    return ip_address


if __name__ == '__main__':
    print(search_ip_address('123.123.123.123'))
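
The pconline response is JSON wrapped in a JavaScript callback, so the parsing step can be checked offline. The sketch below applies the same regular expression to the sample body quoted in search_whois_pconline_com_cn; no request is made, and the sample is not guaranteed to match what the service returns today.

```python
import json
import re

# Sample body copied from the comment in search_whois_pconline_com_cn
text = ('if(window.IPCallBack) {IPCallBack({"ip":"123.123.123.123","pro":"北京市",'
        '"proCode":"110000","city":"北京市","cityCode":"110000","region":"顺义区",'
        '"regionCode":"110113","addr":"北京市顺义区 联通","regionNames":"","err":""});}')

pattern = r".*?{IPCallBack\((.*)\);}.*"

# Extract the JSON object between "IPCallBack(" and ");}" and decode it
json_str = re.match(pattern, text, re.S).group(1)
json_obj = json.loads(json_str)
print(f"{json_obj['pro']} {json_obj['city']} {json_obj['region']}")  # Output: 北京市 北京市 顺义区

# A str pattern only works on str input; bytes input raises TypeError
try:
    re.match(pattern, text.encode('utf-8'), re.S)
except TypeError as e:
    print(type(e).__name__)  # Output: TypeError
```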