001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
021import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import java.io.IOException;
025import java.util.Optional;
026import java.util.concurrent.ForkJoinPool;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicLong;
029import java.util.function.Supplier;
030import java.util.stream.Stream;
031import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
035import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
037import org.apache.hadoop.hbase.coprocessor.MasterObserver;
038import org.apache.hadoop.hbase.coprocessor.ObserverContext;
039import org.apache.hadoop.hbase.testclassification.ClientTests;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.util.Threads;
042import org.junit.jupiter.api.AfterEach;
043import org.junit.jupiter.api.BeforeEach;
044import org.junit.jupiter.api.Tag;
045import org.junit.jupiter.api.TestTemplate;
046import org.junit.jupiter.params.provider.Arguments;
047
048import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
049
050@Tag(LargeTests.TAG)
051@Tag(ClientTests.TAG)
052@HBaseParameterizedTestTemplate
053public class TestAsyncAdminBuilder {
054
055  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
056  private static AsyncConnection ASYNC_CONN;
057
058  private final Supplier<AsyncAdminBuilder> getAdminBuilder;
059
060  public TestAsyncAdminBuilder(Supplier<AsyncAdminBuilder> getAdminBuilder) {
061    this.getAdminBuilder = getAdminBuilder;
062  }
063
064  private static AsyncAdminBuilder getRawAsyncAdminBuilder() {
065    return ASYNC_CONN.getAdminBuilder();
066  }
067
068  private static AsyncAdminBuilder getAsyncAdminBuilder() {
069    return ASYNC_CONN.getAdminBuilder(ForkJoinPool.commonPool());
070  }
071
072  public static Stream<Arguments> parameters() {
073    return Stream.of(
074      Arguments.of((Supplier<AsyncAdminBuilder>) TestAsyncAdminBuilder::getRawAsyncAdminBuilder),
075      Arguments.of((Supplier<AsyncAdminBuilder>) TestAsyncAdminBuilder::getAsyncAdminBuilder));
076  }
077
078  private static final int DEFAULT_RPC_TIMEOUT = 10000;
079  private static final int DEFAULT_OPERATION_TIMEOUT = 30000;
080  private static final int DEFAULT_RETRIES_NUMBER = 2;
081
082  @BeforeEach
083  public void setUp() throws Exception {
084    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, DEFAULT_RPC_TIMEOUT);
085    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
086      DEFAULT_OPERATION_TIMEOUT);
087    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
088      DEFAULT_RETRIES_NUMBER);
089    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
090  }
091
092  @AfterEach
093  public void tearDown() throws Exception {
094    Closeables.close(ASYNC_CONN, true);
095    TEST_UTIL.shutdownMiniCluster();
096  }
097
098  @TestTemplate
099  public void testRpcTimeout() throws Exception {
100    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
101      TestRpcTimeoutCoprocessor.class.getName());
102    TEST_UTIL.startMiniCluster(2);
103    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
104
105    try {
106      getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
107        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
108      fail("We expect an exception here");
109    } catch (Exception e) {
110      // expected
111    }
112
113    try {
114      getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
115        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
116    } catch (Exception e) {
117      fail("The Operation should succeed, unexpected exception: " + e.getMessage());
118    }
119  }
120
121  @TestTemplate
122  public void testOperationTimeout() throws Exception {
123    // set retry number to 100 to make sure that this test only be affected by operation timeout
124    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100);
125    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
126      TestOperationTimeoutCoprocessor.class.getName());
127    TEST_UTIL.startMiniCluster(2);
128    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
129
130    try {
131      getAdminBuilder.get()
132        .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
133        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
134      fail("We expect an exception here");
135    } catch (Exception e) {
136      // expected
137    }
138
139    try {
140      getAdminBuilder.get()
141        .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
142        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
143    } catch (Exception e) {
144      fail("The Operation should succeed, unexpected exception: " + e.getMessage());
145    }
146  }
147
148  @TestTemplate
149  public void testMaxRetries() throws Exception {
150    // set operation timeout to 300s to make sure that this test only be affected by retry number
151    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 300000);
152    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
153      TestMaxRetriesCoprocessor.class.getName());
154    TEST_UTIL.startMiniCluster(2);
155    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
156
157    try {
158      getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER / 2).build()
159        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
160      fail("We expect an exception here");
161    } catch (Exception e) {
162      // expected
163    }
164
165    try {
166      getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER * 2).build()
167        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
168    } catch (Exception e) {
169      fail("The Operation should succeed, unexpected exception: " + e.getMessage());
170    }
171  }
172
173  public static class TestRpcTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
174    public TestRpcTimeoutCoprocessor() {
175    }
176
177    @Override
178    public Optional<MasterObserver> getMasterObserver() {
179      return Optional.of(this);
180    }
181
182    @Override
183    public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
184      String namespace) throws IOException {
185      Threads.sleep(DEFAULT_RPC_TIMEOUT);
186    }
187  }
188
189  public static class TestOperationTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
190    AtomicLong sleepTime = new AtomicLong(0);
191
192    public TestOperationTimeoutCoprocessor() {
193    }
194
195    @Override
196    public Optional<MasterObserver> getMasterObserver() {
197      return Optional.of(this);
198    }
199
200    @Override
201    public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
202      String namespace) throws IOException {
203      Threads.sleep(DEFAULT_RPC_TIMEOUT / 2);
204      if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) {
205        throw new IOException("call fail");
206      }
207    }
208  }
209
210  public static class TestMaxRetriesCoprocessor implements MasterCoprocessor, MasterObserver {
211    AtomicLong retryNum = new AtomicLong(0);
212
213    public TestMaxRetriesCoprocessor() {
214    }
215
216    @Override
217    public Optional<MasterObserver> getMasterObserver() {
218      return Optional.of(this);
219    }
220
221    @Override
222    public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
223      String namespace) throws IOException {
224      if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) {
225        throw new IOException("call fail");
226      }
227    }
228  }
229}