001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertNotNull; 024import static org.junit.jupiter.api.Assertions.assertNotSame; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026import static org.junit.jupiter.api.Assertions.fail; 027 028import java.io.IOException; 029import java.lang.reflect.Field; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.Set; 037import java.util.UUID; 038import java.util.concurrent.Callable; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.CopyOnWriteArrayList; 041import java.util.concurrent.ExecutorService; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import 
org.apache.hadoop.fs.LocatedFileStatus; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.fs.RemoteIterator; 048import org.apache.hadoop.hbase.ArrayBackedTag; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellUtil; 051import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 052import org.apache.hadoop.hbase.ExtendedCell; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtil; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.HadoopShims; 057import org.apache.hadoop.hbase.KeyValue; 058import org.apache.hadoop.hbase.PrivateCellUtil; 059import org.apache.hadoop.hbase.ServerName; 060import org.apache.hadoop.hbase.StartTestingClusterOption; 061import org.apache.hadoop.hbase.TableName; 062import org.apache.hadoop.hbase.Tag; 063import org.apache.hadoop.hbase.TagType; 064import org.apache.hadoop.hbase.client.Admin; 065import org.apache.hadoop.hbase.client.AsyncConnection; 066import org.apache.hadoop.hbase.client.BufferedMutator; 067import org.apache.hadoop.hbase.client.BufferedMutatorParams; 068import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 070import org.apache.hadoop.hbase.client.Connection; 071import org.apache.hadoop.hbase.client.ConnectionFactory; 072import org.apache.hadoop.hbase.client.ConnectionRegistry; 073import org.apache.hadoop.hbase.client.ConnectionUtils; 074import org.apache.hadoop.hbase.client.Hbck; 075import org.apache.hadoop.hbase.client.Put; 076import org.apache.hadoop.hbase.client.RegionLocator; 077import org.apache.hadoop.hbase.client.Table; 078import org.apache.hadoop.hbase.client.TableBuilder; 079import org.apache.hadoop.hbase.client.TableDescriptor; 080import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 081import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 082import org.apache.hadoop.hbase.io.compress.Compression; 
083import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 084import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 085import org.apache.hadoop.hbase.io.hfile.CacheConfig; 086import org.apache.hadoop.hbase.io.hfile.HFile; 087import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 088import org.apache.hadoop.hbase.io.hfile.HFileScanner; 089import org.apache.hadoop.hbase.regionserver.BloomType; 090import org.apache.hadoop.hbase.regionserver.HRegion; 091import org.apache.hadoop.hbase.regionserver.HStore; 092import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 093import org.apache.hadoop.hbase.security.User; 094import org.apache.hadoop.hbase.testclassification.LargeTests; 095import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 096import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 097import org.apache.hadoop.hbase.util.Bytes; 098import org.apache.hadoop.hbase.util.CommonFSUtils; 099import org.apache.hadoop.hbase.util.FSUtils; 100import org.apache.hadoop.hbase.util.FutureUtils; 101import org.apache.hadoop.hbase.util.ReflectionUtils; 102import org.apache.hadoop.hdfs.DistributedFileSystem; 103import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 104import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 105import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 106import org.apache.hadoop.mapreduce.Job; 107import org.apache.hadoop.mapreduce.RecordWriter; 108import org.apache.hadoop.mapreduce.TaskAttemptContext; 109import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 110import org.junit.jupiter.api.Disabled; 111import org.junit.jupiter.api.Test; 112import org.mockito.Mockito; 113import org.slf4j.Logger; 114import org.slf4j.LoggerFactory; 115 116/** 117 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile 118 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and 119 * values. 
120 */ 121@org.junit.jupiter.api.Tag(VerySlowMapReduceTests.TAG) 122@org.junit.jupiter.api.Tag(LargeTests.TAG) 123public class TestHFileOutputFormat2 extends HFileOutputFormat2TestBase { 124 125 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 126 127 /** 128 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose 129 * timestamp is {@link HConstants#LATEST_TIMESTAMP}. 130 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 131 */ 132 @Disabled("Goes zombie too frequently; needs work. See HBASE-14563") 133 @Test 134 public void test_LATEST_TIMESTAMP_isReplaced() throws Exception { 135 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 136 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 137 TaskAttemptContext context = null; 138 Path dir = UTIL.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 139 try { 140 Job job = new Job(conf); 141 FileOutputFormat.setOutputPath(job, dir); 142 context = createTestTaskAttemptContext(job); 143 HFileOutputFormat2 hof = new HFileOutputFormat2(); 144 writer = hof.getRecordWriter(context); 145 final byte[] b = Bytes.toBytes("b"); 146 147 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 148 // changed by call to write. Check all in kv is same but ts. 149 KeyValue kv = new KeyValue(b, b, b); 150 KeyValue original = kv.clone(); 151 writer.write(new ImmutableBytesWritable(), kv); 152 assertFalse(original.equals(kv)); 153 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 154 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 155 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 156 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 157 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 158 159 // Test 2. Now test passing a kv that has explicit ts. 
It should not be 160 // changed by call to record write. 161 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 162 original = kv.clone(); 163 writer.write(new ImmutableBytesWritable(), kv); 164 assertTrue(original.equals(kv)); 165 } finally { 166 if (writer != null && context != null) writer.close(context); 167 dir.getFileSystem(conf).delete(dir, true); 168 } 169 } 170 171 private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception { 172 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 173 TaskAttemptContext context = 174 hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0"); 175 return context; 176 } 177 178 /* 179 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by 180 * time-restricted scans. 181 */ 182 @Disabled("Goes zombie too frequently; needs work. See HBASE-14563") 183 @Test 184 public void test_TIMERANGE() throws Exception { 185 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 186 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 187 TaskAttemptContext context = null; 188 Path dir = UTIL.getDataTestDir("test_TIMERANGE_present"); 189 LOG.info("Timerange dir writing to dir: " + dir); 190 try { 191 // build a record writer using HFileOutputFormat2 192 Job job = new Job(conf); 193 FileOutputFormat.setOutputPath(job, dir); 194 context = createTestTaskAttemptContext(job); 195 HFileOutputFormat2 hof = new HFileOutputFormat2(); 196 writer = hof.getRecordWriter(context); 197 198 // Pass two key values with explicit times stamps 199 final byte[] b = Bytes.toBytes("b"); 200 201 // value 1 with timestamp 2000 202 KeyValue kv = new KeyValue(b, b, b, 2000, b); 203 KeyValue original = kv.clone(); 204 writer.write(new ImmutableBytesWritable(), kv); 205 assertEquals(original, kv); 206 207 // value 2 with timestamp 1000 208 kv = new KeyValue(b, b, b, 1000, b); 209 original = kv.clone(); 210 writer.write(new 
ImmutableBytesWritable(), kv); 211 assertEquals(original, kv); 212 213 // verify that the file has the proper FileInfo. 214 writer.close(context); 215 216 // the generated file lives 1 directory down from the attempt directory 217 // and is the only file, e.g. 218 // _attempt__0000_r_000000_0/b/1979617994050536795 219 FileSystem fs = FileSystem.get(conf); 220 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 221 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 222 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 223 224 // open as HFile Reader and pull out TIMERANGE FileInfo. 225 HFile.Reader rd = 226 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 227 Map<byte[], byte[]> finfo = rd.getHFileInfo(); 228 byte[] range = finfo.get(Bytes.toBytes("TIMERANGE")); 229 assertNotNull(range); 230 231 // unmarshall and check values. 232 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range); 233 LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax()); 234 assertEquals(1000, timeRangeTracker.getMin()); 235 assertEquals(2000, timeRangeTracker.getMax()); 236 rd.close(); 237 } finally { 238 if (writer != null && context != null) writer.close(context); 239 dir.getFileSystem(conf).delete(dir, true); 240 } 241 } 242 243 /** 244 * Run small MR job. 245 */ 246 @Disabled("Goes zombie too frequently; needs work. See HBASE-14563") 247 @Test 248 public void testWritingPEData() throws Exception { 249 Configuration conf = UTIL.getConfiguration(); 250 Path testDir = UTIL.getDataTestDirOnTestFS("testWritingPEData"); 251 FileSystem fs = testDir.getFileSystem(conf); 252 253 // Set down this value or we OOME in eclipse. 254 conf.setInt("mapreduce.task.io.sort.mb", 20); 255 // Write a few files. 
256 long hregionMaxFilesize = 10 * 1024; 257 conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize); 258 259 Job job = new Job(conf, "testWritingPEData"); 260 setupRandomGeneratorMapper(job, false); 261 // This partitioner doesn't work well for number keys but using it anyways 262 // just to demonstrate how to configure it. 263 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 264 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 265 266 Arrays.fill(startKey, (byte) 0); 267 Arrays.fill(endKey, (byte) 0xff); 268 269 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 270 // Set start and end rows for partitioner. 271 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 272 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 273 job.setReducerClass(CellSortReducer.class); 274 job.setOutputFormatClass(HFileOutputFormat2.class); 275 job.setNumReduceTasks(4); 276 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 277 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 278 CellSerialization.class.getName()); 279 280 FileOutputFormat.setOutputPath(job, testDir); 281 assertTrue(job.waitForCompletion(false)); 282 FileStatus[] files = fs.listStatus(testDir); 283 assertTrue(files.length > 0); 284 285 // check output file num and size. 
286 for (byte[] family : FAMILIES) { 287 long kvCount = 0; 288 RemoteIterator<LocatedFileStatus> iterator = 289 fs.listFiles(testDir.suffix("/" + new String(family)), true); 290 while (iterator.hasNext()) { 291 LocatedFileStatus keyFileStatus = iterator.next(); 292 HFile.Reader reader = 293 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 294 HFileScanner scanner = reader.getScanner(conf, false, false, false); 295 296 kvCount += reader.getEntries(); 297 scanner.seekTo(); 298 long perKVSize = scanner.getCell().getSerializedSize(); 299 assertTrue(perKVSize * reader.getEntries() <= hregionMaxFilesize, 300 "Data size of each file should not be too large."); 301 } 302 assertEquals(ROWSPERSPLIT, kvCount, "Should write expected data in output file."); 303 } 304 } 305 306 /** 307 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile. 308 */ 309 @Test 310 public void test_WritingTagData() throws Exception { 311 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 312 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 313 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 314 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 315 TaskAttemptContext context = null; 316 Path dir = UTIL.getDataTestDir("WritingTagData"); 317 try { 318 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 319 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 320 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 321 Job job = new Job(conf); 322 FileOutputFormat.setOutputPath(job, dir); 323 context = createTestTaskAttemptContext(job); 324 HFileOutputFormat2 hof = new HFileOutputFormat2(); 325 writer = hof.getRecordWriter(context); 326 final byte[] b = Bytes.toBytes("b"); 327 328 List<Tag> tags = new ArrayList<>(); 329 tags.add(new 
ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 330 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 331 writer.write(new ImmutableBytesWritable(), kv); 332 writer.close(context); 333 writer = null; 334 FileSystem fs = dir.getFileSystem(conf); 335 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 336 while (iterator.hasNext()) { 337 LocatedFileStatus keyFileStatus = iterator.next(); 338 HFile.Reader reader = 339 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 340 HFileScanner scanner = reader.getScanner(conf, false, false, false); 341 scanner.seekTo(); 342 ExtendedCell cell = scanner.getCell(); 343 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 344 assertTrue(tagsFromCell.size() > 0); 345 for (Tag tag : tagsFromCell) { 346 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 347 } 348 } 349 } finally { 350 if (writer != null && context != null) writer.close(context); 351 dir.getFileSystem(conf).delete(dir, true); 352 } 353 } 354 355 @Disabled("Goes zombie too frequently; needs work. See HBASE-14563") 356 @Test 357 public void testJobConfiguration() throws Exception { 358 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 359 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 360 UTIL.getDataTestDir("testJobConfiguration").toString()); 361 Job job = new Job(conf); 362 job.setWorkingDirectory(UTIL.getDataTestDir("testJobConfiguration")); 363 Table table = Mockito.mock(Table.class); 364 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 365 setupMockStartKeys(regionLocator); 366 setupMockTableName(regionLocator); 367 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 368 assertEquals(4, job.getNumReduceTasks()); 369 } 370 371 /** 372 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. 
Tests that the 373 * family compression map is correctly serialized into and deserialized from configuration 374 */ 375 @Disabled("Goes zombie too frequently; needs work. See HBASE-14563") 376 @Test 377 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 378 for (int numCfs = 0; numCfs <= 3; numCfs++) { 379 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 380 Map<String, Compression.Algorithm> familyToCompression = 381 getMockColumnFamiliesForCompression(numCfs); 382 Table table = Mockito.mock(Table.class); 383 setupMockColumnFamiliesForCompression(table, familyToCompression); 384 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 385 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails, 386 Arrays.asList(table.getDescriptor()))); 387 388 // read back family specific compression setting from the configuration 389 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = 390 HFileOutputFormat2.createFamilyCompressionMap(conf); 391 392 // test that we have a value for all column families that matches with the 393 // used mock values 394 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 395 assertEquals(entry.getValue(), 396 retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())), 397 "Compression configuration incorrect for column family:" + entry.getKey()); 398 } 399 } 400 } 401 402 private void setupMockColumnFamiliesForCompression(Table table, 403 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 404 405 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 406 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 407 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 408 .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 409 
.setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 410 411 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 412 } 413 Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor(); 414 } 415 416 /** 417 * @return a map from column family names to compression algorithms for testing column family 418 * compression. Column family names have special characters 419 */ 420 private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) { 421 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 422 // use column family names having special characters 423 if (numCfs-- > 0) { 424 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 425 } 426 if (numCfs-- > 0) { 427 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 428 } 429 if (numCfs-- > 0) { 430 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 431 } 432 if (numCfs-- > 0) { 433 familyToCompression.put("Family3", Compression.Algorithm.NONE); 434 } 435 return familyToCompression; 436 } 437 438 /** 439 * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the 440 * family bloom type map is correctly serialized into and deserialized from configuration 441 */ 442 @Disabled("Goes zombie too frequently; needs work. 
See HBASE-14563") 443 @Test 444 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 445 for (int numCfs = 0; numCfs <= 2; numCfs++) { 446 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 447 Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs); 448 Table table = Mockito.mock(Table.class); 449 setupMockColumnFamiliesForBloomType(table, familyToBloomType); 450 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 451 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 452 Arrays.asList(table.getDescriptor()))); 453 454 // read back family specific data block encoding settings from the 455 // configuration 456 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 457 HFileOutputFormat2.createFamilyBloomTypeMap(conf); 458 459 // test that we have a value for all column families that matches with the 460 // used mock values 461 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 462 assertEquals(entry.getValue(), 463 retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())), 464 "BloomType configuration incorrect for column family:" + entry.getKey()); 465 } 466 } 467 } 468 469 private void setupMockColumnFamiliesForBloomType(Table table, 470 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 471 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 472 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 473 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 474 .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 475 .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 476 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 477 } 478 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 479 } 480 481 /** 482 * @return a map from column family names 
to compression algorithms for testing column family 483 * compression. Column family names have special characters 484 */ 485 private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) { 486 Map<String, BloomType> familyToBloomType = new HashMap<>(); 487 // use column family names having special characters 488 if (numCfs-- > 0) { 489 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 490 } 491 if (numCfs-- > 0) { 492 familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL); 493 } 494 if (numCfs-- > 0) { 495 familyToBloomType.put("Family3", BloomType.NONE); 496 } 497 return familyToBloomType; 498 } 499 500 /** 501 * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the 502 * family block size map is correctly serialized into and deserialized from configuration 503 */ 504 @Disabled("Goes zombie too frequently; needs work. See HBASE-14563") 505 @Test 506 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 507 for (int numCfs = 0; numCfs <= 3; numCfs++) { 508 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 509 Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs); 510 Table table = Mockito.mock(Table.class); 511 setupMockColumnFamiliesForBlockSize(table, familyToBlockSize); 512 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 513 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails, 514 Arrays.asList(table.getDescriptor()))); 515 516 // read back family specific data block encoding settings from the 517 // configuration 518 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 519 HFileOutputFormat2.createFamilyBlockSizeMap(conf); 520 521 // test that we have a value for all column families that matches with the 522 // used mock values 523 for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) { 524 assertEquals(entry.getValue(), 525 
retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())), 526 "BlockSize configuration incorrect for column family:" + entry.getKey()); 527 } 528 } 529 } 530 531 private void setupMockColumnFamiliesForBlockSize(Table table, 532 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 533 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 534 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 535 ColumnFamilyDescriptor columnFamilyDescriptor = 536 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 537 .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 538 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 539 } 540 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 541 } 542 543 /** 544 * @return a map from column family names to compression algorithms for testing column family 545 * compression. Column family names have special characters 546 */ 547 private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) { 548 Map<String, Integer> familyToBlockSize = new HashMap<>(); 549 // use column family names having special characters 550 if (numCfs-- > 0) { 551 familyToBlockSize.put("Family1!@#!@#&", 1234); 552 } 553 if (numCfs-- > 0) { 554 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 555 } 556 if (numCfs-- > 0) { 557 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 558 } 559 if (numCfs-- > 0) { 560 familyToBlockSize.put("Family3", 0); 561 } 562 return familyToBlockSize; 563 } 564 565 /** 566 * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that 567 * the family data block encoding map is correctly serialized into and deserialized from 568 * configuration 569 */ 570 @Disabled("Goes zombie too frequently; needs work. 
See HBASE-14563") 571 @Test 572 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 573 for (int numCfs = 0; numCfs <= 3; numCfs++) { 574 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 575 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 576 getMockColumnFamiliesForDataBlockEncoding(numCfs); 577 Table table = Mockito.mock(Table.class); 578 setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding); 579 TableDescriptor tableDescriptor = table.getDescriptor(); 580 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 581 HFileOutputFormat2.serializeColumnFamilyAttribute( 582 HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor))); 583 584 // read back family specific data block encoding settings from the 585 // configuration 586 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 587 HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf); 588 589 // test that we have a value for all column families that matches with the 590 // used mock values 591 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 592 assertEquals(entry.getValue(), 593 retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())), 594 "DataBlockEncoding configuration incorrect for column family:" + entry.getKey()); 595 } 596 } 597 } 598 599 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 600 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 601 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 602 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 603 ColumnFamilyDescriptor columnFamilyDescriptor = 604 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 605 .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0) 606 
.build(); 607 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 608 } 609 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 610 } 611 612 /** 613 * @return a map from column family names to compression algorithms for testing column family 614 * compression. Column family names have special characters 615 */ 616 private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) { 617 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 618 // use column family names having special characters 619 if (numCfs-- > 0) { 620 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 621 } 622 if (numCfs-- > 0) { 623 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF); 624 } 625 if (numCfs-- > 0) { 626 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX); 627 } 628 if (numCfs-- > 0) { 629 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 630 } 631 return familyToDataBlockEncoding; 632 } 633 634 private void setupMockStartKeys(RegionLocator table) throws IOException { 635 byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"), 636 Bytes.toBytes("ggg"), Bytes.toBytes("zzz") }; 637 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 638 } 639 640 private void setupMockTableName(RegionLocator table) throws IOException { 641 TableName mockTableName = TableName.valueOf("mock_table"); 642 Mockito.doReturn(mockTableName).when(table).getName(); 643 } 644 645 /** 646 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings 647 * from the column family descriptor 648 */ 649 @Disabled("Goes zombie too frequently; needs work. 
See HBASE-14563") 650 @Test 651 public void testColumnFamilySettings() throws Exception { 652 Configuration conf = new Configuration(this.UTIL.getConfiguration()); 653 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 654 TaskAttemptContext context = null; 655 Path dir = UTIL.getDataTestDir("testColumnFamilySettings"); 656 657 // Setup table descriptor 658 Table table = Mockito.mock(Table.class); 659 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 660 TableDescriptorBuilder tableDescriptorBuilder = 661 TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 662 663 Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor(); 664 for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) { 665 tableDescriptorBuilder.setColumnFamily(hcd); 666 } 667 668 // set up the table to return some mock keys 669 setupMockStartKeys(regionLocator); 670 671 try { 672 // partial map red setup to get an operational writer for testing 673 // We turn off the sequence file compression, because DefaultCodec 674 // pollutes the GZip codec pool with an incompatible compressor. 
// NOTE(review): this chunk begins inside the try-block of a test method whose signature is
// above this excerpt (it writes HFiles via HFileOutputFormat2 and then verifies the per-CF
// bloom/compression settings); the method closes at the brace after the finally below.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(),
        ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        ColumnFamilyDescriptor hcd =
          tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        // a missing BLOOM_FILTER_TYPE_KEY in the HFile info means the writer used no bloom filter
        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)),
          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader
            + ")");
        assertEquals(hcd.getCompressionType(), reader.getFileContext().getCompression(),
          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")");
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    // 4-byte row key holding the row index; 10 bytes of random data per value
    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte valBytes[] = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    for (int i = 0; i < numRows; i++) {
      Bytes.putInt(keyBytes, 0, i);
      Bytes.random(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
      // one cell per family, all sharing the same row key
      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * This test is to test the scenario happened in HBASE-6901. All files are bulk loaded and
   * excluded from minor compaction. Without the fix of HBASE-6901, an
   * ArrayIndexOutOfBoundsException will be thrown.
   */
  @Disabled("Flakey: See HBASE-9051")
  @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    UTIL.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
      Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES);
      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
      final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
      assertEquals(0, UTIL.countRows(table), "Should start with empty table");

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      for (int i = 0; i < 2; i++) {
        Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf,
          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(),
            conn.getRegionLocator(TABLE_NAMES[0]))),
          testDir, false);
        // Perform the actual load
        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals(expectedRows, UTIL.countRows(table),
        "BulkLoadHFiles should put expected data in table");

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file: the poll below is EXPECTED to time out
      // (quickPoll fails with an AssertionError), proving the bulk-loaded files were excluded.
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
            for (HRegion region : regions) {
              for (HStore store : region.getStores()) {
                store.closeAndArchiveCompactedFiles();
              }
            }
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
          for (HRegion region : regions) {
            for (HStore store : region.getStores()) {
              store.closeAndArchiveCompactedFiles();
            }
          }
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Like {@link #testExcludeAllFromMinorCompaction()} but with one flushed store file plus a
   * single bulk-loaded file: minor compaction must skip the bulk-loaded file, major compaction
   * must merge both.
   */
  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    UTIL.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
      Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES);
      assertEquals(0, UTIL.countRows(table), "Should start with empty table");

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // put some data in it and flush to create a storefile
      Put p = new Put(Bytes.toBytes("test"));
      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAMES[0]);
      assertEquals(1, UTIL.countRows(table));
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file with more rows
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
      runIncrementalPELoad(conf,
        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
        testDir, false);

      // Perform the actual load
      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals(expectedRows + 1, UTIL.countRows(table),
        "BulkLoadHFiles should put expected data in table");

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file: the poll is EXPECTED to time out
      // (quickPoll fails), proving the bulk-loaded file was excluded from minor compaction.
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Polls {@code c} every 10 ms until it returns true or {@code waitMs} has elapsed, then fails
   * the test (via JUnit {@code fail()}, i.e. an AssertionError) on timeout.
   */
  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }

  public static void main(String args[]) throws Exception {
    new TestHFileOutputFormat2().manualTest(args);
  }

  /**
   * Manual driver: {@code newtable <name>} creates a pre-split table; {@code incremental <name>}
   * runs an incremental-load MR job against an existing table, writing to "incremental-out".
   */
  public void manualTest(String args[]) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    UTIL = new HBaseTestingUtil(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      Table table = UTIL.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
        RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
          Arrays
            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
          outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

  /**
   * Verifies that HFileOutputFormat2 applies the configured HDFS block storage policy: a global
   * ALL_SSD default plus a per-CF ONE_SSD override for FAMILIES[0].
   */
  @Test
  public void testBlockStoragePolicy() throws Exception {
    UTIL = new HBaseTestingUtil();
    Configuration conf = UTIL.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    UTIL.startMiniDFSCluster(3);
    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the original block storage policy would be HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // alter table cf schema to change storage policies
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      UTIL.shutdownMiniDFSCluster();
    }
  }

  /**
   * Reads the storage policy name of {@code path} via reflection on
   * {@code FileSystem.getStoragePolicy}, falling back to the pre-2.8 HDFS client API (and to
   * "HOT" when even that yields nothing).
   */
  private String getStoragePolicyName(FileSystem fs, Path path) {
    try {
      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // Maybe fail because of using old HDFS version, try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy;// HOT by default
    }
  }

  /**
   * Fallback policy lookup for older HDFS clients: resolves the numeric storage-policy id from
   * the file status against the list returned by {@code dfs.getStoragePolicies()}. Returns null
   * when it cannot be determined.
   */
  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
    try {
      if (fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          byte storagePolicyId = status.getStoragePolicy();
          // ID_UNSPECIFIED is read reflectively because its declaring class differs across
          // Hadoop versions
          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  /**
   * Verifies that COMPRESSION_OVERRIDE_CONF_KEY forces the written HFiles to use the given codec
   * ("gz") regardless of the table schema.
   */
  @Test
  public void TestConfigureCompression() throws Exception {
    Configuration conf = new Configuration(this.UTIL.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = UTIL.getDataTestDir("TestConfigureCompression");
    String hfileoutputformatCompression = "gz";

    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);

      Job job = Job.getInstance(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        assertEquals(hfileoutputformatCompression,
          reader.getTrailer().getCompressionCodec().getName());
      }
    } finally {
      if (writer != null && context != null) {
        writer.close(context);
      }
      dir.getFileSystem(conf).delete(dir, true);
    }

  }

  /**
   * Runs an incremental load whose job is configured against cluster A while the target table
   * lives on cluster B, and asserts (via ConfigurationCaptorConnection) that the remote-cluster
   * ZK settings and REMOTE_CLUSTER_CONF_PREFIX overrides are propagated to the connections the
   * output format creates.
   */
  @Test
  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
    // Start cluster A
    UTIL = new HBaseTestingUtil();
    Configuration confA = UTIL.getConfiguration();
    int hostCount = 3;
    int regionNum = 20;
    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    UTIL.startMiniCluster(option);

    // Start cluster B
    HBaseTestingUtil UTILB = new HBaseTestingUtil();
    Configuration confB = UTILB.getConfiguration();
    UTILB.startMiniCluster(option);

    Path testDir = UTIL.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");

    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
    TableName tableName = TableName.valueOf("table");
    // Create table in cluster B
    try (Table table = UTILB.createTable(tableName, FAMILIES, splitKeys);
      RegionLocator r = UTILB.getConnection().getRegionLocator(tableName)) {
      // Generate the bulk load files
      // Job has zookeeper configuration for cluster A
      // Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
      Job job = new Job(confA, "testLocalMRIncrementalLoad");
      Configuration jobConf = job.getConfiguration();
      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
      job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("runIncrementalPELoad"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table, r);

      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));

      String bSpecificConfigKey = "my.override.config.for.b";
      String bSpecificConfigValue = "b-specific-value";
      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
        bSpecificConfigValue);

      FileOutputFormat.setOutputPath(job, testDir);

      assertFalse(UTIL.getTestFileSystem().exists(testDir));

      assertTrue(job.waitForCompletion(true));

      final List<Configuration> configs =
        ConfigurationCaptorConnection.getCapturedConfigarutions(key);

      assertFalse(configs.isEmpty());
      for (Configuration config : configs) {
        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
          config.get(HConstants.ZOOKEEPER_QUORUM));
        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));

        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
      }
    } finally {
      UTILB.deleteTable(tableName);
      testDir.getFileSystem(confA).delete(testDir, true);
      UTIL.shutdownMiniCluster();
      UTILB.shutdownMiniCluster();
    }
  }

  /**
   * Connection implementation installed via HBASE_CLIENT_CONNECTION_IMPL that records every
   * Configuration it is constructed with (keyed by a UUID placed in the conf) and otherwise
   * delegates to a real connection.
   */
  private static class ConfigurationCaptorConnection implements Connection {
    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";

    // captured configurations per test-run UUID; concurrent because connections may be created
    // from multiple threads
    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();

    private final Connection delegate;

    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
      // here we do not use this registry, so close it...
      registry.close();
      // here we use createAsyncConnection, to avoid infinite recursive as we reset the Connection
      // implementation in below method
      delegate =
        FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes))
          .toConnection();

      final String uuid = conf.get(UUID_KEY);
      if (uuid != null) {
        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
      }
    }

    /**
     * Registers this class as the Connection implementation in {@code conf} and returns the UUID
     * under which constructed configurations will be captured.
     */
    static UUID configureConnectionImpl(Configuration conf) {
      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
        ConfigurationCaptorConnection.class, Connection.class);

      final UUID uuid = UUID.randomUUID();
      conf.set(UUID_KEY, uuid.toString());
      return uuid;
    }

    // NOTE(review): method name contains a typo ("Configarutions"); kept as-is since callers use it
    static List<Configuration> getCapturedConfigarutions(UUID key) {
      return confs.get(key);
    }

    @Override
    public Configuration getConfiguration() {
      return delegate.getConfiguration();
    }

    @Override
    public Table getTable(TableName tableName) throws IOException {
      return delegate.getTable(tableName);
    }

    @Override
    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
      return delegate.getTable(tableName, pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
      return delegate.getBufferedMutator(tableName);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
      return delegate.getBufferedMutator(params);
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return delegate.getRegionLocator(tableName);
    }

    @Override
    public void clearRegionLocationCache() {
      delegate.clearRegionLocationCache();
    }

    @Override
    public Admin getAdmin() throws IOException {
      return delegate.getAdmin();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public boolean isClosed() {
      return delegate.isClosed();
    }

    @Override
    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
      return delegate.getTableBuilder(tableName, pool);
    }

    @Override
    public AsyncConnection toAsyncConnection() {
      return delegate.toAsyncConnection();
    }

    @Override
    public String getClusterId() {
      return delegate.getClusterId();
    }

    @Override
    public Hbck getHbck() throws IOException {
      return delegate.getHbck();
    }

    @Override
    public Hbck getHbck(ServerName masterServer) throws IOException {
      return delegate.getHbck(masterServer);
    }

    @Override
    public void abort(String why, Throwable e) {
      delegate.abort(why, e);
    }

    @Override
    public boolean isAborted() {
      return delegate.isAborted();
    }
  }

}