The label set of the VOC or COCO dataset covers most object detection needs, but a domain-specific application calls for a custom dataset, and then the model trained above can no longer be used directly.
In that case the model needs transfer learning, which is usually done in one of two ways: fine-tune all of the pretrained weights, or freeze every layer except the fully connected layer and retrain only that.
The two differ in whether the weights of the layers outside the fully connected layer are frozen.
The method used above corresponds to the first approach.
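As a rough sketch of the second approach in PaddlePaddle's dygraph API (build_model and the backbone/yolo_head attribute names are placeholders for illustration; actual PaddleDetection fine-tuning is normally driven by its config files):

import paddle

model = build_model()  # assumed helper that returns a paddle.nn.Layer

# Approach 2: freeze everything except the head
for param in model.backbone.parameters():
    param.stop_gradient = True  # frozen: no gradients, no updates

# Hand only the head's parameters to the optimizer
optimizer = paddle.optimizer.Adam(
    learning_rate=1e-4, parameters=model.yolo_head.parameters())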
The previous steps produced the object detection model in ONNX format. Next the model needs to be tested, deployed, and used for prediction to verify that it works.
First use ONNXRuntime to check that the model loads normally:
import os
import onnxruntime
def load_onnx(model_path):
    # Build an ONNXRuntime session and collect the model's input/output names
    session = onnxruntime.InferenceSession(model_path)
    input_names = [inp.name for inp in session.get_inputs()]
    output_names = [out.name for out in session.get_outputs()]
    return session, input_names, output_names
session, input_names, output_names = load_onnx('model.onnx')
print(input_names, output_names)
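Beyond loading, a quick smoke test can run the session on dummy inputs; this sketch assumes any dynamic input dimensions can be replaced with 1:

import numpy as np

dummy_feeds = {}
for inp in session.get_inputs():
    # Replace dynamic/None dimensions with 1 so a tensor can be allocated
    shape = [d if isinstance(d, int) else 1 for d in inp.shape]
    dtype = np.float32 if 'float' in inp.type else np.int32
    dummy_feeds[inp.name] = np.ones(shape, dtype=dtype)

outputs = session.run(output_names, dummy_feeds)
print([o.shape for o in outputs])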
Deploying the model means wrapping it in a Detector object, so that it is convenient to call whenever object detection is needed.
Before deploying, copy
PaddleDetection/output_inference/yolov3_mobilenet_v3_large_270e_voc/infer_cfg.yml into the same folder as the saved ONNX model,
and rename the ONNX model to inference.onnx.
Then build the ONNX detection package. Reconstructed from the files created below, the folder layout is:
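paddle2onnx/onnx-model/
├── inference.onnx
└── infer_cfg.yml
onnx_detection/
├── __init__.py
├── detection.py
└── preprocess.py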
Create the detector, onnx_detection/detection.py:
import os
import yaml
import numpy as np
import onnxruntime
from functools import reduce
from .preprocess import preprocess, Resize, Normalize, Permute, PadStride
# Global dictionary
SUPPORT_MODELS = {
'YOLO', 'SSD', 'RetinaNet', 'EfficientDet', 'RCNN', 'TTF', 'FCOS'
}
class Detector(object):
""" Args: config (object): config of model, defined by `Config(model_dir)` model_dir (str): root path of __model__, __params__ and infer_cfg.yml use_gpu (bool): whether use gpu run_mode (str): mode of running(fluid/trt_fp32/trt_fp16) threshold (float): threshold to reserve the result for output. """
def __init__(self, config, model_dir):
self.config = config
self.session, self.input_names, self.output_names = load_onnx(
model_dir)
def preprocess(self, im):
preprocess_ops = []
for op_info in self.config.preprocess_infos:
new_op_info = op_info.copy()
op_type = new_op_info.pop('type')
if op_type == 'Resize':
new_op_info['arch'] = self.config.arch
preprocess_ops.append(eval(op_type)(**new_op_info))
im, im_info = preprocess(im, preprocess_ops)
inputs = create_inputs(im, im_info, self.config.arch)
return inputs, im_info
def postprocess(self, np_boxes, im_info, threshold=0.5):
if self.config.arch in ['SSD']:
w, h = im_info['origin_shape']
np_boxes[:, 2] *= h
np_boxes[:, 3] *= w
np_boxes[:, 4] *= h
np_boxes[:, 5] *= w
expect_boxes = (np_boxes[:, 1] > threshold) & (np_boxes[:, 0] > -1)
np_boxes = np_boxes[expect_boxes, :]
return np_boxes
def predict(self, image, threshold=0.5):
        '''
        Args:
            image (str/np.ndarray): path of image / np.ndarray read by cv2
            threshold (float): score threshold for predicted boxes
        Returns:
            results (list): each element is [class_id, score, x_min, y_min, x_max, y_max]
        '''
inputs, im_info = self.preprocess(image)
np_boxes = self.session.run(self.output_names, inputs)[0]
results = []
        # Only postprocess when the output actually holds at least one 6-element box
        if reduce(lambda x, y: x * y, np_boxes.shape) >= 6:
for result in self.postprocess(np_boxes,
im_info,
threshold=threshold):
results.append([int(result[0]), result[1]] +
[int(_) for _ in result[2:]])
return results
def create_inputs(im, im_info, model_arch='YOLO'):
"""generate input for different model type Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image model_arch (str): model type Returns: inputs (dict): input of model """
    inputs = {}
inputs['image'] = im
origin_shape = list(im_info['origin_shape'])
pad_shape = list(
im_info['pad_shape']) if im_info['pad_shape'] is not None else list(
im_info['resize_shape'])
scale_x, scale_y = im_info['scale']
if 'YOLO' in model_arch:
im_size = np.array([origin_shape]).astype('int32')
inputs['im_size'] = im_size
elif 'RetinaNet' in model_arch or 'EfficientDet' in model_arch:
scale = scale_x
im_info = np.array([pad_shape + [scale]]).astype('float32')
inputs['im_info'] = im_info
elif ('RCNN' in model_arch) or ('FCOS' in model_arch):
scale = scale_x
im_info = np.array([pad_shape + [scale]]).astype('float32')
im_shape = np.array([origin_shape + [1.]]).astype('float32')
inputs['im_info'] = im_info
inputs['im_shape'] = im_shape
elif 'TTF' in model_arch:
scale_factor = np.array([scale_x, scale_y] * 2).astype('float32')
inputs['scale_factor'] = scale_factor
return inputs
class Det_Config():
"""set config of preprocess, postprocess and visualize Args: model_dir (str): root path of model.yml """
def __init__(self, model_dir):
# parsing Yaml config for Preprocess
deploy_file = os.path.join(model_dir, 'infer_cfg.yml')
with open(deploy_file) as f:
yml_conf = yaml.safe_load(f)
self.check_model(yml_conf)
self.arch = yml_conf['arch']
self.preprocess_infos = yml_conf['Preprocess']
self.min_subgraph_size = yml_conf['min_subgraph_size']
self.labels = yml_conf['label_list']
self.print_config()
def check_model(self, yml_conf):
""" Raises: ValueError: loaded model not in supported model type """
for support_model in SUPPORT_MODELS:
if support_model in yml_conf['arch']:
return True
raise ValueError("Unsupported arch: {}, expect {}".format(
yml_conf['arch'], SUPPORT_MODELS))
def print_config(self):
print('----------- Model Configuration -----------')
print('%s: %s' % ('Model Arch', self.arch))
print('%s: ' % ('Transform Order'))
for op_info in self.preprocess_infos:
print('--%s: %s' % ('transform op', op_info['type']))
print('--------------------------------------------')
def load_onnx(model_dir):
model_path = os.path.join(model_dir, 'inference.onnx')
session = onnxruntime.InferenceSession(model_path)
    input_names = [inp.name for inp in session.get_inputs()]
    output_names = [out.name for out in session.get_outputs()]
return session, input_names, output_names
Create the preprocessing module, onnx_detection/preprocess.py:
import cv2
import numpy as np
from PIL import Image
# Global dictionary
RESIZE_SCALE_SET = {
'RCNN',
'RetinaNet',
'FCOS',
'SOLOv2',
}
def decode_image(im_file, im_info):
"""read rgb image Args: im_file (str/np.ndarray): path of image/ np.ndarray read by cv2 im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """
if isinstance(im_file, str):
with open(im_file, 'rb') as f:
im_read = f.read()
data = np.frombuffer(im_read, dtype='uint8')
im = cv2.imdecode(data, 1) # BGR mode, but need RGB mode
im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
im_info['origin_shape'] = im.shape[:2]
im_info['resize_shape'] = im.shape[:2]
else:
# im = cv2.cvtColor(im_file, cv2.COLOR_BGR2RGB)
im = im_file
im_info['origin_shape'] = im.shape[:2]
im_info['resize_shape'] = im.shape[:2]
return im, im_info
class Resize(object):
"""resize image by target_size and max_size Args: arch (str): model type target_size (int): the target size of image max_size (int): the max size of image use_cv2 (bool): whether us cv2 image_shape (list): input shape of model interp (int): method of resize """
def __init__(self,
arch,
target_size,
max_size,
use_cv2=True,
image_shape=None,
interp=cv2.INTER_LINEAR):
self.target_size = target_size
self.max_size = max_size
self.image_shape = image_shape
self.arch = arch
self.use_cv2 = use_cv2
self.interp = interp
def __call__(self, im, im_info):
""" Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """
im_channel = im.shape[2]
im_scale_x, im_scale_y = self.generate_scale(im)
        im_info['resize_shape'] = [
            im_scale_y * float(im.shape[0]), im_scale_x * float(im.shape[1])
        ]
if self.use_cv2:
im = cv2.resize(im,
None,
None,
fx=im_scale_x,
fy=im_scale_y,
interpolation=self.interp)
else:
resize_w = int(im_scale_x * float(im.shape[1]))
resize_h = int(im_scale_y * float(im.shape[0]))
if self.max_size != 0:
raise TypeError(
'If you set max_size to cap the maximum size of image,'
'please set use_cv2 to True to resize the image.')
im = im.astype('uint8')
im = Image.fromarray(im)
im = im.resize((int(resize_w), int(resize_h)), self.interp)
im = np.array(im)
# padding im when image_shape fixed by infer_cfg.yml
if self.max_size != 0 and self.image_shape is not None:
padding_im = np.zeros((self.max_size, self.max_size, im_channel),
dtype=np.float32)
im_h, im_w = im.shape[:2]
padding_im[:im_h, :im_w, :] = im
im = padding_im
im_info['scale'] = [im_scale_x, im_scale_y]
return im, im_info
def generate_scale(self, im):
""" Args: im (np.ndarray): image (np.ndarray) Returns: im_scale_x: the resize ratio of X im_scale_y: the resize ratio of Y """
origin_shape = im.shape[:2]
im_c = im.shape[2]
if self.max_size != 0 and self.arch in RESIZE_SCALE_SET:
im_size_min = np.min(origin_shape[0:2])
im_size_max = np.max(origin_shape[0:2])
im_scale = float(self.target_size) / float(im_size_min)
if np.round(im_scale * im_size_max) > self.max_size:
im_scale = float(self.max_size) / float(im_size_max)
im_scale_x = im_scale
im_scale_y = im_scale
else:
im_scale_x = float(self.target_size) / float(origin_shape[1])
im_scale_y = float(self.target_size) / float(origin_shape[0])
return im_scale_x, im_scale_y
class Normalize(object):
"""normalize image Args: mean (list): im - mean std (list): im / std is_scale (bool): whether need im / 255 is_channel_first (bool): if True: image shape is CHW, else: HWC """
def __init__(self, mean, std, is_scale=True, is_channel_first=False):
self.mean = mean
self.std = std
self.is_scale = is_scale
self.is_channel_first = is_channel_first
def __call__(self, im, im_info):
""" Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """
im = im.astype(np.float32, copy=False)
if self.is_channel_first:
mean = np.array(self.mean)[:, np.newaxis, np.newaxis]
std = np.array(self.std)[:, np.newaxis, np.newaxis]
else:
mean = np.array(self.mean)[np.newaxis, np.newaxis, :]
std = np.array(self.std)[np.newaxis, np.newaxis, :]
if self.is_scale:
im = im / 255.0
im -= mean
im /= std
return im, im_info
class Permute(object):
"""permute image Args: to_bgr (bool): whether convert RGB to BGR channel_first (bool): whether convert HWC to CHW """
def __init__(self, to_bgr=False, channel_first=True):
self.to_bgr = to_bgr
self.channel_first = channel_first
def __call__(self, im, im_info):
""" Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """
if self.channel_first:
im = im.transpose((2, 0, 1)).copy()
if self.to_bgr:
im = im[[2, 1, 0], :, :]
return im, im_info
class PadStride(object):
""" padding image for model with FPN Args: stride (bool): model with FPN need image shape % stride == 0 """
def __init__(self, stride=0):
self.coarsest_stride = stride
def __call__(self, im, im_info):
""" Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """
coarsest_stride = self.coarsest_stride
if coarsest_stride == 0:
return im
im_c, im_h, im_w = im.shape
pad_h = int(np.ceil(float(im_h) / coarsest_stride) * coarsest_stride)
pad_w = int(np.ceil(float(im_w) / coarsest_stride) * coarsest_stride)
padding_im = np.zeros((im_c, pad_h, pad_w), dtype=np.float32)
padding_im[:, :im_h, :im_w] = im
im_info['pad_shape'] = padding_im.shape[1:]
return padding_im, im_info
def preprocess(im, preprocess_ops):
# process image by preprocess_ops
im_info = {
'scale': [1., 1.],
'origin_shape': None,
'resize_shape': None,
'pad_shape': None,
}
im, im_info = decode_image(im, im_info)
for operator in preprocess_ops:
im, im_info = operator(im, im_info)
im = np.array((im, )).astype('float32')
return im, im_info
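For a YOLO-arch model like the one exported above, preprocessing ultimately yields an inputs dict along these lines (the 608x608 shape is illustrative; the actual size comes from infer_cfg.yml):

# inputs['image']   -> float32 array of shape (1, 3, 608, 608), the batched CHW image
# inputs['im_size'] -> int32 array of shape (1, 2), the original (h, w)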
onnx_detection/__init__.py:
from .detection import Det_Config
from .detection import Detector
Import the config and detector classes to build the ONNX detection object:
from onnx_detection import Det_Config, Detector
model_dir = 'paddle2onnx/onnx-model'
det_config = Det_Config(model_dir)
detector = Detector(det_config, model_dir)
import cv2
img_path = 'PaddleDetection/dataset/voc/JPEGImages/001.jpg'
img = cv2.imread(img_path)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
results = detector.predict(img, threshold=0.5)
def draw_results(results, img):
    # Each result is [class_id, score, x_min, y_min, x_max, y_max]
    for result in results:
        class_id, score, x_min, y_min, x_max, y_max = result
        cv2.rectangle(img, (x_min, y_min), (x_max, y_max), (255, 255, 255))
draw_results(results, img)
cv2.imwrite('save.jpg', img)
from IPython.display import display, Image
display(Image('save.jpg', format='jpg'))
At this point we have a working object detector and know the format of each detection result:
[class_id, score, x_min, y_min, x_max, y_max]
Create a detector wrapper class, object_detection.py:
import cv2
import numpy as np

class ObjectDetection:
    def __init__(self, detector, threshold=0.5):
        self.detector = detector
        self.threshold = threshold
        self.classes = []

    def load_class_names(self, class_path='label.txt'):
        # One class name per line
        with open(class_path, 'r') as f:
            for line in f:
                self.classes.append(line.strip())
        self.colors = np.random.uniform(0, 255, size=(len(self.classes), 3))
        return self.classes

    def detect(self, img):
        # The deployed detector returns rows of
        # [class_id, score, x_min, y_min, x_max, y_max]; convert them into the
        # (class_ids, scores, boxes) tuple expected by the tracking script,
        # with boxes in (x, y, w, h) form
        results = self.detector.predict(img, threshold=self.threshold)
        class_ids = [r[0] for r in results]
        scores = [r[1] for r in results]
        boxes = [(r[2], r[3], r[4] - r[2], r[5] - r[3]) for r in results]
        return class_ids, scores, boxes
Here detector is the Detector object obtained from the deployment step above.
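A quick usage sketch, assuming label.txt holds one class name per line (matching the model's label_list):

od = ObjectDetection(detector, threshold=0.5)
classes = od.load_class_names('label.txt')
class_ids, scores, boxes = od.detect(img)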
The core purpose of object tracking is to decide, frame by frame, whether an object detected in the current frame is the same object that was detected in the previous frame.
The criterion used here is the distance between the boxes detected in two consecutive frames.
A dictionary {} stores each object's id together with its center point.
Take vehicle detection as an example: when a vehicle enters the detection area, it is assigned a unique object id according to its order of arrival, along with its current center point, as follows:
tracking_objects[object_id] = pt
As each frame updates, we obtain a new center position pt2 and compare it with the previous one. If, say, the two centers are less than 20 pixels apart, the object detected in the previous frame and the one detected in the current frame are considered the same, and the stored center is updated:
tracking_objects[object_id] = pt2
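A minimal sketch of that update rule for a single object (coordinates are hypothetical):

import math

tracking_objects = {0: (100, 200)}  # object 0 was at (100, 200) in the last frame
pt2 = (105, 198)                    # a detection center in the current frame

pt = tracking_objects[0]
if math.hypot(pt2[0] - pt[0], pt2[1] - pt[1]) < 20:
    tracking_objects[0] = pt2       # same object: carry its id forward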
Create the tracker, object_tracking.py:
import cv2
import numpy as np
from object_detection import ObjectDetection
import math
# Add a line across the frame; a box crossing this line counts the vehicle
line = [(0, 800), (1920, 800)]
# Total number of vehicles
counter = 0
# Vehicles in the forward lane
counter_up = 0
# Vehicles in the opposite lane
counter_down = 0
# Segment-segment collision detection: use cross products to decide whether two segments intersect
# Sign of the cross product (True if A, B, C are in counterclockwise order)
def ccw(A, B, C):
return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])
# Check whether segments AB and CD intersect
def intersect(A, B, C, D):
return ccw(A, C, D) != ccw(B, C, D) and ccw(A, B, C) != ccw(A, B, D)
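# AB and CD intersect exactly when C and D lie on opposite sides of AB
# and A and B lie on opposite sides of CD; ccw provides those side tests
# (collinear overlaps are not detected, which is fine for counting crossings).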
# Initialize object detection, reusing the ONNX Detector deployed above
from onnx_detection import Det_Config, Detector
model_dir = 'paddle2onnx/onnx-model'
od = ObjectDetection(Detector(Det_Config(model_dir), model_dir), threshold=0.5)
cap = cv2.VideoCapture("los_angeles.mp4")
# Initialize count
count = 0
center_points_prev_frame = []
tracking_objects = {}
track_id = 0
while True:
ret, frame = cap.read()
count += 1
if not ret:
break
# Point current frame
center_points_cur_frame = []
# Detect objects on frame
    (class_ids, scores, boxes) = od.detect(frame)
for box in boxes:
(x, y, w, h) = box
cx = int((x + x + w) / 2)
cy = int((y + y + h) / 2)
center_points_cur_frame.append((cx, cy))
#print("FRAME N°", count, " ", x, y, w, h)
# cv2.circle(frame, (cx, cy), 5, (0, 0, 255), -1)
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
# Only at the beginning we compare previous and current frame
if count <= 2:
for pt in center_points_cur_frame:
for pt2 in center_points_prev_frame:
distance = math.hypot(pt2[0] - pt[0], pt2[1] - pt[1])
if distance < 20:
tracking_objects[track_id] = pt
track_id += 1
else:
tracking_objects_copy = tracking_objects.copy()
center_points_cur_frame_copy = center_points_cur_frame.copy()
for object_id, pt2 in tracking_objects_copy.items():
object_exists = False
for pt in center_points_cur_frame_copy:
distance = math.hypot(pt2[0] - pt[0], pt2[1] - pt[1])
# Update IDs position
if distance < 20:
tracking_objects[object_id] = pt
object_exists = True
if pt in center_points_cur_frame:
center_points_cur_frame.remove(pt)
continue
# Remove IDs lost
if not object_exists:
tracking_objects.pop(object_id)
# Add new IDs found
for pt in center_points_cur_frame:
tracking_objects[track_id] = pt
track_id += 1
for object_id, pt in tracking_objects.items():
cv2.circle(frame, pt, 5, (0, 0, 255), -1)
cv2.putText(frame, str(object_id), (pt[0], pt[1] - 7), 0, 1, (0, 0, 255), 2)
for pt in center_points_cur_frame:
for pt2 in center_points_prev_frame:
if intersect(pt, pt2, line[0], line[1]):
counter += 1
                # Determine the direction of travel from the change in y
                if pt2[1] > pt[1]:
counter_down += 1
else:
counter_up += 1
print("Tracking objects")
print(tracking_objects)
print("CUR FRAME LEFT PTS")
print(center_points_cur_frame)
    text_counter_up = 'vehicles leaving the area: %s' % counter_down
    text_counter_down = 'vehicles entering the area: %s' % counter_up
cv2.line(frame, line[0], line[1], (0, 255, 0), 3)
cv2.putText(frame, str(counter), (30, 80), cv2.FONT_HERSHEY_DUPLEX, 3.0, (255, 0, 0), 3)
cv2.putText(frame, text_counter_up, (130, 80), cv2.FONT_HERSHEY_DUPLEX, 1.5, (0, 255, 0), 3)
cv2.putText(frame, text_counter_down, (130, 180), cv2.FONT_HERSHEY_DUPLEX, 1.5, (0, 0, 255), 3)
cv2.imshow("Frame", frame)
# Make a copy of the points
center_points_prev_frame = center_points_cur_frame.copy()
key = cv2.waitKey(1)
if key == 27:
break
cap.release()
cv2.destroyAllWindows()