001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.Optional; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.commons.io.IOUtils; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.coprocessor.ObserverContext; 034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 036import org.apache.hadoop.hbase.coprocessor.RegionObserver; 037import org.apache.hadoop.hbase.testclassification.ClientTests; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.Threads; 041import org.junit.AfterClass; 042import org.junit.Before; 043import org.junit.BeforeClass; 044import org.junit.ClassRule; 045import org.junit.Rule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.rules.TestName; 049 050@Category({ MediumTests.class, ClientTests.class }) 051public class TestAsyncTableNoncedRetry { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class); 056 057 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 058 059 private static TableName TABLE_NAME = TableName.valueOf("async"); 060 061 private static byte[] FAMILY = Bytes.toBytes("cf"); 062 063 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 064 065 private static byte[] VALUE = Bytes.toBytes("value"); 066 067 private static AsyncConnection ASYNC_CONN; 068 069 @Rule 070 public TestName testName = new TestName(); 071 072 private byte[] row; 073 074 private static AtomicInteger CALLED = new AtomicInteger(); 075 076 private static long SLEEP_TIME = 2000; 077 078 public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor { 079 080 @Override 081 public Optional<RegionObserver> getRegionObserver() { 082 return Optional.of(this); 083 } 084 085 @Override 086 public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append, 087 Result result) throws IOException { 088 if (CALLED.getAndIncrement() == 0) { 089 Threads.sleepWithoutInterrupt(SLEEP_TIME); 090 } 091 return RegionObserver.super.postAppend(c, append, result); 092 } 093 094 @Override 095 public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c, 096 Increment increment, Result result) throws IOException { 097 if (CALLED.getAndIncrement() == 0) { 098 Threads.sleepWithoutInterrupt(SLEEP_TIME); 099 } 100 return RegionObserver.super.postIncrement(c, increment, result); 101 } 102 } 103 104 @BeforeClass 105 public static void setUpBeforeClass() throws Exception { 106 TEST_UTIL.startMiniCluster(1); 107 TEST_UTIL.getAdmin() 108 .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) 109 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 110 .setCoprocessor(SleepOnceCP.class.getName()).build()); 111 TEST_UTIL.waitTableAvailable(TABLE_NAME); 112 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 113 } 114 115 @AfterClass 116 public static void tearDownAfterClass() throws Exception { 117 IOUtils.closeQuietly(ASYNC_CONN); 118 TEST_UTIL.shutdownMiniCluster(); 119 } 120 121 @Before 122 public void setUp() throws IOException, InterruptedException { 123 row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); 124 CALLED.set(0); 125 } 126 127 @Test 128 public void testAppend() throws InterruptedException, ExecutionException { 129 assertEquals(0, CALLED.get()); 130 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 131 .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); 132 Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 133 // make sure we called twice and the result is still correct 134 assertEquals(2, CALLED.get()); 135 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 136 } 137 138 @Test 139 public void testAppendWhenReturnResultsEqualsFalse() throws InterruptedException, 140 ExecutionException { 141 assertEquals(0, CALLED.get()); 142 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 143 .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); 144 Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE) 145 .setReturnResults(false)).get(); 146 // make sure we called twice and the result is still correct 147 assertEquals(2, CALLED.get()); 148 assertTrue(result.isEmpty()); 149 } 150 151 @Test 152 public void testIncrement() throws InterruptedException, ExecutionException { 153 assertEquals(0, CALLED.get()); 154 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 155 .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); 156 assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); 157 // make sure we called twice and the result is still correct 158 assertEquals(2, CALLED.get()); 159 } 160 161 @Test 162 public void testIncrementWhenReturnResultsEqualsFalse() throws InterruptedException, 163 ExecutionException { 164 assertEquals(0, CALLED.get()); 165 AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) 166 .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); 167 Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L) 168 .setReturnResults(false)).get(); 169 // make sure we called twice and the result is still correct 170 assertEquals(2, CALLED.get()); 171 assertTrue(result.isEmpty()); 172 } 173}