001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.RegionLocations;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.SmallTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.mockito.Mockito;
046
047/**
048 * The purpose of this test is to make sure the region exception won't corrupt the results
049 * of batch. The prescription is shown below.
050 * 1) honor the action result rather than region exception. If the action have both of true result
051 * and region exception, the action is fine as the exception is caused by other actions
052 * which are in the same region.
053 * 2) honor the action exception rather than region exception. If the action have both of action
054 * exception and region exception, we deal with the action exception only. If we also
055 * handle the region exception for the same action, it will introduce the negative count of
056 * actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
057 *
058 * This bug can be reproduced by real use case. see TestMalformedCellFromClient(in branch-1.4+).
059 * It uses the batch of RowMutations to present the bug. Given that the batch of RowMutations is
060 * only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't encounter this issue.
061 * We still backport the fix to branch-1.3 and branch-1.2 in case we ignore some write paths.
062 */
063@Category({ ClientTests.class, SmallTests.class })
064public class TestAsyncProcessWithRegionException {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class);
069
070  private static final Result EMPTY_RESULT = Result.create(null, true);
071  private static final IOException IOE = new IOException("YOU CAN'T PASS");
072  private static final Configuration CONF = new Configuration();
073  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
074  private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
075  private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
076  private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION =
077    Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION");
078  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
079  private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1");
080  private static final RegionInfo REGION_INFO =
081    RegionInfoBuilder.newBuilder(DUMMY_TABLE)
082      .setStartKey(HConstants.EMPTY_START_ROW)
083      .setEndKey(HConstants.EMPTY_END_ROW)
084      .setSplit(false)
085      .setRegionId(1)
086      .build();
087
088  private static final HRegionLocation REGION_LOCATION =
089    new HRegionLocation(REGION_INFO, SERVER_NAME);
090
091  @BeforeClass
092  public static void setUpBeforeClass() {
093    // disable the retry
094    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
095  }
096
097  @Test
098  public void testSuccessivePut() throws Exception {
099    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
100
101    List<Put> puts = new ArrayList<>(1);
102    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
103    final int expectedSize = puts.size();
104    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
105    arf.waitUntilDone();
106    Object[] result = arf.getResults();
107    assertEquals(expectedSize, result.length);
108    for (Object r : result) {
109      assertEquals(Result.class, r.getClass());
110    }
111    assertTrue(puts.isEmpty());
112    assertActionsInProgress(arf);
113  }
114
115  @Test
116  public void testFailedPut() throws Exception {
117    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
118
119    List<Put> puts = new ArrayList<>(2);
120    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
121    // this put should fail
122    puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
123    final int expectedSize = puts.size();
124
125    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
126    arf.waitUntilDone();
127    // There is a failed puts
128    assertError(arf, 1);
129    Object[] result = arf.getResults();
130    assertEquals(expectedSize, result.length);
131    assertEquals(Result.class, result[0].getClass());
132    assertTrue(result[1] instanceof IOException);
133    assertTrue(puts.isEmpty());
134    assertActionsInProgress(arf);
135  }
136
137  @Test
138  public void testFailedPutWithoutActionException() throws Exception {
139    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
140
141    List<Put> puts = new ArrayList<>(3);
142    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
143    // this put should fail
144    puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
145    // this put should fail, and it won't have action exception
146    puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY));
147    final int expectedSize = puts.size();
148
149    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
150    arf.waitUntilDone();
151    // There are two failed puts
152    assertError(arf, 2);
153    Object[] result = arf.getResults();
154    assertEquals(expectedSize, result.length);
155    assertEquals(Result.class, result[0].getClass());
156    assertTrue(result[1] instanceof IOException);
157    assertTrue(result[2] instanceof IOException);
158    assertTrue(puts.isEmpty());
159    assertActionsInProgress(arf);
160  }
161
162  private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) {
163    assertTrue(arf.hasError());
164    RetriesExhaustedWithDetailsException e = arf.getErrors();
165    List<Throwable> errors = e.getCauses();
166    assertEquals(expectedCountOfFailure, errors.size());
167    for (Throwable t : errors) {
168      assertTrue(t instanceof IOException);
169    }
170  }
171
172  private static void assertActionsInProgress(AsyncRequestFuture arf) {
173    if (arf instanceof AsyncRequestFutureImpl) {
174      assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress());
175    }
176  }
177
178  private static ClusterConnection createHConnection() throws IOException {
179    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
180    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
181    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
182    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
183    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
184    Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
185    setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
186    setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
187    Mockito
188      .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
189      .thenReturn(Collections.singletonList(REGION_LOCATION));
190    return hc;
191  }
192
193  private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
194    throws IOException {
195    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
196      Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
197    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
198      Mockito.anyBoolean())).thenReturn(result);
199  }
200
201  private static class MyAsyncProcess extends AsyncProcess {
202    private final ExecutorService service = Executors.newFixedThreadPool(5);
203
204    MyAsyncProcess(ClusterConnection hc, Configuration conf) {
205      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
206    }
207
208    public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
209      throws InterruptedIOException {
210      return submit(AsyncProcessTask.newBuilder()
211        .setPool(service)
212        .setTableName(tableName)
213        .setRowAccess(rows)
214        .setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL)
215        .setNeedResults(true)
216        .setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT)
217        .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
218        .build());
219    }
220
221    @Override
222    protected RpcRetryingCaller<AbstractResponse> createCaller(
223      CancellableRegionServerCallable callable, int rpcTimeout) {
224      MultiServerCallable callable1 = (MultiServerCallable) callable;
225      MultiResponse mr = new MultiResponse();
226      callable1.getMulti().actions.forEach((regionName, actions) -> {
227        actions.forEach(action -> {
228          if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) {
229            mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT);
230          } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) {
231            mr.add(regionName, action.getOriginalIndex(), IOE);
232          }
233        });
234      });
235      mr.addException(REGION_INFO.getRegionName(), IOE);
236      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) {
237        @Override
238        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
239          int callTimeout) {
240          try {
241            // sleep one second in order for threadpool to start another thread instead of reusing
242            // existing one.
243            Thread.sleep(1000);
244          } catch (InterruptedException e) {
245            // pass
246          }
247          return mr;
248        }
249      };
250    }
251  }
252}