001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.net.SocketAddress;
025import java.net.SocketTimeoutException;
026import java.util.Map;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
036import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
037import org.apache.hadoop.hbase.ipc.HBaseRpcController;
038import org.apache.hadoop.hbase.ipc.RpcClientFactory;
039import org.apache.hadoop.hbase.net.Address;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.junit.AfterClass;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048
049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
051import org.apache.hbase.thirdparty.com.google.protobuf.Message;
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
056
057@Category({ MediumTests.class, ClientTests.class })
058public class TestClientTimeouts {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestClientTimeouts.class);
063
064  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
065  protected static int SLAVES = 1;
066
067  /**
068   * @throws java.lang.Exception
069   */
070  @BeforeClass
071  public static void setUpBeforeClass() throws Exception {
072    TEST_UTIL.startMiniCluster(SLAVES);
073  }
074
075  /**
076   * @throws java.lang.Exception
077   */
078  @AfterClass
079  public static void tearDownAfterClass() throws Exception {
080    TEST_UTIL.shutdownMiniCluster();
081  }
082
083  private Connection createConnection() {
084    // Ensure the HBaseAdmin uses a new connection by changing Configuration.
085    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
086    // Set the custom RPC client with random timeouts as the client
087    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
088      RandomTimeoutRpcClient.class.getName());
089    conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
090    for (;;) {
091      try {
092        return ConnectionFactory.createConnection(conf);
093      } catch (IOException e) {
094        // since we randomly throw SocketTimeoutException, it is possible that we fail when creating
095        // the Connection, but this is not what we want to test here, so just ignore it and try
096        // again
097      }
098    }
099  }
100
101  /**
102   * Test that a client that fails an RPC to the master retries properly and doesn't throw any
103   * unexpected exceptions.
104   */
105  @Test
106  public void testAdminTimeout() throws Exception {
107    try (Connection conn = createConnection(); Admin admin = conn.getAdmin()) {
108      int initialInvocations = invokations.get();
109      boolean balanceEnabled = admin.isBalancerEnabled();
110      for (int i = 0; i < 5; i++) {
111        assertEquals(balanceEnabled, admin.balancerSwitch(!balanceEnabled, false));
112        balanceEnabled = !balanceEnabled;
113      }
114      // Ensure the RandomTimeoutRpcEngine is actually being used.
115      assertTrue(invokations.get() > initialInvocations);
116    }
117  }
118
119  /**
120   * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
121   */
122  public static class RandomTimeoutRpcClient extends BlockingRpcClient {
123    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
124      MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
125      super(conf, clusterId, localAddr, metrics, connectionAttributes);
126    }
127
128    // Return my own instance, one that does random timeouts
129    @Override
130    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
131      return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
132    }
133
134    @Override
135    public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
136      return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
137    }
138  }
139
140  private static final double CHANCE_OF_TIMEOUT = 0.3;
141  private static AtomicInteger invokations = new AtomicInteger();
142
143  /**
144   * Blocking rpc channel that goes via hbase rpc.
145   */
146  static class RandomTimeoutBlockingRpcChannel
147    extends AbstractRpcClient.BlockingRpcChannelImplementation {
148
149    RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn,
150      final User ticket, final int rpcTimeout) {
151      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
152    }
153
154    @Override
155    public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param,
156      Message returnType) throws ServiceException {
157      invokations.getAndIncrement();
158      if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) {
159        // throw a ServiceException, becuase that is the only exception type that
160        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
161        // "actual" type, this may not properly mimic the underlying RpcEngine.
162        throw new ServiceException(new SocketTimeoutException("fake timeout"));
163      }
164      return super.callBlockingMethod(md, controller, param, returnType);
165    }
166  }
167
168  private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation {
169
170    RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket,
171      int rpcTimeout) {
172      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
173    }
174
175    @Override
176    public void callMethod(MethodDescriptor md, RpcController controller, Message param,
177      Message returnType, RpcCallback<Message> done) {
178      invokations.getAndIncrement();
179      if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) {
180        // throw a ServiceException, because that is the only exception type that
181        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
182        // "actual" type, this may not properly mimic the underlying RpcEngine.
183        ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout"));
184        done.run(null);
185        return;
186      }
187      super.callMethod(md, controller, param, returnType, done);
188    }
189  }
190}