001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ExecutionException;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.CompareOperator;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.ExtendedCell;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.PrivateCellUtil;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.ipc.HBaseRpcController;
041import org.apache.hadoop.hbase.regionserver.HRegion;
042import org.apache.hadoop.hbase.regionserver.HRegionServer;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.junit.jupiter.api.AfterAll;
047import org.junit.jupiter.api.AfterEach;
048import org.junit.jupiter.api.BeforeAll;
049import org.junit.jupiter.api.BeforeEach;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052import org.mockito.Mockito;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
060
/**
 * Verifies that the region server handles malformed cells sent by a client correctly.
 */
064@Tag(MediumTests.TAG)
065@Tag(ClientTests.TAG)
066public class TestMalformedCellFromClient {
067  private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class);
068
069  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
070  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
071  private static final int CELL_SIZE = 100;
072  private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");
073
074  @BeforeAll
075  public static void setUpBeforeClass() throws Exception {
076    // disable the retry
077    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
078    TEST_UTIL.startMiniCluster(1);
079  }
080
081  @BeforeEach
082  public void before() throws Exception {
083    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
084      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
085      .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build();
086    TEST_UTIL.getConnection().getAdmin().createTable(desc);
087  }
088
089  @AfterEach
090  public void tearDown() throws Exception {
091    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
092      TEST_UTIL.deleteTable(htd.getTableName());
093    }
094  }
095
096  @AfterAll
097  public static void tearDownAfterClass() throws Exception {
098    TEST_UTIL.shutdownMiniCluster();
099  }
100
101  /**
102   * The purpose of this ut is to check the consistency between the exception and results. If the
103   * RetriesExhaustedWithDetailsException contains the whole batch, each result should be of IOE.
104   * Otherwise, the row operation which is not in the exception should have a true result.
105   */
106  @Test
107  public void testRegionException() throws InterruptedException, IOException {
108    List<Row> batches = new ArrayList<>();
109    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
110    // the rm is used to prompt the region exception.
111    // see RSRpcServices#multi
112    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
113    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
114    batches.add(rm);
115    Object[] results = new Object[batches.size()];
116
117    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
118      Throwable exceptionByCaught = null;
119      try {
120        table.batch(batches, results);
121        fail("Where is the exception? We put the malformed cells!!!");
122      } catch (RetriesExhaustedException e) {
123        exceptionByCaught = e.getCause();
124      }
125      for (Object obj : results) {
126        assertNotNull(obj);
127      }
128      assertEquals(Result.class, results[0].getClass());
129      assertEquals(exceptionByCaught.getClass(), results[1].getClass());
130      Result result = table.get(new Get(Bytes.toBytes("good")));
131      assertEquals(1, result.size());
132      Cell cell = result.getColumnLatestCell(FAMILY, null);
133      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
134    }
135  }
136
137  /**
138   * This test verifies region exception doesn't corrupt the results of batch. The prescription is
139   * shown below. 1) honor the action result rather than region exception. If the action have both
140   * of true result and region exception, the action is fine as the exception is caused by other
141   * actions which are in the same region. 2) honor the action exception rather than region
142   * exception. If the action have both of action exception and region exception, we deal with the
143   * action exception only. If we also handle the region exception for the same action, it will
144   * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will
145   * block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each
146   * result should be of IOE. Otherwise, the row operation which is not in the exception should have
147   * a true result. The no-cluster test is in TestAsyncProcessWithRegionException.
148   */
149  @Test
150  public void testRegionExceptionByAsync() throws Exception {
151    List<Row> batches = new ArrayList<>();
152    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
153    // the rm is used to prompt the region exception.
154    // see RSRpcServices#multi
155    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
156    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
157    batches.add(rm);
158    try (AsyncConnection asyncConnection =
159      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
160      AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME);
161      List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches);
162      assertEquals(2, results.size());
163      try {
164        results.get(1).get();
165        fail("Where is the exception? We put the malformed cells!!!");
166      } catch (ExecutionException e) {
167        // pass
168      }
169      Result result = table.get(new Get(Bytes.toBytes("good"))).get();
170      assertEquals(1, result.size());
171      Cell cell = result.getColumnLatestCell(FAMILY, null);
172      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
173    }
174  }
175
176  /**
177   * The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed.
178   * Currently, we have no client api to submit the request consisting of condition-rm and mutation.
179   * Hence, this test build the request manually.
180   */
181  @Test
182  public void testAtomicOperations() throws Exception {
183    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
184    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
185    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
186    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]);
187
188    // build the request
189    HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
190    ClientProtos.MultiRequest request =
191      ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName()))
192        .addRegionAction(ClientProtos.RegionAction.newBuilder()
193          .setRegion(RequestConverter.buildRegionSpecifier(
194            HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
195            r.getRegionInfo().getRegionName()))
196          .addAction(ClientProtos.Action.newBuilder().setMutation(
197            ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put))))
198        .build();
199
200    List<ExtendedCell> cells = new ArrayList<>();
201    for (Mutation m : rm.getMutations()) {
202      cells.addAll(m.getCellList(FAMILY));
203    }
204    cells.addAll(put.getCellList(FAMILY));
205    assertEquals(3, cells.size());
206    HBaseRpcController controller = Mockito.mock(HBaseRpcController.class);
207    Mockito.when(controller.cellScanner())
208      .thenReturn(PrivateCellUtil.createExtendedCellScanner(cells));
209    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(TEST_UTIL
210      .getMiniHBaseCluster().getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName()));
211
212    ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request);
213    assertEquals(2, response.getRegionActionResultCount());
214    assertTrue(response.getRegionActionResultList().get(0).hasException());
215    assertFalse(response.getRegionActionResultList().get(1).hasException());
216    assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount());
217    assertTrue(
218      response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
219    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
220      Result result = table.get(new Get(Bytes.toBytes("good")));
221      assertEquals(1, result.size());
222      Cell cell = result.getColumnLatestCell(FAMILY, null);
223      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
224    }
225  }
226
227  private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName)
228    throws IOException {
229    ClientProtos.RegionAction.Builder builder = RequestConverter
230      .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName);
231    builder.setAtomic(true);
232    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
233    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
234    ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null,
235      CompareOperator.EQUAL, new byte[10], null, null, false);
236    for (Mutation mutation : rm.getMutations()) {
237      ClientProtos.MutationProto.MutationType mutateType = null;
238      if (mutation instanceof Put) {
239        mutateType = ClientProtos.MutationProto.MutationType.PUT;
240      } else if (mutation instanceof Delete) {
241        mutateType = ClientProtos.MutationProto.MutationType.DELETE;
242      } else {
243        throw new DoNotRetryIOException(
244          "RowMutations supports only put and delete, not " + mutation.getClass().getName());
245      }
246      mutationBuilder.clear();
247      ClientProtos.MutationProto mp =
248        ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
249      actionBuilder.clear();
250      actionBuilder.setMutation(mp);
251      builder.addAction(actionBuilder.build());
252    }
253    ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder()
254      .addRegionAction(builder.setCondition(condition).build()).build();
255    return request;
256  }
257
258  /**
259   * This test depends on how regionserver process the batch ops. 1) group the put/delete until
260   * meeting the increment 2) process the batch of put/delete 3) process the increment see
261   * RSRpcServices#doNonAtomicRegionMutation
262   */
263  @Test
264  public void testNonAtomicOperations() throws InterruptedException, IOException {
265    Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100);
266    List<Row> batches = new ArrayList<>();
267    // the first and second puts will be group by regionserver
268    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
269    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
270    // this Increment should succeed
271    batches.add(inc);
272    // this put should succeed
273    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]));
274    Object[] objs = new Object[batches.size()];
275    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
276      table.batch(batches, objs);
277      fail("Where is the exception? We put the malformed cells!!!");
278    } catch (RetriesExhaustedException e) {
279      Throwable error = e.getCause();
280      for (;;) {
281        assertNotNull(error, "Can not find a DoNotRetryIOException on stack trace");
282        if (error instanceof DoNotRetryIOException) {
283          break;
284        }
285        error = error.getCause();
286      }
287    } finally {
288      assertObjects(objs, batches.size());
289      assertTrue(objs[0] instanceof IOException);
290      assertTrue(objs[1] instanceof IOException);
291      assertEquals(Result.class, objs[2].getClass());
292      assertEquals(Result.class, objs[3].getClass());
293    }
294  }
295
296  @Test
297  public void testRowMutations() throws InterruptedException, IOException {
298    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]);
299    List<Row> batches = new ArrayList<>();
300    RowMutations mutations = new RowMutations(Bytes.toBytes("fail"));
301    // the first and second puts will be group by regionserver
302    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
303    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
304    batches.add(mutations);
305    // this bm should succeed
306    mutations = new RowMutations(Bytes.toBytes("good"));
307    mutations.add(put);
308    mutations.add(put);
309    batches.add(mutations);
310    Object[] objs = new Object[batches.size()];
311    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
312      table.batch(batches, objs);
313      fail("Where is the exception? We put the malformed cells!!!");
314    } catch (RetriesExhaustedException e) {
315      Throwable error = e.getCause();
316      for (;;) {
317        assertNotNull(error, "Can not find a DoNotRetryIOException on stack trace");
318        if (error instanceof DoNotRetryIOException) {
319          break;
320        }
321        error = error.getCause();
322      }
323    } finally {
324      assertObjects(objs, batches.size());
325      assertTrue(objs[0] instanceof IOException);
326      assertEquals(Result.class, objs[1].getClass());
327    }
328  }
329
330  private static void assertObjects(Object[] objs, int expectedSize) {
331    int count = 0;
332    for (Object obj : objs) {
333      assertNotNull(obj);
334      ++count;
335    }
336    assertEquals(expectedSize, count);
337  }
338}