001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertNotSame; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.lang.reflect.Field; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.Random; 038import java.util.Set; 039import java.util.concurrent.Callable; 040import java.util.stream.Collectors; 041import java.util.stream.Stream; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.LocatedFileStatus; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.fs.RemoteIterator; 048import org.apache.hadoop.hbase.ArrayBackedTag; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellUtil; 051import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 052import org.apache.hadoop.hbase.HBaseClassTestRule; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtility; 055import org.apache.hadoop.hbase.HColumnDescriptor; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HDFSBlocksDistribution; 058import org.apache.hadoop.hbase.HTableDescriptor; 059import org.apache.hadoop.hbase.HadoopShims; 060import org.apache.hadoop.hbase.KeyValue; 061import org.apache.hadoop.hbase.PerformanceEvaluation; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.Tag; 064import org.apache.hadoop.hbase.TagType; 065import org.apache.hadoop.hbase.TagUtil; 066import org.apache.hadoop.hbase.client.Admin; 067import org.apache.hadoop.hbase.client.Connection; 068import org.apache.hadoop.hbase.client.ConnectionFactory; 069import org.apache.hadoop.hbase.client.Put; 070import org.apache.hadoop.hbase.client.RegionLocator; 071import org.apache.hadoop.hbase.client.Result; 072import org.apache.hadoop.hbase.client.ResultScanner; 073import org.apache.hadoop.hbase.client.Scan; 074import org.apache.hadoop.hbase.client.Table; 075import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 076import org.apache.hadoop.hbase.io.compress.Compression; 077import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 078import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 079import org.apache.hadoop.hbase.io.hfile.CacheConfig; 080import org.apache.hadoop.hbase.io.hfile.HFile; 081import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 082import org.apache.hadoop.hbase.io.hfile.HFileScanner; 083import org.apache.hadoop.hbase.regionserver.BloomType; 084import org.apache.hadoop.hbase.regionserver.HRegion; 085import org.apache.hadoop.hbase.regionserver.HStore; 086import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 087import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 088import org.apache.hadoop.hbase.testclassification.LargeTests; 089import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 090import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 091import org.apache.hadoop.hbase.util.Bytes; 092import org.apache.hadoop.hbase.util.FSUtils; 093import org.apache.hadoop.hbase.util.ReflectionUtils; 094import org.apache.hadoop.hdfs.DistributedFileSystem; 095import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 096import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 097import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 098import org.apache.hadoop.io.NullWritable; 099import org.apache.hadoop.mapreduce.Job; 100import org.apache.hadoop.mapreduce.Mapper; 101import org.apache.hadoop.mapreduce.RecordWriter; 102import org.apache.hadoop.mapreduce.TaskAttemptContext; 103import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 104import org.junit.ClassRule; 105import org.junit.Ignore; 106import org.junit.Test; 107import org.junit.experimental.categories.Category; 108import org.mockito.Mockito; 109import org.slf4j.Logger; 110import org.slf4j.LoggerFactory; 111 112/** 113 * Simple test for {@link HFileOutputFormat2}. 114 * Sets up and runs a mapreduce job that writes hfile output. 115 * Creates a few inner classes to implement splits and an inputformat that 116 * emits keys and values like those of {@link PerformanceEvaluation}. 117 */ 118@Category({VerySlowMapReduceTests.class, LargeTests.class}) 119public class TestCellBasedHFileOutputFormat2 { 120 121 @ClassRule 122 public static final HBaseClassTestRule CLASS_RULE = 123 HBaseClassTestRule.forClass(TestCellBasedHFileOutputFormat2.class); 124 125 private final static int ROWSPERSPLIT = 1024; 126 127 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 128 private static final byte[][] FAMILIES = { 129 Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))}; 130 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", 131 "TestTable3").map(TableName::valueOf).toArray(TableName[]::new); 132 133 private HBaseTestingUtility util = new HBaseTestingUtility(); 134 135 private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedHFileOutputFormat2.class); 136 137 /** 138 * Simple mapper that makes KeyValue output. 139 */ 140 static class RandomKVGeneratingMapper 141 extends Mapper<NullWritable, NullWritable, 142 ImmutableBytesWritable, Cell> { 143 144 private int keyLength; 145 private static final int KEYLEN_DEFAULT=10; 146 private static final String KEYLEN_CONF="randomkv.key.length"; 147 148 private int valLength; 149 private static final int VALLEN_DEFAULT=10; 150 private static final String VALLEN_CONF="randomkv.val.length"; 151 private static final byte [] QUALIFIER = Bytes.toBytes("data"); 152 private boolean multiTableMapper = false; 153 private TableName[] tables = null; 154 155 156 @Override 157 protected void setup(Context context) throws IOException, 158 InterruptedException { 159 super.setup(context); 160 161 Configuration conf = context.getConfiguration(); 162 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 163 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 164 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 165 false); 166 if (multiTableMapper) { 167 tables = TABLE_NAMES; 168 } else { 169 tables = new TableName[]{TABLE_NAMES[0]}; 170 } 171 } 172 173 @Override 174 protected void map( 175 NullWritable n1, NullWritable n2, 176 Mapper<NullWritable, NullWritable, 177 ImmutableBytesWritable,Cell>.Context context) 178 throws java.io.IOException ,InterruptedException 179 { 180 181 byte keyBytes[] = new byte[keyLength]; 182 byte valBytes[] = new byte[valLength]; 183 184 int taskId = context.getTaskAttemptID().getTaskID().getId(); 185 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 186 Random random = new Random(); 187 byte[] key; 188 for (int j = 0; j < tables.length; ++j) { 189 for (int i = 0; i < ROWSPERSPLIT; i++) { 190 random.nextBytes(keyBytes); 191 // Ensure that unique tasks generate unique keys 192 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 193 random.nextBytes(valBytes); 194 key = keyBytes; 195 if (multiTableMapper) { 196 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 197 } 198 199 for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) { 200 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 201 context.write(new ImmutableBytesWritable(key), kv); 202 } 203 } 204 } 205 } 206 } 207 208 /** 209 * Simple mapper that makes Put output. 210 */ 211 static class RandomPutGeneratingMapper 212 extends Mapper<NullWritable, NullWritable, 213 ImmutableBytesWritable, Put> { 214 215 private int keyLength; 216 private static final int KEYLEN_DEFAULT = 10; 217 private static final String KEYLEN_CONF = "randomkv.key.length"; 218 219 private int valLength; 220 private static final int VALLEN_DEFAULT = 10; 221 private static final String VALLEN_CONF = "randomkv.val.length"; 222 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 223 private boolean multiTableMapper = false; 224 private TableName[] tables = null; 225 226 @Override 227 protected void setup(Context context) throws IOException, 228 InterruptedException { 229 super.setup(context); 230 231 Configuration conf = context.getConfiguration(); 232 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 233 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 234 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 235 false); 236 if (multiTableMapper) { 237 tables = TABLE_NAMES; 238 } else { 239 tables = new TableName[]{TABLE_NAMES[0]}; 240 } 241 } 242 243 @Override 244 protected void map( 245 NullWritable n1, NullWritable n2, 246 Mapper<NullWritable, NullWritable, 247 ImmutableBytesWritable, Put>.Context context) 248 throws java.io.IOException, InterruptedException { 249 250 byte keyBytes[] = new byte[keyLength]; 251 byte valBytes[] = new byte[valLength]; 252 253 int taskId = context.getTaskAttemptID().getTaskID().getId(); 254 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 255 256 Random random = new Random(); 257 byte[] key; 258 for (int j = 0; j < tables.length; ++j) { 259 for (int i = 0; i < ROWSPERSPLIT; i++) { 260 random.nextBytes(keyBytes); 261 // Ensure that unique tasks generate unique keys 262 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 263 random.nextBytes(valBytes); 264 key = keyBytes; 265 if (multiTableMapper) { 266 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 267 } 268 269 for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) { 270 Put p = new Put(keyBytes); 271 p.addColumn(family, QUALIFIER, valBytes); 272 // set TTL to very low so that the scan does not return any value 273 p.setTTL(1l); 274 context.write(new ImmutableBytesWritable(key), p); 275 } 276 } 277 } 278 } 279 } 280 281 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 282 if (putSortReducer) { 283 job.setInputFormatClass(NMapInputFormat.class); 284 job.setMapperClass(RandomPutGeneratingMapper.class); 285 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 286 job.setMapOutputValueClass(Put.class); 287 } else { 288 job.setInputFormatClass(NMapInputFormat.class); 289 job.setMapperClass(RandomKVGeneratingMapper.class); 290 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 291 job.setMapOutputValueClass(KeyValue.class); 292 } 293 } 294 295 /** 296 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if 297 * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}. 298 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 299 */ 300 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 301 public void test_LATEST_TIMESTAMP_isReplaced() 302 throws Exception { 303 Configuration conf = new Configuration(this.util.getConfiguration()); 304 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 305 TaskAttemptContext context = null; 306 Path dir = 307 util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 308 try { 309 Job job = new Job(conf); 310 FileOutputFormat.setOutputPath(job, dir); 311 context = createTestTaskAttemptContext(job); 312 HFileOutputFormat2 hof = new HFileOutputFormat2(); 313 writer = hof.getRecordWriter(context); 314 final byte [] b = Bytes.toBytes("b"); 315 316 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 317 // changed by call to write. Check all in kv is same but ts. 318 KeyValue kv = new KeyValue(b, b, b); 319 KeyValue original = kv.clone(); 320 writer.write(new ImmutableBytesWritable(), kv); 321 assertFalse(original.equals(kv)); 322 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 323 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 324 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 325 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 326 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 327 328 // Test 2. Now test passing a kv that has explicit ts. It should not be 329 // changed by call to record write. 330 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 331 original = kv.clone(); 332 writer.write(new ImmutableBytesWritable(), kv); 333 assertTrue(original.equals(kv)); 334 } finally { 335 if (writer != null && context != null) writer.close(context); 336 dir.getFileSystem(conf).delete(dir, true); 337 } 338 } 339 340 private TaskAttemptContext createTestTaskAttemptContext(final Job job) 341 throws Exception { 342 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 343 TaskAttemptContext context = hadoop.createTestTaskAttemptContext( 344 job, "attempt_201402131733_0001_m_000000_0"); 345 return context; 346 } 347 348 /* 349 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE 350 * metadata used by time-restricted scans. 351 */ 352 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 353 public void test_TIMERANGE() throws Exception { 354 Configuration conf = new Configuration(this.util.getConfiguration()); 355 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 356 TaskAttemptContext context = null; 357 Path dir = 358 util.getDataTestDir("test_TIMERANGE_present"); 359 LOG.info("Timerange dir writing to dir: "+ dir); 360 try { 361 // build a record writer using HFileOutputFormat2 362 Job job = new Job(conf); 363 FileOutputFormat.setOutputPath(job, dir); 364 context = createTestTaskAttemptContext(job); 365 HFileOutputFormat2 hof = new HFileOutputFormat2(); 366 writer = hof.getRecordWriter(context); 367 368 // Pass two key values with explicit times stamps 369 final byte [] b = Bytes.toBytes("b"); 370 371 // value 1 with timestamp 2000 372 KeyValue kv = new KeyValue(b, b, b, 2000, b); 373 KeyValue original = kv.clone(); 374 writer.write(new ImmutableBytesWritable(), kv); 375 assertEquals(original,kv); 376 377 // value 2 with timestamp 1000 378 kv = new KeyValue(b, b, b, 1000, b); 379 original = kv.clone(); 380 writer.write(new ImmutableBytesWritable(), kv); 381 assertEquals(original, kv); 382 383 // verify that the file has the proper FileInfo. 384 writer.close(context); 385 386 // the generated file lives 1 directory down from the attempt directory 387 // and is the only file, e.g. 388 // _attempt__0000_r_000000_0/b/1979617994050536795 389 FileSystem fs = FileSystem.get(conf); 390 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 391 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 392 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 393 394 // open as HFile Reader and pull out TIMERANGE FileInfo. 395 HFile.Reader rd = 396 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 397 Map<byte[],byte[]> finfo = rd.loadFileInfo(); 398 byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8")); 399 assertNotNull(range); 400 401 // unmarshall and check values. 402 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range); 403 LOG.info(timeRangeTracker.getMin() + 404 "...." + timeRangeTracker.getMax()); 405 assertEquals(1000, timeRangeTracker.getMin()); 406 assertEquals(2000, timeRangeTracker.getMax()); 407 rd.close(); 408 } finally { 409 if (writer != null && context != null) writer.close(context); 410 dir.getFileSystem(conf).delete(dir, true); 411 } 412 } 413 414 /** 415 * Run small MR job. 416 */ 417 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 418 public void testWritingPEData() throws Exception { 419 Configuration conf = util.getConfiguration(); 420 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 421 FileSystem fs = testDir.getFileSystem(conf); 422 423 // Set down this value or we OOME in eclipse. 424 conf.setInt("mapreduce.task.io.sort.mb", 20); 425 // Write a few files. 426 conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); 427 428 Job job = new Job(conf, "testWritingPEData"); 429 setupRandomGeneratorMapper(job, false); 430 // This partitioner doesn't work well for number keys but using it anyways 431 // just to demonstrate how to configure it. 432 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 433 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 434 435 Arrays.fill(startKey, (byte)0); 436 Arrays.fill(endKey, (byte)0xff); 437 438 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 439 // Set start and end rows for partitioner. 440 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 441 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 442 job.setReducerClass(CellSortReducer.class); 443 job.setOutputFormatClass(HFileOutputFormat2.class); 444 job.setNumReduceTasks(4); 445 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 446 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 447 CellSerialization.class.getName()); 448 449 FileOutputFormat.setOutputPath(job, testDir); 450 assertTrue(job.waitForCompletion(false)); 451 FileStatus [] files = fs.listStatus(testDir); 452 assertTrue(files.length > 0); 453 } 454 455 /** 456 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into 457 * hfile. 458 */ 459 @Test 460 public void test_WritingTagData() 461 throws Exception { 462 Configuration conf = new Configuration(this.util.getConfiguration()); 463 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 464 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 465 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 466 TaskAttemptContext context = null; 467 Path dir = 468 util.getDataTestDir("WritingTagData"); 469 try { 470 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 471 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 472 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 473 Job job = new Job(conf); 474 FileOutputFormat.setOutputPath(job, dir); 475 context = createTestTaskAttemptContext(job); 476 HFileOutputFormat2 hof = new HFileOutputFormat2(); 477 writer = hof.getRecordWriter(context); 478 final byte [] b = Bytes.toBytes("b"); 479 480 List< Tag > tags = new ArrayList<>(); 481 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 482 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 483 writer.write(new ImmutableBytesWritable(), kv); 484 writer.close(context); 485 writer = null; 486 FileSystem fs = dir.getFileSystem(conf); 487 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 488 while(iterator.hasNext()) { 489 LocatedFileStatus keyFileStatus = iterator.next(); 490 HFile.Reader reader = 491 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 492 HFileScanner scanner = reader.getScanner(false, false, false); 493 scanner.seekTo(); 494 Cell cell = scanner.getCell(); 495 List<Tag> tagsFromCell = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(), 496 cell.getTagsLength()); 497 assertTrue(tagsFromCell.size() > 0); 498 for (Tag tag : tagsFromCell) { 499 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 500 } 501 } 502 } finally { 503 if (writer != null && context != null) writer.close(context); 504 dir.getFileSystem(conf).delete(dir, true); 505 } 506 } 507 508 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 509 public void testJobConfiguration() throws Exception { 510 Configuration conf = new Configuration(this.util.getConfiguration()); 511 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration") 512 .toString()); 513 Job job = new Job(conf); 514 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 515 Table table = Mockito.mock(Table.class); 516 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 517 setupMockStartKeys(regionLocator); 518 setupMockTableName(regionLocator); 519 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 520 assertEquals(job.getNumReduceTasks(), 4); 521 } 522 523 private byte [][] generateRandomStartKeys(int numKeys) { 524 Random random = new Random(); 525 byte[][] ret = new byte[numKeys][]; 526 // first region start key is always empty 527 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 528 for (int i = 1; i < numKeys; i++) { 529 ret[i] = 530 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 531 } 532 return ret; 533 } 534 535 private byte[][] generateRandomSplitKeys(int numKeys) { 536 Random random = new Random(); 537 byte[][] ret = new byte[numKeys][]; 538 for (int i = 0; i < numKeys; i++) { 539 ret[i] = 540 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 541 } 542 return ret; 543 } 544 545 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 546 public void testMRIncrementalLoad() throws Exception { 547 LOG.info("\nStarting test testMRIncrementalLoad\n"); 548 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 549 } 550 551 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 552 public void testMRIncrementalLoadWithSplit() throws Exception { 553 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 554 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 555 } 556 557 /** 558 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true 559 * This test could only check the correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY 560 * is set to true. Because MiniHBaseCluster always run with single hostname (and different ports), 561 * it's not possible to check the region locality by comparing region locations and DN hostnames. 562 * When MiniHBaseCluster supports explicit hostnames parameter (just like MiniDFSCluster does), 563 * we could test region locality features more easily. 564 */ 565 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 566 public void testMRIncrementalLoadWithLocality() throws Exception { 567 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 568 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 569 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 570 } 571 572 //@Ignore("Wahtevs") 573 @Test 574 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 575 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 576 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 577 } 578 579 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 580 boolean putSortReducer, String tableStr) throws Exception { 581 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, 582 Arrays.asList(tableStr)); 583 } 584 585 @Test 586 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 587 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 588 doIncrementalLoadTest(false, false, true, 589 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList 590 ())); 591 } 592 593 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 594 boolean putSortReducer, List<String> tableStr) throws Exception { 595 util = new HBaseTestingUtility(); 596 Configuration conf = util.getConfiguration(); 597 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 598 int hostCount = 1; 599 int regionNum = 5; 600 if (shouldKeepLocality) { 601 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 602 // explicit hostnames parameter just like MiniDFSCluster does. 603 hostCount = 3; 604 regionNum = 20; 605 } 606 607 String[] hostnames = new String[hostCount]; 608 for (int i = 0; i < hostCount; ++i) { 609 hostnames[i] = "datanode_" + i; 610 } 611 util.startMiniCluster(1, hostCount, hostnames); 612 613 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 614 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 615 boolean writeMultipleTables = tableStr.size() > 1; 616 for (String tableStrSingle : tableStr) { 617 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 618 TableName tableName = TableName.valueOf(tableStrSingle); 619 Table table = util.createTable(tableName, FAMILIES, splitKeys); 620 621 RegionLocator r = util.getConnection().getRegionLocator(tableName); 622 assertEquals("Should start with empty table", 0, util.countRows(table)); 623 int numRegions = r.getStartKeys().length; 624 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 625 626 allTables.put(tableStrSingle, table); 627 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r)); 628 } 629 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 630 // Generate the bulk load files 631 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 632 633 for (Table tableSingle : allTables.values()) { 634 // This doesn't write into the table, just makes files 635 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 636 } 637 int numTableDirs = 0; 638 for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) { 639 Path tablePath = testDir; 640 641 if (writeMultipleTables) { 642 if (allTables.containsKey(tf.getPath().getName())) { 643 ++numTableDirs; 644 tablePath = tf.getPath(); 645 } 646 else { 647 continue; 648 } 649 } 650 651 // Make sure that a directory was created for every CF 652 int dir = 0; 653 for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) { 654 for (byte[] family : FAMILIES) { 655 if (Bytes.toString(family).equals(f.getPath().getName())) { 656 ++dir; 657 } 658 } 659 } 660 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 661 } 662 if (writeMultipleTables) { 663 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 664 } 665 666 Admin admin = util.getConnection().getAdmin(); 667 try { 668 // handle the split case 669 if (shouldChangeRegions) { 670 Table chosenTable = allTables.values().iterator().next(); 671 // Choose a semi-random table if multiple tables are available 672 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 673 admin.disableTable(chosenTable.getName()); 674 util.waitUntilNoRegionsInTransition(); 675 676 util.deleteTable(chosenTable.getName()); 677 byte[][] newSplitKeys = generateRandomSplitKeys(14); 678 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 679 680 while (util.getConnection().getRegionLocator(chosenTable.getName()) 681 .getAllRegionLocations().size() != 15 || 682 !admin.isTableAvailable(table.getName())) { 683 Thread.sleep(200); 684 LOG.info("Waiting for new region assignment to happen"); 685 } 686 } 687 688 // Perform the actual load 689 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 690 Path tableDir = testDir; 691 String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString(); 692 LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr); 693 if (writeMultipleTables) { 694 tableDir = new Path(testDir, tableNameStr); 695 } 696 Table currentTable = allTables.get(tableNameStr); 697 TableName currentTableName = currentTable.getName(); 698 new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo 699 .getRegionLocator()); 700 701 // Ensure data shows up 702 int expectedRows = 0; 703 if (putSortReducer) { 704 // no rows should be extracted 705 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 706 util.countRows(currentTable)); 707 } else { 708 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 709 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 710 util.countRows(currentTable)); 711 Scan scan = new Scan(); 712 ResultScanner results = currentTable.getScanner(scan); 713 for (Result res : results) { 714 assertEquals(FAMILIES.length, res.rawCells().length); 715 Cell first = res.rawCells()[0]; 716 for (Cell kv : res.rawCells()) { 717 assertTrue(CellUtil.matchingRows(first, kv)); 718 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 719 } 720 } 721 results.close(); 722 } 723 String tableDigestBefore = util.checksumRows(currentTable); 724 // Check region locality 725 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 726 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 727 hbd.add(region.getHDFSBlocksDistribution()); 728 } 729 for (String hostname : hostnames) { 730 float locality = hbd.getBlockLocalityIndex(hostname); 731 LOG.info("locality of [" + hostname + "]: " + locality); 732 assertEquals(100, (int) (locality * 100)); 733 } 734 735 // Cause regions to reopen 736 admin.disableTable(currentTableName); 737 while (!admin.isTableDisabled(currentTableName)) { 738 Thread.sleep(200); 739 LOG.info("Waiting for table to disable"); 740 } 741 admin.enableTable(currentTableName); 742 util.waitTableAvailable(currentTableName); 743 assertEquals("Data should remain after reopening of regions", 744 tableDigestBefore, util.checksumRows(currentTable)); 745 } 746 } finally { 747 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 748 tableInfoSingle.getRegionLocator().close(); 749 } 750 for (Entry<String, Table> singleTable : allTables.entrySet() ) { 751 singleTable.getValue().close(); 752 util.deleteTable(singleTable.getValue().getName()); 753 } 754 testDir.getFileSystem(conf).delete(testDir, true); 755 util.shutdownMiniCluster(); 756 } 757 } 758 759 private void runIncrementalPELoad(Configuration conf, List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, 760 boolean putSortReducer) throws IOException, 761 InterruptedException, ClassNotFoundException { 762 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 763 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 764 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 765 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 766 CellSerialization.class.getName()); 767 setupRandomGeneratorMapper(job, putSortReducer); 768 if (tableInfo.size() > 1) { 769 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 770 int sum = 0; 771 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 772 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 773 } 774 assertEquals(sum, job.getNumReduceTasks()); 775 } 776 else { 777 RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 778 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(), 779 regionLocator); 780 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 781 } 782 783 FileOutputFormat.setOutputPath(job, outDir); 784 785 assertFalse(util.getTestFileSystem().exists(outDir)) ; 786 787 assertTrue(job.waitForCompletion(true)); 788 } 789 790 /** 791 * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and 792 * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. 793 * Tests that the compression map is correctly serialized into 794 * and deserialized from configuration 795 * 796 * @throws IOException 797 */ 798 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 799 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 800 for (int numCfs = 0; numCfs <= 3; numCfs++) { 801 Configuration conf = new Configuration(this.util.getConfiguration()); 802 Map<String, Compression.Algorithm> familyToCompression = 803 getMockColumnFamiliesForCompression(numCfs); 804 Table table = Mockito.mock(Table.class); 805 setupMockColumnFamiliesForCompression(table, familyToCompression); 806 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 807 HFileOutputFormat2.serializeColumnFamilyAttribute 808 (HFileOutputFormat2.compressionDetails, 809 Arrays.asList(table.getTableDescriptor()))); 810 811 // read back family specific compression setting from the configuration 812 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2 813 .createFamilyCompressionMap(conf); 814 815 // test that we have a value for all column families that matches with the 816 // used mock values 817 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 818 assertEquals("Compression configuration incorrect for column family:" 819 + entry.getKey(), entry.getValue(), 820 retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8"))); 821 } 822 } 823 } 824 825 private void setupMockColumnFamiliesForCompression(Table table, 826 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 827 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 828 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 829 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 830 .setMaxVersions(1) 831 .setCompressionType(entry.getValue()) 832 .setBlockCacheEnabled(false) 833 .setTimeToLive(0)); 834 } 835 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 836 } 837 838 /** 839 * @return a map from column family names to compression algorithms for 840 * testing column family compression. Column family names have special characters 841 */ 842 private Map<String, Compression.Algorithm> 843 getMockColumnFamiliesForCompression (int numCfs) { 844 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 845 // use column family names having special characters 846 if (numCfs-- > 0) { 847 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 848 } 849 if (numCfs-- > 0) { 850 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 851 } 852 if (numCfs-- > 0) { 853 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 854 } 855 if (numCfs-- > 0) { 856 familyToCompression.put("Family3", Compression.Algorithm.NONE); 857 } 858 return familyToCompression; 859 } 860 861 862 /** 863 * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and 864 * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. 865 * Tests that the compression map is correctly serialized into 866 * and deserialized from configuration 867 * 868 * @throws IOException 869 */ 870 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 871 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 872 for (int numCfs = 0; numCfs <= 2; numCfs++) { 873 Configuration conf = new Configuration(this.util.getConfiguration()); 874 Map<String, BloomType> familyToBloomType = 875 getMockColumnFamiliesForBloomType(numCfs); 876 Table table = Mockito.mock(Table.class); 877 setupMockColumnFamiliesForBloomType(table, 878 familyToBloomType); 879 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 880 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 881 Arrays.asList(table.getTableDescriptor()))); 882 883 // read back family specific data block encoding settings from the 884 // configuration 885 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 886 HFileOutputFormat2 887 .createFamilyBloomTypeMap(conf); 888 889 // test that we have a value for all column families that matches with the 890 // used mock values 891 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 892 assertEquals("BloomType configuration incorrect for column family:" 893 + entry.getKey(), entry.getValue(), 894 retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8"))); 895 } 896 } 897 } 898 899 private void setupMockColumnFamiliesForBloomType(Table table, 900 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 901 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 902 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 903 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 904 .setMaxVersions(1) 905 .setBloomFilterType(entry.getValue()) 906 .setBlockCacheEnabled(false) 907 .setTimeToLive(0)); 908 } 909 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 910 } 911 912 /** 913 * @return a map from column family names to compression algorithms for 914 * testing column family compression. Column family names have special characters 915 */ 916 private Map<String, BloomType> 917 getMockColumnFamiliesForBloomType (int numCfs) { 918 Map<String, BloomType> familyToBloomType = new HashMap<>(); 919 // use column family names having special characters 920 if (numCfs-- > 0) { 921 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 922 } 923 if (numCfs-- > 0) { 924 familyToBloomType.put("Family2=asdads&!AASD", 925 BloomType.ROWCOL); 926 } 927 if (numCfs-- > 0) { 928 familyToBloomType.put("Family3", BloomType.NONE); 929 } 930 return familyToBloomType; 931 } 932 933 /** 934 * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and 935 * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. 936 * Tests that the compression map is correctly serialized into 937 * and deserialized from configuration 938 * 939 * @throws IOException 940 */ 941 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 942 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 943 for (int numCfs = 0; numCfs <= 3; numCfs++) { 944 Configuration conf = new Configuration(this.util.getConfiguration()); 945 Map<String, Integer> familyToBlockSize = 946 getMockColumnFamiliesForBlockSize(numCfs); 947 Table table = Mockito.mock(Table.class); 948 setupMockColumnFamiliesForBlockSize(table, 949 familyToBlockSize); 950 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 951 HFileOutputFormat2.serializeColumnFamilyAttribute 952 (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table 953 .getTableDescriptor()))); 954 955 // read back family specific data block encoding settings from the 956 // configuration 957 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 958 HFileOutputFormat2 959 .createFamilyBlockSizeMap(conf); 960 961 // test that we have a value for all column families that matches with the 962 // used mock values 963 for (Entry<String, Integer> entry : familyToBlockSize.entrySet() 964 ) { 965 assertEquals("BlockSize configuration incorrect for column family:" 966 + entry.getKey(), entry.getValue(), 967 retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8"))); 968 } 969 } 970 } 971 972 private void setupMockColumnFamiliesForBlockSize(Table table, 973 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 974 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 975 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 976 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 977 .setMaxVersions(1) 978 .setBlocksize(entry.getValue()) 979 .setBlockCacheEnabled(false) 980 .setTimeToLive(0)); 981 } 982 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 983 } 984 985 /** 986 * @return a map from column family names to compression algorithms for 987 * testing column family compression. Column family names have special characters 988 */ 989 private Map<String, Integer> 990 getMockColumnFamiliesForBlockSize (int numCfs) { 991 Map<String, Integer> familyToBlockSize = new HashMap<>(); 992 // use column family names having special characters 993 if (numCfs-- > 0) { 994 familyToBlockSize.put("Family1!@#!@#&", 1234); 995 } 996 if (numCfs-- > 0) { 997 familyToBlockSize.put("Family2=asdads&!AASD", 998 Integer.MAX_VALUE); 999 } 1000 if (numCfs-- > 0) { 1001 familyToBlockSize.put("Family2=asdads&!AASD", 1002 Integer.MAX_VALUE); 1003 } 1004 if (numCfs-- > 0) { 1005 familyToBlockSize.put("Family3", 0); 1006 } 1007 return familyToBlockSize; 1008 } 1009 1010 /** 1011 * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)} 1012 * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. 1013 * Tests that the compression map is correctly serialized into 1014 * and deserialized from configuration 1015 * 1016 * @throws IOException 1017 */ 1018 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1019 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 1020 for (int numCfs = 0; numCfs <= 3; numCfs++) { 1021 Configuration conf = new Configuration(this.util.getConfiguration()); 1022 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 1023 getMockColumnFamiliesForDataBlockEncoding(numCfs); 1024 Table table = Mockito.mock(Table.class); 1025 setupMockColumnFamiliesForDataBlockEncoding(table, 1026 familyToDataBlockEncoding); 1027 HTableDescriptor tableDescriptor = table.getTableDescriptor(); 1028 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 1029 HFileOutputFormat2.serializeColumnFamilyAttribute 1030 (HFileOutputFormat2.dataBlockEncodingDetails, Arrays 1031 .asList(tableDescriptor))); 1032 1033 // read back family specific data block encoding settings from the 1034 // configuration 1035 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 1036 HFileOutputFormat2 1037 .createFamilyDataBlockEncodingMap(conf); 1038 1039 // test that we have a value for all column families that matches with the 1040 // used mock values 1041 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1042 assertEquals("DataBlockEncoding configuration incorrect for column family:" 1043 + entry.getKey(), entry.getValue(), 1044 retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8"))); 1045 } 1046 } 1047 } 1048 1049 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 1050 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 1051 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 1052 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1053 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 1054 .setMaxVersions(1) 1055 .setDataBlockEncoding(entry.getValue()) 1056 .setBlockCacheEnabled(false) 1057 .setTimeToLive(0)); 1058 } 1059 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 1060 } 1061 1062 /** 1063 * @return a map from column family names to compression algorithms for 1064 * testing column family compression. Column family names have special characters 1065 */ 1066 private Map<String, DataBlockEncoding> 1067 getMockColumnFamiliesForDataBlockEncoding (int numCfs) { 1068 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1069 // use column family names having special characters 1070 if (numCfs-- > 0) { 1071 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1072 } 1073 if (numCfs-- > 0) { 1074 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1075 DataBlockEncoding.FAST_DIFF); 1076 } 1077 if (numCfs-- > 0) { 1078 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1079 DataBlockEncoding.PREFIX); 1080 } 1081 if (numCfs-- > 0) { 1082 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1083 } 1084 return familyToDataBlockEncoding; 1085 } 1086 1087 private void setupMockStartKeys(RegionLocator table) throws IOException { 1088 byte[][] mockKeys = new byte[][] { 1089 HConstants.EMPTY_BYTE_ARRAY, 1090 Bytes.toBytes("aaa"), 1091 Bytes.toBytes("ggg"), 1092 Bytes.toBytes("zzz") 1093 }; 1094 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1095 } 1096 1097 private void setupMockTableName(RegionLocator table) throws IOException { 1098 TableName mockTableName = TableName.valueOf("mock_table"); 1099 Mockito.doReturn(mockTableName).when(table).getName(); 1100 } 1101 1102 /** 1103 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and 1104 * bloom filter settings from the column family descriptor 1105 */ 1106 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1107 public void testColumnFamilySettings() throws Exception { 1108 Configuration conf = new Configuration(this.util.getConfiguration()); 1109 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1110 TaskAttemptContext context = null; 1111 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1112 1113 // Setup table descriptor 1114 Table table = Mockito.mock(Table.class); 1115 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1116 HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]); 1117 Mockito.doReturn(htd).when(table).getTableDescriptor(); 1118 for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { 1119 htd.addFamily(hcd); 1120 } 1121 1122 // set up the table to return some mock keys 1123 setupMockStartKeys(regionLocator); 1124 1125 try { 1126 // partial map red setup to get an operational writer for testing 1127 // We turn off the sequence file compression, because DefaultCodec 1128 // pollutes the GZip codec pool with an incompatible compressor. 1129 conf.set("io.seqfile.compression.type", "NONE"); 1130 conf.set("hbase.fs.tmp.dir", dir.toString()); 1131 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1132 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1133 1134 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1135 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1136 setupRandomGeneratorMapper(job, false); 1137 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 1138 FileOutputFormat.setOutputPath(job, dir); 1139 context = createTestTaskAttemptContext(job); 1140 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1141 writer = hof.getRecordWriter(context); 1142 1143 // write out random rows 1144 writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT); 1145 writer.close(context); 1146 1147 // Make sure that a directory was created for every CF 1148 FileSystem fs = dir.getFileSystem(conf); 1149 1150 // commit so that the filesystem has one directory per column family 1151 hof.getOutputCommitter(context).commitTask(context); 1152 hof.getOutputCommitter(context).commitJob(context); 1153 FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1154 assertEquals(htd.getFamilies().size(), families.length); 1155 for (FileStatus f : families) { 1156 String familyStr = f.getPath().getName(); 1157 HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr)); 1158 // verify that the compression on this file matches the configured 1159 // compression 1160 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1161 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1162 Map<byte[], byte[]> fileInfo = reader.loadFileInfo(); 1163 1164 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1165 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1166 assertEquals("Incorrect bloom filter used for column family " + familyStr + 1167 "(reader: " + reader + ")", 1168 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1169 assertEquals("Incorrect compression used for column family " + familyStr + 1170 "(reader: " + reader + ")", hcd.getCompressionType(), reader.getFileContext().getCompression()); 1171 } 1172 } finally { 1173 dir.getFileSystem(conf).delete(dir, true); 1174 } 1175 } 1176 1177 /** 1178 * Write random values to the writer assuming a table created using 1179 * {@link #FAMILIES} as column family descriptors 1180 */ 1181 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1182 TaskAttemptContext context, Set<byte[]> families, int numRows) 1183 throws IOException, InterruptedException { 1184 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1185 int valLength = 10; 1186 byte valBytes[] = new byte[valLength]; 1187 1188 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1189 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1190 final byte [] qualifier = Bytes.toBytes("data"); 1191 Random random = new Random(); 1192 for (int i = 0; i < numRows; i++) { 1193 1194 Bytes.putInt(keyBytes, 0, i); 1195 random.nextBytes(valBytes); 1196 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1197 1198 for (byte[] family : families) { 1199 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1200 writer.write(key, kv); 1201 } 1202 } 1203 } 1204 1205 /** 1206 * This test is to test the scenario happened in HBASE-6901. 1207 * All files are bulk loaded and excluded from minor compaction. 1208 * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException 1209 * will be thrown. 1210 */ 1211 @Ignore ("Flakey: See HBASE-9051") @Test 1212 public void testExcludeAllFromMinorCompaction() throws Exception { 1213 Configuration conf = util.getConfiguration(); 1214 conf.setInt("hbase.hstore.compaction.min", 2); 1215 generateRandomStartKeys(5); 1216 1217 util.startMiniCluster(); 1218 try (Connection conn = ConnectionFactory.createConnection(); 1219 Admin admin = conn.getAdmin(); 1220 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1221 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1222 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1223 assertEquals("Should start with empty table", 0, util.countRows(table)); 1224 1225 // deep inspection: get the StoreFile dir 1226 final Path storePath = new Path( 1227 FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), 1228 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1229 Bytes.toString(FAMILIES[0]))); 1230 assertEquals(0, fs.listStatus(storePath).length); 1231 1232 // Generate two bulk load files 1233 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1234 true); 1235 1236 for (int i = 0; i < 2; i++) { 1237 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1238 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1239 .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false); 1240 // Perform the actual load 1241 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); 1242 } 1243 1244 // Ensure data shows up 1245 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1246 assertEquals("LoadIncrementalHFiles should put expected data in table", 1247 expectedRows, util.countRows(table)); 1248 1249 // should have a second StoreFile now 1250 assertEquals(2, fs.listStatus(storePath).length); 1251 1252 // minor compactions shouldn't get rid of the file 1253 admin.compact(TABLE_NAMES[0]); 1254 try { 1255 quickPoll(new Callable<Boolean>() { 1256 @Override 1257 public Boolean call() throws Exception { 1258 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1259 for (HRegion region : regions) { 1260 for (HStore store : region.getStores()) { 1261 store.closeAndArchiveCompactedFiles(); 1262 } 1263 } 1264 return fs.listStatus(storePath).length == 1; 1265 } 1266 }, 5000); 1267 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1268 } catch (AssertionError ae) { 1269 // this is expected behavior 1270 } 1271 1272 // a major compaction should work though 1273 admin.majorCompact(TABLE_NAMES[0]); 1274 quickPoll(new Callable<Boolean>() { 1275 @Override 1276 public Boolean call() throws Exception { 1277 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1278 for (HRegion region : regions) { 1279 for (HStore store : region.getStores()) { 1280 store.closeAndArchiveCompactedFiles(); 1281 } 1282 } 1283 return fs.listStatus(storePath).length == 1; 1284 } 1285 }, 5000); 1286 1287 } finally { 1288 util.shutdownMiniCluster(); 1289 } 1290 } 1291 1292 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1293 public void testExcludeMinorCompaction() throws Exception { 1294 Configuration conf = util.getConfiguration(); 1295 conf.setInt("hbase.hstore.compaction.min", 2); 1296 generateRandomStartKeys(5); 1297 1298 util.startMiniCluster(); 1299 try (Connection conn = ConnectionFactory.createConnection(conf); 1300 Admin admin = conn.getAdmin()){ 1301 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1302 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1303 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1304 assertEquals("Should start with empty table", 0, util.countRows(table)); 1305 1306 // deep inspection: get the StoreFile dir 1307 final Path storePath = new Path( 1308 FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), 1309 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1310 Bytes.toString(FAMILIES[0]))); 1311 assertEquals(0, fs.listStatus(storePath).length); 1312 1313 // put some data in it and flush to create a storefile 1314 Put p = new Put(Bytes.toBytes("test")); 1315 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1316 table.put(p); 1317 admin.flush(TABLE_NAMES[0]); 1318 assertEquals(1, util.countRows(table)); 1319 quickPoll(new Callable<Boolean>() { 1320 @Override 1321 public Boolean call() throws Exception { 1322 return fs.listStatus(storePath).length == 1; 1323 } 1324 }, 5000); 1325 1326 // Generate a bulk load file with more rows 1327 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1328 true); 1329 1330 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1331 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1332 .getTableDescriptor(), regionLocator)), testDir, false); 1333 1334 // Perform the actual load 1335 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); 1336 1337 // Ensure data shows up 1338 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1339 assertEquals("LoadIncrementalHFiles should put expected data in table", 1340 expectedRows + 1, util.countRows(table)); 1341 1342 // should have a second StoreFile now 1343 assertEquals(2, fs.listStatus(storePath).length); 1344 1345 // minor compactions shouldn't get rid of the file 1346 admin.compact(TABLE_NAMES[0]); 1347 try { 1348 quickPoll(new Callable<Boolean>() { 1349 @Override 1350 public Boolean call() throws Exception { 1351 return fs.listStatus(storePath).length == 1; 1352 } 1353 }, 5000); 1354 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1355 } catch (AssertionError ae) { 1356 // this is expected behavior 1357 } 1358 1359 // a major compaction should work though 1360 admin.majorCompact(TABLE_NAMES[0]); 1361 quickPoll(new Callable<Boolean>() { 1362 @Override 1363 public Boolean call() throws Exception { 1364 return fs.listStatus(storePath).length == 1; 1365 } 1366 }, 5000); 1367 1368 } finally { 1369 util.shutdownMiniCluster(); 1370 } 1371 } 1372 1373 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1374 int sleepMs = 10; 1375 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1376 while (retries-- > 0) { 1377 if (c.call().booleanValue()) { 1378 return; 1379 } 1380 Thread.sleep(sleepMs); 1381 } 1382 fail(); 1383 } 1384 1385 public static void main(String args[]) throws Exception { 1386 new TestCellBasedHFileOutputFormat2().manualTest(args); 1387 } 1388 1389 public void manualTest(String args[]) throws Exception { 1390 Configuration conf = HBaseConfiguration.create(); 1391 util = new HBaseTestingUtility(conf); 1392 if ("newtable".equals(args[0])) { 1393 TableName tname = TableName.valueOf(args[1]); 1394 byte[][] splitKeys = generateRandomSplitKeys(4); 1395 Table table = util.createTable(tname, FAMILIES, splitKeys); 1396 } else if ("incremental".equals(args[0])) { 1397 TableName tname = TableName.valueOf(args[1]); 1398 try(Connection c = ConnectionFactory.createConnection(conf); 1399 Admin admin = c.getAdmin(); 1400 RegionLocator regionLocator = c.getRegionLocator(tname)) { 1401 Path outDir = new Path("incremental-out"); 1402 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin 1403 .getTableDescriptor(tname), regionLocator)), outDir, false); 1404 } 1405 } else { 1406 throw new RuntimeException( 1407 "usage: TestHFileOutputFormat2 newtable | incremental"); 1408 } 1409 } 1410 1411 @Test 1412 public void testBlockStoragePolicy() throws Exception { 1413 util = new HBaseTestingUtility(); 1414 Configuration conf = util.getConfiguration(); 1415 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); 1416 1417 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + 1418 Bytes.toString(HFileOutputFormat2.combineTableNameSuffix( 1419 TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD"); 1420 Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0])); 1421 Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1])); 1422 util.startMiniDFSCluster(3); 1423 FileSystem fs = util.getDFSCluster().getFileSystem(); 1424 try { 1425 fs.mkdirs(cf1Dir); 1426 fs.mkdirs(cf2Dir); 1427 1428 // the original block storage policy would be HOT 1429 String spA = getStoragePolicyName(fs, cf1Dir); 1430 String spB = getStoragePolicyName(fs, cf2Dir); 1431 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1432 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1433 assertEquals("HOT", spA); 1434 assertEquals("HOT", spB); 1435 1436 // alter table cf schema to change storage policies 1437 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1438 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir); 1439 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1440 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir); 1441 spA = getStoragePolicyName(fs, cf1Dir); 1442 spB = getStoragePolicyName(fs, cf2Dir); 1443 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1444 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1445 assertNotNull(spA); 1446 assertEquals("ONE_SSD", spA); 1447 assertNotNull(spB); 1448 assertEquals("ALL_SSD", spB); 1449 } finally { 1450 fs.delete(cf1Dir, true); 1451 fs.delete(cf2Dir, true); 1452 util.shutdownMiniDFSCluster(); 1453 } 1454 } 1455 1456 private String getStoragePolicyName(FileSystem fs, Path path) { 1457 try { 1458 Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path); 1459 return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); 1460 } catch (Exception e) { 1461 // Maybe fail because of using old HDFS version, try the old way 1462 if (LOG.isTraceEnabled()) { 1463 LOG.trace("Failed to get policy directly", e); 1464 } 1465 String policy = getStoragePolicyNameForOldHDFSVersion(fs, path); 1466 return policy == null ? "HOT" : policy;// HOT by default 1467 } 1468 } 1469 1470 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1471 try { 1472 if (fs instanceof DistributedFileSystem) { 1473 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1474 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1475 if (null != status) { 1476 byte storagePolicyId = status.getStoragePolicy(); 1477 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1478 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1479 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1480 for (BlockStoragePolicy policy : policies) { 1481 if (policy.getId() == storagePolicyId) { 1482 return policy.getName(); 1483 } 1484 } 1485 } 1486 } 1487 } 1488 } catch (Throwable e) { 1489 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1490 } 1491 1492 return null; 1493 } 1494} 1495