001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.Arrays;
030import java.util.List;
031import java.util.concurrent.BlockingDeque;
032import java.util.concurrent.LinkedBlockingDeque;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.ipc.HBaseRpcController;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.After;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.runner.RunWith;
047import org.junit.runners.Parameterized;
048import org.mockito.Mockito;
049import org.mockito.invocation.InvocationOnMock;
050import org.mockito.stubbing.Answer;
051
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
053import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
054
055import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
060
061/**
062 * Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test
063 * via "Multi" commands) so classified as MediumTests
064 */
065@RunWith(Parameterized.class)
066@Category(MediumTests.class)
067public class TestMultiLogThreshold {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestMultiLogThreshold.class);
072
073  private static final TableName NAME = TableName.valueOf("tableName");
074  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
075
076  private HBaseTestingUtility util;
077  private Configuration conf;
078  private int threshold;
079  private HRegionServer rs;
080  private RSRpcServices services;
081
082  private org.apache.logging.log4j.core.Appender appender;
083
084  @Parameterized.Parameter
085  public static boolean rejectLargeBatchOp;
086
087  @Parameterized.Parameters
088  public static List<Object[]> params() {
089    return Arrays.asList(new Object[] { false }, new Object[] { true });
090  }
091
092  private final class LevelAndMessage {
093    final org.apache.logging.log4j.Level level;
094
095    final String msg;
096
097    public LevelAndMessage(org.apache.logging.log4j.Level level, String msg) {
098      this.level = level;
099      this.msg = msg;
100    }
101
102  }
103
104  // log4j2 will reuse the LogEvent so we need to copy the level and message out.
105  private BlockingDeque<LevelAndMessage> logs = new LinkedBlockingDeque<>();
106
107  @Before
108  public void setupTest() throws Exception {
109    util = new HBaseTestingUtility();
110    conf = util.getConfiguration();
111    threshold =
112      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
113    conf.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
114    util.startMiniCluster();
115    util.createTable(NAME, TEST_FAM);
116    rs = util.getRSForFirstRegionInTable(NAME);
117    appender = mock(org.apache.logging.log4j.core.Appender.class);
118    when(appender.getName()).thenReturn("mockAppender");
119    when(appender.isStarted()).thenReturn(true);
120    doAnswer(new Answer<Void>() {
121
122      @Override
123      public Void answer(InvocationOnMock invocation) throws Throwable {
124        org.apache.logging.log4j.core.LogEvent logEvent =
125          invocation.getArgument(0, org.apache.logging.log4j.core.LogEvent.class);
126        logs.add(
127          new LevelAndMessage(logEvent.getLevel(), logEvent.getMessage().getFormattedMessage()));
128        return null;
129      }
130    }).when(appender).append(any(org.apache.logging.log4j.core.LogEvent.class));
131    ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
132      .getLogger(RSRpcServices.class)).addAppender(appender);
133  }
134
135  @After
136  public void tearDown() throws Exception {
137    ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
138      .getLogger(RSRpcServices.class)).removeAppender(appender);
139    util.shutdownMiniCluster();
140  }
141
142  private enum ActionType {
143    REGION_ACTIONS,
144    ACTIONS;
145  }
146
147  /**
148   * Sends a multi request with a certain amount of rows, will populate Multi command with either
149   * "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of
150   * Actions
151   */
152  private void sendMultiRequest(int rows, ActionType actionType)
153    throws ServiceException, IOException {
154    RpcController rpcc = Mockito.mock(HBaseRpcController.class);
155    MultiRequest.Builder builder = MultiRequest.newBuilder();
156    int numRAs = 1;
157    int numAs = 1;
158    switch (actionType) {
159      case REGION_ACTIONS:
160        numRAs = rows;
161        break;
162      case ACTIONS:
163        numAs = rows;
164        break;
165    }
166    for (int i = 0; i < numRAs; i++) {
167      RegionAction.Builder rab = RegionAction.newBuilder();
168      rab.setRegion(RequestConverter.buildRegionSpecifier(
169        HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
170        new String("someStuff" + i).getBytes()));
171      for (int j = 0; j < numAs; j++) {
172        Action.Builder ab = Action.newBuilder();
173        rab.addAction(ab.build());
174      }
175      builder.addRegionAction(rab.build());
176    }
177    services = new RSRpcServices(rs);
178    services.multi(rpcc, builder.build());
179  }
180
181  private void assertLogBatchWarnings(boolean expected) {
182    assertFalse(logs.isEmpty());
183    boolean actual = false;
184    for (LevelAndMessage event : logs) {
185      if (
186        event.level == org.apache.logging.log4j.Level.WARN
187          && event.msg.contains("Large batch operation detected")
188      ) {
189        actual = true;
190        break;
191      }
192    }
193    logs.clear();
194    assertEquals(expected, actual);
195  }
196
197  @Test
198  public void testMultiLogThresholdRegionActions() throws ServiceException, IOException {
199    try {
200      sendMultiRequest(threshold + 1, ActionType.REGION_ACTIONS);
201      assertFalse(rejectLargeBatchOp);
202    } catch (ServiceException e) {
203      assertTrue(rejectLargeBatchOp);
204    }
205    assertLogBatchWarnings(true);
206
207    sendMultiRequest(threshold, ActionType.REGION_ACTIONS);
208    assertLogBatchWarnings(false);
209
210    try {
211      sendMultiRequest(threshold + 1, ActionType.ACTIONS);
212      assertFalse(rejectLargeBatchOp);
213    } catch (ServiceException e) {
214      assertTrue(rejectLargeBatchOp);
215    }
216    assertLogBatchWarnings(true);
217
218    sendMultiRequest(threshold, ActionType.ACTIONS);
219    assertLogBatchWarnings(false);
220  }
221}