001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertNull;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicInteger;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.Abortable;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.ipc.CallRunner;
040import org.apache.hadoop.hbase.ipc.PriorityFunction;
041import org.apache.hadoop.hbase.ipc.RpcScheduler;
042import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
043import org.apache.hadoop.hbase.regionserver.RSRpcServices;
044import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
045import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
046import org.apache.hadoop.hbase.testclassification.ClientTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.After;
050import org.junit.AfterClass;
051import org.junit.Before;
052import org.junit.BeforeClass;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056
057import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
058import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
059
060@Category({ MediumTests.class, ClientTests.class })
061public class TestAsyncClientPauseForCallQueueTooBig {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065    HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class);
066
067  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
068
069  private static TableName TABLE_NAME = TableName.valueOf("CQTBE");
070
071  private static byte[] FAMILY = Bytes.toBytes("Family");
072
073  private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
074
075  private static long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1);
076
077  private static AsyncConnection CONN;
078
079  private static boolean FAIL = false;
080
081  private static ConcurrentMap<MethodDescriptor, AtomicInteger> INVOKED = new ConcurrentHashMap<>();
082
083  public static final class CQTBERpcScheduler extends SimpleRpcScheduler {
084
085    public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
086        int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
087        Abortable server, int highPriorityLevel) {
088      super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
089        metaTransitionHandler, priority, server, highPriorityLevel);
090    }
091
092    @Override
093    public boolean dispatch(CallRunner callTask) throws InterruptedException {
094      if (FAIL) {
095        MethodDescriptor method = callTask.getRpcCall().getMethod();
096        // this is for test scan, where we will send a open scanner first and then a next, and we
097        // expect that we hit CQTBE two times.
098        if (INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0) {
099          return false;
100        }
101      }
102      return super.dispatch(callTask);
103    }
104  }
105
106  public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory {
107
108    @Override
109    public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
110      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
111        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
112      return new CQTBERpcScheduler(conf, handlerCount,
113        conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
114          HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
115        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
116          HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
117        conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
118          HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
119        priority, server, HConstants.QOS_THRESHOLD);
120    }
121
122  }
123
124  @BeforeClass
125  public static void setUp() throws Exception {
126    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
127    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
128      TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS));
129    UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
130      CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class);
131    UTIL.startMiniCluster(1);
132    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
133  }
134
135  @AfterClass
136  public static void tearDown() throws Exception {
137    Closeables.close(CONN, true);
138    UTIL.shutdownMiniCluster();
139  }
140
141  @Before
142  public void setUpBeforeTest() throws IOException {
143    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
144      for (int i = 0; i < 100; i++) {
145        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
146      }
147    }
148    FAIL = true;
149  }
150
151  @After
152  public void tearDownAfterTest() throws IOException {
153    FAIL = false;
154    INVOKED.clear();
155    UTIL.getAdmin().disableTable(TABLE_NAME);
156    UTIL.getAdmin().deleteTable(TABLE_NAME);
157  }
158
159  private void assertTime(Callable<Void> callable, long time) throws Exception {
160    long startNs = System.nanoTime();
161    callable.call();
162    long costNs = System.nanoTime() - startNs;
163    assertTrue(costNs > time);
164  }
165
166  @Test
167  public void testGet() throws Exception {
168    assertTime(() -> {
169      Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get();
170      assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
171      return null;
172    }, PAUSE_FOR_CQTBE_NS);
173  }
174
175  @Test
176  public void testBatch() throws Exception {
177    assertTime(() -> {
178      List<CompletableFuture<?>> futures = new ArrayList<>();
179      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
180        for (int i = 100; i < 110; i++) {
181          futures.add(mutator
182            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
183        }
184      }
185      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
186    }, PAUSE_FOR_CQTBE_NS);
187  }
188
189  @Test
190  public void testScan() throws Exception {
191    // we will hit CallQueueTooBigException two times so the sleep time should be twice
192    assertTime(() -> {
193      try (
194        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
195        for (int i = 0; i < 100; i++) {
196          Result result = scanner.next();
197          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
198        }
199        assertNull(scanner.next());
200      }
201      return null;
202    }, PAUSE_FOR_CQTBE_NS * 2);
203  }
204}