日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

hadoop下实现kmeans算法——一个mapreduce的实现方法

發(fā)布時(shí)間:2025/3/21 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hadoop下实现kmeans算法——一个mapreduce的实现方法 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

寫(xiě)mapreduce程序?qū)崿F(xiàn)kmeans算法,我們的思路可能是這樣的

1.?用一個(gè)全局變量存放上一次迭代后的質(zhì)心

2. map里,計(jì)算每個(gè)質(zhì)心與樣本之間的距離,得到與樣本距離最短的質(zhì)心,以這個(gè)質(zhì)心作為key,樣本作為value,輸出

3. reduce里,輸入的key是質(zhì)心,value是其他的樣本,這時(shí)重新計(jì)算聚類(lèi)中心,將聚類(lèi)中心put到一個(gè)全部變量t中。

4. 在main里比較前一次的質(zhì)心和本次的質(zhì)心是否發(fā)生變化,如果變化,則繼續(xù)迭代,否則退出。

本文的思路基本上是按照上面的步驟來(lái)做的,只不過(guò)有幾個(gè)問(wèn)題需要解決

1.?Hadoop是不存在自定義的全局變量的,所以上面定義一個(gè)全局變量存放質(zhì)心的想法是實(shí)現(xiàn)不了的,所以一個(gè)替代的思路是將質(zhì)心存放在文件中

2. 存放質(zhì)心的文件在什么地方讀取,如果在map中讀取,那么可以肯定我們是不能用一個(gè)mapreduce實(shí)現(xiàn)一次迭代,所以我們選擇在main函數(shù)里讀取質(zhì)心,然后將質(zhì)心set到configuration中,configuration在map和reduce都是可讀

3. 如何比較質(zhì)心是否發(fā)生變化,是在main里比較么,讀取本次質(zhì)心和上一次質(zhì)心的文件然后進(jìn)行比較,這種方法是可以實(shí)現(xiàn)的,但是顯得不夠高富帥,這個(gè)時(shí)候我們用到了自定義的counter,counter是全局變量,在map和reduce中可讀可寫(xiě),在上面的思路中,我們看到reduce是有上次迭代的質(zhì)心和剛剛計(jì)算出來(lái)的質(zhì)心的,所以直接在reduce中進(jìn)行比較就完全可以,如果沒(méi)發(fā)生變化,counter加1。只要在main里比較獲取counter的值就行了。

梳理一下,具體的步驟如下

1. main函數(shù)讀取質(zhì)心文件

2. 將質(zhì)心的字符串放到configuration中

3. 在mapper類(lèi)重寫(xiě)setup方法,獲取到configuration的質(zhì)心內(nèi)容,解析成二維數(shù)組的形式,代表質(zhì)心

4. mapper類(lèi)中的map方法讀取樣本文件,跟所有的質(zhì)心比較,得出每個(gè)樣本跟哪個(gè)質(zhì)心最近,然后輸出<質(zhì)心,樣本>

5. reducer類(lèi)中重新計(jì)算質(zhì)心,如果重新計(jì)算出來(lái)的質(zhì)心跟進(jìn)來(lái)時(shí)的質(zhì)心一致,那么自定義的counter加1

6. main中獲取counter的值,看是否等于質(zhì)心,如果不相等,那么繼續(xù)迭代,否在退出

具體的實(shí)現(xiàn)如下

1. pom依賴

這個(gè)要跟集群的一致,因?yàn)槿绻灰恢略谟?jì)算其他問(wèn)題的時(shí)候沒(méi)有問(wèn)題,但是在使用counter的時(shí)候會(huì)出現(xiàn)問(wèn)題

Java.lang.IncompatibleClassChangeError:?Found?interface?org.apache.hadoop.mapreduce.Counter,?but?class?was?expected

原因是:其實(shí)從2.0開(kāi)始,org.apache.hadoop.mapreduce.Counter從1.0版本的class改為interface,可以看一下你導(dǎo)入的這個(gè)類(lèi)是class還是interface,如果是class那么就是導(dǎo)包導(dǎo)入的不對(duì),需要修改

2. 樣本

實(shí)例樣本如下

