/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
 * forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // These constants are public since the client can modify them when setting
  // up their conf object and thus refer to these symbols.
  // They are present for backwards compatibility reasons. Use them only to
  // override the auto-detection of data block encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";
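
  // For example (values are illustrative; compression names follow Compression.Algorithm#getName()
  // and encoding names follow the DataBlockEncoding enum constants):
  //
  //   conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, "FAST_DIFF");
  //   conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, "gz");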

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  /**
   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
   */
  @InterfaceAudience.Private
  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

  @InterfaceAudience.Private
  public static final String DISK_BASED_SORTING_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
  private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output directory
    final Path outputDir = ((PathOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("" + OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && cell == null) {
          rollWriters(null);
          return;
        }

        ExtendedCell kv = PrivateCellUtil.ensureExtendedCell(cell);
        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
            .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set the storage policy
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            Path tableRelPath = getTableRelativePath(tableNameBytes);
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // This can only happen once a row is finished though
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);

        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(kv, this.now);
        wl.writer.append((ExtendedCell) kv);
        wl.written += length;

        // Copy the row so we can tell when the row changes.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions of the given tables, as a list of
   * ImmutableBytesWritable.
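   * <p>
   * When {@code writeMultipleTables} is true (the {@link MultiTableHFileOutputFormat} case), each
   * start key is prefixed with the table name and the {@code ;} separator so that split points of
   * different tables sort into distinct ranges.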
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} containing the split points in startKeys, in a form that
   * {@link TotalOrderPartitioner} can read.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
    Class<? extends Writable> keyClass =
      diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
    SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        Writable writable = diskBasedSortingEnabled
          ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
          : startKey;

        writer.append(writable, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * <li>Sets the HBase cluster key so region locations can be loaded when locality-sensitive
   * writing is enabled</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
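   * <p>
   * For example, a minimal driver might be wired up as follows (the mapper class, table name and
   * output path are illustrative placeholders, not part of this API):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * Job job = Job.getInstance(conf, "prepare-bulkload");
   * job.setJarByClass(MyDriver.class);
   * job.setMapperClass(MyMapper.class); // emits ImmutableBytesWritable row keys and Put values
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(Put.class);
   * FileOutputFormat.setOutputPath(job, new Path("/tmp/bulkload-output"));
   * try (Connection connection = ConnectionFactory.createConnection(conf);
   *   Table table = connection.getTable(TableName.valueOf("my_table"));
   *   RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf("my_table"))) {
   *   HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
   * }
   * job.waitForCompletion(true);
   * }</pre>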
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  public static boolean diskBasedSortingEnabled(Configuration conf) {
    return conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output value class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    boolean diskBasedSorting = diskBasedSortingEnabled(conf);

    if (diskBasedSorting) {
      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
      job.setReducerClass(PreSortedCellsReducer.class);
    } else if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    mergeSerializations(conf);

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so writers can be created with favored nodes, and so per-table column
    // family attributes (compression, block size, etc.) can be decoded
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the table's region boundaries as TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize per-column-family attributes (compression, block size, bloom type/param and data
    // block encoding) into the job configuration

    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  private static void mergeSerializations(Configuration conf) {
    List<String> serializations = new ArrayList<>();

    // add any existing values that have been set
    String[] existing = conf.getStrings("io.serializations");
    if (existing != null) {
      Collections.addAll(serializations, existing);
    }

    serializations.add(MutationSerialization.class.getName());
    serializations.add(ResultSerialization.class.getName());

    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
    // SerializationFactory runs through serializations in the order they are registered.
    // We want to register ExtendedCellSerialization before CellSerialization because both
    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
    if (
      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
    ) {
      serializations.add(ExtendedCellSerialization.class.getName());
    }
    serializations.add(CellSerialization.class.getName());

    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
    throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Serialize per-column-family attributes into the job configuration
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Configure the HBase cluster key of the remote cluster used to load region locations when
   * locality-sensitive writing is enabled. It is not necessary to call this method explicitly when
   * the cluster key of the HBase cluster used to load region locations is already present in the
   * job configuration. Call this method when a different HBase cluster key is configured in the
   * job configuration. For example, you should call it when you load data from HBase cluster A
   * using {@link TableInputFormat} and generate HFiles for HBase cluster B. Otherwise,
   * HFileOutputFormat2 fetches region locations from cluster A, and locality-sensitive writing
   * won't work correctly.
   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
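   * <p>
   * A sketch of that scenario (names and the way clusterBConf is obtained are illustrative):
   *
   * <pre>{@code
   * // job reads from cluster A, e.g. set up via TableMapReduceUtil.initTableMapperJob(...)
   * try (Connection connB = ConnectionFactory.createConnection(clusterBConf);
   *   Table table = connB.getTable(TableName.valueOf("my_table"));
   *   RegionLocator locator = connB.getRegionLocator(TableName.valueOf("my_table"))) {
   *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
   *   HFileOutputFormat2.configureRemoteCluster(job, clusterBConf);
   * }
   * }</pre>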
   * @param job which has configuration to be updated
   * @param clusterConf which contains the cluster key of the HBase cluster used for
   *   locality-sensitive lookups
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
   */
  public static void configureRemoteCluster(Job job, Configuration clusterConf) throws IOException {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

    TableMapReduceUtil.initCredentialsForCluster(job, clusterConf);

    LOG.info("ZK configs for the remote cluster of the bulkload are configured: " + quorum + ":"
      + clientPort + "/" + parent);
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size map from the configuration.
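   * <p>
   * The serialized form is a {@code &}-joined list of URL-encoded {@code <table>;<family>=<value>}
   * pairs as produced by {@link #serializeColumnFamilyAttribute}; for an illustrative table
   * {@code t1} with family {@code cf1} and the default block size this looks like
   * {@code t1%3Bcf1=65536}.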
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize a column family to conf value map.
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
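   * <p>
   * The partitions file is written under {@link HConstants#TEMPORARY_FS_DIRECTORY_KEY} (defaulting
   * to {@code <user home>/hbase-staging}) and registered with the job via
   * {@link TotalOrderPartitioner#setPartitionFile}.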
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath =
      fs.makeQualified(new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()));
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // No trailing ampersand: the separator is prepended to every entry after the first.
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
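   * <p>
   * Currently only {@link BloomType#ROWPREFIX_FIXED_LENGTH} carries a parameter, read from the
   * family configuration under {@link BloomFilterUtil#PREFIX_LENGTH_KEY}; other bloom types
   * serialize an empty string.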
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}