日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送

發布時間:2025/3/8 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

使用 SSE(Server-Sent Events) 進行 HTTP 服務器推送

這個示例是一個類似 twitter 的 web 應用程序,使用 Server-Sent Events 來支持實時刷新。

運行

docker-compose up

然后, 瀏覽 http://localhost:8080

您可以添加自己的帖子或點擊按鈕獲得隨機生成的帖子。

無論哪種方式,feeds 列表和 feed 中的帖子都應該是最新的。嘗試使用第二個瀏覽器窗口查看更新。

它是如何工作的

  • 可以創建和更新帖子。

  • 帖子可以包含標簽。

  • 每個標簽都有自己的 feed,其中包含來自該標簽的所有帖子。

  • 所有的帖子都存儲在 MySQL 中。這就是寫模型。

  • 所有 feed 都異步更新并存儲在 MongoDB 中。這是讀模型。

為什么要使用單獨的寫和讀模型?

對于這個示例應用程序,使用多語言持久性(兩個數據庫引擎)當然有些過頭了。我們這樣做是為了展示這個技術,以及如何很容易地將它應用到 Watermill。

專用的讀模型對于具有高讀/寫比率的應用程序是一種有用的模式。所有寫操作都被原子地應用到寫模型(在我們的例子中是 MySQL)。事件處理程序異步更新讀模型(我們使用 Mongo)。

讀取模型中的數據可以按原樣使用。也可以獨立于寫模型進行擴展。

請記住,要使用此模式,應用程序中必須接受最終的一致性。而且,在大多數用例中,您可能不需要使用它。務實!

SSE Router

SSERouter?來自 watermill-http。當創建一個新的路由器時,你需要傳遞一個上游訂閱者。來自該訂閱服務器的消息將觸發通過 HTTP 推送更新。

在本例中,我們使用 NATS 作為 Pub/Sub,但這可以是 Watermill 支持的任何 Pub/Sub。

sseRouter, err := watermillHTTP.NewSSERouter(
watermillHTTP.SSERouterConfig{
UpstreamSubscriber: router.Subscriber,
ErrorHandler: watermillHTTP.DefaultErrorHandler,
},
router.Logger,
)

Stream Adapters(流適配器)

要使用?SSERouter,你需要準備一個帶有兩個方法的?StreamAdapter。

GetResponse?類似于標準的 HTTP 處理程序。修改現有的處理程序來匹配這個簽名應該非常容易。

Validate?是一個額外的方法,它告訴我們是否應該為特定的?Message?推送更新。

type StreamAdapter interface {
// GetResponse returns the response to be sent back to client.
// Any errors that occur should be handled and written to `w`, returning false as `ok`.
GetResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
// Validate validates if the incoming message should be handled by this handler.
// Typically this involves checking some kind of model ID.
Validate(r *http.Request, msg *message.Message) (ok bool)
}

Validate?示例如下所示。它檢查消息是否來自與用戶通過 HTTP 請求發送的相同的 post ID。

func (p postStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
postUpdated := PostUpdated{}

err := json.Unmarshal(msg.Payload, &postUpdated)
if err != nil {
return false
}

postID := chi.URLParam(r, "id")

return postUpdated.OriginalPost.ID == postID
}

如果你想為每條消息觸發一個更新,你可以簡單地返回?true。

func (f allFeedsStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
return true
}

在開始?SSERouter?之前,您需要添加帶有特定主題的處理程序。?AddHandler?返回一個可以在任何路由庫中使用的標準 HTTP 處理程序。

postHandler := sseRouter.AddHandler(PostUpdatedTopic, postStream)

// ...

r.Get("/posts/{id}", postHandler)

Event handlers(事件處理程序)

該示例使用 Watermill 進行所有異步通信,包括 SSE。

發布了以下事件:

  • PostCreated

    • 將 post 添加到貼子中包含標簽的所有 feeds 中。

  • FeedUpdated

    • 將更新推送到當前訪問 feed 頁面的所有客戶端。

  • PostUpdated

    • a) 對于現有標簽,帖子內容將在標簽中更新。

    • b) 如果添加了新的標簽,文章將被添加到標簽的 feed 中。

    • c) 如果標簽已刪除,則該帖子將從標簽的 feed 中刪除。

    • 將更新推送給所有當前訪問 post 頁面的客戶端。

    • 使用帖子中存在的標簽更新所有 feeds 中的帖子

前端 app

前端應用程序是使用 Vue.js 和 Bootstrap 構建的。

最有趣的部分是?EventSource?的使用。

this.es = new EventSource('/api/feeds/' + this.feed)

this.es.addEventListener('data', event => {
let data = JSON.parse(event.data);
this.posts_stream = data.posts;
}, false);

Refs

  • watermill.io

總結

以上是生活随笔為你收集整理的服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。