分类 Django 下的文章

开发过程中经常遇到要调试https协议的接口,用Django快速模拟一个https服务端
主要模块:django-sslserver
安装方式:pip install django-sslserver
配置:
# settings.py
SECURE_SSL_REDIRECT = False # False只响应HTTPS协议请求,Ture将http重定向至https
INSTALLED_APPS = (
        "sslserver",
                )
# 第一次启动获取certificate和key
python manage.py runsslserver
# 后续启动命令
python manage.py runsslserver --certificate development.crt --key development.key

class AESTextField(models.CharField):

    def value_to_string(self, obj):
        """序列化"""
        value = self.value_from_object(obj)
        return self.get_prep_value(value)

    def to_python(self, value):
        if value is not None:
            try:
                value = AES.new('key').decrypt(value)
            except Exception as e:
                logger.exception(f'密码解密失败:{e}')
        return value

    def from_db_value(self, value, expression, connection):
        """从数据库取出来的值"""
        if value is not None:
            try:
                value = AES.new('key').decrypt(value)
            except Exception as e:
                logger.exception(f'密码解密失败:{e}')
        return value

    def get_prep_value(self, value):
        if value is not None:
            try:
                value = AES.new('key').encrypt(value)
            except Exception as e:
                logger.exception(f'密码加密失败:{e}')
        return value

    def value_from_object(self, obj):
        return super().value_from_object(obj)

    def get_db_prep_value(self, value, connection, prepared=False):
        return super().get_db_prep_value(value, connection, prepared)

    def get_db_prep_save(self, value, connection):
        return super().get_db_prep_save(value, connection)

过滤Celery定时任务已完成和未完成的任务

from django_celery_beat.models import PeriodicTask
from django_filters import rest_framework as filters


class PeriodicTaskFilter(filters.FilterSet):
    completed = filters.BooleanFilter(method='filter_completed')

    class Meta:
        model = PeriodicTask
        fields = ['completed']

    @staticmethod
    def filter_completed(queryset, name, value):
        if value:
            return queryset.filter(last_run_at__isnull=False)
        else:
            return queryset.filter(last_run_at__isnull=True)

import json

import pytz
from django_celery_beat.models import PeriodicTask, PeriodicTasks, CrontabSchedule

from phone.models import PhoneInfo


def task_add(task_name, task_class, task_queue, interval=None, crontab=None, args=None, kwargs=None):
    """
    添加celery任务
    :param kwargs:  默认'{}'
    :param args:    默认'[]'
    :param task_name: 任务名
    :param task_class: 任务代码
    :param task_queue: 任务管道
    :param interval: 定时类型
    :param crontab: 定时类型
    :return: 任务id
    """
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    task = PeriodicTask.objects.create(name=task_name, task=task_class, interval=interval, crontab=crontab)
    task.queue = task_queue
    task.exchange = task_queue
    task.routing_key = 'default'
    task.enabled = True
    task.args = json.dumps(args)
    task.kwargs = json.dumps(kwargs)
    task.save()
    PeriodicTasks.changed(task)
    return task.id


def task_update(task_id, crontab):
    """
    修改任务
    :param task_id: 需要修改的任务的id
    :param crontab: 定时类型
    :return:
    """
    per_task = PeriodicTask.objects.get(id=task_id)
    per_task.crontab = crontab
    per_task.save()
    PeriodicTasks.changed(per_task)


def task_delete(task_id):
    """
    删除任务
    :param task_id:
    :return:
    """
    try:
        task_query = PeriodicTask.objects.get(id=task_id)
    except PeriodicTask.DoesNotExist:
        print(f'{task_id}ID 不存在')
    else:
        task_query.enabled = False
        task_query.save()
        # 删除任务
        task_query.delete()
    return


def task_list(task_name, task_queue):
    """
    任务查询
    :param task_name: 任务名称
    :param task_queue: 任务队列
    :return: task_name任务名称、task_queue任务队列、task_args任务参数、task_class任务执行类、task_cron任务定时的表达式
    """
    # 查询目前满足条件的所有周期性任务
    per_task = PeriodicTask.objects.get(name=task_name, queue=task_queue)
    data = {
        "task_name": per_task.name,
        "task_queue": per_task.queue,
        "task_kwargs": per_task.kwargs,
        "task_class": per_task.task,
        "task_cron": per_task.crontab,
    }
    return data


def queue_update(queue_name_pre, queue_name_cur):
    """
    更改任务的队列
    :param queue_name_pre: 要改的队列名称
    :param queue_name_cur: 改变后的队列名
    :return:
    """
    all_tasks = PeriodicTask.objects.filter(queue=queue_name_pre)
    all_tasks_ids = [per_task.id for per_task in all_tasks]
    for task_id in all_tasks_ids:
        task_query = PeriodicTask.objects.get(id=task_id)
        task_query.queue = queue_name_cur
        task_query.exchange = queue_name_cur
        task_query.routing_key = queue_name_cur
        task_query.save()
    PeriodicTasks.update_changed()
    all_tasks = PeriodicTask.objects.filter(queue=queue_name_cur)
    return all_tasks


def get_cron_obj(plan_time):
    """获取定时时间类型"""
    cron_obj, flag = CrontabSchedule.objects.get_or_create(minute=plan_time.minute,
                                                           hour=plan_time.hour,
                                                           day_of_week='*',
                                                           day_of_month=plan_time.day,
                                                           month_of_year=plan_time.month,
                                                           timezone=pytz.timezone('Asia/Shanghai'))
    return cron_obj


