日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)

發布時間:2024/9/27 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 模拟MapReduce编程的程序案例(用于统计文本中单词出现频率) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本案例要實現的目標:

1、模擬修改配置,通過發指令的方式統計一個文件中出現的單詞的字數。

案例代碼結構如下:


在整個案例中需要有以下幾類文件:

A:worker服務端,用于類似Mapreduce接收jar,接收配置文件,執行業務邏輯

B:程序客戶端、用于組裝配置文件、發送業務執行的命令(聽過socket發送jarfile、jobconf、和job2run的命令)

代碼結構,每個包和代碼作用介紹

cn.toto.bigdata.mymr.task

TaskProcessor

核心的主體執行程序

?

ProcessLogic

定義客戶端調用必須實現的方法,相當于WebService中的接口規范

cn.toto.bigdata.mymr.io

InputFormat

封裝讀文件的組件(接口用途)

?

DefaultInputFormat

封裝讀文件的組件的實現類

?

OutPutFormat

封裝寫文件的組件(接口用途)

?

DefaultOutPutFormat

封裝寫文件的組件的實現

cn.toto.bigdata.mymr.common

Constants

常量定義

?

Context

應用上下文,用于存儲計算單詞出現字數的次數的中間變量

cn.toto.bigdata.mymr.userapp

UserLogic

客戶端對ProcessLogic規范的實現

?

UserApp

客戶端主入口程序

cn.toto.bigdata.mymr.scheduler

Runner

客戶端UserApp執行命令是依賴的Runner類,通過這里面的Socket發送命令。

?

WorkerClient

客戶端執行時需要用到的client相關的代碼

?

WorkerServer

UserApp執行時,需要提前啟動的服務端

?

WorkerRunnable

服務端執行的相關邏輯

?

運行條件:

1、將mapreduce-my-demo導出成test.jar放置在E:/test.jar下。

2、需要有用于統計用的文本文件a.txt,文件在E:\a.txt

內容截圖類似:

假設a.txt內容為:

The true

nobility is

in being

superior to

your previous

self guess

No great

discovery

was ever

made without

a bold

Knowledge will

give you

power but

character respect

The sun

is just

rising in

the morning

of another

day I

I figure

life is

a gift

and I

don’t intend

on wasting

3、首先運行:WorkerServer,相當于是啟動服務端的代碼

4、再次運行:UserApp,相當于是客戶端

5、最終的統計結果將顯示在:E:/out.txt中。統計結果如下:

nobility???? 1

but?? 1

gift?? 1

wasting??? 1

rising??????? 1

don't???????? 1

another??? 1

I??????? 3

your 1

Knowledge?????? 1

sun?? 1

without??? 1

life??? 1

The? 2

character 1

and? 1

of????? 1

power?????? 1

just? 1

day?? 1

you?? 1

on???? 1

No??? 1

a?????? 2

give? 1

figure??????? 1

previous?? 1

in????? 2

will?? 1

made??????? 1

was? 1

is????? 3

being??????? 1

bold 1

great??????? 1

respect??? 1

morning?? 1

the?? 1

ever 1

superior?? 1

guess??????? 1

discovery 1

true 1

self?? 1

to???? 1

intend?????? 1

?

6、最終的日志將存儲在:E:/task/task.log,最終的配置和工作用的jar也將會生成到這個目錄下面,效果如下:

其中job.conf的內容為:

生成的task.log效果如下:


?

接著:具體的代碼實現如下:

cn.toto.bigdata.mymr.task

TaskProcessor

核心的主體執行程序

?

ProcessLogic

定義客戶端調用必須實現的方法,相當于WebService中的接口規范

TaskProcessor代碼如下

package cn.toto.bigdata.mymr.task;

?

import java.util.HashMap;

import java.util.logging.FileHandler;

import java.util.logging.Level;

import java.util.logging.Logger;

?

import cn.toto.bigdata.mymr.common.Constants;

import cn.toto.bigdata.mymr.common.Context;

import cn.toto.bigdata.mymr.io.InputFormat;

import cn.toto.bigdata.mymr.io.OutPutFormat;

?

/**

?* 1、核心的主體執行程序

?* 這里是任務執行者

?*/

