/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicInteger;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.coprocessor.ObserverContext;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
037import org.apache.hadoop.hbase.coprocessor.RegionObserver;
038import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
039import org.apache.hadoop.hbase.testclassification.ClientTests;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.Threads;
043import org.junit.AfterClass;
044import org.junit.Before;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Rule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.rules.TestName;
051
052import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
053
054@Category({ MediumTests.class, ClientTests.class })
055public class TestAsyncTableNoncedRetry {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class);
060
061  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
062
063  private static final TableName TABLE_NAME = TableName.valueOf("async");
064
065  private static final byte[] FAMILY = Bytes.toBytes("cf");
066
067  private static final byte[] QUALIFIER = Bytes.toBytes("cq");
068
069  private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2");
070
071  private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3");
072
073  private static final byte[] VALUE = Bytes.toBytes("value");
074
075  private static AsyncConnection ASYNC_CONN;
076
077  @Rule
078  public TestName testName = new TestName();
079
080  private byte[] row;
081
082  private static final AtomicInteger CALLED = new AtomicInteger();
083
084  private static final long SLEEP_TIME = 2000;
085
086  private static final long RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time
087
088  // The number of miniBatchOperations that are executed in a RegionServer
089  private static int miniBatchOperationCount;
090
091  public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
092
093    @Override
094    public Optional<RegionObserver> getRegionObserver() {
095      return Optional.of(this);
096    }
097
098    @Override
099    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
100      MiniBatchOperationInProgress<Mutation> miniBatchOp) {
101      // We sleep when the last of the miniBatchOperations is executed
102      if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) {
103        Threads.sleepWithoutInterrupt(SLEEP_TIME);
104      }
105    }
106  }
107
108  @BeforeClass
109  public static void setUpBeforeClass() throws Exception {
110    TEST_UTIL.startMiniCluster(1);
111    TEST_UTIL.getAdmin()
112      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
113        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
114        .setCoprocessor(SleepOnceCP.class.getName()).build());
115    TEST_UTIL.waitTableAvailable(TABLE_NAME);
116    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
117  }
118
119  @AfterClass
120  public static void tearDownAfterClass() throws Exception {
121    Closeables.close(ASYNC_CONN, true);
122    TEST_UTIL.shutdownMiniCluster();
123  }
124
125  @Before
126  public void setUp() throws IOException, InterruptedException {
127    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
128    CALLED.set(0);
129  }
130
131  @Test
132  public void testAppend() throws InterruptedException, ExecutionException {
133    assertEquals(0, CALLED.get());
134    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
135      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
136
137    miniBatchOperationCount = 1;
138    Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
139
140    // make sure we called twice and the result is still correct
141    assertEquals(2, CALLED.get());
142    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
143  }
144
145  @Test
146  public void testAppendWhenReturnResultsEqualsFalse()
147    throws InterruptedException, ExecutionException {
148    assertEquals(0, CALLED.get());
149    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
150      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
151
152    miniBatchOperationCount = 1;
153    Result result = table
154      .append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE).setReturnResults(false)).get();
155
156    // make sure we called twice and the result is still correct
157    assertEquals(2, CALLED.get());
158    assertTrue(result.isEmpty());
159  }
160
161  @Test
162  public void testIncrement() throws InterruptedException, ExecutionException {
163    assertEquals(0, CALLED.get());
164    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
165      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
166
167    miniBatchOperationCount = 1;
168    long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get();
169
170    // make sure we called twice and the result is still correct
171    assertEquals(2, CALLED.get());
172    assertEquals(1L, result);
173  }
174
175  @Test
176  public void testIncrementWhenReturnResultsEqualsFalse()
177    throws InterruptedException, ExecutionException {
178    assertEquals(0, CALLED.get());
179    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
180      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
181
182    miniBatchOperationCount = 1;
183    Result result = table
184      .increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L).setReturnResults(false)).get();
185
186    // make sure we called twice and the result is still correct
187    assertEquals(2, CALLED.get());
188    assertTrue(result.isEmpty());
189  }
190
191  @Test
192  public void testIncrementInRowMutations()
193    throws InterruptedException, ExecutionException, IOException {
194    assertEquals(0, CALLED.get());
195    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
196      .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
197
198    miniBatchOperationCount = 1;
199    Result result =
200      table.mutateRow(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
201        .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
202
203    // make sure we called twice and the result is still correct
204    assertEquals(2, CALLED.get());
205    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
206  }
207
208  @Test
209  public void testAppendInRowMutations()
210    throws InterruptedException, ExecutionException, IOException {
211    assertEquals(0, CALLED.get());
212    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
213      .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
214
215    miniBatchOperationCount = 1;
216    Result result =
217      table.mutateRow(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
218        .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
219
220    // make sure we called twice and the result is still correct
221    assertEquals(2, CALLED.get());
222    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
223  }
224
225  @Test
226  public void testIncrementAndAppendInRowMutations()
227    throws InterruptedException, ExecutionException, IOException {
228    assertEquals(0, CALLED.get());
229    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
230      .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
231
232    miniBatchOperationCount = 1;
233    Result result =
234      table.mutateRow(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
235        .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))).get();
236
237    // make sure we called twice and the result is still correct
238    assertEquals(2, CALLED.get());
239    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
240    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2));
241  }
242
243  @Test
244  public void testIncrementInCheckAndMutate() throws InterruptedException, ExecutionException {
245    assertEquals(0, CALLED.get());
246    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
247      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
248
249    miniBatchOperationCount = 1;
250    CheckAndMutateResult result =
251      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER2)
252        .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))).get();
253
254    // make sure we called twice and the result is still correct
255    assertEquals(2, CALLED.get());
256    assertTrue(result.isSuccess());
257    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
258  }
259
260  @Test
261  public void testAppendInCheckAndMutate() throws InterruptedException, ExecutionException {
262    assertEquals(0, CALLED.get());
263    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
264      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
265
266    miniBatchOperationCount = 1;
267    CheckAndMutateResult result =
268      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER2)
269        .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))).get();
270
271    // make sure we called twice and the result is still correct
272    assertEquals(2, CALLED.get());
273    assertTrue(result.isSuccess());
274    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
275  }
276
277  @Test
278  public void testIncrementInRowMutationsInCheckAndMutate()
279    throws InterruptedException, ExecutionException, IOException {
280    assertEquals(0, CALLED.get());
281    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
282      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
283
284    miniBatchOperationCount = 1;
285    CheckAndMutateResult result =
286      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3)
287        .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
288          .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))))
289        .get();
290
291    // make sure we called twice and the result is still correct
292    assertEquals(2, CALLED.get());
293    assertTrue(result.isSuccess());
294    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
295  }
296
297  @Test
298  public void testAppendInRowMutationsInCheckAndMutate()
299    throws InterruptedException, ExecutionException, IOException {
300    assertEquals(0, CALLED.get());
301    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
302      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
303
304    miniBatchOperationCount = 1;
305    CheckAndMutateResult result =
306      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3)
307        .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
308          .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))))
309        .get();
310
311    // make sure we called twice and the result is still correct
312    assertEquals(2, CALLED.get());
313    assertTrue(result.isSuccess());
314    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
315  }
316
317  @Test
318  public void testIncrementAndAppendInRowMutationsInCheckAndMutate()
319    throws InterruptedException, ExecutionException, IOException {
320    assertEquals(0, CALLED.get());
321    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
322      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
323
324    miniBatchOperationCount = 1;
325    CheckAndMutateResult result =
326      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER3)
327        .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
328          .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))))
329        .get();
330
331    // make sure we called twice and the result is still correct
332    assertEquals(2, CALLED.get());
333    assertTrue(result.isSuccess());
334    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
335    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
336  }
337
338  @Test
339  public void testBatch() throws InterruptedException, ExecutionException, IOException {
340    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
341    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
342    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
343    byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5");
344    byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6");
345
346    assertEquals(0, CALLED.get());
347
348    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
349      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
350
351    miniBatchOperationCount = 6;
352    List<Object> results =
353      table.batchAll(Arrays.asList(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE),
354        new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L),
355        new RowMutations(row3).add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L))
356          .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)),
357        CheckAndMutate.newBuilder(row4).ifNotExists(FAMILY, QUALIFIER2)
358          .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)),
359        CheckAndMutate.newBuilder(row5).ifNotExists(FAMILY, QUALIFIER2)
360          .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)),
361        CheckAndMutate.newBuilder(row6).ifNotExists(FAMILY, QUALIFIER3)
362          .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L))
363            .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE)))))
364        .get();
365
366    // make sure we called twice and the result is still correct
367
368    // should be called 12 times as 6 miniBatchOperations are called twice
369    assertEquals(12, CALLED.get());
370
371    assertArrayEquals(VALUE, ((Result) results.get(0)).getValue(FAMILY, QUALIFIER));
372
373    assertEquals(1L, Bytes.toLong(((Result) results.get(1)).getValue(FAMILY, QUALIFIER)));
374
375    assertEquals(1L, Bytes.toLong(((Result) results.get(2)).getValue(FAMILY, QUALIFIER)));
376    assertArrayEquals(VALUE, ((Result) results.get(2)).getValue(FAMILY, QUALIFIER2));
377
378    CheckAndMutateResult result;
379
380    result = (CheckAndMutateResult) results.get(3);
381    assertTrue(result.isSuccess());
382    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
383
384    result = (CheckAndMutateResult) results.get(4);
385    assertTrue(result.isSuccess());
386    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
387
388    result = (CheckAndMutateResult) results.get(5);
389    assertTrue(result.isSuccess());
390    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
391    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
392  }
393}