/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    /**
     * The modification for the returned HTD doesn't affect the inner TD.
     * @return A clone of inner table descriptor
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
     *             instead.
     * @see #getTableDescriptor()
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
     */
    @Deprecated
    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
      "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
      final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression = compressionStr != null ?
        Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
        Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding = dataBlockEncodingStr != null ?
        DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).toBytes();
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName " + Bytes.toString(tableNameBytes) +
                " not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        Path tableRelPath = getTableRelativePath(tableNameBytes);
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set the storage policy
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // Roll the writer once it reaches maxsize, but only on a row boundary
        if (wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                  RegionLocator locator =
                      connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}",
                    Bytes.toString(rowKey), tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                wl = getNewWriter(tableNameBytes, family, conf,
                    new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when the row transitions.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info("Writer=" + wl.writer.getPath() +
              ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
          InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
              new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
            .withCompression(compression).withChecksumType(HStore.getChecksumType(conf))
            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize)
            .withColumnFamily(family).withTableName(tableName);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
              .withOutputDir(familydir).withBloomType(bloomType)
              .withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
              .withOutputDir(familydir).withBloomType(bloomType)
              .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
      byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy =
        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
            conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
      boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner}, containing the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
          new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
              + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, partitionsPath, ImmutableBytesWritable.class,
        NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer
   * or TextSortReducer, depending on the configured map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
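   * <p>
   * For illustration only, a minimal driver sketch; the output path, table name
   * {@code "mytable"} and mapper class {@code MyCellMapper} below are hypothetical:
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * Job job = Job.getInstance(conf, "prepare-bulkload");
   * job.setMapperClass(MyCellMapper.class); // emits ImmutableBytesWritable / Put pairs
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(Put.class);
   * // configure the job input as appropriate for the source data, then:
   * FileOutputFormat.setOutputPath(job, new Path("/bulkload-out"));
   * try (Connection conn = ConnectionFactory.createConnection(conf);
   *     Table table = conn.getTable(TableName.valueOf("mytable"));
   *     RegionLocator locator = conn.getRegionLocator(table.getName())) {
   *   HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
   * }
   * job.waitForCompletion(true);
   * }</pre>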
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer
   * or TextSortReducer, depending on the configured map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so the reducer can create writers with favored nodes and decode
    // per-table column family attributes such as compression and block size.
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
        .toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
        getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families

    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
        tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
        tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
        tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
        tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
      throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to HFileDataBlockEncoder for the
   *         configured data block type for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
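   * <p>
   * For illustration only (the table and family names below are hypothetical): the serialized
   * value is a list of URL-encoded {@code tableName;family=value} pairs joined by {@code &},
   * so for the bloom filter type map it could look like
   * {@code mytable%3Bcf1=ROW&mytable%3Bcf2=NONE}.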
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
            HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
      "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
      List<TableDescriptor> allTables)
      throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          // Prepend the separator for every entry after the first so there is
          // no trailing ampersand to strip.
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
                familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
      familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
      .valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}