public class TaskProcessor {

?

?? public static void main(String[] args) throws Exception {

????? // 加載用戶指定的所有配置參數到上下文對象中,同時讀取配置文件

????? Context context = new Context();

????? //獲取上下文中的配置文件

????? HashMap<String, String>? conf = context.getConfiguration();

?????

????? //通過打印日志的方式查看程序運行的結果

????? Logger logger = Logger.getLogger("TaskProcessor");

????? //設置日志的輸出級別是INFO級別

????? logger.setLevel(Level.INFO);

????? FileHandler fileHandler = new FileHandler("E:/task/task.log");

????? fileHandler.setLevel(Level.INFO);

????? logger.addHandler(fileHandler);

????? logger.info("context:" + context);

????? logger.info("conf:" + conf);

?????

????? //初始化文件讀取組件

????? //從配置文件中獲取用于讀取的組件的class信息

????? Class<?> forName = Class.forName(conf.get(Constants.INPUT_FORMAT));

????? InputFormat inputFormat = (InputFormat) forName.newInstance();

????? inputFormat.init(context);

?? ??

????? //inputFormat組件讀數據,并調用用戶邏輯

????? Class<?> forName2 = Class.forName(conf.get(Constants.USER_LOGIC));

????? ProcessLogic userLogic = (ProcessLogic) forName2.newInstance();

????? //對每一行調用用戶邏輯,并通過context將用戶調用結果存儲內部緩存

????? while(inputFormat.hasNext()) {

???????? Integer key = inputFormat.nextKey();

???????? String value = inputFormat.nextValue();

???????? userLogic.process(key, value, context);

????? }

????? userLogic.cleanUp(context);

?????

????? //替用戶輸出結果

????? Class<?> forName3 = Class.forName(conf.get(Constants.OUTPUT_FORMAT));

????? OutPutFormat outputFormat = (OutPutFormat) forName3.newInstance();

????? outputFormat.write(context);

?? }

}

?

?

ProcessLogic代碼如下:

package cn.toto.bigdata.mymr.task;

?

import cn.toto.bigdata.mymr.common.Context;

?

/**

?* 1、規定的業務邏輯編寫規范

?* process() ? cleanUp都沒有寫實現,這里的實現在客戶端

?*/

public abstract class ProcessLogic {

?

?? /**

?? ?* 這里的context存儲處理后的結果值

?? ?* @param key????????? :行號

?? ?* @param value??????? :所在行的一行內容

?? ?* @param context????? :應用上下文的內容

?? ?*/

?? public abstract void process(Integer key,String value,Context context);

??

?? /**

?? ?* 通過CleanUp輸出處理后的結果

?? ?*/

?? public void cleanUp(Context context){}

}

?

cn.toto.bigdata.mymr.io

InputFormat

封裝讀文件的組件(接口用途)

?

DefaultInputFormat

封裝讀文件的組件的實現類

?

OutPutFormat

封裝寫文件的組件(接口用途)

?

DefaultOutPutFormat

封裝寫文件的組件的實現

package cn.toto.bigdata.mymr.io;

?

import cn.toto.bigdata.mymr.common.Context;

?

public abstract class InputFormat {

???

?? /**

?? ?* 獲取下一行要讀的行的位置

?? ?*/

?? public abstract int nextKey();

?

?? /**

?? ?* 獲取從文件中讀取的到的行的信息

?? ?*/

?? public abstract String nextValue();

?

?? /**

?? ?* 從文件中讀取到一行信息

?? ?*/

?? public abstract String readLine() throws Exception;

??

?? /**

?? ?* 判斷是否還可以讀取到下一行的內容

?? ?*/

?? public abstract boolean hasNext() throws Exception;

??

?? /**

?? ?* 初始化要讀取的文件的路徑和文件流

?? ?*/

?? public abstract void init(Context context) throws Exception;

}

package cn.toto.bigdata.mymr.io;

?

import java.io.BufferedReader;

import java.io.FileReader;

?

import cn.toto.bigdata.mymr.common.Constants;

import cn.toto.bigdata.mymr.common.Context;

?

/**

?* 這里是默認的讀取的實現類

?*/

