001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.CHUNK_SIZE_KEY; 021import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.MAX_ALLOC_KEY; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.lang.management.ManagementFactory; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.Set; 034import java.util.concurrent.ThreadLocalRandom; 035import java.util.concurrent.atomic.AtomicInteger; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.ByteBufferKeyValue; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.MultithreadedTestUtil; 043import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 044import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 045import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; 046import org.apache.hadoop.hbase.testclassification.MediumTests; 047import org.apache.hadoop.hbase.testclassification.RegionServerTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.junit.AfterClass; 051import org.junit.BeforeClass; 052import org.junit.ClassRule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055 056import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 057import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 058import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 059import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; 060 061@Category({ RegionServerTests.class, MediumTests.class }) 062public class TestMemStoreLAB { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestMemStoreLAB.class); 067 068 private final static Configuration conf = new Configuration(); 069 070 private static final byte[] rk = Bytes.toBytes("r1"); 071 private static final byte[] cf = Bytes.toBytes("f"); 072 private static final byte[] q = Bytes.toBytes("q"); 073 074 @BeforeClass 075 public static void setUpBeforeClass() throws Exception { 076 ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f, 077 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 078 } 079 080 @AfterClass 081 public static void tearDownAfterClass() throws Exception { 082 long globalMemStoreLimit = 083 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 084 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 085 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f, 086 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 087 } 088 089 /** 090 * Test a bunch of random allocations 091 */ 092 @Test 093 public void testLABRandomAllocation() { 094 MemStoreLAB mslab = new MemStoreLABImpl(); 095 int expectedOff = 0; 096 ByteBuffer lastBuffer = null; 097 int lastChunkId = -1; 098 // 100K iterations by 0-1K alloc -> 50MB expected 099 // should be reasonable for unit test and also cover wraparound 100 // behavior 101 Random rand = ThreadLocalRandom.current(); 102 for (int i = 0; i < 100000; i++) { 103 int valSize = rand.nextInt(3); 104 KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); 105 int size = kv.getSerializedSize(); 106 ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); 107 if (newKv.getBuffer() != lastBuffer) { 108 // since we add the chunkID at the 0th offset of the chunk and the 109 // chunkid is an int we need to account for those 4 bytes 110 expectedOff = Bytes.SIZEOF_INT; 111 lastBuffer = newKv.getBuffer(); 112 int chunkId = newKv.getBuffer().getInt(0); 113 assertTrue("chunkid should be different", chunkId != lastChunkId); 114 lastChunkId = chunkId; 115 } 116 assertEquals(expectedOff, newKv.getOffset()); 117 assertTrue("Allocation overruns buffer", 118 newKv.getOffset() + size <= newKv.getBuffer().capacity()); 119 expectedOff += size; 120 } 121 } 122 123 @Test 124 public void testLABLargeAllocation() { 125 MemStoreLAB mslab = new MemStoreLABImpl(); 126 KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]); 127 Cell newCell = mslab.copyCellInto(kv); 128 assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell); 129 } 130 131 /** 132 * Test allocation from lots of threads, making sure the results don't overlap in any way 133 */ 134 @Test 135 public void testLABThreading() throws Exception { 136 Configuration conf = new Configuration(); 137 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 138 139 final AtomicInteger totalAllocated = new AtomicInteger(); 140 141 final MemStoreLAB mslab = new MemStoreLABImpl(); 142 List<List<AllocRecord>> allocations = Lists.newArrayList(); 143 144 for (int i = 0; i < 10; i++) { 145 final List<AllocRecord> allocsByThisThread = Lists.newLinkedList(); 146 allocations.add(allocsByThisThread); 147 148 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 149 @Override 150 public void doAnAction() throws Exception { 151 int valSize = ThreadLocalRandom.current().nextInt(3); 152 KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); 153 int size = kv.getSerializedSize(); 154 ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv); 155 totalAllocated.addAndGet(size); 156 allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size)); 157 } 158 }; 159 ctx.addThread(t); 160 } 161 162 ctx.startThreads(); 163 while (totalAllocated.get() < 50 * 1024 * 1000 && ctx.shouldRun()) { 164 Thread.sleep(10); 165 } 166 ctx.stop(); 167 // Partition the allocations by the actual byte[] they point into, 168 // make sure offsets are unique for each chunk 169 Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = Maps.newHashMap(); 170 171 int sizeCounted = 0; 172 for (AllocRecord rec : Iterables.concat(allocations)) { 173 sizeCounted += rec.size; 174 if (rec.size == 0) { 175 continue; 176 } 177 Map<Integer, AllocRecord> mapForThisByteArray = mapsByChunk.get(rec.alloc); 178 if (mapForThisByteArray == null) { 179 mapForThisByteArray = Maps.newTreeMap(); 180 mapsByChunk.put(rec.alloc, mapForThisByteArray); 181 } 182 AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec); 183 assertNull("Already had an entry " + oldVal + " for allocation " + rec, oldVal); 184 } 185 assertEquals("Sanity check test", sizeCounted, totalAllocated.get()); 186 187 // Now check each byte array to make sure allocations don't overlap 188 for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) { 189 // since we add the chunkID at the 0th offset of the chunk and the 190 // chunkid is an int we need to account for those 4 bytes 191 int expectedOff = Bytes.SIZEOF_INT; 192 for (AllocRecord alloc : allocsInChunk.values()) { 193 assertEquals(expectedOff, alloc.offset); 194 assertTrue("Allocation overruns buffer", 195 alloc.offset + alloc.size <= alloc.alloc.capacity()); 196 expectedOff += alloc.size; 197 } 198 } 199 } 200 201 /** 202 * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure 203 * there's no memory leak (HBASE-16195) 204 * @throws Exception if any error occurred 205 */ 206 @Test 207 public void testLABChunkQueue() throws Exception { 208 ChunkCreator oldInstance = null; 209 try { 210 MemStoreLABImpl mslab = new MemStoreLABImpl(); 211 // by default setting, there should be no chunks initialized in the pool 212 assertTrue(mslab.getPooledChunks().isEmpty()); 213 oldInstance = ChunkCreator.instance; 214 ChunkCreator.instance = null; 215 // reset mslab with chunk pool 216 Configuration conf = HBaseConfiguration.create(); 217 conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1); 218 // set chunk size to default max alloc size, so we could easily trigger chunk retirement 219 conf.setLong(CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT); 220 // reconstruct mslab 221 long globalMemStoreLimit = 222 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 223 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 224 ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false, globalMemStoreLimit, 0.1f, 225 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, 226 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 227 ChunkCreator.clearDisableFlag(); 228 mslab = new MemStoreLABImpl(conf); 229 // launch multiple threads to trigger frequent chunk retirement 230 List<Thread> threads = new ArrayList<>(); 231 final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), 232 new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]); 233 for (int i = 0; i < 10; i++) { 234 threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv)); 235 } 236 for (Thread thread : threads) { 237 thread.start(); 238 } 239 // let it run for some time 240 Thread.sleep(1000); 241 for (Thread thread : threads) { 242 thread.interrupt(); 243 } 244 boolean threadsRunning = true; 245 boolean alive = false; 246 while (threadsRunning) { 247 alive = false; 248 for (Thread thread : threads) { 249 if (thread.isAlive()) { 250 alive = true; 251 break; 252 } 253 } 254 if (!alive) { 255 threadsRunning = false; 256 } 257 } 258 // none of the chunkIds would have been returned back 259 assertTrue("All the chunks must have been cleared", 260 ChunkCreator.instance.numberOfMappedChunks() != 0); 261 Set<Integer> chunkIds = new HashSet<Integer>(mslab.chunks); 262 int pooledChunksNum = mslab.getPooledChunks().size(); 263 // close the mslab 264 mslab.close(); 265 // make sure all chunks where reclaimed back to pool 266 int queueLength = mslab.getNumOfChunksReturnedToPool(chunkIds); 267 assertTrue( 268 "All chunks in chunk queue should be reclaimed or removed" 269 + " after mslab closed but actually: " + (pooledChunksNum - queueLength), 270 pooledChunksNum - queueLength == 0); 271 } finally { 272 ChunkCreator.instance = oldInstance; 273 } 274 } 275 276 /** 277 * Test cell with right length, which constructed by testForceCopyOfBigCellInto. (HBASE-26467) 278 */ 279 @Test 280 public void testForceCopyOfBigCellInto() { 281 Configuration conf = HBaseConfiguration.create(); 282 int chunkSize = ChunkCreator.getInstance().getChunkSize(); 283 conf.setInt(CHUNK_SIZE_KEY, chunkSize); 284 conf.setInt(MAX_ALLOC_KEY, chunkSize / 2); 285 286 MemStoreLABImpl mslab = new MemStoreLABImpl(conf); 287 byte[] row = Bytes.toBytes("row"); 288 byte[] columnFamily = Bytes.toBytes("columnFamily"); 289 byte[] qualify = Bytes.toBytes("qualify"); 290 byte[] smallValue = new byte[chunkSize / 2]; 291 byte[] bigValue = new byte[chunkSize]; 292 KeyValue smallKV = 293 new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), smallValue); 294 295 assertEquals(smallKV.getSerializedSize(), 296 mslab.forceCopyOfBigCellInto(smallKV).getSerializedSize()); 297 298 KeyValue bigKV = 299 new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), bigValue); 300 assertEquals(bigKV.getSerializedSize(), 301 mslab.forceCopyOfBigCellInto(bigKV).getSerializedSize()); 302 303 /** 304 * Add test by HBASE-26576,all the chunks are in {@link ChunkCreator#chunkIdMap} 305 */ 306 assertTrue(mslab.chunks.size() == 2); 307 Chunk dataChunk = null; 308 Chunk jumboChunk = null; 309 310 for (Integer chunkId : mslab.chunks) { 311 Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId); 312 assertTrue(chunk != null); 313 if (chunk.getChunkType() == ChunkType.JUMBO_CHUNK) { 314 jumboChunk = chunk; 315 } else if (chunk.getChunkType() == ChunkType.DATA_CHUNK) { 316 dataChunk = chunk; 317 } 318 } 319 320 assertTrue(dataChunk != null); 321 assertTrue(jumboChunk != null); 322 323 mslab.close(); 324 /** 325 * After mslab close, jumboChunk is removed from {@link ChunkCreator#chunkIdMap} but because 326 * dataChunk is recycled to pool so it is still in {@link ChunkCreator#chunkIdMap}. 327 */ 328 assertTrue(ChunkCreator.getInstance().getChunk(jumboChunk.getId()) == null); 329 assertTrue(!ChunkCreator.getInstance().isChunkInPool(jumboChunk.getId())); 330 assertTrue(ChunkCreator.getInstance().getChunk(dataChunk.getId()) == dataChunk); 331 assertTrue(ChunkCreator.getInstance().isChunkInPool(dataChunk.getId())); 332 333 } 334 335 private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, 336 Cell cellToCopyInto) { 337 Thread thread = new Thread() { 338 volatile boolean stopped = false; 339 340 @Override 341 public void run() { 342 while (!stopped) { 343 // keep triggering chunk retirement 344 mslab.copyCellInto(cellToCopyInto); 345 } 346 } 347 348 @Override 349 public void interrupt() { 350 this.stopped = true; 351 } 352 }; 353 thread.setName(threadName); 354 thread.setDaemon(true); 355 return thread; 356 } 357 358 private static class AllocRecord implements Comparable<AllocRecord> { 359 private final ByteBuffer alloc; 360 private final int offset; 361 private final int size; 362 363 public AllocRecord(ByteBuffer alloc, int offset, int size) { 364 super(); 365 this.alloc = alloc; 366 this.offset = offset; 367 this.size = size; 368 } 369 370 @Override 371 public int compareTo(AllocRecord e) { 372 if (alloc != e.alloc) { 373 throw new RuntimeException("Can only compare within a particular array"); 374 } 375 return Ints.compare(this.offset, e.offset); 376 } 377 378 @Override 379 public String toString() { 380 return "AllocRecord(offset=" + this.offset + ", size=" + size + ")"; 381 } 382 } 383}