001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.CHUNK_SIZE_KEY;
import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.MAX_ALLOC_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
059
060@Tag(RegionServerTests.TAG)
061@Tag(MediumTests.TAG)
062public class TestMemStoreLAB {
063
064  private final static Configuration conf = new Configuration();
065
066  private static final byte[] rk = Bytes.toBytes("r1");
067  private static final byte[] cf = Bytes.toBytes("f");
068  private static final byte[] q = Bytes.toBytes("q");
069
070  @BeforeAll
071  public static void setUpBeforeClass() throws Exception {
072    ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f,
073      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
074  }
075
076  @AfterAll
077  public static void tearDownAfterClass() throws Exception {
078    long globalMemStoreLimit =
079      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
080        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
081    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
082      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
083  }
084
085  /**
086   * Test a bunch of random allocations
087   */
088  @Test
089  public void testLABRandomAllocation() {
090    MemStoreLAB mslab = new MemStoreLABImpl();
091    int expectedOff = 0;
092    ByteBuffer lastBuffer = null;
093    int lastChunkId = -1;
094    // 100K iterations by 0-1K alloc -> 50MB expected
095    // should be reasonable for unit test and also cover wraparound
096    // behavior
097    Random rand = ThreadLocalRandom.current();
098    for (int i = 0; i < 100000; i++) {
099      int valSize = rand.nextInt(3);
100      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
101      int size = kv.getSerializedSize();
102      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
103      if (newKv.getBuffer() != lastBuffer) {
104        // since we add the chunkID at the 0th offset of the chunk and the
105        // chunkid is an int we need to account for those 4 bytes
106        expectedOff = Bytes.SIZEOF_INT;
107        lastBuffer = newKv.getBuffer();
108        int chunkId = newKv.getBuffer().getInt(0);
109        assertTrue(chunkId != lastChunkId, "chunkid should be different");
110        lastChunkId = chunkId;
111      }
112      assertEquals(expectedOff, newKv.getOffset());
113      assertTrue(newKv.getOffset() + size <= newKv.getBuffer().capacity(),
114        "Allocation overruns buffer");
115      expectedOff += size;
116    }
117  }
118
119  @Test
120  public void testLABLargeAllocation() {
121    MemStoreLAB mslab = new MemStoreLABImpl();
122    KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
123    Cell newCell = mslab.copyCellInto(kv);
124    assertNull(newCell, "2MB allocation shouldn't be satisfied by LAB.");
125  }
126
127  /**
128   * Test allocation from lots of threads, making sure the results don't overlap in any way
129   */
130  @Test
131  public void testLABThreading() throws Exception {
132    Configuration conf = new Configuration();
133    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
134
135    final AtomicInteger totalAllocated = new AtomicInteger();
136
137    final MemStoreLAB mslab = new MemStoreLABImpl();
138    List<List<AllocRecord>> allocations = Lists.newArrayList();
139
140    for (int i = 0; i < 10; i++) {
141      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
142      allocations.add(allocsByThisThread);
143
144      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
145        @Override
146        public void doAnAction() throws Exception {
147          int valSize = ThreadLocalRandom.current().nextInt(3);
148          KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
149          int size = kv.getSerializedSize();
150          ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
151          totalAllocated.addAndGet(size);
152          allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
153        }
154      };
155      ctx.addThread(t);
156    }
157
158    ctx.startThreads();
159    while (totalAllocated.get() < 50 * 1024 * 1000 && ctx.shouldRun()) {
160      Thread.sleep(10);
161    }
162    ctx.stop();
163    // Partition the allocations by the actual byte[] they point into,
164    // make sure offsets are unique for each chunk
165    Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = Maps.newHashMap();
166
167    int sizeCounted = 0;
168    for (AllocRecord rec : Iterables.concat(allocations)) {
169      sizeCounted += rec.size;
170      if (rec.size == 0) {
171        continue;
172      }
173      Map<Integer, AllocRecord> mapForThisByteArray = mapsByChunk.get(rec.alloc);
174      if (mapForThisByteArray == null) {
175        mapForThisByteArray = Maps.newTreeMap();
176        mapsByChunk.put(rec.alloc, mapForThisByteArray);
177      }
178      AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
179      assertNull(oldVal, "Already had an entry " + oldVal + " for allocation " + rec);
180    }
181    assertEquals(sizeCounted, totalAllocated.get(), "Sanity check test");
182
183    // Now check each byte array to make sure allocations don't overlap
184    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
185      // since we add the chunkID at the 0th offset of the chunk and the
186      // chunkid is an int we need to account for those 4 bytes
187      int expectedOff = Bytes.SIZEOF_INT;
188      for (AllocRecord alloc : allocsInChunk.values()) {
189        assertEquals(expectedOff, alloc.offset);
190        assertTrue(alloc.offset + alloc.size <= alloc.alloc.capacity(),
191          "Allocation overruns buffer");
192        expectedOff += alloc.size;
193      }
194    }
195  }
196
197  /**
198   * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
199   * there's no memory leak (HBASE-16195)
200   * @throws Exception if any error occurred
201   */
202  @Test
203  public void testLABChunkQueue() throws Exception {
204    ChunkCreator oldInstance = null;
205    try {
206      MemStoreLABImpl mslab = new MemStoreLABImpl();
207      // by default setting, there should be no chunks initialized in the pool
208      assertTrue(mslab.getPooledChunks().isEmpty());
209      oldInstance = ChunkCreator.instance;
210      ChunkCreator.instance = null;
211      // reset mslab with chunk pool
212      Configuration conf = HBaseConfiguration.create();
213      conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
214      // set chunk size to default max alloc size, so we could easily trigger chunk retirement
215      conf.setLong(CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
216      // reconstruct mslab
217      long globalMemStoreLimit =
218        (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
219          * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
220      ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false, globalMemStoreLimit, 0.1f,
221        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
222        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
223      ChunkCreator.clearDisableFlag();
224      mslab = new MemStoreLABImpl(conf);
225      // launch multiple threads to trigger frequent chunk retirement
226      List<Thread> threads = new ArrayList<>();
227      final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
228        new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
229      for (int i = 0; i < 10; i++) {
230        threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
231      }
232      for (Thread thread : threads) {
233        thread.start();
234      }
235      // let it run for some time
236      Thread.sleep(1000);
237      for (Thread thread : threads) {
238        thread.interrupt();
239      }
240      boolean threadsRunning = true;
241      boolean alive = false;
242      while (threadsRunning) {
243        alive = false;
244        for (Thread thread : threads) {
245          if (thread.isAlive()) {
246            alive = true;
247            break;
248          }
249        }
250        if (!alive) {
251          threadsRunning = false;
252        }
253      }
254      // none of the chunkIds would have been returned back
255      assertTrue(ChunkCreator.instance.numberOfMappedChunks() != 0,
256        "All the chunks must have been cleared");
257      Set<Integer> chunkIds = new HashSet<Integer>(mslab.chunks);
258      int pooledChunksNum = mslab.getPooledChunks().size();
259      // close the mslab
260      mslab.close();
261      // make sure all chunks where reclaimed back to pool
262      int queueLength = mslab.getNumOfChunksReturnedToPool(chunkIds);
263      assertTrue(pooledChunksNum - queueLength == 0,
264        "All chunks in chunk queue should be reclaimed or removed"
265          + " after mslab closed but actually: " + (pooledChunksNum - queueLength));
266    } finally {
267      ChunkCreator.instance = oldInstance;
268    }
269  }
270
271  /**
272   * Test cell with right length, which constructed by testForceCopyOfBigCellInto. (HBASE-26467)
273   */
274  @Test
275  public void testForceCopyOfBigCellInto() {
276    Configuration conf = HBaseConfiguration.create();
277    int chunkSize = ChunkCreator.getInstance().getChunkSize();
278    conf.setInt(CHUNK_SIZE_KEY, chunkSize);
279    conf.setInt(MAX_ALLOC_KEY, chunkSize / 2);
280
281    MemStoreLABImpl mslab = new MemStoreLABImpl(conf);
282    byte[] row = Bytes.toBytes("row");
283    byte[] columnFamily = Bytes.toBytes("columnFamily");
284    byte[] qualify = Bytes.toBytes("qualify");
285    byte[] smallValue = new byte[chunkSize / 2];
286    byte[] bigValue = new byte[chunkSize];
287    KeyValue smallKV =
288      new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), smallValue);
289
290    assertEquals(smallKV.getSerializedSize(),
291      mslab.forceCopyOfBigCellInto(smallKV).getSerializedSize());
292
293    KeyValue bigKV =
294      new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), bigValue);
295    assertEquals(bigKV.getSerializedSize(),
296      mslab.forceCopyOfBigCellInto(bigKV).getSerializedSize());
297
298    /**
299     * Add test by HBASE-26576,all the chunks are in {@link ChunkCreator#chunkIdMap}
300     */
301    assertTrue(mslab.chunks.size() == 2);
302    Chunk dataChunk = null;
303    Chunk jumboChunk = null;
304
305    for (Integer chunkId : mslab.chunks) {
306      Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId);
307      assertTrue(chunk != null);
308      if (chunk.getChunkType() == ChunkType.JUMBO_CHUNK) {
309        jumboChunk = chunk;
310      } else if (chunk.getChunkType() == ChunkType.DATA_CHUNK) {
311        dataChunk = chunk;
312      }
313    }
314
315    assertTrue(dataChunk != null);
316    assertTrue(jumboChunk != null);
317
318    mslab.close();
319    /**
320     * After mslab close, jumboChunk is removed from {@link ChunkCreator#chunkIdMap} but because
321     * dataChunk is recycled to pool so it is still in {@link ChunkCreator#chunkIdMap}.
322     */
323    assertTrue(ChunkCreator.getInstance().getChunk(jumboChunk.getId()) == null);
324    assertTrue(!ChunkCreator.getInstance().isChunkInPool(jumboChunk.getId()));
325    assertTrue(ChunkCreator.getInstance().getChunk(dataChunk.getId()) == dataChunk);
326    assertTrue(ChunkCreator.getInstance().isChunkInPool(dataChunk.getId()));
327
328  }
329
330  private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
331    ExtendedCell cellToCopyInto) {
332    Thread thread = new Thread() {
333      volatile boolean stopped = false;
334
335      @Override
336      public void run() {
337        while (!stopped) {
338          // keep triggering chunk retirement
339          mslab.copyCellInto(cellToCopyInto);
340        }
341      }
342
343      @Override
344      public void interrupt() {
345        this.stopped = true;
346      }
347    };
348    thread.setName(threadName);
349    thread.setDaemon(true);
350    return thread;
351  }
352
353  private static class AllocRecord implements Comparable<AllocRecord> {
354    private final ByteBuffer alloc;
355    private final int offset;
356    private final int size;
357
358    public AllocRecord(ByteBuffer alloc, int offset, int size) {
359      super();
360      this.alloc = alloc;
361      this.offset = offset;
362      this.size = size;
363    }
364
365    @Override
366    public int compareTo(AllocRecord e) {
367      if (alloc != e.alloc) {
368        throw new RuntimeException("Can only compare within a particular array");
369      }
370      return Ints.compare(this.offset, e.offset);
371    }
372
373    @Override
374    public String toString() {
375      return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
376    }
377  }
378}