001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ExecutionException;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.filter.BinaryComparator;
039import org.apache.hadoop.hbase.ipc.HBaseRpcController;
040import org.apache.hadoop.hbase.regionserver.HRegion;
041import org.apache.hadoop.hbase.regionserver.HRegionServer;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.After;
046import org.junit.AfterClass;
047import org.junit.Before;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.mockito.Mockito;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
060
061/**
062 * The purpose of this test is to ensure whether rs deals with the malformed cells correctly.
063 */
064@Category({ MediumTests.class, ClientTests.class })
065public class TestMalformedCellFromClient {
066  private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class);
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);
070
071  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
072  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
073  private static final int CELL_SIZE = 100;
074  private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");
075
076  @BeforeClass
077  public static void setUpBeforeClass() throws Exception {
078    // disable the retry
079    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
080    TEST_UTIL.startMiniCluster(1);
081  }
082
083  @Before
084  public void before() throws Exception {
085    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
086      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
087      .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build();
088    TEST_UTIL.getConnection().getAdmin().createTable(desc);
089  }
090
091  @After
092  public void tearDown() throws Exception {
093    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
094      TEST_UTIL.deleteTable(htd.getTableName());
095    }
096  }
097
098  @AfterClass
099  public static void tearDownAfterClass() throws Exception {
100    TEST_UTIL.shutdownMiniCluster();
101  }
102
103  /**
104   * The purpose of this ut is to check the consistency between the exception and results.
105   * If the RetriesExhaustedWithDetailsException contains the whole batch,
106   * each result should be of IOE. Otherwise, the row operation which is not in the exception
107   * should have a true result.
108   */
109  @Test
110  public void testRegionException() throws InterruptedException, IOException {
111    List<Row> batches = new ArrayList<>();
112    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
113    // the rm is used to prompt the region exception.
114    // see RSRpcServices#multi
115    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
116    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
117    batches.add(rm);
118    Object[] results = new Object[batches.size()];
119
120    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
121      Throwable exceptionByCaught = null;
122      try {
123        table.batch(batches, results);
124        fail("Where is the exception? We put the malformed cells!!!");
125      } catch (RetriesExhaustedWithDetailsException e) {
126        for (Throwable throwable : e.getCauses()) {
127          assertNotNull(throwable);
128        }
129        assertEquals(1, e.getNumExceptions());
130        exceptionByCaught = e.getCause(0);
131      }
132      for (Object obj : results) {
133        assertNotNull(obj);
134      }
135      assertEquals(Result.class, results[0].getClass());
136      assertEquals(exceptionByCaught.getClass(), results[1].getClass());
137      Result result = table.get(new Get(Bytes.toBytes("good")));
138      assertEquals(1, result.size());
139      Cell cell = result.getColumnLatestCell(FAMILY, null);
140      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
141    }
142  }
143
144  /**
145   * This test verifies region exception doesn't corrupt the results of batch. The prescription is
146   * shown below. 1) honor the action result rather than region exception. If the action have both
147   * of true result and region exception, the action is fine as the exception is caused by other
148   * actions which are in the same region. 2) honor the action exception rather than region
149   * exception. If the action have both of action exception and region exception, we deal with the
150   * action exception only. If we also handle the region exception for the same action, it will
151   * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will
152   * block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each
153   * result should be of IOE. Otherwise, the row operation which is not in the exception should have
154   * a true result. The no-cluster test is in TestAsyncProcessWithRegionException.
155   */
156  @Test
157  public void testRegionExceptionByAsync() throws Exception {
158    List<Row> batches = new ArrayList<>();
159    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
160    // the rm is used to prompt the region exception.
161    // see RSRpcServices#multi
162    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
163    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
164    batches.add(rm);
165    try (AsyncConnection asyncConnection = ConnectionFactory
166      .createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
167      AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME);
168      List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches);
169      assertEquals(2, results.size());
170      try {
171        results.get(1).get();
172        fail("Where is the exception? We put the malformed cells!!!");
173      } catch (ExecutionException e) {
174        // pass
175      }
176      Result result = table.get(new Get(Bytes.toBytes("good"))).get();
177      assertEquals(1, result.size());
178      Cell cell = result.getColumnLatestCell(FAMILY, null);
179      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
180    }
181  }
182
183  /**
184   * The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed.
185   * Currently, we have no client api to submit the request consisting of condition-rm and mutation.
186   * Hence, this test build the request manually.
187   */
188  @Test
189  public void testAtomicOperations() throws Exception {
190    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
191    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
192    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
193    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]);
194
195    // build the request
196    HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
197    ClientProtos.MultiRequest request =
198      ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName()))
199        .addRegionAction(ClientProtos.RegionAction.newBuilder().setRegion(RequestConverter
200          .buildRegionSpecifier(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
201            r.getRegionInfo().getRegionName())).addAction(ClientProtos.Action.newBuilder()
202          .setMutation(
203            ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put))))
204        .build();
205
206    List<Cell> cells = new ArrayList<>();
207    for (Mutation m : rm.getMutations()) {
208      cells.addAll(m.getCellList(FAMILY));
209    }
210    cells.addAll(put.getCellList(FAMILY));
211    assertEquals(3, cells.size());
212    HBaseRpcController controller = Mockito.mock(HBaseRpcController.class);
213    Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells));
214    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(
215      TEST_UTIL.getMiniHBaseCluster()
216        .getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName()));
217
218    ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request);
219    assertEquals(2, response.getRegionActionResultCount());
220    assertTrue(response.getRegionActionResultList().get(0).hasException());
221    assertFalse(response.getRegionActionResultList().get(1).hasException());
222    assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount());
223    assertTrue(
224      response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
225    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
226      Result result = table.get(new Get(Bytes.toBytes("good")));
227      assertEquals(1, result.size());
228      Cell cell = result.getColumnLatestCell(FAMILY, null);
229      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
230    }
231  }
232
233  private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName)
234    throws IOException {
235    ClientProtos.RegionAction.Builder builder = RequestConverter
236      .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName);
237    builder.setAtomic(true);
238    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
239    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
240    ClientProtos.Condition condition = RequestConverter
241      .buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
242        HBaseProtos.CompareType.EQUAL, null);
243    for (Mutation mutation : rm.getMutations()) {
244      ClientProtos.MutationProto.MutationType mutateType = null;
245      if (mutation instanceof Put) {
246        mutateType = ClientProtos.MutationProto.MutationType.PUT;
247      } else if (mutation instanceof Delete) {
248        mutateType = ClientProtos.MutationProto.MutationType.DELETE;
249      } else {
250        throw new DoNotRetryIOException(
251          "RowMutations supports only put and delete, not " + mutation.getClass().getName());
252      }
253      mutationBuilder.clear();
254      ClientProtos.MutationProto mp =
255        ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
256      actionBuilder.clear();
257      actionBuilder.setMutation(mp);
258      builder.addAction(actionBuilder.build());
259    }
260    ClientProtos.MultiRequest request =
261      ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
262        .setCondition(condition).build();
263    return request;
264  }
265
266  /**
267   * This test depends on how regionserver process the batch ops.
268   * 1) group the put/delete until meeting the increment
269   * 2) process the batch of put/delete
270   * 3) process the increment
271   * see RSRpcServices#doNonAtomicRegionMutation
272   */
273  @Test
274  public void testNonAtomicOperations() throws InterruptedException, IOException {
275    Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100);
276    List<Row> batches = new ArrayList<>();
277    // the first and second puts will be group by regionserver
278    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
279    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
280    // this Increment should succeed
281    batches.add(inc);
282    // this put should succeed
283    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]));
284    Object[] objs = new Object[batches.size()];
285    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
286      table.batch(batches, objs);
287      fail("Where is the exception? We put the malformed cells!!!");
288    } catch (RetriesExhaustedWithDetailsException e) {
289      assertEquals(2, e.getNumExceptions());
290      for (int i = 0; i != e.getNumExceptions(); ++i) {
291        assertNotNull(e.getCause(i));
292        assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass());
293        assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
294      }
295    } finally {
296      assertObjects(objs, batches.size());
297      assertTrue(objs[0] instanceof IOException);
298      assertTrue(objs[1] instanceof IOException);
299      assertEquals(Result.class, objs[2].getClass());
300      assertEquals(Result.class, objs[3].getClass());
301    }
302  }
303
304  @Test
305  public void testRowMutations() throws InterruptedException, IOException {
306    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]);
307    List<Row> batches = new ArrayList<>();
308    RowMutations mutations = new RowMutations(Bytes.toBytes("fail"));
309    // the first and second puts will be group by regionserver
310    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
311    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
312    batches.add(mutations);
313    // this bm should succeed
314    mutations = new RowMutations(Bytes.toBytes("good"));
315    mutations.add(put);
316    mutations.add(put);
317    batches.add(mutations);
318    Object[] objs = new Object[batches.size()];
319    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
320      table.batch(batches, objs);
321      fail("Where is the exception? We put the malformed cells!!!");
322    } catch (RetriesExhaustedWithDetailsException e) {
323      assertEquals(1, e.getNumExceptions());
324      for (int i = 0; i != e.getNumExceptions(); ++i) {
325        assertNotNull(e.getCause(i));
326        assertTrue(e.getCause(i) instanceof IOException);
327        assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
328      }
329    } finally {
330      assertObjects(objs, batches.size());
331      assertTrue(objs[0] instanceof IOException);
332      assertEquals(Result.class, objs[1].getClass());
333    }
334  }
335
336  private static void assertObjects(Object[] objs, int expectedSize) {
337    int count = 0;
338    for (Object obj : objs) {
339      assertNotNull(obj);
340      ++count;
341    }
342    assertEquals(expectedSize, count);
343  }
344}