001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.TreeMap;
031import java.util.concurrent.BrokenBarrierException;
032import java.util.concurrent.CyclicBarrier;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HRegionLocation;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047
048@Tag(ClientTests.TAG)
049@Tag(SmallTests.TAG)
050public class TestSimpleRequestController {
051
052  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
053  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
054  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
055  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
056  private static final ServerName SN = ServerName.valueOf("s1,1,1");
057  private static final ServerName SN2 = ServerName.valueOf("s2,2,2");
058  private static final RegionInfo HRI1 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
059    .setStartKey(DUMMY_BYTES_1).setEndKey(DUMMY_BYTES_2).setRegionId(1).build();
060  private static final RegionInfo HRI2 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
061    .setStartKey(DUMMY_BYTES_2).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(2).build();
062  private static final RegionInfo HRI3 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
063    .setStartKey(DUMMY_BYTES_3).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(3).build();
064  private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
065  private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN);
066  private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);
067
068  @Test
069  public void testIllegalRequestHeapSize() {
070    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
071  }
072
073  @Test
074  public void testIllegalRsTasks() {
075    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1);
076  }
077
078  @Test
079  public void testIllegalRegionTasks() {
080    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1);
081  }
082
083  @Test
084  public void testIllegalSubmittedSize() {
085    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
086  }
087
088  @Test
089  public void testIllegalRequestRows() {
090    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1);
091  }
092
093  private void testIllegalArgument(String key, long value) {
094    Configuration conf = HBaseConfiguration.create();
095    conf.setLong(key, value);
096    try {
097      new SimpleRequestController(conf);
098      fail("The " + key + " must be bigger than zero");
099    } catch (IllegalArgumentException e) {
100      // Expected
101    }
102  }
103
104  private static Put createPut(long maxHeapSizePerRequest) {
105    return new Put(Bytes.toBytes("row")) {
106      @Override
107      public long heapSize() {
108        return maxHeapSizePerRequest;
109      }
110    };
111  }
112
113  @Test
114  public void testTaskCheckerHost() throws IOException {
115    final int maxTotalConcurrentTasks = 100;
116    final int maxConcurrentTasksPerServer = 2;
117    final int maxConcurrentTasksPerRegion = 1;
118    final AtomicLong tasksInProgress = new AtomicLong(0);
119    final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
120    final Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR);
121    SimpleRequestController.TaskCountChecker countChecker =
122      new SimpleRequestController.TaskCountChecker(maxTotalConcurrentTasks,
123        maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion, tasksInProgress,
124        taskCounterPerServer, taskCounterPerRegion);
125    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
126    // unlimiited
127    SimpleRequestController.RequestHeapSizeChecker sizeChecker =
128      new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
129    RequestController.Checker checker =
130      SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
131    ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
132    assertEquals(ReturnCode.INCLUDE, loc1Code);
133
134    ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
135    // rejected for size
136    assertNotEquals(ReturnCode.INCLUDE, loc1Code_2);
137
138    ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest));
139    // rejected for size
140    assertNotEquals(ReturnCode.INCLUDE, loc2Code);
141
142    // fill the task slots for LOC3.
143    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
144    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
145
146    ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L));
147    // rejected for count
148    assertNotEquals(ReturnCode.INCLUDE, loc3Code);
149
150    // release the task slots for LOC3.
151    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
152    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
153
154    ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L));
155    assertEquals(ReturnCode.INCLUDE, loc3Code_2);
156  }
157
158  @Test
159  public void testRequestHeapSizeChecker() throws IOException {
160    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
161    SimpleRequestController.RequestHeapSizeChecker checker =
162      new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
163
164    // inner state is unchanged.
165    for (int i = 0; i != 10; ++i) {
166      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
167      assertEquals(ReturnCode.INCLUDE, code);
168      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
169      assertEquals(ReturnCode.INCLUDE, code);
170    }
171
172    // accept the data located on LOC1 region.
173    ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
174    assertEquals(ReturnCode.INCLUDE, acceptCode);
175    checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest);
176
177    // the sn server reachs the limit.
178    for (int i = 0; i != 10; ++i) {
179      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
180      assertNotEquals(ReturnCode.INCLUDE, code);
181      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
182      assertNotEquals(ReturnCode.INCLUDE, code);
183    }
184
185    // the request to sn2 server should be accepted.
186    for (int i = 0; i != 10; ++i) {
187      ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest);
188      assertEquals(ReturnCode.INCLUDE, code);
189    }
190
191    checker.reset();
192    for (int i = 0; i != 10; ++i) {
193      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
194      assertEquals(ReturnCode.INCLUDE, code);
195      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
196      assertEquals(ReturnCode.INCLUDE, code);
197    }
198  }
199
200  @Test
201  public void testRequestRowsChecker() throws IOException {
202    final long maxRowCount = 100;
203    SimpleRequestController.RequestRowsChecker checker =
204      new SimpleRequestController.RequestRowsChecker(maxRowCount);
205
206    final long heapSizeOfRow = 100; // unused
207    // inner state is unchanged.
208    for (int i = 0; i != 10; ++i) {
209      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
210      assertEquals(ReturnCode.INCLUDE, code);
211      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
212      assertEquals(ReturnCode.INCLUDE, code);
213    }
214
215    // accept the data located on LOC1 region.
216    for (int i = 0; i != maxRowCount; ++i) {
217      ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
218      assertEquals(ReturnCode.INCLUDE, acceptCode);
219      checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow);
220    }
221
222    // the sn server reachs the limit.
223    for (int i = 0; i != 10; ++i) {
224      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
225      assertNotEquals(ReturnCode.INCLUDE, code);
226      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
227      assertNotEquals(ReturnCode.INCLUDE, code);
228    }
229
230    // the request to sn2 server should be accepted.
231    for (int i = 0; i != 10; ++i) {
232      ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow);
233      assertEquals(ReturnCode.INCLUDE, code);
234    }
235
236    checker.reset();
237    for (int i = 0; i != 10; ++i) {
238      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
239      assertEquals(ReturnCode.INCLUDE, code);
240      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
241      assertEquals(ReturnCode.INCLUDE, code);
242    }
243  }
244
245  @Test
246  public void testSubmittedSizeChecker() {
247    final long maxHeapSizeSubmit = 2 * 1024 * 1024;
248    SimpleRequestController.SubmittedSizeChecker checker =
249      new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit);
250
251    for (int i = 0; i != 10; ++i) {
252      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
253      assertEquals(ReturnCode.INCLUDE, include);
254    }
255
256    for (int i = 0; i != 10; ++i) {
257      checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit);
258    }
259
260    for (int i = 0; i != 10; ++i) {
261      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
262      assertEquals(ReturnCode.END, include);
263    }
264    for (int i = 0; i != 10; ++i) {
265      ReturnCode include = checker.canTakeOperation(LOC2, 100000);
266      assertEquals(ReturnCode.END, include);
267    }
268    checker.reset();
269    for (int i = 0; i != 10; ++i) {
270      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
271      assertEquals(ReturnCode.INCLUDE, include);
272    }
273  }
274
275  @Test
276  public void testTaskCountChecker() throws InterruptedIOException {
277    long heapSizeOfRow = 12345;
278    int maxTotalConcurrentTasks = 100;
279    int maxConcurrentTasksPerServer = 2;
280    int maxConcurrentTasksPerRegion = 1;
281    AtomicLong tasksInProgress = new AtomicLong(0);
282    Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
283    Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR);
284    SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker(
285      maxTotalConcurrentTasks, maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion,
286      tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
287
288    // inner state is unchanged.
289    for (int i = 0; i != 10; ++i) {
290      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
291      assertEquals(ReturnCode.INCLUDE, code);
292    }
293    // add LOC1 region.
294    ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
295    assertEquals(ReturnCode.INCLUDE, code);
296    checker.notifyFinal(code, LOC1, heapSizeOfRow);
297
298    // fill the task slots for LOC1.
299    taskCounterPerRegion.put(LOC1.getRegion().getRegionName(), new AtomicInteger(100));
300    taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100));
301
302    // the region was previously accepted, so it must be accpted now.
303    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
304      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
305      assertEquals(ReturnCode.INCLUDE, includeCode);
306      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
307    }
308
309    // fill the task slots for LOC3.
310    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
311    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
312
313    // no task slots.
314    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
315      ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
316      assertNotEquals(ReturnCode.INCLUDE, excludeCode);
317      checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow);
318    }
319
320    // release the tasks for LOC3.
321    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
322    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
323
324    // add LOC3 region.
325    ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow);
326    assertEquals(ReturnCode.INCLUDE, code3);
327    checker.notifyFinal(code3, LOC3, heapSizeOfRow);
328
329    // the region was previously accepted, so it must be accpted now.
330    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
331      ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
332      assertEquals(ReturnCode.INCLUDE, includeCode);
333      checker.notifyFinal(includeCode, LOC3, heapSizeOfRow);
334    }
335
336    checker.reset();
337    // the region was previously accepted,
338    // but checker have reseted and task slots for LOC1 is full.
339    // So it must be rejected now.
340    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
341      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
342      assertNotEquals(ReturnCode.INCLUDE, includeCode);
343      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
344    }
345  }
346
347  @Test
348  public void testWaitForMaximumCurrentTasks() throws Exception {
349    final AtomicInteger max = new AtomicInteger(0);
350    final CyclicBarrier barrier = new CyclicBarrier(2);
351    SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
352    final AtomicLong tasks = controller.tasksInProgress;
353    Runnable runnable = () -> {
354      try {
355        barrier.await();
356        controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
357      } catch (InterruptedIOException | InterruptedException | BrokenBarrierException e) {
358        fail(e.getMessage());
359      }
360    };
361    // First test that our runnable thread only exits when tasks is zero.
362    Thread t = new Thread(runnable);
363    t.start();
364    barrier.await();
365    t.join();
366    // Now assert we stay running if max == zero and tasks is > 0.
367    barrier.reset();
368    tasks.set(1000000);
369    t = new Thread(runnable);
370    t.start();
371    barrier.await();
372    while (tasks.get() > 0) {
373      assertTrue(t.isAlive());
374      tasks.set(tasks.get() - 1);
375    }
376    t.join();
377  }
378}