/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.client.ConnectionFactory.createAsyncConnection;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values like those of {@link PerformanceEvaluation}.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
    .map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtil util = new HBaseTestingUtil();

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
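   * Mirrors {@link RandomKVGeneratingMapper} but emits Puts with a 1ms TTL, so cells written
   * through the PutSortReducer path are expected to have expired by the time the table is scanned.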
   */
  static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";

      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1l);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by call to write. Check all in kv is same but ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has explicit ts. It should not be
      // changed by call to record write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /*
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
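      // (closing the writer below flushes the HFile, which persists its file info, including
      // the TIMERANGE entry checked next)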
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Set down this value or we OOME in eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    long hregionMaxFilesize = 10 * 1024;
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys but using it anyways
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);

    // check output file num and size.
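    // Each family's files should individually fit within hregionMaxFilesize, and together
    // they should contain exactly ROWSPERSPLIT entries per family.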
    for (byte[] family : FAMILIES) {
      long kvCount = 0;
      RemoteIterator<LocatedFileStatus> iterator =
        fs.listFiles(testDir.suffix("/" + new String(family)), true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);

        kvCount += reader.getEntries();
        scanner.seekTo();
        long perKVSize = scanner.getCell().getSerializedSize();
        assertTrue("Data size of each file should not be too large.",
          perKVSize * reader.getEntries() <= hregionMaxFilesize);
      }
      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
   */
  @Test
  public void test_WritingTagData() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

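  // testJobConfiguration verifies that configureIncrementalLoad derives the reduce task count
  // from the mocked region start keys below (four keys, hence four reducers).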
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    assertEquals(job.getNumReduceTasks(), 4);
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
   * correctness of the original logic when LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
   * MiniHBaseCluster always runs with a single hostname (and different ports), it is not possible
   * to check region locality by comparing region locations with DN hostnames. Once MiniHBaseCluster
   * supports an explicit hostnames parameter (just like MiniDFSCluster does), we could test region
   * locality features more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  // @Ignore("Wahtevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
      Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, List<String> tableStr) throws Exception {
    util = new HBaseTestingUtil();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
      // explicit hostnames parameter just like MiniDFSCluster does.
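      // Presumably three hosts are used to match the default HDFS replication factor of 3, so
      // every block replica can land on a distinct datanode while locality is being measured.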
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
    if (writeMultipleTables) {
      testDir = new Path(testDir, "default");
    }

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
    for (FileStatus tf : fss) {
      Path tablePath = testDir;
      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
      for (FileStatus f : fss) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        Table chosenTable = allTables.values().iterator().next();
        // Choose a semi-random table if multiple tables are available
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (
          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
            .size() != 15 || !admin.isTableAvailable(table.getName())
        ) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }
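
      // Bulk load the generated HFiles into each table, then verify row contents, block
      // locality, and that the data survives a disable/enable cycle of the regions.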
      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
        LOG.info("Running BulkLoadHFiles on table" + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir);

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should be extracted
          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
          util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

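  // Sets up and runs the MR job that writes the bulk-load HFiles for one or more tables; the
  // number of reduce tasks must equal the total number of regions being written to.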
  private void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * family compression map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {

    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();

      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression.
   *         Column family names have special characters.
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
   * family bloom type map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific bloom type settings from the configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing per-family bloom
   *         filter configuration. Column family names have special characters.
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
   * family block size map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific block size settings from the configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing per-family block size
   *         configuration. Column family names have special characters.
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that
   * the family data block encoding map is correctly serialized into and deserialized from
   * configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      TableDescriptor tableDescriptor = table.getDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)
          .build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing per-family data
   *         block encoding configuration. Column family names have special characters.
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    TableDescriptorBuilder tableDescriptorBuilder =
      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);

    Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor();
    for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) {
      tableDescriptorBuilder.setColumnFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
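      // Note: no MR job is actually submitted here; the record writer obtained below is driven
      // directly so the resulting HFiles can be inspected for per-family settings.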
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(),
        ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        ColumnFamilyDescriptor hcd =
          tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte valBytes[] = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    for (int i = 0; i < numRows; i++) {
      Bytes.putInt(keyBytes, 0, i);
      Bytes.random(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * This test exercises the scenario from HBASE-6901: all files are bulk loaded and excluded from
   * minor compaction. Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException would be
   * thrown.
   */
  @Ignore("Flakey: See HBASE-9051")
  @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf,
          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(),
            conn.getRegionLocator(TABLE_NAMES[0]))),
          testDir, false);
        // Perform the actual load
        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
        util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
            for (HRegion region : regions) {
              for (HStore store : region.getStores()) {
                store.closeAndArchiveCompactedFiles();
              }
            }
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
          for (HRegion region : regions) {
            for (HStore store : region.getStores()) {
              store.closeAndArchiveCompactedFiles();
            }
          }
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

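  // Same scenario as above, but with a pre-existing flushed store file, so compaction exclusion
  // is checked against a mix of flushed and bulk-loaded files.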
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // put some data in it and flush to create a storefile
      Put p = new Put(Bytes.toBytes("test"));
      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAMES[0]);
      assertEquals(1, util.countRows(table));
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file with more rows
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
      runIncrementalPELoad(conf,
        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
        testDir, false);

      // Perform the actual load
      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1,
        util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }

  public static void main(String args[]) throws Exception {
    new TestHFileOutputFormat2().manualTest(args);
  }

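  // Manual entry point: "newtable <table>" creates a pre-split table, "incremental <table>" runs
  // the incremental-load MR job against an existing table (see the usage string below).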
  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtil(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      Table table = util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
        RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
          Arrays
            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
          outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

  @Test
  public void testBlockStoragePolicy() throws Exception {
    util = new HBaseTestingUtil();
    Configuration conf = util.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    util.startMiniDFSCluster(3);
    FileSystem fs = util.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the original block storage policy is HOT by default
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // alter table cf schema to change storage policies
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      util.shutdownMiniDFSCluster();
    }
  }

  private String getStoragePolicyName(FileSystem fs, Path path) {
    try {
      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // Maybe fail because of using old HDFS version, try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy; // HOT by default
    }
  }

  /**
   * Fallback used when the reflective {@code getStoragePolicy} call above fails (older HDFS
   * versions): looks up the file's storage policy id through the DFSClient and maps it to a name
   * using the policies reported by the DistributedFileSystem.
   */
  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
    try {
      if (fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          byte storagePolicyId = status.getStoragePolicy();
          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  @Test
  public void testConfigurePartitioner() throws IOException {
    Configuration conf = util.getConfiguration();
    // Create a user who is not the current user
    String fooUserName = "foo1234";
    String fooGroupName = "group1";
    UserGroupInformation ugi =
      UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName });
    // Get user's home directory
    Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
      @Override
      public Path run() {
        try (FileSystem fs = FileSystem.get(conf)) {
          return fs.makeQualified(fs.getHomeDirectory());
        } catch (IOException ioe) {
          LOG.error("Failed to get foo's home directory", ioe);
        }
        return null;
      }
    });

    Job job = Mockito.mock(Job.class);
    Mockito.doReturn(conf).when(job).getConfiguration();
    ImmutableBytesWritable writable = new ImmutableBytesWritable();
    List<ImmutableBytesWritable> splitPoints = new LinkedList<ImmutableBytesWritable>();
    splitPoints.add(writable);

    ugi.doAs(new PrivilegedAction<Void>() {
      @Override
      public Void run() {
        try {
          HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
        } catch (IOException ioe) {
          LOG.error("Failed to configure partitioner", ioe);
        }
        return null;
      }
    });
    FileSystem fs = FileSystem.get(conf);
    // verify that the job uses TotalOrderPartitioner
    verify(job).setPartitionerClass(TotalOrderPartitioner.class);
    // verify that TotalOrderPartitioner.setPartitionFile() is called.
    String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
    Assert.assertNotNull(partitionPathString);
    // Make sure the partition file is in foo1234's home directory, and that
    // the file exists.
    Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
    Assert.assertTrue(fs.exists(new Path(partitionPathString)));
  }

  /**
   * Writes a single cell through the record writer with the compression override set to "gz" and
   * verifies that every resulting HFile reports that codec in its trailer.
   */
  @Test
  public void testConfigureCompression() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("TestConfigureCompression");
    String hfileoutputformatCompression = "gz";

    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);

      Job job = Job.getInstance(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        assertEquals(hfileoutputformatCompression,
          reader.getTrailer().getCompressionCodec().getName());
      }
    } finally {
      if (writer != null && context != null) {
        writer.close(context);
      }
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Test
  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
    // Start cluster A
    util = new HBaseTestingUtil();
    Configuration confA = util.getConfiguration();
    int hostCount = 3;
    int regionNum = 20;
    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    // Start cluster B
    HBaseTestingUtil utilB = new HBaseTestingUtil();
    Configuration confB = utilB.getConfiguration();
    utilB.startMiniCluster(option);

    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");

    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
    TableName tableName = TableName.valueOf("table");
    // Create table in cluster B
    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
      // Generate the bulk load files
      // Job has zookeeper configuration for cluster A
      // Assume data is read from cluster A by TableInputFormat and hfiles are created for cluster B
      Job job = new Job(confA, "testLocalMRIncrementalLoad");
      Configuration jobConf = job.getConfiguration();
      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
      setupRandomGeneratorMapper(job, false);
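      // configureIncrementalLoad gets the table and region locator from cluster B, so the job
      // configuration is expected to carry cluster B's ZooKeeper settings under the
      // remote-cluster keys; the assertions below check exactly that.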
      HFileOutputFormat2.configureIncrementalLoad(job, table, r);

      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));

      String bSpecificConfigKey = "my.override.config.for.b";
      String bSpecificConfigValue = "b-specific-value";
      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
        bSpecificConfigValue);

      FileOutputFormat.setOutputPath(job, testDir);

      assertFalse(util.getTestFileSystem().exists(testDir));

      assertTrue(job.waitForCompletion(true));

      final List<Configuration> configs =
        ConfigurationCaptorConnection.getCapturedConfigurations(key);

      assertFalse(configs.isEmpty());
      for (Configuration config : configs) {
        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
          config.get(HConstants.ZOOKEEPER_QUORUM));
        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));

        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
      }
    } finally {
      utilB.deleteTable(tableName);
      testDir.getFileSystem(confA).delete(testDir, true);
      util.shutdownMiniCluster();
      utilB.shutdownMiniCluster();
    }
  }

  /**
   * Connection implementation that delegates to a real connection but records every Configuration
   * used to construct one, keyed by a UUID planted in the configuration via
   * {@link #configureConnectionImpl(Configuration)}. Lets the test above inspect the
   * configurations that connections were actually built with.
   */
  private static class ConfigurationCaptorConnection implements Connection {
    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";

    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();

    private final Connection delegate;

    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user)
      throws IOException {
      delegate = FutureUtils.get(createAsyncConnection(conf, user)).toConnection();

      final String uuid = conf.get(UUID_KEY);
      if (uuid != null) {
        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
      }
    }

    static UUID configureConnectionImpl(Configuration conf) {
      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
        ConfigurationCaptorConnection.class, Connection.class);

      final UUID uuid = UUID.randomUUID();
      conf.set(UUID_KEY, uuid.toString());
      return uuid;
    }

    static List<Configuration> getCapturedConfigurations(UUID key) {
      return confs.get(key);
    }

    @Override
    public Configuration getConfiguration() {
      return delegate.getConfiguration();
    }

    @Override
    public Table getTable(TableName tableName) throws IOException {
      return delegate.getTable(tableName);
    }

    @Override
    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
      return delegate.getTable(tableName, pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
      return delegate.getBufferedMutator(tableName);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
      return delegate.getBufferedMutator(params);
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return delegate.getRegionLocator(tableName);
    }

    @Override
    public void clearRegionLocationCache() {
      delegate.clearRegionLocationCache();
    }

    @Override
    public Admin getAdmin() throws IOException {
      return delegate.getAdmin();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public boolean isClosed() {
      return delegate.isClosed();
    }

    @Override
    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
      return delegate.getTableBuilder(tableName, pool);
    }

    @Override
    public AsyncConnection toAsyncConnection() {
      return delegate.toAsyncConnection();
    }

    @Override
    public String getClusterId() {
      return delegate.getClusterId();
    }

    @Override
    public Hbck getHbck() throws IOException {
      return delegate.getHbck();
    }

    @Override
    public Hbck getHbck(ServerName masterServer) throws IOException {
      return delegate.getHbck(masterServer);
    }

    @Override
    public void abort(String why, Throwable e) {
      delegate.abort(why, e);
    }

    @Override
    public boolean isAborted() {
      return delegate.isAborted();
    }
  }

}