6-10 4 views
最初平台上要记录用户操作记录的地方也不是很多,我就放到MySQL中了,随着平台的功能越来越多,发现写关系型数据库太累了,也不便于扩展。查询效率也不高,就想到了ES
这里针对ES做了一次封装,为了更好的去使用他
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Eric Winn # @Email : eng.eric.winn@gmail.com # @Time : 19-5-22 下午8:40 # @Version : 1.0 # @File : elasticsearch # @Software : PyCharm import json from elasticsearch import Elasticsearch from common.utils import get_logger, get_signer from django.conf import settings signer = get_signer() logger = get_logger() class ESAPI(object): ''' ES类封装 ''' def __init__(self): ''' 从settings表中读取ES认证信息 ''' self.hosts = settings.ELASTICSEARCH_HOST.split(',') self.port = settings.ELASTICSEARCH_PORT self.username = settings.ELASTICSEARCH_USERNAME # 密码是加密的,使用时需要解密 self.password = signer.unsign((settings.ELASTICSEARCH_PASSWORD).encode()) self.client = None def connect(self): ''' 连接ES ''' self.client = Elasticsearch( self.hosts, http_auth=(self.username, self.password), # scheme="https", port=self.port) def search(self, index=None, doc_type='tweet', body=None, **params): ''' 查询方法,如梦查询异常,根据ES的renturn格式,返回空数据 :param index: :param doc_type: 默认tweet,在7版本中已经不需要了 :param body: :param params: :return: ''' try: ret = self.client.search(index, json.dumps(body), **params) except Exception as e: logger.error(e) return {'hits': {'hits': [], 'total': 0}} return ret def create_index(self, index): ret = self.client.indices.create(index=index, ignore=400) return ret def index(self, index, doc_type='tweet', id=None, body={}): ''' 增、改方法 :param index: :param doc_type: :param id: :param body: :return: ''' class_type = body.get('classname').split(' ')[0] # classname是用来存放自定义的index data = self.search(index='classname', body={ "query": { "bool": { "should": [ {"match": {"index": index}}, {"match": {"type": class_type}} ], "minimum_should_match": 2 } } }) if data['hits']['total'] is 0: self.client.index(index='classname', doc_type=doc_type, body={'index': index, 'classname': body.get('classname'), "type": class_type}) ret = self.client.index(index=index, doc_type=doc_type, id=id, body=json.dumps(body)) return ret def get(self, index, id=0): ret = self.client.get(index=index, id=id) return ret |
classname存在的是模块,表示这条记录是属于哪个模块,或哪一类,如下图:
日志的入口,会通过调用公网的接口查询IP来源地,也会通过META信息获取使用者的UA和操作系统,然后会异步调用write_action_utils_async方法向ES中写入日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Eric Winn # @Email : eng.eric.winn@gmail.com # @Time : 19-5-23 上午6:46 # @Version : 1.0 # @File : logs # @Software : PyCharm import re import os import json import time import datetime from celery import shared_task from common.utils import get_logger from common.elasticsearch import ESAPI from users.utils import validate_ip, get_ip_city, get_login_ip FORKS = 60 TIMEOUT = 180 logger = get_logger(__file__) CACHE_MAX_TIME = 60 * 60 * 60 PERIOD_TASK = os.environ.get("PERIOD_TASK", "on") def write_action_log(*args, **kwargs): request = kwargs.get('request') kwargs['username'] = request.user.username ip = get_login_ip(request) if not (ip and validate_ip(ip)): kwargs['ip'] = ip[:15] kwargs['city'] = "Unknown" else: kwargs['ip'] = ip kwargs['city'] = get_ip_city(ip) kwargs['user_agent'] = request.META.get('HTTP_USER_AGENT', '') kwargs['os'] = request.META.get('XDG_CURRENT_DESKTOP', '') kwargs.pop('request') kwargs['detail'] = kwargs['detail'] return write_action_utils_async.delay(*args, **kwargs) @shared_task def write_action_utils_async(*args, **kwargs): es = ESAPI() es.connect() index = kwargs.get('classname').replace(' ', '-').lower() now = datetime.datetime.now() id = int(time.mktime(now.timetuple())) kwargs['timestamp'] = int(time.mktime(now.timetuple())) kwargs['datetime'] = now.strftime("%Y-%m-%d %H:%M:%S") es.index(index=index, id=id, body=kwargs) |
在需要记录日志的地方引入就可以了,如下所示
1 2 3 4 |
data = {'api': self.request.META.get('PATH_INFO'), 'params': data, 'method': self.request.method} write_action_log(classname='OS User', type='A', request=self.request, action='{} User'.format(self.request.method.capitalize()), detail=data) |
如果想赏钱,可以用微信扫描下面的二维码,一来能刺激我写博客的欲望,二来好维护云主机的费用; 另外再次标注博客原地址 itnotebooks.com 感谢!
