001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicInteger;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.coprocessor.ObserverContext;
034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
036import org.apache.hadoop.hbase.coprocessor.RegionObserver;
037import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.Threads;
042import org.junit.jupiter.api.AfterAll;
043import org.junit.jupiter.api.BeforeAll;
044import org.junit.jupiter.api.BeforeEach;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047import org.junit.jupiter.api.TestInfo;
048
049import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
050
051@Tag(MediumTests.TAG)
052@Tag(ClientTests.TAG)
053public class TestAsyncTableNoncedRetry {
054
055  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
056
057  private static final TableName TABLE_NAME = TableName.valueOf("async");
058
059  private static final byte[] FAMILY = Bytes.toBytes("cf");
060
061  private static final byte[] QUALIFIER = Bytes.toBytes("cq");
062
063  private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2");
064
065  private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3");
066
067  private static final byte[] VALUE = Bytes.toBytes("value");
068
069  private static AsyncConnection ASYNC_CONN;
070
071  private byte[] row;
072
073  private static final AtomicInteger CALLED = new AtomicInteger();
074
075  private static final long SLEEP_TIME = 2000;
076
077  private static final long RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time
078
079  // The number of miniBatchOperations that are executed in a RegionServer
080  private static int miniBatchOperationCount;
081
082  public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
083
084    @Override
085    public Optional<RegionObserver> getRegionObserver() {
086      return Optional.of(this);
087    }
088
089    @Override
090    public void postBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c,
091      MiniBatchOperationInProgress<Mutation> miniBatchOp) {
092      // We sleep when the last of the miniBatchOperation is executed
093      if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) {
094        Threads.sleepWithoutInterrupt(SLEEP_TIME);
095      }
096    }
097  }
098
099  @BeforeAll
100  public static void setUpBeforeClass() throws Exception {
101    TEST_UTIL.startMiniCluster(1);
102    TEST_UTIL.getAdmin()
103      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
104        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
105        .setCoprocessor(SleepOnceCP.class.getName()).build());
106    TEST_UTIL.waitTableAvailable(TABLE_NAME);
107    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
108  }
109
110  @AfterAll
111  public static void tearDownAfterClass() throws Exception {
112    Closeables.close(ASYNC_CONN, true);
113    TEST_UTIL.shutdownMiniCluster();
114  }
115
116  @BeforeEach
117  public void setUp(TestInfo info) throws IOException, InterruptedException {
118    row = Bytes.toBytes(info.getTestMethod().get().getName().replaceAll("[^0-9A-Za-z]", "_"));
119    CALLED.set(0);
120  }
121
122  @Test
123  public void testAppend() throws InterruptedException, ExecutionException {
124    assertEquals(0, CALLED.get());
125    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
126      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
127
128    miniBatchOperationCount = 1;
129    Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
130
131    // make sure we called twice and the result is still correct
132    assertEquals(2, CALLED.get());
133    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
134  }
135
136  @Test
137  public void testAppendWhenReturnResultsEqualsFalse()
138    throws InterruptedException, ExecutionException {
139    assertEquals(0, CALLED.get());
140    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
141      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
142
143    miniBatchOperationCount = 1;
144    Result result = table
145      .append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE).setReturnResults(false)).get();
146
147    // make sure we called twice and the result is still correct
148    assertEquals(2, CALLED.get());
149    assertTrue(result.isEmpty());
150  }
151
152  @Test
153  public void testIncrement() throws InterruptedException, ExecutionException {
154    assertEquals(0, CALLED.get());
155    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
156      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
157
158    miniBatchOperationCount = 1;
159    long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get();
160
161    // make sure we called twice and the result is still correct
162    assertEquals(2, CALLED.get());
163    assertEquals(1L, result);
164  }
165
166  @Test
167  public void testIncrementWhenReturnResultsEqualsFalse()
168    throws InterruptedException, ExecutionException {
169    assertEquals(0, CALLED.get());
170    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
171      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
172
173    miniBatchOperationCount = 1;
174    Result result = table
175      .increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L).setReturnResults(false)).get();
176
177    // make sure we called twice and the result is still correct
178    assertEquals(2, CALLED.get());
179    assertTrue(result.isEmpty());
180  }
181
182  @Test
183  public void testIncrementInRowMutations()
184    throws InterruptedException, ExecutionException, IOException {
185    assertEquals(0, CALLED.get());
186    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
187      .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
188
189    miniBatchOperationCount = 1;
190    Result result =
191      table.mutateRow(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
192        .add(new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
193
194    // make sure we called twice and the result is still correct
195    assertEquals(2, CALLED.get());
196    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
197  }
198
199  @Test
200  public void testAppendInRowMutations()
201    throws InterruptedException, ExecutionException, IOException {
202    assertEquals(0, CALLED.get());
203    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
204      .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
205
206    miniBatchOperationCount = 1;
207    Result result =
208      table.mutateRow(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
209        .add(new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
210
211    // make sure we called twice and the result is still correct
212    assertEquals(2, CALLED.get());
213    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
214  }
215
216  @Test
217  public void testIncrementAndAppendInRowMutations()
218    throws InterruptedException, ExecutionException, IOException {
219    assertEquals(0, CALLED.get());
220    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
221      .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
222
223    miniBatchOperationCount = 1;
224    Result result =
225      table.mutateRow(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
226        .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))).get();
227
228    // make sure we called twice and the result is still correct
229    assertEquals(2, CALLED.get());
230    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
231    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2));
232  }
233
234  @Test
235  public void testIncrementInCheckAndMutate() throws InterruptedException, ExecutionException {
236    assertEquals(0, CALLED.get());
237    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
238      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
239
240    miniBatchOperationCount = 1;
241    CheckAndMutateResult result =
242      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER2)
243        .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))).get();
244
245    // make sure we called twice and the result is still correct
246    assertEquals(2, CALLED.get());
247    assertTrue(result.isSuccess());
248    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
249  }
250
251  @Test
252  public void testAppendInCheckAndMutate() throws InterruptedException, ExecutionException {
253    assertEquals(0, CALLED.get());
254    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
255      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
256
257    miniBatchOperationCount = 1;
258    CheckAndMutateResult result =
259      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER2)
260        .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))).get();
261
262    // make sure we called twice and the result is still correct
263    assertEquals(2, CALLED.get());
264    assertTrue(result.isSuccess());
265    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
266  }
267
268  @Test
269  public void testIncrementInRowMutationsInCheckAndMutate()
270    throws InterruptedException, ExecutionException, IOException {
271    assertEquals(0, CALLED.get());
272    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
273      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
274
275    miniBatchOperationCount = 1;
276    CheckAndMutateResult result =
277      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3)
278        .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
279          .add(new Delete(row).addColumn(FAMILY, QUALIFIER2))))
280        .get();
281
282    // make sure we called twice and the result is still correct
283    assertEquals(2, CALLED.get());
284    assertTrue(result.isSuccess());
285    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
286  }
287
288  @Test
289  public void testAppendInRowMutationsInCheckAndMutate()
290    throws InterruptedException, ExecutionException, IOException {
291    assertEquals(0, CALLED.get());
292    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
293      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
294
295    miniBatchOperationCount = 1;
296    CheckAndMutateResult result =
297      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3)
298        .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
299          .add(new Delete(row).addColumn(FAMILY, QUALIFIER2))))
300        .get();
301
302    // make sure we called twice and the result is still correct
303    assertEquals(2, CALLED.get());
304    assertTrue(result.isSuccess());
305    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
306  }
307
308  @Test
309  public void testIncrementAndAppendInRowMutationsInCheckAndMutate()
310    throws InterruptedException, ExecutionException, IOException {
311    assertEquals(0, CALLED.get());
312    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
313      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
314
315    miniBatchOperationCount = 1;
316    CheckAndMutateResult result =
317      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3)
318        .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
319          .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))))
320        .get();
321
322    // make sure we called twice and the result is still correct
323    assertEquals(2, CALLED.get());
324    assertTrue(result.isSuccess());
325    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
326    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
327  }
328
329  @Test
330  public void testBatch() throws InterruptedException, ExecutionException, IOException {
331    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
332    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
333    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
334    byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5");
335    byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6");
336
337    assertEquals(0, CALLED.get());
338
339    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
340      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
341
342    miniBatchOperationCount = 6;
343    List<Object> results =
344      table.batchAll(Arrays.asList(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE),
345        new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L),
346        new RowMutations(row3).add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L))
347          .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)),
348        CheckAndMutate.newBuilder(row4).ifNotExists(FAMILY, QUALIFIER2)
349          .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)),
350        CheckAndMutate.newBuilder(row5).ifNotExists(FAMILY, QUALIFIER2)
351          .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)),
352        CheckAndMutate.newBuilder(row6).ifNotExists(FAMILY, QUALIFIER3)
353          .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L))
354            .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE)))))
355        .get();
356
357    // make sure we called twice and the result is still correct
358
359    // should be called 12 times as 6 miniBatchOperations are called twice
360    assertEquals(12, CALLED.get());
361
362    assertArrayEquals(VALUE, ((Result) results.get(0)).getValue(FAMILY, QUALIFIER));
363
364    assertEquals(1L, Bytes.toLong(((Result) results.get(1)).getValue(FAMILY, QUALIFIER)));
365
366    assertEquals(1L, Bytes.toLong(((Result) results.get(2)).getValue(FAMILY, QUALIFIER)));
367    assertArrayEquals(VALUE, ((Result) results.get(2)).getValue(FAMILY, QUALIFIER2));
368
369    CheckAndMutateResult result;
370
371    result = (CheckAndMutateResult) results.get(3);
372    assertTrue(result.isSuccess());
373    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
374
375    result = (CheckAndMutateResult) results.get(4);
376    assertTrue(result.isSuccess());
377    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
378
379    result = (CheckAndMutateResult) results.get(5);
380    assertTrue(result.isSuccess());
381    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
382    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
383  }
384}