[plain]?view plaincopy
  • 1,1??
  • 2,2??
  • 3,3??
  • -3,-3??
  • -4,-4??
  • -5,-5??
  • 3. 質(zhì)心

    這個(gè)質(zhì)心是從樣本中隨機(jī)找的

    [plain]?view plaincopy
  • 1,1??
  • 2,2??
  • 4. 代碼實(shí)現(xiàn)

    首先定義一個(gè)Center類(lèi),這個(gè)類(lèi)主要存放了質(zhì)心的個(gè)數(shù)k,還有兩個(gè)從hdfs上讀取質(zhì)心文件的方法,一個(gè)用來(lái)讀取初始的質(zhì)心,這個(gè)實(shí)在文件中,還有一個(gè)是用來(lái)讀取每次迭代后的質(zhì)心文件夾,這個(gè)是在文件夾中的,代碼如下

    Center類(lèi)

    [java]?view plaincopy
  • public?class?Center?{??
  • ??
  • ????protected?static?int?k?=?2;?????//質(zhì)心的個(gè)數(shù)??
  • ??????
  • ????/**?
  • ?????*?從初始的質(zhì)心文件中加載質(zhì)心,并返回字符串,質(zhì)心之間用tab分割?
  • ?????*?@param?path?
  • ?????*?@return?
  • ?????*?@throws?IOException?
  • ?????*/??
  • ????public?String?loadInitCenter(Path?path)?throws?IOException?{??
  • ??????????
  • ????????StringBuffer?sb?=?new?StringBuffer();??
  • ??????????
  • ????????Configuration?conf?=?new?Configuration();??
  • ????????FileSystem?hdfs?=?FileSystem.get(conf);??
  • ????????FSDataInputStream?dis?=?hdfs.open(path);??
  • ????????LineReader?in?=?new?LineReader(dis,?conf);??
  • ????????Text?line?=?new?Text();??
  • ????????while(in.readLine(line)?>?0)?{??
  • ????????????sb.append(line.toString().trim());??
  • ????????????sb.append("\t");??
  • ????????}??
  • ??????????
  • ????????return?sb.toString().trim();??
  • ????}??
  • ??????
  • ????/**?
  • ?????*?從每次迭代的質(zhì)心文件中讀取質(zhì)心,并返回字符串?
  • ?????*?@param?path?
  • ?????*?@return?
  • ?????*?@throws?IOException?
  • ?????*/??
  • ????public?String?loadCenter(Path?path)?throws?IOException?{??
  • ??????????
  • ????????StringBuffer?sb?=?new?StringBuffer();??
  • ??????????
  • ????????Configuration?conf?=?new?Configuration();??
  • ????????FileSystem?hdfs?=?FileSystem.get(conf);??
  • ????????FileStatus[]?files?=?hdfs.listStatus(path);??
  • ??????????
  • ????????for(int?i?=?0;?i?<?files.length;?i++)?{??
  • ??????????????
  • ????????????Path?filePath?=?files[i].getPath();??
  • ????????????if(!filePath.getName().contains("part"))?continue;??
  • ????????????FSDataInputStream?dis?=?hdfs.open(filePath);??
  • ????????????LineReader?in?=?new?LineReader(dis,?conf);??
  • ????????????Text?line?=?new?Text();??
  • ????????????while(in.readLine(line)?>?0)?{??
  • ????????????????sb.append(line.toString().trim());??
  • ????????????????sb.append("\t");??
  • ????????????}??
  • ????????}??
  • ??????????
  • ????????return?sb.toString().trim();??
  • ????}??
  • }??
  • KmeansMR類(lèi)

    [java]?view plaincopy
  • public?class?KmeansMR?{??
  • ??
  • ????private?static?String?FLAG?=?"KCLUSTER";??
  • ??????????
  • ????public?static?class?TokenizerMapper???
  • ????extends?Mapper<Object,?Text,?Text,?Text>{??
  • ??????????
  • ????????double[][]?centers?=?new?double[Center.k][];??
  • ????????String[]?centerstrArray?=?null;??
  • ??????????
  • ????????@Override??
  • ????????public?void?setup(Context?context)?{??
  • ??????????????
  • ????????????//將放在context中的聚類(lèi)中心轉(zhuǎn)換為數(shù)組的形式,方便使用??
  • ????????????String?kmeansS?=?context.getConfiguration().get(FLAG);??
  • ????????????centerstrArray?=?kmeansS.split("\t");??
  • ????????????for(int?i?=?0;?i?<?centerstrArray.length;?i++)?{??
  • ????????????????String[]?segs?=?centerstrArray[i].split(",");??
  • ????????????????centers[i]?=?new?double[segs.length];??
  • ????????????????for(int?j?=?0;?j?<?segs.length;?j++)?{??
  • ????????????????????centers[i][j]?=?Double.parseDouble(segs[j]);??
  • ????????????????}??
  • ????????????}??
  • ????????}??
  • ??????????
  • ????????public?void?map(Object?key,?Text?value,?Context?context??
  • ?????????????????)?throws?IOException,?InterruptedException?{??
  • ??????????????
  • ????????????String?line?=?value.toString();??
  • ????????????String[]?segs?=?line.split(",");??
  • ????????????double[]?sample?=?new?double[segs.length];??
  • ????????????for(int?i?=?0;?i?<?segs.length;?i++)?{??
  • ????????????????sample[i]?=?Float.parseFloat(segs[i]);??
  • ????????????}??
  • ????????????//求得距離最近的質(zhì)心??
  • ????????????double?min?=?Double.MAX_VALUE;??
  • ????????????int?index?=?0;??
  • ????????????for(int?i?=?0;?i?<?centers.length;?i++)?{??
  • ????????????????double?dis?=?distance(centers[i],?sample);??
  • ????????????????if(dis?<?min)?{??
  • ????????????????????min?=?dis;??
  • ????????????????????index?=?i;??
  • ????????????????}??
  • ????????????}??
  • ??????????????
  • ????????????context.write(new?Text(centerstrArray[index]),?new?Text(line));??
  • ????????}??
  • ????}??
  • ??
  • ????public?static?class?IntSumReducer???
  • ????extends?Reducer<Text,Text,NullWritable,Text>?{??
  • ??
  • ????????Counter?counter?=?null;??
  • ??????????
  • ????????public?void?reduce(Text?key,?Iterable<Text>?values,???
  • ????????????????????Context?context??
  • ????????????????????)?throws?IOException,?InterruptedException?{??
  • ??????????????
  • ????????????double[]?sum?=?new?double[Center.k];??
  • ????????????int?size?=?0;??
  • ????????????//計(jì)算對(duì)應(yīng)維度上值的加和,存放在sum數(shù)組中??
  • ????????????for(Text?text?:?values)?{??
  • ????????????????String[]?segs?=?text.toString().split(",");??
  • ????????????????for(int?i?=?0;?i?<?segs.length;?i++)?{??
  • ????????????????????sum[i]?+=?Double.parseDouble(segs[i]);??
  • ????????????????}??
  • ????????????????size?++;??
  • ????????????}??
  • ??????????????
  • ????????????//求sum數(shù)組中每個(gè)維度的平均值,也就是新的質(zhì)心??
  • ????????????StringBuffer?sb?=?new?StringBuffer();??
  • ????????????for(int?i?=?0;?i?<?sum.length;?i++)?{??
  • ????????????????sum[i]?/=?size;??
  • ????????????????sb.append(sum[i]);??
  • ????????????????sb.append(",");??
  • ????????????}??
  • ??????????????
  • ????????????/**判斷新的質(zhì)心跟老的質(zhì)心是否是一樣的*/??
  • ????????????boolean?flag?=?true;??
  • ????????????String[]?centerStrArray?=?key.toString().split(",");??
  • ????????????for(int?i?=?0;?i?<?centerStrArray.length;?i++)?{??
  • ????????????????if(Math.abs(Double.parseDouble(centerStrArray[i])?-?sum[i])?>?0.00000000001)?{??
  • ????????????????????flag?=?false;??
  • ????????????????????break;??
  • ????????????????}??
  • ????????????}??
  • ????????????//如果新的質(zhì)心跟老的質(zhì)心是一樣的,那么相應(yīng)的計(jì)數(shù)器加1??
  • ????????????if(flag)?{??
  • ????????????????counter?=?context.getCounter("myCounter",?"kmenasCounter");??
  • ????????????????counter.increment(1l);??
  • ????????????}??
  • ????????????context.write(null,?new?Text(sb.toString()));??
  • ????????}??
  • ????}??
  • ??
  • ????public?static?void?main(String[]?args)?throws?Exception?{??
  • ??
  • ????????Path?kMeansPath?=?new?Path("/dsap/middata/kmeans/kMeans");??//初始的質(zhì)心文件??
  • ????????Path?samplePath?=?new?Path("/dsap/middata/kmeans/sample");??//樣本文件??
  • ????????//加載聚類(lèi)中心文件??
  • ????????Center?center?=?new?Center();??
  • ????????String?centerString?=?center.loadInitCenter(kMeansPath);??
  • ??????????
  • ????????int?index?=?0;??//迭代的次數(shù)??
  • ????????while(index?<?5)?{??
  • ??????????????
  • ????????????Configuration?conf?=?new?Configuration();??
  • ????????????conf.set(FLAG,?centerString);???//將聚類(lèi)中心的字符串放到configuration中??
  • ??????????????
  • ????????????kMeansPath?=?new?Path("/dsap/middata/kmeans/kMeans"?+?index);???//本次迭代的輸出路徑,也是下一次質(zhì)心的讀取路徑??
  • ??????????????
  • ????????????/**判斷輸出路徑是否存在,如果存在,則刪除*/??
  • ????????????FileSystem?hdfs?=?FileSystem.get(conf);??
  • ????????????if(hdfs.exists(kMeansPath))?hdfs.delete(kMeansPath);??
  • ??
  • ????????????Job?job?=?new?Job(conf,?"kmeans"?+?index);???
  • ????????????job.setJarByClass(KmeansMR.class);??
  • ????????????job.setMapperClass(TokenizerMapper.class);??
  • ????????????job.setReducerClass(IntSumReducer.class);??
  • ????????????job.setOutputKeyClass(NullWritable.class);??
  • ????????????job.setOutputValueClass(Text.class);??
  • ????????????job.setMapOutputKeyClass(Text.class);??
  • ????????????job.setMapOutputValueClass(Text.class);??
  • ????????????FileInputFormat.addInputPath(job,?samplePath);??
  • ????????????FileOutputFormat.setOutputPath(job,?kMeansPath);??
  • ????????????job.waitForCompletion(true);??
  • ??????????????
  • ????????????/**獲取自定義counter的大小,如果等于質(zhì)心的大小,說(shuō)明質(zhì)心已經(jīng)不會(huì)發(fā)生變化了,則程序停止迭代*/??
  • ????????????long?counter?=?job.getCounters().getGroup("myCounter").findCounter("kmenasCounter").getValue();??
  • ????????????if(counter?==?Center.k)?System.exit(0);??
  • ????????????/**重新加載質(zhì)心*/??
  • ????????????center?=?new?Center();??
  • ????????????centerString?=?center.loadCenter(kMeansPath);??
  • ??????????????
  • ????????????index?++;??
  • ????????}??
  • ????????System.exit(0);??
  • ????}??
  • ??????
  • ????public?static?double?distance(double[]?a,?double[]?b)?{??
  • ??????????
  • ????????if(a?==?null?||?b?==?null?||?a.length?!=?b.length)?return?Double.MAX_VALUE;??
  • ????????double?dis?=?0;??
  • ????????for(int?i?=?0;?i?<?a.length;?i++)?{??
  • ????????????dis?+=?Math.pow(a[i]?-?b[i],?2);??
  • ????????}??
  • ????????return?Math.sqrt(dis);??
  • ????}??
  • }?????

  • 5. 結(jié)果

    產(chǎn)生了兩個(gè)文件夾,分別是第一次、第二次迭代后的聚類(lèi)中心


    最后的聚類(lèi)中心的內(nèi)容如下


    from:?http://blog.csdn.net/nwpuwyk/article/details/29564249?utm_source=tuicool&utm_medium=referral

    總結(jié)

    以上是生活随笔為你收集整理的hadoop下实现kmeans算法——一个mapreduce的实现方法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。