001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR; 021import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import java.io.IOException; 025import java.util.Optional; 026import java.util.concurrent.ForkJoinPool; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicLong; 029import java.util.function.Supplier; 030import java.util.stream.Stream; 031import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 035import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 036import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 037import org.apache.hadoop.hbase.coprocessor.MasterObserver; 038import org.apache.hadoop.hbase.coprocessor.ObserverContext; 039import org.apache.hadoop.hbase.testclassification.ClientTests; 040import org.apache.hadoop.hbase.testclassification.LargeTests; 041import org.apache.hadoop.hbase.util.Threads; 042import org.junit.jupiter.api.AfterEach; 043import org.junit.jupiter.api.BeforeEach; 044import org.junit.jupiter.api.Tag; 045import org.junit.jupiter.api.TestTemplate; 046import org.junit.jupiter.params.provider.Arguments; 047 048import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 049 050@Tag(LargeTests.TAG) 051@Tag(ClientTests.TAG) 052@HBaseParameterizedTestTemplate 053public class TestAsyncAdminBuilder { 054 055 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 056 private static AsyncConnection ASYNC_CONN; 057 058 private final Supplier<AsyncAdminBuilder> getAdminBuilder; 059 060 public TestAsyncAdminBuilder(Supplier<AsyncAdminBuilder> getAdminBuilder) { 061 this.getAdminBuilder = getAdminBuilder; 062 } 063 064 private static AsyncAdminBuilder getRawAsyncAdminBuilder() { 065 return ASYNC_CONN.getAdminBuilder(); 066 } 067 068 private static AsyncAdminBuilder getAsyncAdminBuilder() { 069 return ASYNC_CONN.getAdminBuilder(ForkJoinPool.commonPool()); 070 } 071 072 public static Stream<Arguments> parameters() { 073 return Stream.of( 074 Arguments.of((Supplier<AsyncAdminBuilder>) TestAsyncAdminBuilder::getRawAsyncAdminBuilder), 075 Arguments.of((Supplier<AsyncAdminBuilder>) TestAsyncAdminBuilder::getAsyncAdminBuilder)); 076 } 077 078 private static final int DEFAULT_RPC_TIMEOUT = 10000; 079 private static final int DEFAULT_OPERATION_TIMEOUT = 30000; 080 private static final int DEFAULT_RETRIES_NUMBER = 2; 081 082 @BeforeEach 083 public void setUp() throws Exception { 084 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, DEFAULT_RPC_TIMEOUT); 085 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 086 DEFAULT_OPERATION_TIMEOUT); 087 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 088 DEFAULT_RETRIES_NUMBER); 089 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 090 } 091 092 @AfterEach 093 public void tearDown() throws Exception { 094 Closeables.close(ASYNC_CONN, true); 095 TEST_UTIL.shutdownMiniCluster(); 096 } 097 098 @TestTemplate 099 public void testRpcTimeout() throws Exception { 100 TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 101 TestRpcTimeoutCoprocessor.class.getName()); 102 TEST_UTIL.startMiniCluster(2); 103 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 104 105 try { 106 getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT / 2, TimeUnit.MILLISECONDS).build() 107 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 108 fail("We expect an exception here"); 109 } catch (Exception e) { 110 // expected 111 } 112 113 try { 114 getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT * 2, TimeUnit.MILLISECONDS).build() 115 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 116 } catch (Exception e) { 117 fail("The Operation should succeed, unexpected exception: " + e.getMessage()); 118 } 119 } 120 121 @TestTemplate 122 public void testOperationTimeout() throws Exception { 123 // set retry number to 100 to make sure that this test only be affected by operation timeout 124 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); 125 TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 126 TestOperationTimeoutCoprocessor.class.getName()); 127 TEST_UTIL.startMiniCluster(2); 128 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 129 130 try { 131 getAdminBuilder.get() 132 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT / 2, TimeUnit.MILLISECONDS).build() 133 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 134 fail("We expect an exception here"); 135 } catch (Exception e) { 136 // expected 137 } 138 139 try { 140 getAdminBuilder.get() 141 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT * 2, TimeUnit.MILLISECONDS).build() 142 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 143 } catch (Exception e) { 144 fail("The Operation should succeed, unexpected exception: " + e.getMessage()); 145 } 146 } 147 148 @TestTemplate 149 public void testMaxRetries() throws Exception { 150 // set operation timeout to 300s to make sure that this test only be affected by retry number 151 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 300000); 152 TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 153 TestMaxRetriesCoprocessor.class.getName()); 154 TEST_UTIL.startMiniCluster(2); 155 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 156 157 try { 158 getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER / 2).build() 159 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 160 fail("We expect an exception here"); 161 } catch (Exception e) { 162 // expected 163 } 164 165 try { 166 getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER * 2).build() 167 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 168 } catch (Exception e) { 169 fail("The Operation should succeed, unexpected exception: " + e.getMessage()); 170 } 171 } 172 173 public static class TestRpcTimeoutCoprocessor implements MasterCoprocessor, MasterObserver { 174 public TestRpcTimeoutCoprocessor() { 175 } 176 177 @Override 178 public Optional<MasterObserver> getMasterObserver() { 179 return Optional.of(this); 180 } 181 182 @Override 183 public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 184 String namespace) throws IOException { 185 Threads.sleep(DEFAULT_RPC_TIMEOUT); 186 } 187 } 188 189 public static class TestOperationTimeoutCoprocessor implements MasterCoprocessor, MasterObserver { 190 AtomicLong sleepTime = new AtomicLong(0); 191 192 public TestOperationTimeoutCoprocessor() { 193 } 194 195 @Override 196 public Optional<MasterObserver> getMasterObserver() { 197 return Optional.of(this); 198 } 199 200 @Override 201 public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 202 String namespace) throws IOException { 203 Threads.sleep(DEFAULT_RPC_TIMEOUT / 2); 204 if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) { 205 throw new IOException("call fail"); 206 } 207 } 208 } 209 210 public static class TestMaxRetriesCoprocessor implements MasterCoprocessor, MasterObserver { 211 AtomicLong retryNum = new AtomicLong(0); 212 213 public TestMaxRetriesCoprocessor() { 214 } 215 216 @Override 217 public Optional<MasterObserver> getMasterObserver() { 218 return Optional.of(this); 219 } 220 221 @Override 222 public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 223 String namespace) throws IOException { 224 if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) { 225 throw new IOException("call fail"); 226 } 227 } 228 } 229}