Implementing k-means on Hadoop: a MapReduce approach
To implement the k-means algorithm as a MapReduce program, the natural plan goes something like this:
1. Keep the centroids from the previous iteration in a global variable.
2. In map, compute the distance from the sample to every centroid, find the nearest centroid, and emit that centroid as the key with the sample as the value.
3. In reduce, the input key is a centroid and the values are its samples; recompute the cluster center and put it into a global variable.
4. In main, compare the previous centroids with the current ones; if they changed, run another iteration, otherwise exit.
This article basically follows the steps above, but a few problems have to be solved first:
1. Hadoop has no user-defined global variables, so the idea of keeping the centroids in a global variable cannot work. The substitute is to keep the centroids in a file.
2. Where should the centroid file be read? If it were read inside map, we certainly could not implement one iteration as a single MapReduce job. So instead we read the centroids in main and set them into the Configuration, which is readable in both map and reduce.
3. How do we tell whether the centroids changed? Comparing in main, by reading this iteration's centroid file and the previous one, would work, but it is not very elegant. Instead we use a custom counter: counters are global, readable and writable in both map and reduce. In the scheme above, reduce already has both the previous iteration's centroid (the key) and the newly computed one, so the comparison can be done right there; if a centroid has not changed, the counter is incremented by 1. main then only has to read the counter's value.
To recap, the concrete steps are:
1. main reads the centroid file.
2. The centroid string is put into the Configuration.
3. The mapper class overrides setup to fetch the centroid string from the Configuration and parse it into a two-dimensional array representing the centroids.
4. The map method reads the sample file, compares each sample against all centroids to find the nearest one, and emits <centroid, sample>.
5. The reducer recomputes the centroid; if the recomputed centroid equals the incoming one, the custom counter is incremented.
6. main reads the counter and checks whether it equals k, the number of centroids; if not, iterate again, otherwise exit.
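Steps 4 and 5 can be sketched outside Hadoop in plain Java. The class and method names below are illustrative, not part of the article's code:

```java
public class KmeansSketch {

    // Squared Euclidean distance; it ranks centroids the same way as the
    // true Euclidean distance, so the sqrt can be skipped here.
    static double dist2(double[] a, double[] b) {
        double d = 0;
        for (int i = 0; i < a.length; i++) d += (a[i] - b[i]) * (a[i] - b[i]);
        return d;
    }

    // Step 4: index of the centroid nearest to the sample.
    static int nearest(double[][] centers, double[] sample) {
        int best = 0;
        double min = Double.MAX_VALUE;
        for (int i = 0; i < centers.length; i++) {
            double d = dist2(centers[i], sample);
            if (d < min) { min = d; best = i; }
        }
        return best;
    }

    // Step 5: a centroid counts as unchanged when every coordinate
    // moved by less than a small tolerance.
    static boolean converged(double[] oldC, double[] newC, double eps) {
        for (int i = 0; i < oldC.length; i++) {
            if (Math.abs(oldC[i] - newC[i]) > eps) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        double[][] centers = { {1, 1}, {-4, -4} };
        System.out.println(nearest(centers, new double[]{3, 3}));   // prints 0
        System.out.println(converged(new double[]{1, 1},
                                     new double[]{1.0, 1.0}, 1e-11)); // prints true
    }
}
```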
具體的實(shí)現(xiàn)如下
1. pom dependency
The Hadoop version in the pom must match the cluster's. A mismatch may go unnoticed on other jobs, but using counters will fail with:
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.Counter, but class was expected
The reason: starting with Hadoop 2.0, org.apache.hadoop.mapreduce.Counter changed from a class (as in 1.0) to an interface. Check whether the type you imported is a class or an interface; if it is a class, the wrong dependency is on the classpath and needs to be fixed.
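A minimal dependency entry for the pom might look like the following sketch; the version shown is a placeholder, since it has to match your cluster:

```xml
<!-- hadoop-client pulls in the MapReduce client classes.
     The version below is a placeholder: use your cluster's version. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>
```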
2. Samples
The sample data:
1,1
2,2
3,3
-3,-3
-4,-4
-5,-5
3. Centroids
The initial centroids are picked at random from the samples:
1,1
2,2
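With these samples and initial centroids, what the first MapReduce round computes can be checked locally with a small sketch (the class name and structure are ours, not from the article's code):

```java
import java.util.Arrays;

// A local, single-machine check of one k-means iteration:
// assign each sample to its nearest center (the map phase),
// then average each cluster (the reduce phase).
public class LocalIteration {

    static double[][] iterate(double[][] samples, double[][] centers) {
        double[][] sum = new double[centers.length][centers[0].length];
        int[] count = new int[centers.length];
        for (double[] s : samples) {
            int best = 0;
            double min = Double.MAX_VALUE;
            for (int i = 0; i < centers.length; i++) {
                double d = 0;
                for (int j = 0; j < s.length; j++)
                    d += (centers[i][j] - s[j]) * (centers[i][j] - s[j]);
                if (d < min) { min = d; best = i; }
            }
            for (int j = 0; j < s.length; j++) sum[best][j] += s[j];
            count[best]++;
        }
        // New center = mean of the samples assigned to it.
        for (int i = 0; i < sum.length; i++)
            for (int j = 0; j < sum[i].length; j++) sum[i][j] /= count[i];
        return sum;
    }

    public static void main(String[] args) {
        double[][] samples = { {1,1}, {2,2}, {3,3}, {-3,-3}, {-4,-4}, {-5,-5} };
        double[][] centers = { {1,1}, {2,2} };
        // (1,1), (-3,-3), (-4,-4), (-5,-5) land on the first center,
        // (2,2), (3,3) on the second.
        for (double[] c : iterate(samples, centers))
            System.out.println(Arrays.toString(c));
        // prints [-2.75, -2.75] then [2.5, 2.5]
    }
}
```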
4. Implementation
First define a Center class. It holds the number of centroids k and two methods that read centroids from HDFS: one reads the initial centroids from a single file, the other reads the centroid output directory produced by each iteration. The code follows.
The Center class:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class Center {

    protected static int k = 2;  // number of clusters

    // Load the initial centroids from a single HDFS file,
    // joining the lines into one tab-separated string.
    public String loadInitCenter(Path path) throws IOException {
        StringBuffer sb = new StringBuffer();
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FSDataInputStream dis = hdfs.open(path);
        LineReader in = new LineReader(dis, conf);
        Text line = new Text();
        while (in.readLine(line) > 0) {
            sb.append(line.toString().trim());
            sb.append("\t");
        }
        return sb.toString().trim();
    }

    // Load the centroids written by the previous iteration from the
    // "part-*" files in the job's output directory.
    public String loadCenter(Path path) throws IOException {
        StringBuffer sb = new StringBuffer();
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FileStatus[] files = hdfs.listStatus(path);
        for (int i = 0; i < files.length; i++) {
            Path filePath = files[i].getPath();
            if (!filePath.getName().contains("part")) continue;
            FSDataInputStream dis = hdfs.open(filePath);
            LineReader in = new LineReader(dis, conf);
            Text line = new Text();
            while (in.readLine(line) > 0) {
                sb.append(line.toString().trim());
                sb.append("\t");
            }
        }
        return sb.toString().trim();
    }
}
The KmeansMR class:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KmeansMR {

    private static String FLAG = "KCLUSTER";

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, Text> {

        double[][] centers = new double[Center.k][];
        String[] centerstrArray = null;

        @Override
        public void setup(Context context) {
            // Parse the centroid string passed in through the Configuration.
            String kmeansS = context.getConfiguration().get(FLAG);
            centerstrArray = kmeansS.split("\t");
            for (int i = 0; i < centerstrArray.length; i++) {
                String[] segs = centerstrArray[i].split(",");
                centers[i] = new double[segs.length];
                for (int j = 0; j < segs.length; j++) {
                    centers[i][j] = Double.parseDouble(segs[j]);
                }
            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] segs = line.split(",");
            double[] sample = new double[segs.length];
            for (int i = 0; i < segs.length; i++) {
                sample[i] = Double.parseDouble(segs[i]);
            }
            // Find the nearest centroid and emit <centroid, sample>.
            double min = Double.MAX_VALUE;
            int index = 0;
            for (int i = 0; i < centers.length; i++) {
                double dis = distance(centers[i], sample);
                if (dis < min) {
                    min = dis;
                    index = i;
                }
            }
            context.write(new Text(centerstrArray[index]), new Text(line));
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, Text, NullWritable, Text> {

        Counter counter = null;

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Sum the samples assigned to this centroid, coordinate by
            // coordinate; the array is sized from the first sample seen.
            double[] sum = null;
            int size = 0;
            for (Text text : values) {
                String[] segs = text.toString().split(",");
                if (sum == null) sum = new double[segs.length];
                for (int i = 0; i < segs.length; i++) {
                    sum[i] += Double.parseDouble(segs[i]);
                }
                size++;
            }
            // The mean of the samples is the new centroid.
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < sum.length; i++) {
                sum[i] /= size;
                sb.append(sum[i]);
                sb.append(",");
            }
            // If the new centroid equals the old one (within a tolerance),
            // bump the custom counter.
            boolean flag = true;
            String[] centerStrArray = key.toString().split(",");
            for (int i = 0; i < centerStrArray.length; i++) {
                if (Math.abs(Double.parseDouble(centerStrArray[i]) - sum[i]) > 0.00000000001) {
                    flag = false;
                    break;
                }
            }
            if (flag) {
                counter = context.getCounter("myCounter", "kmeansCounter");
                counter.increment(1L);
            }
            context.write(NullWritable.get(), new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Path kMeansPath = new Path("/dsap/middata/kmeans/kMeans");
        Path samplePath = new Path("/dsap/middata/kmeans/sample");

        Center center = new Center();
        String centerString = center.loadInitCenter(kMeansPath);

        int index = 0;
        while (index < 5) {  // cap the number of iterations
            Configuration conf = new Configuration();
            conf.set(FLAG, centerString);

            kMeansPath = new Path("/dsap/middata/kmeans/kMeans" + index);
            FileSystem hdfs = FileSystem.get(conf);
            if (hdfs.exists(kMeansPath)) hdfs.delete(kMeansPath, true);

            Job job = Job.getInstance(conf, "kmeans" + index);
            job.setJarByClass(KmeansMR.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, samplePath);
            FileOutputFormat.setOutputPath(job, kMeansPath);
            job.waitForCompletion(true);

            // If all k centroids were unchanged, we have converged.
            long counter = job.getCounters().getGroup("myCounter")
                    .findCounter("kmeansCounter").getValue();
            if (counter == Center.k) System.exit(0);

            center = new Center();
            centerString = center.loadCenter(kMeansPath);

            index++;
        }
        System.exit(0);
    }

    public static double distance(double[] a, double[] b) {
        if (a == null || b == null || a.length != b.length) return Double.MAX_VALUE;
        double dis = 0;
        for (int i = 0; i < a.length; i++) {
            dis += Math.pow(a[i] - b[i], 2);
        }
        return Math.sqrt(dis);
    }
}
5. Results
Two output directories are produced, holding the cluster centers after the first and second iterations.
The final cluster centers are as follows.
From: http://blog.csdn.net/nwpuwyk/article/details/29564249?utm_source=tuicool&utm_medium=referral