Serverless在游戏运营行业进行数据采集分析的最佳实践
簡介: 這個(gè)架構(gòu)不光適用于游戲運(yùn)營行業(yè),其實(shí)任何大數(shù)據(jù)采集傳輸?shù)膱鼍岸际沁m用的,目前也已經(jīng)有很多客戶正在基于Serverless的架構(gòu)跑在生產(chǎn)環(huán)境,或者正走在改造Serverless 架構(gòu)的路上。
眾所周知,游戲行業(yè)在當(dāng)今的互聯(lián)網(wǎng)行業(yè)中算是一棵常青樹。在疫情之前的2019年,中國游戲市場營收規(guī)模約2884.8億元,同比增長17.1%。2020年因?yàn)橐咔?#xff0c;游戲行業(yè)更是突飛猛進(jìn)。玩游戲本就是中國網(wǎng)民最普遍的娛樂方式之一,疫情期間更甚。據(jù)不完全統(tǒng)計(jì),截至2019年,中國移動(dòng)游戲用戶規(guī)模約6.6億人,占中國總網(wǎng)民規(guī)模8.47億的77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習(xí)以為常的一部分。
對于玩家而言,市面上的游戲數(shù)量多如牛毛,那么玩家如何能發(fā)現(xiàn)和認(rèn)知到一款游戲,并且持續(xù)的玩下去恐怕是所有游戲廠商需要思考的問題。加之2018年游戲版號(hào)停發(fā)事件,游戲廠商更加珍惜每一個(gè)已獲得版號(hào)的游戲產(chǎn)品,所以這也使得“深度打磨產(chǎn)品質(zhì)量”和“提高運(yùn)營精細(xì)程度”這兩個(gè)游戲產(chǎn)業(yè)發(fā)展方向成為廣大游戲廠商的發(fā)展思路,無論是新游戲還是老游戲都在努力落實(shí)這兩點(diǎn):
? 新游戲:面向玩家需要提供更充足的推廣資源和更完整的游戲內(nèi)容。
? 老游戲:通過用戶行為分析,投入更多的精力和成本,制作更優(yōu)質(zhì)的版本內(nèi)容。
這里我們重點(diǎn)來看新游戲。一家游戲企業(yè)辛辛苦苦研發(fā)三年,等著新游戲發(fā)售時(shí)一飛沖天。那么問題來了,新游戲如何被廣大玩家看到?
首先來看看游戲行業(yè)公司的分類:
?
? 游戲研發(fā)商:研發(fā)游戲的公司,生產(chǎn)和制作游戲內(nèi)容。比如王者榮耀的所有英雄設(shè)計(jì)、游戲戰(zhàn)斗場景、戰(zhàn)斗邏輯等,全部由游戲研發(fā)公司提供。
? 游戲發(fā)行商:游戲發(fā)行商的主要工作分三大塊:市場工作、運(yùn)營工作、客服工作。游戲發(fā)行商把控游戲命脈,市場工作核心是導(dǎo)入玩家,運(yùn)營工作核心是將用戶價(jià)值最大化、賺取更多利益。
?
? 游戲平臺(tái)/渠道商:游戲平臺(tái)和渠道商的核心目的就是曝光游戲,讓盡量多的人能發(fā)現(xiàn)你的游戲。
這三種類型的業(yè)務(wù),有專注于其中某一領(lǐng)域的獨(dú)立公司,也有能承接全部業(yè)務(wù)的公司,但無論那一種,這三者之間的關(guān)系是不會(huì)變的:
?
所以不難理解,想讓更多的玩家看到你的游戲,游戲發(fā)行和運(yùn)營是關(guān)鍵。通俗來講,如果你的游戲出現(xiàn)在目前所有大家熟知的平臺(tái)廣告中,那么最起碼游戲的新用戶注冊數(shù)量是很可觀的。因此這就引入了一個(gè)關(guān)鍵詞:買量。
根據(jù)數(shù)據(jù)顯示,2019年月均買量手游數(shù)達(dá)6000+款,而2018年僅為4200款。另一方面,隨著抖音、微博等超級(jí)APP在游戲買量市場的資源傾斜,也助推手游買量的效果和效率有所提升,游戲廠商更愿意使用買量的方式來吸引用戶。
但需要注意的是,在游戲買量的精準(zhǔn)化程度不斷提高的同時(shí),買量的成本也在節(jié)節(jié)攀升,唯有合理配置買量、渠道與整合營銷之間的關(guān)系,才能將宣發(fā)資源發(fā)揮到最大的效果。
通俗來講,買量其實(shí)就是在各大主流平臺(tái)投放廣告,廣大用戶看到游戲廣告后,有可能會(huì)點(diǎn)擊廣告,然后進(jìn)入游戲廠商的宣傳頁面,同時(shí)會(huì)采集用戶的一些信息,然后游戲廠商對采集到的用戶信息進(jìn)行大數(shù)據(jù)分析,進(jìn)行進(jìn)一步的定向推廣。
游戲運(yùn)營核心訴求
游戲廠商花錢買量,換來的用戶信息以及新用戶注冊信息是為持續(xù)的游戲運(yùn)營服務(wù)的,那么這個(gè)場景的核心訴求就是采集用戶信息的完整性。比如說,某游戲廠商一天花5000w投放廣告,在某平臺(tái)某時(shí)段產(chǎn)生了每秒1w次的廣告點(diǎn)擊率,那么在這個(gè)時(shí)段內(nèi)每一個(gè)點(diǎn)擊廣告的用戶信息要完整的被采集到,然后入庫進(jìn)行后續(xù)分析。這就對數(shù)據(jù)采集系統(tǒng)提出了很高的要求。這其中,最核心的一點(diǎn)就是系統(tǒng)暴露接口的環(huán)節(jié)要能夠平穩(wěn)承載買量期間不定時(shí)的流量脈沖。在買量期間,游戲廠商通常會(huì)在多個(gè)平臺(tái)投放廣告,每個(gè)平臺(tái)投放廣告的時(shí)間是不一樣的,所以就會(huì)出現(xiàn)全天不定時(shí)的流量脈沖現(xiàn)象。如果這個(gè)環(huán)節(jié)出現(xiàn)問題,那么相當(dāng)于買量的錢就打水漂了。
數(shù)據(jù)采集系統(tǒng)傳統(tǒng)架構(gòu)
?
上圖是一個(gè)相對傳統(tǒng)的數(shù)據(jù)采集系統(tǒng)架構(gòu),最關(guān)鍵的就是暴露HTTP接口回傳數(shù)據(jù)這部分,這部分如果出問題,那么采集數(shù)據(jù)的鏈路就斷了。但這部分往往會(huì)面臨兩個(gè)挑戰(zhàn):
? 當(dāng)流量脈沖來的時(shí)候,這部分是否可以快速擴(kuò)容以應(yīng)對流量沖擊。
? 游戲運(yùn)營具備潮汐特性,并非天天都在進(jìn)行,這就需要考慮如何優(yōu)化資源利用率。
通常情況下,在游戲有運(yùn)營活動(dòng)之前,會(huì)提前通知運(yùn)維同學(xué),對這個(gè)環(huán)節(jié)的服務(wù)增加節(jié)點(diǎn),但要增加多少其實(shí)是無法預(yù)估的,只能大概拍一個(gè)數(shù)字。這是在傳統(tǒng)架構(gòu)下經(jīng)常會(huì)出現(xiàn)的場景,這就會(huì)導(dǎo)致兩個(gè)問題:
? 流量太大,節(jié)點(diǎn)加少了,導(dǎo)致一部分流量的數(shù)據(jù)沒有采集到。
? 流量沒有預(yù)期那么大,節(jié)點(diǎn)加多了,導(dǎo)致資源浪費(fèi)。
數(shù)據(jù)采集系統(tǒng)Serverless架構(gòu)
我們可以通過 函數(shù)計(jì)算(函數(shù)計(jì)算的基本概念可以參考這篇文章)來取代傳統(tǒng)架構(gòu)中暴露HTTP回傳數(shù)據(jù)這部分,從而完美解決傳統(tǒng)架構(gòu)中存在問題,先來看架構(gòu)圖:
?
傳統(tǒng)架構(gòu)中的兩個(gè)問題均可以通過函數(shù)計(jì)算百毫秒彈性的特性來解決。我們并不需要去估算營銷活動(dòng)會(huì)帶來多大的流量,也不需要去擔(dān)心和考慮對數(shù)據(jù)采集系統(tǒng)的性能,運(yùn)維同學(xué)更不需要提前預(yù)備ECS。
因?yàn)楹瘮?shù)計(jì)算的極致彈性特性,當(dāng)沒有買量、沒有營銷活動(dòng)的時(shí)候,函數(shù)計(jì)算的運(yùn)行實(shí)例是零。有買量活動(dòng)時(shí),在流量脈沖的情況下,函數(shù)計(jì)算會(huì)快速拉起實(shí)例來承載流量壓力;當(dāng)流量減少時(shí),函數(shù)計(jì)算會(huì)及時(shí)釋放沒有請求的實(shí)例進(jìn)行縮容。所以Serverless架構(gòu)帶來的優(yōu)勢有以下三點(diǎn):
? 無需運(yùn)維介入,研發(fā)同學(xué)就可以很快的搭建出來。
? 無論流量大小,均可以平穩(wěn)的承接。
? 函數(shù)計(jì)算拉起的實(shí)例數(shù)量可以緊貼流量大小的曲線,做到資源利用率最優(yōu)化,再加上按量計(jì)費(fèi)的模式,可以最大程度優(yōu)化成本。
架構(gòu)解析
從上面的架構(gòu)圖可以看到,整個(gè)采集數(shù)據(jù)階段,分了兩個(gè)函數(shù)來實(shí)現(xiàn),第一個(gè)函數(shù)的作用是單純的暴露HTTP接口接收數(shù)據(jù),第二個(gè)函數(shù)用于處理數(shù)據(jù),然后將數(shù)據(jù)發(fā)送至消息隊(duì)列Kafka和數(shù)據(jù)庫RDS。
1.接收數(shù)據(jù)函數(shù)
我們打開函數(shù)計(jì)算控制臺(tái),創(chuàng)建一個(gè)函數(shù):
? 函數(shù)類型:HTTP(即觸發(fā)器為HTTP)
? 函數(shù)名稱:receiveData
? 運(yùn)行環(huán)境:Python3
?
? 函數(shù)實(shí)例類型:彈性實(shí)例
? 函數(shù)執(zhí)行內(nèi)存:512MB
? 函數(shù)運(yùn)行超時(shí)時(shí)間:60秒
? 函數(shù)單實(shí)例并發(fā)度:1
?
? 觸發(fā)器類型:HTTP觸發(fā)器
? 觸發(fā)器名稱:defaultTrigger
? 認(rèn)證方式:anonymous(即無需認(rèn)證)
? 請求方式:GET,POST
?
創(chuàng)建好函數(shù)之后,我們通過在線編輯器編寫代碼:
# -*- coding: utf-8 -*- import logging import json import urllib.parse HELLO_WORLD = b'Hello world!\n' def handler(environ, start_response):logger = logging.getLogger() context = environ['fc.context']request_uri = environ['fc.request_uri']for k, v in environ.items():if k.startswith('HTTP_'):# process custom request headerspasstry: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數(shù)據(jù)request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK"))request_body_obj = json.loads(request_body_str)logger.info(request_body_obj["action"])logger.info(request_body_obj["articleAuthorId"])status = '200 OK'response_headers = [('Content-type', 'text/plain')]start_response(status, response_headers)return [HELLO_WORLD]此時(shí)的代碼非常簡單,就是接收用戶傳來的參數(shù),我們可以調(diào)用接口進(jìn)行驗(yàn)證:
?
可以在函數(shù)的日志查詢中看到此次調(diào)用的日志:
?
同時(shí),我們也可以查看函數(shù)的鏈路追蹤來分析每一個(gè)步驟的調(diào)用耗時(shí),比如函數(shù)接到請求→冷啟動(dòng)(無活躍實(shí)例時(shí))→準(zhǔn)備代碼→執(zhí)行初始化方法→執(zhí)行入口函數(shù)邏輯這個(gè)過程:
?
?
從調(diào)用鏈路圖中可以看到,剛才的那次請求包含了冷啟動(dòng)的時(shí)間,因?yàn)楫?dāng)時(shí)沒有活躍實(shí)例,整個(gè)過程耗時(shí)418毫秒,真正執(zhí)行入口函數(shù)代碼的時(shí)間為8毫秒。
當(dāng)再次調(diào)用接口時(shí),可以看到就直接執(zhí)行了入口函數(shù)的邏輯,因?yàn)榇藭r(shí)已經(jīng)有實(shí)例在運(yùn)行,整個(gè)耗時(shí)只有2.3毫秒:
?
2.處理數(shù)據(jù)的函數(shù)
第一個(gè)函數(shù)是通過在函數(shù)計(jì)算控制臺(tái)在界面上創(chuàng)建的,選擇了運(yùn)行環(huán)境是Python3,我們可以在官方文檔中查看預(yù)置的Python3運(yùn)行環(huán)境內(nèi)置了哪些模塊,因?yàn)榈诙€(gè)函數(shù)要操作Kafka和RDS,所以需要我們確認(rèn)對應(yīng)的模塊。
從文檔中可以看到,內(nèi)置的模塊中包含RDS的SDK模塊,但是沒有Kafka的SDK模塊,此時(shí)就需要我們手動(dòng)安裝Kafka SDK模塊,并且創(chuàng)建函數(shù)也會(huì)使用另一種方式。
Funcraft
Funcraft是一個(gè)用于支持 Serverless 應(yīng)用部署的命令行工具,能幫助我們便捷地管理函數(shù)計(jì)算、API 網(wǎng)關(guān)、日志服務(wù)等資源。它通過一個(gè)資源配置文件(template.yml),協(xié)助我們進(jìn)行開發(fā)、構(gòu)建、部署操作。
所以第二個(gè)函數(shù)我們需要使用Fun來進(jìn)行操作,整個(gè)操作分為四個(gè)步驟:
? 安裝fun工具。
? 編寫template.yml模板文件,用來描述函數(shù)。
? 安裝我們需要的第三方依賴。
? 上傳部署函數(shù)。
安裝Fun
Fun提供了三種安裝方式:
? 通過 npm 包管理安裝 —— 適合所有平臺(tái)(Windows/Mac/Linux)且已經(jīng)預(yù)裝了 npm 的開發(fā)者。
? 通過下載二進(jìn)制安裝 —— 適合所有平臺(tái)(Windows/Mac/Linux)。
? 通過 Homebrew 包管理器安裝 —— 適合 Mac 平臺(tái),更符合 MacOS 開發(fā)者習(xí)慣。
文本示例環(huán)境為Mac,所以使用npm方式安裝,非常的簡單,一行命令搞定:
sudo npm install @alicloud/fun -g
安裝完成之后。在控制終端輸入 fun 命令可以查看版本信息:
$ fun --version 3.6.20在第一次使用 fun 之前需要先執(zhí)行 fun config 命令進(jìn)行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以從函數(shù)計(jì)算控制臺(tái)首頁的右上方獲得:
fun config
? Aliyun Account ID 01
? Aliyun Access Key ID qef6j
? Aliyun Access Key Secret ***UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3
編寫template.yml
新建一個(gè)目錄,在該目錄下創(chuàng)建一個(gè)名為template.yml的YAML文件,該文件主要描述要?jiǎng)?chuàng)建的函數(shù)的各項(xiàng)配置,說白了就是將函數(shù)計(jì)算控制臺(tái)上配置的那些配置信息以YAML格式寫在文件里:
ROSTemplateFormatVersion: '2015-09-01' Transform: 'Aliyun::Serverless-2018-04-03' Resources: FCBigDataDemo: Type: 'Aliyun::Serverless::Service' Properties: Description: 'local invoke demo' VpcConfig: VpcId: 'vpc-xxxxxxxxxxx' VSwitchIds: [ 'vsw-xxxxxxxxxx' ] SecurityGroupId: 'sg-xxxxxxxxx' LogConfig: Project: fcdemo Logstore: fc_demo_store dataToKafka: Type: 'Aliyun::Serverless::Function' Properties: Initializer: index.my_initializer Handler: index.handler CodeUri: './' Description: '' Runtime: python3我們來解析以上文件的核心內(nèi)容:
? FCBigDataDemo:自定義的服務(wù)名稱。通過下面的Type屬性標(biāo)明是服務(wù),即Aliyun::Serverless::Service。
? Properties:Properties下的屬性都是該服務(wù)的各配置項(xiàng)。
? VpcConfig:服務(wù)的VPC配置,包含:
? LogConfig:服務(wù)綁定的日志服務(wù)(SLS)配置,包含:
? dataToKafka:該服務(wù)下自定義的函數(shù)名稱。通過下面的Type屬性標(biāo)明是函數(shù),即Aliyun::Serverless::Function。
? Properties:Properties下的屬性都是該函數(shù)的各配置項(xiàng)。
? Initializer:配置初始化函數(shù)。
? Handler:配置入口函數(shù)。
? Runtime:函數(shù)運(yùn)行環(huán)境。
目錄結(jié)構(gòu)為:
?
安裝第三方依賴
服務(wù)和函數(shù)的模板創(chuàng)建好之后,我們來安裝需要使用的第三方依賴。在這個(gè)示例的場景中,第二個(gè)函數(shù)需要使用Kafka SDK,所以可以通過fun工具結(jié)合Python包管理工具pip進(jìn)行安裝:
fun install --runtime python3 --package-type pip kafka-python
執(zhí)行命令后有如下提示信息:
?
此時(shí)我們會(huì)發(fā)現(xiàn)在目錄下會(huì)生成一個(gè).fun文件夾 ,我們安裝的依賴包就在該目錄下:
?
部署函數(shù)
現(xiàn)在編寫好了模板文件以及安裝好了我們需要的Kafka SDK后,還需要添加我們的代碼文件index.py,代碼內(nèi)容如下:
# -*- coding: utf-8 -*- import logging import json import urllib.parse from kafka import KafkaProducer producer = None def my_initializer(context): logger = logging.getLogger() logger.info("init kafka producer")global producerproducer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') def handler(event, context):logger = logging.getLogger() # 接收回傳的數(shù)據(jù)event_str = json.loads(event)event_obj = json.loads(event_str)logger.info(event_obj["action"])logger.info(event_obj["articleAuthorId"])# 向Kafka發(fā)送消息global producerproducer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))producer.close()return 'hello world'代碼很簡單,這里做以簡單的解析:
? my_initializer:函數(shù)實(shí)例被拉起時(shí)會(huì)先執(zhí)行該函數(shù),然后再執(zhí)行handler函數(shù) ,當(dāng)函數(shù)實(shí)例在運(yùn)行時(shí),之后的請求都不會(huì)執(zhí)行my_initializer函數(shù) 。一般用于各種連接的初始化工作,這里將初始化Kafka Producer的方法放在了這里,避免反復(fù)初始化Produer。
? handler:該函數(shù)只有兩個(gè)邏輯,接收回傳的數(shù)據(jù)和將數(shù)據(jù)發(fā)送至Kafka的指定Topic。
下面通過fun deploy命令部署函數(shù),該命令會(huì)做兩件事:
? 根據(jù)template.yml中的配置創(chuàng)建服務(wù)和函數(shù)。
? 將index.py和.fun上傳至函數(shù)中。
?
登錄函數(shù)計(jì)算控制臺(tái),可以看到通過fun命令部署的服務(wù)和函數(shù):
?
進(jìn)入函數(shù),也可以清晰的看到第三方依賴包的目錄結(jié)構(gòu):
?
3.函數(shù)之間調(diào)用
目前兩個(gè)函數(shù)都創(chuàng)建好了,下面的工作就是由第一個(gè)函數(shù)接收到數(shù)據(jù)后拉起第二個(gè)函數(shù)發(fā)送消息給Kafka。我們只需要對第一個(gè)函數(shù)做些許改動(dòng)即可:
# -*- coding: utf-8 -*- import logging import json import urllib.parse import fc2 HELLO_WORLD = b'Hello world!\n' client = None def my_initializer(context): logger = logging.getLogger() logger.info("init fc client")global clientclient = fc2.Client(endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",accessKeyID="your_ak",accessKeySecret="your_sk") def handler(environ, start_response):logger = logging.getLogger() context = environ['fc.context']request_uri = environ['fc.request_uri']for k, v in environ.items():if k.startswith('HTTP_'):# process custom request headerspasstry: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數(shù)據(jù)request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK"))request_body_obj = json.loads(request_body_str)logger.info(request_body_obj["action"])logger.info(request_body_obj["articleAuthorId"])global clientclient.invoke_function('FCBigDataDemo','dataToKafka',payload=json.dumps(request_body_str),headers = {'x-fc-invocation-type': 'Async'})status = '200 OK'response_headers = [('Content-type', 'text/plain')]start_response(status, response_headers)return [HELLO_WORLD]如上面代碼所示,對第一個(gè)函數(shù)的代碼做了三個(gè)地方的改動(dòng):
? 導(dǎo)入函數(shù)計(jì)算的庫:import fc2
? 添加初始化方法,用于創(chuàng)建函數(shù)計(jì)算Client:
這里需要注意的時(shí),當(dāng)我們在代碼里增加了初始化方法后,需要在函數(shù)配置中指定初始化方法的入口:
?
? 通過函數(shù)計(jì)算Client調(diào)用第二個(gè)函數(shù):
global clientclient.invoke_function('FCBigDataDemo','dataToKafka',payload=json.dumps(request_body_str),headers = {'x-fc-invocation-type': 'Async'} )invoke_function函數(shù)有四個(gè)參數(shù):
? 第一個(gè)參數(shù):調(diào)用函數(shù)所在的服務(wù)名稱。
? 第二個(gè)參數(shù):調(diào)用函數(shù)的函數(shù)名稱。
? 第三個(gè)參數(shù):向調(diào)用函數(shù)傳的數(shù)據(jù)。
? 第四個(gè)參數(shù):調(diào)用第二個(gè)函數(shù)Request Header信息。這里主要通過x-fc-invocation-type這個(gè)Key來設(shè)置是同步調(diào)用還是異步調(diào)用。這里設(shè)置Async為異步調(diào)用。
如此設(shè)置,我們便可以驗(yàn)證通過第一個(gè)函數(shù)提供的HTTP接口發(fā)起請求→采集數(shù)據(jù)→調(diào)用第二個(gè)函數(shù)→將數(shù)據(jù)作為消息傳給Kafka這個(gè)流程了。
使用兩個(gè)函數(shù)的目的
到這里有些同學(xué)可能會(huì)有疑問,為什么需要兩個(gè)函數(shù),而不在第一個(gè)函數(shù)里直接向Kafka發(fā)送數(shù)據(jù)呢?我們先來看這張圖:
?
當(dāng)我們使用異步調(diào)用函數(shù)時(shí),在函數(shù)內(nèi)部會(huì)默認(rèn)先將請求的數(shù)據(jù)放入消息隊(duì)列進(jìn)行第一道削峰填谷,然后每一個(gè)隊(duì)列在對應(yīng)函數(shù)實(shí)例,通過函數(shù)實(shí)例的彈性拉起多個(gè)實(shí)例進(jìn)行第二道削峰填谷。所以這也就是為什么這個(gè)架構(gòu)能穩(wěn)定承載大并發(fā)請求的核心原因之一。
4.配置Kafka
在游戲運(yùn)營這個(gè)場景中,數(shù)據(jù)量是比較大的,所以對Kafka的性能要求也是比較高的,相比開源自建,使用云上的Kafka省去很多的運(yùn)維操作,比如:
? 我們不再需要再維護(hù)Kafka集群的各個(gè)節(jié)點(diǎn)。
? 不需要關(guān)心主從節(jié)點(diǎn)數(shù)據(jù)同步問題。
? 可以快速、動(dòng)態(tài)擴(kuò)展Kafka集群規(guī)格,動(dòng)態(tài)增加Topic,動(dòng)態(tài)增加分區(qū)數(shù)。
? 完善的指標(biāo)監(jiān)控功能,消息查詢功能。
總的來說,就是一切SLA都有云上兜底,我們只需要關(guān)注在消息發(fā)送和消息消費(fèi)即可。
所以我們可以打開Kafka開通界面,根據(jù)實(shí)際場景的需求一鍵開通Kafka實(shí)例,開通Kafka后登錄控制臺(tái),在基本信息中可以看到Kafka的接入點(diǎn):
? 默認(rèn)接入點(diǎn):走VPC內(nèi)網(wǎng)場景的接入點(diǎn)。
? SSL接入點(diǎn):走公網(wǎng)場景的接入點(diǎn)。
將默認(rèn)接入點(diǎn)配置到函數(shù)計(jì)算的第二個(gè)函數(shù)中即可。
.... producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') ....然后點(diǎn)擊左側(cè)控制臺(tái)Topic管理,創(chuàng)建Topic:
?
將創(chuàng)建好的Topic配置到函數(shù)計(jì)算的第二個(gè)函數(shù)中即可。
... # 第一個(gè)參數(shù)為Topic名稱 producer.send('ikf-demo', json.dumps(event_str).encode('utf-8')) ...上文已經(jīng)列舉過云上Kafka的優(yōu)勢,比如動(dòng)態(tài)增加Topic的分區(qū)數(shù),我們可以在Topic列表中,對Topic的分區(qū)數(shù)進(jìn)行動(dòng)態(tài)調(diào)整:
?
?
單Topic最大支持到360個(gè)分區(qū),這是開源自建無法做到的。
接下來點(diǎn)擊控制臺(tái)左側(cè)Consumer Group管理,創(chuàng)建Consumer Group:
?
至此,云上的Kafka就算配置完畢了,即Producer可以往剛剛創(chuàng)建的Topic中發(fā)消息了,Consumer可以設(shè)置剛剛創(chuàng)建的GID以及訂閱Topic進(jìn)行消息接受和消費(fèi)。
Flink Kafka消費(fèi)者
在這個(gè)場景中,Kafka后面往往會(huì)跟著Flink,所以這里簡要給大家介紹一下在Flink中如何創(chuàng)建Kafka Consumer并消費(fèi)數(shù)據(jù)。代碼片段如下:
final ParameterTool parameterTool = ParameterTool.fromArgs(args); String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo"); String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092"); Properties kafkaProps = new Properties(); kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo"); FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps); kafka.setStartFromLatest(); kafka.setCommitOffsetsOnCheckpoints(false); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);以上就是構(gòu)建Flink Kafka Consumer和添加Kafka Source的代碼片段,還是非常簡單的。
壓測驗(yàn)證
至此,整個(gè)數(shù)據(jù)采集的架構(gòu)就搭建完畢了,下面我們通過壓測來檢驗(yàn)一下整個(gè)架構(gòu)的性能。這里使用阿里云PTS來進(jìn)行壓測。
創(chuàng)建壓測場景
打開PTS控制臺(tái),點(diǎn)擊左側(cè)菜單創(chuàng)建壓測/創(chuàng)建PTS場景:
?
在場景配置中,將第一個(gè)函數(shù)計(jì)算函數(shù)暴露的HTTP接口作為串聯(lián)鏈路,配置如下圖所示:
?
?
接口配置完后,我們來配置施壓:
?
? 壓力模式:
? 并發(fā)模式:指定有多少并發(fā)用戶同時(shí)發(fā)請求。
? RPS模式:指定每秒有多少請求數(shù)。
? 遞增模式:在壓測過程中可以通過手動(dòng)調(diào)節(jié)壓力,也可以自動(dòng)按百分比遞增壓力。
? 最大并發(fā):同時(shí)有多少個(gè)虛擬用戶發(fā)起請求。
? 遞增百分比:如果是自動(dòng)遞增的話,按這里的百分比遞增。
? 單量級(jí)持續(xù)時(shí)長:在未完全達(dá)到壓力全量的時(shí)候,每一級(jí)梯度的壓力保持的時(shí)長。
? 壓測總時(shí)長:一共需要壓測的時(shí)長。
這里因?yàn)橘Y源成本原因,并發(fā)用戶數(shù)設(shè)置為2500來進(jìn)行驗(yàn)證。
?
?
從上圖壓測中的情況來看,TPS達(dá)到了2w的封頂,549w+的請求,99.99%的請求是成功的,那369個(gè)異常也可以點(diǎn)擊查看,都是壓測工具請求超時(shí)導(dǎo)致的。
總結(jié)
至此,整個(gè)基于Serverless搭建的大數(shù)據(jù)采集傳輸?shù)募軜?gòu)就搭建好了,并且進(jìn)行了壓測驗(yàn)證,整體的性能也是不錯(cuò)的,并且整個(gè)架構(gòu)搭建起來也非常簡單和容易理解。這個(gè)架構(gòu)不光適用于游戲運(yùn)營行業(yè),其實(shí)任何大數(shù)據(jù)采集傳輸?shù)膱鼍岸际沁m用的,目前也已經(jīng)有很多客戶正在基于Serverless的架構(gòu)跑在生產(chǎn)環(huán)境,或者正走在改造Serverless 架構(gòu)的路上。
作者:計(jì)緣,阿里云解決方案架構(gòu)師
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載
?
總結(jié)
以上是生活随笔為你收集整理的Serverless在游戏运营行业进行数据采集分析的最佳实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 性能高1倍,价格低3/4!数据库实时同步
- 下一篇: MaxCompute中如何通过logvi