001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.CHUNK_SIZE_KEY;
021import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.MAX_ALLOC_KEY;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.lang.management.ManagementFactory;
027import java.nio.ByteBuffer;
028import java.util.ArrayList;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Random;
033import java.util.Set;
034import java.util.concurrent.ThreadLocalRandom;
035import java.util.concurrent.atomic.AtomicInteger;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.ByteBufferKeyValue;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.MultithreadedTestUtil;
043import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
044import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
045import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.testclassification.RegionServerTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.junit.AfterClass;
051import org.junit.BeforeClass;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055
056import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
057import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
058import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
059import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
060
061@Category({ RegionServerTests.class, MediumTests.class })
062public class TestMemStoreLAB {
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066    HBaseClassTestRule.forClass(TestMemStoreLAB.class);
067
068  private final static Configuration conf = new Configuration();
069
070  private static final byte[] rk = Bytes.toBytes("r1");
071  private static final byte[] cf = Bytes.toBytes("f");
072  private static final byte[] q = Bytes.toBytes("q");
073
074  @BeforeClass
075  public static void setUpBeforeClass() throws Exception {
076    ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f,
077      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
078  }
079
080  @AfterClass
081  public static void tearDownAfterClass() throws Exception {
082    long globalMemStoreLimit =
083      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
084        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
085    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
086      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
087  }
088
089  /**
090   * Test a bunch of random allocations
091   */
092  @Test
093  public void testLABRandomAllocation() {
094    MemStoreLAB mslab = new MemStoreLABImpl();
095    int expectedOff = 0;
096    ByteBuffer lastBuffer = null;
097    int lastChunkId = -1;
098    // 100K iterations by 0-1K alloc -> 50MB expected
099    // should be reasonable for unit test and also cover wraparound
100    // behavior
101    Random rand = ThreadLocalRandom.current();
102    for (int i = 0; i < 100000; i++) {
103      int valSize = rand.nextInt(3);
104      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
105      int size = kv.getSerializedSize();
106      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
107      if (newKv.getBuffer() != lastBuffer) {
108        // since we add the chunkID at the 0th offset of the chunk and the
109        // chunkid is an int we need to account for those 4 bytes
110        expectedOff = Bytes.SIZEOF_INT;
111        lastBuffer = newKv.getBuffer();
112        int chunkId = newKv.getBuffer().getInt(0);
113        assertTrue("chunkid should be different", chunkId != lastChunkId);
114        lastChunkId = chunkId;
115      }
116      assertEquals(expectedOff, newKv.getOffset());
117      assertTrue("Allocation overruns buffer",
118        newKv.getOffset() + size <= newKv.getBuffer().capacity());
119      expectedOff += size;
120    }
121  }
122
123  @Test
124  public void testLABLargeAllocation() {
125    MemStoreLAB mslab = new MemStoreLABImpl();
126    KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
127    Cell newCell = mslab.copyCellInto(kv);
128    assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
129  }
130
131  /**
132   * Test allocation from lots of threads, making sure the results don't overlap in any way
133   */
134  @Test
135  public void testLABThreading() throws Exception {
136    Configuration conf = new Configuration();
137    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
138
139    final AtomicInteger totalAllocated = new AtomicInteger();
140
141    final MemStoreLAB mslab = new MemStoreLABImpl();
142    List<List<AllocRecord>> allocations = Lists.newArrayList();
143
144    for (int i = 0; i < 10; i++) {
145      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
146      allocations.add(allocsByThisThread);
147
148      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
149        @Override
150        public void doAnAction() throws Exception {
151          int valSize = ThreadLocalRandom.current().nextInt(3);
152          KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
153          int size = kv.getSerializedSize();
154          ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
155          totalAllocated.addAndGet(size);
156          allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
157        }
158      };
159      ctx.addThread(t);
160    }
161
162    ctx.startThreads();
163    while (totalAllocated.get() < 50 * 1024 * 1000 && ctx.shouldRun()) {
164      Thread.sleep(10);
165    }
166    ctx.stop();
167    // Partition the allocations by the actual byte[] they point into,
168    // make sure offsets are unique for each chunk
169    Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = Maps.newHashMap();
170
171    int sizeCounted = 0;
172    for (AllocRecord rec : Iterables.concat(allocations)) {
173      sizeCounted += rec.size;
174      if (rec.size == 0) {
175        continue;
176      }
177      Map<Integer, AllocRecord> mapForThisByteArray = mapsByChunk.get(rec.alloc);
178      if (mapForThisByteArray == null) {
179        mapForThisByteArray = Maps.newTreeMap();
180        mapsByChunk.put(rec.alloc, mapForThisByteArray);
181      }
182      AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
183      assertNull("Already had an entry " + oldVal + " for allocation " + rec, oldVal);
184    }
185    assertEquals("Sanity check test", sizeCounted, totalAllocated.get());
186
187    // Now check each byte array to make sure allocations don't overlap
188    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
189      // since we add the chunkID at the 0th offset of the chunk and the
190      // chunkid is an int we need to account for those 4 bytes
191      int expectedOff = Bytes.SIZEOF_INT;
192      for (AllocRecord alloc : allocsInChunk.values()) {
193        assertEquals(expectedOff, alloc.offset);
194        assertTrue("Allocation overruns buffer",
195          alloc.offset + alloc.size <= alloc.alloc.capacity());
196        expectedOff += alloc.size;
197      }
198    }
199  }
200
201  /**
202   * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
203   * there's no memory leak (HBASE-16195)
204   * @throws Exception if any error occurred
205   */
206  @Test
207  public void testLABChunkQueue() throws Exception {
208    ChunkCreator oldInstance = null;
209    try {
210      MemStoreLABImpl mslab = new MemStoreLABImpl();
211      // by default setting, there should be no chunks initialized in the pool
212      assertTrue(mslab.getPooledChunks().isEmpty());
213      oldInstance = ChunkCreator.instance;
214      ChunkCreator.instance = null;
215      // reset mslab with chunk pool
216      Configuration conf = HBaseConfiguration.create();
217      conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
218      // set chunk size to default max alloc size, so we could easily trigger chunk retirement
219      conf.setLong(CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
220      // reconstruct mslab
221      long globalMemStoreLimit =
222        (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
223          * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
224      ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false, globalMemStoreLimit, 0.1f,
225        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
226        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
227      ChunkCreator.clearDisableFlag();
228      mslab = new MemStoreLABImpl(conf);
229      // launch multiple threads to trigger frequent chunk retirement
230      List<Thread> threads = new ArrayList<>();
231      final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
232        new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
233      for (int i = 0; i < 10; i++) {
234        threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
235      }
236      for (Thread thread : threads) {
237        thread.start();
238      }
239      // let it run for some time
240      Thread.sleep(1000);
241      for (Thread thread : threads) {
242        thread.interrupt();
243      }
244      boolean threadsRunning = true;
245      boolean alive = false;
246      while (threadsRunning) {
247        alive = false;
248        for (Thread thread : threads) {
249          if (thread.isAlive()) {
250            alive = true;
251            break;
252          }
253        }
254        if (!alive) {
255          threadsRunning = false;
256        }
257      }
258      // none of the chunkIds would have been returned back
259      assertTrue("All the chunks must have been cleared",
260        ChunkCreator.instance.numberOfMappedChunks() != 0);
261      Set<Integer> chunkIds = new HashSet<Integer>(mslab.chunks);
262      int pooledChunksNum = mslab.getPooledChunks().size();
263      // close the mslab
264      mslab.close();
265      // make sure all chunks where reclaimed back to pool
266      int queueLength = mslab.getNumOfChunksReturnedToPool(chunkIds);
267      assertTrue(
268        "All chunks in chunk queue should be reclaimed or removed"
269          + " after mslab closed but actually: " + (pooledChunksNum - queueLength),
270        pooledChunksNum - queueLength == 0);
271    } finally {
272      ChunkCreator.instance = oldInstance;
273    }
274  }
275
276  /**
277   * Test cell with right length, which constructed by testForceCopyOfBigCellInto. (HBASE-26467)
278   */
279  @Test
280  public void testForceCopyOfBigCellInto() {
281    Configuration conf = HBaseConfiguration.create();
282    int chunkSize = ChunkCreator.getInstance().getChunkSize();
283    conf.setInt(CHUNK_SIZE_KEY, chunkSize);
284    conf.setInt(MAX_ALLOC_KEY, chunkSize / 2);
285
286    MemStoreLABImpl mslab = new MemStoreLABImpl(conf);
287    byte[] row = Bytes.toBytes("row");
288    byte[] columnFamily = Bytes.toBytes("columnFamily");
289    byte[] qualify = Bytes.toBytes("qualify");
290    byte[] smallValue = new byte[chunkSize / 2];
291    byte[] bigValue = new byte[chunkSize];
292    KeyValue smallKV =
293      new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), smallValue);
294
295    assertEquals(smallKV.getSerializedSize(),
296      mslab.forceCopyOfBigCellInto(smallKV).getSerializedSize());
297
298    KeyValue bigKV =
299      new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), bigValue);
300    assertEquals(bigKV.getSerializedSize(),
301      mslab.forceCopyOfBigCellInto(bigKV).getSerializedSize());
302
303    /**
304     * Add test by HBASE-26576,all the chunks are in {@link ChunkCreator#chunkIdMap}
305     */
306    assertTrue(mslab.chunks.size() == 2);
307    Chunk dataChunk = null;
308    Chunk jumboChunk = null;
309
310    for (Integer chunkId : mslab.chunks) {
311      Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId);
312      assertTrue(chunk != null);
313      if (chunk.getChunkType() == ChunkType.JUMBO_CHUNK) {
314        jumboChunk = chunk;
315      } else if (chunk.getChunkType() == ChunkType.DATA_CHUNK) {
316        dataChunk = chunk;
317      }
318    }
319
320    assertTrue(dataChunk != null);
321    assertTrue(jumboChunk != null);
322
323    mslab.close();
324    /**
325     * After mslab close, jumboChunk is removed from {@link ChunkCreator#chunkIdMap} but because
326     * dataChunk is recycled to pool so it is still in {@link ChunkCreator#chunkIdMap}.
327     */
328    assertTrue(ChunkCreator.getInstance().getChunk(jumboChunk.getId()) == null);
329    assertTrue(!ChunkCreator.getInstance().isChunkInPool(jumboChunk.getId()));
330    assertTrue(ChunkCreator.getInstance().getChunk(dataChunk.getId()) == dataChunk);
331    assertTrue(ChunkCreator.getInstance().isChunkInPool(dataChunk.getId()));
332
333  }
334
335  private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
336    Cell cellToCopyInto) {
337    Thread thread = new Thread() {
338      volatile boolean stopped = false;
339
340      @Override
341      public void run() {
342        while (!stopped) {
343          // keep triggering chunk retirement
344          mslab.copyCellInto(cellToCopyInto);
345        }
346      }
347
348      @Override
349      public void interrupt() {
350        this.stopped = true;
351      }
352    };
353    thread.setName(threadName);
354    thread.setDaemon(true);
355    return thread;
356  }
357
358  private static class AllocRecord implements Comparable<AllocRecord> {
359    private final ByteBuffer alloc;
360    private final int offset;
361    private final int size;
362
363    public AllocRecord(ByteBuffer alloc, int offset, int size) {
364      super();
365      this.alloc = alloc;
366      this.offset = offset;
367      this.size = size;
368    }
369
370    @Override
371    public int compareTo(AllocRecord e) {
372      if (alloc != e.alloc) {
373        throw new RuntimeException("Can only compare within a particular array");
374      }
375      return Ints.compare(this.offset, e.offset);
376    }
377
378    @Override
379    public String toString() {
380      return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
381    }
382  }
383}