基于KCF和MobileNet V2以及KalmanFilter的摄像头监测系统

简介

这是一次作业。Tracking这一块落后Detection很多年了,一般认为Detection做好了,那么只要能够做的足够快,就能达到Tracking的效果了,实则不然,现在最快的我认为就是一些可以在手机等arm下使用的轻量神经网络了,但是其牺牲了准确性,依然达不到追踪的效果,因为你无法将多次识别的Object视为统一对象画出运动轨迹。Tracking与Detection的根本区别在于Tracking可以很快的识别,因为基本上只需要识别一次,然后调用跟踪算法对目标进行跟进就可以了,而跟踪算法只需要在目标所在位置附近进行搜‘索判断是否存在目标就可以了,不用像Detection那样整张图遍历来寻找目标。而本篇也是使用的是轻量神经网络MobileNet和MIL以及KCF追踪算法,两者结合使用,达到了一定的追踪效果。

由于比赛需要,我改了一改,做成了robomaster的追踪程序。。

[Tracking] KCF + KalmanFilter目标跟踪-LMLPHP

难点&解决

多个人的追踪

对于同时能够追踪多目标,我的想法是写一个Person类,实例化出不同的person对象,对个person对象只完成自己对象本身的追踪工作,不会干扰到其他对象的追踪。在对象走出视频流区域一段时间(可以作为参数设置)后将对象销毁。对于初始化问题,可以采用MobieNet的训练结果进行对象参数的初始化,当视频流中没有目标时只运行MobileNet的识别即可。识别必须每隔一段时间进行一次,将那些识别出的人与我们已经实例化出的人进行距离比较,相当于一个匹配,已有的保留,没有的再继续进行实例化,然后追踪。

人从各个方向运动的追踪

由于人可能从各个方向进入视频流,因此对分类器的要求会比较高,所以我们需要训练大量的实地场景的各种姿态进入视频流的图片,当然,由于宿舍空间有限,我没办法做出人从监视器各个角度各个方向进出的训练集。因此这个问题其实有待解决,但是我觉得可以通过丰富训练集来解决(废话)。

人遮挡状态下的追踪

对于遮挡状态下的追踪,我打算这么解决,遮挡首先分为短时间遮挡和长时间遮挡,对于短时间遮挡我们可以采用消失计时的方法,设置一个阈值,在消失阈值范围内输出原框,或者原来有速度进行一个预测,但是预测肯定会出问题,因为预测是按照前一帧的速度来预测的,因此预测的框会一直按照速度方向平移,所以速度应该在预测的时候逐渐减小,这样才能避免一直有速度的预测。还有就是可以通过卡尔曼滤波来做,这个我有做的打算,正在研究他的论文。这个问题属于Long-Term-Tracking问题,现有的方法有的是采用分块识别,就是分别识别人的某一部分,然后把识别到的结果合起来以及其他方法,具体还在看。

光照条件变化时候的追踪

对于光照变化的时候的追踪,我觉得这就完全可以交给我们的神经网络,神经网络提取的特征是可以保证在多尺度和各种光照条件下实现较高准确度的分类的,因此,在光照较暗和光照较强的条件下我们的神经网络都可以取得比较好的效果,因此是可以完成识别的任务的。

KCF & KalmanFilter

KCF

KCF算法是核相关滤波的简称,利用循环移位进行稠密采样,FFT快速变换进行分类器的训练,同时结合了多通道的HOG特征,大致的流程是,先利用循环矩阵不断对图像移位,得到多个样本,在第t帧中的当前位置附近利用这些样本训练一个分类器,这个分类器可以对框中是否有人做出一个概率响应,因此当我们来到下一帧的时候呢,先用循环矩阵对前一帧的区域进行循环移位得到若干样本,然后用前一帧训练的分类器分类得到输出响应,以响应最大的作为预测位置,然后再训练,再预测。这个算法的推导我会专门写一篇博客。

