001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.net.SocketAddress; 025import java.net.SocketTimeoutException; 026import java.util.Map; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.ipc.AbstractRpcClient; 035import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 036import org.apache.hadoop.hbase.ipc.HBaseRpcController; 037import org.apache.hadoop.hbase.ipc.RpcClientFactory; 038import org.apache.hadoop.hbase.net.Address; 039import org.apache.hadoop.hbase.security.User; 040import org.apache.hadoop.hbase.testclassification.ClientTests; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.junit.jupiter.api.AfterAll; 043import org.junit.jupiter.api.BeforeAll; 044import org.junit.jupiter.api.Tag; 045import org.junit.jupiter.api.Test; 046 047import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 048import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 049import org.apache.hbase.thirdparty.com.google.protobuf.Message; 050import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 051import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 052import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 053import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 054 055@Tag(MediumTests.TAG) 056@Tag(ClientTests.TAG) 057public class TestClientTimeouts { 058 059 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 060 protected static int SLAVES = 1; 061 062 @BeforeAll 063 public static void setUpBeforeClass() throws Exception { 064 TEST_UTIL.startMiniCluster(SLAVES); 065 } 066 067 @AfterAll 068 public static void tearDownAfterClass() throws Exception { 069 TEST_UTIL.shutdownMiniCluster(); 070 } 071 072 private Connection createConnection() { 073 // Ensure the HBaseAdmin uses a new connection by changing Configuration. 074 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 075 // Set the custom RPC client with random timeouts as the client 076 conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 077 RandomTimeoutRpcClient.class.getName()); 078 conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 079 for (;;) { 080 try { 081 return ConnectionFactory.createConnection(conf); 082 } catch (IOException e) { 083 // since we randomly throw SocketTimeoutException, it is possible that we fail when creating 084 // the Connection, but this is not what we want to test here, so just ignore it and try 085 // again 086 } 087 } 088 } 089 090 /** 091 * Test that a client that fails an RPC to the master retries properly and doesn't throw any 092 * unexpected exceptions. 093 */ 094 @Test 095 public void testAdminTimeout() throws Exception { 096 try (Connection conn = createConnection(); Admin admin = conn.getAdmin()) { 097 int initialInvocations = invokations.get(); 098 boolean balanceEnabled = admin.isBalancerEnabled(); 099 for (int i = 0; i < 5; i++) { 100 assertEquals(balanceEnabled, admin.balancerSwitch(!balanceEnabled, false)); 101 balanceEnabled = !balanceEnabled; 102 } 103 // Ensure the RandomTimeoutRpcEngine is actually being used. 104 assertTrue(invokations.get() > initialInvocations); 105 } 106 } 107 108 /** 109 * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel 110 */ 111 public static class RandomTimeoutRpcClient extends BlockingRpcClient { 112 public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 113 MetricsConnection metrics, Map<String, byte[]> connectionAttributes) { 114 super(conf, clusterId, localAddr, metrics, connectionAttributes); 115 } 116 117 // Return my own instance, one that does random timeouts 118 @Override 119 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 120 return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); 121 } 122 123 @Override 124 public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 125 return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout); 126 } 127 } 128 129 private static AtomicInteger invokations = new AtomicInteger(); 130 131 private static final double CHANCE_OF_TIMEOUT = 0.3; 132 133 /** 134 * Blocking rpc channel that goes via hbase rpc. 135 */ 136 private static class RandomTimeoutBlockingRpcChannel 137 extends AbstractRpcClient.BlockingRpcChannelImplementation { 138 139 RandomTimeoutBlockingRpcChannel(BlockingRpcClient rpcClient, ServerName sn, User ticket, 140 int rpcTimeout) { 141 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 142 } 143 144 @Override 145 public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param, 146 Message returnType) throws ServiceException { 147 invokations.getAndIncrement(); 148 if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) { 149 // throw a ServiceException, becuase that is the only exception type that 150 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 151 // "actual" type, this may not properly mimic the underlying RpcEngine. 152 throw new ServiceException(new SocketTimeoutException("fake timeout")); 153 } 154 return super.callBlockingMethod(md, controller, param, returnType); 155 } 156 } 157 158 private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation { 159 160 RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket, 161 int rpcTimeout) { 162 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 163 } 164 165 @Override 166 public void callMethod(MethodDescriptor md, RpcController controller, Message param, 167 Message returnType, RpcCallback<Message> done) { 168 invokations.getAndIncrement(); 169 if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) { 170 // throw a ServiceException, because that is the only exception type that 171 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 172 // "actual" type, this may not properly mimic the underlying RpcEngine. 173 ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout")); 174 done.run(null); 175 return; 176 } 177 super.callMethod(md, controller, param, returnType, done); 178 } 179 } 180}