public class DefaultInputFormat extends InputFormat{

?? //這里表示要讀取的文件的路徑

?? private String inputPath;

?? private BufferedReader br = null;

?? //這里的key是指文本中類似讀取到的指針的偏移量,是行號的偏移量

?? private int key;

?? //這里的value是指一行中的數據

?? private String value;

?? //默認讀取的行是第0

?? private int lineNumber = 0;

??

?? @Override

?? public void init(Context context) throws Exception {

????? //獲取要讀的文件的路徑

?? ??? this.inputPath = context.getConfiguration().get(Constants.INPUT_PATH);

?? ??? //開始初始化輸入流,只不過,這個流是從文件中獲取的

?? ??? this.br = new BufferedReader(new FileReader(inputPath));

?? }

??

?? @Override

?? public int nextKey() {

????? return this.key;

?? }

?

?? @Override

?? public String nextValue() {

????? return this.value;

?? }

??

?? @Override

?? public boolean hasNext() throws Exception {

????? String line = null;

????? line = readLine();

?????

????? //數據讀取完成之后行號加一

????? this.key = lineNumber++;

????? this.value = line;

?????

????? return null != line;

?? }

?

?? /**

?? ?* 讀取一行數據

?? ?*/

?? @Override

?? public String readLine() throws Exception {

????? String line = br.readLine();

????? //如果讀取到空了之后,將BufferedReader的值變成空

????? if (line == null) {

???????? br.close();

????? }

????? return line;

?? }

}

package cn.toto.bigdata.mymr.io;

?

import cn.toto.bigdata.mymr.common.Context;

?

/**

?* 用于輸出結果的類

?*/

public abstract class OutPutFormat {

?

?? /**

?? ?* 將結果寫入文件中

?? ?*/

?? public abstract void write(Context context) throws Exception;

?

?? /**

?? ?* 關閉流

?? ?*/

??? public abstract void cleanUp() throws Exception;

}

?

package cn.toto.bigdata.mymr.io;

?

import java.io.BufferedWriter;

import java.io.FileWriter;

import java.util.HashMap;

import java.util.Set;

import java.util.Map.Entry;

?

import cn.toto.bigdata.mymr.common.Constants;

import cn.toto.bigdata.mymr.common.Context;

?

public class DefaultOutPutFormat extends OutPutFormat{

??? BufferedWriter bw = null;

????????

???????? @Override

???????? public void write(Context context) throws Exception {

???????? ??? String outputPath = context.getConfiguration().get(Constants.OUTPUT_PATH);

???????? ??? HashMap<String, Integer> KVBuffer = context.getKVBuffer();

???????? ??? this.bw = new BufferedWriter(new FileWriter(outputPath));

???????? ??? Set<Entry<String, Integer>> entrySet = KVBuffer.entrySet();

???????? ??? for (Entry<String, Integer> entry : entrySet) {

??????????????????????????? bw.write(entry.getKey() + "\t" + entry.getValue() + "\r");

?????????????????? }

???????? ??? bw.flush();

???????? }

?

???????? @Override

???????? public void cleanUp() throws Exception {

?????????????????? bw.close();

???????? }

?

}

cn.toto.bigdata.mymr.common

Constants

常量定義

?

Context

應用上下文,用于存儲計算單詞出現字數的次數的中間變量

package cn.toto.bigdata.mymr.common;

?

public class Constants {

?

?? public static final String JAR_PATH = "jar.path";

??

?? public static final String JAR_FILE = "job.jar";

??

?? public static final String WORKER_HOST = "worker.host";

??

?? public static final String WORKER_PORT = "worker.port";

??

?? public static final String CONF_FILE = "job.conf";

??

?? public static final String INPUT_FORMAT = "input.format.class";

??

?? public static final String OUTPUT_FORMAT = "output.format.class";

??

?? public static final String INPUT_PATH = "input.path";

??

?? public static final String OUTPUT_PATH = "output.path";

??

?? public static final String TASK_PROCESSOR = "cn.toto.bigdata.mymr.task.TaskProcessor";

?

?? public static final String USER_LOGIC = "user.logic.class";

??

?? public static final String TASK_WORK_DIR = "E:/task";

??

??

}

package cn.toto.bigdata.mymr.common;

?

import java.io.File;

import java.io.FileInputStream;

import java.io.ObjectInputStream;

import java.util.HashMap;

?

/**

?* 應用上下文,通過這個內容獲取配置文件

?* 通過這個上下文最終輸出結果

?*/

