001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.RegionLocations; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 038import org.apache.hadoop.hbase.testclassification.ClientTests; 039import org.apache.hadoop.hbase.testclassification.SmallTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045import org.mockito.Mockito; 046 047/** 048 * The purpose of this test is to make sure the region exception won't corrupt the results 049 * of batch. The prescription is shown below. 050 * 1) honor the action result rather than region exception. If the action have both of true result 051 * and region exception, the action is fine as the exception is caused by other actions 052 * which are in the same region. 053 * 2) honor the action exception rather than region exception. If the action have both of action 054 * exception and region exception, we deal with the action exception only. If we also 055 * handle the region exception for the same action, it will introduce the negative count of 056 * actions in progress. The AsyncRequestFuture#waitUntilDone will block forever. 057 * 058 * This bug can be reproduced by real use case. see TestMalformedCellFromClient(in branch-1.4+). 059 * It uses the batch of RowMutations to present the bug. Given that the batch of RowMutations is 060 * only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't encounter this issue. 061 * We still backport the fix to branch-1.3 and branch-1.2 in case we ignore some write paths. 062 */ 063@Category({ ClientTests.class, SmallTests.class }) 064public class TestAsyncProcessWithRegionException { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class); 069 070 private static final Result EMPTY_RESULT = Result.create(null, true); 071 private static final IOException IOE = new IOException("YOU CAN'T PASS"); 072 private static final Configuration CONF = new Configuration(); 073 private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); 074 private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW"); 075 private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW"); 076 private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION = 077 Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION"); 078 private static final byte[] FAMILY = Bytes.toBytes("FAMILY"); 079 private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1"); 080 private static final RegionInfo REGION_INFO = 081 RegionInfoBuilder.newBuilder(DUMMY_TABLE) 082 .setStartKey(HConstants.EMPTY_START_ROW) 083 .setEndKey(HConstants.EMPTY_END_ROW) 084 .setSplit(false) 085 .setRegionId(1) 086 .build(); 087 088 private static final HRegionLocation REGION_LOCATION = 089 new HRegionLocation(REGION_INFO, SERVER_NAME); 090 091 @BeforeClass 092 public static void setUpBeforeClass() { 093 // disable the retry 094 CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 095 } 096 097 @Test 098 public void testSuccessivePut() throws Exception { 099 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 100 101 List<Put> puts = new ArrayList<>(1); 102 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 103 final int expectedSize = puts.size(); 104 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); 105 arf.waitUntilDone(); 106 Object[] result = arf.getResults(); 107 assertEquals(expectedSize, result.length); 108 for (Object r : result) { 109 assertEquals(Result.class, r.getClass()); 110 } 111 assertTrue(puts.isEmpty()); 112 assertActionsInProgress(arf); 113 } 114 115 @Test 116 public void testFailedPut() throws Exception { 117 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 118 119 List<Put> puts = new ArrayList<>(2); 120 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 121 // this put should fail 122 puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 123 final int expectedSize = puts.size(); 124 125 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); 126 arf.waitUntilDone(); 127 // There is a failed puts 128 assertError(arf, 1); 129 Object[] result = arf.getResults(); 130 assertEquals(expectedSize, result.length); 131 assertEquals(Result.class, result[0].getClass()); 132 assertTrue(result[1] instanceof IOException); 133 assertTrue(puts.isEmpty()); 134 assertActionsInProgress(arf); 135 } 136 137 @Test 138 public void testFailedPutWithoutActionException() throws Exception { 139 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 140 141 List<Put> puts = new ArrayList<>(3); 142 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 143 // this put should fail 144 puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 145 // this put should fail, and it won't have action exception 146 puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY)); 147 final int expectedSize = puts.size(); 148 149 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); 150 arf.waitUntilDone(); 151 // There are two failed puts 152 assertError(arf, 2); 153 Object[] result = arf.getResults(); 154 assertEquals(expectedSize, result.length); 155 assertEquals(Result.class, result[0].getClass()); 156 assertTrue(result[1] instanceof IOException); 157 assertTrue(result[2] instanceof IOException); 158 assertTrue(puts.isEmpty()); 159 assertActionsInProgress(arf); 160 } 161 162 private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) { 163 assertTrue(arf.hasError()); 164 RetriesExhaustedWithDetailsException e = arf.getErrors(); 165 List<Throwable> errors = e.getCauses(); 166 assertEquals(expectedCountOfFailure, errors.size()); 167 for (Throwable t : errors) { 168 assertTrue(t instanceof IOException); 169 } 170 } 171 172 private static void assertActionsInProgress(AsyncRequestFuture arf) { 173 if (arf instanceof AsyncRequestFutureImpl) { 174 assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress()); 175 } 176 } 177 178 private static ClusterConnection createHConnection() throws IOException { 179 ClusterConnection hc = Mockito.mock(ClusterConnection.class); 180 NonceGenerator ng = Mockito.mock(NonceGenerator.class); 181 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); 182 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); 183 Mockito.when(hc.getConfiguration()).thenReturn(CONF); 184 Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF)); 185 setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION)); 186 setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION)); 187 Mockito 188 .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) 189 .thenReturn(Collections.singletonList(REGION_LOCATION)); 190 return hc; 191 } 192 193 private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) 194 throws IOException { 195 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 196 Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); 197 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 198 Mockito.anyBoolean())).thenReturn(result); 199 } 200 201 private static class MyAsyncProcess extends AsyncProcess { 202 private final ExecutorService service = Executors.newFixedThreadPool(5); 203 204 MyAsyncProcess(ClusterConnection hc, Configuration conf) { 205 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 206 } 207 208 public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows) 209 throws InterruptedIOException { 210 return submit(AsyncProcessTask.newBuilder() 211 .setPool(service) 212 .setTableName(tableName) 213 .setRowAccess(rows) 214 .setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL) 215 .setNeedResults(true) 216 .setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT) 217 .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT) 218 .build()); 219 } 220 221 @Override 222 protected RpcRetryingCaller<AbstractResponse> createCaller( 223 CancellableRegionServerCallable callable, int rpcTimeout) { 224 MultiServerCallable callable1 = (MultiServerCallable) callable; 225 MultiResponse mr = new MultiResponse(); 226 callable1.getMulti().actions.forEach((regionName, actions) -> { 227 actions.forEach(action -> { 228 if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) { 229 mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT); 230 } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) { 231 mr.add(regionName, action.getOriginalIndex(), IOE); 232 } 233 }); 234 }); 235 mr.addException(REGION_INFO.getRegionName(), IOE); 236 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) { 237 @Override 238 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 239 int callTimeout) { 240 try { 241 // sleep one second in order for threadpool to start another thread instead of reusing 242 // existing one. 243 Thread.sleep(1000); 244 } catch (InterruptedException e) { 245 // pass 246 } 247 return mr; 248 } 249 }; 250 } 251 } 252}