Linkis 1.0 User Manual: Example Code for Calling the Linkis Interface from Java and Scala
Linkis provides convenient Java and Scala client interfaces; you only need to import the linkis-computation-client module to use them. Since 1.0, submission with Labels is also supported. The sections below cover both the 0.X-compatible way and the new way added in 1.0.
1. Import the dependency module
    <dependency>
        <groupId>com.webank.wedatasphere.linkis</groupId>
        <artifactId>linkis-computation-client</artifactId>
        <version>${linkis.version}</version>
    </dependency>

For example:

    <dependency>
        <groupId>com.webank.wedatasphere.linkis</groupId>
        <artifactId>linkis-computation-client</artifactId>
        <version>1.0.0</version>
    </dependency>

2. Execute method submission, compatible with 0.X
2.1 Java test code
Create the Java test class LinkisClientTest; the meaning of each interface call is explained in the comments:
    package com.webank.wedatasphere.linkis.client.test;

    import com.webank.wedatasphere.linkis.common.utils.Utils;
    import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
    import com.webank.wedatasphere.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
    import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;
    import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;
    import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
    import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;
    import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction;
    import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;
    import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
    import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;
    import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;
    import org.apache.commons.io.IOUtils;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class LinkisClientTest {

        public static void main(String[] args) {
            String user = "hadoop";
            String executeCode = "show databases;";

            // 1. Configure DWSClientBuilder and obtain a DWSClientConfig from it
            DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
                    .addServerUrl("http://${ip}:${port}") // ServerUrl: address of the Linkis gateway, e.g. http://{ip}:{port}
                    .connectionTimeout(30000) // client connection timeout
                    .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) // whether to enable service discovery; if enabled, newly started Gateways are discovered automatically
                    .loadbalancerEnabled(true) // whether to enable load balancing; meaningless if service discovery is disabled
                    .maxConnectionSize(5) // maximum number of connections, i.e. maximum concurrency
                    .retryEnabled(false).readTimeout(30000) // whether to retry on failure; read timeout
                    .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // AuthenticationStrategy: the Linkis authentication method
                    .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) // authentication key, usually the username; authentication value, usually the user's password
                    .setDWSVersion("v1").build(); // version of the Linkis backend protocol, currently v1

            // 2. Obtain a UJESClient from the DWSClientConfig
            UJESClient client = new UJESClientImpl(clientConfig);

            try {
                // 3. Start executing the code
                System.out.println("user : " + user + ", code : [" + executeCode + "]");
                Map<String, Object> startupMap = new HashMap<String, Object>();
                startupMap.put("wds.linkis.yarnqueue", "default"); // startupMap can carry various startup parameters; see the Linkis console configuration
                JobExecuteResult jobExecuteResult = client.execute(JobExecuteAction.builder()
                        .setCreator("linkisClient-Test") // creator: system name of the client requesting Linkis, used for system-level isolation
                        .addExecuteCode(executeCode) // executionCode: the code to be executed
                        .setEngineType((JobExecuteAction.EngineType) JobExecuteAction.EngineType$.MODULE$.HIVE()) // execution engine type to request from Linkis, e.g. Spark or Hive
                        .setUser(user) // user: the requesting user, used for user-level multi-tenant isolation
                        .setStartupParams(startupMap)
                        .build());
                System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());

                // 4. Get the execution status of the script
                JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
                int sleepTimeMills = 1000;
                while (!jobInfoResult.isCompleted()) {
                    // 5. Get the execution progress of the script
                    JobProgressResult progress = client.progress(jobExecuteResult);
                    Utils.sleepQuietly(sleepTimeMills);
                    jobInfoResult = client.getJobInfo(jobExecuteResult);
                }

                // 6. Get the job info of the script
                JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
                // 7. Get the list of result sets (several result sets are produced if the user submits several SQL statements at once)
                String resultSet = jobInfo.getResultSetList(client)[0];
                // 8. Get the concrete result set through one result set entry
                Object fileContents = client.resultSet(ResultSetAction.builder()
                        .setPath(resultSet)
                        .setUser(jobExecuteResult.getUser())
                        .build()).getFileContent();
                System.out.println("fileContents: " + fileContents);
            } catch (Exception e) {
                e.printStackTrace();
                IOUtils.closeQuietly(client);
            }
            IOUtils.closeQuietly(client);
        }
    }

Running the code above is enough to interact with Linkis.
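Two details are easy to miss in the Java example above: it never checks whether the job actually succeeded before reading results, and it only reads the first result set even though step 7 notes that several may exist (the Scala example in the next section handles both). A minimal sketch of equivalent Java helpers, assuming JobInfoResult exposes the same isSucceed()/getMessage() accessors the Scala example calls:

    // Sketch only: add these helpers to the test class above; they reuse the
    // types already imported there. isSucceed()/getMessage() are assumed to
    // match the accessors used by the Scala example; verify against your
    // linkis-computation-client version.
    private static void checkJobSucceed(UJESClient client, JobExecuteResult result) throws Exception {
        JobInfoResult jobInfo = client.getJobInfo(result);
        if (!jobInfo.isSucceed()) {
            throw new Exception("Failed to execute job: " + jobInfo.getMessage());
        }
    }

    private static void printAllResultSets(UJESClient client, JobExecuteResult result) {
        JobInfoResult jobInfo = client.getJobInfo(result);
        // getResultSetList returns one path per result set; iterate them all
        // instead of taking only index 0.
        for (String path : jobInfo.getResultSetList(client)) {
            Object fileContents = client.resultSet(ResultSetAction.builder()
                    .setPath(path)
                    .setUser(result.getUser())
                    .build()).getFileContent();
            System.out.println(path + " -> " + fileContents);
        }
    }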
2.2 Scala test code:
    package com.webank.wedatasphere.linkis.client.test

    import java.util.concurrent.TimeUnit

    import com.webank.wedatasphere.linkis.common.utils.Utils
    import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
    import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
    import com.webank.wedatasphere.linkis.ujes.client.UJESClient
    import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.EngineType
    import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
    import org.apache.commons.io.IOUtils

    object LinkisClientImplTest extends App {

      var executeCode = "show databases;"
      var user = "hadoop"

      // 1. Configure DWSClientBuilder and obtain a DWSClientConfig from it
      val clientConfig = DWSClientConfigBuilder.newBuilder()
        .addServerUrl("http://${ip}:${port}") // ServerUrl: address of the Linkis gateway, e.g. http://{ip}:{port}
        .connectionTimeout(30000) // client connection timeout
        .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) // whether to enable service discovery; if enabled, newly started Gateways are discovered automatically
        .loadbalancerEnabled(true) // whether to enable load balancing; meaningless if service discovery is disabled
        .maxConnectionSize(5) // maximum number of connections, i.e. maximum concurrency
        .retryEnabled(false).readTimeout(30000) // whether to retry on failure; read timeout
        .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // AuthenticationStrategy: the Linkis authentication method
        .setAuthTokenKey("${username}").setAuthTokenValue("${password}") // authentication key, usually the username; authentication value, usually the user's password
        .setDWSVersion("v1").build() // version of the Linkis backend protocol, currently v1

      // 2. Obtain a UJESClient from the DWSClientConfig
      val client = UJESClient(clientConfig)

      try {
        // 3. Start executing the code
        println("user : " + user + ", code : [" + executeCode + "]")
        val startupMap = new java.util.HashMap[String, Any]()
        startupMap.put("wds.linkis.yarnqueue", "default") // startup parameter configuration
        val jobExecuteResult = client.execute(JobExecuteAction.builder()
          .setCreator("LinkisClient-Test") // creator: system name of the client requesting Linkis, used for system-level isolation
          .addExecuteCode(executeCode) // executionCode: the code to be executed
          .setEngineType(EngineType.SPARK) // execution engine type to request from Linkis, e.g. Spark or Hive
          .setStartupParams(startupMap)
          .setUser(user) // user: the requesting user, used for user-level multi-tenant isolation
          .build())
        println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)

        // 4. Get the execution status of the script
        var jobInfoResult = client.getJobInfo(jobExecuteResult)
        val sleepTimeMills: Int = 1000
        while (!jobInfoResult.isCompleted) {
          // 5. Get the execution progress of the script
          val progress = client.progress(jobExecuteResult)
          val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
          println("progress: " + progress.getProgress + ", progressInfo: " + progressInfo)
          Utils.sleepQuietly(sleepTimeMills)
          jobInfoResult = client.getJobInfo(jobExecuteResult)
        }
        if (!jobInfoResult.isSucceed) {
          println("Failed to execute job: " + jobInfoResult.getMessage)
          throw new Exception(jobInfoResult.getMessage)
        }

        // 6. Get the job info of the script
        val jobInfo = client.getJobInfo(jobExecuteResult)
        // 7. Get the list of result sets (several result sets are produced if the user submits several SQL statements at once)
        val resultSetList = jobInfoResult.getResultSetList(client)
        println("All result set list:")
        resultSetList.foreach(println)
        val oneResultSet = jobInfo.getResultSetList(client).head
        // 8. Get the concrete result set through one result set entry
        val fileContents = client.resultSet(ResultSetAction.builder()
          .setPath(oneResultSet)
          .setUser(jobExecuteResult.getUser)
          .build()).getFileContent
        println("First fileContents: ")
        println(fileContents)
      } catch {
        case e: Exception =>
          e.printStackTrace()
      }
      IOUtils.closeQuietly(client)
    }

3. The label-based Submit method added in 1.0
1.0 adds the client.submit method, which works with the new 1.0 task execution interface and supports passing in Labels and other parameters.
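The label values in the examples below follow fixed formats: the engine type label is written as engineType-version (e.g. hive-1.2.1), the user-creator label as executeUser-creatorSystem (e.g. hadoop-IDE), and the code type label names the script type of the submitted code. As a minimal sketch, mirroring the full example in the next subsection (the format notes in the comments are our reading of the example values, not an exhaustive specification), the two maps a submit call expects look like this:

    // Labels this execution depends on; the keys come from LabelKeyConstant.
    Map<String, Object> labels = new HashMap<String, Object>();
    labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-1.2.1");       // engineType-version
    labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE"); // executeUser-creatorSystem
    labels.put(LabelKeyConstant.CODE_TYPE_KEY, "hql");                // script type of the submitted code

    // Source information describing where the job comes from.
    Map<String, Object> source = new HashMap<String, Object>();
    source.put(TaskConstant.SCRIPTPATH, "LinkisClient-test");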
3.1 Java test class
    package com.webank.wedatasphere.linkis.client.test;

    import com.webank.wedatasphere.linkis.common.utils.Utils;
    import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
    import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;
    import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;
    import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant;
    import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant;
    import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
    import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;
    import com.webank.wedatasphere.linkis.ujes.client.request.JobSubmitAction;
    import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;
    import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
    import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;
    import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;
    import org.apache.commons.io.IOUtils;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class JavaClientTest {

        public static void main(String[] args) {
            String user = "hadoop";
            String executeCode = "show tables";

            // 1. Configure the ClientBuilder and obtain a ClientConfig
            DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
                    .addServerUrl("http://${ip}:${port}") // ServerUrl: address of the Linkis gateway, e.g. http://{ip}:{port}
                    .connectionTimeout(30000) // client connection timeout
                    .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) // whether to enable service discovery; if enabled, newly started Gateways are discovered automatically
                    .loadbalancerEnabled(true) // whether to enable load balancing; meaningless if service discovery is disabled
                    .maxConnectionSize(5) // maximum number of connections, i.e. maximum concurrency
                    .retryEnabled(false).readTimeout(30000) // whether to retry on failure; read timeout
                    .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // AuthenticationStrategy: the Linkis authentication method
                    .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) // authentication key, usually the username; authentication value, usually the user's password
                    .setDWSVersion("v1").build(); // version of the Linkis backend protocol, currently v1

            // 2. Obtain a UJESClient from the DWSClientConfig
            UJESClient client = new UJESClientImpl(clientConfig);

            try {
                // 3. Start executing the code
                System.out.println("user : " + user + ", code : [" + executeCode + "]");
                Map<String, Object> startupMap = new HashMap<String, Object>();
                // startupMap can carry various startup parameters; see the Linkis console configuration
                startupMap.put("wds.linkis.yarnqueue", "q02");
                // Specify labels
                Map<String, Object> labels = new HashMap<String, Object>();
                // Add the labels this execution depends on: EngineTypeLabel / UserCreatorLabel / EngineRunTypeLabel
                labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-1.2.1");
                labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE");
                labels.put(LabelKeyConstant.CODE_TYPE_KEY, "hql");
                // Specify the source
                Map<String, Object> source = new HashMap<String, Object>();
                source.put(TaskConstant.SCRIPTPATH, "LinkisClient-test");
                JobExecuteResult jobExecuteResult = client.submit(JobSubmitAction.builder()
                        .addExecuteCode(executeCode)
                        .setStartupParams(startupMap)
                        .setUser(user) // the user submitting the job
                        .addExecuteUser(user) // the user actually executing it
                        .setLabels(labels)
                        .setSource(source)
                        .build());
                System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());

                // 4. Get the execution status of the script
                JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
                int sleepTimeMills = 1000;
                while (!jobInfoResult.isCompleted()) {
                    // 5. Get the execution progress of the script
                    JobProgressResult progress = client.progress(jobExecuteResult);
                    Utils.sleepQuietly(sleepTimeMills);
                    jobInfoResult = client.getJobInfo(jobExecuteResult);
                }

                // 6. Get the job info of the script
                JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
                // 7. Get the list of result sets (several result sets are produced if the user submits several SQL statements at once)
                String resultSet = jobInfo.getResultSetList(client)[0];
                // 8. Get the concrete result set through one result set entry
                Object fileContents = client.resultSet(ResultSetAction.builder()
                        .setPath(resultSet)
                        .setUser(jobExecuteResult.getUser())
                        .build()).getFileContent();
                System.out.println("fileContents: " + fileContents);
            } catch (Exception e) {
                e.printStackTrace();
                IOUtils.closeQuietly(client);
            }
            IOUtils.closeQuietly(client);
        }
    }

3.2 Scala test class
    package com.webank.wedatasphere.linkis.client.test

    import java.util
    import java.util.concurrent.TimeUnit

    import com.webank.wedatasphere.linkis.common.utils.Utils
    import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
    import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
    import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant
    import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant
    import com.webank.wedatasphere.linkis.ujes.client.UJESClient
    import com.webank.wedatasphere.linkis.ujes.client.request.{JobSubmitAction, ResultSetAction}
    import org.apache.commons.io.IOUtils

    object ScalaClientTest {

      def main(args: Array[String]): Unit = {
        val executeCode = "show tables"
        val user = "hadoop"

        // 1. Configure DWSClientBuilder and obtain a DWSClientConfig from it
        val clientConfig = DWSClientConfigBuilder.newBuilder()
          .addServerUrl("http://${ip}:${port}") // ServerUrl: address of the Linkis gateway, e.g. http://{ip}:{port}
          .connectionTimeout(30000) // client connection timeout
          .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) // whether to enable service discovery; if enabled, newly started Gateways are discovered automatically
          .loadbalancerEnabled(true) // whether to enable load balancing; meaningless if service discovery is disabled
          .maxConnectionSize(5) // maximum number of connections, i.e. maximum concurrency
          .retryEnabled(false).readTimeout(30000) // whether to retry on failure; read timeout
          .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // AuthenticationStrategy: the Linkis authentication method
          .setAuthTokenKey("${username}").setAuthTokenValue("${password}") // authentication key, usually the username; authentication value, usually the user's password
          .setDWSVersion("v1").build() // version of the Linkis backend protocol, currently v1

        // 2. Obtain a UJESClient from the DWSClientConfig
        val client = UJESClient(clientConfig)

        try {
          // 3. Start executing the code
          println("user : " + user + ", code : [" + executeCode + "]")
          val startupMap = new java.util.HashMap[String, Any]()
          startupMap.put("wds.linkis.yarnqueue", "q02") // startup parameter configuration
          // Specify labels
          val labels: util.Map[String, Any] = new util.HashMap[String, Any]
          // Add the labels this execution depends on, such as the engine label
          labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-1.2.1")
          labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE")
          labels.put(LabelKeyConstant.CODE_TYPE_KEY, "hql")
          // Specify the source
          val source: util.Map[String, Any] = new util.HashMap[String, Any]
          source.put(TaskConstant.SCRIPTPATH, "LinkisClient-test")
          val jobExecuteResult = client.submit(JobSubmitAction.builder
            .addExecuteCode(executeCode)
            .setStartupParams(startupMap)
            .setUser(user) // the user submitting the job, used for user-level multi-tenant isolation
            .addExecuteUser(user) // the user actually executing it
            .setLabels(labels)
            .setSource(source)
            .build)
          println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)

          // 4. Get the execution status of the script
          var jobInfoResult = client.getJobInfo(jobExecuteResult)
          val sleepTimeMills: Int = 1000
          while (!jobInfoResult.isCompleted) {
            // 5. Get the execution progress of the script
            val progress = client.progress(jobExecuteResult)
            val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
            println("progress: " + progress.getProgress + ", progressInfo: " + progressInfo)
            Utils.sleepQuietly(sleepTimeMills)
            jobInfoResult = client.getJobInfo(jobExecuteResult)
          }
          if (!jobInfoResult.isSucceed) {
            println("Failed to execute job: " + jobInfoResult.getMessage)
            throw new Exception(jobInfoResult.getMessage)
          }

          // 6. Get the job info of the script
          val jobInfo = client.getJobInfo(jobExecuteResult)
          // 7. Get the list of result sets (several result sets are produced if the user submits several SQL statements at once)
          val resultSetList = jobInfoResult.getResultSetList(client)
          println("All result set list:")
          resultSetList.foreach(println)
          val oneResultSet = jobInfo.getResultSetList(client).head
          // 8. Get the concrete result set through one result set entry
          val fileContents = client.resultSet(ResultSetAction.builder()
            .setPath(oneResultSet)
            .setUser(jobExecuteResult.getUser)
            .build()).getFileContent
          println("First fileContents: ")
          println(fileContents)
        } catch {
          case e: Exception =>
            e.printStackTrace()
        }
        IOUtils.closeQuietly(client)
      }
    }

Reference link: https://github.com/WeBankFinTech/Linkis-Doc/edit/master/zh_CN/User_Manual/Linkis1.0%E7%94%A8%E6%88%B7%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md