/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.*;

import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

/**
 * Tests for {@link MemStoreLAB}: sequential and concurrent allocation layout, rejection of
 * oversized cells, and chunk accounting when a chunk pool is in use.
 */
@Category({RegionServerTests.class, SmallTests.class})
public class TestMemStoreLAB {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMemStoreLAB.class);

  private final static Configuration conf = new Configuration();

  private static final byte[] rk = Bytes.toBytes("r1");
  private static final byte[] cf = Bytes.toBytes("f");
  private static final byte[] q = Bytes.toBytes("q");

  /**
   * Initializes the shared {@link ChunkCreator} with a small 1KB chunk size so that the
   * allocation tests roll over to new chunks frequently.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f,
      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
  }

  /**
   * Re-initializes the shared {@link ChunkCreator} with the default chunk size and a global
   * memstore limit derived from the JVM max heap.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    long globalMemStoreLimit =
        (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
            * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
  }

  /**
   * Test a bunch of random allocations
   */
  @Test
  public void testLABRandomAllocation() {
    Random rand = new Random();
    MemStoreLAB mslab = new MemStoreLABImpl();
    int expectedOff = 0;
    ByteBuffer lastBuffer = null;
    int lastChunkId = -1;
    // 100K iterations of small cells (value of 0-2 bytes) against the 1KB chunks configured in
    // setUpBeforeClass: reasonable for a unit test and enough to cover the behavior when a
    // chunk fills up and allocation moves on to a new one.
    for (int i = 0; i < 100000; i++) {
      int valSize = rand.nextInt(3);
      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
      int size = KeyValueUtil.length(kv);
      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
      if (newKv.getBuffer() != lastBuffer) {
        // since we add the chunkID at the 0th offset of the chunk and the
        // chunkid is an int we need to account for those 4 bytes
        expectedOff = Bytes.SIZEOF_INT;
        lastBuffer = newKv.getBuffer();
        int chunkId = newKv.getBuffer().getInt(0);
        assertTrue("chunkid should be different", chunkId != lastChunkId);
        lastChunkId = chunkId;
      }
      assertEquals(expectedOff, newKv.getOffset());
      assertTrue("Allocation overruns buffer",
        newKv.getOffset() + size <= newKv.getBuffer().capacity());
      expectedOff += size;
    }
  }

  /**
   * Test that a cell with a 2MB value is not copied into the LAB, i.e. that
   * {@link MemStoreLAB#copyCellInto(Cell)} returns null for it.
   */
  @Test
  public void testLABLargeAllocation() {
    MemStoreLAB mslab = new MemStoreLABImpl();
    KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
    Cell newCell = mslab.copyCellInto(kv);
    assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
  }

  /**
   * Test allocation from lots of threads, making sure the results don't
   * overlap in any way
   */
  @Test
  public void testLABThreading() throws Exception {
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx =
        new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalAllocated = new AtomicInteger();

    final MemStoreLAB mslab = new MemStoreLABImpl();
    List<List<AllocRecord>> allocations = Lists.newArrayList();

    for (int i = 0; i < 10; i++) {
      // Each worker records into its own list, so the lists themselves need no synchronization.
      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
      allocations.add(allocsByThisThread);

      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        private final Random r = new Random();

        @Override
        public void doAnAction() throws Exception {
          int valSize = r.nextInt(3);
          KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
          int size = KeyValueUtil.length(kv);
          ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
          totalAllocated.addAndGet(size);
          allocsByThisThread.add(
            new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
        }
      };
      ctx.addThread(t);
    }

    ctx.startThreads();
    while (totalAllocated.get() < 50 * 1024 * 1000 && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    // Partition the allocations by the chunk buffer they point into,
    // make sure offsets are unique for each chunk
    Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk =
        Maps.newHashMap();

    int sizeCounted = 0;
    for (AllocRecord rec : Iterables.concat(allocations)) {
      sizeCounted += rec.size;
      if (rec.size == 0) {
        continue;
      }
      Map<Integer, AllocRecord> mapForThisByteArray =
          mapsByChunk.get(rec.alloc);
      if (mapForThisByteArray == null) {
        // TreeMap so that the per-chunk records iterate in ascending offset order below
        mapForThisByteArray = Maps.newTreeMap();
        mapsByChunk.put(rec.alloc, mapForThisByteArray);
      }
      AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
      assertNull("Already had an entry " + oldVal + " for allocation " + rec,
        oldVal);
    }
    assertEquals("Sanity check test", sizeCounted, totalAllocated.get());

    // Now check each chunk to make sure allocations don't overlap
    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
      // since we add the chunkID at the 0th offset of the chunk and the
      // chunkid is an int we need to account for those 4 bytes
      int expectedOff = Bytes.SIZEOF_INT;
      for (AllocRecord alloc : allocsInChunk.values()) {
        assertEquals(expectedOff, alloc.offset);
        assertTrue("Allocation overruns buffer",
          alloc.offset + alloc.size <= alloc.alloc.capacity());
        expectedOff += alloc.size;
      }
    }
  }

  /**
   * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
   * there's no memory leak (HBASE-16195)
   * @throws Exception if any error occurred
   */
  @Test
  public void testLABChunkQueue() throws Exception {
    // Capture the shared instance before anything in the try block can fail; otherwise an early
    // failure would make the finally block overwrite ChunkCreator.instance with null.
    ChunkCreator oldInstance = ChunkCreator.instance;
    try {
      MemStoreLABImpl mslab = new MemStoreLABImpl();
      // by default setting, there should be no chunks initialized in the pool
      assertTrue(mslab.getPooledChunks().isEmpty());
      ChunkCreator.instance = null;
      // reset mslab with chunk pool
      Configuration conf = HBaseConfiguration.create();
      conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
      // set chunk size to default max alloc size, so we could easily trigger chunk retirement
      conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
      // reconstruct mslab
      long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
          .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
      ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false,
        globalMemStoreLimit, 0.1f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
      ChunkCreator.clearDisableFlag();
      mslab = new MemStoreLABImpl(conf);
      // launch multiple threads to trigger frequent chunk retirement
      List<Thread> threads = new ArrayList<>();
      final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
          new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
      for (int i = 0; i < 10; i++) {
        threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
      }
      for (Thread thread : threads) {
        thread.start();
      }
      // let it run for some time
      Thread.sleep(1000);
      // interrupt() is overridden by the worker threads to raise their stop flag
      for (Thread thread : threads) {
        thread.interrupt();
      }
      // wait for every worker to finish its current copy and exit, without spinning on isAlive()
      for (Thread thread : threads) {
        thread.join();
      }
      // none of the chunkIds would have been returned back
      assertTrue("Chunk ids should still be mapped before the mslab is closed",
        ChunkCreator.instance.numberOfMappedChunks() != 0);
      int pooledChunksNum = mslab.getPooledChunks().size();
      // close the mslab
      mslab.close();
      // make sure all chunks were reclaimed back to pool
      int queueLength = mslab.getNumOfChunksReturnedToPool();
      assertTrue("All chunks in chunk queue should be reclaimed or removed"
          + " after mslab closed but actually: " + (pooledChunksNum - queueLength),
        pooledChunksNum - queueLength == 0);
    } finally {
      ChunkCreator.instance = oldInstance;
    }
  }

  /**
   * Builds (but does not start) a daemon thread that keeps copying the given cell into the
   * given mslab until it is asked to stop.
   * <p>
   * The returned thread overrides {@link Thread#interrupt()} so that it only raises a stop
   * flag; the loop exits after the copy in progress completes.
   * @param mslab the LAB to allocate from
   * @param threadName name given to the returned thread
   * @param cellToCopyInto the cell copied on every loop iteration
   * @return the configured, not yet started, thread
   */
  private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
      Cell cellToCopyInto) {
    Thread thread = new Thread() {
      volatile boolean stopped = false;

      @Override
      public void run() {
        while (!stopped) {
          // keep triggering chunk retirement
          mslab.copyCellInto(cellToCopyInto);
        }
      }

      @Override
      public void interrupt() {
        this.stopped = true;
      }
    };
    thread.setName(threadName);
    thread.setDaemon(true);
    return thread;
  }

  /**
   * Record of a single allocation: the buffer it landed in, its offset in that buffer and its
   * size. Records order by offset and are only comparable when they share the same buffer
   * instance.
   */
  private static class AllocRecord implements Comparable<AllocRecord> {
    private final ByteBuffer alloc;
    private final int offset;
    private final int size;

    public AllocRecord(ByteBuffer alloc, int offset, int size) {
      this.alloc = alloc;
      this.offset = offset;
      this.size = size;
    }

    @Override
    public int compareTo(AllocRecord e) {
      if (alloc != e.alloc) {
        throw new RuntimeException("Can only compare within a particular array");
      }
      return Ints.compare(this.offset, e.offset);
    }

    @Override
    public String toString() {
      return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
    }
  }
}