/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values like those of {@link PerformanceEvaluation}.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestCellBasedHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCellBasedHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
    .map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
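   * <p>
   * For each configured table it emits ROWSPERSPLIT random rows with one KeyValue per column
   * family, and stamps the task id into the last key byte so keys from different map tasks cannot
   * collide.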
   */
  static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
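   * <p>
   * Each Put is written with a TTL of one millisecond, so once the data has been bulk loaded a
   * scan of the table returns nothing; the PutSortReducer variant of doIncrementalLoadTest relies
   * on this when it asserts an empty table.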
   */
  static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by call to write. Check all in kv is same but ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has explicit ts. It should not be
      // changed by call to record write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /*
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
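      // Closing the record writer finalizes the HFile and flushes its file info (including the
      // TIMERANGE entry written by HFileOutputFormat2), so it can be read back below.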
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Lower this value or we OOME in Eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys but using it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
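   * <p>
   * A KeyValue carrying a TTL tag is written through the RecordWriter, then read back from the
   * resulting HFile with an HFileScanner to confirm that the tag survives the write path.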
   */
  @Test
  public void test_WritingTagData() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell =
          TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
   * correctness of the original logic when LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
   * MiniHBaseCluster always runs with a single hostname (and different ports), it is not possible
   * to check region locality by comparing region locations with DN hostnames. Once
   * MiniHBaseCluster supports an explicit hostnames parameter (just like MiniDFSCluster does),
   * region locality features could be tested more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  // @Ignore("Wahtevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
      Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, List<String> tableStr) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // Raise the host count above the HDFS replica count once MiniHBaseCluster supports an
      // explicit hostnames parameter, just like MiniDFSCluster does.
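      // Use more hosts and regions here so that the per-host locality assertions further down
      // (100% block locality for every datanode) are exercised across several regions.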
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    util.startMiniCluster(1, hostCount, hostnames);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
      Path tablePath = testDir;

      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (
          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
            .size() != 15 || !admin.isTableAvailable(table.getName())
        ) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
        LOG.info("Running LoadIncrementalHFiles on table " + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable,
          singleTableInfo.getRegionLocator());

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should show up: the Puts were written with a 1ms TTL and have already expired
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
          util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and
   * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * compression map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression. Column family names have special characters.
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the bloom type
   * map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific bloom type settings from the
      // configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family bloom
   *         filter settings. Column family names have special characters.
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the block size
   * map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific block size settings from the
      // configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         settings. Column family names have special characters.
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
   * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that the
   * data block encoding map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family data
   *         block encoding. Column family names have special characters.
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + " (reader: " + reader
            + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + " (reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {

      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * This test covers the scenario from HBASE-6901. All files are bulk loaded and
   * excluded from minor compaction. Without the fix of HBASE-6901, an
   * ArrayIndexOutOfBoundsException will be thrown.
   */
  @Ignore("Flakey: See HBASE-9051")
  @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf,
          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(),
            conn.getRegionLocator(TABLE_NAMES[0]))),
          testDir, false);
        // Perform the actual load
        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
        util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
            for (HRegion region : regions) {
              for (HStore store : region.getStores()) {
                store.closeAndArchiveCompactedFiles();
              }
            }
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
          for (HRegion region : regions) {
            for (HStore store : region.getStores()) {
              store.closeAndArchiveCompactedFiles();
            }
          }
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // put some data in it and flush to create a storefile
      Put p = new Put(Bytes.toBytes("test"));
      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAMES[0]);
      assertEquals(1, util.countRows(table));
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file with more rows
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
      runIncrementalPELoad(conf,
        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), regionLocator)),
        testDir, false);

      // Perform the actual load
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows + 1,
        util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }

  public static void main(String args[]) throws Exception {
    new TestCellBasedHFileOutputFormat2().manualTest(args);
  }

  public void manualTest(String args[]) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      Table table = util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
        RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
          Arrays.asList(
            new HFileOutputFormat2.TableInfo(admin.getTableDescriptor(tname), regionLocator)),
          outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

  @Test
  public void testBlockStoragePolicy() throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    util.startMiniDFSCluster(3);
    FileSystem fs = util.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the original block storage policy would be HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // alter table cf schema to change storage policies
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      util.shutdownMiniDFSCluster();
    }
  }

  private String getStoragePolicyName(FileSystem fs, Path path) {
    try {
      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // May fail when running against an old HDFS version; try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy; // HOT by default
"HOT" : policy;// HOT by default 1409 } 1410 } 1411 1412 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1413 try { 1414 if (fs instanceof DistributedFileSystem) { 1415 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1416 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1417 if (null != status) { 1418 byte storagePolicyId = status.getStoragePolicy(); 1419 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1420 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1421 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1422 for (BlockStoragePolicy policy : policies) { 1423 if (policy.getId() == storagePolicyId) { 1424 return policy.getName(); 1425 } 1426 } 1427 } 1428 } 1429 } 1430 } catch (Throwable e) { 1431 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1432 } 1433 1434 return null; 1435 } 1436}