日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark 框架安全认证实现

發布時間:2024/1/17 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark 框架安全认证实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

導言

隨著大數據集群的使用,大數據的安全受到越來越多的關注一個安全的大數據集群的使用,運維必普通的集群更為復雜。
集群的安全通常基于kerberos集群完成安全認證。kerberos基本原理可參考:一張圖了解Kerberos訪問流程

Spark應用(On Yarn模式下)在安全的hadoop集群下的訪問,需要訪問各種各樣的組件/進程,如ResourceManager,NodeManager,NameNode,DataNode,Kafka,Hmaster,HregionServer,MetaStore等等。尤其是在長時運行的應用,如sparkStreaming,StructedStreaming,如何保證用戶認證后的長期有效性,其安全/認證更為復雜。

一個Spark應用提交用戶要先在kdc中完成用戶的認證,及拿到對應service服務的票據之后才能訪問對應的服務。由于Spark應用運行時涉及yarnclient,driver,applicationMaster,executor等多個服務,這其中每個進程都應當是同一個用戶啟動并運行,這就涉及到多個進程中使用同一個用戶的票據來對各種服務進行訪問,本文基于Spark2.3對此做簡要分析。

  • spark應用包含進程
進程功能yarn-client模式yarn-cluster模式
yarnclientSpark應用提交app的模塊yarn-client模式下生命周期與driver一致;yarn-cluster模式下可以設置為app提交后即退出,或者提交后一直監控app運行狀態
driverspark應用驅動器,調度應用邏輯,應用的“大腦”yarn-client模式下運行在yarnclient的JVM中;yarn-cluster模式下運行在applicationMaster中
applicationMaster基于yarn服務抽象出的app管理者yarn-client模式下僅僅負責啟動/監控container,匯報應用狀態的功能;yarn-cluster模式下負責啟動/監控container,匯報應用狀態的功,同時包含driver功能
Executorspark應用的執行器,yarn應用的container實體,業務邏輯的實際執行者

spark應用的提交用戶認證之后才能提交應用,所以在yarnclient/driver的邏輯中必然會執行到kerberos認證相關的登錄認證。然而其他的進程如applicationMaster,executor等均需要經過認證,應用提交后才由用戶啟動,這些進程則可以不進行kerberos認證而是利用Hadoop的token機制完成認證,減小kerberos服務壓力,同時提高訪問效率。

  • Hadoop Token機制

Hadoop的token實現基類為org.apache.hadoop.security.token.Token,

/*** Construct a token from the components.* @param identifier the token identifier* @param password the token's password* @param kind the kind of token* @param service the service for this token*/public Token(byte[] identifier, byte[] password, Text kind, Text service) {this.identifier = identifier;this.password = password;this.kind = kind;this.service = service;}

不同的服務也可hadoop的token來交互,只要使用不同的identifer來區分token即可。 如NMTokenIdentifier, AMRMTokenIdentifier,AuthenticationTokenIdentifier等不同的tokenIdentifier來區分不同的服務類型的token。

Spark應用各進程的安全實現

yarnclient的實現

此處yarnclient指的是向ResourceManager提交yarn應用的客戶端。在spark中,向yarn提交應用有兩種應用有yarn-client,yarn-cluster模式。在這兩種應用模式下提交應用,yarn client邏輯有些許不同。

安全hadoop場景下spark的用戶登錄認證機制

  • spark提交應用時,通過--principal, --keytab參數傳入認證所需文件。
    在sparkSubmit中prepareSubmitEnvironment時,完成認證

    // assure a keytab is available from any place in a JVMif (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {if (args.principal != null) {if (args.keytab != null) {require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")// Add keytab and principal configurations in sysProps to make them available// for later use; e.g. in spark sql, the isolated class loader used to talk// to HiveMetastore will use these settings. They will be set as Java system// properties and then loaded by SparkConfsparkConf.set(KEYTAB, args.keytab)sparkConf.set(PRINCIPAL, args.principal)UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)}}}
  • 在yarn-cluster模式下,不會調用業務層代碼,即不會初始化SparkContext,其通過YarnClusterApplication的start方法調用client.submitApplication提交應用

  • 在yarn-client模式下,會在yarnclient邏輯中調用業務代碼,即會初始化并運行SparkContext,通過YarnClientSchedulerBackend其調度client.submitApplication提交應用。

