程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Linux系統下海康工業相機MVS二次開發-Python

編輯:Python

文章目錄

  • Linux系統下海康工業相機MVS二次開發-Python

Linux系統下海康工業相機MVS二次開發-Python

環境:樹莓派 Ubuntu系統
編程環境:Python3.7 Node(忘了版本了,都可以,最好穩定版本)
需要安裝的模塊:Python端:cv2 websockets fastapi等;Node端:主要是ws(用來傳輸視頻流)
安裝可以看網上的文章,很多有寫,不過樹莓派這裡避坑:樹莓派進入是pi用戶,需要sudo su才能切換為root用戶,安裝Python模塊時如果在pi用戶下安裝,用root用戶運行是是沒有的,會報no module name... 錯誤。另外,安裝cv2模塊時似乎不能用pip安裝,需要用它自帶的apt-get安裝,不過不知道是不是只有我這裡才是這樣的。
以前上課做過樹莓派的實驗,很簡單,但是真正實踐時還是遇到了許多問題和困難,做了接近一周,因為樹莓派只有一個網口得和相機換來換去,記錄一下工程吧。
先整體介紹一下功能,主要是前端寫了一個頁面調用python後端相機傳送的視頻幀,並實現點擊按鈕拍照功能。
首先,需要在系統安裝MVS,這裡引用一下其他博主寫的文章,可參考:https://blog.csdn.net/cugyzy/article/details/120974745
以下是我寫的工程目錄:

寫代碼時參考了許多博主以及MVS官方安裝完成後在/opt/MVS/Samples/armhf/Python/GarbImage下的代碼以及官方的對於相機的調用方法文檔。
先將各個方法封裝在類HKCamera中,如下:

