/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * Verifies that the region server deals with malformed cells correctly.
 * <p>
 * Every test table is created with {@link HRegion#HBASE_MAX_CELL_SIZE_KEY} set to
 * {@link #CELL_SIZE}; the "malformed" cells used below carry a value of {@link #CELL_SIZE} bytes
 * and are expected to be rejected, while the small cells are expected to be accepted.
 */
@Tag(MediumTests.TAG)
@Tag(ClientTests.TAG)
public class TestMalformedCellFromClient {
  private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
  /** Value configured as the table's maximum cell size, and the value length of the bad cells. */
  private static final int CELL_SIZE = 100;
  private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");

  /** Starts a single-node mini cluster with client retries disabled. */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    // disable the retry
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
    TEST_UTIL.startMiniCluster(1);
  }

  /** Creates the test table with the max cell size set to {@link #CELL_SIZE}. */
  @BeforeEach
  public void before() throws Exception {
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
      .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build();
    // Use the testing util's Admin (as tearDown() does) rather than asking the connection for a
    // fresh Admin on every test and never closing it.
    TEST_UTIL.getAdmin().createTable(desc);
  }

  /** Deletes every table listed by the admin so that each test starts from a clean state. */
  @AfterEach
  public void tearDown() throws Exception {
    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
      TEST_UTIL.deleteTable(htd.getTableName());
    }
  }

  /** Shuts the mini cluster down. */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * The purpose of this ut is to check the consistency between the exception and results. If the
   * RetriesExhaustedWithDetailsException contains the whole batch, each result should be of IOE.
   * Otherwise, the row operation which is not in the exception should have a true result.
   */
  @Test
  public void testRegionException() throws InterruptedException, IOException {
    List<Row> batches = new ArrayList<>();
    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
    // the rm is used to prompt the region exception.
    // see RSRpcServices#multi
    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    batches.add(rm);
    Object[] results = new Object[batches.size()];

    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
      Throwable caughtCause = null;
      try {
        table.batch(batches, results);
        fail("Where is the exception? We put the malformed cells!!!");
      } catch (RetriesExhaustedException e) {
        caughtCause = e.getCause();
      }
      // Fail with a clear message instead of a NullPointerException if there is no cause.
      assertNotNull(caughtCause, "The RetriesExhaustedException carries no cause");
      for (Object obj : results) {
        assertNotNull(obj);
      }
      assertEquals(Result.class, results[0].getClass());
      assertEquals(caughtCause.getClass(), results[1].getClass());
      Result result = table.get(new Get(Bytes.toBytes("good")));
      assertEquals(1, result.size());
      Cell cell = result.getColumnLatestCell(FAMILY, null);
      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
    }
  }

  /**
   * This test verifies region exception doesn't corrupt the results of batch. The prescription is
   * shown below. 1) honor the action result rather than region exception. If the action have both
   * of true result and region exception, the action is fine as the exception is caused by other
   * actions which are in the same region. 2) honor the action exception rather than region
   * exception. If the action have both of action exception and region exception, we deal with the
   * action exception only. If we also handle the region exception for the same action, it will
   * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will
   * block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each
   * result should be of IOE. Otherwise, the row operation which is not in the exception should have
   * a true result. The no-cluster test is in TestAsyncProcessWithRegionException.
   */
  @Test
  public void testRegionExceptionByAsync() throws Exception {
    List<Row> batches = new ArrayList<>();
    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
    // the rm is used to prompt the region exception.
    // see RSRpcServices#multi
    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    batches.add(rm);
    try (AsyncConnection asyncConnection =
      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
      AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME);
      List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches);
      assertEquals(2, results.size());
      try {
        results.get(1).get();
        fail("Where is the exception? We put the malformed cells!!!");
      } catch (ExecutionException e) {
        // pass: the future of the malformed RowMutations is expected to complete exceptionally
      }
      Result result = table.get(new Get(Bytes.toBytes("good"))).get();
      assertEquals(1, result.size());
      Cell cell = result.getColumnLatestCell(FAMILY, null);
      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
    }
  }

  /**
   * The invalid cells are in rm. The rm should fail but the subsequent mutations should succeed.
   * Currently, we have no client api to submit the request consisting of condition-rm and mutation.
   * Hence, this test builds the request manually.
   */
  @Test
  public void testAtomicOperations() throws Exception {
    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]);

    // build the request: the first region action is the conditional rm, the second the plain put
    HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    ClientProtos.MultiRequest request =
      ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName()))
        .addRegionAction(ClientProtos.RegionAction.newBuilder()
          .setRegion(RequestConverter.buildRegionSpecifier(
            HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
            r.getRegionInfo().getRegionName()))
          .addAction(ClientProtos.Action.newBuilder().setMutation(
            ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put))))
        .build();

    // The mutations were serialized without data, so the cells are handed over separately through
    // the controller's cell scanner, in the same order as the actions in the request.
    List<ExtendedCell> cells = new ArrayList<>();
    for (Mutation m : rm.getMutations()) {
      cells.addAll(m.getCellList(FAMILY));
    }
    cells.addAll(put.getCellList(FAMILY));
    assertEquals(3, cells.size());
    HBaseRpcController controller = Mockito.mock(HBaseRpcController.class);
    Mockito.when(controller.cellScanner())
      .thenReturn(PrivateCellUtil.createExtendedCellScanner(cells));
    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(TEST_UTIL
      .getMiniHBaseCluster().getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName()));

    ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request);
    assertEquals(2, response.getRegionActionResultCount());
    assertTrue(response.getRegionActionResultList().get(0).hasException());
    assertFalse(response.getRegionActionResultList().get(1).hasException());
    assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount());
    assertTrue(
      response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
      Result result = table.get(new Get(Bytes.toBytes("good")));
      assertEquals(1, result.size());
      Cell cell = result.getColumnLatestCell(FAMILY, null);
      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
    }
  }

  /**
   * Builds a {@link ClientProtos.MultiRequest} holding one atomic region action which carries a
   * condition on the row of {@code rm} plus one data-less mutation action per mutation of
   * {@code rm}.
   * @param rm         the row mutations to convert; only {@link Put} and {@link Delete} are
   *                   accepted
   * @param regionName the name of the region the action is addressed to
   * @return the request containing the single conditional region action
   * @throws IOException if {@code rm} contains a mutation which is neither a put nor a delete
   *                     (reported as {@link DoNotRetryIOException}), or if the conversion fails
   */
  private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName)
    throws IOException {
    ClientProtos.RegionAction.Builder builder = RequestConverter
      .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName);
    builder.setAtomic(true);
    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
    ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null,
      CompareOperator.EQUAL, new byte[10], null, null, false);
    for (Mutation mutation : rm.getMutations()) {
      final ClientProtos.MutationProto.MutationType mutateType;
      if (mutation instanceof Put) {
        mutateType = ClientProtos.MutationProto.MutationType.PUT;
      } else if (mutation instanceof Delete) {
        mutateType = ClientProtos.MutationProto.MutationType.DELETE;
      } else {
        throw new DoNotRetryIOException(
          "RowMutations supports only put and delete, not " + mutation.getClass().getName());
      }
      // The two builders are reused across iterations, hence the clear() before each use.
      mutationBuilder.clear();
      ClientProtos.MutationProto mp =
        ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
      actionBuilder.clear();
      actionBuilder.setMutation(mp);
      builder.addAction(actionBuilder.build());
    }
    return ClientProtos.MultiRequest.newBuilder()
      .addRegionAction(builder.setCondition(condition).build()).build();
  }

  /**
   * This test depends on how regionserver process the batch ops. 1) group the put/delete until
   * meeting the increment 2) process the batch of put/delete 3) process the increment see
   * RSRpcServices#doNonAtomicRegionMutation
   */
  @Test
  public void testNonAtomicOperations() throws InterruptedException, IOException {
    Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100);
    List<Row> batches = new ArrayList<>();
    // the first and second puts will be group by regionserver
    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    // this Increment should succeed
    batches.add(inc);
    // this put should succeed
    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]));
    Object[] objs = new Object[batches.size()];
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
      table.batch(batches, objs);
      fail("Where is the exception? We put the malformed cells!!!");
    } catch (RetriesExhaustedException e) {
      assertCausedByDoNotRetryIOException(e);
    }
    // Checked after the try/catch rather than in a finally block, so that a failing assertion
    // cannot replace (and thereby hide) an unexpected exception thrown by the batch call.
    assertObjects(objs, batches.size());
    assertTrue(objs[0] instanceof IOException);
    assertTrue(objs[1] instanceof IOException);
    assertEquals(Result.class, objs[2].getClass());
    assertEquals(Result.class, objs[3].getClass());
  }

  /**
   * Submits two {@link RowMutations} in one batch: the first one consists of malformed puts and
   * must be answered with an exception, the second one consists of valid puts and must be answered
   * with a {@link Result}.
   */
  @Test
  public void testRowMutations() throws InterruptedException, IOException {
    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]);
    List<Row> batches = new ArrayList<>();
    RowMutations mutations = new RowMutations(Bytes.toBytes("fail"));
    // the first and second puts will be group by regionserver
    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
    batches.add(mutations);
    // this rm should succeed
    mutations = new RowMutations(Bytes.toBytes("good"));
    mutations.add(put);
    mutations.add(put);
    batches.add(mutations);
    Object[] objs = new Object[batches.size()];
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
      table.batch(batches, objs);
      fail("Where is the exception? We put the malformed cells!!!");
    } catch (RetriesExhaustedException e) {
      assertCausedByDoNotRetryIOException(e);
    }
    // Checked after the try/catch rather than in a finally block, so that a failing assertion
    // cannot replace (and thereby hide) an unexpected exception thrown by the batch call.
    assertObjects(objs, batches.size());
    assertTrue(objs[0] instanceof IOException);
    assertEquals(Result.class, objs[1].getClass());
  }

  /**
   * Walks the cause chain of {@code e}, starting at its direct cause, and fails the test if the
   * chain ends before a {@link DoNotRetryIOException} is found.
   * @param e the exception thrown by the batch call
   */
  private static void assertCausedByDoNotRetryIOException(RetriesExhaustedException e) {
    Throwable error = e.getCause();
    while (!(error instanceof DoNotRetryIOException)) {
      // null is never an instance of anything, so an exhausted chain ends up failing here
      assertNotNull(error, "Can not find a DoNotRetryIOException on stack trace");
      error = error.getCause();
    }
  }

  /**
   * Asserts that {@code objs} has exactly {@code expectedSize} slots and that none of them is
   * {@code null}.
   * @param objs         the per-action results filled in by the batch call
   * @param expectedSize the expected number of results
   */
  private static void assertObjects(Object[] objs, int expectedSize) {
    for (Object obj : objs) {
      assertNotNull(obj);
    }
    assertEquals(expectedSize, objs.length);
  }
}