在client的submitApplication方法中提交app,之后創建amContext,準備本地資源,此時會將本地的文件上傳至HDFS,其中就包括keytab文件,同時會生成spark_conf.properties配置文件以供am使用,該配置文件中會包含keytab的配置

val props = new Properties()sparkConf.getAll.foreach { case (k, v) =>props.setProperty(k, v)}// Override spark.yarn.key to point to the location in distributed cache which will be used// by AM.Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) }

其中的amKeytabFileName是在setUpCredentials時設置如下,該值為指定的keytab文件加上隨機的字符串后綴,騎在am重點使用,可參考下節的介紹。

val f = new File(keytab)// Generate a file name that can be used for the keytab file, that does not conflict// with any user file.amKeytabFileName = f.getName + "-" + UUID.randomUUID().toStringsparkConf.set(PRINCIPAL.key, principal)

獲取相關組件的token,注意:此處的token均非與yarn服務交互相關token,這里只有與HDFS,HBASE,Hive服務交互的token。

def obtainDelegationTokens(hadoopConf: Configuration,creds: Credentials): Long = { delegationTokenProviders.values.flatMap { provider =>if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {// 各provider的obtainDelegationTokens方法中,會獲取對應組件的token,并放入credentials中provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)} else {logDebug(s"Service ${provider.serviceName} does not require a token." +s" Check your configuration to see if security is disabled or not.")None} }.foldLeft(Long.MaxValue)(math.min)

}

Spark中常訪問的服務使用token機制的有hive,hbase,hdfs,對應的tokenProvider如下:

服務tokenProvidertoken獲取類token獲取方法
HDFSHadoopFSDelegationTokenProviderorg.apache.hadoop.hbase.security.token.TokenUtilobtainToken
HIVEHiveDelegationTokenProviderorg.apache.hadoop.hive.ql.metadatagetDelegationToken
HBASEHBaseDelegationTokenProviderorg.apache.hadoop.hdfs.DistributedFileSystemaddDelegationTokens

以HbaseDelegationTokenProvider為例,主要是通過反射調用hbase的TokenUtil類的obtainTOken方法,對應的obtainDelegationTokens方法如下:

override def obtainDelegationTokens(hadoopConf: Configuration,sparkConf: SparkConf,creds: Credentials): Option[Long] = { try {val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)val obtainToken = mirror.classLoader.loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").getMethod("obtainToken", classOf[Configuration])logDebug("Attempting to fetch HBase security token.")val token = obtainToken.invoke(null, hbaseConf(hadoopConf)).asInstanceOf[Token[_ <: TokenIdentifier]]logInfo(s"Get token from HBase: ${token.toString}")creds.addToken(token.getService, token) } catch {case NonFatal(e) =>logDebug(s"Failed to get token from service $serviceName", e) } None }

PS : HBase的token獲取的用戶需要具有hbase:meta表的exec權限,否則無法成功獲取token

在獲取token后,將token設置到amContainer中,并放入appContext中

private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { val dob = new DataOutputBuffer credentials.writeTokenStorageToStream(dob) amContainer.setTokens(ByteBuffer.wrap(dob.getData)) } // appContext.setAMContainerSpec(containerContext)

driver的token更新

在yarn-client模式下,driver在yarnclient進程中啟動,同樣需要訪問業務層及集群的相關組件如hdfs。driver通過讀取am更新在hdfs路徑下的credentials文件來保證driver節點的token有效。

// SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver // reads the credentials from HDFS, just like the executors and updates its own credentials // cache. if (conf.contains("spark.yarn.credentials.file")) {YarnSparkHadoopUtil.startCredentialUpdater(conf) }

在yarn-cluster模式下,driver運行在applicationMaster的JVM中,其安全相關由Am同一操作

ApplicationMaster 的安全認證

applicationMaster是Yarn進行應用調度/管理的核心,需要與RM/NM等進行交互以便應用運行。其中相關的交互均通過token完成認證,認證實現由Yarn內部框架完成。查看am日志發現,即是在非安全(非kerberos)的場景下,同樣會使用到token。而與hdfs,hbase等服務交互使用的token則需Spark框架來實現。

applicationMaster中與YARN相關的認證

  • AM與RM的認證

在ResourceManager接收到應用提交的ApplicationSubmissionContext后,在其AmLauncher.java的run方法中為am設置生成“YARN_AM_RM_TOKEN,該token用于am于rm通信使用”

public Token<AMRMTokenIdentifier> createAndGetAMRMToken(ApplicationAttemptId appAttemptId) { this.writeLock.lock(); try {LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);AMRMTokenIdentifier identifier =new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey().getKeyId());byte[] password = this.createPassword(identifier);appAttemptSet.add(appAttemptId);return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,identifier.getKind(), new Text()); } finally {this.writeLock.unlock(); } }
  • AM與NM的認證

