001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.nio.ByteBuff;
026import org.apache.hadoop.hbase.testclassification.RPCTests;
027import org.apache.hadoop.hbase.testclassification.SmallTests;
028import org.junit.ClassRule;
029import org.junit.Test;
030import org.junit.experimental.categories.Category;
031
032import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
033import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogLevel;
034import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogger;
035import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLoggerFactory;
036import org.apache.hbase.thirdparty.io.netty.util.internal.logging.Slf4JLoggerFactory;
037
038@Category({ RPCTests.class, SmallTests.class })
039public class TestByteBuffAllocatorLeakDetection {
040
041  @ClassRule
042  public static final HBaseClassTestRule CLASS_RULE =
043    HBaseClassTestRule.forClass(TestByteBuffAllocatorLeakDetection.class);
044
045  @SuppressWarnings("unused")
046  @Test
047  public void testLeakDetection() throws InterruptedException {
048    InternalLoggerFactory original = InternalLoggerFactory.getDefaultFactory();
049    AtomicInteger leaksDetected = new AtomicInteger();
050    InternalLoggerFactory.setDefaultFactory(new MockedLoggerFactory(leaksDetected));
051
052    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
053    assertTrue(ResourceLeakDetector.isEnabled());
054
055    try {
056      int maxBuffersInPool = 10;
057      int bufSize = 1024;
058      int minSize = bufSize / 8;
059      ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
060
061      // tracking leaks happens on creation of a RefCnt, through a call to detector.track().
062      // If a leak occurs, but detector.track() is never called again, the leak will not be
063      // realized. Further, causing a leak requires a GC event. So below we do some allocations,
064      // cause some GC's, do more allocations, and then expect a leak to show up.
065
066      // first allocate on-heap. we expect to not see a leak from this, because we
067      // dont count on-heap references
068      alloc.allocate(minSize - 1);
069      System.gc();
070      Thread.sleep(1000);
071
072      // cause an allocation to trigger a leak detection, if there were one.
073      // keep a reference so we don't trigger a leak right away from this.
074      ByteBuff reference = alloc.allocate(minSize * 2);
075      assertEquals(0, leaksDetected.get());
076
077      // allocate, but don't keep a reference. this should cause a leak
078      alloc.allocate(minSize * 2);
079      System.gc();
080      Thread.sleep(1000);
081
082      // allocate again, this should cause the above leak to be detected
083      alloc.allocate(minSize * 2);
084      assertEquals(1, leaksDetected.get());
085    } finally {
086      InternalLoggerFactory.setDefaultFactory(original);
087    }
088  }
089
090  private static class MockedLoggerFactory extends Slf4JLoggerFactory {
091
092    private AtomicInteger leaksDetected;
093
094    public MockedLoggerFactory(AtomicInteger leaksDetected) {
095      this.leaksDetected = leaksDetected;
096    }
097
098    @Override
099    public InternalLogger newInstance(String name) {
100      InternalLogger delegate = super.newInstance(name);
101      return new MockedLogger(leaksDetected, delegate);
102    }
103  }
104
105  private static class MockedLogger implements InternalLogger {
106
107    private AtomicInteger leaksDetected;
108    private InternalLogger delegate;
109
110    public MockedLogger(AtomicInteger leaksDetected, InternalLogger delegate) {
111      this.leaksDetected = leaksDetected;
112      this.delegate = delegate;
113    }
114
115    private void maybeCountLeak(String msgOrFormat) {
116      if (msgOrFormat.startsWith("LEAK")) {
117        leaksDetected.incrementAndGet();
118      }
119    }
120
121    @Override
122    public void error(String msg) {
123      maybeCountLeak(msg);
124      delegate.error(msg);
125    }
126
127    @Override
128    public void error(String format, Object arg) {
129      maybeCountLeak(format);
130      delegate.error(format, arg);
131    }
132
133    @Override
134    public void error(String format, Object argA, Object argB) {
135      maybeCountLeak(format);
136      delegate.error(format, argA, argB);
137    }
138
139    @Override
140    public void error(String format, Object... arguments) {
141      maybeCountLeak(format);
142      delegate.error(format, arguments);
143    }
144
145    @Override
146    public void error(String msg, Throwable t) {
147      maybeCountLeak(msg);
148      delegate.error(msg, t);
149    }
150
151    @Override
152    public void error(Throwable t) {
153      delegate.error(t);
154    }
155
156    @Override
157    public String name() {
158      return delegate.name();
159    }
160
161    @Override
162    public boolean isTraceEnabled() {
163      return delegate.isTraceEnabled();
164    }
165
166    @Override
167    public void trace(String msg) {
168      delegate.trace(msg);
169    }
170
171    @Override
172    public void trace(String format, Object arg) {
173      delegate.trace(format, arg);
174    }
175
176    @Override
177    public void trace(String format, Object argA, Object argB) {
178      delegate.trace(format, argA, argB);
179    }
180
181    @Override
182    public void trace(String format, Object... arguments) {
183      delegate.trace(format, arguments);
184    }
185
186    @Override
187    public void trace(String msg, Throwable t) {
188      delegate.trace(msg, t);
189    }
190
191    @Override
192    public void trace(Throwable t) {
193      delegate.trace(t);
194    }
195
196    @Override
197    public boolean isDebugEnabled() {
198      return delegate.isDebugEnabled();
199    }
200
201    @Override
202    public void debug(String msg) {
203      delegate.debug(msg);
204    }
205
206    @Override
207    public void debug(String format, Object arg) {
208      delegate.debug(format, arg);
209    }
210
211    @Override
212    public void debug(String format, Object argA, Object argB) {
213      delegate.debug(format, argA, argB);
214    }
215
216    @Override
217    public void debug(String format, Object... arguments) {
218      delegate.debug(format, arguments);
219    }
220
221    @Override
222    public void debug(String msg, Throwable t) {
223      delegate.debug(msg, t);
224    }
225
226    @Override
227    public void debug(Throwable t) {
228      delegate.debug(t);
229    }
230
231    @Override
232    public boolean isInfoEnabled() {
233      return delegate.isInfoEnabled();
234    }
235
236    @Override
237    public void info(String msg) {
238      delegate.info(msg);
239    }
240
241    @Override
242    public void info(String format, Object arg) {
243      delegate.info(format, arg);
244    }
245
246    @Override
247    public void info(String format, Object argA, Object argB) {
248      delegate.info(format, argA, argB);
249    }
250
251    @Override
252    public void info(String format, Object... arguments) {
253      delegate.info(format, arguments);
254    }
255
256    @Override
257    public void info(String msg, Throwable t) {
258      delegate.info(msg, t);
259    }
260
261    @Override
262    public void info(Throwable t) {
263      delegate.info(t);
264    }
265
266    @Override
267    public boolean isWarnEnabled() {
268      return delegate.isWarnEnabled();
269    }
270
271    @Override
272    public void warn(String msg) {
273      delegate.warn(msg);
274    }
275
276    @Override
277    public void warn(String format, Object arg) {
278      delegate.warn(format, arg);
279    }
280
281    @Override
282    public void warn(String format, Object... arguments) {
283      delegate.warn(format, arguments);
284    }
285
286    @Override
287    public void warn(String format, Object argA, Object argB) {
288      delegate.warn(format, argA, argB);
289    }
290
291    @Override
292    public void warn(String msg, Throwable t) {
293      delegate.warn(msg, t);
294    }
295
296    @Override
297    public void warn(Throwable t) {
298      delegate.warn(t);
299    }
300
301    @Override
302    public boolean isErrorEnabled() {
303      return delegate.isErrorEnabled();
304    }
305
306    @Override
307    public boolean isEnabled(InternalLogLevel level) {
308      return delegate.isEnabled(level);
309    }
310
311    @Override
312    public void log(InternalLogLevel level, String msg) {
313      delegate.log(level, msg);
314    }
315
316    @Override
317    public void log(InternalLogLevel level, String format, Object arg) {
318      delegate.log(level, format, arg);
319    }
320
321    @Override
322    public void log(InternalLogLevel level, String format, Object argA, Object argB) {
323      delegate.log(level, format, argA, argB);
324    }
325
326    @Override
327    public void log(InternalLogLevel level, String format, Object... arguments) {
328      delegate.log(level, format, arguments);
329    }
330
331    @Override
332    public void log(InternalLogLevel level, String msg, Throwable t) {
333      delegate.log(level, msg, t);
334    }
335
336    @Override
337    public void log(InternalLogLevel level, Throwable t) {
338      delegate.log(level, t);
339    }
340  }
341}