/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;

/**
 * Tests how {@link NettyRpcServer} reacts to the channel-writability watermark settings: the
 * low/high watermarks and the fatal watermark.
 */
@Category({ RPCTests.class, MediumTests.class })
public class TestNettyChannelWritability {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestNettyChannelWritability.class);

  private static final MetricsAssertHelper METRICS_ASSERT =
    CompatibilityFactory.getInstance(MetricsAssertHelper.class);

  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

  /** Number of copies of {@link #CELL} attached to every echo request. */
  private static final int CELLS_PER_REQUEST = 3;

  /**
   * Test that we properly send configured watermarks to netty, and trigger setWritable when
   * necessary.
   */
  @Test
  public void testNettyWritableWatermarks() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1);
    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 2);

    // flushAfter of 0 means every response goes straight through the normal doRespond path.
    NettyRpcServer rpcServer = createRpcServer(conf, 0);
    try {
      sendAndReceive(conf, rpcServer, 5);
      METRICS_ASSERT.assertCounterGt("unwritableTime_numOps", 0,
        rpcServer.metrics.getMetricsSource());
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Test that our fatal watermark is honored, which requires artificially causing some queueing so
   * that pendingOutboundBytes increases.
   */
  @Test
  public void testNettyWritableFatalThreshold() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 1);

    // flushAfter is 3 here, with requestCount 5 below. If we never flush, the WriteTasks will sit
    // in the eventloop. So we flush a few at once, which will ensure that we hit fatal threshold
    NettyRpcServer rpcServer = createRpcServer(conf, 3);
    try {
      CompletionException exception =
        assertThrows(CompletionException.class, () -> sendAndReceive(conf, rpcServer, 5));
      // sendAndReceive wraps the client failure in a RuntimeException, so unwrap two levels.
      Throwable clientFailure = exception.getCause().getCause();
      assertTrue("Expected a ServiceException from the client call but got: " + clientFailure,
        clientFailure instanceof ServiceException);
      METRICS_ASSERT.assertCounterGt("maxOutboundBytesExceeded", 0,
        rpcServer.metrics.getMetricsSource());
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Starts {@code rpcServer}, then issues {@code requestCount} echo calls asynchronously through a
   * single shared client stub and waits for all of them to finish.
   * <p>
   * Any exception thrown by an individual call is wrapped in a {@link RuntimeException}; joining
   * the futures then surfaces it as a {@link CompletionException} whose cause is that wrapper.
   * Callers that inspect failures must therefore unwrap two levels to reach the original.
   * @param conf         configuration used to build the client
   * @param rpcServer    server to start and send requests to; the caller is responsible for
   *                     stopping it
   * @param requestCount number of echo calls to issue
   */
  private void sendAndReceive(Configuration conf, NettyRpcServer rpcServer, int requestCount)
    throws Exception {
    List<ExtendedCell> cells = new ArrayList<>();
    for (int i = 0; i < CELLS_PER_REQUEST; i++) {
      cells.add(CELL);
    }

    try (NettyRpcClient client = new NettyRpcClient(conf)) {
      rpcServer.start();
      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
        newBlockingStub(client, rpcServer.getListenerAddress());
      // An unbounded-wildcard array avoids the raw-type/unchecked array creation that
      // CompletableFuture<Void>[] would need, and is what allOf accepts.
      CompletableFuture<?>[] futures = new CompletableFuture<?>[requestCount];
      for (int i = 0; i < requestCount; i++) {
        futures[i] = CompletableFuture.runAsync(() -> {
          try {
            sendMessage(cells, stub);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });
      }
      CompletableFuture.allOf(futures).join();
    }
  }

  /**
   * Sends one echo request carrying {@code cells} and verifies the response: the echoed message
   * must match, and the controller's cell scanner must return exactly {@code cells.size()} cells,
   * each equal to {@link #CELL}.
   * @param cells cells to attach to the request
   * @param stub  blocking stub used to issue the call
   */
  private void sendMessage(List<ExtendedCell> cells,
    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) throws Exception {
    HBaseRpcController pcrc =
      new HBaseRpcControllerImpl(PrivateCellUtil.createExtendedCellScanner(cells));
    String message = "hello";
    assertEquals(message,
      stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
        .getMessage());
    int index = 0;
    CellScanner cellScanner = pcrc.cellScanner();
    assertNotNull(cellScanner);
    while (cellScanner.advance()) {
      assertEquals(CELL, cellScanner.current());
      index++;
    }
    assertEquals(cells.size(), index);
  }

  /**
   * Builds a {@link NettyRpcServer} bound to an ephemeral localhost port whose connections delay
   * flushing of early responses.
   * <p>
   * A single counter is shared by every connection of the returned server. While the number of
   * responses seen so far is below {@code flushAfter}, a response is only handed to
   * {@code channel.write} (no flush is requested here); from then on responses go through the
   * regular {@code doRespond} path.
   * @param conf       configuration for the server and its scheduler
   * @param flushAfter number of responses after which the regular response path is used
   * @return the configured, not yet started server
   */
  private NettyRpcServer createRpcServer(Configuration conf, int flushAfter) throws IOException {
    String name = "testRpcServer";
    List<RpcServer.BlockingServiceAndInterface> services =
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null));

    InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0);
    FifoRpcScheduler scheduler = new FifoRpcScheduler(conf, 1);

    AtomicInteger writeCount = new AtomicInteger(0);

    return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true) {
      @Override
      protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
        return new NettyServerRpcConnection(this, channel) {
          @Override
          protected void doRespond(RpcResponse resp) {
            if (writeCount.incrementAndGet() >= flushAfter) {
              super.doRespond(resp);
            } else {
              channel.write(resp);
            }
          }
        };
      }
    };
  }
}