Am在啟動之后,會向ResourceManager申請container,并與對應的NodeManager通信以啟動container。然而AM與NM通信的token是如何得到的呢?

查看AMRMClientImpl類可以看到,AM向RM發送分配請求,RM接收到請求后,將container要分配至的NM節點的Token放置response中返回給AM。Am接收到response后,會保存NMToken,并判定是否需要更新YARN_AM_RM_TOKEN

//通過rmClient向RM發送分配請求 allocateResponse = rmClient.allocate(allocateRequest); //拿到response后,保存NMToken并根據response判定是否需要更新AMRM通信的TOken if (!allocateResponse.getNMTokens().isEmpty()) {populateNMTokens(allocateResponse.getNMTokens());}if (allocateResponse.getAMRMToken() != null) {updateAMRMToken(allocateResponse.getAMRMToken());}

RM通過ApplicationMasterService響應allocation請求

// 通過調度器為cotnainer分配NodeManager并生成該NodeManager的Token放入allcation中Allocation allocation =this.rScheduler.allocate(appAttemptId, ask, release, blacklistAdditions, blacklistRemovals);......if (!allocation.getContainers().isEmpty()) {allocateResponse.setNMTokens(allocation.getNMTokens());}

AM在準備啟動container時,將當前用戶的token都設置進ContainerLaunchContext中

def startContainer(): java.util.Map[String, ByteBuffer] = { val ctx = Records.newRecord(classOf[ContainerLaunchContext]).asInstanceOf[ContainerLaunchContext] val env = prepareEnvironment().asJava ctx.setLocalResources(localResources.asJava) ctx.setEnvironment(env) val credentials = UserGroupInformation.getCurrentUser().getCredentials() val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) ctx.setTokens(ByteBuffer.wrap(dob.getData()))

ApplicationMaster業務相關的服務的token更新

Am啟動的資源情況

查看Am啟動命令大致如下,可以發現有指定配置文件,而該配置文件即為yarnclient生成上傳至hdfs,在am啟動前由NodeManager從hdfs中copy至本地路徑,供container使用:

/usr/jdk64/jdk1.8.0_77//bin/java -server -Xmx512m -Djava.io.tmpdir=/localpath/*/tmp -Dspark.yarn.app.container.log.dir=/localpath/*/ org.apache.spark.deploy.yarn.ExecutorLauncher --arg host:port --properties-file /localpath/*/__spark_conf__/__spark_conf__.properties

查看此配置文件可以看到有如下配置項:

