001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.concurrent.BlockingDeque;
030import java.util.concurrent.LinkedBlockingDeque;
031import java.util.stream.Stream;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.HBaseRpcController;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.junit.jupiter.api.AfterEach;
041import org.junit.jupiter.api.BeforeEach;
042import org.junit.jupiter.api.Tag;
043import org.junit.jupiter.api.TestTemplate;
044import org.junit.jupiter.params.provider.Arguments;
045import org.mockito.Mockito;
046import org.mockito.invocation.InvocationOnMock;
047import org.mockito.stubbing.Answer;
048
049import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
050import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
051
052import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
057
058/**
059 * Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test
060 * via "Multi" commands) so classified as MediumTests
061 */
062@Tag(MediumTests.TAG)
063@HBaseParameterizedTestTemplate(name = "{index}: rejectLargeBatchOp={0}")
064public class TestMultiLogThreshold {
065
066  private static final TableName NAME = TableName.valueOf("tableName");
067  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
068
069  private HBaseTestingUtil util;
070  private Configuration conf;
071  private int threshold;
072  private HRegionServer rs;
073  private RSRpcServices services;
074
075  private org.apache.logging.log4j.core.Appender appender;
076
077  private final boolean rejectLargeBatchOp;
078
079  public static Stream<Arguments> parameters() {
080    return Stream.of(Arguments.of(false), Arguments.of(true));
081  }
082
083  public TestMultiLogThreshold(boolean rejectLargeBatchOp) {
084    this.rejectLargeBatchOp = rejectLargeBatchOp;
085  }
086
087  private final class LevelAndMessage {
088    final org.apache.logging.log4j.Level level;
089
090    final String msg;
091
092    public LevelAndMessage(org.apache.logging.log4j.Level level, String msg) {
093      this.level = level;
094      this.msg = msg;
095    }
096
097  }
098
099  // log4j2 will reuse the LogEvent so we need to copy the level and message out.
100  private BlockingDeque<LevelAndMessage> logs = new LinkedBlockingDeque<>();
101
102  @BeforeEach
103  public void setupTest() throws Exception {
104    util = new HBaseTestingUtil();
105    conf = util.getConfiguration();
106    threshold =
107      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
108    conf.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
109    util.startMiniCluster();
110    util.createTable(NAME, TEST_FAM);
111    rs = util.getRSForFirstRegionInTable(NAME);
112    appender = mock(org.apache.logging.log4j.core.Appender.class);
113    when(appender.getName()).thenReturn("mockAppender");
114    when(appender.isStarted()).thenReturn(true);
115    doAnswer(new Answer<Void>() {
116
117      @Override
118      public Void answer(InvocationOnMock invocation) throws Throwable {
119        org.apache.logging.log4j.core.LogEvent logEvent =
120          invocation.getArgument(0, org.apache.logging.log4j.core.LogEvent.class);
121        logs.add(
122          new LevelAndMessage(logEvent.getLevel(), logEvent.getMessage().getFormattedMessage()));
123        return null;
124      }
125    }).when(appender).append(any(org.apache.logging.log4j.core.LogEvent.class));
126    ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
127      .getLogger(RSRpcServices.class)).addAppender(appender);
128  }
129
130  @AfterEach
131  public void tearDown() throws Exception {
132    ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
133      .getLogger(RSRpcServices.class)).removeAppender(appender);
134    util.shutdownMiniCluster();
135  }
136
137  private enum ActionType {
138    REGION_ACTIONS,
139    ACTIONS
140  }
141
142  /**
143   * Sends a multi request with a certain amount of rows, will populate Multi command with either
144   * "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of
145   * Actions
146   */
147  private void sendMultiRequest(int rows, ActionType actionType)
148    throws ServiceException, IOException {
149    RpcController rpcc = Mockito.mock(HBaseRpcController.class);
150    MultiRequest.Builder builder = MultiRequest.newBuilder();
151    int numRAs = 1;
152    int numAs = 1;
153    switch (actionType) {
154      case REGION_ACTIONS:
155        numRAs = rows;
156        break;
157      case ACTIONS:
158        numAs = rows;
159        break;
160    }
161    for (int i = 0; i < numRAs; i++) {
162      RegionAction.Builder rab = RegionAction.newBuilder();
163      rab.setRegion(RequestConverter.buildRegionSpecifier(
164        HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
165        Bytes.toBytes("someStuff" + i)));
166      for (int j = 0; j < numAs; j++) {
167        Action.Builder ab = Action.newBuilder();
168        rab.addAction(ab.build());
169      }
170      builder.addRegionAction(rab.build());
171    }
172    services = new RSRpcServices(rs);
173    services.multi(rpcc, builder.build());
174  }
175
176  private void assertLogBatchWarnings(boolean expected) {
177    boolean actual = false;
178    for (LevelAndMessage event : logs) {
179      if (
180        event.level == org.apache.logging.log4j.Level.WARN
181          && event.msg.contains("Large batch operation detected")
182      ) {
183        actual = true;
184        break;
185      }
186    }
187    logs.clear();
188    assertEquals(expected, actual);
189  }
190
191  @TestTemplate
192  public void testMultiLogThresholdRegionActions() throws ServiceException, IOException {
193    try {
194      sendMultiRequest(threshold + 1, ActionType.REGION_ACTIONS);
195      assertFalse(rejectLargeBatchOp);
196    } catch (ServiceException e) {
197      assertTrue(rejectLargeBatchOp);
198    }
199    assertLogBatchWarnings(true);
200
201    sendMultiRequest(threshold, ActionType.REGION_ACTIONS);
202    assertLogBatchWarnings(false);
203
204    try {
205      sendMultiRequest(threshold + 1, ActionType.ACTIONS);
206      assertFalse(rejectLargeBatchOp);
207    } catch (ServiceException e) {
208      assertTrue(rejectLargeBatchOp);
209    }
210    assertLogBatchWarnings(true);
211
212    sendMultiRequest(threshold, ActionType.ACTIONS);
213    assertLogBatchWarnings(false);
214  }
215}