public class Context {

???????? private HashMap<String, Integer> KVBuffer = new HashMap<String, Integer>();

???????? private HashMap<String, String> conf;

????????

???????? @SuppressWarnings("unchecked")

???????? public Context() throws Exception {

?????????????????? //加載配置參數

?????????????????? File file = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE);

?????????????????? if (file.exists()) {

??????????????????????????? @SuppressWarnings("resource")

??????????????????????????? ObjectInputStream oi = new ObjectInputStream(new FileInputStream(file));

??????????????????????????? this.conf = (HashMap<String, String>) oi.readObject();

?????????????????? } else {

??????????????????????????? // throw new RuntimeException("read conf failed ....");

?????????????????? }

???????? }

????????

???????? /**

???????? ?* 通過這種變量最后輸出結果

???????? ?*/

???????? public void write(String k, Integer v) {

?????????????????? KVBuffer.put(k, v);

???????? }

?

???????? public HashMap<String, Integer> getKVBuffer() {

?????????????????? return KVBuffer;

???????? }

?

???????? public void setKVBuffer(HashMap<String, Integer> tmpKV) {

?????????????????? this.KVBuffer = tmpKV;

???????? }

?

???????? /**

???????? ?* 獲取配置文件中的信息

???????? ?*/

???????? public HashMap<String, String> getConfiguration() {

?????????????????? return conf;

???????? }

?

???????? /**

???????? ?* 在Context()構造函數里面已經有了conf的配置,這里再次傳入說明配置可以讓用戶手動指定

???????? ?*/

???????? public void setConfiguration(HashMap<String, String> configuration) {

?????????????????? this.conf = configuration;

???????? }

}

cn.toto.bigdata.mymr.userapp

UserLogic

客戶端對ProcessLogic規范的實現

?

UserApp

客戶端主入口程序

package cn.toto.bigdata.mymr.userapp;

?

import java.util.HashMap;

import java.util.Set;

import java.util.Map.Entry;

?

import cn.toto.bigdata.mymr.common.Context;

import cn.toto.bigdata.mymr.task.ProcessLogic;

?

public class UserLogic extends ProcessLogic {

?

???????? private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();

????????

???????? @Override

???????? public void process(Integer key, String value, Context context) {

?????????????????? String [] words = value.split(" ");

?????????????????? for(String word : words) {

??????????????????????????? Integer count = wordCount.get(word);

??????????????????????????? if (count == null) {

???????????????????????????????????? wordCount.put(word, 1);

??????????????????????????? } else {

???????????????????????????????????? wordCount.put(word, count + 1);

??????????????????????????? }

?????????????????? }

???????? }

?

???????? public void cleanUp(Context context) {

?????????????????? Set<Entry<String, Integer>> entrySet = wordCount.entrySet();

?????????????????? for(Entry<String, Integer> entry : entrySet) {

??????????????????????????? context.write(entry.getKey(), entry.getValue());

?????????????????? }

???????? }

}

package cn.toto.bigdata.mymr.userapp;

?

import java.util.HashMap;

?

import cn.toto.bigdata.mymr.common.Constants;

import cn.toto.bigdata.mymr.scheduler.Runner;

?

