001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022
023import java.net.SocketAddress;
024import java.net.SocketTimeoutException;
025import java.util.Random;
026import java.util.concurrent.ThreadLocalRandom;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.MasterNotRunningException;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
036import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
037import org.apache.hadoop.hbase.ipc.HBaseRpcController;
038import org.apache.hadoop.hbase.ipc.RpcClientFactory;
039import org.apache.hadoop.hbase.net.Address;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.junit.AfterClass;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
052import org.apache.hbase.thirdparty.com.google.protobuf.Message;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
057
058@Category({ MediumTests.class, ClientTests.class })
059public class TestClientTimeouts {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestClientTimeouts.class);
064
065  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
066  protected static int SLAVES = 1;
067
068  /**
069   * @throws java.lang.Exception
070   */
071  @BeforeClass
072  public static void setUpBeforeClass() throws Exception {
073    TEST_UTIL.startMiniCluster(SLAVES);
074    // Set the custom RPC client with random timeouts as the client
075    TEST_UTIL.getConfiguration().set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
076      RandomTimeoutRpcClient.class.getName());
077  }
078
079  /**
080   * @throws java.lang.Exception
081   */
082  @AfterClass
083  public static void tearDownAfterClass() throws Exception {
084    TEST_UTIL.shutdownMiniCluster();
085  }
086
087  /**
088   * Test that a client that fails an RPC to the master retries properly and doesn't throw any
089   * unexpected exceptions. n
090   */
091  @Test
092  public void testAdminTimeout() throws Exception {
093    boolean lastFailed = false;
094    int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
095    RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
096      .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
097
098    try {
099      for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
100        lastFailed = false;
101        // Ensure the HBaseAdmin uses a new connection by changing Configuration.
102        Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
103        conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
104        Admin admin = null;
105        Connection connection = null;
106        try {
107          connection = ConnectionFactory.createConnection(conf);
108          admin = connection.getAdmin();
109          // run some admin commands
110          HBaseAdmin.available(conf);
111          admin.setBalancerRunning(false, false);
112        } catch (MasterNotRunningException ex) {
113          // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
114          // a MasterNotRunningException. It's a bug if we get other exceptions.
115          lastFailed = true;
116        } finally {
117          if (admin != null) {
118            admin.close();
119            if (admin.getConnection().isClosed()) {
120              rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
121                .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
122            }
123          }
124          if (connection != null) {
125            connection.close();
126          }
127        }
128      }
129      // Ensure the RandomTimeoutRpcEngine is actually being used.
130      assertFalse(lastFailed);
131      assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations);
132    } finally {
133      rpcClient.close();
134    }
135  }
136
137  /**
138   * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
139   */
140  public static class RandomTimeoutRpcClient extends BlockingRpcClient {
141    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
142      MetricsConnection metrics) {
143      super(conf, clusterId, localAddr, metrics);
144    }
145
146    // Return my own instance, one that does random timeouts
147    @Override
148    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
149      return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
150    }
151
152    @Override
153    public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
154      return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
155    }
156  }
157
158  /**
159   * Blocking rpc channel that goes via hbase rpc.
160   */
161  static class RandomTimeoutBlockingRpcChannel
162    extends AbstractRpcClient.BlockingRpcChannelImplementation {
163    private static final Random RANDOM = new Random(EnvironmentEdgeManager.currentTime());
164    public static final double CHANCE_OF_TIMEOUT = 0.3;
165    private static AtomicInteger invokations = new AtomicInteger();
166
167    RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn,
168      final User ticket, final int rpcTimeout) {
169      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
170    }
171
172    @Override
173    public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param,
174      Message returnType) throws ServiceException {
175      invokations.getAndIncrement();
176      if (RANDOM.nextFloat() < CHANCE_OF_TIMEOUT) {
177        // throw a ServiceException, becuase that is the only exception type that
178        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
179        // "actual" type, this may not properly mimic the underlying RpcEngine.
180        throw new ServiceException(new SocketTimeoutException("fake timeout"));
181      }
182      return super.callBlockingMethod(md, controller, param, returnType);
183    }
184  }
185
186  private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation {
187
188    RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket,
189      int rpcTimeout) {
190      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
191    }
192
193    @Override
194    public void callMethod(MethodDescriptor md, RpcController controller, Message param,
195      Message returnType, RpcCallback<Message> done) {
196      RandomTimeoutBlockingRpcChannel.invokations.getAndIncrement();
197      if (
198        ThreadLocalRandom.current().nextFloat() < RandomTimeoutBlockingRpcChannel.CHANCE_OF_TIMEOUT
199      ) {
200        // throw a ServiceException, because that is the only exception type that
201        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
202        // "actual" type, this may not properly mimic the underlying RpcEngine.
203        ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout"));
204        done.run(null);
205        return;
206      }
207      super.callMethod(md, controller, param, returnType, done);
208    }
209  }
210}