
Hikvision Industrial Camera MVS Secondary Development on Linux (Python)


Environment: Raspberry Pi running Ubuntu
Programming environment: Python 3.7; Node.js (I forgot the exact version; any recent stable release should be fine)
Modules to install: on the Python side, cv2, websockets, fastapi, etc.; on the Node side, mainly ws (used to transmit the video stream).
Installation guides are easy to find online, but one Raspberry Pi pitfall is worth noting: you log in as the pi user by default and need sudo su to switch to root. If the Python modules are installed under the pi user and the program is then run as root, you will get a "no module named ..." error. Also, installing the cv2 module with pip did not seem to work for me; I had to install it with apt-get instead, though that may just be my setup.
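As a quick way to catch that pitfall, a small sanity-check script (a hypothetical helper, not part of the project) can be run with the same user and interpreter that will later launch the server, for example sudo python3 check_env.py:

# check_env.py -- hypothetical helper: confirm the required modules are visible
# to the interpreter/user that will actually run the project
import sys

print("interpreter:", sys.executable)
for mod in ("cv2", "websockets", "fastapi", "uvicorn"):
    try:
        __import__(mod)
        print(mod, "OK")
    except ImportError as err:
        print(mod, "MISSING:", err)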
I had done Raspberry Pi experiments in class before and they seemed simple, but in practice there were still plenty of problems and difficulties; this took almost a week. Since the Raspberry Pi has only one network port, it had to be swapped between the network and the camera, so I am documenting the project here.
Let me first describe the functionality as a whole: the front end is a page that displays the video frames delivered from the camera by the Python back end, with a button that takes a snapshot when clicked.
To start, MVS needs to be installed on the system. Several other bloggers have written about this; see for example: https://blog.csdn.net/cugyzy/article/details/120974745
Below is the project directory I set up:

When writing the code I also referred to many bloggers, to the sample code under /opt/MVS/Samples/armhf/Python/GrabImage after the official MVS installation is complete, and to the official documentation on how to call the camera.
First, each operation is encapsulated in a class called HKCamera, as follows:

import sys
from ctypes import *
import os
import numpy as np
import time
import cv2
import random
import copy

sys.path.append("/opt/MVS/Samples/armhf/Python/MvImport")  # MvImport directory installed with MVS; change the path to match your system
from MvCameraControl_class import *  # provided by MvCameraControl_class.py


class HKCamera():
    def __init__(self, CameraIdx=0, log_path=None):
        # enumerate all the camera devices
        deviceList = self.enum_devices()
        # generate a camera instance
        self.camera = self.open_camera(deviceList, CameraIdx, log_path)
        self.start_camera()

    def __del__(self):
        if self.camera is None:
            return
        # stop grabbing frames
        ret = self.camera.MV_CC_StopGrabbing()
        if ret != 0:
            raise Exception("stop grabbing fail! ret[0x%x]" % ret)
        # close the device
        ret = self.camera.MV_CC_CloseDevice()
        if ret != 0:
            raise Exception("close device fail! ret[0x%x]" % ret)
        # destroy the handle
        ret = self.camera.MV_CC_DestroyHandle()
        if ret != 0:
            raise Exception("destroy handle fail! ret[0x%x]" % ret)

    @staticmethod
    def enum_devices(device=0, device_way=False):
        """
        device = 0: enumerate GigE, USB, unknown, 1394 and CameraLink devices
        device = 1: enumerate GenTL devices
        """
        if device_way == False:
            if device == 0:
                cameraType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_UNKNOW_DEVICE | MV_1394_DEVICE | MV_CAMERALINK_DEVICE
                deviceList = MV_CC_DEVICE_INFO_LIST()
                # enumerate devices
                ret = MvCamera.MV_CC_EnumDevices(cameraType, deviceList)
                if ret != 0:
                    raise Exception("enum devices fail! ret[0x%x]" % ret)
                return deviceList
            else:
                pass
        elif device_way == True:
            pass

    def open_camera(self, deviceList, CameraIdx, log_path):
        # generate a camera instance
        camera = MvCamera()
        # select the device and create a handle
        stDeviceList = cast(deviceList.pDeviceInfo[CameraIdx], POINTER(MV_CC_DEVICE_INFO)).contents
        if log_path is not None:
            ret = camera.MV_CC_SetSDKLogPath(log_path)
            if ret != 0:
                raise Exception("set Log path fail! ret[0x%x]" % ret)
            # create the handle, with logging
            ret = camera.MV_CC_CreateHandle(stDeviceList)
            if ret != 0:
                raise Exception("create handle fail! ret[0x%x]" % ret)
        else:
            # create the handle, without logging
            ret = camera.MV_CC_CreateHandleWithoutLog(stDeviceList)
            if ret != 0:
                raise Exception("create handle fail! ret[0x%x]" % ret)
        # open the camera
        ret = camera.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
        if ret != 0:
            raise Exception("open device fail! ret[0x%x]" % ret)
        return camera

    def start_camera(self):
        stParam = MVCC_INTVALUE()
        memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
        ret = self.camera.MV_CC_GetIntValue("PayloadSize", stParam)
        if ret != 0:
            raise Exception("get payload size fail! ret[0x%x]" % ret)
        self.nDataSize = stParam.nCurValue
        self.pData = (c_ubyte * self.nDataSize)()
        self.stFrameInfo = MV_FRAME_OUT_INFO_EX()
        memset(byref(self.stFrameInfo), 0, sizeof(self.stFrameInfo))
        self.camera.MV_CC_StartGrabbing()

    def get_Value(self, param_type, node_name):
        """
        :param param_type: type of the node to read: "int_value", "float_value",
                           "enum_value", "bool_value" or "string_value"
        :param node_name:  name of the node
        :return: the node value, or None for an unknown type
        """
        if param_type == "int_value":
            stParam = MVCC_INTVALUE_EX()
            memset(byref(stParam), 0, sizeof(MVCC_INTVALUE_EX))
            ret = self.camera.MV_CC_GetIntValueEx(node_name, stParam)
            if ret != 0:
                raise Exception("get int value %s fail! ret[0x%x]" % (node_name, ret))
            return stParam.nCurValue
        elif param_type == "float_value":
            stFloatValue = MVCC_FLOATVALUE()
            memset(byref(stFloatValue), 0, sizeof(MVCC_FLOATVALUE))
            ret = self.camera.MV_CC_GetFloatValue(node_name, stFloatValue)
            if ret != 0:
                raise Exception("get float value %s fail! ret[0x%x]" % (node_name, ret))
            return stFloatValue.fCurValue
        elif param_type == "enum_value":
            stEnumValue = MVCC_ENUMVALUE()
            memset(byref(stEnumValue), 0, sizeof(MVCC_ENUMVALUE))
            ret = self.camera.MV_CC_GetEnumValue(node_name, stEnumValue)
            if ret != 0:
                raise Exception("get enum value %s fail! ret[0x%x]" % (node_name, ret))
            return stEnumValue.nCurValue
        elif param_type == "bool_value":
            stBool = c_bool(False)
            ret = self.camera.MV_CC_GetBoolValue(node_name, stBool)
            if ret != 0:
                raise Exception("get bool value %s fail! ret[0x%x]" % (node_name, ret))
            return stBool.value
        elif param_type == "string_value":
            stStringValue = MVCC_STRINGVALUE()
            memset(byref(stStringValue), 0, sizeof(MVCC_STRINGVALUE))
            ret = self.camera.MV_CC_GetStringValue(node_name, stStringValue)
            if ret != 0:
                raise Exception("get string value %s fail! ret[0x%x]" % (node_name, ret))
            return stStringValue.chCurValue
        else:
            return None

    def set_Value(self, param_type, node_name, node_value):
        """
        :param param_type: type of the node to set:
                           int / float / enum (use the "Enum Entry Value" shown in the client) /
                           bool (0 = off, 1 = on) / string (digits or ASCII letters, no Chinese characters)
        :param node_name:  name of the node to set
        :param node_value: value to write to the node
        """
        if param_type == "int_value":
            ret = self.camera.MV_CC_SetIntValueEx(node_name, int(node_value))
            if ret != 0:
                raise Exception("set int value %s fail! ret[0x%x]" % (node_name, ret))
        elif param_type == "float_value":
            ret = self.camera.MV_CC_SetFloatValue(node_name, float(node_value))
            if ret != 0:
                raise Exception("set float value %s fail! ret[0x%x]" % (node_name, ret))
        elif param_type == "enum_value":
            ret = self.camera.MV_CC_SetEnumValue(node_name, node_value)
            if ret != 0:
                raise Exception("set enum value %s fail! ret[0x%x]" % (node_name, ret))
        elif param_type == "bool_value":
            ret = self.camera.MV_CC_SetBoolValue(node_name, node_value)
            if ret != 0:
                raise Exception("set bool value %s fail! ret[0x%x]" % (node_name, ret))
        elif param_type == "string_value":
            ret = self.camera.MV_CC_SetStringValue(node_name, str(node_value))
            if ret != 0:
                raise Exception("set string value %s fail! ret[0x%x]" % (node_name, ret))

    def set_exposure_time(self, exp_time):
        self.set_Value(param_type="float_value", node_name="ExposureTime", node_value=exp_time)

    def get_exposure_time(self):
        return self.get_Value(param_type="float_value", node_name="ExposureTime")

    def get_image(self, width=None):
        """
        Fetch one frame with MV_CC_GetOneFrameTimeout and return it as a numpy array,
        optionally resized to the given width; returns None if no frame was available.
        """
        ret = self.camera.MV_CC_GetOneFrameTimeout(self.pData, self.nDataSize, self.stFrameInfo, 1000)
        if ret == 0:
            # image = np.asarray(self.pData).reshape((self.stFrameInfo.nHeight, self.stFrameInfo.nWidth))
            image = np.asarray(self.pData)
            if width is not None:
                image = cv2.resize(image, (width, int(self.stFrameInfo.nHeight * width / self.stFrameInfo.nWidth)))
            # overlay a random digit so frame changes stay visible while testing
            num = random.randint(1, 10)
            cv2.putText(image, str(num), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)
            return image
        else:
            return None

    # overlay runtime info on the live frame
    def show_runtime_info(self, image):
        # exp_time = self.get_exposure_time()
        # cv2.putText(image, ("exposure time = %1.1fms" % (exp_time * 0.001)), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)
        num = random.randint(1, 10)
        cv2.putText(image, str(num), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)

    # take a snapshot and save it to disk
    def take_picture(self, image):
        # print("take picture!")
        path = "/home/pi/Downloads"
        if not os.path.exists(path):
            os.mkdir(path)
        # build a random 8-character file name
        random_str = ""
        base_str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
        length = len(base_str) - 1
        for i in range(8):
            random_str += base_str[random.randint(0, length)]
        file_path = path + "/" + random_str + ".jpg"
        cv2.imwrite(file_path, image)
        return file_path

        """
        # Alternative: write the raw frame out as a BMP file
        bmpsize = self.stFrameInfo.nWidth * self.stFrameInfo.nHeight * 3 + 54
        bmp_buf = (c_ubyte * bmpsize)()
        path = "/home/pi/Downloads"
        if not os.path.exists(path):
            os.mkdir(path)
        random_str = ""
        base_str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
        length = len(base_str) - 1
        for i in range(8):
            random_str += base_str[random.randint(0, length)]
        file_path = path + "/" + random_str + ".bmp"
        print(file_path)
        file_open = open(file_path.encode('ascii'), 'wb+')
        try:
            img_buff = copy.deepcopy(bmp_buf)
            file_open.write(img_buff)
            res = True
        except:
            raise Exception("save file executed failed")
        finally:
            file_open.close()
        """

Next, write a server as the back-end service, which transmits the video frames in real time:

# websocket
import sys
import cv2
import numpy as np
import asyncio
from threading import Thread
import websockets
import base64

sys.path.append("/home/pi/Desktop/HKtest/HKProject/modules")  # directory containing HKCamera.py; change the path to match your system
from HKCamera import *

from fastapi import FastAPI
import uvicorn
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# domains allowed to access the service ("*" allows all origins)
origins = ["*"]

# enable cross-origin requests
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,    # allowed origins
    allow_credentials=True,
    allow_methods=["*"],      # HTTP methods allowed across origins, e.g. GET, POST, PUT
    allow_headers=["*"])      # headers allowed across origins, e.g. for identifying the source

# isTakePicture = False
# isDownTake = False
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 95]
camera = HKCamera()

# push video frames to the websocket server in real time
async def send_msg(websocket):
    # global isTakePicture, isDownTake
    try:
        while True:
            image = camera.get_image(width=800)
            if image is not None:
                # camera.show_runtime_info(image)
                # cv2.imshow("", image)
                # JPEG-encode the frame
                result, imgencode = cv2.imencode('.jpg', image, encode_param)
                data = np.array(imgencode)
                img = data.tostring()  # tobytes() on newer numpy versions
                # transmit as base64 inside a data URI
                img = base64.b64encode(img).decode()
                await websocket.send("data:image/jpeg;base64," + img)
    except Exception as e:
        print(e)

# client main logic: connect to the Node websocket server and stream frames
async def main_logic():
    async with websockets.connect('ws://127.0.0.1:3000') as websocket:
        await send_msg(websocket)

# loop = asyncio.get_event_loop()
# result = loop.run_until_complete(main_logic())

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main_logic())

# ================= next is the web server ========================
@app.get("/server")
def takePicture():
    image = camera.get_image(width=800)
    file_path = None  # stays None if no frame could be grabbed
    if image is not None:
        file_path = camera.take_picture(image)
    msg = ""
    if file_path is not None:
        msg = "Picture has been saved!"
    else:
        msg = "saved fail!"
    return {"msg": msg, "file_path": file_path}

if __name__ == '__main__':
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_thread_loop, args=(new_loop,))
    t.start()
    uvicorn.run(app, host="127.0.0.1", port=8000)
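Once the server is running, the snapshot endpoint can be exercised directly, for example with the requests library (assuming it is installed; the URL matches the uvicorn host/port configured above):

# Quick test of the /server endpoint (assumes the `requests` package is installed)
import requests

resp = requests.get("http://127.0.0.1:8000/server", timeout=5)
print(resp.json())  # e.g. {"msg": "Picture has been saved!", "file_path": "/home/pi/Downloads/XXXXXXXX.jpg"}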

There were still many problems when writing this part of the code. Because the video frames are transmitted continuously, the process cannot at the same time listen for requests from the front end, so some knowledge of multithreading is needed: the frame-sending loop runs in a child thread while the back-end web service runs in the main thread, so the two can run simultaneously without interfering with each other. I also learned something new here: coroutines. Python's asyncio package supports multi-coroutine development, and this idea is what eventually solved the problem I ran into; it is worth studying further.
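The pattern boils down to the following sketch (names here are illustrative, not part of the project): the coroutine runs on its own event loop inside a background thread, leaving the main thread free for the web server.

# Standalone sketch of the thread-plus-event-loop pattern used above
import asyncio
from threading import Thread

async def background_work():
    while True:
        # e.g. grab a frame and push it over the websocket
        await asyncio.sleep(1)

def run_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_until_complete(background_work())

loop = asyncio.new_event_loop()
Thread(target=run_loop, args=(loop,), daemon=True).start()
# ... the main thread continues, e.g. uvicorn.run(app)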

Front-end part:
First, the Node service: it needs to start a websocket service and, after receiving each video frame, forward it to every connected client:

let WebSocketServer = require('ws').Server,
    wss = new WebSocketServer({ port: 3000 });

wss.on('connection', function (ws) {
    console.log('client connected');
    // console.log(ws);
    // ws.on('message', function (message) {
    //     wss.clients.forEach(function each(client) {
    //         let data = [{msgtest:"sfsdfuisdj=====",message:message}];
    //         let data = [{msgtest:"sfsdfuisdj====="}];
    //         client.send(data);
    //         console.log(data);
    //     });
    //     console.log(message.length);
    //     console.log(message);
    // });
    ws.onmessage = function (evt) {
        // broadcast the received frame to every connected client
        wss.clients.forEach(function each(client) {
            client.send(evt.data);
        });
        console.log(evt.data.length);
    };
});

I had read some blogs that used the commented-out approach above, but when I ran it the picture would not display, because what the page received was binary data it could not read. Changing it to the uncommented handler above, with the Python side transmitting base64 directly, fixed it.
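To convince yourself the payload is valid, the data-URI string can also be decoded back into an image on the Python side (a small verification sketch, assuming the same encoding used in send_msg above):

# Verification sketch: decode a "data:image/jpeg;base64,..." string back into an image
import base64
import cv2
import numpy as np

def decode_data_uri(data_uri):
    header, b64 = data_uri.split(",", 1)  # strip the "data:image/jpeg;base64," prefix
    raw = base64.b64decode(b64)
    return cv2.imdecode(np.frombuffer(raw, dtype=np.uint8), cv2.IMREAD_UNCHANGED)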

Page code:

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div>
<img id="resImg" src="" />
</div>
<div id="test">
<button id="take">點擊拍照</button>
</div>
<script src="jquery.min.js"></script>
<script>
    let ws = new WebSocket("ws://127.0.0.1:3000/");
    var image = document.getElementById("resImg");
    //var divtest = document.getElementById("test");

    ws.onopen = function (evt) {
        console.log("Connection open ...");
        ws.send("Hello WebSockets!");
    };
    ws.onmessage = function (evt) {
        //console.log(evt);
        //console.log("===" + evt.data);
        //console.log("===" + JSON.stringify(evt.data));
        //divtest.innerHTML = evt;
        image.setAttribute("src", evt.data);
        //$("#resImg").attr("src", evt.data);
        //console.log("Received Message: " + JSON.stringify(evt.data));
        //ws.close();
    };
    ws.onclose = function (evt) {
        console.log("Connection closed.");
    };

    let btn = document.getElementById('take');
    btn.onclick = function () {
        // create the request object
        const xhr = new XMLHttpRequest();
        // set the request method and url
        xhr.open('GET', 'http://localhost:8000/server');
        // send the request
        xhr.send();
        // handle the result returned by the server
        xhr.onreadystatechange = function () {
            if (xhr.readyState === 4) {
                // the server has returned the complete response
                if (xhr.status >= 200 && xhr.status < 300) {
                    // any 2xx status means success
                    console.log(xhr.status);                   // status code
                    console.log(xhr.statusText);               // status text
                    console.log(xhr.getAllResponseHeaders());  // all response headers
                    console.log(xhr.response);                 // response body
                    alert(xhr.response);
                }
            }
        };
    };
</script>
</body>
</html>

Command to start the Node service: node websocket.js
Command to run the Python file: python HKCameraServer.py

[Reference] https://blog.csdn.net/weixin_42613125/article/details/121089120
[Reference] https://blog.csdn.net/qq_39570716/article/details/114066097?spm=1001.2014.3001.5501
[Reference] https://www.jianshu.com/p/e3933a56285f
[Reference] https://blog.csdn.net/qq_23107577/article/details/113984935?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_baidulandingword~default-4-113984935-blog-123371414.pc_relevant_antiscanv3&spm=1001.2101.3001.4242.3&utm_relevant_index=7
[Reference] https://blog.csdn.net/zhuzheqing/article/details/109819702
[Reference] https://blog.csdn.net/qq_36917144/article/details/117292871?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-2-117292871-blog-109819702.pc_relevant_downloadblacklistv1&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-2-117292871-blog-109819702.pc_relevant_downloadblacklistv1&utm_relevant_index=4