public class UserApp {

???

???????? public static void main(String[] args) throws Exception {

?????????????????? HashMap<String, String> conf = new HashMap<String,String>();

?????????????????? conf.put(Constants.INPUT_PATH, "E:/a.txt");

?????????????????? conf.put(Constants.OUTPUT_PATH, "E:/out.txt");

?????????????????? conf.put(Constants.INPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultInputFormat");

?????????????????? conf.put(Constants.OUTPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultOutPutFormat");

?????????????????? conf.put(Constants.JAR_PATH, "E:/test.jar");

?????????????????? conf.put(Constants.WORKER_HOST, "localhost");

?????????????????? conf.put(Constants.WORKER_PORT, "9889");

?????????????????? conf.put(Constants.USER_LOGIC, "cn.toto.bigdata.mymr.userapp.UserLogic");

??????????????????

?????????????????? Runner runner = new Runner(conf);

?????????????????? runner.submit("localhost", 9889);

???????? }

}

cn.toto.bigdata.mymr.scheduler

Runner

客戶端UserApp執行命令是依賴的Runner類,通過這里面的Socket發送命令。

?

WorkerClient

客戶端執行時需要用到的client相關的代碼

?

WorkerServer

UserApp執行時,需要提前啟動的服務端

?

WorkerRunnable

服務端執行的相關邏輯

package cn.toto.bigdata.mymr.scheduler;

?

import java.io.FileOutputStream;

import java.io.ObjectOutputStream;

import java.util.HashMap;

?

import cn.toto.bigdata.mymr.common.Constants;

?

public class Runner {

??? private HashMap<String, String> conf;

????????

??? public Runner(HashMap<String, String> conf) {

??? ???????? this.conf = conf;

??? }

???

??? public void submit(String host,int port) throws Exception {

??? ???????? ObjectOutputStream jobConfStream = new ObjectOutputStream(new FileOutputStream(Constants.CONF_FILE));

?????????????????? jobConfStream.writeObject(conf);

??????????????????

?????????????????? WorkerClient workerClient = new WorkerClient(conf);

?????????????????? workerClient.submit();

??? }

}

package cn.toto.bigdata.mymr.scheduler;

?

import java.io.FileInputStream;

import java.io.OutputStream;

import java.net.Socket;

import java.util.HashMap;

?

import cn.toto.bigdata.mymr.common.Constants;

?

public class WorkerClient {

???

???????? private HashMap<String, String> conf;

???????? Socket socket = null;

???????? OutputStream so = null;

????????

???????? public WorkerClient(HashMap<String, String> conf) {

?????????????????? this.conf = conf;

???????? }

????????

???????? public void submit() throws Exception {

?????????????????? socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT)));

?????????????????? so = socket.getOutputStream();

?

?????????????????? String jarPath = conf.get(Constants.JAR_PATH);

?

?????????????????? // 發送jar包

?????????????????? byte[] buff = new byte[4096];

?????????????????? FileInputStream jarIns = new FileInputStream(jarPath);

?????????????????? so.write("jarfile".getBytes());

?????????????????? int read = 0;

?????????????????? while ((read=jarIns.read(buff)) != -1) {

??????????????????????????? so.write(buff,0,read);

?????????????????? }

?????????????????? jarIns.close();

?????????????????? so.close();

?????????????????? socket.close();

??????????????????

?????????????????? // 發送job.conf文件

?????????????????? socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT)));

?????????????????? so = socket.getOutputStream();

??????????????????

?????????????????? FileInputStream confIns = new FileInputStream(Constants.CONF_FILE);

?????????????????? so.write("jobconf".getBytes());

?????????????????? while ((read = confIns.read(buff)) != -1) {

??????????????????????????? so.write(buff,0,read);

?????????????????? }

?????????????????? confIns.close();

?????????????????? so.close();

?????????????????? socket.close();

?

?????????????????? // 發送啟動命令

?????????????????? socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT)));

?????????????????? so = socket.getOutputStream();

?????????????????? so.write("job2run".getBytes());

?????????????????? String shell = "java -cp E:/test.jar cn.toto.bigdata.mymr.task.TaskProcessor";

?????????????????? so.write(shell.getBytes());

?????????????????? so.close();

?????????????????? socket.close();

???????? }

}

package cn.toto.bigdata.mymr.scheduler;

?

import java.net.ServerSocket;

import java.net.Socket;

?

public class WorkerServer {

?

???????? public static void main(String[] args) throws Exception {

?????????????????? ServerSocket ssc = new ServerSocket(9889);

?????????????????? System.out.println("Worker服務器啟動-->9889");

?????????????????? while (true) {

??????????????????????????? Socket accept = ssc.accept();

??????????????????????????? new Thread(new WorkerRunnable(accept)).start();

?????????????????? }

???????? }

}

package cn.toto.bigdata.mymr.scheduler;

?

import java.io.BufferedReader;

import java.io.File;

import java.io.FileOutputStream;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.net.Socket;

?

import cn.toto.bigdata.mymr.common.Constants;

?

