001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.TreeMap;
031import java.util.concurrent.BrokenBarrierException;
032import java.util.concurrent.CyclicBarrier;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionInfo;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
044import org.apache.hadoop.hbase.testclassification.ClientTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.Assert;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051
052@Category({ ClientTests.class, SmallTests.class })
053public class TestSimpleRequestController {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestSimpleRequestController.class);
058
059  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
060  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
061  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
062  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
063  private static final ServerName SN = ServerName.valueOf("s1,1,1");
064  private static final ServerName SN2 = ServerName.valueOf("s2,2,2");
065  private static final HRegionInfo HRI1 =
066    new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
067  private static final HRegionInfo HRI2 =
068    new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
069  private static final HRegionInfo HRI3 =
070    new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
071  private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
072  private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN);
073  private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);
074
075  @Test
076  public void testIllegalRequestHeapSize() {
077    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
078  }
079
080  @Test
081  public void testIllegalRsTasks() {
082    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1);
083  }
084
085  @Test
086  public void testIllegalRegionTasks() {
087    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1);
088  }
089
090  @Test
091  public void testIllegalSubmittedSize() {
092    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
093  }
094
095  @Test
096  public void testIllegalRequestRows() {
097    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1);
098  }
099
100  private void testIllegalArgument(String key, long value) {
101    Configuration conf = HBaseConfiguration.create();
102    conf.setLong(key, value);
103    try {
104      new SimpleRequestController(conf);
105      fail("The " + key + " must be bigger than zero");
106    } catch (IllegalArgumentException e) {
107      // Expected
108    }
109  }
110
111  private static Put createPut(long maxHeapSizePerRequest) {
112    return new Put(Bytes.toBytes("row")) {
113      @Override
114      public long heapSize() {
115        return maxHeapSizePerRequest;
116      }
117    };
118  }
119
120  @Test
121  public void testTaskCheckerHost() throws IOException {
122    final int maxTotalConcurrentTasks = 100;
123    final int maxConcurrentTasksPerServer = 2;
124    final int maxConcurrentTasksPerRegion = 1;
125    final AtomicLong tasksInProgress = new AtomicLong(0);
126    final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
127    final Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR);
128    SimpleRequestController.TaskCountChecker countChecker =
129      new SimpleRequestController.TaskCountChecker(maxTotalConcurrentTasks,
130        maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion, tasksInProgress,
131        taskCounterPerServer, taskCounterPerRegion);
132    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
133    // unlimiited
134    SimpleRequestController.RequestHeapSizeChecker sizeChecker =
135      new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
136    RequestController.Checker checker =
137      SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
138    ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
139    assertEquals(ReturnCode.INCLUDE, loc1Code);
140
141    ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
142    // rejected for size
143    assertNotEquals(ReturnCode.INCLUDE, loc1Code_2);
144
145    ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest));
146    // rejected for size
147    assertNotEquals(ReturnCode.INCLUDE, loc2Code);
148
149    // fill the task slots for LOC3.
150    taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
151    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
152
153    ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L));
154    // rejected for count
155    assertNotEquals(ReturnCode.INCLUDE, loc3Code);
156
157    // release the task slots for LOC3.
158    taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
159    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
160
161    ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L));
162    assertEquals(ReturnCode.INCLUDE, loc3Code_2);
163  }
164
165  @Test
166  public void testRequestHeapSizeChecker() throws IOException {
167    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
168    SimpleRequestController.RequestHeapSizeChecker checker =
169      new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
170
171    // inner state is unchanged.
172    for (int i = 0; i != 10; ++i) {
173      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
174      assertEquals(ReturnCode.INCLUDE, code);
175      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
176      assertEquals(ReturnCode.INCLUDE, code);
177    }
178
179    // accept the data located on LOC1 region.
180    ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
181    assertEquals(ReturnCode.INCLUDE, acceptCode);
182    checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest);
183
184    // the sn server reachs the limit.
185    for (int i = 0; i != 10; ++i) {
186      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
187      assertNotEquals(ReturnCode.INCLUDE, code);
188      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
189      assertNotEquals(ReturnCode.INCLUDE, code);
190    }
191
192    // the request to sn2 server should be accepted.
193    for (int i = 0; i != 10; ++i) {
194      ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest);
195      assertEquals(ReturnCode.INCLUDE, code);
196    }
197
198    checker.reset();
199    for (int i = 0; i != 10; ++i) {
200      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
201      assertEquals(ReturnCode.INCLUDE, code);
202      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
203      assertEquals(ReturnCode.INCLUDE, code);
204    }
205  }
206
207  @Test
208  public void testRequestRowsChecker() throws IOException {
209    final long maxRowCount = 100;
210    SimpleRequestController.RequestRowsChecker checker =
211      new SimpleRequestController.RequestRowsChecker(maxRowCount);
212
213    final long heapSizeOfRow = 100; // unused
214    // inner state is unchanged.
215    for (int i = 0; i != 10; ++i) {
216      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
217      assertEquals(ReturnCode.INCLUDE, code);
218      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
219      assertEquals(ReturnCode.INCLUDE, code);
220    }
221
222    // accept the data located on LOC1 region.
223    for (int i = 0; i != maxRowCount; ++i) {
224      ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
225      assertEquals(ReturnCode.INCLUDE, acceptCode);
226      checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow);
227    }
228
229    // the sn server reachs the limit.
230    for (int i = 0; i != 10; ++i) {
231      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
232      assertNotEquals(ReturnCode.INCLUDE, code);
233      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
234      assertNotEquals(ReturnCode.INCLUDE, code);
235    }
236
237    // the request to sn2 server should be accepted.
238    for (int i = 0; i != 10; ++i) {
239      ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow);
240      assertEquals(ReturnCode.INCLUDE, code);
241    }
242
243    checker.reset();
244    for (int i = 0; i != 10; ++i) {
245      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
246      assertEquals(ReturnCode.INCLUDE, code);
247      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
248      assertEquals(ReturnCode.INCLUDE, code);
249    }
250  }
251
252  @Test
253  public void testSubmittedSizeChecker() {
254    final long maxHeapSizeSubmit = 2 * 1024 * 1024;
255    SimpleRequestController.SubmittedSizeChecker checker =
256      new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit);
257
258    for (int i = 0; i != 10; ++i) {
259      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
260      assertEquals(ReturnCode.INCLUDE, include);
261    }
262
263    for (int i = 0; i != 10; ++i) {
264      checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit);
265    }
266
267    for (int i = 0; i != 10; ++i) {
268      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
269      assertEquals(ReturnCode.END, include);
270    }
271    for (int i = 0; i != 10; ++i) {
272      ReturnCode include = checker.canTakeOperation(LOC2, 100000);
273      assertEquals(ReturnCode.END, include);
274    }
275    checker.reset();
276    for (int i = 0; i != 10; ++i) {
277      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
278      assertEquals(ReturnCode.INCLUDE, include);
279    }
280  }
281
282  @Test
283  public void testTaskCountChecker() throws InterruptedIOException {
284    long heapSizeOfRow = 12345;
285    int maxTotalConcurrentTasks = 100;
286    int maxConcurrentTasksPerServer = 2;
287    int maxConcurrentTasksPerRegion = 1;
288    AtomicLong tasksInProgress = new AtomicLong(0);
289    Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
290    Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR);
291    SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker(
292      maxTotalConcurrentTasks, maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion,
293      tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
294
295    // inner state is unchanged.
296    for (int i = 0; i != 10; ++i) {
297      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
298      assertEquals(ReturnCode.INCLUDE, code);
299    }
300    // add LOC1 region.
301    ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
302    assertEquals(ReturnCode.INCLUDE, code);
303    checker.notifyFinal(code, LOC1, heapSizeOfRow);
304
305    // fill the task slots for LOC1.
306    taskCounterPerRegion.put(LOC1.getRegionInfo().getRegionName(), new AtomicInteger(100));
307    taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100));
308
309    // the region was previously accepted, so it must be accpted now.
310    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
311      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
312      assertEquals(ReturnCode.INCLUDE, includeCode);
313      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
314    }
315
316    // fill the task slots for LOC3.
317    taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
318    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
319
320    // no task slots.
321    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
322      ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
323      assertNotEquals(ReturnCode.INCLUDE, excludeCode);
324      checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow);
325    }
326
327    // release the tasks for LOC3.
328    taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
329    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
330
331    // add LOC3 region.
332    ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow);
333    assertEquals(ReturnCode.INCLUDE, code3);
334    checker.notifyFinal(code3, LOC3, heapSizeOfRow);
335
336    // the region was previously accepted, so it must be accpted now.
337    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
338      ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
339      assertEquals(ReturnCode.INCLUDE, includeCode);
340      checker.notifyFinal(includeCode, LOC3, heapSizeOfRow);
341    }
342
343    checker.reset();
344    // the region was previously accepted,
345    // but checker have reseted and task slots for LOC1 is full.
346    // So it must be rejected now.
347    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
348      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
349      assertNotEquals(ReturnCode.INCLUDE, includeCode);
350      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
351    }
352  }
353
354  @Test
355  public void testWaitForMaximumCurrentTasks() throws Exception {
356    final AtomicInteger max = new AtomicInteger(0);
357    final CyclicBarrier barrier = new CyclicBarrier(2);
358    SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
359    final AtomicLong tasks = controller.tasksInProgress;
360    Runnable runnable = () -> {
361      try {
362        barrier.await();
363        controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
364      } catch (InterruptedIOException | InterruptedException | BrokenBarrierException e) {
365        Assert.fail(e.getMessage());
366      }
367    };
368    // First test that our runnable thread only exits when tasks is zero.
369    Thread t = new Thread(runnable);
370    t.start();
371    barrier.await();
372    t.join();
373    // Now assert we stay running if max == zero and tasks is > 0.
374    barrier.reset();
375    tasks.set(1000000);
376    t = new Thread(runnable);
377    t.start();
378    barrier.await();
379    while (tasks.get() > 0) {
380      assertTrue(t.isAlive());
381      tasks.set(tasks.get() - 1);
382    }
383    t.join();
384  }
385}