题目如下:
spam系统,假设我们可以获得线上的实时请求(按时间顺序)
每个请求包含如下信息:
时间(unix时间戳)
用户名
动作(提问、回答、评论)
内容
依次考虑如何解决以下问题:
1.当发现动作频率较高的用户时,输出报警(1分钟内超过10次评论、回答、提问)
2.当发现一个用户连续发重复的内容时,输出报警(连续发了10个相同的评论、回答、提问)
3.使用你觉得最优美的方法将上面的策略与程序分离开。使得上面的策略可以随时更新而不用修改程序。
要求:
服务监听一个端口,使用测试程序模拟用户行为不断向服务发送请求,
请求格式为json,如:
{"time":1323333"user":"xxx","action":"answer","content":"abcde"}
服务输出报警格式如下
xxx,"频繁提问"
一下是我的代码:
client.py
import socket
import json
import time
import gevent
import random
def generateContents():
"""
随机生成内容
"""
chars = [chr(c) for c in range(ord('a'), ord('z')+1)]
result = [''.join(random.sample(chars, 5)) for i in range(3)]
return result
class Worker():
def __init__(self, name, socket):
self.name = name
self.socket = socket
self.actions = ['question', 'answer', 'comment']
self.contents = generateContents()
def __call__(self):
while 1:
data = self._generateData()
data = json.dumps(data)
self._request(data)
def _request(self, data):
s = self.socket.socket()
socket = self.socket.gethostname()
port = 1234
s.connect((socket, port))
s.recv(1024)
s.send(data)
s.close()
gevent.sleep(random.randint(1, 3) * 0.5)
def _generateData(self):
data = {}
data['time'] = time.time()
data['user'] = self.name
data['action'] = random.choice(self.actions)
data['content'] = random.choice(self.contents)
return data
if __name__ == '__main__':
threads = [gevent.spawn(Worker(chr(i), socket))
for i in range(ord('a'), ord('f'))]
gevent.joinall(threads)
server.py
import socket
import json
from functools import wraps
from MyExamine import MyExamine
def run():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = ''
port = 1234
s.bind((host, port))
s.listen(10)
while 1:
c, addr = s.accept()
c.send('This is a simple server'.encode('utf-8'))
rec = c.recv(1024)
do_something(rec)
c.close()
def format2Json(func):
def f(*args):
data = json.loads(args[0])
func(data)
return f
@format2Json
@MyExamine
def do_something(data):
"""
模拟后台处理用户发来的请求
"""
print data
if __name__ == '__main__':
run()
MyExamine.py
from baseExamine import BaseExamine
from collections import deque
class MyExamine(BaseExamine):
def __init__(self, func):
self.rateCache = {}
self.timesCache = {}
self.func = func
def examineRate(self, data):
userQueue = self.rateCache.setdefault(data['user'], deque())
while userQueue:
item = userQueue.pop()
if data['time'] - item['time'] < 60:
userQueue.append(item)
break
userQueue.appendleft(data)
if len(userQueue) > 10:
self.showWarning('User: %s 频繁操作'.decode('utf8') % data['user'])
def examineContentTimes(self, data):
userActions = self.timesCache.setdefault(data['user'], {})
contentTimes = self.timesCache.setdefault(data['action'], {})
key = data['content']
contentTimes[key] = contentTimes.get(key, 0) + 1
if contentTimes[key] > 10:
self.showWarning('User: %s %s %s 超过10次'.decode('utf8') %
(data['user'], data['action'], key))
def showWarning(self, msg):
print '\033[;31m' + msg + '\033[0m'
def __call__(self, *args):
data = args[0]
self.examineRate(data)
self.examineContentTimes(data)
self.func(*args)
baseExamine.py
class BaseExamine():
"""
模板方法的基类
"""
def __init__(self, func):
pass
def __call__(self, *args):
pass
求大神指点,谢谢!