

GitHub - shixiaopengql/BigData-News: a news-site big data real-time system based on Spark 2.2 (Spark matching CDH 5.14)

Published: 2024/10/8

News-Site Big Data Real-Time System Based on Spark 2.2

1. Overview

2. Environment Setup

2.1 CDH 5.14.2 (installation steps: see the reference linked in the original post). Use whichever minor version you actually have; CDH's version compatibility is good.

Service      hadoop01                  hadoop02         hadoop03
HDFS         NameNode                  DataNode         DataNode
HBase        HMaster, HRegionServer    HRegionServer    HRegionServer
Hive         Hive
Flume        Flume                     Flume            Flume
Kafka        Kafka
YARN         ResourceManager           NodeManager      NodeManager
Oozie        Oozie
Hue          Hue
Spark2       Spark
Zookeeper    Zookeeper
MySQL        MySQL

2.2 Host configuration

1. hadoop01: 4 cores, 16 GB, CentOS 7.2
2. hadoop02: 2 cores, 8 GB, CentOS 7.2
3. hadoop03: 2 cores, 8 GB, CentOS 7.2

2.3 Project architecture

2.4 Install dependency packages

# yum -y install psmisc MySQL-python at bc bind-libs bind-utils cups-client cups-libs cyrus-sasl-gssapi cyrus-sasl-plain ed fuse fuse-libs httpd httpd-tools keyutils-libs-devel krb5-devel libcom_err-devel libselinux-devel libsepol-devel libverto-devel mailcap mailx mod_ssl openssl-devel pcre-devel postgresql-libs python-psycopg2 redhat-lsb-core redhat-lsb-submod-security spax time zlib-devel wget

# chmod +x /etc/rc.d/rc.local
# echo "echo 0 > /proc/sys/vm/swappiness" >> /etc/rc.d/rc.local
# echo "echo never > /sys/kernel/mm/transparent_hugepage/defrag" >> /etc/rc.d/rc.local
# echo 0 > /proc/sys/vm/swappiness
# echo never > /sys/kernel/mm/transparent_hugepage/defrag
# yum -y install rpcbind
# systemctl start rpcbind
# echo "systemctl start rpcbind" >> /etc/rc.d/rc.local

Install Perl support:

yum install perl*   # Perl and related packages, via yum
yum install cpan    # CPAN, required for installing Perl modules

3. Writing a Data-Generation Simulator

3.1 Simulate a log as generated by nginx. Data source: the user query log downloadable from Sogou Labs. The dataset covers roughly one month (June 2008) of web query and user click data from the Sogou search engine.

3.2 數(shù)據(jù)清洗

數(shù)據(jù)格式為:訪(fǎng)問(wèn)時(shí)間\t用戶(hù)ID\t[查詢(xún)?cè)~]\t該URL在返回結(jié)果中的排名\t用戶(hù)點(diǎn)擊的順序號(hào)\t用戶(hù)點(diǎn)擊的URL其中,用戶(hù)ID是根據(jù)用戶(hù)使用瀏覽器訪(fǎng)問(wèn)搜索引擎時(shí)的Cookie信息自動(dòng)賦值,即同一次使用瀏覽器輸入的不同查詢(xún)對(duì)應(yīng)同一個(gè)用戶(hù)ID

Replace the tabs in the file with commas:

cat weblog.log | tr "\t" "," > weblog2.log

Replace the spaces in the file with commas:

cat weblog2.log | tr " " "," > weblog.log
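The same cleanup that the two `tr` commands perform can be sketched as a single pass in Java (illustrative only; the class name is not from the project):

```java
public class WeblogCleaner {
    // Equivalent of: tr "\t" "," followed by tr " " ","
    static String clean(String line) {
        return line.replace('\t', ',').replace(' ', ',');
    }

    public static void main(String[] args) {
        System.out.println(clean("20111230000005\tu1 news\t1"));
    }
}
```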

3.3 Key code

public static void readFileByLines(String fileName) {
    FileInputStream fis = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    String tempString = null;
    try {
        System.out.println("Reading the file line by line:");
        // Obtain a byte stream from the file on the file system
        fis = new FileInputStream(fileName);
        isr = new InputStreamReader(fis, "GBK");
        br = new BufferedReader(isr);
        int count = 0;
        while ((tempString = br.readLine()) != null) {
            count++;
            // Throttle output to simulate a live log stream
            Thread.sleep(300);
            // Re-encode each GBK line as UTF-8
            String str = new String(tempString.getBytes("GBK"), "UTF8");
            System.out.println("row:" + count + ">>>>>>>>" + str);
            writeFile(writeFileName, str);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        if (br != null) {
            try {
                br.close(); // also closes the underlying reader and stream
            } catch (IOException e1) {
                // ignore
            }
        }
    }
}
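The `writeFile(writeFileName, str)` helper is not shown in the excerpt. A plausible sketch (assumed, not the project's actual code) appends each line to the output log so that Flume's `tail -F` picks it up:

```java
import java.io.FileWriter;
import java.io.IOException;

public class WriteFileDemo {
    // Hypothetical helper matching the writeFile(fileName, content) call above:
    // append one line to the target log file.
    public static void writeFile(String fileName, String content) {
        try (FileWriter writer = new FileWriter(fileName, true)) { // true = append
            writer.write(content);
            writer.write(System.lineSeparator());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        String tmp = java.io.File.createTempFile("weblog", ".log").getAbsolutePath();
        writeFile(tmp, "row1");
        writeFile(tmp, "row2");
        System.out.println(java.nio.file.Files.readAllLines(java.nio.file.Paths.get(tmp)));
    }
}
```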

3.4 Package the program as weblogs.jar (packaging steps: see reference), then write the shell script weblog-shell.sh

#!/bin/bash
echo "start log......"
# First argument: source log file; second argument: generated output file
java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

3.5 Make weblog-shell.sh executable

chmod 777 weblog-shell.sh

4. Flume Data Collection Configuration

4.1 Flume on hadoop02 and hadoop03 forwards collected data to hadoop01; the Flume configuration files on hadoop02 and hadoop03 are essentially identical.

flume-collect-conf.properties

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/datas/weblog-flume.log

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01
a1.sinks.k1.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.keep-alive = 5

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4.2 Flume on hadoop01 receives the data sent by the hadoop02 and hadoop03 agents and fans it out to both HBase and Kafka. Configuration:

a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink

a1.sources.r1.type = avro
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = hadoop01
a1.sources.r1.port = 5555
a1.sources.r1.threads = 5

#**************************** flume + hbase ******************************
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20

a1.sinks.hbaseSink.type = asynchbase
## HBase table name
a1.sinks.hbaseSink.table = weblogs
## HBase column family
a1.sinks.hbaseSink.columnFamily = info
## Custom asynchronous HBase serializer
a1.sinks.hbaseSink.serializer = main.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
## HBase column names
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl

#**************************** flume + kafka ******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20

a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = hadoop01:9092
a1.sinks.kafkaSink.topic = webCount
a1.sinks.kafkaSink.zookeeperConnect = hadoop01:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
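The payloadColumn list above tells the custom HBase serializer how to name the columns of each event. Pairing one cleaned, comma-separated record with those names can be sketched as follows (an illustrative stand-in, not the serializer itself):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ColumnMappingDemo {
    // Pair the configured column names with one comma-separated event body.
    static Map<String, String> mapColumns(String payloadColumn, String payload) {
        String[] columns = payloadColumn.split(",");
        String[] values = payload.split(",");
        Map<String, String> row = new LinkedHashMap<>();
        // Same validation the serializer performs: every column needs a value
        if (columns.length == values.length) {
            for (int i = 0; i < columns.length; i++) {
                row.put(columns[i], values[i]);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> row = mapColumns(
                "datetime,userid,searchname,retorder,cliorder,cliurl",
                "20111230000005,u1,news,1,2,http://example.com/");
        System.out.println(row);
    }
}
```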

4.3 Shell scripts to run Flume

flume-collect-start.sh, distributed to hadoop02 and hadoop03 under /opt/shell/:

#!/bin/bash
echo "flume-collect start ......"
sh /bin/flume-ng agent --conf conf -f /opt/conf/flume-collect-conf.properties -n a1 -Dflume.root.logger=INFO,console

flume-kfk-hb-start.sh, distributed to hadoop01 under /opt/shell/:

#!/bin/bash
echo "flume-kfk-hb start ......"
sh /bin/flume-ng agent --conf conf -f /opt/conf/flume-hbase-kafka-conf.properties -n a1 -Dflume.root.logger=INFO,console

4.4 Flume-to-HBase integration

Download the Flume source and import it into IDEA:

1) Download apache-flume-1.7.0-src.tar.gz and unpack it locally
2) Import the Flume source into IDEA
3) Base the changes on the flume-ng-hbase-sink module
4) Modify SimpleAsyncHbaseEventSerializer.java
5) See the project source for the full code

Key code of KfkAsyncHbaseEventSerializer.java:

@Override
public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
        byte[] rowKey;
        try {
            /*--------------------------- modified code begins ---------------------------------*/
            // Parse the configured column names
            String[] columns = new String(this.payloadColumn).split(",");
            // Parse the values of each line collected by Flume
            String[] values = new String(this.payload).split(",");
            // Validate that every column has a matching value
            if (columns.length == values.length) {
                // Access time
                String datetime = values[0];
                // User id
                String userid = values[1];
                // Business-specific row key
                rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                for (int i = 0; i < columns.length; i++) {
                    byte[] colColumn = columns[i].getBytes();
                    byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                    // One put per column
                    PutRequest putRequest = new PutRequest(table, rowKey, cf,
                            colColumn, colValue);
                    actions.add(putRequest);
                }
            }
            /*--------------------------- modified code ends ---------------------------------*/
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
    }
    return actions;
}
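`getKfkRowKey` is a custom addition to Flume's `SimpleRowKeyGenerator` and is not reproduced above. One plausible implementation (an assumption, not necessarily the project's) combines user id, timestamp, and a random suffix so that repeated events from the same user never collide:

```java
import java.io.UnsupportedEncodingException;
import java.util.UUID;

public class KfkRowKeyDemo {
    // Hypothetical equivalent of SimpleRowKeyGenerator.getKfkRowKey(userid, datetime):
    // userid + datetime + random suffix keeps row keys unique per event.
    public static byte[] getKfkRowKey(String userid, String datetime)
            throws UnsupportedEncodingException {
        return (userid + "-" + datetime + "-" + UUID.randomUUID()).getBytes("UTF-8");
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        System.out.println(new String(getKfkRowKey("u1", "20111230000005"), "UTF-8"));
    }
}
```

Prefixing the key with the user id groups each user's events contiguously in HBase, which suits per-user scans.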

4.5 將項(xiàng)目打包成jar,vita-flume-ng-hbase-sink.jar,分發(fā)到CDH的Flume/libs/下

5. Kafka Configuration (test environment; Kafka runs only on hadoop01, no high availability)

5.1 Configuration

Set advertised.listeners=PLAINTEXT://xxxx:9092

5.2 測(cè)試生產(chǎn)消費(fèi)是否成功

//create topic,副本數(shù)為1、分區(qū)數(shù)為1的topic,如果是配置了auto.create.topics.enable參數(shù)為true,可以忽略

sh bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --topic webCount --replication-factor 1 --partitions 1

//producer

sh /bin/kafka-console-producer --broker-list hadoop01:9092 --topic webCount

//consumer

sh /bin/kafka-console-consumer --zookeeper hadoop01:2181 --topic webCount --from-beginning

//delete topic

sh /bin/kafka-topics --delete --zookeeper hadoop01 --topic webCount

//topic list

sh /bin/kafka-topics --zookeeper hadoop01:2181 --list

5.3 Write the Kafka consumer script kfk-test-consumer.sh and distribute it to /opt/shell/

#!/bin/bash
echo "kfk-test-consumer.sh start......"
/bin/kafka-console-consumer --zookeeper hadoop01:2181 --from-beginning --topic webCount

6. HBase Configuration

6.1 Create the business table

create 'weblogs','info'

# Count the rows
count 'weblogs'

7. Hive Configuration

7.1 Configure Hive/HBase integration in CDH, or set the property directly:

hbase.zookeeper.quorum = hadoop01,hadoop02,hadoop03

7.2 Create an external Hive table backed by HBase

CREATE EXTERNAL TABLE weblogs(
  id string,
  datetime string,
  userid string,
  searchname string,
  retorder string,
  cliorder string,
  cliurl string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES("hbase.columns.mapping"=
":key,info:datetime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl")
TBLPROPERTIES("hbase.table.name"="weblogs");

-- Count the HBase-backed records
select count(*) from weblogs;

-- List tables
show tables;

-- Show the first 10 rows
select * from weblogs limit 10;

8. Structured Streaming Configuration

8.1 Test Spark and MySQL

val df = spark.sql("select count(1) from weblogs").show

8.2 Integrate Structured Streaming with MySQL

Create the MySQL database and table that will receive the results:

create database test;
use test;

CREATE TABLE `webCount` (
  `titleName` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
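The sink writing into this table must rewrite each updated count, and since titleName carries no unique key in the DDL above, one simple approach (an assumption, not necessarily what the project's JDBCSink does) is delete-then-insert per row. Building the two statements can be sketched as follows; real code would use PreparedStatement placeholders rather than string concatenation:

```java
public class WebCountSqlDemo {
    // Illustrative SQL builders for update-mode output into test.webCount.
    static String deleteSql(String titleName) {
        // Escape single quotes to keep the literal valid
        return "DELETE FROM webCount WHERE titleName = '" + titleName.replace("'", "''") + "'";
    }

    static String insertSql(String titleName, long count) {
        return "INSERT INTO webCount (titleName, count) VALUES ('"
                + titleName.replace("'", "''") + "', " + count + ")";
    }

    public static void main(String[] args) {
        System.out.println(deleteSql("spark"));
        System.out.println(insertSql("spark", 42));
    }
}
```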

8.3 Structured Streaming key code

/**
 * Structured Streaming reads data from Kafka and stores it in the
 * relational database MySQL.
 * Structured Streaming currently requires Kafka 0.10 or later.
 */
object StructuredStreamingKafka {

  case class Weblog(datatime: String,
                    userid: String,
                    searchname: String,
                    retorder: String,
                    cliorder: String,
                    cliurl: String)

  val LOGGER: Logger = LogManager.getLogger("vita")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("yarn")
      .appName("streaming")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop01:9092")
      .option("subscribe", "webCount")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]

    // Parse each comma-separated record into a Weblog
    val weblog = lines.map(_.split(","))
      .map(x => Weblog(x(0), x(1), x(2), x(3), x(4), x(5)))

    // Count queries per search term
    val titleCount = weblog
      .groupBy("searchname")
      .count()
      .toDF("titleName", "count")

    val url = "jdbc:mysql://hadoop01:3306/test"
    val username = "root"
    val password = "root"

    val writer = new JDBCSink(url, username, password)
    val query = titleCount
      .writeStream
      .foreach(writer)
      .outputMode("update")
      .trigger(ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}

8.4 項(xiàng)目打包,spark-weblogs.jar.

9. Startup Procedure

9.1 From CDH, start Zookeeper, Hadoop, HBase, MySQL, YARN, Flume, and Kafka

9.2 On hadoop01, first run /opt/shell/flume-kfk-hb-start.sh so incoming data is routed to both HBase and Kafka

9.3 On hadoop02 and hadoop03, run /opt/shell/flume-collect-start.sh to forward data to hadoop01

9.4 On hadoop01, submit the Spark job

Spark on YARN, with spark-sql-kafka included:

sh /bin/spark2-submit \
--class com.vita.spark.StructuredStreamingKafka \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--executor-cores 2 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
/opt/jars/spark-weblogs.jar \
10

To debug the Spark code remotely from IDEA (see reference), add the JDWP driver options:

sh /bin/spark2-submit \
--class com.vita.spark.StructuredStreamingKafka \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--executor-cores 1 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
--driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005" \
/opt/jars/spark-weblogs.jar \
10

To kill the Spark job on YARN: yarn application -kill [application id]

9.5 On hadoop02 and hadoop03, run /opt/weblog-shell.sh to start generating log data; StructuredStreamingKafka then reads it from Kafka, processes it, and stores the results in MySQL

9.6 Log in to MySQL and inspect the webCount table
