大话ion系列(三)
點(diǎn)擊上方“LiveVideoStack”關(guān)注我們
作者 | 王朋闖
本文為王朋闖老師創(chuàng)作的系列ion文章,LiveVideoStack已獲得授權(quán)發(fā)布,未來將持續(xù)更新。
大話ion系列(一)
大話ion系列(二)
五、offer與answer流程
1.前言
之前的文章已經(jīng)介紹了前兩次重協(xié)商:
客戶端sdk的pub的dc已經(jīng)打通,此時(shí)使用dc控制simulcast和監(jiān)聽audiolevel speaker。
客戶端sdk的sub訂閱到了房間內(nèi)的流。
接下來,SDK推流是第三次協(xié)商了。
2.offer流程
當(dāng)點(diǎn)擊ion-sfu的demo里“publish”按鈕的時(shí)候,就會(huì)觸發(fā)ion-sdk-js的操作:
把音視頻track增加到pub的pc,此時(shí)會(huì)觸發(fā)onNegotiationNeeded。
首先來看一下ion-sdk-js的代碼:
this.transports[Role.pub].pc.onnegotiationneeded = this.onNegotiationNeeded.bind(this);這里把onNegotiationNeeded綁定到了pc.onnegotiationneeded,意思是當(dāng)推流增加track到pc時(shí),就會(huì)觸發(fā)onNegotiationNeeded。
參考:
https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/onnegotiationneeded
接下來看一下onNegotiationNeeded,是個(gè)標(biāo)準(zhǔn)的重協(xié)商流程。
private async onNegotiationNeeded() {if (!this.transports) {throw Error(ERR_NO_SESSION);}let offer: RTCSessionDescriptionInit | undefined;let answer: RTCSessionDescriptionInit | undefined;try {offer = await this.transports[Role.pub].pc.createOffer();await this.transports[Role.pub].pc.setLocalDescription(offer);answer = await this.signal.offer(offer);//在這里發(fā)送offer到SFUawait this.transports[Role.pub].pc.setRemoteDescription(answer);} catch (err) {/* tslint:disable-next-line:no-console */console.error(err);if (this.onerrnegotiate) this.onerrnegotiate(Role.pub, err, offer, answer);}}接下來看一下SFU里的處理:
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { .... case "offer":var negotiation Negotiationerr := json.Unmarshal(*req.Params, &negotiation)if err != nil {p.Logger.Error(err, "connect: error parsing offer")replyError(err)break}//調(diào)用peerLocal.Answer()answer, err := p.Answer(negotiation.Desc)if err != nil {replyError(err)break}// 發(fā)送answer_ = conn.Reply(ctx, req.ID, answer) func (p *PeerLocal) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription, error) { ....//這里調(diào)用了publisher.Answer()answer, err := p.publisher.Answer(sdp)if err != nil {return nil, fmt.Errorf("error creating answer: %v", err)}Logger.V(0).Info("PeerLocal send answer", "peer_id", p.id)return &answer, nil } // 這里可以看到Publisher.Answer就是標(biāo)準(zhǔn)的協(xié)商流程 // 在前邊的文章詳細(xì)介紹過什么叫協(xié)商和重協(xié)商,這里不在重復(fù)了 func (p *Publisher) Answer(offer webrtc.SessionDescription) (webrtc.SessionDescription, error) {if err := p.pc.SetRemoteDescription(offer); err != nil {return webrtc.SessionDescription{}, err}for _, c := range p.candidates {if err := p.pc.AddICECandidate(c); err != nil {Logger.Error(err, "Add publisher ice candidate to peer err", "peer_id", p.id)}}p.candidates = nilanswer, err := p.pc.CreateAnswer(nil)if err != nil {return webrtc.SessionDescription{}, err}if err := p.pc.SetLocalDescription(answer); err != nil {return webrtc.SessionDescription{}, err}return answer, nil }3.answer流程
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { ....case "answer":var negotiation Negotiationerr := json.Unmarshal(*req.Params, &negotiation)if err != nil {p.Logger.Error(err, "connect: error parsing offer")replyError(err)break}// peerLocal.SetRemoteDescriptionerr = p.SetRemoteDescription(negotiation.Desc)if err != nil {replyError(err)}func (p *PeerLocal) SetRemoteDescription(sdp webrtc.SessionDescription) error {if p.subscriber == nil {return ErrNoTransportEstablished}p.Lock()defer p.Unlock()Logger.V(0).Info("PeerLocal got answer", "peer_id", p.id)// 這里調(diào)用subscriber.SetRemoteDescriptionif err := p.subscriber.SetRemoteDescription(sdp); err != nil {return fmt.Errorf("setting remote description: %w", err)}p.remoteAnswerPending = falseif p.negotiationPending {//這里兩個(gè)標(biāo)志位是為了防止重協(xié)商競(jìng)爭(zhēng)沖突p.negotiationPending = falsep.subscriber.negotiate()}return nil }//這里僅僅是調(diào)用pc.SetRemoteDescription func (s *Subscriber) SetRemoteDescription(desc webrtc.SessionDescription) error {if err := s.pc.SetRemoteDescription(desc); err != nil {Logger.Error(err, "SetRemoteDescription error")return err} .....return nil }至此,第三次重協(xié)商完成,兩端交換完sdp,接下來ice打通后會(huì)推流過來。
4.總結(jié)
pub在第一次協(xié)商后,只打通了dc,此時(shí)使用dc控制simulcast和監(jiān)聽audiolevel speaker,也可以定制自己的dc。
這樣的好處是靈活。
sub在第二次協(xié)商后,可以訂閱到房間內(nèi)的其他人的流了。
pub在第三次協(xié)商時(shí),是增加音視頻track后,然后走標(biāo)準(zhǔn)重協(xié)商流程,開始推流。
六、包的收發(fā)流程
1.前言
本文從ion-sfu中的demo點(diǎn)擊“publish”開始,講一下如何收包轉(zhuǎn)發(fā)。
前邊講到,點(diǎn)擊“publish”,會(huì)進(jìn)行第三次重協(xié)商,協(xié)商完成,客戶端此時(shí)推流到SFU。
此時(shí)會(huì)觸發(fā)OnTrack,這里的OnTrack和標(biāo)準(zhǔn)webrtc接口是一樣的,會(huì)在流到達(dá)時(shí)自動(dòng)觸發(fā)。
參考:
https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/ontrack
2.收包流程
OnTrack收包流程的入口
首先貼一下老代碼,OnTrack是在NewPublisher里邊設(shè)置的回調(diào)。
func NewPublisher(idstring,sessionSession,cfg*WebRTCTransportConfig)(*Publisher,error){ 。。。//這里要注意cfg.Setting,里邊的bufferFactory已經(jīng)設(shè)置好了為自定義的c.BufferFactory.GetOrNew//可以搜一下這個(gè)函數(shù)NewWebRTCTransportConfig,這一行“se.BufferFactory = c.BufferFactory.GetOrNew”api:=webrtc.NewAPI(webrtc.WithMediaEngine(me),webrtc.WithSettingEngine(cfg.Setting))pc,err:=api.NewPeerConnection(cfg.Configuration) 。。。p:=&Publisher{id: id,pc: pc,cfg: cfg,router: newRouter(id,session,cfg),session:session,} //===========OnTrack在這里pc.OnTrack(func(track*webrtc.TrackRemote,receiver*webrtc.RTPReceiver){ 。。。//這里AddReceiver會(huì)新建WebRTCReceiver,然后AddUpTrack//uptrack是收流的,downtrack是發(fā)流的r,pub:=p.router.AddReceiver(receiver,track)if pub{//這里會(huì)把流發(fā)布到房間內(nèi),其他peer會(huì)訂閱到p.session.Publish(p.router,r)p.mu.Lock()publisherTrack:=PublisherTrack{track,r,true}p.tracks=append(p.tracks,publisherTrack) 。。。p.mu.Unlock()if handler,ok:=p.onPublisherTrack.Load().(func(PublisherTrack));ok&&handler!=nil{//這里如果上層業(yè)務(wù),通過OnPublisherTrack設(shè)置了回調(diào),就會(huì)觸發(fā)//一般只有包導(dǎo)入的情況下,才會(huì)這樣用,比如業(yè)務(wù)不想加入房間就自動(dòng)訂閱,想要按需訂閱handler(publisherTrack)}} else {p.mu.Lock()p.tracks=append(p.tracks,PublisherTrack{track,r,false})p.mu.Unlock()}})//===========自定義buffer
這里不得不介紹一下自定義buffer了,看懂了才知道包是從哪里來的。
Pion/webrtc支持自定義BufferFactory,設(shè)置好之后,pion/webrtc的組件會(huì)使用自定義buffer。
比如pion/srtp是實(shí)際收發(fā)srtp和srtcp包的類,它們也會(huì)使用自定義buffer。
首先來看一下ion-sfu是在哪里設(shè)置自定義buffer的:
func NewWebRTCTransportConfig(cConfig)WebRTCTransportConfig{//這個(gè)SettingEngine是pion里很重要的設(shè)置類,可以控制pion/webrtc很多行為和參數(shù),比如ice-lite等se:=webrtc.SettingEngine{}se.DisableMediaEngineCopy(true)....//這里把自定義的BufferFactory給配置進(jìn)去了//意思是pion/srtp會(huì)使用這個(gè)buffer來傳包se.BufferFactory=c.BufferFactory.GetOrNew }srtp和srtcp流向是這樣的:
客戶端---srtp--->srtp.ReadStreamSRTP------->SFU 客戶端<---srtcp---srtp.ReadStreamSRTCP<------SFU當(dāng)包到達(dá)pion/srtp時(shí),就會(huì)觸發(fā)ReadStreamSRTP.init函數(shù)和ReadStreamSRTCP.init函數(shù)。
ReadStreamSRTP.init調(diào)用自定義的BufferFactory.GetOrNew函數(shù)了,new了一個(gè)buffer。
ReadStreamSRTCP.init調(diào)用自定義的BufferFactory.GetOrNew函數(shù),new一個(gè)rtcpReader。
之后收發(fā)rtp和rtcp包,就會(huì)流經(jīng)這個(gè)buffer和rtcpReader:
https://github.com/pion/srtp/blob/3c34651fa0c6de900bdc91062e7ccb5992409643/stream_srtp.go#L53
為什么這么搞呢?
仔細(xì)想想,如果控制了rtp和rtcp的buffer,是不是計(jì)算twcc、nack、stats等就很方便了?在buffer寫入包的同時(shí),就可以通過設(shè)置的回調(diào)函數(shù)搞各種復(fù)雜計(jì)算。
router.AddReceiver
接下來可以看到buffer的各種回調(diào)。
func(r*router)AddReceiver(receiver*webrtc.RTPReceiver,track*webrtc.TrackRemote)(Receiver,bool){r.Lock()deferr.Unlock()publish:=falsetrackID:=track.ID()//這里獲取了之前init函數(shù)中,new出來的buffer和rtcpReader,開始搞事情buff,rtcpReader:=r.bufferFactory.GetBufferPair(uint32(track.SSRC()))//設(shè)置rtcp的回調(diào),比如nack、twcc、rrbuff.OnFeedback(func(fb[]rtcp.Packet){r.rtcpCh<-fb})if track.Kind()==webrtc.RTPCodecTypeAudio{streamID:=track.StreamID()//如果是音頻track,設(shè)置OnAudioLevel回調(diào)buff.OnAudioLevel(func(leveluint8){r.session.AudioObserver().observe(streamID,level)})r.session.AudioObserver().addStream(streamID)}else if track.Kind()==webrtc.RTPCodecTypeVideo{if r.twcc==nil{//如果是視頻track,創(chuàng)建twcc計(jì)算器,并設(shè)置回調(diào),當(dāng)計(jì)算器生成twcc包就會(huì)回調(diào)r.twcc=twcc.NewTransportWideCCResponder(uint32(track.SSRC()))r.twcc.OnFeedback(func(prtcp.RawPacket){r.rtcpCh<-[]rtcp.Packet{&p}})}//設(shè)置buffer的twcc回調(diào),buffer收到包后調(diào)用,塞入twcc計(jì)算器//twcc計(jì)算生成rtcp包,再回調(diào)OnFeedback發(fā)送給客戶端buff.OnTransportWideCC(func(snuint16,timeNSint64,markerbool){r.twcc.Push(sn,timeNS,marker)})}if r.config.WithStats{r.stats[uint32(track.SSRC())]=stats.NewStream(buff)}//設(shè)置rtcpReader.OnPacketrtcpReader.OnPacket(func(bytes[]byte){//收到SDES、SR包做些處理})recv,ok:=r.receivers[trackID]if!ok{//創(chuàng)建WebRTCReceiver并設(shè)置回調(diào)recv=NewWebRTCReceiver(receiver,track,r.id)r.receivers[trackID]=recvrecv.SetRTCPCh(r.rtcpCh)recv.OnCloseHandler(func(){。。。。})publish=true}//把track buffer塞入recvrecv.AddUpTrack(track,buff,r.config.Simulcast.BestQualityFirst)//初始化buffbuff.Bind(receiver.GetParameters(),buffer.Options{MaxBitRate:r.config.MaxBandwidth,})。。。return recv,publish }這里很重要,WebRTCReceiver是真實(shí)負(fù)責(zé)收發(fā)包的,可以看到AddUpTrack已經(jīng)把buffer塞進(jìn)去了。
接下來看一下AddUpTrack是如何工作的:
func(w*WebRTCReceiver)AddUpTrack(track*webrtc.TrackRemote,buff*buffer.Buffer,bestQualityFirstbool){if w.closed.get(){return}//根據(jù)RID來區(qū)分layervarlayerintswitchtrack.RID(){//如果沒開simulcast,為""casefullResolution:layer=2casehalfResolution:layer=1default:layer=0//如果沒開simulcast,為0}w.Lock()//設(shè)置空域?qū)觢ayer的trackw.upTracks[layer]=track//設(shè)置空域?qū)觢ayer的buffw.buffers[layer]=buffw.available[layer].set(true)//設(shè)置空域?qū)觢ayer的downtrack,這里的[]*DownTrack數(shù)組,訂閱該layer的downtrack存在這里w.downTracks[layer].Store(make([]*DownTrack,0,10))w.pendingTracks[layer]=make([]*DownTrack,0,10)w.Unlock()//閉包函數(shù),按最佳質(zhì)量訂閱,切到f層subBestQuality:=func(targetLayerint){for l:=0;l<targetLayer;l++{dts:=w.downTracks[l].Load()if dts==nil{continue}for_,dt:=rangedts.([]*DownTrack){_=dt.SwitchSpatialLayer(int32(targetLayer),false)}}}//閉包函數(shù),按最差質(zhì)量訂閱,切到q層subLowestQuality:=func(targetLayerint){for l:=2;l!=targetLayer;l--{dts:=w.downTracks[l].Load()if dts==nil{continue}for_,dt:=rangedts.([]*DownTrack){_=dt.SwitchSpatialLayer(int32(targetLayer),false)}}}//是否開啟大小流if w.isSimulcast{//如果配置最佳質(zhì)量,則等到f層到來時(shí),訂閱它if bestQualityFirst&&(!w.available[2].get()||layer==2){subBestQuality(layer)//如果配置最差質(zhì)量,則等到q層到來時(shí),訂閱它}else if!bestQualityFirst&&(!w.available[0].get()||layer==0){subLowestQuality(layer)}}//啟動(dòng)讀寫流程go w.writeRTP(layer) }真正的收發(fā)包流程來了:
func(w*WebRTCReceiver)writeRTP(layerint){defer func(){//這里設(shè)置自動(dòng)清理函數(shù)w.closeOnce.Do(func(){w.closed.set(true)w.closeTracks()})}()//創(chuàng)建一個(gè)PLI包,后邊要用pli:=[]rtcp.Packet{&rtcp.PictureLossIndication{SenderSSRC:rand.Uint32(),MediaSSRC:w.SSRC(layer)},}for {//這里可以看到,真正讀包是從buffer里讀出來的,正是前邊講到的自定義bufferpkt,err:=w.buffers[layer].ReadExtended()if err==io.EOF{return}//如果開啟大小流if w.isSimulcast{ 。。。//這里跳過,以后再講}for_,dt:=rangew.downTracks[layer].Load().([]*DownTrack){//下行track寫入rtp包,這樣訂閱者可以收到流了if err=dt.WriteRTP(pkt,layer);err!=nil{if?err==io.EOF&&err==io.ErrClosedPipe{w.Lock()w.deleteDownTrack(layer,dt.id)w.Unlock()}log.Error().Err(err).Str("id",dt.id).Msg("Error writing to down track")}}}}3.發(fā)包流程
SessionLocal.Publish
func(s*SessionLocal)Publish(routerRouter,rReceiver){for_,p:=ranges.Peers(){// Don't sub to selfif router.ID()==p.ID()||p.Subscriber()==nil{continue}//表示根據(jù)r的信息創(chuàng)建downtrack,并增加到p.Subscriber()和r中if err:=router.AddDownTracks(p.Subscriber(),r);err!=nil{Logger.Error(err,"Error subscribing transport to Router")continue}} }router.AddDownTracks
func(r*router)AddDownTracks(s*Subscriber,recvReceiver)error{ 。。。 //如果recv不為空,表示根據(jù)recv的信息創(chuàng)建downtrack,并增加到s和recv中if recv!=nil{if_,err:=r.AddDownTrack(s,recv);err!=nil{return err}s.negotiate()return nil} //如果recv為空,表示遍歷房間中所有的receivers,并增加到s和recv中if len(r.receivers)>0{for_,rcv:=ranger.receivers{if_,err:=r.AddDownTrack(s,rcv);err!=nil{return err}}s.negotiate()}return nil }router.AddDownTrack
根據(jù)recv的信息創(chuàng)建downtrack,并增加到sub和recv中。
func(r*router)AddDownTrack(sub*Subscriber,recvReceiver)(*DownTrack,error){for_,dt:=rangesub.GetDownTracks(recv.StreamID()){//避免重復(fù)添加if dt.ID()==recv.TrackID(){return dt,nil}}codec:=recv.Codec()if err:=sub.me.RegisterCodec(codec,recv.Kind());err!=nil{return nil,err}//創(chuàng)建downtrack,downtrack用來給客戶端下發(fā)流downTrack,err:=NewDownTrack(webrtc.RTPCodecCapability{MimeType: codec.MimeType,ClockRate: codec.ClockRate,Channels: codec.Channels,SDPFmtpLine: codec.SDPFmtpLine,RTCPFeedback:[]webrtc.RTCPFeedback{{"goog-remb",""},{"nack",""},{"nack","pli"}},},recv,r.bufferFactory,sub.id,r.config.MaxPacketTrack)if err!=nil{return nil,err}//把downtrack增加到pc中if downTrack.transceiver,err=sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit{Direction:webrtc.RTPTransceiverDirectionSendonly,});err!=nil{return nil,err}// 設(shè)置關(guān)閉回調(diào),關(guān)閉時(shí)pc自動(dòng)刪除trackdownTrack.OnCloseHandler(func(){if sub.pc.ConnectionState()!=webrtc.PeerConnectionStateClosed{if err:=sub.pc.RemoveTrack(downTrack.transceiver.Sender());err!=nil{if err==webrtc.ErrConnectionClosed{return}Logger.Error(err,"Error closing down track")}else{//如果刪除成功,再從sub中刪除,然后重協(xié)商sub.RemoveDownTrack(recv.StreamID(),downTrack)sub.negotiate()}}})//設(shè)置OnBind回調(diào),DownTrack.Bind()里會(huì)調(diào)用這個(gè);PC協(xié)商完成時(shí),DownTrack.Bind()會(huì)觸發(fā)downTrack.OnBind(func(){gosub.sendStreamDownTracksReports(recv.StreamID())})//增加downTrack到sub中,sub只是用來管理downtracks和生成SenderReport等sub.AddDownTrack(recv.StreamID(),downTrack)//增加downTrack到WebRTCReceiver中,實(shí)際收發(fā)包是WebRTCReceiver來控制,在writeRTP中recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)returndownTrack,nil }這樣下行track也增加好了,之前的writeRTP可以正常工作了。
4.總結(jié)
收發(fā)包邏輯打通步驟:
SDK推流---->OnTrack---->router.AddReceiver(設(shè)置Buffer和上行Track)------>SessionLocal.Publish(設(shè)置下行Track)---->收發(fā)包邏輯打通
收發(fā)包流程圖簡(jiǎn)單總結(jié):
srtp.write--->buffer.write--->buffer.ReadExtended--->downtrack.writeRTP收包流程堆棧:
github.com/pion/ion-sfu/pkg/buffer.(*Buffer).Write (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/buffer/buffer.go:187) github.com/pion/srtp/v2.(*ReadStreamSRTP).write (/Volumes/vm/workspace/go/pkg/mod/github.com/pion/srtp/v2@v2.0.5/stream_srtp.go:64) github.com/pion/srtp/v2.(*SessionSRTP).decrypt?(/Volumes/vm/workspace/go/pkg/mod/github.com/pion/srtp/v2@v2.0.5/session_srtp.go:166)發(fā)包流程堆棧:
github.com/pion/ion-sfu/pkg/buffer.(*Buffer).ReadExtended (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/buffer/buffer.go:236) github.com/pion/ion-sfu/pkg/sfu.(*WebRTCReceiver).writeRTP?(/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/sfu/receiver.go:345)作者簡(jiǎn)介:
王朋闖:前百度RTN資深工程師,前金山云RTC技術(shù)專家,前VIPKID流媒體架構(gòu)師,ION開源項(xiàng)目發(fā)起人。
特別說明:
本文發(fā)布于知乎,已獲得作者授權(quán)轉(zhuǎn)載。
掃描圖中二維碼或點(diǎn)擊閱讀原文
了解大會(huì)更多信息
喜歡我們的內(nèi)容就點(diǎn)個(gè)“在看”吧!
總結(jié)
以上是生活随笔為你收集整理的大话ion系列(三)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【今晚8点半】:对话袁家军——成都的多媒
- 下一篇: 为什么视频压缩如此重要