001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNotSame; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.lang.reflect.Field; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.Random; 037import java.util.Set; 038import java.util.concurrent.Callable; 039import java.util.stream.Collectors; 040import java.util.stream.Stream; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.LocatedFileStatus; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.fs.RemoteIterator; 047import org.apache.hadoop.hbase.ArrayBackedTag; 048import org.apache.hadoop.hbase.Cell; 049import org.apache.hadoop.hbase.CellUtil; 050import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 051import org.apache.hadoop.hbase.HBaseClassTestRule; 052import org.apache.hadoop.hbase.HBaseConfiguration; 053import org.apache.hadoop.hbase.HBaseTestingUtility; 054import org.apache.hadoop.hbase.HColumnDescriptor; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.HDFSBlocksDistribution; 057import org.apache.hadoop.hbase.HTableDescriptor; 058import org.apache.hadoop.hbase.HadoopShims; 059import org.apache.hadoop.hbase.KeyValue; 060import org.apache.hadoop.hbase.PerformanceEvaluation; 061import org.apache.hadoop.hbase.PrivateCellUtil; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.Tag; 064import org.apache.hadoop.hbase.TagType; 065import org.apache.hadoop.hbase.client.Admin; 066import org.apache.hadoop.hbase.client.Connection; 067import org.apache.hadoop.hbase.client.ConnectionFactory; 068import org.apache.hadoop.hbase.client.Put; 069import org.apache.hadoop.hbase.client.RegionLocator; 070import org.apache.hadoop.hbase.client.Result; 071import org.apache.hadoop.hbase.client.ResultScanner; 072import org.apache.hadoop.hbase.client.Scan; 073import org.apache.hadoop.hbase.client.Table; 074import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 075import org.apache.hadoop.hbase.io.compress.Compression; 076import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 077import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 078import org.apache.hadoop.hbase.io.hfile.CacheConfig; 079import org.apache.hadoop.hbase.io.hfile.HFile; 080import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 081import org.apache.hadoop.hbase.io.hfile.HFileScanner; 082import org.apache.hadoop.hbase.regionserver.BloomType; 083import org.apache.hadoop.hbase.regionserver.HRegion; 084import org.apache.hadoop.hbase.regionserver.HStore; 085import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 086import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 087import org.apache.hadoop.hbase.testclassification.LargeTests; 088import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 089import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 090import org.apache.hadoop.hbase.util.Bytes; 091import org.apache.hadoop.hbase.util.FSUtils; 092import org.apache.hadoop.hbase.util.ReflectionUtils; 093import org.apache.hadoop.hdfs.DistributedFileSystem; 094import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 095import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 096import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 097import org.apache.hadoop.io.NullWritable; 098import org.apache.hadoop.mapreduce.Job; 099import org.apache.hadoop.mapreduce.Mapper; 100import org.apache.hadoop.mapreduce.RecordWriter; 101import org.apache.hadoop.mapreduce.TaskAttemptContext; 102import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 103import org.junit.ClassRule; 104import org.junit.Ignore; 105import org.junit.Test; 106import org.junit.experimental.categories.Category; 107import org.mockito.Mockito; 108import org.slf4j.Logger; 109import org.slf4j.LoggerFactory; 110 111/** 112 * Simple test for {@link HFileOutputFormat2}. 113 * Sets up and runs a mapreduce job that writes hfile output. 114 * Creates a few inner classes to implement splits and an inputformat that 115 * emits keys and values like those of {@link PerformanceEvaluation}. 116 */ 117@Category({VerySlowMapReduceTests.class, LargeTests.class}) 118//TODO : Remove this in 3.0 119public class TestHFileOutputFormat2 { 120 121 @ClassRule 122 public static final HBaseClassTestRule CLASS_RULE = 123 HBaseClassTestRule.forClass(TestHFileOutputFormat2.class); 124 125 private final static int ROWSPERSPLIT = 1024; 126 127 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 128 private static final byte[][] FAMILIES = { 129 Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))}; 130 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", 131 "TestTable3").map(TableName::valueOf).toArray(TableName[]::new); 132 133 private HBaseTestingUtility util = new HBaseTestingUtility(); 134 135 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 136 137 /** 138 * Simple mapper that makes KeyValue output. 139 */ 140 static class RandomKVGeneratingMapper 141 extends Mapper<NullWritable, NullWritable, 142 ImmutableBytesWritable, Cell> { 143 144 private int keyLength; 145 private static final int KEYLEN_DEFAULT=10; 146 private static final String KEYLEN_CONF="randomkv.key.length"; 147 148 private int valLength; 149 private static final int VALLEN_DEFAULT=10; 150 private static final String VALLEN_CONF="randomkv.val.length"; 151 private static final byte [] QUALIFIER = Bytes.toBytes("data"); 152 private boolean multiTableMapper = false; 153 private TableName[] tables = null; 154 155 156 @Override 157 protected void setup(Context context) throws IOException, 158 InterruptedException { 159 super.setup(context); 160 161 Configuration conf = context.getConfiguration(); 162 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 163 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 164 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 165 false); 166 if (multiTableMapper) { 167 tables = TABLE_NAMES; 168 } else { 169 tables = new TableName[]{TABLE_NAMES[0]}; 170 } 171 } 172 173 @Override 174 protected void map( 175 NullWritable n1, NullWritable n2, 176 Mapper<NullWritable, NullWritable, 177 ImmutableBytesWritable,Cell>.Context context) 178 throws java.io.IOException ,InterruptedException 179 { 180 181 byte keyBytes[] = new byte[keyLength]; 182 byte valBytes[] = new byte[valLength]; 183 184 int taskId = context.getTaskAttemptID().getTaskID().getId(); 185 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 186 Random random = new Random(); 187 byte[] key; 188 for (int j = 0; j < tables.length; ++j) { 189 for (int i = 0; i < ROWSPERSPLIT; i++) { 190 random.nextBytes(keyBytes); 191 // Ensure that unique tasks generate unique keys 192 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 193 random.nextBytes(valBytes); 194 key = keyBytes; 195 if (multiTableMapper) { 196 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 197 } 198 199 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 200 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 201 context.write(new ImmutableBytesWritable(key), kv); 202 } 203 } 204 } 205 } 206 } 207 208 /** 209 * Simple mapper that makes Put output. 210 */ 211 static class RandomPutGeneratingMapper 212 extends Mapper<NullWritable, NullWritable, 213 ImmutableBytesWritable, Put> { 214 215 private int keyLength; 216 private static final int KEYLEN_DEFAULT = 10; 217 private static final String KEYLEN_CONF = "randomkv.key.length"; 218 219 private int valLength; 220 private static final int VALLEN_DEFAULT = 10; 221 private static final String VALLEN_CONF = "randomkv.val.length"; 222 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 223 private boolean multiTableMapper = false; 224 private TableName[] tables = null; 225 226 @Override 227 protected void setup(Context context) throws IOException, 228 InterruptedException { 229 super.setup(context); 230 231 Configuration conf = context.getConfiguration(); 232 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 233 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 234 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 235 false); 236 if (multiTableMapper) { 237 tables = TABLE_NAMES; 238 } else { 239 tables = new TableName[]{TABLE_NAMES[0]}; 240 } 241 } 242 243 @Override 244 protected void map( 245 NullWritable n1, NullWritable n2, 246 Mapper<NullWritable, NullWritable, 247 ImmutableBytesWritable, Put>.Context context) 248 throws java.io.IOException, InterruptedException { 249 250 byte keyBytes[] = new byte[keyLength]; 251 byte valBytes[] = new byte[valLength]; 252 253 int taskId = context.getTaskAttemptID().getTaskID().getId(); 254 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 255 256 Random random = new Random(); 257 byte[] key; 258 for (int j = 0; j < tables.length; ++j) { 259 for (int i = 0; i < ROWSPERSPLIT; i++) { 260 random.nextBytes(keyBytes); 261 // Ensure that unique tasks generate unique keys 262 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 263 random.nextBytes(valBytes); 264 key = keyBytes; 265 if (multiTableMapper) { 266 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 267 } 268 269 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 270 Put p = new Put(keyBytes); 271 p.addColumn(family, QUALIFIER, valBytes); 272 // set TTL to very low so that the scan does not return any value 273 p.setTTL(1l); 274 context.write(new ImmutableBytesWritable(key), p); 275 } 276 } 277 } 278 } 279 } 280 281 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 282 if (putSortReducer) { 283 job.setInputFormatClass(NMapInputFormat.class); 284 job.setMapperClass(RandomPutGeneratingMapper.class); 285 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 286 job.setMapOutputValueClass(Put.class); 287 } else { 288 job.setInputFormatClass(NMapInputFormat.class); 289 job.setMapperClass(RandomKVGeneratingMapper.class); 290 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 291 job.setMapOutputValueClass(KeyValue.class); 292 } 293 } 294 295 /** 296 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if 297 * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}. 298 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 299 */ 300 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 301 public void test_LATEST_TIMESTAMP_isReplaced() 302 throws Exception { 303 Configuration conf = new Configuration(this.util.getConfiguration()); 304 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 305 TaskAttemptContext context = null; 306 Path dir = 307 util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 308 try { 309 Job job = new Job(conf); 310 FileOutputFormat.setOutputPath(job, dir); 311 context = createTestTaskAttemptContext(job); 312 HFileOutputFormat2 hof = new HFileOutputFormat2(); 313 writer = hof.getRecordWriter(context); 314 final byte [] b = Bytes.toBytes("b"); 315 316 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 317 // changed by call to write. Check all in kv is same but ts. 318 KeyValue kv = new KeyValue(b, b, b); 319 KeyValue original = kv.clone(); 320 writer.write(new ImmutableBytesWritable(), kv); 321 assertFalse(original.equals(kv)); 322 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 323 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 324 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 325 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 326 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 327 328 // Test 2. Now test passing a kv that has explicit ts. It should not be 329 // changed by call to record write. 330 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 331 original = kv.clone(); 332 writer.write(new ImmutableBytesWritable(), kv); 333 assertTrue(original.equals(kv)); 334 } finally { 335 if (writer != null && context != null) writer.close(context); 336 dir.getFileSystem(conf).delete(dir, true); 337 } 338 } 339 340 private TaskAttemptContext createTestTaskAttemptContext(final Job job) 341 throws Exception { 342 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 343 TaskAttemptContext context = hadoop.createTestTaskAttemptContext( 344 job, "attempt_201402131733_0001_m_000000_0"); 345 return context; 346 } 347 348 /* 349 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE 350 * metadata used by time-restricted scans. 351 */ 352 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 353 public void test_TIMERANGE() throws Exception { 354 Configuration conf = new Configuration(this.util.getConfiguration()); 355 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 356 TaskAttemptContext context = null; 357 Path dir = 358 util.getDataTestDir("test_TIMERANGE_present"); 359 LOG.info("Timerange dir writing to dir: "+ dir); 360 try { 361 // build a record writer using HFileOutputFormat2 362 Job job = new Job(conf); 363 FileOutputFormat.setOutputPath(job, dir); 364 context = createTestTaskAttemptContext(job); 365 HFileOutputFormat2 hof = new HFileOutputFormat2(); 366 writer = hof.getRecordWriter(context); 367 368 // Pass two key values with explicit times stamps 369 final byte [] b = Bytes.toBytes("b"); 370 371 // value 1 with timestamp 2000 372 KeyValue kv = new KeyValue(b, b, b, 2000, b); 373 KeyValue original = kv.clone(); 374 writer.write(new ImmutableBytesWritable(), kv); 375 assertEquals(original,kv); 376 377 // value 2 with timestamp 1000 378 kv = new KeyValue(b, b, b, 1000, b); 379 original = kv.clone(); 380 writer.write(new ImmutableBytesWritable(), kv); 381 assertEquals(original, kv); 382 383 // verify that the file has the proper FileInfo. 384 writer.close(context); 385 386 // the generated file lives 1 directory down from the attempt directory 387 // and is the only file, e.g. 388 // _attempt__0000_r_000000_0/b/1979617994050536795 389 FileSystem fs = FileSystem.get(conf); 390 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 391 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 392 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 393 394 // open as HFile Reader and pull out TIMERANGE FileInfo. 395 HFile.Reader rd = 396 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 397 Map<byte[],byte[]> finfo = rd.loadFileInfo(); 398 byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8")); 399 assertNotNull(range); 400 401 // unmarshall and check values. 402 TimeRangeTracker timeRangeTracker =TimeRangeTracker.parseFrom(range); 403 LOG.info(timeRangeTracker.getMin() + 404 "...." + timeRangeTracker.getMax()); 405 assertEquals(1000, timeRangeTracker.getMin()); 406 assertEquals(2000, timeRangeTracker.getMax()); 407 rd.close(); 408 } finally { 409 if (writer != null && context != null) writer.close(context); 410 dir.getFileSystem(conf).delete(dir, true); 411 } 412 } 413 414 /** 415 * Run small MR job. 416 */ 417 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 418 public void testWritingPEData() throws Exception { 419 Configuration conf = util.getConfiguration(); 420 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 421 FileSystem fs = testDir.getFileSystem(conf); 422 423 // Set down this value or we OOME in eclipse. 424 conf.setInt("mapreduce.task.io.sort.mb", 20); 425 // Write a few files. 426 conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); 427 428 Job job = new Job(conf, "testWritingPEData"); 429 setupRandomGeneratorMapper(job, false); 430 // This partitioner doesn't work well for number keys but using it anyways 431 // just to demonstrate how to configure it. 432 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 433 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 434 435 Arrays.fill(startKey, (byte)0); 436 Arrays.fill(endKey, (byte)0xff); 437 438 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 439 // Set start and end rows for partitioner. 440 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 441 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 442 job.setReducerClass(KeyValueSortReducer.class); 443 job.setOutputFormatClass(HFileOutputFormat2.class); 444 job.setNumReduceTasks(4); 445 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 446 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 447 KeyValueSerialization.class.getName()); 448 449 FileOutputFormat.setOutputPath(job, testDir); 450 assertTrue(job.waitForCompletion(false)); 451 FileStatus [] files = fs.listStatus(testDir); 452 assertTrue(files.length > 0); 453 } 454 455 /** 456 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into 457 * hfile. 458 */ 459 @Test 460 public void test_WritingTagData() 461 throws Exception { 462 Configuration conf = new Configuration(this.util.getConfiguration()); 463 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 464 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 465 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 466 TaskAttemptContext context = null; 467 Path dir = 468 util.getDataTestDir("WritingTagData"); 469 try { 470 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 471 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 472 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 473 Job job = new Job(conf); 474 FileOutputFormat.setOutputPath(job, dir); 475 context = createTestTaskAttemptContext(job); 476 HFileOutputFormat2 hof = new HFileOutputFormat2(); 477 writer = hof.getRecordWriter(context); 478 final byte [] b = Bytes.toBytes("b"); 479 480 List< Tag > tags = new ArrayList<>(); 481 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 482 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 483 writer.write(new ImmutableBytesWritable(), kv); 484 writer.close(context); 485 writer = null; 486 FileSystem fs = dir.getFileSystem(conf); 487 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 488 while(iterator.hasNext()) { 489 LocatedFileStatus keyFileStatus = iterator.next(); 490 HFile.Reader reader = 491 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 492 HFileScanner scanner = reader.getScanner(false, false, false); 493 scanner.seekTo(); 494 Cell cell = scanner.getCell(); 495 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 496 assertTrue(tagsFromCell.size() > 0); 497 for (Tag tag : tagsFromCell) { 498 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 499 } 500 } 501 } finally { 502 if (writer != null && context != null) writer.close(context); 503 dir.getFileSystem(conf).delete(dir, true); 504 } 505 } 506 507 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 508 public void testJobConfiguration() throws Exception { 509 Configuration conf = new Configuration(this.util.getConfiguration()); 510 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration") 511 .toString()); 512 Job job = new Job(conf); 513 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 514 Table table = Mockito.mock(Table.class); 515 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 516 setupMockStartKeys(regionLocator); 517 setupMockTableName(regionLocator); 518 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 519 assertEquals(job.getNumReduceTasks(), 4); 520 } 521 522 private byte [][] generateRandomStartKeys(int numKeys) { 523 Random random = new Random(); 524 byte[][] ret = new byte[numKeys][]; 525 // first region start key is always empty 526 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 527 for (int i = 1; i < numKeys; i++) { 528 ret[i] = 529 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 530 } 531 return ret; 532 } 533 534 private byte[][] generateRandomSplitKeys(int numKeys) { 535 Random random = new Random(); 536 byte[][] ret = new byte[numKeys][]; 537 for (int i = 0; i < numKeys; i++) { 538 ret[i] = 539 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 540 } 541 return ret; 542 } 543 544 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 545 public void testMRIncrementalLoad() throws Exception { 546 LOG.info("\nStarting test testMRIncrementalLoad\n"); 547 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 548 } 549 550 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 551 public void testMRIncrementalLoadWithSplit() throws Exception { 552 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 553 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 554 } 555 556 /** 557 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true 558 * This test could only check the correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY 559 * is set to true. Because MiniHBaseCluster always run with single hostname (and different ports), 560 * it's not possible to check the region locality by comparing region locations and DN hostnames. 561 * When MiniHBaseCluster supports explicit hostnames parameter (just like MiniDFSCluster does), 562 * we could test region locality features more easily. 563 */ 564 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 565 public void testMRIncrementalLoadWithLocality() throws Exception { 566 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 567 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 568 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 569 } 570 571 //@Ignore("Wahtevs") 572 @Test 573 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 574 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 575 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 576 } 577 578 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 579 boolean putSortReducer, String tableStr) throws Exception { 580 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, 581 Arrays.asList(tableStr)); 582 } 583 584 @Test 585 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 586 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 587 doIncrementalLoadTest(false, false, true, 588 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList 589 ())); 590 } 591 592 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 593 boolean putSortReducer, List<String> tableStr) throws Exception { 594 util = new HBaseTestingUtility(); 595 Configuration conf = util.getConfiguration(); 596 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 597 int hostCount = 1; 598 int regionNum = 5; 599 if (shouldKeepLocality) { 600 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 601 // explicit hostnames parameter just like MiniDFSCluster does. 602 hostCount = 3; 603 regionNum = 20; 604 } 605 606 String[] hostnames = new String[hostCount]; 607 for (int i = 0; i < hostCount; ++i) { 608 hostnames[i] = "datanode_" + i; 609 } 610 util.startMiniCluster(1, hostCount, hostnames); 611 612 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 613 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 614 boolean writeMultipleTables = tableStr.size() > 1; 615 for (String tableStrSingle : tableStr) { 616 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 617 TableName tableName = TableName.valueOf(tableStrSingle); 618 Table table = util.createTable(tableName, FAMILIES, splitKeys); 619 620 RegionLocator r = util.getConnection().getRegionLocator(tableName); 621 assertEquals("Should start with empty table", 0, util.countRows(table)); 622 int numRegions = r.getStartKeys().length; 623 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 624 625 allTables.put(tableStrSingle, table); 626 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r)); 627 } 628 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 629 // Generate the bulk load files 630 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 631 632 for (Table tableSingle : allTables.values()) { 633 // This doesn't write into the table, just makes files 634 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 635 } 636 int numTableDirs = 0; 637 for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) { 638 Path tablePath = testDir; 639 640 if (writeMultipleTables) { 641 if (allTables.containsKey(tf.getPath().getName())) { 642 ++numTableDirs; 643 tablePath = tf.getPath(); 644 } 645 else { 646 continue; 647 } 648 } 649 650 // Make sure that a directory was created for every CF 651 int dir = 0; 652 for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) { 653 for (byte[] family : FAMILIES) { 654 if (Bytes.toString(family).equals(f.getPath().getName())) { 655 ++dir; 656 } 657 } 658 } 659 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 660 } 661 if (writeMultipleTables) { 662 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 663 } 664 665 Admin admin = util.getConnection().getAdmin(); 666 try { 667 // handle the split case 668 if (shouldChangeRegions) { 669 Table chosenTable = allTables.values().iterator().next(); 670 // Choose a semi-random table if multiple tables are available 671 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 672 admin.disableTable(chosenTable.getName()); 673 util.waitUntilNoRegionsInTransition(); 674 675 util.deleteTable(chosenTable.getName()); 676 byte[][] newSplitKeys = generateRandomSplitKeys(14); 677 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 678 679 while (util.getConnection().getRegionLocator(chosenTable.getName()) 680 .getAllRegionLocations().size() != 15 || 681 !admin.isTableAvailable(table.getName())) { 682 Thread.sleep(200); 683 LOG.info("Waiting for new region assignment to happen"); 684 } 685 } 686 687 // Perform the actual load 688 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 689 Path tableDir = testDir; 690 String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString(); 691 LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr); 692 if (writeMultipleTables) { 693 tableDir = new Path(testDir, tableNameStr); 694 } 695 Table currentTable = allTables.get(tableNameStr); 696 TableName currentTableName = currentTable.getName(); 697 new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo 698 .getRegionLocator()); 699 700 // Ensure data shows up 701 int expectedRows = 0; 702 if (putSortReducer) { 703 // no rows should be extracted 704 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 705 util.countRows(currentTable)); 706 } else { 707 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 708 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 709 util.countRows(currentTable)); 710 Scan scan = new Scan(); 711 ResultScanner results = currentTable.getScanner(scan); 712 for (Result res : results) { 713 assertEquals(FAMILIES.length, res.rawCells().length); 714 Cell first = res.rawCells()[0]; 715 for (Cell kv : res.rawCells()) { 716 assertTrue(CellUtil.matchingRows(first, kv)); 717 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 718 } 719 } 720 results.close(); 721 } 722 String tableDigestBefore = util.checksumRows(currentTable); 723 // Check region locality 724 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 725 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 726 hbd.add(region.getHDFSBlocksDistribution()); 727 } 728 for (String hostname : hostnames) { 729 float locality = hbd.getBlockLocalityIndex(hostname); 730 LOG.info("locality of [" + hostname + "]: " + locality); 731 assertEquals(100, (int) (locality * 100)); 732 } 733 734 // Cause regions to reopen 735 admin.disableTable(currentTableName); 736 while (!admin.isTableDisabled(currentTableName)) { 737 Thread.sleep(200); 738 LOG.info("Waiting for table to disable"); 739 } 740 admin.enableTable(currentTableName); 741 util.waitTableAvailable(currentTableName); 742 assertEquals("Data should remain after reopening of regions", 743 tableDigestBefore, util.checksumRows(currentTable)); 744 } 745 } finally { 746 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 747 tableInfoSingle.getRegionLocator().close(); 748 } 749 for (Entry<String, Table> singleTable : allTables.entrySet() ) { 750 singleTable.getValue().close(); 751 util.deleteTable(singleTable.getValue().getName()); 752 } 753 testDir.getFileSystem(conf).delete(testDir, true); 754 util.shutdownMiniCluster(); 755 } 756 } 757 758 private void runIncrementalPELoad(Configuration conf, List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, 759 boolean putSortReducer) throws IOException, 760 InterruptedException, ClassNotFoundException { 761 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 762 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 763 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 764 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 765 KeyValueSerialization.class.getName()); 766 setupRandomGeneratorMapper(job, putSortReducer); 767 if (tableInfo.size() > 1) { 768 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 769 int sum = 0; 770 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 771 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 772 } 773 assertEquals(sum, job.getNumReduceTasks()); 774 } 775 else { 776 RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 777 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(), 778 regionLocator); 779 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 780 } 781 782 FileOutputFormat.setOutputPath(job, outDir); 783 784 assertFalse(util.getTestFileSystem().exists(outDir)) ; 785 786 assertTrue(job.waitForCompletion(true)); 787 } 788 789 /** 790 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. 791 * Tests that the family compression map is correctly serialized into 792 * and deserialized from configuration 793 * 794 * @throws IOException 795 */ 796 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 797 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 798 for (int numCfs = 0; numCfs <= 3; numCfs++) { 799 Configuration conf = new Configuration(this.util.getConfiguration()); 800 Map<String, Compression.Algorithm> familyToCompression = 801 getMockColumnFamiliesForCompression(numCfs); 802 Table table = Mockito.mock(Table.class); 803 setupMockColumnFamiliesForCompression(table, familyToCompression); 804 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 805 HFileOutputFormat2.serializeColumnFamilyAttribute 806 (HFileOutputFormat2.compressionDetails, 807 Arrays.asList(table.getTableDescriptor()))); 808 809 // read back family specific compression setting from the configuration 810 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2 811 .createFamilyCompressionMap(conf); 812 813 // test that we have a value for all column families that matches with the 814 // used mock values 815 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 816 assertEquals("Compression configuration incorrect for column family:" 817 + entry.getKey(), entry.getValue(), 818 retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8"))); 819 } 820 } 821 } 822 823 private void setupMockColumnFamiliesForCompression(Table table, 824 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 825 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 826 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 827 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 828 .setMaxVersions(1) 829 .setCompressionType(entry.getValue()) 830 .setBlockCacheEnabled(false) 831 .setTimeToLive(0)); 832 } 833 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 834 } 835 836 /** 837 * @return a map from column family names to compression algorithms for 838 * testing column family compression. Column family names have special characters 839 */ 840 private Map<String, Compression.Algorithm> 841 getMockColumnFamiliesForCompression (int numCfs) { 842 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 843 // use column family names having special characters 844 if (numCfs-- > 0) { 845 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 846 } 847 if (numCfs-- > 0) { 848 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 849 } 850 if (numCfs-- > 0) { 851 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 852 } 853 if (numCfs-- > 0) { 854 familyToCompression.put("Family3", Compression.Algorithm.NONE); 855 } 856 return familyToCompression; 857 } 858 859 860 /** 861 * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. 862 * Tests that the family bloom type map is correctly serialized into 863 * and deserialized from configuration 864 * 865 * @throws IOException 866 */ 867 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 868 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 869 for (int numCfs = 0; numCfs <= 2; numCfs++) { 870 Configuration conf = new Configuration(this.util.getConfiguration()); 871 Map<String, BloomType> familyToBloomType = 872 getMockColumnFamiliesForBloomType(numCfs); 873 Table table = Mockito.mock(Table.class); 874 setupMockColumnFamiliesForBloomType(table, 875 familyToBloomType); 876 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 877 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 878 Arrays.asList(table.getTableDescriptor()))); 879 880 // read back family specific data block encoding settings from the 881 // configuration 882 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 883 HFileOutputFormat2 884 .createFamilyBloomTypeMap(conf); 885 886 // test that we have a value for all column families that matches with the 887 // used mock values 888 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 889 assertEquals("BloomType configuration incorrect for column family:" 890 + entry.getKey(), entry.getValue(), 891 retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8"))); 892 } 893 } 894 } 895 896 private void setupMockColumnFamiliesForBloomType(Table table, 897 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 898 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 899 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 900 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 901 .setMaxVersions(1) 902 .setBloomFilterType(entry.getValue()) 903 .setBlockCacheEnabled(false) 904 .setTimeToLive(0)); 905 } 906 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 907 } 908 909 /** 910 * @return a map from column family names to compression algorithms for 911 * testing column family compression. Column family names have special characters 912 */ 913 private Map<String, BloomType> 914 getMockColumnFamiliesForBloomType (int numCfs) { 915 Map<String, BloomType> familyToBloomType = new HashMap<>(); 916 // use column family names having special characters 917 if (numCfs-- > 0) { 918 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 919 } 920 if (numCfs-- > 0) { 921 familyToBloomType.put("Family2=asdads&!AASD", 922 BloomType.ROWCOL); 923 } 924 if (numCfs-- > 0) { 925 familyToBloomType.put("Family3", BloomType.NONE); 926 } 927 return familyToBloomType; 928 } 929 930 /** 931 * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. 932 * Tests that the family block size map is correctly serialized into 933 * and deserialized from configuration 934 * 935 * @throws IOException 936 */ 937 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 938 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 939 for (int numCfs = 0; numCfs <= 3; numCfs++) { 940 Configuration conf = new Configuration(this.util.getConfiguration()); 941 Map<String, Integer> familyToBlockSize = 942 getMockColumnFamiliesForBlockSize(numCfs); 943 Table table = Mockito.mock(Table.class); 944 setupMockColumnFamiliesForBlockSize(table, 945 familyToBlockSize); 946 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 947 HFileOutputFormat2.serializeColumnFamilyAttribute 948 (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table 949 .getTableDescriptor()))); 950 951 // read back family specific data block encoding settings from the 952 // configuration 953 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 954 HFileOutputFormat2 955 .createFamilyBlockSizeMap(conf); 956 957 // test that we have a value for all column families that matches with the 958 // used mock values 959 for (Entry<String, Integer> entry : familyToBlockSize.entrySet() 960 ) { 961 assertEquals("BlockSize configuration incorrect for column family:" 962 + entry.getKey(), entry.getValue(), 963 retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8"))); 964 } 965 } 966 } 967 968 private void setupMockColumnFamiliesForBlockSize(Table table, 969 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 970 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 971 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 972 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 973 .setMaxVersions(1) 974 .setBlocksize(entry.getValue()) 975 .setBlockCacheEnabled(false) 976 .setTimeToLive(0)); 977 } 978 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 979 } 980 981 /** 982 * @return a map from column family names to compression algorithms for 983 * testing column family compression. Column family names have special characters 984 */ 985 private Map<String, Integer> 986 getMockColumnFamiliesForBlockSize (int numCfs) { 987 Map<String, Integer> familyToBlockSize = new HashMap<>(); 988 // use column family names having special characters 989 if (numCfs-- > 0) { 990 familyToBlockSize.put("Family1!@#!@#&", 1234); 991 } 992 if (numCfs-- > 0) { 993 familyToBlockSize.put("Family2=asdads&!AASD", 994 Integer.MAX_VALUE); 995 } 996 if (numCfs-- > 0) { 997 familyToBlockSize.put("Family2=asdads&!AASD", 998 Integer.MAX_VALUE); 999 } 1000 if (numCfs-- > 0) { 1001 familyToBlockSize.put("Family3", 0); 1002 } 1003 return familyToBlockSize; 1004 } 1005 1006 /** 1007 * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. 1008 * Tests that the family data block encoding map is correctly serialized into 1009 * and deserialized from configuration 1010 * 1011 * @throws IOException 1012 */ 1013 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1014 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 1015 for (int numCfs = 0; numCfs <= 3; numCfs++) { 1016 Configuration conf = new Configuration(this.util.getConfiguration()); 1017 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 1018 getMockColumnFamiliesForDataBlockEncoding(numCfs); 1019 Table table = Mockito.mock(Table.class); 1020 setupMockColumnFamiliesForDataBlockEncoding(table, 1021 familyToDataBlockEncoding); 1022 HTableDescriptor tableDescriptor = table.getTableDescriptor(); 1023 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 1024 HFileOutputFormat2.serializeColumnFamilyAttribute 1025 (HFileOutputFormat2.dataBlockEncodingDetails, Arrays 1026 .asList(tableDescriptor))); 1027 1028 // read back family specific data block encoding settings from the 1029 // configuration 1030 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 1031 HFileOutputFormat2 1032 .createFamilyDataBlockEncodingMap(conf); 1033 1034 // test that we have a value for all column families that matches with the 1035 // used mock values 1036 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1037 assertEquals("DataBlockEncoding configuration incorrect for column family:" 1038 + entry.getKey(), entry.getValue(), 1039 retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8"))); 1040 } 1041 } 1042 } 1043 1044 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 1045 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 1046 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 1047 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1048 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 1049 .setMaxVersions(1) 1050 .setDataBlockEncoding(entry.getValue()) 1051 .setBlockCacheEnabled(false) 1052 .setTimeToLive(0)); 1053 } 1054 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 1055 } 1056 1057 /** 1058 * @return a map from column family names to compression algorithms for 1059 * testing column family compression. Column family names have special characters 1060 */ 1061 private Map<String, DataBlockEncoding> 1062 getMockColumnFamiliesForDataBlockEncoding (int numCfs) { 1063 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1064 // use column family names having special characters 1065 if (numCfs-- > 0) { 1066 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1067 } 1068 if (numCfs-- > 0) { 1069 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1070 DataBlockEncoding.FAST_DIFF); 1071 } 1072 if (numCfs-- > 0) { 1073 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1074 DataBlockEncoding.PREFIX); 1075 } 1076 if (numCfs-- > 0) { 1077 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1078 } 1079 return familyToDataBlockEncoding; 1080 } 1081 1082 private void setupMockStartKeys(RegionLocator table) throws IOException { 1083 byte[][] mockKeys = new byte[][] { 1084 HConstants.EMPTY_BYTE_ARRAY, 1085 Bytes.toBytes("aaa"), 1086 Bytes.toBytes("ggg"), 1087 Bytes.toBytes("zzz") 1088 }; 1089 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1090 } 1091 1092 private void setupMockTableName(RegionLocator table) throws IOException { 1093 TableName mockTableName = TableName.valueOf("mock_table"); 1094 Mockito.doReturn(mockTableName).when(table).getName(); 1095 } 1096 1097 /** 1098 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and 1099 * bloom filter settings from the column family descriptor 1100 */ 1101 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1102 public void testColumnFamilySettings() throws Exception { 1103 Configuration conf = new Configuration(this.util.getConfiguration()); 1104 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1105 TaskAttemptContext context = null; 1106 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1107 1108 // Setup table descriptor 1109 Table table = Mockito.mock(Table.class); 1110 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1111 HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]); 1112 Mockito.doReturn(htd).when(table).getTableDescriptor(); 1113 for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { 1114 htd.addFamily(hcd); 1115 } 1116 1117 // set up the table to return some mock keys 1118 setupMockStartKeys(regionLocator); 1119 1120 try { 1121 // partial map red setup to get an operational writer for testing 1122 // We turn off the sequence file compression, because DefaultCodec 1123 // pollutes the GZip codec pool with an incompatible compressor. 1124 conf.set("io.seqfile.compression.type", "NONE"); 1125 conf.set("hbase.fs.tmp.dir", dir.toString()); 1126 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1127 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1128 1129 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1130 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1131 setupRandomGeneratorMapper(job, false); 1132 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 1133 FileOutputFormat.setOutputPath(job, dir); 1134 context = createTestTaskAttemptContext(job); 1135 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1136 writer = hof.getRecordWriter(context); 1137 1138 // write out random rows 1139 writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT); 1140 writer.close(context); 1141 1142 // Make sure that a directory was created for every CF 1143 FileSystem fs = dir.getFileSystem(conf); 1144 1145 // commit so that the filesystem has one directory per column family 1146 hof.getOutputCommitter(context).commitTask(context); 1147 hof.getOutputCommitter(context).commitJob(context); 1148 FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1149 assertEquals(htd.getFamilies().size(), families.length); 1150 for (FileStatus f : families) { 1151 String familyStr = f.getPath().getName(); 1152 HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr)); 1153 // verify that the compression on this file matches the configured 1154 // compression 1155 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1156 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1157 Map<byte[], byte[]> fileInfo = reader.loadFileInfo(); 1158 1159 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1160 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1161 assertEquals("Incorrect bloom filter used for column family " + familyStr + 1162 "(reader: " + reader + ")", 1163 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1164 assertEquals("Incorrect compression used for column family " + familyStr + 1165 "(reader: " + reader + ")", hcd.getCompressionType(), reader.getFileContext().getCompression()); 1166 } 1167 } finally { 1168 dir.getFileSystem(conf).delete(dir, true); 1169 } 1170 } 1171 1172 /** 1173 * Write random values to the writer assuming a table created using 1174 * {@link #FAMILIES} as column family descriptors 1175 */ 1176 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1177 TaskAttemptContext context, Set<byte[]> families, int numRows) 1178 throws IOException, InterruptedException { 1179 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1180 int valLength = 10; 1181 byte valBytes[] = new byte[valLength]; 1182 1183 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1184 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1185 final byte [] qualifier = Bytes.toBytes("data"); 1186 Random random = new Random(); 1187 for (int i = 0; i < numRows; i++) { 1188 1189 Bytes.putInt(keyBytes, 0, i); 1190 random.nextBytes(valBytes); 1191 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1192 1193 for (byte[] family : families) { 1194 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1195 writer.write(key, kv); 1196 } 1197 } 1198 } 1199 1200 /** 1201 * This test is to test the scenario happened in HBASE-6901. 1202 * All files are bulk loaded and excluded from minor compaction. 1203 * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException 1204 * will be thrown. 1205 */ 1206 @Ignore ("Flakey: See HBASE-9051") @Test 1207 public void testExcludeAllFromMinorCompaction() throws Exception { 1208 Configuration conf = util.getConfiguration(); 1209 conf.setInt("hbase.hstore.compaction.min", 2); 1210 generateRandomStartKeys(5); 1211 1212 util.startMiniCluster(); 1213 try (Connection conn = ConnectionFactory.createConnection(); 1214 Admin admin = conn.getAdmin(); 1215 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1216 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1217 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1218 assertEquals("Should start with empty table", 0, util.countRows(table)); 1219 1220 // deep inspection: get the StoreFile dir 1221 final Path storePath = new Path( 1222 FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), 1223 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1224 Bytes.toString(FAMILIES[0]))); 1225 assertEquals(0, fs.listStatus(storePath).length); 1226 1227 // Generate two bulk load files 1228 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1229 true); 1230 1231 for (int i = 0; i < 2; i++) { 1232 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1233 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1234 .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false); 1235 // Perform the actual load 1236 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); 1237 } 1238 1239 // Ensure data shows up 1240 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1241 assertEquals("LoadIncrementalHFiles should put expected data in table", 1242 expectedRows, util.countRows(table)); 1243 1244 // should have a second StoreFile now 1245 assertEquals(2, fs.listStatus(storePath).length); 1246 1247 // minor compactions shouldn't get rid of the file 1248 admin.compact(TABLE_NAMES[0]); 1249 try { 1250 quickPoll(new Callable<Boolean>() { 1251 @Override 1252 public Boolean call() throws Exception { 1253 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1254 for (HRegion region : regions) { 1255 for (HStore store : region.getStores()) { 1256 store.closeAndArchiveCompactedFiles(); 1257 } 1258 } 1259 return fs.listStatus(storePath).length == 1; 1260 } 1261 }, 5000); 1262 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1263 } catch (AssertionError ae) { 1264 // this is expected behavior 1265 } 1266 1267 // a major compaction should work though 1268 admin.majorCompact(TABLE_NAMES[0]); 1269 quickPoll(new Callable<Boolean>() { 1270 @Override 1271 public Boolean call() throws Exception { 1272 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1273 for (HRegion region : regions) { 1274 for (HStore store : region.getStores()) { 1275 store.closeAndArchiveCompactedFiles(); 1276 } 1277 } 1278 return fs.listStatus(storePath).length == 1; 1279 } 1280 }, 5000); 1281 1282 } finally { 1283 util.shutdownMiniCluster(); 1284 } 1285 } 1286 1287 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1288 public void testExcludeMinorCompaction() throws Exception { 1289 Configuration conf = util.getConfiguration(); 1290 conf.setInt("hbase.hstore.compaction.min", 2); 1291 generateRandomStartKeys(5); 1292 1293 util.startMiniCluster(); 1294 try (Connection conn = ConnectionFactory.createConnection(conf); 1295 Admin admin = conn.getAdmin()){ 1296 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1297 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1298 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1299 assertEquals("Should start with empty table", 0, util.countRows(table)); 1300 1301 // deep inspection: get the StoreFile dir 1302 final Path storePath = new Path( 1303 FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), 1304 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1305 Bytes.toString(FAMILIES[0]))); 1306 assertEquals(0, fs.listStatus(storePath).length); 1307 1308 // put some data in it and flush to create a storefile 1309 Put p = new Put(Bytes.toBytes("test")); 1310 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1311 table.put(p); 1312 admin.flush(TABLE_NAMES[0]); 1313 assertEquals(1, util.countRows(table)); 1314 quickPoll(new Callable<Boolean>() { 1315 @Override 1316 public Boolean call() throws Exception { 1317 return fs.listStatus(storePath).length == 1; 1318 } 1319 }, 5000); 1320 1321 // Generate a bulk load file with more rows 1322 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1323 true); 1324 1325 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1326 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1327 .getTableDescriptor(), regionLocator)), testDir, false); 1328 1329 // Perform the actual load 1330 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); 1331 1332 // Ensure data shows up 1333 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1334 assertEquals("LoadIncrementalHFiles should put expected data in table", 1335 expectedRows + 1, util.countRows(table)); 1336 1337 // should have a second StoreFile now 1338 assertEquals(2, fs.listStatus(storePath).length); 1339 1340 // minor compactions shouldn't get rid of the file 1341 admin.compact(TABLE_NAMES[0]); 1342 try { 1343 quickPoll(new Callable<Boolean>() { 1344 @Override 1345 public Boolean call() throws Exception { 1346 return fs.listStatus(storePath).length == 1; 1347 } 1348 }, 5000); 1349 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1350 } catch (AssertionError ae) { 1351 // this is expected behavior 1352 } 1353 1354 // a major compaction should work though 1355 admin.majorCompact(TABLE_NAMES[0]); 1356 quickPoll(new Callable<Boolean>() { 1357 @Override 1358 public Boolean call() throws Exception { 1359 return fs.listStatus(storePath).length == 1; 1360 } 1361 }, 5000); 1362 1363 } finally { 1364 util.shutdownMiniCluster(); 1365 } 1366 } 1367 1368 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1369 int sleepMs = 10; 1370 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1371 while (retries-- > 0) { 1372 if (c.call().booleanValue()) { 1373 return; 1374 } 1375 Thread.sleep(sleepMs); 1376 } 1377 fail(); 1378 } 1379 1380 public static void main(String args[]) throws Exception { 1381 new TestHFileOutputFormat2().manualTest(args); 1382 } 1383 1384 public void manualTest(String args[]) throws Exception { 1385 Configuration conf = HBaseConfiguration.create(); 1386 util = new HBaseTestingUtility(conf); 1387 if ("newtable".equals(args[0])) { 1388 TableName tname = TableName.valueOf(args[1]); 1389 byte[][] splitKeys = generateRandomSplitKeys(4); 1390 Table table = util.createTable(tname, FAMILIES, splitKeys); 1391 } else if ("incremental".equals(args[0])) { 1392 TableName tname = TableName.valueOf(args[1]); 1393 try(Connection c = ConnectionFactory.createConnection(conf); 1394 Admin admin = c.getAdmin(); 1395 RegionLocator regionLocator = c.getRegionLocator(tname)) { 1396 Path outDir = new Path("incremental-out"); 1397 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin 1398 .getTableDescriptor(tname), regionLocator)), outDir, false); 1399 } 1400 } else { 1401 throw new RuntimeException( 1402 "usage: TestHFileOutputFormat2 newtable | incremental"); 1403 } 1404 } 1405 1406 @Test 1407 public void testBlockStoragePolicy() throws Exception { 1408 util = new HBaseTestingUtility(); 1409 Configuration conf = util.getConfiguration(); 1410 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); 1411 1412 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + 1413 Bytes.toString(HFileOutputFormat2.combineTableNameSuffix( 1414 TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD"); 1415 Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0])); 1416 Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1])); 1417 util.startMiniDFSCluster(3); 1418 FileSystem fs = util.getDFSCluster().getFileSystem(); 1419 try { 1420 fs.mkdirs(cf1Dir); 1421 fs.mkdirs(cf2Dir); 1422 1423 // the original block storage policy would be HOT 1424 String spA = getStoragePolicyName(fs, cf1Dir); 1425 String spB = getStoragePolicyName(fs, cf2Dir); 1426 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1427 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1428 assertEquals("HOT", spA); 1429 assertEquals("HOT", spB); 1430 1431 // alter table cf schema to change storage policies 1432 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1433 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir); 1434 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1435 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir); 1436 spA = getStoragePolicyName(fs, cf1Dir); 1437 spB = getStoragePolicyName(fs, cf2Dir); 1438 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1439 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1440 assertNotNull(spA); 1441 assertEquals("ONE_SSD", spA); 1442 assertNotNull(spB); 1443 assertEquals("ALL_SSD", spB); 1444 } finally { 1445 fs.delete(cf1Dir, true); 1446 fs.delete(cf2Dir, true); 1447 util.shutdownMiniDFSCluster(); 1448 } 1449 } 1450 1451 private String getStoragePolicyName(FileSystem fs, Path path) { 1452 try { 1453 Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path); 1454 return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); 1455 } catch (Exception e) { 1456 // Maybe fail because of using old HDFS version, try the old way 1457 if (LOG.isTraceEnabled()) { 1458 LOG.trace("Failed to get policy directly", e); 1459 } 1460 String policy = getStoragePolicyNameForOldHDFSVersion(fs, path); 1461 return policy == null ? "HOT" : policy;// HOT by default 1462 } 1463 } 1464 1465 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1466 try { 1467 if (fs instanceof DistributedFileSystem) { 1468 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1469 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1470 if (null != status) { 1471 byte storagePolicyId = status.getStoragePolicy(); 1472 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1473 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1474 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1475 for (BlockStoragePolicy policy : policies) { 1476 if (policy.getId() == storagePolicyId) { 1477 return policy.getName(); 1478 } 1479 } 1480 } 1481 } 1482 } 1483 } catch (Throwable e) { 1484 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1485 } 1486 1487 return null; 1488 } 1489} 1490