基于python的视频监控系统_Python远程视频监控程序的实例代码
老板由于事務(wù)繁忙無(wú)法經(jīng)常親臨教研室,于是讓我搞個(gè)監(jiān)控系統(tǒng),讓他在辦公室就能看到教研室來(lái)了多少人。o(>﹏<)o|||
最初我的想法是直接去網(wǎng)上下個(gè)軟件,可是找來(lái)找去不是有毒就是收費(fèi),無(wú)奈技術(shù)不到家無(wú)法破解,只得另尋他法。
正當(dāng)沒(méi)有辦法的時(shí)候,我看到一篇博文一個(gè)基于python的高速視頻傳輸程序 ,看完茅塞頓開,覺(jué)得完全可以自己寫一個(gè),在此感謝作者詹姆斯。
這個(gè)程序包括一個(gè)服務(wù)器和一個(gè)客戶端。需要的庫(kù)有 VideoCapture 和 pygame,一個(gè)用來(lái)得到攝像頭的視頻,一個(gè)用來(lái)顯示。Python庫(kù)可以點(diǎn)這里下載:Python Extension Packages。進(jìn)去后ctrl+F找到相應(yīng)的庫(kù),然后選擇相應(yīng)的版本即可,這里還有很多其他的庫(kù)可提供下載。
我想到的解決方案是,在教研室開一臺(tái)電腦,接一個(gè)USB攝像頭,然后開啟一個(gè)服務(wù)器程序,等待著老板使用客戶端連接,由于是實(shí)時(shí)視頻傳輸,使用UDP協(xié)議。(主要傳輸部分采用詹姆斯的代碼)。
服務(wù)器端代碼如下:
# -*- coding: UTF-8 -*-
import socket
import time
import traceback
from VideoCapture import Device
import threading
# 全局變量
is_sending = False
cli_address = ('', 0)
# 主機(jī)地址和端口
host = ''
port = 10218
# 初始化UDP socket
ser_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ser_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ser_socket.bind((host, port))
# 接收線程類,用于接收客戶端發(fā)送的消息
class UdpReceiver(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.thread_stop = False
def run(self):
while not self.thread_stop:
# 聲明全局變量,接收消息后更改
global cli_address
global is_sending
try:
message, address = ser_socket.recvfrom(2048)
except:
traceback.print_exc()
continue
# print message,cli_address
cli_address = address
if message == 'startCam':
print 'start camera',
is_sending = True
ser_socket.sendto('startRcv', cli_address)
if message == 'quitCam':
is_sending = False
print 'quit camera',
def stop(self):
self.thread_stop = True
# 創(chuàng)建接收線程
receiveThread = UdpReceiver()
receiveThread.setDaemon(True) # 該選項(xiàng)設(shè)置后使得主線程退出后子線程同時(shí)退出
receiveThread.start()
# 初始化攝像頭
cam = Device()
cam.setResolution(320,240)
# 主線程循環(huán),發(fā)送視頻數(shù)據(jù)
while 1:
if is_sending:
img = cam.getImage().resize((160,120))
data = img.tostring()
ser_socket.sendto(data, cli_address)
time.sleep(0.05)
else:
time.sleep(1)
receiveThread.stop()
ser_socket.close()
服務(wù)器啟動(dòng)一個(gè)子線程,來(lái)監(jiān)聽(tīng)客戶端發(fā)送的消息。當(dāng)有消息時(shí),將is_sending改為True,則服務(wù)器向該客戶端發(fā)送視頻數(shù)據(jù)。具體信息可以看代碼注釋。
客戶端代碼如下:
# -*- coding: UTF-8 -*-
import socket, time
import pygame
from pygame.locals import *
from sys import exit
# 服務(wù)器地址,初始化socket
ser_address = ('localhost', 10218)
cli_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 設(shè)置超時(shí)
cli_socket.settimeout(5)
# 向服務(wù)器發(fā)送消息,并判斷接收時(shí)是否超時(shí),若超時(shí)則重發(fā)
while 1:
cli_socket.sendto('startCam', ser_address)
try:
message, address = cli_socket.recvfrom(2048)
if message == 'startRcv':
print message
break
except socket.timeout:
continue
# 此句無(wú)用。。防止窗口初始化后等待數(shù)據(jù)
cli_socket.recvfrom(65536)
# 初始化視頻窗口
pygame.init()
screen = pygame.display.set_mode((640,480))
pygame.display.set_caption('Web Camera')
pygame.display.flip()
# 設(shè)置時(shí)間,可以用來(lái)控制幀率
clock = pygame.time.Clock()
# 主循環(huán),顯示視頻信息
while 1:
try:
data, address = cli_socket.recvfrom(65536)
except socket.timeout:
continue
camshot = pygame.image.frombuffer(data, (160,120), 'RGB')
camshot = pygame.transform.scale(camshot, (640, 480))
for event in pygame.event.get():
if event.type == pygame.QUIT:
cli_socket.sendto('quitCam', ser_address)
cli_socket.close()
pygame.quit()
exit()
screen.blit(camshot, (0,0))
pygame.display.update()
clock.tick(20)
總結(jié)
以上是生活随笔為你收集整理的基于python的视频监控系统_Python远程视频监控程序的实例代码的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: vue子组件获取父组件数据_在vue.j
- 下一篇: visual studio code p