说明
按现有的几个架构部件,构建数据流。
S = Redis Stream。这个可以作为缓冲队列和简单任务队列,速度非常快,至少是万条/秒的速度。
Q = RabbitMQ。这个作为任务队列,消息也主要是元数据。读速比较慢,但有一些特性,然后自带前端,作为任务队列比较合适。
M = Mongo。这个作为数据主库还是比较合适的。具有丰富的数据操作模式,同时性能也不错。
C = ClickHouse。这个特别适合作为任务数据库。因为列式存储的特性,其吞吐性能,简单统计功能甚至逼近了程序处理的速度。例如,存储10万条数据,大约也就3秒;统计900万数据某个字段的长度,时间也不到5秒。(过去在处理上,基本上按照100万条/秒来评估默认的程序处理能力)
本次目标是搭建一个worker,可以通过参数化方式,完成两个S间的流转。除了M和C之前一般不会直接流转,那么应该有 4*3 - 2 = 10 种组件间的流转。
内容
整体的实现逻辑顺序为:
- 1 使用QManager完成S2S的动作(函数)
- 2 将函数定义为celery task
- 3 将flask-celery发布为systemd服务
1 S2S 函数
首先是QManager, 这个是对RedisAgent进行封装和集成的对象,本质上是个二传手。
QManager 集成了:
- 1 判断队列是否可以写入
- 2 并行写入
- 3 fetch和range两种方式取数
- 4 删除消息
import requests as req
class QManager:
def __init__(self , batch_size = 1000,
redis_agent_host = 'http://172.17.0.1:24021/',
redis_connection_hash =None,
q_max_len = 100000):
self.batch_size = batch_size
self.redis_agent_host = redis_agent_host
self.redis_connection_hash = redis_connection_hash
self.q_max_len = q_max_len
def auto_connect(self, db_server_name):
print('这里应该根据某个参数值,自动切换为合适的连接')
def info(self):
return req.post(self.redis_agent_host + 'info/',json = {'connection_hash':self.redis_connection_hash}).json()
# redis没有提供命令来列出streams
# def qname_list(self, stream_name = '*'):
# return req.post(self.redis_agent_host + 'info_stream/',json = {'stream_name':stream_name}).json()
# 查看队列长度
def stream_len(self, stream_name):
cur_len_resp = req.post(self.redis_agent_host + 'len_of_queue/',
json ={'stream_name':stream_name,'connection_hash':self.redis_connection_hash}).json()
return cur_len_resp['data']
# 创建队列和分组
def ensure_group(self, stream_name, group_name ='group1', start_point='0'):
return req.post(self.redis_agent_host +'ensure_group/',json ={'stream_name':stream_name,
'group_name':group_name,
'start_point':start_point}).json()
# 判断队列是否可以插入
def _is_q_available(self,stream_name):
cur_len = self.stream_len(stream_name)
if cur_len + self.batch_size >=self.q_max_len:
return False
else:
return True
# 基于并发方法,向数据库存数【队列Write相关-写入消息】- 其实是使用pipeline - 最好单次一万左右
def parrallel_write_msg(self,stream_name, data_listofdict = None, time_out = 30,
is_return_msg_id_list=False):
resp_dict = req.post(self.redis_agent_host + 'batch_add_msg/',json ={'connection_hash':self.redis_connection_hash,
'stream_name':stream_name,
'msg_dict_list':data_listofdict,
'maxlen':self.q_max_len,
'is_return_msg_id_list':is_return_msg_id_list},
timeout=time_out).json()
return resp_dict
# 读取
# 批量获取数据 get
def xrange(self, stream_name, count = None):
cur_count = count or self.batch_size
recs_resp = req.post(self.redis_agent_host + 'xrange/',
json ={'connection_hash':self.redis_connection_hash,
'stream_name':stream_name,
'count':cur_count}).json()
return recs_resp
# 批量获取数据 fetch
def xfetch(self, stream_name, count = None,group_name = 'group1' , consumer_name = 'consumer1'):
cur_count = count or self.batch_size
return req.post(self.redis_agent_host + 'fetch_msg/',json = {'connection_hash':self.redis_connection_hash,
'stream_name':stream_name,
'group_name':group_name,
'consumer_name':consumer_name,
'count':cur_count}).json()
# 批量删除消息
def xdel(self,stream_name,mid_or_list =None):
if len(mid_or_list):
return req.post(self.redis_agent_host + 'del_msg/',
json ={'connection_hash':self.redis_connection_hash,
'stream_name':stream_name,
'mid_or_list':mid_or_list}).json()
@staticmethod
def extract_msg_id(some_msg_list):
return [x['_msg_id'] for x in some_msg_list]
基于此,稍微修改就可以完成S2S的任务
按照边的方式,给到left和right的参数信息。使用这些信息分别初始化left和right的QManager。最后按照配置里的约定,执行n次同步。每次执行时,都会看下目标队列是否已满,若已满则放弃写入,否则执行写入,然后删除消息。
# local
cfg = {'target_q_max_len': 10,'source_read_batch_num':1,'target_write_batch_num':1,
'source_redis_agent_host':'http://172.17.0.1:24021/','source_connection_hash':None,
'target_redis_agent_host':'http://172.17.0.1:24021/','target_connection_hash':None,
'source_stream':'.'.join(['STREAM','test','test', 'stream_in']),
'target_stream':'.'.join(['STREAM','test','test', 'stream_out'])
}
# read
source_qm = QManager(batch_size =cfg['source_read_batch_num'],
redis_agent_host = cfg['source_redis_agent_host'],
redis_connection_hash = cfg['source_connection_hash']
)
# write
target_qm = QManager(batch_size =cfg['target_write_batch_num'],
redis_agent_host = cfg['target_redis_agent_host'],
redis_connection_hash = cfg['target_connection_hash']
)
# 确保队列的存在
if True:
source_qm.ensure_group(cfg['source_stream'])
target_qm.ensure_group(cfg['target_stream'])
'''
主逻辑:
- 1 判断目标队列是否满,如果是,那么直接退出
- 2 从源队列取数(采用xrange方法),如果没有数据,直接退出【每对stream之间,只会有一个 sniffer 】
- 3 将源队列数据写入目标队列
- 4 从源队列中删除这些数据
'''
print('source q len ', source_qm.stream_len(cfg['source_stream']))
print('target q len ', target_qm.stream_len(cfg['target_stream']))
for _ in range(cfg['max_exec_cnt']):
if target_qm._is_q_available(cfg['target_stream']):
print('target q ok')
msg_num_limit = min(cfg['source_read_batch_num'],cfg['target_write_batch_num'])
msg_list = source_qm.xrange(cfg['source_stream'], count=msg_num_limit)['data']
if len(msg_list) == 0:
print('source q empty')
break
else:
# 写入目标队列
target_qm.parrallel_write_msg(cfg['target_stream'], data_listofdict= msg_list)
# 将写入的消息从源队列删除
to_del_msg_id_list = source_qm.extract_msg_id(msg_list)
source_qm.xdel(cfg['source_stream'], mid_or_list= to_del_msg_id_list)
else:
break
2 Celery Task
然后将上述功能函数写入Flask-Celery
第一部分是在 celery的修饰器下,将任务函数搬进去。然后在app下定义了任务的调用,主要是用到了delay方法,实现异步调用。
# =======================以下是正式的内容
@celery_.task
def s2s_handler(cfg_dict = None):
cfg = cfg_dict
# read
source_qm = QManager(batch_size =cfg['source_read_batch_num'],
redis_agent_host = cfg['source_redis_agent_host'],
redis_connection_hash = cfg['source_connection_hash']
)
# write
target_qm = QManager(batch_size =cfg['target_write_batch_num'],
redis_agent_host = cfg['target_redis_agent_host'],
redis_connection_hash = cfg['target_connection_hash']
)
print('source q len ', source_qm.stream_len(cfg['source_stream']))
print('target q len ', target_qm.stream_len(cfg['target_stream']))
for _ in range(cfg['max_exec_cnt']):
if target_qm._is_q_available(cfg['target_stream']):
print('target q ok')
msg_num_limit = min(cfg['source_read_batch_num'],cfg['target_write_batch_num'])
msg_list = source_qm.xrange(cfg['source_stream'], count=msg_num_limit)['data']
if len(msg_list) == 0:
print('source q empty')
break
else:
# 写入目标队列
target_qm.parrallel_write_msg(cfg['target_stream'], data_listofdict= msg_list)
# 将写入的消息从源队列删除
to_del_msg_id_list = source_qm.extract_msg_id(msg_list)
source_qm.xdel(cfg['source_stream'], mid_or_list= to_del_msg_id_list)
else:
break
# 执行任务的路由 POST
@app.route("/s2s/", methods=['GET','POST'] )
def s2s():
input_data = request.get_json()
# 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果
result = s2s_handler.delay(input_data)
return result.id
调用测试,存入一万条消息(之前还有70条残留),任务执行后,source_q中的数据将会逐渐流转到target_q
# debug - 样例数据写入源队列
data_listofdict = [{'msg_id': i, 'data':'test'} for i in range(10000)]
source_qm.parrallel_write_msg(cfg['source_stream'], data_listofdict= data_listofdict)
print('source q len ', source_qm.stream_len(cfg['source_stream']))
print('target q len ', target_qm.stream_len(cfg['target_stream']))
source q len 10070
target q len 230
import requests as req
# 假设是发往本机: 注意,地址是127.0.0.1
cfg1 = {'target_q_max_len': 100000,'source_read_batch_num':1,'target_write_batch_num':1,
'source_redis_agent_host':'http://127.0.0.1:24021/','source_connection_hash':None,
'target_redis_agent_host':'http://127.0.0.1:24021/','target_connection_hash':None,
'source_stream':'.'.join(['STREAM','test','test', 'stream_in']),
'target_stream':'.'.join(['STREAM','test','test', 'stream_out']),
'max_exec_cnt':10}
resp = req.post('http://127.0.0.1:24104/s2s/',json = cfg1 )
# 返回任务号
In [9]: resp.text
Out[9]: '177e57b7-09c5-43f0-ae1f-0cbe8e41dbf5'
# 流转了10条消息
In [10]: print('source q len ', source_qm.stream_len(cfg['source_stream']))
...: print('target q len ', target_qm.stream_len(cfg['target_stream']))
source q len 10060
target q len 240
3 Systemd Service
由于服务是在宿主机启动的,而且是基础服务,所以使用systemd配置自启动。启动命令有点小坑,可参考 一次搞定 Linux systemd 服务脚本
本次要点就在于要用forking启动【采用sh脚本启动其他进程时Type须为forking】,因为要启动flask和celery两个服务才行。
[Unit]
Description=test # 简单描述服务
After=network.target # 描述服务类别,表示本服务需要在network服务启动后在启动
Before=xxx.service # 表示需要在某些服务启动之前启动,After和Before字段只涉及启动顺序,不涉及依赖关系
[Service]
Type=forking # 设置服务的启动方式
User=USER # 设置服务运行的用户
Group=USER # 设置服务运行的用户组
WorkingDirectory=/PATH # 设置服务运行的路径(cwd)
KillMode=control-group # 定义systemd如何停止服务
Restart=no # 定义服务进程退出后,systemd的重启方式,默认是不重启
ExecStart=/start.sh # 服务启动命令,命令需要绝对路径(采用sh脚本启动其他进程时Type须为forking)
[Install]
WantedBy=multi-user.target # 多用户
然后就好了