001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.lang.management.ManagementFactory;
025import java.nio.ByteBuffer;
026import java.util.List;
027import java.util.Random;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ByteBufferKeyValue;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.KeyValueUtil;
033import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
034import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
035import org.apache.hadoop.hbase.testclassification.RegionServerTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.junit.After;
039import org.junit.AfterClass;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044
045/**
046 * Test the {@link MemStoreChunkPool} class
047 */
048@Category({RegionServerTests.class, SmallTests.class})
049public class TestMemStoreChunkPool {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053      HBaseClassTestRule.forClass(TestMemStoreChunkPool.class);
054
055  private final static Configuration conf = new Configuration();
056  private static ChunkCreator chunkCreator;
057  private static boolean chunkPoolDisabledBeforeTest;
058
059  @BeforeClass
060  public static void setUpBeforeClass() throws Exception {
061    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
062    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
063    chunkPoolDisabledBeforeTest = ChunkCreator.chunkPoolDisabled;
064    ChunkCreator.chunkPoolDisabled = false;
065    long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
066        .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
067    chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
068      globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
069    assertTrue(chunkCreator != null);
070  }
071
072  @AfterClass
073  public static void tearDownAfterClass() throws Exception {
074    ChunkCreator.chunkPoolDisabled = chunkPoolDisabledBeforeTest;
075  }
076
077  @After
078  public void tearDown() throws Exception {
079    chunkCreator.clearChunksInPool();
080  }
081
082  @Test
083  public void testReusingChunks() {
084    Random rand = new Random();
085    MemStoreLAB mslab = new MemStoreLABImpl(conf);
086    int expectedOff = 0;
087    ByteBuffer lastBuffer = null;
088    final byte[] rk = Bytes.toBytes("r1");
089    final byte[] cf = Bytes.toBytes("f");
090    final byte[] q = Bytes.toBytes("q");
091    // Randomly allocate some bytes
092    for (int i = 0; i < 100; i++) {
093      int valSize = rand.nextInt(1000);
094      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
095      int size = KeyValueUtil.length(kv);
096      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
097      if (newKv.getBuffer() != lastBuffer) {
098        expectedOff = 4;
099        lastBuffer = newKv.getBuffer();
100      }
101      assertEquals(expectedOff, newKv.getOffset());
102      assertTrue("Allocation overruns buffer",
103          newKv.getOffset() + size <= newKv.getBuffer().capacity());
104      expectedOff += size;
105    }
106    // chunks will be put back to pool after close
107    mslab.close();
108    int chunkCount = chunkCreator.getPoolSize();
109    assertTrue(chunkCount > 0);
110    // reconstruct mslab
111    mslab = new MemStoreLABImpl(conf);
112    // chunk should be got from the pool, so we can reuse it.
113    KeyValue kv = new KeyValue(rk, cf, q, new byte[10]);
114    mslab.copyCellInto(kv);
115    assertEquals(chunkCount - 1, chunkCreator.getPoolSize());
116  }
117
118  @Test
119  public void testPuttingBackChunksAfterFlushing() throws UnexpectedStateException {
120    byte[] row = Bytes.toBytes("testrow");
121    byte[] fam = Bytes.toBytes("testfamily");
122    byte[] qf1 = Bytes.toBytes("testqualifier1");
123    byte[] qf2 = Bytes.toBytes("testqualifier2");
124    byte[] qf3 = Bytes.toBytes("testqualifier3");
125    byte[] qf4 = Bytes.toBytes("testqualifier4");
126    byte[] qf5 = Bytes.toBytes("testqualifier5");
127    byte[] val = Bytes.toBytes("testval");
128
129    DefaultMemStore memstore = new DefaultMemStore();
130
131    // Setting up memstore
132    memstore.add(new KeyValue(row, fam, qf1, val), null);
133    memstore.add(new KeyValue(row, fam, qf2, val), null);
134    memstore.add(new KeyValue(row, fam, qf3, val), null);
135
136    // Creating a snapshot
137    MemStoreSnapshot snapshot = memstore.snapshot();
138    assertEquals(3, memstore.getSnapshot().getCellsCount());
139
140    // Adding value to "new" memstore
141    assertEquals(0, memstore.getActive().getCellsCount());
142    memstore.add(new KeyValue(row, fam, qf4, val), null);
143    memstore.add(new KeyValue(row, fam, qf5, val), null);
144    assertEquals(2, memstore.getActive().getCellsCount());
145    // close the scanner - this is how the snapshot will be used
146    for(KeyValueScanner scanner : snapshot.getScanners()) {
147      scanner.close();
148    }
149    memstore.clearSnapshot(snapshot.getId());
150
151    int chunkCount = chunkCreator.getPoolSize();
152    assertTrue(chunkCount > 0);
153
154  }
155
156  @Test
157  public void testPuttingBackChunksWithOpeningScanner()
158      throws IOException {
159    byte[] row = Bytes.toBytes("testrow");
160    byte[] fam = Bytes.toBytes("testfamily");
161    byte[] qf1 = Bytes.toBytes("testqualifier1");
162    byte[] qf2 = Bytes.toBytes("testqualifier2");
163    byte[] qf3 = Bytes.toBytes("testqualifier3");
164    byte[] qf4 = Bytes.toBytes("testqualifier4");
165    byte[] qf5 = Bytes.toBytes("testqualifier5");
166    byte[] qf6 = Bytes.toBytes("testqualifier6");
167    byte[] qf7 = Bytes.toBytes("testqualifier7");
168    byte[] val = Bytes.toBytes("testval");
169
170    DefaultMemStore memstore = new DefaultMemStore();
171
172    // Setting up memstore
173    memstore.add(new KeyValue(row, fam, qf1, val), null);
174    memstore.add(new KeyValue(row, fam, qf2, val), null);
175    memstore.add(new KeyValue(row, fam, qf3, val), null);
176
177    // Creating a snapshot
178    MemStoreSnapshot snapshot = memstore.snapshot();
179    assertEquals(3, memstore.getSnapshot().getCellsCount());
180
181    // Adding value to "new" memstore
182    assertEquals(0, memstore.getActive().getCellsCount());
183    memstore.add(new KeyValue(row, fam, qf4, val), null);
184    memstore.add(new KeyValue(row, fam, qf5, val), null);
185    assertEquals(2, memstore.getActive().getCellsCount());
186
187    // opening scanner before clear the snapshot
188    List<KeyValueScanner> scanners = memstore.getScanners(0);
189    // Shouldn't putting back the chunks to pool,since some scanners are opening
190    // based on their data
191    // close the snapshot scanner
192    for(KeyValueScanner scanner : snapshot.getScanners()) {
193      scanner.close();
194    }
195    memstore.clearSnapshot(snapshot.getId());
196
197    assertTrue(chunkCreator.getPoolSize() == 0);
198
199    // Chunks will be put back to pool after close scanners;
200    for (KeyValueScanner scanner : scanners) {
201      scanner.close();
202    }
203    assertTrue(chunkCreator.getPoolSize() > 0);
204
205    // clear chunks
206    chunkCreator.clearChunksInPool();
207
208    // Creating another snapshot
209    snapshot = memstore.snapshot();
210    // Adding more value
211    memstore.add(new KeyValue(row, fam, qf6, val), null);
212    memstore.add(new KeyValue(row, fam, qf7, val), null);
213    // opening scanners
214    scanners = memstore.getScanners(0);
215    // close scanners before clear the snapshot
216    for (KeyValueScanner scanner : scanners) {
217      scanner.close();
218    }
219    // Since no opening scanner, the chunks of snapshot should be put back to
220    // pool
221    // close the snapshot scanners
222    for(KeyValueScanner scanner : snapshot.getScanners()) {
223      scanner.close();
224    }
225    memstore.clearSnapshot(snapshot.getId());
226    assertTrue(chunkCreator.getPoolSize() > 0);
227  }
228
229  @Test
230  public void testPutbackChunksMultiThreaded() throws Exception {
231    final int maxCount = 10;
232    final int initialCount = 5;
233    final int chunkSize = 40;
234    final int valSize = 7;
235    ChunkCreator oldCreator = ChunkCreator.getInstance();
236    ChunkCreator newCreator = new ChunkCreator(chunkSize, false, 400, 1, 0.5f, null, 0);
237    assertEquals(initialCount, newCreator.getPoolSize());
238    assertEquals(maxCount, newCreator.getMaxCount());
239    ChunkCreator.instance = newCreator;// Replace the global ref with the new one we created.
240                                             // Used it for the testing. Later in finally we put
241                                             // back the original
242    final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
243        new byte[valSize]);
244    try {
245      Runnable r = new Runnable() {
246        @Override
247        public void run() {
248          MemStoreLAB memStoreLAB = new MemStoreLABImpl(conf);
249          for (int i = 0; i < maxCount; i++) {
250            memStoreLAB.copyCellInto(kv);// Try allocate size = chunkSize. Means every
251                                         // allocate call will result in a new chunk
252          }
253          // Close MemStoreLAB so that all chunks will be tried to be put back to pool
254          memStoreLAB.close();
255        }
256      };
257      Thread t1 = new Thread(r);
258      Thread t2 = new Thread(r);
259      Thread t3 = new Thread(r);
260      t1.start();
261      t2.start();
262      t3.start();
263      t1.join();
264      t2.join();
265      t3.join();
266      assertTrue(newCreator.getPoolSize() <= maxCount);
267    } finally {
268      ChunkCreator.instance = oldCreator;
269    }
270  }
271}