001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.client.ConnectionFactory.createConnection; 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertNotSame; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.lang.reflect.Field; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.Random; 038import java.util.Set; 039import java.util.UUID; 040import java.util.concurrent.Callable; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.CopyOnWriteArrayList; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.ThreadLocalRandom; 045import java.util.stream.Collectors; 046import java.util.stream.Stream; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.LocatedFileStatus; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.fs.RemoteIterator; 053import org.apache.hadoop.hbase.ArrayBackedTag; 054import org.apache.hadoop.hbase.Cell; 055import org.apache.hadoop.hbase.CellUtil; 056import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 057import org.apache.hadoop.hbase.HBaseClassTestRule; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseTestingUtility; 060import org.apache.hadoop.hbase.HColumnDescriptor; 061import org.apache.hadoop.hbase.HConstants; 062import org.apache.hadoop.hbase.HDFSBlocksDistribution; 063import org.apache.hadoop.hbase.HTableDescriptor; 064import org.apache.hadoop.hbase.HadoopShims; 065import org.apache.hadoop.hbase.KeyValue; 066import org.apache.hadoop.hbase.PerformanceEvaluation; 067import org.apache.hadoop.hbase.PrivateCellUtil; 068import org.apache.hadoop.hbase.ServerName; 069import org.apache.hadoop.hbase.StartMiniClusterOption; 070import org.apache.hadoop.hbase.TableName; 071import org.apache.hadoop.hbase.Tag; 072import org.apache.hadoop.hbase.TagType; 073import org.apache.hadoop.hbase.client.Admin; 074import org.apache.hadoop.hbase.client.BufferedMutator; 075import org.apache.hadoop.hbase.client.BufferedMutatorParams; 076import org.apache.hadoop.hbase.client.ClusterConnection; 077import org.apache.hadoop.hbase.client.Connection; 078import 
org.apache.hadoop.hbase.client.ConnectionFactory; 079import org.apache.hadoop.hbase.client.Hbck; 080import org.apache.hadoop.hbase.client.Put; 081import org.apache.hadoop.hbase.client.RegionLocator; 082import org.apache.hadoop.hbase.client.Result; 083import org.apache.hadoop.hbase.client.ResultScanner; 084import org.apache.hadoop.hbase.client.Scan; 085import org.apache.hadoop.hbase.client.Table; 086import org.apache.hadoop.hbase.client.TableBuilder; 087import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 088import org.apache.hadoop.hbase.io.compress.Compression; 089import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 090import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 091import org.apache.hadoop.hbase.io.hfile.CacheConfig; 092import org.apache.hadoop.hbase.io.hfile.HFile; 093import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 094import org.apache.hadoop.hbase.io.hfile.HFileScanner; 095import org.apache.hadoop.hbase.regionserver.BloomType; 096import org.apache.hadoop.hbase.regionserver.HRegion; 097import org.apache.hadoop.hbase.regionserver.HStore; 098import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 099import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 100import org.apache.hadoop.hbase.security.User; 101import org.apache.hadoop.hbase.testclassification.LargeTests; 102import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 103import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 104import org.apache.hadoop.hbase.util.Bytes; 105import org.apache.hadoop.hbase.util.CommonFSUtils; 106import org.apache.hadoop.hbase.util.FSUtils; 107import org.apache.hadoop.hbase.util.ReflectionUtils; 108import org.apache.hadoop.hdfs.DistributedFileSystem; 109import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 110import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 111import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 112import org.apache.hadoop.io.NullWritable; 113import org.apache.hadoop.mapreduce.Job; 114import org.apache.hadoop.mapreduce.Mapper; 115import org.apache.hadoop.mapreduce.RecordWriter; 116import org.apache.hadoop.mapreduce.TaskAttemptContext; 117import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 118import org.junit.ClassRule; 119import org.junit.Ignore; 120import org.junit.Test; 121import org.junit.experimental.categories.Category; 122import org.mockito.Mockito; 123import org.slf4j.Logger; 124import org.slf4j.LoggerFactory; 125 126/** 127 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile 128 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and 129 * values like those of {@link PerformanceEvaluation}. 
130 */ 131@Category({ VerySlowMapReduceTests.class, LargeTests.class }) 132// TODO : Remove this in 3.0 133public class TestHFileOutputFormat2 { 134 135 @ClassRule 136 public static final HBaseClassTestRule CLASS_RULE = 137 HBaseClassTestRule.forClass(TestHFileOutputFormat2.class); 138 139 private final static int ROWSPERSPLIT = 1024; 140 141 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 142 private static final byte[][] FAMILIES = 143 { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) }; 144 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3") 145 .map(TableName::valueOf).toArray(TableName[]::new); 146 147 private HBaseTestingUtility util = new HBaseTestingUtility(); 148 149 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 150 151 /** 152 * Simple mapper that makes KeyValue output. 153 */ 154 static class RandomKVGeneratingMapper 155 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> { 156 157 private int keyLength; 158 private static final int KEYLEN_DEFAULT = 10; 159 private static final String KEYLEN_CONF = "randomkv.key.length"; 160 161 private int valLength; 162 private static final int VALLEN_DEFAULT = 10; 163 private static final String VALLEN_CONF = "randomkv.val.length"; 164 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 165 private boolean multiTableMapper = false; 166 private TableName[] tables = null; 167 168 @Override 169 protected void setup(Context context) throws IOException, InterruptedException { 170 super.setup(context); 171 172 Configuration conf = context.getConfiguration(); 173 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 174 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 175 multiTableMapper = 176 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 177 if (multiTableMapper) { 178 tables = TABLE_NAMES; 179 } else { 180 tables = new TableName[] { TABLE_NAMES[0] }; 181 } 182 } 183 184 @Override 185 protected void map(NullWritable n1, NullWritable n2, 186 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context) 187 throws java.io.IOException, InterruptedException { 188 189 byte keyBytes[] = new byte[keyLength]; 190 byte valBytes[] = new byte[valLength]; 191 192 int taskId = context.getTaskAttemptID().getTaskID().getId(); 193 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 194 byte[] key; 195 for (int j = 0; j < tables.length; ++j) { 196 for (int i = 0; i < ROWSPERSPLIT; i++) { 197 Bytes.random(keyBytes); 198 // Ensure that unique tasks generate unique keys 199 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 200 Bytes.random(valBytes); 201 key = keyBytes; 202 if (multiTableMapper) { 203 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 204 } 205 206 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 207 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 208 context.write(new ImmutableBytesWritable(key), kv); 209 } 210 } 211 } 212 } 213 } 214 215 /** 216 * Simple mapper that makes Put output. 
217 */ 218 static class RandomPutGeneratingMapper 219 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> { 220 221 private int keyLength; 222 private static final int KEYLEN_DEFAULT = 10; 223 private static final String KEYLEN_CONF = "randomkv.key.length"; 224 225 private int valLength; 226 private static final int VALLEN_DEFAULT = 10; 227 private static final String VALLEN_CONF = "randomkv.val.length"; 228 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 229 private boolean multiTableMapper = false; 230 private TableName[] tables = null; 231 232 @Override 233 protected void setup(Context context) throws IOException, InterruptedException { 234 super.setup(context); 235 236 Configuration conf = context.getConfiguration(); 237 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 238 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 239 multiTableMapper = 240 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 241 if (multiTableMapper) { 242 tables = TABLE_NAMES; 243 } else { 244 tables = new TableName[] { TABLE_NAMES[0] }; 245 } 246 } 247 248 @Override 249 protected void map(NullWritable n1, NullWritable n2, 250 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context) 251 throws java.io.IOException, InterruptedException { 252 253 byte keyBytes[] = new byte[keyLength]; 254 byte valBytes[] = new byte[valLength]; 255 256 int taskId = context.getTaskAttemptID().getTaskID().getId(); 257 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 258 259 byte[] key; 260 for (int j = 0; j < tables.length; ++j) { 261 for (int i = 0; i < ROWSPERSPLIT; i++) { 262 Bytes.random(keyBytes); 263 // Ensure that unique tasks generate unique keys 264 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 265 Bytes.random(valBytes); 266 key = keyBytes; 267 if (multiTableMapper) { 268 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 269 } 270 271 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 272 Put p = new Put(keyBytes); 273 p.addColumn(family, QUALIFIER, valBytes); 274 // set TTL to very low so that the scan does not return any value 275 p.setTTL(1l); 276 context.write(new ImmutableBytesWritable(key), p); 277 } 278 } 279 } 280 } 281 } 282 283 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 284 if (putSortReducer) { 285 job.setInputFormatClass(NMapInputFormat.class); 286 job.setMapperClass(RandomPutGeneratingMapper.class); 287 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 288 job.setMapOutputValueClass(Put.class); 289 } else { 290 job.setInputFormatClass(NMapInputFormat.class); 291 job.setMapperClass(RandomKVGeneratingMapper.class); 292 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 293 job.setMapOutputValueClass(KeyValue.class); 294 } 295 } 296 297 /** 298 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose 299 * timestamp is {@link HConstants#LATEST_TIMESTAMP}. 300 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 301 */ 302 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 303 @Test 304 public void test_LATEST_TIMESTAMP_isReplaced() throws Exception { 305 Configuration conf = new Configuration(this.util.getConfiguration()); 306 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 307 TaskAttemptContext context = null; 308 Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 309 try { 310 Job job = new Job(conf); 311 FileOutputFormat.setOutputPath(job, dir); 312 context = createTestTaskAttemptContext(job); 313 HFileOutputFormat2 hof = new HFileOutputFormat2(); 314 writer = hof.getRecordWriter(context); 315 final byte[] b = Bytes.toBytes("b"); 316 317 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 318 // changed by call to write. Check all in kv is same but ts. 319 KeyValue kv = new KeyValue(b, b, b); 320 KeyValue original = kv.clone(); 321 writer.write(new ImmutableBytesWritable(), kv); 322 assertFalse(original.equals(kv)); 323 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 324 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 325 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 326 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 327 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 328 329 // Test 2. Now test passing a kv that has explicit ts. It should not be 330 // changed by call to record write. 331 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 332 original = kv.clone(); 333 writer.write(new ImmutableBytesWritable(), kv); 334 assertTrue(original.equals(kv)); 335 } finally { 336 if (writer != null && context != null) writer.close(context); 337 dir.getFileSystem(conf).delete(dir, true); 338 } 339 } 340 341 private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception { 342 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 343 TaskAttemptContext context = 344 hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0"); 345 return context; 346 } 347 348 /* 349 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by 350 * time-restricted scans. 351 */ 352 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 353 @Test 354 public void test_TIMERANGE() throws Exception { 355 Configuration conf = new Configuration(this.util.getConfiguration()); 356 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 357 TaskAttemptContext context = null; 358 Path dir = util.getDataTestDir("test_TIMERANGE_present"); 359 LOG.info("Timerange dir writing to dir: " + dir); 360 try { 361 // build a record writer using HFileOutputFormat2 362 Job job = new Job(conf); 363 FileOutputFormat.setOutputPath(job, dir); 364 context = createTestTaskAttemptContext(job); 365 HFileOutputFormat2 hof = new HFileOutputFormat2(); 366 writer = hof.getRecordWriter(context); 367 368 // Pass two key values with explicit times stamps 369 final byte[] b = Bytes.toBytes("b"); 370 371 // value 1 with timestamp 2000 372 KeyValue kv = new KeyValue(b, b, b, 2000, b); 373 KeyValue original = kv.clone(); 374 writer.write(new ImmutableBytesWritable(), kv); 375 assertEquals(original, kv); 376 377 // value 2 with timestamp 1000 378 kv = new KeyValue(b, b, b, 1000, b); 379 original = kv.clone(); 380 writer.write(new ImmutableBytesWritable(), kv); 381 assertEquals(original, kv); 382 383 // verify that the file has the proper FileInfo. 
384 writer.close(context); 385 386 // the generated file lives 1 directory down from the attempt directory 387 // and is the only file, e.g. 388 // _attempt__0000_r_000000_0/b/1979617994050536795 389 FileSystem fs = FileSystem.get(conf); 390 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 391 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 392 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 393 394 // open as HFile Reader and pull out TIMERANGE FileInfo. 395 HFile.Reader rd = 396 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 397 Map<byte[], byte[]> finfo = rd.getHFileInfo(); 398 byte[] range = finfo.get(Bytes.toBytes("TIMERANGE")); 399 assertNotNull(range); 400 401 // unmarshall and check values. 402 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range); 403 LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax()); 404 assertEquals(1000, timeRangeTracker.getMin()); 405 assertEquals(2000, timeRangeTracker.getMax()); 406 rd.close(); 407 } finally { 408 if (writer != null && context != null) writer.close(context); 409 dir.getFileSystem(conf).delete(dir, true); 410 } 411 } 412 413 /** 414 * Run small MR job. 415 */ 416 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 417 @Test 418 public void testWritingPEData() throws Exception { 419 Configuration conf = util.getConfiguration(); 420 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 421 FileSystem fs = testDir.getFileSystem(conf); 422 423 // Set down this value or we OOME in eclipse. 424 conf.setInt("mapreduce.task.io.sort.mb", 20); 425 // Write a few files. 426 long hregionMaxFilesize = 10 * 1024; 427 conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize); 428 429 Job job = new Job(conf, "testWritingPEData"); 430 setupRandomGeneratorMapper(job, false); 431 // This partitioner doesn't work well for number keys but using it anyways 432 // just to demonstrate how to configure it. 433 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 434 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 435 436 Arrays.fill(startKey, (byte) 0); 437 Arrays.fill(endKey, (byte) 0xff); 438 439 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 440 // Set start and end rows for partitioner. 441 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 442 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 443 job.setReducerClass(KeyValueSortReducer.class); 444 job.setOutputFormatClass(HFileOutputFormat2.class); 445 job.setNumReduceTasks(4); 446 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 447 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 448 KeyValueSerialization.class.getName()); 449 450 FileOutputFormat.setOutputPath(job, testDir); 451 assertTrue(job.waitForCompletion(false)); 452 FileStatus[] files = fs.listStatus(testDir); 453 assertTrue(files.length > 0); 454 455 // check output file num and size. 
456 for (byte[] family : FAMILIES) { 457 long kvCount = 0; 458 RemoteIterator<LocatedFileStatus> iterator = 459 fs.listFiles(testDir.suffix("/" + new String(family)), true); 460 while (iterator.hasNext()) { 461 LocatedFileStatus keyFileStatus = iterator.next(); 462 HFile.Reader reader = 463 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 464 HFileScanner scanner = reader.getScanner(conf, false, false, false); 465 466 kvCount += reader.getEntries(); 467 scanner.seekTo(); 468 long perKVSize = scanner.getCell().getSerializedSize(); 469 assertTrue("Data size of each file should not be too large.", 470 perKVSize * reader.getEntries() <= hregionMaxFilesize); 471 } 472 assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount); 473 } 474 } 475 476 /** 477 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile. 478 */ 479 @Test 480 public void test_WritingTagData() throws Exception { 481 Configuration conf = new Configuration(this.util.getConfiguration()); 482 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 483 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 484 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 485 TaskAttemptContext context = null; 486 Path dir = util.getDataTestDir("WritingTagData"); 487 try { 488 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 489 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 490 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 491 Job job = new Job(conf); 492 FileOutputFormat.setOutputPath(job, dir); 493 context = createTestTaskAttemptContext(job); 494 HFileOutputFormat2 hof = new HFileOutputFormat2(); 495 writer = hof.getRecordWriter(context); 496 final byte[] b = Bytes.toBytes("b"); 497 498 List<Tag> tags = new ArrayList<>(); 499 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 500 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 501 writer.write(new ImmutableBytesWritable(), kv); 502 writer.close(context); 503 writer = null; 504 FileSystem fs = dir.getFileSystem(conf); 505 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 506 while (iterator.hasNext()) { 507 LocatedFileStatus keyFileStatus = iterator.next(); 508 HFile.Reader reader = 509 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 510 HFileScanner scanner = reader.getScanner(conf, false, false, false); 511 scanner.seekTo(); 512 Cell cell = scanner.getCell(); 513 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 514 assertTrue(tagsFromCell.size() > 0); 515 for (Tag tag : tagsFromCell) { 516 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 517 } 518 } 519 } finally { 520 if (writer != null && context != null) writer.close(context); 521 dir.getFileSystem(conf).delete(dir, true); 522 } 523 } 524 525 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 526 @Test 527 public void testJobConfiguration() throws Exception { 528 Configuration conf = new Configuration(this.util.getConfiguration()); 529 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 530 util.getDataTestDir("testJobConfiguration").toString()); 531 Job job = new Job(conf); 532 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 533 Table table = Mockito.mock(Table.class); 534 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 535 setupMockStartKeys(regionLocator); 536 setupMockTableName(regionLocator); 537 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 538 assertEquals(job.getNumReduceTasks(), 4); 539 } 540 541 private byte[][] generateRandomStartKeys(int numKeys) { 542 Random random = ThreadLocalRandom.current(); 543 byte[][] ret = new byte[numKeys][]; 544 // first region start key is always empty 545 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 546 for (int i = 1; i < numKeys; i++) { 547 ret[i] = 548 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 549 } 550 return ret; 551 } 552 553 private byte[][] generateRandomSplitKeys(int numKeys) { 554 Random random = ThreadLocalRandom.current(); 555 byte[][] ret = new byte[numKeys][]; 556 for (int i = 0; i < numKeys; i++) { 557 ret[i] = 558 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 559 } 560 return ret; 561 } 562 563 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 564 @Test 565 public void testMRIncrementalLoad() throws Exception { 566 LOG.info("\nStarting test testMRIncrementalLoad\n"); 567 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 568 } 569 570 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 571 @Test 572 public void testMRIncrementalLoadWithSplit() throws Exception { 573 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 574 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 575 } 576 577 /** 578 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the 579 * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because 580 * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to 581 * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster 582 * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region 583 * locality features more easily. 584 */ 585 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 586 @Test 587 public void testMRIncrementalLoadWithLocality() throws Exception { 588 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 589 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 590 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 591 } 592 593 // @Ignore("Wahtevs") 594 @Test 595 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 596 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 597 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 598 } 599 600 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 601 boolean putSortReducer, String tableStr) throws Exception { 602 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, 603 Arrays.asList(tableStr)); 604 } 605 606 @Test 607 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 608 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 609 doIncrementalLoadTest(false, false, true, 610 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList())); 611 } 612 613 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 614 boolean putSortReducer, List<String> tableStr) throws Exception { 615 util = new HBaseTestingUtility(); 616 Configuration conf = util.getConfiguration(); 617 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 618 int hostCount = 1; 619 int regionNum = 5; 620 if (shouldKeepLocality) { 621 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 622 // explicit hostnames parameter just like MiniDFSCluster does. 
623 hostCount = 3; 624 regionNum = 20; 625 } 626 627 String[] hostnames = new String[hostCount]; 628 for (int i = 0; i < hostCount; ++i) { 629 hostnames[i] = "datanode_" + i; 630 } 631 StartMiniClusterOption option = 632 StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 633 util.startMiniCluster(option); 634 635 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 636 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 637 boolean writeMultipleTables = tableStr.size() > 1; 638 for (String tableStrSingle : tableStr) { 639 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 640 TableName tableName = TableName.valueOf(tableStrSingle); 641 Table table = util.createTable(tableName, FAMILIES, splitKeys); 642 643 RegionLocator r = util.getConnection().getRegionLocator(tableName); 644 assertEquals("Should start with empty table", 0, util.countRows(table)); 645 int numRegions = r.getStartKeys().length; 646 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 647 648 allTables.put(tableStrSingle, table); 649 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r)); 650 } 651 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 652 // Generate the bulk load files 653 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 654 655 for (Table tableSingle : allTables.values()) { 656 // This doesn't write into the table, just makes files 657 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 658 } 659 int numTableDirs = 0; 660 for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) { 661 Path tablePath = testDir; 662 663 if (writeMultipleTables) { 664 if (allTables.containsKey(tf.getPath().getName())) { 665 ++numTableDirs; 666 tablePath = tf.getPath(); 667 } else { 668 continue; 669 } 670 } 671 672 // Make sure that a directory was created for every CF 673 int dir = 0; 674 for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) { 675 for (byte[] family : FAMILIES) { 676 if (Bytes.toString(family).equals(f.getPath().getName())) { 677 ++dir; 678 } 679 } 680 } 681 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 682 } 683 if (writeMultipleTables) { 684 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 685 } 686 687 Admin admin = util.getConnection().getAdmin(); 688 try { 689 // handle the split case 690 if (shouldChangeRegions) { 691 Table chosenTable = allTables.values().iterator().next(); 692 // Choose a semi-random table if multiple tables are available 693 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 694 admin.disableTable(chosenTable.getName()); 695 util.waitUntilNoRegionsInTransition(); 696 697 util.deleteTable(chosenTable.getName()); 698 byte[][] newSplitKeys = generateRandomSplitKeys(14); 699 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 700 701 while ( 702 util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations() 703 .size() != 15 || !admin.isTableAvailable(table.getName()) 704 ) { 705 Thread.sleep(200); 706 LOG.info("Waiting for new region assignment to happen"); 707 } 708 } 709 710 // Perform the actual load 711 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 712 Path tableDir = testDir; 713 String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString(); 714 LOG.info("Running 
LoadIncrementalHFiles on table" + tableNameStr); 715 if (writeMultipleTables) { 716 tableDir = new Path(testDir, tableNameStr); 717 } 718 Table currentTable = allTables.get(tableNameStr); 719 TableName currentTableName = currentTable.getName(); 720 new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, 721 singleTableInfo.getRegionLocator()); 722 723 // Ensure data shows up 724 int expectedRows = 0; 725 if (putSortReducer) { 726 // no rows should be extracted 727 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 728 util.countRows(currentTable)); 729 } else { 730 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 731 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 732 util.countRows(currentTable)); 733 Scan scan = new Scan(); 734 ResultScanner results = currentTable.getScanner(scan); 735 for (Result res : results) { 736 assertEquals(FAMILIES.length, res.rawCells().length); 737 Cell first = res.rawCells()[0]; 738 for (Cell kv : res.rawCells()) { 739 assertTrue(CellUtil.matchingRows(first, kv)); 740 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 741 } 742 } 743 results.close(); 744 } 745 String tableDigestBefore = util.checksumRows(currentTable); 746 // Check region locality 747 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 748 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 749 hbd.add(region.getHDFSBlocksDistribution()); 750 } 751 for (String hostname : hostnames) { 752 float locality = hbd.getBlockLocalityIndex(hostname); 753 LOG.info("locality of [" + hostname + "]: " + locality); 754 assertEquals(100, (int) (locality * 100)); 755 } 756 757 // Cause regions to reopen 758 admin.disableTable(currentTableName); 759 while (!admin.isTableDisabled(currentTableName)) { 760 Thread.sleep(200); 761 LOG.info("Waiting for table to disable"); 762 } 763 admin.enableTable(currentTableName); 764 util.waitTableAvailable(currentTableName); 765 assertEquals("Data should remain after reopening of regions", tableDigestBefore, 766 util.checksumRows(currentTable)); 767 } 768 } finally { 769 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 770 tableInfoSingle.getRegionLocator().close(); 771 } 772 for (Entry<String, Table> singleTable : allTables.entrySet()) { 773 singleTable.getValue().close(); 774 util.deleteTable(singleTable.getValue().getName()); 775 } 776 testDir.getFileSystem(conf).delete(testDir, true); 777 util.shutdownMiniCluster(); 778 } 779 } 780 781 private void runIncrementalPELoad(Configuration conf, 782 List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer) 783 throws IOException, InterruptedException, ClassNotFoundException { 784 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 785 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 786 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 787 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 788 KeyValueSerialization.class.getName()); 789 setupRandomGeneratorMapper(job, putSortReducer); 790 if (tableInfo.size() > 1) { 791 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 792 int sum = 0; 793 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 794 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 795 } 796 assertEquals(sum, job.getNumReduceTasks()); 797 } else { 798 
RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 799 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(), 800 regionLocator); 801 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 802 } 803 804 FileOutputFormat.setOutputPath(job, outDir); 805 806 assertFalse(util.getTestFileSystem().exists(outDir)); 807 808 assertTrue(job.waitForCompletion(true)); 809 } 810 811 /** 812 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the 813 * family compression map is correctly serialized into and deserialized from configuration n 814 */ 815 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 816 @Test 817 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 818 for (int numCfs = 0; numCfs <= 3; numCfs++) { 819 Configuration conf = new Configuration(this.util.getConfiguration()); 820 Map<String, Compression.Algorithm> familyToCompression = 821 getMockColumnFamiliesForCompression(numCfs); 822 Table table = Mockito.mock(Table.class); 823 setupMockColumnFamiliesForCompression(table, familyToCompression); 824 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 825 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails, 826 Arrays.asList(table.getTableDescriptor()))); 827 828 // read back family specific compression setting from the configuration 829 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = 830 HFileOutputFormat2.createFamilyCompressionMap(conf); 831 832 // test that we have a value for all column families that matches with the 833 // used mock values 834 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 835 assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), 836 entry.getValue(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8"))); 837 } 838 } 839 } 840 841 private void setupMockColumnFamiliesForCompression(Table table, 842 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 843 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 844 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 845 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1) 846 .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)); 847 } 848 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 849 } 850 851 /** 852 * @return a map from column family names to compression algorithms for testing column family 853 * compression. Column family names have special characters 854 */ 855 private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) { 856 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 857 // use column family names having special characters 858 if (numCfs-- > 0) { 859 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 860 } 861 if (numCfs-- > 0) { 862 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 863 } 864 if (numCfs-- > 0) { 865 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 866 } 867 if (numCfs-- > 0) { 868 familyToCompression.put("Family3", Compression.Algorithm.NONE); 869 } 870 return familyToCompression; 871 } 872 873 /** 874 * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. 
Tests that the 875 * family bloom type map is correctly serialized into and deserialized from configuration n 876 */ 877 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 878 @Test 879 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 880 for (int numCfs = 0; numCfs <= 2; numCfs++) { 881 Configuration conf = new Configuration(this.util.getConfiguration()); 882 Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs); 883 Table table = Mockito.mock(Table.class); 884 setupMockColumnFamiliesForBloomType(table, familyToBloomType); 885 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 886 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 887 Arrays.asList(table.getTableDescriptor()))); 888 889 // read back family specific data block encoding settings from the 890 // configuration 891 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 892 HFileOutputFormat2.createFamilyBloomTypeMap(conf); 893 894 // test that we have a value for all column families that matches with the 895 // used mock values 896 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 897 assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(), 898 entry.getValue(), retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8"))); 899 } 900 } 901 } 902 903 private void setupMockColumnFamiliesForBloomType(Table table, 904 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 905 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 906 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 907 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1) 908 .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)); 909 } 910 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 911 } 912 913 /** 914 * @return a map from column family names to compression algorithms for testing column family 915 * compression. Column family names have special characters 916 */ 917 private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) { 918 Map<String, BloomType> familyToBloomType = new HashMap<>(); 919 // use column family names having special characters 920 if (numCfs-- > 0) { 921 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 922 } 923 if (numCfs-- > 0) { 924 familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL); 925 } 926 if (numCfs-- > 0) { 927 familyToBloomType.put("Family3", BloomType.NONE); 928 } 929 return familyToBloomType; 930 } 931 932 /** 933 * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the 934 * family block size map is correctly serialized into and deserialized from configuration n 935 */ 936 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 937 @Test 938 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 939 for (int numCfs = 0; numCfs <= 3; numCfs++) { 940 Configuration conf = new Configuration(this.util.getConfiguration()); 941 Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs); 942 Table table = Mockito.mock(Table.class); 943 setupMockColumnFamiliesForBlockSize(table, familyToBlockSize); 944 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 945 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails, 946 Arrays.asList(table.getTableDescriptor()))); 947 948 // read back family specific data block encoding settings from the 949 // configuration 950 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 951 HFileOutputFormat2.createFamilyBlockSizeMap(conf); 952 953 // test that we have a value for all column families that matches with the 954 // used mock values 955 for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) { 956 assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(), 957 entry.getValue(), retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8"))); 958 } 959 } 960 } 961 962 private void setupMockColumnFamiliesForBlockSize(Table table, 963 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 964 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 965 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 966 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1) 967 .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)); 968 } 969 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 970 } 971 972 /** 973 * @return a map from column family names to compression algorithms for testing column family 974 * compression. Column family names have special characters 975 */ 976 private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) { 977 Map<String, Integer> familyToBlockSize = new HashMap<>(); 978 // use column family names having special characters 979 if (numCfs-- > 0) { 980 familyToBlockSize.put("Family1!@#!@#&", 1234); 981 } 982 if (numCfs-- > 0) { 983 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 984 } 985 if (numCfs-- > 0) { 986 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 987 } 988 if (numCfs-- > 0) { 989 familyToBlockSize.put("Family3", 0); 990 } 991 return familyToBlockSize; 992 } 993 994 /** 995 * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that 996 * the family data block encoding map is correctly serialized into and deserialized from 997 * configuration n 998 */ 999 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 1000 @Test 1001 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 1002 for (int numCfs = 0; numCfs <= 3; numCfs++) { 1003 Configuration conf = new Configuration(this.util.getConfiguration()); 1004 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 1005 getMockColumnFamiliesForDataBlockEncoding(numCfs); 1006 Table table = Mockito.mock(Table.class); 1007 setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding); 1008 HTableDescriptor tableDescriptor = table.getTableDescriptor(); 1009 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 1010 HFileOutputFormat2.serializeColumnFamilyAttribute( 1011 HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor))); 1012 1013 // read back family specific data block encoding settings from the 1014 // configuration 1015 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 1016 HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf); 1017 1018 // test that we have a value for all column families that matches with the 1019 // used mock values 1020 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1021 assertEquals( 1022 "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(), 1023 entry.getValue(), 1024 retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8"))); 1025 } 1026 } 1027 } 1028 1029 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 1030 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 1031 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 1032 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1033 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1) 1034 .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)); 1035 } 1036 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 1037 } 1038 1039 /** 1040 * @return a map from column family names to compression algorithms for testing column family 1041 * compression. 
Column family names have special characters 1042 */ 1043 private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) { 1044 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1045 // use column family names having special characters 1046 if (numCfs-- > 0) { 1047 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1048 } 1049 if (numCfs-- > 0) { 1050 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF); 1051 } 1052 if (numCfs-- > 0) { 1053 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX); 1054 } 1055 if (numCfs-- > 0) { 1056 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1057 } 1058 return familyToDataBlockEncoding; 1059 } 1060 1061 private void setupMockStartKeys(RegionLocator table) throws IOException { 1062 byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"), 1063 Bytes.toBytes("ggg"), Bytes.toBytes("zzz") }; 1064 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1065 } 1066 1067 private void setupMockTableName(RegionLocator table) throws IOException { 1068 TableName mockTableName = TableName.valueOf("mock_table"); 1069 Mockito.doReturn(mockTableName).when(table).getName(); 1070 } 1071 1072 /** 1073 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings 1074 * from the column family descriptor 1075 */ 1076 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 1077 @Test 1078 public void testColumnFamilySettings() throws Exception { 1079 Configuration conf = new Configuration(this.util.getConfiguration()); 1080 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1081 TaskAttemptContext context = null; 1082 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1083 1084 // Setup table descriptor 1085 Table table = Mockito.mock(Table.class); 1086 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1087 HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]); 1088 Mockito.doReturn(htd).when(table).getTableDescriptor(); 1089 for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) { 1090 htd.addFamily(hcd); 1091 } 1092 1093 // set up the table to return some mock keys 1094 setupMockStartKeys(regionLocator); 1095 1096 try { 1097 // partial map red setup to get an operational writer for testing 1098 // We turn off the sequence file compression, because DefaultCodec 1099 // pollutes the GZip codec pool with an incompatible compressor. 
1100 conf.set("io.seqfile.compression.type", "NONE"); 1101 conf.set("hbase.fs.tmp.dir", dir.toString()); 1102 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1103 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1104 1105 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1106 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1107 setupRandomGeneratorMapper(job, false); 1108 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 1109 FileOutputFormat.setOutputPath(job, dir); 1110 context = createTestTaskAttemptContext(job); 1111 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1112 writer = hof.getRecordWriter(context); 1113 1114 // write out random rows 1115 writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT); 1116 writer.close(context); 1117 1118 // Make sure that a directory was created for every CF 1119 FileSystem fs = dir.getFileSystem(conf); 1120 1121 // commit so that the filesystem has one directory per column family 1122 hof.getOutputCommitter(context).commitTask(context); 1123 hof.getOutputCommitter(context).commitJob(context); 1124 FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1125 assertEquals(htd.getFamilies().size(), families.length); 1126 for (FileStatus f : families) { 1127 String familyStr = f.getPath().getName(); 1128 HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr)); 1129 // verify that the compression on this file matches the configured 1130 // compression 1131 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1132 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1133 Map<byte[], byte[]> fileInfo = reader.getHFileInfo(); 1134 1135 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1136 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1137 assertEquals( 1138 "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")", 1139 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1140 assertEquals( 1141 "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")", 1142 hcd.getCompressionType(), reader.getFileContext().getCompression()); 1143 } 1144 } finally { 1145 dir.getFileSystem(conf).delete(dir, true); 1146 } 1147 } 1148 1149 /** 1150 * Write random values to the writer assuming a table created using {@link #FAMILIES} as column 1151 * family descriptors 1152 */ 1153 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1154 TaskAttemptContext context, Set<byte[]> families, int numRows) 1155 throws IOException, InterruptedException { 1156 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1157 int valLength = 10; 1158 byte valBytes[] = new byte[valLength]; 1159 1160 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1161 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1162 final byte[] qualifier = Bytes.toBytes("data"); 1163 for (int i = 0; i < numRows; i++) { 1164 Bytes.putInt(keyBytes, 0, i); 1165 Bytes.random(valBytes); 1166 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1167 for (byte[] family : families) { 1168 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1169 writer.write(key, kv); 1170 } 1171 } 1172 } 1173 1174 /** 1175 * This test is to test the scenario happened in HBASE-6901. 
All files are bulk loaded and 1176 * excluded from minor compaction. Without the fix of HBASE-6901, an 1177 * ArrayIndexOutOfBoundsException will be thrown. 1178 */ 1179 @Ignore("Flakey: See HBASE-9051") 1180 @Test 1181 public void testExcludeAllFromMinorCompaction() throws Exception { 1182 Configuration conf = util.getConfiguration(); 1183 conf.setInt("hbase.hstore.compaction.min", 2); 1184 generateRandomStartKeys(5); 1185 1186 util.startMiniCluster(); 1187 try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin(); 1188 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1189 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1190 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1191 assertEquals("Should start with empty table", 0, util.countRows(table)); 1192 1193 // deep inspection: get the StoreFile dir 1194 final Path storePath = 1195 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1196 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1197 Bytes.toString(FAMILIES[0]))); 1198 assertEquals(0, fs.listStatus(storePath).length); 1199 1200 // Generate two bulk load files 1201 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1202 1203 for (int i = 0; i < 2; i++) { 1204 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1205 runIncrementalPELoad(conf, 1206 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), 1207 conn.getRegionLocator(TABLE_NAMES[0]))), 1208 testDir, false); 1209 // Perform the actual load 1210 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); 1211 } 1212 1213 // Ensure data shows up 1214 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1215 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 1216 util.countRows(table)); 1217 1218 // should have a second StoreFile now 1219 assertEquals(2, fs.listStatus(storePath).length); 1220 1221 // minor compactions shouldn't get rid of the file 1222 admin.compact(TABLE_NAMES[0]); 1223 try { 1224 quickPoll(new Callable<Boolean>() { 1225 @Override 1226 public Boolean call() throws Exception { 1227 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1228 for (HRegion region : regions) { 1229 for (HStore store : region.getStores()) { 1230 store.closeAndArchiveCompactedFiles(); 1231 } 1232 } 1233 return fs.listStatus(storePath).length == 1; 1234 } 1235 }, 5000); 1236 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1237 } catch (AssertionError ae) { 1238 // this is expected behavior 1239 } 1240 1241 // a major compaction should work though 1242 admin.majorCompact(TABLE_NAMES[0]); 1243 quickPoll(new Callable<Boolean>() { 1244 @Override 1245 public Boolean call() throws Exception { 1246 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1247 for (HRegion region : regions) { 1248 for (HStore store : region.getStores()) { 1249 store.closeAndArchiveCompactedFiles(); 1250 } 1251 } 1252 return fs.listStatus(storePath).length == 1; 1253 } 1254 }, 5000); 1255 1256 } finally { 1257 util.shutdownMiniCluster(); 1258 } 1259 } 1260 1261 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 1262 @Test 1263 public void testExcludeMinorCompaction() throws Exception { 1264 Configuration conf = util.getConfiguration(); 1265 conf.setInt("hbase.hstore.compaction.min", 2); 1266 generateRandomStartKeys(5); 1267 1268 util.startMiniCluster(); 1269 try (Connection conn = ConnectionFactory.createConnection(conf); 1270 Admin admin = conn.getAdmin()) { 1271 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1272 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1273 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1274 assertEquals("Should start with empty table", 0, util.countRows(table)); 1275 1276 // deep inspection: get the StoreFile dir 1277 final Path storePath = 1278 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1279 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1280 Bytes.toString(FAMILIES[0]))); 1281 assertEquals(0, fs.listStatus(storePath).length); 1282 1283 // put some data in it and flush to create a storefile 1284 Put p = new Put(Bytes.toBytes("test")); 1285 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1286 table.put(p); 1287 admin.flush(TABLE_NAMES[0]); 1288 assertEquals(1, util.countRows(table)); 1289 quickPoll(new Callable<Boolean>() { 1290 @Override 1291 public Boolean call() throws Exception { 1292 return fs.listStatus(storePath).length == 1; 1293 } 1294 }, 5000); 1295 1296 // Generate a bulk load file with more rows 1297 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1298 1299 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1300 runIncrementalPELoad(conf, 1301 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), regionLocator)), 1302 testDir, false); 1303 1304 // Perform the actual load 1305 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); 1306 1307 // Ensure data shows up 1308 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1309 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows + 1, 1310 util.countRows(table)); 1311 1312 // should have a second StoreFile now 1313 assertEquals(2, fs.listStatus(storePath).length); 1314 1315 // minor compactions shouldn't get rid of the file 1316 admin.compact(TABLE_NAMES[0]); 1317 try { 1318 quickPoll(new Callable<Boolean>() { 1319 @Override 1320 public Boolean call() throws Exception { 1321 return fs.listStatus(storePath).length == 1; 1322 } 1323 }, 5000); 1324 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1325 } catch (AssertionError ae) { 1326 // this is expected behavior 1327 } 1328 1329 // a major compaction should work though 1330 admin.majorCompact(TABLE_NAMES[0]); 1331 quickPoll(new Callable<Boolean>() { 1332 @Override 1333 public Boolean call() throws Exception { 1334 return fs.listStatus(storePath).length == 1; 1335 } 1336 }, 5000); 1337 1338 } finally { 1339 util.shutdownMiniCluster(); 1340 } 1341 } 1342 1343 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1344 int sleepMs = 10; 1345 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1346 while (retries-- > 0) { 1347 if (c.call().booleanValue()) { 1348 return; 1349 } 1350 Thread.sleep(sleepMs); 1351 } 1352 fail(); 1353 } 1354 1355 public static void main(String args[]) throws Exception { 1356 new TestHFileOutputFormat2().manualTest(args); 1357 } 1358 1359 public void manualTest(String args[]) throws Exception { 1360 
  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      Table table = util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
        RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
          Arrays.asList(
            new HFileOutputFormat2.TableInfo(admin.getTableDescriptor(tname), regionLocator)),
          outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

  @Test
  public void testBlockStoragePolicy() throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    util.startMiniDFSCluster(3);
    FileSystem fs = util.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the default block storage policy is HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // apply the configured storage policies to the column family directories:
      // cf 0 has a CF-specific override (ONE_SSD), cf 1 falls back to the table-wide ALL_SSD
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      util.shutdownMiniDFSCluster();
    }
  }

  private String getStoragePolicyName(FileSystem fs, Path path) {
    try {
      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // Maybe fail because of using old HDFS version, try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy; // HOT by default
    }
  }

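  /**
   * Fallback for HDFS versions whose FileSystem does not expose getStoragePolicy(): look up the
   * policy id through the DFS client and map it back to a policy name, returning null when the
   * policy cannot be determined.
   */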
"HOT" : policy;// HOT by default 1438 } 1439 } 1440 1441 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1442 try { 1443 if (fs instanceof DistributedFileSystem) { 1444 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1445 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1446 if (null != status) { 1447 byte storagePolicyId = status.getStoragePolicy(); 1448 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1449 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1450 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1451 for (BlockStoragePolicy policy : policies) { 1452 if (policy.getId() == storagePolicyId) { 1453 return policy.getName(); 1454 } 1455 } 1456 } 1457 } 1458 } 1459 } catch (Throwable e) { 1460 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1461 } 1462 1463 return null; 1464 } 1465 1466 @Test 1467 public void TestConfigureCompression() throws Exception { 1468 Configuration conf = new Configuration(this.util.getConfiguration()); 1469 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1470 TaskAttemptContext context = null; 1471 Path dir = util.getDataTestDir("TestConfigureCompression"); 1472 String hfileoutputformatCompression = "gz"; 1473 1474 try { 1475 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 1476 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1477 1478 conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression); 1479 1480 Job job = Job.getInstance(conf); 1481 FileOutputFormat.setOutputPath(job, dir); 1482 context = createTestTaskAttemptContext(job); 1483 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1484 writer = hof.getRecordWriter(context); 1485 final byte[] b = Bytes.toBytes("b"); 1486 1487 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b); 1488 writer.write(new ImmutableBytesWritable(), kv); 1489 writer.close(context); 1490 writer = null; 1491 FileSystem fs = dir.getFileSystem(conf); 1492 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 1493 while (iterator.hasNext()) { 1494 LocatedFileStatus keyFileStatus = iterator.next(); 1495 HFile.Reader reader = 1496 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 1497 assertEquals(reader.getTrailer().getCompressionCodec().getName(), 1498 hfileoutputformatCompression); 1499 } 1500 } finally { 1501 if (writer != null && context != null) { 1502 writer.close(context); 1503 } 1504 dir.getFileSystem(conf).delete(dir, true); 1505 } 1506 1507 } 1508 1509 @Test 1510 public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception { 1511 // Start cluster A 1512 util = new HBaseTestingUtility(); 1513 Configuration confA = util.getConfiguration(); 1514 int hostCount = 3; 1515 int regionNum = 20; 1516 String[] hostnames = new String[hostCount]; 1517 for (int i = 0; i < hostCount; ++i) { 1518 hostnames[i] = "datanode_" + i; 1519 } 1520 StartMiniClusterOption option = 1521 StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 1522 util.startMiniCluster(option); 1523 1524 // Start cluster B 1525 HBaseTestingUtility utilB = new HBaseTestingUtility(); 1526 Configuration confB = utilB.getConfiguration(); 1527 utilB.startMiniCluster(option); 1528 1529 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 1530 1531 byte[][] splitKeys = 
  @Test
  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
    // Start cluster A
    util = new HBaseTestingUtility();
    Configuration confA = util.getConfiguration();
    int hostCount = 3;
    int regionNum = 20;
    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    // Start cluster B
    HBaseTestingUtility utilB = new HBaseTestingUtility();
    Configuration confB = utilB.getConfiguration();
    utilB.startMiniCluster(option);

    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");

    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
    TableName tableName = TableName.valueOf("table");
    // Create table in cluster B
    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
      // Generate the bulk load files.
      // The job carries the ZooKeeper configuration of cluster A: it reads from cluster A and
      // creates hfiles for the table on cluster B.
      Job job = Job.getInstance(confA, "testLocalMRIncrementalLoad");
      Configuration jobConf = job.getConfiguration();
      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table, r);

      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));

      String bSpecificConfigKey = "my.override.config.for.b";
      String bSpecificConfigValue = "b-specific-value";
      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
        bSpecificConfigValue);

      FileOutputFormat.setOutputPath(job, testDir);

      assertFalse(util.getTestFileSystem().exists(testDir));

      assertTrue(job.waitForCompletion(true));

      final List<Configuration> configs =
        ConfigurationCaptorConnection.getCapturedConfigurations(key);

      assertFalse(configs.isEmpty());
      for (Configuration config : configs) {
        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
          config.get(HConstants.ZOOKEEPER_QUORUM));
        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));

        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
      }
    } finally {
      utilB.deleteTable(tableName);
      testDir.getFileSystem(confA).delete(testDir, true);
      util.shutdownMiniCluster();
      utilB.shutdownMiniCluster();
    }
  }

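  /**
   * Connection implementation used by the multi-cluster test above: it delegates to a real
   * connection but records every Configuration it was constructed with, keyed by a UUID planted
   * in that configuration, so the test can inspect which cluster the job actually connected to.
   */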
  private static class ConfigurationCaptorConnection implements Connection {
    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";

    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();

    private final Connection delegate;

    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user)
      throws IOException {
      Configuration confForDelegate = new Configuration(conf);
      confForDelegate.unset(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL);
      delegate = createConnection(confForDelegate, es, user);

      final String uuid = conf.get(UUID_KEY);
      if (uuid != null) {
        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
      }
    }

    static UUID configureConnectionImpl(Configuration conf) {
      conf.setClass(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
        ConfigurationCaptorConnection.class, Connection.class);

      final UUID uuid = UUID.randomUUID();
      conf.set(UUID_KEY, uuid.toString());
      return uuid;
    }

    static List<Configuration> getCapturedConfigurations(UUID key) {
      return confs.get(key);
    }

    @Override
    public Configuration getConfiguration() {
      return delegate.getConfiguration();
    }

    @Override
    public Table getTable(TableName tableName) throws IOException {
      return delegate.getTable(tableName);
    }

    @Override
    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
      return delegate.getTable(tableName, pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
      return delegate.getBufferedMutator(tableName);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
      return delegate.getBufferedMutator(params);
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return delegate.getRegionLocator(tableName);
    }

    @Override
    public void clearRegionLocationCache() {
      delegate.clearRegionLocationCache();
    }

    @Override
    public Admin getAdmin() throws IOException {
      return delegate.getAdmin();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public boolean isClosed() {
      return delegate.isClosed();
    }

    @Override
    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
      return delegate.getTableBuilder(tableName, pool);
    }

    @Override
    public Hbck getHbck() throws IOException {
      return delegate.getHbck();
    }

    @Override
    public Hbck getHbck(ServerName masterServer) throws IOException {
      return delegate.getHbck(masterServer);
    }

    @Override
    public void abort(String why, Throwable e) {
      delegate.abort(why, e);
    }

    @Override
    public boolean isAborted() {
      return delegate.isAborted();
    }

    @Override
    public String getClusterId() {
      return delegate.getClusterId();
    }
  }
}