001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.Arrays;
030import java.util.List;
031import java.util.concurrent.BlockingDeque;
032import java.util.concurrent.LinkedBlockingDeque;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.ipc.HBaseRpcController;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.After;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.runner.RunWith;
047import org.junit.runners.Parameterized;
048import org.mockito.Mockito;
049import org.mockito.invocation.InvocationOnMock;
050import org.mockito.stubbing.Answer;
051
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
053import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
054
055import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
060
061/**
062 * Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test
063 * via "Multi" commands) so classified as MediumTests
064 */
065@RunWith(Parameterized.class)
066@Category(MediumTests.class)
067public class TestMultiLogThreshold {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestMultiLogThreshold.class);
072
073  private static final TableName NAME = TableName.valueOf("tableName");
074  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
075
076  private HBaseTestingUtil util;
077  private Configuration conf;
078  private int threshold;
079  private HRegionServer rs;
080  private RSRpcServices services;
081
082  private org.apache.logging.log4j.core.Appender appender;
083
084  @Parameterized.Parameter
085  public static boolean rejectLargeBatchOp;
086
087  @Parameterized.Parameters
088  public static List<Object[]> params() {
089    return Arrays.asList(new Object[] { false }, new Object[] { true });
090  }
091
092  private final class LevelAndMessage {
093    final org.apache.logging.log4j.Level level;
094
095    final String msg;
096
097    public LevelAndMessage(org.apache.logging.log4j.Level level, String msg) {
098      this.level = level;
099      this.msg = msg;
100    }
101
102  }
103
104  // log4j2 will reuse the LogEvent so we need to copy the level and message out.
105  private BlockingDeque<LevelAndMessage> logs = new LinkedBlockingDeque<>();
106
107  @Before
108  public void setupTest() throws Exception {
109    util = new HBaseTestingUtil();
110    conf = util.getConfiguration();
111    threshold =
112      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
113    conf.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
114    util.startMiniCluster();
115    util.createTable(NAME, TEST_FAM);
116    rs = util.getRSForFirstRegionInTable(NAME);
117    appender = mock(org.apache.logging.log4j.core.Appender.class);
118    when(appender.getName()).thenReturn("mockAppender");
119    when(appender.isStarted()).thenReturn(true);
120    doAnswer(new Answer<Void>() {
121
122      @Override
123      public Void answer(InvocationOnMock invocation) throws Throwable {
124        org.apache.logging.log4j.core.LogEvent logEvent =
125          invocation.getArgument(0, org.apache.logging.log4j.core.LogEvent.class);
126        logs.add(
127          new LevelAndMessage(logEvent.getLevel(), logEvent.getMessage().getFormattedMessage()));
128        return null;
129      }
130    }).when(appender).append(any(org.apache.logging.log4j.core.LogEvent.class));
131    ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
132      .getLogger(RSRpcServices.class)).addAppender(appender);
133  }
134
135  @After
136  public void tearDown() throws Exception {
137    ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
138      .getLogger(RSRpcServices.class)).removeAppender(appender);
139    util.shutdownMiniCluster();
140  }
141
142  private enum ActionType {
143    REGION_ACTIONS, ACTIONS
144  }
145
146  /**
147   * Sends a multi request with a certain amount of rows, will populate Multi command with either
148   * "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of
149   * Actions
150   */
151  private void sendMultiRequest(int rows, ActionType actionType)
152    throws ServiceException, IOException {
153    RpcController rpcc = Mockito.mock(HBaseRpcController.class);
154    MultiRequest.Builder builder = MultiRequest.newBuilder();
155    int numRAs = 1;
156    int numAs = 1;
157    switch (actionType) {
158      case REGION_ACTIONS:
159        numRAs = rows;
160        break;
161      case ACTIONS:
162        numAs = rows;
163        break;
164    }
165    for (int i = 0; i < numRAs; i++) {
166      RegionAction.Builder rab = RegionAction.newBuilder();
167      rab.setRegion(RequestConverter.buildRegionSpecifier(
168        HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
169        Bytes.toBytes("someStuff" + i)));
170      for (int j = 0; j < numAs; j++) {
171        Action.Builder ab = Action.newBuilder();
172        rab.addAction(ab.build());
173      }
174      builder.addRegionAction(rab.build());
175    }
176    services = new RSRpcServices(rs);
177    services.multi(rpcc, builder.build());
178  }
179
180  private void assertLogBatchWarnings(boolean expected) {
181    boolean actual = false;
182    for (LevelAndMessage event : logs) {
183      if (event.level == org.apache.logging.log4j.Level.WARN &&
184        event.msg.contains("Large batch operation detected")) {
185        actual = true;
186        break;
187      }
188    }
189    logs.clear();
190    assertEquals(expected, actual);
191  }
192
193  @Test
194  public void testMultiLogThresholdRegionActions() throws ServiceException, IOException {
195    try {
196      sendMultiRequest(threshold + 1, ActionType.REGION_ACTIONS);
197      assertFalse(rejectLargeBatchOp);
198    } catch (ServiceException e) {
199      assertTrue(rejectLargeBatchOp);
200    }
201    assertLogBatchWarnings(true);
202
203    sendMultiRequest(threshold, ActionType.REGION_ACTIONS);
204    assertLogBatchWarnings(false);
205
206    try {
207      sendMultiRequest(threshold + 1, ActionType.ACTIONS);
208      assertFalse(rejectLargeBatchOp);
209    } catch (ServiceException e) {
210      assertTrue(rejectLargeBatchOp);
211    }
212    assertLogBatchWarnings(true);
213
214    sendMultiRequest(threshold, ActionType.ACTIONS);
215    assertLogBatchWarnings(false);
216  }
217}