/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * Verifies that the region server deals with malformed cells sent by a client correctly.
 * <p>
 * Every test runs against a table whose {@link HRegion#HBASE_MAX_CELL_SIZE_KEY} is set to
 * {@link #CELL_SIZE}. The tests expect a put carrying a value of {@code CELL_SIZE} bytes to be
 * rejected, while puts with small values written in the same batch must still be applied.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestMalformedCellFromClient {
  private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class);
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
  // Used both as the table's max cell size and as the value length of the "malformed" puts.
  private static final int CELL_SIZE = 100;
  private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");

  /** Starts a single-node mini cluster with client retries disabled. */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // disable the retry
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
    TEST_UTIL.startMiniCluster(1);
  }

  /** Creates the test table with the max cell size limited to {@link #CELL_SIZE}. */
  @Before
  public void before() throws Exception {
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
      .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build();
    // Connection#getAdmin hands out an Admin that the caller must close; do not leak one per test.
    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
      admin.createTable(desc);
    }
  }

  /** Deletes every table so that each test starts from a freshly created one. */
  @After
  public void tearDown() throws Exception {
    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
      TEST_UTIL.deleteTable(htd.getTableName());
    }
  }

  /** Shuts the mini cluster down once all tests have run. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * The purpose of this test is to check the consistency between the exception and the results.
   * If the RetriesExhaustedWithDetailsException contains the whole batch, each result should be
   * an IOE. Otherwise, the row operation which is not in the exception should have a true result.
   */
  @Test
  public void testRegionException() throws InterruptedException, IOException {
    List<Row> batches = new ArrayList<>();
    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
    // the rm is used to prompt the region exception.
    // see RSRpcServices#multi
    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    batches.add(rm);
    Object[] results = new Object[batches.size()];

    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
      Throwable caughtCause = null;
      try {
        table.batch(batches, results);
        fail("Where is the exception? We put the malformed cells!!!");
      } catch (RetriesExhaustedWithDetailsException e) {
        for (Throwable throwable : e.getCauses()) {
          assertNotNull(throwable);
        }
        // only the RowMutations on row "fail" is expected to be reported
        assertEquals(1, e.getNumExceptions());
        caughtCause = e.getCause(0);
      }
      for (Object obj : results) {
        assertNotNull(obj);
      }
      // the good put yields a Result; the failed rm yields the exception reported above
      assertEquals(Result.class, results[0].getClass());
      assertEquals(caughtCause.getClass(), results[1].getClass());
      Result result = table.get(new Get(Bytes.toBytes("good")));
      assertEquals(1, result.size());
      Cell cell = result.getColumnLatestCell(FAMILY, null);
      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
    }
  }

  /**
   * This test verifies that a region exception doesn't corrupt the results of a batch. The
   * prescription is shown below.
   * <ol>
   * <li>Honor the action result rather than the region exception. If an action has both a true
   * result and a region exception, the action is fine as the exception is caused by other actions
   * which are in the same region.</li>
   * <li>Honor the action exception rather than the region exception. If an action has both an
   * action exception and a region exception, we deal with the action exception only. If we also
   * handled the region exception for the same action, it would introduce a negative count of
   * actions in progress, and AsyncRequestFuture#waitUntilDone would block forever.</li>
   * </ol>
   * If the RetriesExhaustedWithDetailsException contains the whole batch, each result should be
   * an IOE. Otherwise, the row operation which is not in the exception should have a true result.
   * The no-cluster test is in TestAsyncProcessWithRegionException.
   */
  @Test
  public void testRegionExceptionByAsync() throws Exception {
    List<Row> batches = new ArrayList<>();
    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
    // the rm is used to prompt the region exception.
    // see RSRpcServices#multi
    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    batches.add(rm);
    try (AsyncConnection asyncConnection =
      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
      AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME);
      List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches);
      assertEquals(2, results.size());
      try {
        results.get(1).get();
        fail("Where is the exception? We put the malformed cells!!!");
      } catch (ExecutionException e) {
        // pass: the future of the rm carrying the malformed cell is expected to fail
      }
      // the put on row "good" must have been applied despite the failure of the rm
      Result result = table.get(new Get(Bytes.toBytes("good"))).get();
      assertEquals(1, result.size());
      Cell cell = result.getColumnLatestCell(FAMILY, null);
      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
    }
  }

  /**
   * The invalid cells are in the rm. The rm should fail but the subsequent mutations should
   * succeed. Currently, we have no client api to submit a request consisting of a condition-rm
   * and a mutation. Hence, this test builds the request manually and sends it straight to
   * RSRpcServices#multi of the region server holding the region.
   */
  @Test
  public void testAtomicOperations() throws Exception {
    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]);

    // build the request: the atomic, conditional rm first, then a second region action with the
    // plain put
    HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    ClientProtos.MultiRequest request =
      ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName()))
        .addRegionAction(ClientProtos.RegionAction.newBuilder()
          .setRegion(RequestConverter.buildRegionSpecifier(
            HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
            r.getRegionInfo().getRegionName()))
          .addAction(ClientProtos.Action.newBuilder().setMutation(
            ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put))))
        .build();

    // The mutations above are converted with toMutationNoData, so the cells are handed to the
    // server through the cell scanner of the (mocked) rpc controller, in request order.
    List<Cell> cells = new ArrayList<>();
    for (Mutation m : rm.getMutations()) {
      cells.addAll(m.getCellList(FAMILY));
    }
    cells.addAll(put.getCellList(FAMILY));
    assertEquals(3, cells.size());
    HBaseRpcController controller = Mockito.mock(HBaseRpcController.class);
    Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells));
    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(
      TEST_UTIL.getMiniHBaseCluster()
        .getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName()));

    ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request);
    assertEquals(2, response.getRegionActionResultCount());
    // the rm fails as a whole ...
    assertTrue(response.getRegionActionResultList().get(0).hasException());
    // ... while the following put succeeds
    assertFalse(response.getRegionActionResultList().get(1).hasException());
    assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount());
    assertTrue(
      response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
      Result result = table.get(new Get(Bytes.toBytes("good")));
      assertEquals(1, result.size());
      Cell cell = result.getColumnLatestCell(FAMILY, null);
      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
    }
  }

  /**
   * Builds a {@link ClientProtos.MultiRequest} holding one atomic region action made of the
   * mutations of {@code rm}, guarded by an EQUAL condition on the row of {@code rm}.
   * @param rm the row mutations to convert; only Put and Delete are supported
   * @param regionName the name of the region the action is addressed to
   * @return the request containing the single atomic region action and the condition
   * @throws IOException if {@code rm} contains a mutation which is neither a Put nor a Delete, or
   *           if the conversion to protobuf fails
   */
  private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName)
    throws IOException {
    ClientProtos.RegionAction.Builder builder = RequestConverter
      .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName);
    builder.setAtomic(true);
    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
    ClientProtos.Condition condition = RequestConverter.buildCondition(rm.getRow(), FAMILY, null,
      new BinaryComparator(new byte[10]), HBaseProtos.CompareType.EQUAL, null);
    for (Mutation mutation : rm.getMutations()) {
      ClientProtos.MutationProto.MutationType mutateType;
      if (mutation instanceof Put) {
        mutateType = ClientProtos.MutationProto.MutationType.PUT;
      } else if (mutation instanceof Delete) {
        mutateType = ClientProtos.MutationProto.MutationType.DELETE;
      } else {
        throw new DoNotRetryIOException(
          "RowMutations supports only put and delete, not " + mutation.getClass().getName());
      }
      // the builders are reused across iterations, so reset them before every conversion
      mutationBuilder.clear();
      ClientProtos.MutationProto mp =
        ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
      actionBuilder.clear();
      actionBuilder.setMutation(mp);
      builder.addAction(actionBuilder.build());
    }
    return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
      .setCondition(condition).build();
  }

  /**
   * This test depends on how the region server processes the batch ops.
   * <ol>
   * <li>group the put/delete until meeting the increment</li>
   * <li>process the batch of put/delete</li>
   * <li>process the increment</li>
   * </ol>
   * see RSRpcServices#doNonAtomicRegionMutation
   */
  @Test
  public void testNonAtomicOperations() throws InterruptedException, IOException {
    Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100);
    List<Row> batches = new ArrayList<>();
    // the first and second puts will be grouped by the regionserver
    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    // this Increment should succeed
    batches.add(inc);
    // this put should succeed
    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]));
    Object[] objs = new Object[batches.size()];
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
      table.batch(batches, objs);
      fail("Where is the exception? We put the malformed cells!!!");
    } catch (RetriesExhaustedWithDetailsException e) {
      assertEquals(2, e.getNumExceptions());
      for (int i = 0; i != e.getNumExceptions(); ++i) {
        assertNotNull(e.getCause(i));
        assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass());
        assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
      }
    }
    // Checked after the try/catch rather than in a finally block, so that a failure raised above
    // is reported as-is instead of being replaced by a follow-up assertion error.
    assertObjects(objs, batches.size());
    assertTrue(objs[0] instanceof IOException);
    assertTrue(objs[1] instanceof IOException);
    assertEquals(Result.class, objs[2].getClass());
    assertEquals(Result.class, objs[3].getClass());
  }

  /**
   * Submits two RowMutations in one batch: the first one carries malformed cells and must fail,
   * the second one carries valid cells and must succeed.
   */
  @Test
  public void testRowMutations() throws InterruptedException, IOException {
    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]);
    List<Row> batches = new ArrayList<>();
    RowMutations mutations = new RowMutations(Bytes.toBytes("fail"));
    // both puts of this rm are malformed
    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    batches.add(mutations);
    // this rm should succeed
    mutations = new RowMutations(Bytes.toBytes("good"));
    mutations.add(put);
    mutations.add(put);
    batches.add(mutations);
    Object[] objs = new Object[batches.size()];
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
      table.batch(batches, objs);
      fail("Where is the exception? We put the malformed cells!!!");
    } catch (RetriesExhaustedWithDetailsException e) {
      assertEquals(1, e.getNumExceptions());
      for (int i = 0; i != e.getNumExceptions(); ++i) {
        assertNotNull(e.getCause(i));
        assertTrue(e.getCause(i) instanceof IOException);
        assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
      }
    }
    // Checked after the try/catch rather than in a finally block, so that a failure raised above
    // is reported as-is instead of being replaced by a follow-up assertion error.
    assertObjects(objs, batches.size());
    assertTrue(objs[0] instanceof IOException);
    assertEquals(Result.class, objs[1].getClass());
  }

  /**
   * Asserts that {@code objs} has exactly {@code expectedSize} slots and that none of them is
   * null.
   * @param objs the result array filled by a batch call
   * @param expectedSize the number of results expected
   */
  private static void assertObjects(Object[] objs, int expectedSize) {
    assertEquals(expectedSize, objs.length);
    for (Object obj : objs) {
      assertNotNull(obj);
    }
  }
}