001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNull;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.lang.management.ManagementFactory;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.concurrent.Executors;
029import java.util.concurrent.ThreadPoolExecutor;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparator;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HColumnDescriptor;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionInfo;
041import org.apache.hadoop.hbase.HTableDescriptor;
042import org.apache.hadoop.hbase.KeepDeletedCells;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.KeyValueTestUtil;
045import org.apache.hadoop.hbase.MemoryCompactionPolicy;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
049import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.testclassification.RegionServerTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.EnvironmentEdge;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.util.Threads;
056import org.apache.hadoop.hbase.wal.WAL;
057import org.junit.After;
058import org.junit.Before;
059import org.junit.ClassRule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.mockito.Mockito;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * compacted memstore test case
068 */
069@Category({RegionServerTests.class, MediumTests.class})
070public class TestCompactingMemStore extends TestDefaultMemStore {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestCompactingMemStore.class);
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class);
077  protected static ChunkCreator chunkCreator;
078  protected HRegion region;
079  protected RegionServicesForStores regionServicesForStores;
080  protected HStore store;
081
082  //////////////////////////////////////////////////////////////////////////////
083  // Helpers
084  //////////////////////////////////////////////////////////////////////////////
085  protected static byte[] makeQualifier(final int i1, final int i2) {
086    return Bytes.toBytes(Integer.toString(i1) + ";" +
087        Integer.toString(i2));
088  }
089
090  @After
091  public void tearDown() throws Exception {
092    chunkCreator.clearChunksInPool();
093  }
094
095  @Override
096  @Before
097  public void setUp() throws Exception {
098    compactingSetUp();
099    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), CellComparator.getInstance(),
100        store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
101    ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
102  }
103
104  protected void compactingSetUp() throws Exception {
105    super.internalSetUp();
106    Configuration conf = new Configuration();
107    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
108    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
109    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
110    HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
111    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
112    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar"));
113    htd.addFamily(hcd);
114    HRegionInfo info =
115        new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
116    WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
117    this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
118    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
119    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
120    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
121    this.store = new HStore(region, hcd, conf, false);
122
123    long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
124        .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
125    chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
126        globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
127            null);
128    assertTrue(chunkCreator != null);
129  }
130
131  /**
132   * A simple test which verifies the 3 possible states when scanning across snapshot.
133   *
134   * @throws IOException
135   * @throws CloneNotSupportedException
136   */
137  @Override
138  @Test
139  public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
140    // we are going to the scanning across snapshot with two kvs
141    // kv1 should always be returned before kv2
142    final byte[] one = Bytes.toBytes(1);
143    final byte[] two = Bytes.toBytes(2);
144    final byte[] f = Bytes.toBytes("f");
145    final byte[] q = Bytes.toBytes("q");
146    final byte[] v = Bytes.toBytes(3);
147
148    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
149    final KeyValue kv2 = new KeyValue(two, f, q, 10, v);
150
151    // use case 1: both kvs in kvset
152    this.memstore.add(kv1.clone(), null);
153    this.memstore.add(kv2.clone(), null);
154    verifyScanAcrossSnapshot2(kv1, kv2);
155
156    // use case 2: both kvs in snapshot
157    this.memstore.snapshot();
158    verifyScanAcrossSnapshot2(kv1, kv2);
159
160    // use case 3: first in snapshot second in kvset
161    this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
162        CellComparator.getInstance(), store, regionServicesForStores,
163        MemoryCompactionPolicy.EAGER);
164    this.memstore.add(kv1.clone(), null);
165    // As compaction is starting in the background the repetition
166    // of the k1 might be removed BUT the scanners created earlier
167    // should look on the OLD MutableCellSetSegment, so this should be OK...
168    this.memstore.snapshot();
169    this.memstore.add(kv2.clone(), null);
170    verifyScanAcrossSnapshot2(kv1,kv2);
171  }
172
173  /**
174   * Test memstore snapshots
175   * @throws IOException
176   */
177  @Override
178  @Test
179  public void testSnapshotting() throws IOException {
180    final int snapshotCount = 5;
181    // Add some rows, run a snapshot. Do it a few times.
182    for (int i = 0; i < snapshotCount; i++) {
183      addRows(this.memstore);
184      runSnapshot(this.memstore, true);
185      assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
186    }
187  }
188
189
190  //////////////////////////////////////////////////////////////////////////////
191  // Get tests
192  //////////////////////////////////////////////////////////////////////////////
193
194  /** Test getNextRow from memstore
195   * @throws InterruptedException
196   */
197  @Override
198  @Test
199  public void testGetNextRow() throws Exception {
200    addRows(this.memstore);
201    // Add more versions to make it a little more interesting.
202    Thread.sleep(1);
203    addRows(this.memstore);
204    Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
205    assertTrue(CellComparator.getInstance().compareRows(closestToEmpty,
206        new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
207    for (int i = 0; i < ROW_COUNT; i++) {
208      Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i),
209          System.currentTimeMillis()));
210      if (i + 1 == ROW_COUNT) {
211        assertNull(nr);
212      } else {
213        assertTrue(CellComparator.getInstance().compareRows(nr,
214            new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0);
215      }
216    }
217    //starting from each row, validate results should contain the starting row
218    Configuration conf = HBaseConfiguration.create();
219    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
220      ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
221          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
222      try (InternalScanner scanner =
223          new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
224              memstore.getScanners(0))) {
225        List<Cell> results = new ArrayList<>();
226        for (int i = 0; scanner.next(results); i++) {
227          int rowId = startRowId + i;
228          Cell left = results.get(0);
229          byte[] row1 = Bytes.toBytes(rowId);
230          assertTrue("Row name",
231            CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0);
232          assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
233          List<Cell> row = new ArrayList<>();
234          for (Cell kv : results) {
235            row.add(kv);
236          }
237          isExpectedRowWithoutTimestamps(rowId, row);
238          // Clear out set. Otherwise row results accumulate.
239          results.clear();
240        }
241      }
242    }
243  }
244
245  @Override
246  @Test
247  public void testGet_memstoreAndSnapShot() throws IOException {
248    byte[] row = Bytes.toBytes("testrow");
249    byte[] fam = Bytes.toBytes("testfamily");
250    byte[] qf1 = Bytes.toBytes("testqualifier1");
251    byte[] qf2 = Bytes.toBytes("testqualifier2");
252    byte[] qf3 = Bytes.toBytes("testqualifier3");
253    byte[] qf4 = Bytes.toBytes("testqualifier4");
254    byte[] qf5 = Bytes.toBytes("testqualifier5");
255    byte[] val = Bytes.toBytes("testval");
256
257    //Setting up memstore
258    memstore.add(new KeyValue(row, fam, qf1, val), null);
259    memstore.add(new KeyValue(row, fam, qf2, val), null);
260    memstore.add(new KeyValue(row, fam, qf3, val), null);
261    //Pushing to pipeline
262    ((CompactingMemStore)memstore).flushInMemory();
263    assertEquals(0, memstore.getSnapshot().getCellsCount());
264    //Creating a snapshot
265    memstore.snapshot();
266    assertEquals(3, memstore.getSnapshot().getCellsCount());
267    //Adding value to "new" memstore
268    assertEquals(0, memstore.getActive().getCellsCount());
269    memstore.add(new KeyValue(row, fam, qf4, val), null);
270    memstore.add(new KeyValue(row, fam, qf5, val), null);
271    assertEquals(2, memstore.getActive().getCellsCount());
272  }
273
274  ////////////////////////////////////
275  // Test for periodic memstore flushes
276  // based on time of oldest edit
277  ////////////////////////////////////
278
279  /**
280   * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased
281   * as older keyvalues are deleted from the memstore.
282   *
283   * @throws Exception
284   */
285  @Override
286  @Test
287  public void testUpsertMemstoreSize() throws Exception {
288    MemStoreSize oldSize = memstore.size();
289
290    List<Cell> l = new ArrayList<>();
291    KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
292    KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
293    KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
294
295    kv1.setSequenceId(1);
296    kv2.setSequenceId(1);
297    kv3.setSequenceId(1);
298    l.add(kv1);
299    l.add(kv2);
300    l.add(kv3);
301
302    this.memstore.upsert(l, 2, null);// readpoint is 2
303    MemStoreSize newSize = this.memstore.size();
304    assert (newSize.getDataSize() > oldSize.getDataSize());
305    //The kv1 should be removed.
306    assert (memstore.getActive().getCellsCount() == 2);
307
308    KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
309    kv4.setSequenceId(1);
310    l.clear();
311    l.add(kv4);
312    this.memstore.upsert(l, 3, null);
313    assertEquals(newSize, this.memstore.size());
314    //The kv2 should be removed.
315    assert (memstore.getActive().getCellsCount() == 2);
316    //this.memstore = null;
317  }
318
319  /**
320   * Tests that the timeOfOldestEdit is updated correctly for the
321   * various edit operations in memstore.
322   * @throws Exception
323   */
324  @Override
325  @Test
326  public void testUpdateToTimeOfOldestEdit() throws Exception {
327    try {
328      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
329      EnvironmentEdgeManager.injectEdge(edge);
330      long t = memstore.timeOfOldestEdit();
331      assertEquals(Long.MAX_VALUE, t);
332
333      // test the case that the timeOfOldestEdit is updated after a KV add
334      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
335      t = memstore.timeOfOldestEdit();
336      assertTrue(t == 1234);
337      // The method will also assert
338      // the value is reset to Long.MAX_VALUE
339      t = runSnapshot(memstore, true);
340
341      // test the case that the timeOfOldestEdit is updated after a KV delete
342      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
343      t = memstore.timeOfOldestEdit();
344      assertTrue(t == 1234);
345     t = runSnapshot(memstore, true);
346
347      // test the case that the timeOfOldestEdit is updated after a KV upsert
348      List<Cell> l = new ArrayList<>();
349      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
350      kv1.setSequenceId(100);
351      l.add(kv1);
352      memstore.upsert(l, 1000, null);
353      t = memstore.timeOfOldestEdit();
354      assertTrue(t == 1234);
355    } finally {
356      EnvironmentEdgeManager.reset();
357    }
358  }
359
360  private long runSnapshot(final AbstractMemStore hmc, boolean useForce)
361      throws IOException {
362    // Save off old state.
363    long oldHistorySize = hmc.getSnapshot().getDataSize();
364    long prevTimeStamp = hmc.timeOfOldestEdit();
365
366    hmc.snapshot();
367    MemStoreSnapshot snapshot = hmc.snapshot();
368    if (useForce) {
369      // Make some assertions about what just happened.
370      assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize());
371      long t = hmc.timeOfOldestEdit();
372      assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
373      hmc.clearSnapshot(snapshot.getId());
374    } else {
375      long t = hmc.timeOfOldestEdit();
376      assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp);
377    }
378    return prevTimeStamp;
379  }
380
381  private void isExpectedRowWithoutTimestamps(final int rowIndex,
382      List<Cell> kvs) {
383    int i = 0;
384    for (Cell kv : kvs) {
385      byte[] expectedColname = makeQualifier(rowIndex, i++);
386      assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname));
387      // Value is column name as bytes.  Usually result is
388      // 100 bytes in size at least. This is the default size
389      // for BytesWriteable.  For comparison, convert bytes to
390      // String and trim to remove trailing null bytes.
391      assertTrue("Content", CellUtil.matchingValue(kv, expectedColname));
392    }
393  }
394
395  @Test
396  public void testPuttingBackChunksAfterFlushing() throws IOException {
397    byte[] row = Bytes.toBytes("testrow");
398    byte[] fam = Bytes.toBytes("testfamily");
399    byte[] qf1 = Bytes.toBytes("testqualifier1");
400    byte[] qf2 = Bytes.toBytes("testqualifier2");
401    byte[] qf3 = Bytes.toBytes("testqualifier3");
402    byte[] qf4 = Bytes.toBytes("testqualifier4");
403    byte[] qf5 = Bytes.toBytes("testqualifier5");
404    byte[] val = Bytes.toBytes("testval");
405
406    // Setting up memstore
407    memstore.add(new KeyValue(row, fam, qf1, val), null);
408    memstore.add(new KeyValue(row, fam, qf2, val), null);
409    memstore.add(new KeyValue(row, fam, qf3, val), null);
410
411    // Creating a snapshot
412    MemStoreSnapshot snapshot = memstore.snapshot();
413    assertEquals(3, memstore.getSnapshot().getCellsCount());
414
415    // Adding value to "new" memstore
416    assertEquals(0, memstore.getActive().getCellsCount());
417    memstore.add(new KeyValue(row, fam, qf4, val), null);
418    memstore.add(new KeyValue(row, fam, qf5, val), null);
419    assertEquals(2, memstore.getActive().getCellsCount());
420    // close the scanners
421    for(KeyValueScanner scanner : snapshot.getScanners()) {
422      scanner.close();
423    }
424    memstore.clearSnapshot(snapshot.getId());
425
426    int chunkCount = chunkCreator.getPoolSize();
427    assertTrue(chunkCount > 0);
428
429  }
430
431  @Test
432  public void testPuttingBackChunksWithOpeningScanner()
433      throws IOException {
434    byte[] row = Bytes.toBytes("testrow");
435    byte[] fam = Bytes.toBytes("testfamily");
436    byte[] qf1 = Bytes.toBytes("testqualifier1");
437    byte[] qf2 = Bytes.toBytes("testqualifier2");
438    byte[] qf3 = Bytes.toBytes("testqualifier3");
439    byte[] qf4 = Bytes.toBytes("testqualifier4");
440    byte[] qf5 = Bytes.toBytes("testqualifier5");
441    byte[] qf6 = Bytes.toBytes("testqualifier6");
442    byte[] qf7 = Bytes.toBytes("testqualifier7");
443    byte[] val = Bytes.toBytes("testval");
444
445    // Setting up memstore
446    memstore.add(new KeyValue(row, fam, qf1, val), null);
447    memstore.add(new KeyValue(row, fam, qf2, val), null);
448    memstore.add(new KeyValue(row, fam, qf3, val), null);
449
450    // Creating a snapshot
451    MemStoreSnapshot snapshot = memstore.snapshot();
452    assertEquals(3, memstore.getSnapshot().getCellsCount());
453
454    // Adding value to "new" memstore
455    assertEquals(0, memstore.getActive().getCellsCount());
456    memstore.add(new KeyValue(row, fam, qf4, val), null);
457    memstore.add(new KeyValue(row, fam, qf5, val), null);
458    assertEquals(2, memstore.getActive().getCellsCount());
459
460    // opening scanner before clear the snapshot
461    List<KeyValueScanner> scanners = memstore.getScanners(0);
462    // Shouldn't putting back the chunks to pool,since some scanners are opening
463    // based on their data
464    // close the scanners
465    for(KeyValueScanner scanner : snapshot.getScanners()) {
466      scanner.close();
467    }
468    memstore.clearSnapshot(snapshot.getId());
469
470    assertTrue(chunkCreator.getPoolSize() == 0);
471
472    // Chunks will be put back to pool after close scanners;
473    for (KeyValueScanner scanner : scanners) {
474      scanner.close();
475    }
476    assertTrue(chunkCreator.getPoolSize() > 0);
477
478    // clear chunks
479    chunkCreator.clearChunksInPool();
480
481    // Creating another snapshot
482
483    snapshot = memstore.snapshot();
484    // Adding more value
485    memstore.add(new KeyValue(row, fam, qf6, val), null);
486    memstore.add(new KeyValue(row, fam, qf7, val), null);
487    // opening scanners
488    scanners = memstore.getScanners(0);
489    // close scanners before clear the snapshot
490    for (KeyValueScanner scanner : scanners) {
491      scanner.close();
492    }
493    // Since no opening scanner, the chunks of snapshot should be put back to
494    // pool
495    // close the scanners
496    for(KeyValueScanner scanner : snapshot.getScanners()) {
497      scanner.close();
498    }
499    memstore.clearSnapshot(snapshot.getId());
500    assertTrue(chunkCreator.getPoolSize() > 0);
501  }
502
503  @Test
504  public void testPuttingBackChunksWithOpeningPipelineScanner()
505      throws IOException {
506
507    // set memstore to do data compaction and not to use the speculative scan
508    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
509    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
510        String.valueOf(compactionType));
511    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
512
513    byte[] row = Bytes.toBytes("testrow");
514    byte[] fam = Bytes.toBytes("testfamily");
515    byte[] qf1 = Bytes.toBytes("testqualifier1");
516    byte[] qf2 = Bytes.toBytes("testqualifier2");
517    byte[] qf3 = Bytes.toBytes("testqualifier3");
518    byte[] val = Bytes.toBytes("testval");
519
520    // Setting up memstore
521    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
522    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
523    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
524
525    // Creating a pipeline
526    ((MyCompactingMemStore)memstore).disableCompaction();
527    ((CompactingMemStore)memstore).flushInMemory();
528
529    // Adding value to "new" memstore
530    assertEquals(0, memstore.getActive().getCellsCount());
531    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
532    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
533    assertEquals(2, memstore.getActive().getCellsCount());
534
535    // pipeline bucket 2
536    ((CompactingMemStore)memstore).flushInMemory();
537    // opening scanner before force flushing
538    List<KeyValueScanner> scanners = memstore.getScanners(0);
539    // Shouldn't putting back the chunks to pool,since some scanners are opening
540    // based on their data
541    ((MyCompactingMemStore)memstore).enableCompaction();
542    // trigger compaction
543    ((CompactingMemStore)memstore).flushInMemory();
544
545    // Adding value to "new" memstore
546    assertEquals(0, memstore.getActive().getCellsCount());
547    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
548    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
549    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
550    assertEquals(3, memstore.getActive().getCellsCount());
551
552    assertTrue(chunkCreator.getPoolSize() == 0);
553
554    // Chunks will be put back to pool after close scanners;
555    for (KeyValueScanner scanner : scanners) {
556      scanner.close();
557    }
558    assertTrue(chunkCreator.getPoolSize() > 0);
559
560    // clear chunks
561    chunkCreator.clearChunksInPool();
562
563    // Creating another snapshot
564
565    MemStoreSnapshot snapshot = memstore.snapshot();
566    // close the scanners
567    for(KeyValueScanner scanner : snapshot.getScanners()) {
568      scanner.close();
569    }
570    memstore.clearSnapshot(snapshot.getId());
571
572    snapshot = memstore.snapshot();
573    // Adding more value
574    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
575    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
576    // opening scanners
577    scanners = memstore.getScanners(0);
578    // close scanners before clear the snapshot
579    for (KeyValueScanner scanner : scanners) {
580      scanner.close();
581    }
582    // Since no opening scanner, the chunks of snapshot should be put back to
583    // pool
584    // close the scanners
585    for(KeyValueScanner scanner : snapshot.getScanners()) {
586      scanner.close();
587    }
588    memstore.clearSnapshot(snapshot.getId());
589    assertTrue(chunkCreator.getPoolSize() > 0);
590  }
591
592  //////////////////////////////////////////////////////////////////////////////
593  // Compaction tests
594  //////////////////////////////////////////////////////////////////////////////
595  @Test
596  public void testCompaction1Bucket() throws IOException {
597
598    // set memstore to do basic structure flattening, the "eager" option is tested in
599    // TestCompactingToCellFlatMapMemStore
600    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
601    memstore.getConfiguration()
602        .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
603    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
604
605    String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
606
607    // test 1 bucket
608    int totalCellsLen = addRowsByKeys(memstore, keys1);
609    int oneCellOnCSLMHeapSize = 120;
610    int oneCellOnCAHeapSize = 88;
611    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
612    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
613    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
614
615    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
616    assertEquals(0, memstore.getSnapshot().getCellsCount());
617    // There is no compaction, as the compacting memstore type is basic.
618    // totalCellsLen remains the same
619    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
620        + 4 * oneCellOnCAHeapSize;
621    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
622    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
623
624    MemStoreSize mss = memstore.getFlushableSize();
625    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
626    // simulate flusher
627    region.decrMemStoreSize(mss);
628    ImmutableSegment s = memstore.getSnapshot();
629    assertEquals(4, s.getCellsCount());
630    assertEquals(0, regionServicesForStores.getMemStoreSize());
631
632    memstore.clearSnapshot(snapshot.getId());
633  }
634
635  @Test
636  public void testCompaction2Buckets() throws IOException {
637
638    // set memstore to do basic structure flattening, the "eager" option is tested in
639    // TestCompactingToCellFlatMapMemStore
640    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
641    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
642        String.valueOf(compactionType));
643    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
644        String.valueOf(1));
645    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
646    String[] keys1 = { "A", "A", "B", "C" };
647    String[] keys2 = { "A", "B", "D" };
648
649    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
650    int oneCellOnCSLMHeapSize = 120;
651    int oneCellOnCAHeapSize = 88;
652    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
653
654    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
655    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
656
657    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
658    int counter = 0;
659    for ( Segment s : memstore.getSegments()) {
660      counter += s.getCellsCount();
661    }
662    assertEquals(4, counter);
663    assertEquals(0, memstore.getSnapshot().getCellsCount());
664    // There is no compaction, as the compacting memstore type is basic.
665    // totalCellsLen remains the same
666    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
667    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
668        + 4 * oneCellOnCAHeapSize;
669    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
670
671    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
672    totalHeapSize += 3 * oneCellOnCSLMHeapSize;
673    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
674    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
675
676    MemStoreSize mss = memstore.getFlushableSize();
677    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
678    assertEquals(0, memstore.getSnapshot().getCellsCount());
679    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
680    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
681        + 7 * oneCellOnCAHeapSize;
682    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
683
684    mss = memstore.getFlushableSize();
685    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
686    // simulate flusher
687    region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
688      mss.getCellsCount());
689    ImmutableSegment s = memstore.getSnapshot();
690    assertEquals(7, s.getCellsCount());
691    assertEquals(0, regionServicesForStores.getMemStoreSize());
692
693    memstore.clearSnapshot(snapshot.getId());
694  }
695
696  @Test
697  public void testCompaction3Buckets() throws IOException {
698
699    // set memstore to do data compaction and not to use the speculative scan
700    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
701    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
702        String.valueOf(compactionType));
703    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
704    String[] keys1 = { "A", "A", "B", "C" };
705    String[] keys2 = { "A", "B", "D" };
706    String[] keys3 = { "D", "B", "B" };
707
708    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells.
709    int oneCellOnCSLMHeapSize = 120;
710    int oneCellOnCAHeapSize = 88;
711    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
712    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
713    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
714    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
715
716    assertEquals(0, memstore.getSnapshot().getCellsCount());
717    // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting
718    // totalCellsLen
719    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
720    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
721    // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
722    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
723        + 3 * oneCellOnCAHeapSize;
724    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
725
726    int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
727    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;
728
729    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
730    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
731
732    ((MyCompactingMemStore) memstore).disableCompaction();
733    MemStoreSize mss = memstore.getFlushableSize();
734    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
735    assertEquals(0, memstore.getSnapshot().getCellsCount());
736    // No change in the cells data size. ie. memstore size. as there is no compaction.
737    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
738    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
739        ((CompactingMemStore) memstore).heapSize());
740
741    int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
742    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
743        regionServicesForStores.getMemStoreSize());
744    long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
745        + 3 * oneCellOnCSLMHeapSize;
746    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
747
748    ((MyCompactingMemStore)memstore).enableCompaction();
749    mss = memstore.getFlushableSize();
750    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
751    assertEquals(0, memstore.getSnapshot().getCellsCount());
752    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
753    // Out of total 10, only 4 cells are unique
754    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
755    totalCellsLen3 = 0;// All duplicated cells.
756    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
757        regionServicesForStores.getMemStoreSize());
758    // Only 4 unique cells left
759    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
760        + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
761
762    mss = memstore.getFlushableSize();
763    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
764    // simulate flusher
765    region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
766      mss.getCellsCount());
767    ImmutableSegment s = memstore.getSnapshot();
768    assertEquals(4, s.getCellsCount());
769    assertEquals(0, regionServicesForStores.getMemStoreSize());
770
771    memstore.clearSnapshot(snapshot.getId());
772  }
773
774  @Test
775  public void testMagicCompaction3Buckets() throws IOException {
776
777    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
778    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
779        String.valueOf(compactionType));
780    memstore.getConfiguration().setDouble(
781        AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
782    memstore.getConfiguration().setInt(
783        AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
784    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
785    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
786
787    String[] keys1 = { "A", "B", "D" };
788    String[] keys2 = { "A" };
789    String[] keys3 = { "A", "A", "B", "C" };
790    String[] keys4 = { "D", "B", "B" };
791
792    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
793    int oneCellOnCSLMHeapSize = 120;
794    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
795    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
796    assertEquals(totalHeapSize, memstore.heapSize());
797
798    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten
799    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
800    assertEquals(1.0,
801        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
802    assertEquals(0, memstore.getSnapshot().getCellsCount());
803
804    addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten.
805    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
806    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
807    assertEquals(1.0,
808        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
809    assertEquals(0, memstore.getSnapshot().getCellsCount());
810
811    addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge.
812    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
813    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
814    assertEquals((4.0 / 8.0),
815        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
816    assertEquals(0, memstore.getSnapshot().getCellsCount());
817
818    addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not)
819    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
820    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
821    assertTrue(4 == numCells || 11 == numCells);
822    assertEquals(0, memstore.getSnapshot().getCellsCount());
823
824    MemStoreSize mss = memstore.getFlushableSize();
825    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
826    // simulate flusher
827    region.decrMemStoreSize(mss);
828    ImmutableSegment s = memstore.getSnapshot();
829    numCells = s.getCellsCount();
830    assertTrue(4 == numCells || 11 == numCells);
831    assertEquals(0, regionServicesForStores.getMemStoreSize());
832
833    memstore.clearSnapshot(snapshot.getId());
834  }
835
836  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
837    byte[] fam = Bytes.toBytes("testfamily");
838    byte[] qf = Bytes.toBytes("testqualifier");
839    long size = hmc.getActive().getDataSize();
840    long heapOverhead = hmc.getActive().getHeapSize();
841    int cellsCount = hmc.getActive().getCellsCount();
842    int totalLen = 0;
843    for (int i = 0; i < keys.length; i++) {
844      long timestamp = System.currentTimeMillis();
845      Threads.sleep(1); // to make sure each kv gets a different ts
846      byte[] row = Bytes.toBytes(keys[i]);
847      byte[] val = Bytes.toBytes(keys[i] + i);
848      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
849      totalLen += Segment.getCellLength(kv);
850      hmc.add(kv, null);
851      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
852    }
853    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
854      hmc.getActive().getHeapSize() - heapOverhead, 0,
855      hmc.getActive().getCellsCount() - cellsCount);
856    return totalLen;
857  }
858
859  // for controlling the val size when adding a new cell
860  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
861    byte[] fam = Bytes.toBytes("testfamily");
862    byte[] qf = Bytes.toBytes("testqualifier");
863    long size = hmc.getActive().getDataSize();
864    long heapOverhead = hmc.getActive().getHeapSize();
865    int cellsCount = hmc.getActive().getCellsCount();
866    int totalLen = 0;
867    for (int i = 0; i < keys.length; i++) {
868      long timestamp = System.currentTimeMillis();
869      Threads.sleep(1); // to make sure each kv gets a different ts
870      byte[] row = Bytes.toBytes(keys[i]);
871      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
872      totalLen += Segment.getCellLength(kv);
873      hmc.add(kv, null);
874      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
875    }
876    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
877      hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount);
878    return totalLen;
879  }
880
881  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
882    long t = 1234;
883
884    @Override
885    public long currentTime() {
886            return t;
887        }
888      public void setCurrentTimeMillis(long t) {
889        this.t = t;
890      }
891    }
892
893  static protected class MyCompactingMemStore extends CompactingMemStore {
894
895    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
896        RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
897        throws IOException {
898      super(conf, c, store, regionServices, compactionPolicy);
899    }
900
901    void disableCompaction() {
902      allowCompaction.set(false);
903    }
904
905    void enableCompaction() {
906      allowCompaction.set(true);
907    }
908    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
909        throws IllegalArgumentIOException {
910      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
911    }
912
913  }
914}