001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.hbase.HBaseClassTestRule; 025import org.apache.hadoop.hbase.nio.ByteBuff; 026import org.apache.hadoop.hbase.testclassification.RPCTests; 027import org.apache.hadoop.hbase.testclassification.SmallTests; 028import org.junit.ClassRule; 029import org.junit.Test; 030import org.junit.experimental.categories.Category; 031 032import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; 033import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogLevel; 034import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogger; 035import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLoggerFactory; 036import org.apache.hbase.thirdparty.io.netty.util.internal.logging.Slf4JLoggerFactory; 037 038@Category({ RPCTests.class, SmallTests.class }) 039public class TestByteBuffAllocatorLeakDetection { 040 041 @ClassRule 042 public static final HBaseClassTestRule CLASS_RULE = 043 HBaseClassTestRule.forClass(TestByteBuffAllocatorLeakDetection.class); 044 045 @SuppressWarnings("unused") 046 @Test 047 public void testLeakDetection() throws InterruptedException { 048 InternalLoggerFactory original = InternalLoggerFactory.getDefaultFactory(); 049 AtomicInteger leaksDetected = new AtomicInteger(); 050 InternalLoggerFactory.setDefaultFactory(new MockedLoggerFactory(leaksDetected)); 051 052 ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); 053 assertTrue(ResourceLeakDetector.isEnabled()); 054 055 try { 056 int maxBuffersInPool = 10; 057 int bufSize = 1024; 058 int minSize = bufSize / 8; 059 ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize); 060 061 // tracking leaks happens on creation of a RefCnt, through a call to detector.track(). 062 // If a leak occurs, but detector.track() is never called again, the leak will not be 063 // realized. Further, causing a leak requires a GC event. So below we do some allocations, 064 // cause some GC's, do more allocations, and then expect a leak to show up. 065 066 // first allocate on-heap. we expect to not see a leak from this, because we 067 // dont count on-heap references 068 alloc.allocate(minSize - 1); 069 System.gc(); 070 Thread.sleep(1000); 071 072 // cause an allocation to trigger a leak detection, if there were one. 073 // keep a reference so we don't trigger a leak right away from this. 074 ByteBuff reference = alloc.allocate(minSize * 2); 075 assertEquals(0, leaksDetected.get()); 076 077 // allocate, but don't keep a reference. this should cause a leak 078 alloc.allocate(minSize * 2); 079 System.gc(); 080 Thread.sleep(1000); 081 082 // allocate again, this should cause the above leak to be detected 083 alloc.allocate(minSize * 2); 084 assertEquals(1, leaksDetected.get()); 085 } finally { 086 InternalLoggerFactory.setDefaultFactory(original); 087 } 088 } 089 090 private static class MockedLoggerFactory extends Slf4JLoggerFactory { 091 092 private AtomicInteger leaksDetected; 093 094 public MockedLoggerFactory(AtomicInteger leaksDetected) { 095 this.leaksDetected = leaksDetected; 096 } 097 098 @Override 099 public InternalLogger newInstance(String name) { 100 InternalLogger delegate = super.newInstance(name); 101 return new MockedLogger(leaksDetected, delegate); 102 } 103 } 104 105 private static class MockedLogger implements InternalLogger { 106 107 private AtomicInteger leaksDetected; 108 private InternalLogger delegate; 109 110 public MockedLogger(AtomicInteger leaksDetected, InternalLogger delegate) { 111 this.leaksDetected = leaksDetected; 112 this.delegate = delegate; 113 } 114 115 private void maybeCountLeak(String msgOrFormat) { 116 if (msgOrFormat.startsWith("LEAK")) { 117 leaksDetected.incrementAndGet(); 118 } 119 } 120 121 @Override 122 public void error(String msg) { 123 maybeCountLeak(msg); 124 delegate.error(msg); 125 } 126 127 @Override 128 public void error(String format, Object arg) { 129 maybeCountLeak(format); 130 delegate.error(format, arg); 131 } 132 133 @Override 134 public void error(String format, Object argA, Object argB) { 135 maybeCountLeak(format); 136 delegate.error(format, argA, argB); 137 } 138 139 @Override 140 public void error(String format, Object... arguments) { 141 maybeCountLeak(format); 142 delegate.error(format, arguments); 143 } 144 145 @Override 146 public void error(String msg, Throwable t) { 147 maybeCountLeak(msg); 148 delegate.error(msg, t); 149 } 150 151 @Override 152 public void error(Throwable t) { 153 delegate.error(t); 154 } 155 156 @Override 157 public String name() { 158 return delegate.name(); 159 } 160 161 @Override 162 public boolean isTraceEnabled() { 163 return delegate.isTraceEnabled(); 164 } 165 166 @Override 167 public void trace(String msg) { 168 delegate.trace(msg); 169 } 170 171 @Override 172 public void trace(String format, Object arg) { 173 delegate.trace(format, arg); 174 } 175 176 @Override 177 public void trace(String format, Object argA, Object argB) { 178 delegate.trace(format, argA, argB); 179 } 180 181 @Override 182 public void trace(String format, Object... arguments) { 183 delegate.trace(format, arguments); 184 } 185 186 @Override 187 public void trace(String msg, Throwable t) { 188 delegate.trace(msg, t); 189 } 190 191 @Override 192 public void trace(Throwable t) { 193 delegate.trace(t); 194 } 195 196 @Override 197 public boolean isDebugEnabled() { 198 return delegate.isDebugEnabled(); 199 } 200 201 @Override 202 public void debug(String msg) { 203 delegate.debug(msg); 204 } 205 206 @Override 207 public void debug(String format, Object arg) { 208 delegate.debug(format, arg); 209 } 210 211 @Override 212 public void debug(String format, Object argA, Object argB) { 213 delegate.debug(format, argA, argB); 214 } 215 216 @Override 217 public void debug(String format, Object... arguments) { 218 delegate.debug(format, arguments); 219 } 220 221 @Override 222 public void debug(String msg, Throwable t) { 223 delegate.debug(msg, t); 224 } 225 226 @Override 227 public void debug(Throwable t) { 228 delegate.debug(t); 229 } 230 231 @Override 232 public boolean isInfoEnabled() { 233 return delegate.isInfoEnabled(); 234 } 235 236 @Override 237 public void info(String msg) { 238 delegate.info(msg); 239 } 240 241 @Override 242 public void info(String format, Object arg) { 243 delegate.info(format, arg); 244 } 245 246 @Override 247 public void info(String format, Object argA, Object argB) { 248 delegate.info(format, argA, argB); 249 } 250 251 @Override 252 public void info(String format, Object... arguments) { 253 delegate.info(format, arguments); 254 } 255 256 @Override 257 public void info(String msg, Throwable t) { 258 delegate.info(msg, t); 259 } 260 261 @Override 262 public void info(Throwable t) { 263 delegate.info(t); 264 } 265 266 @Override 267 public boolean isWarnEnabled() { 268 return delegate.isWarnEnabled(); 269 } 270 271 @Override 272 public void warn(String msg) { 273 delegate.warn(msg); 274 } 275 276 @Override 277 public void warn(String format, Object arg) { 278 delegate.warn(format, arg); 279 } 280 281 @Override 282 public void warn(String format, Object... arguments) { 283 delegate.warn(format, arguments); 284 } 285 286 @Override 287 public void warn(String format, Object argA, Object argB) { 288 delegate.warn(format, argA, argB); 289 } 290 291 @Override 292 public void warn(String msg, Throwable t) { 293 delegate.warn(msg, t); 294 } 295 296 @Override 297 public void warn(Throwable t) { 298 delegate.warn(t); 299 } 300 301 @Override 302 public boolean isErrorEnabled() { 303 return delegate.isErrorEnabled(); 304 } 305 306 @Override 307 public boolean isEnabled(InternalLogLevel level) { 308 return delegate.isEnabled(level); 309 } 310 311 @Override 312 public void log(InternalLogLevel level, String msg) { 313 delegate.log(level, msg); 314 } 315 316 @Override 317 public void log(InternalLogLevel level, String format, Object arg) { 318 delegate.log(level, format, arg); 319 } 320 321 @Override 322 public void log(InternalLogLevel level, String format, Object argA, Object argB) { 323 delegate.log(level, format, argA, argB); 324 } 325 326 @Override 327 public void log(InternalLogLevel level, String format, Object... arguments) { 328 delegate.log(level, format, arguments); 329 } 330 331 @Override 332 public void log(InternalLogLevel level, String msg, Throwable t) { 333 delegate.log(level, msg, t); 334 } 335 336 @Override 337 public void log(InternalLogLevel level, Throwable t) { 338 delegate.log(level, t); 339 } 340 } 341}