/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
 * forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  /**
   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
   */
  @InterfaceAudience.Private
  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

  @InterfaceAudience.Private
  public static final String DISK_BASED_SORTING_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
  private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((PathOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        // null input == user explicitly wants to roll all writers
        if (row == null && cell == null) {
          rollWriters(null);
          return;
        }

        ExtendedCell kv = PrivateCellUtil.ensureExtendedCell(cell);
        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
            .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, make sure the output directory exists
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            Path tableRelPath = getTableRelativePath(tableNameBytes);
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // This can only happen once a row is finished though
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);

        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(kv, this.now);
        wl.writer.append((ExtendedCell) kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }
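
  // Illustrative sketch (not invoked by this class): how a caller might request a block storage
  // policy that configureStoragePolicy() above picks up. The table "usertable" and family "cf1"
  // are placeholders; the CF-specific suffix "<table>;<family>" mirrors how
  // combineTableNameSuffix() builds tableAndFamily in the record writer. HDFS policy names such
  // as "HOT" and "ALL_SSD" are used here only as examples.
  //
  //   Configuration conf = job.getConfiguration();
  //   // default policy for every family written by this job
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
  //   // override for one table/family combination
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "usertable;cf1", "ALL_SSD");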

  /**
   * Return the start keys of all of the regions in the passed tables, as a list of
   * ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by {@link TotalOrderPartitioner} that
   * contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
    Class<? extends Writable> keyClass =
      diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
    SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        Writable writable = diskBasedSortingEnabled
          ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
          : startKey;

        writer.append(writable, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * <li>Sets the HBase cluster key to load region locations for locality-sensitive writes</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureForRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  public static boolean diskBasedSortingEnabled(Configuration conf) {
    return conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
  }
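
  // Usage sketch (illustrative only; not executed by this class): a minimal driver that wires a
  // job to HFileOutputFormat2 and then bulk loads the result. The mapper class, table name and
  // paths are placeholders, and the final step assumes the org.apache.hadoop.hbase.tool
  // BulkLoadHFiles tool available in recent HBase releases.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Job.getInstance(conf, "hfile-generation");
  //   job.setJarByClass(MyDriver.class);
  //   job.setMapperClass(MyCellMapper.class); // emits ImmutableBytesWritable / Put pairs
  //   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  //   job.setMapOutputValueClass(Put.class);
  //   FileInputFormat.addInputPath(job, new Path("/input"));
  //   FileOutputFormat.setOutputPath(job, new Path("/bulkload-out"));
  //   TableName tableName = TableName.valueOf("usertable");
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //     Table table = conn.getTable(tableName);
  //     RegionLocator locator = conn.getRegionLocator(tableName)) {
  //     HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
  //   }
  //   if (job.waitForCompletion(true)) {
  //     BulkLoadHFiles.create(conf).bulkLoad(tableName, new Path("/bulkload-out"));
  //   }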

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    boolean diskBasedSorting = diskBasedSortingEnabled(conf);

    if (diskBasedSorting) {
      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
      job.setReducerClass(PreSortedCellsReducer.class);
    } else if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    mergeSerializations(conf);

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so the reducers can create writers with favored nodes and decode the
    // per-table column family attributes (compression, block size, etc.)
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as the TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize per-family attributes (compression, block size, bloom filters, encoding) into
    // the conf so the reducers can reconstruct them.
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }
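
  // Illustrative sketch (not part of the public API contract): jobs such as WALPlayer that
  // shuffle ExtendedCells can opt in to ExtendedCellSerialization before calling
  // configureIncrementalLoad(); mergeSerializations() below then registers it ahead of
  // CellSerialization. The tableDescriptor and regionLocator variables are placeholders obtained
  // from the client API.
  //
  //   job.getConfiguration()
  //     .setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, true);
  //   HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);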

  private static void mergeSerializations(Configuration conf) {
    List<String> serializations = new ArrayList<>();

    // add any existing values that have been set
    String[] existing = conf.getStrings("io.serializations");
    if (existing != null) {
      Collections.addAll(serializations, existing);
    }

    serializations.add(MutationSerialization.class.getName());
    serializations.add(ResultSerialization.class.getName());

    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
    // SerializationFactory runs through serializations in the order they are registered.
    // We want to register ExtendedCellSerialization before CellSerialization because both
    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
    if (
      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
    ) {
      serializations.add(ExtendedCellSerialization.class.getName());
    }
    serializations.add(CellSerialization.class.getName());

    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
    throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Configure the HBase cluster key of the remote cluster used to look up region locations when
   * locality-sensitive writing is enabled. It's not necessary to call this method explicitly when
   * the cluster key of the HBase cluster used to look up region locations is already configured in
   * the job configuration. Call this method when another HBase cluster key is configured in the
   * job configuration. For example, you should call it when you load data from HBase cluster A
   * using {@link TableInputFormat} and generate hfiles for HBase cluster B. Otherwise,
   * HFileOutputFormat2 fetches region locations from cluster A and locality-sensitive writing
   * won't work correctly.
   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
   * @param job which has configuration to be updated
   * @param clusterConf which contains the cluster key of the HBase cluster used for
   *          locality-sensitive writing
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
   * @deprecated As of release 2.6.4, this will be removed in HBase 4.0.0. Use
   *             {@link #configureForRemoteCluster(Job, Configuration)} instead.
   */
  @Deprecated
  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
    try {
      configureForRemoteCluster(job, clusterConf);
    } catch (IOException e) {
      LOG.error("Configure remote cluster error.", e);
      throw new UncheckedIOException("Configure remote cluster error.", e);
    }
  }

  /**
   * Configure the HBase cluster key of the remote cluster used to look up region locations when
   * locality-sensitive writing is enabled. It's not necessary to call this method explicitly when
   * the cluster key of the HBase cluster used to look up region locations is already configured in
   * the job configuration. Call this method when another HBase cluster key is configured in the
   * job configuration. For example, you should call it when you load data from HBase cluster A
   * using {@link TableInputFormat} and generate hfiles for HBase cluster B. Otherwise,
   * HFileOutputFormat2 fetches region locations from cluster A and locality-sensitive writing
   * won't work correctly. If authentication is enabled, it obtains the token for the specific
   * cluster.
   * @param job which has configuration to be updated
   * @param clusterConf which contains the cluster key of the HBase cluster used for
   *          locality-sensitive writing
   * @throws IOException Exception while initializing cluster credentials
   */
  public static void configureForRemoteCluster(Job job, Configuration clusterConf)
    throws IOException {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

    TableMapReduceUtil.initCredentialsForCluster(job, clusterConf);

    LOG.info("ZK configs for remote cluster of bulkload are configured: " + quorum + ":"
      + clientPort + "/" + parent);
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }
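
  // Illustrative sketch for configureForRemoteCluster(Job, Configuration) above (not executed
  // here): reading input via the job's own configuration while generating HFiles laid out for a
  // second cluster "B". The quorum string, table name and connection objects are placeholders.
  //
  //   Configuration clusterBConf = HBaseConfiguration.create();
  //   clusterBConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1.b.example.com,zk2.b.example.com");
  //   TableName tableName = TableName.valueOf("usertable");
  //   try (Connection connB = ConnectionFactory.createConnection(clusterBConf);
  //     Table tableB = connB.getTable(tableName);
  //     RegionLocator locatorB = connB.getRegionLocator(tableName)) {
  //     HFileOutputFormat2.configureIncrementalLoad(job, tableB.getDescriptor(), locatorB);
  //     HFileOutputFormat2.configureForRemoteCluster(job, clusterBConf);
  //   }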

  /**
   * Runs inside the task to deserialize column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size map from the configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath =
      fs.makeQualified(new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()));
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }
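
  // Illustrative note (derived from the code above; not a stored-format guarantee): the
  // serialized attribute string consists of "<table>;<family>=<value>" pairs joined with '&',
  // with both sides URL-encoded. For example, a table "usertable" with families "cf1" (gz
  // compression) and "cf2" (no compression) might serialize roughly as:
  //
  //   usertable%3Bcf1=gz&usertable%3Bcf2=none
  //
  // createFamilyConfValueMap() above splits on '&' and '=' and URL-decodes each side to rebuild
  // the per-family map inside the tasks.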

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}