Kalman波波

状态方程:[Tracking] KCF + KalmanFilter目标跟踪-LMLPHP

测量方程:[Tracking] KCF + KalmanFilter目标跟踪-LMLPHP

xk是状态向量,zk是测量向量,Ak是状态转移矩阵,uk是控制向量,Bk是控制矩阵,wk是系统误差(噪声),Hk是测量矩阵,vk是测量误差(噪声)。wk和vk都是高斯噪声,即

[Tracking] KCF + KalmanFilter目标跟踪-LMLPHP

实际应用的推导过程如下:

[Tracking] KCF + KalmanFilter目标跟踪-LMLPHP
[Tracking] KCF + KalmanFilter目标跟踪-LMLPHP

使用

关于使用KCF,我是写了个类,这样可以做多目标的跟踪,不然就只能单目标啦。而且加入了kalmanfilter来预测并且修正观测值。

class Person:
    def __init__(self, bg, bbox,delta_time = 0.2 ,acc = 2):
        self._zs = 0
        self._bbox = bbox
        self._tracker = cv.TrackerKCF_create()
        self._center = (int(bbox[0]+bbox[2]/2), int(bbox[1]+bbox[3]/2))
        self._mask = np.zeros(bg.shape, dtype = np.uint8)
        self._shape = bg.shape
        self._no_time = 0
        self._tracker.init(bg,bbox)
        self._frame = bg
        self._predicted = None
        self.kalman = cv.KalmanFilter(4,2,0)# 状态空间4D 分别是x y vx vy,测量空间2D 分别是 x y
        self.kalman.transitionMatrix = np.array([[1,0,delta_time,0],[0,1,0,delta_time],[0,0,1,0],[0,0,0,1]],dtype = np.float32)
        self.kalman.measurementMatrix = np.array([[1,0,0,0],[0,1,0,0]],dtype = np.float32)
        self.kalman.statePre = np.array([[self._center[0]],[self._center[1]],[0],[0]],dtype = np.float32)
        self.kalman.statePost = np.array([[self._center[0]],[self._center[1]],[0],[0]],dtype = np.float32)
        self.kalman.processNoiseCov = acc * np.array([[0.25*delta_time**4,0,0.5*delta_time**3,0],[0,0.25*delta_time**4,0,0.5*delta_time**3],[0.5*delta_time**3,0,delta_time**2,0],[0,0.5*delta_time**3,0,delta_time**2]],dtype = np.float32)
    def update(self,new_bbox,center):
        self._bbox = new_bbox
        self._center = center
    def precess(self,src):
        self._zs = self._zs + 1
        h,w = self._shape[:2]
        frame = copy.copy(src)
        padding = 5 # padding
        ret, bbox = self._tracker.update(frame) # bbox: x y w h
        p1,p2 = (int(bbox[0]),int(bbox[1])),(int(bbox[0])+int(bbox[2]),int(bbox[1])+int(bbox[3]))
        center = (int((p1[0]+p2[0])/2),int((p1[1]+p2[1])/2))
        global person_count
        if self._no_time == 20:
            self._no_time = 0
            self._mask = np.zeros(self._shape,dtype=np.uint8)
            self._frame = src
            return (False,src)
        if ret and p1[0]>=padding and p1[1]<= (w-padding):#and int(bbox[0])>=padding and int(bbox[0] + bbox[2])<= (w-padding) #and int(bbox[1])>=padding and int(bbox[1] + bbox[3])<=(h-padding)
            self._no_time = 0
            s = np.array([[np.float32(center[0])],[np.float32(center[1])]])
            self.kalman.correct(s)
            center = self.kalman.predict().astype(np.int)
            #print(center[0],center[1])
            center = (center[0,0],center[1,0])
            cv.line(self._mask,self._center,center,(255,255,0),2)
            mmask = cv.cvtColor(self._mask.astype(np.uint8),cv.COLOR_BGR2GRAY)
            mmask = cv.bitwise_not(mmask)
            self._frame = cv.add(frame, self._mask, mask = mmask)
            self.update(bbox,center)
            #self._predicted = [self._bbox[i]+self._speed[i] if i<2 else self._bbox[i] for i in range(4)]
            #predict_1,predict_2 = (int(self._predicted[0]),int(self._predicted[1])),(int(self._predicted[0])+int(self._predicted[2]),int(self._predicted[1])+int(self._predicted[3]))
            #cv.rectangle(self._frame,predict_1,predict_2,(0,255,255),2,1) # 画预测框
            #cv.putText(self._frame,"predicted",predict_1,cv.FONT_HERSHEY_SIMPLEX,0.5,(0,255,255),2)
            cv.rectangle(self._frame, p1, p2, (255, 0, 0), 2, 1) # 画识别框
            cv.putText(self._frame,"recognized",p2,cv.FONT_HERSHEY_SIMPLEX,0.5,(255,0,0),2)
            #cv.waitKey(10)
            return (True,self._frame)
        else:
            ret,bbox = recg_car(frame)
            if ret:
                p1,p2 = (int(bbox[0]),int(bbox[1])),(int(bbox[0])+int(bbox[2]),int(bbox[1])+int(bbox[3]))
                center = (int((p1[0]+p2[0])/2),int((p1[1]+p2[1])/2))
                s = np.array([[np.float32(center[0])],[np.float32(center[1])]])
                self.kalman.correct(s)
                center = self.kalman.predict().astype(np.int)
                center = (center[0,0],center[1,0])
                cv.line(self._mask,self._center,center,(255,255,0),2)
                mmask = cv.cvtColor(self._mask.astype(np.uint8),cv.COLOR_BGR2GRAY)
                mmask = cv.bitwise_not(mmask)
                self._frame = cv.add(frame, self._mask, mask = mmask)
                self.update(bbox,center)
                cv.rectangle(self._frame, p1, p2, (255, 0, 0), 2, 1) # 画识别框
                cv.putText(self._frame,"recognized",p2,cv.FONT_HERSHEY_SIMPLEX,0.5,(255,0,0),2)
                return (True,self._frame)
            else:
                self._no_time = self._no_time + 1
                mmask = cv.cvtColor(self._mask.astype(np.uint8),cv.COLOR_BGR2GRAY)
                mmask = cv.bitwise_not(mmask)
                self._frame = cv.add(frame, self._mask, mask = mmask)
                return (True,self._frame)