import sys
from ctypes import *
import os
import numpy as np
import time
import cv2
import random
import copy
sys.path.append("/opt/MVS/Samples/armhf/Python/MvImport") #打開MVS中的MvImport文件,對於不同系統打開的文件路徑跟隨實際文件路徑變化即可
from MvCameraControl_class import * #調用了MvCameraControl_class.py文件
class HKCamera():
def __init__(self, CameraIdx=0, log_path=None):
# enumerate all the camera devices
deviceList = self.enum_devices()
# generate a camera instance
self.camera = self.open_camera(deviceList, CameraIdx, log_path)
self.start_camera()
def __del__(self):
if self.camera is None:
return
# 停止取流
ret = self.camera.MV_CC_StopGrabbing()
if ret != 0:
raise Exception("stop grabbing fail! ret[0x%x]" % ret)
# 關閉設備
ret = self.camera.MV_CC_CloseDevice()
if ret != 0:
raise Exception("close deivce fail! ret[0x%x]" % ret)
# 銷毀句柄
ret = self.camera.MV_CC_DestroyHandle()
if ret != 0:
raise Exception("destroy handle fail! ret[0x%x]" % ret)
@staticmethod
def enum_devices(device=0, device_way=False):
""" device = 0 枚舉網口、USB口、未知設備、cameralink 設備 device = 1 枚舉GenTL設備 """
if device_way == False:
if device == 0:
cameraType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_UNKNOW_DEVICE | MV_1394_DEVICE | MV_CAMERALINK_DEVICE
deviceList = MV_CC_DEVICE_INFO_LIST()
# 枚舉設備
ret = MvCamera.MV_CC_EnumDevices(cameraType, deviceList)
if ret != 0:
raise Exception("enum devices fail! ret[0x%x]" % ret)
return deviceList
else:
pass
elif device_way == True:
pass
def open_camera(self, deviceList, CameraIdx, log_path):
# generate a camera instance
camera = MvCamera()
# 選擇設備並創建句柄
stDeviceList = cast(deviceList.pDeviceInfo[CameraIdx], POINTER(MV_CC_DEVICE_INFO)).contents
if log_path is not None:
ret = self.camera.MV_CC_SetSDKLogPath(log_path)
if ret != 0:
raise Exception("set Log path fail! ret[0x%x]" % ret)
# 創建句柄,生成日志
ret = camera.MV_CC_CreateHandle(stDeviceList)
if ret != 0:
raise Exception("create handle fail! ret[0x%x]" % ret)
else:
# 創建句柄,不生成日志
ret = camera.MV_CC_CreateHandleWithoutLog(stDeviceList)
if ret != 0:
raise Exception("create handle fail! ret[0x%x]" % ret)
# 打開相機
ret = camera.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
raise Exception("open device fail! ret[0x%x]" % ret)
return camera
def start_camera(self):
stParam = MVCC_INTVALUE()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
ret = self.camera.MV_CC_GetIntValue("PayloadSize", stParam)
if ret != 0:
raise Exception("get payload size fail! ret[0x%x]" % ret)
self.nDataSize = stParam.nCurValue
self.pData = (c_ubyte * self.nDataSize)()
self.stFrameInfo = MV_FRAME_OUT_INFO_EX()
memset(byref(self.stFrameInfo), 0, sizeof(self.stFrameInfo))
self.camera.MV_CC_StartGrabbing()
def get_Value(self, param_type, node_name):
""" :param cam: 相機實例 :param_type: 獲取節點值得類型 :param node_name: 節點名 可選 int 、float 、enum 、bool 、string 型節點 :return: 節點值 """
if param_type == "int_value":
stParam = MVCC_INTVALUE_EX()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE_EX))
ret = self.camera.MV_CC_GetIntValueEx(node_name, stParam)
if ret != 0:
raise Exception("獲取 int 型數據 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
return stParam.nCurValue
elif param_type == "float_value":
stFloatValue = MVCC_FLOATVALUE()
memset(byref(stFloatValue), 0, sizeof(MVCC_FLOATVALUE))
ret = self.camera.MV_CC_GetFloatValue(node_name, stFloatValue)
if ret != 0:
raise Exception("獲取 float 型數據 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
return stFloatValue.fCurValue
elif param_type == "enum_value":
stEnumValue = MVCC_ENUMVALUE()
memset(byref(stEnumValue), 0, sizeof(MVCC_ENUMVALUE))
ret = self.camera.MV_CC_GetEnumValue(node_name, stEnumValue)
if ret != 0:
raise Exception("獲取 enum 型數據 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
return stEnumValue.nCurValue
elif param_type == "bool_value":
stBool = c_bool(False)
ret = self.camera.MV_CC_GetBoolValue(node_name, stBool)
if ret != 0:
raise Exception("獲取 bool 型數據 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
return stBool.value
elif param_type == "string_value":
stStringValue = MVCC_STRINGVALUE()
memset(byref(stStringValue), 0, sizeof(MVCC_STRINGVALUE))
ret = self.camera.MV_CC_GetStringValue(node_name, stStringValue)
if ret != 0:
raise Exception("獲取 string 型數據 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
return stStringValue.chCurValue
else:
return None
def set_Value(self, param_type, node_name, node_value):
""" :param cam: 相機實例 :param param_type: 需要設置的節點值得類型 int: float: enum: 參考於客戶端中該選項的 Enum Entry Value 值即可 bool: 對應 0 為關,1 為開 string: 輸入值為數字或者英文字符,不能為漢字 :param node_name: 需要設置的節點名 :param node_value: 設置給節點的值 :return: """
if param_type == "int_value":
ret = self.camera.MV_CC_SetIntValueEx(node_name, int(node_value))
if ret != 0:
raise Exception("設置 int 型數據節點 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
elif param_type == "float_value":
ret = self.camera.MV_CC_SetFloatValue(node_name, float(node_value))
if ret != 0:
raise Exception("設置 float 型數據節點 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
elif param_type == "enum_value":
ret = self.camera.MV_CC_SetEnumValue(node_name, node_value)
if ret != 0:
raise Exception("設置 enum 型數據節點 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
elif param_type == "bool_value":
ret = self.camera.MV_CC_SetBoolValue(node_name, node_value)
if ret != 0:
raise Exception("設置 bool 型數據節點 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
elif param_type == "string_value":
ret = self.camera.MV_CC_SetStringValue(node_name, str(node_value))
if ret != 0:
raise Exception("設置 string 型數據節點 %s 失敗 ! 報錯碼 ret[0x%x]" % (node_name, ret))
def set_exposure_time(self, exp_time):
self.set_Value(param_type="float_value", node_name="ExposureTime", node_value=exp_time)
def get_exposure_time(self):
return self.get_Value(param_type="float_value", node_name="ExposureTime")
def get_image(self, width=None):
""" :param cam: 相機實例 :active_way:主動取流方式的不同方法 分別是(getImagebuffer)(getoneframetimeout) :return: """
ret = self.camera.MV_CC_GetOneFrameTimeout(self.pData, self.nDataSize, self.stFrameInfo, 1000)
if ret == 0:
#image = np.asarray(self.pData).reshape((self.stFrameInfo.nHeight, self.stFrameInfo.nWidth))
image = np.asarray(self.pData)
if width is not None:
image = cv2.resize(image, (width, int(self.stFrameInfo.nHeight * width / self.stFrameInfo.nWidth)))
num = random.randint(1,10)
cv2.putText(image, str(num), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)
pass
return image
else:
return None
#實時展示
def show_runtime_info(self, image):
# exp_time = self.get_exposure_time()
#cv2.putText(image, ("exposure time = %1.1fms" % (exp_time * 0.001)), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)
num = random.randint(1,10)
cv2.putText(image, str(num), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)
#拍照存儲照片
def take_picture(self,image):
#print("take picture!")
path = "/home/pi/Downloads"
if not os.path.exists(path):
os.mkdir(path)
random_str = ""
base_str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
length = len(base_str)-1
for i in range(8):
random_str += base_str[random.randint(0,length)]
file_path = path+"/"+random_str+".jpg"
cv2.imwrite(file_path,image)
return file_path
""" bmpsize = self.stFrameInfo.nWidth * self.stFrameInfo.nHeight * 3 + 54 bmp_buf = (c_ubyte * bmpsize)() path = "/home/pi/Downloads" if not os.path.exists(path): os.mkdir(path) random_str = "" base_str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" length = len(base_str)-1 for i in range(8): random_str += base_str[random.randint(0,length)] file_path = path+"/"+random_str+".bmp" print(file_path) file_open = open(file_path.encode('ascii'), 'wb+') try: img_buff = copy.deepcopy(bmp_buf) file_open.write(img_buff, ) res = True except: raise Exception("save file executed failed") finally: file_open.close() """

再寫一個Server做為後端服務,並實時傳送視頻幀:

# websocket
import sys
import cv2
import numpy as np
import asyncio
from threading import Thread
import websockets
import base64
sys.path.append("/home/pi/Desktop/HKtest/HKProject/modules") #打開MVS中的MvImport文件,對於不同系統打開的文件路徑跟隨實際文件路徑變化即可
from HKCamera import *
from fastapi import FastAPI
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
#設置允許訪問的域名
origins = ["*"] #也可以設置為"*",即為所有。
#設置跨域傳參
app.add_middleware(
CORSMiddleware,
allow_origins=origins, #設置允許的origins來源00
allow_credentials=True,
allow_methods=["*"], # 設置允許跨域的http方法,比如 get、post、put等。
allow_headers=["*"]) #允許跨域的headers,可以用來鑒別來源等作用。
#isTakePicture = False
#isDownTake = False
encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),95]
camera = HKCamera()
# 向服務器端實時發送視頻幀
async def send_msg(websocket):
#global isTakePicture,isDownTake
try:
while True:
image = camera.get_image(width=800)
if image is not None:
# camera.show_runtime_info(image)
# cv2.imshow("", image)
# 圖像編碼
result, imgencode = cv2.imencode('.jpg', image, encode_param)
data = np.array(imgencode)
img = data.tostring()
# base64編碼傳輸
img = base64.b64encode(img).decode()
await websocket.send("data:image/jpeg;base64," + img)
except Exception as e:
print(e)
# 客戶端主邏輯
async def main_logic():
async with websockets.connect('ws://127.0.0.1:3000') as websocket:
await send_msg(websocket)
#loop = asyncio.get_event_loop()
#result = loop.run_until_complete(main_logic())
def start_thread_loop(loop):
asyncio.set_event_loop(loop)
loop.run_until_complete(main_logic())
#=================next is web server========================
@app.get("/server")
def takePicture():
image = camera.get_image(width=800)
if image is not None:
file_path = camera.take_picture(image)
msg = ""
if file_path != None:
msg = "Picture has been saved!"
else:
msg = "saved fail!"
return {
"msg":msg,"file_path":file_path}
if __name__ == '__main__':
new_loop = asyncio.new_event_loop()
t = Thread(target=start_thread_loop,args=(new_loop,))
t.start()
uvicorn.run(app, host="127.0.0.1", port=8000)

寫這部分代碼時還是遇到了許多問題,因為要實時傳輸視頻幀圖片,所以又要同時監聽前端發送的請求是不能同時進行的,因此需要用到多線程的知識,將視頻幀圖片傳送作為子線程,後端服務作為主線程,因此可以同時進行且互不干擾。這裡學到了一個新東西–協程,Python有一個包是asyncio,裡面可以進行多協程的開發,這裡也是遇到問題後通過這個思想受到啟發後才解決了問題,後面可以再多研究研究。

前端部分:
首先是node服務,需要開啟websocket服務並在收到視頻幀後將內容發送到各個客戶端:

let WebSocketServer = require('ws').Server,
wss = new WebSocketServer({
 port: 3000 });
wss.on('connection', function (ws) {

console.log('客戶端已連接');
// console.log(ws);
// ws.on('message', function (message) {

// wss.clients.forEach(function each(client) {

// let data = [{msgtest:"sfsdfuisdj=====",message:message}];
// let data = [{msgtest:"sfsdfuisdj====="}];
// client.send(data);
// console.log(data);
// });
// console.log(message.length);
// console.log(message); 
// });
ws.onmessage = function(evt){

wss.clients.forEach(function each(client){

client.send(evt.data);
});
console.log(evt.data.length);
}
});

之前看了一些博客是用的上面注釋的寫法,但我在運行時卻顯示不出圖片,因為頁面接收到的是二進制文件,讀不出來,改為下面這種寫法就好了,直接傳送的是base64。

頁面代碼:

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div>
<img id="resImg" src="" />
</div>
<div id="test">
<button id="take">點擊拍照</button>
</div>
<script src="jquery.min.js"></script>
<script> let ws = new WebSocket("ws://127.0.0.1:3000/"); var image = document.getElementById("resImg"); //var divtest = document.getElementById("test"); ws.onopen = function (evt) {
 console.log("Connection open ..."); ws.send("Hello WebSockets!"); }; ws.onmessage = function (evt) {
 //console.log(evt) //console.log("==="+evt.data); //console.log("==="+JSON.stringify(evt.data)); //divtest.innerHTML=evt; image.setAttribute("src", evt.data); //$("#resImg").attr("src", evt.data); // console.log("Received Message: " + JSON.stringify(evt.data)); // ws.close(); }; ws.onclose = function (evt) {
 console.log("Connection closed."); }; let btn = document.getElementById('take'); btn.onclick = function () {
 //創建對象 const xhr = new XMLHttpRequest() //設置請求方法和url xhr.open('GET', 'http://localhost:8000/server') //發送 xhr.send() //事件綁定 處理服務端返回的結果 xhr.onreadystatechange = function () {
 if (xhr.readyState === 4) {
 //服務端返回了所有結果 if (xhr.status >= 200 && xhr.readyState <= 300) {
 //2開頭的都表示成功 //處理結果 console.log(xhr.status) //狀態碼 console.log(xhr.statusText) //狀態字符串 console.log(xhr.getAllResponseHeaders) //所有響應頭 console.log(xhr.response) //響應體 alert(xhr.response) } } } } </script>
</body>
</html>

啟動node服務命令:node websocket.js
運行python文件命令:python HKCameraServer.py

【參考】https://blog.csdn.net/weixin_42613125/article/details/121089120
【參考】https://blog.csdn.net/qq_39570716/article/details/114066097?spm=1001.2014.3001.5501
【參考】https://www.jianshu.com/p/e3933a56285f
【參考】https://blog.csdn.net/qq_23107577/article/details/113984935?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_baidulandingword~default-4-113984935-blog-123371414.pc_relevant_antiscanv3&spm=1001.2101.3001.4242.3&utm_relevant_index=7
【參考】https://blog.csdn.net/zhuzheqing/article/details/109819702
【參考】https://blog.csdn.net/qq_36917144/article/details/117292871?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-2-117292871-blog-109819702.pc_relevant_downloadblacklistv1&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-2-117292871-blog-109819702.pc_relevant_downloadblacklistv1&utm_relevant_index=4


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved