001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.nio.ByteBuffer;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.io.ByteBufferPool;
029import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
030import org.apache.hadoop.hbase.nio.ByteBuff;
031import org.apache.hadoop.hbase.nio.MultiByteBuff;
032import org.apache.hadoop.hbase.nio.SingleByteBuff;
033import org.apache.hadoop.hbase.testclassification.RPCTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Pair;
036import org.junit.ClassRule;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039
040@Category({ RPCTests.class, SmallTests.class })
041public class TestRpcServer {
042
043  @ClassRule
044  public static final HBaseClassTestRule CLASS_RULE =
045      HBaseClassTestRule.forClass(TestRpcServer.class);
046
047  @Test
048  public void testAllocateByteBuffToReadInto() throws Exception {
049    int maxBuffersInPool = 10;
050    ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
051    initPoolWithAllBuffers(pool, maxBuffersInPool);
052    ByteBuff buff = null;
053    Pair<ByteBuff, CallCleanup> pair;
054    // When the request size is less than 1/6th of the pool buffer size. We should use on demand
055    // created on heap Buffer
056    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
057        200);
058    buff = pair.getFirst();
059    assertTrue(buff.hasArray());
060    assertEquals(maxBuffersInPool, pool.getQueueSize());
061    assertNull(pair.getSecond());
062    // When the request size is > 1/6th of the pool buffer size.
063    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
064        1024);
065    buff = pair.getFirst();
066    assertFalse(buff.hasArray());
067    assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
068    assertNotNull(pair.getSecond());
069    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
070    assertEquals(maxBuffersInPool, pool.getQueueSize());
071    // Request size> pool buffer size
072    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
073        7 * 1024);
074    buff = pair.getFirst();
075    assertFalse(buff.hasArray());
076    assertTrue(buff instanceof MultiByteBuff);
077    ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
078    assertEquals(2, bbs.length);
079    assertTrue(bbs[0].isDirect());
080    assertTrue(bbs[1].isDirect());
081    assertEquals(6 * 1024, bbs[0].limit());
082    assertEquals(1024, bbs[1].limit());
083    assertEquals(maxBuffersInPool - 2, pool.getQueueSize());
084    assertNotNull(pair.getSecond());
085    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
086    assertEquals(maxBuffersInPool, pool.getQueueSize());
087
088    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
089        6 * 1024 + 200);
090    buff = pair.getFirst();
091    assertFalse(buff.hasArray());
092    assertTrue(buff instanceof MultiByteBuff);
093    bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
094    assertEquals(2, bbs.length);
095    assertTrue(bbs[0].isDirect());
096    assertFalse(bbs[1].isDirect());
097    assertEquals(6 * 1024, bbs[0].limit());
098    assertEquals(200, bbs[1].limit());
099    assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
100    assertNotNull(pair.getSecond());
101    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
102    assertEquals(maxBuffersInPool, pool.getQueueSize());
103
104    ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1];
105    for (int i = 0; i < maxBuffersInPool - 1; i++) {
106      buffers[i] = pool.getBuffer();
107    }
108    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
109        20 * 1024);
110    buff = pair.getFirst();
111    assertFalse(buff.hasArray());
112    assertTrue(buff instanceof MultiByteBuff);
113    bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
114    assertEquals(2, bbs.length);
115    assertTrue(bbs[0].isDirect());
116    assertFalse(bbs[1].isDirect());
117    assertEquals(6 * 1024, bbs[0].limit());
118    assertEquals(14 * 1024, bbs[1].limit());
119    assertEquals(0, pool.getQueueSize());
120    assertNotNull(pair.getSecond());
121    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
122    assertEquals(1, pool.getQueueSize());
123    pool.getBuffer();
124    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
125        7 * 1024);
126    buff = pair.getFirst();
127    assertTrue(buff.hasArray());
128    assertTrue(buff instanceof SingleByteBuff);
129    assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit());
130    assertNull(pair.getSecond());
131  }
132
133  private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
134    ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
135    // Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back
136    // all. Makes pool with max #buffers.
137    for (int i = 0; i < maxBuffersInPool; i++) {
138      buffers[i] = pool.getBuffer();
139    }
140    for (ByteBuffer buf : buffers) {
141      pool.putbackBuffer(buf);
142    }
143  }
144}