从实战中了解数据开发全流程——DataWorks OpenAPI实战
DataWorks作為飛天大數(shù)據(jù)平臺(tái)操作系統(tǒng),歷經(jīng)11年發(fā)展,形成了涵蓋數(shù)據(jù)集成、數(shù)據(jù)開發(fā)、數(shù)據(jù)治理、數(shù)據(jù)服務(wù)的一站式大數(shù)據(jù)開發(fā)治理平臺(tái)。很多企業(yè)用戶在使用產(chǎn)品的過程中希望他們的本地服務(wù)能夠和阿里云上的DataWorks服務(wù)進(jìn)行交互,從而提升企業(yè)大數(shù)據(jù)處理的效率,減少人工操作和運(yùn)維工作,降低數(shù)據(jù)風(fēng)險(xiǎn)和企業(yè)成本,現(xiàn)在DataWorks開放OpenAPI能力滿足企業(yè)的定制化需求。
DataWorks OpenAPI涵蓋租戶、元數(shù)據(jù)、數(shù)據(jù)開發(fā)、運(yùn)維中心、數(shù)據(jù)質(zhì)量、數(shù)據(jù)服務(wù)等DataWorks核心能力,企業(yè)版和旗艦版分別贈(zèng)送100萬次/月、1000萬次/月的免費(fèi)調(diào)用額度。
關(guān)于Dataworks OpenAPI開通要求和開放地域可查閱DataWorks OpenAPI概述
限D(zhuǎn)ataWorks企業(yè)版及以上使用立即開通
開通7天試用請(qǐng)使用釘釘掃碼聯(lián)系
實(shí)戰(zhàn)簡(jiǎn)介
我們假設(shè)這樣一個(gè)簡(jiǎn)單的場(chǎng)景,開發(fā)人員想把RDS庫(kù)里面的數(shù)據(jù)同步到一張MaxCompute分區(qū)表中,然后在自建系統(tǒng)的頁(yè)面上展示經(jīng)過數(shù)據(jù)分析后的報(bào)表數(shù)據(jù),那么如何通過DataWorks OpenAPI去完成整個(gè)鏈路的實(shí)現(xiàn)呢?
實(shí)戰(zhàn)準(zhǔn)備
一、引入DataWorks OpenAPI SDK
這一部分可參考 安裝 DataWorks OpenAPI Java SDK,除了java語言,我們還支持Python,PHP,C#,Go 等語言支持。默認(rèn)情況下我們不需要顯式去指定DataWorks OpenAPI的EndPoint,但是如果aliyun-java-sdk-core版本偏低的情況下可能會(huì)找不到DataWorks OpenAPI的Endpoint,這時(shí)候可在不升級(jí)版本的情況下通過使用如下代碼進(jìn)行請(qǐng)求。
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");DefaultProfile.addEndpoint("cn-shanghai","dataworks-public", "dataworks.cn-shanghai.aliyuncs.com");IAcsClient client = new DefaultAcsClient(profile);如上代碼是顯式地指定了DataWorks OpenAPI的EndPoint,dataworks.${regionId}.aliyuncs.com這樣的域名格式在公網(wǎng)環(huán)境下可訪問,但是有些用戶需要在VPC環(huán)境下調(diào)用OpenAPI,那么則需要把域名dataworks.${regionId}.aliyuncs.com 變更成 dataworks-vpc.${regionId}.aliyuncs.com,這樣在VPC網(wǎng)絡(luò)環(huán)境下即使不能訪問公網(wǎng)也能請(qǐng)求到DataWorks OpenAPI。
如果您不清楚regionId(地域ID)的概念,可參考地域和可用區(qū)。
二、了解DataWorks OpenAPI文檔
詳細(xì)閱讀DataWorks OpenAPI文檔對(duì)開發(fā)非常有幫助,做API開發(fā)時(shí)如果對(duì)參數(shù)的約束不太理解時(shí)可參考DataWorks OpenAPI文檔,里面對(duì)每個(gè)出入?yún)ⅰ?shù)示例、錯(cuò)誤碼描述都有詳細(xì)的解釋。點(diǎn)擊查看API參考>>
實(shí)戰(zhàn)步驟
步驟一:創(chuàng)建RDS數(shù)據(jù)源
集成租戶API可創(chuàng)建引擎、創(chuàng)建數(shù)據(jù)源、查看項(xiàng)目空間等信息。在我們這個(gè)業(yè)務(wù)場(chǎng)景中,MaxCompute分區(qū)表存在于MaxCompute引擎中,我們?cè)贒ataWorks管控臺(tái)創(chuàng)建完MaxCompute工作空間后會(huì)自動(dòng)創(chuàng)建好MaxCompute引擎的數(shù)據(jù)源,所以我們只需要使用【CreateConnection】創(chuàng)建好RDS數(shù)據(jù)源即可:
CreateConnectionRequest createRequest = new CreateConnectionRequest();createRequest.setProjectId(-1L);createRequest.setName("TEST_CONNECTION");createRequest.setConnectionType("MYSQL");createRequest.setEnvType(1);createRequest.setContent("{\"password\":\"12345\"}");Long connectionId;try {CreateConnectionResponse createResponse = client.getAcsResponse(createRequest);Assert.assertNotNull(createResponse.getData());connectionId = createResponse.getData();UpdateConnectionRequest updateRequest = new UpdateConnectionRequest();updateRequest.setConnectionId(connectionId);updateRequest.setDescription("1");UpdateConnectionResponse acsResponse = client.getAcsResponse(updateRequest);Assert.assertTrue(acsResponse.getData());DeleteConnectionRequest deleteRequest = new DeleteConnectionRequest();deleteRequest.setConnectionId(connectionId);DeleteConnectionResponse deleteResponse = client.getAcsResponse(deleteRequest);Assert.assertTrue(deleteResponse.getData());} catch (ClientException e) {e.printStackTrace();Assert.fail();}UpdateConnection和DeleteConnection可分別修改和刪除數(shù)據(jù)源信息。另外對(duì)項(xiàng)目空間的成員進(jìn)行管理的API集是CreateProjectMember、DeleteProjectMember、RemoveProjectMemberFromRole、ListProjectMembers。
步驟二:表的創(chuàng)建
集成DataWorks元數(shù)據(jù)OpenAPI我們能管理引擎?zhèn)鹊谋硇畔?#xff0c;通過DataWorks管控臺(tái)和租戶API我們完成了MaxCompute引擎和RDS數(shù)據(jù)源的創(chuàng)建工作,下一步需要完成表的創(chuàng)建,可通過元數(shù)據(jù)的【CreateTable】完成:
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");IAcsClient client = new DefaultAcsClient(profile);CreateTableRequest request = new CreateTableRequest();request.setTableName("table_test");request.setColumnss(new ArrayList<>());request.setEndpoint("endpoint");CreateTableResponse response = client.getAcsResponse(request);String nextTaskId = response.getTaskInfo().getNextTaskId();System.out.println(nextTaskId);關(guān)于表管理的API集是CreateTable、UpdateTable、DeleteTable、GetMetaDBTableList、CheckMetaTable等,除了可對(duì)表進(jìn)行管理,元數(shù)據(jù)API還能對(duì)表元數(shù)據(jù)、表主題進(jìn)行管理,更多詳情可參考DataWorks OpenAPI文檔。
步驟三:任務(wù)開發(fā)和發(fā)布調(diào)度
集成數(shù)據(jù)開發(fā)API可管理文件,并對(duì)文件進(jìn)行提交和發(fā)布后生成周期任務(wù),周期任務(wù)會(huì)定時(shí)調(diào)度運(yùn)行,創(chuàng)建不同類型的文件是根據(jù)FileType這個(gè)字段決定的,目前我們已支持非常多的FileType,通過運(yùn)維中心的API【ListProgramTypeCount】可獲取所有已支持的系統(tǒng)節(jié)點(diǎn)以及自定義節(jié)點(diǎn)。
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");IAcsClient client = new DefaultAcsClient(profile);CreateFileRequest createFileRequest = new CreateFileRequest();createFileRequest.setFileType(DefaultNodeType.ODPS_SQL.getCode());createFileRequest.setInputList(projectIdentifier+"_root");createFileRequest.setContent(content);createFileRequest.setFileName("create_file_" + caseId);createFileRequest.setFileFolderPath("業(yè)務(wù)流程/POP接口測(cè)試/MaxCompute/test_folder_3");createFileRequest.setFileDescription("create file " + caseId);createFileRequest.setRerunMode("ALL_ALLOWED");CreateFileResponse createFileResponse = getAcsResponse(createFileRequest);content字段存儲(chǔ)SQL腳本、Shell腳本、數(shù)據(jù)集成的腳本代碼,數(shù)據(jù)集成的腳本格式可參考通過腳本模式配置任務(wù)。
使用【CreateFile】創(chuàng)建完腳本后,如需修改可使用UpdateFile、DeleteFile進(jìn)行管理。和頁(yè)面上的操作流程一致的是完成文件開發(fā)后得提交和發(fā)布文件才會(huì)生成周期實(shí)例,這里要注意的是需要輪詢SubmitFile返回的 DeploymentId,只有當(dāng)GetDeployment返回的狀態(tài)是完成時(shí)(status.finished())才表示部署成功。
如果是在標(biāo)準(zhǔn)模式的項(xiàng)目下開發(fā),提交完成后,還需要發(fā)布文件才能最終提交到調(diào)度成為周期任務(wù)。發(fā)布文件使用DeployFile,和提交文件一樣,也需要使用GetDeployment輪詢部署狀態(tài)。
DeployFileRequest request = new DeployFileRequest();request.setFileId(fileId);request.setComment("deploy file");DeployFileResponse deployFileResponse = getAcsResponse(deployFileRequest);//檢查發(fā)布部署結(jié)果DeploymentStatus status = null;GetDeploymentResponse.Data.Deployment deployment = null;int retryTimes = 0;while (retryTimes < 6) {GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(deploymentId);GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());Assert.assertNotNull(getDeploymentResponse.getData());deployment = getDeploymentResponse.getData().getDeployment();Assert.assertNotNull(deployment);LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",deploymentId, new Gson().toJson(deployment));Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));status = Enums.find(DeploymentStatus.class, deployment.getStatus());Assert.assertNotNull(status);if (status.finished()) {LOGGER.info("Deployment finished - FinalStatus[{}]", status);break;}LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);retryTimes++;SleepUtils.seconds(10L);}數(shù)據(jù)開發(fā)API除了可對(duì)文件管理外,還能管理文件夾、資源、函數(shù),更多詳情可參考DataWorks OpenAPI文檔。
步驟四:配置運(yùn)維監(jiān)控
通過API完成周期任務(wù)的生產(chǎn)之后,會(huì)在DataWorks平臺(tái)每天生成調(diào)度實(shí)例被定時(shí)調(diào)度運(yùn)行,使用運(yùn)維中心API可對(duì)周期任務(wù)和周期實(shí)例進(jìn)行運(yùn)維操作,可通過GetNode、GetInstance、ListInstances等API查看周期任務(wù)和周期實(shí)例,監(jiān)控實(shí)例運(yùn)行情況。
GetInstanceRequest request = new GetInstanceRequest();request.setInstanceId(INSTANCE_ID);request.setProjectEnv(PROJECT_ENV);try {GetInstanceResponse response = client.getAcsResponse(request);Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());Assert.assertEquals("kzh", bizInstanceDto.getNodeName());Assert.assertEquals("", bizInstanceDto.getParamValues());Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());} catch (Exception e) {e.printStackTrace();Assert.fail();}如果實(shí)例運(yùn)行異常可通過RestartInstance、SetSuccessInstance、SuspendInstance、ResumeInstance處理。
使用CreateRemind、UpdateRemind等API可創(chuàng)建自定義報(bào)警規(guī)則,確保每天基線順利產(chǎn)出,一旦異常可告警通知到人工,然后介入。
運(yùn)維中心主要提供周期任務(wù)、手動(dòng)業(yè)務(wù)流程、基線查詢、告警配置和查詢等相關(guān)API,可參考DataWorks OpenAPI文檔。
步驟五:配置數(shù)據(jù)質(zhì)量監(jiān)控
在這個(gè)業(yè)務(wù)場(chǎng)景中,我們通過前面介紹的API已經(jīng)可以每天定時(shí)把數(shù)據(jù)從RDS同步到MaxCompute的表中了。如果我們擔(dān)心產(chǎn)生臟數(shù)據(jù)或者數(shù)據(jù)缺失影響到線上業(yè)務(wù),那么可通過數(shù)據(jù)質(zhì)量API來集成DataWorks數(shù)據(jù)質(zhì)量監(jiān)控能力,當(dāng)表數(shù)據(jù)產(chǎn)出異常時(shí),可以立刻觸發(fā)給規(guī)則訂閱人。
CreateQualityRuleRequest request = new CreateQualityRuleRequest();request.setBlockType(0);request.setComment("test-createTemplateRuleSuccess");request.setCriticalThreshold("50");request.setEntityId(entityId);request.setOperator("abs");request.setPredictType(0);request.setProjectName(PROJECT_NAME);request.setProperty("table_count");request.setPropertyType("table");request.setRuleName("createTemplateRuleSuccess");request.setRuleType(0);request.setTemplateId(7);request.setWarningThreshold("10");try {CreateQualityRuleResponse response = client.getAcsResponse(request);Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));Long templateRuleId = Long.parseLong(data.toString());Assert.assertTrue(templateRuleId > 0);return templateRuleId;} catch (Exception e) {e.printStackTrace();Assert.assertFalse(true);return null;}CreateQualityRule、GetQualityFollower、CreateQualityRelativeNode等數(shù)據(jù)質(zhì)量API集可管理數(shù)據(jù)質(zhì)量規(guī)則,更多數(shù)據(jù)質(zhì)量API可參考DataWorks OpenAPI文檔。
步驟六:生成數(shù)據(jù)服務(wù)API
我們通過元數(shù)據(jù)API完成了表創(chuàng)建,通過數(shù)據(jù)開發(fā)API完成文件和周期任務(wù)創(chuàng)建,通過數(shù)據(jù)質(zhì)量和運(yùn)維中心API配置好了監(jiān)控規(guī)則,MaxCompute分區(qū)表數(shù)據(jù)亦可順利產(chǎn)生,這時(shí)候我們還需要最后一個(gè)步驟把MaxCompute分區(qū)表的數(shù)據(jù)通過數(shù)據(jù)服務(wù)OpenAPI生成一個(gè)數(shù)據(jù)服務(wù)API向系統(tǒng)提供數(shù)據(jù)服務(wù)。
CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();createRequest.setTenantId(tenantId);createRequest.setProjectId(projectId);createRequest.setApiMode(apiMode);createRequest.setApiName(apiName);createRequest.setApiPath(apiPath);createRequest.setApiDescription("test");createRequest.setGroupId(groupId);createRequest.setVisibleRange(visibleRange);createRequest.setTimeout(10000);createRequest.setProtocols(protocols);createRequest.setRequestMethod(requestMethod);createRequest.setResponseContentType(responseType);CreateDataServiceApiResponse createResponse = client.getAcsResponse(createRequest);Long apiId = createResponse.getData();Assert.assertNotNull(apiId);GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();getRequest.setTenantId(tenantId);getRequest.setProjectId(projectId);getRequest.setApiId(apiId);GetDataServiceApiResponse getResponse = client.getAcsResponse(getRequest);GetDataServiceApiResponse.Data data = getResponse.getData();Assert.assertEquals(apiId, data.getApiId());Assert.assertEquals(0L, data.getFolderId().longValue());使用CreateDataServiceApi、PublishDataServiceApi可把表數(shù)據(jù)轉(zhuǎn)換成數(shù)據(jù)服務(wù)API,那么整個(gè)數(shù)據(jù)生產(chǎn)鏈路就完成了,集成以上的DataWorks OpenAPI即完成了本地系統(tǒng)和云上系統(tǒng)的無縫對(duì)接。
API調(diào)試小工具
DataWorks發(fā)布的所有API全部可在線調(diào)試,并以可見即所得的方式產(chǎn)生源碼,這樣可大大提高OpenAPI的開發(fā)效率,強(qiáng)烈推薦使用。DataWorks OpenAPI調(diào)試入口>>
總結(jié)
工欲善其數(shù),必先利其器!DataWorks OpenAPI是2020年正式發(fā)布的企業(yè)數(shù)據(jù)開發(fā)提效神器。通過OpenAPI的方式,能夠極大地提高企業(yè)使用DataWorks產(chǎn)品能力的靈活性。目前已發(fā)布150+個(gè)OpenAPI,并且還在持續(xù)增加中。本期實(shí)戰(zhàn)旨在幫助企業(yè)用戶了解如何快速上手DataWorks OpenAPI的實(shí)踐應(yīng)用,通過場(chǎng)景化的實(shí)戰(zhàn)演練體驗(yàn)DataWorks OpenAPI的強(qiáng)大能力。實(shí)戰(zhàn)系列內(nèi)容持續(xù)更新中,感謝大家的關(guān)注!
關(guān)于Dataworks OpenAPI開通要求和開放地域可查閱DataWorks OpenAPI概述
限D(zhuǎn)ataWorks企業(yè)版及以上使用立即開通
開通7天試用與折扣請(qǐng)使用釘釘掃碼聯(lián)系
原文鏈接:https://developer.aliyun.com/article/781486?
版權(quán)聲明:本文內(nèi)容由阿里云實(shí)名注冊(cè)用戶自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請(qǐng)查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識(shí)產(chǎn)權(quán)保護(hù)指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進(jìn)行舉報(bào),一經(jīng)查實(shí),本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。總結(jié)
以上是生活随笔為你收集整理的从实战中了解数据开发全流程——DataWorks OpenAPI实战的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: DataWorks数据建模公开课上线啦!
- 下一篇: 数据仓库如何实现湖仓一体数据分析?