001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY; 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; 022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY; 023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; 024 025import java.io.IOException; 026import java.io.UnsupportedEncodingException; 027import java.net.InetSocketAddress; 028import java.net.URLDecoder; 029import java.net.URLEncoder; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035import java.util.TreeMap; 036import java.util.TreeSet; 037import java.util.UUID; 038import java.util.function.Function; 039import java.util.stream.Collectors; 040 041import org.apache.commons.lang3.StringUtils; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.CellComparator; 047import org.apache.hadoop.hbase.CellUtil; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.HRegionLocation; 050import org.apache.hadoop.hbase.HTableDescriptor; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.PrivateCellUtil; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 056import org.apache.hadoop.hbase.client.Connection; 057import org.apache.hadoop.hbase.client.ConnectionFactory; 058import org.apache.hadoop.hbase.client.Put; 059import org.apache.hadoop.hbase.client.RegionLocator; 060import org.apache.hadoop.hbase.client.Table; 061import org.apache.hadoop.hbase.client.TableDescriptor; 062import org.apache.hadoop.hbase.fs.HFileSystem; 063import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 064import org.apache.hadoop.hbase.io.compress.Compression; 065import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 066import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 067import org.apache.hadoop.hbase.io.hfile.CacheConfig; 068import org.apache.hadoop.hbase.io.hfile.HFile; 069import org.apache.hadoop.hbase.io.hfile.HFileContext; 070import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 071import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; 072import org.apache.hadoop.hbase.regionserver.BloomType; 073import org.apache.hadoop.hbase.regionserver.HStore; 074import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 075import org.apache.hadoop.hbase.util.BloomFilterUtil; 076import org.apache.hadoop.hbase.util.Bytes; 077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 078import org.apache.hadoop.hbase.util.FSUtils; 079import org.apache.hadoop.hbase.util.MapReduceExtendedCell; 080import org.apache.hadoop.io.NullWritable; 081import org.apache.hadoop.io.SequenceFile; 082import org.apache.hadoop.io.Text; 083import org.apache.hadoop.mapreduce.Job; 084import org.apache.hadoop.mapreduce.OutputCommitter; 085import org.apache.hadoop.mapreduce.OutputFormat; 086import org.apache.hadoop.mapreduce.RecordWriter; 087import org.apache.hadoop.mapreduce.TaskAttemptContext; 088import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; 089import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 090import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; 091import org.apache.yetus.audience.InterfaceAudience; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 096 097/** 098 * Writes HFiles. Passed Cells must arrive in order. 099 * Writes current time as the sequence id for the file. Sets the major compacted 100 * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll 101 * all HFiles being written. 102 * <p> 103 * Using this class as part of a MapReduce job is best done 104 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}. 105 */ 106@InterfaceAudience.Public 107public class HFileOutputFormat2 108 extends FileOutputFormat<ImmutableBytesWritable, Cell> { 109 private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class); 110 static class TableInfo { 111 private TableDescriptor tableDesctiptor; 112 private RegionLocator regionLocator; 113 114 public TableInfo(TableDescriptor tableDesctiptor, RegionLocator regionLocator) { 115 this.tableDesctiptor = tableDesctiptor; 116 this.regionLocator = regionLocator; 117 } 118 119 /** 120 * The modification for the returned HTD doesn't affect the inner TD. 121 * @return A clone of inner table descriptor 122 * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()} 123 * instead. 124 * @see #getTableDescriptor() 125 * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a> 126 */ 127 @Deprecated 128 public HTableDescriptor getHTableDescriptor() { 129 return new HTableDescriptor(tableDesctiptor); 130 } 131 132 public TableDescriptor getTableDescriptor() { 133 return tableDesctiptor; 134 } 135 136 public RegionLocator getRegionLocator() { 137 return regionLocator; 138 } 139 } 140 141 protected static final byte[] tableSeparator = Bytes.toBytes(";"); 142 143 protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) { 144 return Bytes.add(tableName, tableSeparator, suffix); 145 } 146 147 // The following constants are private since these are used by 148 // HFileOutputFormat2 to internally transfer data between job setup and 149 // reducer run using conf. 150 // These should not be changed by the client. 151 static final String COMPRESSION_FAMILIES_CONF_KEY = 152 "hbase.hfileoutputformat.families.compression"; 153 static final String BLOOM_TYPE_FAMILIES_CONF_KEY = 154 "hbase.hfileoutputformat.families.bloomtype"; 155 static final String BLOOM_PARAM_FAMILIES_CONF_KEY = 156 "hbase.hfileoutputformat.families.bloomparam"; 157 static final String BLOCK_SIZE_FAMILIES_CONF_KEY = 158 "hbase.mapreduce.hfileoutputformat.blocksize"; 159 static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = 160 "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; 161 162 // This constant is public since the client can modify this when setting 163 // up their conf object and thus refer to this symbol. 164 // It is present for backwards compatibility reasons. Use it only to 165 // override the auto-detection of datablock encoding and compression. 166 public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = 167 "hbase.mapreduce.hfileoutputformat.datablock.encoding"; 168 public static final String COMPRESSION_OVERRIDE_CONF_KEY = 169 "hbase.mapreduce.hfileoutputformat.compression"; 170 171 /** 172 * Keep locality while generating HFiles for bulkload. See HBASE-12596 173 */ 174 public static final String LOCALITY_SENSITIVE_CONF_KEY = 175 "hbase.bulkload.locality.sensitive.enabled"; 176 private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; 177 static final String OUTPUT_TABLE_NAME_CONF_KEY = 178 "hbase.mapreduce.hfileoutputformat.table.name"; 179 static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = 180 "hbase.mapreduce.use.multi.table.hfileoutputformat"; 181 182 public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY; 183 public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "."; 184 185 @Override 186 public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter( 187 final TaskAttemptContext context) throws IOException, InterruptedException { 188 return createRecordWriter(context, this.getOutputCommitter(context)); 189 } 190 191 protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) { 192 return combineTableNameSuffix(tableName, family); 193 } 194 195 static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> 196 createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer) 197 throws IOException { 198 199 // Get the path of the temporary output file 200 final Path outputDir = ((FileOutputCommitter)committer).getWorkPath(); 201 final Configuration conf = context.getConfiguration(); 202 final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ; 203 final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); 204 if (writeTableNames==null || writeTableNames.isEmpty()) { 205 throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY 206 + " cannot be empty"); 207 } 208 final FileSystem fs = outputDir.getFileSystem(conf); 209 // These configs. are from hbase-*.xml 210 final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, 211 HConstants.DEFAULT_MAX_FILE_SIZE); 212 // Invented config. Add to hbase-*.xml if other than default compression. 213 final String defaultCompressionStr = conf.get("hfile.compression", 214 Compression.Algorithm.NONE.getName()); 215 final Algorithm defaultCompression = HFileWriterImpl 216 .compressionByName(defaultCompressionStr); 217 String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY); 218 final Algorithm overriddenCompression; 219 if (compressionStr != null) { 220 overriddenCompression = Compression.getCompressionAlgorithmByName(compressionStr); 221 } else { 222 overriddenCompression = null; 223 } 224 final boolean compactionExclude = conf.getBoolean( 225 "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); 226 227 final Set<String> allTableNames = Arrays.stream(writeTableNames.split( 228 Bytes.toString(tableSeparator))).collect(Collectors.toSet()); 229 230 // create a map from column family to the compression algorithm 231 final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf); 232 final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf); 233 final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf); 234 final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf); 235 236 String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); 237 final Map<byte[], DataBlockEncoding> datablockEncodingMap 238 = createFamilyDataBlockEncodingMap(conf); 239 final DataBlockEncoding overriddenEncoding; 240 if (dataBlockEncodingStr != null) { 241 overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); 242 } else { 243 overriddenEncoding = null; 244 } 245 246 return new RecordWriter<ImmutableBytesWritable, V>() { 247 // Map of families to writers and how much has been output on the writer. 248 private final Map<byte[], WriterLength> writers = 249 new TreeMap<>(Bytes.BYTES_COMPARATOR); 250 private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY; 251 private final long now = EnvironmentEdgeManager.currentTime(); 252 private boolean rollRequested = false; 253 254 @Override 255 public void write(ImmutableBytesWritable row, V cell) 256 throws IOException { 257 Cell kv = cell; 258 // null input == user explicitly wants to flush 259 if (row == null && kv == null) { 260 rollWriters(null); 261 return; 262 } 263 264 byte[] rowKey = CellUtil.cloneRow(kv); 265 int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT; 266 byte[] family = CellUtil.cloneFamily(kv); 267 byte[] tableNameBytes = null; 268 if (writeMultipleTables) { 269 tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get()); 270 if (!allTableNames.contains(Bytes.toString(tableNameBytes))) { 271 throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) + 272 "' not" + " expected"); 273 } 274 } else { 275 tableNameBytes = Bytes.toBytes(writeTableNames); 276 } 277 byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); 278 WriterLength wl = this.writers.get(tableAndFamily); 279 280 // If this is a new column family, verify that the directory exists 281 if (wl == null) { 282 Path writerPath = null; 283 if (writeMultipleTables) { 284 writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes 285 .toString(family))); 286 } 287 else { 288 writerPath = new Path(outputDir, Bytes.toString(family)); 289 } 290 fs.mkdirs(writerPath); 291 configureStoragePolicy(conf, fs, tableAndFamily, writerPath); 292 } 293 294 if (wl != null && wl.written + length >= maxsize) { 295 this.rollRequested = true; 296 } 297 298 // This can only happen once a row is finished though 299 if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { 300 rollWriters(wl); 301 } 302 303 // create a new WAL writer, if necessary 304 if (wl == null || wl.writer == null) { 305 if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { 306 HRegionLocation loc = null; 307 308 String tableName = Bytes.toString(tableNameBytes); 309 if (tableName != null) { 310 try (Connection connection = ConnectionFactory.createConnection(conf); 311 RegionLocator locator = 312 connection.getRegionLocator(TableName.valueOf(tableName))) { 313 loc = locator.getRegionLocation(rowKey); 314 } catch (Throwable e) { 315 LOG.warn("There's something wrong when locating rowkey: " + 316 Bytes.toString(rowKey) + " for tablename: " + tableName, e); 317 loc = null; 318 } } 319 320 if (null == loc) { 321 if (LOG.isTraceEnabled()) { 322 LOG.trace("failed to get region location, so use default writer for rowkey: " + 323 Bytes.toString(rowKey)); 324 } 325 wl = getNewWriter(tableNameBytes, family, conf, null); 326 } else { 327 if (LOG.isDebugEnabled()) { 328 LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]"); 329 } 330 InetSocketAddress initialIsa = 331 new InetSocketAddress(loc.getHostname(), loc.getPort()); 332 if (initialIsa.isUnresolved()) { 333 if (LOG.isTraceEnabled()) { 334 LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" 335 + loc.getPort() + ", so use default writer"); 336 } 337 wl = getNewWriter(tableNameBytes, family, conf, null); 338 } else { 339 if (LOG.isDebugEnabled()) { 340 LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); 341 } 342 wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa 343 }); 344 } 345 } 346 } else { 347 wl = getNewWriter(tableNameBytes, family, conf, null); 348 } 349 } 350 351 // we now have the proper WAL writer. full steam ahead 352 PrivateCellUtil.updateLatestStamp(cell, this.now); 353 wl.writer.append(kv); 354 wl.written += length; 355 356 // Copy the row so we know when a row transition. 357 this.previousRow = rowKey; 358 } 359 360 private void rollWriters(WriterLength writerLength) throws IOException { 361 if (writerLength != null) { 362 closeWriter(writerLength); 363 } else { 364 for (WriterLength wl : this.writers.values()) { 365 closeWriter(wl); 366 } 367 } 368 this.rollRequested = false; 369 } 370 371 private void closeWriter(WriterLength wl) throws IOException { 372 if (wl.writer != null) { 373 LOG.info( 374 "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written)); 375 close(wl.writer); 376 } 377 wl.writer = null; 378 wl.written = 0; 379 } 380 381 /* 382 * Create a new StoreFile.Writer. 383 * @param family 384 * @return A WriterLength, containing a new StoreFile.Writer. 385 * @throws IOException 386 */ 387 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED", 388 justification="Not important") 389 private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration 390 conf, InetSocketAddress[] favoredNodes) throws IOException { 391 byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family); 392 Path familydir = new Path(outputDir, Bytes.toString(family)); 393 if (writeMultipleTables) { 394 familydir = new Path(outputDir, 395 new Path(Bytes.toString(tableName), Bytes.toString(family))); 396 } 397 WriterLength wl = new WriterLength(); 398 Algorithm compression = overriddenCompression; 399 compression = compression == null ? compressionMap.get(tableAndFamily) : compression; 400 compression = compression == null ? defaultCompression : compression; 401 BloomType bloomType = bloomTypeMap.get(tableAndFamily); 402 bloomType = bloomType == null ? BloomType.NONE : bloomType; 403 String bloomParam = bloomParamMap.get(tableAndFamily); 404 if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { 405 conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam); 406 } 407 Integer blockSize = blockSizeMap.get(tableAndFamily); 408 blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; 409 DataBlockEncoding encoding = overriddenEncoding; 410 encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding; 411 encoding = encoding == null ? DataBlockEncoding.NONE : encoding; 412 HFileContextBuilder contextBuilder = new HFileContextBuilder() 413 .withCompression(compression) 414 .withChecksumType(HStore.getChecksumType(conf)) 415 .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) 416 .withBlockSize(blockSize); 417 418 if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { 419 contextBuilder.withIncludesTags(true); 420 } 421 422 contextBuilder.withDataBlockEncoding(encoding); 423 HFileContext hFileContext = contextBuilder.build(); 424 if (null == favoredNodes) { 425 wl.writer = 426 new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs) 427 .withOutputDir(familydir).withBloomType(bloomType) 428 .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build(); 429 } else { 430 wl.writer = 431 new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs)) 432 .withOutputDir(familydir).withBloomType(bloomType) 433 .withComparator(CellComparator.getInstance()).withFileContext(hFileContext) 434 .withFavoredNodes(favoredNodes).build(); 435 } 436 437 this.writers.put(tableAndFamily, wl); 438 return wl; 439 } 440 441 private void close(final StoreFileWriter w) throws IOException { 442 if (w != null) { 443 w.appendFileInfo(BULKLOAD_TIME_KEY, 444 Bytes.toBytes(System.currentTimeMillis())); 445 w.appendFileInfo(BULKLOAD_TASK_KEY, 446 Bytes.toBytes(context.getTaskAttemptID().toString())); 447 w.appendFileInfo(MAJOR_COMPACTION_KEY, 448 Bytes.toBytes(true)); 449 w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, 450 Bytes.toBytes(compactionExclude)); 451 w.appendTrackedTimestampsToMetadata(); 452 w.close(); 453 } 454 } 455 456 @Override 457 public void close(TaskAttemptContext c) 458 throws IOException, InterruptedException { 459 for (WriterLength wl: this.writers.values()) { 460 close(wl.writer); 461 } 462 } 463 }; 464 } 465 466 /** 467 * Configure block storage policy for CF after the directory is created. 468 */ 469 static void configureStoragePolicy(final Configuration conf, final FileSystem fs, 470 byte[] tableAndFamily, Path cfPath) { 471 if (null == conf || null == fs || null == tableAndFamily || null == cfPath) { 472 return; 473 } 474 475 String policy = 476 conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily), 477 conf.get(STORAGE_POLICY_PROPERTY)); 478 FSUtils.setStoragePolicy(fs, cfPath, policy); 479 } 480 481 /* 482 * Data structure to hold a Writer and amount of data written on it. 483 */ 484 static class WriterLength { 485 long written = 0; 486 StoreFileWriter writer = null; 487 } 488 489 /** 490 * Return the start keys of all of the regions in this table, 491 * as a list of ImmutableBytesWritable. 492 */ 493 private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators, 494 boolean writeMultipleTables) 495 throws IOException { 496 497 ArrayList<ImmutableBytesWritable> ret = new ArrayList<>(); 498 for(RegionLocator regionLocator : regionLocators) 499 { 500 TableName tableName = regionLocator.getName(); 501 LOG.info("Looking up current regions for table " + tableName); 502 byte[][] byteKeys = regionLocator.getStartKeys(); 503 for (byte[] byteKey : byteKeys) { 504 byte[] fullKey = byteKey; //HFileOutputFormat2 use case 505 if (writeMultipleTables) 506 { 507 //MultiTableHFileOutputFormat use case 508 fullKey = combineTableNameSuffix(tableName.getName(), byteKey); 509 } 510 if (LOG.isDebugEnabled()) { 511 LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + Bytes.toStringBinary 512 (fullKey) + "]"); 513 } 514 ret.add(new ImmutableBytesWritable(fullKey)); 515 } 516 } 517 return ret; 518 } 519 520 /** 521 * Write out a {@link SequenceFile} that can be read by 522 * {@link TotalOrderPartitioner} that contains the split points in startKeys. 523 */ 524 @SuppressWarnings("deprecation") 525 private static void writePartitions(Configuration conf, Path partitionsPath, 526 List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException { 527 LOG.info("Writing partition information to " + partitionsPath); 528 if (startKeys.isEmpty()) { 529 throw new IllegalArgumentException("No regions passed"); 530 } 531 532 // We're generating a list of split points, and we don't ever 533 // have keys < the first region (which has an empty start key) 534 // so we need to remove it. Otherwise we would end up with an 535 // empty reducer with index 0 536 TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys); 537 ImmutableBytesWritable first = sorted.first(); 538 if (writeMultipleTables) { 539 first = new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first 540 ().get())); 541 } 542 if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { 543 throw new IllegalArgumentException( 544 "First region of table should have empty start key. Instead has: " 545 + Bytes.toStringBinary(first.get())); 546 } 547 sorted.remove(sorted.first()); 548 549 // Write the actual file 550 FileSystem fs = partitionsPath.getFileSystem(conf); 551 SequenceFile.Writer writer = SequenceFile.createWriter( 552 fs, conf, partitionsPath, ImmutableBytesWritable.class, 553 NullWritable.class); 554 555 try { 556 for (ImmutableBytesWritable startKey : sorted) { 557 writer.append(startKey, NullWritable.get()); 558 } 559 } finally { 560 writer.close(); 561 } 562 } 563 564 /** 565 * Configure a MapReduce Job to perform an incremental load into the given 566 * table. This 567 * <ul> 568 * <li>Inspects the table to configure a total order partitioner</li> 569 * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> 570 * <li>Sets the number of reduce tasks to match the current number of regions</li> 571 * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> 572 * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or 573 * PutSortReducer)</li> 574 * </ul> 575 * The user should be sure to set the map output value class to either KeyValue or Put before 576 * running this function. 577 */ 578 public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) 579 throws IOException { 580 configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 581 } 582 583 /** 584 * Configure a MapReduce Job to perform an incremental load into the given 585 * table. This 586 * <ul> 587 * <li>Inspects the table to configure a total order partitioner</li> 588 * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> 589 * <li>Sets the number of reduce tasks to match the current number of regions</li> 590 * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> 591 * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or 592 * PutSortReducer)</li> 593 * </ul> 594 * The user should be sure to set the map output value class to either KeyValue or Put before 595 * running this function. 596 */ 597 public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor, 598 RegionLocator regionLocator) throws IOException { 599 ArrayList<TableInfo> singleTableInfo = new ArrayList<>(); 600 singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator)); 601 configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class); 602 } 603 604 static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, 605 Class<? extends OutputFormat<?, ?>> cls) throws IOException { 606 Configuration conf = job.getConfiguration(); 607 job.setOutputKeyClass(ImmutableBytesWritable.class); 608 job.setOutputValueClass(MapReduceExtendedCell.class); 609 job.setOutputFormatClass(cls); 610 611 if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) { 612 throw new IllegalArgumentException("Duplicate entries found in TableInfo argument"); 613 } 614 boolean writeMultipleTables = false; 615 if (MultiTableHFileOutputFormat.class.equals(cls)) { 616 writeMultipleTables = true; 617 conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true); 618 } 619 // Based on the configured map output class, set the correct reducer to properly 620 // sort the incoming values. 621 // TODO it would be nice to pick one or the other of these formats. 622 if (KeyValue.class.equals(job.getMapOutputValueClass()) 623 || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) { 624 job.setReducerClass(CellSortReducer.class); 625 } else if (Put.class.equals(job.getMapOutputValueClass())) { 626 job.setReducerClass(PutSortReducer.class); 627 } else if (Text.class.equals(job.getMapOutputValueClass())) { 628 job.setReducerClass(TextSortReducer.class); 629 } else { 630 LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); 631 } 632 633 conf.setStrings("io.serializations", conf.get("io.serializations"), 634 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 635 CellSerialization.class.getName()); 636 637 if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { 638 LOG.info("bulkload locality sensitive enabled"); 639 } 640 641 /* Now get the region start keys for every table required */ 642 List<String> allTableNames = new ArrayList<>(multiTableInfo.size()); 643 List<RegionLocator> regionLocators = new ArrayList<>( multiTableInfo.size()); 644 List<TableDescriptor> tableDescriptors = new ArrayList<>( multiTableInfo.size()); 645 646 for( TableInfo tableInfo : multiTableInfo ) 647 { 648 regionLocators.add(tableInfo.getRegionLocator()); 649 allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString()); 650 tableDescriptors.add(tableInfo.getTableDescriptor()); 651 } 652 // Record tablenames for creating writer by favored nodes, and decoding compression, block size and other attributes of columnfamily per table 653 conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes 654 .toString(tableSeparator))); 655 List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables); 656 // Use table's region boundaries for TOP split points. 657 LOG.info("Configuring " + startKeys.size() + " reduce partitions " + 658 "to match current region count for all tables"); 659 job.setNumReduceTasks(startKeys.size()); 660 661 configurePartitioner(job, startKeys, writeMultipleTables); 662 // Set compression algorithms based on column families 663 664 conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails, 665 tableDescriptors)); 666 conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails, 667 tableDescriptors)); 668 conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails, 669 tableDescriptors)); 670 conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails, 671 tableDescriptors)); 672 conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 673 serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors)); 674 675 TableMapReduceUtil.addDependencyJars(job); 676 TableMapReduceUtil.initCredentials(job); 677 LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ",")); 678 } 679 680 public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws 681 IOException { 682 Configuration conf = job.getConfiguration(); 683 684 job.setOutputKeyClass(ImmutableBytesWritable.class); 685 job.setOutputValueClass(MapReduceExtendedCell.class); 686 job.setOutputFormatClass(HFileOutputFormat2.class); 687 688 ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1); 689 singleTableDescriptor.add(tableDescriptor); 690 691 conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString()); 692 // Set compression algorithms based on column families 693 conf.set(COMPRESSION_FAMILIES_CONF_KEY, 694 serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor)); 695 conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, 696 serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor)); 697 conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, 698 serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor)); 699 conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, 700 serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor)); 701 conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 702 serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor)); 703 704 TableMapReduceUtil.addDependencyJars(job); 705 TableMapReduceUtil.initCredentials(job); 706 LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured."); 707 } 708 709 /** 710 * Runs inside the task to deserialize column family to compression algorithm 711 * map from the configuration. 712 * 713 * @param conf to read the serialized values from 714 * @return a map from column family to the configured compression algorithm 715 */ 716 @VisibleForTesting 717 static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration 718 conf) { 719 Map<byte[], String> stringMap = createFamilyConfValueMap(conf, 720 COMPRESSION_FAMILIES_CONF_KEY); 721 Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 722 for (Map.Entry<byte[], String> e : stringMap.entrySet()) { 723 Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue()); 724 compressionMap.put(e.getKey(), algorithm); 725 } 726 return compressionMap; 727 } 728 729 /** 730 * Runs inside the task to deserialize column family to bloom filter type 731 * map from the configuration. 732 * 733 * @param conf to read the serialized values from 734 * @return a map from column family to the the configured bloom filter type 735 */ 736 @VisibleForTesting 737 static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) { 738 Map<byte[], String> stringMap = createFamilyConfValueMap(conf, 739 BLOOM_TYPE_FAMILIES_CONF_KEY); 740 Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 741 for (Map.Entry<byte[], String> e : stringMap.entrySet()) { 742 BloomType bloomType = BloomType.valueOf(e.getValue()); 743 bloomTypeMap.put(e.getKey(), bloomType); 744 } 745 return bloomTypeMap; 746 } 747 748 /** 749 * Runs inside the task to deserialize column family to bloom filter param 750 * map from the configuration. 751 * 752 * @param conf to read the serialized values from 753 * @return a map from column family to the the configured bloom filter param 754 */ 755 @VisibleForTesting 756 static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) { 757 return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY); 758 } 759 760 761 /** 762 * Runs inside the task to deserialize column family to block size 763 * map from the configuration. 764 * 765 * @param conf to read the serialized values from 766 * @return a map from column family to the configured block size 767 */ 768 @VisibleForTesting 769 static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) { 770 Map<byte[], String> stringMap = createFamilyConfValueMap(conf, 771 BLOCK_SIZE_FAMILIES_CONF_KEY); 772 Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 773 for (Map.Entry<byte[], String> e : stringMap.entrySet()) { 774 Integer blockSize = Integer.parseInt(e.getValue()); 775 blockSizeMap.put(e.getKey(), blockSize); 776 } 777 return blockSizeMap; 778 } 779 780 /** 781 * Runs inside the task to deserialize column family to data block encoding 782 * type map from the configuration. 783 * 784 * @param conf to read the serialized values from 785 * @return a map from column family to HFileDataBlockEncoder for the 786 * configured data block type for the family 787 */ 788 @VisibleForTesting 789 static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap( 790 Configuration conf) { 791 Map<byte[], String> stringMap = createFamilyConfValueMap(conf, 792 DATABLOCK_ENCODING_FAMILIES_CONF_KEY); 793 Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 794 for (Map.Entry<byte[], String> e : stringMap.entrySet()) { 795 encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue()))); 796 } 797 return encoderMap; 798 } 799 800 801 /** 802 * Run inside the task to deserialize column family to given conf value map. 803 * 804 * @param conf to read the serialized values from 805 * @param confName conf key to read from the configuration 806 * @return a map of column family to the given configuration value 807 */ 808 private static Map<byte[], String> createFamilyConfValueMap( 809 Configuration conf, String confName) { 810 Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 811 String confVal = conf.get(confName, ""); 812 for (String familyConf : confVal.split("&")) { 813 String[] familySplit = familyConf.split("="); 814 if (familySplit.length != 2) { 815 continue; 816 } 817 try { 818 confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")), 819 URLDecoder.decode(familySplit[1], "UTF-8")); 820 } catch (UnsupportedEncodingException e) { 821 // will not happen with UTF-8 encoding 822 throw new AssertionError(e); 823 } 824 } 825 return confValMap; 826 } 827 828 /** 829 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against 830 * <code>splitPoints</code>. Cleans up the partitions file after job exists. 831 */ 832 static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean 833 writeMultipleTables) 834 throws IOException { 835 Configuration conf = job.getConfiguration(); 836 // create the partitions file 837 FileSystem fs = FileSystem.get(conf); 838 String hbaseTmpFsDir = 839 conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 840 HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); 841 Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()); 842 fs.makeQualified(partitionsPath); 843 writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables); 844 fs.deleteOnExit(partitionsPath); 845 846 // configure job to use it 847 job.setPartitionerClass(TotalOrderPartitioner.class); 848 TotalOrderPartitioner.setPartitionFile(conf, partitionsPath); 849 } 850 851 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") 852 @VisibleForTesting 853 static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn, List<TableDescriptor> allTables) 854 throws UnsupportedEncodingException { 855 StringBuilder attributeValue = new StringBuilder(); 856 int i = 0; 857 for (TableDescriptor tableDescriptor : allTables) { 858 if (tableDescriptor == null) { 859 // could happen with mock table instance 860 // CODEREVIEW: Can I set an empty string in conf if mock table instance? 861 return ""; 862 } 863 for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) { 864 if (i++ > 0) { 865 attributeValue.append('&'); 866 } 867 attributeValue.append(URLEncoder.encode( 868 Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())), 869 "UTF-8")); 870 attributeValue.append('='); 871 attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8")); 872 } 873 } 874 // Get rid of the last ampersand 875 return attributeValue.toString(); 876 } 877 878 /** 879 * Serialize column family to compression algorithm map to configuration. 880 * Invoked while configuring the MR job for incremental load. 881 */ 882 @VisibleForTesting 883 static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor -> 884 familyDescriptor.getCompressionType().getName(); 885 886 /** 887 * Serialize column family to block size map to configuration. Invoked while 888 * configuring the MR job for incremental load. 889 */ 890 @VisibleForTesting 891 static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String 892 .valueOf(familyDescriptor.getBlocksize()); 893 894 /** 895 * Serialize column family to bloom type map to configuration. Invoked while 896 * configuring the MR job for incremental load. 897 */ 898 @VisibleForTesting 899 static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> { 900 String bloomType = familyDescriptor.getBloomFilterType().toString(); 901 if (bloomType == null) { 902 bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name(); 903 } 904 return bloomType; 905 }; 906 907 /** 908 * Serialize column family to bloom param map to configuration. Invoked while 909 * configuring the MR job for incremental load. 910 */ 911 @VisibleForTesting 912 static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> { 913 BloomType bloomType = familyDescriptor.getBloomFilterType(); 914 String bloomParam = ""; 915 if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { 916 bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY); 917 } 918 return bloomParam; 919 }; 920 921 /** 922 * Serialize column family to data block encoding map to configuration. 923 * Invoked while configuring the MR job for incremental load. 924 */ 925 @VisibleForTesting 926 static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> { 927 DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding(); 928 if (encoding == null) { 929 encoding = DataBlockEncoding.NONE; 930 } 931 return encoding.toString(); 932 }; 933 934}