日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

网站后端_Flask-第三方库.利用Flask-Socketio扩展构建实时流应用?

發(fā)布時間:2025/7/14 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 网站后端_Flask-第三方库.利用Flask-Socketio扩展构建实时流应用? 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

模塊簡介:

說明: 此模塊主要用于構建支持實時,雙向基于事件的通信,將Websocket和Polling等其它實時通信方式封裝成了通用接口,從而可在各個平臺/瀏覽器/設備上穩(wěn)定工作.


快速安裝:

pip?install?flask-socketio <script?src="https://cdn.socket.io/socket.io-1.4.5.js"></script>


應用場景:

1. 實時分析, 服務端將數(shù)據(jù)推送到客戶端,客戶端可以為實時計數(shù)器,圖表,日志等

2. 實時聊天, 通過命名空間和房間實現(xiàn)服務端Socket多路復用.

3. 流式傳輸, 已經(jīng)支持任何二進制文件的傳輸,包括圖片,視頻,音頻.

4. 文檔合并, 運行多個用戶同時編輯一個文檔,并且能夠看到每個用戶做出的修改.


原理介紹:

客戶端: 利用基于flashsocket/websocket/iframe等封裝的的socket對象和通用抽象方法(transport接口),包含數(shù)據(jù)編碼/解碼/心跳處理等

服務端: 利用namespace+room實現(xiàn)服務端socket多路復用,namespace基于客戶端url中path部分區(qū)分應用,不同應用相互隔離,默認為/,room基于客戶端指定namespace和room限制應用消息有效范圍,如果沒有指定,則除了自己外其它屬于此namespace的socket都會收到消息.


常用事件:

服務端
connect當客戶端與服務端連接成功后被觸發(fā)
message當客戶端使用send發(fā)送數(shù)據(jù)時被觸發(fā)
disconnect當客戶端與服務端失去連接時被觸發(fā),如關閉瀏覽器,主動斷開,掉線等任何斷開連接的情況.
客戶端
connect當客戶端與服務端連接成功后被觸發(fā)
message當服務端使用send發(fā)送數(shù)據(jù)時被觸發(fā)
disconnect當客戶端主動斷開連接時被觸發(fā).


接收消息: [客戶端發(fā)送消息<- 回調(diào)確認 -> 服務接收消息]

# 方式一: 客戶端通過send發(fā)送的未命名事件數(shù)據(jù),服務端只能使用默認message事件接收處理, 客戶端定義的事件回調(diào)函數(shù)接收的數(shù)據(jù)來自于服務端message事件處理函數(shù)的返回值

socket.send([data], ...., callback)


<script?type="text/javascript">var?socket?=?io.connect(location.protocol+'//'+document.domain+':'+location.port);socket.on('connect',?function(){socket.send({'data':?'hello?word!'},?function(data){console.log('#=>?recive?server?data',?data.data);});}); </script>
@io.on('message') def?message_handler(*args):print?'#=>?recive?{0}?data?from?client'.format(type(args[0])),?argsreturn?args

# 方式二: 客戶端通過emit發(fā)送的命名事件數(shù)據(jù),服務端只能使用對應自定義事件接收處理, 客戶端定義的事件回調(diào)函數(shù)接收的數(shù)據(jù)來自于服務端對應事件處理函數(shù)的返回值

socket.emit(event_name, [data], ...., callback)


<script?type="text/javascript">var?socket?=?io.connect(location.protocol+'//'+document.domain+':'+location.port);socket.on('connect',?function(){socket.emit('connect?event',?{'data':?'hello?word!'},?function(data){console.log('#=>?recive?server?data',?data.data);});}); </script>
@io.on('connect?event') def?connect_event_handler(*args):print?'#=>?recive?{0}?data?from?client'.format(type(args[0])),?argsreturn?args


發(fā)送消息: [服務端發(fā)送消息 <- 回調(diào)確認 -> 客戶端接收消息]

# 方式一: 服務端通過send發(fā)送的未命名事件數(shù)據(jù),可指定namespace, callback, broadcast, room, include_self額外參數(shù),客戶端只能使用默認message事件接收處理,服務端自定義的回調(diào)函數(shù)接收的數(shù)據(jù)來自于客戶端對回調(diào)函數(shù)的調(diào)用,不是return的值.

