001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNotSame; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.lang.reflect.Field; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.Random; 037import java.util.Set; 038import java.util.concurrent.Callable; 039import java.util.stream.Collectors; 040import java.util.stream.Stream; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.LocatedFileStatus; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.fs.RemoteIterator; 047import org.apache.hadoop.hbase.ArrayBackedTag; 048import org.apache.hadoop.hbase.Cell; 049import org.apache.hadoop.hbase.CellUtil; 050import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 051import org.apache.hadoop.hbase.HBaseClassTestRule; 052import org.apache.hadoop.hbase.HBaseConfiguration; 053import org.apache.hadoop.hbase.HBaseTestingUtility; 054import org.apache.hadoop.hbase.HColumnDescriptor; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.HDFSBlocksDistribution; 057import org.apache.hadoop.hbase.HTableDescriptor; 058import org.apache.hadoop.hbase.HadoopShims; 059import org.apache.hadoop.hbase.KeyValue; 060import org.apache.hadoop.hbase.PerformanceEvaluation; 061import org.apache.hadoop.hbase.PrivateCellUtil; 062import org.apache.hadoop.hbase.StartMiniClusterOption; 063import org.apache.hadoop.hbase.TableName; 064import org.apache.hadoop.hbase.Tag; 065import org.apache.hadoop.hbase.TagType; 066import org.apache.hadoop.hbase.client.Admin; 067import org.apache.hadoop.hbase.client.Connection; 068import org.apache.hadoop.hbase.client.ConnectionFactory; 069import org.apache.hadoop.hbase.client.Put; 070import org.apache.hadoop.hbase.client.RegionLocator; 071import org.apache.hadoop.hbase.client.Result; 072import org.apache.hadoop.hbase.client.ResultScanner; 073import org.apache.hadoop.hbase.client.Scan; 074import org.apache.hadoop.hbase.client.Table; 075import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 076import org.apache.hadoop.hbase.io.compress.Compression; 077import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 078import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 079import org.apache.hadoop.hbase.io.hfile.CacheConfig; 080import org.apache.hadoop.hbase.io.hfile.HFile; 081import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 082import org.apache.hadoop.hbase.io.hfile.HFileScanner; 083import org.apache.hadoop.hbase.regionserver.BloomType; 084import org.apache.hadoop.hbase.regionserver.HRegion; 085import org.apache.hadoop.hbase.regionserver.HStore; 086import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 087import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 088import org.apache.hadoop.hbase.testclassification.LargeTests; 089import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 090import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 091import org.apache.hadoop.hbase.util.Bytes; 092import org.apache.hadoop.hbase.util.CommonFSUtils; 093import org.apache.hadoop.hbase.util.FSUtils; 094import org.apache.hadoop.hbase.util.ReflectionUtils; 095import org.apache.hadoop.hdfs.DistributedFileSystem; 096import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 097import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 098import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 099import org.apache.hadoop.io.NullWritable; 100import org.apache.hadoop.mapreduce.Job; 101import org.apache.hadoop.mapreduce.Mapper; 102import org.apache.hadoop.mapreduce.RecordWriter; 103import org.apache.hadoop.mapreduce.TaskAttemptContext; 104import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 105import org.junit.ClassRule; 106import org.junit.Ignore; 107import org.junit.Test; 108import org.junit.experimental.categories.Category; 109import org.mockito.Mockito; 110import org.slf4j.Logger; 111import org.slf4j.LoggerFactory; 112 113/** 114 * Simple test for {@link HFileOutputFormat2}. 115 * Sets up and runs a mapreduce job that writes hfile output. 116 * Creates a few inner classes to implement splits and an inputformat that 117 * emits keys and values like those of {@link PerformanceEvaluation}. 118 */ 119@Category({VerySlowMapReduceTests.class, LargeTests.class}) 120//TODO : Remove this in 3.0 121public class TestHFileOutputFormat2 { 122 123 @ClassRule 124 public static final HBaseClassTestRule CLASS_RULE = 125 HBaseClassTestRule.forClass(TestHFileOutputFormat2.class); 126 127 private final static int ROWSPERSPLIT = 1024; 128 129 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 130 private static final byte[][] FAMILIES = { 131 Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))}; 132 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", 133 "TestTable3").map(TableName::valueOf).toArray(TableName[]::new); 134 135 private HBaseTestingUtility util = new HBaseTestingUtility(); 136 137 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 138 139 /** 140 * Simple mapper that makes KeyValue output. 141 */ 142 static class RandomKVGeneratingMapper 143 extends Mapper<NullWritable, NullWritable, 144 ImmutableBytesWritable, Cell> { 145 146 private int keyLength; 147 private static final int KEYLEN_DEFAULT=10; 148 private static final String KEYLEN_CONF="randomkv.key.length"; 149 150 private int valLength; 151 private static final int VALLEN_DEFAULT=10; 152 private static final String VALLEN_CONF="randomkv.val.length"; 153 private static final byte [] QUALIFIER = Bytes.toBytes("data"); 154 private boolean multiTableMapper = false; 155 private TableName[] tables = null; 156 157 158 @Override 159 protected void setup(Context context) throws IOException, 160 InterruptedException { 161 super.setup(context); 162 163 Configuration conf = context.getConfiguration(); 164 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 165 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 166 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 167 false); 168 if (multiTableMapper) { 169 tables = TABLE_NAMES; 170 } else { 171 tables = new TableName[]{TABLE_NAMES[0]}; 172 } 173 } 174 175 @Override 176 protected void map( 177 NullWritable n1, NullWritable n2, 178 Mapper<NullWritable, NullWritable, 179 ImmutableBytesWritable,Cell>.Context context) 180 throws java.io.IOException ,InterruptedException 181 { 182 183 byte keyBytes[] = new byte[keyLength]; 184 byte valBytes[] = new byte[valLength]; 185 186 int taskId = context.getTaskAttemptID().getTaskID().getId(); 187 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 188 Random random = new Random(); 189 byte[] key; 190 for (int j = 0; j < tables.length; ++j) { 191 for (int i = 0; i < ROWSPERSPLIT; i++) { 192 random.nextBytes(keyBytes); 193 // Ensure that unique tasks generate unique keys 194 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 195 random.nextBytes(valBytes); 196 key = keyBytes; 197 if (multiTableMapper) { 198 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 199 } 200 201 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 202 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 203 context.write(new ImmutableBytesWritable(key), kv); 204 } 205 } 206 } 207 } 208 } 209 210 /** 211 * Simple mapper that makes Put output. 212 */ 213 static class RandomPutGeneratingMapper 214 extends Mapper<NullWritable, NullWritable, 215 ImmutableBytesWritable, Put> { 216 217 private int keyLength; 218 private static final int KEYLEN_DEFAULT = 10; 219 private static final String KEYLEN_CONF = "randomkv.key.length"; 220 221 private int valLength; 222 private static final int VALLEN_DEFAULT = 10; 223 private static final String VALLEN_CONF = "randomkv.val.length"; 224 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 225 private boolean multiTableMapper = false; 226 private TableName[] tables = null; 227 228 @Override 229 protected void setup(Context context) throws IOException, 230 InterruptedException { 231 super.setup(context); 232 233 Configuration conf = context.getConfiguration(); 234 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 235 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 236 multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 237 false); 238 if (multiTableMapper) { 239 tables = TABLE_NAMES; 240 } else { 241 tables = new TableName[]{TABLE_NAMES[0]}; 242 } 243 } 244 245 @Override 246 protected void map( 247 NullWritable n1, NullWritable n2, 248 Mapper<NullWritable, NullWritable, 249 ImmutableBytesWritable, Put>.Context context) 250 throws java.io.IOException, InterruptedException { 251 252 byte keyBytes[] = new byte[keyLength]; 253 byte valBytes[] = new byte[valLength]; 254 255 int taskId = context.getTaskAttemptID().getTaskID().getId(); 256 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 257 258 Random random = new Random(); 259 byte[] key; 260 for (int j = 0; j < tables.length; ++j) { 261 for (int i = 0; i < ROWSPERSPLIT; i++) { 262 random.nextBytes(keyBytes); 263 // Ensure that unique tasks generate unique keys 264 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 265 random.nextBytes(valBytes); 266 key = keyBytes; 267 if (multiTableMapper) { 268 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 269 } 270 271 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 272 Put p = new Put(keyBytes); 273 p.addColumn(family, QUALIFIER, valBytes); 274 // set TTL to very low so that the scan does not return any value 275 p.setTTL(1l); 276 context.write(new ImmutableBytesWritable(key), p); 277 } 278 } 279 } 280 } 281 } 282 283 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 284 if (putSortReducer) { 285 job.setInputFormatClass(NMapInputFormat.class); 286 job.setMapperClass(RandomPutGeneratingMapper.class); 287 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 288 job.setMapOutputValueClass(Put.class); 289 } else { 290 job.setInputFormatClass(NMapInputFormat.class); 291 job.setMapperClass(RandomKVGeneratingMapper.class); 292 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 293 job.setMapOutputValueClass(KeyValue.class); 294 } 295 } 296 297 /** 298 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if 299 * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}. 300 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 301 */ 302 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 303 public void test_LATEST_TIMESTAMP_isReplaced() 304 throws Exception { 305 Configuration conf = new Configuration(this.util.getConfiguration()); 306 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 307 TaskAttemptContext context = null; 308 Path dir = 309 util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 310 try { 311 Job job = new Job(conf); 312 FileOutputFormat.setOutputPath(job, dir); 313 context = createTestTaskAttemptContext(job); 314 HFileOutputFormat2 hof = new HFileOutputFormat2(); 315 writer = hof.getRecordWriter(context); 316 final byte [] b = Bytes.toBytes("b"); 317 318 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 319 // changed by call to write. Check all in kv is same but ts. 320 KeyValue kv = new KeyValue(b, b, b); 321 KeyValue original = kv.clone(); 322 writer.write(new ImmutableBytesWritable(), kv); 323 assertFalse(original.equals(kv)); 324 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 325 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 326 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 327 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 328 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 329 330 // Test 2. Now test passing a kv that has explicit ts. It should not be 331 // changed by call to record write. 332 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 333 original = kv.clone(); 334 writer.write(new ImmutableBytesWritable(), kv); 335 assertTrue(original.equals(kv)); 336 } finally { 337 if (writer != null && context != null) writer.close(context); 338 dir.getFileSystem(conf).delete(dir, true); 339 } 340 } 341 342 private TaskAttemptContext createTestTaskAttemptContext(final Job job) 343 throws Exception { 344 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 345 TaskAttemptContext context = hadoop.createTestTaskAttemptContext( 346 job, "attempt_201402131733_0001_m_000000_0"); 347 return context; 348 } 349 350 /* 351 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE 352 * metadata used by time-restricted scans. 353 */ 354 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 355 public void test_TIMERANGE() throws Exception { 356 Configuration conf = new Configuration(this.util.getConfiguration()); 357 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 358 TaskAttemptContext context = null; 359 Path dir = 360 util.getDataTestDir("test_TIMERANGE_present"); 361 LOG.info("Timerange dir writing to dir: "+ dir); 362 try { 363 // build a record writer using HFileOutputFormat2 364 Job job = new Job(conf); 365 FileOutputFormat.setOutputPath(job, dir); 366 context = createTestTaskAttemptContext(job); 367 HFileOutputFormat2 hof = new HFileOutputFormat2(); 368 writer = hof.getRecordWriter(context); 369 370 // Pass two key values with explicit times stamps 371 final byte [] b = Bytes.toBytes("b"); 372 373 // value 1 with timestamp 2000 374 KeyValue kv = new KeyValue(b, b, b, 2000, b); 375 KeyValue original = kv.clone(); 376 writer.write(new ImmutableBytesWritable(), kv); 377 assertEquals(original,kv); 378 379 // value 2 with timestamp 1000 380 kv = new KeyValue(b, b, b, 1000, b); 381 original = kv.clone(); 382 writer.write(new ImmutableBytesWritable(), kv); 383 assertEquals(original, kv); 384 385 // verify that the file has the proper FileInfo. 386 writer.close(context); 387 388 // the generated file lives 1 directory down from the attempt directory 389 // and is the only file, e.g. 390 // _attempt__0000_r_000000_0/b/1979617994050536795 391 FileSystem fs = FileSystem.get(conf); 392 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 393 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 394 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 395 396 // open as HFile Reader and pull out TIMERANGE FileInfo. 397 HFile.Reader rd = 398 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 399 Map<byte[],byte[]> finfo = rd.getHFileInfo(); 400 byte[] range = finfo.get(Bytes.toBytes("TIMERANGE")); 401 assertNotNull(range); 402 403 // unmarshall and check values. 404 TimeRangeTracker timeRangeTracker =TimeRangeTracker.parseFrom(range); 405 LOG.info(timeRangeTracker.getMin() + 406 "...." + timeRangeTracker.getMax()); 407 assertEquals(1000, timeRangeTracker.getMin()); 408 assertEquals(2000, timeRangeTracker.getMax()); 409 rd.close(); 410 } finally { 411 if (writer != null && context != null) writer.close(context); 412 dir.getFileSystem(conf).delete(dir, true); 413 } 414 } 415 416 /** 417 * Run small MR job. 418 */ 419 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 420 public void testWritingPEData() throws Exception { 421 Configuration conf = util.getConfiguration(); 422 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 423 FileSystem fs = testDir.getFileSystem(conf); 424 425 // Set down this value or we OOME in eclipse. 426 conf.setInt("mapreduce.task.io.sort.mb", 20); 427 // Write a few files. 428 long hregionMaxFilesize = 10 * 1024; 429 conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize); 430 431 Job job = new Job(conf, "testWritingPEData"); 432 setupRandomGeneratorMapper(job, false); 433 // This partitioner doesn't work well for number keys but using it anyways 434 // just to demonstrate how to configure it. 435 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 436 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 437 438 Arrays.fill(startKey, (byte)0); 439 Arrays.fill(endKey, (byte)0xff); 440 441 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 442 // Set start and end rows for partitioner. 443 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 444 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 445 job.setReducerClass(KeyValueSortReducer.class); 446 job.setOutputFormatClass(HFileOutputFormat2.class); 447 job.setNumReduceTasks(4); 448 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 449 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 450 KeyValueSerialization.class.getName()); 451 452 FileOutputFormat.setOutputPath(job, testDir); 453 assertTrue(job.waitForCompletion(false)); 454 FileStatus [] files = fs.listStatus(testDir); 455 assertTrue(files.length > 0); 456 457 //check output file num and size. 458 for (byte[] family : FAMILIES) { 459 long kvCount= 0; 460 RemoteIterator<LocatedFileStatus> iterator = 461 fs.listFiles(testDir.suffix("/" + new String(family)), true); 462 while (iterator.hasNext()) { 463 LocatedFileStatus keyFileStatus = iterator.next(); 464 HFile.Reader reader = 465 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 466 HFileScanner scanner = reader.getScanner(false, false, false); 467 468 kvCount += reader.getEntries(); 469 scanner.seekTo(); 470 long perKVSize = scanner.getCell().getSerializedSize(); 471 assertTrue("Data size of each file should not be too large.", 472 perKVSize * reader.getEntries() <= hregionMaxFilesize); 473 } 474 assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount); 475 } 476 } 477 478 /** 479 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into 480 * hfile. 481 */ 482 @Test 483 public void test_WritingTagData() 484 throws Exception { 485 Configuration conf = new Configuration(this.util.getConfiguration()); 486 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 487 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 488 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 489 TaskAttemptContext context = null; 490 Path dir = 491 util.getDataTestDir("WritingTagData"); 492 try { 493 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 494 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 495 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 496 Job job = new Job(conf); 497 FileOutputFormat.setOutputPath(job, dir); 498 context = createTestTaskAttemptContext(job); 499 HFileOutputFormat2 hof = new HFileOutputFormat2(); 500 writer = hof.getRecordWriter(context); 501 final byte [] b = Bytes.toBytes("b"); 502 503 List< Tag > tags = new ArrayList<>(); 504 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 505 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 506 writer.write(new ImmutableBytesWritable(), kv); 507 writer.close(context); 508 writer = null; 509 FileSystem fs = dir.getFileSystem(conf); 510 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 511 while(iterator.hasNext()) { 512 LocatedFileStatus keyFileStatus = iterator.next(); 513 HFile.Reader reader = 514 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 515 HFileScanner scanner = reader.getScanner(false, false, false); 516 scanner.seekTo(); 517 Cell cell = scanner.getCell(); 518 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 519 assertTrue(tagsFromCell.size() > 0); 520 for (Tag tag : tagsFromCell) { 521 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 522 } 523 } 524 } finally { 525 if (writer != null && context != null) writer.close(context); 526 dir.getFileSystem(conf).delete(dir, true); 527 } 528 } 529 530 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 531 public void testJobConfiguration() throws Exception { 532 Configuration conf = new Configuration(this.util.getConfiguration()); 533 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration") 534 .toString()); 535 Job job = new Job(conf); 536 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 537 Table table = Mockito.mock(Table.class); 538 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 539 setupMockStartKeys(regionLocator); 540 setupMockTableName(regionLocator); 541 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 542 assertEquals(job.getNumReduceTasks(), 4); 543 } 544 545 private byte [][] generateRandomStartKeys(int numKeys) { 546 Random random = new Random(); 547 byte[][] ret = new byte[numKeys][]; 548 // first region start key is always empty 549 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 550 for (int i = 1; i < numKeys; i++) { 551 ret[i] = 552 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 553 } 554 return ret; 555 } 556 557 private byte[][] generateRandomSplitKeys(int numKeys) { 558 Random random = new Random(); 559 byte[][] ret = new byte[numKeys][]; 560 for (int i = 0; i < numKeys; i++) { 561 ret[i] = 562 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 563 } 564 return ret; 565 } 566 567 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 568 public void testMRIncrementalLoad() throws Exception { 569 LOG.info("\nStarting test testMRIncrementalLoad\n"); 570 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 571 } 572 573 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 574 public void testMRIncrementalLoadWithSplit() throws Exception { 575 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 576 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 577 } 578 579 /** 580 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true 581 * This test could only check the correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY 582 * is set to true. Because MiniHBaseCluster always run with single hostname (and different ports), 583 * it's not possible to check the region locality by comparing region locations and DN hostnames. 584 * When MiniHBaseCluster supports explicit hostnames parameter (just like MiniDFSCluster does), 585 * we could test region locality features more easily. 586 */ 587 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 588 public void testMRIncrementalLoadWithLocality() throws Exception { 589 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 590 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 591 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 592 } 593 594 //@Ignore("Wahtevs") 595 @Test 596 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 597 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 598 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 599 } 600 601 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 602 boolean putSortReducer, String tableStr) throws Exception { 603 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, 604 Arrays.asList(tableStr)); 605 } 606 607 @Test 608 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 609 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 610 doIncrementalLoadTest(false, false, true, 611 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList 612 ())); 613 } 614 615 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 616 boolean putSortReducer, List<String> tableStr) throws Exception { 617 util = new HBaseTestingUtility(); 618 Configuration conf = util.getConfiguration(); 619 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 620 int hostCount = 1; 621 int regionNum = 5; 622 if (shouldKeepLocality) { 623 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 624 // explicit hostnames parameter just like MiniDFSCluster does. 625 hostCount = 3; 626 regionNum = 20; 627 } 628 629 String[] hostnames = new String[hostCount]; 630 for (int i = 0; i < hostCount; ++i) { 631 hostnames[i] = "datanode_" + i; 632 } 633 StartMiniClusterOption option = StartMiniClusterOption.builder() 634 .numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 635 util.startMiniCluster(option); 636 637 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 638 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 639 boolean writeMultipleTables = tableStr.size() > 1; 640 for (String tableStrSingle : tableStr) { 641 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 642 TableName tableName = TableName.valueOf(tableStrSingle); 643 Table table = util.createTable(tableName, FAMILIES, splitKeys); 644 645 RegionLocator r = util.getConnection().getRegionLocator(tableName); 646 assertEquals("Should start with empty table", 0, util.countRows(table)); 647 int numRegions = r.getStartKeys().length; 648 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 649 650 allTables.put(tableStrSingle, table); 651 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r)); 652 } 653 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 654 // Generate the bulk load files 655 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 656 657 for (Table tableSingle : allTables.values()) { 658 // This doesn't write into the table, just makes files 659 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 660 } 661 int numTableDirs = 0; 662 for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) { 663 Path tablePath = testDir; 664 665 if (writeMultipleTables) { 666 if (allTables.containsKey(tf.getPath().getName())) { 667 ++numTableDirs; 668 tablePath = tf.getPath(); 669 } 670 else { 671 continue; 672 } 673 } 674 675 // Make sure that a directory was created for every CF 676 int dir = 0; 677 for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) { 678 for (byte[] family : FAMILIES) { 679 if (Bytes.toString(family).equals(f.getPath().getName())) { 680 ++dir; 681 } 682 } 683 } 684 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 685 } 686 if (writeMultipleTables) { 687 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 688 } 689 690 Admin admin = util.getConnection().getAdmin(); 691 try { 692 // handle the split case 693 if (shouldChangeRegions) { 694 Table chosenTable = allTables.values().iterator().next(); 695 // Choose a semi-random table if multiple tables are available 696 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 697 admin.disableTable(chosenTable.getName()); 698 util.waitUntilNoRegionsInTransition(); 699 700 util.deleteTable(chosenTable.getName()); 701 byte[][] newSplitKeys = generateRandomSplitKeys(14); 702 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 703 704 while (util.getConnection().getRegionLocator(chosenTable.getName()) 705 .getAllRegionLocations().size() != 15 || 706 !admin.isTableAvailable(table.getName())) { 707 Thread.sleep(200); 708 LOG.info("Waiting for new region assignment to happen"); 709 } 710 } 711 712 // Perform the actual load 713 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 714 Path tableDir = testDir; 715 String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString(); 716 LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr); 717 if (writeMultipleTables) { 718 tableDir = new Path(testDir, tableNameStr); 719 } 720 Table currentTable = allTables.get(tableNameStr); 721 TableName currentTableName = currentTable.getName(); 722 new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo 723 .getRegionLocator()); 724 725 // Ensure data shows up 726 int expectedRows = 0; 727 if (putSortReducer) { 728 // no rows should be extracted 729 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 730 util.countRows(currentTable)); 731 } else { 732 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 733 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 734 util.countRows(currentTable)); 735 Scan scan = new Scan(); 736 ResultScanner results = currentTable.getScanner(scan); 737 for (Result res : results) { 738 assertEquals(FAMILIES.length, res.rawCells().length); 739 Cell first = res.rawCells()[0]; 740 for (Cell kv : res.rawCells()) { 741 assertTrue(CellUtil.matchingRows(first, kv)); 742 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 743 } 744 } 745 results.close(); 746 } 747 String tableDigestBefore = util.checksumRows(currentTable); 748 // Check region locality 749 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 750 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 751 hbd.add(region.getHDFSBlocksDistribution()); 752 } 753 for (String hostname : hostnames) { 754 float locality = hbd.getBlockLocalityIndex(hostname); 755 LOG.info("locality of [" + hostname + "]: " + locality); 756 assertEquals(100, (int) (locality * 100)); 757 } 758 759 // Cause regions to reopen 760 admin.disableTable(currentTableName); 761 while (!admin.isTableDisabled(currentTableName)) { 762 Thread.sleep(200); 763 LOG.info("Waiting for table to disable"); 764 } 765 admin.enableTable(currentTableName); 766 util.waitTableAvailable(currentTableName); 767 assertEquals("Data should remain after reopening of regions", 768 tableDigestBefore, util.checksumRows(currentTable)); 769 } 770 } finally { 771 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 772 tableInfoSingle.getRegionLocator().close(); 773 } 774 for (Entry<String, Table> singleTable : allTables.entrySet() ) { 775 singleTable.getValue().close(); 776 util.deleteTable(singleTable.getValue().getName()); 777 } 778 testDir.getFileSystem(conf).delete(testDir, true); 779 util.shutdownMiniCluster(); 780 } 781 } 782 783 private void runIncrementalPELoad(Configuration conf, List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, 784 boolean putSortReducer) throws IOException, 785 InterruptedException, ClassNotFoundException { 786 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 787 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 788 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 789 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 790 KeyValueSerialization.class.getName()); 791 setupRandomGeneratorMapper(job, putSortReducer); 792 if (tableInfo.size() > 1) { 793 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 794 int sum = 0; 795 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 796 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 797 } 798 assertEquals(sum, job.getNumReduceTasks()); 799 } 800 else { 801 RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 802 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(), 803 regionLocator); 804 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 805 } 806 807 FileOutputFormat.setOutputPath(job, outDir); 808 809 assertFalse(util.getTestFileSystem().exists(outDir)) ; 810 811 assertTrue(job.waitForCompletion(true)); 812 } 813 814 /** 815 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. 816 * Tests that the family compression map is correctly serialized into 817 * and deserialized from configuration 818 * 819 * @throws IOException 820 */ 821 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 822 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 823 for (int numCfs = 0; numCfs <= 3; numCfs++) { 824 Configuration conf = new Configuration(this.util.getConfiguration()); 825 Map<String, Compression.Algorithm> familyToCompression = 826 getMockColumnFamiliesForCompression(numCfs); 827 Table table = Mockito.mock(Table.class); 828 setupMockColumnFamiliesForCompression(table, familyToCompression); 829 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 830 HFileOutputFormat2.serializeColumnFamilyAttribute 831 (HFileOutputFormat2.compressionDetails, 832 Arrays.asList(table.getTableDescriptor()))); 833 834 // read back family specific compression setting from the configuration 835 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2 836 .createFamilyCompressionMap(conf); 837 838 // test that we have a value for all column families that matches with the 839 // used mock values 840 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 841 assertEquals("Compression configuration incorrect for column family:" 842 + entry.getKey(), entry.getValue(), 843 retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8"))); 844 } 845 } 846 } 847 848 private void setupMockColumnFamiliesForCompression(Table table, 849 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 850 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 851 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 852 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 853 .setMaxVersions(1) 854 .setCompressionType(entry.getValue()) 855 .setBlockCacheEnabled(false) 856 .setTimeToLive(0)); 857 } 858 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 859 } 860 861 /** 862 * @return a map from column family names to compression algorithms for 863 * testing column family compression. Column family names have special characters 864 */ 865 private Map<String, Compression.Algorithm> 866 getMockColumnFamiliesForCompression (int numCfs) { 867 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 868 // use column family names having special characters 869 if (numCfs-- > 0) { 870 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 871 } 872 if (numCfs-- > 0) { 873 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 874 } 875 if (numCfs-- > 0) { 876 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 877 } 878 if (numCfs-- > 0) { 879 familyToCompression.put("Family3", Compression.Algorithm.NONE); 880 } 881 return familyToCompression; 882 } 883 884 885 /** 886 * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. 887 * Tests that the family bloom type map is correctly serialized into 888 * and deserialized from configuration 889 * 890 * @throws IOException 891 */ 892 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 893 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 894 for (int numCfs = 0; numCfs <= 2; numCfs++) { 895 Configuration conf = new Configuration(this.util.getConfiguration()); 896 Map<String, BloomType> familyToBloomType = 897 getMockColumnFamiliesForBloomType(numCfs); 898 Table table = Mockito.mock(Table.class); 899 setupMockColumnFamiliesForBloomType(table, 900 familyToBloomType); 901 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 902 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 903 Arrays.asList(table.getTableDescriptor()))); 904 905 // read back family specific data block encoding settings from the 906 // configuration 907 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 908 HFileOutputFormat2 909 .createFamilyBloomTypeMap(conf); 910 911 // test that we have a value for all column families that matches with the 912 // used mock values 913 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 914 assertEquals("BloomType configuration incorrect for column family:" 915 + entry.getKey(), entry.getValue(), 916 retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8"))); 917 } 918 } 919 } 920 921 private void setupMockColumnFamiliesForBloomType(Table table, 922 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 923 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 924 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 925 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 926 .setMaxVersions(1) 927 .setBloomFilterType(entry.getValue()) 928 .setBlockCacheEnabled(false) 929 .setTimeToLive(0)); 930 } 931 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 932 } 933 934 /** 935 * @return a map from column family names to compression algorithms for 936 * testing column family compression. Column family names have special characters 937 */ 938 private Map<String, BloomType> 939 getMockColumnFamiliesForBloomType (int numCfs) { 940 Map<String, BloomType> familyToBloomType = new HashMap<>(); 941 // use column family names having special characters 942 if (numCfs-- > 0) { 943 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 944 } 945 if (numCfs-- > 0) { 946 familyToBloomType.put("Family2=asdads&!AASD", 947 BloomType.ROWCOL); 948 } 949 if (numCfs-- > 0) { 950 familyToBloomType.put("Family3", BloomType.NONE); 951 } 952 return familyToBloomType; 953 } 954 955 /** 956 * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. 957 * Tests that the family block size map is correctly serialized into 958 * and deserialized from configuration 959 * 960 * @throws IOException 961 */ 962 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 963 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 964 for (int numCfs = 0; numCfs <= 3; numCfs++) { 965 Configuration conf = new Configuration(this.util.getConfiguration()); 966 Map<String, Integer> familyToBlockSize = 967 getMockColumnFamiliesForBlockSize(numCfs); 968 Table table = Mockito.mock(Table.class); 969 setupMockColumnFamiliesForBlockSize(table, 970 familyToBlockSize); 971 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 972 HFileOutputFormat2.serializeColumnFamilyAttribute 973 (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table 974 .getTableDescriptor()))); 975 976 // read back family specific data block encoding settings from the 977 // configuration 978 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 979 HFileOutputFormat2 980 .createFamilyBlockSizeMap(conf); 981 982 // test that we have a value for all column families that matches with the 983 // used mock values 984 for (Entry<String, Integer> entry : familyToBlockSize.entrySet() 985 ) { 986 assertEquals("BlockSize configuration incorrect for column family:" 987 + entry.getKey(), entry.getValue(), 988 retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8"))); 989 } 990 } 991 } 992 993 private void setupMockColumnFamiliesForBlockSize(Table table, 994 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 995 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 996 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 997 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 998 .setMaxVersions(1) 999 .setBlocksize(entry.getValue()) 1000 .setBlockCacheEnabled(false) 1001 .setTimeToLive(0)); 1002 } 1003 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 1004 } 1005 1006 /** 1007 * @return a map from column family names to compression algorithms for 1008 * testing column family compression. Column family names have special characters 1009 */ 1010 private Map<String, Integer> 1011 getMockColumnFamiliesForBlockSize (int numCfs) { 1012 Map<String, Integer> familyToBlockSize = new HashMap<>(); 1013 // use column family names having special characters 1014 if (numCfs-- > 0) { 1015 familyToBlockSize.put("Family1!@#!@#&", 1234); 1016 } 1017 if (numCfs-- > 0) { 1018 familyToBlockSize.put("Family2=asdads&!AASD", 1019 Integer.MAX_VALUE); 1020 } 1021 if (numCfs-- > 0) { 1022 familyToBlockSize.put("Family2=asdads&!AASD", 1023 Integer.MAX_VALUE); 1024 } 1025 if (numCfs-- > 0) { 1026 familyToBlockSize.put("Family3", 0); 1027 } 1028 return familyToBlockSize; 1029 } 1030 1031 /** 1032 * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. 1033 * Tests that the family data block encoding map is correctly serialized into 1034 * and deserialized from configuration 1035 * 1036 * @throws IOException 1037 */ 1038 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1039 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 1040 for (int numCfs = 0; numCfs <= 3; numCfs++) { 1041 Configuration conf = new Configuration(this.util.getConfiguration()); 1042 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 1043 getMockColumnFamiliesForDataBlockEncoding(numCfs); 1044 Table table = Mockito.mock(Table.class); 1045 setupMockColumnFamiliesForDataBlockEncoding(table, 1046 familyToDataBlockEncoding); 1047 HTableDescriptor tableDescriptor = table.getTableDescriptor(); 1048 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 1049 HFileOutputFormat2.serializeColumnFamilyAttribute 1050 (HFileOutputFormat2.dataBlockEncodingDetails, Arrays 1051 .asList(tableDescriptor))); 1052 1053 // read back family specific data block encoding settings from the 1054 // configuration 1055 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 1056 HFileOutputFormat2 1057 .createFamilyDataBlockEncodingMap(conf); 1058 1059 // test that we have a value for all column families that matches with the 1060 // used mock values 1061 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1062 assertEquals("DataBlockEncoding configuration incorrect for column family:" 1063 + entry.getKey(), entry.getValue(), 1064 retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8"))); 1065 } 1066 } 1067 } 1068 1069 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 1070 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 1071 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 1072 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1073 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) 1074 .setMaxVersions(1) 1075 .setDataBlockEncoding(entry.getValue()) 1076 .setBlockCacheEnabled(false) 1077 .setTimeToLive(0)); 1078 } 1079 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 1080 } 1081 1082 /** 1083 * @return a map from column family names to compression algorithms for 1084 * testing column family compression. Column family names have special characters 1085 */ 1086 private Map<String, DataBlockEncoding> 1087 getMockColumnFamiliesForDataBlockEncoding (int numCfs) { 1088 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1089 // use column family names having special characters 1090 if (numCfs-- > 0) { 1091 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1092 } 1093 if (numCfs-- > 0) { 1094 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1095 DataBlockEncoding.FAST_DIFF); 1096 } 1097 if (numCfs-- > 0) { 1098 familyToDataBlockEncoding.put("Family2=asdads&!AASD", 1099 DataBlockEncoding.PREFIX); 1100 } 1101 if (numCfs-- > 0) { 1102 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1103 } 1104 return familyToDataBlockEncoding; 1105 } 1106 1107 private void setupMockStartKeys(RegionLocator table) throws IOException { 1108 byte[][] mockKeys = new byte[][] { 1109 HConstants.EMPTY_BYTE_ARRAY, 1110 Bytes.toBytes("aaa"), 1111 Bytes.toBytes("ggg"), 1112 Bytes.toBytes("zzz") 1113 }; 1114 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1115 } 1116 1117 private void setupMockTableName(RegionLocator table) throws IOException { 1118 TableName mockTableName = TableName.valueOf("mock_table"); 1119 Mockito.doReturn(mockTableName).when(table).getName(); 1120 } 1121 1122 /** 1123 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and 1124 * bloom filter settings from the column family descriptor 1125 */ 1126 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1127 public void testColumnFamilySettings() throws Exception { 1128 Configuration conf = new Configuration(this.util.getConfiguration()); 1129 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1130 TaskAttemptContext context = null; 1131 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1132 1133 // Setup table descriptor 1134 Table table = Mockito.mock(Table.class); 1135 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1136 HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]); 1137 Mockito.doReturn(htd).when(table).getTableDescriptor(); 1138 for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { 1139 htd.addFamily(hcd); 1140 } 1141 1142 // set up the table to return some mock keys 1143 setupMockStartKeys(regionLocator); 1144 1145 try { 1146 // partial map red setup to get an operational writer for testing 1147 // We turn off the sequence file compression, because DefaultCodec 1148 // pollutes the GZip codec pool with an incompatible compressor. 1149 conf.set("io.seqfile.compression.type", "NONE"); 1150 conf.set("hbase.fs.tmp.dir", dir.toString()); 1151 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1152 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1153 1154 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1155 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1156 setupRandomGeneratorMapper(job, false); 1157 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 1158 FileOutputFormat.setOutputPath(job, dir); 1159 context = createTestTaskAttemptContext(job); 1160 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1161 writer = hof.getRecordWriter(context); 1162 1163 // write out random rows 1164 writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT); 1165 writer.close(context); 1166 1167 // Make sure that a directory was created for every CF 1168 FileSystem fs = dir.getFileSystem(conf); 1169 1170 // commit so that the filesystem has one directory per column family 1171 hof.getOutputCommitter(context).commitTask(context); 1172 hof.getOutputCommitter(context).commitJob(context); 1173 FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1174 assertEquals(htd.getFamilies().size(), families.length); 1175 for (FileStatus f : families) { 1176 String familyStr = f.getPath().getName(); 1177 HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr)); 1178 // verify that the compression on this file matches the configured 1179 // compression 1180 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1181 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1182 Map<byte[], byte[]> fileInfo = reader.getHFileInfo(); 1183 1184 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1185 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1186 assertEquals("Incorrect bloom filter used for column family " + familyStr + 1187 "(reader: " + reader + ")", 1188 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1189 assertEquals( 1190 "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")", 1191 hcd.getCompressionType(), reader.getFileContext().getCompression()); 1192 } 1193 } finally { 1194 dir.getFileSystem(conf).delete(dir, true); 1195 } 1196 } 1197 1198 /** 1199 * Write random values to the writer assuming a table created using 1200 * {@link #FAMILIES} as column family descriptors 1201 */ 1202 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1203 TaskAttemptContext context, Set<byte[]> families, int numRows) 1204 throws IOException, InterruptedException { 1205 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1206 int valLength = 10; 1207 byte valBytes[] = new byte[valLength]; 1208 1209 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1210 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1211 final byte [] qualifier = Bytes.toBytes("data"); 1212 Random random = new Random(); 1213 for (int i = 0; i < numRows; i++) { 1214 1215 Bytes.putInt(keyBytes, 0, i); 1216 random.nextBytes(valBytes); 1217 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1218 1219 for (byte[] family : families) { 1220 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1221 writer.write(key, kv); 1222 } 1223 } 1224 } 1225 1226 /** 1227 * This test is to test the scenario happened in HBASE-6901. 1228 * All files are bulk loaded and excluded from minor compaction. 1229 * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException 1230 * will be thrown. 1231 */ 1232 @Ignore ("Flakey: See HBASE-9051") @Test 1233 public void testExcludeAllFromMinorCompaction() throws Exception { 1234 Configuration conf = util.getConfiguration(); 1235 conf.setInt("hbase.hstore.compaction.min", 2); 1236 generateRandomStartKeys(5); 1237 1238 util.startMiniCluster(); 1239 try (Connection conn = ConnectionFactory.createConnection(); 1240 Admin admin = conn.getAdmin(); 1241 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1242 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1243 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1244 assertEquals("Should start with empty table", 0, util.countRows(table)); 1245 1246 // deep inspection: get the StoreFile dir 1247 final Path storePath = new Path( 1248 CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1249 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1250 Bytes.toString(FAMILIES[0]))); 1251 assertEquals(0, fs.listStatus(storePath).length); 1252 1253 // Generate two bulk load files 1254 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1255 true); 1256 1257 for (int i = 0; i < 2; i++) { 1258 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1259 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1260 .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false); 1261 // Perform the actual load 1262 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); 1263 } 1264 1265 // Ensure data shows up 1266 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1267 assertEquals("LoadIncrementalHFiles should put expected data in table", 1268 expectedRows, util.countRows(table)); 1269 1270 // should have a second StoreFile now 1271 assertEquals(2, fs.listStatus(storePath).length); 1272 1273 // minor compactions shouldn't get rid of the file 1274 admin.compact(TABLE_NAMES[0]); 1275 try { 1276 quickPoll(new Callable<Boolean>() { 1277 @Override 1278 public Boolean call() throws Exception { 1279 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1280 for (HRegion region : regions) { 1281 for (HStore store : region.getStores()) { 1282 store.closeAndArchiveCompactedFiles(); 1283 } 1284 } 1285 return fs.listStatus(storePath).length == 1; 1286 } 1287 }, 5000); 1288 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1289 } catch (AssertionError ae) { 1290 // this is expected behavior 1291 } 1292 1293 // a major compaction should work though 1294 admin.majorCompact(TABLE_NAMES[0]); 1295 quickPoll(new Callable<Boolean>() { 1296 @Override 1297 public Boolean call() throws Exception { 1298 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1299 for (HRegion region : regions) { 1300 for (HStore store : region.getStores()) { 1301 store.closeAndArchiveCompactedFiles(); 1302 } 1303 } 1304 return fs.listStatus(storePath).length == 1; 1305 } 1306 }, 5000); 1307 1308 } finally { 1309 util.shutdownMiniCluster(); 1310 } 1311 } 1312 1313 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test 1314 public void testExcludeMinorCompaction() throws Exception { 1315 Configuration conf = util.getConfiguration(); 1316 conf.setInt("hbase.hstore.compaction.min", 2); 1317 generateRandomStartKeys(5); 1318 1319 util.startMiniCluster(); 1320 try (Connection conn = ConnectionFactory.createConnection(conf); 1321 Admin admin = conn.getAdmin()){ 1322 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1323 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1324 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1325 assertEquals("Should start with empty table", 0, util.countRows(table)); 1326 1327 // deep inspection: get the StoreFile dir 1328 final Path storePath = new Path( 1329 CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1330 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1331 Bytes.toString(FAMILIES[0]))); 1332 assertEquals(0, fs.listStatus(storePath).length); 1333 1334 // put some data in it and flush to create a storefile 1335 Put p = new Put(Bytes.toBytes("test")); 1336 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1337 table.put(p); 1338 admin.flush(TABLE_NAMES[0]); 1339 assertEquals(1, util.countRows(table)); 1340 quickPoll(new Callable<Boolean>() { 1341 @Override 1342 public Boolean call() throws Exception { 1343 return fs.listStatus(storePath).length == 1; 1344 } 1345 }, 5000); 1346 1347 // Generate a bulk load file with more rows 1348 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", 1349 true); 1350 1351 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1352 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table 1353 .getTableDescriptor(), regionLocator)), testDir, false); 1354 1355 // Perform the actual load 1356 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); 1357 1358 // Ensure data shows up 1359 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1360 assertEquals("LoadIncrementalHFiles should put expected data in table", 1361 expectedRows + 1, util.countRows(table)); 1362 1363 // should have a second StoreFile now 1364 assertEquals(2, fs.listStatus(storePath).length); 1365 1366 // minor compactions shouldn't get rid of the file 1367 admin.compact(TABLE_NAMES[0]); 1368 try { 1369 quickPoll(new Callable<Boolean>() { 1370 @Override 1371 public Boolean call() throws Exception { 1372 return fs.listStatus(storePath).length == 1; 1373 } 1374 }, 5000); 1375 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1376 } catch (AssertionError ae) { 1377 // this is expected behavior 1378 } 1379 1380 // a major compaction should work though 1381 admin.majorCompact(TABLE_NAMES[0]); 1382 quickPoll(new Callable<Boolean>() { 1383 @Override 1384 public Boolean call() throws Exception { 1385 return fs.listStatus(storePath).length == 1; 1386 } 1387 }, 5000); 1388 1389 } finally { 1390 util.shutdownMiniCluster(); 1391 } 1392 } 1393 1394 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1395 int sleepMs = 10; 1396 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1397 while (retries-- > 0) { 1398 if (c.call().booleanValue()) { 1399 return; 1400 } 1401 Thread.sleep(sleepMs); 1402 } 1403 fail(); 1404 } 1405 1406 public static void main(String args[]) throws Exception { 1407 new TestHFileOutputFormat2().manualTest(args); 1408 } 1409 1410 public void manualTest(String args[]) throws Exception { 1411 Configuration conf = HBaseConfiguration.create(); 1412 util = new HBaseTestingUtility(conf); 1413 if ("newtable".equals(args[0])) { 1414 TableName tname = TableName.valueOf(args[1]); 1415 byte[][] splitKeys = generateRandomSplitKeys(4); 1416 Table table = util.createTable(tname, FAMILIES, splitKeys); 1417 } else if ("incremental".equals(args[0])) { 1418 TableName tname = TableName.valueOf(args[1]); 1419 try(Connection c = ConnectionFactory.createConnection(conf); 1420 Admin admin = c.getAdmin(); 1421 RegionLocator regionLocator = c.getRegionLocator(tname)) { 1422 Path outDir = new Path("incremental-out"); 1423 runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin 1424 .getTableDescriptor(tname), regionLocator)), outDir, false); 1425 } 1426 } else { 1427 throw new RuntimeException( 1428 "usage: TestHFileOutputFormat2 newtable | incremental"); 1429 } 1430 } 1431 1432 @Test 1433 public void testBlockStoragePolicy() throws Exception { 1434 util = new HBaseTestingUtility(); 1435 Configuration conf = util.getConfiguration(); 1436 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); 1437 1438 conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + 1439 Bytes.toString(HFileOutputFormat2.combineTableNameSuffix( 1440 TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD"); 1441 Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0])); 1442 Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1])); 1443 util.startMiniDFSCluster(3); 1444 FileSystem fs = util.getDFSCluster().getFileSystem(); 1445 try { 1446 fs.mkdirs(cf1Dir); 1447 fs.mkdirs(cf2Dir); 1448 1449 // the original block storage policy would be HOT 1450 String spA = getStoragePolicyName(fs, cf1Dir); 1451 String spB = getStoragePolicyName(fs, cf2Dir); 1452 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1453 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1454 assertEquals("HOT", spA); 1455 assertEquals("HOT", spB); 1456 1457 // alter table cf schema to change storage policies 1458 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1459 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir); 1460 HFileOutputFormat2.configureStoragePolicy(conf, fs, 1461 HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir); 1462 spA = getStoragePolicyName(fs, cf1Dir); 1463 spB = getStoragePolicyName(fs, cf2Dir); 1464 LOG.debug("Storage policy of cf 0: [" + spA + "]."); 1465 LOG.debug("Storage policy of cf 1: [" + spB + "]."); 1466 assertNotNull(spA); 1467 assertEquals("ONE_SSD", spA); 1468 assertNotNull(spB); 1469 assertEquals("ALL_SSD", spB); 1470 } finally { 1471 fs.delete(cf1Dir, true); 1472 fs.delete(cf2Dir, true); 1473 util.shutdownMiniDFSCluster(); 1474 } 1475 } 1476 1477 private String getStoragePolicyName(FileSystem fs, Path path) { 1478 try { 1479 Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path); 1480 return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); 1481 } catch (Exception e) { 1482 // Maybe fail because of using old HDFS version, try the old way 1483 if (LOG.isTraceEnabled()) { 1484 LOG.trace("Failed to get policy directly", e); 1485 } 1486 String policy = getStoragePolicyNameForOldHDFSVersion(fs, path); 1487 return policy == null ? "HOT" : policy;// HOT by default 1488 } 1489 } 1490 1491 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1492 try { 1493 if (fs instanceof DistributedFileSystem) { 1494 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1495 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1496 if (null != status) { 1497 byte storagePolicyId = status.getStoragePolicy(); 1498 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1499 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1500 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1501 for (BlockStoragePolicy policy : policies) { 1502 if (policy.getId() == storagePolicyId) { 1503 return policy.getName(); 1504 } 1505 } 1506 } 1507 } 1508 } 1509 } catch (Throwable e) { 1510 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1511 } 1512 1513 return null; 1514 } 1515 1516 @Test 1517 public void TestConfigureCompression() throws Exception { 1518 Configuration conf = new Configuration(this.util.getConfiguration()); 1519 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1520 TaskAttemptContext context = null; 1521 Path dir = util.getDataTestDir("TestConfigureCompression"); 1522 String hfileoutputformatCompression = "gz"; 1523 1524 try { 1525 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 1526 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1527 1528 conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression); 1529 1530 Job job = Job.getInstance(conf); 1531 FileOutputFormat.setOutputPath(job, dir); 1532 context = createTestTaskAttemptContext(job); 1533 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1534 writer = hof.getRecordWriter(context); 1535 final byte[] b = Bytes.toBytes("b"); 1536 1537 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b); 1538 writer.write(new ImmutableBytesWritable(), kv); 1539 writer.close(context); 1540 writer = null; 1541 FileSystem fs = dir.getFileSystem(conf); 1542 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 1543 while (iterator.hasNext()) { 1544 LocatedFileStatus keyFileStatus = iterator.next(); 1545 HFile.Reader reader = 1546 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 1547 assertEquals(reader.getTrailer().getCompressionCodec().getName(), 1548 hfileoutputformatCompression); 1549 } 1550 } finally { 1551 if (writer != null && context != null) { 1552 writer.close(context); 1553 } 1554 dir.getFileSystem(conf).delete(dir, true); 1555 } 1556 1557 } 1558} 1559