class PlanTask(object):

    def __init__(self, model, instance):
        if model == "model0":
            # task_name, task_class, task_queue, interval = None, crontab = None, args = None, kwargs = None
            self.data = {"task_name": f'model0-计划{instance.pk}',
                         "task_class": "model0.tasks.model_0",
                         }
        elif model == "model1":
            # task_name, task_class, task_queue, interval = None, crontab = None, args = None, kwargs = None
            self.data = {"task_name": f'model1-计划{instance.pk}',
                         "task_class": "model1.tasks.model_1",
                         }
        else:
            pass
        self.data.update({"task_queue": "manager", "interval": None, "crontab": self.get_cron_obj(instance.plan_time),
                          "args": [instance.pk]})
        self.instance = instance

    @staticmethod
    def get_cron_obj(plan_time):
        """获取定时时间类型"""
        cron_obj, flag = CrontabSchedule.objects.get_or_create(minute=plan_time.minute,
                                                               hour=plan_time.hour,
                                                               day_of_week='*',
                                                               day_of_month=plan_time.day,
                                                               month_of_year=plan_time.month,
                                                               timezone=pytz.timezone('Asia/Shanghai'))
        return cron_obj

    def implement_task_add(self):
        """添加Celery计划"""
        task_id = task_add(**self.data)
        self.instance.task_id = task_id
        self.instance.save(update_fields=['task_id'])

    def implement_task_update(self, validated_data):
        """更新Celery计划"""
        if self.instance.is_enable:  # 已启用
            if self.instance.task_id is None:  # 未创建celery计划任务,创建任务
                self.implement_task_add()
            if self.instance.task_id and 'plan_time' in validated_data.keys():  # 已创建celery计划任务,需变更时间
                crontab = get_cron_obj(self.instance.plan_time)
                # 更新任务时间
                task_update(self.instance.task_id, crontab)
        else:  # 未启用
            if self.instance.task_id:  # 已存在计划任务,需删除计划任务
                task_delete(self.instance.task_id)
                self.instance.task_id = None
                self.instance.save(update_fields=['task_id'])


if __name__ == '__main__':
    instance = PhoneInfo.objects.first()
    validated_data = {"plan_time": "2020-02-25T16:23:26.091538+08:00"}
    # 添加计划
    PlanTask(model='model1', instance=instance).implement_task_add()
    # 修改计划
    PlanTask(model='model1', instance=instance).implement_task_update(validated_data)
    # 删除计划
    task_delete(instance.task_id)

pypi地址

CKEDITOR_IMAGE_BACKEND = "pillow" # 图片
# 上传文件夹
CKEDITOR_UPLOAD_PATH = 'upload/'
UPLOAD_DIR_NAME = 'upload'
UPLOAD_DIR = os.path.join(BASE_DIR, UPLOAD_DIR_NAME)  # 路径
if not os.path.exists(UPLOAD_DIR):
    os.mkdir(UPLOAD_DIR)

CKEDITOR_CONFIGS = {
    'default': {
        'language': 'zh-cn',
        'image_previewText': ' ',  # 替换上传图片时显示区域出现的字符串
        'tabSpaces': 8,
        'toolbar': (
            ['Source', 'Save', 'NewPage', 'Preview', 'Templates'],
            ['Cut', 'Copy', 'Paste', 'PasteText', 'PasteFromWord', 'Print', 'SpellChecker', 'Scayt'],
            ['Undo', 'Redo', 'Find', 'Replace', 'SelectAll', 'RemoveFormat'],
            ['Form', 'Checkbox', 'Radio', 'TextField', 'Textarea', 'Select', 'Button', 'ImageButton', 'HiddenField'],
            ['Bold', 'Italic', 'Underline', 'Strike', 'Subscript', 'Superscript'],
            ['NumberedList', 'BulletedList', 'Outdent', 'Indent', 'Blockquote'],
            ['JustifyLeft', 'JustifyCenter', 'JustifyRight', 'JustifyBlock'],
            ['Link', 'Unlink', 'Anchor'],
            ['Image', 'Flash', 'Table', 'HorizontalRule', 'Smiley', 'SpecialChar', 'PageBreak'],
            ['Styles', 'Format', 'Font', 'FontSize'],
            ['TextColor', 'BGColor', 'lineheight'],
            ['Maximize', 'ShowBlocks', 'About', 'pbckcode'],
        ),
        'extraPlugins': ','.join(['lineheight'])
    }
}

给插件添加行距

Ckeditor的文件夹修改其配置文件config.js并保存为utf-8格式

/**
 * @license Copyright (c) 2003-2020, CKSource - Frederico Knabben. All rights reserved.
 * For licensing, see https://ckeditor.com/legal/ckeditor-oss-license
 */

CKEDITOR.editorConfig = function( config ) {
    // Define changes to default configuration here. For example:
    // config.language = 'fr';
    // config.uiColor = '#AADC6E';
    config.extraPlugins += (config.extraPlugins ? ',lineheight' : 'lineheight');    //添加行距插件
    config.font_names='宋体/宋体;仿宋/仿宋;微软雅黑/微软雅黑;楷体/楷体;黑体/黑体;'+ config.font_names;    //字体添加
};

plugins文件夹下解压放入行距插件文件夹