/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, it can only write files for a single column family at a
 * time; writing multiple column families would require coordinating keys
 * across families. Writes the current time as the sequence id for the file.
 * Sets the major compacted attribute on created hfiles. Calling
 * write(null,null) will forcibly roll all HFiles being written.
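 * <p>
 * For example (a sketch of the behavior described above, assuming a reducer
 * that has been handed this format's RecordWriter through its context):
 * <pre>
 *   // force all currently open HFiles to roll
 *   context.write(null, null);
 * </pre>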
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);

  // The following constants are private because HFileOutputFormat uses them
  // internally to pass data from job setup to the reducer run via the
  // Configuration. They should not be set by the client.
  private static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public so that clients can set it on their conf object
  // and refer to the symbol directly. It is present for backwards-compatibility
  // reasons; use it only to override the auto-detection of data block encoding.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
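  // A minimal sketch of using the override from a job driver (assumes a
  // Configuration is in hand; "PREFIX" is one of the DataBlockEncoding values):
  //
  //   conf.set(HFileOutputFormat.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY,
  //       DataBlockEncoding.PREFIX.name());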

  @Override
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression =
        AbstractHFileWriter.compressionByName(defaultCompressionStr);

    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    // Create the per-column-family maps for compression, bloom type, block
    // size and data block encoding.
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], HFileDataBlockEncoder> datablockEncodingMap =
        createFamilyDataBlockEncodingMap(conf);
    final HFileDataBlockEncoder overriddenEncoder;
    if (dataBlockEncodingStr != null) {
      overriddenEncoder = getDataBlockEncoderFromString(dataBlockEncodingStr);
    } else {
      overriddenEncoder = null;
    }

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
      private boolean rollRequested = false;

      public void write(ImmutableBytesWritable row, KeyValue kv)
      throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }

        byte [] rowKey = kv.getRow();
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);

        // If this is a new column family, make sure its output directory exists
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }

        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // A roll can only happen once the current row is finished, though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }

        // create a new StoreFile writer, if necessary
        if (wl == null || wl.writer == null) {
          wl = getNewWriter(family, conf);
        }

        // we now have the proper writer. full steam ahead
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Remember the row so we can tell when the row changes.
        this.previousRow = rowKey;
      }

      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath() +
                (wl.written == 0 ? "" : ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }

      /* Create a new StoreFile.Writer.
       * @param family column family to create the writer for
       * @return A WriterLength, containing a new StoreFile.Writer.
       * @throws IOException
       */
      private WriterLength getNewWriter(byte[] family, Configuration conf)
          throws IOException {
        WriterLength wl = new WriterLength();
        Path familydir = new Path(outputdir, Bytes.toString(family));
        // Fall back to the job-wide defaults for anything not configured
        // per family.
        Algorithm compression = compressionMap.get(family);
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(family);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        Integer blockSize = blockSizeMap.get(family);
        blockSize = blockSize == null ? HFile.DEFAULT_BLOCKSIZE : blockSize;
        HFileDataBlockEncoder encoder = overriddenEncoder;
        encoder = encoder == null ? datablockEncodingMap.get(family) : encoder;
        encoder = encoder == null ? NoOpDataBlockEncoder.INSTANCE : encoder;

        // Use a copy of the conf with the block cache size set to zero so the
        // writer's CacheConfig does not cache the blocks we write out here.
        Configuration tempConf = new Configuration(conf);
        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize)
            .withOutputDir(familydir)
            .withCompression(compression)
            .withBloomType(bloomType)
            .withComparator(KeyValue.COMPARATOR)
            .withDataBlockEncoder(encoder)
            .withChecksumType(Store.getChecksumType(conf))
            .withBytesPerChecksum(Store.getBytesPerChecksum(conf))
            .build();

        this.writers.put(family, wl);
        return wl;
      }

      private void close(final StoreFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (WriterLength wl: this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFile.Writer writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
  throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
      new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }

  /**
   * Write out a SequenceFile, readable by TotalOrderPartitioner, that
   * contains the split points in startKeys.
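   * <p>
   * A hypothetical illustration of the relationship between region start keys,
   * the written split points, and the reduce tasks configured by
   * configureIncrementalLoad:
   * <pre>
   *   region start keys : "", "b", "d"
   *   partitions file   : "b", "d"   (the empty first start key is dropped)
   *   reduce tasks      : 3          (one per region)
   * </pre>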
   * @param partitionsPath output path for SequenceFile
   * @param startKeys the region start keys
   */
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0.
    TreeSet<ImmutableBytesWritable> sorted =
      new TreeSet<ImmutableBytesWritable>(startKeys);

    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
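   * <p>
   * A minimal driver sketch (MyMapper and the paths are hypothetical, and the
   * surrounding job setup is abbreviated):
   * <pre>
   *   Configuration conf = HBaseConfiguration.create();
   *   Job job = new Job(conf, "hfile-bulk-load");
   *   job.setJarByClass(MyMapper.class);
   *   job.setMapperClass(MyMapper.class);
   *   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   *   job.setMapOutputValueClass(Put.class);
   *   FileInputFormat.addInputPath(job, new Path("/input"));
   *   FileOutputFormat.setOutputPath(job, new Path("/hfile-output"));
   *   HTable table = new HTable(conf, "mytable");
   *   HFileOutputFormat.configureIncrementalLoad(job, table);
   *   job.waitForCompletion(true);
   * </pre>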
   */
  public static void configureIncrementalLoad(Job job, HTable table)
  throws IOException {
    Configuration conf = job.getConfiguration();
    Class<? extends Partitioner> topClass;
    try {
      topClass = getTotalOrderPartitionerClass();
    } catch (ClassNotFoundException e) {
      throw new IOException("Failed getting TotalOrderPartitioner", e);
    }
    job.setPartitionerClass(topClass);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    LOG.info("Looking up current regions for table " + table);
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    Path partitionsPath = new Path(job.getWorkingDirectory(),
                                   "partitions_" + UUID.randomUUID());
    LOG.info("Writing partition information to " + partitionsPath);

    FileSystem fs = partitionsPath.getFileSystem(conf);
    writePartitions(conf, partitionsPath, startKeys);
    partitionsPath = partitionsPath.makeQualified(fs);

    URI cacheUri;
    try {
      // Below we make explicit reference to the bundled TOP.  It's cheating.
      // We assume the default partition path defined in the hbase-bundled TOP
      // is the same as in hadoop (whether 0.20, 0.22, etc.).
      cacheUri = new URI(partitionsPath.toString() + "#" +
        org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    DistributedCache.addCacheFile(cacheUri, conf);
    DistributedCache.createSymlink(conf);

    // Set compression, bloom type, block size and data block encoding based
    // on the table's column families.
    configureCompression(table, conf);
    configureBloomType(table, conf);
    configureBlockSize(table, conf);
    configureDataBlockEncoding(table, conf);

    TableMapReduceUtil.addDependencyJars(job);
    LOG.info("Incremental table output configured.");
  }

  /**
   * If we're on a hadoop newer than 0.20, we want to use hadoop's own
   * TotalOrderPartitioner. If we're on 0.20, we want to use the TOP that we
   * have under hadoopbackport. This method is about hbase being able to run
   * on different versions of hadoop: on 0.20.x hadoops we have to use the TOP
   * that is bundled with hbase; otherwise, we use the one in hadoop.
   * @return The TotalOrderPartitioner class to use
   * @throws ClassNotFoundException if no TotalOrderPartitioner can be found
   */
  @SuppressWarnings("unchecked")
  private static Class<? extends Partitioner> getTotalOrderPartitionerClass()
  throws ClassNotFoundException {
    Class<? extends Partitioner> clazz = null;
    try {
      clazz = (Class<? extends Partitioner>) Class.forName(
          "org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner");
    } catch (ClassNotFoundException e) {
      clazz = (Class<? extends Partitioner>) Class.forName(
          "org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner");
    }
    return clazz;
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the HFileDataBlockEncoder for the data block
   *         encoding configured for that family
   */
  @VisibleForTesting
  static Map<byte[], HFileDataBlockEncoder> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
        createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], HFileDataBlockEncoder> encoderMap =
        new TreeMap<byte[], HFileDataBlockEncoder>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), getDataBlockEncoderFromString(e.getValue()));
    }
    return encoderMap;
  }

  private static HFileDataBlockEncoder getDataBlockEncoderFromString(String dataBlockEncodingStr) {
    HFileDataBlockEncoder encoder;
    try {
      encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding.valueOf(dataBlockEncodingStr));
    } catch (IllegalArgumentException ex) {
      throw new RuntimeException("Invalid data block encoding type configured for the param "
          + DATABLOCK_ENCODING_FAMILIES_CONF_KEY + " : " + dataBlockEncodingStr, ex);
    }
    return encoder;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
   *
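   * <p>
   * A sketch of the expected serialized form (family names and values here are
   * made up; each entry is URL-encoded and entries are joined with ampersands):
   * <pre>{@code cf1=value1&cf2=value2}</pre>
   *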
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf,
      String confName) {
    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *            on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureCompression(HTable table, Configuration conf) throws IOException {
    StringBuilder compressionConfigValue = new StringBuilder();
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        compressionConfigValue.append('&');
      }
      compressionConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      compressionConfigValue.append('=');
      compressionConfigValue.append(URLEncoder.encode(
          familyDescriptor.getCompression().getName(), "UTF-8"));
    }
    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
  }

  /**
   * Serialize column family to block size map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *            on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureBlockSize(HTable table, Configuration conf) throws IOException {
    StringBuilder blockSizeConfigValue = new StringBuilder();
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        blockSizeConfigValue.append('&');
      }
      blockSizeConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      blockSizeConfigValue.append('=');
      blockSizeConfigValue.append(URLEncoder.encode(
          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
    }
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
  }

  /**
   * Serialize column family to bloom type map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *            on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureBloomType(HTable table, Configuration conf) throws IOException {
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder bloomTypeConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        bloomTypeConfigValue.append('&');
      }
      bloomTypeConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      bloomTypeConfigValue.append('=');
      BloomType bloomType = familyDescriptor.getBloomFilterType();
      String bloomTypeStr = bloomType == null ?
          HColumnDescriptor.DEFAULT_BLOOMFILTER : bloomType.toString();
      bloomTypeConfigValue.append(URLEncoder.encode(bloomTypeStr, "UTF-8"));
    }
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
  }

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *            on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureDataBlockEncoding(HTable table, Configuration conf) throws IOException {
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        dataBlockEncodingConfigValue.append('&');
      }
      dataBlockEncodingConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      dataBlockEncodingConfigValue.append('=');
      DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
      if (encoding == null) {
        encoding = DataBlockEncoding.NONE;
      }
      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), "UTF-8"));
    }
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, dataBlockEncodingConfigValue.toString());
  }

}