/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes the current time as the sequence id for
 * the file. Sets the major compacted attribute on created {@link HFile}s. Calling
 * write(null, null) will forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // These constants are public since the client can modify them when setting
  // up their conf object and thus refer to these symbols.
  // They are present for backwards compatibility reasons. Use them only to
  // override the auto-detection of data block encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";
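
  // Hedged example: to force a single algorithm/encoding for every column family
  // instead of auto-detecting them from the table descriptors, a job could set
  // (the values shown are just two of the supported names):
  //
  //   conf.set(COMPRESSION_OVERRIDE_CONF_KEY, "gz");
  //   conf.set(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, "FAST_DIFF");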

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596.
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  /**
   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
   */
  @InterfaceAudience.Private
  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFAULT = false;

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = PrivateCellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
            .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            Path tableRelPath = getTableRelativePath(tableNameBytes);
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // This can only happen once a row is finished though
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator =
                  connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRows.put(family, rowKey);
      }
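
      // Usage note: emitting a null key/value pair, i.e. context.write(null, null),
      // forces every open HFile to roll; see the null check at the top of write(...)
      // and the class-level javadoc.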

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }
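
  // Illustrative configuration sketch (table and family names are hypothetical): to
  // place one column family's HFiles on SSD while other families follow the job-wide
  // default policy, a caller could set, before submitting the job:
  //
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "mytable;cf_ssd", "ALL_SSD");
  //
  // The per-family suffix is the table name and family joined by tableSeparator (";"),
  // matching the tableAndFamily key used above.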

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table, as a list of
   * ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by {@link TotalOrderPartitioner} and that
   * contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0.
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
      ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer,
   * or TextSortReducer)</li>
   * <li>Sets the HBase cluster key to load region locations for locality-sensitive operation</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
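   * <p>
   * A minimal driver sketch using this method (the mapper, paths, and table names are
   * hypothetical):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * Job job = Job.getInstance(conf, "bulkload");
   * job.setMapperClass(MyPutMapper.class); // assumed to emit (ImmutableBytesWritable, Put)
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(Put.class);
   * FileOutputFormat.setOutputPath(job, new Path("/tmp/bulk-out"));
   * TableName name = TableName.valueOf("mytable");
   * try (Connection conn = ConnectionFactory.createConnection(conf);
   *   Table table = conn.getTable(name);
   *   RegionLocator locator = conn.getRegionLocator(name)) {
   *   HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
   * }
   * job.waitForCompletion(true);
   * }</pre>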
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer,
   * or TextSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    mergeSerializations(conf);

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names for creating writers by favored nodes, and for decoding compression,
    // block size and other attributes of each column family per table
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use table's region boundaries for TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families

    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  private static void mergeSerializations(Configuration conf) {
    List<String> serializations = new ArrayList<>();

    // add any existing values that have been set
    String[] existing = conf.getStrings("io.serializations");
    if (existing != null) {
      Collections.addAll(serializations, existing);
    }

    serializations.add(MutationSerialization.class.getName());
    serializations.add(ResultSerialization.class.getName());

    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
    // SerializationFactory runs through serializations in the order they are registered.
    // We want to register ExtendedCellSerialization before CellSerialization because both
    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
    if (
      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFAULT)
    ) {
      serializations.add(ExtendedCellSerialization.class.getName());
    }
    serializations.add(CellSerialization.class.getName());

    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
    throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Configure the HBase cluster key for a remote cluster, used to load region locations for
   * locality-sensitive operation when it is enabled. It is not necessary to call this method
   * explicitly when the cluster key for the HBase cluster used to load region locations is
   * already configured in the job configuration. Call this method when another HBase cluster key
   * is configured in the job configuration. For example, you should call it when you load data
   * from HBase cluster A using {@link TableInputFormat} and generate hfiles for HBase cluster B.
   * Otherwise, HFileOutputFormat2 fetches region locations from cluster A and locality-sensitive
   * operation won't work correctly.
   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
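   * <p>
   * A short sketch under assumed settings (the quorum address is hypothetical):
   *
   * <pre>{@code
   * Configuration clusterBConf = HBaseConfiguration.create();
   * clusterBConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1.cluster-b.example.com");
   * HFileOutputFormat2.configureRemoteCluster(job, clusterBConf);
   * }</pre>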
   * @param job         which has configuration to be updated
   * @param clusterConf which contains the cluster key of the HBase cluster to be locality-sensitive
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
   */
  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

    LOG.info("ZK configs for remote cluster of bulkload are configured: " + quorum + ":"
      + clientPort + "/" + parent);
  }

  /**
   * Runs inside the task to deserialize the column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize the column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize the column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }
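
  // Illustrative round trip (names are hypothetical): configureIncrementalLoad serializes
  // per-family attributes as URL-encoded "table;family=value" pairs joined by '&', e.g.
  //
  //   conf.set(COMPRESSION_FAMILIES_CONF_KEY, "mytable%3Bcf1=gz&mytable%3Bcf2=none");
  //
  // createFamilyCompressionMap(conf) would then map the bytes of "mytable;cf1" to
  // Algorithm.GZ and those of "mytable;cf2" to Algorithm.NONE.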

  /**
   * Runs inside the task to deserialize the column family to block size map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize the column family to data block encoding type map from
   * the configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to HFileDataBlockEncoder for the configured data block type
   *         for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize the column family to the given conf value map.
   * @param conf     to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // No trailing ampersand: '&' is prepended before every entry except the first.
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}