/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compacting memstore test case.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompactingMemStore extends TestDefaultMemStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactingMemStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class);
  protected static ChunkCreator chunkCreator;
  protected HRegion region;
  protected RegionServicesForStores regionServicesForStores;
  protected HStore store;

  //////////////////////////////////////////////////////////////////////////////
  // Helpers
  //////////////////////////////////////////////////////////////////////////////
  protected static byte[] makeQualifier(final int i1, final int i2) {
    return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2));
  }

  @After
  public void tearDown() throws Exception {
    chunkCreator.clearChunksInPool();
  }

  @Override
  @Before
  public void setUp() throws Exception {
    compactingSetUp();
    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(),
      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
  }

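  /**
   * Sets up the test region, a spied RegionServicesForStores backed by a single-threaded
   * in-memory compaction pool, the HStore, and the global ChunkCreator used by the MSLAB.
   */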
  protected void compactingSetUp() throws Exception {
    super.internalSetUp();
    Configuration conf = new Configuration();
    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
    HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf);
    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(FAMILY);
    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("foobar"))
      .setColumnFamily(familyDescriptor).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("foobar")).build();
    WAL wal = HBaseTestingUtil.createWal(conf, hbaseUtility.getDataTestDir(), info);
    this.region =
      HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, tableDescriptor, wal, true);
    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
    this.store = new HStore(region, familyDescriptor, conf, false);

    long globalMemStoreLimit =
      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
      globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    assertNotNull(chunkCreator);
  }

  /**
   * A simple test of how in-memory flush affects timeOfOldestEdit.
   */
  @Test
  public void testTimeOfOldestEdit() {
    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
    final byte[] r = Bytes.toBytes("r");
    final byte[] f = Bytes.toBytes("f");
    final byte[] q = Bytes.toBytes("q");
    final byte[] v = Bytes.toBytes("v");
    final KeyValue kv = new KeyValue(r, f, q, v);
    memstore.add(kv, null);
    long timeOfOldestEdit = memstore.timeOfOldestEdit();
    assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit);

    ((CompactingMemStore) memstore).flushInMemory();
    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
    memstore.add(kv, null);
    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
    memstore.snapshot();
    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
  }

  /**
   * A simple test which verifies the 3 possible states when scanning across a snapshot.
   */
  @Override
  @Test
  public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
    // we are going to scan across the snapshot with two kvs
    // kv1 should always be returned before kv2
    final byte[] one = Bytes.toBytes(1);
    final byte[] two = Bytes.toBytes(2);
    final byte[] f = Bytes.toBytes("f");
    final byte[] q = Bytes.toBytes("q");
    final byte[] v = Bytes.toBytes(3);

    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
    final KeyValue kv2 = new KeyValue(two, f, q, 10, v);

    // use case 1: both kvs in kvset
    this.memstore.add(kv1.clone(), null);
    this.memstore.add(kv2.clone(), null);
    // snapshot is empty, active segment is not empty;
    // the empty segment is skipped.
    verifyOneScanAcrossSnapshot2(kv1, kv2);

    // use case 2: both kvs in snapshot
    this.memstore.snapshot();
    // active segment is empty, snapshot is not empty;
    // the empty segment is skipped.
    verifyOneScanAcrossSnapshot2(kv1, kv2);

    // use case 3: first in snapshot second in kvset
    this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
    this.memstore.add(kv1.clone(), null);
    // As compaction is starting in the background, the duplicate of kv1
    // might be removed, BUT the scanners created earlier should still look at
    // the OLD MutableCellSetSegment, so this should be OK...
    this.memstore.snapshot();
    this.memstore.add(kv2.clone(), null);
    verifyScanAcrossSnapshot2(kv1, kv2);
  }

  /**
   * Test memstore snapshots
   */
  @Override
  @Test
  public void testSnapshotting() throws IOException {
    final int snapshotCount = 5;
    // Add some rows, run a snapshot. Do it a few times.
    for (int i = 0; i < snapshotCount; i++) {
      addRows(this.memstore);
      runSnapshot(this.memstore, true);
      assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  // Get tests
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Test getNextRow from memstore
   */
  @Override
  @Test
  public void testGetNextRow() throws Exception {
    addRows(this.memstore);
    // Add more versions to make it a little more interesting.
    Thread.sleep(1);
    addRows(this.memstore);
    Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY);
    assertTrue(CellComparator.getInstance().compareRows(closestToEmpty,
      new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())) == 0);
    for (int i = 0; i < ROW_COUNT; i++) {
      Cell nr = ((CompactingMemStore) this.memstore)
        .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime()));
      if (i + 1 == ROW_COUNT) {
        assertNull(nr);
      } else {
        assertTrue(CellComparator.getInstance().compareRows(nr,
          new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())) == 0);
      }
    }
    // starting from each row, validate that the results contain the starting row
    Configuration conf = HBaseConfiguration.create();
    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
      ScanInfo scanInfo =
        new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE,
          HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
      try (InternalScanner scanner =
        new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
          memstore.getScanners(0))) {
        List<Cell> results = new ArrayList<>();
        for (int i = 0; scanner.next(results); i++) {
          int rowId = startRowId + i;
          Cell left = results.get(0);
          byte[] row1 = Bytes.toBytes(rowId);
          assertTrue("Row name",
            CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0);
          assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
          List<Cell> row = new ArrayList<>();
          for (Cell kv : results) {
            row.add(kv);
          }
          isExpectedRowWithoutTimestamps(rowId, row);
          // Clear out set. Otherwise row results accumulate.
          results.clear();
        }
      }
    }
  }

  @Override
  @Test
  public void testGet_memstoreAndSnapShot() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);
    // Pushing to pipeline
    ((CompactingMemStore) memstore).flushInMemory();
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // Creating a snapshot
    memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());
    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
  }

  ////////////////////////////////////
  // Test for periodic memstore flushes
  // based on time of oldest edit
  ////////////////////////////////////

  /**
   * Adds keyvalues with a fixed memstoreTs and checks that the memstore size is reduced as older
   * keyvalues are removed from the memstore.
   */
  @Override
  @Test
  public void testUpsertMemstoreSize() throws Exception {
    MemStoreSize oldSize = memstore.size();

    List<Cell> l = new ArrayList<>();
    KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
    KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
    KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");

    kv1.setSequenceId(1);
    kv2.setSequenceId(1);
    kv3.setSequenceId(1);
    l.add(kv1);
    l.add(kv2);
    l.add(kv3);

    this.memstore.upsert(l, 2, null); // readpoint is 2
    MemStoreSize newSize = this.memstore.size();
    assertTrue(newSize.getDataSize() > oldSize.getDataSize());
    // The kv1 should be removed.
    assertEquals(2, memstore.getActive().getCellsCount());

    KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
    kv4.setSequenceId(1);
    l.clear();
    l.add(kv4);
    this.memstore.upsert(l, 3, null);
    assertEquals(newSize, this.memstore.size());
    // The kv2 should be removed.
    assertEquals(2, memstore.getActive().getCellsCount());
    // this.memstore = null;
  }

  /**
   * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in
   * memstore.
   */
  @Override
  @Test
  public void testUpdateToTimeOfOldestEdit() throws Exception {
    try {
      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
      EnvironmentEdgeManager.injectEdge(edge);
      long t = memstore.timeOfOldestEdit();
      assertEquals(Long.MAX_VALUE, t);

      // test the case that the timeOfOldestEdit is updated after a KV add
      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
      // The method will also assert
      // the value is reset to Long.MAX_VALUE
      t = runSnapshot(memstore, true);

      // test the case that the timeOfOldestEdit is updated after a KV delete
      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
      t = runSnapshot(memstore, true);

      // test the case that the timeOfOldestEdit is updated after a KV upsert
      List<Cell> l = new ArrayList<>();
      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
      kv1.setSequenceId(100);
      l.add(kv1);
      memstore.upsert(l, 1000, null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

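  /**
   * Takes a snapshot of the given memstore. When {@code useForce} is true, also asserts that the
   * snapshot grew and that timeOfOldestEdit was reset to Long.MAX_VALUE, then clears the
   * snapshot. Returns the previous timeOfOldestEdit.
   */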
  private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException {
    // Save off old state.
    long oldHistorySize = hmc.getSnapshot().getDataSize();
    long prevTimeStamp = hmc.timeOfOldestEdit();

    hmc.snapshot();
    MemStoreSnapshot snapshot = hmc.snapshot();
    if (useForce) {
      // Make some assertions about what just happened.
      assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize());
      long t = hmc.timeOfOldestEdit();
      assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
      hmc.clearSnapshot(snapshot.getId());
    } else {
      long t = hmc.timeOfOldestEdit();
      assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp);
    }
    return prevTimeStamp;
  }

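  /**
   * Asserts that every cell in the given row carries the expected qualifier and value (the value
   * is the qualifier bytes), ignoring timestamps.
   */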
  private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) {
    int i = 0;
    for (Cell kv : kvs) {
      byte[] expectedColname = makeQualifier(rowIndex, i++);
      assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname));
      // Value is the column name as bytes. Usually the result is at least
      // 100 bytes in size, which is the default size for BytesWritable. For
      // comparison, convert bytes to String and trim to remove trailing null
      // bytes.
      assertTrue("Content", CellUtil.matchingValue(kv, expectedColname));
    }
  }

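  /**
   * Verifies that MSLAB chunks are returned to the pool once the snapshot is cleared and its
   * scanners are closed.
   */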
  @Test
  public void testPuttingBackChunksAfterFlushing() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    int chunkCount = chunkCreator.getPoolSize();
    assertTrue(chunkCount > 0);
  }

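  /**
   * Verifies that chunks are returned to the pool only after all scanners that still reference
   * the snapshot data have been closed.
   */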
  @Test
  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] qf6 = Bytes.toBytes("testqualifier6");
    byte[] qf7 = Bytes.toBytes("testqualifier7");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // opening a scanner before clearing the snapshot
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // Chunks shouldn't be put back to the pool, since some scanners are still
    // open on their data
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back to the pool after the scanners are closed
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot
    snapshot = memstore.snapshot();
    // Adding more value
    memstore.add(new KeyValue(row, fam, qf6, val), null);
    memstore.add(new KeyValue(row, fam, qf7, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open, the chunks of the snapshot should be put back
    // to the pool
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

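  /**
   * Verifies chunk recycling when scanners are open on pipeline segments while an in-memory
   * compaction runs; chunks are returned to the pool only after those scanners are closed.
   */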
  @Test
  public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException {

    // set memstore to do data compaction and not to use the speculative scan
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());

    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);

    // Creating a pipeline
    ((MyCompactingMemStore) memstore).disableCompaction();
    ((CompactingMemStore) memstore).flushInMemory();

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // pipeline bucket 2
    ((CompactingMemStore) memstore).flushInMemory();
    // opening a scanner before force flushing
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // Chunks shouldn't be put back to the pool, since some scanners are still
    // open on their data
    ((MyCompactingMemStore) memstore).enableCompaction();
    // trigger compaction
    ((CompactingMemStore) memstore).flushInMemory();

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
    assertEquals(3, memstore.getActive().getCellsCount());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back to the pool after the scanners are closed
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    snapshot = memstore.snapshot();
    // Adding more value
    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open, the chunks of the snapshot should be put back
    // to the pool
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction tests
  //////////////////////////////////////////////////////////////////////////////
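  /**
   * BASIC policy, single in-memory flush: the active segment is flattened into a CellArrayMap
   * immutable segment without removing any cells.
   */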
  @Test
  public void testCompaction1Bucket() throws IOException {

    // set memstore to do basic structure flattening, the "eager" option is tested in
    // TestCompactingToCellFlatMapMemStore
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());

    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1);
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // There is no compaction, as the compacting memstore type is basic.
    // totalCellsLen remains the same
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
      + 4 * oneCellOnCAHeapSize;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

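  /**
   * BASIC policy with a pipeline threshold of 1: successive in-memory flushes flatten and merge
   * pipeline segments but do not remove duplicate cells, so all 7 cells reach the snapshot.
   */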
  @Test
  public void testCompaction2Buckets() throws IOException {

    // set memstore to do basic structure flattening, the "eager" option is tested in
    // TestCompactingToCellFlatMapMemStore
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
      String.valueOf(1));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;

    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    int counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(4, counter);
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // There is no compaction, as the compacting memstore type is basic.
    // totalCellsLen remains the same
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
      + 4 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
    totalHeapSize += 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
      + 7 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(7, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

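  /**
   * EAGER policy: in-memory compaction removes duplicate cells across pipeline segments, so out
   * of the 10 cells added only the 4 unique ones survive to the snapshot.
   */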
  @Test
  public void testCompaction3Buckets() throws IOException {

    // set memstore to do data compaction and not to use the speculative scan
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };
    String[] keys3 = { "D", "B", "B" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells.
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact

    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are the same size, so
    // adjust totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
      + 3 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;

    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).disableCompaction();
    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // No change in the cell data size, i.e. the memstore size, as there is no compaction.
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
      ((CompactingMemStore) memstore).heapSize());

    int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
      regionServicesForStores.getMemStoreSize());
    long totalHeapSize3 =
      totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).enableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
    // Out of total 10, only 4 cells are unique
    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
    totalCellsLen3 = 0;// All duplicated cells.
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
      regionServicesForStores.getMemStoreSize());
    // Only 4 unique cells left
    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
      + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

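  /**
   * ADAPTIVE policy: each in-memory flush chooses between flattening, merging and compaction
   * based on the estimated fraction of unique cells, so the final cell count may be either 4
   * (compacted) or 11 (merged only).
   */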
  @Test
  public void testMagicCompaction3Buckets() throws IOException {

    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    memstore.getConfiguration()
      .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
    memstore.getConfiguration()
      .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());

    String[] keys1 = { "A", "B", "D" };
    String[] keys2 = { "A" };
    String[] keys3 = { "A", "A", "B", "C" };
    String[] keys4 = { "D", "B", "B" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
    int oneCellOnCSLMHeapSize = 120;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize, memstore.heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten
    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals(1.0,
      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten.
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals(1.0,
      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge.
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals((4.0 / 8.0),
      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not)
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
    assertTrue(4 == numCells || 11 == numCells);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    numCells = s.getCellsCount();
    assertTrue(4 == numCells || 11 == numCells);
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

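  /**
   * Adds one cell per key (the value is the key plus its index), accounts the resulting size
   * delta against the region services, and returns the total serialized cell length added.
   */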
  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    long size = hmc.getActive().getDataSize();
    long heapOverhead = hmc.getActive().getHeapSize();
    int cellsCount = hmc.getActive().getCellsCount();
    int totalLen = 0;
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      byte[] val = Bytes.toBytes(keys[i] + i);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      totalLen += Segment.getCellLength(kv);
      hmc.add(kv, null);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
    }
    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
      hmc.getActive().getHeapSize() - heapOverhead, 0,
      hmc.getActive().getCellsCount() - cellsCount);
    return totalLen;
  }

  /**
   * Same as {@link #addRowsByKeys(AbstractMemStore, String[])}, but lets the caller control the
   * value written into each cell.
   */
  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    long size = hmc.getActive().getDataSize();
    long heapOverhead = hmc.getActive().getHeapSize();
    int cellsCount = hmc.getActive().getCellsCount();
    int totalLen = 0;
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      totalLen += Segment.getCellLength(kv);
      hmc.add(kv, null);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
    }
    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
      hmc.getActive().getHeapSize() - heapOverhead, 0,
      hmc.getActive().getCellsCount() - cellsCount);
    return totalLen;
  }

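  /**
   * EnvironmentEdge that always returns a fixed time (1234), making timeOfOldestEdit assertions
   * deterministic.
   */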
  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
    long t = 1234;

    @Override
    public long currentTime() {
      return t;
    }
  }

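  /**
   * CompactingMemStore subclass that exposes hooks for tests to enable or disable in-memory
   * compaction and to re-initialize the compaction strategy.
   */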
  protected static class MyCompactingMemStore extends CompactingMemStore {

    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, c, store, regionServices, compactionPolicy);
    }

    void disableCompaction() {
      allowCompaction.set(false);
    }

    void enableCompaction() {
      allowCompaction.set(true);
    }

    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
      throws IllegalArgumentIOException {
      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
    }

  }
}