001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNotSame; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.lang.reflect.Field; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.Set; 037import java.util.UUID; 038import java.util.concurrent.Callable; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.CopyOnWriteArrayList; 041import java.util.concurrent.ExecutorService; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.LocatedFileStatus; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.fs.RemoteIterator; 048import org.apache.hadoop.hbase.ArrayBackedTag; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellUtil; 051import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 052import org.apache.hadoop.hbase.ExtendedCell; 053import org.apache.hadoop.hbase.HBaseClassTestRule; 054import org.apache.hadoop.hbase.HBaseConfiguration; 055import org.apache.hadoop.hbase.HBaseTestingUtil; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HadoopShims; 058import org.apache.hadoop.hbase.KeyValue; 059import org.apache.hadoop.hbase.PrivateCellUtil; 060import org.apache.hadoop.hbase.ServerName; 061import org.apache.hadoop.hbase.StartTestingClusterOption; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.Tag; 064import org.apache.hadoop.hbase.TagType; 065import org.apache.hadoop.hbase.client.Admin; 066import org.apache.hadoop.hbase.client.AsyncConnection; 067import org.apache.hadoop.hbase.client.BufferedMutator; 068import org.apache.hadoop.hbase.client.BufferedMutatorParams; 069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 071import org.apache.hadoop.hbase.client.Connection; 072import org.apache.hadoop.hbase.client.ConnectionFactory; 073import org.apache.hadoop.hbase.client.ConnectionRegistry; 074import org.apache.hadoop.hbase.client.ConnectionUtils; 075import org.apache.hadoop.hbase.client.Hbck; 076import org.apache.hadoop.hbase.client.Put; 077import 
org.apache.hadoop.hbase.client.RegionLocator; 078import org.apache.hadoop.hbase.client.Table; 079import org.apache.hadoop.hbase.client.TableBuilder; 080import org.apache.hadoop.hbase.client.TableDescriptor; 081import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 082import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 083import org.apache.hadoop.hbase.io.compress.Compression; 084import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 085import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 086import org.apache.hadoop.hbase.io.hfile.CacheConfig; 087import org.apache.hadoop.hbase.io.hfile.HFile; 088import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 089import org.apache.hadoop.hbase.io.hfile.HFileScanner; 090import org.apache.hadoop.hbase.regionserver.BloomType; 091import org.apache.hadoop.hbase.regionserver.HRegion; 092import org.apache.hadoop.hbase.regionserver.HStore; 093import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 094import org.apache.hadoop.hbase.security.User; 095import org.apache.hadoop.hbase.testclassification.LargeTests; 096import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 097import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 098import org.apache.hadoop.hbase.util.Bytes; 099import org.apache.hadoop.hbase.util.CommonFSUtils; 100import org.apache.hadoop.hbase.util.FSUtils; 101import org.apache.hadoop.hbase.util.FutureUtils; 102import org.apache.hadoop.hbase.util.ReflectionUtils; 103import org.apache.hadoop.hdfs.DistributedFileSystem; 104import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 105import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 106import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 107import org.apache.hadoop.mapreduce.Job; 108import org.apache.hadoop.mapreduce.RecordWriter; 109import org.apache.hadoop.mapreduce.TaskAttemptContext; 110import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 111import org.junit.ClassRule; 112import org.junit.Ignore; 113import org.junit.Test; 114import org.junit.experimental.categories.Category; 115import org.mockito.Mockito; 116import org.slf4j.Logger; 117import org.slf4j.LoggerFactory; 118 119/** 120 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile 121 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and 122 * values. 123 */ 124@Category({ VerySlowMapReduceTests.class, LargeTests.class }) 125public class TestHFileOutputFormat2 extends HFileOutputFormat2TestBase { 126 127 @ClassRule 128 public static final HBaseClassTestRule CLASS_RULE = 129 HBaseClassTestRule.forClass(TestHFileOutputFormat2.class); 130 131 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 132 133 /** 134 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose 135 * timestamp is {@link HConstants#LATEST_TIMESTAMP}. 136 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 137 */ 138 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 139 @Test 140 public void test_LATEST_TIMESTAMP_isReplaced() throws Exception { 141 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 142 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 143 TaskAttemptContext context = null; 144 Path dir = UTIL.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 145 try { 146 Job job = new Job(conf); 147 FileOutputFormat.setOutputPath(job, dir); 148 context = createTestTaskAttemptContext(job); 149 HFileOutputFormat2 hof = new HFileOutputFormat2(); 150 writer = hof.getRecordWriter(context); 151 final byte[] b = Bytes.toBytes("b"); 152 153 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 154 // changed by call to write. Check all in kv is same but ts. 155 KeyValue kv = new KeyValue(b, b, b); 156 KeyValue original = kv.clone(); 157 writer.write(new ImmutableBytesWritable(), kv); 158 assertFalse(original.equals(kv)); 159 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 160 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 161 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 162 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 163 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 164 165 // Test 2. Now test passing a kv that has explicit ts. It should not be 166 // changed by call to record write. 167 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 168 original = kv.clone(); 169 writer.write(new ImmutableBytesWritable(), kv); 170 assertTrue(original.equals(kv)); 171 } finally { 172 if (writer != null && context != null) writer.close(context); 173 dir.getFileSystem(conf).delete(dir, true); 174 } 175 } 176 177 private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception { 178 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 179 TaskAttemptContext context = 180 hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0"); 181 return context; 182 } 183 184 /* 185 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by 186 * time-restricted scans. 187 */ 188 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 189 @Test 190 public void test_TIMERANGE() throws Exception { 191 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 192 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 193 TaskAttemptContext context = null; 194 Path dir = UTIL.getDataTestDir("test_TIMERANGE_present"); 195 LOG.info("Timerange dir writing to dir: " + dir); 196 try { 197 // build a record writer using HFileOutputFormat2 198 Job job = new Job(conf); 199 FileOutputFormat.setOutputPath(job, dir); 200 context = createTestTaskAttemptContext(job); 201 HFileOutputFormat2 hof = new HFileOutputFormat2(); 202 writer = hof.getRecordWriter(context); 203 204 // Pass two key values with explicit times stamps 205 final byte[] b = Bytes.toBytes("b"); 206 207 // value 1 with timestamp 2000 208 KeyValue kv = new KeyValue(b, b, b, 2000, b); 209 KeyValue original = kv.clone(); 210 writer.write(new ImmutableBytesWritable(), kv); 211 assertEquals(original, kv); 212 213 // value 2 with timestamp 1000 214 kv = new KeyValue(b, b, b, 1000, b); 215 original = kv.clone(); 216 writer.write(new ImmutableBytesWritable(), kv); 217 assertEquals(original, kv); 218 219 // verify that the file has the proper FileInfo. 
220 writer.close(context); 221 222 // the generated file lives 1 directory down from the attempt directory 223 // and is the only file, e.g. 224 // _attempt__0000_r_000000_0/b/1979617994050536795 225 FileSystem fs = FileSystem.get(conf); 226 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 227 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 228 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 229 230 // open as HFile Reader and pull out TIMERANGE FileInfo. 231 HFile.Reader rd = 232 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 233 Map<byte[], byte[]> finfo = rd.getHFileInfo(); 234 byte[] range = finfo.get(Bytes.toBytes("TIMERANGE")); 235 assertNotNull(range); 236 237 // unmarshall and check values. 238 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range); 239 LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax()); 240 assertEquals(1000, timeRangeTracker.getMin()); 241 assertEquals(2000, timeRangeTracker.getMax()); 242 rd.close(); 243 } finally { 244 if (writer != null && context != null) writer.close(context); 245 dir.getFileSystem(conf).delete(dir, true); 246 } 247 } 248 249 /** 250 * Run small MR job. 251 */ 252 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 253 @Test 254 public void testWritingPEData() throws Exception { 255 Configuration conf = UTIL.getConfiguration(); 256 Path testDir = UTIL.getDataTestDirOnTestFS("testWritingPEData"); 257 FileSystem fs = testDir.getFileSystem(conf); 258 259 // Set down this value or we OOME in eclipse. 260 conf.setInt("mapreduce.task.io.sort.mb", 20); 261 // Write a few files. 262 long hregionMaxFilesize = 10 * 1024; 263 conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize); 264 265 Job job = new Job(conf, "testWritingPEData"); 266 setupRandomGeneratorMapper(job, false); 267 // This partitioner doesn't work well for number keys but using it anyways 268 // just to demonstrate how to configure it. 269 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 270 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 271 272 Arrays.fill(startKey, (byte) 0); 273 Arrays.fill(endKey, (byte) 0xff); 274 275 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 276 // Set start and end rows for partitioner. 277 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 278 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 279 job.setReducerClass(CellSortReducer.class); 280 job.setOutputFormatClass(HFileOutputFormat2.class); 281 job.setNumReduceTasks(4); 282 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 283 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 284 CellSerialization.class.getName()); 285 286 FileOutputFormat.setOutputPath(job, testDir); 287 assertTrue(job.waitForCompletion(false)); 288 FileStatus[] files = fs.listStatus(testDir); 289 assertTrue(files.length > 0); 290 291 // check output file num and size. 
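    // For each family: open every HFile under its output directory, bound each file's size by the
    // configured max region size (estimated as first-cell serialized size * entry count), and
    // confirm the total entry count matches ROWSPERSPLIT.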
292 for (byte[] family : FAMILIES) { 293 long kvCount = 0; 294 RemoteIterator<LocatedFileStatus> iterator = 295 fs.listFiles(testDir.suffix("/" + new String(family)), true); 296 while (iterator.hasNext()) { 297 LocatedFileStatus keyFileStatus = iterator.next(); 298 HFile.Reader reader = 299 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 300 HFileScanner scanner = reader.getScanner(conf, false, false, false); 301 302 kvCount += reader.getEntries(); 303 scanner.seekTo(); 304 long perKVSize = scanner.getCell().getSerializedSize(); 305 assertTrue("Data size of each file should not be too large.", 306 perKVSize * reader.getEntries() <= hregionMaxFilesize); 307 } 308 assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount); 309 } 310 } 311 312 /** 313 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile. 314 */ 315 @Test 316 public void test_WritingTagData() throws Exception { 317 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 318 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 319 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 320 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 321 TaskAttemptContext context = null; 322 Path dir = UTIL.getDataTestDir("WritingTagData"); 323 try { 324 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 325 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 326 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 327 Job job = new Job(conf); 328 FileOutputFormat.setOutputPath(job, dir); 329 context = createTestTaskAttemptContext(job); 330 HFileOutputFormat2 hof = new HFileOutputFormat2(); 331 writer = hof.getRecordWriter(context); 332 final byte[] b = Bytes.toBytes("b"); 333 334 List<Tag> tags = new ArrayList<>(); 335 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 336 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 337 writer.write(new ImmutableBytesWritable(), kv); 338 writer.close(context); 339 writer = null; 340 FileSystem fs = dir.getFileSystem(conf); 341 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 342 while (iterator.hasNext()) { 343 LocatedFileStatus keyFileStatus = iterator.next(); 344 HFile.Reader reader = 345 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 346 HFileScanner scanner = reader.getScanner(conf, false, false, false); 347 scanner.seekTo(); 348 ExtendedCell cell = scanner.getCell(); 349 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 350 assertTrue(tagsFromCell.size() > 0); 351 for (Tag tag : tagsFromCell) { 352 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 353 } 354 } 355 } finally { 356 if (writer != null && context != null) writer.close(context); 357 dir.getFileSystem(conf).delete(dir, true); 358 } 359 } 360 361 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 362 @Test 363 public void testJobConfiguration() throws Exception { 364 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 365 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 366 UTIL.getDataTestDir("testJobConfiguration").toString()); 367 Job job = new Job(conf); 368 job.setWorkingDirectory(UTIL.getDataTestDir("testJobConfiguration")); 369 Table table = Mockito.mock(Table.class); 370 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 371 setupMockStartKeys(regionLocator); 372 setupMockTableName(regionLocator); 373 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 374 assertEquals(job.getNumReduceTasks(), 4); 375 } 376 377 /** 378 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the 379 * family compression map is correctly serialized into and deserialized from configuration 380 */ 381 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 382 @Test 383 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 384 for (int numCfs = 0; numCfs <= 3; numCfs++) { 385 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 386 Map<String, Compression.Algorithm> familyToCompression = 387 getMockColumnFamiliesForCompression(numCfs); 388 Table table = Mockito.mock(Table.class); 389 setupMockColumnFamiliesForCompression(table, familyToCompression); 390 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 391 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails, 392 Arrays.asList(table.getDescriptor()))); 393 394 // read back family specific compression setting from the configuration 395 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = 396 HFileOutputFormat2.createFamilyCompressionMap(conf); 397 398 // test that we have a value for all column families that matches with the 399 // used mock values 400 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 401 assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), 402 entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey()))); 403 } 404 } 405 } 406 407 private void setupMockColumnFamiliesForCompression(Table table, 408 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 409 410 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 411 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 412 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 413 .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 414 .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 415 416 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 417 } 418 Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor(); 419 } 420 421 /** 422 * @return a map from column family names to compression algorithms for testing column family 423 * compression. 
Column family names have special characters
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
   * family bloom type map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.UTIL.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific bloom filter settings from the configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToBloomType) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family bloom
   *         filter settings. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
   * family block size map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.UTIL.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific block size settings from the configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToBlockSize) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         settings. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests
   * that the family data block encoding map is correctly serialized into and deserialized from
   * configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.UTIL.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      TableDescriptor tableDescriptor = table.getDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)
          .build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family data
   *         block encoding settings.
Column family names have special characters 619 */ 620 private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) { 621 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 622 // use column family names having special characters 623 if (numCfs-- > 0) { 624 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 625 } 626 if (numCfs-- > 0) { 627 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF); 628 } 629 if (numCfs-- > 0) { 630 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX); 631 } 632 if (numCfs-- > 0) { 633 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 634 } 635 return familyToDataBlockEncoding; 636 } 637 638 private void setupMockStartKeys(RegionLocator table) throws IOException { 639 byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"), 640 Bytes.toBytes("ggg"), Bytes.toBytes("zzz") }; 641 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 642 } 643 644 private void setupMockTableName(RegionLocator table) throws IOException { 645 TableName mockTableName = TableName.valueOf("mock_table"); 646 Mockito.doReturn(mockTableName).when(table).getName(); 647 } 648 649 /** 650 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings 651 * from the column family descriptor 652 */ 653 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 654 @Test 655 public void testColumnFamilySettings() throws Exception { 656 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 657 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 658 TaskAttemptContext context = null; 659 Path dir = UTIL.getDataTestDir("testColumnFamilySettings"); 660 661 // Setup table descriptor 662 Table table = Mockito.mock(Table.class); 663 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 664 TableDescriptorBuilder tableDescriptorBuilder = 665 TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 666 667 Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor(); 668 for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) { 669 tableDescriptorBuilder.setColumnFamily(hcd); 670 } 671 672 // set up the table to return some mock keys 673 setupMockStartKeys(regionLocator); 674 675 try { 676 // partial map red setup to get an operational writer for testing 677 // We turn off the sequence file compression, because DefaultCodec 678 // pollutes the GZip codec pool with an incompatible compressor. 
679 conf.set("io.seqfile.compression.type", "NONE"); 680 conf.set("hbase.fs.tmp.dir", dir.toString()); 681 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 682 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 683 684 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 685 job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("testColumnFamilySettings")); 686 setupRandomGeneratorMapper(job, false); 687 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 688 FileOutputFormat.setOutputPath(job, dir); 689 context = createTestTaskAttemptContext(job); 690 HFileOutputFormat2 hof = new HFileOutputFormat2(); 691 writer = hof.getRecordWriter(context); 692 693 // write out random rows 694 writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(), 695 ROWSPERSPLIT); 696 writer.close(context); 697 698 // Make sure that a directory was created for every CF 699 FileSystem fs = dir.getFileSystem(conf); 700 701 // commit so that the filesystem has one directory per column family 702 hof.getOutputCommitter(context).commitTask(context); 703 hof.getOutputCommitter(context).commitJob(context); 704 FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 705 assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length); 706 for (FileStatus f : families) { 707 String familyStr = f.getPath().getName(); 708 ColumnFamilyDescriptor hcd = 709 tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr)); 710 // verify that the compression on this file matches the configured 711 // compression 712 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 713 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 714 Map<byte[], byte[]> fileInfo = reader.getHFileInfo(); 715 716 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 717 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 718 assertEquals( 719 "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")", 720 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 721 assertEquals( 722 "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")", 723 hcd.getCompressionType(), reader.getFileContext().getCompression()); 724 } 725 } finally { 726 dir.getFileSystem(conf).delete(dir, true); 727 } 728 } 729 730 /** 731 * Write random values to the writer assuming a table created using {@link #FAMILIES} as column 732 * family descriptors 733 */ 734 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 735 TaskAttemptContext context, Set<byte[]> families, int numRows) 736 throws IOException, InterruptedException { 737 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 738 int valLength = 10; 739 byte valBytes[] = new byte[valLength]; 740 741 int taskId = context.getTaskAttemptID().getTaskID().getId(); 742 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 743 final byte[] qualifier = Bytes.toBytes("data"); 744 for (int i = 0; i < numRows; i++) { 745 Bytes.putInt(keyBytes, 0, i); 746 Bytes.random(valBytes); 747 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 748 for (byte[] family : families) { 749 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 750 writer.write(key, kv); 751 } 752 } 753 } 754 755 /** 756 * This test is to test the scenario happened in 
HBASE-6901. All files are bulk loaded and 757 * excluded from minor compaction. Without the fix of HBASE-6901, an 758 * ArrayIndexOutOfBoundsException will be thrown. 759 */ 760 @Ignore("Flakey: See HBASE-9051") 761 @Test 762 public void testExcludeAllFromMinorCompaction() throws Exception { 763 Configuration conf = UTIL.getConfiguration(); 764 conf.setInt("hbase.hstore.compaction.min", 2); 765 generateRandomStartKeys(5); 766 767 UTIL.startMiniCluster(); 768 try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin(); 769 Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES); 770 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 771 final FileSystem fs = UTIL.getDFSCluster().getFileSystem(); 772 assertEquals("Should start with empty table", 0, UTIL.countRows(table)); 773 774 // deep inspection: get the StoreFile dir 775 final Path storePath = 776 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 777 new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 778 Bytes.toString(FAMILIES[0]))); 779 assertEquals(0, fs.listStatus(storePath).length); 780 781 // Generate two bulk load files 782 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 783 784 for (int i = 0; i < 2; i++) { 785 Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 786 runIncrementalPELoad(conf, 787 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), 788 conn.getRegionLocator(TABLE_NAMES[0]))), 789 testDir, false); 790 // Perform the actual load 791 BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir); 792 } 793 794 // Ensure data shows up 795 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 796 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 797 UTIL.countRows(table)); 798 799 // should have a second StoreFile now 800 assertEquals(2, fs.listStatus(storePath).length); 801 802 // minor compactions shouldn't get rid of the file 803 admin.compact(TABLE_NAMES[0]); 804 try { 805 quickPoll(new Callable<Boolean>() { 806 @Override 807 public Boolean call() throws Exception { 808 List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 809 for (HRegion region : regions) { 810 for (HStore store : region.getStores()) { 811 store.closeAndArchiveCompactedFiles(); 812 } 813 } 814 return fs.listStatus(storePath).length == 1; 815 } 816 }, 5000); 817 throw new IOException("SF# = " + fs.listStatus(storePath).length); 818 } catch (AssertionError ae) { 819 // this is expected behavior 820 } 821 822 // a major compaction should work though 823 admin.majorCompact(TABLE_NAMES[0]); 824 quickPoll(new Callable<Boolean>() { 825 @Override 826 public Boolean call() throws Exception { 827 List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 828 for (HRegion region : regions) { 829 for (HStore store : region.getStores()) { 830 store.closeAndArchiveCompactedFiles(); 831 } 832 } 833 return fs.listStatus(storePath).length == 1; 834 } 835 }, 5000); 836 837 } finally { 838 UTIL.shutdownMiniCluster(); 839 } 840 } 841 842 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 843 @Test 844 public void testExcludeMinorCompaction() throws Exception { 845 Configuration conf = UTIL.getConfiguration(); 846 conf.setInt("hbase.hstore.compaction.min", 2); 847 generateRandomStartKeys(5); 848 849 UTIL.startMiniCluster(); 850 try (Connection conn = ConnectionFactory.createConnection(conf); 851 Admin admin = conn.getAdmin()) { 852 Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 853 final FileSystem fs = UTIL.getDFSCluster().getFileSystem(); 854 Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES); 855 assertEquals("Should start with empty table", 0, UTIL.countRows(table)); 856 857 // deep inspection: get the StoreFile dir 858 final Path storePath = 859 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 860 new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 861 Bytes.toString(FAMILIES[0]))); 862 assertEquals(0, fs.listStatus(storePath).length); 863 864 // put some data in it and flush to create a storefile 865 Put p = new Put(Bytes.toBytes("test")); 866 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 867 table.put(p); 868 admin.flush(TABLE_NAMES[0]); 869 assertEquals(1, UTIL.countRows(table)); 870 quickPoll(new Callable<Boolean>() { 871 @Override 872 public Boolean call() throws Exception { 873 return fs.listStatus(storePath).length == 1; 874 } 875 }, 5000); 876 877 // Generate a bulk load file with more rows 878 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 879 880 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 881 runIncrementalPELoad(conf, 882 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)), 883 testDir, false); 884 885 // Perform the actual load 886 BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir); 887 888 // Ensure data shows up 889 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 890 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1, 891 UTIL.countRows(table)); 892 893 // should have a second StoreFile now 894 assertEquals(2, fs.listStatus(storePath).length); 895 896 // minor compactions shouldn't get rid of the file 897 admin.compact(TABLE_NAMES[0]); 898 try { 899 quickPoll(new Callable<Boolean>() { 900 @Override 901 public Boolean call() throws Exception { 902 return fs.listStatus(storePath).length == 1; 903 } 904 }, 5000); 905 throw new IOException("SF# = " + fs.listStatus(storePath).length); 906 } catch (AssertionError ae) { 907 // this is expected behavior 908 } 909 910 // a major compaction should work though 911 admin.majorCompact(TABLE_NAMES[0]); 912 quickPoll(new Callable<Boolean>() { 913 @Override 914 public Boolean call() throws Exception { 915 return fs.listStatus(storePath).length == 1; 916 } 917 }, 5000); 918 919 } finally { 920 UTIL.shutdownMiniCluster(); 921 } 922 } 923 924 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 925 int sleepMs = 10; 926 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 927 while (retries-- > 0) { 928 if (c.call().booleanValue()) { 929 return; 930 } 931 Thread.sleep(sleepMs); 932 } 933 fail(); 934 } 935 936 public static void main(String args[]) throws Exception { 937 new TestHFileOutputFormat2().manualTest(args); 938 } 939 940 public void manualTest(String args[]) throws Exception { 941 Configuration conf = HBaseConfiguration.create(); 942 UTIL = new HBaseTestingUtil(conf); 943 if ("newtable".equals(args[0])) { 944 
TableName tname = TableName.valueOf(args[1]); 945 byte[][] splitKeys = generateRandomSplitKeys(4); 946 Table table = UTIL.createTable(tname, FAMILIES, splitKeys); 947 } else if ("incremental".equals(args[0])) { 948 TableName tname = TableName.valueOf(args[1]); 949 try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin(); 950 RegionLocator regionLocator = c.getRegionLocator(tname)) { 951 Path outDir = new Path("incremental-out"); 952 runIncrementalPELoad(conf, 953 Arrays 954 .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)), 955 outDir, false); 956 } 957 } else { 958 throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental"); 959 } 960 } 961 962 @Test 963 public void testBlockStoragePolicy() throws Exception { 964 UTIL = new HBaseTestingUtil(); 965 Configuration conf = UTIL.getConfiguration(); 966 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); 967 968 conf.set( 969 HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes 970 .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])), 971 "ONE_SSD"); 972 Path cf1Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[0])); 973 Path cf2Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[1])); 974 UTIL.startMiniDFSCluster(3); 975 FileSystem fs = UTIL.getDFSCluster().getFileSystem(); 976 try { 977 fs.mkdirs(cf1Dir); 978 fs.mkdirs(cf2Dir); 979 980 // the original block storage policy would be HOT 981 String spA = getStoragePolicyName(fs, cf1Dir); 982 String spB = getStoragePolicyName(fs, cf2Dir); 983 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 984 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 985 assertEquals("HOT", spA); 986 assertEquals("HOT", spB); 987 988 // alter table cf schema to change storage policies 989 HFileOutputFormat2.configureStoragePolicy(conf, fs, 990 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir); 991 HFileOutputFormat2.configureStoragePolicy(conf, fs, 992 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir); 993 spA = getStoragePolicyName(fs, cf1Dir); 994 spB = getStoragePolicyName(fs, cf2Dir); 995 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 996 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 997 assertNotNull(spA); 998 assertEquals("ONE_SSD", spA); 999 assertNotNull(spB); 1000 assertEquals("ALL_SSD", spB); 1001 } finally { 1002 fs.delete(cf1Dir, true); 1003 fs.delete(cf2Dir, true); 1004 UTIL.shutdownMiniDFSCluster(); 1005 } 1006 } 1007 1008 private String getStoragePolicyName(FileSystem fs, Path path) { 1009 try { 1010 Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path); 1011 return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); 1012 } catch (Exception e) { 1013 // Maybe fail because of using old HDFS version, try the old way 1014 if (LOG.isTraceEnabled()) { 1015 LOG.trace("Failed to get policy directly", e); 1016 } 1017 String policy = getStoragePolicyNameForOldHDFSVersion(fs, path); 1018 return policy == null ? 
"HOT" : policy;// HOT by default 1019 } 1020 } 1021 1022 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1023 try { 1024 if (fs instanceof DistributedFileSystem) { 1025 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1026 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1027 if (null != status) { 1028 byte storagePolicyId = status.getStoragePolicy(); 1029 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1030 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1031 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1032 for (BlockStoragePolicy policy : policies) { 1033 if (policy.getId() == storagePolicyId) { 1034 return policy.getName(); 1035 } 1036 } 1037 } 1038 } 1039 } 1040 } catch (Throwable e) { 1041 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1042 } 1043 1044 return null; 1045 } 1046 1047 @Test 1048 public void TestConfigureCompression() throws Exception { 1049 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 1050 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1051 TaskAttemptContext context = null; 1052 Path dir = UTIL.getDataTestDir("TestConfigureCompression"); 1053 String hfileoutputformatCompression = "gz"; 1054 1055 try { 1056 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 1057 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1058 1059 conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression); 1060 1061 Job job = Job.getInstance(conf); 1062 FileOutputFormat.setOutputPath(job, dir); 1063 context = createTestTaskAttemptContext(job); 1064 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1065 writer = hof.getRecordWriter(context); 1066 final byte[] b = Bytes.toBytes("b"); 1067 1068 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b); 1069 writer.write(new ImmutableBytesWritable(), kv); 1070 writer.close(context); 1071 writer = null; 1072 FileSystem fs = dir.getFileSystem(conf); 1073 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 1074 while (iterator.hasNext()) { 1075 LocatedFileStatus keyFileStatus = iterator.next(); 1076 HFile.Reader reader = 1077 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 1078 assertEquals(reader.getTrailer().getCompressionCodec().getName(), 1079 hfileoutputformatCompression); 1080 } 1081 } finally { 1082 if (writer != null && context != null) { 1083 writer.close(context); 1084 } 1085 dir.getFileSystem(conf).delete(dir, true); 1086 } 1087 1088 } 1089 1090 @Test 1091 public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception { 1092 // Start cluster A 1093 UTIL = new HBaseTestingUtil(); 1094 Configuration confA = UTIL.getConfiguration(); 1095 int hostCount = 3; 1096 int regionNum = 20; 1097 String[] hostnames = new String[hostCount]; 1098 for (int i = 0; i < hostCount; ++i) { 1099 hostnames[i] = "datanode_" + i; 1100 } 1101 StartTestingClusterOption option = StartTestingClusterOption.builder() 1102 .numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 1103 UTIL.startMiniCluster(option); 1104 1105 // Start cluster B 1106 HBaseTestingUtil UTILB = new HBaseTestingUtil(); 1107 Configuration confB = UTILB.getConfiguration(); 1108 UTILB.startMiniCluster(option); 1109 1110 Path testDir = UTIL.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 1111 1112 byte[][] splitKeys = 
generateRandomSplitKeys(regionNum - 1); 1113 TableName tableName = TableName.valueOf("table"); 1114 // Create table in cluster B 1115 try (Table table = UTILB.createTable(tableName, FAMILIES, splitKeys); 1116 RegionLocator r = UTILB.getConnection().getRegionLocator(tableName)) { 1117 // Generate the bulk load files 1118 // Job has zookeeper configuration for cluster A 1119 // Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B 1120 Job job = new Job(confA, "testLocalMRIncrementalLoad"); 1121 Configuration jobConf = job.getConfiguration(); 1122 final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf); 1123 job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("runIncrementalPELoad")); 1124 setupRandomGeneratorMapper(job, false); 1125 HFileOutputFormat2.configureIncrementalLoad(job, table, r); 1126 1127 assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM), 1128 jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY)); 1129 assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT), 1130 jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY)); 1131 assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT), 1132 jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY)); 1133 1134 String bSpecificConfigKey = "my.override.config.for.b"; 1135 String bSpecificConfigValue = "b-specific-value"; 1136 jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey, 1137 bSpecificConfigValue); 1138 1139 FileOutputFormat.setOutputPath(job, testDir); 1140 1141 assertFalse(UTIL.getTestFileSystem().exists(testDir)); 1142 1143 assertTrue(job.waitForCompletion(true)); 1144 1145 final List<Configuration> configs = 1146 ConfigurationCaptorConnection.getCapturedConfigarutions(key); 1147 1148 assertFalse(configs.isEmpty()); 1149 for (Configuration config : configs) { 1150 assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM), 1151 config.get(HConstants.ZOOKEEPER_QUORUM)); 1152 assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT), 1153 config.get(HConstants.ZOOKEEPER_CLIENT_PORT)); 1154 assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT), 1155 config.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); 1156 1157 assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey)); 1158 } 1159 } finally { 1160 UTILB.deleteTable(tableName); 1161 testDir.getFileSystem(confA).delete(testDir, true); 1162 UTIL.shutdownMiniCluster(); 1163 UTILB.shutdownMiniCluster(); 1164 } 1165 } 1166 1167 private static class ConfigurationCaptorConnection implements Connection { 1168 private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid"; 1169 1170 private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>(); 1171 1172 private final Connection delegate; 1173 1174 public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user, 1175 ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException { 1176 // here we do not use this registry, so close it... 
1177 registry.close(); 1178 // here we use createAsyncConnection, to avoid infinite recursive as we reset the Connection 1179 // implementation in below method 1180 delegate = 1181 FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes)) 1182 .toConnection(); 1183 1184 final String uuid = conf.get(UUID_KEY); 1185 if (uuid != null) { 1186 confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf); 1187 } 1188 } 1189 1190 static UUID configureConnectionImpl(Configuration conf) { 1191 conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 1192 ConfigurationCaptorConnection.class, Connection.class); 1193 1194 final UUID uuid = UUID.randomUUID(); 1195 conf.set(UUID_KEY, uuid.toString()); 1196 return uuid; 1197 } 1198 1199 static List<Configuration> getCapturedConfigarutions(UUID key) { 1200 return confs.get(key); 1201 } 1202 1203 @Override 1204 public Configuration getConfiguration() { 1205 return delegate.getConfiguration(); 1206 } 1207 1208 @Override 1209 public Table getTable(TableName tableName) throws IOException { 1210 return delegate.getTable(tableName); 1211 } 1212 1213 @Override 1214 public Table getTable(TableName tableName, ExecutorService pool) throws IOException { 1215 return delegate.getTable(tableName, pool); 1216 } 1217 1218 @Override 1219 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 1220 return delegate.getBufferedMutator(tableName); 1221 } 1222 1223 @Override 1224 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 1225 return delegate.getBufferedMutator(params); 1226 } 1227 1228 @Override 1229 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 1230 return delegate.getRegionLocator(tableName); 1231 } 1232 1233 @Override 1234 public void clearRegionLocationCache() { 1235 delegate.clearRegionLocationCache(); 1236 } 1237 1238 @Override 1239 public Admin getAdmin() throws IOException { 1240 return delegate.getAdmin(); 1241 } 1242 1243 @Override 1244 public void close() throws IOException { 1245 delegate.close(); 1246 } 1247 1248 @Override 1249 public boolean isClosed() { 1250 return delegate.isClosed(); 1251 } 1252 1253 @Override 1254 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 1255 return delegate.getTableBuilder(tableName, pool); 1256 } 1257 1258 @Override 1259 public AsyncConnection toAsyncConnection() { 1260 return delegate.toAsyncConnection(); 1261 } 1262 1263 @Override 1264 public String getClusterId() { 1265 return delegate.getClusterId(); 1266 } 1267 1268 @Override 1269 public Hbck getHbck() throws IOException { 1270 return delegate.getHbck(); 1271 } 1272 1273 @Override 1274 public Hbck getHbck(ServerName masterServer) throws IOException { 1275 return delegate.getHbck(masterServer); 1276 } 1277 1278 @Override 1279 public void abort(String why, Throwable e) { 1280 delegate.abort(why, e); 1281 } 1282 1283 @Override 1284 public boolean isAborted() { 1285 return delegate.isAborted(); 1286 } 1287 } 1288 1289}