不少數據開發者使用 bitmap 技術對用戶數據進行編碼和壓縮,然後利用 bitmap 極快的與/或/非運算速度,實現類似用戶畫像標籤的人羣篩選、運營分析中的 7 日活躍等分析。
本文給出了一個使用 MaxCompute MapReduce 對不同日期的活躍用戶 ID 進行 bitmap 編碼和計算的樣例,供感興趣的用戶進一步瞭解、分析,並應用在自己的場景下。
import com.aliyun.odps.OdpsException; import com.aliyun.odps.data.Record; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.mapred.JobClient; import com.aliyun.odps.mapred.MapperBase; import com.aliyun.odps.mapred.ReducerBase; import com.aliyun.odps.mapred.conf.JobConf; import com.aliyun.odps.mapred.utils.InputUtils; import com.aliyun.odps.mapred.utils.OutputUtils; import com.aliyun.odps.mapred.utils.SchemaUtils; import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Base64; import java.util.Iterator; public class bitmapDemo2 { public static class BitMapper extends MapperBase { Record key; Record value; @Override public void setup(TaskContext context) throws IOException { key = context.createMapOutputKeyRecord(); value = context.createMapOutputValueRecord(); } @Override public void map(long recordNum, Record record, TaskContext context) throws IOException { RoaringBitmap mrb=new RoaringBitmap(); long AID=0; { { { { AID=record.getBigint("id"); mrb.add((int) AID); //獲取key key.set(new Object[] {record.getString("active_date")}); } } } } ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes()); mrb.serialize(new DataOutputStream(new OutputStream(){ ByteBuffer mBB; OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;} public void close() {} public void flush() {} public void write(int b) { mBB.put((byte) b);} public void write(byte[] b) {mBB.put(b);} public void write(byte[] b, int off, int l) {mBB.put(b,off,l);} }.init(outbb))); String serializedstring = Base64.getEncoder().encodeToString(outbb.array()); value.set(new Object[] {serializedstring}); context.write(key, value); } } public static class BitReducer extends ReducerBase { private Record result = null; public void setup(TaskContext context) throws IOException { result = context.createOutputRecord(); } public void 
reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException { long fcount = 0; RoaringBitmap rbm=new RoaringBitmap(); while (values.hasNext()) { Record val = values.next(); ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0))); ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb); RoaringBitmap p= new RoaringBitmap(irb); rbm.or(p); } ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes()); rbm.serialize(new DataOutputStream(new OutputStream(){ ByteBuffer mBB; OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;} public void close() {} public void flush() {} public void write(int b) { mBB.put((byte) b);} public void write(byte[] b) {mBB.put(b);} public void write(byte[] b, int off, int l) {mBB.put(b,off,l);} }.init(outbb))); String serializedstring = Base64.getEncoder().encodeToString(outbb.array()); result.set(0, key.get(0)); result.set(1, serializedstring); context.write(result); } } public static void main( String[] args ) throws OdpsException { System.out.println("begin........."); JobConf job = new JobConf(); job.setMapperClass(BitMapper.class); job.setReducerClass(BitReducer.class); job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string")); job.setMapOutputValueSchema(SchemaUtils.fromString("id:string")); InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job); // +------------+-------------+ // | id | active_date | // +------------+-------------+ // | 1 | 20190729 | // | 2 | 20190729 | // | 3 | 20190730 | // | 4 | 20190801 | // | 5 | 20190801 | // +------------+-------------+ OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job); // +-------------+------------+ // | active_date | bit_map | // +-------------+------------+ // 20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D // 20190730,OjAAAAEAAAAAAAAAEAAAAAMA // 20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3D JobClient.runJob(job); } }
將 Java 應用打包後上傳到 MaxCompute 項目中,即可在 MaxCompute 中調用該 MR 作業:以日期作爲 key 對輸入表中的用戶 id 進行編碼,同時對相同日期下 bitmap 編碼後的用戶 id 取 OR 操作(根據需要也可以取 AND,例如留存場景),並將處理後的數據寫入目標結構表中,供後續處理使用。
閱讀原文
本文爲雲棲社區原創內容,未經允許不得轉載。