日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

[雪峰磁针石博客]大数据Hadoop工具python教程9-Luigi工作流...

發(fā)布時(shí)間:2024/1/17 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [雪峰磁针石博客]大数据Hadoop工具python教程9-Luigi工作流... 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

管理Hadoop作業(yè)的官方工作流程調(diào)度程序是Apache Oozie。與許多其他Hadoop產(chǎn)品一樣,Oozie是用Java編寫的,是基于服務(wù)器的Web應(yīng)用程序,它運(yùn)行執(zhí)行Hadoop MapReduce和Pig的工作流作業(yè)。 Oozie工作流是在XML文檔中指定的控制依賴性指導(dǎo)非循環(huán)圖(DAG)中排列的動(dòng)作集合。雖然Oozie在Hadoop社區(qū)中有很多支持,但通過(guò)XML屬性配置工作流和作業(yè)的學(xué)習(xí)曲線非常陡峭。

Luigi是Spotify創(chuàng)建的Python替代方案,可以構(gòu)建和配置復(fù)雜的批處理作業(yè)管道。它處理依賴項(xiàng)解析,工作流管理,可視化等等。它還擁有龐大的社區(qū),并支持許多Hadoop技術(shù)。在github上超過(guò)1萬(wàn)星。

本章介紹Luigi的安裝和工作流程的詳細(xì)說(shuō)明。

安裝

pip install luigi

工作流

在Luigi中,工作流由一系列操作組成,稱為任務(wù)。 Luigi任務(wù)是非特定的,也就是說(shuō),它們可以是任何可以用Python編寫的東西。任務(wù)的輸入和輸出數(shù)據(jù)的位置稱為目標(biāo)(target)。目標(biāo)通常對(duì)應(yīng)于磁盤上,HDFS上或數(shù)據(jù)庫(kù)中的文件位置。除了任務(wù)和目標(biāo)之外,Luigi還利用參數(shù)來(lái)自定義任務(wù)的執(zhí)行方式。

  • 任務(wù)

任務(wù)是構(gòu)成Luigi工作流的操作序列。每個(gè)任務(wù)都聲明其依賴于其他任務(wù)創(chuàng)建的目標(biāo)。這樣Luigi能夠創(chuàng)建依賴鏈。

  • 目標(biāo)

目標(biāo)是任務(wù)的輸入和輸出。最常見(jiàn)的目標(biāo)是磁盤上的文件,HDFS中的文件或數(shù)據(jù)庫(kù)中的記錄。 Luigi包裝了底層文件系統(tǒng)操作,以確保與目標(biāo)的交互是原子的。這允許從故障點(diǎn)重放工作流,而不必重放任何已經(jīng)成功完成的任務(wù)。

  • 參數(shù)

參數(shù)允許通過(guò)允許值從命令行,以編程方式或從其他任務(wù)傳遞任務(wù)來(lái)自定義任務(wù)。例如,任務(wù)輸出的名稱可以通過(guò)參數(shù)傳遞給任務(wù)的日期來(lái)確定。

參考資料

  • python測(cè)試開(kāi)發(fā)項(xiàng)目實(shí)戰(zhàn)-目錄
  • python工具書籍下載-持續(xù)更新
  • python 3.7極速入門教程 - 目錄
  • 原文地址
  • 本文涉及的python測(cè)試開(kāi)發(fā)庫(kù) 謝謝點(diǎn)贊!
  • [本文相關(guān)海量書籍下載](https://github.com/china-testing/python-api-tesing/blob/master/books.md

工作流本示例

#!/usr/bin/env python # 項(xiàng)目實(shí)戰(zhàn)討論QQ群630011153 144081101 # https://github.com/china-testing/python-api-tesing import luigiclass InputFile(luigi.Task):"""A task wrapping a Target """input_file = luigi.Parameter()def output(self):"""Return the target for this task"""return luigi.LocalTarget(self.input_file)class WordCount(luigi.Task):"""A task that counts the number of words in a file"""input_file = luigi.Parameter()output_file = luigi.Parameter(default='/tmp/wordcount')def requires(self):"""The task's dependencies:"""return InputFile(self.input_file)def output(self):"""The task's output"""return luigi.LocalTarget(self.output_file)def run(self):"""The task's logic"""count = {}ifp = self.input().open('r')for line in ifp:for word in line.strip().split():count[word] = count.get(word, 0) + 1ofp = self.output().open('w')for k, v in count.items():ofp.write('{}\t{}\n'.format(k, v))ofp.close()if __name__ == '__main__':luigi.run()

執(zhí)行

$ python wordcount.py WordCount --local-scheduler --input-file /home/hduser_/input2.txt --output-file /tmp/wordcount2.txt DEBUG: Checking if WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) is complete DEBUG: Checking if InputFile(input_file=/home/hduser_/input2.txt) is complete INFO: Informed scheduler that task WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2 has status PENDING INFO: Informed scheduler that task InputFile__home_hduser__in_0eced493f7 has status DONE INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) running WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) done WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2 has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary =====Scheduled 2 tasks of which: * 1 complete ones were encountered:- 1 InputFile(input_file=/home/hduser_/input2.txt) * 1 ran successfully:- 1 WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)This progress looks :) because there were no failed tasks or missing dependencies===== Luigi Execution Summary =====hduser_@andrew-PC:/home/andrew/code/HadoopWithPython/python/Luigi$ cat /tmp/wordcount2.txt jack 2 be 2 nimble 1 quick 1

總結(jié)

以上是生活随笔為你收集整理的[雪峰磁针石博客]大数据Hadoop工具python教程9-Luigi工作流...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。