001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.RegionLocations;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.SmallTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.mockito.Mockito;
046
047/**
048 * The purpose of this test is to make sure the region exception won't corrupt the results of batch.
049 * The prescription is shown below. 1) honor the action result rather than region exception. If the
050 * action have both of true result and region exception, the action is fine as the exception is
051 * caused by other actions which are in the same region. 2) honor the action exception rather than
052 * region exception. If the action have both of action exception and region exception, we deal with
053 * the action exception only. If we also handle the region exception for the same action, it will
054 * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will
055 * block forever. This bug can be reproduced by real use case. see TestMalformedCellFromClient(in
056 * branch-1.4+). It uses the batch of RowMutations to present the bug. Given that the batch of
057 * RowMutations is only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't
058 * encounter this issue. We still backport the fix to branch-1.3 and branch-1.2 in case we ignore
059 * some write paths.
060 */
061@Category({ ClientTests.class, SmallTests.class })
062public class TestAsyncProcessWithRegionException {
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066    HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class);
067
068  private static final Result EMPTY_RESULT = Result.create(null, true);
069  private static final IOException IOE = new IOException("YOU CAN'T PASS");
070  private static final Configuration CONF = new Configuration();
071  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
072  private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
073  private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
074  private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION =
075    Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION");
076  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
077  private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1");
078  private static final RegionInfo REGION_INFO =
079    RegionInfoBuilder.newBuilder(DUMMY_TABLE).setStartKey(HConstants.EMPTY_START_ROW)
080      .setEndKey(HConstants.EMPTY_END_ROW).setSplit(false).setRegionId(1).build();
081
082  private static final HRegionLocation REGION_LOCATION =
083    new HRegionLocation(REGION_INFO, SERVER_NAME);
084
085  @BeforeClass
086  public static void setUpBeforeClass() {
087    // disable the retry
088    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
089  }
090
091  @Test
092  public void testSuccessivePut() throws Exception {
093    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
094
095    List<Put> puts = new ArrayList<>(1);
096    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
097    final int expectedSize = puts.size();
098    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
099    arf.waitUntilDone();
100    Object[] result = arf.getResults();
101    assertEquals(expectedSize, result.length);
102    for (Object r : result) {
103      assertEquals(Result.class, r.getClass());
104    }
105    assertTrue(puts.isEmpty());
106    assertActionsInProgress(arf);
107  }
108
109  @Test
110  public void testFailedPut() throws Exception {
111    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
112
113    List<Put> puts = new ArrayList<>(2);
114    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
115    // this put should fail
116    puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
117    final int expectedSize = puts.size();
118
119    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
120    arf.waitUntilDone();
121    // There is a failed puts
122    assertError(arf, 1);
123    Object[] result = arf.getResults();
124    assertEquals(expectedSize, result.length);
125    assertEquals(Result.class, result[0].getClass());
126    assertTrue(result[1] instanceof IOException);
127    assertTrue(puts.isEmpty());
128    assertActionsInProgress(arf);
129  }
130
131  @Test
132  public void testFailedPutWithoutActionException() throws Exception {
133    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
134
135    List<Put> puts = new ArrayList<>(3);
136    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
137    // this put should fail
138    puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
139    // this put should fail, and it won't have action exception
140    puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY));
141    final int expectedSize = puts.size();
142
143    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
144    arf.waitUntilDone();
145    // There are two failed puts
146    assertError(arf, 2);
147    Object[] result = arf.getResults();
148    assertEquals(expectedSize, result.length);
149    assertEquals(Result.class, result[0].getClass());
150    assertTrue(result[1] instanceof IOException);
151    assertTrue(result[2] instanceof IOException);
152    assertTrue(puts.isEmpty());
153    assertActionsInProgress(arf);
154  }
155
156  private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) {
157    assertTrue(arf.hasError());
158    RetriesExhaustedWithDetailsException e = arf.getErrors();
159    List<Throwable> errors = e.getCauses();
160    assertEquals(expectedCountOfFailure, errors.size());
161    for (Throwable t : errors) {
162      assertTrue(t instanceof IOException);
163    }
164  }
165
166  private static void assertActionsInProgress(AsyncRequestFuture arf) {
167    if (arf instanceof AsyncRequestFutureImpl) {
168      assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress());
169    }
170  }
171
172  private static ClusterConnection createHConnection() throws IOException {
173    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
174    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
175    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
176    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
177    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
178    Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
179    setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
180    setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
181    Mockito
182      .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
183      .thenReturn(Collections.singletonList(REGION_LOCATION));
184    return hc;
185  }
186
187  private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
188    throws IOException {
189    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
190      Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
191    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
192      Mockito.anyBoolean())).thenReturn(result);
193  }
194
195  private static class MyAsyncProcess extends AsyncProcess {
196    private final ExecutorService service = Executors.newFixedThreadPool(5);
197
198    MyAsyncProcess(ClusterConnection hc, Configuration conf) {
199      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
200    }
201
202    public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
203      throws InterruptedIOException {
204      return submit(AsyncProcessTask.newBuilder().setPool(service).setTableName(tableName)
205        .setRowAccess(rows).setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL)
206        .setNeedResults(true).setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT)
207        .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT).build());
208    }
209
210    @Override
211    protected RpcRetryingCaller<AbstractResponse>
212      createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
213      MultiServerCallable callable1 = (MultiServerCallable) callable;
214      MultiResponse mr = new MultiResponse();
215      callable1.getMulti().actions.forEach((regionName, actions) -> {
216        actions.forEach(action -> {
217          if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) {
218            mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT);
219          } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) {
220            mr.add(regionName, action.getOriginalIndex(), IOE);
221          }
222        });
223      });
224      mr.addException(REGION_INFO.getRegionName(), IOE);
225      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) {
226        @Override
227        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
228          int callTimeout) {
229          try {
230            // sleep one second in order for threadpool to start another thread instead of reusing
231            // existing one.
232            Thread.sleep(1000);
233          } catch (InterruptedException e) {
234            // pass
235          }
236          return mr;
237        }
238      };
239    }
240  }
241}