GitHub - shixiaopengql/BigData-News
Real-Time News Big Data System Project Based on Spark 2.2 (CDH 5.14)
1. Overview
2. Environment Setup
2.1 CDH 5.14.2 (see the linked guide for installation steps). The exact version can follow whatever you actually deploy; CDH versions are highly compatible with one another.
| Service   | hadoop01               | hadoop02      | hadoop03      |
|-----------|------------------------|---------------|---------------|
| HDFS      | NameNode               | DataNode      | DataNode      |
| HBase     | HMaster, HRegionServer | HRegionServer | HRegionServer |
| Hive      | Hive                   |               |               |
| Flume     | Flume                  | Flume         | Flume         |
| Kafka     | Kafka                  |               |               |
| YARN      | ResourceManager        | NodeManager   | NodeManager   |
| Oozie     | Oozie                  |               |               |
| Hue       | Hue                    |               |               |
| Spark2    | Spark                  |               |               |
| Zookeeper | Zookeeper              |               |               |
| MySQL     | MySQL                  |               |               |
2.2 Host Configuration
1. hadoop01: 4 cores, 16 GB RAM, CentOS 7.2
2. hadoop02: 2 cores, 8 GB RAM, CentOS 7.2
3. hadoop03: 2 cores, 8 GB RAM, CentOS 7.2
2.3 Project Architecture
2.4 Install Dependency Packages
# yum -y install psmisc MySQL-python at bc bind-libs bind-utils cups-client cups-libs cyrus-sasl-gssapi cyrus-sasl-plain ed fuse fuse-libs httpd httpd-tools keyutils-libs-devel krb5-devel libcom_err-devel libselinux-devel libsepol-devel libverto-devel mailcap.noarch mailx mod_ssl openssl-devel pcre-devel postgresql-libs python-psycopg2 redhat-lsb-core redhat-lsb-submod-security.x86_64 spax time zlib-devel wget psmisc
# chmod +x /etc/rc.d/rc.local
# echo "echo 0 > /proc/sys/vm/swappiness" >>/etc/rc.d/rc.local
# echo "echo never > /sys/kernel/mm/transparent_hugepage/defrag" >>/etc/rc.d/rc.local
# echo 0 > /proc/sys/vm/swappiness
# echo never > /sys/kernel/mm/transparent_hugepage/defrag
# yum -y install rpcbind
# systemctl start rpcbind
# echo "systemctl start rpcbind" >> /etc/rc.d/rc.local
Install Perl support
# yum install perl*    (install Perl and its related packages)
# yum install cpan     (some Perl programs need CPAN library support; see its documentation for details)
3. Writing the Data-Generation Simulator
3.1 Simulate the access log that nginx would produce. Data source: the user query log from Sogou Labs (a collection of web query logs covering roughly one month, June 2008, of query requests and user clicks on part of the Sogou search engine).
3.2 Data Cleaning
The record format is: access time \t user ID \t [query term] \t rank of the URL in the result list \t sequence number of the user's click \t clicked URL. The user ID is assigned automatically from the cookie of the browser used to query the search engine, so different queries issued in the same browser session share one user ID.
Replace the tabs in the file with commas:
cat weblog.log | tr "\t" "," > weblog2.log
Replace the spaces in the file with commas:
cat weblog2.log | tr " " "," > weblog.log
3.3 Key Code
public static void readFileByLines(String fileName) {
    FileInputStream fis = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    String tempString = null;
    try {
        System.out.println("Reading the file one line at a time:");
        fis = new FileInputStream(fileName);
        // Get a byte stream from the file on the local file system
        isr = new InputStreamReader(fis, "GBK");
        br = new BufferedReader(isr);
        int count = 0;
        while ((tempString = br.readLine()) != null) {
            count++; // line number
            // Pause briefly so the output behaves like a live log stream
            Thread.sleep(300);
            // The reader above already decodes GBK, so the line can be passed on directly;
            // the writer is expected to encode it as UTF-8
            String str = tempString;
            System.out.println("row:" + count + ">>>>>>>>" + str);
            writeFile(writeFileName, str);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        if (br != null) {
            try {
                br.close(); // closing the BufferedReader also closes isr and fis
            } catch (IOException e1) {
            }
        }
    }
}
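The writeFile helper and the writeFileName field used above are not shown in this excerpt. A minimal sketch, assuming the helper simply appends each line, UTF-8 encoded, to the output log that Flume tails (for example /opt/datas/weblog-flume.log), could be:

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;

// Hypothetical helper: append one line to the generated log file in UTF-8.
public static void writeFile(String fileName, String line) {
    try (Writer writer = new OutputStreamWriter(new FileOutputStream(fileName, true), "UTF-8")) {
        writer.write(line + "\n");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Opening and closing the file for every line is inefficient but keeps the sketch simple; a long-lived writer flushed after each line would also work.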
3.4 Package the program as weblogs.jar (see the linked packaging steps) and write the shell script weblog-shell.sh
#!/bin/bash
echo "start log......"
# The first argument is the source log file, the second is the generated output log file
java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log
3.5 Make weblog-shell.sh executable
chmod 777 weblog-shell.sh
4. Flume Data Collection Configuration
4.1 Flume on hadoop02 and hadoop03 collects the generated logs and forwards them to hadoop01; the Flume configuration on hadoop02 and hadoop03 is essentially identical.
flume-collect-conf.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =exec
a1.sources.r1.command= tail -F /opt/datas/weblog-flume.log
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01
a1.sinks.k1.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.keep-alive = 5
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4.2 Flume on hadoop01 receives the data sent by Flume on hadoop02 and hadoop03 and delivers it to both HBase and Kafka. The configuration is as follows:
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink
a1.sources.r1.type = avro
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = hadoop01
a1.sources.r1.port = 5555
a1.sources.r1.threads = 5
#****************************flume + hbase******************************
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20
a1.sinks.hbaseSink.type = asynchbase
## HBase table name
a1.sinks.hbaseSink.table = weblogs
## HBase column family
a1.sinks.hbaseSink.columnFamily = info
## Custom serializer for asynchronous writes to HBase
a1.sinks.hbaseSink.serializer = main.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
## HBase column names
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl
#****************************flume + kafka******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20
a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = hadoop01:9092
a1.sinks.kafkaSink.topic = webCount
a1.sinks.kafkaSink.zookeeperConnect = hadoop01:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
4.3 Flume startup scripts
flume-collect-start.sh, distributed to hadoop02 and hadoop03 under /opt/shell/:
#!/bin/bash
echo "flume-collect start ......"
sh /bin/flume-ng agent --conf conf -f /opt/conf/flume-collect-conf.properties -n a1 -Dflume.root.logger=INFO,console
flume-kfk-hb-start.sh, distributed to hadoop01 under /opt/shell/:
#!/bin/bash
echo "flume-kfk-hb start ......"
sh /bin/flume-ng agent --conf conf -f /opt/conf/flume-hbase-kafka-conf.properties -n a1 -Dflume.root.logger=INFO,console
4.4 Flume-to-HBase integration
Download the Flume source code and import it into IDEA:
1) Download apache-flume-1.7.0-src.tar.gz and unpack it locally
2) Import the Flume source into IDEA
3) Modify the flume-ng-hbase-sink module
4) Modify SimpleAsyncHbaseEventSerializer.java
5) See the project source for the complete code
Key code of KfkAsyncHbaseEventSerializer.java:
@Override
public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
        byte[] rowKey;
        try {
            /*--------------------------- start of modified code ---------------------------*/
            // Parse the column names configured for the sink
            String[] columns = new String(this.payloadColumn).split(",");
            // Parse the values of the line collected by Flume
            String[] values = new String(this.payload).split(",");
            for (int i = 0; i < columns.length; i++) {
                byte[] colColumn = columns[i].getBytes();
                byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                // Sanity check: the number of columns must match the number of values
                if (columns.length != values.length) break;
                // Timestamp
                String datetime = values[0];
                // User id
                String userid = values[1];
                // Business-specific row key
                rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                // Queue one put request per column
                PutRequest putRequest = new PutRequest(table, rowKey, cf,
                        colColumn, colValue);
                actions.add(putRequest);
            }
            /*--------------------------- end of modified code ---------------------------*/
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
    }
    return actions;
}
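SimpleRowKeyGenerator.getKfkRowKey is a method the project adds to Flume's SimpleRowKeyGenerator and is not reproduced here. A minimal sketch, assuming the row key simply combines the user id, the timestamp and a unique suffix so that rows for one user sort together without overwriting each other, might be:

import java.io.UnsupportedEncodingException;
import java.util.UUID;

// Hypothetical addition to org.apache.flume.sink.hbase.SimpleRowKeyGenerator
public static byte[] getKfkRowKey(String userid, String datetime)
        throws UnsupportedEncodingException {
    // userid + datetime gives per-user locality; the UUID suffix avoids key collisions
    return (userid + "-" + datetime + "-" + UUID.randomUUID()).getBytes("UTF8");
}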
4.5 Package the project as vita-flume-ng-hbase-sink.jar and distribute it to the CDH Flume/libs/ directory
5. Kafka Configuration (test environment: Kafka runs only on hadoop01, without high availability)
5.1 Configuration
Set advertised.listeners=PLAINTEXT://xxxx:9092
5.2 Verify that producing and consuming work
// Create a topic with replication factor 1 and a single partition; this step can be skipped if auto.create.topics.enable is set to true
sh bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --topic webCount --replication-factor 1 --partitions 1
//producer
sh /bin/kafka-console-producer --broker-list hadoop01:9092 --topic webCount
//consumer
sh /bin/kafka-console-consumer --zookeeper hadoop01:2181 --topic webCount --from-beginning
//delete topic
sh /bin/kafka-topics --delete --zookeeper hadoop01 --topic webCount
//topic list
sh /bin/kafka-topics --zookeeper hadoop01:2181 --list
5.3 Write the Kafka consumer test script kfk-test-consumer.sh and distribute it to /opt/shell/
#!/bin/bash
echo "kfk-test-consumer.sh start......"
/bin/kafka-console-consumer --zookeeper hadoop01:2181 --from-beginning --topic webCount
6. HBase Configuration
6.1 Create the business table
create 'weblogs','info'
// Check the row count
count 'weblogs'
7. Hive Configuration
7.1 Enable the Hive/HBase integration in CDH, or set the following property for Hive (e.g. in hive-site.xml):
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>hadoop01,hadoop02,hadoop03</value>
</property>
7.2 Create an external table in Hive mapped to the HBase table
CREATE EXTERNAL TABLE weblogs(
id string,
datetime string,
userid string,
searchname string,
retorder string,
cliorder string,
cliurl string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES("hbase.columns.mapping"=
":key,info:datetime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl")
TBLPROPERTIES("hbase.table.name"="weblogs");
# Count the HBase records through Hive
select count(*) from weblogs;
# List the tables
show tables;
# Look at the first 10 rows
select * from weblogs limit 10;
8. Structured Streaming Configuration
8.1 Test Spark and MySQL
spark.sql("select count(1) from weblogs").show()
8.2 Structured Streaming and MySQL integration
Create the database and table in MySQL that will receive the results:
create database test;
use test;
CREATE TABLE `webCount` (
`titleName` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
`count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
8.3 Key Structured Streaming Code
/**
 * Structured Streaming job that reads data from Kafka and stores it in MySQL.
 * Structured Streaming currently requires Kafka 0.10 or later.
 */
import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

object StructuredStreamingKafka {

  case class Weblog(datatime: String,
                    userid: String,
                    searchname: String,
                    retorder: String,
                    cliorder: String,
                    cliurl: String)

  val LOGGER: Logger = LogManager.getLogger("vita")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("yarn")
      .appName("streaming")
      .getOrCreate()

    // Read the webCount topic from Kafka as a streaming DataFrame
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop01:9092")
      .option("subscribe", "webCount")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]
    // lines.map(_.split(",")).foreach(x => print(" 0 = " + x(0) + " 1 = " + x(1) + " 2 = " + x(2) + " 3 = " + x(3) + " 4 = " + x(4) + " 5 = " + x(5)))

    // Parse each comma-separated record into a Weblog and count clicks per search term
    val weblog = lines.map(_.split(","))
      .map(x => Weblog(x(0), x(1), x(2), x(3), x(4), x(5)))
    val titleCount = weblog
      .groupBy("searchname")
      .count()
      .toDF("titleName", "count")

    // Push every update to MySQL through a custom ForeachWriter
    val url = "jdbc:mysql://hadoop01:3306/test"
    val username = "root"
    val password = "root"
    val writer = new JDBCSink(url, username, password)
    // val writer = new MysqlSink(url, username, password)
    val query = titleCount
      .writeStream
      .foreach(writer)
      .outputMode("update")
      .trigger(ProcessingTime("5 seconds"))
      .start()
    query.awaitTermination()
  }
}
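The JDBCSink passed to foreach above is part of the project and is not reproduced here. A minimal sketch, assuming it extends Spark's ForeachWriter[Row], opens one MySQL connection per partition, and upserts (titleName, count) into the webCount table (which would then need a unique index on titleName), might look like:

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical sketch of the JDBCSink used by the streaming query above.
class JDBCSink(url: String, user: String, password: String) extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // One connection per partition and epoch
    connection = DriverManager.getConnection(url, user, password)
    statement = connection.prepareStatement(
      "INSERT INTO webCount (titleName, `count`) VALUES (?, ?) " +
        "ON DUPLICATE KEY UPDATE `count` = VALUES(`count`)")
    true
  }

  override def process(value: Row): Unit = {
    statement.setString(1, value.getAs[String]("titleName"))
    statement.setLong(2, value.getAs[Long]("count"))
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

With outputMode("update"), only the search terms whose counts changed in a trigger are handed to process, so the upsert keeps webCount current without rewriting the whole table.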
8.4 Package the project as spark-weblogs.jar.
9. Startup Procedure
9.1 Start Zookeeper, Hadoop, HBase, MySQL, YARN, Flume and Kafka from CDH
9.2 On hadoop01, first run /opt/shell/flume-kfk-hb-start.sh to deliver the data to both HBase and Kafka
9.3 On hadoop02 and hadoop03, run /opt/shell/flume-collect-start.sh to forward the data to hadoop01
9.4 On hadoop01, submit the Spark job
Spark on YARN, with the spark-sql-kafka integration package:
sh /bin/spark2-submit \
--class com.vita.spark.StructuredStreamingKafka \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--executor-cores 2 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
/opt/jars/spark-weblogs.jar \
10
To remote-debug the Spark code from IDEA (see the referenced guide), submit with a JDWP agent:
sh /bin/spark2-submit \
--class com.vita.spark.StructuredStreamingKafka \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--executor-cores 1 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
--driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005" \
/opt/jars/spark-weblogs.jar \
10
To kill the Spark job on YARN: yarn application -kill [application id]
9.5 On hadoop02 and hadoop03, run /opt/weblog-shell.sh to start generating log data; the StructuredStreamingKafka job then pulls the data from Kafka, processes it and stores the results in MySQL
9.6 Log in to MySQL and check the webCount table