使用MobileNet V2模型进行训练和预测

关于训练过程我就不一一介绍了,之前的博客也有写到怎么做,直接贴代码(完整的)。

import cv2 as cv
import sys
import numpy as np
import os
import copy
import tensorflow as tf
sys.path.append("..")
from utils import label_map_util
from utils import visualization_utils as vis_util
DEBUG = False # 表示不是调试模式
THRE_VAL = 0.4 # 这里设置的是置信度的阈值,如果大于这个阈值就在图像里面把他给框出来
# ['BOOSTING', 'MIL', 'KCF', 'TLD', 'MEDIANFLOW', 'GOTURN']
#tracker = cv.TrackerKCF_create()
#tracker = cv.TrackerMIL_create()
PATH_TO_CKPT ='/home/xueaoru/trace/car/frozen_inference_graph.pb' #网络结构配置文件
PATH_TO_LABELS = '/home/xueaoru/trace/car/label_map.pbtxt' # 标签映射关系配置文件
NUM_CLASSES = 2 # 分类数目
label_map = label_map_util.load_labelmap(PATH_TO_LABELS) # 调用函数加载labelmap,相当于把文本转换成了json文件
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES, use_display_name=True)
# 上面一句是把每个labelmap格式的数据转换为dict类型的数据,每隔id对应一个输出的name
category_index = label_map_util.create_category_index(categories) # 得到每个id,也就是key
detection_graph = tf.Graph() #加载默认图
with detection_graph.as_default():
    od_graph_def = tf.GraphDef()
    with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:# 加载网络模型
        serialized_graph = fid.read()
        od_graph_def.ParseFromString(serialized_graph)
        tf.import_graph_def(od_graph_def, name='')

    sess = tf.Session(graph=detection_graph)# 运行开启session
