日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rabbitmq中消息的存储

發布時間:2023/12/20 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq中消息的存储 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

1. 大概原理:

所有隊列中的消息都以append的方式寫到一個文件中,當這個文件的大小超過指定的限制大小后,關閉這個文件再創建一個新的文件供消息的寫入。文件名(*.rdq)從0開始然后依次累加。當某個消息被刪除時,并不立即從文件中刪除相關信息,而是做一些記錄,當垃圾數據達到一定比例時,啟動垃圾回收處理,將邏輯相鄰的文件中的數據合并到一個文件中。

2. 消息的讀寫及刪除:

rabbitmq在啟動時會創建msg_store_persistent,msg_store_transient兩個進程,一個用于持久消息的存儲,一個用于內存不夠時,將存儲在內存中的非持久化數據轉存到磁盤中。所有隊列的消息的寫入和刪除最終都由這兩個進程負責處理,而消息的讀取則可能是隊列本身直接打開文件進行讀取,也可能是發送請求由msg_store_persisteng/msg_store_transient進程進行處理。

在進行消息的存儲時,rabbitmq會在ets表中記錄消息在文件中的映射,以及文件的相關信息。消息讀取時,根據消息ID找到該消息所存儲的文件,在文件中的偏移量,然后打開文件進行讀取。消息的刪除只是從ets表刪除指定消息的相關信息,同時更新消息對應存儲的文件的相關信息(更新文件有效數據大小)。

-record(msg_location, { msg_id, %%消息IDref_count, %%引用計數file, %%消息存儲的文件名offset, %%消息在文件中的偏移量total_size %%消息的大小}).-record(file_summary, { file, %%文件名valid_total_size, %%文件有效數據大小left, %%位于該文件左邊的文件right, %%位于該文件右邊的文件file_size, %%文件總的大小locked, %%上鎖標記 垃圾回收時防止對文件進行操作readers %%當前讀文件的隊列數})

3. 垃圾回收:

