001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.ExecutionException; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellUtil; 033import org.apache.hadoop.hbase.CompareOperator; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.ipc.HBaseRpcController; 040import org.apache.hadoop.hbase.regionserver.HRegion; 041import org.apache.hadoop.hbase.regionserver.HRegionServer; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.MediumTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.After; 046import org.junit.AfterClass; 047import org.junit.Before; 048import org.junit.BeforeClass; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.mockito.Mockito; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 060 061/** 062 * The purpose of this test is to ensure whether rs deals with the malformed cells correctly. 063 */ 064@Category({ MediumTests.class, ClientTests.class }) 065public class TestMalformedCellFromClient { 066 private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class); 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestMalformedCellFromClient.class); 070 071 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 072 private static final byte[] FAMILY = Bytes.toBytes("testFamily"); 073 private static final int CELL_SIZE = 100; 074 private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient"); 075 076 @BeforeClass 077 public static void setUpBeforeClass() throws Exception { 078 // disable the retry 079 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 080 TEST_UTIL.startMiniCluster(1); 081 } 082 083 @Before 084 public void before() throws Exception { 085 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME) 086 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 087 .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build(); 088 TEST_UTIL.getConnection().getAdmin().createTable(desc); 089 } 090 091 @After 092 public void tearDown() throws Exception { 093 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { 094 TEST_UTIL.deleteTable(htd.getTableName()); 095 } 096 } 097 098 @AfterClass 099 public static void tearDownAfterClass() throws Exception { 100 TEST_UTIL.shutdownMiniCluster(); 101 } 102 103 /** 104 * The purpose of this ut is to check the consistency between the exception and results. If the 105 * RetriesExhaustedWithDetailsException contains the whole batch, each result should be of IOE. 106 * Otherwise, the row operation which is not in the exception should have a true result. 107 */ 108 @Test 109 public void testRegionException() throws InterruptedException, IOException { 110 List<Row> batches = new ArrayList<>(); 111 batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10])); 112 // the rm is used to prompt the region exception. 113 // see RSRpcServices#multi 114 RowMutations rm = new RowMutations(Bytes.toBytes("fail")); 115 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); 116 batches.add(rm); 117 Object[] results = new Object[batches.size()]; 118 119 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 120 Throwable exceptionByCaught = null; 121 try { 122 table.batch(batches, results); 123 fail("Where is the exception? We put the malformed cells!!!"); 124 } catch (RetriesExhaustedException e) { 125 exceptionByCaught = e.getCause(); 126 } 127 for (Object obj : results) { 128 assertNotNull(obj); 129 } 130 assertEquals(Result.class, results[0].getClass()); 131 assertEquals(exceptionByCaught.getClass(), results[1].getClass()); 132 Result result = table.get(new Get(Bytes.toBytes("good"))); 133 assertEquals(1, result.size()); 134 Cell cell = result.getColumnLatestCell(FAMILY, null); 135 assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); 136 } 137 } 138 139 /** 140 * This test verifies region exception doesn't corrupt the results of batch. The prescription is 141 * shown below. 1) honor the action result rather than region exception. If the action have both 142 * of true result and region exception, the action is fine as the exception is caused by other 143 * actions which are in the same region. 2) honor the action exception rather than region 144 * exception. If the action have both of action exception and region exception, we deal with the 145 * action exception only. If we also handle the region exception for the same action, it will 146 * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will 147 * block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each 148 * result should be of IOE. Otherwise, the row operation which is not in the exception should have 149 * a true result. The no-cluster test is in TestAsyncProcessWithRegionException. 150 */ 151 @Test 152 public void testRegionExceptionByAsync() throws Exception { 153 List<Row> batches = new ArrayList<>(); 154 batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10])); 155 // the rm is used to prompt the region exception. 156 // see RSRpcServices#multi 157 RowMutations rm = new RowMutations(Bytes.toBytes("fail")); 158 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); 159 batches.add(rm); 160 try (AsyncConnection asyncConnection = 161 ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { 162 AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME); 163 List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches); 164 assertEquals(2, results.size()); 165 try { 166 results.get(1).get(); 167 fail("Where is the exception? We put the malformed cells!!!"); 168 } catch (ExecutionException e) { 169 // pass 170 } 171 Result result = table.get(new Get(Bytes.toBytes("good"))).get(); 172 assertEquals(1, result.size()); 173 Cell cell = result.getColumnLatestCell(FAMILY, null); 174 assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); 175 } 176 } 177 178 /** 179 * The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed. 180 * Currently, we have no client api to submit the request consisting of condition-rm and mutation. 181 * Hence, this test build the request manually. 182 */ 183 @Test 184 public void testAtomicOperations() throws Exception { 185 RowMutations rm = new RowMutations(Bytes.toBytes("fail")); 186 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); 187 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10])); 188 Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]); 189 190 // build the request 191 HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 192 ClientProtos.MultiRequest request = 193 ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName())) 194 .addRegionAction(ClientProtos.RegionAction.newBuilder() 195 .setRegion(RequestConverter.buildRegionSpecifier( 196 HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, 197 r.getRegionInfo().getRegionName())) 198 .addAction(ClientProtos.Action.newBuilder().setMutation( 199 ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put)))) 200 .build(); 201 202 List<Cell> cells = new ArrayList<>(); 203 for (Mutation m : rm.getMutations()) { 204 cells.addAll(m.getCellList(FAMILY)); 205 } 206 cells.addAll(put.getCellList(FAMILY)); 207 assertEquals(3, cells.size()); 208 HBaseRpcController controller = Mockito.mock(HBaseRpcController.class); 209 Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells)); 210 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(TEST_UTIL 211 .getMiniHBaseCluster().getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName())); 212 213 ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request); 214 assertEquals(2, response.getRegionActionResultCount()); 215 assertTrue(response.getRegionActionResultList().get(0).hasException()); 216 assertFalse(response.getRegionActionResultList().get(1).hasException()); 217 assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount()); 218 assertTrue( 219 response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult()); 220 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 221 Result result = table.get(new Get(Bytes.toBytes("good"))); 222 assertEquals(1, result.size()); 223 Cell cell = result.getColumnLatestCell(FAMILY, null); 224 assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); 225 } 226 } 227 228 private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName) 229 throws IOException { 230 ClientProtos.RegionAction.Builder builder = RequestConverter 231 .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName); 232 builder.setAtomic(true); 233 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); 234 ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); 235 ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null, 236 CompareOperator.EQUAL, new byte[10], null, null); 237 for (Mutation mutation : rm.getMutations()) { 238 ClientProtos.MutationProto.MutationType mutateType = null; 239 if (mutation instanceof Put) { 240 mutateType = ClientProtos.MutationProto.MutationType.PUT; 241 } else if (mutation instanceof Delete) { 242 mutateType = ClientProtos.MutationProto.MutationType.DELETE; 243 } else { 244 throw new DoNotRetryIOException( 245 "RowMutations supports only put and delete, not " + mutation.getClass().getName()); 246 } 247 mutationBuilder.clear(); 248 ClientProtos.MutationProto mp = 249 ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder); 250 actionBuilder.clear(); 251 actionBuilder.setMutation(mp); 252 builder.addAction(actionBuilder.build()); 253 } 254 ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder() 255 .addRegionAction(builder.setCondition(condition).build()).build(); 256 return request; 257 } 258 259 /** 260 * This test depends on how regionserver process the batch ops. 1) group the put/delete until 261 * meeting the increment 2) process the batch of put/delete 3) process the increment see 262 * RSRpcServices#doNonAtomicRegionMutation 263 */ 264 @Test 265 public void testNonAtomicOperations() throws InterruptedException, IOException { 266 Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100); 267 List<Row> batches = new ArrayList<>(); 268 // the first and second puts will be group by regionserver 269 batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 270 batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 271 // this Increment should succeed 272 batches.add(inc); 273 // this put should succeed 274 batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1])); 275 Object[] objs = new Object[batches.size()]; 276 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 277 table.batch(batches, objs); 278 fail("Where is the exception? We put the malformed cells!!!"); 279 } catch (RetriesExhaustedException e) { 280 Throwable error = e.getCause(); 281 for (;;) { 282 assertNotNull("Can not find a DoNotRetryIOException on stack trace", error); 283 if (error instanceof DoNotRetryIOException) { 284 break; 285 } 286 error = error.getCause(); 287 } 288 } finally { 289 assertObjects(objs, batches.size()); 290 assertTrue(objs[0] instanceof IOException); 291 assertTrue(objs[1] instanceof IOException); 292 assertEquals(Result.class, objs[2].getClass()); 293 assertEquals(Result.class, objs[3].getClass()); 294 } 295 } 296 297 @Test 298 public void testRowMutations() throws InterruptedException, IOException { 299 Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]); 300 List<Row> batches = new ArrayList<>(); 301 RowMutations mutations = new RowMutations(Bytes.toBytes("fail")); 302 // the first and second puts will be group by regionserver 303 mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 304 mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 305 batches.add(mutations); 306 // this bm should succeed 307 mutations = new RowMutations(Bytes.toBytes("good")); 308 mutations.add(put); 309 mutations.add(put); 310 batches.add(mutations); 311 Object[] objs = new Object[batches.size()]; 312 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 313 table.batch(batches, objs); 314 fail("Where is the exception? We put the malformed cells!!!"); 315 } catch (RetriesExhaustedException e) { 316 Throwable error = e.getCause(); 317 for (;;) { 318 assertNotNull("Can not find a DoNotRetryIOException on stack trace", error); 319 if (error instanceof DoNotRetryIOException) { 320 break; 321 } 322 error = error.getCause(); 323 } 324 } finally { 325 assertObjects(objs, batches.size()); 326 assertTrue(objs[0] instanceof IOException); 327 assertEquals(Result.class, objs[1].getClass()); 328 } 329 } 330 331 private static void assertObjects(Object[] objs, int expectedSize) { 332 int count = 0; 333 for (Object obj : objs) { 334 assertNotNull(obj); 335 ++count; 336 } 337 assertEquals(expectedSize, count); 338 } 339}