/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    /**
     * Modifications to the returned HTD do not affect the inner TD.
     * @return A clone of the inner table descriptor
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
     *   instead.
     * @see #getTableDescriptor()
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
     */
    @Deprecated
    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
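  // As a rough illustration (see serializeColumnFamilyAttribute below for the actual
  // rendering), each of these values is an '&'-joined list of URL-encoded
  // "<table>;<family>=<value>" entries, e.g. before encoding the compression entry for a
  // hypothetical table could read: usertable;cf1=gz&usertable;cf2=none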
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
      "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
      final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
          + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
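    // (Note: the value is handed to HFileWriterImpl.compressionByName, so it is expected to be
    // one of the Compression.Algorithm names, e.g. "none", "gz" or "snappy".)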
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
        Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
        createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
          new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final long now = EnvironmentEdgeManager.currentTime();
      private boolean rollRequested = false;

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
                "' not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set its storage policy
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir,
                new Path(Bytes.toString(tableNameBytes), Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

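            // Locality-sensitive path (HBASE-12596): look up the RegionServer currently hosting
            // this row and pass its address as a favored node, so the HFile block replicas are
            // likely to be written local to the serving region.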
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                  RegionLocator locator =
                      connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("There's something wrong when locating rowkey: " +
                    Bytes.toString(rowKey) + " for tablename: " + tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("failed to get region location, so use default writer for rowkey: " +
                    Bytes.toString(rowKey));
              }
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
              }
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
                      + loc.getPort() + ", so use default writer");
                }
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
                }
                wl = getNewWriter(tableNameBytes, family, conf,
                    new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRow = rowKey;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
        this.rollRequested = false;
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info("Writer=" + wl.writer.getPath() +
              ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
        }
        wl.writer = null;
        wl.written = 0;
      }

      /*
       * Create a new StoreFile.Writer.
       * @param family
       * @return A WriterLength, containing a new StoreFile.Writer.
       * @throws IOException
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
          InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
              new Path(Bytes.toString(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = compressionMap.get(tableAndFamily);
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
            .withCompression(compression)
            .withChecksumType(HStore.getChecksumType(conf))
            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
            .withBlockSize(blockSize);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
              .withOutputDir(familydir).withBloomType(bloomType)
              .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
              .withOutputDir(familydir).withBloomType(bloomType)
              .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
              .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
      byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy =
        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
            conf.get(STORAGE_POLICY_PROPERTY));
    FSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
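   * In the multi-table case each start key is prefixed with the table name and the
   * {@code tableSeparator}, so split points from different tables remain distinct.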
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(
      List<RegionLocator> regionLocators, boolean writeMultipleTables) throws IOException {
    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for table [" + tableName + "]: ["
              + Bytes.toStringBinary(fullKey) + "]");
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
          new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
              + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, partitionsPath, ImmutableBytesWritable.class,
        NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
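   * <p>
   * A rough driver-side sketch (illustrative only; the driver, mapper, table name and output
   * path below are assumptions, not part of this class):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * Job job = Job.getInstance(conf, "bulkload-prepare");
   * job.setJarByClass(MyDriver.class);
   * job.setMapperClass(MyPutMapper.class); // emits ImmutableBytesWritable / Put pairs
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(Put.class);
   * try (Connection conn = ConnectionFactory.createConnection(conf);
   *     Table table = conn.getTable(TableName.valueOf("usertable"));
   *     RegionLocator locator = conn.getRegionLocator(TableName.valueOf("usertable"))) {
   *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
   * }
   * FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
   * job.waitForCompletion(true);
   * }</pre>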
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so the reducers can create writers with favored nodes and decode the
    // per-table column family attributes (compression, block size, etc.).
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
        StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
        getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families

    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
      throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to HFileDataBlockEncoder for the
   *         configured data block type for the family
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after job exits.
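   * The partitions file is written under the directory named by
   * {@code HConstants.TEMPORARY_FS_DIRECTORY_KEY} and registered with the job via
   * {@link TotalOrderPartitioner#setPartitionFile}.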
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
            HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
      List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
                familyDescriptor.getName())),
            "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
      familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor ->
      String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };
}