/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Compacting memstore test case.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestCompactingMemStore extends TestDefaultMemStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCompactingMemStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class);
  protected static ChunkCreator chunkCreator;
  protected HRegion region;
  protected RegionServicesForStores regionServicesForStores;
  protected HStore store;

  //////////////////////////////////////////////////////////////////////////////
  // Helpers
  //////////////////////////////////////////////////////////////////////////////
  protected static byte[] makeQualifier(final int i1, final int i2) {
    return Bytes.toBytes(Integer.toString(i1) + ";" +
        Integer.toString(i2));
  }

  @After
  public void tearDown() throws Exception {
    chunkCreator.clearChunksInPool();
  }

  @Override
  @Before
  public void setUp() throws Exception {
    compactingSetUp();
    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(),
        CellComparator.getInstance(), store, regionServicesForStores,
        MemoryCompactionPolicy.EAGER);
    ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
  }

  protected void compactingSetUp() throws Exception {
    super.internalSetUp();
    Configuration conf = new Configuration();
    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
    HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf);
    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(FAMILY);
    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("foobar"))
      .setColumnFamily(familyDescriptor).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("foobar")).build();
    WAL wal = HBaseTestingUtil.createWal(conf, hbaseUtility.getDataTestDir(), info);
    this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf,
      tableDescriptor, wal, true);
    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
    this.store = new HStore(region, familyDescriptor, conf, false);

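    // Initialize the global MSLAB ChunkCreator and its chunk pool that back the memstore
    // segments in these tests; tearDown() drains the pool between tests.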
    long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
        .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
        globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
        null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    assertNotNull(chunkCreator);
  }

  /**
   * A simple test which verifies how an in-memory flush affects timeOfOldestEdit.
   */
  @Test
  public void testTimeOfOldestEdit() {
    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
    final byte[] r = Bytes.toBytes("r");
    final byte[] f = Bytes.toBytes("f");
    final byte[] q = Bytes.toBytes("q");
    final byte[] v = Bytes.toBytes("v");
    final KeyValue kv = new KeyValue(r, f, q, v);
    memstore.add(kv, null);
    long timeOfOldestEdit = memstore.timeOfOldestEdit();
    assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit);

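    // In-memory flush only moves the active segment into the compaction pipeline, so the time
    // of the oldest edit must not change; only snapshot() resets it to Long.MAX_VALUE.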
    ((CompactingMemStore)memstore).flushInMemory();
    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
    memstore.add(kv, null);
    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
    memstore.snapshot();
    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
  }

  /**
   * A simple test which verifies the 3 possible states when scanning across snapshot.
   *
   * @throws IOException
   * @throws CloneNotSupportedException
   */
  @Override
  @Test
  public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
    // we are going to test scanning across snapshot with two kvs
    // kv1 should always be returned before kv2
    final byte[] one = Bytes.toBytes(1);
    final byte[] two = Bytes.toBytes(2);
    final byte[] f = Bytes.toBytes("f");
    final byte[] q = Bytes.toBytes("q");
    final byte[] v = Bytes.toBytes(3);

    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
    final KeyValue kv2 = new KeyValue(two, f, q, 10, v);

    // use case 1: both kvs in kvset
    this.memstore.add(kv1.clone(), null);
    this.memstore.add(kv2.clone(), null);
    // snapshot is empty, active segment is not empty,
    // empty segment is skipped.
    verifyOneScanAcrossSnapshot2(kv1, kv2);

    // use case 2: both kvs in snapshot
    this.memstore.snapshot();
    // active segment is empty, snapshot is not empty,
    // empty segment is skipped.
    verifyOneScanAcrossSnapshot2(kv1, kv2);

    // use case 3: first in snapshot second in kvset
    this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
        CellComparator.getInstance(), store, regionServicesForStores,
        MemoryCompactionPolicy.EAGER);
    this.memstore.add(kv1.clone(), null);
    // As compaction is starting in the background, the duplicate of kv1 might be removed,
    // BUT the scanners created earlier should look at the OLD MutableCellSetSegment,
    // so this should be OK...
    this.memstore.snapshot();
    this.memstore.add(kv2.clone(), null);
    verifyScanAcrossSnapshot2(kv1, kv2);
  }

  /**
   * Test memstore snapshots
   * @throws IOException
   */
  @Override
  @Test
  public void testSnapshotting() throws IOException {
    final int snapshotCount = 5;
    // Add some rows, run a snapshot. Do it a few times.
    for (int i = 0; i < snapshotCount; i++) {
      addRows(this.memstore);
      runSnapshot(this.memstore, true);
      assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  // Get tests
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Test getNextRow from memstore
   * @throws InterruptedException
   */
  @Override
  @Test
  public void testGetNextRow() throws Exception {
    addRows(this.memstore);
    // Add more versions to make it a little more interesting.
    Thread.sleep(1);
    addRows(this.memstore);
    Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
    assertTrue(CellComparator.getInstance().compareRows(closestToEmpty,
        new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())) == 0);
    for (int i = 0; i < ROW_COUNT; i++) {
      Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i),
        EnvironmentEdgeManager.currentTime()));
      if (i + 1 == ROW_COUNT) {
        assertNull(nr);
      } else {
        assertTrue(CellComparator.getInstance().compareRows(nr,
            new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())) == 0);
      }
    }
    // starting from each row, validate that the results contain the starting row
    Configuration conf = HBaseConfiguration.create();
    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
      ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(),
          false);
      try (InternalScanner scanner =
          new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
              memstore.getScanners(0))) {
        List<Cell> results = new ArrayList<>();
        for (int i = 0; scanner.next(results); i++) {
          int rowId = startRowId + i;
          Cell left = results.get(0);
          byte[] row1 = Bytes.toBytes(rowId);
          assertTrue("Row name",
            CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0);
          assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
          List<Cell> row = new ArrayList<>();
          for (Cell kv : results) {
            row.add(kv);
          }
          isExpectedRowWithoutTimestamps(rowId, row);
          // Clear out set. Otherwise row results accumulate.
          results.clear();
        }
      }
    }
  }

  @Override
  @Test
  public void testGet_memstoreAndSnapShot() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);
    // Pushing to pipeline
    ((CompactingMemStore)memstore).flushInMemory();
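    // An in-memory flush pushes the active segment into the pipeline, not into the snapshot,
    // so the snapshot is still empty here.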
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // Creating a snapshot
    memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());
    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
  }

  ////////////////////////////////////
  // Test for periodic memstore flushes
  // based on time of oldest edit
  ////////////////////////////////////

  /**
   * Adds keyvalues with a fixed memstoreTs and checks that the memstore size is decreased
   * as older keyvalues are deleted from the memstore.
   *
   * @throws Exception
   */
  @Override
  @Test
  public void testUpsertMemstoreSize() throws Exception {
    MemStoreSize oldSize = memstore.size();

    List<Cell> l = new ArrayList<>();
    KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
    KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
    KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");

    kv1.setSequenceId(1);
    kv2.setSequenceId(1);
    kv3.setSequenceId(1);
    l.add(kv1);
    l.add(kv2);
    l.add(kv3);

    this.memstore.upsert(l, 2, null); // readpoint is 2
    MemStoreSize newSize = this.memstore.size();
    assert (newSize.getDataSize() > oldSize.getDataSize());
    // The kv1 should be removed.
    assert (memstore.getActive().getCellsCount() == 2);

    KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
    kv4.setSequenceId(1);
    l.clear();
    l.add(kv4);
    this.memstore.upsert(l, 3, null);
    assertEquals(newSize, this.memstore.size());
    // The kv2 should be removed.
    assert (memstore.getActive().getCellsCount() == 2);
  }

  /**
   * Tests that the timeOfOldestEdit is updated correctly for the
   * various edit operations in memstore.
   */
  @Override
  @Test
  public void testUpdateToTimeOfOldestEdit() throws Exception {
    try {
      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
      EnvironmentEdgeManager.injectEdge(edge);
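      // The injected edge always returns 1234 (see EnvironmentEdgeForMemstoreTest below),
      // so every edit recorded from here on gets that fixed timestamp.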
      long t = memstore.timeOfOldestEdit();
      assertEquals(Long.MAX_VALUE, t);

      // test the case that the timeOfOldestEdit is updated after a KV add
      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
      // The method will also assert
      // the value is reset to Long.MAX_VALUE
      t = runSnapshot(memstore, true);

      // test the case that the timeOfOldestEdit is updated after a KV delete
      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
      t = runSnapshot(memstore, true);

      // test the case that the timeOfOldestEdit is updated after a KV upsert
      List<Cell> l = new ArrayList<>();
      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
      kv1.setSequenceId(100);
      l.add(kv1);
      memstore.upsert(l, 1000, null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  private long runSnapshot(final AbstractMemStore hmc, boolean useForce)
      throws IOException {
    // Save off old state.
    long oldHistorySize = hmc.getSnapshot().getDataSize();
    long prevTimeStamp = hmc.timeOfOldestEdit();

    hmc.snapshot();
    MemStoreSnapshot snapshot = hmc.snapshot();
    if (useForce) {
      // Make some assertions about what just happened.
      assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize());
      long t = hmc.timeOfOldestEdit();
      assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
      hmc.clearSnapshot(snapshot.getId());
    } else {
      long t = hmc.timeOfOldestEdit();
      assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp);
    }
    return prevTimeStamp;
  }

  private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) {
    int i = 0;
    for (Cell kv : kvs) {
      byte[] expectedColname = makeQualifier(rowIndex, i++);
      assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname));
      // Value is column name as bytes.  Usually result is
      // 100 bytes in size at least. This is the default size
      // for BytesWritable.  For comparison, convert bytes to
      // String and trim to remove trailing null bytes.
      assertTrue("Content", CellUtil.matchingValue(kv, expectedColname));
    }
  }

  @Test
  public void testPuttingBackChunksAfterFlushing() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

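    // Once the snapshot scanners are closed and the snapshot is cleared, its MSLAB chunks
    // should have been returned to the pool.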
    int chunkCount = chunkCreator.getPoolSize();
    assertTrue(chunkCount > 0);
  }

  @Test
  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] qf6 = Bytes.toBytes("testqualifier6");
    byte[] qf7 = Bytes.toBytes("testqualifier7");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // open scanners before clearing the snapshot
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks shouldn't be put back to the pool, since some scanners are still open
    // on their data.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back to the pool after the scanners are closed.
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot

    snapshot = memstore.snapshot();
    // Adding more values
    memstore.add(new KeyValue(row, fam, qf6, val), null);
    memstore.add(new KeyValue(row, fam, qf7, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open, the chunks of the snapshot should be put back to the pool.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  @Test
  public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException {

    // set memstore to do data compaction and not to use the speculative scan
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());

    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);

    // Creating a pipeline
    ((MyCompactingMemStore)memstore).disableCompaction();
    ((CompactingMemStore)memstore).flushInMemory();

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // pipeline bucket 2
    ((CompactingMemStore)memstore).flushInMemory();
    // open scanners before force flushing
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks shouldn't be put back to the pool, since some scanners are still open
    // on their data.
    ((MyCompactingMemStore)memstore).enableCompaction();
    // trigger compaction
    ((CompactingMemStore)memstore).flushInMemory();

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
    assertEquals(3, memstore.getActive().getCellsCount());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back to the pool after the scanners are closed.
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot

    MemStoreSnapshot snapshot = memstore.snapshot();
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    snapshot = memstore.snapshot();
    // Adding more values
    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open, the chunks of the snapshot should be put back to the pool.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction tests
  //////////////////////////////////////////////////////////////////////////////
  @Test
  public void testCompaction1Bucket() throws IOException {

    // set memstore to do basic structure flattening, the "eager" option is tested in
    // TestCompactingToCellFlatMapMemStore
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration()
        .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());

    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1);
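    // Expected per-cell heap overheads: a cell held in the CSLM-based mutable segment vs. a cell
    // held in the CellArrayMap-based immutable segment created by the in-memory flush.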
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // There is no compaction, as the compacting memstore type is basic.
    // totalCellsLen remains the same
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 4 * oneCellOnCAHeapSize;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testCompaction2Buckets() throws IOException {

    // set memstore to do basic structure flattening, the "eager" option is tested in
    // TestCompactingToCellFlatMapMemStore
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
        String.valueOf(1));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;

    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    int counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(4, counter);
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // There is no compaction, as the compacting memstore type is basic.
    // totalCellsLen remains the same
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 4 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
    totalHeapSize += 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 7 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(7, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testCompaction3Buckets() throws IOException {

    // set memstore to do data compaction and not to use the speculative scan
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };
    String[] keys3 = { "D", "B", "B" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1); // Adding 4 cells.
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact

    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjusting totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 3 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    int totalCellsLen2 = addRowsByKeys(memstore, keys2); // Adding 3 more cells.
    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;

    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).disableCompaction();
    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // No change in the cells' data size (i.e. memstore size), as there is no compaction.
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
        ((CompactingMemStore) memstore).heapSize());

    int totalCellsLen3 = addRowsByKeys(memstore, keys3); // 3 more cells added
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
        regionServicesForStores.getMemStoreSize());
    long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
        + 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore)memstore).enableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
    // Out of total 10, only 4 cells are unique
    totalCellsLen2 = totalCellsLen2 / 3; // 2 out of 3 cells are duplicated
    totalCellsLen3 = 0; // All duplicated cells.
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
        regionServicesForStores.getMemStoreSize());
    // Only 4 unique cells left
    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
        + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testMagicCompaction3Buckets() throws IOException {

    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    memstore.getConfiguration().setDouble(
        AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
    memstore.getConfiguration().setInt(
        AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());

    String[] keys1 = { "A", "B", "D" };
    String[] keys2 = { "A" };
    String[] keys3 = { "A", "A", "B", "C" };
    String[] keys4 = { "D", "B", "B" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1); // Adding 3 cells.
    int oneCellOnCSLMHeapSize = 120;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize, memstore.heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten
    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals(1.0,
        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys2); // Adding 1 more cell - flatten.
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals(1.0,
        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys3); // Adding 4 more cells - merge.
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals((4.0 / 8.0),
        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys4); // 3 more cells added - compact (or not)
    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
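    // With the ADAPTIVE policy the last in-memory flush may either just merge the segments
    // (11 cells, duplicates kept) or compact them down to the 4 unique cells, depending on how
    // the strategy evaluates the duplication in the pipeline, so both outcomes are accepted.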
    assertTrue(4 == numCells || 11 == numCells);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    numCells = s.getCellsCount();
    assertTrue(4 == numCells || 11 == numCells);
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    long size = hmc.getActive().getDataSize();
    long heapOverhead = hmc.getActive().getHeapSize();
    int cellsCount = hmc.getActive().getCellsCount();
    int totalLen = 0;
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      byte[] val = Bytes.toBytes(keys[i] + i);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      totalLen += Segment.getCellLength(kv);
      hmc.add(kv, null);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
    }
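    // Propagate the data size, heap size and cell count deltas of the active segment to the
    // region-level memstore accounting that the assertions in the tests above read.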
    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
      hmc.getActive().getHeapSize() - heapOverhead, 0,
      hmc.getActive().getCellsCount() - cellsCount);
    return totalLen;
  }

  // for controlling the val size when adding a new cell
  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    long size = hmc.getActive().getDataSize();
    long heapOverhead = hmc.getActive().getHeapSize();
    int cellsCount = hmc.getActive().getCellsCount();
    int totalLen = 0;
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      totalLen += Segment.getCellLength(kv);
      hmc.add(kv, null);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
    }
    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
      hmc.getActive().getHeapSize() - heapOverhead, 0,
      hmc.getActive().getCellsCount() - cellsCount);
    return totalLen;
  }

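  /**
   * An {@link EnvironmentEdge} that always returns the same, fixed time, so the tests above can
   * make deterministic assertions about timeOfOldestEdit.
   */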
  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
    long t = 1234;

    @Override
    public long currentTime() {
      return t;
    }
  }

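  /**
   * Test subclass of {@link CompactingMemStore} that exposes hooks for enabling/disabling
   * in-memory compaction and for re-initializing the compaction strategy from a configuration.
   */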
  protected static class MyCompactingMemStore extends CompactingMemStore {

    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
        RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
        throws IOException {
      super(conf, c, store, regionServices, compactionPolicy);
    }

    void disableCompaction() {
      allowCompaction.set(false);
    }

    void enableCompaction() {
      allowCompaction.set(true);
    }

    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
        throws IllegalArgumentIOException {
      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
    }
  }
}