001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertNotSame; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.lang.reflect.Field; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.Random; 038import java.util.Set; 039import java.util.concurrent.Callable; 040import java.util.stream.Collectors; 041import java.util.stream.Stream; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.LocatedFileStatus; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.fs.RemoteIterator; 048import org.apache.hadoop.hbase.ArrayBackedTag; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellUtil; 051import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 052import org.apache.hadoop.hbase.HBaseClassTestRule; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtility; 055import org.apache.hadoop.hbase.HColumnDescriptor; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HDFSBlocksDistribution; 058import org.apache.hadoop.hbase.HTableDescriptor; 059import org.apache.hadoop.hbase.HadoopShims; 060import org.apache.hadoop.hbase.KeyValue; 061import org.apache.hadoop.hbase.PerformanceEvaluation; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.Tag; 064import org.apache.hadoop.hbase.TagType; 065import org.apache.hadoop.hbase.TagUtil; 066import org.apache.hadoop.hbase.client.Admin; 067import org.apache.hadoop.hbase.client.Connection; 068import org.apache.hadoop.hbase.client.ConnectionFactory; 069import org.apache.hadoop.hbase.client.Put; 070import org.apache.hadoop.hbase.client.RegionLocator; 071import org.apache.hadoop.hbase.client.Result; 072import org.apache.hadoop.hbase.client.ResultScanner; 073import org.apache.hadoop.hbase.client.Scan; 074import org.apache.hadoop.hbase.client.Table; 075import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 076import org.apache.hadoop.hbase.io.compress.Compression; 077import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 078import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 079import org.apache.hadoop.hbase.io.hfile.CacheConfig; 080import org.apache.hadoop.hbase.io.hfile.HFile; 081import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 082import org.apache.hadoop.hbase.io.hfile.HFileScanner; 083import org.apache.hadoop.hbase.regionserver.BloomType; 084import org.apache.hadoop.hbase.regionserver.HRegion; 085import org.apache.hadoop.hbase.regionserver.HStore; 086import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 087import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 088import org.apache.hadoop.hbase.testclassification.LargeTests; 089import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 090import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 091import org.apache.hadoop.hbase.util.Bytes; 092import org.apache.hadoop.hbase.util.CommonFSUtils; 093import org.apache.hadoop.hbase.util.FSUtils; 094import org.apache.hadoop.hbase.util.ReflectionUtils; 095import org.apache.hadoop.hdfs.DistributedFileSystem; 096import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 097import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 098import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 099import org.apache.hadoop.io.NullWritable; 100import org.apache.hadoop.mapreduce.Job; 101import org.apache.hadoop.mapreduce.Mapper; 102import org.apache.hadoop.mapreduce.RecordWriter; 103import org.apache.hadoop.mapreduce.TaskAttemptContext; 104import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 105import org.junit.ClassRule; 106import org.junit.Ignore; 107import org.junit.Test; 108import org.junit.experimental.categories.Category; 109import org.mockito.Mockito; 110import org.slf4j.Logger; 111import org.slf4j.LoggerFactory; 112 113/** 114 * Simple test for {@link HFileOutputFormat2}. 115 * Sets up and runs a mapreduce job that writes hfile output. 116 * Creates a few inner classes to implement splits and an inputformat that 117 * emits keys and values like those of {@link PerformanceEvaluation}. 118 */ 119@Category({VerySlowMapReduceTests.class, LargeTests.class}) 120public class TestCellBasedHFileOutputFormat2 { 121 122 @ClassRule 123 public static final HBaseClassTestRule CLASS_RULE = 124 HBaseClassTestRule.forClass(TestCellBasedHFileOutputFormat2.class); 125 126 private final static int ROWSPERSPLIT = 1024; 127 128 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 129 private static final byte[][] FAMILIES = { 130 Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))}; 131 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", 132 "TestTable3").map(TableName::valueOf).toArray(TableName[]::new); 133 134 private HBaseTestingUtility util = new HBaseTestingUtility(); 135 136 private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedHFileOutputFormat2.class); 137 138 /** 139 * Simple mapper that makes KeyValue output. 140 */ 141 static class RandomKVGeneratingMapper 142 extends Mapper<NullWritable, NullWritable, 143 ImmutableBytesWritable, Cell> { 144 145 private int keyLength; 146 private static final int KEYLEN_DEFAULT=10; 147 private static final String KEYLEN_CONF="randomkv.key.length"; 148 149 private int valLength; 150 private static final int VALLEN_DEFAULT=10; 151 private static final String VALLEN_CONF="randomkv.val.length"; 152 private static final byte [] QUALIFIER = Bytes.toBytes("data"); 153 private boolean multiTableMapper = false; 154 private TableName[] tables = null; 155 156 157 @Override 158 protected void setup(Context context) throws IOException, 159 InterruptedException { 160 super.setup(context); 161 162 Configuration conf = context.getConfiguration(); 163 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 164 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 165 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 166 false); 167 if (multiTableMapper) { 168 tables = TABLE_NAMES; 169 } else { 170 tables = new TableName[]{TABLE_NAMES[0]}; 171 } 172 } 173 174 @Override 175 protected void map( 176 NullWritable n1, NullWritable n2, 177 Mapper<NullWritable, NullWritable, 178 ImmutableBytesWritable,Cell>.Context context) 179 throws java.io.IOException ,InterruptedException 180 { 181 182 byte keyBytes[] = new byte[keyLength]; 183 byte valBytes[] = new byte[valLength]; 184 185 int taskId = context.getTaskAttemptID().getTaskID().getId(); 186 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 187 Random random = new Random(); 188 byte[] key; 189 for (int j = 0; j < tables.length; ++j) { 190 for (int i = 0; i < ROWSPERSPLIT; i++) { 191 random.nextBytes(keyBytes); 192 // Ensure that unique tasks generate unique keys 193 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 194 random.nextBytes(valBytes); 195 key = keyBytes; 196 if (multiTableMapper) { 197 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 198 } 199 200 for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) { 201 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 202 context.write(new ImmutableBytesWritable(key), kv); 203 } 204 } 205 } 206 } 207 } 208 209 /** 210 * Simple mapper that makes Put output. 211 */ 212 static class RandomPutGeneratingMapper 213 extends Mapper<NullWritable, NullWritable, 214 ImmutableBytesWritable, Put> { 215 216 private int keyLength; 217 private static final int KEYLEN_DEFAULT = 10; 218 private static final String KEYLEN_CONF = "randomkv.key.length"; 219 220 private int valLength; 221 private static final int VALLEN_DEFAULT = 10; 222 private static final String VALLEN_CONF = "randomkv.val.length"; 223 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 224 private boolean multiTableMapper = false; 225 private TableName[] tables = null; 226 227 @Override 228 protected void setup(Context context) throws IOException, 229 InterruptedException { 230 super.setup(context); 231 232 Configuration conf = context.getConfiguration(); 233 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 234 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 235 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 236 false); 237 if (multiTableMapper) { 238 tables = TABLE_NAMES; 239 } else { 240 tables = new TableName[]{TABLE_NAMES[0]}; 241 } 242 } 243 244 @Override 245 protected void map( 246 NullWritable n1, NullWritable n2, 247 Mapper<NullWritable, NullWritable, 248 ImmutableBytesWritable, Put>.Context context) 249 throws java.io.IOException, InterruptedException { 250 251 byte keyBytes[] = new byte[keyLength]; 252 byte valBytes[] = new byte[valLength]; 253 254 int taskId = context.getTaskAttemptID().getTaskID().getId(); 255 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 256 257 Random random = new Random(); 258 byte[] key; 259 for (int j = 0; j < tables.length; ++j) { 260 for (int i = 0; i < ROWSPERSPLIT; i++) { 261 random.nextBytes(keyBytes); 262 // Ensure that unique tasks generate unique keys 263 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 264 random.nextBytes(valBytes); 265 key = keyBytes; 266 if (multiTableMapper) { 267 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 268 } 269 270 for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) { 271 Put p = new Put(keyBytes); 272 p.addColumn(family, QUALIFIER, valBytes); 273 // set TTL to very low so that the scan does not return any value 274 p.setTTL(1l); 275 context.write(new ImmutableBytesWritable(key), p); 276 } 277 } 278 } 279 } 280 } 281 282 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 283 if (putSortReducer) { 284 job.setInputFormatClass(NMapInputFormat.class); 285 job.setMapperClass(RandomPutGeneratingMapper.class); 286 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 287 job.setMapOutputValueClass(Put.class); 288 } else { 289 job.setInputFormatClass(NMapInputFormat.class); 290 job.setMapperClass(RandomKVGeneratingMapper.class); 291 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 292 job.setMapOutputValueClass(KeyValue.class); 293 } 294 } 295 296 /** 297 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if 298 * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}. 299 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 300 */ 301 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 302 public void test_LATEST_TIMESTAMP_isReplaced() 303 throws Exception { 304 Configuration conf = new Configuration(this.util.getConfiguration()); 305 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 306 TaskAttemptContext context = null; 307 Path dir = 308 util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 309 try { 310 Job job = new Job(conf); 311 FileOutputFormat.setOutputPath(job, dir); 312 context = createTestTaskAttemptContext(job); 313 HFileOutputFormat2 hof = new HFileOutputFormat2(); 314 writer = hof.getRecordWriter(context); 315 final byte [] b = Bytes.toBytes("b"); 316 317 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 318 // changed by call to write. Check all in kv is same but ts. 319 KeyValue kv = new KeyValue(b, b, b); 320 KeyValue original = kv.clone(); 321 writer.write(new ImmutableBytesWritable(), kv); 322 assertFalse(original.equals(kv)); 323 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 324 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 325 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 326 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 327 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 328 329 // Test 2. Now test passing a kv that has explicit ts. It should not be 330 // changed by call to record write. 331 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 332 original = kv.clone(); 333 writer.write(new ImmutableBytesWritable(), kv); 334 assertTrue(original.equals(kv)); 335 } finally { 336 if (writer != null && context != null) writer.close(context); 337 dir.getFileSystem(conf).delete(dir, true); 338 } 339 } 340 341 private TaskAttemptContext createTestTaskAttemptContext(final Job job) 342 throws Exception { 343 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 344 TaskAttemptContext context = hadoop.createTestTaskAttemptContext( 345 job, "attempt_201402131733_0001_m_000000_0"); 346 return context; 347 } 348 349 /* 350 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE 351 * metadata used by time-restricted scans. 352 */ 353 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 354 public void test_TIMERANGE() throws Exception { 355 Configuration conf = new Configuration(this.util.getConfiguration()); 356 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 357 TaskAttemptContext context = null; 358 Path dir = 359 util.getDataTestDir("test_TIMERANGE_present"); 360 LOG.info("Timerange dir writing to dir: "+ dir); 361 try { 362 // build a record writer using HFileOutputFormat2 363 Job job = new Job(conf); 364 FileOutputFormat.setOutputPath(job, dir); 365 context = createTestTaskAttemptContext(job); 366 HFileOutputFormat2 hof = new HFileOutputFormat2(); 367 writer = hof.getRecordWriter(context); 368 369 // Pass two key values with explicit times stamps 370 final byte [] b = Bytes.toBytes("b"); 371 372 // value 1 with timestamp 2000 373 KeyValue kv = new KeyValue(b, b, b, 2000, b); 374 KeyValue original = kv.clone(); 375 writer.write(new ImmutableBytesWritable(), kv); 376 assertEquals(original,kv); 377 378 // value 2 with timestamp 1000 379 kv = new KeyValue(b, b, b, 1000, b); 380 original = kv.clone(); 381 writer.write(new ImmutableBytesWritable(), kv); 382 assertEquals(original, kv); 383 384 // verify that the file has the proper FileInfo. 385 writer.close(context); 386 387 // the generated file lives 1 directory down from the attempt directory 388 // and is the only file, e.g. 389 // _attempt__0000_r_000000_0/b/1979617994050536795 390 FileSystem fs = FileSystem.get(conf); 391 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 392 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 393 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 394 395 // open as HFile Reader and pull out TIMERANGE FileInfo. 396 HFile.Reader rd = 397 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 398 Map<byte[],byte[]> finfo = rd.getHFileInfo(); 399 byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8")); 400 assertNotNull(range); 401 402 // unmarshall and check values. 403 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range); 404 LOG.info(timeRangeTracker.getMin() + 405 "...." + timeRangeTracker.getMax()); 406 assertEquals(1000, timeRangeTracker.getMin()); 407 assertEquals(2000, timeRangeTracker.getMax()); 408 rd.close(); 409 } finally { 410 if (writer != null && context != null) writer.close(context); 411 dir.getFileSystem(conf).delete(dir, true); 412 } 413 } 414 415 /** 416 * Run small MR job. 417 */ 418 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 419 public void testWritingPEData() throws Exception { 420 Configuration conf = util.getConfiguration(); 421 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 422 FileSystem fs = testDir.getFileSystem(conf); 423 424 // Set down this value or we OOME in eclipse. 425 conf.setInt("mapreduce.task.io.sort.mb", 20); 426 // Write a few files. 427 conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); 428 429 Job job = new Job(conf, "testWritingPEData"); 430 setupRandomGeneratorMapper(job, false); 431 // This partitioner doesn't work well for number keys but using it anyways 432 // just to demonstrate how to configure it. 433 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 434 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 435 436 Arrays.fill(startKey, (byte)0); 437 Arrays.fill(endKey, (byte)0xff); 438 439 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 440 // Set start and end rows for partitioner. 441 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 442 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 443 job.setReducerClass(CellSortReducer.class); 444 job.setOutputFormatClass(HFileOutputFormat2.class); 445 job.setNumReduceTasks(4); 446 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 447 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 448 CellSerialization.class.getName()); 449 450 FileOutputFormat.setOutputPath(job, testDir); 451 assertTrue(job.waitForCompletion(false)); 452 FileStatus [] files = fs.listStatus(testDir); 453 assertTrue(files.length > 0); 454 } 455 456 /** 457 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into 458 * hfile. 459 */ 460 @Test 461 public void test_WritingTagData() 462 throws Exception { 463 Configuration conf = new Configuration(this.util.getConfiguration()); 464 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 465 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 466 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 467 TaskAttemptContext context = null; 468 Path dir = 469 util.getDataTestDir("WritingTagData"); 470 try { 471 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 472 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 473 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 474 Job job = new Job(conf); 475 FileOutputFormat.setOutputPath(job, dir); 476 context = createTestTaskAttemptContext(job); 477 HFileOutputFormat2 hof = new HFileOutputFormat2(); 478 writer = hof.getRecordWriter(context); 479 final byte [] b = Bytes.toBytes("b"); 480 481 List< Tag > tags = new ArrayList<>(); 482 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 483 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 484 writer.write(new ImmutableBytesWritable(), kv); 485 writer.close(context); 486 writer = null; 487 FileSystem fs = dir.getFileSystem(conf); 488 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 489 while(iterator.hasNext()) { 490 LocatedFileStatus keyFileStatus = iterator.next(); 491 HFile.Reader reader = 492 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 493 HFileScanner scanner = reader.getScanner(false, false, false); 494 scanner.seekTo(); 495 Cell cell = scanner.getCell(); 496 List<Tag> tagsFromCell = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(), 497 cell.getTagsLength()); 498 assertTrue(tagsFromCell.size() > 0); 499 for (Tag tag : tagsFromCell) { 500 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 501 } 502 } 503 } finally { 504 if (writer != null && context != null) writer.close(context); 505 dir.getFileSystem(conf).delete(dir, true); 506 } 507 } 508 509 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 510 public void testJobConfiguration() throws Exception { 511 Configuration conf = new Configuration(this.util.getConfiguration()); 512 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration") 513 .toString()); 514 Job job = new Job(conf); 515 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 516 Table table = Mockito.mock(Table.class); 517 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 518 setupMockStartKeys(regionLocator); 519 setupMockTableName(regionLocator); 520 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 521 assertEquals(job.getNumReduceTasks(), 4); 522 } 523 524 private byte [][] generateRandomStartKeys(int numKeys) { 525 Random random = new Random(); 526 byte[][] ret = new byte[numKeys][]; 527 // first region start key is always empty 528 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 529 for (int i = 1; i < numKeys; i++) { 530 ret[i] = 531 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 532 } 533 return ret; 534 } 535 536 private byte[][] generateRandomSplitKeys(int numKeys) { 537 Random random = new Random(); 538 byte[][] ret = new byte[numKeys][]; 539 for (int i = 0; i < numKeys; i++) { 540 ret[i] = 541 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 542 } 543 return ret; 544 } 545 546 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 547 public void testMRIncrementalLoad() throws Exception { 548 LOG.info("\nStarting test testMRIncrementalLoad\n"); 549 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 550 } 551 552 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 553 public void testMRIncrementalLoadWithSplit() throws Exception { 554 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 555 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 556 } 557 558 /** 559 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true 560 * This test could only check the correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY 561 * is set to true. Because MiniHBaseCluster always run with single hostname (and different ports), 562 * it's not possible to check the region locality by comparing region locations and DN hostnames. 563 * When MiniHBaseCluster supports explicit hostnames parameter (just like MiniDFSCluster does), 564 * we could test region locality features more easily. 565 */ 566 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 567 public void testMRIncrementalLoadWithLocality() throws Exception { 568 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 569 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 570 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 571 } 572 573 //@Ignore("Wahtevs") 574 @Test 575 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 576 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 577 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 578 } 579 580 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 581 boolean putSortReducer, String tableStr) throws Exception { 582 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, 583 Arrays.asList(tableStr)); 584 } 585 586 @Test 587 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 588 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 589 doIncrementalLoadTest(false, false, true, 590 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList 591 ())); 592 } 593 594 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 595 boolean putSortReducer, List<String> tableStr) throws Exception { 596 util = new HBaseTestingUtility(); 597 Configuration conf = util.getConfiguration(); 598 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 599 int hostCount = 1; 600 int regionNum = 5; 601 if (shouldKeepLocality) { 602 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 603 // explicit hostnames parameter just like MiniDFSCluster does. 604 hostCount = 3; 605 regionNum = 20; 606 } 607 608 String[] hostnames = new String[hostCount]; 609 for (int i = 0; i < hostCount; ++i) { 610 hostnames[i] = "datanode_" + i; 611 } 612 util.startMiniCluster(1, hostCount, hostnames); 613 614 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 615 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 616 boolean writeMultipleTables = tableStr.size() > 1; 617 for (String tableStrSingle : tableStr) { 618 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 619 TableName tableName = TableName.valueOf(tableStrSingle); 620 Table table = util.createTable(tableName, FAMILIES, splitKeys); 621 622 RegionLocator r = util.getConnection().getRegionLocator(tableName); 623 assertEquals("Should start with empty table", 0, util.countRows(table)); 624 int numRegions = r.getStartKeys().length; 625 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 626 627 allTables.put(tableStrSingle, table); 628 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r)); 629 } 630 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 631 // Generate the bulk load files 632 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 633 634 for (Table tableSingle : allTables.values()) { 635 // This doesn't write into the table, just makes files 636 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 637 } 638 int numTableDirs = 0; 639 for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) { 640 Path tablePath = testDir; 641 642 if (writeMultipleTables) { 643 if (allTables.containsKey(tf.getPath().getName())) { 644 ++numTableDirs; 645 tablePath = tf.getPath(); 646 } 647 else { 648 continue; 649 } 650 } 651 652 // Make sure that a directory was created for every CF 653 int dir = 0; 654 for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) { 655 for (byte[] family : FAMILIES) { 656 if (Bytes.toString(family).equals(f.getPath().getName())) { 657 ++dir; 658 } 659 } 660 } 661 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 662 } 663 if (writeMultipleTables) { 664 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 665 } 666 667 Admin admin = util.getConnection().getAdmin(); 668 try { 669 // handle the split case 670 if (shouldChangeRegions) { 671 Table chosenTable = allTables.values().iterator().next(); 672 // Choose a semi-random table if multiple tables are available 673 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 674 admin.disableTable(chosenTable.getName()); 675 util.waitUntilNoRegionsInTransition(); 676 677 util.deleteTable(chosenTable.getName()); 678 byte[][] newSplitKeys = generateRandomSplitKeys(14); 679 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 680 681 while (util.getConnection().getRegionLocator(chosenTable.getName()) 682 .getAllRegionLocations().size() != 15 || 683 !admin.isTableAvailable(table.getName())) { 684 Thread.sleep(200); 685 LOG.info("Waiting for new region assignment to happen"); 686 } 687 } 688 689 // Perform the actual load 690 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 691 Path tableDir = testDir; 692 String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString(); 693 LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr); 694 if (writeMultipleTables) { 695 tableDir = new Path(testDir, tableNameStr); 696 } 697 Table currentTable = allTables.get(tableNameStr); 698 TableName currentTableName = currentTable.getName(); 699 new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo 700 .getRegionLocator()); 701 702 // Ensure data shows up 703 int expectedRows = 0; 704 if (putSortReducer) { 705 // no rows should be extracted 706 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 707 util.countRows(currentTable)); 708 } else { 709 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 710 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 711 util.countRows(currentTable)); 712 Scan scan = new Scan(); 713 ResultScanner results = currentTable.getScanner(scan); 714 for (Result res : results) { 715 assertEquals(FAMILIES.length, res.rawCells().length); 716 Cell first = res.rawCells()[0]; 717 for (Cell kv : res.rawCells()) { 718 assertTrue(CellUtil.matchingRows(first, kv)); 719 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 720 } 721 } 722 results.close(); 723 } 724 String tableDigestBefore = util.checksumRows(currentTable); 725 // Check region locality 726 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 727 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 728 hbd.add(region.getHDFSBlocksDistribution()); 729 } 730 for (String hostname : hostnames) { 731 float locality = hbd.getBlockLocalityIndex(hostname); 732 LOG.info("locality of [" + hostname + "]: " + locality); 733 assertEquals(100, (int) (locality * 100)); 734 } 735 736 // Cause regions to reopen 737 admin.disableTable(currentTableName); 738 while (!admin.isTableDisabled(currentTableName)) { 739 Thread.sleep(200); 740 LOG.info("Waiting for table to disable"); 741 } 742 admin.enableTable(currentTableName); 743 util.waitTableAvailable(currentTableName); 744 assertEquals("Data should remain after reopening of regions", 745 tableDigestBefore, util.checksumRows(currentTable)); 746 } 747 } finally { 748 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 749 tableInfoSingle.getRegionLocator().close(); 750 } 751 for (Entry<String, Table> singleTable : allTables.entrySet() ) { 752 singleTable.getValue().close(); 753 util.deleteTable(singleTable.getValue().getName()); 754 } 755 testDir.getFileSystem(conf).delete(testDir, true); 756 util.shutdownMiniCluster(); 757 } 758 } 759 760 private void runIncrementalPELoad(Configuration conf, List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, 761 boolean putSortReducer) throws IOException, 762 InterruptedException, ClassNotFoundException { 763 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 764 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 765 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 766 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 767 CellSerialization.class.getName()); 768 setupRandomGeneratorMapper(job, putSortReducer); 769 if (tableInfo.size() > 1) { 770 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 771 int sum = 0; 772 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 773 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 774 } 775 assertEquals(sum, job.getNumReduceTasks()); 776 } 777 else { 778 RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 779 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(), 780 regionLocator); 781 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 782 } 783 784 FileOutputFormat.setOutputPath(job, outDir); 785 786 assertFalse(util.getTestFileSystem().exists(outDir)) ; 787 788 assertTrue(job.waitForCompletion(true)); 789 } 790 791 /** 792 * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and 793 * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. 794 * Tests that the compression map is correctly serialized into 795 * and deserialized from configuration 796 * 797 * @throws IOException 798 */ 799 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 800 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 801 for (int numCfs = 0; numCfs <= 3; numCfs++) { 802 Configuration conf = new Configuration(this.util.getConfiguration()); 803 Map<String, Compression.Algorithm> familyToCompression = 804 getMockColumnFamiliesForCompression(numCfs); 805 Table table = Mockito.mock(Table.class); 806 setupMockColumnFamiliesForCompression(table, familyToCompression); 807 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 808 HFileOutputFormat2.serializeColumnFamilyAttribute 809 (HFileOutputFormat2.compressionDetails, 810 Arrays.asList(table.getTableDescriptor()))); 811 812 // read back family specific compression setting from the configuration 813 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2 814 .createFamilyCompressionMap(conf); 815 816 // test that we have a value for all column families that matches with the 817 // used mock values 818 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 819 assertEquals("Compression configuration incorrect for column family:" 820 + entry.getKey(), entry.getValue(), 821 retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8"))); 822 } 823 } 824 } 825 826 private void setupMockColumnFamiliesForCompression(Table table, 827 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 828 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 829 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 830 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 831 .setMaxVersions(1) 832 .setCompressionType(entry.getValue()) 833 .setBlockCacheEnabled(false) 834 .setTimeToLive(0)); 835 } 836 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 837 } 838 839 /** 840 * @return a map from column family names to compression algorithms for 841 * testing column family compression. Column family names have special characters 842 */ 843 private Map<String, Compression.Algorithm> 844 getMockColumnFamiliesForCompression (int numCfs) { 845 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 846 // use column family names having special characters 847 if (numCfs-- > 0) { 848 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 849 } 850 if (numCfs-- > 0) { 851 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 852 } 853 if (numCfs-- > 0) { 854 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 855 } 856 if (numCfs-- > 0) { 857 familyToCompression.put("Family3", Compression.Algorithm.NONE); 858 } 859 return familyToCompression; 860 } 861 862 863 /** 864 * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and 865 * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. 866 * Tests that the compression map is correctly serialized into 867 * and deserialized from configuration 868 * 869 * @throws IOException 870 */ 871 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 872 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 873 for (int numCfs = 0; numCfs <= 2; numCfs++) { 874 Configuration conf = new Configuration(this.util.getConfiguration()); 875 Map<String, BloomType> familyToBloomType = 876 getMockColumnFamiliesForBloomType(numCfs); 877 Table table = Mockito.mock(Table.class); 878 setupMockColumnFamiliesForBloomType(table, 879 familyToBloomType); 880 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 881 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 882 Arrays.asList(table.getTableDescriptor()))); 883 884 // read back family specific data block encoding settings from the 885 // configuration 886 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 887 HFileOutputFormat2 888 .createFamilyBloomTypeMap(conf); 889 890 // test that we have a value for all column families that matches with the 891 // used mock values 892 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 893 assertEquals("BloomType configuration incorrect for column family:" 894 + entry.getKey(), entry.getValue(), 895 retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8"))); 896 } 897 } 898 } 899 900 private void setupMockColumnFamiliesForBloomType(Table table, 901 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 902 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 903 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 904 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 905 .setMaxVersions(1) 906 .setBloomFilterType(entry.getValue()) 907 .setBlockCacheEnabled(false) 908 .setTimeToLive(0)); 909 } 910 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 911 } 912 913 /** 914 * @return a map from column family names to compression algorithms for 915 * testing column family compression. Column family names have special characters 916 */ 917 private Map<String, BloomType> 918 getMockColumnFamiliesForBloomType (int numCfs) { 919 Map<String, BloomType> familyToBloomType = new HashMap<>(); 920 // use column family names having special characters 921 if (numCfs-- > 0) { 922 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 923 } 924 if (numCfs-- > 0) { 925 familyToBloomType.put("Family2=asdads&!AASD", 926 BloomType.ROWCOL); 927 } 928 if (numCfs-- > 0) { 929 familyToBloomType.put("Family3", BloomType.NONE); 930 } 931 return familyToBloomType; 932 } 933 934 /** 935 * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and 936 * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. 937 * Tests that the compression map is correctly serialized into 938 * and deserialized from configuration 939 * 940 * @throws IOException 941 */ 942 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 943 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 944 for (int numCfs = 0; numCfs <= 3; numCfs++) { 945 Configuration conf = new Configuration(this.util.getConfiguration()); 946 Map<String, Integer> familyToBlockSize = 947 getMockColumnFamiliesForBlockSize(numCfs); 948 Table table = Mockito.mock(Table.class); 949 setupMockColumnFamiliesForBlockSize(table, 950 familyToBlockSize); 951 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 952 HFileOutputFormat2.serializeColumnFamilyAttribute 953 (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table 954 .getTableDescriptor()))); 955 956 // read back family specific data block encoding settings from the 957 // configuration 958 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 959 HFileOutputFormat2 960 .createFamilyBlockSizeMap(conf); 961 962 // test that we have a value for all column families that matches with the 963 // used mock values 964 for (Entry<String, Integer> entry : familyToBlockSize.entrySet() 965 ) { 966 assertEquals("BlockSize configuration incorrect for column family:" 967 + entry.getKey(), entry.getValue(), 968 retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8"))); 969 } 970 } 971 } 972 973 private void setupMockColumnFamiliesForBlockSize(Table table, 974 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 975 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 976 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 977 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 978 .setMaxVersions(1) 979 .setBlocksize(entry.getValue()) 980 .setBlockCacheEnabled(false) 981 .setTimeToLive(0)); 982 } 983 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 984 } 985 986 /** 987 * @return a map from column family names to compression algorithms for 988 * testing column family compression. Column family names have special characters 989 */ 990 private Map<String, Integer> 991 getMockColumnFamiliesForBlockSize (int numCfs) { 992 Map<String, Integer> familyToBlockSize = new HashMap<>(); 993 // use column family names having special characters 994 if (numCfs-- > 0) { 995 familyToBlockSize.put("Family1!@#!@#&", 1234); 996 } 997 if (numCfs-- > 0) { 998 familyToBlockSize.put("Family2=asdads&!AASD", 999 Integer.MAX_VALUE); 1000 } 1001 if (numCfs-- > 0) { 1002 familyToBlockSize.put("Family2=asdads&!AASD", 1003 Integer.MAX_VALUE); 1004 } 1005 if (numCfs-- > 0) { 1006 familyToBlockSize.put("Family3", 0); 1007 } 1008 return familyToBlockSize; 1009 } 1010 1011 /** 1012 * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)} 1013 * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. 1014 * Tests that the compression map is correctly serialized into 1015 * and deserialized from configuration 1016 * 1017 * @throws IOException 1018 */ 1019 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1020 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 1021 for (int numCfs = 0; numCfs <= 3; numCfs++) { 1022 Configuration conf = new Configuration(this.util.getConfiguration()); 1023 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 1024 getMockColumnFamiliesForDataBlockEncoding(numCfs); 1025 Table table = Mockito.mock(Table.class); 1026 setupMockColumnFamiliesForDataBlockEncoding(table, 1027 familyToDataBlockEncoding); 1028 HTableDescriptor tableDescriptor = table.getTableDescriptor(); 1029 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 1030 HFileOutputFormat2.serializeColumnFamilyAttribute 1031 (HFileOutputFormat2.dataBlockEncodingDetails, Arrays 1032 .asList(tableDescriptor))); 1033 1034 // read back family specific data block encoding settings from the 1035 // configuration 1036 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 1037 HFileOutputFormat2 1038 .createFamilyDataBlockEncodingMap(conf); 1039 1040 // test that we have a value for all column families that matches with the 1041 // used mock values 1042 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1043 assertEquals("DataBlockEncoding configuration incorrect for column family:" 1044 + entry.getKey(), entry.getValue(), 1045 retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8"))); 1046 } 1047 } 1048 } 1049 1050 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 1051 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 1052 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 1053 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1054 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 1055 .setMaxVersions(1) 1056 .setDataBlockEncoding(entry.getValue()) 1057 .setBlockCacheEnabled(false) 1058 .setTimeToLive(0)); 1059 } 1060 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 1061 } 1062 1063 /** 1064 * @return a map from column family names to compression algorithms for 1065 * testing column family compression. Column family names have special characters 1066 */ 1067 private Map<String, DataBlockEncoding> 1068 getMockColumnFamiliesForDataBlockEncoding (int numCfs) { 1069 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1070 // use column family names having special characters 1071 if (numCfs-- > 0) { 1072 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1073 } 1074 if (numCfs-- > 0) { 1075 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1076 DataBlockEncoding.FAST_DIFF); 1077 } 1078 if (numCfs-- > 0) { 1079 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1080 DataBlockEncoding.PREFIX); 1081 } 1082 if (numCfs-- > 0) { 1083 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1084 } 1085 return familyToDataBlockEncoding; 1086 } 1087 1088 private void setupMockStartKeys(RegionLocator table) throws IOException { 1089 byte[][] mockKeys = new byte[][] { 1090 HConstants.EMPTY_BYTE_ARRAY, 1091 Bytes.toBytes("aaa"), 1092 Bytes.toBytes("ggg"), 1093 Bytes.toBytes("zzz") 1094 }; 1095 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1096 } 1097 1098 private void setupMockTableName(RegionLocator table) throws IOException { 1099 TableName mockTableName = TableName.valueOf("mock_table"); 1100 Mockito.doReturn(mockTableName).when(table).getName(); 1101 } 1102 1103 /** 1104 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and 1105 * bloom filter settings from the column family descriptor 1106 */ 1107 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1108 public void testColumnFamilySettings() throws Exception { 1109 Configuration conf = new Configuration(this.util.getConfiguration()); 1110 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1111 TaskAttemptContext context = null; 1112 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1113 1114 // Setup table descriptor 1115 Table table = Mockito.mock(Table.class); 1116 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1117 HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]); 1118 Mockito.doReturn(htd).when(table).getTableDescriptor(); 1119 for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { 1120 htd.addFamily(hcd); 1121 } 1122 1123 // set up the table to return some mock keys 1124 setupMockStartKeys(regionLocator); 1125 1126 try { 1127 // partial map red setup to get an operational writer for testing 1128 // We turn off the sequence file compression, because DefaultCodec 1129 // pollutes the GZip codec pool with an incompatible compressor. 1130 conf.set("io.seqfile.compression.type", "NONE"); 1131 conf.set("hbase.fs.tmp.dir", dir.toString()); 1132 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1133 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1134 1135 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1136 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1137 setupRandomGeneratorMapper(job, false); 1138 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 1139 FileOutputFormat.setOutputPath(job, dir); 1140 context = createTestTaskAttemptContext(job); 1141 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1142 writer = hof.getRecordWriter(context); 1143 1144 // write out random rows 1145 writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT); 1146 writer.close(context); 1147 1148 // Make sure that a directory was created for every CF 1149 FileSystem fs = dir.getFileSystem(conf); 1150 1151 // commit so that the filesystem has one directory per column family 1152 hof.getOutputCommitter(context).commitTask(context); 1153 hof.getOutputCommitter(context).commitJob(context); 1154 FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1155 assertEquals(htd.getFamilies().size(), families.length); 1156 for (FileStatus f : families) { 1157 String familyStr = f.getPath().getName(); 1158 HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr)); 1159 // verify that the compression on this file matches the configured 1160 // compression 1161 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1162 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1163 Map<byte[], byte[]> fileInfo = reader.getHFileInfo(); 1164 1165 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1166 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1167 assertEquals("Incorrect bloom filter used for column family " + familyStr + 1168 "(reader: " + reader + ")", 1169 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1170 assertEquals("Incorrect compression used for column family " + familyStr + 1171 "(reader: " + reader + ")", hcd.getCompressionType(), reader.getFileContext().getCompression()); 1172 } 1173 } finally { 1174 dir.getFileSystem(conf).delete(dir, true); 1175 } 1176 } 1177 1178 /** 1179 * Write random values to the writer assuming a table created using 1180 * {@link #FAMILIES} as column family descriptors 1181 */ 1182 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1183 TaskAttemptContext context, Set<byte[]> families, int numRows) 1184 throws IOException, InterruptedException { 1185 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1186 int valLength = 10; 1187 byte valBytes[] = new byte[valLength]; 1188 1189 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1190 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1191 final byte [] qualifier = Bytes.toBytes("data"); 1192 Random random = new Random(); 1193 for (int i = 0; i < numRows; i++) { 1194 1195 Bytes.putInt(keyBytes, 0, i); 1196 random.nextBytes(valBytes); 1197 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1198 1199 for (byte[] family : families) { 1200 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1201 writer.write(key, kv); 1202 } 1203 } 1204 } 1205 1206 /** 1207 * This test is to test the scenario happened in HBASE-6901. 1208 * All files are bulk loaded and excluded from minor compaction. 1209 * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException 1210 * will be thrown. 1211 */ 1212 @Ignore ("Flakey: See HBASE-9051") @Test 1213 public void testExcludeAllFromMinorCompaction() throws Exception { 1214 Configuration conf = util.getConfiguration(); 1215 conf.setInt("hbase.hstore.compaction.min", 2); 1216 generateRandomStartKeys(5); 1217 1218 util.startMiniCluster(); 1219 try (Connection conn = ConnectionFactory.createConnection(); 1220 Admin admin = conn.getAdmin(); 1221 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1222 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1223 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1224 assertEquals("Should start with empty table", 0, util.countRows(table)); 1225 1226 // deep inspection: get the StoreFile dir 1227 final Path storePath = new Path( 1228 CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1229 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1230 Bytes.toString(FAMILIES[0]))); 1231 assertEquals(0, fs.listStatus(storePath).length); 1232 1233 // Generate two bulk load files 1234 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1235 true); 1236 1237 for (int i = 0; i < 2; i++) { 1238 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1239 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1240 .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false); 1241 // Perform the actual load 1242 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); 1243 } 1244 1245 // Ensure data shows up 1246 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1247 assertEquals("LoadIncrementalHFiles should put expected data in table", 1248 expectedRows, util.countRows(table)); 1249 1250 // should have a second StoreFile now 1251 assertEquals(2, fs.listStatus(storePath).length); 1252 1253 // minor compactions shouldn't get rid of the file 1254 admin.compact(TABLE_NAMES[0]); 1255 try { 1256 quickPoll(new Callable<Boolean>() { 1257 @Override 1258 public Boolean call() throws Exception { 1259 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1260 for (HRegion region : regions) { 1261 for (HStore store : region.getStores()) { 1262 store.closeAndArchiveCompactedFiles(); 1263 } 1264 } 1265 return fs.listStatus(storePath).length == 1; 1266 } 1267 }, 5000); 1268 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1269 } catch (AssertionError ae) { 1270 // this is expected behavior 1271 } 1272 1273 // a major compaction should work though 1274 admin.majorCompact(TABLE_NAMES[0]); 1275 quickPoll(new Callable<Boolean>() { 1276 @Override 1277 public Boolean call() throws Exception { 1278 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1279 for (HRegion region : regions) { 1280 for (HStore store : region.getStores()) { 1281 store.closeAndArchiveCompactedFiles(); 1282 } 1283 } 1284 return fs.listStatus(storePath).length == 1; 1285 } 1286 }, 5000); 1287 1288 } finally { 1289 util.shutdownMiniCluster(); 1290 } 1291 } 1292 1293 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1294 public void testExcludeMinorCompaction() throws Exception { 1295 Configuration conf = util.getConfiguration(); 1296 conf.setInt("hbase.hstore.compaction.min", 2); 1297 generateRandomStartKeys(5); 1298 1299 util.startMiniCluster(); 1300 try (Connection conn = ConnectionFactory.createConnection(conf); 1301 Admin admin = conn.getAdmin()){ 1302 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1303 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1304 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1305 assertEquals("Should start with empty table", 0, util.countRows(table)); 1306 1307 // deep inspection: get the StoreFile dir 1308 final Path storePath = new Path( 1309 CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1310 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1311 Bytes.toString(FAMILIES[0]))); 1312 assertEquals(0, fs.listStatus(storePath).length); 1313 1314 // put some data in it and flush to create a storefile 1315 Put p = new Put(Bytes.toBytes("test")); 1316 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1317 table.put(p); 1318 admin.flush(TABLE_NAMES[0]); 1319 assertEquals(1, util.countRows(table)); 1320 quickPoll(new Callable<Boolean>() { 1321 @Override 1322 public Boolean call() throws Exception { 1323 return fs.listStatus(storePath).length == 1; 1324 } 1325 }, 5000); 1326 1327 // Generate a bulk load file with more rows 1328 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1329 true); 1330 1331 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1332 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1333 .getTableDescriptor(), regionLocator)), testDir, false); 1334 1335 // Perform the actual load 1336 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); 1337 1338 // Ensure data shows up 1339 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1340 assertEquals("LoadIncrementalHFiles should put expected data in table", 1341 expectedRows + 1, util.countRows(table)); 1342 1343 // should have a second StoreFile now 1344 assertEquals(2, fs.listStatus(storePath).length); 1345 1346 // minor compactions shouldn't get rid of the file 1347 admin.compact(TABLE_NAMES[0]); 1348 try { 1349 quickPoll(new Callable<Boolean>() { 1350 @Override 1351 public Boolean call() throws Exception { 1352 return fs.listStatus(storePath).length == 1; 1353 } 1354 }, 5000); 1355 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1356 } catch (AssertionError ae) { 1357 // this is expected behavior 1358 } 1359 1360 // a major compaction should work though 1361 admin.majorCompact(TABLE_NAMES[0]); 1362 quickPoll(new Callable<Boolean>() { 1363 @Override 1364 public Boolean call() throws Exception { 1365 return fs.listStatus(storePath).length == 1; 1366 } 1367 }, 5000); 1368 1369 } finally { 1370 util.shutdownMiniCluster(); 1371 } 1372 } 1373 1374 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1375 int sleepMs = 10; 1376 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1377 while (retries-- > 0) { 1378 if (c.call().booleanValue()) { 1379 return; 1380 } 1381 Thread.sleep(sleepMs); 1382 } 1383 fail(); 1384 } 1385 1386 public static void main(String args[]) throws Exception { 1387 new TestCellBasedHFileOutputFormat2().manualTest(args); 1388 } 1389 1390 public void manualTest(String args[]) throws Exception { 1391 Configuration conf = HBaseConfiguration.create(); 1392 util = new HBaseTestingUtility(conf); 1393 if ("newtable".equals(args[0])) { 1394 TableName tname = TableName.valueOf(args[1]); 1395 byte[][] splitKeys = generateRandomSplitKeys(4); 1396 Table table = util.createTable(tname, FAMILIES, splitKeys); 1397 } else if ("incremental".equals(args[0])) { 1398 TableName tname = TableName.valueOf(args[1]); 1399 try(Connection c = ConnectionFactory.createConnection(conf); 1400 Admin admin = c.getAdmin(); 1401 RegionLocator regionLocator = c.getRegionLocator(tname)) { 1402 Path outDir = new Path("incremental-out"); 1403 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin 1404 .getTableDescriptor(tname), regionLocator)), outDir, false); 1405 } 1406 } else { 1407 throw new RuntimeException( 1408 "usage: TestHFileOutputFormat2 newtable | incremental"); 1409 } 1410 } 1411 1412 @Test 1413 public void testBlockStoragePolicy() throws Exception { 1414 util = new HBaseTestingUtility(); 1415 Configuration conf = util.getConfiguration(); 1416 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); 1417 1418 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + 1419 Bytes.toString(HFileOutputFormat2.combineTableNameSuffix( 1420 TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD"); 1421 Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0])); 1422 Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1])); 1423 util.startMiniDFSCluster(3); 1424 FileSystem fs = util.getDFSCluster().getFileSystem(); 1425 try { 1426 fs.mkdirs(cf1Dir); 1427 fs.mkdirs(cf2Dir); 1428 1429 // the original block storage policy would be HOT 1430 String spA = getStoragePolicyName(fs, cf1Dir); 1431 String spB = getStoragePolicyName(fs, cf2Dir); 1432 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1433 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1434 assertEquals("HOT", spA); 1435 assertEquals("HOT", spB); 1436 1437 // alter table cf schema to change storage policies 1438 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1439 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir); 1440 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1441 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir); 1442 spA = getStoragePolicyName(fs, cf1Dir); 1443 spB = getStoragePolicyName(fs, cf2Dir); 1444 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1445 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1446 assertNotNull(spA); 1447 assertEquals("ONE_SSD", spA); 1448 assertNotNull(spB); 1449 assertEquals("ALL_SSD", spB); 1450 } finally { 1451 fs.delete(cf1Dir, true); 1452 fs.delete(cf2Dir, true); 1453 util.shutdownMiniDFSCluster(); 1454 } 1455 } 1456 1457 private String getStoragePolicyName(FileSystem fs, Path path) { 1458 try { 1459 Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path); 1460 return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); 1461 } catch (Exception e) { 1462 // Maybe fail because of using old HDFS version, try the old way 1463 if (LOG.isTraceEnabled()) { 1464 LOG.trace("Failed to get policy directly", e); 1465 } 1466 String policy = getStoragePolicyNameForOldHDFSVersion(fs, path); 1467 return policy == null ? "HOT" : policy;// HOT by default 1468 } 1469 } 1470 1471 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1472 try { 1473 if (fs instanceof DistributedFileSystem) { 1474 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1475 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1476 if (null != status) { 1477 byte storagePolicyId = status.getStoragePolicy(); 1478 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1479 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1480 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1481 for (BlockStoragePolicy policy : policies) { 1482 if (policy.getId() == storagePolicyId) { 1483 return policy.getName(); 1484 } 1485 } 1486 } 1487 } 1488 } 1489 } catch (Throwable e) { 1490 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1491 } 1492 1493 return null; 1494 } 1495} 1496