日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

数据仓库之电商数仓-- 1、用户行为数据采集

發布時間:2025/3/17 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数据仓库之电商数仓-- 1、用户行为数据采集 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

  • 一、數據倉庫概念
  • 二、項目需求及架構設計
    • 2.1 項目需求分析
    • 2.2 項目框架
      • 2.2.1 技術選型
      • 2.2.2 系統數據流程設計
      • 2.2.3 框架版本選型
      • 2.2.4 服務器選型
      • 2.2.5 集群規模
      • 2.2.6 集群資源規劃設計
  • 三、數據生成模塊
    • 3.1 目標數據
      • 3.1.1 頁面日志
      • 3.1.2 事件日志
      • 3.1.3 曝光日志
      • 3.1.4 啟動日志
      • 3.1.5 錯誤日志
    • 3.2數據埋點
      • 3.2.1 主流埋點方式
      • 3.2.2 埋點數據上報時機
      • 3.2.3 埋點數據日志結構
    • 3.3 服務器和JDK準備
    • 3.4 模擬數據
      • 3.4.1 使用說明
      • 3.4.2 集群日志生成腳本
  • 四、數據采集模塊
    • 4.1 集群所有進程查看腳本
    • 4.2 zookeeper安裝
      • 4.2.1 安裝ZK
      • 4.2.2 ZK集群啟動停止腳本
    • 4.3 Kafka安裝
      • 4.3.1 Kafka集群安裝
      • 4.3.2 Kafka集群啟動停止腳本
      • 4.3.3 常用命令
      • 4.3.4 項目經驗之Kafka機器數量計算
      • 4.3.5 項目經驗之壓力測試
      • 4.3.6 項目經驗之Kafka分區數計算
    • 4.4采集日志Flume
      • 4.4.1 日至采集Flume安裝
      • 4.4.2 項目經驗之Flume組件選型
      • 4.4.3 日志采集 Flume 配置
      • 4.4.4 Flume攔截器
      • 4.4.5 測試Flume-Kafka通道
      • 4.4.6 日至采集Flume啟動停止腳本
    • 4.5 消費Kafka數據Flume
      • 4.5.1 項目經驗之Flume組件選型
        • 4.5.1.1 FileChannel 和 MemoryChannel 區別
        • 4.5.1.2 FileChannel優化
        • 4.5.1.3 Sink:HDFS Sink
      • 4.5.2 Flume時間戳攔截器
      • 4.5.3 消費者Flume配置
      • 4.5.4 消費者Flume啟動停止腳本
    • 4.6 采集通道啟動停止腳本
  • 五、常見問題及解決方案
    • 5.1 2NN頁面不能顯示完整信息

-----------------------------------------------------分隔符-----------------------------------------------------
數據倉庫之電商數倉-- 1、用戶行為數據采集==>
數據倉庫之電商數倉-- 2、業務數據采集平臺==>
數據倉庫之電商數倉-- 3.1、電商數據倉庫系統(DIM層、ODS層、DWD層)==>
數據倉庫之電商數倉-- 3.2、電商數據倉庫系統(DWS層)==>
數據倉庫之電商數倉-- 3.3、電商數據倉庫系統(DWT層)==>
數據倉庫之電商數倉-- 3.4、電商數據倉庫系統(ADS層)==>
數據倉庫之電商數倉-- 4、可視化報表Superset==>
數據倉庫之電商數倉-- 5、即席查詢Kylin==>

一、數據倉庫概念

數據倉庫(Data Warehouse),是為企業制定決策,提供數據支持的。可以幫助企業改進業務流程、提高產品質量等。

數據倉庫的輸入數據通常包括:業務數據、用戶行為數據和爬蟲數據等。

業務數據:就是各行業在處理事務過程中產生的數據。如用戶在電商網站中登錄、下載、支付等過程中,需要和網站后臺數據庫進行增刪改查交互,產生的數據就是業務數據。業務數據通常存儲在MySQL、Oracle等數據庫中。

二、項目需求及架構設計

2.1 項目需求分析

