001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.management.ManagementFactory; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ThreadPoolExecutor; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.ExtendedCell; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.KeepDeletedCells; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.KeyValueTestUtil; 044import org.apache.hadoop.hbase.MemoryCompactionPolicy; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.RegionInfoBuilder; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.client.TableDescriptor; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 054import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 055import org.apache.hadoop.hbase.testclassification.MediumTests; 056import org.apache.hadoop.hbase.testclassification.RegionServerTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.EnvironmentEdge; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.Threads; 061import org.apache.hadoop.hbase.wal.WAL; 062import org.junit.After; 063import org.junit.Before; 064import org.junit.ClassRule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.mockito.Mockito; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071/** 072 * compacted memstore test case 073 */ 074@Category({ RegionServerTests.class, MediumTests.class }) 075public class TestCompactingMemStore extends TestDefaultMemStore { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestCompactingMemStore.class); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class); 082 protected static ChunkCreator chunkCreator; 083 protected HRegion region; 084 protected RegionServicesForStores regionServicesForStores; 085 protected HStore store; 086 087 @After 088 public void tearDown() throws Exception { 089 chunkCreator.clearChunksInPool(); 090 super.tearDown(); 091 } 092 093 @Override 094 @Before 095 public void setUp() throws Exception { 096 compactingSetUp(); 097 this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), 098 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 099 ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 100 } 101 102 protected void compactingSetUp() throws Exception { 103 super.internalSetUp(); 104 Configuration conf = new Configuration(); 105 conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); 106 conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); 107 conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); 108 HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf); 109 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(FAMILY); 110 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("foobar")) 111 .setColumnFamily(familyDescriptor).build(); 112 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("foobar")).build(); 113 WAL wal = HBaseTestingUtil.createWal(conf, hbaseUtility.getDataTestDir(), info); 114 this.region = 115 HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, tableDescriptor, wal, true); 116 this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores()); 117 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 118 Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 119 this.store = new HStore(region, familyDescriptor, conf, false); 120 121 long globalMemStoreLimit = 122 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 123 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 124 chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 125 globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, 126 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 127 assertNotNull(chunkCreator); 128 } 129 130 /** 131 * A simple test which flush in memory affect timeOfOldestEdit 132 */ 133 @Test 134 public void testTimeOfOldestEdit() { 135 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 136 final byte[] r = Bytes.toBytes("r"); 137 final byte[] f = Bytes.toBytes("f"); 138 final byte[] q = Bytes.toBytes("q"); 139 final byte[] v = Bytes.toBytes("v"); 140 final KeyValue kv = new KeyValue(r, f, q, v); 141 memstore.add(kv, null); 142 long timeOfOldestEdit = memstore.timeOfOldestEdit(); 143 assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit); 144 145 ((CompactingMemStore) memstore).flushInMemory(); 146 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 147 memstore.add(kv, null); 148 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 149 memstore.snapshot(); 150 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 151 } 152 153 /** 154 * A simple test which verifies the 3 possible states when scanning across snapshot. 155 */ 156 @Override 157 @Test 158 public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException { 159 // we are going to the scanning across snapshot with two kvs 160 // kv1 should always be returned before kv2 161 final byte[] one = Bytes.toBytes(1); 162 final byte[] two = Bytes.toBytes(2); 163 final byte[] f = Bytes.toBytes("f"); 164 final byte[] q = Bytes.toBytes("q"); 165 final byte[] v = Bytes.toBytes(3); 166 167 final KeyValue kv1 = new KeyValue(one, f, q, 10, v); 168 final KeyValue kv2 = new KeyValue(two, f, q, 10, v); 169 170 // use case 1: both kvs in kvset 171 this.memstore.add(kv1.clone(), null); 172 this.memstore.add(kv2.clone(), null); 173 // snapshot is empty,active segment is not empty, 174 // empty segment is skipped. 175 verifyOneScanAcrossSnapshot2(kv1, kv2); 176 177 // use case 2: both kvs in snapshot 178 this.memstore.snapshot(); 179 // active segment is empty,snapshot is not empty, 180 // empty segment is skipped. 181 verifyOneScanAcrossSnapshot2(kv1, kv2); 182 183 // use case 3: first in snapshot second in kvset 184 this.memstore = new CompactingMemStore(HBaseConfiguration.create(), 185 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 186 this.memstore.add(kv1.clone(), null); 187 // As compaction is starting in the background the repetition 188 // of the k1 might be removed BUT the scanners created earlier 189 // should look on the OLD MutableCellSetSegment, so this should be OK... 190 this.memstore.snapshot(); 191 this.memstore.add(kv2.clone(), null); 192 verifyScanAcrossSnapshot2(kv1, kv2); 193 } 194 195 /** 196 * Test memstore snapshots 197 */ 198 @Override 199 @Test 200 public void testSnapshotting() throws IOException { 201 final int snapshotCount = 5; 202 // Add some rows, run a snapshot. Do it a few times. 203 for (int i = 0; i < snapshotCount; i++) { 204 addRows(this.memstore); 205 runSnapshot(this.memstore, true); 206 assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); 207 } 208 } 209 210 ////////////////////////////////////////////////////////////////////////////// 211 // Get tests 212 ////////////////////////////////////////////////////////////////////////////// 213 214 /** 215 * Test getNextRow from memstore 216 */ 217 @Override 218 @Test 219 public void testGetNextRow() throws Exception { 220 addRows(this.memstore); 221 // Add more versions to make it a little more interesting. 222 Thread.sleep(1); 223 addRows(this.memstore); 224 Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY); 225 assertTrue(CellComparator.getInstance().compareRows(closestToEmpty, 226 new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())) == 0); 227 for (int i = 0; i < ROW_COUNT; i++) { 228 Cell nr = ((CompactingMemStore) this.memstore) 229 .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime())); 230 if (i + 1 == ROW_COUNT) { 231 assertNull(nr); 232 } else { 233 assertTrue(CellComparator.getInstance().compareRows(nr, 234 new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())) == 0); 235 } 236 } 237 // starting from each row, validate results should contain the starting row 238 Configuration conf = HBaseConfiguration.create(); 239 for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { 240 ScanInfo scanInfo = 241 new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE, 242 HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); 243 try (InternalScanner scanner = 244 new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, 245 memstore.getScanners(0))) { 246 List<Cell> results = new ArrayList<>(); 247 for (int i = 0; scanner.next(results); i++) { 248 int rowId = startRowId + i; 249 Cell left = results.get(0); 250 byte[] row1 = Bytes.toBytes(rowId); 251 assertTrue("Row name", 252 CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0); 253 assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); 254 List<Cell> row = new ArrayList<>(); 255 for (Cell kv : results) { 256 row.add(kv); 257 } 258 isExpectedRowWithoutTimestamps(rowId, row); 259 // Clear out set. Otherwise row results accumulate. 260 results.clear(); 261 } 262 } 263 } 264 } 265 266 @Override 267 @Test 268 public void testGet_memstoreAndSnapShot() throws IOException { 269 byte[] row = Bytes.toBytes("testrow"); 270 byte[] fam = Bytes.toBytes("testfamily"); 271 byte[] qf1 = Bytes.toBytes("testqualifier1"); 272 byte[] qf2 = Bytes.toBytes("testqualifier2"); 273 byte[] qf3 = Bytes.toBytes("testqualifier3"); 274 byte[] qf4 = Bytes.toBytes("testqualifier4"); 275 byte[] qf5 = Bytes.toBytes("testqualifier5"); 276 byte[] val = Bytes.toBytes("testval"); 277 278 // Setting up memstore 279 memstore.add(new KeyValue(row, fam, qf1, val), null); 280 memstore.add(new KeyValue(row, fam, qf2, val), null); 281 memstore.add(new KeyValue(row, fam, qf3, val), null); 282 // Pushing to pipeline 283 ((CompactingMemStore) memstore).flushInMemory(); 284 assertEquals(0, memstore.getSnapshot().getCellsCount()); 285 // Creating a snapshot 286 memstore.snapshot(); 287 assertEquals(3, memstore.getSnapshot().getCellsCount()); 288 // Adding value to "new" memstore 289 assertEquals(0, memstore.getActive().getCellsCount()); 290 memstore.add(new KeyValue(row, fam, qf4, val), null); 291 memstore.add(new KeyValue(row, fam, qf5, val), null); 292 assertEquals(2, memstore.getActive().getCellsCount()); 293 } 294 295 //////////////////////////////////// 296 // Test for periodic memstore flushes 297 // based on time of oldest edit 298 //////////////////////////////////// 299 300 /** 301 * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased as older 302 * keyvalues are deleted from the memstore. 303 */ 304 @Override 305 @Test 306 public void testUpsertMemstoreSize() throws Exception { 307 MemStoreSize oldSize = memstore.size(); 308 309 List<ExtendedCell> l = new ArrayList<>(); 310 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 311 KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); 312 KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); 313 314 kv1.setSequenceId(1); 315 kv2.setSequenceId(1); 316 kv3.setSequenceId(1); 317 l.add(kv1); 318 l.add(kv2); 319 l.add(kv3); 320 321 this.memstore.upsert(l, 2, null);// readpoint is 2 322 MemStoreSize newSize = this.memstore.size(); 323 assert (newSize.getDataSize() > oldSize.getDataSize()); 324 // The kv1 should be removed. 325 assert (memstore.getActive().getCellsCount() == 2); 326 327 KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); 328 kv4.setSequenceId(1); 329 l.clear(); 330 l.add(kv4); 331 this.memstore.upsert(l, 3, null); 332 assertEquals(newSize, this.memstore.size()); 333 // The kv2 should be removed. 334 assert (memstore.getActive().getCellsCount() == 2); 335 // this.memstore = null; 336 } 337 338 /** 339 * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in 340 * memstore. 341 */ 342 @Override 343 @Test 344 public void testUpdateToTimeOfOldestEdit() throws Exception { 345 try { 346 EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); 347 EnvironmentEdgeManager.injectEdge(edge); 348 long t = memstore.timeOfOldestEdit(); 349 assertEquals(Long.MAX_VALUE, t); 350 351 // test the case that the timeOfOldestEdit is updated after a KV add 352 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null); 353 t = memstore.timeOfOldestEdit(); 354 assertTrue(t == 1234); 355 // The method will also assert 356 // the value is reset to Long.MAX_VALUE 357 t = runSnapshot(memstore, true); 358 359 // test the case that the timeOfOldestEdit is updated after a KV delete 360 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null); 361 t = memstore.timeOfOldestEdit(); 362 assertTrue(t == 1234); 363 t = runSnapshot(memstore, true); 364 365 // test the case that the timeOfOldestEdit is updated after a KV upsert 366 List<ExtendedCell> l = new ArrayList<>(); 367 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 368 kv1.setSequenceId(100); 369 l.add(kv1); 370 memstore.upsert(l, 1000, null); 371 t = memstore.timeOfOldestEdit(); 372 assertTrue(t == 1234); 373 } finally { 374 EnvironmentEdgeManager.reset(); 375 } 376 } 377 378 private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException { 379 // Save off old state. 380 long oldHistorySize = hmc.getSnapshot().getDataSize(); 381 long prevTimeStamp = hmc.timeOfOldestEdit(); 382 383 hmc.snapshot(); 384 MemStoreSnapshot snapshot = hmc.snapshot(); 385 if (useForce) { 386 // Make some assertions about what just happened. 387 assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize()); 388 long t = hmc.timeOfOldestEdit(); 389 assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); 390 hmc.clearSnapshot(snapshot.getId()); 391 } else { 392 long t = hmc.timeOfOldestEdit(); 393 assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp); 394 } 395 return prevTimeStamp; 396 } 397 398 private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) { 399 int i = 0; 400 for (Cell kv : kvs) { 401 byte[] expectedColname = makeQualifier(rowIndex, i++); 402 assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname)); 403 // Value is column name as bytes. Usually result is 404 // 100 bytes in size at least. This is the default size 405 // for BytesWriteable. For comparison, convert bytes to 406 // String and trim to remove trailing null bytes. 407 assertTrue("Content", CellUtil.matchingValue(kv, expectedColname)); 408 } 409 } 410 411 @Test 412 public void testPuttingBackChunksAfterFlushing() throws IOException { 413 byte[] row = Bytes.toBytes("testrow"); 414 byte[] fam = Bytes.toBytes("testfamily"); 415 byte[] qf1 = Bytes.toBytes("testqualifier1"); 416 byte[] qf2 = Bytes.toBytes("testqualifier2"); 417 byte[] qf3 = Bytes.toBytes("testqualifier3"); 418 byte[] qf4 = Bytes.toBytes("testqualifier4"); 419 byte[] qf5 = Bytes.toBytes("testqualifier5"); 420 byte[] val = Bytes.toBytes("testval"); 421 422 // Setting up memstore 423 memstore.add(new KeyValue(row, fam, qf1, val), null); 424 memstore.add(new KeyValue(row, fam, qf2, val), null); 425 memstore.add(new KeyValue(row, fam, qf3, val), null); 426 427 // Creating a snapshot 428 MemStoreSnapshot snapshot = memstore.snapshot(); 429 assertEquals(3, memstore.getSnapshot().getCellsCount()); 430 431 // Adding value to "new" memstore 432 assertEquals(0, memstore.getActive().getCellsCount()); 433 memstore.add(new KeyValue(row, fam, qf4, val), null); 434 memstore.add(new KeyValue(row, fam, qf5, val), null); 435 assertEquals(2, memstore.getActive().getCellsCount()); 436 // close the scanners 437 for (KeyValueScanner scanner : snapshot.getScanners()) { 438 scanner.close(); 439 } 440 memstore.clearSnapshot(snapshot.getId()); 441 442 int chunkCount = chunkCreator.getPoolSize(); 443 assertTrue(chunkCount > 0); 444 445 } 446 447 @Test 448 public void testPuttingBackChunksWithOpeningScanner() throws IOException { 449 byte[] row = Bytes.toBytes("testrow"); 450 byte[] fam = Bytes.toBytes("testfamily"); 451 byte[] qf1 = Bytes.toBytes("testqualifier1"); 452 byte[] qf2 = Bytes.toBytes("testqualifier2"); 453 byte[] qf3 = Bytes.toBytes("testqualifier3"); 454 byte[] qf4 = Bytes.toBytes("testqualifier4"); 455 byte[] qf5 = Bytes.toBytes("testqualifier5"); 456 byte[] qf6 = Bytes.toBytes("testqualifier6"); 457 byte[] qf7 = Bytes.toBytes("testqualifier7"); 458 byte[] val = Bytes.toBytes("testval"); 459 460 // Setting up memstore 461 memstore.add(new KeyValue(row, fam, qf1, val), null); 462 memstore.add(new KeyValue(row, fam, qf2, val), null); 463 memstore.add(new KeyValue(row, fam, qf3, val), null); 464 465 // Creating a snapshot 466 MemStoreSnapshot snapshot = memstore.snapshot(); 467 assertEquals(3, memstore.getSnapshot().getCellsCount()); 468 469 // Adding value to "new" memstore 470 assertEquals(0, memstore.getActive().getCellsCount()); 471 memstore.add(new KeyValue(row, fam, qf4, val), null); 472 memstore.add(new KeyValue(row, fam, qf5, val), null); 473 assertEquals(2, memstore.getActive().getCellsCount()); 474 475 // opening scanner before clear the snapshot 476 List<KeyValueScanner> scanners = memstore.getScanners(0); 477 // Shouldn't putting back the chunks to pool,since some scanners are opening 478 // based on their data 479 // close the scanners 480 for (KeyValueScanner scanner : snapshot.getScanners()) { 481 scanner.close(); 482 } 483 memstore.clearSnapshot(snapshot.getId()); 484 485 assertTrue(chunkCreator.getPoolSize() == 0); 486 487 // Chunks will be put back to pool after close scanners; 488 for (KeyValueScanner scanner : scanners) { 489 scanner.close(); 490 } 491 assertTrue(chunkCreator.getPoolSize() > 0); 492 493 // clear chunks 494 chunkCreator.clearChunksInPool(); 495 496 // Creating another snapshot 497 498 snapshot = memstore.snapshot(); 499 // Adding more value 500 memstore.add(new KeyValue(row, fam, qf6, val), null); 501 memstore.add(new KeyValue(row, fam, qf7, val), null); 502 // opening scanners 503 scanners = memstore.getScanners(0); 504 // close scanners before clear the snapshot 505 for (KeyValueScanner scanner : scanners) { 506 scanner.close(); 507 } 508 // Since no opening scanner, the chunks of snapshot should be put back to 509 // pool 510 // close the scanners 511 for (KeyValueScanner scanner : snapshot.getScanners()) { 512 scanner.close(); 513 } 514 memstore.clearSnapshot(snapshot.getId()); 515 assertTrue(chunkCreator.getPoolSize() > 0); 516 } 517 518 @Test 519 public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException { 520 521 // set memstore to do data compaction and not to use the speculative scan 522 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 523 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 524 String.valueOf(compactionType)); 525 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 526 527 byte[] row = Bytes.toBytes("testrow"); 528 byte[] fam = Bytes.toBytes("testfamily"); 529 byte[] qf1 = Bytes.toBytes("testqualifier1"); 530 byte[] qf2 = Bytes.toBytes("testqualifier2"); 531 byte[] qf3 = Bytes.toBytes("testqualifier3"); 532 byte[] val = Bytes.toBytes("testval"); 533 534 // Setting up memstore 535 memstore.add(new KeyValue(row, fam, qf1, 1, val), null); 536 memstore.add(new KeyValue(row, fam, qf2, 1, val), null); 537 memstore.add(new KeyValue(row, fam, qf3, 1, val), null); 538 539 // Creating a pipeline 540 ((MyCompactingMemStore) memstore).disableCompaction(); 541 ((CompactingMemStore) memstore).flushInMemory(); 542 543 // Adding value to "new" memstore 544 assertEquals(0, memstore.getActive().getCellsCount()); 545 memstore.add(new KeyValue(row, fam, qf1, 2, val), null); 546 memstore.add(new KeyValue(row, fam, qf2, 2, val), null); 547 assertEquals(2, memstore.getActive().getCellsCount()); 548 549 // pipeline bucket 2 550 ((CompactingMemStore) memstore).flushInMemory(); 551 // opening scanner before force flushing 552 List<KeyValueScanner> scanners = memstore.getScanners(0); 553 // Shouldn't putting back the chunks to pool,since some scanners are opening 554 // based on their data 555 ((MyCompactingMemStore) memstore).enableCompaction(); 556 // trigger compaction 557 ((CompactingMemStore) memstore).flushInMemory(); 558 559 // Adding value to "new" memstore 560 assertEquals(0, memstore.getActive().getCellsCount()); 561 memstore.add(new KeyValue(row, fam, qf3, 3, val), null); 562 memstore.add(new KeyValue(row, fam, qf2, 3, val), null); 563 memstore.add(new KeyValue(row, fam, qf1, 3, val), null); 564 assertEquals(3, memstore.getActive().getCellsCount()); 565 566 assertTrue(chunkCreator.getPoolSize() == 0); 567 568 // Chunks will be put back to pool after close scanners; 569 for (KeyValueScanner scanner : scanners) { 570 scanner.close(); 571 } 572 assertTrue(chunkCreator.getPoolSize() > 0); 573 574 // clear chunks 575 chunkCreator.clearChunksInPool(); 576 577 // Creating another snapshot 578 579 MemStoreSnapshot snapshot = memstore.snapshot(); 580 // close the scanners 581 for (KeyValueScanner scanner : snapshot.getScanners()) { 582 scanner.close(); 583 } 584 memstore.clearSnapshot(snapshot.getId()); 585 586 snapshot = memstore.snapshot(); 587 // Adding more value 588 memstore.add(new KeyValue(row, fam, qf2, 4, val), null); 589 memstore.add(new KeyValue(row, fam, qf3, 4, val), null); 590 // opening scanners 591 scanners = memstore.getScanners(0); 592 // close scanners before clear the snapshot 593 for (KeyValueScanner scanner : scanners) { 594 scanner.close(); 595 } 596 // Since no opening scanner, the chunks of snapshot should be put back to 597 // pool 598 // close the scanners 599 for (KeyValueScanner scanner : snapshot.getScanners()) { 600 scanner.close(); 601 } 602 memstore.clearSnapshot(snapshot.getId()); 603 assertTrue(chunkCreator.getPoolSize() > 0); 604 } 605 606 ////////////////////////////////////////////////////////////////////////////// 607 // Compaction tests 608 ////////////////////////////////////////////////////////////////////////////// 609 @Test 610 public void testCompaction1Bucket() throws IOException { 611 612 // set memstore to do basic structure flattening, the "eager" option is tested in 613 // TestCompactingToCellFlatMapMemStore 614 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 615 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 616 String.valueOf(compactionType)); 617 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 618 619 String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4 620 621 // test 1 bucket 622 int totalCellsLen = addRowsByKeys(memstore, keys1); 623 int oneCellOnCSLMHeapSize = 120; 624 int oneCellOnCAHeapSize = 88; 625 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 626 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 627 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 628 629 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 630 assertEquals(0, memstore.getSnapshot().getCellsCount()); 631 // There is no compaction, as the compacting memstore type is basic. 632 // totalCellsLen remains the same 633 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 634 + 4 * oneCellOnCAHeapSize; 635 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 636 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 637 638 MemStoreSize mss = memstore.getFlushableSize(); 639 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 640 // simulate flusher 641 region.decrMemStoreSize(mss); 642 ImmutableSegment s = memstore.getSnapshot(); 643 assertEquals(4, s.getCellsCount()); 644 assertEquals(0, regionServicesForStores.getMemStoreSize()); 645 646 memstore.clearSnapshot(snapshot.getId()); 647 } 648 649 @Test 650 public void testCompaction2Buckets() throws IOException { 651 652 // set memstore to do basic structure flattening, the "eager" option is tested in 653 // TestCompactingToCellFlatMapMemStore 654 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 655 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 656 String.valueOf(compactionType)); 657 memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 658 String.valueOf(1)); 659 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 660 String[] keys1 = { "A", "A", "B", "C" }; 661 String[] keys2 = { "A", "B", "D" }; 662 663 int totalCellsLen1 = addRowsByKeys(memstore, keys1); 664 int oneCellOnCSLMHeapSize = 120; 665 int oneCellOnCAHeapSize = 88; 666 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 667 668 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 669 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 670 671 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 672 int counter = 0; 673 for (Segment s : memstore.getSegments()) { 674 counter += s.getCellsCount(); 675 } 676 assertEquals(4, counter); 677 assertEquals(0, memstore.getSnapshot().getCellsCount()); 678 // There is no compaction, as the compacting memstore type is basic. 679 // totalCellsLen remains the same 680 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 681 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 682 + 4 * oneCellOnCAHeapSize; 683 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 684 685 int totalCellsLen2 = addRowsByKeys(memstore, keys2); 686 totalHeapSize += 3 * oneCellOnCSLMHeapSize; 687 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 688 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 689 690 MemStoreSize mss = memstore.getFlushableSize(); 691 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 692 assertEquals(0, memstore.getSnapshot().getCellsCount()); 693 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 694 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 695 + 7 * oneCellOnCAHeapSize; 696 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 697 698 mss = memstore.getFlushableSize(); 699 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 700 // simulate flusher 701 region.decrMemStoreSize(mss); 702 ImmutableSegment s = memstore.getSnapshot(); 703 assertEquals(7, s.getCellsCount()); 704 assertEquals(0, regionServicesForStores.getMemStoreSize()); 705 706 memstore.clearSnapshot(snapshot.getId()); 707 } 708 709 @Test 710 public void testCompaction3Buckets() throws IOException { 711 712 // set memstore to do data compaction and not to use the speculative scan 713 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 714 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 715 String.valueOf(compactionType)); 716 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 717 String[] keys1 = { "A", "A", "B", "C" }; 718 String[] keys2 = { "A", "B", "D" }; 719 String[] keys3 = { "D", "B", "B" }; 720 721 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells. 722 int oneCellOnCSLMHeapSize = 120; 723 int oneCellOnCAHeapSize = 88; 724 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 725 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 726 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 727 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 728 729 assertEquals(0, memstore.getSnapshot().getCellsCount()); 730 // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting 731 // totalCellsLen 732 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 733 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 734 // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. 735 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 736 + 3 * oneCellOnCAHeapSize; 737 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 738 739 int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. 740 long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize; 741 742 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 743 assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 744 745 ((MyCompactingMemStore) memstore).disableCompaction(); 746 MemStoreSize mss = memstore.getFlushableSize(); 747 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 748 assertEquals(0, memstore.getSnapshot().getCellsCount()); 749 // No change in the cells data size. ie. memstore size. as there is no compaction. 750 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 751 assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, 752 ((CompactingMemStore) memstore).heapSize()); 753 754 int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added 755 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 756 regionServicesForStores.getMemStoreSize()); 757 long totalHeapSize3 = 758 totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize; 759 assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); 760 761 ((MyCompactingMemStore) memstore).enableCompaction(); 762 mss = memstore.getFlushableSize(); 763 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 764 assertEquals(0, memstore.getSnapshot().getCellsCount()); 765 // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 766 // Out of total 10, only 4 cells are unique 767 totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated 768 totalCellsLen3 = 0;// All duplicated cells. 769 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 770 regionServicesForStores.getMemStoreSize()); 771 // Only 4 unique cells left 772 assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD 773 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); 774 775 mss = memstore.getFlushableSize(); 776 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 777 // simulate flusher 778 region.decrMemStoreSize(mss); 779 ImmutableSegment s = memstore.getSnapshot(); 780 assertEquals(4, s.getCellsCount()); 781 assertEquals(0, regionServicesForStores.getMemStoreSize()); 782 783 memstore.clearSnapshot(snapshot.getId()); 784 } 785 786 @Test 787 public void testMagicCompaction3Buckets() throws IOException { 788 789 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE; 790 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 791 String.valueOf(compactionType)); 792 memstore.getConfiguration() 793 .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45); 794 memstore.getConfiguration() 795 .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2); 796 memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1); 797 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 798 799 String[] keys1 = { "A", "B", "D" }; 800 String[] keys2 = { "A" }; 801 String[] keys3 = { "A", "A", "B", "C" }; 802 String[] keys4 = { "D", "B", "B" }; 803 804 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells. 805 int oneCellOnCSLMHeapSize = 120; 806 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 807 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize; 808 assertEquals(totalHeapSize, memstore.heapSize()); 809 810 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten 811 assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 812 assertEquals(1.0, 813 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 814 assertEquals(0, memstore.getSnapshot().getCellsCount()); 815 816 addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten. 817 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 818 assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 819 assertEquals(1.0, 820 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 821 assertEquals(0, memstore.getSnapshot().getCellsCount()); 822 823 addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge. 824 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 825 assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 826 assertEquals((4.0 / 8.0), 827 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 828 assertEquals(0, memstore.getSnapshot().getCellsCount()); 829 830 addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not) 831 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 832 int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells(); 833 assertTrue(4 == numCells || 11 == numCells); 834 assertEquals(0, memstore.getSnapshot().getCellsCount()); 835 836 MemStoreSize mss = memstore.getFlushableSize(); 837 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 838 // simulate flusher 839 region.decrMemStoreSize(mss); 840 ImmutableSegment s = memstore.getSnapshot(); 841 numCells = s.getCellsCount(); 842 assertTrue(4 == numCells || 11 == numCells); 843 assertEquals(0, regionServicesForStores.getMemStoreSize()); 844 845 memstore.clearSnapshot(snapshot.getId()); 846 } 847 848 @Override 849 @Test 850 public void testScan() throws IOException { 851 scanMemStore(memstore, 6635); 852 } 853 854 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { 855 byte[] fam = Bytes.toBytes("testfamily"); 856 byte[] qf = Bytes.toBytes("testqualifier"); 857 long size = hmc.getActive().getDataSize(); 858 long heapOverhead = hmc.getActive().getHeapSize(); 859 int cellsCount = hmc.getActive().getCellsCount(); 860 int totalLen = 0; 861 for (int i = 0; i < keys.length; i++) { 862 long timestamp = EnvironmentEdgeManager.currentTime(); 863 Threads.sleep(1); // to make sure each kv gets a different ts 864 byte[] row = Bytes.toBytes(keys[i]); 865 byte[] val = Bytes.toBytes(keys[i] + i); 866 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 867 totalLen += Segment.getCellLength(kv); 868 hmc.add(kv, null); 869 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 870 } 871 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 872 hmc.getActive().getHeapSize() - heapOverhead, 0, 873 hmc.getActive().getCellsCount() - cellsCount); 874 return totalLen; 875 } 876 877 // for controlling the val size when adding a new cell 878 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { 879 byte[] fam = Bytes.toBytes("testfamily"); 880 byte[] qf = Bytes.toBytes("testqualifier"); 881 long size = hmc.getActive().getDataSize(); 882 long heapOverhead = hmc.getActive().getHeapSize(); 883 int cellsCount = hmc.getActive().getCellsCount(); 884 int totalLen = 0; 885 for (int i = 0; i < keys.length; i++) { 886 long timestamp = EnvironmentEdgeManager.currentTime(); 887 Threads.sleep(1); // to make sure each kv gets a different ts 888 byte[] row = Bytes.toBytes(keys[i]); 889 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 890 totalLen += Segment.getCellLength(kv); 891 hmc.add(kv, null); 892 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 893 } 894 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 895 hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount); 896 return totalLen; 897 } 898 899 private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { 900 long t = 1234; 901 902 @Override 903 public long currentTime() { 904 return t; 905 } 906 } 907 908 static protected class MyCompactingMemStore extends CompactingMemStore { 909 910 public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store, 911 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 912 throws IOException { 913 super(conf, c, store, regionServices, compactionPolicy); 914 } 915 916 void disableCompaction() { 917 allowCompaction.set(false); 918 } 919 920 void enableCompaction() { 921 allowCompaction.set(true); 922 } 923 924 void initiateType(MemoryCompactionPolicy compactionType, Configuration conf) 925 throws IllegalArgumentIOException { 926 compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST"); 927 } 928 } 929}