日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[Hadoop] - 自定义Mapreduce InputFormatOutputFormat

發布時間:2023/12/13 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [Hadoop] - 自定义Mapreduce InputFormatOutputFormat 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  在MR程序的開發過程中,經常會遇到輸入數據不是HDFS或者數據輸出目的地不是HDFS的,MapReduce的設計已經考慮到這種情況,它為我們提供了兩個組建,只需要我們自定義適合的InputFormat和OutputFormat,就可以完成這個需求,這里簡單的介紹一個從MongoDB中讀數據,并寫出數據到MongoDB中的一種情況,只是一個Demo,所以數據隨便找的一個。


一、自定義InputFormat

  MapReduce中Map階段的數據輸入是由InputFormat決定的,我們查看org.apache.hadoop.mapreduce.InputFormat的源碼可以看到以下代碼內容,我們可以看到除了實現InputFormat抽象類以外,我們還需要自定義InputSplit和自定義RecordReader類,這兩個類的主要作用分別是:split確定數據分片的大小以及數據的位置信息,recordReader具體的讀取數據。

public abstract class InputFormat<K, V> {public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; // 獲取Map階段的數據分片集合信息 public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // 創建具體的數據讀取對象 }

  1、自定義InputSplit

    自定義InputSplit主要需要實現的方法有一下幾個:

public abstract class InputSplit { public abstract long getLength() throws IOException, InterruptedException; // 獲取當前分片的長度大小 public abstract String[] getLocations() throws IOException, InterruptedException; // 獲取當前分片的位置信息 }

  2、自定義RecordReader

