/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created hfiles. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  private static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol. Use it only to
  // override the auto-detection of data block encoding from the table
  // descriptor.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
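
  // A hedged usage sketch (not part of the original source): to force one
  // encoding for every column family, regardless of what the table descriptor
  // declares, a client could set the override before submitting the job:
  //
  //   Configuration conf = job.getConfiguration();
  //   conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY,
  //       DataBlockEncoding.FAST_DIFF.name());
  //
  // Any name accepted by DataBlockEncoding.valueOf(String) works here;
  // createRecordWriter(...) below reads this key and, when present, applies
  // it to all column families.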

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
          throws IOException {

    // Get the path of the temporary output file
    final Path outputdir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = AbstractHFileWriter
        .compressionByName(defaultCompressionStr);
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    // Create the per-family maps of compression algorithm, bloom type, and
    // block size, as serialized into the conf at job-setup time.
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
          new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
      private boolean rollRequested = false;

      @Override
      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);

        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        long length = kv.getLength();
        byte[] family = CellUtil.cloneFamily(kv);
        WriterLength wl = this.writers.get(family);

        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }

        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }

        // Create a new HFile writer for this family, if necessary
        if (wl == null || wl.writer == null) {
          wl = getNewWriter(family, conf);
        }

        // We now have the proper HFile writer. Full steam ahead.
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transition happens.
        this.previousRow = rowKey;
      }
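
      // Hedged usage note (illustrative, not from the original source): per the
      // class javadoc, a reducer holding this writer can force an HFile roll at
      // a row boundary by emitting a null pair, e.g.
      //
      //   context.write(null, null);   // flushes and rolls all open writers
      //
      // which lands in the null-input check at the top of write(...) above.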

      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath() +
                ((wl.written == 0) ? "" : ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }

      /*
       * Create a new StoreFile.Writer.
       * @param family column family for which to create the writer
       * @return A WriterLength, containing a new StoreFile.Writer.
       * @throws IOException
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification="Not important")
      private WriterLength getNewWriter(byte[] family, Configuration conf)
          throws IOException {
        WriterLength wl = new WriterLength();
        Path familydir = new Path(outputdir, Bytes.toString(family));
        // Fall back to job-wide defaults for any per-family setting that is absent.
        Algorithm compression = compressionMap.get(family);
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(family);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        Integer blockSize = blockSizeMap.get(family);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        // Disable the block cache on the temporary conf; this writer only writes.
        Configuration tempConf = new Configuration(conf);
        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
            .withCompression(compression)
            .withChecksumType(HStore.getChecksumType(conf))
            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
            .withBlockSize(blockSize);
        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();

        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
            .withOutputDir(familydir).withBloomType(bloomType)
            .withComparator(KeyValue.COMPARATOR)
            .withFileContext(hFileContext).build();

        this.writers.put(family, wl);
        return wl;
      }

      private void close(final StoreFile.Writer w) throws IOException {
        if (w != null) {
          // Record bulk-load provenance in the HFile metadata before closing.
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c)
          throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFile.Writer writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table)
      throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
        new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0.
    TreeSet<ImmutableBytesWritable> sorted =
        new TreeSet<ImmutableBytesWritable>(startKeys);

    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, partitionsPath, ImmutableBytesWritable.class,
        NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
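
  // Worked example (illustrative, not from the original source): a table with
  // regions [ "", "b" ), [ "b", "m" ), [ "m", "" ) has start keys {"", "b", "m"}.
  // Dropping the leading empty key leaves split points {"b", "m"}, so the
  // TotalOrderPartitioner produces three partitions, one per region, and
  // configureIncrementalLoad(...) sets three reduce tasks to match.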

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
   */
  @Deprecated
  public static void configureIncrementalLoad(Job job, HTable table)
      throws IOException {
    configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table, reading the table properties from the supplied
   * {@link HTableDescriptor} rather than a live {@link Table} instance.
   * Otherwise behaves exactly like
   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)}.
   */
  public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
  }
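
  // A hedged end-to-end sketch (driver, mapper, and path names here are
  // hypothetical, not from this file): a typical bulk-load driver wires a
  // mapper that emits (ImmutableBytesWritable, Put) pairs, then lets
  // configureIncrementalLoad set up the partitioner, reducer, and output
  // classes:
  //
  //   Job job = Job.getInstance(conf, "bulkload-example");   // hypothetical job name
  //   job.setJarByClass(MyDriver.class);                     // hypothetical driver class
  //   job.setMapperClass(MyPutEmittingMapper.class);         // hypothetical mapper
  //   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  //   job.setMapOutputValueClass(Put.class);
  //   FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-out"));
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //        Table table = conn.getTable(tableName);
  //        RegionLocator locator = conn.getRegionLocator(tableName)) {
  //     HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
  //     job.waitForCompletion(true);
  //   }
  //
  // Afterwards the generated HFiles are handed to LoadIncrementalHFiles
  // (completebulkload) so the region servers adopt them.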

  static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
      RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
      UnsupportedEncodingException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(cls);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    // Use the table's region boundaries for the total-order split points.
    LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys);
    // Set per-family properties (compression, bloom type, block size,
    // data block encoding) based on the column family descriptors.
    configureCompression(conf, tableDescriptor);
    configureBloomType(tableDescriptor, conf);
    configureBlockSize(tableDescriptor, conf);
    configureDataBlockEncoding(tableDescriptor, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
  }

  public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    // Set per-family properties based on the column family descriptors;
    // fetch the table descriptor once rather than per call.
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    configureCompression(conf, tableDescriptor);
    configureBloomType(tableDescriptor, conf);
    configureBlockSize(tableDescriptor, conf);
    configureDataBlockEncoding(tableDescriptor, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + table.getName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap =
        new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap =
        new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap =
        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap =
        new TreeMap<byte[], DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes("UTF-8"),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }
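
  // Illustrative example (not in the original source) of the wire format this
  // helper consumes: configureCompression(...) below serializes a table with
  // families "cf1" (GZ) and "cf2" (no compression) as the single conf value
  //
  //   cf1=GZ&cf2=NONE
  //
  // with each family name and value URL-encoded. createFamilyConfValueMap
  // reverses this: it splits on '&', then on '=', URL-decodes both halves,
  // and skips malformed entries rather than failing.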

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
      throws IOException {
    Configuration conf = job.getConfiguration();
    // Create the partitions file under the temporary HBase directory on the
    // cluster filesystem. Note: makeQualified returns a new Path, so the
    // result must be reassigned.
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
            HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    partitionsPath = fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints);
    fs.deleteOnExit(partitionsPath);

    // Configure the job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param conf to persist serialized values into
   * @param tableDescriptor to read the properties from
   * @throws UnsupportedEncodingException
   *           on failure to URL-encode a column family name or value
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
      throws UnsupportedEncodingException {
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder compressionConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        compressionConfigValue.append('&');
      }
      compressionConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      compressionConfigValue.append('=');
      compressionConfigValue.append(URLEncoder.encode(
          familyDescriptor.getCompression().getName(), "UTF-8"));
    }
    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
  }

  /**
   * Serialize column family to block size map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param tableDescriptor to read the properties from
   * @param conf to persist serialized values into
   * @throws UnsupportedEncodingException
   *           on failure to URL-encode a column family name or value
   */
  @VisibleForTesting
  static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
      throws UnsupportedEncodingException {
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder blockSizeConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        blockSizeConfigValue.append('&');
      }
      blockSizeConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      blockSizeConfigValue.append('=');
      blockSizeConfigValue.append(URLEncoder.encode(
          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
    }
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
  }

  /**
   * Serialize column family to bloom filter type map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param tableDescriptor to read the properties from
   * @param conf to persist serialized values into
   * @throws UnsupportedEncodingException
   *           on failure to URL-encode a column family name or value
   */
  @VisibleForTesting
  static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
      throws UnsupportedEncodingException {
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder bloomTypeConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        bloomTypeConfigValue.append('&');
      }
      bloomTypeConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      bloomTypeConfigValue.append('=');
      String bloomType = familyDescriptor.getBloomFilterType().toString();
      if (bloomType == null) {
        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
      }
      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
    }
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
  }

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param tableDescriptor to read the properties from
   * @param conf to persist serialized values into
   * @throws UnsupportedEncodingException
   *           on failure to URL-encode a column family name or value
   */
  @VisibleForTesting
  static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
      Configuration conf) throws UnsupportedEncodingException {
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        dataBlockEncodingConfigValue.append('&');
      }
      dataBlockEncodingConfigValue.append(
          URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
      dataBlockEncodingConfigValue.append('=');
      DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
      if (encoding == null) {
        encoding = DataBlockEncoding.NONE;
      }
      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
          "UTF-8"));
    }
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        dataBlockEncodingConfigValue.toString());
  }
}