/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compacting memstore test case.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestCompactingMemStore extends TestDefaultMemStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCompactingMemStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class);
  protected static ChunkCreator chunkCreator;
  protected HRegion region;
  protected RegionServicesForStores regionServicesForStores;
  protected HStore store;

  //////////////////////////////////////////////////////////////////////////////
  // Helpers
  //////////////////////////////////////////////////////////////////////////////
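  /** Builds a qualifier of the form {@code "<i1>;<i2>"}. */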
  protected static byte[] makeQualifier(final int i1, final int i2) {
    return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2));
  }

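  // Drain the shared chunk pool so that pool-size assertions in later tests are not affected by
  // chunks left over from a previous test.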
  @After
  public void tearDown() throws Exception {
    chunkCreator.clearChunksInPool();
  }

  @Override
  @Before
  public void setUp() throws Exception {
    compactingSetUp();
    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(),
        CellComparator.getInstance(), store, regionServicesForStores,
        MemoryCompactionPolicy.EAGER);
    ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
  }

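  /**
   * Creates a local test region backed by a WAL, wraps its RegionServicesForStores in a Mockito
   * spy that serves a single-threaded in-memory compaction pool, and initializes the global
   * ChunkCreator used by the MSLAB chunk pool.
   */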
  protected void compactingSetUp() throws Exception {
    super.internalSetUp();
    Configuration conf = new Configuration();
    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
    HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar"));
    htd.addFamily(hcd);
    HRegionInfo info = new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
    WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
    this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
    this.store = new HStore(region, hcd, conf, false);

    long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
        .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
        globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    assertNotNull(chunkCreator);
  }


  /**
   * A simple test which verifies how in-memory flush and snapshot affect timeOfOldestEdit:
   * an in-memory flush leaves it unchanged while a snapshot resets it to Long.MAX_VALUE.
   */
  @Test
  public void testTimeOfOldestEdit() {
    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
    final byte[] r = Bytes.toBytes("r");
    final byte[] f = Bytes.toBytes("f");
    final byte[] q = Bytes.toBytes("q");
    final byte[] v = Bytes.toBytes("v");
    final KeyValue kv = new KeyValue(r, f, q, v);
    memstore.add(kv, null);
    long timeOfOldestEdit = memstore.timeOfOldestEdit();
    assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit);

    ((CompactingMemStore)memstore).flushInMemory();
    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
    memstore.add(kv, null);
    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
    memstore.snapshot();
    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
  }

  /**
   * A simple test which verifies the 3 possible states when scanning across snapshot.
   *
   * @throws IOException
   * @throws CloneNotSupportedException
   */
  @Override
  @Test
  public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
    // we are going to scan across the snapshot with two kvs;
    // kv1 should always be returned before kv2
    final byte[] one = Bytes.toBytes(1);
    final byte[] two = Bytes.toBytes(2);
    final byte[] f = Bytes.toBytes("f");
    final byte[] q = Bytes.toBytes("q");
    final byte[] v = Bytes.toBytes(3);

    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
    final KeyValue kv2 = new KeyValue(two, f, q, 10, v);

    // use case 1: both kvs in kvset
    this.memstore.add(kv1.clone(), null);
    this.memstore.add(kv2.clone(), null);
    verifyScanAcrossSnapshot2(kv1, kv2);

    // use case 2: both kvs in snapshot
    this.memstore.snapshot();
    verifyScanAcrossSnapshot2(kv1, kv2);

    // use case 3: first in snapshot, second in kvset
    this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
        CellComparator.getInstance(), store, regionServicesForStores,
        MemoryCompactionPolicy.EAGER);
    this.memstore.add(kv1.clone(), null);
    // As compaction may start in the background, the duplicate of kv1 might be removed,
    // BUT the scanners created earlier should look at the OLD MutableCellSetSegment,
    // so this should be OK...
    this.memstore.snapshot();
    this.memstore.add(kv2.clone(), null);
    verifyScanAcrossSnapshot2(kv1, kv2);
  }

  /**
   * Test memstore snapshots
   * @throws IOException
   */
  @Override
  @Test
  public void testSnapshotting() throws IOException {
    final int snapshotCount = 5;
    // Add some rows, run a snapshot. Do it a few times.
    for (int i = 0; i < snapshotCount; i++) {
      addRows(this.memstore);
      runSnapshot(this.memstore, true);
      assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  // Get tests
  //////////////////////////////////////////////////////////////////////////////

  /** Test getNextRow from memstore
   * @throws InterruptedException
   */
  @Override
  @Test
  public void testGetNextRow() throws Exception {
    addRows(this.memstore);
    // Add more versions to make it a little more interesting.
    Thread.sleep(1);
    addRows(this.memstore);
    Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
    assertTrue(CellComparator.getInstance().compareRows(closestToEmpty,
        new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
    for (int i = 0; i < ROW_COUNT; i++) {
      Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i),
          System.currentTimeMillis()));
      if (i + 1 == ROW_COUNT) {
        assertNull(nr);
      } else {
        assertTrue(CellComparator.getInstance().compareRows(nr,
            new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0);
      }
    }
    // starting from each row, validate that the scan results contain the starting row
    Configuration conf = HBaseConfiguration.create();
    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
      ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(),
          false);
      try (InternalScanner scanner =
          new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
              memstore.getScanners(0))) {
        List<Cell> results = new ArrayList<>();
        for (int i = 0; scanner.next(results); i++) {
          int rowId = startRowId + i;
          Cell left = results.get(0);
          byte[] row1 = Bytes.toBytes(rowId);
          assertTrue("Row name",
            CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0);
          assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
          List<Cell> row = new ArrayList<>();
          for (Cell kv : results) {
            row.add(kv);
          }
          isExpectedRowWithoutTimestamps(rowId, row);
          // Clear out set. Otherwise row results accumulate.
          results.clear();
        }
      }
    }
  }

  @Override
  @Test
  public void testGet_memstoreAndSnapShot() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);
    // Pushing to pipeline
    ((CompactingMemStore)memstore).flushInMemory();
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // Creating a snapshot
    memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());
    // Adding values to the "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
  }

  ////////////////////////////////////
  // Test for periodic memstore flushes
  // based on time of oldest edit
  ////////////////////////////////////

  /**
   * Adds keyvalues with a fixed memstoreTs and checks that the memstore size decreases
   * as older keyvalues are deleted from the memstore.
   *
   * @throws Exception
   */
  @Override
  @Test
  public void testUpsertMemstoreSize() throws Exception {
    MemStoreSize oldSize = memstore.size();

    List<Cell> l = new ArrayList<>();
    KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
    KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
    KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");

    kv1.setSequenceId(1);
    kv2.setSequenceId(1);
    kv3.setSequenceId(1);
    l.add(kv1);
    l.add(kv2);
    l.add(kv3);

    this.memstore.upsert(l, 2, null); // readpoint is 2
    MemStoreSize newSize = this.memstore.size();
    assertTrue(newSize.getDataSize() > oldSize.getDataSize());
    // The kv1 should be removed.
    assertEquals(2, memstore.getActive().getCellsCount());

    KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
    kv4.setSequenceId(1);
    l.clear();
    l.add(kv4);
    this.memstore.upsert(l, 3, null);
    assertEquals(newSize, this.memstore.size());
    // The kv2 should be removed.
    assertEquals(2, memstore.getActive().getCellsCount());
  }

  /**
   * Tests that the timeOfOldestEdit is updated correctly for the
   * various edit operations in memstore.
   * @throws Exception
   */
  @Override
  @Test
  public void testUpdateToTimeOfOldestEdit() throws Exception {
    try {
      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
      EnvironmentEdgeManager.injectEdge(edge);
      long t = memstore.timeOfOldestEdit();
      assertEquals(Long.MAX_VALUE, t);

      // test the case that the timeOfOldestEdit is updated after a KV add
      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
      // runSnapshot will also assert that the value is reset to Long.MAX_VALUE
      t = runSnapshot(memstore, true);

      // test the case that the timeOfOldestEdit is updated after a KV delete
      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
      t = runSnapshot(memstore, true);

      // test the case that the timeOfOldestEdit is updated after a KV upsert
      List<Cell> l = new ArrayList<>();
      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
      kv1.setSequenceId(100);
      l.add(kv1);
      memstore.upsert(l, 1000, null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

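  /**
   * Takes a snapshot of the given memstore and returns the previous timeOfOldestEdit. With
   * {@code useForce} set, it also asserts that the snapshot data size grew, that timeOfOldestEdit
   * was reset to Long.MAX_VALUE, and clears the snapshot afterwards.
   */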
  private long runSnapshot(final AbstractMemStore hmc, boolean useForce)
      throws IOException {
    // Save off old state.
    long oldHistorySize = hmc.getSnapshot().getDataSize();
    long prevTimeStamp = hmc.timeOfOldestEdit();

    hmc.snapshot();
    MemStoreSnapshot snapshot = hmc.snapshot();
    if (useForce) {
      // Make some assertions about what just happened.
      assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize());
      long t = hmc.timeOfOldestEdit();
      assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
      hmc.clearSnapshot(snapshot.getId());
    } else {
      long t = hmc.timeOfOldestEdit();
      assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp);
    }
    return prevTimeStamp;
  }

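  /**
   * Asserts that the given cells, in order, carry the qualifiers (and matching values) expected
   * for the given row index, ignoring timestamps.
   */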
  private void isExpectedRowWithoutTimestamps(final int rowIndex,
      List<Cell> kvs) {
    int i = 0;
    for (Cell kv : kvs) {
      byte[] expectedColname = makeQualifier(rowIndex, i++);
      assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname));
      // The value is written as the column name as bytes, so it should match the
      // expected qualifier as well.
      assertTrue("Content", CellUtil.matchingValue(kv, expectedColname));
    }
  }

  @Test
  public void testPuttingBackChunksAfterFlushing() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    int chunkCount = chunkCreator.getPoolSize();
    assertTrue(chunkCount > 0);
  }

  @Test
  public void testPuttingBackChunksWithOpeningScanner()
      throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] qf6 = Bytes.toBytes("testqualifier6");
    byte[] qf7 = Bytes.toBytes("testqualifier7");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // opening a scanner before clearing the snapshot
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks shouldn't be put back into the pool yet, since some scanners
    // are still open on their data.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back into the pool after the scanners are closed.
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot
    snapshot = memstore.snapshot();
    // Adding more values
    memstore.add(new KeyValue(row, fam, qf6, val), null);
    memstore.add(new KeyValue(row, fam, qf7, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open, the snapshot's chunks should be put back into the pool.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  @Test
  public void testPuttingBackChunksWithOpeningPipelineScanner()
      throws IOException {

    // set memstore to do data compaction and not to use the speculative scan
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());

    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);

    // Creating a pipeline
    ((MyCompactingMemStore)memstore).disableCompaction();
    ((CompactingMemStore)memstore).flushInMemory();

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // pipeline bucket 2
    ((CompactingMemStore)memstore).flushInMemory();
    // opening scanner before force flushing
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks shouldn't be put back into the pool yet, since some scanners
    // are still open on their data.
    ((MyCompactingMemStore)memstore).enableCompaction();
    // trigger compaction
    ((CompactingMemStore)memstore).flushInMemory();

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
    assertEquals(3, memstore.getActive().getCellsCount());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back into the pool after the scanners are closed.
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    snapshot = memstore.snapshot();
    // Adding more values
    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open, the snapshot's chunks should be put back into the pool.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction tests
  //////////////////////////////////////////////////////////////////////////////
  @Test
  public void testCompaction1Bucket() throws IOException {

    // set memstore to do basic structure flattening; the "eager" option is tested in
    // TestCompactingToCellFlatMapMemStore
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration()
        .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());

    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1);
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // There is no compaction, as the compacting memstore type is basic.
    // totalCellsLen remains the same
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 4 * oneCellOnCAHeapSize;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testCompaction2Buckets() throws IOException {

    // set memstore to do basic structure flattening; the "eager" option is tested in
    // TestCompactingToCellFlatMapMemStore
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
        String.valueOf(1));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;

    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    int counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(4, counter);
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // There is no compaction, as the compacting memstore type is basic.
    // totalCellsLen remains the same
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 4 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
    totalHeapSize += 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 7 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
      mss.getCellsCount());
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(7, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testCompaction3Buckets() throws IOException {

    // set memstore to do data compaction and not to use the speculative scan
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };
    String[] keys3 = { "D", "B", "B" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1); // Adding 4 cells.
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact

    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 3 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    int totalCellsLen2 = addRowsByKeys(memstore, keys2); // Adding 3 more cells.
    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;

    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).disableCompaction();
    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // No change in the cell data size, i.e. the memstore size, as there is no compaction.
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
        ((CompactingMemStore) memstore).heapSize());

    int totalCellsLen3 = addRowsByKeys(memstore, keys3); // 3 more cells added
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
        regionServicesForStores.getMemStoreSize());
    long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore)memstore).enableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
    // Out of the total 10 cells, only 4 are unique.
    totalCellsLen2 = totalCellsLen2 / 3; // 2 out of 3 cells are duplicated
    totalCellsLen3 = 0; // All duplicated cells.
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
        regionServicesForStores.getMemStoreSize());
    // Only 4 unique cells left
    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
        + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
      mss.getCellsCount());
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testMagicCompaction3Buckets() throws IOException {

    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    memstore.getConfiguration().setDouble(
        AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
    memstore.getConfiguration().setInt(
        AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());

    String[] keys1 = { "A", "B", "D" };
    String[] keys2 = { "A" };
    String[] keys3 = { "A", "A", "B", "C" };
    String[] keys4 = { "D", "B", "B" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1); // Adding 3 cells.
    int oneCellOnCSLMHeapSize = 120;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize, memstore.heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten
    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals(1.0,
        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys2); // Adding 1 more cell - flatten.
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals(1.0,
        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys3); // Adding 4 more cells - merge.
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals((4.0 / 8.0),
        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys4); // 3 more cells added - compact (or not)
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
    assertTrue(4 == numCells || 11 == numCells);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    numCells = s.getCellsCount();
    assertTrue(4 == numCells || 11 == numCells);
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

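  /**
   * Adds one cell per key (all in the same test family/qualifier, each with a fresh timestamp),
   * accounts the resulting active-segment growth against the region services, and returns the
   * total data length of the added cells.
   */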
  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    long size = hmc.getActive().getDataSize();
    long heapOverhead = hmc.getActive().getHeapSize();
    int cellsCount = hmc.getActive().getCellsCount();
    int totalLen = 0;
    for (int i = 0; i < keys.length; i++) {
      long timestamp = System.currentTimeMillis();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      byte[] val = Bytes.toBytes(keys[i] + i);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      totalLen += Segment.getCellLength(kv);
      hmc.add(kv, null);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
    }
    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
      hmc.getActive().getHeapSize() - heapOverhead, 0,
      hmc.getActive().getCellsCount() - cellsCount);
    return totalLen;
  }

  /**
   * Same as {@link #addRowsByKeys(AbstractMemStore, String[])}, but with a caller-supplied value
   * so the cell size can be controlled.
   */
  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    long size = hmc.getActive().getDataSize();
    long heapOverhead = hmc.getActive().getHeapSize();
    int cellsCount = hmc.getActive().getCellsCount();
    int totalLen = 0;
    for (int i = 0; i < keys.length; i++) {
      long timestamp = System.currentTimeMillis();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      totalLen += Segment.getCellLength(kv);
      hmc.add(kv, null);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
    }
    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
      hmc.getActive().getHeapSize() - heapOverhead, 0,
      hmc.getActive().getCellsCount() - cellsCount);
    return totalLen;
  }

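  /**
   * An EnvironmentEdge with a fixed, settable clock (initially 1234) so that timeOfOldestEdit
   * assertions are deterministic.
   */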
  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
    long t = 1234;

    @Override
    public long currentTime() {
      return t;
    }

    public void setCurrentTimeMillis(long t) {
      this.t = t;
    }
  }

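  /**
   * A CompactingMemStore subclass that lets tests toggle in-memory compaction on and off and
   * re-initialize the compaction strategy for a chosen policy.
   */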
  protected static class MyCompactingMemStore extends CompactingMemStore {

    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
        RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
        throws IOException {
      super(conf, c, store, regionServices, compactionPolicy);
    }

    void disableCompaction() {
      allowCompaction.set(false);
    }

    void enableCompaction() {
      allowCompaction.set(true);
    }

    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
        throws IllegalArgumentIOException {
      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
    }
  }
}