001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022
023import java.net.SocketAddress;
024import java.net.SocketTimeoutException;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
034import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
035import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
036import org.apache.hadoop.hbase.ipc.HBaseRpcController;
037import org.apache.hadoop.hbase.ipc.RpcClientFactory;
038import org.apache.hadoop.hbase.net.Address;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047
048import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
049import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
050import org.apache.hbase.thirdparty.com.google.protobuf.Message;
051import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
054import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
055
056@Category({ MediumTests.class, ClientTests.class })
057public class TestClientTimeouts {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestClientTimeouts.class);
062
063  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
064  protected static int SLAVES = 1;
065
066  @BeforeClass
067  public static void setUpBeforeClass() throws Exception {
068    TEST_UTIL.startMiniCluster(SLAVES);
069    // Set the custom RPC client with random timeouts as the client
070    TEST_UTIL.getConfiguration().set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
071      RandomTimeoutRpcClient.class.getName());
072  }
073
074  @AfterClass
075  public static void tearDownAfterClass() throws Exception {
076    TEST_UTIL.shutdownMiniCluster();
077  }
078
079  /**
080   * Test that a client that fails an RPC to the master retries properly and doesn't throw any
081   * unexpected exceptions.
082   */
083  @Test
084  public void testAdminTimeout() throws Exception {
085    boolean lastFailed = false;
086    int initialInvocations = invokations.get();
087    RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
088      .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
089
090    try {
091      for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
092        lastFailed = false;
093        // Ensure the HBaseAdmin uses a new connection by changing Configuration.
094        Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
095        conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
096        Admin admin = null;
097        Connection connection = null;
098        try {
099          connection = ConnectionFactory.createConnection(conf);
100          admin = connection.getAdmin();
101          admin.balancerSwitch(false, false);
102        } catch (MasterRegistryFetchException ex) {
103          // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
104          // a MasterRegistryFetchException. It's a bug if we get other exceptions.
105          lastFailed = true;
106        } finally {
107          if (admin != null) {
108            admin.close();
109            if (admin.getConnection().isClosed()) {
110              rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
111                .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
112            }
113          }
114          if (connection != null) {
115            connection.close();
116          }
117        }
118      }
119      // Ensure the RandomTimeoutRpcEngine is actually being used.
120      assertFalse(lastFailed);
121      assertTrue(invokations.get() > initialInvocations);
122    } finally {
123      rpcClient.close();
124    }
125  }
126
127  /**
128   * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
129   */
130  public static class RandomTimeoutRpcClient extends BlockingRpcClient {
131    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
132        MetricsConnection metrics) {
133      super(conf, clusterId, localAddr, metrics);
134    }
135
136    // Return my own instance, one that does random timeouts
137    @Override
138    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
139      return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
140    }
141
142    @Override
143    public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
144      return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
145    }
146  }
147
148  private static AtomicInteger invokations = new AtomicInteger();
149
150  private static final double CHANCE_OF_TIMEOUT = 0.3;
151
152  /**
153   * Blocking rpc channel that goes via hbase rpc.
154   */
155  private static class RandomTimeoutBlockingRpcChannel
156      extends AbstractRpcClient.BlockingRpcChannelImplementation {
157
158    RandomTimeoutBlockingRpcChannel(BlockingRpcClient rpcClient, ServerName sn, User ticket,
159        int rpcTimeout) {
160      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
161    }
162
163    @Override
164    public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param,
165        Message returnType) throws ServiceException {
166      invokations.getAndIncrement();
167      if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) {
168        // throw a ServiceException, becuase that is the only exception type that
169        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
170        // "actual" type, this may not properly mimic the underlying RpcEngine.
171        throw new ServiceException(new SocketTimeoutException("fake timeout"));
172      }
173      return super.callBlockingMethod(md, controller, param, returnType);
174    }
175  }
176
177  private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation {
178
179    RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket,
180        int rpcTimeout) {
181      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
182    }
183
184    @Override
185    public void callMethod(MethodDescriptor md, RpcController controller, Message param,
186        Message returnType, RpcCallback<Message> done) {
187      invokations.getAndIncrement();
188      if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) {
189        // throw a ServiceException, because that is the only exception type that
190        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
191        // "actual" type, this may not properly mimic the underlying RpcEngine.
192        ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout"));
193        done.run(null);
194        return;
195      }
196      super.callMethod(md, controller, param, returnType, done);
197    }
198  }
199}