send(message, namespace, callback, broadcast, room, include_self)


<script?type="text/javascript">var?socket?=?io.connect(location.protocol+'//'+document.domain+':'+location.port);socket.on('message',?function(data,?func){console.log('#=>?recive?server?data',?data.data);func();}); </script>
def?message_event_callback(*args):print?'#=>?client?called?{0}'.format(inspect.stack()[0][-4:-2]) @io.on('connect') def?connect_event_handler():io.send({'data':?'hello?word!'},?callback=message_event_callback)

# 方式二: 服務端通過emit發(fā)送的命名事件數(shù)據(jù),可指定namespace, callback, broadcast, room, include_self額外參數(shù),客戶端只能使用對應自定義事件接收處理,服務端自定義的回調(diào)函數(shù)接收的數(shù)據(jù)來自于客戶端對回調(diào)函數(shù)的調(diào)用,不是return的值.

emit(event, args, namespace, callback, broadcast, room, include_self)


<script?type="text/javascript">var?socket?=?io.connect(location.protocol+'//'+document.domain+':'+location.port);socket.on('connect?event',?function(data,?func){console.log('#=>?recive?server?data',?data.data);func();}); </script>
def?connect_event_callback(*args):print?'#=>?client?called?{0}'.format(inspect.stack()[0][-4:-2]) @io.on('connect') def?connect_event_handler():io.emit('connect?event',{'data':?'hello?word!'},callback=message_event_callback)


廣播消息: [服務端發(fā)送消息 <- 回調(diào)確認 -> 客戶端接收消息]

# 方式一: 服務端通過send發(fā)送的未命名事件數(shù)據(jù),指定broadcast=True額外參數(shù),可配合namespace/room/include_self額外參數(shù)來控制消息發(fā)往的應用,發(fā)往的房間,是否發(fā)給自己,客戶端只能使用默認message事件接收處理


<script?type="text/javascript">var?socket?=?io.connect(location.protocol+'//'+document.domain+':'+location.port);var?user?=?{name:?['zmanman',?'qmanman',?'smanman',?'lmanman'][Math.ceil(Math.random()*3)],};console.log('#=>?current?user:?',?user);socket.on('connect',?function(){socket.send(user);});socket.on('message',?function(data){var?$content?=?$('<p>廣播:?'+?data.name?+'上線.</p>');$('#socketio').append($content);}); </script>
@io.on('message',?namespace='/') def?online_notify_handler(*args):print?'#=>?recive?{0}?data?from?client'.format(type(args[0])),?argssend(args[0],?broadcast=True)

# 方式二: 服務端通過emit發(fā)送的命名事件數(shù)據(jù),指定broadcast=True額外參數(shù),可配合namespace/room/include_self額外參數(shù)來控制消息發(fā)往的應用,發(fā)往的房間,是否發(fā)給自己,客戶端只能使用對應自定義事件接收處理


<script?type="text/javascript">var?socket?=?io.connect(location.protocol+'//'+document.domain+':'+location.port);var?user?=?{name:?['zmanman',?'qmanman',?'smanman',?'lmanman'][Math.ceil(Math.random()*3)],};console.log('#=>?current?user:?',?user);socket.on('connect',?function(){socket.emit('online?notify',?user);});socket.on('online?notify',?function(data){var?$content?=?$('<p>廣播:?'+?data.name?+'上線.</p>');$('#socketio').append($content);}); </script>
@io.on('online?notify',?namespace='/') def?online_notify_handler(*args):print?'#=>?recive?{0}?data?from?client'.format(type(args[0])),?argsemit('online?notify',?args[0],?broadcast=True)


分組廣播: [服務端發(fā)送消息 <- 回調(diào)確認 -> 客戶端接收消息]

# 方式一: 服務端通過send發(fā)送的未命名事件數(shù)據(jù),指定broadcast=True和room=xxoo額外參數(shù),可配合namespace/include_self額外參數(shù)來控制消息發(fā)往的應用,發(fā)往的房間,是否發(fā)給自己,服務端提供了join_room和leave_room函數(shù)來對請求分組,客戶端只能使用默認message事件接收處理


