/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    /**
     * Modifications to the returned HTD do not affect the inner TD.
     * @return A clone of the inner table descriptor
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
     *   instead.
     * @see #getTableDescriptor()
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
     */
    @Deprecated
    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.compression";
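
  // Illustrative sketch only (not part of this class): a client job that wants to bypass the
  // per-family auto-detection could set the override keys above on its job configuration, e.g.
  //
  //   job.getConfiguration().set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, "snappy");
  //   job.getConfiguration().set(HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, "FAST_DIFF");
  //
  // The values are a compression algorithm name understood by
  // Compression.getCompressionAlgorithmByName and a DataBlockEncoding enum name, respectively.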

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
      "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
      final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression = compressionStr != null ?
        Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
        Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding = dataBlockEncodingStr != null ?
        DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).toBytes();
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName " + Bytes.toString(tableNameBytes) +
                " not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        Path tableRelPath = getTableRelativePath(tableNameBytes);
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, make sure the family directory exists
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // This can only happen once a row is finished though
        if (wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                  RegionLocator locator =
                      connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}",
                    Bytes.toString(rowKey), tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                wl = getNewWriter(tableNameBytes, family, conf,
                    new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info("Writer=" + wl.writer.getPath() +
              ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
          InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
              new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
            .withCompression(compression).withChecksumType(HStore.getChecksumType(conf))
            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize)
            .withColumnFamily(family).withTableName(tableName);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
              .withOutputDir(familydir).withBloomType(bloomType)
              .withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
              .withOutputDir(familydir).withBloomType(bloomType)
              .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
      byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy =
        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
            conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }
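
  // Illustrative only: a job that stages HFiles on tiered HDFS storage could set, for example,
  //
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "mytable;cf1", "ALL_SSD");
  //
  // assuming a table named "mytable" with a family "cf1"; the per-family key suffix is the table
  // name and family joined by tableSeparator, and the policy names are whatever the underlying
  // FileSystem supports.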

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(
      List<RegionLocator> regionLocators, boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
          new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
              + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, partitionsPath, ImmutableBytesWritable.class,
        NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
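   * <p>
   * A rough sketch of typical setup (mapper and table names here are illustrative, not
   * prescriptive):
   * <pre>
   * Job job = Job.getInstance(conf, "hfile-prepare");
   * job.setMapperClass(MyPutEmittingMapper.class); // emits (ImmutableBytesWritable, Put) pairs
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(Put.class);
   * FileOutputFormat.setOutputPath(job, outputDir);
   * try (Connection conn = ConnectionFactory.createConnection(conf);
   *     Table table = conn.getTable(tableName);
   *     RegionLocator locator = conn.getRegionLocator(tableName)) {
   *   HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
   * }
   * job.waitForCompletion(true);
   * </pre>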
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names for creating writers by favored nodes, and for decoding compression,
    // block size and other attributes of each column family per table
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
        .toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
        getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use table's region boundaries for TOP split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families

    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
        tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
        tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
        tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
        tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
      throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @VisibleForTesting
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured {@link DataBlockEncoding}
   *   for the family
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize a column family to value map for the given conf key.
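   * <p>
   * As produced by {@link #serializeColumnFamilyAttribute}, the configuration value is expected
   * to be URL-encoded {@code key=value} pairs joined by {@code &}, where each key is a table name
   * and column family combined by {@link #combineTableNameSuffix(byte[], byte[])}.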
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
            HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
      "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
      List<TableDescriptor> allTables)
      throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
                familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // Get rid of the last ampersand
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
      familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
      .valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}