001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNotSame; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import static org.mockito.Mockito.verify; 028 029import java.io.IOException; 030import java.lang.reflect.Field; 031import java.security.PrivilegedAction; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.HashMap; 035import java.util.LinkedList; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039import java.util.Random; 040import java.util.Set; 041import java.util.UUID; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.CopyOnWriteArrayList; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.ThreadLocalRandom; 047import java.util.stream.Collectors; 048import java.util.stream.Stream; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.fs.FileStatus; 051import org.apache.hadoop.fs.FileSystem; 052import org.apache.hadoop.fs.LocatedFileStatus; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.fs.RemoteIterator; 055import org.apache.hadoop.hbase.ArrayBackedTag; 056import org.apache.hadoop.hbase.Cell; 057import org.apache.hadoop.hbase.CellUtil; 058import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 059import org.apache.hadoop.hbase.ExtendedCell; 060import org.apache.hadoop.hbase.HBaseClassTestRule; 061import org.apache.hadoop.hbase.HBaseConfiguration; 062import org.apache.hadoop.hbase.HBaseTestingUtil; 063import org.apache.hadoop.hbase.HConstants; 064import org.apache.hadoop.hbase.HDFSBlocksDistribution; 065import org.apache.hadoop.hbase.HadoopShims; 066import org.apache.hadoop.hbase.KeyValue; 067import org.apache.hadoop.hbase.PerformanceEvaluation; 068import org.apache.hadoop.hbase.PrivateCellUtil; 069import org.apache.hadoop.hbase.ServerName; 070import org.apache.hadoop.hbase.StartTestingClusterOption; 071import org.apache.hadoop.hbase.TableName; 072import org.apache.hadoop.hbase.Tag; 073import org.apache.hadoop.hbase.TagType; 074import org.apache.hadoop.hbase.client.Admin; 075import org.apache.hadoop.hbase.client.AsyncConnection; 076import org.apache.hadoop.hbase.client.BufferedMutator; 077import org.apache.hadoop.hbase.client.BufferedMutatorParams; 078import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 079import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 080import org.apache.hadoop.hbase.client.Connection; 081import org.apache.hadoop.hbase.client.ConnectionFactory; 082import org.apache.hadoop.hbase.client.ConnectionRegistry; 083import org.apache.hadoop.hbase.client.ConnectionUtils; 084import org.apache.hadoop.hbase.client.Hbck; 085import org.apache.hadoop.hbase.client.Put; 086import org.apache.hadoop.hbase.client.RegionLocator; 087import org.apache.hadoop.hbase.client.Result; 088import org.apache.hadoop.hbase.client.ResultScanner; 089import org.apache.hadoop.hbase.client.Scan; 090import org.apache.hadoop.hbase.client.Table; 091import org.apache.hadoop.hbase.client.TableBuilder; 092import org.apache.hadoop.hbase.client.TableDescriptor; 093import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 094import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 095import org.apache.hadoop.hbase.io.compress.Compression; 096import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 097import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 098import org.apache.hadoop.hbase.io.hfile.CacheConfig; 099import org.apache.hadoop.hbase.io.hfile.HFile; 100import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 101import org.apache.hadoop.hbase.io.hfile.HFileScanner; 102import org.apache.hadoop.hbase.regionserver.BloomType; 103import org.apache.hadoop.hbase.regionserver.HRegion; 104import org.apache.hadoop.hbase.regionserver.HStore; 105import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 106import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 107import org.apache.hadoop.hbase.security.User; 108import org.apache.hadoop.hbase.testclassification.LargeTests; 109import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 110import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 111import org.apache.hadoop.hbase.util.Bytes; 112import org.apache.hadoop.hbase.util.CommonFSUtils; 113import org.apache.hadoop.hbase.util.FSUtils; 114import org.apache.hadoop.hbase.util.FutureUtils; 115import org.apache.hadoop.hbase.util.ReflectionUtils; 116import org.apache.hadoop.hdfs.DistributedFileSystem; 117import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 118import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 119import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 120import org.apache.hadoop.io.NullWritable; 121import org.apache.hadoop.mapreduce.Job; 122import org.apache.hadoop.mapreduce.Mapper; 123import org.apache.hadoop.mapreduce.RecordWriter; 124import org.apache.hadoop.mapreduce.TaskAttemptContext; 125import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 126import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; 127import org.apache.hadoop.security.UserGroupInformation; 128import org.junit.Assert; 129import org.junit.ClassRule; 130import org.junit.Ignore; 131import org.junit.Test; 132import org.junit.experimental.categories.Category; 133import org.mockito.Mockito; 134import org.slf4j.Logger; 135import org.slf4j.LoggerFactory; 136 137/** 138 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile 139 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and 140 * values like those of {@link PerformanceEvaluation}. 141 */ 142@Category({ VerySlowMapReduceTests.class, LargeTests.class }) 143public class TestHFileOutputFormat2 { 144 145 @ClassRule 146 public static final HBaseClassTestRule CLASS_RULE = 147 HBaseClassTestRule.forClass(TestHFileOutputFormat2.class); 148 149 private final static int ROWSPERSPLIT = 1024; 150 151 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 152 private static final byte[][] FAMILIES = 153 { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) }; 154 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3") 155 .map(TableName::valueOf).toArray(TableName[]::new); 156 157 private HBaseTestingUtil util = new HBaseTestingUtil(); 158 159 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 160 161 /** 162 * Simple mapper that makes KeyValue output. 163 */ 164 static class RandomKVGeneratingMapper 165 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> { 166 167 private int keyLength; 168 private static final int KEYLEN_DEFAULT = 10; 169 private static final String KEYLEN_CONF = "randomkv.key.length"; 170 171 private int valLength; 172 private static final int VALLEN_DEFAULT = 10; 173 private static final String VALLEN_CONF = "randomkv.val.length"; 174 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 175 private boolean multiTableMapper = false; 176 private TableName[] tables = null; 177 178 @Override 179 protected void setup(Context context) throws IOException, InterruptedException { 180 super.setup(context); 181 182 Configuration conf = context.getConfiguration(); 183 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 184 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 185 multiTableMapper = 186 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 187 if (multiTableMapper) { 188 tables = TABLE_NAMES; 189 } else { 190 tables = new TableName[] { TABLE_NAMES[0] }; 191 } 192 } 193 194 @Override 195 protected void map(NullWritable n1, NullWritable n2, 196 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context) 197 throws java.io.IOException, InterruptedException { 198 199 byte keyBytes[] = new byte[keyLength]; 200 byte valBytes[] = new byte[valLength]; 201 202 int taskId = context.getTaskAttemptID().getTaskID().getId(); 203 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 204 byte[] key; 205 for (int j = 0; j < tables.length; ++j) { 206 for (int i = 0; i < ROWSPERSPLIT; i++) { 207 Bytes.random(keyBytes); 208 // Ensure that unique tasks generate unique keys 209 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 210 Bytes.random(valBytes); 211 key = keyBytes; 212 if (multiTableMapper) { 213 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 214 } 215 216 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 217 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 218 context.write(new ImmutableBytesWritable(key), kv); 219 } 220 } 221 } 222 } 223 } 224 225 /** 226 * Simple mapper that makes Put output. 227 */ 228 static class RandomPutGeneratingMapper 229 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> { 230 231 private int keyLength; 232 private static final int KEYLEN_DEFAULT = 10; 233 private static final String KEYLEN_CONF = "randomkv.key.length"; 234 235 private int valLength; 236 private static final int VALLEN_DEFAULT = 10; 237 private static final String VALLEN_CONF = "randomkv.val.length"; 238 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 239 private boolean multiTableMapper = false; 240 private TableName[] tables = null; 241 242 @Override 243 protected void setup(Context context) throws IOException, InterruptedException { 244 super.setup(context); 245 246 Configuration conf = context.getConfiguration(); 247 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 248 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 249 multiTableMapper = 250 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 251 if (multiTableMapper) { 252 tables = TABLE_NAMES; 253 } else { 254 tables = new TableName[] { TABLE_NAMES[0] }; 255 } 256 } 257 258 @Override 259 protected void map(NullWritable n1, NullWritable n2, 260 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context) 261 throws java.io.IOException, InterruptedException { 262 263 byte keyBytes[] = new byte[keyLength]; 264 byte valBytes[] = new byte[valLength]; 265 266 int taskId = context.getTaskAttemptID().getTaskID().getId(); 267 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 268 269 byte[] key; 270 for (int j = 0; j < tables.length; ++j) { 271 for (int i = 0; i < ROWSPERSPLIT; i++) { 272 Bytes.random(keyBytes); 273 // Ensure that unique tasks generate unique keys 274 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 275 Bytes.random(valBytes); 276 key = keyBytes; 277 if (multiTableMapper) { 278 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 279 } 280 281 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 282 Put p = new Put(keyBytes); 283 p.addColumn(family, QUALIFIER, valBytes); 284 // set TTL to very low so that the scan does not return any value 285 p.setTTL(1l); 286 context.write(new ImmutableBytesWritable(key), p); 287 } 288 } 289 } 290 } 291 } 292 293 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 294 if (putSortReducer) { 295 job.setInputFormatClass(NMapInputFormat.class); 296 job.setMapperClass(RandomPutGeneratingMapper.class); 297 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 298 job.setMapOutputValueClass(Put.class); 299 } else { 300 job.setInputFormatClass(NMapInputFormat.class); 301 job.setMapperClass(RandomKVGeneratingMapper.class); 302 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 303 job.setMapOutputValueClass(KeyValue.class); 304 } 305 } 306 307 /** 308 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose 309 * timestamp is {@link HConstants#LATEST_TIMESTAMP}. 310 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 311 */ 312 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 313 @Test 314 public void test_LATEST_TIMESTAMP_isReplaced() throws Exception { 315 Configuration conf = new Configuration(this.util.getConfiguration()); 316 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 317 TaskAttemptContext context = null; 318 Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 319 try { 320 Job job = new Job(conf); 321 FileOutputFormat.setOutputPath(job, dir); 322 context = createTestTaskAttemptContext(job); 323 HFileOutputFormat2 hof = new HFileOutputFormat2(); 324 writer = hof.getRecordWriter(context); 325 final byte[] b = Bytes.toBytes("b"); 326 327 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 328 // changed by call to write. Check all in kv is same but ts. 329 KeyValue kv = new KeyValue(b, b, b); 330 KeyValue original = kv.clone(); 331 writer.write(new ImmutableBytesWritable(), kv); 332 assertFalse(original.equals(kv)); 333 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 334 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 335 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 336 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 337 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 338 339 // Test 2. Now test passing a kv that has explicit ts. It should not be 340 // changed by call to record write. 341 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 342 original = kv.clone(); 343 writer.write(new ImmutableBytesWritable(), kv); 344 assertTrue(original.equals(kv)); 345 } finally { 346 if (writer != null && context != null) writer.close(context); 347 dir.getFileSystem(conf).delete(dir, true); 348 } 349 } 350 351 private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception { 352 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 353 TaskAttemptContext context = 354 hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0"); 355 return context; 356 } 357 358 /* 359 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by 360 * time-restricted scans. 361 */ 362 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 363 @Test 364 public void test_TIMERANGE() throws Exception { 365 Configuration conf = new Configuration(this.util.getConfiguration()); 366 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 367 TaskAttemptContext context = null; 368 Path dir = util.getDataTestDir("test_TIMERANGE_present"); 369 LOG.info("Timerange dir writing to dir: " + dir); 370 try { 371 // build a record writer using HFileOutputFormat2 372 Job job = new Job(conf); 373 FileOutputFormat.setOutputPath(job, dir); 374 context = createTestTaskAttemptContext(job); 375 HFileOutputFormat2 hof = new HFileOutputFormat2(); 376 writer = hof.getRecordWriter(context); 377 378 // Pass two key values with explicit times stamps 379 final byte[] b = Bytes.toBytes("b"); 380 381 // value 1 with timestamp 2000 382 KeyValue kv = new KeyValue(b, b, b, 2000, b); 383 KeyValue original = kv.clone(); 384 writer.write(new ImmutableBytesWritable(), kv); 385 assertEquals(original, kv); 386 387 // value 2 with timestamp 1000 388 kv = new KeyValue(b, b, b, 1000, b); 389 original = kv.clone(); 390 writer.write(new ImmutableBytesWritable(), kv); 391 assertEquals(original, kv); 392 393 // verify that the file has the proper FileInfo. 394 writer.close(context); 395 396 // the generated file lives 1 directory down from the attempt directory 397 // and is the only file, e.g. 398 // _attempt__0000_r_000000_0/b/1979617994050536795 399 FileSystem fs = FileSystem.get(conf); 400 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 401 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 402 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 403 404 // open as HFile Reader and pull out TIMERANGE FileInfo. 405 HFile.Reader rd = 406 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 407 Map<byte[], byte[]> finfo = rd.getHFileInfo(); 408 byte[] range = finfo.get(Bytes.toBytes("TIMERANGE")); 409 assertNotNull(range); 410 411 // unmarshall and check values. 412 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range); 413 LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax()); 414 assertEquals(1000, timeRangeTracker.getMin()); 415 assertEquals(2000, timeRangeTracker.getMax()); 416 rd.close(); 417 } finally { 418 if (writer != null && context != null) writer.close(context); 419 dir.getFileSystem(conf).delete(dir, true); 420 } 421 } 422 423 /** 424 * Run small MR job. 425 */ 426 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 427 @Test 428 public void testWritingPEData() throws Exception { 429 Configuration conf = util.getConfiguration(); 430 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 431 FileSystem fs = testDir.getFileSystem(conf); 432 433 // Set down this value or we OOME in eclipse. 434 conf.setInt("mapreduce.task.io.sort.mb", 20); 435 // Write a few files. 436 long hregionMaxFilesize = 10 * 1024; 437 conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize); 438 439 Job job = new Job(conf, "testWritingPEData"); 440 setupRandomGeneratorMapper(job, false); 441 // This partitioner doesn't work well for number keys but using it anyways 442 // just to demonstrate how to configure it. 443 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 444 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 445 446 Arrays.fill(startKey, (byte) 0); 447 Arrays.fill(endKey, (byte) 0xff); 448 449 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 450 // Set start and end rows for partitioner. 451 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 452 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 453 job.setReducerClass(CellSortReducer.class); 454 job.setOutputFormatClass(HFileOutputFormat2.class); 455 job.setNumReduceTasks(4); 456 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 457 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 458 CellSerialization.class.getName()); 459 460 FileOutputFormat.setOutputPath(job, testDir); 461 assertTrue(job.waitForCompletion(false)); 462 FileStatus[] files = fs.listStatus(testDir); 463 assertTrue(files.length > 0); 464 465 // check output file num and size. 466 for (byte[] family : FAMILIES) { 467 long kvCount = 0; 468 RemoteIterator<LocatedFileStatus> iterator = 469 fs.listFiles(testDir.suffix("/" + new String(family)), true); 470 while (iterator.hasNext()) { 471 LocatedFileStatus keyFileStatus = iterator.next(); 472 HFile.Reader reader = 473 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 474 HFileScanner scanner = reader.getScanner(conf, false, false, false); 475 476 kvCount += reader.getEntries(); 477 scanner.seekTo(); 478 long perKVSize = scanner.getCell().getSerializedSize(); 479 assertTrue("Data size of each file should not be too large.", 480 perKVSize * reader.getEntries() <= hregionMaxFilesize); 481 } 482 assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount); 483 } 484 } 485 486 /** 487 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile. 488 */ 489 @Test 490 public void test_WritingTagData() throws Exception { 491 Configuration conf = new Configuration(this.util.getConfiguration()); 492 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 493 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 494 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 495 TaskAttemptContext context = null; 496 Path dir = util.getDataTestDir("WritingTagData"); 497 try { 498 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 499 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 500 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 501 Job job = new Job(conf); 502 FileOutputFormat.setOutputPath(job, dir); 503 context = createTestTaskAttemptContext(job); 504 HFileOutputFormat2 hof = new HFileOutputFormat2(); 505 writer = hof.getRecordWriter(context); 506 final byte[] b = Bytes.toBytes("b"); 507 508 List<Tag> tags = new ArrayList<>(); 509 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 510 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 511 writer.write(new ImmutableBytesWritable(), kv); 512 writer.close(context); 513 writer = null; 514 FileSystem fs = dir.getFileSystem(conf); 515 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 516 while (iterator.hasNext()) { 517 LocatedFileStatus keyFileStatus = iterator.next(); 518 HFile.Reader reader = 519 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 520 HFileScanner scanner = reader.getScanner(conf, false, false, false); 521 scanner.seekTo(); 522 ExtendedCell cell = scanner.getCell(); 523 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 524 assertTrue(tagsFromCell.size() > 0); 525 for (Tag tag : tagsFromCell) { 526 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 527 } 528 } 529 } finally { 530 if (writer != null && context != null) writer.close(context); 531 dir.getFileSystem(conf).delete(dir, true); 532 } 533 } 534 535 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 536 @Test 537 public void testJobConfiguration() throws Exception { 538 Configuration conf = new Configuration(this.util.getConfiguration()); 539 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 540 util.getDataTestDir("testJobConfiguration").toString()); 541 Job job = new Job(conf); 542 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 543 Table table = Mockito.mock(Table.class); 544 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 545 setupMockStartKeys(regionLocator); 546 setupMockTableName(regionLocator); 547 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 548 assertEquals(job.getNumReduceTasks(), 4); 549 } 550 551 private byte[][] generateRandomStartKeys(int numKeys) { 552 Random random = ThreadLocalRandom.current(); 553 byte[][] ret = new byte[numKeys][]; 554 // first region start key is always empty 555 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 556 for (int i = 1; i < numKeys; i++) { 557 ret[i] = 558 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 559 } 560 return ret; 561 } 562 563 private byte[][] generateRandomSplitKeys(int numKeys) { 564 Random random = ThreadLocalRandom.current(); 565 byte[][] ret = new byte[numKeys][]; 566 for (int i = 0; i < numKeys; i++) { 567 ret[i] = 568 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 569 } 570 return ret; 571 } 572 573 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 574 @Test 575 public void testMRIncrementalLoad() throws Exception { 576 LOG.info("\nStarting test testMRIncrementalLoad\n"); 577 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 578 } 579 580 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 581 @Test 582 public void testMRIncrementalLoadWithSplit() throws Exception { 583 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 584 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 585 } 586 587 /** 588 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the 589 * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because 590 * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to 591 * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster 592 * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region 593 * locality features more easily. 594 */ 595 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 596 @Test 597 public void testMRIncrementalLoadWithLocality() throws Exception { 598 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 599 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 600 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 601 } 602 603 // @Ignore("Wahtevs") 604 @Test 605 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 606 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 607 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 608 } 609 610 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 611 boolean putSortReducer, String tableStr) throws Exception { 612 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, 613 Arrays.asList(tableStr)); 614 } 615 616 @Test 617 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 618 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 619 doIncrementalLoadTest(false, false, true, 620 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList())); 621 } 622 623 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 624 boolean putSortReducer, List<String> tableStr) throws Exception { 625 util = new HBaseTestingUtil(); 626 Configuration conf = util.getConfiguration(); 627 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 628 int hostCount = 1; 629 int regionNum = 5; 630 if (shouldKeepLocality) { 631 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 632 // explicit hostnames parameter just like MiniDFSCluster does. 633 hostCount = 3; 634 regionNum = 20; 635 } 636 637 String[] hostnames = new String[hostCount]; 638 for (int i = 0; i < hostCount; ++i) { 639 hostnames[i] = "datanode_" + i; 640 } 641 StartTestingClusterOption option = StartTestingClusterOption.builder() 642 .numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 643 util.startMiniCluster(option); 644 645 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 646 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 647 boolean writeMultipleTables = tableStr.size() > 1; 648 for (String tableStrSingle : tableStr) { 649 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 650 TableName tableName = TableName.valueOf(tableStrSingle); 651 Table table = util.createTable(tableName, FAMILIES, splitKeys); 652 653 RegionLocator r = util.getConnection().getRegionLocator(tableName); 654 assertEquals("Should start with empty table", 0, util.countRows(table)); 655 int numRegions = r.getStartKeys().length; 656 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 657 658 allTables.put(tableStrSingle, table); 659 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r)); 660 } 661 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 662 // Generate the bulk load files 663 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 664 if (writeMultipleTables) { 665 testDir = new Path(testDir, "default"); 666 } 667 668 for (Table tableSingle : allTables.values()) { 669 // This doesn't write into the table, just makes files 670 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 671 } 672 int numTableDirs = 0; 673 FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir); 674 for (FileStatus tf : fss) { 675 Path tablePath = testDir; 676 if (writeMultipleTables) { 677 if (allTables.containsKey(tf.getPath().getName())) { 678 ++numTableDirs; 679 tablePath = tf.getPath(); 680 } else { 681 continue; 682 } 683 } 684 685 // Make sure that a directory was created for every CF 686 int dir = 0; 687 fss = tablePath.getFileSystem(conf).listStatus(tablePath); 688 for (FileStatus f : fss) { 689 for (byte[] family : FAMILIES) { 690 if (Bytes.toString(family).equals(f.getPath().getName())) { 691 ++dir; 692 } 693 } 694 } 695 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 696 } 697 if (writeMultipleTables) { 698 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 699 } 700 701 Admin admin = util.getConnection().getAdmin(); 702 try { 703 // handle the split case 704 if (shouldChangeRegions) { 705 Table chosenTable = allTables.values().iterator().next(); 706 // Choose a semi-random table if multiple tables are available 707 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 708 admin.disableTable(chosenTable.getName()); 709 util.waitUntilNoRegionsInTransition(); 710 711 util.deleteTable(chosenTable.getName()); 712 byte[][] newSplitKeys = generateRandomSplitKeys(14); 713 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 714 715 while ( 716 util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations() 717 .size() != 15 || !admin.isTableAvailable(table.getName()) 718 ) { 719 Thread.sleep(200); 720 LOG.info("Waiting for new region assignment to happen"); 721 } 722 } 723 724 // Perform the actual load 725 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 726 Path tableDir = testDir; 727 String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString(); 728 LOG.info("Running BulkLoadHFiles on table" + tableNameStr); 729 if (writeMultipleTables) { 730 tableDir = new Path(testDir, tableNameStr); 731 } 732 Table currentTable = allTables.get(tableNameStr); 733 TableName currentTableName = currentTable.getName(); 734 BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir); 735 736 // Ensure data shows up 737 int expectedRows = 0; 738 if (putSortReducer) { 739 // no rows should be extracted 740 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 741 util.countRows(currentTable)); 742 } else { 743 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 744 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 745 util.countRows(currentTable)); 746 Scan scan = new Scan(); 747 ResultScanner results = currentTable.getScanner(scan); 748 for (Result res : results) { 749 assertEquals(FAMILIES.length, res.rawCells().length); 750 Cell first = res.rawCells()[0]; 751 for (Cell kv : res.rawCells()) { 752 assertTrue(CellUtil.matchingRows(first, kv)); 753 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 754 } 755 } 756 results.close(); 757 } 758 String tableDigestBefore = util.checksumRows(currentTable); 759 // Check region locality 760 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 761 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 762 hbd.add(region.getHDFSBlocksDistribution()); 763 } 764 for (String hostname : hostnames) { 765 float locality = hbd.getBlockLocalityIndex(hostname); 766 LOG.info("locality of [" + hostname + "]: " + locality); 767 assertEquals(100, (int) (locality * 100)); 768 } 769 770 // Cause regions to reopen 771 admin.disableTable(currentTableName); 772 while (!admin.isTableDisabled(currentTableName)) { 773 Thread.sleep(200); 774 LOG.info("Waiting for table to disable"); 775 } 776 admin.enableTable(currentTableName); 777 util.waitTableAvailable(currentTableName); 778 assertEquals("Data should remain after reopening of regions", tableDigestBefore, 779 util.checksumRows(currentTable)); 780 } 781 } finally { 782 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 783 tableInfoSingle.getRegionLocator().close(); 784 } 785 for (Entry<String, Table> singleTable : allTables.entrySet()) { 786 singleTable.getValue().close(); 787 util.deleteTable(singleTable.getValue().getName()); 788 } 789 testDir.getFileSystem(conf).delete(testDir, true); 790 util.shutdownMiniCluster(); 791 } 792 } 793 794 private void runIncrementalPELoad(Configuration conf, 795 List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer) 796 throws IOException, InterruptedException, ClassNotFoundException { 797 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 798 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 799 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 800 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 801 CellSerialization.class.getName()); 802 setupRandomGeneratorMapper(job, putSortReducer); 803 if (tableInfo.size() > 1) { 804 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 805 int sum = 0; 806 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 807 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 808 } 809 assertEquals(sum, job.getNumReduceTasks()); 810 } else { 811 RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 812 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(), 813 regionLocator); 814 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 815 } 816 817 FileOutputFormat.setOutputPath(job, outDir); 818 819 assertFalse(util.getTestFileSystem().exists(outDir)); 820 821 assertTrue(job.waitForCompletion(true)); 822 } 823 824 /** 825 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the 826 * family compression map is correctly serialized into and deserialized from configuration 827 */ 828 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 829 @Test 830 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 831 for (int numCfs = 0; numCfs <= 3; numCfs++) { 832 Configuration conf = new Configuration(this.util.getConfiguration()); 833 Map<String, Compression.Algorithm> familyToCompression = 834 getMockColumnFamiliesForCompression(numCfs); 835 Table table = Mockito.mock(Table.class); 836 setupMockColumnFamiliesForCompression(table, familyToCompression); 837 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 838 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails, 839 Arrays.asList(table.getDescriptor()))); 840 841 // read back family specific compression setting from the configuration 842 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = 843 HFileOutputFormat2.createFamilyCompressionMap(conf); 844 845 // test that we have a value for all column families that matches with the 846 // used mock values 847 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 848 assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), 849 entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey()))); 850 } 851 } 852 } 853 854 private void setupMockColumnFamiliesForCompression(Table table, 855 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 856 857 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 858 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 859 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 860 .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 861 .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 862 863 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 864 } 865 Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor(); 866 } 867 868 /** 869 * @return a map from column family names to compression algorithms for testing column family 870 * compression. Column family names have special characters 871 */ 872 private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) { 873 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 874 // use column family names having special characters 875 if (numCfs-- > 0) { 876 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 877 } 878 if (numCfs-- > 0) { 879 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 880 } 881 if (numCfs-- > 0) { 882 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 883 } 884 if (numCfs-- > 0) { 885 familyToCompression.put("Family3", Compression.Algorithm.NONE); 886 } 887 return familyToCompression; 888 } 889 890 /** 891 * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the 892 * family bloom type map is correctly serialized into and deserialized from configuration 893 */ 894 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 895 @Test 896 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 897 for (int numCfs = 0; numCfs <= 2; numCfs++) { 898 Configuration conf = new Configuration(this.util.getConfiguration()); 899 Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs); 900 Table table = Mockito.mock(Table.class); 901 setupMockColumnFamiliesForBloomType(table, familyToBloomType); 902 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 903 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 904 Arrays.asList(table.getDescriptor()))); 905 906 // read back family specific data block encoding settings from the 907 // configuration 908 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 909 HFileOutputFormat2.createFamilyBloomTypeMap(conf); 910 911 // test that we have a value for all column families that matches with the 912 // used mock values 913 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 914 assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(), 915 entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey()))); 916 } 917 } 918 } 919 920 private void setupMockColumnFamiliesForBloomType(Table table, 921 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 922 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 923 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 924 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 925 .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 926 .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 927 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 928 } 929 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 930 } 931 932 /** 933 * @return a map from column family names to compression algorithms for testing column family 934 * compression. Column family names have special characters 935 */ 936 private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) { 937 Map<String, BloomType> familyToBloomType = new HashMap<>(); 938 // use column family names having special characters 939 if (numCfs-- > 0) { 940 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 941 } 942 if (numCfs-- > 0) { 943 familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL); 944 } 945 if (numCfs-- > 0) { 946 familyToBloomType.put("Family3", BloomType.NONE); 947 } 948 return familyToBloomType; 949 } 950 951 /** 952 * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the 953 * family block size map is correctly serialized into and deserialized from configuration 954 */ 955 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 956 @Test 957 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 958 for (int numCfs = 0; numCfs <= 3; numCfs++) { 959 Configuration conf = new Configuration(this.util.getConfiguration()); 960 Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs); 961 Table table = Mockito.mock(Table.class); 962 setupMockColumnFamiliesForBlockSize(table, familyToBlockSize); 963 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 964 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails, 965 Arrays.asList(table.getDescriptor()))); 966 967 // read back family specific data block encoding settings from the 968 // configuration 969 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 970 HFileOutputFormat2.createFamilyBlockSizeMap(conf); 971 972 // test that we have a value for all column families that matches with the 973 // used mock values 974 for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) { 975 assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(), 976 entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey()))); 977 } 978 } 979 } 980 981 private void setupMockColumnFamiliesForBlockSize(Table table, 982 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 983 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 984 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 985 ColumnFamilyDescriptor columnFamilyDescriptor = 986 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 987 .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 988 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 989 } 990 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 991 } 992 993 /** 994 * @return a map from column family names to compression algorithms for testing column family 995 * compression. Column family names have special characters 996 */ 997 private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) { 998 Map<String, Integer> familyToBlockSize = new HashMap<>(); 999 // use column family names having special characters 1000 if (numCfs-- > 0) { 1001 familyToBlockSize.put("Family1!@#!@#&", 1234); 1002 } 1003 if (numCfs-- > 0) { 1004 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 1005 } 1006 if (numCfs-- > 0) { 1007 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 1008 } 1009 if (numCfs-- > 0) { 1010 familyToBlockSize.put("Family3", 0); 1011 } 1012 return familyToBlockSize; 1013 } 1014 1015 /** 1016 * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that 1017 * the family data block encoding map is correctly serialized into and deserialized from 1018 * configuration 1019 */ 1020 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 1021 @Test 1022 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 1023 for (int numCfs = 0; numCfs <= 3; numCfs++) { 1024 Configuration conf = new Configuration(this.util.getConfiguration()); 1025 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 1026 getMockColumnFamiliesForDataBlockEncoding(numCfs); 1027 Table table = Mockito.mock(Table.class); 1028 setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding); 1029 TableDescriptor tableDescriptor = table.getDescriptor(); 1030 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 1031 HFileOutputFormat2.serializeColumnFamilyAttribute( 1032 HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor))); 1033 1034 // read back family specific data block encoding settings from the 1035 // configuration 1036 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 1037 HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf); 1038 1039 // test that we have a value for all column families that matches with the 1040 // used mock values 1041 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1042 assertEquals( 1043 "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(), 1044 entry.getValue(), 1045 retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey()))); 1046 } 1047 } 1048 } 1049 1050 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 1051 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 1052 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 1053 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1054 ColumnFamilyDescriptor columnFamilyDescriptor = 1055 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 1056 .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0) 1057 .build(); 1058 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 1059 } 1060 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 1061 } 1062 1063 /** 1064 * @return a map from column family names to compression algorithms for testing column family 1065 * compression. Column family names have special characters 1066 */ 1067 private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) { 1068 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1069 // use column family names having special characters 1070 if (numCfs-- > 0) { 1071 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1072 } 1073 if (numCfs-- > 0) { 1074 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF); 1075 } 1076 if (numCfs-- > 0) { 1077 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX); 1078 } 1079 if (numCfs-- > 0) { 1080 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1081 } 1082 return familyToDataBlockEncoding; 1083 } 1084 1085 private void setupMockStartKeys(RegionLocator table) throws IOException { 1086 byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"), 1087 Bytes.toBytes("ggg"), Bytes.toBytes("zzz") }; 1088 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1089 } 1090 1091 private void setupMockTableName(RegionLocator table) throws IOException { 1092 TableName mockTableName = TableName.valueOf("mock_table"); 1093 Mockito.doReturn(mockTableName).when(table).getName(); 1094 } 1095 1096 /** 1097 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings 1098 * from the column family descriptor 1099 */ 1100 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 1101 @Test 1102 public void testColumnFamilySettings() throws Exception { 1103 Configuration conf = new Configuration(this.util.getConfiguration()); 1104 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1105 TaskAttemptContext context = null; 1106 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1107 1108 // Setup table descriptor 1109 Table table = Mockito.mock(Table.class); 1110 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1111 TableDescriptorBuilder tableDescriptorBuilder = 1112 TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 1113 1114 Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor(); 1115 for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) { 1116 tableDescriptorBuilder.setColumnFamily(hcd); 1117 } 1118 1119 // set up the table to return some mock keys 1120 setupMockStartKeys(regionLocator); 1121 1122 try { 1123 // partial map red setup to get an operational writer for testing 1124 // We turn off the sequence file compression, because DefaultCodec 1125 // pollutes the GZip codec pool with an incompatible compressor. 1126 conf.set("io.seqfile.compression.type", "NONE"); 1127 conf.set("hbase.fs.tmp.dir", dir.toString()); 1128 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1129 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1130 1131 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1132 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1133 setupRandomGeneratorMapper(job, false); 1134 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 1135 FileOutputFormat.setOutputPath(job, dir); 1136 context = createTestTaskAttemptContext(job); 1137 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1138 writer = hof.getRecordWriter(context); 1139 1140 // write out random rows 1141 writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(), 1142 ROWSPERSPLIT); 1143 writer.close(context); 1144 1145 // Make sure that a directory was created for every CF 1146 FileSystem fs = dir.getFileSystem(conf); 1147 1148 // commit so that the filesystem has one directory per column family 1149 hof.getOutputCommitter(context).commitTask(context); 1150 hof.getOutputCommitter(context).commitJob(context); 1151 FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1152 assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length); 1153 for (FileStatus f : families) { 1154 String familyStr = f.getPath().getName(); 1155 ColumnFamilyDescriptor hcd = 1156 tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr)); 1157 // verify that the compression on this file matches the configured 1158 // compression 1159 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1160 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1161 Map<byte[], byte[]> fileInfo = reader.getHFileInfo(); 1162 1163 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1164 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1165 assertEquals( 1166 "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")", 1167 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1168 assertEquals( 1169 "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")", 1170 hcd.getCompressionType(), reader.getFileContext().getCompression()); 1171 } 1172 } finally { 1173 dir.getFileSystem(conf).delete(dir, true); 1174 } 1175 } 1176 1177 /** 1178 * Write random values to the writer assuming a table created using {@link #FAMILIES} as column 1179 * family descriptors 1180 */ 1181 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1182 TaskAttemptContext context, Set<byte[]> families, int numRows) 1183 throws IOException, InterruptedException { 1184 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1185 int valLength = 10; 1186 byte valBytes[] = new byte[valLength]; 1187 1188 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1189 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1190 final byte[] qualifier = Bytes.toBytes("data"); 1191 for (int i = 0; i < numRows; i++) { 1192 Bytes.putInt(keyBytes, 0, i); 1193 Bytes.random(valBytes); 1194 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1195 for (byte[] family : families) { 1196 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1197 writer.write(key, kv); 1198 } 1199 } 1200 } 1201 1202 /** 1203 * This test is to test the scenario happened in HBASE-6901. All files are bulk loaded and 1204 * excluded from minor compaction. Without the fix of HBASE-6901, an 1205 * ArrayIndexOutOfBoundsException will be thrown. 1206 */ 1207 @Ignore("Flakey: See HBASE-9051") 1208 @Test 1209 public void testExcludeAllFromMinorCompaction() throws Exception { 1210 Configuration conf = util.getConfiguration(); 1211 conf.setInt("hbase.hstore.compaction.min", 2); 1212 generateRandomStartKeys(5); 1213 1214 util.startMiniCluster(); 1215 try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin(); 1216 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1217 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1218 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1219 assertEquals("Should start with empty table", 0, util.countRows(table)); 1220 1221 // deep inspection: get the StoreFile dir 1222 final Path storePath = 1223 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1224 new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1225 Bytes.toString(FAMILIES[0]))); 1226 assertEquals(0, fs.listStatus(storePath).length); 1227 1228 // Generate two bulk load files 1229 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1230 1231 for (int i = 0; i < 2; i++) { 1232 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1233 runIncrementalPELoad(conf, 1234 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), 1235 conn.getRegionLocator(TABLE_NAMES[0]))), 1236 testDir, false); 1237 // Perform the actual load 1238 BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir); 1239 } 1240 1241 // Ensure data shows up 1242 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1243 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 1244 util.countRows(table)); 1245 1246 // should have a second StoreFile now 1247 assertEquals(2, fs.listStatus(storePath).length); 1248 1249 // minor compactions shouldn't get rid of the file 1250 admin.compact(TABLE_NAMES[0]); 1251 try { 1252 quickPoll(new Callable<Boolean>() { 1253 @Override 1254 public Boolean call() throws Exception { 1255 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1256 for (HRegion region : regions) { 1257 for (HStore store : region.getStores()) { 1258 store.closeAndArchiveCompactedFiles(); 1259 } 1260 } 1261 return fs.listStatus(storePath).length == 1; 1262 } 1263 }, 5000); 1264 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1265 } catch (AssertionError ae) { 1266 // this is expected behavior 1267 } 1268 1269 // a major compaction should work though 1270 admin.majorCompact(TABLE_NAMES[0]); 1271 quickPoll(new Callable<Boolean>() { 1272 @Override 1273 public Boolean call() throws Exception { 1274 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1275 for (HRegion region : regions) { 1276 for (HStore store : region.getStores()) { 1277 store.closeAndArchiveCompactedFiles(); 1278 } 1279 } 1280 return fs.listStatus(storePath).length == 1; 1281 } 1282 }, 5000); 1283 1284 } finally { 1285 util.shutdownMiniCluster(); 1286 } 1287 } 1288 1289 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 1290 @Test 1291 public void testExcludeMinorCompaction() throws Exception { 1292 Configuration conf = util.getConfiguration(); 1293 conf.setInt("hbase.hstore.compaction.min", 2); 1294 generateRandomStartKeys(5); 1295 1296 util.startMiniCluster(); 1297 try (Connection conn = ConnectionFactory.createConnection(conf); 1298 Admin admin = conn.getAdmin()) { 1299 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1300 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1301 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1302 assertEquals("Should start with empty table", 0, util.countRows(table)); 1303 1304 // deep inspection: get the StoreFile dir 1305 final Path storePath = 1306 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1307 new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1308 Bytes.toString(FAMILIES[0]))); 1309 assertEquals(0, fs.listStatus(storePath).length); 1310 1311 // put some data in it and flush to create a storefile 1312 Put p = new Put(Bytes.toBytes("test")); 1313 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1314 table.put(p); 1315 admin.flush(TABLE_NAMES[0]); 1316 assertEquals(1, util.countRows(table)); 1317 quickPoll(new Callable<Boolean>() { 1318 @Override 1319 public Boolean call() throws Exception { 1320 return fs.listStatus(storePath).length == 1; 1321 } 1322 }, 5000); 1323 1324 // Generate a bulk load file with more rows 1325 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1326 1327 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1328 runIncrementalPELoad(conf, 1329 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)), 1330 testDir, false); 1331 1332 // Perform the actual load 1333 BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir); 1334 1335 // Ensure data shows up 1336 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1337 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1, 1338 util.countRows(table)); 1339 1340 // should have a second StoreFile now 1341 assertEquals(2, fs.listStatus(storePath).length); 1342 1343 // minor compactions shouldn't get rid of the file 1344 admin.compact(TABLE_NAMES[0]); 1345 try { 1346 quickPoll(new Callable<Boolean>() { 1347 @Override 1348 public Boolean call() throws Exception { 1349 return fs.listStatus(storePath).length == 1; 1350 } 1351 }, 5000); 1352 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1353 } catch (AssertionError ae) { 1354 // this is expected behavior 1355 } 1356 1357 // a major compaction should work though 1358 admin.majorCompact(TABLE_NAMES[0]); 1359 quickPoll(new Callable<Boolean>() { 1360 @Override 1361 public Boolean call() throws Exception { 1362 return fs.listStatus(storePath).length == 1; 1363 } 1364 }, 5000); 1365 1366 } finally { 1367 util.shutdownMiniCluster(); 1368 } 1369 } 1370 1371 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1372 int sleepMs = 10; 1373 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1374 while (retries-- > 0) { 1375 if (c.call().booleanValue()) { 1376 return; 1377 } 1378 Thread.sleep(sleepMs); 1379 } 1380 fail(); 1381 } 1382 1383 public static void main(String args[]) throws Exception { 1384 new TestHFileOutputFormat2().manualTest(args); 1385 } 1386 1387 public void manualTest(String args[]) throws Exception { 1388 Configuration conf = HBaseConfiguration.create(); 1389 util = new HBaseTestingUtil(conf); 1390 if ("newtable".equals(args[0])) { 1391 TableName tname = TableName.valueOf(args[1]); 1392 byte[][] splitKeys = generateRandomSplitKeys(4); 1393 Table table = util.createTable(tname, FAMILIES, splitKeys); 1394 } else if ("incremental".equals(args[0])) { 1395 TableName tname = TableName.valueOf(args[1]); 1396 try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin(); 1397 RegionLocator regionLocator = c.getRegionLocator(tname)) { 1398 Path outDir = new Path("incremental-out"); 1399 runIncrementalPELoad(conf, 1400 Arrays 1401 .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)), 1402 outDir, false); 1403 } 1404 } else { 1405 throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental"); 1406 } 1407 } 1408 1409 @Test 1410 public void testBlockStoragePolicy() throws Exception { 1411 util = new HBaseTestingUtil(); 1412 Configuration conf = util.getConfiguration(); 1413 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); 1414 1415 conf.set( 1416 HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes 1417 .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])), 1418 "ONE_SSD"); 1419 Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0])); 1420 Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1])); 1421 util.startMiniDFSCluster(3); 1422 FileSystem fs = util.getDFSCluster().getFileSystem(); 1423 try { 1424 fs.mkdirs(cf1Dir); 1425 fs.mkdirs(cf2Dir); 1426 1427 // the original block storage policy would be HOT 1428 String spA = getStoragePolicyName(fs, cf1Dir); 1429 String spB = getStoragePolicyName(fs, cf2Dir); 1430 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1431 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1432 assertEquals("HOT", spA); 1433 assertEquals("HOT", spB); 1434 1435 // alter table cf schema to change storage policies 1436 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1437 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir); 1438 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1439 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir); 1440 spA = getStoragePolicyName(fs, cf1Dir); 1441 spB = getStoragePolicyName(fs, cf2Dir); 1442 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1443 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1444 assertNotNull(spA); 1445 assertEquals("ONE_SSD", spA); 1446 assertNotNull(spB); 1447 assertEquals("ALL_SSD", spB); 1448 } finally { 1449 fs.delete(cf1Dir, true); 1450 fs.delete(cf2Dir, true); 1451 util.shutdownMiniDFSCluster(); 1452 } 1453 } 1454 1455 private String getStoragePolicyName(FileSystem fs, Path path) { 1456 try { 1457 Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path); 1458 return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); 1459 } catch (Exception e) { 1460 // Maybe fail because of using old HDFS version, try the old way 1461 if (LOG.isTraceEnabled()) { 1462 LOG.trace("Failed to get policy directly", e); 1463 } 1464 String policy = getStoragePolicyNameForOldHDFSVersion(fs, path); 1465 return policy == null ? "HOT" : policy;// HOT by default 1466 } 1467 } 1468 1469 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1470 try { 1471 if (fs instanceof DistributedFileSystem) { 1472 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1473 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1474 if (null != status) { 1475 byte storagePolicyId = status.getStoragePolicy(); 1476 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1477 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1478 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1479 for (BlockStoragePolicy policy : policies) { 1480 if (policy.getId() == storagePolicyId) { 1481 return policy.getName(); 1482 } 1483 } 1484 } 1485 } 1486 } 1487 } catch (Throwable e) { 1488 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1489 } 1490 1491 return null; 1492 } 1493 1494 @Test 1495 public void TestConfigurePartitioner() throws IOException { 1496 Configuration conf = util.getConfiguration(); 1497 // Create a user who is not the current user 1498 String fooUserName = "foo1234"; 1499 String fooGroupName = "group1"; 1500 UserGroupInformation ugi = 1501 UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName }); 1502 // Get user's home directory 1503 Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() { 1504 @Override 1505 public Path run() { 1506 try (FileSystem fs = FileSystem.get(conf)) { 1507 return fs.makeQualified(fs.getHomeDirectory()); 1508 } catch (IOException ioe) { 1509 LOG.error("Failed to get foo's home directory", ioe); 1510 } 1511 return null; 1512 } 1513 }); 1514 1515 Job job = Mockito.mock(Job.class); 1516 Mockito.doReturn(conf).when(job).getConfiguration(); 1517 ImmutableBytesWritable writable = new ImmutableBytesWritable(); 1518 List<ImmutableBytesWritable> splitPoints = new LinkedList<ImmutableBytesWritable>(); 1519 splitPoints.add(writable); 1520 1521 ugi.doAs(new PrivilegedAction<Void>() { 1522 @Override 1523 public Void run() { 1524 try { 1525 HFileOutputFormat2.configurePartitioner(job, splitPoints, false); 1526 } catch (IOException ioe) { 1527 LOG.error("Failed to configure partitioner", ioe); 1528 } 1529 return null; 1530 } 1531 }); 1532 FileSystem fs = FileSystem.get(conf); 1533 // verify that the job uses TotalOrderPartitioner 1534 verify(job).setPartitionerClass(TotalOrderPartitioner.class); 1535 // verify that TotalOrderPartitioner.setPartitionFile() is called. 1536 String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path"); 1537 Assert.assertNotNull(partitionPathString); 1538 // Make sure the partion file is in foo1234's home directory, and that 1539 // the file exists. 1540 Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString())); 1541 Assert.assertTrue(fs.exists(new Path(partitionPathString))); 1542 } 1543 1544 @Test 1545 public void TestConfigureCompression() throws Exception { 1546 Configuration conf = new Configuration(this.util.getConfiguration()); 1547 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1548 TaskAttemptContext context = null; 1549 Path dir = util.getDataTestDir("TestConfigureCompression"); 1550 String hfileoutputformatCompression = "gz"; 1551 1552 try { 1553 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 1554 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1555 1556 conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression); 1557 1558 Job job = Job.getInstance(conf); 1559 FileOutputFormat.setOutputPath(job, dir); 1560 context = createTestTaskAttemptContext(job); 1561 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1562 writer = hof.getRecordWriter(context); 1563 final byte[] b = Bytes.toBytes("b"); 1564 1565 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b); 1566 writer.write(new ImmutableBytesWritable(), kv); 1567 writer.close(context); 1568 writer = null; 1569 FileSystem fs = dir.getFileSystem(conf); 1570 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 1571 while (iterator.hasNext()) { 1572 LocatedFileStatus keyFileStatus = iterator.next(); 1573 HFile.Reader reader = 1574 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 1575 assertEquals(reader.getTrailer().getCompressionCodec().getName(), 1576 hfileoutputformatCompression); 1577 } 1578 } finally { 1579 if (writer != null && context != null) { 1580 writer.close(context); 1581 } 1582 dir.getFileSystem(conf).delete(dir, true); 1583 } 1584 1585 } 1586 1587 @Test 1588 public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception { 1589 // Start cluster A 1590 util = new HBaseTestingUtil(); 1591 Configuration confA = util.getConfiguration(); 1592 int hostCount = 3; 1593 int regionNum = 20; 1594 String[] hostnames = new String[hostCount]; 1595 for (int i = 0; i < hostCount; ++i) { 1596 hostnames[i] = "datanode_" + i; 1597 } 1598 StartTestingClusterOption option = StartTestingClusterOption.builder() 1599 .numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 1600 util.startMiniCluster(option); 1601 1602 // Start cluster B 1603 HBaseTestingUtil utilB = new HBaseTestingUtil(); 1604 Configuration confB = utilB.getConfiguration(); 1605 utilB.startMiniCluster(option); 1606 1607 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 1608 1609 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 1610 TableName tableName = TableName.valueOf("table"); 1611 // Create table in cluster B 1612 try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys); 1613 RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) { 1614 // Generate the bulk load files 1615 // Job has zookeeper configuration for cluster A 1616 // Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B 1617 Job job = new Job(confA, "testLocalMRIncrementalLoad"); 1618 Configuration jobConf = job.getConfiguration(); 1619 final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf); 1620 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 1621 setupRandomGeneratorMapper(job, false); 1622 HFileOutputFormat2.configureIncrementalLoad(job, table, r); 1623 1624 assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM), 1625 jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY)); 1626 assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT), 1627 jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY)); 1628 assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT), 1629 jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY)); 1630 1631 String bSpecificConfigKey = "my.override.config.for.b"; 1632 String bSpecificConfigValue = "b-specific-value"; 1633 jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey, 1634 bSpecificConfigValue); 1635 1636 FileOutputFormat.setOutputPath(job, testDir); 1637 1638 assertFalse(util.getTestFileSystem().exists(testDir)); 1639 1640 assertTrue(job.waitForCompletion(true)); 1641 1642 final List<Configuration> configs = 1643 ConfigurationCaptorConnection.getCapturedConfigarutions(key); 1644 1645 assertFalse(configs.isEmpty()); 1646 for (Configuration config : configs) { 1647 assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM), 1648 config.get(HConstants.ZOOKEEPER_QUORUM)); 1649 assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT), 1650 config.get(HConstants.ZOOKEEPER_CLIENT_PORT)); 1651 assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT), 1652 config.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); 1653 1654 assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey)); 1655 } 1656 } finally { 1657 utilB.deleteTable(tableName); 1658 testDir.getFileSystem(confA).delete(testDir, true); 1659 util.shutdownMiniCluster(); 1660 utilB.shutdownMiniCluster(); 1661 } 1662 } 1663 1664 private static class ConfigurationCaptorConnection implements Connection { 1665 private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid"; 1666 1667 private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>(); 1668 1669 private final Connection delegate; 1670 1671 public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user, 1672 ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException { 1673 // here we do not use this registry, so close it... 1674 registry.close(); 1675 // here we use createAsyncConnection, to avoid infinite recursive as we reset the Connection 1676 // implementation in below method 1677 delegate = 1678 FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes)) 1679 .toConnection(); 1680 1681 final String uuid = conf.get(UUID_KEY); 1682 if (uuid != null) { 1683 confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf); 1684 } 1685 } 1686 1687 static UUID configureConnectionImpl(Configuration conf) { 1688 conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 1689 ConfigurationCaptorConnection.class, Connection.class); 1690 1691 final UUID uuid = UUID.randomUUID(); 1692 conf.set(UUID_KEY, uuid.toString()); 1693 return uuid; 1694 } 1695 1696 static List<Configuration> getCapturedConfigarutions(UUID key) { 1697 return confs.get(key); 1698 } 1699 1700 @Override 1701 public Configuration getConfiguration() { 1702 return delegate.getConfiguration(); 1703 } 1704 1705 @Override 1706 public Table getTable(TableName tableName) throws IOException { 1707 return delegate.getTable(tableName); 1708 } 1709 1710 @Override 1711 public Table getTable(TableName tableName, ExecutorService pool) throws IOException { 1712 return delegate.getTable(tableName, pool); 1713 } 1714 1715 @Override 1716 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 1717 return delegate.getBufferedMutator(tableName); 1718 } 1719 1720 @Override 1721 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 1722 return delegate.getBufferedMutator(params); 1723 } 1724 1725 @Override 1726 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 1727 return delegate.getRegionLocator(tableName); 1728 } 1729 1730 @Override 1731 public void clearRegionLocationCache() { 1732 delegate.clearRegionLocationCache(); 1733 } 1734 1735 @Override 1736 public Admin getAdmin() throws IOException { 1737 return delegate.getAdmin(); 1738 } 1739 1740 @Override 1741 public void close() throws IOException { 1742 delegate.close(); 1743 } 1744 1745 @Override 1746 public boolean isClosed() { 1747 return delegate.isClosed(); 1748 } 1749 1750 @Override 1751 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 1752 return delegate.getTableBuilder(tableName, pool); 1753 } 1754 1755 @Override 1756 public AsyncConnection toAsyncConnection() { 1757 return delegate.toAsyncConnection(); 1758 } 1759 1760 @Override 1761 public String getClusterId() { 1762 return delegate.getClusterId(); 1763 } 1764 1765 @Override 1766 public Hbck getHbck() throws IOException { 1767 return delegate.getHbck(); 1768 } 1769 1770 @Override 1771 public Hbck getHbck(ServerName masterServer) throws IOException { 1772 return delegate.getHbck(masterServer); 1773 } 1774 1775 @Override 1776 public void abort(String why, Throwable e) { 1777 delegate.abort(why, e); 1778 } 1779 1780 @Override 1781 public boolean isAborted() { 1782 return delegate.isAborted(); 1783 } 1784 } 1785 1786}