001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.TreeMap;
031import java.util.concurrent.BrokenBarrierException;
032import java.util.concurrent.CyclicBarrier;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.SmallTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.junit.Assert;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051@Category({ ClientTests.class, SmallTests.class })
052public class TestSimpleRequestController {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestSimpleRequestController.class);
057
058  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
059  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
060  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
061  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
062  private static final ServerName SN = ServerName.valueOf("s1,1,1");
063  private static final ServerName SN2 = ServerName.valueOf("s2,2,2");
064  private static final RegionInfo HRI1 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
065    .setStartKey(DUMMY_BYTES_1).setEndKey(DUMMY_BYTES_2).setRegionId(1).build();
066  private static final RegionInfo HRI2 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
067    .setStartKey(DUMMY_BYTES_2).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(2).build();
068  private static final RegionInfo HRI3 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
069    .setStartKey(DUMMY_BYTES_3).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(3).build();
070  private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
071  private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN);
072  private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);
073
074  @Test
075  public void testIllegalRequestHeapSize() {
076    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
077  }
078
079  @Test
080  public void testIllegalRsTasks() {
081    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1);
082  }
083
084  @Test
085  public void testIllegalRegionTasks() {
086    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1);
087  }
088
089  @Test
090  public void testIllegalSubmittedSize() {
091    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
092  }
093
094  @Test
095  public void testIllegalRequestRows() {
096    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1);
097  }
098
099  private void testIllegalArgument(String key, long value) {
100    Configuration conf = HBaseConfiguration.create();
101    conf.setLong(key, value);
102    try {
103      new SimpleRequestController(conf);
104      fail("The " + key + " must be bigger than zero");
105    } catch (IllegalArgumentException e) {
106      // Expected
107    }
108  }
109
110  private static Put createPut(long maxHeapSizePerRequest) {
111    return new Put(Bytes.toBytes("row")) {
112      @Override
113      public long heapSize() {
114        return maxHeapSizePerRequest;
115      }
116    };
117  }
118
119  @Test
120  public void testTaskCheckerHost() throws IOException {
121    final int maxTotalConcurrentTasks = 100;
122    final int maxConcurrentTasksPerServer = 2;
123    final int maxConcurrentTasksPerRegion = 1;
124    final AtomicLong tasksInProgress = new AtomicLong(0);
125    final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
126    final Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR);
127    SimpleRequestController.TaskCountChecker countChecker =
128      new SimpleRequestController.TaskCountChecker(maxTotalConcurrentTasks,
129        maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion, tasksInProgress,
130        taskCounterPerServer, taskCounterPerRegion);
131    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
132    // unlimiited
133    SimpleRequestController.RequestHeapSizeChecker sizeChecker =
134      new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
135    RequestController.Checker checker =
136      SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
137    ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
138    assertEquals(ReturnCode.INCLUDE, loc1Code);
139
140    ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
141    // rejected for size
142    assertNotEquals(ReturnCode.INCLUDE, loc1Code_2);
143
144    ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest));
145    // rejected for size
146    assertNotEquals(ReturnCode.INCLUDE, loc2Code);
147
148    // fill the task slots for LOC3.
149    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
150    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
151
152    ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L));
153    // rejected for count
154    assertNotEquals(ReturnCode.INCLUDE, loc3Code);
155
156    // release the task slots for LOC3.
157    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
158    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
159
160    ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L));
161    assertEquals(ReturnCode.INCLUDE, loc3Code_2);
162  }
163
164  @Test
165  public void testRequestHeapSizeChecker() throws IOException {
166    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
167    SimpleRequestController.RequestHeapSizeChecker checker =
168      new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
169
170    // inner state is unchanged.
171    for (int i = 0; i != 10; ++i) {
172      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
173      assertEquals(ReturnCode.INCLUDE, code);
174      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
175      assertEquals(ReturnCode.INCLUDE, code);
176    }
177
178    // accept the data located on LOC1 region.
179    ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
180    assertEquals(ReturnCode.INCLUDE, acceptCode);
181    checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest);
182
183    // the sn server reachs the limit.
184    for (int i = 0; i != 10; ++i) {
185      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
186      assertNotEquals(ReturnCode.INCLUDE, code);
187      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
188      assertNotEquals(ReturnCode.INCLUDE, code);
189    }
190
191    // the request to sn2 server should be accepted.
192    for (int i = 0; i != 10; ++i) {
193      ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest);
194      assertEquals(ReturnCode.INCLUDE, code);
195    }
196
197    checker.reset();
198    for (int i = 0; i != 10; ++i) {
199      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
200      assertEquals(ReturnCode.INCLUDE, code);
201      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
202      assertEquals(ReturnCode.INCLUDE, code);
203    }
204  }
205
206  @Test
207  public void testRequestRowsChecker() throws IOException {
208    final long maxRowCount = 100;
209    SimpleRequestController.RequestRowsChecker checker =
210      new SimpleRequestController.RequestRowsChecker(maxRowCount);
211
212    final long heapSizeOfRow = 100; // unused
213    // inner state is unchanged.
214    for (int i = 0; i != 10; ++i) {
215      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
216      assertEquals(ReturnCode.INCLUDE, code);
217      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
218      assertEquals(ReturnCode.INCLUDE, code);
219    }
220
221    // accept the data located on LOC1 region.
222    for (int i = 0; i != maxRowCount; ++i) {
223      ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
224      assertEquals(ReturnCode.INCLUDE, acceptCode);
225      checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow);
226    }
227
228    // the sn server reachs the limit.
229    for (int i = 0; i != 10; ++i) {
230      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
231      assertNotEquals(ReturnCode.INCLUDE, code);
232      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
233      assertNotEquals(ReturnCode.INCLUDE, code);
234    }
235
236    // the request to sn2 server should be accepted.
237    for (int i = 0; i != 10; ++i) {
238      ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow);
239      assertEquals(ReturnCode.INCLUDE, code);
240    }
241
242    checker.reset();
243    for (int i = 0; i != 10; ++i) {
244      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
245      assertEquals(ReturnCode.INCLUDE, code);
246      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
247      assertEquals(ReturnCode.INCLUDE, code);
248    }
249  }
250
251  @Test
252  public void testSubmittedSizeChecker() {
253    final long maxHeapSizeSubmit = 2 * 1024 * 1024;
254    SimpleRequestController.SubmittedSizeChecker checker =
255      new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit);
256
257    for (int i = 0; i != 10; ++i) {
258      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
259      assertEquals(ReturnCode.INCLUDE, include);
260    }
261
262    for (int i = 0; i != 10; ++i) {
263      checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit);
264    }
265
266    for (int i = 0; i != 10; ++i) {
267      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
268      assertEquals(ReturnCode.END, include);
269    }
270    for (int i = 0; i != 10; ++i) {
271      ReturnCode include = checker.canTakeOperation(LOC2, 100000);
272      assertEquals(ReturnCode.END, include);
273    }
274    checker.reset();
275    for (int i = 0; i != 10; ++i) {
276      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
277      assertEquals(ReturnCode.INCLUDE, include);
278    }
279  }
280
281  @Test
282  public void testTaskCountChecker() throws InterruptedIOException {
283    long heapSizeOfRow = 12345;
284    int maxTotalConcurrentTasks = 100;
285    int maxConcurrentTasksPerServer = 2;
286    int maxConcurrentTasksPerRegion = 1;
287    AtomicLong tasksInProgress = new AtomicLong(0);
288    Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
289    Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR);
290    SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker(
291      maxTotalConcurrentTasks, maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion,
292      tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
293
294    // inner state is unchanged.
295    for (int i = 0; i != 10; ++i) {
296      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
297      assertEquals(ReturnCode.INCLUDE, code);
298    }
299    // add LOC1 region.
300    ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
301    assertEquals(ReturnCode.INCLUDE, code);
302    checker.notifyFinal(code, LOC1, heapSizeOfRow);
303
304    // fill the task slots for LOC1.
305    taskCounterPerRegion.put(LOC1.getRegion().getRegionName(), new AtomicInteger(100));
306    taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100));
307
308    // the region was previously accepted, so it must be accpted now.
309    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
310      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
311      assertEquals(ReturnCode.INCLUDE, includeCode);
312      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
313    }
314
315    // fill the task slots for LOC3.
316    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
317    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
318
319    // no task slots.
320    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
321      ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
322      assertNotEquals(ReturnCode.INCLUDE, excludeCode);
323      checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow);
324    }
325
326    // release the tasks for LOC3.
327    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
328    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
329
330    // add LOC3 region.
331    ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow);
332    assertEquals(ReturnCode.INCLUDE, code3);
333    checker.notifyFinal(code3, LOC3, heapSizeOfRow);
334
335    // the region was previously accepted, so it must be accpted now.
336    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
337      ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
338      assertEquals(ReturnCode.INCLUDE, includeCode);
339      checker.notifyFinal(includeCode, LOC3, heapSizeOfRow);
340    }
341
342    checker.reset();
343    // the region was previously accepted,
344    // but checker have reseted and task slots for LOC1 is full.
345    // So it must be rejected now.
346    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
347      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
348      assertNotEquals(ReturnCode.INCLUDE, includeCode);
349      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
350    }
351  }
352
353  @Test
354  public void testWaitForMaximumCurrentTasks() throws Exception {
355    final AtomicInteger max = new AtomicInteger(0);
356    final CyclicBarrier barrier = new CyclicBarrier(2);
357    SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
358    final AtomicLong tasks = controller.tasksInProgress;
359    Runnable runnable = () -> {
360      try {
361        barrier.await();
362        controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
363      } catch (InterruptedIOException | InterruptedException | BrokenBarrierException e) {
364        Assert.fail(e.getMessage());
365      }
366    };
367    // First test that our runnable thread only exits when tasks is zero.
368    Thread t = new Thread(runnable);
369    t.start();
370    barrier.await();
371    t.join();
372    // Now assert we stay running if max == zero and tasks is > 0.
373    barrier.reset();
374    tasks.set(1000000);
375    t = new Thread(runnable);
376    t.start();
377    barrier.await();
378    while (tasks.get() > 0) {
379      assertTrue(t.isAlive());
380      tasks.set(tasks.get() - 1);
381    }
382    t.join();
383  }
384}