/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;

/**
 * Tests for {@link NettyRpcServer}'s channel-writability handling: the configured low/high
 * watermarks and the fatal outbound-bytes threshold.
 */
@Tag(RPCTests.TAG)
@Tag(MediumTests.TAG)
public class TestNettyChannelWritability {

  private static final MetricsAssertHelper METRICS_ASSERT =
    CompatibilityFactory.getInstance(MetricsAssertHelper.class);

  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

  /**
   * Test that we properly send configured watermarks to netty, and trigger setWritable when
   * necessary.
   */
  @Test
  public void testNettyWritableWatermarks() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1);
    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 2);

    NettyRpcServer rpcServer = createRpcServer(conf, 0);
    try {
      sendAndReceive(conf, rpcServer, 5);
      // With a 2-byte high watermark, any real response must have crossed it at least once,
      // so unwritable time must have been recorded.
      METRICS_ASSERT.assertCounterGt("unwritableTime_numOps", 0,
        rpcServer.metrics.getMetricsSource());
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Test that our fatal watermark is honored, which requires artificially causing some queueing so
   * that pendingOutboundBytes increases.
   */
  @Test
  public void testNettyWritableFatalThreshold() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 1);

    // flushAfter is 3 here, with requestCount 5 below. If we never flush, the WriteTasks will sit
    // in the eventloop. So we flush a few at once, which will ensure that we hit fatal threshold
    NettyRpcServer rpcServer = createRpcServer(conf, 3);
    try {
      // The client futures wrap failures in RuntimeException, which join() surfaces as
      // CompletionException; the root cause should be the server-side ServiceException.
      CompletionException exception =
        assertThrows(CompletionException.class, () -> sendAndReceive(conf, rpcServer, 5));
      assertTrue(exception.getCause().getCause() instanceof ServiceException);
      METRICS_ASSERT.assertCounterGt("maxOutboundBytesExceeded", 0,
        rpcServer.metrics.getMetricsSource());
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Starts the given server, then fires {@code requestCount} concurrent echo requests at it and
   * waits for all of them to complete.
   * @param conf         client configuration
   * @param rpcServer    server to start and send to
   * @param requestCount number of concurrent requests to issue
   * @throws Exception if the client cannot be created, or (via CompletionException from join)
   *                   if any request fails
   */
  private void sendAndReceive(Configuration conf, NettyRpcServer rpcServer, int requestCount)
    throws Exception {
    List<ExtendedCell> cells = new ArrayList<>();
    int count = 3;
    for (int i = 0; i < count; i++) {
      cells.add(CELL);
    }

    try (NettyRpcClient client = new NettyRpcClient(conf)) {
      rpcServer.start();
      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
        newBlockingStub(client, rpcServer.getListenerAddress());
      // Collect into a List rather than a raw CompletableFuture[] to avoid the unchecked
      // generic-array creation warning.
      List<CompletableFuture<Void>> futures = new ArrayList<>(requestCount);
      for (int i = 0; i < requestCount; i++) {
        futures.add(CompletableFuture.runAsync(() -> {
          try {
            sendMessage(cells, stub);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }));
      }
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
  }

  /**
   * Sends a single echo request carrying the given cells, then verifies both the echoed message
   * and that every cell made the round trip intact.
   * @param cells cells to attach to the request's cell scanner
   * @param stub  blocking stub pointed at the server under test
   * @throws Exception on RPC failure or cell-scanner iteration failure
   */
  private void sendMessage(List<ExtendedCell> cells,
    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) throws Exception {
    HBaseRpcController pcrc =
      new HBaseRpcControllerImpl(PrivateCellUtil.createExtendedCellScanner(cells));
    String message = "hello";
    assertEquals(message,
      stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
        .getMessage());
    int index = 0;
    CellScanner cellScanner = pcrc.cellScanner();
    assertNotNull(cellScanner);
    while (cellScanner.advance()) {
      assertEquals(CELL, cellScanner.current());
      index++;
    }
    // Every cell we sent must have come back.
    assertEquals(cells.size(), index);
  }

  /**
   * Creates (but does not start) a NettyRpcServer whose connections buffer responses in the
   * eventloop, only flushing once {@code flushAfter} responses have accumulated. This lets tests
   * artificially grow pendingOutboundBytes.
   * @param conf       server configuration (watermark settings, etc.)
   * @param flushAfter number of buffered responses before a real flush; 0 flushes every response
   * @return the configured server
   * @throws IOException if the server cannot be constructed
   */
  private NettyRpcServer createRpcServer(Configuration conf, int flushAfter) throws IOException {
    String name = "testRpcServer";
    ArrayList<RpcServer.BlockingServiceAndInterface> services =
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null));

    InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0);
    FifoRpcScheduler scheduler = new FifoRpcScheduler(conf, 1);

    AtomicInteger writeCount = new AtomicInteger(0);

    return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true) {
      @Override
      protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
        return new NettyServerRpcConnection(this, channel) {
          @Override
          protected void doRespond(RpcResponse resp) {
            // write() without flush() queues the response, growing pendingOutboundBytes;
            // only every flushAfter-th response goes through the normal (flushing) path.
            if (writeCount.incrementAndGet() >= flushAfter) {
              super.doRespond(resp);
            } else {
              channel.write(resp);
            }
          }
        };
      }
    };
  }
}