<script?type="text/javascript">var?socket?=?io.connect(location.protocol+'//'+document.domain+':'+location.port);var?name?=?['zmanman',?'qmanman',?'smanman',?'lmanman',][Math.ceil(Math.random()*3)];socket.on('connect',?function(){socket.send('user?join',?{'room':?'room1',?'user':?name});});socket.on('message',?function(data){if(data.action=='sys?boradcast'){console.log('公告:?'?+?data.message);};}); </script>
#!/usr/bin/env?python #?-*-?coding:?utf-8?-*- #?@Date????:?2016-12-27?19:22:04 #?@Author??:?李滿滿?(xmdevops@vip.qq.com) #?@Link????:?http://xmdevops.blog.51cto.com/ #?@Version?:?$Id$ from?__future__?import?absolute_import #?說明:?導入公共模塊 from?flask?import?request from?datetime?import?datetime from?flask_socketio?import?join_room,?leave_room #?說明:?導入其它模塊 from?.?import?io io.room_users?=?{} io.reqid_info?=?{} @io.on('connect') def?connect_event_handler():@io.on('message')def?user_join_handler(action,?data):if?action?==?'user?join':room?=?data['room']user?=?data['user']susr?=?'{0}_{1}'.format(user,?request.sid)io.room_users.setdefault(room,?[]).append(susr)join_room(room)io.reqid_info.setdefault(request.sid,?{}).update({'room':?room,'user':?user,})message?=?'#=>?[{0}]?user:?{1}?action:?{2}?room?{3}.'.format(datetime.now(),?susr,?'join',?room)print?messageprint?'#=>?current?rooms:',?io.room_users.keys()users?=?[]for?item_users?in?io.room_users.itervalues():users.extend(item_users)print?'#=>?current?users:',?usersio.send({'action':?'sys?boradcast',?'message':?message})@io.on('disconnect')def?disconnect_handler():user?=?io.reqid_info[request.sid]['user']susr?=?'{0}_{1}'.format(user,?request.sid)room?=?io.reqid_info[request.sid]['room']io.room_users[room].remove(susr)message?=?'#=>?[{0}]?user:?{1}?action:?{2}?room?{3}.'.format(datetime.now(),?susr,?'leave',?room)print?messageprint?'#=>?current?rooms:',?io.room_users.keys()users?=?[]for?item_users?in?io.room_users.values():users.extend(item_users)print?'#=>?current?users:',?usersio.send({'action':?'sys?boradcast',?'message':?message})leave_room(room)

說明: 如上定義io.room_users = {},io.reqid_info = {}分別存儲房間用戶和請求信息,主要是為了存儲在線用戶以及用戶信息,實際生產(chǎn)中可用Redis等后端替換.

擴展: 此插件運行時內(nèi)存中也維護了一份兒request.sid屬于哪個namespace哪個room,但由于connect時沒有指定namespace和room而room并沒有默認值,作者使用了None代替,并且為了后續(xù)的雙向回調(diào)跟蹤,所以在room中也包含了request.sid,而且在接口方面也做的不盡人意,所以還是推薦自己手動實現(xiàn)幾個全局對象來存儲這些信息.


# 方式二: 服務端通過emit發(fā)送的命名事件數(shù)據(jù),指定broadcast=True和room=xxoo額外參數(shù),可配合namespace/include_self額外參數(shù)來控制消息發(fā)往的應用,發(fā)往的房間,是否發(fā)給自己,服務端提供了join_room和leave_room函數(shù)來對請求分組,客戶端只能使用自定義事件接收處理


