/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
 * forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  /**
   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
   */
  @InterfaceAudience.Private
  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

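  /*
   * A minimal sketch of how a client might use the override keys above (the chosen values are
   * illustrative, not defaults): setting them on the job configuration bypasses the compression
   * and data block encoding auto-detected from the table descriptor.
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY,
   *     Compression.Algorithm.GZ.getName());
   *   conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY,
   *     DataBlockEncoding.FAST_DIFF.name());
   */
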
  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((PathOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("" + OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && cell == null) {
          rollWriters(null);
          return;
        }

        ExtendedCell kv = PrivateCellUtil.ensureExtendedCell(cell);
        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
            .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, make sure the directory exists
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            Path tableRelPath = getTableRelativePath(tableNameBytes);
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // This can only happen once a row is finished though
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(kv, this.now);
        wl.writer.append((ExtendedCell) kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

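  /*
   * A hedged illustration of the per-family storage policy lookup above (the table and family
   * names are made up): for a single-table job writing table "usertable" with family "cf", the
   * family output directory could be pinned to SSD-backed storage with
   *
   *   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "usertable;cf", "ALL_SSD");
   *
   * while setting HFileOutputFormat2.STORAGE_POLICY_PROPERTY alone applies one policy to every
   * column family.
   */
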
  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table, as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by {@link TotalOrderPartitioner} and that
   * contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0.
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
      ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * <li>Sets the HBase cluster key so region locations can be loaded for the locality-sensitive
   * feature</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

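  /*
   * A minimal driver sketch showing the intended use of configureIncrementalLoad above. The
   * mapper, table name and output path are illustrative placeholders, not part of this class:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Job job = Job.getInstance(conf, "hfile-prepare");
   *   job.setJarByClass(MyBulkLoadDriver.class);
   *   job.setMapperClass(MyPutEmittingMapper.class);
   *   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   *   job.setMapOutputValueClass(Put.class);
   *   FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-out"));
   *   TableName tableName = TableName.valueOf("usertable");
   *   try (Connection connection = ConnectionFactory.createConnection(conf);
   *     Table table = connection.getTable(tableName);
   *     RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
   *     HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
   *   }
   *   job.waitForCompletion(true);
   *
   * The generated HFiles under the output path can then be handed to the bulk load tooling for
   * the target table.
   */
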
  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    mergeSerializations(conf);

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so writers can be created with favored nodes, and so the per-table
    // column family attributes (compression, block size, etc.) can be decoded in the reducer
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries for TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families

    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  private static void mergeSerializations(Configuration conf) {
    List<String> serializations = new ArrayList<>();

    // add any existing values that have been set
    String[] existing = conf.getStrings("io.serializations");
    if (existing != null) {
      Collections.addAll(serializations, existing);
    }

    serializations.add(MutationSerialization.class.getName());
    serializations.add(ResultSerialization.class.getName());

    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
    // SerializationFactory runs through serializations in the order they are registered.
    // We want to register ExtendedCellSerialization before CellSerialization because both
    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
    if (
      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
    ) {
      serializations.add(ExtendedCellSerialization.class.getName());
    }
    serializations.add(CellSerialization.class.getName());

    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
  }

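  /*
   * A hedged usage note for the serialization merge above: jobs that move ExtendedCells (for
   * example WALPlayer-style jobs) can opt in before configuring the incremental load with
   *
   *   job.getConfiguration()
   *     .setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, true);
   *
   * so that ExtendedCellSerialization is registered ahead of CellSerialization.
   */
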
  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
    throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

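  /*
   * A hedged sketch of the cross-cluster case handled by configureRemoteCluster below: the job
   * configuration points at cluster A (the source read by TableInputFormat), while the HFiles are
   * generated for table "usertable" on cluster B. Quorum, table and variable names are
   * illustrative placeholders:
   *
   *   Configuration clusterBConf = HBaseConfiguration.create();
   *   clusterBConf.set(HConstants.ZOOKEEPER_QUORUM, "zk-b1,zk-b2,zk-b3");
   *   try (Connection connB = ConnectionFactory.createConnection(clusterBConf);
   *     Table table = connB.getTable(TableName.valueOf("usertable"));
   *     RegionLocator locator = connB.getRegionLocator(TableName.valueOf("usertable"))) {
   *     HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
   *     HFileOutputFormat2.configureRemoteCluster(job, clusterBConf);
   *   }
   *
   * so that region locations for the locality-sensitive feature are fetched from cluster B.
   */
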
  /**
   * Configure the HBase cluster key of the remote cluster used to load region locations for the
   * locality-sensitive feature, if it is enabled. It is not necessary to call this method
   * explicitly when the cluster key of the HBase cluster used to load region locations is already
   * configured in the job configuration. Call this method when another HBase cluster key is
   * configured in the job configuration. For example, you should call it when you load data from
   * HBase cluster A using {@link TableInputFormat} and generate hfiles for HBase cluster B.
   * Otherwise, HFileOutputFormat2 fetches region locations from cluster A and the
   * locality-sensitive feature won't work correctly.
   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
   * @param job which has configuration to be updated
   * @param clusterConf which contains cluster key of the HBase cluster to be locality-sensitive
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
   */
  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

    LOG.info("ZK configs for remote cluster of bulkload are configured: " + quorum + ":"
      + clientPort + "/" + parent);
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size map from the configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize a column family to conf value map for the given conf key.
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

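  /*
   * A hedged illustration of the wire format shared by createFamilyConfValueMap above and
   * serializeColumnFamilyAttribute below (table and family names are made up). For a table "t1"
   * with families "cf1" (GZ compression) and "cf2" (no compression), the serialized conf value is
   * roughly
   *
   *   t1%3Bcf1=gz&t1%3Bcf2=none
   *
   * i.e. URL-encoded "<table>;<family>" keys mapped to URL-encoded values, joined with '&'.
   * createFamilyCompressionMap(conf) decodes this back into a byte[]-keyed map of
   * Compression.Algorithm values.
   */
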
  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    partitionsPath = fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // The '&' separator is only appended between entries, so the value can be returned as-is
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}