/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.ipc;
019
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
054
055@Tag(RPCTests.TAG)
056@Tag(MediumTests.TAG)
057public class TestNettyChannelWritability {
058
059  private static final MetricsAssertHelper METRICS_ASSERT =
060    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
061
062  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
063  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
064
065  /**
066   * Test that we properly send configured watermarks to netty, and trigger setWritable when
067   * necessary.
068   */
069  @Test
070  public void testNettyWritableWatermarks() throws Exception {
071    Configuration conf = HBaseConfiguration.create();
072    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1);
073    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 2);
074
075    NettyRpcServer rpcServer = createRpcServer(conf, 0);
076    try {
077      sendAndReceive(conf, rpcServer, 5);
078      METRICS_ASSERT.assertCounterGt("unwritableTime_numOps", 0,
079        rpcServer.metrics.getMetricsSource());
080    } finally {
081      rpcServer.stop();
082    }
083  }
084
085  /**
086   * Test that our fatal watermark is honored, which requires artificially causing some queueing so
087   * that pendingOutboundBytes increases.
088   */
089  @Test
090  public void testNettyWritableFatalThreshold() throws Exception {
091    Configuration conf = HBaseConfiguration.create();
092    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 1);
093
094    // flushAfter is 3 here, with requestCount 5 below. If we never flush, the WriteTasks will sit
095    // in the eventloop. So we flush a few at once, which will ensure that we hit fatal threshold
096    NettyRpcServer rpcServer = createRpcServer(conf, 3);
097    try {
098      CompletionException exception =
099        assertThrows(CompletionException.class, () -> sendAndReceive(conf, rpcServer, 5));
100      assertTrue(exception.getCause().getCause() instanceof ServiceException);
101      METRICS_ASSERT.assertCounterGt("maxOutboundBytesExceeded", 0,
102        rpcServer.metrics.getMetricsSource());
103    } finally {
104      rpcServer.stop();
105    }
106  }
107
108  private void sendAndReceive(Configuration conf, NettyRpcServer rpcServer, int requestCount)
109    throws Exception {
110    List<ExtendedCell> cells = new ArrayList<>();
111    int count = 3;
112    for (int i = 0; i < count; i++) {
113      cells.add(CELL);
114    }
115
116    try (NettyRpcClient client = new NettyRpcClient(conf)) {
117      rpcServer.start();
118      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
119        newBlockingStub(client, rpcServer.getListenerAddress());
120      CompletableFuture<Void>[] futures = new CompletableFuture[requestCount];
121      for (int i = 0; i < requestCount; i++) {
122        futures[i] = CompletableFuture.runAsync(() -> {
123          try {
124            sendMessage(cells, stub);
125          } catch (Exception e) {
126            throw new RuntimeException(e);
127          }
128        });
129      }
130      CompletableFuture.allOf(futures).join();
131    }
132  }
133
134  private void sendMessage(List<ExtendedCell> cells,
135    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) throws Exception {
136    HBaseRpcController pcrc =
137      new HBaseRpcControllerImpl(PrivateCellUtil.createExtendedCellScanner(cells));
138    String message = "hello";
139    assertEquals(message,
140      stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
141        .getMessage());
142    int index = 0;
143    CellScanner cellScanner = pcrc.cellScanner();
144    assertNotNull(cellScanner);
145    while (cellScanner.advance()) {
146      assertEquals(CELL, cellScanner.current());
147      index++;
148    }
149    assertEquals(cells.size(), index);
150  }
151
152  private NettyRpcServer createRpcServer(Configuration conf, int flushAfter) throws IOException {
153    String name = "testRpcServer";
154    ArrayList<RpcServer.BlockingServiceAndInterface> services =
155      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null));
156
157    InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0);
158    FifoRpcScheduler scheduler = new FifoRpcScheduler(conf, 1);
159
160    AtomicInteger writeCount = new AtomicInteger(0);
161
162    return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true) {
163      @Override
164      protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
165        return new NettyServerRpcConnection(this, channel) {
166          @Override
167          protected void doRespond(RpcResponse resp) {
168            if (writeCount.incrementAndGet() >= flushAfter) {
169              super.doRespond(resp);
170            } else {
171              channel.write(resp);
172            }
173          }
174        };
175      }
176    };
177  }
178}