<script?type="text/javascript">var?socket?=?io.connect(location.protocol+'//'+document.domain+':'+location.port);var?name?=?['zmanman',?'qmanman',?'smanman',?'lmanman',][Math.ceil(Math.random()*3)];socket.on('connect',?function(){socket.emit('user?join',?{'room':?'room1',?'user':?name});});socket.on('sys?broadcast',?function(data){console.log('公告:?'?+?data.message);}); </script>
#!/usr/bin/env?python #?-*-?coding:?utf-8?-*- #?@Date????:?2016-12-27?19:22:04 #?@Author??:?李滿滿?(xmdevops@vip.qq.com) #?@Link????:?http://xmdevops.blog.51cto.com/ #?@Version?:?$Id$ from?__future__?import?absolute_import #?說明:?導入公共模塊 import?inspect from?flask?import?request from?datetime?import?datetime from?flask_socketio?import?join_room,?leave_room #?說明:?導入其它模塊 from?.?import?io io.room_users?=?{} io.reqid_info?=?{} @io.on('connect') def?connect_event_handler():@io.on('user?join')def?user_join_handler(data):user?=?data['user']room?=?data['room']susr?=?'{0}_{1}'.format(user,?request.sid)io.room_users.setdefault(room,?[]).append(susr)io.reqid_info.setdefault(request.sid,?{}).update({'room':?room,'user':?user,})join_room(room)message?=?'#=>?[{0}]?user:?{1}?action:?{2}?room:?{3}.'.format(datetime.now(),?susr,?inspect.stack()[0][-4:-2],?room)print?messageusers?=?[]for?item_users?in?io.room_users.itervalues():users.extend(item_users)print?'#=>?current?users:?',?usersio.emit('sys?broadcast',?{'message':?message})@io.on('disconnect')def?disconnect_handler():user?=?io.reqid_info[request.sid]['user']susr?=?'{0}_{1}'.format(user,?request.sid)room?=?io.reqid_info[request.sid]['room']io.room_users[room].remove(susr)message?=?'#=>?[{0}]?user:?{1}?action:?{2}?room:?{3}.'.format(datetime.now(),?susr,?inspect.stack()[0][-4:-2],?room)print?messageio.emit('sys?broadcast',?{'message':?message})leave_room(room)


錯誤處理:


#?說明:?處理指定NameSpace的異常 @io.on_error() def?error_handler(e):print?request.event['message']print?request.event['args'] #?說明:?處理所有NameSpace的異常 @io.on_error_default def?default_error_handler(e):print?request.event['message']print?request.event['args']

說明: request.event是在被io.on裝飾的時候被附加上去的屬性,是一個字典,默認包含message和args,也就是事件名和事件相關的參數(shù),在出現(xiàn)異常時可通過打印它們來獲取異常請求信息.


異步切換:


說明: flask-socketio和客戶端和服務端的交互是雙向的,當你循環(huán)send/emit的時候會出現(xiàn)緩沖區(qū)阻塞,可通過io.sleep(0.1)或import eventlet;eventlet.monkey_patch()打補丁來實現(xiàn)異步IO,但是這樣發(fā)送極快,如果使用eventlet或基于eventlet的gevent異步協(xié)程庫的時候使用eventlet.sleep(0.1),減慢切換時間,這樣防止瀏覽器端卡死.


全局對象:


說明: 作為FLASK插件,基于程序上下文可使用current_app, g全局對象,基于請求上下文可使用request, session全局對象, 所有的會話是基于request.sid, 且運行時會自動注冊event和namespace到request對象.


用戶驗證:


說明: flask-socketio默認是不支持用戶驗證的,可借助flask-login插件通過login_user()函數(shù)來將用戶ID信息存儲到本地,然后通過程序上下文中的current_user對象來還原用戶對象,判斷是否登錄是否有權限訪問等.


消息隊列:


說明: flask-socketio還支持多消費者分布式橫向擴展,只需在實例化SocketIO時指定async_mode和message_queue,message_queue目前只支持redis://和amqp://,接口分別使用的是redis和kombu.由于每個節(jié)點都維護著一份兒客戶端連接集,所以如果使用了前端Nginx負載均衡,需要Session同步或IP_HASH算法負載,節(jié)點啟動時會自動訂閱flask-socketio頻道,所以我們可以用PY-REDIS客戶端去發(fā)布消息,那么所有節(jié)點都可以獲取到消息然后再分發(fā)給指定的客戶端.

轉載于:https://blog.51cto.com/xmdevops/1889053

總結

以上是生活随笔為你收集整理的网站后端_Flask-第三方库.利用Flask-Socketio扩展构建实时流应用?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。