View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.io.UnsupportedEncodingException;
22  import java.net.URLDecoder;
23  import java.net.URLEncoder;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.TreeMap;
29  import java.util.TreeSet;
30  import java.util.UUID;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.classification.InterfaceStability;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HColumnDescriptor;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.KeyValueUtil;
46  import org.apache.hadoop.hbase.client.HTable;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.client.RegionLocator;
49  import org.apache.hadoop.hbase.client.Table;
50  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
51  import org.apache.hadoop.hbase.io.compress.Compression;
52  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
53  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
54  import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
55  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
56  import org.apache.hadoop.hbase.io.hfile.HFile;
57  import org.apache.hadoop.hbase.io.hfile.HFileContext;
58  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
59  import org.apache.hadoop.hbase.regionserver.BloomType;
60  import org.apache.hadoop.hbase.regionserver.HStore;
61  import org.apache.hadoop.hbase.regionserver.StoreFile;
62  import org.apache.hadoop.hbase.util.Bytes;
63  import org.apache.hadoop.io.NullWritable;
64  import org.apache.hadoop.io.SequenceFile;
65  import org.apache.hadoop.io.Text;
66  import org.apache.hadoop.mapreduce.Job;
67  import org.apache.hadoop.mapreduce.OutputCommitter;
68  import org.apache.hadoop.mapreduce.OutputFormat;
69  import org.apache.hadoop.mapreduce.RecordWriter;
70  import org.apache.hadoop.mapreduce.TaskAttemptContext;
71  import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
72  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
73  import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
74  
75  import com.google.common.annotations.VisibleForTesting;
76  
77  /**
78   * Writes HFiles. Passed Cells must arrive in order.
79   * Writes current time as the sequence id for the file. Sets the major compacted
80   * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll
81   * all HFiles being written.
82   * <p>
83   * Using this class as part of a MapReduce job is best done
84   * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}.
85   */
86  @InterfaceAudience.Public
87  @InterfaceStability.Evolving
88  public class HFileOutputFormat2
89      extends FileOutputFormat<ImmutableBytesWritable, Cell> {
90    private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
91  
92    // The following constants are private since these are used by
93    // HFileOutputFormat2 to internally transfer data between job setup and
94    // reducer run using conf.
95    // These should not be changed by the client.
96    private static final String COMPRESSION_FAMILIES_CONF_KEY =
97        "hbase.hfileoutputformat.families.compression";
98    private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
99        "hbase.hfileoutputformat.families.bloomtype";
100   private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
101       "hbase.mapreduce.hfileoutputformat.blocksize";
102   private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
103       "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
104 
105   // This constant is public since the client can modify this when setting
106   // up their conf object and thus refer to this symbol.
107   // It is present for backwards compatibility reasons. Use it only to
108   // override the auto-detection of datablock encoding.
109   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
110       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
111 
112   @Override
113   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
114       final TaskAttemptContext context) throws IOException, InterruptedException {
115     return createRecordWriter(context, this.getOutputCommitter(context));
116   }
117 
118   static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
119       createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
120           throws IOException {
121 
122     // Get the path of the temporary output file
123     final Path outputdir = ((FileOutputCommitter) committer).getWorkPath();
124     final Configuration conf = context.getConfiguration();
125     final FileSystem fs = outputdir.getFileSystem(conf);
126     // These configs. are from hbase-*.xml
127     final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
128         HConstants.DEFAULT_MAX_FILE_SIZE);
129     // Invented config.  Add to hbase-*.xml if other than default compression.
130     final String defaultCompressionStr = conf.get("hfile.compression",
131         Compression.Algorithm.NONE.getName());
132     final Algorithm defaultCompression = AbstractHFileWriter
133         .compressionByName(defaultCompressionStr);
134     final boolean compactionExclude = conf.getBoolean(
135         "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
136 
137     // create a map from column family to the compression algorithm
138     final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
139     final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
140     final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
141 
142     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
143     final Map<byte[], DataBlockEncoding> datablockEncodingMap
144         = createFamilyDataBlockEncodingMap(conf);
145     final DataBlockEncoding overriddenEncoding;
146     if (dataBlockEncodingStr != null) {
147       overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
148     } else {
149       overriddenEncoding = null;
150     }
151 
152     return new RecordWriter<ImmutableBytesWritable, V>() {
153       // Map of families to writers and how much has been output on the writer.
154       private final Map<byte [], WriterLength> writers =
155         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
156       private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
157       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
158       private boolean rollRequested = false;
159 
160       @Override
161       public void write(ImmutableBytesWritable row, V cell)
162           throws IOException {
163         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
164 
165         // null input == user explicitly wants to flush
166         if (row == null && kv == null) {
167           rollWriters();
168           return;
169         }
170 
171         byte [] rowKey = CellUtil.cloneRow(kv);
172         long length = kv.getLength();
173         byte [] family = CellUtil.cloneFamily(kv);
174         WriterLength wl = this.writers.get(family);
175 
176         // If this is a new column family, verify that the directory exists
177         if (wl == null) {
178           fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
179         }
180 
181         // If any of the HFiles for the column families has reached
182         // maxsize, we need to roll all the writers
183         if (wl != null && wl.written + length >= maxsize) {
184           this.rollRequested = true;
185         }
186 
187         // This can only happen once a row is finished though
188         if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
189           rollWriters();
190         }
191 
192         // create a new WAL writer, if necessary
193         if (wl == null || wl.writer == null) {
194           wl = getNewWriter(family, conf);
195         }
196 
197         // we now have the proper WAL writer. full steam ahead
198         kv.updateLatestStamp(this.now);
199         wl.writer.append(kv);
200         wl.written += length;
201 
202         // Copy the row so we know when a row transition.
203         this.previousRow = rowKey;
204       }
205 
206       private void rollWriters() throws IOException {
207         for (WriterLength wl : this.writers.values()) {
208           if (wl.writer != null) {
209             LOG.info("Writer=" + wl.writer.getPath() +
210                 ((wl.written == 0)? "": ", wrote=" + wl.written));
211             close(wl.writer);
212           }
213           wl.writer = null;
214           wl.written = 0;
215         }
216         this.rollRequested = false;
217       }
218 
219       /* Create a new StoreFile.Writer.
220        * @param family
221        * @return A WriterLength, containing a new StoreFile.Writer.
222        * @throws IOException
223        */
224       @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
225           justification="Not important")
226       private WriterLength getNewWriter(byte[] family, Configuration conf)
227           throws IOException {
228         WriterLength wl = new WriterLength();
229         Path familydir = new Path(outputdir, Bytes.toString(family));
230         Algorithm compression = compressionMap.get(family);
231         compression = compression == null ? defaultCompression : compression;
232         BloomType bloomType = bloomTypeMap.get(family);
233         bloomType = bloomType == null ? BloomType.NONE : bloomType;
234         Integer blockSize = blockSizeMap.get(family);
235         blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
236         DataBlockEncoding encoding = overriddenEncoding;
237         encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
238         encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
239         Configuration tempConf = new Configuration(conf);
240         tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
241         HFileContextBuilder contextBuilder = new HFileContextBuilder()
242                                     .withCompression(compression)
243                                     .withChecksumType(HStore.getChecksumType(conf))
244                                     .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
245                                     .withBlockSize(blockSize);
246         contextBuilder.withDataBlockEncoding(encoding);
247         HFileContext hFileContext = contextBuilder.build();
248                                     
249         wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
250             .withOutputDir(familydir).withBloomType(bloomType)
251             .withComparator(KeyValue.COMPARATOR)
252             .withFileContext(hFileContext).build();
253 
254         this.writers.put(family, wl);
255         return wl;
256       }
257 
258       private void close(final StoreFile.Writer w) throws IOException {
259         if (w != null) {
260           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
261               Bytes.toBytes(System.currentTimeMillis()));
262           w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
263               Bytes.toBytes(context.getTaskAttemptID().toString()));
264           w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
265               Bytes.toBytes(true));
266           w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
267               Bytes.toBytes(compactionExclude));
268           w.appendTrackedTimestampsToMetadata();
269           w.close();
270         }
271       }
272 
273       @Override
274       public void close(TaskAttemptContext c)
275       throws IOException, InterruptedException {
276         for (WriterLength wl: this.writers.values()) {
277           close(wl.writer);
278         }
279       }
280     };
281   }
282 
283   /*
284    * Data structure to hold a Writer and amount of data written on it.
285    */
286   static class WriterLength {
287     long written = 0;
288     StoreFile.Writer writer = null;
289   }
290 
291   /**
292    * Return the start keys of all of the regions in this table,
293    * as a list of ImmutableBytesWritable.
294    */
295   private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table)
296   throws IOException {
297     byte[][] byteKeys = table.getStartKeys();
298     ArrayList<ImmutableBytesWritable> ret =
299       new ArrayList<ImmutableBytesWritable>(byteKeys.length);
300     for (byte[] byteKey : byteKeys) {
301       ret.add(new ImmutableBytesWritable(byteKey));
302     }
303     return ret;
304   }
305 
306   /**
307    * Write out a {@link SequenceFile} that can be read by
308    * {@link TotalOrderPartitioner} that contains the split points in startKeys.
309    */
310   @SuppressWarnings("deprecation")
311   private static void writePartitions(Configuration conf, Path partitionsPath,
312       List<ImmutableBytesWritable> startKeys) throws IOException {
313     LOG.info("Writing partition information to " + partitionsPath);
314     if (startKeys.isEmpty()) {
315       throw new IllegalArgumentException("No regions passed");
316     }
317 
318     // We're generating a list of split points, and we don't ever
319     // have keys < the first region (which has an empty start key)
320     // so we need to remove it. Otherwise we would end up with an
321     // empty reducer with index 0
322     TreeSet<ImmutableBytesWritable> sorted =
323       new TreeSet<ImmutableBytesWritable>(startKeys);
324 
325     ImmutableBytesWritable first = sorted.first();
326     if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
327       throw new IllegalArgumentException(
328           "First region of table should have empty start key. Instead has: "
329           + Bytes.toStringBinary(first.get()));
330     }
331     sorted.remove(first);
332 
333     // Write the actual file
334     FileSystem fs = partitionsPath.getFileSystem(conf);
335     SequenceFile.Writer writer = SequenceFile.createWriter(
336       fs, conf, partitionsPath, ImmutableBytesWritable.class,
337       NullWritable.class);
338 
339     try {
340       for (ImmutableBytesWritable startKey : sorted) {
341         writer.append(startKey, NullWritable.get());
342       }
343     } finally {
344       writer.close();
345     }
346   }
347 
348   /**
349    * Configure a MapReduce Job to perform an incremental load into the given
350    * table. This
351    * <ul>
352    *   <li>Inspects the table to configure a total order partitioner</li>
353    *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
354    *   <li>Sets the number of reduce tasks to match the current number of regions</li>
355    *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
356    *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
357    *     PutSortReducer)</li>
358    * </ul>
359    * The user should be sure to set the map output value class to either KeyValue or Put before
360    * running this function.
361    * 
362    * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
363    */
364   @Deprecated
365   public static void configureIncrementalLoad(Job job, HTable table)
366       throws IOException {
367     configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
368   }
369 
370   /**
371    * Configure a MapReduce Job to perform an incremental load into the given
372    * table. This
373    * <ul>
374    *   <li>Inspects the table to configure a total order partitioner</li>
375    *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
376    *   <li>Sets the number of reduce tasks to match the current number of regions</li>
377    *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
378    *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
379    *     PutSortReducer)</li>
380    * </ul>
381    * The user should be sure to set the map output value class to either KeyValue or Put before
382    * running this function.
383    */
384   public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
385       throws IOException {
386     configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
387   }
388 
389   /**
390    * Configure a MapReduce Job to perform an incremental load into the given
391    * table. This
392    * <ul>
393    *   <li>Inspects the table to configure a total order partitioner</li>
394    *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
395    *   <li>Sets the number of reduce tasks to match the current number of regions</li>
396    *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
397    *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
398    *     PutSortReducer)</li>
399    * </ul>
400    * The user should be sure to set the map output value class to either KeyValue or Put before
401    * running this function.
402    */
403   public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
404       RegionLocator regionLocator) throws IOException {
405     configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
406   }
407 
408   static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
409       RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
410       UnsupportedEncodingException {
411     Configuration conf = job.getConfiguration();
412     job.setOutputKeyClass(ImmutableBytesWritable.class);
413     job.setOutputValueClass(KeyValue.class);
414     job.setOutputFormatClass(cls);
415 
416     // Based on the configured map output class, set the correct reducer to properly
417     // sort the incoming values.
418     // TODO it would be nice to pick one or the other of these formats.
419     if (KeyValue.class.equals(job.getMapOutputValueClass())) {
420       job.setReducerClass(KeyValueSortReducer.class);
421     } else if (Put.class.equals(job.getMapOutputValueClass())) {
422       job.setReducerClass(PutSortReducer.class);
423     } else if (Text.class.equals(job.getMapOutputValueClass())) {
424       job.setReducerClass(TextSortReducer.class);
425     } else {
426       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
427     }
428 
429     conf.setStrings("io.serializations", conf.get("io.serializations"),
430         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
431         KeyValueSerialization.class.getName());
432 
433     // Use table's region boundaries for TOP split points.
434     LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
435     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
436     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
437         "to match current region count");
438     job.setNumReduceTasks(startKeys.size());
439 
440     configurePartitioner(job, startKeys);
441     // Set compression algorithms based on column families
442     configureCompression(conf, tableDescriptor);
443     configureBloomType(tableDescriptor, conf);
444     configureBlockSize(tableDescriptor, conf);
445     configureDataBlockEncoding(tableDescriptor, conf);
446 
447     TableMapReduceUtil.addDependencyJars(job);
448     TableMapReduceUtil.initCredentials(job);
449     LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
450   }
451   
452   public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
453     Configuration conf = job.getConfiguration();
454 
455     job.setOutputKeyClass(ImmutableBytesWritable.class);
456     job.setOutputValueClass(KeyValue.class);
457     job.setOutputFormatClass(HFileOutputFormat2.class);
458 
459     // Set compression algorithms based on column families
460     configureCompression(conf, table.getTableDescriptor());
461     configureBloomType(table.getTableDescriptor(), conf);
462     configureBlockSize(table.getTableDescriptor(), conf);
463     HTableDescriptor tableDescriptor = table.getTableDescriptor();
464     configureDataBlockEncoding(tableDescriptor, conf);
465 
466     TableMapReduceUtil.addDependencyJars(job);
467     TableMapReduceUtil.initCredentials(job);
468     LOG.info("Incremental table " + table.getName() + " output configured.");
469   }
470 
471   /**
472    * Runs inside the task to deserialize column family to compression algorithm
473    * map from the configuration.
474    *
475    * @param conf to read the serialized values from
476    * @return a map from column family to the configured compression algorithm
477    */
478   @VisibleForTesting
479   static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
480       conf) {
481     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
482         COMPRESSION_FAMILIES_CONF_KEY);
483     Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
484         Algorithm>(Bytes.BYTES_COMPARATOR);
485     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
486       Algorithm algorithm = AbstractHFileWriter.compressionByName
487           (e.getValue());
488       compressionMap.put(e.getKey(), algorithm);
489     }
490     return compressionMap;
491   }
492 
493   /**
494    * Runs inside the task to deserialize column family to bloom filter type
495    * map from the configuration.
496    *
497    * @param conf to read the serialized values from
498    * @return a map from column family to the the configured bloom filter type
499    */
500   @VisibleForTesting
501   static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
502     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
503         BLOOM_TYPE_FAMILIES_CONF_KEY);
504     Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],
505         BloomType>(Bytes.BYTES_COMPARATOR);
506     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
507       BloomType bloomType = BloomType.valueOf(e.getValue());
508       bloomTypeMap.put(e.getKey(), bloomType);
509     }
510     return bloomTypeMap;
511   }
512 
513   /**
514    * Runs inside the task to deserialize column family to block size
515    * map from the configuration.
516    *
517    * @param conf to read the serialized values from
518    * @return a map from column family to the configured block size
519    */
520   @VisibleForTesting
521   static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
522     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
523         BLOCK_SIZE_FAMILIES_CONF_KEY);
524     Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],
525         Integer>(Bytes.BYTES_COMPARATOR);
526     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
527       Integer blockSize = Integer.parseInt(e.getValue());
528       blockSizeMap.put(e.getKey(), blockSize);
529     }
530     return blockSizeMap;
531   }
532 
533   /**
534    * Runs inside the task to deserialize column family to data block encoding
535    * type map from the configuration.
536    *
537    * @param conf to read the serialized values from
538    * @return a map from column family to HFileDataBlockEncoder for the
539    *         configured data block type for the family
540    */
541   @VisibleForTesting
542   static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
543       Configuration conf) {
544     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
545         DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
546     Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],
547         DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
548     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
549       encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
550     }
551     return encoderMap;
552   }
553 
554 
555   /**
556    * Run inside the task to deserialize column family to given conf value map.
557    *
558    * @param conf to read the serialized values from
559    * @param confName conf key to read from the configuration
560    * @return a map of column family to the given configuration value
561    */
562   private static Map<byte[], String> createFamilyConfValueMap(
563       Configuration conf, String confName) {
564     Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
565     String confVal = conf.get(confName, "");
566     for (String familyConf : confVal.split("&")) {
567       String[] familySplit = familyConf.split("=");
568       if (familySplit.length != 2) {
569         continue;
570       }
571       try {
572         confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
573             URLDecoder.decode(familySplit[1], "UTF-8"));
574       } catch (UnsupportedEncodingException e) {
575         // will not happen with UTF-8 encoding
576         throw new AssertionError(e);
577       }
578     }
579     return confValMap;
580   }
581 
582   /**
583    * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
584    * <code>splitPoints</code>. Cleans up the partitions file after job exists.
585    */
586   static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
587       throws IOException {
588     Configuration conf = job.getConfiguration();
589     // create the partitions file
590     FileSystem fs = FileSystem.get(conf);
591     String hbaseTmpFsDir =
592         conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
593           HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
594     Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
595     fs.makeQualified(partitionsPath);
596     writePartitions(conf, partitionsPath, splitPoints);
597     fs.deleteOnExit(partitionsPath);
598 
599     // configure job to use it
600     job.setPartitionerClass(TotalOrderPartitioner.class);
601     TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
602   }
603 
604   /**
605    * Serialize column family to compression algorithm map to configuration.
606    * Invoked while configuring the MR job for incremental load.
607    *
608    * @param table to read the properties from
609    * @param conf to persist serialized values into
610    * @throws IOException
611    *           on failure to read column family descriptors
612    */
613   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
614       value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
615   @VisibleForTesting
616   static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
617       throws UnsupportedEncodingException {
618     StringBuilder compressionConfigValue = new StringBuilder();
619     if(tableDescriptor == null){
620       // could happen with mock table instance
621       return;
622     }
623     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
624     int i = 0;
625     for (HColumnDescriptor familyDescriptor : families) {
626       if (i++ > 0) {
627         compressionConfigValue.append('&');
628       }
629       compressionConfigValue.append(URLEncoder.encode(
630         familyDescriptor.getNameAsString(), "UTF-8"));
631       compressionConfigValue.append('=');
632       compressionConfigValue.append(URLEncoder.encode(
633         familyDescriptor.getCompression().getName(), "UTF-8"));
634     }
635     // Get rid of the last ampersand
636     conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
637   }
638 
639   /**
640    * Serialize column family to block size map to configuration.
641    * Invoked while configuring the MR job for incremental load.
642    * @param tableDescriptor to read the properties from
643    * @param conf to persist serialized values into
644    *
645    * @throws IOException
646    *           on failure to read column family descriptors
647    */
648   @VisibleForTesting
649   static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
650       throws UnsupportedEncodingException {
651     StringBuilder blockSizeConfigValue = new StringBuilder();
652     if (tableDescriptor == null) {
653       // could happen with mock table instance
654       return;
655     }
656     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
657     int i = 0;
658     for (HColumnDescriptor familyDescriptor : families) {
659       if (i++ > 0) {
660         blockSizeConfigValue.append('&');
661       }
662       blockSizeConfigValue.append(URLEncoder.encode(
663           familyDescriptor.getNameAsString(), "UTF-8"));
664       blockSizeConfigValue.append('=');
665       blockSizeConfigValue.append(URLEncoder.encode(
666           String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
667     }
668     // Get rid of the last ampersand
669     conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
670   }
671 
672   /**
673    * Serialize column family to bloom type map to configuration.
674    * Invoked while configuring the MR job for incremental load.
675    * @param tableDescriptor to read the properties from
676    * @param conf to persist serialized values into
677    *
678    * @throws IOException
679    *           on failure to read column family descriptors
680    */
681   @VisibleForTesting
682   static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
683       throws UnsupportedEncodingException {
684     if (tableDescriptor == null) {
685       // could happen with mock table instance
686       return;
687     }
688     StringBuilder bloomTypeConfigValue = new StringBuilder();
689     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
690     int i = 0;
691     for (HColumnDescriptor familyDescriptor : families) {
692       if (i++ > 0) {
693         bloomTypeConfigValue.append('&');
694       }
695       bloomTypeConfigValue.append(URLEncoder.encode(
696         familyDescriptor.getNameAsString(), "UTF-8"));
697       bloomTypeConfigValue.append('=');
698       String bloomType = familyDescriptor.getBloomFilterType().toString();
699       if (bloomType == null) {
700         bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
701       }
702       bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
703     }
704     conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
705   }
706 
707   /**
708    * Serialize column family to data block encoding map to configuration.
709    * Invoked while configuring the MR job for incremental load.
710    *
711    * @param table to read the properties from
712    * @param conf to persist serialized values into
713    * @throws IOException
714    *           on failure to read column family descriptors
715    */
716   @VisibleForTesting
717   static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
718       Configuration conf) throws UnsupportedEncodingException {
719     if (tableDescriptor == null) {
720       // could happen with mock table instance
721       return;
722     }
723     StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
724     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
725     int i = 0;
726     for (HColumnDescriptor familyDescriptor : families) {
727       if (i++ > 0) {
728         dataBlockEncodingConfigValue.append('&');
729       }
730       dataBlockEncodingConfigValue.append(
731           URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
732       dataBlockEncodingConfigValue.append('=');
733       DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
734       if (encoding == null) {
735         encoding = DataBlockEncoding.NONE;
736       }
737       dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
738           "UTF-8"));
739     }
740     conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
741         dataBlockEncodingConfigValue.toString());
742   }
743 }