001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
021import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.ForkJoinPool;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicLong;
031import java.util.function.Supplier;
032import org.apache.commons.io.IOUtils;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
037import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
038import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
039import org.apache.hadoop.hbase.coprocessor.MasterObserver;
040import org.apache.hadoop.hbase.coprocessor.ObserverContext;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.util.Threads;
044import org.junit.After;
045import org.junit.Before;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.junit.runner.RunWith;
050import org.junit.runners.Parameterized;
051import org.junit.runners.Parameterized.Parameter;
052import org.junit.runners.Parameterized.Parameters;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056@RunWith(Parameterized.class)
057@Category({ LargeTests.class, ClientTests.class })
058public class TestAsyncAdminBuilder {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062      HBaseClassTestRule.forClass(TestAsyncAdminBuilder.class);
063
064  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncAdminBuilder.class);
065  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
066  private static AsyncConnection ASYNC_CONN;
067
068  @Parameter
069  public Supplier<AsyncAdminBuilder> getAdminBuilder;
070
071  private static AsyncAdminBuilder getRawAsyncAdminBuilder() {
072    return ASYNC_CONN.getAdminBuilder();
073  }
074
075  private static AsyncAdminBuilder getAsyncAdminBuilder() {
076    return ASYNC_CONN.getAdminBuilder(ForkJoinPool.commonPool());
077  }
078
079  @Parameters
080  public static List<Object[]> params() {
081    return Arrays.asList(new Supplier<?>[] { TestAsyncAdminBuilder::getRawAsyncAdminBuilder },
082      new Supplier<?>[] { TestAsyncAdminBuilder::getAsyncAdminBuilder });
083  }
084
085  private static final int DEFAULT_RPC_TIMEOUT = 10000;
086  private static final int DEFAULT_OPERATION_TIMEOUT = 30000;
087  private static final int DEFAULT_RETRIES_NUMBER = 2;
088
089  @Before
090  public void setUp() throws Exception {
091    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, DEFAULT_RPC_TIMEOUT);
092    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
093      DEFAULT_OPERATION_TIMEOUT);
094    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
095      DEFAULT_RETRIES_NUMBER);
096    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
097  }
098
099  @After
100  public void tearDown() throws Exception {
101    IOUtils.closeQuietly(ASYNC_CONN);
102    TEST_UTIL.shutdownMiniCluster();
103  }
104
105  @Test
106  public void testRpcTimeout() throws Exception {
107    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
108      TestRpcTimeoutCoprocessor.class.getName());
109    TEST_UTIL.startMiniCluster(2);
110    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
111
112    try {
113      getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
114          .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
115      fail("We expect an exception here");
116    } catch (Exception e) {
117      // expected
118    }
119
120    try {
121      getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
122          .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
123    } catch (Exception e) {
124      fail("The Operation should succeed, unexpected exception: " + e.getMessage());
125    }
126  }
127
128  @Test
129  public void testOperationTimeout() throws Exception {
130    // set retry number to 100 to make sure that this test only be affected by operation timeout
131    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100);
132    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
133      TestOperationTimeoutCoprocessor.class.getName());
134    TEST_UTIL.startMiniCluster(2);
135    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
136
137    try {
138      getAdminBuilder.get()
139          .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
140          .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
141      fail("We expect an exception here");
142    } catch (Exception e) {
143      // expected
144    }
145
146    try {
147      getAdminBuilder.get()
148          .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
149          .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
150    } catch (Exception e) {
151      fail("The Operation should succeed, unexpected exception: " + e.getMessage());
152    }
153  }
154
155  @Test
156  public void testMaxRetries() throws Exception {
157    // set operation timeout to 300s to make sure that this test only be affected by retry number
158    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 300000);
159    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
160      TestMaxRetriesCoprocessor.class.getName());
161    TEST_UTIL.startMiniCluster(2);
162    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
163
164    try {
165      getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER / 2).build()
166          .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
167      fail("We expect an exception here");
168    } catch (Exception e) {
169      // expected
170    }
171
172    try {
173      getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER * 2).build()
174          .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
175    } catch (Exception e) {
176      fail("The Operation should succeed, unexpected exception: " + e.getMessage());
177    }
178  }
179
180  public static class TestRpcTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
181    public TestRpcTimeoutCoprocessor() {
182    }
183
184
185    @Override
186    public Optional<MasterObserver> getMasterObserver() {
187      return Optional.of(this);
188    }
189    @Override
190    public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
191        String namespace) throws IOException {
192      Threads.sleep(DEFAULT_RPC_TIMEOUT);
193    }
194  }
195
196  public static class TestOperationTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
197    AtomicLong sleepTime = new AtomicLong(0);
198
199    public TestOperationTimeoutCoprocessor() {
200    }
201
202    @Override
203    public Optional<MasterObserver> getMasterObserver() {
204      return Optional.of(this);
205    }
206
207    @Override
208    public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
209        String namespace) throws IOException {
210      Threads.sleep(DEFAULT_RPC_TIMEOUT / 2);
211      if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) {
212        throw new IOException("call fail");
213      }
214    }
215  }
216
217  public static class TestMaxRetriesCoprocessor implements MasterCoprocessor, MasterObserver {
218    AtomicLong retryNum = new AtomicLong(0);
219
220    public TestMaxRetriesCoprocessor() {
221    }
222
223    @Override
224    public Optional<MasterObserver> getMasterObserver() {
225      return Optional.of(this);
226    }
227
228    @Override
229    public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
230        String namespace) throws IOException {
231      if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) {
232        throw new IOException("call fail");
233      }
234    }
235  }
236}