[Development Tools] Using the Spark Shell

Copyright notice: this is an original post by the author, released under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting. Original: https://blog.csdn.net/u010416101/article/details/88913354

Preface

The previous chapter covered installing Spark in Standalone mode. This chapter walks through the basics of the Spark Shell:

  • Launching and basic usage
  • Basic operators

Launching and basic usage

  • Local launch
    Go to the ./bin directory and run spark-shell. With no master specified, it does not connect to a cluster; it starts in local mode (master = local[*]). The Web UI is available at http://localhost:4040.
```
localhost:bin Sean$ spark-shell
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/29 17:15:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/29 17:15:01 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.31.80 instead (on interface en0)
19/03/29 17:15:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context Web UI available at http://192.168.31.80:4040
Spark context available as 'sc' (master = local[*], app id = local-1553850902335).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
```

  • Local launch, connected to a cluster / with explicit settings: spark-shell --master spark://localhost:7077 --total-executor-cores 1 --executor-memory 1g
```
localhost:bin Sean$ spark-shell --master spark://localhost:7077 --total-executor-cores 1 --executor-memory 1g
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/30 15:25:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20190330152508-0001).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@66d2885c

scala>
```


Once startup succeeds, the spark-shell application appears at http://localhost:8080 (the Standalone master's Web UI).

From there we can run Scala code inside spark-shell.
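As a quick illustration, here is a minimal, hypothetical session; the RDD, the numbers, and the result values below are illustrative, not taken from the original logs:

```scala
scala> val nums = sc.parallelize(1 to 10)        // build an RDD from a local range
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> nums.filter(_ % 2 == 0).map(_ * 10).sum() // keep the evens, scale by 10, sum them up
res0: Double = 300.0
```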


Spark submit

For production deployment and release, jobs are normally submitted with the spark-submit script (also under ./bin). The usual workflow is to package the program with Maven and then submit the resulting jar through spark-submit.
(Note: Maven packaging is not covered here; see Maven 打包實戰 for more.)
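As a hedged sketch of what such a submission might look like; the main class, jar path, and input path below are placeholders, not values from the original article:

```
./bin/spark-submit \
  --master spark://localhost:7077 \
  --class com.example.WordCount \
  --total-executor-cores 1 \
  --executor-memory 1g \
  target/wordcount-1.0.jar \
  hdfs://localhost:9000/wordcount/input
```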


Q & A

A typical failure: spark-shell cannot reach the master at all (log trimmed; the identical stack trace repeats for each retry):

```
localhost:bin Sean$ spark-shell --master spark://192.168.31.80:7077
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
19/03/29 18:11:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/29 18:11:38 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.31.80 instead (on interface en0)
19/03/29 18:11:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/03/29 18:11:39 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
    ...
Caused by: java.io.IOException: Failed to connect to /192.168.31.80:7077
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
    ... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.31.80:7077
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    ... 1 more
[the same WARN and stack trace repeat at 18:11:59 and 18:12:19]
19/03/29 18:12:39 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
19/03/29 18:12:39 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
19/03/29 18:12:39 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
19/03/29 18:12:40 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
    ... 47 elided
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
```


Resolution: this kind of failure typically has one of the following causes:

  • Check the firewall and confirm the Spark master's address and port are reachable;
  • Check that the local Scala version matches the Scala version of the remote Spark cluster (compare conf/spark-env.sh, /etc/profile, and the version printed when spark-shell starts);
  • A misconfigured local /etc/hosts (see the sketch after the config blocks below);
  • When launching a pseudo-cluster locally, spark-env.sh and slaves need to be configured as follows.
```
# spark-env.sh
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
```
```
# slaves
localhost
```
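For the /etc/hosts point, a minimal sketch of a sane mapping; the LAN address and hostname below are placeholders for your machine's actual values:

```
# /etc/hosts
127.0.0.1       localhost
192.168.31.80   my-spark-host    # hypothetical LAN address and hostname
```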

[1]. 關于Spark報錯不能連接到Server的解決辦法(Failed to connect to master master_hostname:7077)
[2]. Unable to connect to Spark master
[3]. Spark報錯——AnnotatedConnectException拒絕連接
[4]. 單機spark綁定端口

A second failure mode: the master is reachable but then removes the application (log trimmed):

```
localhost:bin Sean$ spark-shell --master spark://127.0.0.1:7077
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
19/03/30 15:23:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/30 15:23:55 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
19/03/30 15:23:55 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: FAILED
    at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:509)
    at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:146)
    ...
19/03/30 15:23:55 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
    ... 47 elided
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala> localhost:bin Sean$
```

  • Insufficient local disk, memory, or CPU cores can also cause the problems above: the spark-shell application starts, but then fails because the cluster cannot satisfy its resource request. Reduce the requested resources with --total-executor-cores 1 and --executor-memory 1g.

Basic operators

Given the limits of the local machine, we will run the following operator examples directly in spark-shell.

Prerequisites
  • Spark
  • Hadoop

Basic steps
  • Start bin/spark-shell
```
localhost:bin Sean$ ./spark-shell
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/05/25 16:26:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1558772763953).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
```

  • Put the input file onto Hadoop

```
# put onto the local HDFS
hadoop fs -put hello2019.sh /
# put onto a remote HDFS
hadoop fs -put hello2019.sh hdfs://localhost:9000/
```
  • Read the data from Hadoop

```
# sc is the SparkContext
scala> sc
res4: org.apache.spark.SparkContext = org.apache.spark.SparkContext@25198ead
```

Computing WordCount:

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res0: Array[(String, Int)] = Array((t,8), (l,3), (a,3), (i,3), (y,3), (p,2), (e,2), (c,2), (0,1), (b,1), (h,1), (2,1), (" ",1), (k,1), (o,1), (9,1), (1,1))
```

Saving to HDFS as multiple files:

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525/out")
```

Saving to HDFS as a single file, via reduceByKey(_+_, 1):

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_,1).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525-2/out")
```

  • Output (multiple files)

```
localhost:~ Sean$ hadoop fs -cat /wordcount/20190525/out/*
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
19/05/25 16:29:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(t,8)
(l,3)
(a,3)
(i,3)
(y,3)
(p,2)
(e,2)
(c,2)
(0,1)
(b,1)
(h,1)
(2,1)
( ,1)
(k,1)
(o,1)
(9,1)
(1,1)
```
  • Output (single file)

```
localhost:~ Sean$ hadoop fs -cat /wordcount/20190525-2/out/*
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
19/05/25 16:30:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(t,8)
(a,3)
(i,3)
(y,3)
(l,3)
(e,2)
(p,2)
(c,2)
(0,1)
(k,1)
(b,1)
(h,1)
(2,1)
( ,1)
(o,1)
(9,1)
(1,1)
```

WordCount in detail

  • textFile()
    Reads data from a source and returns it as an RDD.

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").collect
res10: Array[String] = Array(hello 2019, cat, pitty, kitty, able, pitty, cat)
```
  • map()
    Applies a function to every element.

  • split()
    Splits a string into parts.

  • collect
    Gathers the results back to the driver node.

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").map(_.split(" ")).collect
res12: Array[Array[String]] = Array(Array(hello, 2019), Array(cat), Array(pitty), Array(kitty), Array(able), Array(pitty), Array(cat))
```
  • flatMap() & map().flatten
    map().flatten is the Scala-collections idiom; the Spark RDD API does not appear to offer flatten, so flatMap() is used instead (see the plain-Scala contrast below).

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).collect
res17: Array[String] = Array(hello, 2019, cat, pitty, kitty, able, pitty, cat)
```
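A minimal plain-Scala sketch of the contrast, using ordinary collections rather than RDDs; the sample strings are hypothetical:

```scala
scala> val lines = List("hello 2019", "cat pitty")
lines: List[String] = List(hello 2019, cat pitty)

scala> lines.map(_.split(" ")).flatten        // Scala collections: map, then flatten
res0: List[String] = List(hello, 2019, cat, pitty)

scala> lines.flatMap(_.split(" "))            // one step; this is the form RDDs support
res1: List[String] = List(hello, 2019, cat, pitty)
```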
  • map((_,1))
    Pairs each element with an initial count of 1.

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).collect
res19: Array[(String, Int)] = Array((hello,1), (2019,1), (cat,1), (pitty,1), (kitty,1), (able,1), (pitty,1), (cat,1))
```
  • reduceByKey()
    Aggregates values by key. reduceByKey is an operator specific to RDDs; plain Scala collections do not have it.
    The optional trailing argument, as in reduceByKey(_+_, 1), specifies the number of partitions (in the saving examples above, that is what decides whether the result is written as one file or three).

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y) => (x+y)).collect
res20: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (cat,2), (kitty,1))

scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res21: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (cat,2), (kitty,1))

scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_,1).collect
res22: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (kitty,1), (cat,2))
```
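The partition effect can be checked directly with getNumPartitions; a hedged sketch (the partition count of the unconstrained run depends on the input splits, so the first value below is illustrative):

```scala
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).getNumPartitions
res23: Int = 2

scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_,1).getNumPartitions
res24: Int = 1
```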

  • What is the difference between reduceByKey and groupByKey?
    reduceByKey first reduces within each partition and only then merges the partial results; groupByKey shuffles everything first and aggregates in one place afterwards. See the sketch after the links below.

    深入理解groupByKey、reduceByKey區別——本質就是一個local machine的reduce操作
    reduceByKey應用舉例
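A hedged sketch of the two operators on a toy RDD; the keys and counts are illustrative only:

```scala
scala> val pairs = sc.parallelize(Seq(("cat",1), ("cat",1), ("dog",1)))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> pairs.reduceByKey(_+_).collect                 // partial sums per partition, merged after the shuffle
res0: Array[(String, Int)] = Array((dog,1), (cat,2))

scala> pairs.groupByKey().mapValues(_.sum).collect    // all values shuffled first, summed afterwards
res1: Array[(String, Int)] = Array((dog,1), (cat,2))
```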

  • sortBy(_._2, false)
    In Spark's sortBy(f, ascending), the second parameter selects ascending or descending order.

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res0: Array[(String, Int)] = Array((t,8), (l,3), (a,3), (i,3), (y,3), (p,2), (e,2), (c,2), (0,1), (b,1), (h,1), (2,1), (" ",1), (k,1), (o,1), (9,1), (1,1))
```
  • saveAsTextFile()
    Writes the processed result out to storage.

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525/out")
```
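Another common way to force a single output file is to coalesce to one partition before saving; a hedged one-liner (the output path here is illustrative):

```scala
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).coalesce(1).saveAsTextFile("hdfs://localhost:9000/wordcount/single/out")
```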

Reference

[1]. 深入理解groupByKey、reduceByKey區別——本質就是一個local machine的reduce操作
[2]. reduceByKey應用舉例