spark.yarn.principal=ocsp-ygcluster@ASIAINFO.COM spark.yarn.keytab=hbase.headless.keytab-18f29b79-b7a6-4cb2-b79d-4305432a5e9a

下圖為am進程使用到的資源文件

?

am進程資源.jpg

如上可以看出,am雖然運行在集群中,但運行時認證相關的資源已經準備就緒。下面分析其運行中關于安全的邏輯

Am安全認證及token更新邏輯

在applicationMaster中,定期更新token,并寫入文件到hdfs的相關目錄,并清理舊文件以供各executor使用。
在ApplicationMaster啟動后,進行login登錄并啟動名為am-kerberos-renewer的dameon線程定期登錄,保證用戶認證的有效性

private val ugi = {
val original = UserGroupInformation.getCurrentUser()

// If a principal and keytab were provided, log in to kerberos, and set up a thread to // renew the kerberos ticket when needed. Because the UGI API does not expose the TTL // of the TGT, use a configuration to define how often to check that a relogin is necessary. // checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed. val principal = sparkConf.get(PRINCIPAL).orNull val keytab = sparkConf.get(KEYTAB).orNull if (principal != null && keytab != null) {UserGroupInformation.loginUserFromKeytab(principal, keytab)val renewer = new Thread() {override def run(): Unit = Utils.tryLogNonFatalError {while (true) {TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()}}}renewer.setName("am-kerberos-renewer")renewer.setDaemon(true)renewer.start()// Transfer the original user's tokens to the new user, since that's needed to connect to// YARN. It also copies over any delegation tokens that might have been created by the// client, which will then be transferred over when starting executors (until new ones// are created by the periodic task).val newUser = UserGroupInformation.getCurrentUser()SparkHadoopUtil.get.transferCredentials(original, newUser)newUser } else {SparkHadoopUtil.get.createSparkUser() } }

在am中啟動AMCredentialRenewerStarter線程,調度認證登錄及token renew邏輯

if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {val credentialRenewerThread = new Thread {setName("AMCredentialRenewerStarter")setContextClassLoader(userClassLoader)override def run(): Unit = {val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf,yarnConf,conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))val credentialRenewer =new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)credentialRenewer.scheduleLoginFromKeytab()}}credentialRenewerThread.start()credentialRenewerThread.join()}

在scheduleLoginFromKeytab中,會周期調度登錄,token獲取更新寫入hdfs文件等操作。
其核心邏輯如下

調度周期:

各種組件的token更新周期如hdfs的更新周期dfs.namenode.delegation.token.renew-interval默認為1天,hbase的token更新周期hbase.auth.key.update.interval默認為1天;調度更新的周期為如上各組件最小值的75%,

調度流程:

//將生成的token寫入hdfs目錄${spark.yarn.credentials.file}-${timeStamp}-${nextSuffix} writeNewCredentialsToHDFS(principal, keytab) //刪除邏輯為保留五個(${spark.yarn.credentials.file.retention.count})文件,文件更新時間早于五天(${spark.yarn.credentials.file.retention.days})的全部清理 cleanupOldFiles()

Executor的認證機制

executor的認證同樣使用的是token機制。executor啟動之后,根據driver啟動設置的${spark.yarn.credentials.file}啟動token更新:

if (driverConf.contains("spark.yarn.credentials.file")) {logInfo("Will periodically update credentials from: " +driverConf.get("spark.yarn.credentials.file"))Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").getMethod("startCredentialUpdater", classOf[SparkConf]).invoke(null, driverConf)}

Executor中的token更新是讀取hdfs目錄{timeStamp}-${nextSuffix}目錄下的文件,讀取到緩存中,以便保證讀取到的是更新后的token使用。

安全Spark的使用

Spark框架完成的kerberos認證及使用token與其他服務交互的機制使用較為簡單,只需要在提交應用時的spark-submit命令行中加入--principal appuserName --keytab /path/to/user.keytab即可


?

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Spark 框架安全认证实现的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。