001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotEquals; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertNull; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025 026import java.io.IOException; 027import java.lang.management.ManagementFactory; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ThreadPoolExecutor; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.ExtendedCell; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HBaseTestingUtil; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.KeepDeletedCells; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.KeyValueTestUtil; 043import org.apache.hadoop.hbase.MemoryCompactionPolicy; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.RegionInfoBuilder; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 053import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 054import org.apache.hadoop.hbase.testclassification.MediumTests; 055import org.apache.hadoop.hbase.testclassification.RegionServerTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.hadoop.hbase.util.EnvironmentEdge; 058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 059import org.apache.hadoop.hbase.util.Threads; 060import org.apache.hadoop.hbase.wal.WAL; 061import org.junit.jupiter.api.Tag; 062import org.junit.jupiter.api.Test; 063import org.mockito.Mockito; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067/** 068 * compacted memstore test case 069 */ 070@Tag(RegionServerTests.TAG) 071@Tag(MediumTests.TAG) 072public class TestCompactingMemStore extends TestDefaultMemStore { 073 074 private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class); 075 protected static ChunkCreator chunkCreator; 076 protected HRegion region; 077 protected RegionServicesForStores regionServicesForStores; 078 protected HStore store; 079 private Configuration conf; 080 081 @Override 082 protected void internalTearDown() throws Exception { 083 chunkCreator.clearChunksInPool(); 084 } 085 086 @Override 087 protected void internalSetUp() throws Exception { 088 super.internalSetUp(); 089 conf = new Configuration(); 090 conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); 091 conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); 092 conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); 093 HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf); 094 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(FAMILY); 095 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("foobar")) 096 .setColumnFamily(familyDescriptor).build(); 097 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("foobar")).build(); 098 WAL wal = HBaseTestingUtil.createWal(conf, hbaseUtility.getDataTestDir(), info); 099 this.region = 100 HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, tableDescriptor, wal, true); 101 this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores()); 102 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 103 Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 104 this.store = new HStore(region, familyDescriptor, conf, false); 105 } 106 107 @Override 108 protected void createChunkCreator() { 109 long globalMemStoreLimit = 110 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 111 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 112 chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 113 globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, 114 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 115 assertNotNull(chunkCreator); 116 } 117 118 @Override 119 protected void createMemStore() throws IOException { 120 this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), 121 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 122 ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 123 } 124 125 /** 126 * A simple test which flush in memory affect timeOfOldestEdit 127 */ 128 @Test 129 public void testTimeOfOldestEdit() { 130 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 131 final byte[] r = Bytes.toBytes("r"); 132 final byte[] f = Bytes.toBytes("f"); 133 final byte[] q = Bytes.toBytes("q"); 134 final byte[] v = Bytes.toBytes("v"); 135 final KeyValue kv = new KeyValue(r, f, q, v); 136 memstore.add(kv, null); 137 long timeOfOldestEdit = memstore.timeOfOldestEdit(); 138 assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit); 139 140 ((CompactingMemStore) memstore).flushInMemory(); 141 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 142 memstore.add(kv, null); 143 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 144 memstore.snapshot(); 145 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 146 } 147 148 /** 149 * A simple test which verifies the 3 possible states when scanning across snapshot. 150 */ 151 @Override 152 @Test 153 public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException { 154 // we are going to the scanning across snapshot with two kvs 155 // kv1 should always be returned before kv2 156 final byte[] one = Bytes.toBytes(1); 157 final byte[] two = Bytes.toBytes(2); 158 final byte[] f = Bytes.toBytes("f"); 159 final byte[] q = Bytes.toBytes("q"); 160 final byte[] v = Bytes.toBytes(3); 161 162 final KeyValue kv1 = new KeyValue(one, f, q, 10, v); 163 final KeyValue kv2 = new KeyValue(two, f, q, 10, v); 164 165 // use case 1: both kvs in kvset 166 this.memstore.add(kv1.clone(), null); 167 this.memstore.add(kv2.clone(), null); 168 // snapshot is empty,active segment is not empty, 169 // empty segment is skipped. 170 verifyOneScanAcrossSnapshot2(kv1, kv2); 171 172 // use case 2: both kvs in snapshot 173 this.memstore.snapshot(); 174 // active segment is empty,snapshot is not empty, 175 // empty segment is skipped. 176 verifyOneScanAcrossSnapshot2(kv1, kv2); 177 178 // use case 3: first in snapshot second in kvset 179 this.memstore = new CompactingMemStore(HBaseConfiguration.create(), 180 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 181 this.memstore.add(kv1.clone(), null); 182 // As compaction is starting in the background the repetition 183 // of the k1 might be removed BUT the scanners created earlier 184 // should look on the OLD MutableCellSetSegment, so this should be OK... 185 this.memstore.snapshot(); 186 this.memstore.add(kv2.clone(), null); 187 verifyScanAcrossSnapshot2(kv1, kv2); 188 } 189 190 /** 191 * Test memstore snapshots 192 */ 193 @Override 194 @Test 195 public void testSnapshotting() throws IOException { 196 final int snapshotCount = 5; 197 // Add some rows, run a snapshot. Do it a few times. 198 for (int i = 0; i < snapshotCount; i++) { 199 addRows(this.memstore); 200 runSnapshot(this.memstore, true); 201 assertEquals(0, this.memstore.getSnapshot().getCellsCount(), "History not being cleared"); 202 } 203 } 204 205 ////////////////////////////////////////////////////////////////////////////// 206 // Get tests 207 ////////////////////////////////////////////////////////////////////////////// 208 209 /** 210 * Test getNextRow from memstore 211 */ 212 @Override 213 @Test 214 public void testGetNextRow() throws Exception { 215 addRows(this.memstore); 216 // Add more versions to make it a little more interesting. 217 Thread.sleep(1); 218 addRows(this.memstore); 219 Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY); 220 assertEquals(0, CellComparator.getInstance().compareRows(closestToEmpty, 221 new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime()))); 222 for (int i = 0; i < ROW_COUNT; i++) { 223 Cell nr = ((CompactingMemStore) this.memstore) 224 .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime())); 225 if (i + 1 == ROW_COUNT) { 226 assertNull(nr); 227 } else { 228 assertEquals(0, CellComparator.getInstance().compareRows(nr, 229 new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime()))); 230 } 231 } 232 // starting from each row, validate results should contain the starting row 233 Configuration conf = HBaseConfiguration.create(); 234 for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { 235 ScanInfo scanInfo = 236 new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE, 237 HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); 238 try (InternalScanner scanner = 239 new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, 240 memstore.getScanners(0))) { 241 List<Cell> results = new ArrayList<>(); 242 for (int i = 0; scanner.next(results); i++) { 243 int rowId = startRowId + i; 244 Cell left = results.get(0); 245 byte[] row1 = Bytes.toBytes(rowId); 246 assertEquals(0, CellComparator.getInstance().compareRows(left, row1, 0, row1.length), 247 "Row name"); 248 assertEquals(QUALIFIER_COUNT, results.size(), "Count of columns"); 249 List<Cell> row = new ArrayList<>(); 250 for (Cell kv : results) { 251 row.add(kv); 252 } 253 isExpectedRowWithoutTimestamps(rowId, row); 254 // Clear out set. Otherwise row results accumulate. 255 results.clear(); 256 } 257 } 258 } 259 } 260 261 @Override 262 @Test 263 public void testGet_memstoreAndSnapShot() throws IOException { 264 byte[] row = Bytes.toBytes("testrow"); 265 byte[] fam = Bytes.toBytes("testfamily"); 266 byte[] qf1 = Bytes.toBytes("testqualifier1"); 267 byte[] qf2 = Bytes.toBytes("testqualifier2"); 268 byte[] qf3 = Bytes.toBytes("testqualifier3"); 269 byte[] qf4 = Bytes.toBytes("testqualifier4"); 270 byte[] qf5 = Bytes.toBytes("testqualifier5"); 271 byte[] val = Bytes.toBytes("testval"); 272 273 // Setting up memstore 274 memstore.add(new KeyValue(row, fam, qf1, val), null); 275 memstore.add(new KeyValue(row, fam, qf2, val), null); 276 memstore.add(new KeyValue(row, fam, qf3, val), null); 277 // Pushing to pipeline 278 ((CompactingMemStore) memstore).flushInMemory(); 279 assertEquals(0, memstore.getSnapshot().getCellsCount()); 280 // Creating a snapshot 281 memstore.snapshot(); 282 assertEquals(3, memstore.getSnapshot().getCellsCount()); 283 // Adding value to "new" memstore 284 assertEquals(0, memstore.getActive().getCellsCount()); 285 memstore.add(new KeyValue(row, fam, qf4, val), null); 286 memstore.add(new KeyValue(row, fam, qf5, val), null); 287 assertEquals(2, memstore.getActive().getCellsCount()); 288 } 289 290 //////////////////////////////////// 291 // Test for periodic memstore flushes 292 // based on time of oldest edit 293 //////////////////////////////////// 294 295 /** 296 * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased as older 297 * keyvalues are deleted from the memstore. 298 */ 299 @Override 300 @Test 301 public void testUpsertMemstoreSize() throws Exception { 302 MemStoreSize oldSize = memstore.size(); 303 304 List<ExtendedCell> l = new ArrayList<>(); 305 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 306 KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); 307 KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); 308 309 kv1.setSequenceId(1); 310 kv2.setSequenceId(1); 311 kv3.setSequenceId(1); 312 l.add(kv1); 313 l.add(kv2); 314 l.add(kv3); 315 316 this.memstore.upsert(l, 2, null);// readpoint is 2 317 MemStoreSize newSize = this.memstore.size(); 318 assert (newSize.getDataSize() > oldSize.getDataSize()); 319 // The kv1 should be removed. 320 assert (memstore.getActive().getCellsCount() == 2); 321 322 KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); 323 kv4.setSequenceId(1); 324 l.clear(); 325 l.add(kv4); 326 this.memstore.upsert(l, 3, null); 327 assertEquals(newSize, this.memstore.size()); 328 // The kv2 should be removed. 329 assert (memstore.getActive().getCellsCount() == 2); 330 // this.memstore = null; 331 } 332 333 /** 334 * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in 335 * memstore. 336 */ 337 @Override 338 @Test 339 public void testUpdateToTimeOfOldestEdit() throws Exception { 340 try { 341 EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); 342 EnvironmentEdgeManager.injectEdge(edge); 343 long t = memstore.timeOfOldestEdit(); 344 assertEquals(Long.MAX_VALUE, t); 345 346 // test the case that the timeOfOldestEdit is updated after a KV add 347 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null); 348 t = memstore.timeOfOldestEdit(); 349 assertEquals(1234, t); 350 // The method will also assert 351 // the value is reset to Long.MAX_VALUE 352 t = runSnapshot(memstore, true); 353 354 // test the case that the timeOfOldestEdit is updated after a KV delete 355 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null); 356 t = memstore.timeOfOldestEdit(); 357 assertEquals(1234, t); 358 t = runSnapshot(memstore, true); 359 360 // test the case that the timeOfOldestEdit is updated after a KV upsert 361 List<ExtendedCell> l = new ArrayList<>(); 362 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 363 kv1.setSequenceId(100); 364 l.add(kv1); 365 memstore.upsert(l, 1000, null); 366 t = memstore.timeOfOldestEdit(); 367 assertEquals(1234, t); 368 } finally { 369 EnvironmentEdgeManager.reset(); 370 } 371 } 372 373 private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException { 374 // Save off old state. 375 long oldHistorySize = hmc.getSnapshot().getDataSize(); 376 long prevTimeStamp = hmc.timeOfOldestEdit(); 377 378 hmc.snapshot(); 379 MemStoreSnapshot snapshot = hmc.snapshot(); 380 if (useForce) { 381 // Make some assertions about what just happened. 382 assertTrue(oldHistorySize < snapshot.getDataSize(), "History size has not increased"); 383 long t = hmc.timeOfOldestEdit(); 384 assertEquals(Long.MAX_VALUE, t, "Time of oldest edit is not Long.MAX_VALUE"); 385 hmc.clearSnapshot(snapshot.getId()); 386 } else { 387 long t = hmc.timeOfOldestEdit(); 388 assertEquals(t, prevTimeStamp, "Time of oldest edit didn't remain the same"); 389 } 390 return prevTimeStamp; 391 } 392 393 private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) { 394 int i = 0; 395 for (Cell kv : kvs) { 396 byte[] expectedColname = makeQualifier(rowIndex, i++); 397 assertTrue(CellUtil.matchingQualifier(kv, expectedColname), "Column name"); 398 // Value is column name as bytes. Usually result is 399 // 100 bytes in size at least. This is the default size 400 // for BytesWriteable. For comparison, convert bytes to 401 // String and trim to remove trailing null bytes. 402 assertTrue(CellUtil.matchingValue(kv, expectedColname), "Content"); 403 } 404 } 405 406 @Test 407 public void testPuttingBackChunksAfterFlushing() throws IOException { 408 byte[] row = Bytes.toBytes("testrow"); 409 byte[] fam = Bytes.toBytes("testfamily"); 410 byte[] qf1 = Bytes.toBytes("testqualifier1"); 411 byte[] qf2 = Bytes.toBytes("testqualifier2"); 412 byte[] qf3 = Bytes.toBytes("testqualifier3"); 413 byte[] qf4 = Bytes.toBytes("testqualifier4"); 414 byte[] qf5 = Bytes.toBytes("testqualifier5"); 415 byte[] val = Bytes.toBytes("testval"); 416 417 // Setting up memstore 418 memstore.add(new KeyValue(row, fam, qf1, val), null); 419 memstore.add(new KeyValue(row, fam, qf2, val), null); 420 memstore.add(new KeyValue(row, fam, qf3, val), null); 421 422 // Creating a snapshot 423 MemStoreSnapshot snapshot = memstore.snapshot(); 424 assertEquals(3, memstore.getSnapshot().getCellsCount()); 425 426 // Adding value to "new" memstore 427 assertEquals(0, memstore.getActive().getCellsCount()); 428 memstore.add(new KeyValue(row, fam, qf4, val), null); 429 memstore.add(new KeyValue(row, fam, qf5, val), null); 430 assertEquals(2, memstore.getActive().getCellsCount()); 431 // close the scanners 432 for (KeyValueScanner scanner : snapshot.getScanners()) { 433 scanner.close(); 434 } 435 memstore.clearSnapshot(snapshot.getId()); 436 437 int chunkCount = chunkCreator.getPoolSize(); 438 assertTrue(chunkCount > 0); 439 440 } 441 442 @Test 443 public void testPuttingBackChunksWithOpeningScanner() throws IOException { 444 byte[] row = Bytes.toBytes("testrow"); 445 byte[] fam = Bytes.toBytes("testfamily"); 446 byte[] qf1 = Bytes.toBytes("testqualifier1"); 447 byte[] qf2 = Bytes.toBytes("testqualifier2"); 448 byte[] qf3 = Bytes.toBytes("testqualifier3"); 449 byte[] qf4 = Bytes.toBytes("testqualifier4"); 450 byte[] qf5 = Bytes.toBytes("testqualifier5"); 451 byte[] qf6 = Bytes.toBytes("testqualifier6"); 452 byte[] qf7 = Bytes.toBytes("testqualifier7"); 453 byte[] val = Bytes.toBytes("testval"); 454 455 // Setting up memstore 456 memstore.add(new KeyValue(row, fam, qf1, val), null); 457 memstore.add(new KeyValue(row, fam, qf2, val), null); 458 memstore.add(new KeyValue(row, fam, qf3, val), null); 459 460 // Creating a snapshot 461 MemStoreSnapshot snapshot = memstore.snapshot(); 462 assertEquals(3, memstore.getSnapshot().getCellsCount()); 463 464 // Adding value to "new" memstore 465 assertEquals(0, memstore.getActive().getCellsCount()); 466 memstore.add(new KeyValue(row, fam, qf4, val), null); 467 memstore.add(new KeyValue(row, fam, qf5, val), null); 468 assertEquals(2, memstore.getActive().getCellsCount()); 469 470 // opening scanner before clear the snapshot 471 List<KeyValueScanner> scanners = memstore.getScanners(0); 472 // Shouldn't putting back the chunks to pool,since some scanners are opening 473 // based on their data 474 // close the scanners 475 for (KeyValueScanner scanner : snapshot.getScanners()) { 476 scanner.close(); 477 } 478 memstore.clearSnapshot(snapshot.getId()); 479 480 assertEquals(0, chunkCreator.getPoolSize()); 481 482 // Chunks will be put back to pool after close scanners; 483 for (KeyValueScanner scanner : scanners) { 484 scanner.close(); 485 } 486 assertTrue(chunkCreator.getPoolSize() > 0); 487 488 // clear chunks 489 chunkCreator.clearChunksInPool(); 490 491 // Creating another snapshot 492 493 snapshot = memstore.snapshot(); 494 // Adding more value 495 memstore.add(new KeyValue(row, fam, qf6, val), null); 496 memstore.add(new KeyValue(row, fam, qf7, val), null); 497 // opening scanners 498 scanners = memstore.getScanners(0); 499 // close scanners before clear the snapshot 500 for (KeyValueScanner scanner : scanners) { 501 scanner.close(); 502 } 503 // Since no opening scanner, the chunks of snapshot should be put back to 504 // pool 505 // close the scanners 506 for (KeyValueScanner scanner : snapshot.getScanners()) { 507 scanner.close(); 508 } 509 memstore.clearSnapshot(snapshot.getId()); 510 assertTrue(chunkCreator.getPoolSize() > 0); 511 } 512 513 @Test 514 public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException { 515 516 // set memstore to do data compaction and not to use the speculative scan 517 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 518 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 519 String.valueOf(compactionType)); 520 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 521 522 byte[] row = Bytes.toBytes("testrow"); 523 byte[] fam = Bytes.toBytes("testfamily"); 524 byte[] qf1 = Bytes.toBytes("testqualifier1"); 525 byte[] qf2 = Bytes.toBytes("testqualifier2"); 526 byte[] qf3 = Bytes.toBytes("testqualifier3"); 527 byte[] val = Bytes.toBytes("testval"); 528 529 // Setting up memstore 530 memstore.add(new KeyValue(row, fam, qf1, 1, val), null); 531 memstore.add(new KeyValue(row, fam, qf2, 1, val), null); 532 memstore.add(new KeyValue(row, fam, qf3, 1, val), null); 533 534 // Creating a pipeline 535 ((MyCompactingMemStore) memstore).disableCompaction(); 536 ((CompactingMemStore) memstore).flushInMemory(); 537 538 // Adding value to "new" memstore 539 assertEquals(0, memstore.getActive().getCellsCount()); 540 memstore.add(new KeyValue(row, fam, qf1, 2, val), null); 541 memstore.add(new KeyValue(row, fam, qf2, 2, val), null); 542 assertEquals(2, memstore.getActive().getCellsCount()); 543 544 // pipeline bucket 2 545 ((CompactingMemStore) memstore).flushInMemory(); 546 // opening scanner before force flushing 547 List<KeyValueScanner> scanners = memstore.getScanners(0); 548 // Shouldn't putting back the chunks to pool,since some scanners are opening 549 // based on their data 550 ((MyCompactingMemStore) memstore).enableCompaction(); 551 // trigger compaction 552 ((CompactingMemStore) memstore).flushInMemory(); 553 554 // Adding value to "new" memstore 555 assertEquals(0, memstore.getActive().getCellsCount()); 556 memstore.add(new KeyValue(row, fam, qf3, 3, val), null); 557 memstore.add(new KeyValue(row, fam, qf2, 3, val), null); 558 memstore.add(new KeyValue(row, fam, qf1, 3, val), null); 559 assertEquals(3, memstore.getActive().getCellsCount()); 560 561 assertEquals(0, chunkCreator.getPoolSize()); 562 563 // Chunks will be put back to pool after close scanners; 564 for (KeyValueScanner scanner : scanners) { 565 scanner.close(); 566 } 567 assertTrue(chunkCreator.getPoolSize() > 0); 568 569 // clear chunks 570 chunkCreator.clearChunksInPool(); 571 572 // Creating another snapshot 573 574 MemStoreSnapshot snapshot = memstore.snapshot(); 575 // close the scanners 576 for (KeyValueScanner scanner : snapshot.getScanners()) { 577 scanner.close(); 578 } 579 memstore.clearSnapshot(snapshot.getId()); 580 581 snapshot = memstore.snapshot(); 582 // Adding more value 583 memstore.add(new KeyValue(row, fam, qf2, 4, val), null); 584 memstore.add(new KeyValue(row, fam, qf3, 4, val), null); 585 // opening scanners 586 scanners = memstore.getScanners(0); 587 // close scanners before clear the snapshot 588 for (KeyValueScanner scanner : scanners) { 589 scanner.close(); 590 } 591 // Since no opening scanner, the chunks of snapshot should be put back to 592 // pool 593 // close the scanners 594 for (KeyValueScanner scanner : snapshot.getScanners()) { 595 scanner.close(); 596 } 597 memstore.clearSnapshot(snapshot.getId()); 598 assertTrue(chunkCreator.getPoolSize() > 0); 599 } 600 601 ////////////////////////////////////////////////////////////////////////////// 602 // Compaction tests 603 ////////////////////////////////////////////////////////////////////////////// 604 @Test 605 public void testCompaction1Bucket() throws IOException { 606 607 // set memstore to do basic structure flattening, the "eager" option is tested in 608 // TestCompactingToCellFlatMapMemStore 609 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 610 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 611 String.valueOf(compactionType)); 612 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 613 614 String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4 615 616 // test 1 bucket 617 int totalCellsLen = addRowsByKeys(memstore, keys1); 618 int oneCellOnCSLMHeapSize = 120; 619 int oneCellOnCAHeapSize = 88; 620 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 621 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 622 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 623 624 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 625 assertEquals(0, memstore.getSnapshot().getCellsCount()); 626 // There is no compaction, as the compacting memstore type is basic. 627 // totalCellsLen remains the same 628 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 629 + 4 * oneCellOnCAHeapSize; 630 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 631 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 632 633 MemStoreSize mss = memstore.getFlushableSize(); 634 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 635 // simulate flusher 636 region.decrMemStoreSize(mss); 637 ImmutableSegment s = memstore.getSnapshot(); 638 assertEquals(4, s.getCellsCount()); 639 assertEquals(0, regionServicesForStores.getMemStoreSize()); 640 641 memstore.clearSnapshot(snapshot.getId()); 642 } 643 644 @Test 645 public void testCompaction2Buckets() throws IOException { 646 647 // set memstore to do basic structure flattening, the "eager" option is tested in 648 // TestCompactingToCellFlatMapMemStore 649 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 650 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 651 String.valueOf(compactionType)); 652 memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 653 String.valueOf(1)); 654 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 655 String[] keys1 = { "A", "A", "B", "C" }; 656 String[] keys2 = { "A", "B", "D" }; 657 658 int totalCellsLen1 = addRowsByKeys(memstore, keys1); 659 int oneCellOnCSLMHeapSize = 120; 660 int oneCellOnCAHeapSize = 88; 661 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 662 663 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 664 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 665 666 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 667 int counter = 0; 668 for (Segment s : memstore.getSegments()) { 669 counter += s.getCellsCount(); 670 } 671 assertEquals(4, counter); 672 assertEquals(0, memstore.getSnapshot().getCellsCount()); 673 // There is no compaction, as the compacting memstore type is basic. 674 // totalCellsLen remains the same 675 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 676 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 677 + 4 * oneCellOnCAHeapSize; 678 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 679 680 int totalCellsLen2 = addRowsByKeys(memstore, keys2); 681 totalHeapSize += 3 * oneCellOnCSLMHeapSize; 682 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 683 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 684 685 MemStoreSize mss = memstore.getFlushableSize(); 686 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 687 assertEquals(0, memstore.getSnapshot().getCellsCount()); 688 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 689 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 690 + 7 * oneCellOnCAHeapSize; 691 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 692 693 mss = memstore.getFlushableSize(); 694 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 695 // simulate flusher 696 region.decrMemStoreSize(mss); 697 ImmutableSegment s = memstore.getSnapshot(); 698 assertEquals(7, s.getCellsCount()); 699 assertEquals(0, regionServicesForStores.getMemStoreSize()); 700 701 memstore.clearSnapshot(snapshot.getId()); 702 } 703 704 @Test 705 public void testCompaction3Buckets() throws IOException { 706 707 // set memstore to do data compaction and not to use the speculative scan 708 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 709 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 710 String.valueOf(compactionType)); 711 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 712 String[] keys1 = { "A", "A", "B", "C" }; 713 String[] keys2 = { "A", "B", "D" }; 714 String[] keys3 = { "D", "B", "B" }; 715 716 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells. 717 int oneCellOnCSLMHeapSize = 120; 718 int oneCellOnCAHeapSize = 88; 719 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 720 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 721 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 722 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 723 724 assertEquals(0, memstore.getSnapshot().getCellsCount()); 725 // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting 726 // totalCellsLen 727 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 728 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 729 // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. 730 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 731 + 3 * oneCellOnCAHeapSize; 732 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 733 734 int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. 735 long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize; 736 737 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 738 assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 739 740 ((MyCompactingMemStore) memstore).disableCompaction(); 741 MemStoreSize mss = memstore.getFlushableSize(); 742 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 743 assertEquals(0, memstore.getSnapshot().getCellsCount()); 744 // No change in the cells data size. ie. memstore size. as there is no compaction. 745 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 746 assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, 747 ((CompactingMemStore) memstore).heapSize()); 748 749 int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added 750 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 751 regionServicesForStores.getMemStoreSize()); 752 long totalHeapSize3 = 753 totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize; 754 assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); 755 756 ((MyCompactingMemStore) memstore).enableCompaction(); 757 mss = memstore.getFlushableSize(); 758 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 759 assertEquals(0, memstore.getSnapshot().getCellsCount()); 760 // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 761 // Out of total 10, only 4 cells are unique 762 totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated 763 totalCellsLen3 = 0;// All duplicated cells. 764 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 765 regionServicesForStores.getMemStoreSize()); 766 // Only 4 unique cells left 767 assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD 768 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); 769 770 mss = memstore.getFlushableSize(); 771 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 772 // simulate flusher 773 region.decrMemStoreSize(mss); 774 ImmutableSegment s = memstore.getSnapshot(); 775 assertEquals(4, s.getCellsCount()); 776 assertEquals(0, regionServicesForStores.getMemStoreSize()); 777 778 memstore.clearSnapshot(snapshot.getId()); 779 } 780 781 @Test 782 public void testMagicCompaction3Buckets() throws IOException { 783 784 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE; 785 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 786 String.valueOf(compactionType)); 787 memstore.getConfiguration() 788 .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45); 789 memstore.getConfiguration() 790 .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2); 791 memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1); 792 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 793 794 String[] keys1 = { "A", "B", "D" }; 795 String[] keys2 = { "A" }; 796 String[] keys3 = { "A", "A", "B", "C" }; 797 String[] keys4 = { "D", "B", "B" }; 798 799 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells. 800 int oneCellOnCSLMHeapSize = 120; 801 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 802 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize; 803 assertEquals(totalHeapSize, memstore.heapSize()); 804 805 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten 806 assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 807 assertEquals(1.0, 808 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 809 assertEquals(0, memstore.getSnapshot().getCellsCount()); 810 811 addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten. 812 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 813 assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 814 assertEquals(1.0, 815 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 816 assertEquals(0, memstore.getSnapshot().getCellsCount()); 817 818 addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge. 819 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 820 assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 821 assertEquals((4.0 / 8.0), 822 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 823 assertEquals(0, memstore.getSnapshot().getCellsCount()); 824 825 addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not) 826 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 827 int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells(); 828 assertTrue(4 == numCells || 11 == numCells); 829 assertEquals(0, memstore.getSnapshot().getCellsCount()); 830 831 MemStoreSize mss = memstore.getFlushableSize(); 832 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 833 // simulate flusher 834 region.decrMemStoreSize(mss); 835 ImmutableSegment s = memstore.getSnapshot(); 836 numCells = s.getCellsCount(); 837 assertTrue(4 == numCells || 11 == numCells); 838 assertEquals(0, regionServicesForStores.getMemStoreSize()); 839 840 memstore.clearSnapshot(snapshot.getId()); 841 } 842 843 @Override 844 @Test 845 public void testScan() throws IOException { 846 scanMemStore(memstore, 6635); 847 } 848 849 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { 850 byte[] fam = Bytes.toBytes("testfamily"); 851 byte[] qf = Bytes.toBytes("testqualifier"); 852 long size = hmc.getActive().getDataSize(); 853 long heapOverhead = hmc.getActive().getHeapSize(); 854 int cellsCount = hmc.getActive().getCellsCount(); 855 int totalLen = 0; 856 for (int i = 0; i < keys.length; i++) { 857 long timestamp = EnvironmentEdgeManager.currentTime(); 858 Threads.sleep(1); // to make sure each kv gets a different ts 859 byte[] row = Bytes.toBytes(keys[i]); 860 byte[] val = Bytes.toBytes(keys[i] + i); 861 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 862 totalLen += Segment.getCellLength(kv); 863 hmc.add(kv, null); 864 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 865 } 866 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 867 hmc.getActive().getHeapSize() - heapOverhead, 0, 868 hmc.getActive().getCellsCount() - cellsCount); 869 return totalLen; 870 } 871 872 // for controlling the val size when adding a new cell 873 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { 874 byte[] fam = Bytes.toBytes("testfamily"); 875 byte[] qf = Bytes.toBytes("testqualifier"); 876 long size = hmc.getActive().getDataSize(); 877 long heapOverhead = hmc.getActive().getHeapSize(); 878 int cellsCount = hmc.getActive().getCellsCount(); 879 int totalLen = 0; 880 for (int i = 0; i < keys.length; i++) { 881 long timestamp = EnvironmentEdgeManager.currentTime(); 882 Threads.sleep(1); // to make sure each kv gets a different ts 883 byte[] row = Bytes.toBytes(keys[i]); 884 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 885 totalLen += Segment.getCellLength(kv); 886 hmc.add(kv, null); 887 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 888 } 889 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 890 hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount); 891 return totalLen; 892 } 893 894 private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { 895 long t = 1234; 896 897 @Override 898 public long currentTime() { 899 return t; 900 } 901 } 902 903 static protected class MyCompactingMemStore extends CompactingMemStore { 904 905 public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store, 906 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 907 throws IOException { 908 super(conf, c, store, regionServices, compactionPolicy); 909 } 910 911 void disableCompaction() { 912 allowCompaction.set(false); 913 } 914 915 void enableCompaction() { 916 allowCompaction.set(true); 917 } 918 919 void initiateType(MemoryCompactionPolicy compactionType, Configuration conf) 920 throws IllegalArgumentIOException { 921 compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST"); 922 } 923 } 924}