001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicInteger; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.coprocessor.ObserverContext; 034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 036import org.apache.hadoop.hbase.coprocessor.RegionObserver; 037import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 038import org.apache.hadoop.hbase.testclassification.ClientTests; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.Threads; 042import org.junit.jupiter.api.AfterAll; 043import org.junit.jupiter.api.BeforeAll; 044import org.junit.jupiter.api.BeforeEach; 045import org.junit.jupiter.api.Tag; 046import org.junit.jupiter.api.Test; 047import org.junit.jupiter.api.TestInfo; 048 049import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 050 051@Tag(MediumTests.TAG) 052@Tag(ClientTests.TAG) 053public class TestAsyncTableNoncedRetry { 054 055 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 056 057 private static final TableName TABLE_NAME = TableName.valueOf("async"); 058 059 private static final byte[] FAMILY = Bytes.toBytes("cf"); 060 061 private static final byte[] QUALIFIER = Bytes.toBytes("cq"); 062 063 private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2"); 064 065 private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3"); 066 067 private static final byte[] VALUE = Bytes.toBytes("value"); 068 069 private static AsyncConnection ASYNC_CONN; 070 071 private byte[] row; 072 073 private static final AtomicInteger CALLED = new AtomicInteger(); 074 075 private static final long SLEEP_TIME = 2000; 076 077 private static final long RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time 078 079 // The number of miniBatchOperations that are executed in a RegionServer 080 private static int miniBatchOperationCount; 081 082 public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor { 083 084 @Override 085 public Optional<RegionObserver> getRegionObserver() { 086 return Optional.of(this); 087 } 088 089 @Override 090 public void postBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 091 MiniBatchOperationInProgress<Mutation> miniBatchOp) { 092 // We sleep when the last of the miniBatchOperation is executed 093 if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) { 094 Threads.sleepWithoutInterrupt(SLEEP_TIME); 095 } 096 } 097 } 098 099 @BeforeAll 100 public static void setUpBeforeClass() throws Exception { 101 TEST_UTIL.startMiniCluster(1); 102 TEST_UTIL.getAdmin() 103 .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) 104 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 105 .setCoprocessor(SleepOnceCP.class.getName()).build()); 106 TEST_UTIL.waitTableAvailable(TABLE_NAME); 107 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 108 } 109 110 @AfterAll 111 public static void tearDownAfterClass() throws Exception { 112 Closeables.close(ASYNC_CONN, true); 113 TEST_UTIL.shutdownMiniCluster(); 114 } 115 116 @BeforeEach 117 public void setUp(TestInfo info) throws IOException, InterruptedException { 118 row = Bytes.toBytes(info.getTestMethod().get().getName().replaceAll("[^0-9A-Za-z]", "_")); 119 CALLED.set(0); 120 } 121 122 @Test 123 public void testAppend() throws InterruptedException, ExecutionException { 124 assertEquals(0, CALLED.get()); 125 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 126 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 127 128 miniBatchOperationCount = 1; 129 Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 130 131 // make sure we called twice and the result is still correct 132 assertEquals(2, CALLED.get()); 133 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 134 } 135 136 @Test 137 public void testAppendWhenReturnResultsEqualsFalse() 138 throws InterruptedException, ExecutionException { 139 assertEquals(0, CALLED.get()); 140 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 141 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 142 143 miniBatchOperationCount = 1; 144 Result result = table 145 .append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE).setReturnResults(false)).get(); 146 147 // make sure we called twice and the result is still correct 148 assertEquals(2, CALLED.get()); 149 assertTrue(result.isEmpty()); 150 } 151 152 @Test 153 public void testIncrement() throws InterruptedException, ExecutionException { 154 assertEquals(0, CALLED.get()); 155 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 156 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 157 158 miniBatchOperationCount = 1; 159 long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get(); 160 161 // make sure we called twice and the result is still correct 162 assertEquals(2, CALLED.get()); 163 assertEquals(1L, result); 164 } 165 166 @Test 167 public void testIncrementWhenReturnResultsEqualsFalse() 168 throws InterruptedException, ExecutionException { 169 assertEquals(0, CALLED.get()); 170 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 171 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 172 173 miniBatchOperationCount = 1; 174 Result result = table 175 .increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L).setReturnResults(false)).get(); 176 177 // make sure we called twice and the result is still correct 178 assertEquals(2, CALLED.get()); 179 assertTrue(result.isEmpty()); 180 } 181 182 @Test 183 public void testIncrementInRowMutations() 184 throws InterruptedException, ExecutionException, IOException { 185 assertEquals(0, CALLED.get()); 186 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 187 .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 188 189 miniBatchOperationCount = 1; 190 Result result = 191 table.mutateRow(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) 192 .add(new Delete(row).addColumn(FAMILY, QUALIFIER2))).get(); 193 194 // make sure we called twice and the result is still correct 195 assertEquals(2, CALLED.get()); 196 assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); 197 } 198 199 @Test 200 public void testAppendInRowMutations() 201 throws InterruptedException, ExecutionException, IOException { 202 assertEquals(0, CALLED.get()); 203 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 204 .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 205 206 miniBatchOperationCount = 1; 207 Result result = 208 table.mutateRow(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)) 209 .add(new Delete(row).addColumn(FAMILY, QUALIFIER2))).get(); 210 211 // make sure we called twice and the result is still correct 212 assertEquals(2, CALLED.get()); 213 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 214 } 215 216 @Test 217 public void testIncrementAndAppendInRowMutations() 218 throws InterruptedException, ExecutionException, IOException { 219 assertEquals(0, CALLED.get()); 220 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 221 .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 222 223 miniBatchOperationCount = 1; 224 Result result = 225 table.mutateRow(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) 226 .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))).get(); 227 228 // make sure we called twice and the result is still correct 229 assertEquals(2, CALLED.get()); 230 assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); 231 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2)); 232 } 233 234 @Test 235 public void testIncrementInCheckAndMutate() throws InterruptedException, ExecutionException { 236 assertEquals(0, CALLED.get()); 237 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 238 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 239 240 miniBatchOperationCount = 1; 241 CheckAndMutateResult result = 242 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER2) 243 .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))).get(); 244 245 // make sure we called twice and the result is still correct 246 assertEquals(2, CALLED.get()); 247 assertTrue(result.isSuccess()); 248 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 249 } 250 251 @Test 252 public void testAppendInCheckAndMutate() throws InterruptedException, ExecutionException { 253 assertEquals(0, CALLED.get()); 254 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 255 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 256 257 miniBatchOperationCount = 1; 258 CheckAndMutateResult result = 259 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER2) 260 .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))).get(); 261 262 // make sure we called twice and the result is still correct 263 assertEquals(2, CALLED.get()); 264 assertTrue(result.isSuccess()); 265 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); 266 } 267 268 @Test 269 public void testIncrementInRowMutationsInCheckAndMutate() 270 throws InterruptedException, ExecutionException, IOException { 271 assertEquals(0, CALLED.get()); 272 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 273 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 274 275 miniBatchOperationCount = 1; 276 CheckAndMutateResult result = 277 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3) 278 .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) 279 .add(new Delete(row).addColumn(FAMILY, QUALIFIER2)))) 280 .get(); 281 282 // make sure we called twice and the result is still correct 283 assertEquals(2, CALLED.get()); 284 assertTrue(result.isSuccess()); 285 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 286 } 287 288 @Test 289 public void testAppendInRowMutationsInCheckAndMutate() 290 throws InterruptedException, ExecutionException, IOException { 291 assertEquals(0, CALLED.get()); 292 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 293 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 294 295 miniBatchOperationCount = 1; 296 CheckAndMutateResult result = 297 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3) 298 .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)) 299 .add(new Delete(row).addColumn(FAMILY, QUALIFIER2)))) 300 .get(); 301 302 // make sure we called twice and the result is still correct 303 assertEquals(2, CALLED.get()); 304 assertTrue(result.isSuccess()); 305 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); 306 } 307 308 @Test 309 public void testIncrementAndAppendInRowMutationsInCheckAndMutate() 310 throws InterruptedException, ExecutionException, IOException { 311 assertEquals(0, CALLED.get()); 312 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 313 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 314 315 miniBatchOperationCount = 1; 316 CheckAndMutateResult result = 317 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3) 318 .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) 319 .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)))) 320 .get(); 321 322 // make sure we called twice and the result is still correct 323 assertEquals(2, CALLED.get()); 324 assertTrue(result.isSuccess()); 325 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 326 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2)); 327 } 328 329 @Test 330 public void testBatch() throws InterruptedException, ExecutionException, IOException { 331 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 332 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 333 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 334 byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5"); 335 byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6"); 336 337 assertEquals(0, CALLED.get()); 338 339 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 340 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 341 342 miniBatchOperationCount = 6; 343 List<Object> results = 344 table.batchAll(Arrays.asList(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE), 345 new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L), 346 new RowMutations(row3).add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L)) 347 .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)), 348 CheckAndMutate.newBuilder(row4).ifNotExists(FAMILY, QUALIFIER2) 349 .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)), 350 CheckAndMutate.newBuilder(row5).ifNotExists(FAMILY, QUALIFIER2) 351 .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)), 352 CheckAndMutate.newBuilder(row6).ifNotExists(FAMILY, QUALIFIER3) 353 .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L)) 354 .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE))))) 355 .get(); 356 357 // make sure we called twice and the result is still correct 358 359 // should be called 12 times as 6 miniBatchOperations are called twice 360 assertEquals(12, CALLED.get()); 361 362 assertArrayEquals(VALUE, ((Result) results.get(0)).getValue(FAMILY, QUALIFIER)); 363 364 assertEquals(1L, Bytes.toLong(((Result) results.get(1)).getValue(FAMILY, QUALIFIER))); 365 366 assertEquals(1L, Bytes.toLong(((Result) results.get(2)).getValue(FAMILY, QUALIFIER))); 367 assertArrayEquals(VALUE, ((Result) results.get(2)).getValue(FAMILY, QUALIFIER2)); 368 369 CheckAndMutateResult result; 370 371 result = (CheckAndMutateResult) results.get(3); 372 assertTrue(result.isSuccess()); 373 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 374 375 result = (CheckAndMutateResult) results.get(4); 376 assertTrue(result.isSuccess()); 377 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); 378 379 result = (CheckAndMutateResult) results.get(5); 380 assertTrue(result.isSuccess()); 381 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 382 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2)); 383 } 384}