001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED;
021import static org.junit.jupiter.api.Assertions.assertArrayEquals;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.concurrent.Callable;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.Abortable;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.ipc.CallRunner;
040import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
041import org.apache.hadoop.hbase.ipc.PriorityFunction;
042import org.apache.hadoop.hbase.ipc.RpcScheduler;
043import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
044import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
045import org.apache.hadoop.hbase.regionserver.RSRpcServices;
046import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
047import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
048import org.apache.hadoop.hbase.testclassification.ClientTests;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.junit.jupiter.api.AfterAll;
052import org.junit.jupiter.api.AfterEach;
053import org.junit.jupiter.api.BeforeAll;
054import org.junit.jupiter.api.BeforeEach;
055import org.junit.jupiter.api.Tag;
056import org.junit.jupiter.api.Test;
057
058import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
059import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
060
061@Tag(MediumTests.TAG)
062@Tag(ClientTests.TAG)
063public class TestAsyncClientPauseForServerOverloaded {
064
065  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
066
067  private static TableName TABLE_NAME = TableName.valueOf("ServerOverloaded");
068
069  private static byte[] FAMILY = Bytes.toBytes("Family");
070
071  private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
072
073  private static long PAUSE_FOR_SERVER_OVERLOADED_NANOS = TimeUnit.SECONDS.toNanos(1);
074  private static long PAUSE_FOR_SERVER_OVERLOADED_MILLIS =
075    TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_SERVER_OVERLOADED_NANOS);
076
077  private static AsyncConnection CONN;
078
079  private static volatile FailMode MODE = null;
080
081  enum FailMode {
082    CALL_QUEUE_TOO_BIG,
083    CALL_DROPPED;
084
085    private ConcurrentMap<MethodDescriptor, AtomicInteger> invoked = new ConcurrentHashMap<>();
086
087    // this is for test scan, where we will send a open scanner first and then a next, and we
088    // expect that we hit CQTBE two times.
089    private boolean shouldFail(CallRunner callRunner) {
090      MethodDescriptor method = callRunner.getRpcCall().getMethod();
091      return invoked.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0;
092    }
093  }
094
095  public static final class OverloadedRpcScheduler extends SimpleRpcScheduler {
096
097    public OverloadedRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
098      int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
099      Abortable server, int highPriorityLevel) {
100      super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
101        metaTransitionHandler, priority, server, highPriorityLevel);
102    }
103
104    @Override
105    public boolean dispatch(CallRunner callTask) {
106      if (MODE == FailMode.CALL_QUEUE_TOO_BIG && MODE.shouldFail(callTask)) {
107        return false;
108      }
109      return super.dispatch(callTask);
110    }
111  }
112
113  public static final class OverloadedQueue extends TestPluggableQueueImpl {
114
115    public OverloadedQueue(int maxQueueLength, PriorityFunction priority, Configuration conf) {
116      super(maxQueueLength, priority, conf);
117    }
118
119    @Override
120    public boolean offer(CallRunner callRunner) {
121      if (MODE == FailMode.CALL_DROPPED && MODE.shouldFail(callRunner)) {
122        callRunner.drop();
123        return true;
124      }
125      return super.offer(callRunner);
126    }
127  }
128
129  public static final class OverloadedRpcSchedulerFactory extends SimpleRpcSchedulerFactory {
130
131    @Override
132    public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
133      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
134        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
135      return new OverloadedRpcScheduler(conf, handlerCount,
136        conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
137          HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
138        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
139          HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
140        conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
141          HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
142        priority, server, HConstants.QOS_THRESHOLD);
143    }
144
145  }
146
147  @BeforeAll
148  public static void setUp() throws Exception {
149    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
150    UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable");
151    UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
152      OverloadedQueue.class, PluggableBlockingQueue.class);
153    UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
154      OverloadedRpcSchedulerFactory.class, RpcSchedulerFactory.class);
155    UTIL.startMiniCluster(1);
156
157    Configuration conf = new Configuration(UTIL.getConfiguration());
158    conf.setLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, PAUSE_FOR_SERVER_OVERLOADED_MILLIS);
159    CONN = ConnectionFactory.createAsyncConnection(conf).get();
160  }
161
  @AfterAll
  public static void tearDown() throws Exception {
    // Close the client connection before shutting down the cluster it talks to; the 'true'
    // swallows any close failure so the cluster shutdown still runs.
    Closeables.close(CONN, true);
    UTIL.shutdownMiniCluster();
  }
167
168  @BeforeEach
169  public void setUpBeforeTest() throws IOException {
170    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
171      for (int i = 0; i < 100; i++) {
172        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
173      }
174    }
175    MODE = FailMode.CALL_QUEUE_TOO_BIG;
176  }
177
178  @AfterEach
179  public void tearDownAfterTest() throws IOException {
180    for (FailMode mode : FailMode.values()) {
181      mode.invoked.clear();
182    }
183    MODE = null;
184    UTIL.getAdmin().disableTable(TABLE_NAME);
185    UTIL.getAdmin().deleteTable(TABLE_NAME);
186  }
187
188  private void assertTime(Callable<Void> callable, long time) throws Exception {
189    for (FailMode mode : FailMode.values()) {
190      MODE = mode;
191
192      long startNs = System.nanoTime();
193      callable.call();
194      long costNs = System.nanoTime() - startNs;
195      assertTrue(costNs > time);
196    }
197  }
198
199  @Test
200  public void testGet() throws Exception {
201    assertTime(() -> {
202      Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get();
203      assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
204      return null;
205    }, PAUSE_FOR_SERVER_OVERLOADED_NANOS);
206  }
207
208  @Test
209  public void testBatch() throws Exception {
210    assertTime(() -> {
211      List<CompletableFuture<?>> futures = new ArrayList<>();
212      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
213        for (int i = 100; i < 110; i++) {
214          futures.add(mutator
215            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
216        }
217      }
218      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
219    }, PAUSE_FOR_SERVER_OVERLOADED_NANOS);
220  }
221
222  @Test
223  public void testScan() throws Exception {
224    // we will hit CallQueueTooBigException two times so the sleep time should be twice
225    assertTime(() -> {
226      try (
227        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
228        for (int i = 0; i < 100; i++) {
229          Result result = scanner.next();
230          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
231        }
232        assertNull(scanner.next());
233      }
234      return null;
235    }, PAUSE_FOR_SERVER_OVERLOADED_NANOS * 2);
236  }
237}