由于執行消息刪除操作時,并不立即對在文件中對消息進行刪除,也就是說消息依然在文件中,僅僅是垃圾數據而已。當垃圾數據超過一定比例后(默認比例為50%),并且至少有三個及以上的文件時,rabbitmq觸發垃圾回收。垃圾回收會先找到符合要求的兩個文件(根據#file_summary{}中left,right找邏輯上相鄰的兩個文件,并且兩個文件的有效數據可在一個文件中存儲),然后鎖定這兩個文件,并先對左邊文件的有效數據進行整理,再將右邊文件的有效數據寫入到左邊文件,同時更新消息的相關信息(存儲的文件,文件中的偏移量),文件的相關信息(文件的有效數據,左邊文件,右邊文件),最后將右邊的文件刪除。


4. 性能考慮:

(1)操作引用計數(flying_ets)

隊列在進行消息的寫入和刪除操作前,會在flying_ets表里通過+1,-1的方式進行計數,然后投遞請求給msg_store_persistent/msg_store_transient進程進行處理,進程在真正寫操作或者刪除之前會再次判斷flying_ets中對應消息的計數決定是否需要進行相應操作。這樣,對于頻繁寫入和刪除的操作,概率減少實際的寫入和刪除。

client_write(MsgId, Msg, Flow,CState=#client_msstate{cur_file_cache_ets=CurFileCacheEts,client_ref=CRef}) ->ok = client_update_flying(+1, MsgId, CState),ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),ok = server_cast(CState, {write, CRef, MsgId, Flow}).remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->[client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],server_cast(CState, {remove, CRef, MsgIds}).client_update_flying(Diff, MsgId,#client_msstate{flying_ets = FlyingEts,client_ref = CRef}) ->Key = {MsgId, CRef},case ets:insert_new(FlyingEts, {Key, Diff}) oftrue ->ok;false ->try ets:update_counter(FlyingEts, Key, {2, Diff}) of...end.handle_cast({write, CRef, MsgId, Flow},State = #msstate{cur_file_cache_ets=CurFileCacheEts,clients=Clients}) ->...true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),case update_flying(-1, MsgId, CRef, State) ofprocess ->[{MsgId,Msg,_PWC}]=ets:lookup(CurFileCacheEts, MsgId),noreply(write_message(MsgId, Msg, CRef, State));ignore ->...end;handle_cast({remove, CRef, MsgIds}, State) ->{RemovedMsgIds, State1} =lists:foldl(fun (MsgId, {Removed, State2}) ->case update_flying(+1, MsgId, CRef, State2) ofprocess ->{[MsgId | Removed],remove_message(MsgId, CRef, State2)};ignore ->{Removed, State2}endend, {[], State}, MsgIds),...update_flying(Diff,MsgId,CRef,#msstate{flying_ets = FlyingEts }) ->Key = {MsgId, CRef},NDiff = -Diff,case ets:lookup(FlyingEts, Key) of[] ->ignore;[{_, Diff}] ->ignore;[{_, NDiff}] ->ets:update_counter(FlyingEts, Key, {2, Diff}),true = ets:delete_object(FlyingEts, {Key, 0}),process;[{_, 0}] ->true = ets:delete_object(FlyingEts, {Key, 0}),ignore;[{_, Err}] ->throw({bad_flying_ets_record, Diff, Err, Key})end.

(2)盡可能的并發讀

在讀取消息的時候,都先根據消息ID找到對應存儲的文件,如果文件存在并且未被鎖住,則直接打開文件,從指定位置讀取消息的內容。

如果消息存儲的文件被鎖住了,或者對應的文件不存在了,則發送請求,由msg_store_persistent/msg_store_transient進程進行處理。

(3)消息緩存

1)利用ets表進行緩存?

對于當前正在寫的文件,所有消息在寫入前都會在cur_file_cache_ets表中存一份,消息讀取時會優先從這里進行查找。文件關閉時,會將cur_file_cache_ets表中引用計數為0的消息進行清除。

2)file_handle_cache的寫緩存

rabbitmq中對文件的操作封轉到了file_handle_cache模塊,以寫模式打開文件時,默認有1M大小的緩存,即在進行文件的寫操作時,是先寫入到這個緩存中,當緩存超過大小或者顯式刷新,才將緩存中的內容刷入磁盤中。

rabbit_msg_store.erl-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MBopen_file(Dir, FileName, Mode) ->file_handle_cache:open(form_filename(Dir, FileName),?BINARY_MODE ++ Mode,[{write_buffer,?HANDLE_CACHE_BUFFER_SIZE}]).file_handle_cache.erlappend(Ref,Data) ->with_handles([Ref],fun ([#handle { is_write = false }]) ->{error, not_open_for_writing};([Handle]) ->case maybe_seek(eof, Handle) of{{ok, _Offset}, #handle{hdl = Hdl,offset = Offset,write_buffer_size_limit = 0,at_eof = true }= Handle1} ->Offset1 = Offset + iolist_size(Data),{prim_file:write(Hdl, Data),[Handle1#handle{is_dirty=true,offset=Offset1 }]};{{ok, _Offset},#handle{write_buffer = WriteBuffer,write_buffer_size = Size,write_buffer_size_limit= Limit,at_eof = true } = Handle1} ->WriteBuffer1 = [Data | WriteBuffer],Size1 = Size + iolist_size(Data),Handle2=Handle1#handle{write_buffer=WriteBuffer1,write_buffer_size=Size1},case Limit =/= infinity andalso Size1 > Limit oftrue ->{Result,Handle3} = write_buffer(Handle2),{Result, [Handle3]};false ->{ok, [Handle2]}end;{{error, _} = Error, Handle1} ->{Error, [Handle1]}endend).



轉載于:https://my.oschina.net/hncscwc/blog/182083

總結

以上是生活随笔為你收集整理的rabbitmq中消息的存储的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。