image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
detection_boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
detection_scores = detection_graph.get_tensor_by_name('detection_scores:0')
detection_classes = detection_graph.get_tensor_by_name('detection_classes:0')
num_detections = detection_graph.get_tensor_by_name('num_detections:0')
cap = cv.VideoCapture("/home/xueaoru/下载/red1.mp4")
#cap = cv.VideoCapture(1)
person_exist = False
class Person:
    def __init__(self, bg, bbox,delta_time = 0.2 ,acc = 2):
        self._zs = 0
        self._bbox = bbox
        self._tracker = cv.TrackerKCF_create()
        self._center = (int(bbox[0]+bbox[2]/2), int(bbox[1]+bbox[3]/2))
        self._mask = np.zeros(bg.shape, dtype = np.uint8)
        self._shape = bg.shape
        self._no_time = 0
        self._tracker.init(bg,bbox)
        self._frame = bg
        self._predicted = None
        self.kalman = cv.KalmanFilter(4,2,0)# 状态空间4D 分别是x y vx vy,测量空间2D 分别是 x y
        self.kalman.transitionMatrix = np.array([[1,0,delta_time,0],[0,1,0,delta_time],[0,0,1,0],[0,0,0,1]],dtype = np.float32)
        self.kalman.measurementMatrix = np.array([[1,0,0,0],[0,1,0,0]],dtype = np.float32)
        self.kalman.statePre = np.array([[self._center[0]],[self._center[1]],[0],[0]],dtype = np.float32)
        self.kalman.statePost = np.array([[self._center[0]],[self._center[1]],[0],[0]],dtype = np.float32)
        self.kalman.processNoiseCov = acc * np.array([[0.25*delta_time**4,0,0.5*delta_time**3,0],[0,0.25*delta_time**4,0,0.5*delta_time**3],[0.5*delta_time**3,0,delta_time**2,0],[0,0.5*delta_time**3,0,delta_time**2]],dtype = np.float32)
    def update(self,new_bbox,center):
        self._bbox = new_bbox
        self._center = center
    def precess(self,src):
        self._zs = self._zs + 1
        h,w = self._shape[:2]
        frame = copy.copy(src)
        padding = 5 # padding
        ret, bbox = self._tracker.update(frame) # bbox: x y w h
        p1,p2 = (int(bbox[0]),int(bbox[1])),(int(bbox[0])+int(bbox[2]),int(bbox[1])+int(bbox[3]))
        center = (int((p1[0]+p2[0])/2),int((p1[1]+p2[1])/2))
        global person_count
        if self._no_time == 20:
            self._no_time = 0
            self._mask = np.zeros(self._shape,dtype=np.uint8)
            self._frame = src
            return (False,src)
        if ret and p1[0]>=padding and p1[1]<= (w-padding):#and int(bbox[0])>=padding and int(bbox[0] + bbox[2])<= (w-padding) #and int(bbox[1])>=padding and int(bbox[1] + bbox[3])<=(h-padding)
            self._no_time = 0
            s = np.array([[np.float32(center[0])],[np.float32(center[1])]])
            self.kalman.correct(s)
            center = self.kalman.predict().astype(np.int)
            #print(center[0],center[1])
            center = (center[0,0],center[1,0])
            cv.line(self._mask,self._center,center,(255,255,0),2)
            mmask = cv.cvtColor(self._mask.astype(np.uint8),cv.COLOR_BGR2GRAY)
            mmask = cv.bitwise_not(mmask)
            self._frame = cv.add(frame, self._mask, mask = mmask)
            self.update(bbox,center)
            #self._predicted = [self._bbox[i]+self._speed[i] if i<2 else self._bbox[i] for i in range(4)]
            #predict_1,predict_2 = (int(self._predicted[0]),int(self._predicted[1])),(int(self._predicted[0])+int(self._predicted[2]),int(self._predicted[1])+int(self._predicted[3]))
            #cv.rectangle(self._frame,predict_1,predict_2,(0,255,255),2,1) # 画预测框
            #cv.putText(self._frame,"predicted",predict_1,cv.FONT_HERSHEY_SIMPLEX,0.5,(0,255,255),2)
            cv.rectangle(self._frame, p1, p2, (255, 0, 0), 2, 1) # 画识别框
            cv.putText(self._frame,"recognized",p2,cv.FONT_HERSHEY_SIMPLEX,0.5,(255,0,0),2)
            #cv.waitKey(10)
            return (True,self._frame)
        else:
            ret,bbox = recg_car(frame)
            if ret:
                p1,p2 = (int(bbox[0]),int(bbox[1])),(int(bbox[0])+int(bbox[2]),int(bbox[1])+int(bbox[3]))
                center = (int((p1[0]+p2[0])/2),int((p1[1]+p2[1])/2))
                s = np.array([[np.float32(center[0])],[np.float32(center[1])]])
                self.kalman.correct(s)
                center = self.kalman.predict().astype(np.int)
                center = (center[0,0],center[1,0])
                cv.line(self._mask,self._center,center,(255,255,0),2)
                mmask = cv.cvtColor(self._mask.astype(np.uint8),cv.COLOR_BGR2GRAY)
                mmask = cv.bitwise_not(mmask)
                self._frame = cv.add(frame, self._mask, mask = mmask)
                self.update(bbox,center)
                cv.rectangle(self._frame, p1, p2, (255, 0, 0), 2, 1) # 画识别框
                cv.putText(self._frame,"recognized",p2,cv.FONT_HERSHEY_SIMPLEX,0.5,(255,0,0),2)
                return (True,self._frame)
            else:
                self._no_time = self._no_time + 1
                mmask = cv.cvtColor(self._mask.astype(np.uint8),cv.COLOR_BGR2GRAY)
                mmask = cv.bitwise_not(mmask)
                self._frame = cv.add(frame, self._mask, mask = mmask)
                return (True,self._frame)

