An AI Smart Box Running YOLOv5 on the Domestic RK3588 NPU

2026/5/2 17:01:41  Source: https://blog.csdn.net/lixiang987654321/article/details/146127357

  • Background
  • Detection results
  • Differences
  • Python implementation of object detection
  • CMake (C/C++) implementation

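Background

I previously wrote an article about an AI smart box built on YOLOv5. It focused on how to run object detection on NVIDIA chips (it may since have been blocked by the platform). Because of the US-China chip restrictions, many readers contacted me to ask whether the YOLOv5 framework could be migrated to a domestic (Chinese-made) chip, and whether a domestic NPU would detect better and cost less than a GPU.
So I found time to buy boxes built on several of the most mainstream domestic chips and migrated the YOLOv5 model to a domestic NPU model. Along the way I found that domestic chips have the following advantages:
(1) Domestic chips are cheaper. The NVIDIA board I bought cost over 5,000 yuan (about 34 ms per image), while the RK3588 NPU board cost under 1,000 yuan (a little over 900) and performed comparably or better (about 30 ms per image).
(2) Domestic chips have no purchasing restrictions and can be used in any domestic project. This matters most for government projects, where AI is widely used (after talking with several contacts I found that almost all of their projects were government-related). Government projects explicitly require domestic chips; anything else cannot be used.
(3) Domestic chip devices were more stable in my experience. I bought three NVIDIA boxes early on, and after running continuously for more than half a year one chip had failed. The domestic chips have survived everything I put them through.
(4) Domestic chips offer both a Python interface and a C interface. For anyone with strict confidentiality requirements, developing against the C interface and shipping a compiled executable protects the source far better than Python does: a binary is much harder to reverse-engineer, which helps keep your work from being stolen.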
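
To put the figures in point (1) side by side, here is a small sketch that turns the quoted per-image latencies into throughput and price per frame-per-second. The prices are rounded assumptions based on the approximate figures above (5,000 yuan and 950 yuan), so treat the result as a rough comparison only.

```python
def frames_per_second(latency_ms: float) -> float:
    """Single-stream throughput implied by a per-image latency."""
    return 1000.0 / latency_ms


def yuan_per_fps(price_yuan: float, latency_ms: float) -> float:
    """Hardware price divided by throughput (lower is better)."""
    return price_yuan * latency_ms / 1000.0


# Approximate figures from the text; the exact prices are assumptions.
boards = {
    "nvidia": {"price_yuan": 5000, "latency_ms": 34},
    "rk3588": {"price_yuan": 950, "latency_ms": 30},
}

for name, b in boards.items():
    fps = frames_per_second(b["latency_ms"])
    cost = yuan_per_fps(b["price_yuan"], b["latency_ms"])
    print(f"{name}: {fps:.1f} fps, {cost:.1f} yuan per fps")
```

With these inputs the NVIDIA board comes out at about 29.4 fps and 170 yuan per fps, against about 33.3 fps and 28.5 yuan per fps for the RK3588.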

Detection results

First, here is what detection on the RK3588 chip looks like:

Detection result on the domestic RK3588 NPU

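Differences

How the domestic-chip and NVIDIA implementations differ:

  • The GPU-based YOLOv5 version can run on the CPU (test projects) or on the GPU (production projects)
  • The domestic NPU version can run on the CPU or on the NPU, but not on a GPU
  • GPU hardware is mostly controlled by foreign vendors; the NPU is a domestic design optimized specifically for neural-network workloads and runs them more efficiently
  • Domestic chips cost less and are more efficient
  • Each domestic NPU vendor uses its own model format and exposes its own interface, whereas GPUs mostly share the unified CUDA interface library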

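Because every domestic NPU vendor implements a different interface, I picked the most mainstream and widely used chip: the RK3588 from Rockchip. It uses rknn models, while the GPU version uses PyTorch pt models, so the YOLOv5 pt model has to be converted to an rknn model before it can be used.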
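
A minimal sketch of that conversion step, assuming the common two-stage route: first export the pt weights to ONNX (for example with YOLOv5's own export script), then convert the ONNX file with Rockchip's rknn-toolkit2 on a PC. The file names, the normalization values and the calibration list `dataset.txt` are assumptions for illustration; the original post does not show its conversion script.

```python
from pathlib import Path


def rknn_output_path(onnx_path: str) -> str:
    """Derive the .rknn file name next to the given .onnx file."""
    return str(Path(onnx_path).with_suffix(".rknn"))


def convert_onnx_to_rknn(onnx_path: str, dataset_txt: str, quantize: bool = True) -> str:
    """Convert an ONNX model to an RK3588 .rknn model (runs on the PC, not the board)."""
    # Imported lazily so this module loads even without rknn-toolkit2 installed.
    from rknn.api import RKNN

    out_path = rknn_output_path(onnx_path)
    rknn = RKNN(verbose=False)
    try:
        # Assumed preprocessing: scale 0..255 pixels to 0..1 (mean 0, std 255).
        rknn.config(mean_values=[[0, 0, 0]], std_values=[[255, 255, 255]],
                    target_platform="rk3588")
        if rknn.load_onnx(model=onnx_path) != 0:
            raise RuntimeError("load_onnx failed")
        # dataset_txt lists calibration images, one path per line, for quantization.
        if rknn.build(do_quantization=quantize, dataset=dataset_txt) != 0:
            raise RuntimeError("build failed")
        if rknn.export_rknn(out_path) != 0:
            raise RuntimeError("export_rknn failed")
    finally:
        rknn.release()
    return out_path


if __name__ == "__main__":
    # Hypothetical file names; replace with your own exported model.
    print(convert_onnx_to_rknn("yolov5s.onnx", "dataset.txt"))
```

The resulting `.rknn` file is what the box loads at runtime; the preprocessing values passed to `config` must match how the model was trained.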

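Python implementation of object detection

Rockchip provides a low-level Python interface for running rknn models, so the Python implementation comes down to building the application framework around the algorithm in Python. The main difficulties are:

  • Getting familiar with Rockchip's detailed Python API documentation and applying it in a real project
  • Business-layer wrapping: device management, stream pulling, real-time analysis, per-scene algorithm overlays, alarm comparison and storage, recording control, real-time reporting, local caching and cleanup, and so on
  • A web interface for users to manage models, devices, scenes, alarms and system configuration
  • Object tracking on top of object detection (needed in many scenarios, especially summary analysis and head counting, where the same person must not be counted twice)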
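
To illustrate the last point: once a tracker assigns a stable ID to each person, "do not count the same person twice" reduces to counting distinct track IDs. The sketch below assumes some tracker already supplies integer IDs per frame; the class name and the sample data are hypothetical and not taken from the author's code.

```python
from typing import Iterable, Set


class UniqueCounter:
    """Counts each track ID once, no matter how many frames it appears in."""

    def __init__(self) -> None:
        self._seen: Set[int] = set()

    def update(self, track_ids: Iterable[int]) -> int:
        """Feed the track IDs of one frame and return the running total."""
        self._seen.update(track_ids)
        return len(self._seen)


counter = UniqueCounter()
frames = [[1, 2], [1, 2, 3], [2, 3]]  # track IDs per frame (made-up data)
totals = [counter.update(ids) for ids in frames]
print(totals)  # [2, 3, 3]
```

Plain per-frame detection would have reported seven people over these three frames; with track IDs the count stays at three.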

Source code of the Python version (see top right)

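I won't go through the Rockchip Python documentation and API usage in detail here. Instead, here are a few key points of the implementation:
(1) Wrapping the box with the Facade pattern
The main function does not need to be complicated: it simply starts and stops a single box object which, as in the Facade pattern, fronts the whole system.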

from util.log import LogRecord
from util.display import Display

LogRecord.init()
from service.box import Box

if __name__ == '__main__':
    # create Ai box
    box = Box()
    # init Ai box
    box.init()
    # start Ai box
    box.start()
    # show image
    Display.show()
    # wait box exit
    box.join()
    # release Ai box
    box.release()

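(2) Box initialization
Before the box starts analyzing, it has to initialize all of its internal configuration: create the database connection pool, set up the message buffer, start the API service, prepare data cleanup, and so on.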

    # init ai box
    def init(self):
        # init database
        result = DbPool.create(Global.dbIp, Global.dbPort, Global.dbUser, Global.dbPwd, Global.dbName)
        if not result:
            logger.error(f'start box failed [create database failed].')
            return
        DbPool.init(3)
        DataBase.config_mapper.init()
        # init buffer
        self.buffer = MessageBuffer()
        self.buffer.start()
        # init web api
        self.api = ApiService()
        self.api.start()
        # init memory
        self.load()
        # start alarm clear
        self.cleaner = Cleaner()
        self.cleaner.start()
        # start file clear
        self.file = FileClear()
        self.file.start()
        # start upload service
        self.upload = UploadService()
        self.upload.start()
        # start heart service
        self.heart = HeartService()
        self.heart.start()
        # websocket service
        self.sock = SocketService()
        self.sock.start()

(3) Hot-loading devices

        # previous concurrent number
        patrol_num = SystemConfig.PATROL_NUM
        # analysis list
        analysis = {}
        while not Global.restart:
            # get all memory device
            devices = DeviceManager.get_device_list()
            # concurrent analysis number changed
            if patrol_num != SystemConfig.PATROL_NUM or len(devices) <= 0:
                patrol_num = SystemConfig.PATROL_NUM
                logger.info(f'analysis quality has changed or no analysis device.')
                # stop all analysis
                logger.info(f'stop stream analysis thread.')
                for key, item in analysis.items():
                    # stop the analysis
                    analysis[key].stop()
                    # wait thread exit
                    analysis[key].join()
                # clear analysis list
                analysis.clear()
                # stop thread pool
                logger.info(f'stop analysis thread pool.')
                if self.pool is not None:
                    self.pool.shutdown()
                    self.pool = None
            # create analysis pool
            if self.pool is None and len(devices) > 0:
                logger.info(f'create analysis pool')
                self.pool = ThreadPoolExecutor(max_workers=patrol_num)
            # disabled device list
            keys = []
            for device in devices:
                # already being analyzed
                if device.id in analysis:
                    # disabled or not in the analysis period
                    if device.enabled == 0 or not device.in_analysis_time():
                        # stop analysis thread
                        analysis[device.id].stop()
                        analysis[device.id].join()
                        # add to remove list
                        keys.append(device.id)
            # remove all disabled device
            for key in keys:
                del analysis[key]
            # list deleted devices
            keys = []
            for key, item in analysis.items():
                # device deleted or not
                latest_device = None
                for device in devices:
                    if key == device.id:
                        latest_device = device
                        break
                # device has been deleted
                if latest_device is None:
                    # stop analysis thread
                    analysis[key].stop()
                    analysis[key].join()
                    keys.append(key)
                # device has changed
                elif analysis[key].device.changed(latest_device):
                    # stop analysis thread
                    analysis[key].stop()
                    analysis[key].join()
                    keys.append(key)
            # remove all deleted device
            for key in keys:
                del analysis[key]
            # add analysis thread dynamically
            for device in devices:
                # thread is in analyzing
                if device.id in analysis:
                    # update device scenes
                    analysis[device.id].update()
                    continue
                # device is disabled
                if device.enabled == 0:
                    continue
                # not in analysis period
                if not device.in_analysis_time():
                    continue
                # concurrent task exceeds limit
                if len(analysis) >= patrol_num:
                    break
                # create analysis thread
                detector = Analysis(self.pool, device)
                detector.init()
                detector.start()
                analysis[device.id] = detector
            logger.info(f'current task count: {len(analysis)}')
            # sleep for 10 second
            for s in range(10):
                if Global.restart:
                    break
                time.sleep(1)
        # wait detector finish
        for deviceId, detector in analysis.items():
            detector.stop()
            detector.join()
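
The loop above mixes decision-making with thread control. As a rough sketch, the decision part can be isolated into a pure function that, for one pass, returns which analyses to stop and which to start. `DeviceState`, its `version` field (standing in for the `changed(...)` comparison) and `reconcile` are hypothetical names introduced here for illustration; they are not part of the author's code.

```python
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple


@dataclass(frozen=True)
class DeviceState:
    id: str
    enabled: bool
    in_window: bool      # stands in for device.in_analysis_time()
    version: int = 0     # stands in for the device.changed(...) comparison


def reconcile(running: Dict[str, int], devices: List[DeviceState],
              limit: int) -> Tuple[Set[str], List[str]]:
    """Return (ids to stop, ids to start) for one pass of the hot-load loop."""
    latest = {d.id: d for d in devices}
    to_stop: Set[str] = set()
    for dev_id, version in running.items():
        dev = latest.get(dev_id)
        # Stop when the device was deleted, disabled, left its window or changed.
        if dev is None or not dev.enabled or not dev.in_window or dev.version != version:
            to_stop.add(dev_id)
    # Free slots once the stopped analyses are gone.
    slots = limit - (len(running) - len(to_stop))
    to_start: List[str] = []
    for dev in devices:
        if slots <= 0:
            break
        still_running = dev.id in running and dev.id not in to_stop
        if still_running or not dev.enabled or not dev.in_window:
            continue
        to_start.append(dev.id)
        slots -= 1
    return to_stop, to_start


running = {"a": 1, "b": 1, "c": 1}            # device id -> version being analyzed
devices = [
    DeviceState("a", True, True, 1),          # unchanged, keeps running
    DeviceState("b", True, True, 2),          # changed, restart
    DeviceState("d", True, True),             # new device
    DeviceState("e", False, True),            # disabled, never started
]
to_stop, to_start = reconcile(running, devices, limit=3)
print(sorted(to_stop), to_start)  # ['b', 'c'] ['b', 'd']
```

Keeping this step free of threads makes the rules (deleted, disabled, out of window, changed, over the concurrency limit) easy to unit-test before wiring them to the real `stop()`, `join()` and `start()` calls.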

(4) Third-party interface over the WebSocket protocol

import datetime
import threading
import os
import time
# create_connection(..., sslopt=...) is provided by the websocket-client package
from websocket import create_connection

import requests
from requests_toolbelt import MultipartEncoder
import json
import ssl

from config.config import SystemConfig, Global
from db.db import DataBase
from db.model.device import Device
from db.model.model import Model
from db.model.scene import Scene
from service.clear import Cleaner
from service.memory import ModelManager, DeviceManager, SceneManager
from util.file import FileClear
from util.helper import Helper
from util.log import LogRecord

logger = LogRecord.get_logger()


# socket service
class SocketService(threading.Thread):
    # construction
    def __init__(self):
        threading.Thread.__init__(self)
        # socket
        self.ws = None
        self.connected = False
        self.do_run = True
        # receive
        self.msg_thread = None

    # stop service
    def stop(self):
        self.do_run = False
        # stop socket
        if self.ws is not None:
            self.ws.close()
        # stop receive
        if self.msg_thread is not None:
            if self.msg_thread.is_alive():
                self.msg_thread.join()

    # send response
    def send(self, url, cmdId, state, desc, data=None, file_path=None):
        file = None
        try:
            if url is None or cmdId is None:
                return False
            # dict info
            dict_info = {}
            # send file
            if file_path is not None:
                file = open(file_path, "rb")
                name = os.path.basename(file_path)
                dict_info['file'] = (name, file, 'application/octet-stream')
            # send data
            if data is not None:
                dict_info['data'] = data
            # multipart
            encoder = MultipartEncoder(fields=dict_info)
            # request header
            headers = {'Content-Type': encoder.content_type}
            # request url
            result_url = (url + "?boxId=" + SystemConfig.ID + "&cmdId=" + cmdId +
                          "&state=" + str(state) + "&desc=" + desc)
            # send the multipart body that matches the Content-Type header
            response = requests.post(result_url, data=encoder, headers=headers, verify=False)
            if response.status_code != 200:
                return False
            # response
            result = response.json()
            if result['error'] != 0:
                error = result['error']
                logger.error(f'upload box {SystemConfig.ID} command result failed, reason: {error}')
                return False
            return True
        except Exception as e:
            logger.error(f'send result error {e}')
            return False
        finally:
            # close the uploaded file, if one was opened
            if file is not None:
                file.close()

    def run(self) -> None:
        last_time = datetime.datetime.now()
        while not Global.restart and self.do_run:
            try:
                # system restart
                if Global.restart:
                    break
                # socket address is null
                if SystemConfig.SOCKET_URL is None or SystemConfig.SOCKET_URL == '':
                    time.sleep(1)
                    continue
                # connect to server
                if not self.connected:
                    try:
                        # connect to server
                        self.ws = create_connection(SystemConfig.SOCKET_URL,
                                                    sslopt={"cert_reqs": ssl.CERT_NONE})
                        self.connected = self.ws.connected
                        # connect success
                        if self.ws.connected:
                            logger.info(f'@connect websocket success: {SystemConfig.SOCKET_URL}')
                            # request body
                            box = {
                                'id': SystemConfig.ID,
                                'ip': Global.ip,
                                'port': Global.port,
                                'cmd': 'connect'
                            }
                            # send data
                            self.ws.send(json.dumps(box))
                            # update time
                            last_time = datetime.datetime.now()
                            # synchronize data actively
                            self.sync_server_data()
                    except Exception as e2:
                        logger.error(f'websocket connect error:{e2}')
                        self.connected = False
                # not connected
                if not self.connected:
                    time.sleep(3)
                    continue
                # get result address
                result_url = SystemConfig.ALARM_URL
                if SystemConfig.ALARM_URL is not None:
                    if not result_url.endswith("/"):
                        result_url += "/"
                    result_url += "result"
                # start data receive thread
                if self.msg_thread is None:
                    def receive():
                        while not Global.restart and self.do_run:
                            try:
                                # disconnected
                                if not self.ws.connected:
                                    time.sleep(1)
                                    continue
                                # receive command
                                response = self.ws.recv()
                                if '' == response:
                                    time.sleep(0)
                                    continue
                                # parse command
                                data = json.loads(response)
                                # get command id
                                cmd_id = data['id']
                                # get command type
                                cmd = data['cmd']
                                # get command target
                                bid = data['boxId']
                                if bid != SystemConfig.ID:
                                    continue
                                logger.info(f'@receive command: {cmd}:{data}')
                                if result_url is None or result_url == '':
                                    continue
                                # restart box
                                if 0 == cmd:
                                    self.reboot(result_url, cmd_id)
                                # get box configuration
                                elif 1 == cmd:
                                    self.get_config(result_url, cmd_id)
                                # update box configuration
                                elif 2 == cmd:
                                    config = data['config']
                                    self.set_config(result_url, cmd_id, config)
                                # add new model
                                elif 10 == cmd:
                                    model = data['model']
                                    self.add_model(result_url, cmd_id, model)
                                # update model
                                elif 11 == cmd:
                                    model = data['model']
                                    self.set_model(result_url, cmd_id, model)
                                # delete model
                                elif 12 == cmd:
                                    model_id = data['modelId']
                                    self.del_model(result_url, cmd_id, model_id)
                                # add new device
                                elif 20 == cmd:
                                    device = data['device']
                                    self.add_device(result_url, cmd_id, device)
                                # update device
                                elif 21 == cmd:
                                    device = data['device']
                                    self.set_device(result_url, cmd_id, device)
                                # delete device
                                elif 22 == cmd:
                                    device_id = data['deviceId']
                                    self.del_device(result_url, cmd_id, device_id)
                                # add new scene
                                elif 25 == cmd:
                                    scene = data['scene']
                                    self.add_scene(result_url, cmd_id, scene)
                                # update scene
                                elif 26 == cmd:
                                    scene = data['scene']
                                    self.set_scene(result_url, cmd_id, scene)
                                # delete scene
                                elif 27 == cmd:
                                    scene_id = data['sceneId']
                                    self.del_scene(result_url, cmd_id, scene_id)
                                # remove box
                                elif 40 == cmd:
                                    self.del_box(result_url, cmd_id)
                            except Exception as e1:
                                logger.error(f'websocket handle error {e1}.')
                                self.ws.close()
                                self.connected = False

                    # start thread
                    self.msg_thread = threading.Thread(target=receive)
                    self.msg_thread.start()
                # websocket heartbeat, prevents disconnection due to lack of data
                if (datetime.datetime.now() - last_time).seconds > 15:
                    # heart data
                    box = {
                        'id': SystemConfig.ID,
                        'ip': Global.ip,
                        'port': Global.port,
                        'cmd': 'heart'
                    }
                    # send heart data
                    self.ws.send(json.dumps(box))
                    # update heart time
                    last_time = datetime.datetime.now()
                else:
                    time.sleep(0.01)
            except Exception as e:
                if self.ws is not None:
                    self.ws.close()
                self.connected = False
                logger.error(f'websocket receive error {e}, reconnect again.')

    # synchronize data from server
    def sync_server_data(self):
        try:
            logger.info(f'@start sync data from server..')
            # synchronize configuration from server
            self.sync_config(SystemConfig.ALARM_URL)
            # synchronize model from server
            self.sync_model(SystemConfig.ALARM_URL)
            # synchronize device from server
            self.sync_device(SystemConfig.ALARM_URL)
            # synchronize scene from server
            self.sync_scene(SystemConfig.ALARM_URL)
            logger.info(f'@sync data from server finish..')
        except Exception as e:
            logger.error(f'synchronize data from server error:{e}')

    # synchronize configuration
    def sync_config(self, req_url):
        try:
            # simulated request header
            headers = {
                'User-Agent': "Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(KHTML, likeGecko) Chrome / 96.0.4664 .93 Safari / 537.36",
            }
            # request url
            params = {'boxId': SystemConfig.ID}
            response = requests.get(req_url + "/config", params=params, headers=headers, verify=False)
            if response.status_code != 200:
                return
            # response
            result = response.json()
            if ('error' not in result) or result['error'] != 0:
                logger.error(f'get system config from server failed.')
                return
            # find configuration info
            if 'value' not in result:
                return
            # get configuration
            config = result['value']
            if 'saveDays' in config:
                SystemConfig.SAVE_DAYS = int(config['saveDays'])
                DataBase.config_mapper.update('SAVE_DAYS', str(config['saveDays']), 1)
            if 'concurrency' in config:
                SystemConfig.PATROL_NUM = int(config['concurrency'])
                DataBase.config_mapper.update('PATROL_NUM', str(config['concurrency']), 1)
            if 'showLabel' in config:
                SystemConfig.SHOW_LABEL = int(config['showLabel'])
                DataBase.config_mapper.update('SHOW_LABEL', str(config['showLabel']), 1)
            if 'enableRecord' in config:
                SystemConfig.ALARM_RECORD = int(config['enableRecord'])
                DataBase.config_mapper.update('ALARM_RECORD', str(config['enableRecord']), 1)
            if 'requestUrl' in config:
                SystemConfig.ALARM_URL = str(config['requestUrl'])
                DataBase.config_mapper.update('ALARM_URL', str(config['requestUrl']), 1)
            if 'socketUrl' in config:
                SystemConfig.SOCKET_URL = str(config['socketUrl'])
                DataBase.config_mapper.update('SOCKET_URL', str(config['socketUrl']), 1)
        except Exception as e:
            logger.error(f'synchronize config from server error {e}')

    # synchronize model
    def sync_model(self, req_url):
        try:
            # simulated request header
            headers = {
                'User-Agent': "Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(HTML, likeGecko) Chrome / 96.0.4664 .93 Safari / 537.36",
            }
            # query param
            page_index = 1
            page_size = 30
            # models reported by the server
            gets = []
            ids = DataBase.model_mapper.ids()
            while True:
                # request models by page
                params = {'boxId': SystemConfig.ID, 'pageIndex': page_index, 'pageSize': page_size}
                response = requests.get(req_url + "/models", params=params, headers=headers, verify=False)
                if response.status_code != 200:
                    return
                # response
                result = response.json()
                if ('error' not in result) or result['error'] != 0:
                    logger.error(f'get models from server failed.')
                    return
                if 'value' not in result:
                    return
                # get models
                models = result['value']
                for model in models:
                    # model url
                    file_url = model['model']
                    model_id = model['id']
                    path = Helper.get_model(model_id)
                    gets.append(model_id)
                    # check model exists or not
                    item = DataBase.model_mapper.find(model['id'])
                    if item is not None:
                        # update local model
                        _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        info = Model(model['id'], model['name'], model['description'], None,
                                     model['cls'], model['normal'], model['alarm'],
                                     model['width'], model['height'], model['enabled'],
                                     _time)
                        DataBase.model_mapper.update(info)
                        ModelManager.update_model(info)
                        continue
                    # add new model
                    else:
                        # remove local if exists
                        FileClear.push(path)
                        # download model file
                        res = requests.get(file_url, verify=False)
                        if res.status_code != 200:
                            continue
                        with open(path, "wb") as f:
                            f.write(res.content)
                        # add model to local
                        url = str("http://" + Global.fileIp + ":" + str(Global.filePort) +
                                  "/file/download?name=" + model_id + '.rknn')
                        _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        info = Model(model_id, model['name'], model['description'],
                                     url, model['cls'], model['normal'], model['alarm'],
                                     model['width'], model['height'],
                                     model['enabled'], _time)
                        DataBase.model_mapper.insert(info)
                        ModelManager.add_model(info)
                # to the end of page
                if len(models) < page_size:
                    break
                # get next page
                page_index += 1
            # remove models that no longer exist on the server
            for model_id in ids:
                if model_id not in gets:
                    # move model file
                    real_path = Helper.get_model(model_id)
                    FileClear.push(real_path)
                    # remove from database and memory
                    DataBase.model_mapper.delete(model_id)
                    ModelManager.del_model(model_id)
        except Exception as e:
            logger.error(f'synchronize model from server error {e}')

    # synchronize device
    def sync_device(self, req_url):
        try:
            # request header
            headers = {
                'User-Agent': "Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(HTML, likeGecko) Chrome / 96.0.4664 .93 Safari / 537.36",
            }
            # query param
            page_index = 1
            page_size = 30
            # server device list
            gets = []
            # local device list
            ids = []
            while True:
                # get local device id
                arr = DataBase.device_mapper.ids(page_index, page_size)
                if len(arr) <= 0:
                    break
                ids = ids + arr
                page_index += 1
            # get server device
            page_index = 1
            while True:
                # request param
                params = {'boxId': SystemConfig.ID, 'pageIndex': page_index, 'pageSize': page_size}
                response = requests.get(req_url + "/devices", params=params, headers=headers, verify=False)
                if response.status_code != 200:
                    return
                # response
                result = response.json()
                if ('error' not in result) or result['error'] != 0:
                    logger.error(f'get devices from server failed.')
                    return
                if 'value' not in result:
                    return
                # get device list
                devices = result['value']
                logger.info(f'$$$$$ get device from server: {devices}')
                for device in devices:
                    # add to device list
                    gets.append(device['id'])
                    # prepare device param
                    params = ''
                    if 'params' in device:
                        params = device['params']
                    silence = 5
                    if 'silence' in device:
                        silence = device['silence']
                    # use NPU core number
                    reserver1 = "0"
                    if 'reserver1' in device:
                        reserver1 = device['reserver1']
                    # check device exists
                    temp = DataBase.device_mapper.find(device['id'])
                    # update local device
                    if temp is not None:
                        # device not changed
                        if (temp.updateTime is not None and
                                temp.updateTime.strftime("%Y-%m-%d %H:%M:%S") >= device['updateTime']):
                            logger.info(f'@@device {temp.id} not change, ignore it.')
                            continue
                        logger.info(f'@@device {temp.id} exists, update local device.')
                        # update local device
                        _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        info = Device(device['id'], device['name'],
                                      device['rtspUrl'], device['username'],
                                      device['password'], device['manufacture'],
                                      device['enabled'], device['frameGap'],
                                      device['startTime'], device['stopTime'],
                                      params, silence, _time, _time,
                                      _time, None, None, reserver1)
                        DataBase.device_mapper.update(info)
                        DeviceManager.update_device(info)
                    # add new device
                    else:
                        device_id = device['id']
                        logger.info(f'@@device {device_id} not exists, add device to local.')
                        _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        info = Device(device['id'], device['name'],
                                      device['rtspUrl'], device['username'],
                                      device['password'], device['manufacture'],
                                      device['enabled'], device['frameGap'],
                                      device['startTime'], device['stopTime'],
                                      params, silence, _time, _time,
                                      _time, None, None, reserver1)
                        DataBase.device_mapper.insert(info)
                        DeviceManager.add_device(info)
                # to the end of page
                if len(devices) < page_size:
                    break
                # find next page
                page_index += 1
            # remove devices that no longer exist on the server
            for device_id in ids:
                if device_id not in gets:
                    logger.info(f'@@device {device_id} not found at local, remove device from local.')
                    # find local device
                    device = DataBase.device_mapper.find(device_id)
                    if device is not None:
                        device.remove_image()
                    # remove device from local
                    DataBase.device_mapper.delete(device_id)
                    DeviceManager.del_device(device_id)
                    DataBase.scene_mapper.clear(device_id)
                    SceneManager.del_scene_list(device_id)
                    # clean device data
                    Cleaner.clear_device(device_id)
        except Exception as e:
            logger.error(f'synchronize device from server error {e}')

    # synchronize scene
    def sync_scene(self, req_url):
        try:
            # request header
            headers = {
                'User-Agent': "Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(HTML, likeGecko) Chrome / 96.0.4664 .93 Safari / 537.36",
            }
            # request param
            page_index = 1
            page_size = 30
            # server list
            gets = []
            # local list
            ids = []
            while True:
                arr = DataBase.scene_mapper.ids(page_index, page_size)
                if len(arr) <= 0:
                    break
                ids = ids + arr
                page_index += 1
            # query from server
            page_index = 1
            while True:
                # request param
                params = {'boxId': SystemConfig.ID, 'pageIndex': page_index, 'pageSize': page_size}
                response = requests.get(req_url + "/scenes", params=params, headers=headers, verify=False)
                if response.status_code != 200:
                    return
                # response
                result = response.json()
                if ('error' not in result) or result['error'] != 0:
                    logger.error(f'get scenes from server failed.')
                    return
                if 'value' not in result:
                    return
                # get scene
                scenes = result['value']
                logger.info(f'$$$$$ get scenes from server: {scenes}')
                for item in scenes:
                    scene = Scene.from_dict(item)
                    _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    scene.updateTime = _time
                    scene.createTime = _time
                    # append to server list
                    gets.append(scene.id)
                    # check local exists
                    temp = DataBase.scene_mapper.find(scene.id)
                    # update local scene
                    if temp is not None:
                        logger.info(f'@@update local scene.')
                        DataBase.scene_mapper.update(scene)
                        SceneManager.update_scene(scene)
                    # add new scene
                    else:
                        logger.info(f'@@add scene to local.')
                        DataBase.scene_mapper.insert(scene)
                        SceneManager.add_scene(scene)
                # to the end of page
                if len(scenes) < page_size:
                    break
                # find next page
                page_index += 1
            # remove local scene
            for _id in ids:
                if _id not in gets:
                    logger.info(f'@@scene {_id} not exits at local, remove scene from local.')
                    DataBase.scene_mapper.delete(_id)
                    SceneManager.del_scene(_id)
        except Exception as e:
            logger.error(f'synchronize scene from server error {e}')

    # reboot box
    def reboot(self, result_url, cmdId):
        # set global reboot flag
        Global.restart = True
        # response to server
        self.send(result_url, cmdId, 1, "", None, None)
        # wait app exit
        time.sleep(5)
        # reboot system
        Helper.reboot()

    # get config from local
    def get_config(self, result_url, cmdId):
        try:
            # find local configuration list
            records = DataBase.config_mapper.list()
            arr = []
            if records is not None:
                for item in records:
                    arr.append({
                        "key": item.key,
                        "params": item.params,
                        "enabled": item.enabled,
                        "createTime": item.createTime.strftime("%Y-%m-%d %H:%M:%S"),
                    })
            data = json.dumps(arr)
            self.send(result_url, cmdId, 1, "", data, None)
        except Exception as e:
            logger.error(f'get local configuration failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)

    # update configuration to local
    def set_config(self, result_url, cmdId, config):
        try:
            # prepare params
            if 'saveDays' in config:
                SystemConfig.SAVE_DAYS = int(config['saveDays'])
                DataBase.config_mapper.update('SAVE_DAYS', str(config['saveDays']), 1)
            if 'concurrency' in config:
                SystemConfig.PATROL_NUM = int(config['concurrency'])
                DataBase.config_mapper.update('PATROL_NUM', str(config['concurrency']), 1)
            if 'showLabel' in config:
                SystemConfig.SHOW_LABEL = int(config['showLabel'])
                DataBase.config_mapper.update('SHOW_LABEL', str(config['showLabel']), 1)
            if 'enableRecord' in config:
                SystemConfig.ALARM_RECORD = int(config['enableRecord'])
                DataBase.config_mapper.update('ALARM_RECORD', str(config['enableRecord']), 1)
            if 'autoRestart' in config:
                SystemConfig.AUTO_RESTART = int(config['autoRestart'])
                DataBase.config_mapper.update('AUTO_RESTART', str(config['autoRestart']), 1)
            if 'requestUrl' in config:
                SystemConfig.ALARM_URL = str(config['requestUrl'])
                DataBase.config_mapper.update('ALARM_URL', str(config['requestUrl']), 1)
            if 'socketUrl' in config:
                SystemConfig.SOCKET_URL = str(config['socketUrl'])
                DataBase.config_mapper.update('SOCKET_URL', str(config['socketUrl']), 1)
            self.send(result_url, cmdId, 1, "", None, None)
        except Exception as e:
            logger.error(f'update local configuration failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)

    # add model to local
    def add_model(self, result_url, cmdId, model, send=True):
        try:
            # get model param
            file_url = model['model']
            model_id = model['id']
            path = Helper.get_model(model['id'])
            # check model file
            if os.path.exists(path):
                item = DataBase.model_mapper.find(model['id'])
                if item is not None:
                    return
                Helper.remove_file(path)
            # download model file
            res = requests.get(file_url, verify=False)
            if res.status_code != 200:
                # message text: "failed to download model"
                self.send(result_url, cmdId, 0, "下载模型失败", None, None)
                return
            with open(path, "wb") as f:
                f.write(res.content)
            # generate model info
            url = str("http://" + Global.fileIp + ":" + str(Global.filePort) +
                      "/file/download?name=" + model_id + '.rknn')
            _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            info = Model(model_id, model['name'], model['description'],
                         url, model['cls'], model['normal'],
                         model['alarm'], model['width'], model['height'],
                         model['enabled'], _time)
            # add model to local
            DataBase.model_mapper.insert(info)
            ModelManager.add_model(info)
            if send:
                self.send(result_url, cmdId, 1, "", None, None)
        except Exception as e:
            logger.error(f'add system model failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)

    # update model
    def set_model(self, result_url, cmdId, model):
        try:
            # generate model info
            _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            info = Model(model['id'], model['name'],
                         model['description'], None,
                         model['cls'], model['normal'],
                         model['alarm'], model['width'],
                         model['height'], model['enabled'],
                         _time)
            # update model to local
            DataBase.model_mapper.update(info)
            ModelManager.update_model(info)
            self.send(result_url, cmdId, 1, "", None, None)
        except Exception as e:
            logger.error(f'update model failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)

    # delete model
    def del_model(self, result_url, cmdId, modelId):
        try:
            # find model info
            model = DataBase.model_mapper.find(modelId)
            if model is not None:
                model.delete_file()
            # delete from local
            DataBase.model_mapper.delete(modelId)
            ModelManager.del_model(modelId)
            self.send(result_url, cmdId, 1, "", None, None)
        except Exception as e:
            logger.error(f'delete model failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)

    # add device to local
    def add_device(self, result_url, cmdId, device):
        try:
            # device param
            params = ''
            if 'params' in device:
                params = device['params']
            silence = 5
            if 'silence' in device:
                silence = device['silence']
            # use NPU core number
            reserver1 = "0"
            if 'reserver1' in device:
                reserver1 = device['reserver1']
            _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            info = Device(device['id'], device['name'],
                          device['rtspUrl'], device['username'],
                          device['password'], device['manufacture'],
                          device['enabled'], device['frameGap'],
                          device['startTime'], device['stopTime'],
                          params, silence, _time, _time, _time,
                          None, None, reserver1)
            # add device to local
            DataBase.device_mapper.insert(info)
            DeviceManager.add_device(info)
            self.send(result_url, cmdId, 1, "", None, None)
        except Exception as e:
            logger.error(f'add device to local failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)

    # update device
    def set_device(self, result_url, cmdId, device):
        try:
            # device param
            if ('id' not in device or 'name' not in device or
                    'rtspUrl' not in device or 'frameGap' not in device):
                # message text: "invalid device parameters"
                self.send(result_url, cmdId, 0, "设备参数无效", None, None)
                return
            params = ''
            if 'params' in device:
                params = device['params']
            silence = 5
            if 'silence' in device:
                silence = device['silence']
            # use NPU core number
            reserver1 = "0"
            if 'reserver1' in device:
                reserver1 = device['reserver1']
            # device not updated
            temp = DataBase.device_mapper.find(device['id'])
            if (temp is not None and temp.updateTime is not None and
                    temp.updateTime.strftime("%Y-%m-%d %H:%M:%S") >= device['updateTime']):
                self.send(result_url, cmdId, 1, "", None, None)
                return
            device_id = device['id']
            logger.info(f'@@set device,update {device_id} info.')
            _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            info = Device(device['id'], device['name'],
                          device['rtspUrl'], device['username'],
                          device['password'], device['manufacture'],
                          device['enabled'], device['frameGap'],
                          device['startTime'], device['stopTime'],
                          params, silence, _time, _time, _time,
                          None, None, reserver1)
            # update local device
            DataBase.device_mapper.update(info)
            DeviceManager.update_device(info)
            self.send(result_url, cmdId, 1, "", None, None)
        except Exception as e:
            logger.error(f'update local device failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)

    # delete local device
    def del_device(self, result_url, cmdId, deviceId):
        try:
            # find local device
            logger.info(f'@@begin delete device {deviceId}.')
            device = DataBase.device_mapper.find(deviceId)
            if device is not None:
                device.remove_image()
            logger.info(f'@@delete device {deviceId}.')
            # delete from local
            DataBase.device_mapper.delete(deviceId)
            DeviceManager.del_device(deviceId)
            # delete device scenes
            DataBase.scene_mapper.clear(deviceId)
            SceneManager.del_scene_list(deviceId)
            # clear device data
            Cleaner.clear_device(deviceId)
            self.send(result_url, cmdId, 1, "", None, None)
        except Exception as e:
            logger.error(f'delete device failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)

    # add device scene
    def add_scene(self, result_url, cmd_id, item):
        try:
            # add scene model
            model = item['model']
            if isinstance(model, dict):
                self.add_model(result_url, cmd_id, model, False)
            # generate scene info
            scene = Scene.from_dict(item)
            logger.info(f'add device {scene.deviceId} scene {scene.id}.')
            _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            info = Scene(scene.id, scene.deviceId,
                         scene.detectType, scene.threshold,
                         scene.model, scene.alarmInfo,
                         scene.alarmCode, _time, _time)
            # add scene to local
            DataBase.scene_mapper.insert(info)
            SceneManager.add_scene(info)
            self.send(result_url, cmd_id, 1, "", None, None)
        except Exception as e:
            logger.error(f'add scene failed:{e}')
            self.send(result_url, cmd_id, 0, e.__str__(), None, None)

    # update scene
    def set_scene(self, result_url, cmd_id, item):
        try:
            # add scene model
            model = item['model']
            if isinstance(model, dict):
                self.add_model(result_url, cmd_id, model, False)
            # scene info
            scene = Scene.from_dict(item)
            logger.info(f'update device {scene.deviceId} scene {scene.id}.')
            _time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            info = Scene(scene.id, scene.deviceId,
                         scene.detectType, scene.threshold,
                         scene.model, scene.alarmInfo,
                         scene.alarmCode, _time, _time)
            # update local scene
            DataBase.scene_mapper.update(info)
            SceneManager.update_scene(info)
            self.send(result_url, cmd_id, 1, "", None, None)
        except Exception as e:
            logger.error(f'update local scene failed:{e}')
            self.send(result_url, cmd_id, 0, e.__str__(), None, None)

    # delete scene
    def del_scene(self, result_url, cmdId, sceneId):
        try:
            logger.info(f'@@delete scene {sceneId}.')
            # delete scene from local
            DataBase.scene_mapper.delete(sceneId)
            SceneManager.del_scene(sceneId)
            self.send(result_url, cmdId, 1, "", None, None)
        except Exception as e:
            logger.error(f'delete local scene failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)

    # remove ai box
    def del_box(self, result_url, cmdId):
        try:
            logger.info(f'@@delete box {SystemConfig.ID}, disabled all device.')
            # disable all devices
            DataBase.device_mapper.disable_all()
            self.send(result_url, cmdId, 1, "", None, None)
        except Exception as e:
            logger.error(f'remove ai box failed:{e}')
            self.send(result_url, cmdId, 0, e.__str__(), None, None)
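
The long `if`/`elif` chain on the numeric `cmd` value in the receive loop can also be written as a lookup table. The sketch below shows that alternative in isolation, using only the message fields seen above (`id`, `cmd`, `boxId`); `CommandDispatcher` and the two sample handlers are hypothetical names for illustration, not the author's code.

```python
from typing import Any, Callable, Dict, List

Handler = Callable[[str, Dict[str, Any]], None]


class CommandDispatcher:
    """Maps numeric command codes to handler functions."""

    def __init__(self, box_id: str) -> None:
        self.box_id = box_id
        self._handlers: Dict[int, Handler] = {}

    def register(self, cmd: int, handler: Handler) -> None:
        self._handlers[cmd] = handler

    def dispatch(self, message: Dict[str, Any]) -> bool:
        """Run the handler for one decoded message; return True if handled."""
        if message.get("boxId") != self.box_id:
            return False  # addressed to another box
        handler = self._handlers.get(message.get("cmd"))
        if handler is None:
            return False  # unknown command code
        handler(message["id"], message)
        return True


log: List[str] = []
dispatcher = CommandDispatcher(box_id="box-1")
dispatcher.register(0, lambda cmd_id, msg: log.append(f"reboot:{cmd_id}"))
dispatcher.register(22, lambda cmd_id, msg: log.append(f"del_device:{msg['deviceId']}"))

handled = [
    dispatcher.dispatch({"id": "c1", "cmd": 0, "boxId": "box-1"}),
    dispatcher.dispatch({"id": "c2", "cmd": 22, "boxId": "box-1", "deviceId": "cam-7"}),
    dispatcher.dispatch({"id": "c3", "cmd": 0, "boxId": "other"}),
    dispatcher.dispatch({"id": "c4", "cmd": 99, "boxId": "box-1"}),
]
print(handled, log)
```

Adding a command then means registering one more handler instead of growing the chain, and unknown codes are rejected in a single place.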

The above is part of the source code of the Python version; see the top right of the image.

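CMake (C/C++) implementation

Rockchip also provides a C interface, which is a real benefit for C/C++ developers. Compared with Python it has the following advantages:

  • Better source confidentiality. Python is an interpreted language whose source has to run on the Python virtual machine, so the source leaks easily. C/C++ is compiled, and the shipped binary is far harder to reverse-engineer.
  • Easier deployment. Python needs assorted environments and packages installed, which makes deployment tedious. A C/C++ build can be packaged as an executable bundle and deployed simply by unpacking it, turning an hour or two of work into a few minutes.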

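The downside: C/C++ is more complex and demands more of the developer. You largely have to build your own libraries and utilities, and careless code easily leaks memory. Python, like Java, manages memory automatically and has an extremely rich library ecosystem.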

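If you are a C/C++ programmer, the CMake version is the recommended first choice. Here too I pick out a few key parts of the implementation; if you need more, contact me via the image at the top right: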

(1) CMake build configuration
This covers the dependency libraries, header paths, install commands and related settings.

cmake_minimum_required(VERSION 3.4.1)

project(box)

# required c++ 17
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# skip 3rd-party lib dependencies
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--allow-shlib-undefined")

# install target and libraries
set(CMAKE_INSTALL_PREFIX ${CMAKE_SOURCE_DIR}/install/box)

# set install library path
set(CMAKE_SKIP_INSTALL_RPATH FALSE)
set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib")

# find oatpp-websocket
find_package(oatpp-websocket REQUIRED)

# find oatpp dependency
find_package(oatpp REQUIRED)

# set platform
set(LIB_ARCH aarch64)
if (CMAKE_C_COMPILER MATCHES "aarch64")
  #set(LIB_ARCH aarch64)
else()
  #set(LIB_ARCH armhf)
endif()

# include local header
include_directories(${CMAKE_SOURCE_DIR})

# spd log
include_directories(${CMAKE_SOURCE_DIR}/3rdparty)
include_directories(${CMAKE_SOURCE_DIR}/3rdparty/spdlog-1.10.0/include)
include_directories(${CMAKE_SOURCE_DIR}/3rdparty/rapidjson-1.1.0/include)

# rknn api
if(TARGET_SOC STREQUAL "rk356x")
  set(RKNN_API_PATH ${CMAKE_SOURCE_DIR}/runtime/RK356X/${CMAKE_SYSTEM_NAME}/librknn_api)
elseif(TARGET_SOC STREQUAL "rk3588")
  set(RKNN_API_PATH ${CMAKE_SOURCE_DIR}/runtime/RK3588/${CMAKE_SYSTEM_NAME}/librknn_api)
else()
  message(FATAL_ERROR "TARGET_SOC is not set, ref value: rk356x or rk3588 or rv110x")
endif()

if (CMAKE_SYSTEM_NAME STREQUAL "Android")
  set(RKNN_RT_LIB ${RKNN_API_PATH}/${CMAKE_ANDROID_ARCH_ABI}/librknnrt.so)
else()
  set(RKNN_RT_LIB ${RKNN_API_PATH}/${LIB_ARCH}/librknnrt.so)
endif()
include_directories(${RKNN_API_PATH}/include)

# opencv
if (CMAKE_SYSTEM_NAME STREQUAL "Android")
  set(OpenCV_DIR ${CMAKE_SOURCE_DIR}/3rdparty/opencv/OpenCV-android-sdk/sdk/native/jni/abi-${CMAKE_ANDROID_ARCH_ABI})
else()
  if(LIB_ARCH STREQUAL "armhf")
    set(OpenCV_DIR ${CMAKE_SOURCE_DIR}/3rdparty/opencv/opencv-linux-armhf)
  else()
    set(OpenCV_DIR ${CMAKE_SOURCE_DIR}/3rdparty/opencv/opencv-linux-aarch64)
  endif()
endif()
find_package(OpenCV REQUIRED)
message("opencv OpenCV_LIBS: ${OpenCV_LIBS}")
message("opencv OpenCV_LIBRARIES: ${OpenCV_LIBRARIES}")

# rga
if(TARGET_SOC STREQUAL "rk356x")
  set(RGA_PATH ${CMAKE_SOURCE_DIR}/3rdparty/rga/RK356X)
elseif(TARGET_SOC STREQUAL "rk3588")
  set(RGA_PATH ${CMAKE_SOURCE_DIR}/3rdparty/rga/RK3588)
else()
  message(FATAL_ERROR "TARGET_SOC is not set, ref value: rk356x or rk3588")
endif()
if (CMAKE_SYSTEM_NAME STREQUAL "Android")
  set(RGA_LIB ${RGA_PATH}/lib/Android/${CMAKE_ANDROID_ARCH_ABI}/librga.so)
else()
  set(RGA_LIB ${RGA_PATH}/lib/Linux/${LIB_ARCH}/librga.so)
endif()
include_directories(${RGA_PATH}/include)

# zlmediakit
set(ZLMEDIAKIT_PATH ${CMAKE_SOURCE_DIR}/3rdparty/zlmediakit)
if (CMAKE_SYSTEM_NAME STREQUAL "Linux")
  include_directories(${ZLMEDIAKIT_PATH}/include)
  set(ZLMEDIAKIT_LIBS ${ZLMEDIAKIT_PATH}/${LIB_ARCH}/libmk_api.so)
endif()

if (ZLMEDIAKIT_LIBS)
  add_definitions(-DBUILD_VIDEO_RTSP)
endif()

# include tracker headers (bytetrack)
include_directories(${PROJECT_SOURCE_DIR}/src/bytetrack/include)

set(CMAKE_INSTALL_RPATH "lib")

# set source list variables
file(GLOB LOCAL_INCLUDE_DIR
  ./src/*.h
  ./src/*/*.h
  ./utils/*.h
)
file(GLOB LOCAL_SOURCES_DIR
  ./src/*.cpp
  ./src/*/*.cpp
  ./src/*/*.c
  ./src/bytetrack/src/*.cpp
  ./utils/*.cpp
)

# add box video target
add_executable(box
  ${LOCAL_INCLUDE_DIR}
  ${LOCAL_SOURCES_DIR}
)

# link dynamic library Eigen3::Eigen
target_link_libraries(box
  ${RKNN_RT_LIB}
  ${RGA_LIB}
  ${OpenCV_LIBS}
  ${MPP_LIBS}
  ${ZLMEDIAKIT_LIBS}
  uuid
  oatpp::oatpp
  oatpp::oatpp-websocket
)

# install target and libraries
set(CMAKE_INSTALL_PREFIX ${CMAKE_SOURCE_DIR}/install/box)
# install target 
install(TARGETS box DESTINATION ./)
# install library
install(PROGRAMS ${RKNN_RT_LIB} DESTINATION lib)
install(PROGRAMS ${RGA_LIB} DESTINATION lib)
install(PROGRAMS ${MPP_LIBS} DESTINATION lib)
install(PROGRAMS ${ZLMEDIAKIT_LIBS} DESTINATION lib)
# install model file
install(DIRECTORY ${CMAKE_SOURCE_DIR}/model DESTINATION ./)
# install app directory
install(DIRECTORY ${CMAKE_SOURCE_DIR}/app DESTINATION ./)
# install config file
install(FILES ${CMAKE_SOURCE_DIR}/config.ini DESTINATION ./)
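The build script adds the BUILD_VIDEO_RTSP definition only when ZLMEDIAKIT_LIBS is set, which here happens on Linux. The article does not show how the C++ side consumes that definition; a common approach is a preprocessor guard, roughly as sketched below. The helper names RtspSupportCompiledIn and DescribeStreaming are hypothetical and do not come from the project.

```cpp
#include <string>

// Hypothetical helper: reports whether RTSP support was compiled in.
// BUILD_VIDEO_RTSP is the macro set by add_definitions(-DBUILD_VIDEO_RTSP).
constexpr bool RtspSupportCompiledIn()
{
#ifdef BUILD_VIDEO_RTSP
    return true;
#else
    return false;
#endif
}

// Hypothetical helper: builds a line for the startup log.
std::string DescribeStreaming()
{
    return RtspSupportCompiledIn() ? "RTSP streaming enabled"
                                   : "RTSP streaming disabled";
}
```

Keeping the check in one small function means the rest of the code can branch on an ordinary bool instead of scattering #ifdef blocks.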

(2) Starting the box through its Facade wrapper

```c++
#include "service/Box.h"
#include "service/Display.h"

int main(int argc, char** argv)
{
    Box::GetInstance().Init();
    Box::GetInstance().Start();
    Display::GetInstance().Show();
    Box::GetInstance().Stop();
    return 0;
}
```

(3) Box initialization
```c++
void Box::Init()
{
    // load configuration
    Global::GetInstance().Load();

    // init database
    DBPoll::GetInstance().Init(1, 1,
        Global::GetInstance().dbIp, Global::GetInstance().dbPort,
        Global::GetInstance().dbUser, Global::GetInstance().dbPwd,
        Global::GetInstance().dbName);

    // create all table
    DBPoll::GetInstance().CreateTables();

    // load data to memory
    this->Load();

    // start file clear
    m_file_clear = std::make_shared<FileClear>();
    if (m_file_clear)
    {
        m_file_clear->Start();
    }

    // http server
    m_http_server = std::make_shared<HttpServer>();
    if (m_http_server)
    {
        m_http_server->Start();
    }

    // alarm uploader
    m_uploader = std::make_shared<Uploader>();
    if (m_uploader)
    {
        m_uploader->Start();
    }

    // device snapshot
    m_snapshot = std::make_shared<Snapshot>();
    if (m_snapshot)
    {
        m_snapshot->Start();
    }

    // start message broadcast
    m_msg_buffer = std::make_shared<MsgBuffer>();
    if (m_msg_buffer)
    {
        m_msg_buffer->Start();
    }

    // start web socket service
    m_websocket = std::make_shared<WebsocektClient>();
    if (m_websocket)
    {
        m_websocket->Start();
    }
}
```
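The GetInstance() calls in main and Box::Init suggest singleton accessors, but the article does not show how they are implemented. One common way to write such an accessor is a function-local static (a "Meyers singleton"), sketched below together with a simplified create-then-start lifecycle. DemoBox, DemoService, and the service names are hypothetical stand-ins, not the project's classes.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a background service such as FileClear or HttpServer.
class DemoService {
public:
    explicit DemoService(std::string name) : m_name(std::move(name)) {}
    void Start() { m_running = true; }
    void Stop() { m_running = false; }
    bool Running() const { return m_running; }
    const std::string& Name() const { return m_name; }

private:
    std::string m_name;
    bool m_running = false;
};

// Hypothetical facade: one entry point that owns and drives all services.
class DemoBox {
public:
    static DemoBox& GetInstance()
    {
        static DemoBox instance;  // constructed once; initialization is thread-safe since C++11
        return instance;
    }

    DemoBox(const DemoBox&) = delete;
    DemoBox& operator=(const DemoBox&) = delete;

    // Create and start every service; calling Init twice is a no-op.
    void Init()
    {
        if (!m_services.empty()) {
            return;
        }
        const std::vector<std::string> names = {"file_clear", "http_server", "uploader"};
        for (const std::string& name : names) {
            auto service = std::make_shared<DemoService>(name);
            service->Start();
            m_services.push_back(service);
        }
    }

    // Stop every service and release it.
    void Stop()
    {
        for (auto& service : m_services) {
            service->Stop();
        }
        m_services.clear();
    }

    std::size_t ServiceCount() const { return m_services.size(); }

private:
    DemoBox() = default;
    std::vector<std::shared_ptr<DemoService>> m_services;
};
```

Callers only ever write DemoBox::GetInstance().Init() and DemoBox::GetInstance().Stop(), which mirrors how main uses Box in the article.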

(4) Starting the box and hot-reloading models and devices

```c++
void Box::Start()
{
    LOG_INFO("start ai box.");
    m_run = true;
    m_thread = std::thread([&]() {
        // struct typedef
        typedef std::map<std::string, AnalysisPtr> ANALYSISES;
        typedef std::vector<DeviceInfo> DEVICES;
        ANALYSISES analysises;
        int patrol_num = Config::GetInstance().PATROL_NUM;
        while (m_run && !Global::GetInstance().restart) {
            // get all memory devices
            DEVICES devices;
            DeviceManager::GetInstance().GetList(devices);

            // concurrent analysis number changed
            if (patrol_num != Config::GetInstance().PATROL_NUM || devices.size() <= 0)
            {
                if (patrol_num != Config::GetInstance().PATROL_NUM)
                {
                    LOG_INFO("analysis quality changed, stop analysis thread.");
                    patrol_num = Config::GetInstance().PATROL_NUM;
                }
                else
                {
                    LOG_INFO("no analysis device, stop analysis thread.");
                }

                // stop all analysis
                for (ANALYSISES::iterator it = analysises.begin(); it != analysises.end(); ++it)
                {
                    AnalysisPtr& analysis = it->second;
                    if (analysis) {
                        analysis->Stop();
                    }
                }
                analysises.clear();

                LOG_INFO("stop analysis thread pool.");
                if (m_pool)
                {
                    m_pool->ShutDown();
                    m_pool.reset();
                    m_pool = nullptr;
                }
            }

            // create analysis pool
            if (!m_pool && devices.size() > 0)
            {
                m_pool = std::make_shared<Pool>();
                m_pool->Start(patrol_num);
            }

            // device disabled
            for (size_t index = 0; index < devices.size(); ++index)
            {
                DeviceInfo& device = devices[index];
                if (analysises.find(device.GetId()) == analysises.end())
                {
                    continue;
                }
                AnalysisPtr analysis = analysises[device.GetId()];
                if (device.GetEnabled() == 0 && analysis)
                {
                    analysis->Stop();
                }
            }

            // device deleted
            for (ANALYSISES::iterator it = analysises.begin(); it != analysises.end();)
            {
                bool find = false;
                DeviceInfo device;
                for (size_t index = 0; index < devices.size(); ++index) {
                    DeviceInfo& dev = devices[index];
                    if (it->first == dev.GetId())
                    {
                        find = true;
                        device = dev;
                        break;
                    }
                }

                // device deleted
                if (!find)
                {
                    it->second->Stop();
                    it = analysises.erase(it);
                    continue;
                }

                // device changed
                if (device.Changed(it->second->GetDevice()))
                {
                    it->second->Stop();
                    it = analysises.erase(it);
                    continue;
                }
                ++it;
            }

            // add analysis thread dynamically
            for (size_t index = 0; index < devices.size(); ++index) {
                // get current device
                DeviceInfo& device = devices[index];

                // thread is in analyzing
                if (analysises.find(device.GetId()) != analysises.end())
                {
                    AnalysisPtr detector = analysises[device.GetId()];
                    if (detector)
                    {
                        detector->Update();
                    }
                    continue;
                }

                // device is disabled
                if (device.GetEnabled() == 0)
                {
                    continue;
                }

                // not in analysis period
                if (!device.InTime())
                {
                    continue;
                }

                // exceeds task limit
                if (analysises.size() >= patrol_num)
                {
                    continue;
                }

                // create analysis thread
                AnalysisPtr detector = std::make_shared<Analysis>(m_pool, device);
                detector->Init();
                detector->Start();
                analysises[device.GetId()] = detector;
            }

            // get system models
            std::vector<ModelInfo> models;
            ModelManager::GetInstance().GetList(models);

            // clear local model file
            std::string dir = Helper::GetFileSavePath("");
            std::vector<std::string> model_files;
            FileUtil::GetFileList(dir, false, model_files);
            for (size_t index = 0; index < model_files.size(); ++index)
            {
                std::string& file_path = model_files[index];
                std::filesystem::path p(file_path);
                bool find = false;
                for (size_t j = 0; j < models.size(); ++j)
                {
                    if (p.stem() == models[j].GetId())
                    {
                        find = true;
                        break;
                    }
                }
                if (!find)
                {
                    // get file create time
                    std::filesystem::file_time_type t = std::filesystem::last_write_time(file_path);
                    auto systemTime = std::chrono::time_point_cast<std::chrono::system_clock::duration>(
                        t - std::filesystem::file_time_type::clock::now() + std::chrono::system_clock::now());
                    std::time_t tt = std::chrono::system_clock::to_time_t(systemTime);
                    std::time_t tt2 = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

                    // create long time ago
                    if (tt2 - tt > 60)
                    {
                        // delete old model file
                        FileUtil::RemoveFile(file_path);
                    }
                }
            }

            // sleep for few seconds
            for (size_t index = 0; index < 10; ++index)
            {
                if (!m_run || Global::GetInstance().restart)
                {
                    break;
                }
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    });
}
```
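The heart of the hot-reload loop above is a reconciliation pass. It compares the tasks currently running against the latest device list, drops tasks whose device was deleted or changed, and starts new ones up to the concurrency limit. The sketch below isolates that idea in a deliberately simplified form. DemoDevice, RunningTasks, and Reconcile are hypothetical names, a changed url stands in for DeviceInfo::Changed, and, unlike the original, a disabled device is removed from the map outright.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified device record (the real DeviceInfo is not shown in the article).
struct DemoDevice {
    std::string id;
    bool enabled;
    std::string url;  // any field whose change should restart the analysis
};

// Running tasks keyed by device id; the value is the device snapshot the task started with.
using RunningTasks = std::map<std::string, DemoDevice>;

// One reconciliation pass over the running tasks.
void Reconcile(RunningTasks& running, const std::vector<DemoDevice>& devices, std::size_t limit)
{
    // Index the desired state by id for quick lookup.
    std::map<std::string, const DemoDevice*> wanted;
    for (const DemoDevice& d : devices) {
        wanted[d.id] = &d;
    }

    // Drop tasks whose device is gone, disabled, or changed.
    for (auto it = running.begin(); it != running.end();) {
        auto found = wanted.find(it->first);
        const bool stale = found == wanted.end()
            || !found->second->enabled
            || found->second->url != it->second.url;
        if (stale) {
            it = running.erase(it);  // erase returns the next valid iterator
        } else {
            ++it;
        }
    }

    // Start tasks for enabled devices that are not running yet, up to the limit.
    for (const DemoDevice& d : devices) {
        if (!d.enabled || running.count(d.id) != 0) {
            continue;
        }
        if (running.size() >= limit) {
            break;
        }
        running[d.id] = d;
    }
}
```

Because a changed device is erased before the start pass runs, it is recreated with its new settings in the same iteration, which matches the ordering of the two loops in Box::Start.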

(5) Stopping the box

```c++
void Box::Stop()
{
    LOG_INFO("stop ai box.");

    // stop thread
    m_run = false;

    // stop web socket
    if (m_websocket)
    {
        m_websocket->Stop();
        m_websocket.reset();
        m_websocket = nullptr;
    }

    // wait thread exit
    if (m_thread.joinable())
    {
        m_thread.join();
    }

    // stop file clear
    if (m_file_clear)
    {
        m_file_clear->Stop();
        m_file_clear.reset();
        m_file_clear = nullptr;
    }

    // stop http server
    if (m_http_server) {
        m_http_server->Stop();
        m_http_server.reset();
        m_http_server = nullptr;
    }

    // stop alarm uploader
    if (m_uploader)
    {
        m_uploader->Stop();
        m_uploader.reset();
        m_uploader = nullptr;
    }

    // stop snapshot
    if (m_snapshot)
    {
        m_snapshot->Stop();
        m_snapshot.reset();
        m_snapshot = nullptr;
    }

    // stop broadcast
    if (m_msg_buffer)
    {
        m_msg_buffer->Stop();
        m_msg_buffer.reset();
        m_msg_buffer = nullptr;
    }

    // dispose db connection
    DBPoll::GetInstance().Close();
}
```
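Box::Stop relies on a stop flag plus join: it clears m_run, and the worker loop, which sleeps in one-second slices, notices the change and exits before join returns. The article does not show how m_run is declared. Since the flag is written by one thread and read by another, std::atomic<bool> is the safe choice, and the sketch below assumes it. DemoWorker and its members are hypothetical, and the sleep slices are shortened to milliseconds so the example finishes quickly.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical worker showing the stop-flag + join pattern.
class DemoWorker {
public:
    ~DemoWorker() { Stop(); }

    void Start()
    {
        if (m_thread.joinable()) {
            return;  // already running
        }
        m_run = true;
        m_thread = std::thread([this]() {
            while (m_run) {
                ++m_ticks;  // placeholder for one pass of real work
                // Sleep in short slices so a Stop() request is noticed quickly.
                for (int i = 0; i < 10 && m_run; ++i) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                }
            }
        });
    }

    void Stop()
    {
        m_run = false;  // ask the loop to exit
        if (m_thread.joinable()) {
            m_thread.join();  // wait until it actually has
        }
    }

    bool Running() const { return m_thread.joinable(); }
    int Ticks() const { return m_ticks; }

private:
    std::atomic<bool> m_run{false};
    std::atomic<int> m_ticks{0};
    std::thread m_thread;
};
```

Slicing the sleep bounds shutdown latency by the slice length rather than the full polling interval, which is the same reason the loop in Box::Start sleeps ten times for one second instead of once for ten.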

[Figure: source code structure of the C/C++ version]