項目需求

  • 用戶行為數據采集平臺搭建;
  • 業務數據采集平臺搭建;
  • 數據倉庫緯度建模;
  • 分析設備、會員、商品、地區、活動等電商核心主題,統計的報表接近100個;
  • 采用即席查詢工具,隨時進行指標分析;
  • 對集群性能進行監控,發生異常需要報警;
  • 元數據管理;
  • 質量監控;
  • 權限管理;
  • 2.2 項目框架

    2.2.1 技術選型

  • 項目技術如何選型?
  • 框架版本如何選型(Apache、CDH、HDP)
  • 服務器使用物理機還是云主機?
  • 如何確認集群規模?
  • ??:
    技術選型主要考慮因素:數據量大小、業務需求、行業內經驗、技術成熟度、開發維護成本、總成本預算。

    數據采集傳輸:Flume, Kafka, Sqoop, Logstash, DataX;
    數據存儲:MySQL, HDFS, HBase, Redis, MongoDB;
    數據計算:Hive, Tex, Spark, Flink, Storm;
    數據查詢:presto, Kylin, Impala, Druid, Clickouse, Doris;
    數據可視化:Echarts, Superset, QuickBI, DataV;
    任務調度:Azkaban, Oozie, DolphinScheduler, Airflow;
    集群監控:Zabbix, Prometheus;
    元數據管理:Atalas;
    權限管理:Ranger, Sentry.

    2.2.2 系統數據流程設計

    2.2.3 框架版本選型

  • 如何選擇Apache/CDH/HDP版本?
  • 1). Apache:運維麻煩,組件間兼容性需要自己調研,開源;
    2). CDH:國內使用最多的版本,不開源;
    3). HDP:開源,可進行二次開發,但沒有CDH穩定,國內使用甚少。

  • 云服務選擇
  • 1). 阿里云EMR、MaxCompute、DataWorks
    2). 亞馬遜云EMR
    3). 騰訊云EMR
    4). 華為云EMR

  • 具體版本型號
  • Apache框架版本:

    tips:
    框架選型最好選擇半年前的穩定版!

    2.2.4 服務器選型

  • 物理機:需專業運維人員;
  • 云主機:若是選擇阿里云,運維工作全由阿里云完成。
  • 2.2.5 集群規模

  • 如何確認集群規模?(假設:每臺服務器8T磁盤,128G內存)
  • 1). 每天日活躍用戶100萬,每人一天平均100條:100萬*100條=1億條;
    2). 每條日志1k左右,每天1億條:100000000 / 1024 /1024 = 約100G;
    3). 半年內不擴容服務器:100G*180天=約18T;
    4). 保存3個副本:18T*3=54T;
    5). 預留20%~30%Buf = 54T/0.7 = 77T;
    6). 約8T*10臺服務器。

  • 若是考慮數倉分層,數據壓縮,又要怎么計算?
  • 2.2.6 集群資源規劃設計

    在企業中通常會搭建一套生產集群和一套測試集群。生產集群運行生產任務,測試集群用于上線前代碼編寫和測試。

  • 生產集群
  • 1). 消耗內存的需分開;
    2). 數據傳輸數據比較緊密的放在一起(Kafka、Zookeeper);
    3). 客戶端盡量放在一到兩臺服務器上,方便外部訪問;
    4). 有依賴關系的盡量放在同一臺服務器上(如Hive和Azkaban Executor)。

  • 測試集群
  • 三、數據生成模塊

    3.1 目標數據

    我們要收集和分析數據主要包括頁面數據、時間數據、曝光數據、啟動數據和錯誤數據。

    3.1.1 頁面日志

    頁面數據主要記錄一個頁面的用戶訪問情況,包括訪問時間、停留時間、頁面路徑等信息。

    3.1.2 事件日志

    時間數據主要記錄應用內一個具體操作行為,包括操作類型
    操作對象、操作對象描述等信息。

    3.1.3 曝光日志

    曝光數據主要記錄頁面所曝光的內容,包括曝光對象,曝光類型等信息。

    3.1.4 啟動日志

    啟動數據記錄應用的啟動信息。

    3.1.5 錯誤日志

    錯誤數據記錄應用使用過程中的錯誤信息,包括錯誤編號及錯誤信息。

    3.2數據埋點

    3.2.1 主流埋點方式

    目前主流的埋點方式,有代碼埋點(前端/后端)、可視化埋點、全埋點三種。

    代碼埋點是通過調用埋點 SDK 函數,在需要埋點的業務邏輯功能位置調用接口,上報埋點數據。例如,我們對頁面中的某個按鈕埋點后,當這個按鈕被點擊時,可以在這個按鈕 對應的 OnClick 函數里面調用 SDK 提供的數據發送接口,來發送數據。

    可視化埋點只需要研發人員集成采集 SDK,不需要寫埋點代碼,業務人員就可以通過 訪問分析平臺的“圈選”功能,來“圈”出需要對用戶行為進行捕捉的控件,并對該事件進行命名。圈選完畢后,這些配置會同步到各個用戶的終端上,由采集 SDK 按照圈選的配置 自動進行用戶行為數據的采集和發送。

    全埋點是通過在產品中嵌入 SDK,前端自動采集頁面上的全部用戶行為事件,上報埋點數據,相當于做了一個統一的埋點。然后再通過界面配置哪些數據需要在系統里面進行分析。

    3.2.2 埋點數據上報時機

    埋點數據上報時機包括兩種方式:

    方式一,在離開該頁面時,上傳在這個頁面產生的所有數據(頁面、事件、曝光、錯誤 等)。
    優點:批處理,減少了服務器接收數據壓力,網絡IO少;
    缺點:實效性差。

    方式二,每個事件、動作、錯誤等產生后,立即發送。
    優點:響應及時;
    缺點:對服務器接收數據壓力比較大,網絡IO增加。

    本項目采用方式一埋點。

    3.2.3 埋點數據日志結構

    我們的日志結構大致可分為兩類,一是普通頁面埋點日志,二是啟動日志。

    普通頁面日志結構如下,每條日志包含了,當前頁面的頁面信息,所有事件(動作)、 所有曝光信息以及錯誤信息。除此之外,還包含了一系列公共信息,包括設備信息,地理位 置,應用信息等,即下邊的 common 字段。

    3.3 服務器和JDK準備

    分別安裝 hadoop102、hadoop103、hadoop104 三臺主機。
    服務器和JDK的準備內容看這里==>

    3.4 模擬數據

    3.4.1 使用說明

  • 創建 applog 路徑
  • [xiaobai@hadoop102 module]$ mkdir applog

    1). 將 application.yml、gmall2020-mock-log-2021-01-22.jar、path.json、logback.xml 上傳到 hadoop102 的/opt/module/applog 目錄下??

    2). 上傳文件

  • 配置文件??
  • 1). 配置application.yml文件:

    # 外部配置打開 logging.config: "./logback.xml" #業務日期 mock.date: "2020-06-14"#模擬數據發送模式 #mock.type: "http" #mock.type: "kafka" mock.type: "log"#http模式下,發送的地址 mock.url: "http://localhost:8080/applog"#kafka模式下,發送的地址 mock:kafka-server: "hadoop102:9092,hadoop103:9092,hadoop104:9092"kafka-topic: "ODS_BASE_LOG"#啟動次數 mock.startup.count: 200 #設備最大值 mock.max.mid: 500000 #會員最大值 mock.max.uid: 100 #商品最大值 mock.max.sku-id: 35 #頁面平均訪問時間 mock.page.during-time-ms: 20000 #錯誤概率 百分比 mock.error.rate: 3 #每條日志發送延遲 ms mock.log.sleep: 10 #商品詳情來源 用戶查詢,商品推廣,智能推薦, 促銷活動 mock.detail.source-type-rate: "40:25:15:20" #領取購物券概率 mock.if_get_coupon_rate: 75 #購物券最大id mock.max.coupon-id: 3 #搜索關鍵詞 mock.search.keyword: "圖書,小米,iphone11,電視,口紅,ps5,蘋果手機,小米盒子"

    2). 配置path.json文件:

    [{"path":["home","good_list","good_detail","cart","trade","payment"],"rate":20 },{"path":["home","search","good_list","good_detail","login","good_detail","cart","trade","payment"],"rate":40 },{"path":["home","mine","orders_unpaid","trade","payment"],"rate":10 },{"path":["home","mine","orders_unpaid","good_detail","good_spec","comment","trade","payment"],"rate":5 },{"path":["home","mine","orders_unpaid","good_detail","good_spec","comment","home"],"rate":5 },{"path":["home","good_detail"],"rate":10 },{"path":["home" ],"rate":10 }

    3). 配置logback文件:

    <?xml version="1.0" encoding="UTF-8"?> <configuration><property name="LOG_HOME" value="/opt/module/applog/log" /><appender name="console" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%msg%n</pattern></encoder></appender><appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern></rollingPolicy><encoder><pattern>%msg%n</pattern></encoder></appender><!-- 將某一個包下日志單獨打印日志 --><logger name="com.xiaobai.gmall2020.mock.log.util.LogUtil"level="INFO" additivity="false"><appender-ref ref="rollingFile" /><appender-ref ref="console" /></logger><root level="error" ><appender-ref ref="console" /></root> </configuration> ~
  • 生成日志
  • 1). 如圖,在/opt/module/applog目錄下執行以下命令生成對應的日志文件log:

    java -jar gmall2020-mock-log-2021-01-22.jar

    2). 在/opt/module/applog/log目錄下查看生成日志:

    3.4.2 集群日志生成腳本

  • 在 /home/xiaobai/bin目錄下創建腳本lg.sh:
  • [xiaobai@hadoop102 bin]$ vim lg.sh #!/bin/bashfor i in hadoop102 hadoop103; do echo "========== $i ==========" ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-01-22.jar >/dev/null 2>&1 &" done

    注:
    1). /opt/module/applog/為 jar 包及配置文件所在路徑
    2). /dev/null 代表 linux 的空設備文件,所有往這個文件里面寫入的內容都會丟失,俗
    稱“黑洞”。
    標準輸入 0:從鍵盤獲得輸入 /proc/self/fd/0;
    標準輸出 1:輸出到屏幕(即控制臺) /proc/self/fd/1;
    錯誤輸出 2:輸出到屏幕(即控制臺) /proc/self/fd/2。

  • 修改腳本執行權限:
  • [xiaobai@hadoop102 bin]$ chmod 777 lg.sh

  • 將applog分發給其他節點:
  • [xiaobai@hadoop102 module]$ xsync applog/
  • 刪除hadoop104上的applog以及hadoop102上的log,并測試lg.sh:
  • [xiaobai@hadoop104 module]$ rm -rf applog/ [xiaobai@hadoop102 applog]$ rm -rf log

    如圖,hadoop102 / hadoop103加載出了log:

    四、數據采集模塊

    4.1 集群所有進程查看腳本

  • 在/home/xiaobai/bin 目錄下創建腳本 xcall.sh:
  • [xiaobai@hadoop102 bin]$ vim xcall.sh #! /bin/bashfor i in hadoop102 hadoop103 hadoop104 do echo --------- $i ----------ssh $i "$*" done
  • 修改腳本執行權限:
  • [xiaobai@hadoop102 bin]$ chmod 777 xcall.sh
  • 啟動腳本:
  • [xiaobai@hadoop102 bin]$ xcall.sh jps --------- hadoop102 ---------- 3081 Jps --------- hadoop103 ---------- 2961 Jps --------- hadoop104 ---------- 3295 Jps

    4.2 zookeeper安裝

    4.2.1 安裝ZK

  • 集群規劃
    在hadoop102、hadoop103、hadoop104三個節點上部署zookeeper。

  • 解壓安裝

  • 1). 解壓zookeeper安裝包到/opt/module/目錄下:

    [xiaobai@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/

    2). 修改/opt/module/apache-zookeeper-3.5.7-bin名稱為zookeeper-3.5.7:

    [xiaobai@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
  • 配置服務器編號:
  • 1). 在/opt/module/zookeeper-3.5.7目錄下創建zkData:

    [xiaobai@hadoop102 zookeeper-3.5.7]$ mkdir zkData

    2). 在/opt/module/zookeeper-3.5.7/zkData目錄下創建一個myid文件:

    [xiaobai@hadoop102 zookeeper-3.5.7]$ vim myid

    添加myid文件,??一定要在Linux里面創建!
    在文件中添加域server對應的編號:

    2

    3). 同步/opt/module/zookeeper-3.5.7目錄到hadoop103、hadop104:

    [xiaobai@hadoop102 module]$ xsync zookeeper-3.5.7/

    分別在hadoop103、hadoop104上修改myid文件中內容為3、4:

    [xiaobai@hadoop103 zkData]$ vim myid 3 [xiaobai@hadoop104 zkData]$ vim myid 4
  • 配置zoo.cfg文件:
  • 1). 重命名/opt/module/zookeeper-3.5.7/conf目錄下的zoo_sample.cfg為zoo.cfg :

    [xiaobai@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg

    2). 打開zoo.cfg 文件:

    [xiaobai@hadoop102 conf]$ vim zoo.cfg

    如圖,修改數據存儲路徑配置:

    dataDir=/opt/module/zookeeper-3.5.7/zkData

    如圖,增加如下配置:

    #######################cluster################################### server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:3888

    3). 同步分發zoo.cfg 配置文件:

    [xiaobai@hadoop102 conf]$ xsync zoo.cfg

    4). 配置參數解讀:

    server.A=B:C:D

    A 是一個數字,表示這是第幾號服務器;

    集群模式下配置一個文件 myid,這個文件在 dataDir 目錄下,這個文件里面有一個數據就是 A 的值,Zookeeper 啟動時讀取此文件,拿到里面的數據與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個 server;

    B 是這個服務器的地址;

    C 是這個服務器 Follower 與集群中的 Leader 服務器交換信息的端口;

    D 是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。

  • 集群操作
  • 1). 分別啟動zookeeper:

    [xiaobai@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start [xiaobai@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start [xiaobai@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh start

    2). 查看狀態:

    [xiaobai@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower[xiaobai@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower[xiaobai@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: leader

    4.2.2 ZK集群啟動停止腳本

  • 在hadoop102的/home/xiaobai/bin目錄下創建腳本zk.sh:
  • 在腳本中編寫如下內容:

    #!/bin/bashcase $1 in "start")for i in hadoop102 hadoop103 hadoop104doecho "---------- $i ------------"ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"done ;; "stop")for i in hadoop102 hadoop103 hadoop104doecho "---------- $i ------------"ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"done ;; "status")for i in hadoop102 hadoop103 hadoop104doecho "---------- $i ------------"ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"done ;; esac
  • 增加腳本執行權限:
  • [xiaobai@hadoop102 bin]$ chmod 777 zk.sh
  • Zookeeper 集群啟動腳本:
  • [xiaobai@hadoop102 ~]$ zk.sh start
  • Zookeeper 集群停止腳本:
  • [xiaobai@hadoop102 ~]$ zk.sh stop
  • 查看Zookeeper 狀態:
  • [xiaobai@hadoop102 ~]$ zk.sh status ---------- hadoop102 ------------ ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: leader ---------- hadoop103 ------------ ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower ---------- hadoop104 ------------ ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower

    4.3 Kafka安裝

    4.3.1 Kafka集群安裝

  • 解壓縮kafka_2.11-2.4.1.tgz至/opt/module/目錄下:
  • [xiaobai@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
  • 修改kafka_2.11-2.4.1名稱為kafka:
  • [xiaobai@hadoop102 module]$ mv kafka_2.11-2.4.1/ kafka
  • 在/opt/module/kafka路徑下創建logs文件夾:
  • [xiaobai@hadoop102 kafka]$ mkdir logs
  • 如圖,在/opt/module/kafka/config目錄下修改配置文件server.properties:
  • [xiaobai@hadoop102 config]$ vim server.properties #broker 的全局唯一編號,不能重復 broker.id=0 #刪除 topic 功能使能 delete.topic.enable=true #kafka 運行日志存放的路徑 log.dirs=/opt/module/kafka/logs #配置連接 Zookeeper 集群地址 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181


  • 分發kafka:
  • [xiaobai@hadoop102 module]$ xsync kafka/
  • 修改各自的broker.id:
  • [xiaobai@hadoop103 config]$ vim server.properties broker.id=1 [xiaobai@hadoop104 config]$ vim server.properties broker.id=2
  • 配置環境變量:
  • [xiaobai@hadoop102 kafka]$ sudo vim /etc/profile.d/my_env.sh #KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:KAFKA_HOME/bin[xiaobai@hadoop102 kafka]$ source /etc/profile.d/my_env.sh
  • 分發my_env.sh:
  • [xiaobai@hadoop102 kafka]$ sudo /home/xiaobai/bin/xsync /etc/profile.d/my_env.sh
  • 在hadoop103、hadoop104上使用source刷新my_env.sh :
  • [xiaobai@hadoop103 config]$ source /etc/profile.d/my_env.sh [xiaobai@hadoop104 config]$ source /etc/profile.d/my_env.sh
  • 啟動集群:
    依次在 hadoop102、hadoop103、hadoop104 節點上啟動 kafka:
  • [xiaobai@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties [xiaobai@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties [xiaobai@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
  • 關閉集群:
    在/opt/module/kafka目錄下執行以下命令:
  • bin/kafka-server-stop.sh stop

    4.3.2 Kafka集群啟動停止腳本

  • 在/home/xiaobai/bin目錄下創建腳本文件kf.sh:
  • #!/bin/bashcase $1 in "start")for i in hadoop102 hadoop103 hadoop104doecho "------啟動 $i KAFKA------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done ;; "stop")for i in hadoop102 hadoop103 hadoop104doecho "------停止 $i KAFKA------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"done ;; esac
  • 賦予權限:
  • [xiaobai@hadoop102 bin]$ chmod 777 kf.sh
  • 使用腳本開啟kafka:
  • [xiaobai@hadoop102 bin]$ kf.sh start
  • 使用腳本停止kafka:
  • [xiaobai@hadoop102 bin]$ kf.sh stop

    4.3.3 常用命令

  • 查看Kafka Topic列表:
  • [xiaobai@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list __consumer_offsets topic_log

  • 創建Kafka Topic:
    /opt/module/kafka/目錄下創建日志主題:
  • [xiaobai@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 1 --partitions 1 --topic topic_log

  • 刪除Kafka Topic:
  • [xiaobai@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_log
  • Kafka生產消息:
  • [xiaobai@hadoop102 kafka]$ bin/kafka-console-producer.sh \ > --broker-list hadoop102:9092 --topic topic_log >hello world >xiaobai is coming >work
  • Kafka消費消息:
  • [xiaobai@hadoop102 kafka]$ bin/kafka-console-consumer.sh \ > --bootstrap-server hadoop102:9092 --from-beginning --topic topic_log hello world xiaobai is coming work

    注??:–from-beginning:會把主題中以往所有的數據都讀取出來。根據業務場景選擇是否增加該配置。

  • 查看Kafka Topic詳情:
  • [xiaobai@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --describe --topic topic_log

    4.3.4 項目經驗之Kafka機器數量計算

    Kafka機器數量(經驗公式)= 2 *(峰值生產速度 * 副本數 / 100)+ 1;

    先拿到峰值生產速度,再根據設定的副本數,就能預估出需要部署Kafka的數量。

  • 峰值生產速度:可以通過壓測得到;

  • 副本數:

    默認副本數是1個,在企業里會有2-3個,通常為2;
    副本多可以提高可靠性,但是會降低網絡傳輸速率;
    如峰值生產速度為50M/s,副本數為2;
    Kafka機器數量 = 2 * (50 * 2 / 100)+ 1 = 3臺。

  • 4.3.5 項目經驗之壓力測試

    Kafka壓測:

    用Kafka官方自帶的腳本,對Kafka進行壓測:

    kafka-consumer-perf-test.sh
    kafka-producer-perf-test.sh

    Kafka壓測時,在硬盤讀寫速度一定的情況下,可以查看到哪些地方出現了瓶頸(CPU,內存,網絡 IO);一般都是網絡 IO 達到瓶頸。

    4.3.6 項目經驗之Kafka分區數計算

  • 創建一個只有 1 個分區的 topic;
  • 測試這個 topic 的 producer 吞吐量和 consumer 吞吐量;
  • 假設他們的值分別是 Tp 和 Tc,單位可以是 MB/s;
  • 然后假設總的目標吞吐量是 Tt,那么分區數=Tt / min(Tp,Tc) 例如:producer 吞吐量=20m/s;
    consumer 吞吐量=50m/s,期望吞吐量 100m/s;
    分區數=100 / 20 =5 分區;
  • 分區數一般設置為:3-10 個。
    建議參考==》
  • 4.4采集日志Flume

    4.4.1 日至采集Flume安裝

  • 解壓 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目錄下:
  • [xiaobai@hadoop102 software]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  • 修改apache-flume-1.9.0-bin的名稱為 flume
  • [xiaobai@hadoop102 module]$ mv apache-flume-1.9.0-bin/ flume
  • 將lib文件夾下的guava-11.0.2.jar 刪除以兼容hadoop3.2.2:
  • [xiaobai@hadoop102 lib]$ rm -rf guava-11.0.2.jar

    注??:刪除guava-11.0.2.jar 的服務器節點,一定要配置hadoop環境變量,否則會報異常。

  • 將 flume/conf 下 的 flume-env.sh.template 文 件 修 改 為 flume-env.sh, 并 配 置flume-env.sh 文件:
  • [xiaobai@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh [xiaobai@hadoop102 conf]$ vim flume-env.sh export JAVA_HOME=/opt/module/jdk1.8.0_144
  • 分發flume:
  • [xiaobai@hadoop102 module]$ xsync flume/

    4.4.2 項目經驗之Flume組件選型

  • source
  • 1). flume1.6 source:

    exec :
    優點:可以實時監控文件變化;
    缺點:有丟數據的風險。

    spooling:
    優點:可以實現斷點續傳;
    缺點:不能實時監控文件變化。

    2). flume 1.7 taildir source??

    Taildir Source 相比 Exec Source、Spooling Directory Source 的優勢:
    a. 可實現斷點續傳,多目錄;Flume1.6 以前需要自己自定義 Source 記錄每次讀取文件位置,實現斷點續傳。
    b. 可以實時監控文件的變化。
    Exec Source 可以實時搜集數據,但是在 Flume 不運行或者 Shell 命令出錯的情況下,數據將會丟失;Spooling Directory Source 監控目錄,支持斷點續傳,但不能實時監控文件變化。

    3). batchSize 大小如何設置?

    答:Event 1K 左右時,500-1000 合適(默認為 100)

  • Channel
  • file channel??: 數據存儲在磁盤中,可靠性高,效率低;
    memory channel??: 數據存儲在內存中,效率高,可靠性差。

    采用 Kafka Channel,省去了 Sink,提高了效率。KafkaChannel 數據存儲在 Kafka 里面, 所以數據是存儲在磁盤中。

    4.4.3 日志采集 Flume 配置

  • Flume配置分析:

    注??:Flume 直接讀 log 日志的數據,log 日志的格式是 app.yyyy-mm-dd.log。

  • Flume的具體配置如下:

  • 1). 在/opt/module/flume/conf 目錄下創建 file-flume-kafka.conf 文件:

    [xiaobai@hadoop102 conf]$ vim file-flume-kafka.conf

    2). 在文件配置如下內容:

    #定義組件 a1.sources=r1 a1.channels=c1#配置source (Taildirsource) a1.sources.r1.type=TAILDIR a1.sources.r1.filegroups=f1 a1.sources.r1.filegroups.f1=/opt/module/applog/log/app.* a1.sources.r1.positionFile=/opt/module/flume/taildir_position.json#配置攔截器(ETL數據清洗 判斷json是否完整) a1.sources.r1.interceptors=i1 a1.sources.r1.interceptors.i1.type=com.xiaobai.flume.interceptor.ETLInterceptor$Builder#配置channel a1.channels.c1.type=org.apache.flume.channel.kafka.kafkaChannel a1.channels.c1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092 a1.channels.c1.kafka.topic=topic_log a1.channels.c1.parseAsFlumeEvent=false#配置sink(Kafka Channel 無Sink)#拼接組件 a1.sources.r1.channels=c1

    3). 分發file-flume-kafka.conf:

    [xiaobai@hadoop102 conf]$ xsync file-flume-kafka.conf

    tips: com.xiaobai.flume.interceptor.ETLInterceptor是自定義的攔截器全類名。

    4.4.4 Flume攔截器

  • 創建 Maven 工程 flume-interceptor;
  • 創建包名: com.xiaobai.flume.interceptor;
  • 在 pom.xml 文件中添加如下配置:
  • <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
  • 在com.xiaobai.flume.interceptor包下創建JSONUtils類:
  • package com.xiaobai.flume.interceptor;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONException;public class JSONUtils {// //test數據 // public static void main(String[] args) { // System.out.println(isValidate("{123323")); // }//驗證數據是否jsonpublic static boolean isValidate(String log) {try {JSON.parse(log);return true;} catch(JSONException e){return false;}} }
  • 在com.xiaobai.flume.interceptor下創建ETLInterceptor類:
  • package com.xiaobai.flume.interceptor;import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.Charset; import java.util.Iterator; import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 取數據后進行校驗//1 獲取數據byte[] body = event.getBody();String log = new String(body, Charset.forName("UTF-8"));//2 校驗if(JSONUtils.isValidate(log)){return event;}return null;}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while(iterator.hasNext()){Event next = iterator.next();if(intercept(next) == null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {} }
  • 打包:

  • 將打包好的帶依賴的jar包傳入hadoop102 的/opt/module/flume/lib 目錄下:
    過濾查找是否存在帶依賴的jar包flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar:

  • [xiaobai@hadoop102 lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 分發jar包至hadoop103、hadoop104:
  • [xiaobai@hadoop102 lib]$ xsync flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 分別在 hadoop102、hadoop103 上啟動 Flume:
  • [xiaobai@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf & [xiaobai@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

    4.4.5 測試Flume-Kafka通道

  • 生成日志:
  • [xiaobai@hadoop102 kafka]$ lg.sh
  • 消費Kafka數據,觀察控制臺是否獲取到數據:
  • [xiaobai@hadoop102 kafka]$ bin/kafka-console-consumer.sh \--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log

    如圖所示就是已經獲取到數據了!我這里用兩個遠程工具同時開了好幾個遠程端口進行不同操作,所以這個端口看起來會不一樣。

    注??:
    這里獲取不到數據的話,后面會很麻煩!!!!
    前車之鑒,一定要認真檢查!配置文件!少個字母多個字母 包名不正確之類的都會引起錯誤!!!若是配置文件都沒錯的話 就檢查Kafka、Flume、Zookeeper是否正確啟動,仔細檢查狀態!!!再檢查Flume的攔截器代碼是否正常!!所引用的包名是否正確!!!!!!!!試圖引起注意!!!!!!!!n多個感嘆號?足以證明會有多重要 我是當時不以為然 沒在意 結果到后面進行不下去了 又返回來重新檢查了一遍的😭!

    4.4.6 日至采集Flume啟動停止腳本

  • 創建日至采集Flume啟動停止腳本f1.sh:
  • [xiaobai@hadoop102 bin]$ vim f1.sh
  • 填入以下內容:
  • #!/bin/bashcase $1 in "start")for i in hadoop102 hadoop103doecho "------啟動$i采集flume------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"done ;; "stop")for i in hadoop102 hadoop103doecho "------停止$i采集flume------" ssh $i "ssh $i ps -ef | grep file-flume-kafka.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"done ;; esac
  • 增加腳本執行權限:
  • [xiaobai@hadoop102 bin]$ chmod 777 f1.sh
  • f1集群啟動腳本:
  • [xiaobai@hadoop102 bin]$ f1.sh start
  • f1集群停止腳本:
  • [xiaobai@hadoop102 bin]$ f1.sh stop

    ??注:

  • nohup,該命令可以在你退出帳戶/關閉終端之后繼續運行相應的進程。nohup 就是不掛起的意思,不掛斷地運行命令;
  • awk 默認分隔符為空格;
  • xargs 表示取出前面命令運行的結果,作為后面命令的輸入參數;
  • $2是在雙引號 “ ”呢句會被解析為腳本的第二個參數,但此處表達的意思是awk的第二個值,所以需要轉義,用\$2表示。
  • 4.5 消費Kafka數據Flume

    4.5.1 項目經驗之Flume組件選型

    4.5.1.1 FileChannel 和 MemoryChannel 區別

    MemoryChannel 傳輸數據速度更快,但因為數據保存在 JVM 的堆內存中,Agent 進程掛掉會導致數據丟失,適用于對數據質量要求不高的需求。

    FileChannel 傳輸速度相對于 Memory 慢,但數據安全保障高,Agent 進程掛掉也可以從失敗中恢復數據。

    選型:
    金融類公司、對錢要求非常準確的公司通常會選擇 FileChannel;
    傳輸的是普通日志信息(京東內部一天丟 100 萬-200 萬條,這是非常正常的),通常選擇 MemoryChannel。

    4.5.1.2 FileChannel優化

    通過配置 dataDirs 指向多個路徑,每個路徑對應不同的硬盤,增大 Flume 吞吐量。 官方說明如下:
    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

    checkpointDir 和 backupCheckpointDir 也盡量配置在不同硬盤對應的目錄中,保證checkpoint 壞掉后,可以快速使用 backupCheckpointDir 恢復數據。

    FileChannel底層原理

    4.5.1.3 Sink:HDFS Sink

  • HDFS 存入大量小文件,有什么影響?
  • 元數據層面: 每個小文件都有一份元數據,其中包括文件路徑,文件名,所有者,所屬組,權限,創建時間等,這些信息都保存在 Namenode 內存中。所以小文件過多,會占用 Namenode 服務器大量內存,影響 Namenode 性能和使用壽命;

    計算層面:默認情況下 MR 會對每個小文件啟用一個 Map 任務計算,非常影響計算性能。同時也影響磁盤尋址時間。

  • HDFS 小文件處理
  • 官方默認的這三個參數配置寫入 HDFS 后會產生小文件,hdfs.rollInterval、hdfs.rollSize、 hdfs.rollCount

    基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0 幾個參數綜合作用,效果如下:
    1). 文件在達到 128M 時會滾動生成新文件;
    2). 文件創建超 3600 秒時會滾動生成新文件.

    4.5.2 Flume時間戳攔截器

    由于 flume 默認會用 linux 系統時間,作為輸出到 HDFS 路徑的時間。如果數據是 23:59 分產生的。Flume 消費 kafka 里面的數據時,有可能已經是第二天00:00以后了,那么該部門數據會被發往第二天的 HDFS 路徑。我們希望的是根據日志里面的實際時間,發往 HDFS 的路徑,所以下面攔截器作用是獲取日志中的實際時間。

  • 在 com.xiaobai.flume.interceptor 包下創建 TimeStampInterceptor 類:
  • package com.xiaobai.flume.interceptor;import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map;public class TimeStampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//將日志攔截,取出header里面的key,取出body里面對應的時間,將ts的值賦值給header里的key timestamp//1. 獲取header頭Map<String, String> headers = event.getHeaders();//2. 獲取body中的tsbyte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");//3. 將ts賦值給timestampheaders.put("timestamp",ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TimeStampInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {} }
  • 重新打包:
  • 需要先將打好的包放入到 hadoop104 的/opt/module/flume/lib 文件夾下面:
  • [xiaobai@hadoop104 lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

    4.5.3 消費者Flume配置

  • Flume配置分析:
  • 具體配置如下:
  • 1). 在 hadoop104 的/opt/module/flume/conf 目錄下創建 kafka-flume-hdfs.conf 文件:

    [xiaobai@hadoop104 conf]$ vim kafka-flume-hdfs.conf

    2). 在文件配置如下內容:

    ## 定義組件 ## 定義組件 a1.sources=r1 a1.channels=c1 a1.sinks=k1## 配置source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource #a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sources.r1.kafka.topics=topic_log a1.sources.r1.batchSize=5000 a1.sources.r1.batchDurationMills=2000#時間戳攔截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.xiaobai.flume.interceptor.TimeStampInterceptor$Builder## 配置channel a1.channels.c1.type = file a1.channels.c1.checkpointDir=/opt/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs=/opt/module/flume/data/behavior1/ #a1.channels.c1.maxFileSize = 2146435071 #a1.channels.c1.capacity = 1000000 a1.channels.c1.keep-alive = 6## 配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path =/origin_data/gmall/log/topic_log/%Y-%m-%da1.sinks.k1.hdfs.rollInterval=10 a1.sinks.k1.hdfs.rollSize=134217728 a1.sinks.k1.hdfs.rollCount=0a1.sinks.k1.hdfs.filePrefix = log- a1.sinks.k1.hdfs.round = false #a1.sinks.k1.hdfs.fileType=DataStream ##a1.sinks.k1..hdfs.writeFormat=Text # # #a1.sinks.k1.hdfs.rollInterval = 10 #a1.sinks.k1.hdfs.rollSize = 134217728 #a1.sinks.k1.hdfs.rollCount = 0## 控制輸出文件是原生文件 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop## 拼接組件 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1

    4.5.4 消費者Flume啟動停止腳本

  • 在/home/xiaobai/bin目錄下創建腳本 f2.sh:
  • [xiaobai@hadoop102 bin]$ vim f2.sh

    在腳本中填寫如下內容:

    #!/bin/bashcase $1 in "start"){for i in hadoop104doecho "------啟動$i消費flume------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &"done };; "stop"){for i in hadoop104doecho "------停止$i消費flume------"ssh $i "ssh $i ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print \$2}' | xargs -n1 kill"done };; esac
  • 增加腳本執行權限:
  • [xiaobai@hadoop102 bin]$ chmod 777 f2.sh
  • f2 集群啟動腳本:
  • [xiaobai@hadoop102 bin]$ f2.sh start
  • f2 集群停止腳本:
  • [xiaobai@hadoop102 bin]$ f2.sh stop

    4.6 采集通道啟動停止腳本

  • 在/home/xiaobai/bin 目錄下創建腳本 cluster.sh:
  • [xiaobai@hadoop102 bin]$ vim cluster.sh
  • 填入以下內容:
  • #!/bin/bashcase $1 in "start")echo "------啟動集群------"zk.sh startmyhadoop.sh startkf.sh startf1.sh startf2.sh start;; "stop")echo "------停止集群------"f2.sh stopf1.sh stopkf.sh stopmyhadoop.sh stopzk.sh stop ;; esac
  • 增加腳本執行權限:
  • [xiaobai@hadoop102 bin]$ chmod 777 cluster.sh
  • cluster 集群啟動腳本:
  • [xiaobai@hadoop102 bin]$ cluster.sh start
  • cluster 集群停止腳本:
  • 五、常見問題及解決方案

    5.1 2NN頁面不能顯示完整信息

    1. 問題描述:
    如圖,訪問2NN頁面http://hadoop104:9868,看不到詳細信息。

    2. 解決方法:

  • 在瀏覽器上按command+shift+i進入以下界面,點擊console會有錯誤提示信息:
  • 如圖,找到要修改的文件:
  • [xiaobai@hadoop104 static]$ pwd /opt/module/hadoop-3.2.2/share/hadoop/hdfs/webapps/static[xiaobai@hadoop104 static]$ vim dfs-dust.js

    輸入以下代碼顯示行號:

    :set nu

    如圖,shift+d:修改替換61行:

    return new Date(Number(v)).toLocalString();

  • 分發dfs-dust.js:
  • [xiaobai@hadoop104 static]$ xsync dfs-dust.js
  • 由于此界面是靜態網頁,所以需強制刷新http://hadoop104:9868頁面,點擊瀏覽器三個小點–>更多工具–>清除瀏覽數據–清除數據,顯示以下界面:
  • 3. 問題描述:
    hadoop104拒絕連接請求。

    4. 解決方法:
    如果網絡暢通且關閉了防火墻,那可能是由于集群忘記開啟,重新啟動集群再次刷新hadoop104:9868界面即可。

    總結

    以上是生活随笔為你收集整理的数据仓库之电商数仓-- 1、用户行为数据采集的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。