生活随笔
收集整理的這篇文章主要介紹了
如何在MaxCompute中利用bitmap进行数据处理?
小編覺得挺不錯的,現在分享給大家,幫大家做個參考。
很多數據開發者使用bitmap技術對用戶數據進行編碼和壓縮,然后利用bitmap的與/或/非的極速處理速度,實現類似用戶畫像標簽的人群篩選、運營分析的7日活躍等分析。
本文給出了一個使用MaxCompute MapReduce開發一個對不同日期活躍用戶ID進行bitmap編碼和計算的樣例。供感興趣的用戶進一步了解、分析,并應用在自己的場景下。
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;public class bitmapDemo2
{public static class BitMapper extends MapperBase {Record key;Record value;@Overridepublic void setup(TaskContext context) throws IOException {key = context.createMapOutputKeyRecord();value = context.createMapOutputValueRecord();}@Overridepublic void map(long recordNum, Record record, TaskContext context)throws IOException{RoaringBitmap mrb=new RoaringBitmap();long AID=0;{{{{AID=record.getBigint("id");mrb.add((int) AID);//獲取keykey.set(new Object[] {record.getString("active_date")});}}}}ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());mrb.serialize(new DataOutputStream(new OutputStream(){ByteBuffer mBB;OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}public void close() {}public void flush() {}public void write(int b) {mBB.put((byte) b);}public void write(byte[] b) {mBB.put(b);}public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}}.init(outbb)));String serializedstring = Base64.getEncoder().encodeToString(outbb.array());value.set(new Object[] {serializedstring});context.write(key, value);}}public static class BitReducer extends ReducerBase {private Record result = null;public void setup(TaskContext context) throws IOException {result = context.createOutputRecord();}public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {long fcount = 0;RoaringBitmap rbm=new RoaringBitmap();while (values.hasNext()){Record val = values.next();ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);RoaringBitmap p= new RoaringBitmap(irb);rbm.or(p);}ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());rbm.serialize(new DataOutputStream(new OutputStream(){ByteBuffer mBB;OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}public void close() {}public void flush() {}public void write(int b) {mBB.put((byte) b);}public void write(byte[] b) {mBB.put(b);}public void write(byte[] b, int off, int l) 
{mBB.put(b,off,l);}}.init(outbb)));String serializedstring = Base64.getEncoder().encodeToString(outbb.array());result.set(0, key.get(0));result.set(1, serializedstring);context.write(result);}}public static void main( String[] args ) throws OdpsException{System.out.println("begin.........");JobConf job = new JobConf();job.setMapperClass(BitMapper.class);job.setReducerClass(BitReducer.class);job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
// +------------+-------------+
// | id | active_date |
// +------------+-------------+
// | 1 | 20190729 |
// | 2 | 20190729 |
// | 3 | 20190730 |
// | 4 | 20190801 |
// | 5 | 20190801 |
// +------------+-------------+OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
// +-------------+------------+
// | active_date | bit_map |
// +-------------+------------+
// 20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D
// 20190730,OjAAAAEAAAAAAAAAEAAAAAMA
// 20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3DJobClient.runJob(job);}
}
對Java應用打包后,上傳到MaxCompute項目中,即可在MaxCompute中調用該MR作業,對輸入表的數據按日期作為key進行用戶id的編碼,同時按照相同日期對bitmap后的用戶id取OR操作(根據需要可以取AND,例如存留場景),并將處理后的數據寫入目標結構表當中供后續處理使用。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的如何在MaxCompute中利用bitmap进行数据处理?的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。