def recg_car(frame):
    image_expanded = np.expand_dims(frame, axis=0)
    (boxes, scores, classes, num) = sess.run(
    [detection_boxes, detection_scores, detection_classes, num_detections],
    feed_dict={image_tensor: image_expanded})

    score = np.squeeze(scores)
    max_index = np.argmax(score)
    score = score[max_index]
    # print(score)
    if score > THRE_VAL:
        box = np.squeeze(boxes)[max_index]#(ymin,xmin,ymax,xmax)
        h,w,_ = frame.shape
        min_point = (int(box[1]*w),int(box[0]*h))
        max_point = (int(box[3]*w),int(box[2]*h))
        bbox = (min_point[0], min_point[1], max_point[0]-min_point[0], max_point[1] - min_point[1])
        return True,bbox
    else:
        return False,None
ret, frame = cap.read()
if not ret:
    print("err")
    sys.exit()
ret,bbox = recg_car(frame)
person = Person(frame,bbox)
while True:
    ret,frame = cap.read()
    time = cv.getTickCount()
    if not ret:
        break

    person_exist,frame = person.precess(frame)
    cv.imshow("frame",frame)
    time = cv.getTickCount() - time
    print("处理时间:"+str(time*1000/cv.getTickFrequency())+"ms")
    key = cv.waitKey(1) & 0xff
    if key ==27:
        break
cap.release()
cv.destroyAllWindows()

05-19 03:57