在游戏运营行业,Serverless 如何解决数据采集分析痛点?
作者 | 計(jì)緣
來(lái)源|阿里巴巴云原生公眾號(hào)
眾所周知,游戲行業(yè)在當(dāng)今的互聯(lián)網(wǎng)行業(yè)中算是一棵常青樹(shù)。在疫情之前的 2019 年,中國(guó)游戲市場(chǎng)營(yíng)收規(guī)模約 2884.8 億元,同比增長(zhǎng) 17.1%。2020 年因?yàn)橐咔?#xff0c;游戲行業(yè)更是突飛猛進(jìn)。玩游戲本就是中國(guó)網(wǎng)民最普遍的娛樂(lè)方式之一,疫情期間更甚。據(jù)不完全統(tǒng)計(jì),截至 2019 年,中國(guó)移動(dòng)游戲用戶規(guī)模約 6.6 億人,占中國(guó)總網(wǎng)民規(guī)模 8.47 億的 77.92%,可見(jiàn)游戲作為一種低門(mén)檻、低成本的娛樂(lè)手段,已成為大部分人生活中習(xí)以為常的一部分。
對(duì)于玩家而言,市面上的游戲數(shù)量多如牛毛,那么玩家如何能發(fā)現(xiàn)和認(rèn)知到一款游戲,并且持續(xù)的玩下去恐怕是所有游戲廠商需要思考的問(wèn)題。加之 2018 年游戲版號(hào)停發(fā)事件,游戲廠商更加珍惜每一個(gè)已獲得版號(hào)的游戲產(chǎn)品,所以這也使得“深度打磨產(chǎn)品質(zhì)量”和“提高運(yùn)營(yíng)精細(xì)程度”這兩個(gè)游戲產(chǎn)業(yè)發(fā)展方向成為廣大游戲廠商的發(fā)展思路,無(wú)論是新游戲還是老游戲都在努力落實(shí)這兩點(diǎn):
-
新游戲:面向玩家需要提供更充足的推廣資源和更完整的游戲內(nèi)容。
-
老游戲:通過(guò)用戶行為分析,投入更多的精力和成本,制作更優(yōu)質(zhì)的版本內(nèi)容。
這里我們重點(diǎn)來(lái)看新游戲。一家游戲企業(yè)辛辛苦苦研發(fā)三年,等著新游戲發(fā)售時(shí)一飛沖天。那么問(wèn)題來(lái)了,新游戲如何被廣大玩家看到?
首先來(lái)看看游戲行業(yè)公司的分類(lèi):
-
游戲研發(fā)商:研發(fā)游戲的公司,生產(chǎn)和制作游戲內(nèi)容。比如王者榮耀的所有英雄設(shè)計(jì)、游戲戰(zhàn)斗場(chǎng)景、戰(zhàn)斗邏輯等,全部由游戲研發(fā)公司提供。
-
游戲發(fā)行商:游戲發(fā)行商的主要工作分三大塊:市場(chǎng)工作、運(yùn)營(yíng)工作、客服工作。游戲發(fā)行商把控游戲命脈,市場(chǎng)工作核心是導(dǎo)入玩家,運(yùn)營(yíng)工作核心是將用戶價(jià)值最大化、賺取更多利益。
- 游戲平臺(tái)/渠道商:游戲平臺(tái)和渠道商的核心目的就是曝光游戲,讓盡量多的人能發(fā)現(xiàn)你的游戲。
這三種類(lèi)型的業(yè)務(wù),有專(zhuān)注于其中某一領(lǐng)域的獨(dú)立公司,也有能承接全部業(yè)務(wù)的公司,但無(wú)論那一種,這三者之間的關(guān)系是不會(huì)變的:
所以不難理解,想讓更多的玩家看到你的游戲,游戲發(fā)行和運(yùn)營(yíng)是關(guān)鍵。通俗來(lái)講,如果你的游戲出現(xiàn)在目前所有大家熟知的平臺(tái)廣告中,那么最起碼游戲的新用戶注冊(cè)數(shù)量是很可觀的。因此這就引入了一個(gè)關(guān)鍵詞:買(mǎi)量。
根據(jù)數(shù)據(jù)顯示,2019 年月均買(mǎi)量手游數(shù)達(dá) 6000+ 款,而 2018 年僅為 4200 款。另一方面,隨著抖音、微博等超級(jí) APP 在游戲買(mǎi)量市場(chǎng)的資源傾斜,也助推手游買(mǎi)量的效果和效率有所提升,游戲廠商更愿意使用買(mǎi)量的方式來(lái)吸引用戶。
但需要注意的是,在游戲買(mǎi)量的精準(zhǔn)化程度不斷提高的同時(shí),買(mǎi)量的成本也在節(jié)節(jié)攀升,唯有合理配置買(mǎi)量、渠道與整合營(yíng)銷(xiāo)之間的關(guān)系,才能將宣發(fā)資源發(fā)揮到最大的效果。
通俗來(lái)講,買(mǎi)量其實(shí)就是在各大主流平臺(tái)投放廣告,廣大用戶看到游戲廣告后,有可能會(huì)點(diǎn)擊廣告,然后進(jìn)入游戲廠商的宣傳頁(yè)面,同時(shí)會(huì)采集用戶的一些信息,然后游戲廠商對(duì)采集到的用戶信息進(jìn)行大數(shù)據(jù)分析,進(jìn)行進(jìn)一步的定向推廣。
游戲運(yùn)營(yíng)核心訴求
游戲廠商花錢(qián)買(mǎi)量,換來(lái)的用戶信息以及新用戶注冊(cè)信息是為持續(xù)的游戲運(yùn)營(yíng)服務(wù)的,那么這個(gè)場(chǎng)景的核心訴求就是采集用戶信息的完整性。
比如說(shuō),某游戲廠商一天花 5000w 投放廣告,在某平臺(tái)某時(shí)段產(chǎn)生了每秒 1w 次的廣告點(diǎn)擊率,那么在這個(gè)時(shí)段內(nèi)每一個(gè)點(diǎn)擊廣告的用戶信息要完整的被采集到,然后入庫(kù)進(jìn)行后續(xù)分析。這就對(duì)數(shù)據(jù)采集系統(tǒng)提出了很高的要求。
這其中,最核心的一點(diǎn)就是系統(tǒng)暴露接口的環(huán)節(jié)要能夠平穩(wěn)承載買(mǎi)量期間不定時(shí)的流量脈沖。在買(mǎi)量期間,游戲廠商通常會(huì)在多個(gè)平臺(tái)投放廣告,每個(gè)平臺(tái)投放廣告的時(shí)間是不一樣的,所以就會(huì)出現(xiàn)全天不定時(shí)的流量脈沖現(xiàn)象。如果這個(gè)環(huán)節(jié)出現(xiàn)問(wèn)題,那么相當(dāng)于買(mǎi)量的錢(qián)就打水漂了。
數(shù)據(jù)采集系統(tǒng)傳統(tǒng)架構(gòu)
上圖是一個(gè)相對(duì)傳統(tǒng)的數(shù)據(jù)采集系統(tǒng)架構(gòu),最關(guān)鍵的就是暴露 HTTP 接口回傳數(shù)據(jù)這部分,這部分如果出問(wèn)題,那么采集數(shù)據(jù)的鏈路就斷了。但這部分往往會(huì)面臨兩個(gè)挑戰(zhàn):
-
當(dāng)流量脈沖來(lái)的時(shí)候,這部分是否可以快速擴(kuò)容以應(yīng)對(duì)流量沖擊。
-
游戲運(yùn)營(yíng)具備潮汐特性,并非天天都在進(jìn)行,這就需要考慮如何優(yōu)化資源利用率。
通常情況下,在游戲有運(yùn)營(yíng)活動(dòng)之前,會(huì)提前通知運(yùn)維同學(xué),對(duì)這個(gè)環(huán)節(jié)的服務(wù)增加節(jié)點(diǎn),但要增加多少其實(shí)是無(wú)法預(yù)估的,只能大概拍一個(gè)數(shù)字。這是在傳統(tǒng)架構(gòu)下經(jīng)常會(huì)出現(xiàn)的場(chǎng)景,這就會(huì)導(dǎo)致兩個(gè)問(wèn)題:
-
流量太大,節(jié)點(diǎn)加少了,導(dǎo)致一部分流量的數(shù)據(jù)沒(méi)有采集到。
-
流量沒(méi)有預(yù)期那么大,節(jié)點(diǎn)加多了,導(dǎo)致資源浪費(fèi)。
數(shù)據(jù)采集系統(tǒng) Serverless 架構(gòu)
我們可以通過(guò)函數(shù)計(jì)算 FC 來(lái)取代傳統(tǒng)架構(gòu)中暴露 HTTP 回傳數(shù)據(jù)這部分,從而完美解決傳統(tǒng)架構(gòu)中存在問(wèn)題,參考文章:《資源成本雙優(yōu)化!看 Serverless 顛覆編程教育的創(chuàng)新實(shí)踐》。
先來(lái)看架構(gòu)圖:
傳統(tǒng)架構(gòu)中的兩個(gè)問(wèn)題均可以通過(guò)函數(shù)計(jì)算百毫秒彈性的特性來(lái)解決。我們并不需要去估算營(yíng)銷(xiāo)活動(dòng)會(huì)帶來(lái)多大的流量,也不需要去擔(dān)心和考慮對(duì)數(shù)據(jù)采集系統(tǒng)的性能,運(yùn)維同學(xué)更不需要提前預(yù)備 ECS。
因?yàn)楹瘮?shù)計(jì)算的極致彈性特性,當(dāng)沒(méi)有買(mǎi)量、沒(méi)有營(yíng)銷(xiāo)活動(dòng)的時(shí)候,函數(shù)計(jì)算的運(yùn)行實(shí)例是零。有買(mǎi)量活動(dòng)時(shí),在流量脈沖的情況下,函數(shù)計(jì)算會(huì)快速拉起實(shí)例來(lái)承載流量壓力;當(dāng)流量減少時(shí),函數(shù)計(jì)算會(huì)及時(shí)釋放沒(méi)有請(qǐng)求的實(shí)例進(jìn)行縮容。所以 Serverless 架構(gòu)帶來(lái)的優(yōu)勢(shì)有以下三點(diǎn):
-
無(wú)需運(yùn)維介入,研發(fā)同學(xué)就可以很快的搭建出來(lái)。
-
無(wú)論流量大小,均可以平穩(wěn)的承接。
-
函數(shù)計(jì)算拉起的實(shí)例數(shù)量可以緊貼流量大小的曲線,做到資源利用率最優(yōu)化,再加上按量計(jì)費(fèi)的模式,可以最大程度優(yōu)化成本。
架構(gòu)解析
從上面的架構(gòu)圖可以看到,整個(gè)采集數(shù)據(jù)階段,分了兩個(gè)函數(shù)來(lái)實(shí)現(xiàn),第一個(gè)函數(shù)的作用是單純的暴露 HTTP 接口接收數(shù)據(jù),第二個(gè)函數(shù)用于處理數(shù)據(jù),然后將數(shù)據(jù)發(fā)送至消息隊(duì)列 Kafka 和數(shù)據(jù)庫(kù) RDS。
1. 接收數(shù)據(jù)函數(shù)
我們打開(kāi)函數(shù)計(jì)算控制臺(tái),創(chuàng)建一個(gè)函數(shù):
-
函數(shù)類(lèi)型:HTTP(即觸發(fā)器為 HTTP)
-
函數(shù)名稱(chēng):receiveData
-
運(yùn)行環(huán)境:Python3
-
函數(shù)實(shí)例類(lèi)型:彈性實(shí)例
-
函數(shù)執(zhí)行內(nèi)存:512MB
-
函數(shù)運(yùn)行超時(shí)時(shí)間:60 秒
-
函數(shù)單實(shí)例并發(fā)度:1
-
觸發(fā)器類(lèi)型:HTTP 觸發(fā)器
-
觸發(fā)器名稱(chēng):defaultTrigger
-
認(rèn)證方式:anonymous(即無(wú)需認(rèn)證)
-
請(qǐng)求方式:GET,POST
創(chuàng)建好函數(shù)之后,我們通過(guò)在線編輯器編寫(xiě)代碼:
# -*- coding: utf-8 -*- import logging import json import urllib.parse HELLO_WORLD = b'Hello world!\n' def handler(environ, start_response):logger = logging.getLogger() context = environ['fc.context']request_uri = environ['fc.request_uri']for k, v in environ.items():if k.startswith('HTTP_'):# process custom request headerspasstry: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數(shù)據(jù)request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK"))request_body_obj = json.loads(request_body_str)logger.info(request_body_obj["action"])logger.info(request_body_obj["articleAuthorId"])status = '200 OK'response_headers = [('Content-type', 'text/plain')]start_response(status, response_headers)return [HELLO_WORLD]此時(shí)的代碼非常簡(jiǎn)單,就是接收用戶傳來(lái)的參數(shù),我們可以調(diào)用接口進(jìn)行驗(yàn)證:
可以在函數(shù)的日志查詢(xún)中看到此次調(diào)用的日志:
同時(shí),我們也可以查看函數(shù)的鏈路追蹤來(lái)分析每一個(gè)步驟的調(diào)用耗時(shí),比如函數(shù)接到請(qǐng)求→冷啟動(dòng)(無(wú)活躍實(shí)例時(shí))→準(zhǔn)備代碼→執(zhí)行初始化方法→執(zhí)行入口函數(shù)邏輯這個(gè)過(guò)程:
從調(diào)用鏈路圖中可以看到,剛才的那次請(qǐng)求包含了冷啟動(dòng)的時(shí)間,因?yàn)楫?dāng)時(shí)沒(méi)有活躍實(shí)例,整個(gè)過(guò)程耗時(shí) 418 毫秒,真正執(zhí)行入口函數(shù)代碼的時(shí)間為 8 毫秒。
當(dāng)再次調(diào)用接口時(shí),可以看到就直接執(zhí)行了入口函數(shù)的邏輯,因?yàn)榇藭r(shí)已經(jīng)有實(shí)例在運(yùn)行,整個(gè)耗時(shí)只有 2.3 毫秒:
2. 處理數(shù)據(jù)的函數(shù)
第一個(gè)函數(shù)是通過(guò)在函數(shù)計(jì)算控制臺(tái)在界面上創(chuàng)建的,選擇了運(yùn)行環(huán)境是 Python3,我們可以在官方文檔中查看預(yù)置的 Python3 運(yùn)行環(huán)境內(nèi)置了哪些模塊,因?yàn)榈诙€(gè)函數(shù)要操作 Kafka 和 RDS,所以需要我們確認(rèn)對(duì)應(yīng)的模塊。
從文檔中可以看到,內(nèi)置的模塊中包含 RDS 的 SDK 模塊,但是沒(méi)有 Kafka 的 SDK 模塊,此時(shí)就需要我們手動(dòng)安裝 Kafka SDK 模塊,并且創(chuàng)建函數(shù)也會(huì)使用另一種方式。
1)Funcraft
Funcraft 是一個(gè)用于支持 Serverless 應(yīng)用部署的命令行工具,能幫助我們便捷地管理函數(shù)計(jì)算、API 網(wǎng)關(guān)、日志服務(wù)等資源。它通過(guò)一個(gè)資源配置文件(template.yml),協(xié)助我們進(jìn)行開(kāi)發(fā)、構(gòu)建、部署操作。
所以第二個(gè)函數(shù)我們需要使用 Fun 來(lái)進(jìn)行操作,整個(gè)操作分為四個(gè)步驟:
- 安裝 Fun 工具。
- 編寫(xiě) template.yml 模板文件,用來(lái)描述函數(shù)。
- 安裝我們需要的第三方依賴(lài)。
- 上傳部署函數(shù)。
2)安裝 Fun
Fun 提供了三種安裝方式:
- 通過(guò) npm 包管理安裝 —— 適合所有平臺(tái)(Windows/Mac/Linux)且已經(jīng)預(yù)裝了 npm 的開(kāi)發(fā)者。
- 通過(guò)下載二進(jìn)制安裝 —— 適合所有平臺(tái)(Windows/Mac/Linux)。
- 通過(guò) Homebrew 包管理器安裝 —— 適合 Mac 平臺(tái),更符合 MacOS 開(kāi)發(fā)者習(xí)慣。
文本示例環(huán)境為 Mac,所以使用 npm 方式安裝,非常的簡(jiǎn)單,一行命令搞定:
sudo npm install @alicloud/fun -g安裝完成之后。在控制終端輸入 fun 命令可以查看版本信息:
$ fun --version 3.6.20在第一次使用 fun 之前需要先執(zhí)行 fun config 命令進(jìn)行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以從函數(shù)計(jì)算控制臺(tái)首頁(yè)的右上方獲得:
fun config
? Aliyun Account ID ***01
? Aliyun Access Key ID ***qef6j
? Aliyun Access Key Secret ***UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3
3)編寫(xiě) template.yml
新建一個(gè)目錄,在該目錄下創(chuàng)建一個(gè)名為 template.yml 的 YAML 文件,該文件主要描述要?jiǎng)?chuàng)建的函數(shù)的各項(xiàng)配置,說(shuō)白了就是將函數(shù)計(jì)算控制臺(tái)上配置的那些配置信息以 YAML 格式寫(xiě)在文件里:
ROSTemplateFormatVersion: '2015-09-01' Transform: 'Aliyun::Serverless-2018-04-03' Resources: FCBigDataDemo: Type: 'Aliyun::Serverless::Service' Properties: Description: 'local invoke demo' VpcConfig: VpcId: 'vpc-xxxxxxxxxxx' VSwitchIds: [ 'vsw-xxxxxxxxxx' ] SecurityGroupId: 'sg-xxxxxxxxx' LogConfig: Project: fcdemo Logstore: fc_demo_store dataToKafka: Type: 'Aliyun::Serverless::Function' Properties: Initializer: index.my_initializer Handler: index.handler CodeUri: './' Description: '' Runtime: python3我們來(lái)解析以上文件的核心內(nèi)容:
-
FCBigDataDemo:自定義的服務(wù)名稱(chēng)。通過(guò)下面的 Type 屬性標(biāo)明是服務(wù),即 Aliyun::Serverless::Service。
-
Properties:Properties 下的屬性都是該服務(wù)的各配置項(xiàng)。
-
VpcConfig:服務(wù)的 VPC 配置,包含:
-
VpcId:VPC ID。
-
VSwitchIds:交換機(jī) ID,這里是數(shù)組,可以配置多個(gè)交換機(jī)。
-
SecurityGroupId:安全組 ID。
-
-
LogConfig:服務(wù)綁定的日志服務(wù)(SLS)配置,包含:
-
Project:日志服務(wù)項(xiàng)目。
-
Logstore:LogStore 名稱(chēng)。
-
-
dataToKafka:該服務(wù)下自定義的函數(shù)名稱(chēng)。通過(guò)下面的 Type 屬性標(biāo)明是函數(shù),即 Aliyun::Serverless::Function。
-
Properties:Properties下的屬性都是該函數(shù)的各配置項(xiàng)。
-
Initializer:配置初始化函數(shù)。
-
Handler:配置入口函數(shù)。
-
Runtime:函數(shù)運(yùn)行環(huán)境。
目錄結(jié)構(gòu)為:
4)安裝第三方依賴(lài)
服務(wù)和函數(shù)的模板創(chuàng)建好之后,我們來(lái)安裝需要使用的第三方依賴(lài)。在這個(gè)示例的場(chǎng)景中,第二個(gè)函數(shù)需要使用 Kafka SDK,所以可以通過(guò) fun 工具結(jié)合 Python 包管理工具 pip 進(jìn)行安裝:
fun install --runtime python3 --package-type pip kafka-python執(zhí)行命令后有如下提示信息:
此時(shí)我們會(huì)發(fā)現(xiàn)在目錄下會(huì)生成一個(gè).fun文件夾 ,我們安裝的依賴(lài)包就在該目錄下:
5)部署函數(shù)
現(xiàn)在編寫(xiě)好了模板文件以及安裝好了我們需要的 Kafka SDK 后,還需要添加我們的代碼文件 index.py,代碼內(nèi)容如下:
# -*- coding: utf-8 -*- import logging import json import urllib.parse from kafka import KafkaProducer producer = None def my_initializer(context): logger = logging.getLogger() logger.info("init kafka producer")global producerproducer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') def handler(event, context):logger = logging.getLogger() # 接收回傳的數(shù)據(jù)event_str = json.loads(event)event_obj = json.loads(event_str)logger.info(event_obj["action"])logger.info(event_obj["articleAuthorId"])# 向Kafka發(fā)送消息global producerproducer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))producer.close()return 'hello world'代碼很簡(jiǎn)單,這里做以簡(jiǎn)單的解析:
-
my_initializer:函數(shù)實(shí)例被拉起時(shí)會(huì)先執(zhí)行該函數(shù),然后再執(zhí)行 handler 函數(shù) ,當(dāng)函數(shù)實(shí)例在運(yùn)行時(shí),之后的請(qǐng)求都不會(huì)執(zhí)行 my_initializer 函數(shù) 。一般用于各種連接的初始化工作,這里將初始化 Kafka Producer 的方法放在了這里,避免反復(fù)初始化 Produer。
-
handler:該函數(shù)只有兩個(gè)邏輯,接收回傳的數(shù)據(jù)和將數(shù)據(jù)發(fā)送至 Kafka 的指定 Topic。
-
下面通過(guò) fun deploy 命令部署函數(shù),該命令會(huì)做兩件事:
- 根據(jù) template.yml 中的配置創(chuàng)建服務(wù)和函數(shù)。
- 將 index.py 和 .fun 上傳至函數(shù)中。
登錄函數(shù)計(jì)算控制臺(tái),可以看到通過(guò) fun 命令部署的服務(wù)和函數(shù):
進(jìn)入函數(shù),也可以清晰的看到第三方依賴(lài)包的目錄結(jié)構(gòu):
3. 函數(shù)之間調(diào)用
目前兩個(gè)函數(shù)都創(chuàng)建好了,下面的工作就是由第一個(gè)函數(shù)接收到數(shù)據(jù)后拉起第二個(gè)函數(shù)發(fā)送消息給 Kafka。我們只需要對(duì)第一個(gè)函數(shù)做些許改動(dòng)即可:
# -*- coding: utf-8 -*- import logging import json import urllib.parse import fc2 HELLO_WORLD = b'Hello world!\n' client = None def my_initializer(context): logger = logging.getLogger() logger.info("init fc client")global clientclient = fc2.Client(endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",accessKeyID="your_ak",accessKeySecret="your_sk") def handler(environ, start_response):logger = logging.getLogger() context = environ['fc.context']request_uri = environ['fc.request_uri']for k, v in environ.items():if k.startswith('HTTP_'):# process custom request headerspasstry: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數(shù)據(jù)request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK"))request_body_obj = json.loads(request_body_str)logger.info(request_body_obj["action"])logger.info(request_body_obj["articleAuthorId"])global clientclient.invoke_function('FCBigDataDemo','dataToKafka',payload=json.dumps(request_body_str),headers = {'x-fc-invocation-type': 'Async'})status = '200 OK'response_headers = [('Content-type', 'text/plain')]start_response(status, response_headers)return [HELLO_WORLD]如上面代碼所示,對(duì)第一個(gè)函數(shù)的代碼做了三個(gè)地方的改動(dòng):
-
導(dǎo)入函數(shù)計(jì)算的庫(kù):import fc2
-
添加初始化方法,用于創(chuàng)建函數(shù)計(jì)算 Client:
這里需要注意的時(shí),當(dāng)我們?cè)诖a里增加了初始化方法后,需要在函數(shù)配置中指定初始化方法的入口:
- 通過(guò)函數(shù)計(jì)算 Client 調(diào)用第二個(gè)函數(shù)
invoke_function 函數(shù)有四個(gè)參數(shù):
-
第一個(gè)參數(shù):調(diào)用函數(shù)所在的服務(wù)名稱(chēng)。
-
第二個(gè)參數(shù):調(diào)用函數(shù)的函數(shù)名稱(chēng)。
-
第三個(gè)參數(shù):向調(diào)用函數(shù)傳的數(shù)據(jù)。
-
第四個(gè)參數(shù):調(diào)用第二個(gè)函數(shù) Request Header 信息。這里主要通過(guò) x-fc-invocation-type 這個(gè) Key 來(lái)設(shè)置是同步調(diào)用還是異步調(diào)用。這里設(shè)置 Async 為異步調(diào)用。
如此設(shè)置,我們便可以驗(yàn)證通過(guò)第一個(gè)函數(shù)提供的 HTTP 接口發(fā)起請(qǐng)求→采集數(shù)據(jù)→調(diào)用第二個(gè)函數(shù)→將數(shù)據(jù)作為消息傳給 Kafka 這個(gè)流程了。
使用兩個(gè)函數(shù)的目的
到這里有些同學(xué)可能會(huì)有疑問(wèn),為什么需要兩個(gè)函數(shù),而不在第一個(gè)函數(shù)里直接向 Kafka 發(fā)送數(shù)據(jù)呢?我們先來(lái)看這張圖:
當(dāng)我們使用異步調(diào)用函數(shù)時(shí),在函數(shù)內(nèi)部會(huì)默認(rèn)先將請(qǐng)求的數(shù)據(jù)放入消息隊(duì)列進(jìn)行第一道削峰填谷,然后每一個(gè)隊(duì)列在對(duì)應(yīng)函數(shù)實(shí)例,通過(guò)函數(shù)實(shí)例的彈性拉起多個(gè)實(shí)例進(jìn)行第二道削峰填谷。所以這也就是為什么這個(gè)架構(gòu)能穩(wěn)定承載大并發(fā)請(qǐng)求的核心原因之一。
4. 配置 Kafka
在游戲運(yùn)營(yíng)這個(gè)場(chǎng)景中,數(shù)據(jù)量是比較大的,所以對(duì) Kafka 的性能要求也是比較高的,相比開(kāi)源自建,使用云上的 Kafka 省去很多的運(yùn)維操作,比如:
- 我們不再需要再維護(hù) Kafka 集群的各個(gè)節(jié)點(diǎn)。
- 不需要關(guān)心主從節(jié)點(diǎn)數(shù)據(jù)同步問(wèn)題。
- 可以快速、動(dòng)態(tài)擴(kuò)展 Kafka 集群規(guī)格,動(dòng)態(tài)增加 Topic,動(dòng)態(tài)增加分區(qū)數(shù)。
- 完善的指標(biāo)監(jiān)控功能,消息查詢(xún)功能。
總的來(lái)說(shuō),就是一切 SLA 都有云上兜底,我們只需要關(guān)注在消息發(fā)送和消息消費(fèi)即可。
所以我們可以打開(kāi) Kafka 開(kāi)通界面,根據(jù)實(shí)際場(chǎng)景的需求一鍵開(kāi)通 Kafka 實(shí)例,開(kāi)通 Kafka 后登錄控制臺(tái),在基本信息中可以看到 Kafka 的接入點(diǎn):
- 默認(rèn)接入點(diǎn):走 VPC 內(nèi)網(wǎng)場(chǎng)景的接入點(diǎn)。
- SSL 接入點(diǎn):走公網(wǎng)場(chǎng)景的接入點(diǎn)。
將默認(rèn)接入點(diǎn)配置到函數(shù)計(jì)算的第二個(gè)函數(shù)中即可。
.... producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') ....然后點(diǎn)擊左側(cè)控制臺(tái) Topic 管理,創(chuàng)建 Topic:
將創(chuàng)建好的 Topic 配置到函數(shù)計(jì)算的第二個(gè)函數(shù)中即可。
... # 第一個(gè)參數(shù)為T(mén)opic名稱(chēng) producer.send('ikf-demo', json.dumps(event_str).encode('utf-8')) ...上文已經(jīng)列舉過(guò)云上 Kafka 的優(yōu)勢(shì),比如動(dòng)態(tài)增加 Topic 的分區(qū)數(shù),我們可以在 Topic 列表中,對(duì) Topic 的分區(qū)數(shù)進(jìn)行動(dòng)態(tài)調(diào)整:
單 Topic 最大支持到 360 個(gè)分區(qū),這是開(kāi)源自建無(wú)法做到的。
接下來(lái)點(diǎn)擊控制臺(tái)左側(cè) Consumer Group 管理,創(chuàng)建 Consumer Group:
至此,云上的 Kafka 就算配置完畢了,即 Producer 可以往剛剛創(chuàng)建的 Topic 中發(fā)消息了,Consumer 可以設(shè)置剛剛創(chuàng)建的 GID 以及訂閱 Topic 進(jìn)行消息接受和消費(fèi)。
Flink Kafka 消費(fèi)者
在這個(gè)場(chǎng)景中,Kafka 后面往往會(huì)跟著 Flink,所以這里簡(jiǎn)要給大家介紹一下在 Flink 中如何創(chuàng)建 Kafka Consumer 并消費(fèi)數(shù)據(jù)。代碼片段如下:
final ParameterTool parameterTool = ParameterTool.fromArgs(args); String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo"); String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092"); Properties kafkaProps = new Properties(); kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo"); FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps); kafka.setStartFromLatest(); kafka.setCommitOffsetsOnCheckpoints(false); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);以上就是構(gòu)建 Flink Kafka Consumer 和添加 Kafka Source 的代碼片段,還是非常簡(jiǎn)單的。
壓測(cè)驗(yàn)證
至此,整個(gè)數(shù)據(jù)采集的架構(gòu)就搭建完畢了,下面我們通過(guò)壓測(cè)來(lái)檢驗(yàn)一下整個(gè)架構(gòu)的性能。這里使用阿里云 PTS 來(lái)進(jìn)行壓測(cè)。
創(chuàng)建壓測(cè)場(chǎng)景
打開(kāi) PTS 控制臺(tái),點(diǎn)擊左側(cè)菜單創(chuàng)建壓測(cè)/創(chuàng)建 PTS 場(chǎng)景:
在場(chǎng)景配置中,將第一個(gè)函數(shù)計(jì)算函數(shù)暴露的 HTTP 接口作為串聯(lián)鏈路,配置如下圖所示:
接口配置完后,我們來(lái)配置施壓:
-
壓力模式:
-
并發(fā)模式:指定有多少并發(fā)用戶同時(shí)發(fā)請(qǐng)求。
-
RPS模式:指定每秒有多少請(qǐng)求數(shù)。
-
-
遞增模式:在壓測(cè)過(guò)程中可以通過(guò)手動(dòng)調(diào)節(jié)壓力,也可以自動(dòng)按百分比遞增壓力。
-
最大并發(fā):同時(shí)有多少個(gè)虛擬用戶發(fā)起請(qǐng)求。
-
遞增百分比:如果是自動(dòng)遞增的話,按這里的百分比遞增。
-
單量級(jí)持續(xù)時(shí)長(zhǎng):在未完全達(dá)到壓力全量的時(shí)候,每一級(jí)梯度的壓力保持的時(shí)長(zhǎng)。
-
壓測(cè)總時(shí)長(zhǎng):一共需要壓測(cè)的時(shí)長(zhǎng)。
這里因?yàn)橘Y源成本原因,并發(fā)用戶數(shù)設(shè)置為 2500 來(lái)進(jìn)行驗(yàn)證。
從上圖壓測(cè)中的情況來(lái)看,TPS 達(dá)到了 2w 的封頂,549w+ 的請(qǐng)求,99.99% 的請(qǐng)求是成功的,那 369 個(gè)異常也可以點(diǎn)擊查看,都是壓測(cè)工具請(qǐng)求超時(shí)導(dǎo)致的。
總結(jié)
至此,整個(gè)基于 Serverless 搭建的大數(shù)據(jù)采集傳輸?shù)募軜?gòu)就搭建好了,并且進(jìn)行了壓測(cè)驗(yàn)證,整體的性能也是不錯(cuò)的,并且整個(gè)架構(gòu)搭建起來(lái)也非常簡(jiǎn)單和容易理解。這個(gè)架構(gòu)不光適用于游戲運(yùn)營(yíng)行業(yè),其實(shí)任何大數(shù)據(jù)采集傳輸?shù)膱?chǎng)景都是適用的,目前也已經(jīng)有很多客戶正在基于 Serverless 的架構(gòu)跑在生產(chǎn)環(huán)境,或者正走在改造 Serverless 架構(gòu)的路上。
基于 Serverless 還有很多其他的應(yīng)用場(chǎng)景,之后我會(huì)一一分享給大家,大家如果有任何疑問(wèn)也可以加入釘釘群:35712134 來(lái)尋找答案,我們不見(jiàn)不散!
重磅下載
由 23 位阿里云技術(shù)專(zhuān)家精心打造 222 頁(yè)“橙子書(shū)”《Serverless 入門(mén)與實(shí)戰(zhàn)》重磅上線,帶你由表及里,由淺入深學(xué)習(xí) Serverless,助你 2021 年“橙”風(fēng)而行,心想事“橙”!
本書(shū)亮點(diǎn)
-
從架構(gòu)演進(jìn)開(kāi)始,介紹 Serverless 架構(gòu)及技術(shù)選型構(gòu)建 Serverless 思維;
-
了解業(yè)界流行的 Serverless 架構(gòu)運(yùn)行原理;
-
掌握 10 大 Serverless 真實(shí)落地案例,活學(xué)活用。
點(diǎn)擊即可直接下載!
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專(zhuān)家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的在游戏运营行业,Serverless 如何解决数据采集分析痛点?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 无责任畅想:云原生中间件的下一站
- 下一篇: seata-golang 接入指南