/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null, null)
 * will forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
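 * <p>
 * A minimal driver sketch (illustrative only; assumes an existing {@code Connection} named
 * {@code connection}, a {@code TableName}, an output {@code Path}, and a mapper {@code MyMapper}
 * that emits {@link Put}s):
 *
 * <pre>{@code
 * Job job = Job.getInstance(conf, "hfile-bulkload");
 * job.setJarByClass(MyMapper.class);
 * job.setMapperClass(MyMapper.class);
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(Put.class);
 * try (Table table = connection.getTable(tableName);
 *   RegionLocator locator = connection.getRegionLocator(tableName)) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
 * }
 * FileOutputFormat.setOutputPath(job, outputPath);
 * job.waitForCompletion(true);
 * }</pre>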
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
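
  /*
   * Illustrative sketch, not part of the upstream file: a job driver that wants a single codec and
   * data block encoding for every column family, regardless of the table descriptor, can set the
   * override keys on the job configuration. The "snappy" and "FAST_DIFF" values are example names
   * only; any value accepted by Compression.getCompressionAlgorithmByName and
   * DataBlockEncoding.valueOf works, provided the codec is available on the cluster.
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, "snappy");
   *   conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, "FAST_DIFF");
   */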

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs are read from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;
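
    // The maps above are keyed by "<tableName>;<family>" (see combineTableNameSuffix) and are
    // decoded from the URL-encoded per-family attribute strings that configureIncrementalLoad
    // serialized into the job configuration.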

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
            .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set the storage policy
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            Path tableRelPath = getTableRelativePath(tableNameBytes);
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // Roll the writer once the size threshold is crossed, but only at a row boundary
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }
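
        // When locality-sensitive bulkload is enabled (LOCALITY_SENSITIVE_CONF_KEY), the region
        // currently hosting the first row written to a new writer is looked up and its host is
        // passed to the writer as a favored node, so an HFile block replica is likely to land on
        // the region server that will later serve the data (see HBASE-12596).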
        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Remember the row so we can detect when a new row starts.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
        justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }
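
  /*
   * Illustrative sketch, not part of the upstream file: a per-family block storage policy can be
   * supplied through the job configuration before the record writer creates the family
   * directories. The table name "usertable", family "cf1" and the HDFS policy names below are
   * example values only.
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "usertable;cf1", "ALL_SSD");
   *   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT"); // fallback for other families
   */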

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table, as a list of
   * ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by {@link TotalOrderPartitioner} that
   * contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
      ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   * PutSortReducer)</li>
   * <li>Sets the HBase cluster key so that region locations can be loaded when locality-sensitive
   * mode is enabled</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }
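
  // Shared implementation behind the public configureIncrementalLoad overloads. The caller passes
  // the concrete output format class; when it is MultiTableHFileOutputFormat, multi-table mode is
  // switched on via MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY.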
  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }
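
    // The HBase-specific serializations are appended after the job's existing io.serializations
    // value so Mutation, Result and Cell values can cross the MapReduce shuffle.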
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so writers can be created against favored nodes and so the per-table
    // column family attributes (compression, block size, and so on) can be decoded.
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use table's region boundaries for TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize the per column family attributes (compression, block size, bloom type and
    // parameter, data block encoding) for the record writer to pick up.
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }
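
  /**
   * Variant of {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)} that only
   * wires the output classes and the serialized per column family attributes. It does not set up
   * the TotalOrderPartitioner, the reducer, or the number of reduce tasks, so callers are expected
   * to handle sorting and partitioning themselves.
   */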
  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
    throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Serialize the per column family attributes (compression, block size, bloom type and
    // parameter, data block encoding) for the record writer to pick up.
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Configure the HBase cluster key of the remote cluster from which region locations are loaded
   * when locality-sensitive mode is enabled. It is not necessary to call this method explicitly
   * when the cluster key of the HBase cluster used to load region locations is already configured
   * in the job configuration. Call this method when a different HBase cluster key is configured
   * in the job configuration, for example when you load data from HBase cluster A using
   * {@link TableInputFormat} and generate HFiles for HBase cluster B. Otherwise,
   * HFileOutputFormat2 fetches region locations from cluster A and locality-sensitive mode will
   * not work correctly. {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this
   * method using {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
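   * <p>
   * A usage sketch (host names below are illustrative only):
   *
   * <pre>{@code
   * Configuration clusterBConf = HBaseConfiguration.create();
   * clusterBConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1.clusterB,zk2.clusterB,zk3.clusterB");
   * HFileOutputFormat2.configureRemoteCluster(job, clusterBConf);
   * }</pre>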
   * @param job the job whose configuration is to be updated
   * @param clusterConf the configuration carrying the cluster key of the HBase cluster whose
   *          region locations are used when locality-sensitive mode is enabled
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
   */
  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

    LOG.info("ZK configs for remote cluster of bulkload are configured: " + quorum + ":"
      + clientPort + "/" + parent);
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size map from the configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured {@link DataBlockEncoding} for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }
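
  // The per-family values consumed by the helpers above travel through the configuration as
  // '&'-separated "<tableName>;<family>=<value>" pairs with both sides URL-encoded;
  // serializeColumnFamilyAttribute produces them and createFamilyConfValueMap (below) parses them.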

  /**
   * Runs inside the task to deserialize column family to given conf value map.
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf,
    String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };
}