001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertThrows;
025import static org.junit.Assert.assertTrue;
026
027import java.io.IOException;
028import java.net.InetSocketAddress;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.CompletionException;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.CellScanner;
036import org.apache.hadoop.hbase.CompatibilityFactory;
037import org.apache.hadoop.hbase.ExtendedCell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.PrivateCellUtil;
042import org.apache.hadoop.hbase.test.MetricsAssertHelper;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.testclassification.RPCTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
052import org.apache.hbase.thirdparty.io.netty.channel.Channel;
053
054import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
055import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
056
057@Category({ RPCTests.class, MediumTests.class })
058public class TestNettyChannelWritability {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestNettyChannelWritability.class);
063
064  private static final MetricsAssertHelper METRICS_ASSERT =
065    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
066
067  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
068  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
069
070  /**
071   * Test that we properly send configured watermarks to netty, and trigger setWritable when
072   * necessary.
073   */
074  @Test
075  public void testNettyWritableWatermarks() throws Exception {
076    Configuration conf = HBaseConfiguration.create();
077    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1);
078    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 2);
079
080    NettyRpcServer rpcServer = createRpcServer(conf, 0);
081    try {
082      sendAndReceive(conf, rpcServer, 5);
083      METRICS_ASSERT.assertCounterGt("unwritableTime_numOps", 0,
084        rpcServer.metrics.getMetricsSource());
085    } finally {
086      rpcServer.stop();
087    }
088  }
089
090  /**
091   * Test that our fatal watermark is honored, which requires artificially causing some queueing so
092   * that pendingOutboundBytes increases.
093   */
094  @Test
095  public void testNettyWritableFatalThreshold() throws Exception {
096    Configuration conf = HBaseConfiguration.create();
097    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 1);
098
099    // flushAfter is 3 here, with requestCount 5 below. If we never flush, the WriteTasks will sit
100    // in the eventloop. So we flush a few at once, which will ensure that we hit fatal threshold
101    NettyRpcServer rpcServer = createRpcServer(conf, 3);
102    try {
103      CompletionException exception =
104        assertThrows(CompletionException.class, () -> sendAndReceive(conf, rpcServer, 5));
105      assertTrue(exception.getCause().getCause() instanceof ServiceException);
106      METRICS_ASSERT.assertCounterGt("maxOutboundBytesExceeded", 0,
107        rpcServer.metrics.getMetricsSource());
108    } finally {
109      rpcServer.stop();
110    }
111  }
112
113  private void sendAndReceive(Configuration conf, NettyRpcServer rpcServer, int requestCount)
114    throws Exception {
115    List<ExtendedCell> cells = new ArrayList<>();
116    int count = 3;
117    for (int i = 0; i < count; i++) {
118      cells.add(CELL);
119    }
120
121    try (NettyRpcClient client = new NettyRpcClient(conf)) {
122      rpcServer.start();
123      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
124        newBlockingStub(client, rpcServer.getListenerAddress());
125      CompletableFuture<Void>[] futures = new CompletableFuture[requestCount];
126      for (int i = 0; i < requestCount; i++) {
127        futures[i] = CompletableFuture.runAsync(() -> {
128          try {
129            sendMessage(cells, stub);
130          } catch (Exception e) {
131            throw new RuntimeException(e);
132          }
133        });
134      }
135      CompletableFuture.allOf(futures).join();
136    }
137  }
138
139  private void sendMessage(List<ExtendedCell> cells,
140    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) throws Exception {
141    HBaseRpcController pcrc =
142      new HBaseRpcControllerImpl(PrivateCellUtil.createExtendedCellScanner(cells));
143    String message = "hello";
144    assertEquals(message,
145      stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
146        .getMessage());
147    int index = 0;
148    CellScanner cellScanner = pcrc.cellScanner();
149    assertNotNull(cellScanner);
150    while (cellScanner.advance()) {
151      assertEquals(CELL, cellScanner.current());
152      index++;
153    }
154    assertEquals(cells.size(), index);
155  }
156
157  private NettyRpcServer createRpcServer(Configuration conf, int flushAfter) throws IOException {
158    String name = "testRpcServer";
159    ArrayList<RpcServer.BlockingServiceAndInterface> services =
160      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null));
161
162    InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0);
163    FifoRpcScheduler scheduler = new FifoRpcScheduler(conf, 1);
164
165    AtomicInteger writeCount = new AtomicInteger(0);
166
167    return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true) {
168      @Override
169      protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
170        return new NettyServerRpcConnection(this, channel) {
171          @Override
172          protected void doRespond(RpcResponse resp) {
173            if (writeCount.incrementAndGet() >= flushAfter) {
174              super.doRespond(resp);
175            } else {
176              channel.write(resp);
177            }
178          }
179        };
180      }
181    };
182  }
183}