001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.concurrent.BrokenBarrierException;
031import java.util.concurrent.CyclicBarrier;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HRegionInfo;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.SmallTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.junit.Assert;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051@Category({ClientTests.class, SmallTests.class})
052public class TestSimpleRequestController {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestSimpleRequestController.class);
057
058  private static final TableName DUMMY_TABLE
059          = TableName.valueOf("DUMMY_TABLE");
060  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
061  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
062  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
063  private static final ServerName SN = ServerName.valueOf("s1,1,1");
064  private static final ServerName SN2 = ServerName.valueOf("s2,2,2");
065  private static final HRegionInfo HRI1
066          = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
067  private static final HRegionInfo HRI2
068          = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
069  private static final HRegionInfo HRI3
070          = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
071  private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
072  private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN);
073  private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);
074
075  @Test
076  public void testIllegalRequestHeapSize() {
077    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
078  }
079
080  @Test
081  public void testIllegalRsTasks() {
082    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1);
083  }
084
085  @Test
086  public void testIllegalRegionTasks() {
087    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1);
088  }
089
090  @Test
091  public void testIllegalSubmittedSize() {
092    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
093  }
094
095  @Test
096  public void testIllegalRequestRows() {
097    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1);
098  }
099
100  private void testIllegalArgument(String key, long value) {
101    Configuration conf = HBaseConfiguration.create();
102    conf.setLong(key, value);
103    try {
104      SimpleRequestController controller = new SimpleRequestController(conf);
105      fail("The " + key + " must be bigger than zero");
106    } catch (IllegalArgumentException e) {
107    }
108  }
109
110  private static Put createPut(long maxHeapSizePerRequest) {
111    return new Put(Bytes.toBytes("row")) {
112      @Override
113      public long heapSize() {
114        return maxHeapSizePerRequest;
115      }
116    };
117  }
118
119  @Test
120  public void testTaskCheckerHost() throws IOException {
121    final int maxTotalConcurrentTasks = 100;
122    final int maxConcurrentTasksPerServer = 2;
123    final int maxConcurrentTasksPerRegion = 1;
124    final AtomicLong tasksInProgress = new AtomicLong(0);
125    final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
126    final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
127    SimpleRequestController.TaskCountChecker countChecker =
128        new SimpleRequestController.TaskCountChecker(
129            maxTotalConcurrentTasks,
130            maxConcurrentTasksPerServer,
131            maxConcurrentTasksPerRegion,
132            tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
133    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
134    // unlimiited
135    SimpleRequestController.RequestHeapSizeChecker sizeChecker =
136        new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
137    RequestController.Checker checker =
138        SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
139    ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
140    assertEquals(ReturnCode.INCLUDE, loc1Code);
141
142    ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
143    // rejected for size
144    assertNotEquals(ReturnCode.INCLUDE, loc1Code_2);
145
146    ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest));
147    // rejected for size
148    assertNotEquals(ReturnCode.INCLUDE, loc2Code);
149
150    // fill the task slots for LOC3.
151    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
152    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
153
154    ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L));
155    // rejected for count
156    assertNotEquals(ReturnCode.INCLUDE, loc3Code);
157
158    // release the task slots for LOC3.
159    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
160    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
161
162    ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L));
163    assertEquals(ReturnCode.INCLUDE, loc3Code_2);
164  }
165
166  @Test
167  public void testRequestHeapSizeChecker() throws IOException {
168    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
169    SimpleRequestController.RequestHeapSizeChecker checker
170            = new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
171
172    // inner state is unchanged.
173    for (int i = 0; i != 10; ++i) {
174      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
175      assertEquals(ReturnCode.INCLUDE, code);
176      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
177      assertEquals(ReturnCode.INCLUDE, code);
178    }
179
180    // accept the data located on LOC1 region.
181    ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
182    assertEquals(ReturnCode.INCLUDE, acceptCode);
183    checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest);
184
185    // the sn server reachs the limit.
186    for (int i = 0; i != 10; ++i) {
187      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
188      assertNotEquals(ReturnCode.INCLUDE, code);
189      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
190      assertNotEquals(ReturnCode.INCLUDE, code);
191    }
192
193    // the request to sn2 server should be accepted.
194    for (int i = 0; i != 10; ++i) {
195      ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest);
196      assertEquals(ReturnCode.INCLUDE, code);
197    }
198
199    checker.reset();
200    for (int i = 0; i != 10; ++i) {
201      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
202      assertEquals(ReturnCode.INCLUDE, code);
203      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
204      assertEquals(ReturnCode.INCLUDE, code);
205    }
206  }
207
208  @Test
209  public void testRequestRowsChecker() throws IOException {
210    final long maxRowCount = 100;
211    SimpleRequestController.RequestRowsChecker checker
212      = new SimpleRequestController.RequestRowsChecker(maxRowCount);
213
214    final long heapSizeOfRow = 100; //unused
215    // inner state is unchanged.
216    for (int i = 0; i != 10; ++i) {
217      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
218      assertEquals(ReturnCode.INCLUDE, code);
219      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
220      assertEquals(ReturnCode.INCLUDE, code);
221    }
222
223    // accept the data located on LOC1 region.
224    for (int i = 0; i != maxRowCount; ++i) {
225      ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
226      assertEquals(ReturnCode.INCLUDE, acceptCode);
227      checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow);
228    }
229
230    // the sn server reachs the limit.
231    for (int i = 0; i != 10; ++i) {
232      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
233      assertNotEquals(ReturnCode.INCLUDE, code);
234      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
235      assertNotEquals(ReturnCode.INCLUDE, code);
236    }
237
238    // the request to sn2 server should be accepted.
239    for (int i = 0; i != 10; ++i) {
240      ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow);
241      assertEquals(ReturnCode.INCLUDE, code);
242    }
243
244    checker.reset();
245    for (int i = 0; i != 10; ++i) {
246      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
247      assertEquals(ReturnCode.INCLUDE, code);
248      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
249      assertEquals(ReturnCode.INCLUDE, code);
250    }
251  }
252
253  @Test
254  public void testSubmittedSizeChecker() {
255    final long maxHeapSizeSubmit = 2 * 1024 * 1024;
256    SimpleRequestController.SubmittedSizeChecker checker
257            = new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit);
258
259    for (int i = 0; i != 10; ++i) {
260      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
261      assertEquals(ReturnCode.INCLUDE, include);
262    }
263
264    for (int i = 0; i != 10; ++i) {
265      checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit);
266    }
267
268    for (int i = 0; i != 10; ++i) {
269      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
270      assertEquals(ReturnCode.END, include);
271    }
272    for (int i = 0; i != 10; ++i) {
273      ReturnCode include = checker.canTakeOperation(LOC2, 100000);
274      assertEquals(ReturnCode.END, include);
275    }
276    checker.reset();
277    for (int i = 0; i != 10; ++i) {
278      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
279      assertEquals(ReturnCode.INCLUDE, include);
280    }
281  }
282
283  @Test
284  public void testTaskCountChecker() throws InterruptedIOException {
285    long heapSizeOfRow = 12345;
286    int maxTotalConcurrentTasks = 100;
287    int maxConcurrentTasksPerServer = 2;
288    int maxConcurrentTasksPerRegion = 1;
289    AtomicLong tasksInProgress = new AtomicLong(0);
290    Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
291    Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
292    SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker(
293            maxTotalConcurrentTasks,
294            maxConcurrentTasksPerServer,
295            maxConcurrentTasksPerRegion,
296            tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
297
298    // inner state is unchanged.
299    for (int i = 0; i != 10; ++i) {
300      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
301      assertEquals(ReturnCode.INCLUDE, code);
302    }
303    // add LOC1 region.
304    ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
305    assertEquals(ReturnCode.INCLUDE, code);
306    checker.notifyFinal(code, LOC1, heapSizeOfRow);
307
308    // fill the task slots for LOC1.
309    taskCounterPerRegion.put(LOC1.getRegion().getRegionName(), new AtomicInteger(100));
310    taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100));
311
312    // the region was previously accepted, so it must be accpted now.
313    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
314      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
315      assertEquals(ReturnCode.INCLUDE, includeCode);
316      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
317    }
318
319    // fill the task slots for LOC3.
320    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
321    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
322
323    // no task slots.
324    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
325      ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
326      assertNotEquals(ReturnCode.INCLUDE, excludeCode);
327      checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow);
328    }
329
330    // release the tasks for LOC3.
331    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
332    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
333
334    // add LOC3 region.
335    ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow);
336    assertEquals(ReturnCode.INCLUDE, code3);
337    checker.notifyFinal(code3, LOC3, heapSizeOfRow);
338
339    // the region was previously accepted, so it must be accpted now.
340    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
341      ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
342      assertEquals(ReturnCode.INCLUDE, includeCode);
343      checker.notifyFinal(includeCode, LOC3, heapSizeOfRow);
344    }
345
346    checker.reset();
347    // the region was previously accepted,
348    // but checker have reseted and task slots for LOC1 is full.
349    // So it must be rejected now.
350    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
351      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
352      assertNotEquals(ReturnCode.INCLUDE, includeCode);
353      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
354    }
355  }
356
357  @Test
358  public void testWaitForMaximumCurrentTasks() throws Exception {
359    final AtomicInteger max = new AtomicInteger(0);
360    final CyclicBarrier barrier = new CyclicBarrier(2);
361    SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
362    final AtomicLong tasks = controller.tasksInProgress;
363    Runnable runnable = () -> {
364      try {
365        barrier.await();
366        controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
367      } catch (InterruptedIOException e) {
368        Assert.fail(e.getMessage());
369      } catch (InterruptedException | BrokenBarrierException e) {
370        e.printStackTrace();
371      }
372    };
373    // First test that our runnable thread only exits when tasks is zero.
374    Thread t = new Thread(runnable);
375    t.start();
376    barrier.await();
377    t.join();
378    // Now assert we stay running if max == zero and tasks is > 0.
379    barrier.reset();
380    tasks.set(1000000);
381    t = new Thread(runnable);
382    t.start();
383    barrier.await();
384    while (tasks.get() > 0) {
385      assertTrue(t.isAlive());
386      tasks.set(tasks.get() - 1);
387    }
388    t.join();
389  }
390}