    自定義RecordReader的主要實現方法有一下幾個:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable { public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // 初始化,如果在構造函數中初始化了,那么該方法可以為空public abstract boolean nextKeyValue() throws IOException, InterruptedException; //是否存在下一個key/value,如果存在返回true。否則返回false。public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; // 獲取當然keypublic abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; // 獲取當然valuepublic abstract float getProgress() throws IOException, InterruptedException; // 獲取進度信息public abstract void close() throws IOException; // 關閉資源 }

二、自定義OutputFormat

  MapReduce中Reducer階段的數據輸出是由OutputFormat決定的,決定數據的輸出目的地和job的提交對象,我們查看org.apache.hadoop.mapreduce.OutputFormat的源碼可以看到以下代碼內容,我們可以看到除了實現OutputFormat抽象類以外,我們還需要自定義RecordWriter和自定義OutputCommitter類,其中OutputCommitter類由于不涉及到具體的輸出目的地,所以一般情況下,不用重寫,可直接使用FileOutputcommitter對象;RecordWriter類是具體的定義如何將數據寫到目的地的。

public abstract class OutputFormat<K, V> { public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException; // 獲取具體的數據寫出對象public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException; // 檢查輸出配置信息是否正確public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException; // 獲取輸出job的提交者對象 }

  1、自定義RecordWriter

    查看RecordWriter源碼,我們可以看到主要需要實現的有下列三個方法,分別是:

public abstract class RecordWriter<K, V> { public abstract void write(K key, V value) throws IOException, InterruptedException; // 具體的寫數據的方法public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException; // 關閉資源 }

三、詳細代碼

  自定義InputFormat&InputSplit

1 package com.gerry.mongo.hadoop2x.mr.mongodb.lib; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.util.ArrayList; 7 import java.util.List; 8 import java.util.Map; 9 10 import org.apache.hadoop.conf.Configurable; 11 import org.apache.hadoop.conf.Configuration; 12 import org.apache.hadoop.io.LongWritable; 13 import org.apache.hadoop.io.Writable; 14 import org.apache.hadoop.mapreduce.InputFormat; 15 import org.apache.hadoop.mapreduce.InputSplit; 16 import org.apache.hadoop.mapreduce.JobContext; 17 import org.apache.hadoop.mapreduce.MRJobConfig; 18 import org.apache.hadoop.mapreduce.RecordReader; 19 import org.apache.hadoop.mapreduce.TaskAttemptContext; 20 import org.apache.log4j.Logger; 21 22 import com.mongodb.BasicDBObject; 23 import com.mongodb.BasicDBObjectBuilder; 24 import com.mongodb.DB; 25 import com.mongodb.DBCollection; 26 import com.mongodb.DBObject; 27 import com.mongodb.Mongo; 28 import com.mongodb.MongoException; 29 30 public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable { 31 private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class); 32 33 /** 34 * 空的對象,主要作用是不進行任何操作,類似于NullWritable 35 */ 36 public static class NullMongoDBWritable implements MongoDBWritable, Writable { 37 @Override 38 public void write(DBCollection collection) throws MongoException { 39 // TODO Auto-generated method stub 40 } 41 42 @Override 43 public void readFields(DBObject object) throws MongoException { 44 // TODO Auto-generated method stub 45 } 46 47 @Override 48 public void write(DataOutput out) throws IOException { 49 // TODO Auto-generated method stub 50 } 51 52 @Override 53 public void readFields(DataInput in) throws IOException { 54 // TODO Auto-generated method stub 55 } 56 57 @Override 58 public DBObject fetchWriteDBObject(DBObject old) throws MongoException { 59 // TODO Auto-generated method stub 60 return old; 61 } 62 63 } 64 65 /** 66 * MongoDB的input split類 67 */ 68 public static class MongoDBInputSplit extends InputSplit implements Writable { 69 private long end = 0; 70 private long start = 0; 71 72 /** 73 * 默認構造方法 74 */ 75 public MongoDBInputSplit() { 76 } 77 78 /** 79 * 便利的構造方法 80 * 81 * @param start 82 * 集合中查詢的文檔開始行號 83 * @param end 84 * 集合中查詢的文檔結束行號 85 */ 86 public MongoDBInputSplit(long start, long end) { 87 this.start = start; 88 this.end = end; 89 } 90 91 public long getEnd() { 92 return end; 93 } 94 95 public long getStart() { 96 return start; 97 } 98 99 @Override 100 public void write(DataOutput out) throws IOException { 101 out.writeLong(this.start); 102 out.writeLong(this.end); 103 } 104 105 @Override 106 public void readFields(DataInput in) throws IOException { 107 this.start = in.readLong(); 108 this.end = in.readLong(); 109 } 110 111 @Override 112 public long getLength() throws IOException, InterruptedException { 113 // 分片大小 114 return this.end - this.start; 115 } 116 117 @Override 118 public String[] getLocations() throws IOException, InterruptedException { 119 // TODO 返回一個空的數組,表示不進行數據本地化的優化,那么map執行節點隨機選擇。 120 return new String[] {}; 121 } 122 123 } 124 125 protected MongoDBConfiguration mongoConfiguration; // mongo相關配置信息 126 protected Mongo mongo; // mongo連接 127 protected String databaseName; // 連接的數據庫名稱 128 protected String collectionName; // 連接的集合名稱 129 protected DBObject conditionQuery; // 選擇條件 130 protected DBObject fieldQuery; // 需要的字段條件 131 132 @Override 133 public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { 134 DBCollection dbCollection = null; 135 try { 136 dbCollection = this.getDBCollection(); 137 // 獲取數量大小 138 long count = dbCollection.count(this.getConditionQuery()); 139 int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1); 140 long chunkSize = (count / chunks); // 分片數量 141 142 // 開始分片,只是簡單的分配每個分片的數據量 143 List<InputSplit> splits = new ArrayList<InputSplit>(); 144 for (int i = 0; i < chunks; i++) { 145 MongoDBInputSplit split = null; 146 if ((i + 1) == chunks) { 147 split = new MongoDBInputSplit(i * chunkSize, count); 148 } else { 149 split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize); 150 } 151 splits.add(split); 152 } 153 return splits; 154 } catch (Exception e) { 155 throw new IOException(e); 156 } finally { 157 dbCollection = null; 158 closeConnection(); // 關閉資源的連接 159 } 160 } 161 162 @Override 163 public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 164 return createRecordReader((MongoDBInputSplit) split, context.getConfiguration()); 165 } 166 167 protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) { 168 // 獲取從mongodb中讀取數據需要轉換成的value class,默認為NullMongoDBWritable 169 Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass(); 170 return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery()); 171 } 172 173 @Override 174 public void setConf(Configuration conf) { 175 mongoConfiguration = new MongoDBConfiguration(conf); 176 databaseName = this.mongoConfiguration.getInputDatabaseName(); // 輸入數據的數據庫 177 collectionName = this.mongoConfiguration.getInputCollectionName(); // 輸入數據的集合 178 getMongo(); // 初始化 179 getConditionQuery(); // 初始化 180 getFieldQuery(); // 初始化 181 } 182 183 @Override 184 public Configuration getConf() { 185 return this.mongoConfiguration.getConfiguration(); 186 } 187 188 public Mongo getMongo() { 189 try { 190 if (null == this.mongo) { 191 this.mongo = this.mongoConfiguration.getMongoConnection(); 192 } 193 } catch (Exception e) { 194 throw new RuntimeException(e); 195 } 196 return mongo; 197 } 198 199 public DBObject getConditionQuery() { 200 if (null == this.conditionQuery) { 201 Map<String, String> conditions = this.mongoConfiguration.getInputConditions(); 202 BasicDBObjectBuilder builder = new BasicDBObjectBuilder(); 203 for (Map.Entry<String, String> entry : conditions.entrySet()) { 204 if (entry.getValue() != null) { 205 builder.append(entry.getKey(), entry.getValue()); 206 } else { 207 builder.push(entry.getKey()); 208 } 209 } 210 if (builder.isEmpty()) { 211 this.conditionQuery = new BasicDBObject(); 212 } else { 213 this.conditionQuery = builder.get(); 214 } 215 } 216 return this.conditionQuery; 217 } 218 219 public DBObject getFieldQuery() { 220 if (fieldQuery == null) { 221 String[] fields = this.mongoConfiguration.getInputFieldNames(); 222 if (fields != null && fields.length > 0) { 223 BasicDBObjectBuilder builder = new BasicDBObjectBuilder(); 224 for (String field : fields) { 225 builder.push(field); 226 } 227 fieldQuery = builder.get(); 228 } else { 229 fieldQuery = new BasicDBObject(); 230 } 231 } 232 return fieldQuery; 233 } 234 235 protected DBCollection getDBCollection() { 236 DB db = getMongo().getDB(this.databaseName); 237 if (this.mongoConfiguration.isEnableAuth()) { 238 String username = this.mongoConfiguration.getUsername(); 239 String password = this.mongoConfiguration.getPassword(); 240 if (!db.authenticate(username, password.toCharArray())) { 241 throw new RuntimeException("authenticate failure with the username:" + username + ",pwd:" + password); 242 } 243 } 244 return db.getCollection(collectionName); 245 } 246 247 protected void closeConnection() { 248 try { 249 if (null != this.mongo) { 250 this.mongo.close(); 251 this.mongo = null; 252 } 253 } catch (Exception e) { 254 LOG.debug("Exception on close", e); 255 } 256 } 257 } MongoDBInputFormat.java

  自定義RecordReader

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.ReflectionUtils;import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject;public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {private Class<? extends MongoDBWritable> valueClass;private LongWritable key;private T value;private long pos;private Configuration conf;private MongoDBInputFormat.MongoDBInputSplit split;private DBCollection collection;private DBObject conditionQuery;private DBObject fieldQuery;private DBCursor cursor;public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery,DBObject fieldQuery) {this.split = split;this.valueClass = valueClass;this.collection = collection;this.conditionQuery = conditionQuery;this.fieldQuery = fieldQuery;this.conf = conf;}@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {// do nothing }@SuppressWarnings("unchecked")@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {try {if (key == null) {key = new LongWritable();}if (value == null) {value = (T) ReflectionUtils.newInstance(valueClass, conf);}if (null == cursor) {cursor = executeQuery();}if (!cursor.hasNext()) {return false;}key.set(pos + split.getStart()); // 設置keyvalue.readFields(cursor.next()); // 設置valuepos++;} catch (Exception e) {throw new IOException("Exception in nextKeyValue", e);}return true;}protected DBCursor executeQuery() {try {return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());} catch (IOException | InterruptedException e) {throw new RuntimeException(e);}}@Overridepublic LongWritable getCurrentKey() throws IOException, InterruptedException {return this.key;}@Overridepublic T getCurrentValue() throws IOException, InterruptedException {return this.value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return pos;}@Overridepublic void close() throws IOException {if (collection != null) {collection.getDB().getMongo().close();}}} MongoDBRecordReader.java

  自定義OutputFormat&RecordWriter

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.Logger;import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.Mongo;public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class);/*** A RecordWriter that writes the reduce output to a MongoDB collection* * @param <K>* @param <T>*/public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {private Mongo mongo;private String databaseName;private String collectionName;private MongoDBConfiguration dbConf;private DBCollection dbCollection;private DBObject dbObject;private boolean enableFetchMethod;public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {this.mongo = mongo;this.databaseName = databaseName;this.collectionName = collectionName;this.dbConf = dbConf;this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();getDbCollection();// 創建連接 }protected DBCollection getDbCollection() {if (null == this.dbCollection) {DB db = this.mongo.getDB(this.databaseName);if (this.dbConf.isEnableAuth()) {String username = this.dbConf.getUsername();String password = this.dbConf.getPassword();if (!db.authenticate(username, password.toCharArray())) {throw new RuntimeException("authenticate failure, the username:" + username + ", pwd:" + password);}}this.dbCollection = db.getCollection(this.collectionName);}return this.dbCollection;}@Overridepublic void write(K key, V value) throws IOException, InterruptedException {if (this.enableFetchMethod) {this.dbObject = key.fetchWriteDBObject(null);this.dbObject = value.fetchWriteDBObject(this.dbObject);// 寫數據this.dbCollection.insert(this.dbObject);// 在這里可以做一個緩存,一起提交,如果數據量大的情況下。this.dbObject = null;} else {// 直接調用寫方法 key.write(dbCollection);value.write(dbCollection);}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {if (this.mongo != null) {this.dbCollection = null;this.mongo.close();}}}@Overridepublic RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {try {MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());String databaseName = dbConf.getOutputDatabaseName();String collectionName = dbConf.getOutputCollectionName();Mongo mongo = dbConf.getMongoConnection();return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);} catch (Exception e) {LOG.error("Create the record writer occur exception.", e);throw new IOException(e);}}@Overridepublic void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {// 不進行檢測 }@Overridepublic OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {// 由于outputcommitter主要作用是提交jar,分配jar的功能。所以我們這里直接使用FileOutputCommitterreturn new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);}/*** 設置output屬性* * @param job* @param databaseName* @param collectionName*/public static void setOutput(Job job, String databaseName, String collectionName) {job.setOutputFormatClass(MongoDBOutputFormat.class);job.setReduceSpeculativeExecution(false);MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());mdc.setOutputCollectionName(collectionName);mdc.setOutputDatabaseName(databaseName);}/*** 靜止使用fetch方法* * @param conf*/public static void disableFetchMethod(Configuration conf) {conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);} } MongoDBOutputFormat.java

  其他涉及到的java代碼

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.lib.db.DBWritable;import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable; import com.mongodb.Mongo; import com.mongodb.ServerAddress;public class MongoDBConfiguration {public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition";public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class";public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";// 在recordwriter中到底是否調用fetch方法,默認調用。如果設置為不調用,那么就直接使用writer方法public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method";private Configuration conf;public MongoDBConfiguration(Configuration conf) {this.conf = conf;}/*** 獲取Configuration對象* * @return*/public Configuration getConfiguration() {return this.conf;}/*** 設置連接信息* * @param host* @param port* @return*/public MongoDBConfiguration configureDB(String host, int port) {return this.configureDB(host, port, false, null, null);}/*** 設置連接信息* * @param host* @param port* @param enableAuth* @param username* @param password* @return*/public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {this.conf.set(BIND_HOST_PROPERTY, host);this.conf.setInt(BIND_PORT_PROPERTY, port);if (enableAuth) {this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);this.conf.set(USERNAME_PROPERTY, username);this.conf.set(PASSWORD_PROPERTY, password);}return this;}/*** 獲取MongoDB的連接對象Connection對象* * @return* @throws UnknownHostException*/public Mongo getMongoConnection() throws UnknownHostException {return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));}/*** 獲取設置的host* * @return*/public String getBindHost() {return this.conf.get(BIND_HOST_PROPERTY, "localhost");}/*** 獲取設置的port* * @return*/public int getBindPort() {return this.conf.getInt(BIND_PORT_PROPERTY, 27017);}/*** 獲取是否開啟安全驗證,默認的Mongodb是不開啟的。* * @return*/public boolean isEnableAuth() {return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);}/*** 獲取完全驗證所需要的用戶名* * @return*/public String getUsername() {return this.conf.get(USERNAME_PROPERTY);}/*** 獲取安全驗證所需要的密碼* * @return*/public String getPassword() {return this.conf.get(PASSWORD_PROPERTY);}public String getPartition() {return conf.get(PARTITION_PROPERTY, "|");}public MongoDBConfiguration setPartition(String partition) {conf.set(PARTITION_PROPERTY, partition);return this;}public String getInputDatabaseName() {return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");}public MongoDBConfiguration setInputDatabaseName(String databaseName) {conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);return this;}public String getInputCollectionName() {return conf.get(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "test");}public void setInputCollectionName(String tableName) {conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, tableName);}public String[] getInputFieldNames() {return conf.getStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY);}public void setInputFieldNames(String... fieldNames) {conf.setStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);}public Map<String, String> getInputConditions() {Map<String, String> result = new HashMap<String, String>();String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);if (conditions != null && conditions.length > 0) {String partition = this.getPartition();String[] values = null;for (String condition : conditions) {values = condition.split(partition);if (values != null && values.length == 2) {result.put(values[0], values[1]);} else {result.put(condition, null);}}}return result;}public void setInputConditions(Map<String, String> conditions) {if (conditions != null && conditions.size() > 0) {String[] values = new String[conditions.size()];String partition = this.getPartition();int k = 0;for (Map.Entry<String, String> entry : conditions.entrySet()) {if (entry.getValue() != null) {values[k++] = entry.getKey() + partition + entry.getValue();} else {values[k++] = entry.getKey();}}conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);}}public Class<? extends MongoDBWritable> getValueClass() {return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);}public void setInputClass(Class<? extends DBWritable> inputClass) {conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);}public String getOutputDatabaseName() {return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");}public MongoDBConfiguration setOutputDatabaseName(String databaseName) {conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);return this;}public String getOutputCollectionName() {return conf.get(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, "test");}public void setOutputCollectionName(String tableName) {conf.set(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, tableName);}public boolean isEnableUseFetchMethod() {return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);}public void setOutputUseFetchMethod(boolean useFetchMethod) {conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);} } MongoDBConfiguration.java package com.gerry.mongo.hadoop2x.mr.mongodb.lib;import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.MongoException;public interface MongoDBWritable {/*** 往mongodb的集合中寫數據* * @param collection* @throws MongoException*/public void write(DBCollection collection) throws MongoException;/*** 獲取要寫的mongoDB對象* * @param old* @return* @throws MongoException*/public DBObject fetchWriteDBObject(DBObject old) throws MongoException;/*** 從mongodb的集合中讀數據* * @param collection* @throws MongoException*/public void readFields(DBObject object) throws MongoException; } MongoDBWritable.java package com.gerry.mongo.hadoop2x.mr.mongodb.nw;import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Date; import java.util.HashSet; import java.util.Set;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer;import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration; import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat; import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat; import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable; import com.mongodb.BasicDBObject; import com.mongodb.BasicDBObjectBuilder; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.MongoException;public class Demo {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();// 設置輸入的mongodb的數據庫和集合,以及對應的輸入對象value,這里的數據庫和集合要求存在,否則是沒有數據的,當然沒有數據不會出問題conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);Job job = Job.getInstance(conf, "mongodb-demo");job.setJarByClass(Demo.class);job.setMapperClass(DemoMapper.class);job.setReducerClass(DemoReducer.class);job.setOutputKeyClass(DemoInputValueAndOutputKey.class);job.setOutputValueClass(DemoOutputValue.class);job.setMapOutputKeyClass(DemoInputValueAndOutputKey.class);job.setMapOutputValueClass(NullWritable.class);job.setInputFormatClass(MongoDBInputFormat.class);MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // 這個可以不存在 job.waitForCompletion(true);}public static class DemoOutputValue implements Writable, MongoDBWritable {private Date clientTime;private long count;@Overridepublic void write(DBCollection collection) throws MongoException {throw new UnsupportedOperationException();}@Overridepublic DBObject fetchWriteDBObject(DBObject old) throws MongoException {BasicDBObjectBuilder builder = null;Set<String> keys = new HashSet<String>();if (old != null) {keys = old.keySet();builder = BasicDBObjectBuilder.start(old.toMap());} else {builder = new BasicDBObjectBuilder();}// 添加當前對象的value值,如果存在同樣的key,那么加序號builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);return builder.get();}@Overridepublic void readFields(DBObject object) throws MongoException {throw new UnsupportedOperationException();}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(this.clientTime.getTime());out.writeLong(this.count);}@Overridepublic void readFields(DataInput in) throws IOException {this.clientTime = new Date(in.readLong());this.count = in.readLong();}public Date getClientTime() {return clientTime;}public void setClientTime(Date clientTime) {this.clientTime = clientTime;}public long getCount() {return count;}public void setCount(long count) {this.count = count;}}public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {private String name;private Integer age;private String sex;@Overridepublic void write(DataOutput out) throws IOException {if (this.name == null) {out.writeBoolean(false);} else {out.writeBoolean(true);out.writeUTF(this.name);}if (this.age == null) {out.writeBoolean(false);} else {out.writeBoolean(true);out.writeInt(this.age);}if (this.sex == null) {out.writeBoolean(false);} else {out.writeBoolean(true);out.writeUTF(this.sex);}}@Overridepublic void readFields(DataInput in) throws IOException {this.name = in.readBoolean() ? in.readUTF() : null;this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;this.sex = in.readBoolean() ? in.readUTF() : null;}@Overridepublic void write(DBCollection collection) throws MongoException {DBObject object = new BasicDBObject();object.put("name", this.name);object.put("age", this.age.intValue());object.put("sex", this.sex);collection.insert(object);}@Overridepublic void readFields(DBObject object) throws MongoException {this.name = (String) object.get("name");this.age = (Integer) object.get("age");this.sex = (String) object.get("sex");}@Overridepublic DBObject fetchWriteDBObject(DBObject old) throws MongoException {BasicDBObjectBuilder builder = null;Set<String> keys = new HashSet<String>();if (old != null) {keys = old.keySet();builder = BasicDBObjectBuilder.start(old.toMap());} else {builder = new BasicDBObjectBuilder();}// 添加當前對象的value值,如果存在同樣的key,那么加序號if (this.name != null) {builder.append(getKey(keys, "name", 0), this.name);}if (this.age != null) {builder.append(getKey(keys, "age", 0), this.age.intValue());}if (this.sex != null) {builder.append(getKey(keys, "sex", 0), this.sex);}return builder.get();}@Overridepublic String toString() {return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";}@Overridepublic int compareTo(DemoInputValueAndOutputKey o) {int tmp;if (this.name == null) {if (o.name != null) {return -1;}} else if (o.name == null) {return 1;} else {tmp = this.name.compareTo(o.name);if (tmp != 0) {return tmp;}}if (this.age == null) {if (o.age != null) {return -1;}} else if (o.age == null) {return 1;} else {tmp = this.age - o.age;if (tmp != 0) {return tmp;}}if (this.sex == null) {if (o.sex != null) {return -1;}} else if (o.sex == null) {return 1;} else {return this.sex.compareTo(o.sex);}return 0;}}/*** 直接輸出* * @author jsliuming* */public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {@Overrideprotected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());}}/*** 寫出數據,只做一個統計操作* * @author jsliuming* */public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {private DemoOutputValue outputValue = new DemoOutputValue();@Overrideprotected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {long sum = 0;for (@SuppressWarnings("unused")NullWritable value : values) {sum++;}outputValue.setClientTime(new Date());outputValue.setCount(sum);context.write(key, outputValue);}}/*** 轉換key,作用是當key存在keys集合中的時候,在key后面添加序號* * @param keys* @param key* @param index* @return*/public static String getKey(Set<String> keys, String key, int index) {while (keys.contains(key)) {key = key + (index++);}return key;} } Demo

四、結果截圖

?

轉載于:https://www.cnblogs.com/liuming1992/p/4758504.html

總結

以上是生活随笔為你收集整理的[Hadoop] - 自定义Mapreduce InputFormatOutputFormat的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

欧美日bb| 国产激情小视频在线观看 | 999国内精品永久免费视频 | 国产一级精品绿帽视频 | 国产精品一区二区吃奶在线观看 | 狠狠色丁香婷婷综合视频 | 久久精品久久精品久久39 | 毛片网站观看 | 在线国产视频一区 | 亚洲综合在线五月 | 久久成年视频 | 欧美一级视频在线观看 | 麻豆精品国产传媒 | 国产精品高清免费在线观看 | 成人a视频在线观看 | 日韩av伦理片 | 91av蜜桃 | 亚洲精品乱码久久久久 | 国产黄色免费观看 | 国产精品都在这里 | 激情亚洲综合在线 | 亚洲黄色片在线 | 日韩av一区二区三区四区 | 日日噜噜噜噜夜夜爽亚洲精品 | 色视频网站在线观看一=区 a视频免费在线观看 | 波多野结衣一区 | 国产护士hd高朝护士1 | 91av视频导航 | 一本—道久久a久久精品蜜桃 | 91av99| 美女黄色网在线播放 | 深夜免费福利视频 | 日韩黄色一级电影 | 日日躁天天躁 | 伊人五月婷 | 久久激情小视频 | 精品一区电影国产 | 欧美伦理电影一区二区 | 精品伦理一区二区三区 | 久青草视频 | 日韩av手机在线观看 | 成人毛片在线视频 | 免费色视频网址 | 77国产精品 | 永久免费在线 | 亚洲最新av在线网站 | 国内精品久久久久久中文字幕 | 五月婷婷在线视频观看 | 国产va饥渴难耐女保洁员在线观看 | 免费成人在线视频网站 | 999国产在线 | 人人爽爽人人 | 东方av在线免费观看 | 亚洲aⅴ乱码精品成人区 | 91黄色免费看| 999久久久欧美日韩黑人 | 超碰公开在线观看 | 911亚洲精品第一 | 日韩在线免费不卡 | 深夜免费福利视频 | 国产精品久久久久久麻豆一区 | 中文字幕高清免费日韩视频在线 | 97香蕉久久超级碰碰高清版 | 亚洲精品自拍视频在线观看 | 亚洲经典在线 | 丁香视频免费观看 | 一区二区视频在线免费观看 | 91精品啪在线观看国产81旧版 | 欧美一级电影在线观看 | 日本在线观看中文字幕无线观看 | 久久久国产精品亚洲一区 | 中文字幕观看在线 | 国产又粗又猛又黄又爽视频 | 亚洲欧洲精品视频 | 一二三区在线 | 欧美日韩亚洲在线观看 | 精品中文字幕在线观看 | 国产成人精品久 | 97香蕉视频| 91av短视频| 色婷婷久久久综合中文字幕 | 成人免费观看电影 | 天天久久夜夜 | 色婷婷激情网 | 日韩欧美视频在线播放 | 日韩羞羞 | 狠狠色丁香婷婷综合久小说久 | 久久a国产| 日日夜夜人人精品 | 日韩午夜高清 | 欧洲不卡av | 97国产视频 | av不卡免费在线观看 | 国产精品视频大全 | 国产小视频精品 | 97成人在线视频 | 综合久久久 | 国产精品亚洲综合久久 | 免费一级特黄录像 | 成人久久久久久久久 | 天天操天天干天天操天天干 | 亚洲激情网站免费观看 | 日本中文字幕视频 | 亚洲综合色丁香婷婷六月图片 | 天天操天天怕 | 97超碰在线免费观看 | 亚洲成人黄色在线观看 | 少妇18xxxx性xxxx片 | 激情中文在线 | 天操夜夜操 | 国产在线看一区 | 国产精品色婷婷视频 | 国产在线播放观看 | 色婷婷狠狠 | 久久视频一区 | 国产精品成人av电影 | 国产又粗又猛又黄 | 亚洲免费观看视频 | 一级国产视频 | 国产丝袜在线 | 中文字幕乱码亚洲精品一区 | 亚洲国产高清在线观看视频 | 国产精品黄网站在线观看 | 黄网站免费久久 | 97在线精品视频 | 久久 在线 | 国产四虎在线 | 国产三级午夜理伦三级 | 国产精品久久久亚洲 | 久久久久久久久久久影院 | 国产免费中文字幕 | 久久久九色精品国产一区二区三区 | av中文字幕网址 | 国产精品成人久久 | 福利一区在线 | av亚洲产国偷v产偷v自拍小说 | 天天操天天操天天操天天 | 五月婷香蕉久色在线看 | 丁香九月激情 | 亚州精品天堂中文字幕 | 天天干天天干天天射 | 五月婷婷导航 | 狠狠干夜夜操天天爽 | 日韩高清在线一区二区 | 狠狠做六月爱婷婷综合aⅴ 日本高清免费中文字幕 | 欧美日韩国产精品爽爽 | 亚洲精品中文字幕在线观看 | 久久精品一区八戒影视 | 国产精品久久影院 | 99色网站| 毛片随便看 | 亚洲精品免费在线观看视频 | 久久久麻豆 | 国内精品久久久久国产 | 96久久精品 | 久久国产免费 | 婷婷六月综合亚洲 | 国产一区成人在线 | 麻豆视频免费入口 | 亚洲美女精品区人人人人 | 亚洲综合色丁香婷婷六月图片 | a级国产乱理伦片在线观看 亚洲3级 | 久久精品视频5 | 国产一区二区在线免费观看 | 亚洲国产97在线精品一区 | 国产成人专区 | 黄色三级免费网址 | 男女免费视频观看 | 久久噜噜少妇网站 | 成年人免费观看国产 | 国产精品热视频 | 在线观看一级 | 亚洲一区久久久 | 欧美特一级 | 国产精品99爱 | 涩涩在线 | 三级av在线 | 啪啪免费视频网站 | 在线黄色毛片 | 日韩视频区 | 中文在线字幕免费观看 | 久草在线综合网 | 亚洲精品视频网址 | 狠狠干夜夜操 | 久久y | av线上免费观看 | 夜夜躁天天躁很躁波 | 亚洲黄色在线观看 | 黄色免费观看网址 | 国产精品热 | 超碰伊人网 | 婷婷色在线视频 | 成年人在线视频观看 | 国产尤物在线观看 | 韩日视频在线 | 狠狠插狠狠干 | 友田真希av | 国产91全国探花系列在线播放 | 日日操日日插 | 色网av| 国产精品入口麻豆www | 免费观看一区二区三区视频 | 日韩三区在线 | 天天做日日做天天爽视频免费 | 久久久久激情 | 亚洲视频h| 久久激情五月丁香伊人 | 日韩精品免费在线观看视频 | 黄色成品视频 | 91精品久久久久久粉嫩 | 天天看天天操 | 黄色免费电影网站 | 国产精品99久久久 | 国产一级久久久 | 亚洲黄色在线观看 | 成年人在线观看网站 | 亚洲九九| 久久精彩 | 亚洲精品免费播放 | 97超碰总站 | 综合久久婷婷 | 最新在线你懂的 | 欧美一级黄大片 | 国产自产高清不卡 | 色狠狠操 | 久久国产精品99国产精 | 精品久久国产一区 | 天天夜夜狠狠操 | 日本性生活免费看 | va视频在线观看 | 免费看一级片 | 国产综合精品一区二区三区 | 久久免费视频在线观看 | 99视频精品| 92中文资源在线 | 国产男女爽爽爽免费视频 | 亚洲精品视频免费看 | 久久国产热 | 波多野结衣在线观看一区二区三区 | 欧美在线视频精品 | 一区二区三区韩国免费中文网站 | 久久99精品久久久久久 | 亚洲狠狠婷婷综合久久久 | 国产69久久久 | 国产精品毛片一区二区 | 中文字幕一区二区三区四区视频 | 久久精品99精品国产香蕉 | 亚洲精品午夜视频 | 99久久精品国产欧美主题曲 | 日韩中文免费视频 | 一本到在线 | 久草在线观 | 欧美孕交vivoestv另类 | 精品国产91亚洲一区二区三区www | 999亚洲国产996395 | 久久99久久99精品中文字幕 | 999在线视频 | 日韩在线免费小视频 | 91福利免费 | 日韩午夜视频在线观看 | 99免费看片 | 国产 成人 久久 | 亚洲va欧美 | 久久av黄色| 欧美精品免费在线观看 | 久久国产一区二区 | 激情欧美一区二区免费视频 | 久久国产品 | 亚洲精品综合久久 | 一级理论片在线观看 | 激情网在线观看 | 黄色视屏免费在线观看 | 九九热在线播放 | 精品美女久久久久 | 成人福利在线 | 444av| 在线播放一区二区三区 | 成年人视频在线免费 | 国产精品久久久久影视 | 成人av免费网站 | 黄色免费大片 | 欧美另类高清 videos | 日本中文乱码卡一卡二新区 | 国产精品久久久久久a | 久99久精品视频免费观看 | 日韩精品视频在线观看免费 | 日b视频国产 | 99久久精品视频免费 | www久久国产| 久久电影网站中文字幕 | 亚洲一一在线 | 亚洲精品免费播放 | 丁香花在线视频观看免费 | 国产99久久久久 | 久久99国产精品自在自在app | 天天干天天操天天拍 | 亚洲精品国产拍在线 | 视频高清 | 一级免费黄视频 | 亚洲国产精品成人va在线观看 | 天天操天天射天天爱 | 黄色午夜网站 | 免费热情视频 | 免费av电影网站 | 99精品久久久久久久久久综合 | 亚洲天天做 | 91亚洲欧美激情 | 国产做a爱一级久久 | 丝袜美腿av | 中文字幕资源在线 | 国产精品网红福利 | 91爱爱电影 | 日韩黄色影院 | 亚洲国产日韩一区 | 成人av资源 | 欧美日韩一区二区久久 | 天堂在线一区二区三区 | 亚洲电影一区二区 | 国产精品色视频 | 久久一级片 | 美女网站久久 | 国产精品国产三级国产aⅴ入口 | 国产日韩在线观看一区 | 不卡av在线 | 麻豆影视在线免费观看 | 国产精品欧美久久 | 国产精品丝袜在线 | 中文字幕在线日 | 国产精品欧美一区二区三区不卡 | 91免费观看视频网站 | 欧洲亚洲国产视频 | 国产精品一区二区久久 | 国产1区2区 | 日韩精品免费在线观看视频 | 日韩av午夜在线观看 | 国产精品你懂的在线观看 | 国产精品一区二区三区久久 | 久久精品国产精品亚洲 | 在线91精品 | 国产在线播放一区二区三区 | 国产不卡视频在线 | 亚洲欧美乱综合图片区小说区 | 中文在线最新版天堂 | 国产亚洲精品久久久久久无几年桃 | 国产日韩视频在线 | 激情综合色综合久久 | 九九视频在线播放 | 成人三级黄色 | 美女亚洲精品 | 91av电影| 亚洲精品综合一二三区在线观看 | 久久精品在线 | 久久人人爽爽人人爽人人片av | 99视频精品 | 国产a国产a国产a | 97在线观看免费视频 | 国产一级电影在线 | 国产丝袜制服在线 | 成人免费视频网址 | 色综合久久综合中文综合网 | 在线观看日韩专区 | 国产精品激情偷乱一区二区∴ | 日韩一区在线播放 | 久久综合影视 | 亚洲黄色区 | 天天干人人干 | 国产精品精品久久久久久 | 五月婷婷丁香综合 | 三日本三级少妇三级99 | 欧美专区国产专区 | 五月天丁香视频 | 亚洲精品视频在线观看视频 | 超碰97中文 | 99久久精品电影 | 久草在线一免费新视频 | 国产中文字幕视频在线观看 | 国产精品福利一区 | 91精品免费在线观看 | 国产精品一区二区精品视频免费看 | 999久久久久久久久 69av视频在线观看 | 成人在线观看你懂的 | 91视频免费网址 | 黄色一级在线免费观看 | 亚洲天堂网视频在线观看 | 少妇激情久久 | 久久一区二 | 日韩美精品视频 | 日韩免费福利 | 97超碰人人模人人人爽人人爱 | 久久av中文字幕片 | 玖玖精品在线 | 成人毛片100免费观看 | av免费电影在线观看 | 久久久久久久免费看 | 成av人电影 | 91精品国产一区二区在线观看 | 九九九热 | 99视频在线看 | 欧美一级黄色视屏 | 亚洲国产精品成人女人久久 | 日韩免费一区 | 国产正在播放 | 三日本三级少妇三级99 | 欧美一级片在线观看视频 | 激情综合网五月激情 | 日韩电影久久久 | 久久精品国产亚洲精品 | 伊人五月 | 亚洲aⅴ在线观看 | 日韩专区中文字幕 | 麻豆视频免费播放 | 亚洲高清不卡av | 免费看三级网站 | 特级黄色电影 | 69中文字幕 | 黄色片软件网站 | 国产精品嫩草影视久久久 | 激情中文在线 | 亚洲精品在线观看av | 色.com| 粉嫩一区二区三区粉嫩91 | 网址你懂的在线观看 | 欧美一级片在线播放 | 国产在线精品视频 | 日韩精品一区二区免费 | www最近高清中文国语在线观看 | 99精品网站 | 国产黄色一级片 | 小草av在线播放 | 亚洲女同videos | 免费视频区 | 亚洲精品视频一二三 | 国产 精品 资源 | 操操操操网| 日韩在线高清免费视频 | 在线免费观看黄色小说 | 国产高清免费av | 欧美激情视频免费看 | 在线看片91 | 国产精品91一区 | 在线成人性视频 | 亚洲精品乱码久久久久久蜜桃欧美 | 久久99久久99精品 | 久热电影 | 9797在线看片亚洲精品 | 精品免费久久 | 欧美日韩视频免费看 | 精品一区二区6 | 午夜精品久久久久久中宇69 | 国产色爽 | 91九色网站| 一区二区三区高清 | av片在线观看免费 | 综合色婷婷 | 国产精品永久久久久久久久久 | 国产在线视频在线观看 | 中文字幕av在线 | 最近日本韩国中文字幕 | 香蕉视频免费看 | 免费在线观看一级片 | 国产精品a久久久久 | 美女久久久久久久久久久 | 日韩欧美精品在线观看视频 | 亚洲精品在线观看网站 | 久久久首页| 黄色大片av | 国产精品色婷婷视频 | 国产精品国产三级国产aⅴ9色 | 久久国产精品99国产精 | 国产一级大片免费看 | 欧美日韩观看 | 91在线看黄 | 国产高清黄 | 国产99久久| 亚洲精品456在线播放乱码 | 热久久电影 | 蜜桃传媒一区二区 | 成年免费在线视频 | 中文字幕国产在线 | 成人免费在线播放视频 | 久久刺激视频 | 黄av在线 | 日韩欧美极品 | 色a网 | 中文亚洲欧美日韩 | 亚洲国产成人av网 | 精品一区免费 | 日本丶国产丶欧美色综合 | 欧美激情视频一区二区三区免费 | 1024手机基地在线观看 | 成人免费色 | 在线观看免费版高清版 | 综合激情伊人 | 色av资源网 | 人人草在线视频 | 国产精品自产拍在线观看 | 综合色综合| 国产精品电影在线 | 中文字幕日本电影 | 久久在线 | 亚洲日本一区二区在线 | 国产在线观看你懂得 | 日韩www在线 | 国产在线视频一区二区 | 久艹视频免费观看 | 一区二区三区三区在线 | 成人免费观看视频网站 | 精品国产乱码久久久久 | 少妇bbr搡bbb搡bbb | 精品国产一区二 | 国产一级免费视频 | 91麻豆传媒 | 日韩免费福利 | 成人av久久 | 国产中文字幕第一页 | 美女福利视频一区二区 | 日日夜夜亚洲 | 国产精品视频在线观看 | 人人精品久久 | 久久伊人八月婷婷综合激情 | av网站有哪些 | 天天摸天天操天天舔 | 日韩久久精品一区二区三区 | 久久国产午夜精品理论片最新版本 | 亚洲精品视频在线观看免费视频 | 色久天 | a天堂最新版中文在线地址 久久99久久精品国产 | 日韩黄色在线观看 | 99久久网站| 成人蜜桃视频 | 午夜精品久久久久久久99热影院 | 五月av在线 | 欧美一区影院 | 精品久久久久一区二区国产 | 久久精品国产免费观看 | av大全在线 | 成人av中文字幕在线观看 | 4p变态网欧美系列 | 黄色在线观看免费网站 | 日韩在线观看中文字幕 | 激情网婷婷 | 国产午夜不卡 | 国产成人久久av977小说 | 午夜精品久久久久久久99婷婷 | 精品久久久久久亚洲综合网站 | 国产视频亚洲视频 | 在线观看日韩精品视频 | 日韩色高清| 久久成人国产精品免费软件 | 婷婷久久久| 国产在线观看地址 | 27xxoo无遮挡动态视频 | 一区二区影视 | www.五月天激情 | 亚洲国内精品在线 | 欧美另类69 | 91av视频观看 | 国产在线精品一区 | 欧美色图亚洲图片 | 久久夜色精品国产欧美一区麻豆 | 91大神精品视频 | 黄色成人免费电影 | 国产精品99久久免费黑人 | 亚洲精品中文字幕视频 | 久久精品一区二区三 | 日韩福利在线观看 | 国产成人精品午夜在线播放 | 欧美在线视频二区 | 十八岁以下禁止观看的1000个网站 | 国产第一页在线观看 | 亚洲国产97在线精品一区 | 亚洲视频大全 | 久久久久久久久久久影院 | av在线小说| 日韩国产精品久久久久久亚洲 | 成年人看片 | 51精品国自产在线 | 国产黄色片在线免费观看 | 国产又粗又猛又黄又爽 | 天堂av官网 | 国产精品露脸在线 | www.久草.com| 97在线观看免费高清完整版在线观看 | 亚洲国产大片 | 国产一区二区三区四区大秀 | 久久久性 | 成人久久久久久久久久 | 久久久九色精品国产一区二区三区 | 亚洲第一av在线 | 成人a视频片观看免费 | 欧美日韩国产在线观看 | 亚洲天天在线 | 日韩免费三区 | 国产一区二区精品久久 | 免费h精品视频在线播放 | 一级大片在线观看 | 中文字幕一区二区三区在线视频 | 中文字幕色网站 | 日日干夜夜操视频 | 国产精品都在这里 | 亚洲精品国产自产拍在线观看 | 91九色最新地址 | 伊人色播 | 成人高清在线 | 中文字幕av最新 | 久久九九免费视频 | 综合色天天 | 国产日韩精品一区二区 | 国产精品美女在线 | 一区中文字幕在线观看 | 天天草天天爽 | 日韩一区二区免费播放 | 夜夜操网站 | 国产成人三级三级三级97 | 在线观看日韩专区 | 亚洲精品大全 | 久久国产经典视频 | 狠狠狠色| 国产成人在线网站 | 日韩在线观看高清 | 精品国产一区二区久久 | 免费看wwwwwwwwwww的视频 久久久久久99精品 91中文字幕视频 | 国产精品久久久久久一区二区 | 在线观看国产一区 | 在线观看av网站 | 美女视频黄是免费的 | 国产呻吟在线 | 久久久久国产精品午夜一区 | 成片人卡1卡2卡3手机免费看 | av免费在线网| 在线看中文字幕 | 夜夜躁日日躁狠狠久久88av | 国产精品xxxx18a99 | 国产一区二区在线免费观看 | 午夜精品视频免费在线观看 | 亚洲第一香蕉视频 | 亚洲欧洲国产精品 | 欧美一级看片 | 精品久久久久久国产91 | 狠狠躁夜夜躁人人爽视频 | 日躁夜躁狠狠躁2001 | 国产在线观看99 | 精品99视频 | 黄色成人免费电影 | 久久欧美综合 | 伊人狠狠色丁香婷婷综合 | 在线观看日韩专区 | 久久免费精品一区二区三区 | 一级全黄毛片 | 一区二区三区四区久久 | www免费看| av成人在线播放 | 亚洲成aⅴ人片久久青草影院 | 免费网站在线 | 国产成人性色生活片 | 人人干在线观看 | 亚洲国产人午在线一二区 | 五月婷婷综合在线视频 | 国偷自产视频一区二区久 | 婷五月激情 | 96亚洲精品久久久蜜桃 | 福利视频区 | 六月丁香综合 | 夜添久久精品亚洲国产精品 | 成人一区二区三区在线观看 | 成人免费av电影 | 玖玖精品视频 | 色婷婷综合五月 | 午夜婷婷综合 | 国产网站在线免费观看 | 午夜三级毛片 | 国产午夜精品久久 | 深爱激情亚洲 | 亚洲污视频 | 国产成人av福利 | 五月亚洲综合 | 日本黄色大片免费 | 69中文字幕 | 狠狠操影视 | 色九九视频| 久久久久久久久久伊人 | 国产精品99爱| 日韩一区在线播放 | 美女免费视频网站 | 国产成人精品午夜在线播放 | 国产亚洲精品综合一区91 | 国产一区高清在线 | av 一区二区三区 | 日韩av一区二区在线播放 | 综合天堂av久久久久久久 | 久久综合久久伊人 | 国产黄色av网站 | 97色国产| 在线免费观看黄色av | 亚洲黄a| 国产一级在线 | 日韩精品一区二区三区第95 | aa级黄色大片 | 在线看片中文字幕 | 一区二区三区av在线 | av综合网址 | 成人在线播放免费观看 | 黄色一集片 | 日韩高清久久 | 精品国产免费一区二区三区五区 | 成人亚洲欧美 | 日本激情视频中文字幕 | 91污视频在线| 欧美成年黄网站色视频 | 免费情趣视频 | 成人久久 | 中文字幕91视频 | 中文国产字幕 | 麻豆视频网址 | 一区二区在线电影 | 国产又黄又爽又猛视频日本 | 99热这里只有精品免费 | 国产成人一区二区三区 | 激情网色 | 久久看片网 | 黄色成年 | 国产精品久久久久久久久毛片 | 玖玖综合网 | 91成人免费电影 | 国产精品久久久久久久免费大片 | 狠狠色丁香婷婷综合橹88 | www.五月天婷婷 | 久久 精品一区 | 欧美在线视频日韩 | 日韩高清 一区 | 99视频免费看 | 国产亚洲精品久久久久动 | 国产精品2019 | 色久五月| 99亚洲国产 | 国产91精品在线观看 | 777奇米四色| 亚洲国产成人精品在线观看 | 久草在线资源观看 | 不卡av在线 | 免费观看一级 | 久草免费色站 | 中文字幕在线影院 | 97超碰中文 | 五月天电影免费在线观看一区 | 国产精品 亚洲精品 | 91丨九色丨蝌蚪丨老版 | 美女福利视频在线 | 亚洲最大av网站 | 亚洲1区在线 | 久久久亚洲国产精品麻豆综合天堂 | 麻豆视频免费入口 | 国产一区二区免费在线观看 | 国产小视频你懂的在线 | 免费成人av在线看 | 久久小视频| 久久在线视频在线 | 在线免费黄网站 | 免费在线视频一区二区 | 成人午夜网| 久久久久久福利 | 国产精品h在线观看 | 欧美,日韩 | 在线观看黄污 | 青青视频一区 | 伊人资源站 | 又湿又紧又大又爽a视频国产 | 精品久久久久国产 | 美女视频久久黄 | 成人av高清在线 | 欧美99热 | 五月婷婷久久综合 | 国产精品久久久久久69 | 成人国产一区 | www.在线观看av | 麻豆成人精品 | www.午夜视频 | 久久久99精品免费观看乱色 | 午夜精品麻豆 | 欧美91片| 91在线观看高清 | 成人国产精品电影 | 亚洲精品自在在线观看 | 婷婷精品国产欧美精品亚洲人人爽 | 婷婷六月综合网 | 一区二区三区免费网站 | 久久字幕精品一区 | 国产热re99久久6国产精品 | 六月丁香社区 | 精品国产视频在线 | 欧美日产一区 | 精品视频999 | 国产视频精品网 | 久福利| 男女拍拍免费视频 | 免费成人av网站 | 操操操日日日干干干 | 国产麻豆剧传媒免费观看 | 亚洲高清色综合 | 超碰97成人| 九九精品毛片 | 久久久久久久久久久黄色 | 奇米先锋 | 男女男视频 | 色婷婷综合在线 | 亚洲人成在线观看 | 亚洲成av人片在线观看香蕉 | 色www免费视频 | 午夜 免费 | 毛片基地黄久久久久久天堂 | 午夜精品福利一区二区三区蜜桃 | 狠狠色丁香婷婷综合 | 亚洲精品天天 | 国产精品小视频网站 | 亚洲一区二区三区miaa149 | 中文字幕亚洲在线观看 | av免费看看| 亚洲国产精品日韩 | 天天干夜夜操视频 | 国产香蕉97碰碰久久人人 | 久久精品视频网站 | 国产成人精品在线观看 | 成年人在线视频观看 | 亚洲视频一区二区三区在线观看 | 在线看av的网址 | 黄色成人影视 | 国产电影黄色av | 欧美 日韩 性 | 韩日视频在线 | 久久久久在线视频 | av电影免费观看 | 天天色棕合合合合合合 | 啪啪肉肉污av国网站 | 91九色蝌蚪视频网站 | 亚洲精品男人天堂 | 最新中文字幕视频 | 久久久www成人免费毛片 | 欧美精品一区二区在线观看 | 久久久久国产一区二区三区四区 | 久久免费视频这里只有精品 | 在线观看黄av | 中文字幕av最新 | 99热在线观看 | 久九视频 | 亚洲最新av | 精品视频在线视频 | 97av精品| 92国产精品久久久久首页 | 伊人六月 | 992tv又爽又黄的免费视频 | 国产午夜影院 | 在线综合色 | 国产精品人成电影在线观看 | 日本福利视频在线 | 国产专区日韩专区 | 亚洲国产欧洲综合997久久, | 国产美女精品视频免费观看 | 99视频在线精品国自产拍免费观看 | 日韩在线观看影院 | 精品亚洲一区二区三区 | 国产精品区在线观看 | 国产理论一区二区三区 | 在线观看亚洲电影 | 亚洲 欧美 另类人妖 | 91精品成人久久 | 亚洲高清久久久 | 国产视频欧美视频 | 欧美性色综合网 | 国内精品久久久久久 | 91理论电影| 国产精品va在线播放 | 狠狠操操操 | 黄色成人影视 | 黄色小说在线观看视频 | 国产精品久久一卡二卡 | 亚洲国产高清在线观看视频 | 国产精品免费观看久久 | 久久不卡免费视频 | mm1313亚洲精品国产 | 免费黄色在线网址 | 97在线观看免费观看 | 一区二区视频在线播放 | 7799av| 日韩在线一二三区 | 人人爽爽人人 | 国产一区二区三区久久久 | 亚洲视频网站在线观看 | 日韩高清精品一区二区 | 久久精品一区二区三区四区 | 97理论片 | 亚洲3级| 久久精品一级片 | 日韩欧美一区二区在线观看 | 中文字幕在线观看完整 | 黄色综合 | 四虎国产精品永久在线国在线 | 亚洲精品久久久久久久蜜桃 | 欧洲亚洲女同hd | 欧美日韩国产综合网 | 免费av片在线 | 99精品视频网站 | 国产精品一区二区av日韩在线 | 日韩精品一区二区三区外面 | 国产精品一区二区在线免费观看 | 国产精品女人久久久 | 久久a免费视频 | 国产亚洲成av片在线观看 | 久久久久这里只有精品 | 一区二区 精品 | 久久黄色免费视频 | 日韩av福利在线 | 久久精彩免费视频 | 亚洲国产最新 | 97视频免费 | 天天亚洲综合 | 免费99精品国产自在在线 | 草久在线 | 免费在线观看中文字幕 | 国产不卡在线视频 | 日韩欧美在线综合网 | 天天色综合久久 | 久久综合狠狠综合久久综合88 | 欧美 日韩 国产 成人 在线 | 国产录像在线观看 | www.天天干.com | 国产理论在线 | 久久字幕网 | 免费网址你懂的 | 成年人在线观看视频免费 | 日韩成人邪恶影片 | 亚洲精选视频免费看 | 国产女教师精品久久av | 久久国产精品一区二区三区四区 | 午夜视频在线观看网站 | 香蕉久久国产 | 国产精品白浆视频 | 97香蕉超级碰碰久久免费软件 | 久久精品网站免费观看 | 天天色播| 国产午夜亚洲精品 | 成人av电影在线 | 黄色av三级在线 | 中文字幕在线看片 | 国产精品毛片一区二区 | 亚洲免费视频观看 | 国产精品青草综合久久久久99 | .国产精品成人自产拍在线观看6 | 欧美巨大荫蒂茸毛毛人妖 | 在线观看色视频 | 欧美人交a欧美精品 | 超碰人人国产 | 99视频免费看 | 69热国产视频 | 狠狠色狠狠色综合系列 | 亚洲精品456在线播放第一页 | 91午夜精品 | 成人午夜影院 | 色com网 | 最近日本韩国中文字幕 | 91在线免费视频 | 久草99| 96精品高清视频在线观看软件特色 | 亚洲成人av电影在线 | 国产一区视频在线观看免费 | 天天天天天天天天操 | 黄色官网在线观看 | 五月av在线| 4438全国亚洲精品在线观看视频 | 国产一级一片免费播放放a 一区二区三区国产欧美 | 久久不射电影网 | www.色的| 婷婷在线综合 | 久草五月 | 久久久久免费精品视频 | 人人射人人爱 | 国产精品久久久久婷婷二区次 | 夜夜躁狠狠燥 | 色综合久久久久久中文网 | 国产手机在线播放 | 久久xxxx | 日韩欧美电影在线 | 国产精品一区二区在线观看 | 久久久精品国产一区二区电影四季 | 狠狠干网站 | 精品国产免费观看 | 黄色成人av在线 | 国产精品久久久久久爽爽爽 | 91在线文字幕 | 免费国产亚洲视频 | 国产精品久久久99 | 亚洲永久国产精品 | 日韩在线视频观看 | 欧美精品久久久久久久 | 一区二区不卡在线观看 | 亚州视频在线 | 国产美女在线免费观看 | 亚洲精品综合欧美二区变态 | 国产成人香蕉 |