001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.mockito.ArgumentMatchers.any; 024import static org.mockito.Mockito.doAnswer; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.concurrent.BlockingDeque; 030import java.util.concurrent.LinkedBlockingDeque; 031import java.util.stream.Stream; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.ipc.HBaseRpcController; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.junit.jupiter.api.AfterEach; 041import org.junit.jupiter.api.BeforeEach; 042import org.junit.jupiter.api.Tag; 043import org.junit.jupiter.api.TestTemplate; 044import org.junit.jupiter.params.provider.Arguments; 045import org.mockito.Mockito; 046import org.mockito.invocation.InvocationOnMock; 047import org.mockito.stubbing.Answer; 048 049import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 050import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 051 052import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 057 058/** 059 * Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test 060 * via "Multi" commands) so classified as MediumTests 061 */ 062@Tag(MediumTests.TAG) 063@HBaseParameterizedTestTemplate(name = "{index}: rejectLargeBatchOp={0}") 064public class TestMultiLogThreshold { 065 066 private static final TableName NAME = TableName.valueOf("tableName"); 067 private static final byte[] TEST_FAM = Bytes.toBytes("fam"); 068 069 private HBaseTestingUtil util; 070 private Configuration conf; 071 private int threshold; 072 private HRegionServer rs; 073 private RSRpcServices services; 074 075 private org.apache.logging.log4j.core.Appender appender; 076 077 private final boolean rejectLargeBatchOp; 078 079 public static Stream<Arguments> parameters() { 080 return Stream.of(Arguments.of(false), Arguments.of(true)); 081 } 082 083 public TestMultiLogThreshold(boolean rejectLargeBatchOp) { 084 this.rejectLargeBatchOp = rejectLargeBatchOp; 085 } 086 087 private final class LevelAndMessage { 088 final org.apache.logging.log4j.Level level; 089 090 final String msg; 091 092 public LevelAndMessage(org.apache.logging.log4j.Level level, String msg) { 093 this.level = level; 094 this.msg = msg; 095 } 096 097 } 098 099 // log4j2 will reuse the LogEvent so we need to copy the level and message out. 100 private BlockingDeque<LevelAndMessage> logs = new LinkedBlockingDeque<>(); 101 102 @BeforeEach 103 public void setupTest() throws Exception { 104 util = new HBaseTestingUtil(); 105 conf = util.getConfiguration(); 106 threshold = 107 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 108 conf.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp); 109 util.startMiniCluster(); 110 util.createTable(NAME, TEST_FAM); 111 rs = util.getRSForFirstRegionInTable(NAME); 112 appender = mock(org.apache.logging.log4j.core.Appender.class); 113 when(appender.getName()).thenReturn("mockAppender"); 114 when(appender.isStarted()).thenReturn(true); 115 doAnswer(new Answer<Void>() { 116 117 @Override 118 public Void answer(InvocationOnMock invocation) throws Throwable { 119 org.apache.logging.log4j.core.LogEvent logEvent = 120 invocation.getArgument(0, org.apache.logging.log4j.core.LogEvent.class); 121 logs.add( 122 new LevelAndMessage(logEvent.getLevel(), logEvent.getMessage().getFormattedMessage())); 123 return null; 124 } 125 }).when(appender).append(any(org.apache.logging.log4j.core.LogEvent.class)); 126 ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager 127 .getLogger(RSRpcServices.class)).addAppender(appender); 128 } 129 130 @AfterEach 131 public void tearDown() throws Exception { 132 ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager 133 .getLogger(RSRpcServices.class)).removeAppender(appender); 134 util.shutdownMiniCluster(); 135 } 136 137 private enum ActionType { 138 REGION_ACTIONS, 139 ACTIONS 140 } 141 142 /** 143 * Sends a multi request with a certain amount of rows, will populate Multi command with either 144 * "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of 145 * Actions 146 */ 147 private void sendMultiRequest(int rows, ActionType actionType) 148 throws ServiceException, IOException { 149 RpcController rpcc = Mockito.mock(HBaseRpcController.class); 150 MultiRequest.Builder builder = MultiRequest.newBuilder(); 151 int numRAs = 1; 152 int numAs = 1; 153 switch (actionType) { 154 case REGION_ACTIONS: 155 numRAs = rows; 156 break; 157 case ACTIONS: 158 numAs = rows; 159 break; 160 } 161 for (int i = 0; i < numRAs; i++) { 162 RegionAction.Builder rab = RegionAction.newBuilder(); 163 rab.setRegion(RequestConverter.buildRegionSpecifier( 164 HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, 165 Bytes.toBytes("someStuff" + i))); 166 for (int j = 0; j < numAs; j++) { 167 Action.Builder ab = Action.newBuilder(); 168 rab.addAction(ab.build()); 169 } 170 builder.addRegionAction(rab.build()); 171 } 172 services = new RSRpcServices(rs); 173 services.multi(rpcc, builder.build()); 174 } 175 176 private void assertLogBatchWarnings(boolean expected) { 177 boolean actual = false; 178 for (LevelAndMessage event : logs) { 179 if ( 180 event.level == org.apache.logging.log4j.Level.WARN 181 && event.msg.contains("Large batch operation detected") 182 ) { 183 actual = true; 184 break; 185 } 186 } 187 logs.clear(); 188 assertEquals(expected, actual); 189 } 190 191 @TestTemplate 192 public void testMultiLogThresholdRegionActions() throws ServiceException, IOException { 193 try { 194 sendMultiRequest(threshold + 1, ActionType.REGION_ACTIONS); 195 assertFalse(rejectLargeBatchOp); 196 } catch (ServiceException e) { 197 assertTrue(rejectLargeBatchOp); 198 } 199 assertLogBatchWarnings(true); 200 201 sendMultiRequest(threshold, ActionType.REGION_ACTIONS); 202 assertLogBatchWarnings(false); 203 204 try { 205 sendMultiRequest(threshold + 1, ActionType.ACTIONS); 206 assertFalse(rejectLargeBatchOp); 207 } catch (ServiceException e) { 208 assertTrue(rejectLargeBatchOp); 209 } 210 assertLogBatchWarnings(true); 211 212 sendMultiRequest(threshold, ActionType.ACTIONS); 213 assertLogBatchWarnings(false); 214 } 215}