001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNotSame; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.lang.reflect.Field; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.Random; 037import java.util.Set; 038import java.util.concurrent.Callable; 039import java.util.stream.Collectors; 040import java.util.stream.Stream; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.LocatedFileStatus; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.fs.RemoteIterator; 047import org.apache.hadoop.hbase.ArrayBackedTag; 048import org.apache.hadoop.hbase.Cell; 049import org.apache.hadoop.hbase.CellUtil; 050import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 051import org.apache.hadoop.hbase.HBaseClassTestRule; 052import org.apache.hadoop.hbase.HBaseConfiguration; 053import org.apache.hadoop.hbase.HBaseTestingUtility; 054import org.apache.hadoop.hbase.HColumnDescriptor; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.HDFSBlocksDistribution; 057import org.apache.hadoop.hbase.HTableDescriptor; 058import org.apache.hadoop.hbase.HadoopShims; 059import org.apache.hadoop.hbase.KeyValue; 060import org.apache.hadoop.hbase.PerformanceEvaluation; 061import org.apache.hadoop.hbase.PrivateCellUtil; 062import org.apache.hadoop.hbase.StartMiniClusterOption; 063import org.apache.hadoop.hbase.TableName; 064import org.apache.hadoop.hbase.Tag; 065import org.apache.hadoop.hbase.TagType; 066import org.apache.hadoop.hbase.client.Admin; 067import org.apache.hadoop.hbase.client.Connection; 068import org.apache.hadoop.hbase.client.ConnectionFactory; 069import org.apache.hadoop.hbase.client.Put; 070import org.apache.hadoop.hbase.client.RegionLocator; 071import org.apache.hadoop.hbase.client.Result; 072import org.apache.hadoop.hbase.client.ResultScanner; 073import org.apache.hadoop.hbase.client.Scan; 074import org.apache.hadoop.hbase.client.Table; 075import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 076import org.apache.hadoop.hbase.io.compress.Compression; 077import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 078import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 079import org.apache.hadoop.hbase.io.hfile.CacheConfig; 080import org.apache.hadoop.hbase.io.hfile.HFile; 081import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 082import org.apache.hadoop.hbase.io.hfile.HFileScanner; 083import org.apache.hadoop.hbase.regionserver.BloomType; 084import org.apache.hadoop.hbase.regionserver.HRegion; 085import org.apache.hadoop.hbase.regionserver.HStore; 086import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 087import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 088import org.apache.hadoop.hbase.testclassification.LargeTests; 089import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 090import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 091import org.apache.hadoop.hbase.util.Bytes; 092import org.apache.hadoop.hbase.util.FSUtils; 093import org.apache.hadoop.hbase.util.ReflectionUtils; 094import org.apache.hadoop.hdfs.DistributedFileSystem; 095import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 096import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 097import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 098import org.apache.hadoop.io.NullWritable; 099import org.apache.hadoop.mapreduce.Job; 100import org.apache.hadoop.mapreduce.Mapper; 101import org.apache.hadoop.mapreduce.RecordWriter; 102import org.apache.hadoop.mapreduce.TaskAttemptContext; 103import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 104import org.junit.ClassRule; 105import org.junit.Ignore; 106import org.junit.Test; 107import org.junit.experimental.categories.Category; 108import org.mockito.Mockito; 109import org.slf4j.Logger; 110import org.slf4j.LoggerFactory; 111 112/** 113 * Simple test for {@link HFileOutputFormat2}. 114 * Sets up and runs a mapreduce job that writes hfile output. 115 * Creates a few inner classes to implement splits and an inputformat that 116 * emits keys and values like those of {@link PerformanceEvaluation}. 117 */ 118@Category({VerySlowMapReduceTests.class, LargeTests.class}) 119//TODO : Remove this in 3.0 120public class TestHFileOutputFormat2 { 121 122 @ClassRule 123 public static final HBaseClassTestRule CLASS_RULE = 124 HBaseClassTestRule.forClass(TestHFileOutputFormat2.class); 125 126 private final static int ROWSPERSPLIT = 1024; 127 128 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 129 private static final byte[][] FAMILIES = { 130 Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))}; 131 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", 132 "TestTable3").map(TableName::valueOf).toArray(TableName[]::new); 133 134 private HBaseTestingUtility util = new HBaseTestingUtility(); 135 136 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 137 138 /** 139 * Simple mapper that makes KeyValue output. 140 */ 141 static class RandomKVGeneratingMapper 142 extends Mapper<NullWritable, NullWritable, 143 ImmutableBytesWritable, Cell> { 144 145 private int keyLength; 146 private static final int KEYLEN_DEFAULT=10; 147 private static final String KEYLEN_CONF="randomkv.key.length"; 148 149 private int valLength; 150 private static final int VALLEN_DEFAULT=10; 151 private static final String VALLEN_CONF="randomkv.val.length"; 152 private static final byte [] QUALIFIER = Bytes.toBytes("data"); 153 private boolean multiTableMapper = false; 154 private TableName[] tables = null; 155 156 157 @Override 158 protected void setup(Context context) throws IOException, 159 InterruptedException { 160 super.setup(context); 161 162 Configuration conf = context.getConfiguration(); 163 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 164 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 165 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 166 false); 167 if (multiTableMapper) { 168 tables = TABLE_NAMES; 169 } else { 170 tables = new TableName[]{TABLE_NAMES[0]}; 171 } 172 } 173 174 @Override 175 protected void map( 176 NullWritable n1, NullWritable n2, 177 Mapper<NullWritable, NullWritable, 178 ImmutableBytesWritable,Cell>.Context context) 179 throws java.io.IOException ,InterruptedException 180 { 181 182 byte keyBytes[] = new byte[keyLength]; 183 byte valBytes[] = new byte[valLength]; 184 185 int taskId = context.getTaskAttemptID().getTaskID().getId(); 186 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 187 Random random = new Random(); 188 byte[] key; 189 for (int j = 0; j < tables.length; ++j) { 190 for (int i = 0; i < ROWSPERSPLIT; i++) { 191 random.nextBytes(keyBytes); 192 // Ensure that unique tasks generate unique keys 193 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 194 random.nextBytes(valBytes); 195 key = keyBytes; 196 if (multiTableMapper) { 197 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 198 } 199 200 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 201 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 202 context.write(new ImmutableBytesWritable(key), kv); 203 } 204 } 205 } 206 } 207 } 208 209 /** 210 * Simple mapper that makes Put output. 211 */ 212 static class RandomPutGeneratingMapper 213 extends Mapper<NullWritable, NullWritable, 214 ImmutableBytesWritable, Put> { 215 216 private int keyLength; 217 private static final int KEYLEN_DEFAULT = 10; 218 private static final String KEYLEN_CONF = "randomkv.key.length"; 219 220 private int valLength; 221 private static final int VALLEN_DEFAULT = 10; 222 private static final String VALLEN_CONF = "randomkv.val.length"; 223 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 224 private boolean multiTableMapper = false; 225 private TableName[] tables = null; 226 227 @Override 228 protected void setup(Context context) throws IOException, 229 InterruptedException { 230 super.setup(context); 231 232 Configuration conf = context.getConfiguration(); 233 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 234 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 235 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 236 false); 237 if (multiTableMapper) { 238 tables = TABLE_NAMES; 239 } else { 240 tables = new TableName[]{TABLE_NAMES[0]}; 241 } 242 } 243 244 @Override 245 protected void map( 246 NullWritable n1, NullWritable n2, 247 Mapper<NullWritable, NullWritable, 248 ImmutableBytesWritable, Put>.Context context) 249 throws java.io.IOException, InterruptedException { 250 251 byte keyBytes[] = new byte[keyLength]; 252 byte valBytes[] = new byte[valLength]; 253 254 int taskId = context.getTaskAttemptID().getTaskID().getId(); 255 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 256 257 Random random = new Random(); 258 byte[] key; 259 for (int j = 0; j < tables.length; ++j) { 260 for (int i = 0; i < ROWSPERSPLIT; i++) { 261 random.nextBytes(keyBytes); 262 // Ensure that unique tasks generate unique keys 263 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 264 random.nextBytes(valBytes); 265 key = keyBytes; 266 if (multiTableMapper) { 267 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 268 } 269 270 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 271 Put p = new Put(keyBytes); 272 p.addColumn(family, QUALIFIER, valBytes); 273 // set TTL to very low so that the scan does not return any value 274 p.setTTL(1l); 275 context.write(new ImmutableBytesWritable(key), p); 276 } 277 } 278 } 279 } 280 } 281 282 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 283 if (putSortReducer) { 284 job.setInputFormatClass(NMapInputFormat.class); 285 job.setMapperClass(RandomPutGeneratingMapper.class); 286 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 287 job.setMapOutputValueClass(Put.class); 288 } else { 289 job.setInputFormatClass(NMapInputFormat.class); 290 job.setMapperClass(RandomKVGeneratingMapper.class); 291 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 292 job.setMapOutputValueClass(KeyValue.class); 293 } 294 } 295 296 /** 297 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if 298 * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}. 299 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 300 */ 301 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 302 public void test_LATEST_TIMESTAMP_isReplaced() 303 throws Exception { 304 Configuration conf = new Configuration(this.util.getConfiguration()); 305 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 306 TaskAttemptContext context = null; 307 Path dir = 308 util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 309 try { 310 Job job = new Job(conf); 311 FileOutputFormat.setOutputPath(job, dir); 312 context = createTestTaskAttemptContext(job); 313 HFileOutputFormat2 hof = new HFileOutputFormat2(); 314 writer = hof.getRecordWriter(context); 315 final byte [] b = Bytes.toBytes("b"); 316 317 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 318 // changed by call to write. Check all in kv is same but ts. 319 KeyValue kv = new KeyValue(b, b, b); 320 KeyValue original = kv.clone(); 321 writer.write(new ImmutableBytesWritable(), kv); 322 assertFalse(original.equals(kv)); 323 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 324 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 325 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 326 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 327 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 328 329 // Test 2. Now test passing a kv that has explicit ts. It should not be 330 // changed by call to record write. 331 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 332 original = kv.clone(); 333 writer.write(new ImmutableBytesWritable(), kv); 334 assertTrue(original.equals(kv)); 335 } finally { 336 if (writer != null && context != null) writer.close(context); 337 dir.getFileSystem(conf).delete(dir, true); 338 } 339 } 340 341 private TaskAttemptContext createTestTaskAttemptContext(final Job job) 342 throws Exception { 343 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 344 TaskAttemptContext context = hadoop.createTestTaskAttemptContext( 345 job, "attempt_201402131733_0001_m_000000_0"); 346 return context; 347 } 348 349 /* 350 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE 351 * metadata used by time-restricted scans. 352 */ 353 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 354 public void test_TIMERANGE() throws Exception { 355 Configuration conf = new Configuration(this.util.getConfiguration()); 356 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 357 TaskAttemptContext context = null; 358 Path dir = 359 util.getDataTestDir("test_TIMERANGE_present"); 360 LOG.info("Timerange dir writing to dir: "+ dir); 361 try { 362 // build a record writer using HFileOutputFormat2 363 Job job = new Job(conf); 364 FileOutputFormat.setOutputPath(job, dir); 365 context = createTestTaskAttemptContext(job); 366 HFileOutputFormat2 hof = new HFileOutputFormat2(); 367 writer = hof.getRecordWriter(context); 368 369 // Pass two key values with explicit times stamps 370 final byte [] b = Bytes.toBytes("b"); 371 372 // value 1 with timestamp 2000 373 KeyValue kv = new KeyValue(b, b, b, 2000, b); 374 KeyValue original = kv.clone(); 375 writer.write(new ImmutableBytesWritable(), kv); 376 assertEquals(original,kv); 377 378 // value 2 with timestamp 1000 379 kv = new KeyValue(b, b, b, 1000, b); 380 original = kv.clone(); 381 writer.write(new ImmutableBytesWritable(), kv); 382 assertEquals(original, kv); 383 384 // verify that the file has the proper FileInfo. 385 writer.close(context); 386 387 // the generated file lives 1 directory down from the attempt directory 388 // and is the only file, e.g. 389 // _attempt__0000_r_000000_0/b/1979617994050536795 390 FileSystem fs = FileSystem.get(conf); 391 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 392 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 393 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 394 395 // open as HFile Reader and pull out TIMERANGE FileInfo. 396 HFile.Reader rd = 397 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 398 Map<byte[],byte[]> finfo = rd.loadFileInfo(); 399 byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8")); 400 assertNotNull(range); 401 402 // unmarshall and check values. 403 TimeRangeTracker timeRangeTracker =TimeRangeTracker.parseFrom(range); 404 LOG.info(timeRangeTracker.getMin() + 405 "...." + timeRangeTracker.getMax()); 406 assertEquals(1000, timeRangeTracker.getMin()); 407 assertEquals(2000, timeRangeTracker.getMax()); 408 rd.close(); 409 } finally { 410 if (writer != null && context != null) writer.close(context); 411 dir.getFileSystem(conf).delete(dir, true); 412 } 413 } 414 415 /** 416 * Run small MR job. 417 */ 418 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 419 public void testWritingPEData() throws Exception { 420 Configuration conf = util.getConfiguration(); 421 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 422 FileSystem fs = testDir.getFileSystem(conf); 423 424 // Set down this value or we OOME in eclipse. 425 conf.setInt("mapreduce.task.io.sort.mb", 20); 426 // Write a few files. 427 conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); 428 429 Job job = new Job(conf, "testWritingPEData"); 430 setupRandomGeneratorMapper(job, false); 431 // This partitioner doesn't work well for number keys but using it anyways 432 // just to demonstrate how to configure it. 433 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 434 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 435 436 Arrays.fill(startKey, (byte)0); 437 Arrays.fill(endKey, (byte)0xff); 438 439 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 440 // Set start and end rows for partitioner. 441 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 442 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 443 job.setReducerClass(KeyValueSortReducer.class); 444 job.setOutputFormatClass(HFileOutputFormat2.class); 445 job.setNumReduceTasks(4); 446 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 447 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 448 KeyValueSerialization.class.getName()); 449 450 FileOutputFormat.setOutputPath(job, testDir); 451 assertTrue(job.waitForCompletion(false)); 452 FileStatus [] files = fs.listStatus(testDir); 453 assertTrue(files.length > 0); 454 } 455 456 /** 457 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into 458 * hfile. 459 */ 460 @Test 461 public void test_WritingTagData() 462 throws Exception { 463 Configuration conf = new Configuration(this.util.getConfiguration()); 464 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 465 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 466 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 467 TaskAttemptContext context = null; 468 Path dir = 469 util.getDataTestDir("WritingTagData"); 470 try { 471 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 472 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 473 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 474 Job job = new Job(conf); 475 FileOutputFormat.setOutputPath(job, dir); 476 context = createTestTaskAttemptContext(job); 477 HFileOutputFormat2 hof = new HFileOutputFormat2(); 478 writer = hof.getRecordWriter(context); 479 final byte [] b = Bytes.toBytes("b"); 480 481 List< Tag > tags = new ArrayList<>(); 482 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 483 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 484 writer.write(new ImmutableBytesWritable(), kv); 485 writer.close(context); 486 writer = null; 487 FileSystem fs = dir.getFileSystem(conf); 488 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 489 while(iterator.hasNext()) { 490 LocatedFileStatus keyFileStatus = iterator.next(); 491 HFile.Reader reader = 492 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 493 HFileScanner scanner = reader.getScanner(false, false, false); 494 scanner.seekTo(); 495 Cell cell = scanner.getCell(); 496 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 497 assertTrue(tagsFromCell.size() > 0); 498 for (Tag tag : tagsFromCell) { 499 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 500 } 501 } 502 } finally { 503 if (writer != null && context != null) writer.close(context); 504 dir.getFileSystem(conf).delete(dir, true); 505 } 506 } 507 508 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 509 public void testJobConfiguration() throws Exception { 510 Configuration conf = new Configuration(this.util.getConfiguration()); 511 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration") 512 .toString()); 513 Job job = new Job(conf); 514 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 515 Table table = Mockito.mock(Table.class); 516 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 517 setupMockStartKeys(regionLocator); 518 setupMockTableName(regionLocator); 519 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 520 assertEquals(job.getNumReduceTasks(), 4); 521 } 522 523 private byte [][] generateRandomStartKeys(int numKeys) { 524 Random random = new Random(); 525 byte[][] ret = new byte[numKeys][]; 526 // first region start key is always empty 527 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 528 for (int i = 1; i < numKeys; i++) { 529 ret[i] = 530 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 531 } 532 return ret; 533 } 534 535 private byte[][] generateRandomSplitKeys(int numKeys) { 536 Random random = new Random(); 537 byte[][] ret = new byte[numKeys][]; 538 for (int i = 0; i < numKeys; i++) { 539 ret[i] = 540 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 541 } 542 return ret; 543 } 544 545 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 546 public void testMRIncrementalLoad() throws Exception { 547 LOG.info("\nStarting test testMRIncrementalLoad\n"); 548 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 549 } 550 551 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 552 public void testMRIncrementalLoadWithSplit() throws Exception { 553 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 554 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 555 } 556 557 /** 558 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true 559 * This test could only check the correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY 560 * is set to true. Because MiniHBaseCluster always run with single hostname (and different ports), 561 * it's not possible to check the region locality by comparing region locations and DN hostnames. 562 * When MiniHBaseCluster supports explicit hostnames parameter (just like MiniDFSCluster does), 563 * we could test region locality features more easily. 564 */ 565 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 566 public void testMRIncrementalLoadWithLocality() throws Exception { 567 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 568 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 569 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 570 } 571 572 //@Ignore("Wahtevs") 573 @Test 574 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 575 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 576 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 577 } 578 579 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 580 boolean putSortReducer, String tableStr) throws Exception { 581 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, 582 Arrays.asList(tableStr)); 583 } 584 585 @Test 586 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 587 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 588 doIncrementalLoadTest(false, false, true, 589 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList 590 ())); 591 } 592 593 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 594 boolean putSortReducer, List<String> tableStr) throws Exception { 595 util = new HBaseTestingUtility(); 596 Configuration conf = util.getConfiguration(); 597 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 598 int hostCount = 1; 599 int regionNum = 5; 600 if (shouldKeepLocality) { 601 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 602 // explicit hostnames parameter just like MiniDFSCluster does. 603 hostCount = 3; 604 regionNum = 20; 605 } 606 607 String[] hostnames = new String[hostCount]; 608 for (int i = 0; i < hostCount; ++i) { 609 hostnames[i] = "datanode_" + i; 610 } 611 StartMiniClusterOption option = StartMiniClusterOption.builder() 612 .numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 613 util.startMiniCluster(option); 614 615 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 616 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 617 boolean writeMultipleTables = tableStr.size() > 1; 618 for (String tableStrSingle : tableStr) { 619 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 620 TableName tableName = TableName.valueOf(tableStrSingle); 621 Table table = util.createTable(tableName, FAMILIES, splitKeys); 622 623 RegionLocator r = util.getConnection().getRegionLocator(tableName); 624 assertEquals("Should start with empty table", 0, util.countRows(table)); 625 int numRegions = r.getStartKeys().length; 626 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 627 628 allTables.put(tableStrSingle, table); 629 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r)); 630 } 631 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 632 // Generate the bulk load files 633 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 634 635 for (Table tableSingle : allTables.values()) { 636 // This doesn't write into the table, just makes files 637 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 638 } 639 int numTableDirs = 0; 640 for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) { 641 Path tablePath = testDir; 642 643 if (writeMultipleTables) { 644 if (allTables.containsKey(tf.getPath().getName())) { 645 ++numTableDirs; 646 tablePath = tf.getPath(); 647 } 648 else { 649 continue; 650 } 651 } 652 653 // Make sure that a directory was created for every CF 654 int dir = 0; 655 for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) { 656 for (byte[] family : FAMILIES) { 657 if (Bytes.toString(family).equals(f.getPath().getName())) { 658 ++dir; 659 } 660 } 661 } 662 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 663 } 664 if (writeMultipleTables) { 665 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 666 } 667 668 Admin admin = util.getConnection().getAdmin(); 669 try { 670 // handle the split case 671 if (shouldChangeRegions) { 672 Table chosenTable = allTables.values().iterator().next(); 673 // Choose a semi-random table if multiple tables are available 674 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 675 admin.disableTable(chosenTable.getName()); 676 util.waitUntilNoRegionsInTransition(); 677 678 util.deleteTable(chosenTable.getName()); 679 byte[][] newSplitKeys = generateRandomSplitKeys(14); 680 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 681 682 while (util.getConnection().getRegionLocator(chosenTable.getName()) 683 .getAllRegionLocations().size() != 15 || 684 !admin.isTableAvailable(table.getName())) { 685 Thread.sleep(200); 686 LOG.info("Waiting for new region assignment to happen"); 687 } 688 } 689 690 // Perform the actual load 691 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 692 Path tableDir = testDir; 693 String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString(); 694 LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr); 695 if (writeMultipleTables) { 696 tableDir = new Path(testDir, tableNameStr); 697 } 698 Table currentTable = allTables.get(tableNameStr); 699 TableName currentTableName = currentTable.getName(); 700 new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo 701 .getRegionLocator()); 702 703 // Ensure data shows up 704 int expectedRows = 0; 705 if (putSortReducer) { 706 // no rows should be extracted 707 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 708 util.countRows(currentTable)); 709 } else { 710 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 711 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 712 util.countRows(currentTable)); 713 Scan scan = new Scan(); 714 ResultScanner results = currentTable.getScanner(scan); 715 for (Result res : results) { 716 assertEquals(FAMILIES.length, res.rawCells().length); 717 Cell first = res.rawCells()[0]; 718 for (Cell kv : res.rawCells()) { 719 assertTrue(CellUtil.matchingRows(first, kv)); 720 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 721 } 722 } 723 results.close(); 724 } 725 String tableDigestBefore = util.checksumRows(currentTable); 726 // Check region locality 727 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 728 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 729 hbd.add(region.getHDFSBlocksDistribution()); 730 } 731 for (String hostname : hostnames) { 732 float locality = hbd.getBlockLocalityIndex(hostname); 733 LOG.info("locality of [" + hostname + "]: " + locality); 734 assertEquals(100, (int) (locality * 100)); 735 } 736 737 // Cause regions to reopen 738 admin.disableTable(currentTableName); 739 while (!admin.isTableDisabled(currentTableName)) { 740 Thread.sleep(200); 741 LOG.info("Waiting for table to disable"); 742 } 743 admin.enableTable(currentTableName); 744 util.waitTableAvailable(currentTableName); 745 assertEquals("Data should remain after reopening of regions", 746 tableDigestBefore, util.checksumRows(currentTable)); 747 } 748 } finally { 749 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 750 tableInfoSingle.getRegionLocator().close(); 751 } 752 for (Entry<String, Table> singleTable : allTables.entrySet() ) { 753 singleTable.getValue().close(); 754 util.deleteTable(singleTable.getValue().getName()); 755 } 756 testDir.getFileSystem(conf).delete(testDir, true); 757 util.shutdownMiniCluster(); 758 } 759 } 760 761 private void runIncrementalPELoad(Configuration conf, List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, 762 boolean putSortReducer) throws IOException, 763 InterruptedException, ClassNotFoundException { 764 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 765 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 766 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 767 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 768 KeyValueSerialization.class.getName()); 769 setupRandomGeneratorMapper(job, putSortReducer); 770 if (tableInfo.size() > 1) { 771 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 772 int sum = 0; 773 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 774 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 775 } 776 assertEquals(sum, job.getNumReduceTasks()); 777 } 778 else { 779 RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 780 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(), 781 regionLocator); 782 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 783 } 784 785 FileOutputFormat.setOutputPath(job, outDir); 786 787 assertFalse(util.getTestFileSystem().exists(outDir)) ; 788 789 assertTrue(job.waitForCompletion(true)); 790 } 791 792 /** 793 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. 794 * Tests that the family compression map is correctly serialized into 795 * and deserialized from configuration 796 * 797 * @throws IOException 798 */ 799 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 800 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 801 for (int numCfs = 0; numCfs <= 3; numCfs++) { 802 Configuration conf = new Configuration(this.util.getConfiguration()); 803 Map<String, Compression.Algorithm> familyToCompression = 804 getMockColumnFamiliesForCompression(numCfs); 805 Table table = Mockito.mock(Table.class); 806 setupMockColumnFamiliesForCompression(table, familyToCompression); 807 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 808 HFileOutputFormat2.serializeColumnFamilyAttribute 809 (HFileOutputFormat2.compressionDetails, 810 Arrays.asList(table.getTableDescriptor()))); 811 812 // read back family specific compression setting from the configuration 813 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2 814 .createFamilyCompressionMap(conf); 815 816 // test that we have a value for all column families that matches with the 817 // used mock values 818 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 819 assertEquals("Compression configuration incorrect for column family:" 820 + entry.getKey(), entry.getValue(), 821 retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8"))); 822 } 823 } 824 } 825 826 private void setupMockColumnFamiliesForCompression(Table table, 827 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 828 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 829 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 830 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 831 .setMaxVersions(1) 832 .setCompressionType(entry.getValue()) 833 .setBlockCacheEnabled(false) 834 .setTimeToLive(0)); 835 } 836 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 837 } 838 839 /** 840 * @return a map from column family names to compression algorithms for 841 * testing column family compression. Column family names have special characters 842 */ 843 private Map<String, Compression.Algorithm> 844 getMockColumnFamiliesForCompression (int numCfs) { 845 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 846 // use column family names having special characters 847 if (numCfs-- > 0) { 848 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 849 } 850 if (numCfs-- > 0) { 851 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 852 } 853 if (numCfs-- > 0) { 854 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 855 } 856 if (numCfs-- > 0) { 857 familyToCompression.put("Family3", Compression.Algorithm.NONE); 858 } 859 return familyToCompression; 860 } 861 862 863 /** 864 * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. 865 * Tests that the family bloom type map is correctly serialized into 866 * and deserialized from configuration 867 * 868 * @throws IOException 869 */ 870 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 871 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 872 for (int numCfs = 0; numCfs <= 2; numCfs++) { 873 Configuration conf = new Configuration(this.util.getConfiguration()); 874 Map<String, BloomType> familyToBloomType = 875 getMockColumnFamiliesForBloomType(numCfs); 876 Table table = Mockito.mock(Table.class); 877 setupMockColumnFamiliesForBloomType(table, 878 familyToBloomType); 879 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 880 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 881 Arrays.asList(table.getTableDescriptor()))); 882 883 // read back family specific data block encoding settings from the 884 // configuration 885 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 886 HFileOutputFormat2 887 .createFamilyBloomTypeMap(conf); 888 889 // test that we have a value for all column families that matches with the 890 // used mock values 891 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 892 assertEquals("BloomType configuration incorrect for column family:" 893 + entry.getKey(), entry.getValue(), 894 retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8"))); 895 } 896 } 897 } 898 899 private void setupMockColumnFamiliesForBloomType(Table table, 900 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 901 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 902 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 903 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 904 .setMaxVersions(1) 905 .setBloomFilterType(entry.getValue()) 906 .setBlockCacheEnabled(false) 907 .setTimeToLive(0)); 908 } 909 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 910 } 911 912 /** 913 * @return a map from column family names to compression algorithms for 914 * testing column family compression. Column family names have special characters 915 */ 916 private Map<String, BloomType> 917 getMockColumnFamiliesForBloomType (int numCfs) { 918 Map<String, BloomType> familyToBloomType = new HashMap<>(); 919 // use column family names having special characters 920 if (numCfs-- > 0) { 921 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 922 } 923 if (numCfs-- > 0) { 924 familyToBloomType.put("Family2=asdads&!AASD", 925 BloomType.ROWCOL); 926 } 927 if (numCfs-- > 0) { 928 familyToBloomType.put("Family3", BloomType.NONE); 929 } 930 return familyToBloomType; 931 } 932 933 /** 934 * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. 935 * Tests that the family block size map is correctly serialized into 936 * and deserialized from configuration 937 * 938 * @throws IOException 939 */ 940 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 941 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 942 for (int numCfs = 0; numCfs <= 3; numCfs++) { 943 Configuration conf = new Configuration(this.util.getConfiguration()); 944 Map<String, Integer> familyToBlockSize = 945 getMockColumnFamiliesForBlockSize(numCfs); 946 Table table = Mockito.mock(Table.class); 947 setupMockColumnFamiliesForBlockSize(table, 948 familyToBlockSize); 949 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 950 HFileOutputFormat2.serializeColumnFamilyAttribute 951 (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table 952 .getTableDescriptor()))); 953 954 // read back family specific data block encoding settings from the 955 // configuration 956 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 957 HFileOutputFormat2 958 .createFamilyBlockSizeMap(conf); 959 960 // test that we have a value for all column families that matches with the 961 // used mock values 962 for (Entry<String, Integer> entry : familyToBlockSize.entrySet() 963 ) { 964 assertEquals("BlockSize configuration incorrect for column family:" 965 + entry.getKey(), entry.getValue(), 966 retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8"))); 967 } 968 } 969 } 970 971 private void setupMockColumnFamiliesForBlockSize(Table table, 972 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 973 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 974 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 975 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 976 .setMaxVersions(1) 977 .setBlocksize(entry.getValue()) 978 .setBlockCacheEnabled(false) 979 .setTimeToLive(0)); 980 } 981 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 982 } 983 984 /** 985 * @return a map from column family names to compression algorithms for 986 * testing column family compression. Column family names have special characters 987 */ 988 private Map<String, Integer> 989 getMockColumnFamiliesForBlockSize (int numCfs) { 990 Map<String, Integer> familyToBlockSize = new HashMap<>(); 991 // use column family names having special characters 992 if (numCfs-- > 0) { 993 familyToBlockSize.put("Family1!@#!@#&", 1234); 994 } 995 if (numCfs-- > 0) { 996 familyToBlockSize.put("Family2=asdads&!AASD", 997 Integer.MAX_VALUE); 998 } 999 if (numCfs-- > 0) { 1000 familyToBlockSize.put("Family2=asdads&!AASD", 1001 Integer.MAX_VALUE); 1002 } 1003 if (numCfs-- > 0) { 1004 familyToBlockSize.put("Family3", 0); 1005 } 1006 return familyToBlockSize; 1007 } 1008 1009 /** 1010 * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. 1011 * Tests that the family data block encoding map is correctly serialized into 1012 * and deserialized from configuration 1013 * 1014 * @throws IOException 1015 */ 1016 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1017 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 1018 for (int numCfs = 0; numCfs <= 3; numCfs++) { 1019 Configuration conf = new Configuration(this.util.getConfiguration()); 1020 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 1021 getMockColumnFamiliesForDataBlockEncoding(numCfs); 1022 Table table = Mockito.mock(Table.class); 1023 setupMockColumnFamiliesForDataBlockEncoding(table, 1024 familyToDataBlockEncoding); 1025 HTableDescriptor tableDescriptor = table.getTableDescriptor(); 1026 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 1027 HFileOutputFormat2.serializeColumnFamilyAttribute 1028 (HFileOutputFormat2.dataBlockEncodingDetails, Arrays 1029 .asList(tableDescriptor))); 1030 1031 // read back family specific data block encoding settings from the 1032 // configuration 1033 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 1034 HFileOutputFormat2 1035 .createFamilyDataBlockEncodingMap(conf); 1036 1037 // test that we have a value for all column families that matches with the 1038 // used mock values 1039 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1040 assertEquals("DataBlockEncoding configuration incorrect for column family:" 1041 + entry.getKey(), entry.getValue(), 1042 retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8"))); 1043 } 1044 } 1045 } 1046 1047 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 1048 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 1049 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 1050 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1051 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 1052 .setMaxVersions(1) 1053 .setDataBlockEncoding(entry.getValue()) 1054 .setBlockCacheEnabled(false) 1055 .setTimeToLive(0)); 1056 } 1057 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 1058 } 1059 1060 /** 1061 * @return a map from column family names to compression algorithms for 1062 * testing column family compression. Column family names have special characters 1063 */ 1064 private Map<String, DataBlockEncoding> 1065 getMockColumnFamiliesForDataBlockEncoding (int numCfs) { 1066 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1067 // use column family names having special characters 1068 if (numCfs-- > 0) { 1069 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1070 } 1071 if (numCfs-- > 0) { 1072 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1073 DataBlockEncoding.FAST_DIFF); 1074 } 1075 if (numCfs-- > 0) { 1076 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1077 DataBlockEncoding.PREFIX); 1078 } 1079 if (numCfs-- > 0) { 1080 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1081 } 1082 return familyToDataBlockEncoding; 1083 } 1084 1085 private void setupMockStartKeys(RegionLocator table) throws IOException { 1086 byte[][] mockKeys = new byte[][] { 1087 HConstants.EMPTY_BYTE_ARRAY, 1088 Bytes.toBytes("aaa"), 1089 Bytes.toBytes("ggg"), 1090 Bytes.toBytes("zzz") 1091 }; 1092 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1093 } 1094 1095 private void setupMockTableName(RegionLocator table) throws IOException { 1096 TableName mockTableName = TableName.valueOf("mock_table"); 1097 Mockito.doReturn(mockTableName).when(table).getName(); 1098 } 1099 1100 /** 1101 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and 1102 * bloom filter settings from the column family descriptor 1103 */ 1104 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1105 public void testColumnFamilySettings() throws Exception { 1106 Configuration conf = new Configuration(this.util.getConfiguration()); 1107 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1108 TaskAttemptContext context = null; 1109 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1110 1111 // Setup table descriptor 1112 Table table = Mockito.mock(Table.class); 1113 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1114 HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]); 1115 Mockito.doReturn(htd).when(table).getTableDescriptor(); 1116 for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { 1117 htd.addFamily(hcd); 1118 } 1119 1120 // set up the table to return some mock keys 1121 setupMockStartKeys(regionLocator); 1122 1123 try { 1124 // partial map red setup to get an operational writer for testing 1125 // We turn off the sequence file compression, because DefaultCodec 1126 // pollutes the GZip codec pool with an incompatible compressor. 1127 conf.set("io.seqfile.compression.type", "NONE"); 1128 conf.set("hbase.fs.tmp.dir", dir.toString()); 1129 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1130 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1131 1132 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1133 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1134 setupRandomGeneratorMapper(job, false); 1135 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 1136 FileOutputFormat.setOutputPath(job, dir); 1137 context = createTestTaskAttemptContext(job); 1138 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1139 writer = hof.getRecordWriter(context); 1140 1141 // write out random rows 1142 writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT); 1143 writer.close(context); 1144 1145 // Make sure that a directory was created for every CF 1146 FileSystem fs = dir.getFileSystem(conf); 1147 1148 // commit so that the filesystem has one directory per column family 1149 hof.getOutputCommitter(context).commitTask(context); 1150 hof.getOutputCommitter(context).commitJob(context); 1151 FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1152 assertEquals(htd.getFamilies().size(), families.length); 1153 for (FileStatus f : families) { 1154 String familyStr = f.getPath().getName(); 1155 HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr)); 1156 // verify that the compression on this file matches the configured 1157 // compression 1158 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1159 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1160 Map<byte[], byte[]> fileInfo = reader.loadFileInfo(); 1161 1162 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1163 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1164 assertEquals("Incorrect bloom filter used for column family " + familyStr + 1165 "(reader: " + reader + ")", 1166 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1167 assertEquals("Incorrect compression used for column family " + familyStr + 1168 "(reader: " + reader + ")", hcd.getCompressionType(), reader.getFileContext().getCompression()); 1169 } 1170 } finally { 1171 dir.getFileSystem(conf).delete(dir, true); 1172 } 1173 } 1174 1175 /** 1176 * Write random values to the writer assuming a table created using 1177 * {@link #FAMILIES} as column family descriptors 1178 */ 1179 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1180 TaskAttemptContext context, Set<byte[]> families, int numRows) 1181 throws IOException, InterruptedException { 1182 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1183 int valLength = 10; 1184 byte valBytes[] = new byte[valLength]; 1185 1186 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1187 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1188 final byte [] qualifier = Bytes.toBytes("data"); 1189 Random random = new Random(); 1190 for (int i = 0; i < numRows; i++) { 1191 1192 Bytes.putInt(keyBytes, 0, i); 1193 random.nextBytes(valBytes); 1194 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1195 1196 for (byte[] family : families) { 1197 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1198 writer.write(key, kv); 1199 } 1200 } 1201 } 1202 1203 /** 1204 * This test is to test the scenario happened in HBASE-6901. 1205 * All files are bulk loaded and excluded from minor compaction. 1206 * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException 1207 * will be thrown. 1208 */ 1209 @Ignore ("Flakey: See HBASE-9051") @Test 1210 public void testExcludeAllFromMinorCompaction() throws Exception { 1211 Configuration conf = util.getConfiguration(); 1212 conf.setInt("hbase.hstore.compaction.min", 2); 1213 generateRandomStartKeys(5); 1214 1215 util.startMiniCluster(); 1216 try (Connection conn = ConnectionFactory.createConnection(); 1217 Admin admin = conn.getAdmin(); 1218 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1219 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1220 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1221 assertEquals("Should start with empty table", 0, util.countRows(table)); 1222 1223 // deep inspection: get the StoreFile dir 1224 final Path storePath = new Path( 1225 FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), 1226 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1227 Bytes.toString(FAMILIES[0]))); 1228 assertEquals(0, fs.listStatus(storePath).length); 1229 1230 // Generate two bulk load files 1231 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1232 true); 1233 1234 for (int i = 0; i < 2; i++) { 1235 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1236 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1237 .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false); 1238 // Perform the actual load 1239 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); 1240 } 1241 1242 // Ensure data shows up 1243 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1244 assertEquals("LoadIncrementalHFiles should put expected data in table", 1245 expectedRows, util.countRows(table)); 1246 1247 // should have a second StoreFile now 1248 assertEquals(2, fs.listStatus(storePath).length); 1249 1250 // minor compactions shouldn't get rid of the file 1251 admin.compact(TABLE_NAMES[0]); 1252 try { 1253 quickPoll(new Callable<Boolean>() { 1254 @Override 1255 public Boolean call() throws Exception { 1256 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1257 for (HRegion region : regions) { 1258 for (HStore store : region.getStores()) { 1259 store.closeAndArchiveCompactedFiles(); 1260 } 1261 } 1262 return fs.listStatus(storePath).length == 1; 1263 } 1264 }, 5000); 1265 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1266 } catch (AssertionError ae) { 1267 // this is expected behavior 1268 } 1269 1270 // a major compaction should work though 1271 admin.majorCompact(TABLE_NAMES[0]); 1272 quickPoll(new Callable<Boolean>() { 1273 @Override 1274 public Boolean call() throws Exception { 1275 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1276 for (HRegion region : regions) { 1277 for (HStore store : region.getStores()) { 1278 store.closeAndArchiveCompactedFiles(); 1279 } 1280 } 1281 return fs.listStatus(storePath).length == 1; 1282 } 1283 }, 5000); 1284 1285 } finally { 1286 util.shutdownMiniCluster(); 1287 } 1288 } 1289 1290 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1291 public void testExcludeMinorCompaction() throws Exception { 1292 Configuration conf = util.getConfiguration(); 1293 conf.setInt("hbase.hstore.compaction.min", 2); 1294 generateRandomStartKeys(5); 1295 1296 util.startMiniCluster(); 1297 try (Connection conn = ConnectionFactory.createConnection(conf); 1298 Admin admin = conn.getAdmin()){ 1299 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1300 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1301 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1302 assertEquals("Should start with empty table", 0, util.countRows(table)); 1303 1304 // deep inspection: get the StoreFile dir 1305 final Path storePath = new Path( 1306 FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), 1307 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1308 Bytes.toString(FAMILIES[0]))); 1309 assertEquals(0, fs.listStatus(storePath).length); 1310 1311 // put some data in it and flush to create a storefile 1312 Put p = new Put(Bytes.toBytes("test")); 1313 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1314 table.put(p); 1315 admin.flush(TABLE_NAMES[0]); 1316 assertEquals(1, util.countRows(table)); 1317 quickPoll(new Callable<Boolean>() { 1318 @Override 1319 public Boolean call() throws Exception { 1320 return fs.listStatus(storePath).length == 1; 1321 } 1322 }, 5000); 1323 1324 // Generate a bulk load file with more rows 1325 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1326 true); 1327 1328 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1329 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1330 .getTableDescriptor(), regionLocator)), testDir, false); 1331 1332 // Perform the actual load 1333 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); 1334 1335 // Ensure data shows up 1336 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1337 assertEquals("LoadIncrementalHFiles should put expected data in table", 1338 expectedRows + 1, util.countRows(table)); 1339 1340 // should have a second StoreFile now 1341 assertEquals(2, fs.listStatus(storePath).length); 1342 1343 // minor compactions shouldn't get rid of the file 1344 admin.compact(TABLE_NAMES[0]); 1345 try { 1346 quickPoll(new Callable<Boolean>() { 1347 @Override 1348 public Boolean call() throws Exception { 1349 return fs.listStatus(storePath).length == 1; 1350 } 1351 }, 5000); 1352 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1353 } catch (AssertionError ae) { 1354 // this is expected behavior 1355 } 1356 1357 // a major compaction should work though 1358 admin.majorCompact(TABLE_NAMES[0]); 1359 quickPoll(new Callable<Boolean>() { 1360 @Override 1361 public Boolean call() throws Exception { 1362 return fs.listStatus(storePath).length == 1; 1363 } 1364 }, 5000); 1365 1366 } finally { 1367 util.shutdownMiniCluster(); 1368 } 1369 } 1370 1371 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1372 int sleepMs = 10; 1373 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1374 while (retries-- > 0) { 1375 if (c.call().booleanValue()) { 1376 return; 1377 } 1378 Thread.sleep(sleepMs); 1379 } 1380 fail(); 1381 } 1382 1383 public static void main(String args[]) throws Exception { 1384 new TestHFileOutputFormat2().manualTest(args); 1385 } 1386 1387 public void manualTest(String args[]) throws Exception { 1388 Configuration conf = HBaseConfiguration.create(); 1389 util = new HBaseTestingUtility(conf); 1390 if ("newtable".equals(args[0])) { 1391 TableName tname = TableName.valueOf(args[1]); 1392 byte[][] splitKeys = generateRandomSplitKeys(4); 1393 Table table = util.createTable(tname, FAMILIES, splitKeys); 1394 } else if ("incremental".equals(args[0])) { 1395 TableName tname = TableName.valueOf(args[1]); 1396 try(Connection c = ConnectionFactory.createConnection(conf); 1397 Admin admin = c.getAdmin(); 1398 RegionLocator regionLocator = c.getRegionLocator(tname)) { 1399 Path outDir = new Path("incremental-out"); 1400 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin 1401 .getTableDescriptor(tname), regionLocator)), outDir, false); 1402 } 1403 } else { 1404 throw new RuntimeException( 1405 "usage: TestHFileOutputFormat2 newtable | incremental"); 1406 } 1407 } 1408 1409 @Test 1410 public void testBlockStoragePolicy() throws Exception { 1411 util = new HBaseTestingUtility(); 1412 Configuration conf = util.getConfiguration(); 1413 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); 1414 1415 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + 1416 Bytes.toString(HFileOutputFormat2.combineTableNameSuffix( 1417 TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD"); 1418 Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0])); 1419 Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1])); 1420 util.startMiniDFSCluster(3); 1421 FileSystem fs = util.getDFSCluster().getFileSystem(); 1422 try { 1423 fs.mkdirs(cf1Dir); 1424 fs.mkdirs(cf2Dir); 1425 1426 // the original block storage policy would be HOT 1427 String spA = getStoragePolicyName(fs, cf1Dir); 1428 String spB = getStoragePolicyName(fs, cf2Dir); 1429 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1430 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1431 assertEquals("HOT", spA); 1432 assertEquals("HOT", spB); 1433 1434 // alter table cf schema to change storage policies 1435 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1436 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir); 1437 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1438 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir); 1439 spA = getStoragePolicyName(fs, cf1Dir); 1440 spB = getStoragePolicyName(fs, cf2Dir); 1441 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1442 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1443 assertNotNull(spA); 1444 assertEquals("ONE_SSD", spA); 1445 assertNotNull(spB); 1446 assertEquals("ALL_SSD", spB); 1447 } finally { 1448 fs.delete(cf1Dir, true); 1449 fs.delete(cf2Dir, true); 1450 util.shutdownMiniDFSCluster(); 1451 } 1452 } 1453 1454 private String getStoragePolicyName(FileSystem fs, Path path) { 1455 try { 1456 Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path); 1457 return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); 1458 } catch (Exception e) { 1459 // Maybe fail because of using old HDFS version, try the old way 1460 if (LOG.isTraceEnabled()) { 1461 LOG.trace("Failed to get policy directly", e); 1462 } 1463 String policy = getStoragePolicyNameForOldHDFSVersion(fs, path); 1464 return policy == null ? "HOT" : policy;// HOT by default 1465 } 1466 } 1467 1468 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1469 try { 1470 if (fs instanceof DistributedFileSystem) { 1471 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1472 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1473 if (null != status) { 1474 byte storagePolicyId = status.getStoragePolicy(); 1475 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1476 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1477 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1478 for (BlockStoragePolicy policy : policies) { 1479 if (policy.getId() == storagePolicyId) { 1480 return policy.getName(); 1481 } 1482 } 1483 } 1484 } 1485 } 1486 } catch (Throwable e) { 1487 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1488 } 1489 1490 return null; 1491 } 1492 1493 @Test 1494 public void TestConfigureCompression() throws Exception { 1495 Configuration conf = new Configuration(this.util.getConfiguration()); 1496 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1497 TaskAttemptContext context = null; 1498 Path dir = util.getDataTestDir("TestConfigureCompression"); 1499 String hfileoutputformatCompression = "gz"; 1500 1501 try { 1502 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 1503 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1504 1505 conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression); 1506 1507 Job job = Job.getInstance(conf); 1508 FileOutputFormat.setOutputPath(job, dir); 1509 context = createTestTaskAttemptContext(job); 1510 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1511 writer = hof.getRecordWriter(context); 1512 final byte[] b = Bytes.toBytes("b"); 1513 1514 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b); 1515 writer.write(new ImmutableBytesWritable(), kv); 1516 writer.close(context); 1517 writer = null; 1518 FileSystem fs = dir.getFileSystem(conf); 1519 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 1520 while (iterator.hasNext()) { 1521 LocatedFileStatus keyFileStatus = iterator.next(); 1522 HFile.Reader reader = 1523 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 1524 assertEquals(reader.getCompressionAlgorithm().getName(), hfileoutputformatCompression); 1525 } 1526 } finally { 1527 if (writer != null && context != null) { 1528 writer.close(context); 1529 } 1530 dir.getFileSystem(conf).delete(dir, true); 1531 } 1532 1533 } 1534} 1535