001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.RegionLocations; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 038import org.apache.hadoop.hbase.testclassification.ClientTests; 039import org.apache.hadoop.hbase.testclassification.SmallTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045import org.mockito.Mockito; 046 047/** 048 * The purpose of this test is to make sure the region exception won't corrupt the results of batch. 049 * The prescription is shown below. 1) honor the action result rather than region exception. If the 050 * action have both of true result and region exception, the action is fine as the exception is 051 * caused by other actions which are in the same region. 2) honor the action exception rather than 052 * region exception. If the action have both of action exception and region exception, we deal with 053 * the action exception only. If we also handle the region exception for the same action, it will 054 * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will 055 * block forever. This bug can be reproduced by real use case. see TestMalformedCellFromClient(in 056 * branch-1.4+). It uses the batch of RowMutations to present the bug. Given that the batch of 057 * RowMutations is only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't 058 * encounter this issue. We still backport the fix to branch-1.3 and branch-1.2 in case we ignore 059 * some write paths. 060 */ 061@Category({ ClientTests.class, SmallTests.class }) 062public class TestAsyncProcessWithRegionException { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class); 067 068 private static final Result EMPTY_RESULT = Result.create(null, true); 069 private static final IOException IOE = new IOException("YOU CAN'T PASS"); 070 private static final Configuration CONF = new Configuration(); 071 private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); 072 private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW"); 073 private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW"); 074 private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION = 075 Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION"); 076 private static final byte[] FAMILY = Bytes.toBytes("FAMILY"); 077 private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1"); 078 private static final RegionInfo REGION_INFO = 079 RegionInfoBuilder.newBuilder(DUMMY_TABLE).setStartKey(HConstants.EMPTY_START_ROW) 080 .setEndKey(HConstants.EMPTY_END_ROW).setSplit(false).setRegionId(1).build(); 081 082 private static final HRegionLocation REGION_LOCATION = 083 new HRegionLocation(REGION_INFO, SERVER_NAME); 084 085 @BeforeClass 086 public static void setUpBeforeClass() { 087 // disable the retry 088 CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 089 } 090 091 @Test 092 public void testSuccessivePut() throws Exception { 093 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 094 095 List<Put> puts = new ArrayList<>(1); 096 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 097 final int expectedSize = puts.size(); 098 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); 099 arf.waitUntilDone(); 100 Object[] result = arf.getResults(); 101 assertEquals(expectedSize, result.length); 102 for (Object r : result) { 103 assertEquals(Result.class, r.getClass()); 104 } 105 assertTrue(puts.isEmpty()); 106 assertActionsInProgress(arf); 107 } 108 109 @Test 110 public void testFailedPut() throws Exception { 111 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 112 113 List<Put> puts = new ArrayList<>(2); 114 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 115 // this put should fail 116 puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 117 final int expectedSize = puts.size(); 118 119 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); 120 arf.waitUntilDone(); 121 // There is a failed puts 122 assertError(arf, 1); 123 Object[] result = arf.getResults(); 124 assertEquals(expectedSize, result.length); 125 assertEquals(Result.class, result[0].getClass()); 126 assertTrue(result[1] instanceof IOException); 127 assertTrue(puts.isEmpty()); 128 assertActionsInProgress(arf); 129 } 130 131 @Test 132 public void testFailedPutWithoutActionException() throws Exception { 133 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 134 135 List<Put> puts = new ArrayList<>(3); 136 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 137 // this put should fail 138 puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 139 // this put should fail, and it won't have action exception 140 puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY)); 141 final int expectedSize = puts.size(); 142 143 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); 144 arf.waitUntilDone(); 145 // There are two failed puts 146 assertError(arf, 2); 147 Object[] result = arf.getResults(); 148 assertEquals(expectedSize, result.length); 149 assertEquals(Result.class, result[0].getClass()); 150 assertTrue(result[1] instanceof IOException); 151 assertTrue(result[2] instanceof IOException); 152 assertTrue(puts.isEmpty()); 153 assertActionsInProgress(arf); 154 } 155 156 private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) { 157 assertTrue(arf.hasError()); 158 RetriesExhaustedWithDetailsException e = arf.getErrors(); 159 List<Throwable> errors = e.getCauses(); 160 assertEquals(expectedCountOfFailure, errors.size()); 161 for (Throwable t : errors) { 162 assertTrue(t instanceof IOException); 163 } 164 } 165 166 private static void assertActionsInProgress(AsyncRequestFuture arf) { 167 if (arf instanceof AsyncRequestFutureImpl) { 168 assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress()); 169 } 170 } 171 172 private static ClusterConnection createHConnection() throws IOException { 173 ClusterConnection hc = Mockito.mock(ClusterConnection.class); 174 NonceGenerator ng = Mockito.mock(NonceGenerator.class); 175 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); 176 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); 177 Mockito.when(hc.getConfiguration()).thenReturn(CONF); 178 Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF)); 179 setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION)); 180 setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION)); 181 Mockito 182 .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) 183 .thenReturn(Collections.singletonList(REGION_LOCATION)); 184 return hc; 185 } 186 187 private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) 188 throws IOException { 189 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 190 Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); 191 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 192 Mockito.anyBoolean())).thenReturn(result); 193 } 194 195 private static class MyAsyncProcess extends AsyncProcess { 196 private final ExecutorService service = Executors.newFixedThreadPool(5); 197 198 MyAsyncProcess(ClusterConnection hc, Configuration conf) { 199 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 200 } 201 202 public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows) 203 throws InterruptedIOException { 204 return submit(AsyncProcessTask.newBuilder().setPool(service).setTableName(tableName) 205 .setRowAccess(rows).setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL) 206 .setNeedResults(true).setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT) 207 .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT).build()); 208 } 209 210 @Override 211 protected RpcRetryingCaller<AbstractResponse> 212 createCaller(CancellableRegionServerCallable callable, int rpcTimeout) { 213 MultiServerCallable callable1 = (MultiServerCallable) callable; 214 MultiResponse mr = new MultiResponse(); 215 callable1.getMulti().actions.forEach((regionName, actions) -> { 216 actions.forEach(action -> { 217 if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) { 218 mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT); 219 } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) { 220 mr.add(regionName, action.getOriginalIndex(), IOE); 221 } 222 }); 223 }); 224 mr.addException(REGION_INFO.getRegionName(), IOE); 225 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) { 226 @Override 227 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 228 int callTimeout) { 229 try { 230 // sleep one second in order for threadpool to start another thread instead of reusing 231 // existing one. 232 Thread.sleep(1000); 233 } catch (InterruptedException e) { 234 // pass 235 } 236 return mr; 237 } 238 }; 239 } 240 } 241}