public class WorkerRunnable implements Runnable {

???????? Socket socket;

???????? InputStream in = null;

???????? volatile long confSize = 0;

???????? volatile long jarSize = 0;

?

???????? public WorkerRunnable(Socket socket) {

?????????????????? this.socket = socket;

???????? }

?

???????? @Override

???????? public void run() {

?????????????????? try {

??????????????????????????? this.in = socket.getInputStream();

??????????????????????????? byte[] protocal = new byte[7];

??????????????????????????? int read = in.read(protocal, 0, 7);

??????????????????????????? if (read < 7) {

???????????????????????????????????? System.out.println("客戶端請求不符合協議規范......");

???????????????????????????????????? return;

??????????????????????????? }

??????????????????????????? String command = new String(protocal);

??????????????????????????? switch (command) {

??????????????????????????? case "jarfile":

???????????????????????????????????? receiveJarFile();

???????????????????????????????????? break;

??????????????????????????? case "jobconf":

???????????????????????????????????? receiveConfFile();

???????????????????????????????????? break;

??????????????????????????? case "job2run":

???????????????????????????????????? runJob();

???????????????????????????????????? break;

??????????????????????????? default:

???????????????????????????????????? System.out.println("客戶端請求不符合協議規范.....");

???????????????????????????????????? socket.close();

???????????????????????????????????? break;

??????????????????????????? }

?

?????????????????? } catch (Exception e) {

??????????????????????????? e.printStackTrace();

?????????????????? }

?

???????? }

?

???????? private void receiveConfFile() throws Exception {

?????????????????? System.out.println("開始接收conf文件");

?????????????????? FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE);

?????????????????? byte[] buff = new byte[4096];

?????????????????? int read = 0;

?????????????????? while ((read = in.read(buff)) != -1) {

??????????????????????????? confSize += read;

??????????????????????????? fo.write(buff, 0, read);

?????????????????? }

?????????????????? fo.flush();

?????????????????? fo.close();

?????????????????? in.close();

?????????????????? socket.close();

?

???????? }

?

???????? private void receiveJarFile() throws Exception {

?????????????????? System.out.println("開始接收jar文件");

?????????????????? FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE);

?????????????????? byte[] buff = new byte[4096];

?????????????????? int read = 0;

?????????????????? while ((read = in.read(buff)) != -1) {

??????????????????????????? jarSize += read;

??????????????????????????? fo.write(buff, 0, read);

?????????????????? }

?????????????????? fo.flush();

?????????????????? fo.close();

?????????????????? in.close();

?????????????????? socket.close();

?

???????? }

?

???????? private void runJob() throws Exception {

?

?????????????????? byte[] buff = new byte[4096];

?????????????????? int read = in.read(buff);

?????????????????? String shell = new String(buff, 0, read);

?????????????????? System.out.println("接收到啟動命令......." + shell);

?????????????????? in.close();

?????????????????? socket.close();

?????????????????? Thread.sleep(500);

?

?????????????????? File jarFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE);

?????????????????? File confFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE);

?????????????????? System.out.println("jarfile 存在?" + jarFile.exists());

?????????????????? System.out.println("confFile 存在?" + confFile.exists());

?????????????????? System.out.println("jarfile可讀?" + jarFile.canRead());

?????????????????? System.out.println("jarfile可寫?" + jarFile.canWrite());

?????????????????? System.out.println("confFile可讀?" + confFile.canRead());

?????????????????? System.out.println("confFile可寫?" + confFile.canWrite());

?

?????????????????? System.out.println("jarFile.length():" + jarFile.length());

?????????????????? System.out.println("confFile.length():" + confFile.length());

?

?????????????????? /*if (jarFile.length() == jarSize && confFile.length() == confSize) {

??????????????????????????? System.out.println("jar 和 conf 文件已經準備就緒......");

?????????????????? }*/

?????????????????? System.out.println("開始啟動數據處理TaskProcessor......");

?

?????????????????? Process exec = Runtime.getRuntime().exec(shell);

?????????????????? int waitFor = exec.waitFor();

??????????????????

?????????????????? InputStream errStream = exec.getErrorStream();

?????????????????? BufferedReader errReader = new BufferedReader(new InputStreamReader(errStream));

?????????????????? String inLine = null;

?????????????????? /*

?????????????????? ?* InputStream stdStream = exec.getInputStream(); BufferedReader

?????????????????? ?* stdReader = new BufferedReader(new InputStreamReader(stdStream));

?????????????????? ?* while ((inLine = stdReader.readLine()) != null) {

?????????????????? ?* System.out.println(inLine); }

?????????????????? ?*/

?????????????????? while ((inLine = errReader.readLine()) != null) {

??????????????????????????? System.out.println(inLine);

?????????????????? }

??????????????????

?????????????????? if (waitFor == 0) {

??????????????????????????? System.out.println("task成功運行完畢.....");

?????????????????? } else {

??????????????????????????? System.out.println("task異常退出......");

?????????????????? }

?

???????? }

?

}

?

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。