[Hadoop] - Custom MapReduce InputFormat/OutputFormat

  When developing MapReduce programs, we often encounter input data that is not on HDFS, or an output destination that is not HDFS. MapReduce's design anticipates this: it provides two extension points, and all we need to do is supply a suitable InputFormat and OutputFormat. This post walks through a simple case of reading data from MongoDB and writing results back to MongoDB. It is only a demo, so the data is arbitrary.


I. Custom InputFormat

  The data input to the map phase of MapReduce is determined by the InputFormat. Looking at the source of org.apache.hadoop.mapreduce.InputFormat below, we can see that besides implementing the InputFormat abstract class, we also need to write a custom InputSplit and a custom RecordReader. Their roles are: the split describes the size and location of one slice of the input data, and the record reader performs the actual reading.

public abstract class InputFormat<K, V> {
    // Returns the list of input splits for the map phase
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    // Creates the record reader that actually reads the data
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
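Before looking at the MongoDB implementation in Section III, the splitting arithmetic that a getSplits() implementation typically performs can be sketched on its own. This is a minimal sketch (the class name SplitMath is made up for illustration): `count` documents are divided into `chunks` equal ranges, and the last range absorbs the remainder, mirroring the MongoDB getSplits() shown later.

```java
public class SplitMath {
    // Returns {start, end} pairs: chunks-1 ranges of size count/chunks,
    // with the last range extended to cover the remainder.
    public static long[][] ranges(long count, int chunks) {
        long chunkSize = count / chunks;
        long[][] out = new long[chunks][2];
        for (int i = 0; i < chunks; i++) {
            out[i][0] = i * chunkSize;
            out[i][1] = (i + 1 == chunks) ? count : (i + 1) * chunkSize;
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] r = ranges(10, 3); // 10 documents, 3 map tasks
        System.out.println(r[0][0] + "-" + r[0][1]); // prints 0-3
        System.out.println(r[2][0] + "-" + r[2][1]); // prints 6-10 (last split absorbs the remainder)
    }
}
```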

  1. Custom InputSplit

    A custom InputSplit mainly needs to implement the following methods:

public abstract class InputSplit {
    // Returns the size of this split
    public abstract long getLength() throws IOException, InterruptedException;

    // Returns the locations of this split's data
    public abstract String[] getLocations() throws IOException, InterruptedException;
}

  2. Custom RecordReader

    A custom RecordReader mainly needs to implement the following methods:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    // Initialization; may be empty if everything is already set up in the constructor
    public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

    // Returns true if another key/value pair exists, false otherwise
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    // Returns the current key
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    // Returns the current value
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    // Returns the progress of the reader
    public abstract float getProgress() throws IOException, InterruptedException;

    // Releases resources
    public abstract void close() throws IOException;
}

II. Custom OutputFormat

  The data output of the reduce phase of MapReduce is determined by the OutputFormat, which decides both the output destination and the job's committer. Looking at the source of org.apache.hadoop.mapreduce.OutputFormat below, besides implementing the OutputFormat abstract class we also need a custom RecordWriter and, in principle, a custom OutputCommitter. Since the committer does not touch the actual output destination, it usually does not need to be overridden; the stock FileOutputCommitter can be used directly. The RecordWriter is what defines exactly how data is written to the destination.

public abstract class OutputFormat<K, V> {
    // Returns the writer that actually writes the data out
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;

    // Validates the output specification of the job
    public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;

    // Returns the committer for the job's output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}

  1. Custom RecordWriter

    Looking at the RecordWriter source, the methods to implement are:

public abstract class RecordWriter<K, V> {
    // Writes one key/value pair
    public abstract void write(K key, V value) throws IOException, InterruptedException;

    // Releases resources
    public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;
}
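The MongoDB writer shown in Section III inserts one document per write() call and notes in a comment that, for large volumes, records could be buffered and committed together. A minimal sketch of that batching idea follows; BatchingWriter and its flush counter are invented for illustration, and a real writer would insert the buffered DBObjects inside flush().

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingWriter {
    private final List<String> buffer = new ArrayList<>();
    private final int batchSize;
    private int flushes = 0;

    public BatchingWriter(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffer the record; flush once the buffer reaches batchSize.
    public void write(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // close() must flush any remaining buffered records.
    public void close() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        // In a real RecordWriter this would be dbCollection.insert(bufferedDBObjects).
        flushes++;
        buffer.clear();
    }

    public int getFlushes() {
        return flushes;
    }

    public static void main(String[] args) {
        BatchingWriter w = new BatchingWriter(3);
        for (int i = 0; i < 7; i++) {
            w.write("doc" + i);
        }
        w.close();
        System.out.println(w.getFlushes()); // prints 3 (batches of 3 + 3 + 1)
    }
}
```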

III. Full Code

  Custom InputFormat & InputSplit

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable {
    private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class);

    /**
     * Empty value object that performs no operation, similar to NullWritable.
     */
    public static class NullMongoDBWritable implements MongoDBWritable, Writable {
        @Override
        public void write(DBCollection collection) throws MongoException {
            // no-op
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            // no-op
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // no-op
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // no-op
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            return old;
        }
    }

    /**
     * Input split for MongoDB: a [start, end) range of document positions in the collection.
     */
    public static class MongoDBInputSplit extends InputSplit implements Writable {
        private long end = 0;
        private long start = 0;

        /**
         * Default constructor, required for deserialization.
         */
        public MongoDBInputSplit() {
        }

        /**
         * Convenience constructor.
         *
         * @param start position of the first document of this split
         * @param end position one past the last document of this split
         */
        public MongoDBInputSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getEnd() {
            return end;
        }

        public long getStart() {
            return start;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.start);
            out.writeLong(this.end);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.start = in.readLong();
            this.end = in.readLong();
        }

        @Override
        public long getLength() throws IOException, InterruptedException {
            // size of this split
            return this.end - this.start;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            // An empty array disables the data-locality optimization,
            // so map tasks are scheduled on arbitrary nodes.
            return new String[] {};
        }
    }

    protected MongoDBConfiguration mongoConfiguration; // MongoDB-related configuration
    protected Mongo mongo; // MongoDB connection
    protected String databaseName; // database to connect to
    protected String collectionName; // collection to connect to
    protected DBObject conditionQuery; // selection conditions
    protected DBObject fieldQuery; // projection of required fields

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        DBCollection dbCollection = null;
        try {
            dbCollection = this.getDBCollection();
            // total number of matching documents
            long count = dbCollection.count(this.getConditionQuery());
            int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
            long chunkSize = (count / chunks); // documents per split

            // Simple splitting: give each split an equal share of documents,
            // with the last split taking the remainder.
            List<InputSplit> splits = new ArrayList<InputSplit>();
            for (int i = 0; i < chunks; i++) {
                MongoDBInputSplit split = null;
                if ((i + 1) == chunks) {
                    split = new MongoDBInputSplit(i * chunkSize, count);
                } else {
                    split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
                }
                splits.add(split);
            }
            return splits;
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            dbCollection = null;
            closeConnection(); // release the connection
        }
    }

    @Override
    public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return createRecordReader((MongoDBInputSplit) split, context.getConfiguration());
    }

    protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) {
        // The value class that MongoDB documents are converted to; defaults to NullMongoDBWritable.
        Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass();
        return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery());
    }

    @Override
    public void setConf(Configuration conf) {
        mongoConfiguration = new MongoDBConfiguration(conf);
        databaseName = this.mongoConfiguration.getInputDatabaseName(); // input database
        collectionName = this.mongoConfiguration.getInputCollectionName(); // input collection
        getMongo(); // initialize the connection
        getConditionQuery(); // initialize the condition query
        getFieldQuery(); // initialize the field projection
    }

    @Override
    public Configuration getConf() {
        return this.mongoConfiguration.getConfiguration();
    }

    public Mongo getMongo() {
        try {
            if (null == this.mongo) {
                this.mongo = this.mongoConfiguration.getMongoConnection();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return mongo;
    }

    public DBObject getConditionQuery() {
        if (null == this.conditionQuery) {
            Map<String, String> conditions = this.mongoConfiguration.getInputConditions();
            BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    builder.append(entry.getKey(), entry.getValue());
                } else {
                    builder.push(entry.getKey());
                }
            }
            if (builder.isEmpty()) {
                this.conditionQuery = new BasicDBObject();
            } else {
                this.conditionQuery = builder.get();
            }
        }
        return this.conditionQuery;
    }

    public DBObject getFieldQuery() {
        if (fieldQuery == null) {
            String[] fields = this.mongoConfiguration.getInputFieldNames();
            if (fields != null && fields.length > 0) {
                BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
                for (String field : fields) {
                    builder.push(field);
                }
                fieldQuery = builder.get();
            } else {
                fieldQuery = new BasicDBObject();
            }
        }
        return fieldQuery;
    }

    protected DBCollection getDBCollection() {
        DB db = getMongo().getDB(this.databaseName);
        if (this.mongoConfiguration.isEnableAuth()) {
            String username = this.mongoConfiguration.getUsername();
            String password = this.mongoConfiguration.getPassword();
            if (!db.authenticate(username, password.toCharArray())) {
                throw new RuntimeException("authenticate failure with the username:" + username + ",pwd:" + password);
            }
        }
        return db.getCollection(collectionName);
    }

    protected void closeConnection() {
        try {
            if (null != this.mongo) {
                this.mongo.close();
                this.mongo = null;
            }
        } catch (Exception e) {
            LOG.debug("Exception on close", e);
        }
    }
}
MongoDBInputFormat.java

  Custom RecordReader

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {
    private Class<? extends MongoDBWritable> valueClass;
    private LongWritable key;
    private T value;
    private long pos;
    private Configuration conf;
    private MongoDBInputFormat.MongoDBInputSplit split;
    private DBCollection collection;
    private DBObject conditionQuery;
    private DBObject fieldQuery;
    private DBCursor cursor;

    public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery,
            DBObject fieldQuery) {
        this.split = split;
        this.valueClass = valueClass;
        this.collection = collection;
        this.conditionQuery = conditionQuery;
        this.fieldQuery = fieldQuery;
        this.conf = conf;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Nothing to do: all state is set in the constructor.
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (key == null) {
                key = new LongWritable();
            }
            if (value == null) {
                value = (T) ReflectionUtils.newInstance(valueClass, conf);
            }
            if (null == cursor) {
                cursor = executeQuery();
            }
            if (!cursor.hasNext()) {
                return false;
            }
            key.set(pos + split.getStart()); // set the key
            value.readFields(cursor.next()); // set the value
            pos++;
        } catch (Exception e) {
            throw new IOException("Exception in nextKeyValue", e);
        }
        return true;
    }

    protected DBCursor executeQuery() {
        try {
            return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public T getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Progress is a fraction in [0, 1], not the raw record count.
        long length = split.getLength();
        return length == 0 ? 1.0f : Math.min(1.0f, pos / (float) length);
    }

    @Override
    public void close() throws IOException {
        if (collection != null) {
            collection.getDB().getMongo().close();
        }
    }
}
MongoDBRecordReader.java
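The control flow of the reader above can be illustrated without MongoDB. This sketch (the class ReaderLoop is invented for illustration) replaces the DBCursor with an in-memory iterator and mirrors the nextKeyValue()/getCurrentKey()/getCurrentValue() cycle that the framework drives:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReaderLoop {
    // Drives a pretend record reader over an in-memory "cursor"
    // and collects "key:value" pairs the way a Mapper would see them.
    public static List<String> drive(List<String> docs) {
        List<String> out = new ArrayList<>();
        Iterator<String> cursor = docs.iterator();
        long pos = 0;
        while (cursor.hasNext()) {          // nextKeyValue(): advance if a pair exists
            String value = cursor.next();   // value.readFields(cursor.next())
            out.add(pos + ":" + value);     // key is the running position within the split
            pos++;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drive(Arrays.asList("a", "b", "c"))); // prints [0:a, 1:b, 2:c]
    }
}
```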

  Custom OutputFormat & RecordWriter

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {
    private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class);

    /**
     * A RecordWriter that writes the reduce output to a MongoDB collection.
     */
    public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {
        private Mongo mongo;
        private String databaseName;
        private String collectionName;
        private MongoDBConfiguration dbConf;
        private DBCollection dbCollection;
        private DBObject dbObject;
        private boolean enableFetchMethod;

        public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {
            this.mongo = mongo;
            this.databaseName = databaseName;
            this.collectionName = collectionName;
            this.dbConf = dbConf;
            this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();
            getDbCollection(); // open the connection
        }

        protected DBCollection getDbCollection() {
            if (null == this.dbCollection) {
                DB db = this.mongo.getDB(this.databaseName);
                if (this.dbConf.isEnableAuth()) {
                    String username = this.dbConf.getUsername();
                    String password = this.dbConf.getPassword();
                    if (!db.authenticate(username, password.toCharArray())) {
                        throw new RuntimeException("authenticate failure, the username:" + username + ", pwd:" + password);
                    }
                }
                this.dbCollection = db.getCollection(this.collectionName);
            }
            return this.dbCollection;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            if (this.enableFetchMethod) {
                this.dbObject = key.fetchWriteDBObject(null);
                this.dbObject = value.fetchWriteDBObject(this.dbObject);
                // Insert the merged document. For large volumes, records could be
                // buffered here and committed together in batches.
                this.dbCollection.insert(this.dbObject);
                this.dbObject = null;
            } else {
                // Let the key and value write themselves directly.
                key.write(dbCollection);
                value.write(dbCollection);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (this.mongo != null) {
                this.dbCollection = null;
                this.mongo.close();
            }
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());
            String databaseName = dbConf.getOutputDatabaseName();
            String collectionName = dbConf.getOutputCollectionName();
            Mongo mongo = dbConf.getMongoConnection();
            return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);
        } catch (Exception e) {
            LOG.error("Create the record writer occur exception.", e);
            throw new IOException(e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // No output checks are performed.
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The committer's job is committing and distributing task output, not writing
        // to the destination, so the stock FileOutputCommitter is sufficient here.
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * Configure the job to use this output format.
     *
     * @param job
     * @param databaseName
     * @param collectionName
     */
    public static void setOutput(Job job, String databaseName, String collectionName) {
        job.setOutputFormatClass(MongoDBOutputFormat.class);
        job.setReduceSpeculativeExecution(false);
        MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());
        mdc.setOutputCollectionName(collectionName);
        mdc.setOutputDatabaseName(databaseName);
    }

    /**
     * Disable the fetch method, falling back to the direct write() path.
     *
     * @param conf
     */
    public static void disableFetchMethod(Configuration conf) {
        conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);
    }
}
MongoDBOutputFormat.java

  Other supporting Java classes

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

public class MongoDBConfiguration {
    public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";
    public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";
    public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";
    public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";
    public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";
    public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition";
    public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";
    public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";
    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";
    public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";
    public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class";
    public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";
    public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";
    // Whether the record writer uses the fetch method (the default) or the direct write method.
    public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method";

    private Configuration conf;

    public MongoDBConfiguration(Configuration conf) {
        this.conf = conf;
    }

    /** Returns the underlying Configuration object. */
    public Configuration getConfiguration() {
        return this.conf;
    }

    /** Sets the connection information, without authentication. */
    public MongoDBConfiguration configureDB(String host, int port) {
        return this.configureDB(host, port, false, null, null);
    }

    /** Sets the connection information, optionally with authentication. */
    public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {
        this.conf.set(BIND_HOST_PROPERTY, host);
        this.conf.setInt(BIND_PORT_PROPERTY, port);
        if (enableAuth) {
            this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);
            this.conf.set(USERNAME_PROPERTY, username);
            this.conf.set(PASSWORD_PROPERTY, password);
        }
        return this;
    }

    /** Returns a MongoDB connection object. */
    public Mongo getMongoConnection() throws UnknownHostException {
        return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));
    }

    /** Returns the configured host, defaulting to localhost. */
    public String getBindHost() {
        return this.conf.get(BIND_HOST_PROPERTY, "localhost");
    }

    /** Returns the configured port, defaulting to 27017. */
    public int getBindPort() {
        return this.conf.getInt(BIND_PORT_PROPERTY, 27017);
    }

    /** Whether authentication is enabled; MongoDB disables it by default. */
    public boolean isEnableAuth() {
        return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);
    }

    /** Returns the username used for authentication. */
    public String getUsername() {
        return this.conf.get(USERNAME_PROPERTY);
    }

    /** Returns the password used for authentication. */
    public String getPassword() {
        return this.conf.get(PASSWORD_PROPERTY);
    }

    public String getPartition() {
        return conf.get(PARTITION_PROPERTY, "|");
    }

    public MongoDBConfiguration setPartition(String partition) {
        conf.set(PARTITION_PROPERTY, partition);
        return this;
    }

    public String getInputDatabaseName() {
        return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setInputDatabaseName(String databaseName) {
        conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getInputCollectionName() {
        return conf.get(INPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setInputCollectionName(String collectionName) {
        conf.set(INPUT_COLLECTION_NAME_PROPERTY, collectionName);
    }

    public String[] getInputFieldNames() {
        return conf.getStrings(INPUT_FIELD_NAMES_PROPERTY);
    }

    public void setInputFieldNames(String... fieldNames) {
        conf.setStrings(INPUT_FIELD_NAMES_PROPERTY, fieldNames);
    }

    public Map<String, String> getInputConditions() {
        Map<String, String> result = new HashMap<String, String>();
        String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);
        if (conditions != null && conditions.length > 0) {
            // Quote the separator: the default "|" is a regex metacharacter,
            // and split() takes a regular expression.
            String partition = Pattern.quote(this.getPartition());
            String[] values = null;
            for (String condition : conditions) {
                values = condition.split(partition);
                if (values != null && values.length == 2) {
                    result.put(values[0], values[1]);
                } else {
                    result.put(condition, null);
                }
            }
        }
        return result;
    }

    public void setInputConditions(Map<String, String> conditions) {
        if (conditions != null && conditions.size() > 0) {
            String[] values = new String[conditions.size()];
            String partition = this.getPartition();
            int k = 0;
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    values[k++] = entry.getKey() + partition + entry.getValue();
                } else {
                    values[k++] = entry.getKey();
                }
            }
            conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);
        }
    }

    public Class<? extends MongoDBWritable> getValueClass() {
        return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);
    }

    // Set with the same interface (MongoDBWritable) that getValueClass() reads with.
    public void setInputClass(Class<? extends MongoDBWritable> inputClass) {
        conf.setClass(INPUT_CLASS_PROPERTY, inputClass, MongoDBWritable.class);
    }

    public String getOutputDatabaseName() {
        return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setOutputDatabaseName(String databaseName) {
        conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getOutputCollectionName() {
        return conf.get(OUTPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setOutputCollectionName(String collectionName) {
        conf.set(OUTPUT_COLLECTION_NAME_PROPERTY, collectionName);
    }

    public boolean isEnableUseFetchMethod() {
        return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);
    }

    public void setOutputUseFetchMethod(boolean useFetchMethod) {
        conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);
    }
}
MongoDBConfiguration.java
package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public interface MongoDBWritable {
    /**
     * Writes this object into the MongoDB collection.
     *
     * @param collection
     * @throws MongoException
     */
    public void write(DBCollection collection) throws MongoException;

    /**
     * Builds the DBObject to write, merging into {@code old} when it is non-null.
     *
     * @param old
     * @return
     * @throws MongoException
     */
    public DBObject fetchWriteDBObject(DBObject old) throws MongoException;

    /**
     * Populates this object from a MongoDB document.
     *
     * @param object
     * @throws MongoException
     */
    public void readFields(DBObject object) throws MongoException;
}
MongoDBWritable.java
package com.gerry.mongo.hadoop2x.mr.mongodb.nw;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;public class Demo {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();// 設置輸入的mongodb的數據庫和集合,以及對應的輸入對象value,這里的數據庫和集合要求存在,否則是沒有數據的,當然沒有數據不會出問題conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);Job job = Job.getInstance(conf, "mongodb-demo");job.setJarByClass(Demo.class);job.setMapperClass(DemoMapper.class);job.setReducerClass(DemoReducer.class);job.setOutputKeyClass(DemoInputValueAndOutputKey.class);job.setOutputValueClass(DemoOutputValue.class);job.setMapOutputKeyClass(DemoInputValueAndOutputKey.class);job.setMapOutputValueClass(NullWritable.class);job.setInputFormatClass(MongoDBInputFormat.class);MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // 這個可以不存在
        job.waitForCompletion(true);
    }

    public static class DemoOutputValue implements Writable, MongoDBWritable {
        private Date clientTime;
        private long count;

        @Override
        public void write(DBCollection collection) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; if the same key already exists, add an index suffix
            builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);
            return builder.get();
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.clientTime.getTime());
            out.writeLong(this.count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.clientTime = new Date(in.readLong());
            this.count = in.readLong();
        }

        public Date getClientTime() {
            return clientTime;
        }

        public void setClientTime(Date clientTime) {
            this.clientTime = clientTime;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }
    }

    public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {
        private String name;
        private Integer age;
        private String sex;

        @Override
        public void write(DataOutput out) throws IOException {
            if (this.name == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.name);
            }
            if (this.age == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeInt(this.age);
            }
            if (this.sex == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.sex);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.name = in.readBoolean() ? in.readUTF() : null;
            this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
            this.sex = in.readBoolean() ? in.readUTF() : null;
        }

        @Override
        public void write(DBCollection collection) throws MongoException {
            DBObject object = new BasicDBObject();
            object.put("name", this.name);
            object.put("age", this.age.intValue());
            object.put("sex", this.sex);
            collection.insert(object);
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            this.name = (String) object.get("name");
            this.age = (Integer) object.get("age");
            this.sex = (String) object.get("sex");
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; if the same key already exists, add an index suffix
            if (this.name != null) {
                builder.append(getKey(keys, "name", 0), this.name);
            }
            if (this.age != null) {
                builder.append(getKey(keys, "age", 0), this.age.intValue());
            }
            if (this.sex != null) {
                builder.append(getKey(keys, "sex", 0), this.sex);
            }
            return builder.get();
        }

        @Override
        public String toString() {
            return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";
        }

        @Override
        public int compareTo(DemoInputValueAndOutputKey o) {
            int tmp;
            if (this.name == null) {
                if (o.name != null) {
                    return -1;
                }
            } else if (o.name == null) {
                return 1;
            } else {
                tmp = this.name.compareTo(o.name);
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.age == null) {
                if (o.age != null) {
                    return -1;
                }
            } else if (o.age == null) {
                return 1;
            } else {
                tmp = this.age - o.age;
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.sex == null) {
                if (o.sex != null) {
                    return -1;
                }
            } else if (o.sex == null) {
                return 1;
            } else {
                return this.sex.compareTo(o.sex);
            }
            return 0;
        }
    }

    /**
     * Mapper: pass records straight through
     *
     * @author jsliuming
     */
    public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {
        @Override
        protected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Reducer: write the data out, performing only a count
     *
     * @author jsliuming
     */
    public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {
        private DemoOutputValue outputValue = new DemoOutputValue();

        @Override
        protected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (@SuppressWarnings("unused") NullWritable value : values) {
                sum++;
            }
            outputValue.setClientTime(new Date());
            outputValue.setCount(sum);
            context.write(key, outputValue);
        }
    }

    /**
     * Transform the key: while the key already exists in the keys set, append an index to it
     *
     * @param keys
     * @param key
     * @param index
     * @return
     */
    public static String getKey(Set<String> keys, String key, int index) {
        while (keys.contains(key)) {
            key = key + (index++);
        }
        return key;
    }
}
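The `getKey` helper above resolves field-name collisions when merging values into an existing MongoDB document. Its behavior can be exercised standalone; `KeyDedup` below is a hypothetical wrapper class introduced only for this demonstration, not part of the original job:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical wrapper class for demonstrating the getKey collision logic.
public class KeyDedup {

    // Same logic as the getKey helper in the job class:
    // while the candidate key is already taken, append an increasing index.
    public static String getKey(Set<String> keys, String key, int index) {
        while (keys.contains(key)) {
            key = key + (index++);
        }
        return key;
    }

    public static void main(String[] args) {
        Set<String> keys = new HashSet<>(Arrays.asList("name", "name0"));
        // "name" and "name0" are taken: "name" -> "name0" -> "name01"
        System.out.println(getKey(keys, "name", 0)); // name01
        // "age" is free, so it is returned unchanged
        System.out.println(getKey(keys, "age", 0));  // age
    }
}
```

Note that the suffix grows by concatenation ("name" becomes "name0", then "name01"), so repeated collisions produce progressively longer keys rather than a simple counter.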
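The null-safe serialization pattern used by `DemoInputValueAndOutputKey` (a boolean presence flag written before each nullable field) can also be tried out on its own with plain `java.io` streams. `NullableRecord` below is a simplified stand-in for the key class, assumed here only to illustrate the round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Simplified stand-in for DemoInputValueAndOutputKey: each nullable field
// is preceded by a boolean presence flag so nulls survive serialization.
public class NullableRecord {
    String name;  // may be null
    Integer age;  // may be null

    void write(DataOutput out) throws IOException {
        out.writeBoolean(name != null);
        if (name != null) out.writeUTF(name);
        out.writeBoolean(age != null);
        if (age != null) out.writeInt(age);
    }

    void readFields(DataInput in) throws IOException {
        name = in.readBoolean() ? in.readUTF() : null;
        age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
    }

    public static void main(String[] args) throws IOException {
        NullableRecord r = new NullableRecord();
        r.name = "tom";
        r.age = null; // the presence flag lets this null survive the round trip

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        r.write(new DataOutputStream(buf));

        NullableRecord back = new NullableRecord();
        back.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(back.name + " " + back.age); // tom null
    }
}
```

Without the flag, `writeUTF(null)` would throw a NullPointerException, which is why every nullable field in the key class is guarded this way.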

四、Result screenshots


Reprinted from: https://www.cnblogs.com/liuming1992/p/4758504.html

