001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.concurrent.Callable;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.Abortable;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.ipc.CallRunner;
041import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
042import org.apache.hadoop.hbase.ipc.PriorityFunction;
043import org.apache.hadoop.hbase.ipc.RpcScheduler;
044import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
045import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
046import org.apache.hadoop.hbase.regionserver.RSRpcServices;
047import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
048import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
049import org.apache.hadoop.hbase.testclassification.ClientTests;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.junit.After;
053import org.junit.AfterClass;
054import org.junit.Before;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059
060import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
061import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
062
063@Category({ MediumTests.class, ClientTests.class })
064public class TestAsyncClientPauseForServerOverloaded {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestAsyncClientPauseForServerOverloaded.class);
069
070  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
071
072  private static TableName TABLE_NAME = TableName.valueOf("ServerOverloaded");
073
074  private static byte[] FAMILY = Bytes.toBytes("Family");
075
076  private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
077
078  private static long PAUSE_FOR_SERVER_OVERLOADED_NANOS = TimeUnit.SECONDS.toNanos(1);
079  private static long PAUSE_FOR_SERVER_OVERLOADED_MILLIS =
080    TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_SERVER_OVERLOADED_NANOS);
081
082  private static AsyncConnection CONN;
083
084  private static volatile FailMode MODE = null;
085
086  enum FailMode {
087    CALL_QUEUE_TOO_BIG,
088    CALL_DROPPED;
089
090    private ConcurrentMap<MethodDescriptor, AtomicInteger> invoked = new ConcurrentHashMap<>();
091
092    // this is for test scan, where we will send a open scanner first and then a next, and we
093    // expect that we hit CQTBE two times.
094    private boolean shouldFail(CallRunner callRunner) {
095      MethodDescriptor method = callRunner.getRpcCall().getMethod();
096      return invoked.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0;
097    }
098  }
099
100  public static final class OverloadedRpcScheduler extends SimpleRpcScheduler {
101
102    public OverloadedRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
103      int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
104      Abortable server, int highPriorityLevel) {
105      super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
106        metaTransitionHandler, priority, server, highPriorityLevel);
107    }
108
109    @Override
110    public boolean dispatch(CallRunner callTask) {
111      if (MODE == FailMode.CALL_QUEUE_TOO_BIG && MODE.shouldFail(callTask)) {
112        return false;
113      }
114      return super.dispatch(callTask);
115    }
116  }
117
118  public static final class OverloadedQueue extends TestPluggableQueueImpl {
119
120    public OverloadedQueue(int maxQueueLength, PriorityFunction priority, Configuration conf) {
121      super(maxQueueLength, priority, conf);
122    }
123
124    @Override
125    public boolean offer(CallRunner callRunner) {
126      if (MODE == FailMode.CALL_DROPPED && MODE.shouldFail(callRunner)) {
127        callRunner.drop();
128        return true;
129      }
130      return super.offer(callRunner);
131    }
132  }
133
134  public static final class OverloadedRpcSchedulerFactory extends SimpleRpcSchedulerFactory {
135
136    @Override
137    public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
138      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
139        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
140      return new OverloadedRpcScheduler(conf, handlerCount,
141        conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
142          HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
143        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
144          HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
145        conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
146          HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
147        priority, server, HConstants.QOS_THRESHOLD);
148    }
149
150  }
151
152  @BeforeClass
153  public static void setUp() throws Exception {
154    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
155    UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable");
156    UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
157      OverloadedQueue.class, PluggableBlockingQueue.class);
158    UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
159      OverloadedRpcSchedulerFactory.class, RpcSchedulerFactory.class);
160    UTIL.startMiniCluster(1);
161
162    Configuration conf = new Configuration(UTIL.getConfiguration());
163    conf.setLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, PAUSE_FOR_SERVER_OVERLOADED_MILLIS);
164    CONN = ConnectionFactory.createAsyncConnection(conf).get();
165  }
166
167  @AfterClass
168  public static void tearDown() throws Exception {
169    Closeables.close(CONN, true);
170    UTIL.shutdownMiniCluster();
171  }
172
173  @Before
174  public void setUpBeforeTest() throws IOException {
175    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
176      for (int i = 0; i < 100; i++) {
177        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
178      }
179    }
180    MODE = FailMode.CALL_QUEUE_TOO_BIG;
181  }
182
183  @After
184  public void tearDownAfterTest() throws IOException {
185    for (FailMode mode : FailMode.values()) {
186      mode.invoked.clear();
187    }
188    MODE = null;
189    UTIL.getAdmin().disableTable(TABLE_NAME);
190    UTIL.getAdmin().deleteTable(TABLE_NAME);
191  }
192
193  private void assertTime(Callable<Void> callable, long time) throws Exception {
194    for (FailMode mode : FailMode.values()) {
195      MODE = mode;
196
197      long startNs = System.nanoTime();
198      callable.call();
199      long costNs = System.nanoTime() - startNs;
200      assertTrue(costNs > time);
201    }
202  }
203
204  @Test
205  public void testGet() throws Exception {
206    assertTime(() -> {
207      Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get();
208      assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
209      return null;
210    }, PAUSE_FOR_SERVER_OVERLOADED_NANOS);
211  }
212
213  @Test
214  public void testBatch() throws Exception {
215    assertTime(() -> {
216      List<CompletableFuture<?>> futures = new ArrayList<>();
217      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
218        for (int i = 100; i < 110; i++) {
219          futures.add(mutator
220            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
221        }
222      }
223      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
224    }, PAUSE_FOR_SERVER_OVERLOADED_NANOS);
225  }
226
227  @Test
228  public void testScan() throws Exception {
229    // we will hit CallQueueTooBigException two times so the sleep time should be twice
230    assertTime(() -> {
231      try (
232        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
233        for (int i = 0; i < 100; i++) {
234          Result result = scanner.next();
235          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
236        }
237        assertNull(scanner.next());
238      }
239      return null;
240    }, PAUSE_FOR_SERVER_OVERLOADED_NANOS * 2);
241  }
242}