/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

@Category({RegionServerTests.class, SmallTests.class})
public class TestMemStoreLAB {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMemStoreLAB.class);

  private static final Configuration conf = new Configuration();

  private static final byte[] rk = Bytes.toBytes("r1");
  private static final byte[] cf = Bytes.toBytes("f");
  private static final byte[] q = Bytes.toBytes("q");

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // use small 1 KB chunks and a 50 MB global memstore limit to keep the test lightweight
    ChunkCreator.initialize(1 * 1024, false, 50 * 1024 * 1024L, 0.2f,
        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
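    // restore the ChunkCreator to its default settings so later tests are not affected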
    long globalMemStoreLimit =
        (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
            * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
  }

  /**
   * Test a bunch of random allocations, verifying that each one lands at the expected
   * offset within its chunk.
   */
  @Test
  public void testLABRandomAllocation() {
    Random rand = new Random();
    MemStoreLAB mslab = new MemStoreLABImpl();
    int expectedOff = 0;
    ByteBuffer lastBuffer = null;
    int lastChunkId = -1;
    // 100K iterations of small (0-2 byte value) allocations; small enough for a fast
    // unit test while still covering wraparound behavior when one of the 1 KB chunks
    // configured in setUpBeforeClass fills up and a new one is allocated
    for (int i = 0; i < 100000; i++) {
      int valSize = rand.nextInt(3);
      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
      int size = kv.getSerializedSize();
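      // copyCellInto copies the cell's bytes into the current chunk and returns a new
      // cell backed by that chunk's ByteBuffer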
      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
      if (newKv.getBuffer() != lastBuffer) {
        // since we add the chunkID at the 0th offset of the chunk and the
        // chunkid is an int we need to account for those 4 bytes
        expectedOff = Bytes.SIZEOF_INT;
        lastBuffer = newKv.getBuffer();
        int chunkId = newKv.getBuffer().getInt(0);
        assertTrue("chunkid should be different", chunkId != lastChunkId);
        lastChunkId = chunkId;
      }
      assertEquals(expectedOff, newKv.getOffset());
      assertTrue("Allocation overruns buffer",
          newKv.getOffset() + size <= newKv.getBuffer().capacity());
      expectedOff += size;
    }
  }

  @Test
  public void testLABLargeAllocation() {
    MemStoreLAB mslab = new MemStoreLABImpl();
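    // a 2 MB cell is far bigger than the LAB's max alloc threshold, so copyCellInto
    // should refuse it (return null) rather than waste chunk space on it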
    KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
    Cell newCell = mslab.copyCellInto(kv);
    assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
  }

  /**
   * Test allocation from lots of threads, making sure the results don't
   * overlap in any way
   */
  @Test
  public void testLABThreading() throws Exception {
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx =
      new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalAllocated = new AtomicInteger();

    final MemStoreLAB mslab = new MemStoreLABImpl();
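    // each thread records every allocation it makes so that overlaps can be detected
    // once all threads have stopped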
    List<List<AllocRecord>> allocations = Lists.newArrayList();

    for (int i = 0; i < 10; i++) {
      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
      allocations.add(allocsByThisThread);

      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        private Random r = new Random();
        @Override
        public void doAnAction() throws Exception {
          int valSize = r.nextInt(3);
          KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
          int size = kv.getSerializedSize();
          ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
          totalAllocated.addAndGet(size);
          allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
        }
      };
      ctx.addThread(t);
    }

    ctx.startThreads();
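    // let the threads run until roughly 50 MB worth of cells have been copied into the LAB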
    while (totalAllocated.get() < 50 * 1024 * 1024 && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    // Partition the allocations by the chunk buffer they point into,
    // and make sure offsets are unique within each chunk
    Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = Maps.newHashMap();

    int sizeCounted = 0;
    for (AllocRecord rec : Iterables.concat(allocations)) {
      sizeCounted += rec.size;
      if (rec.size == 0) continue;
      Map<Integer, AllocRecord> mapForThisByteArray =
        mapsByChunk.get(rec.alloc);
      if (mapForThisByteArray == null) {
        mapForThisByteArray = Maps.newTreeMap();
        mapsByChunk.put(rec.alloc, mapForThisByteArray);
      }
      AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
      assertNull("Already had an entry " + oldVal + " for allocation " + rec,
          oldVal);
    }
    assertEquals("Total size of recorded allocations should match the allocation counter",
        sizeCounted, totalAllocated.get());

    // Now check each chunk's buffer to make sure allocations don't overlap
    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
      // since we add the chunkID at the 0th offset of the chunk and the
      // chunkid is an int we need to account for those 4 bytes
      int expectedOff = Bytes.SIZEOF_INT;
      for (AllocRecord alloc : allocsInChunk.values()) {
        assertEquals(expectedOff, alloc.offset);
        assertTrue("Allocation overruns buffer",
            alloc.offset + alloc.size <= alloc.alloc.capacity());
        expectedOff += alloc.size;
      }
    }
  }

  /**
   * Test frequent chunk retirement with a chunk pool, triggered by lots of threads, making
   * sure there's no memory leak (HBASE-16195)
   * @throws Exception if any error occurred
   */
  @Test
  public void testLABChunkQueue() throws Exception {
    ChunkCreator oldInstance = null;
    try {
      MemStoreLABImpl mslab = new MemStoreLABImpl();
      // with the default settings, there should be no chunks initialized in the pool
      assertTrue(mslab.getPooledChunks().isEmpty());
      oldInstance = ChunkCreator.instance;
      ChunkCreator.instance = null;
      // reset mslab with chunk pool
      Configuration conf = HBaseConfiguration.create();
      conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
      // set chunk size to the default max alloc size, so we can easily trigger chunk retirement
      conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
      // reconstruct mslab
      long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
          .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
      ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false,
        globalMemStoreLimit, 0.1f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
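      // make sure the chunk pool is enabled in case a previous test disabled it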
      ChunkCreator.clearDisableFlag();
      mslab = new MemStoreLABImpl(conf);
      // launch multiple threads to trigger frequent chunk retirement
      List<Thread> threads = new ArrayList<>();
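      // each cell nearly fills an entire chunk (chunk size == max alloc), so every copy
      // forces the current chunk to be retired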
      final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
          new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
      for (int i = 0; i < 10; i++) {
        threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
      }
      for (Thread thread : threads) {
        thread.start();
      }
      // let it run for some time
      Thread.sleep(1000);
      for (Thread thread : threads) {
        thread.interrupt();
      }
      // wait for all worker threads to exit; interrupt() above flips their stop flag
      for (Thread thread : threads) {
        thread.join();
      }
      // before closing the mslab, the ChunkCreator should still be tracking chunks
      assertTrue("Some chunks should still be mapped before the mslab is closed",
          ChunkCreator.instance.numberOfMappedChunks() != 0);
      int pooledChunksNum = mslab.getPooledChunks().size();
      // close the mslab
      mslab.close();
      // make sure all chunks were reclaimed back to the pool
      int queueLength = mslab.getNumOfChunksReturnedToPool();
      assertEquals("All chunks in the chunk queue should be reclaimed or removed"
          + " after the mslab is closed", pooledChunksNum, queueLength);
    } finally {
      ChunkCreator.instance = oldInstance;
    }
  }

  private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
      Cell cellToCopyInto) {
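    // the returned thread copies the cell into the mslab in a tight loop; interrupt() is
    // overridden to flip a volatile stop flag instead of actually interrupting the thread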
    Thread thread = new Thread() {
      volatile boolean stopped = false;

      @Override
      public void run() {
        while (!stopped) {
          // keep triggering chunk retirement
          mslab.copyCellInto(cellToCopyInto);
        }
      }

      @Override
      public void interrupt() {
        this.stopped = true;
      }
    };
    thread.setName(threadName);
    thread.setDaemon(true);
    return thread;
  }

  private static class AllocRecord implements Comparable<AllocRecord> {
    private final ByteBuffer alloc;
    private final int offset;
    private final int size;

    public AllocRecord(ByteBuffer alloc, int offset, int size) {
      this.alloc = alloc;
      this.offset = offset;
      this.size = size;
    }

    @Override
    public int compareTo(AllocRecord e) {
      if (alloc != e.alloc) {
        throw new RuntimeException("Can only compare within a particular array");
      }
      return Ints.compare(this.offset, e.offset);
    }

    @Override
    public String toString() {
      return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
    }
  }
}