001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicInteger; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.coprocessor.ObserverContext; 035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 037import org.apache.hadoop.hbase.coprocessor.RegionObserver; 038import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 039import org.apache.hadoop.hbase.testclassification.ClientTests; 040import org.apache.hadoop.hbase.testclassification.MediumTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.Threads; 043import org.junit.AfterClass; 044import org.junit.Before; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Rule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.junit.rules.TestName; 051 052import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 053 054@Category({ MediumTests.class, ClientTests.class }) 055public class TestAsyncTableNoncedRetry { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class); 060 061 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 062 063 private static final TableName TABLE_NAME = TableName.valueOf("async"); 064 065 private static final byte[] FAMILY = Bytes.toBytes("cf"); 066 067 private static final byte[] QUALIFIER = Bytes.toBytes("cq"); 068 069 private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2"); 070 071 private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3"); 072 073 private static final byte[] VALUE = Bytes.toBytes("value"); 074 075 private static AsyncConnection ASYNC_CONN; 076 077 @Rule 078 public TestName testName = new TestName(); 079 080 private byte[] row; 081 082 private static final AtomicInteger CALLED = new AtomicInteger(); 083 084 private static final long SLEEP_TIME = 2000; 085 086 private static final long RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time 087 088 // The number of miniBatchOperations that are executed in a RegionServer 089 private static int miniBatchOperationCount; 090 091 public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor { 092 093 @Override 094 public Optional<RegionObserver> getRegionObserver() { 095 return Optional.of(this); 096 } 097 098 @Override 099 public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 100 MiniBatchOperationInProgress<Mutation> miniBatchOp) { 101 // We sleep when the last of the miniBatchOperations is executed 102 if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) { 103 Threads.sleepWithoutInterrupt(SLEEP_TIME); 104 } 105 } 106 } 107 108 @BeforeClass 109 public static void setUpBeforeClass() throws Exception { 110 TEST_UTIL.startMiniCluster(1); 111 TEST_UTIL.getAdmin() 112 .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) 113 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 114 .setCoprocessor(SleepOnceCP.class.getName()).build()); 115 TEST_UTIL.waitTableAvailable(TABLE_NAME); 116 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 117 } 118 119 @AfterClass 120 public static void tearDownAfterClass() throws Exception { 121 Closeables.close(ASYNC_CONN, true); 122 TEST_UTIL.shutdownMiniCluster(); 123 } 124 125 @Before 126 public void setUp() throws IOException, InterruptedException { 127 row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); 128 CALLED.set(0); 129 } 130 131 @Test 132 public void testAppend() throws InterruptedException, ExecutionException { 133 assertEquals(0, CALLED.get()); 134 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 135 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 136 137 miniBatchOperationCount = 1; 138 Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 139 140 // make sure we called twice and the result is still correct 141 assertEquals(2, CALLED.get()); 142 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 143 } 144 145 @Test 146 public void testAppendWhenReturnResultsEqualsFalse() 147 throws InterruptedException, ExecutionException { 148 assertEquals(0, CALLED.get()); 149 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 150 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 151 152 miniBatchOperationCount = 1; 153 Result result = table 154 .append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE).setReturnResults(false)).get(); 155 156 // make sure we called twice and the result is still correct 157 assertEquals(2, CALLED.get()); 158 assertTrue(result.isEmpty()); 159 } 160 161 @Test 162 public void testIncrement() throws InterruptedException, ExecutionException { 163 assertEquals(0, CALLED.get()); 164 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 165 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 166 167 miniBatchOperationCount = 1; 168 long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get(); 169 170 // make sure we called twice and the result is still correct 171 assertEquals(2, CALLED.get()); 172 assertEquals(1L, result); 173 } 174 175 @Test 176 public void testIncrementWhenReturnResultsEqualsFalse() 177 throws InterruptedException, ExecutionException { 178 assertEquals(0, CALLED.get()); 179 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 180 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 181 182 miniBatchOperationCount = 1; 183 Result result = table 184 .increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L).setReturnResults(false)).get(); 185 186 // make sure we called twice and the result is still correct 187 assertEquals(2, CALLED.get()); 188 assertTrue(result.isEmpty()); 189 } 190 191 @Test 192 public void testIncrementInRowMutations() 193 throws InterruptedException, ExecutionException, IOException { 194 assertEquals(0, CALLED.get()); 195 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 196 .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 197 198 miniBatchOperationCount = 1; 199 Result result = 200 table.mutateRow(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) 201 .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get(); 202 203 // make sure we called twice and the result is still correct 204 assertEquals(2, CALLED.get()); 205 assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); 206 } 207 208 @Test 209 public void testAppendInRowMutations() 210 throws InterruptedException, ExecutionException, IOException { 211 assertEquals(0, CALLED.get()); 212 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 213 .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 214 215 miniBatchOperationCount = 1; 216 Result result = 217 table.mutateRow(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)) 218 .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get(); 219 220 // make sure we called twice and the result is still correct 221 assertEquals(2, CALLED.get()); 222 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 223 } 224 225 @Test 226 public void testIncrementAndAppendInRowMutations() 227 throws InterruptedException, ExecutionException, IOException { 228 assertEquals(0, CALLED.get()); 229 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 230 .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 231 232 miniBatchOperationCount = 1; 233 Result result = 234 table.mutateRow(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) 235 .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))).get(); 236 237 // make sure we called twice and the result is still correct 238 assertEquals(2, CALLED.get()); 239 assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); 240 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2)); 241 } 242 243 @Test 244 public void testIncrementInCheckAndMutate() throws InterruptedException, ExecutionException { 245 assertEquals(0, CALLED.get()); 246 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 247 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 248 249 miniBatchOperationCount = 1; 250 CheckAndMutateResult result = 251 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER2) 252 .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))).get(); 253 254 // make sure we called twice and the result is still correct 255 assertEquals(2, CALLED.get()); 256 assertTrue(result.isSuccess()); 257 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 258 } 259 260 @Test 261 public void testAppendInCheckAndMutate() throws InterruptedException, ExecutionException { 262 assertEquals(0, CALLED.get()); 263 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 264 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 265 266 miniBatchOperationCount = 1; 267 CheckAndMutateResult result = 268 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER2) 269 .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))).get(); 270 271 // make sure we called twice and the result is still correct 272 assertEquals(2, CALLED.get()); 273 assertTrue(result.isSuccess()); 274 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); 275 } 276 277 @Test 278 public void testIncrementInRowMutationsInCheckAndMutate() 279 throws InterruptedException, ExecutionException, IOException { 280 assertEquals(0, CALLED.get()); 281 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 282 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 283 284 miniBatchOperationCount = 1; 285 CheckAndMutateResult result = 286 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3) 287 .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) 288 .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))) 289 .get(); 290 291 // make sure we called twice and the result is still correct 292 assertEquals(2, CALLED.get()); 293 assertTrue(result.isSuccess()); 294 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 295 } 296 297 @Test 298 public void testAppendInRowMutationsInCheckAndMutate() 299 throws InterruptedException, ExecutionException, IOException { 300 assertEquals(0, CALLED.get()); 301 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 302 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 303 304 miniBatchOperationCount = 1; 305 CheckAndMutateResult result = 306 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3) 307 .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)) 308 .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))) 309 .get(); 310 311 // make sure we called twice and the result is still correct 312 assertEquals(2, CALLED.get()); 313 assertTrue(result.isSuccess()); 314 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); 315 } 316 317 @Test 318 public void testIncrementAndAppendInRowMutationsInCheckAndMutate() 319 throws InterruptedException, ExecutionException, IOException { 320 assertEquals(0, CALLED.get()); 321 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 322 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 323 324 miniBatchOperationCount = 1; 325 CheckAndMutateResult result = 326 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3) 327 .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) 328 .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)))) 329 .get(); 330 331 // make sure we called twice and the result is still correct 332 assertEquals(2, CALLED.get()); 333 assertTrue(result.isSuccess()); 334 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 335 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2)); 336 } 337 338 @Test 339 public void testBatch() throws InterruptedException, ExecutionException, IOException { 340 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 341 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 342 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 343 byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5"); 344 byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6"); 345 346 assertEquals(0, CALLED.get()); 347 348 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 349 .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); 350 351 miniBatchOperationCount = 6; 352 List<Object> results = 353 table.batchAll(Arrays.asList(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE), 354 new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L), 355 new RowMutations(row3).add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L)) 356 .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)), 357 CheckAndMutate.newBuilder(row4).ifNotExists(FAMILY, QUALIFIER2) 358 .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)), 359 CheckAndMutate.newBuilder(row5).ifNotExists(FAMILY, QUALIFIER2) 360 .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)), 361 CheckAndMutate.newBuilder(row6).ifNotExists(FAMILY, QUALIFIER3) 362 .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L)) 363 .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE))))) 364 .get(); 365 366 // make sure we called twice and the result is still correct 367 368 // should be called 12 times as 6 miniBatchOperations are called twice 369 assertEquals(12, CALLED.get()); 370 371 assertArrayEquals(VALUE, ((Result) results.get(0)).getValue(FAMILY, QUALIFIER)); 372 373 assertEquals(1L, Bytes.toLong(((Result) results.get(1)).getValue(FAMILY, QUALIFIER))); 374 375 assertEquals(1L, Bytes.toLong(((Result) results.get(2)).getValue(FAMILY, QUALIFIER))); 376 assertArrayEquals(VALUE, ((Result) results.get(2)).getValue(FAMILY, QUALIFIER2)); 377 378 CheckAndMutateResult result; 379 380 result = (CheckAndMutateResult) results.get(3); 381 assertTrue(result.isSuccess()); 382 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 383 384 result = (CheckAndMutateResult) results.get(4); 385 assertTrue(result.isSuccess()); 386 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); 387 388 result = (CheckAndMutateResult) results.get(5); 389 assertTrue(result.isSuccess()); 390 assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); 391 assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2)); 392 } 393}