001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.concurrent.BrokenBarrierException;
031import java.util.concurrent.CyclicBarrier;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HRegionLocation;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.Assert;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050@Category({ClientTests.class, SmallTests.class})
051public class TestSimpleRequestController {
052
053  @ClassRule
054  public static final HBaseClassTestRule CLASS_RULE =
055      HBaseClassTestRule.forClass(TestSimpleRequestController.class);
056
057  private static final TableName DUMMY_TABLE
058          = TableName.valueOf("DUMMY_TABLE");
059  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
060  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
061  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
062  private static final ServerName SN = ServerName.valueOf("s1,1,1");
063  private static final ServerName SN2 = ServerName.valueOf("s2,2,2");
064  private static final RegionInfo HRI1 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
065    .setStartKey(DUMMY_BYTES_1).setEndKey(DUMMY_BYTES_2).setRegionId(1).build();
066  private static final RegionInfo HRI2 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
067    .setStartKey(DUMMY_BYTES_2).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(2).build();
068  private static final RegionInfo HRI3 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
069    .setStartKey(DUMMY_BYTES_3).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(3).build();
070  private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
071  private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN);
072  private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);
073
074  @Test
075  public void testIllegalRequestHeapSize() {
076    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
077  }
078
079  @Test
080  public void testIllegalRsTasks() {
081    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1);
082  }
083
084  @Test
085  public void testIllegalRegionTasks() {
086    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1);
087  }
088
089  @Test
090  public void testIllegalSubmittedSize() {
091    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
092  }
093
094  @Test
095  public void testIllegalRequestRows() {
096    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1);
097  }
098
099  private void testIllegalArgument(String key, long value) {
100    Configuration conf = HBaseConfiguration.create();
101    conf.setLong(key, value);
102    try {
103      new SimpleRequestController(conf);
104      fail("The " + key + " must be bigger than zero");
105    } catch (IllegalArgumentException e) {
106    }
107  }
108
109  private static Put createPut(long maxHeapSizePerRequest) {
110    return new Put(Bytes.toBytes("row")) {
111      @Override
112      public long heapSize() {
113        return maxHeapSizePerRequest;
114      }
115    };
116  }
117
118  @Test
119  public void testTaskCheckerHost() throws IOException {
120    final int maxTotalConcurrentTasks = 100;
121    final int maxConcurrentTasksPerServer = 2;
122    final int maxConcurrentTasksPerRegion = 1;
123    final AtomicLong tasksInProgress = new AtomicLong(0);
124    final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
125    final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
126    SimpleRequestController.TaskCountChecker countChecker =
127        new SimpleRequestController.TaskCountChecker(
128            maxTotalConcurrentTasks,
129            maxConcurrentTasksPerServer,
130            maxConcurrentTasksPerRegion,
131            tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
132    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
133    // unlimiited
134    SimpleRequestController.RequestHeapSizeChecker sizeChecker =
135        new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
136    RequestController.Checker checker =
137        SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
138    ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
139    assertEquals(ReturnCode.INCLUDE, loc1Code);
140
141    ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
142    // rejected for size
143    assertNotEquals(ReturnCode.INCLUDE, loc1Code_2);
144
145    ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest));
146    // rejected for size
147    assertNotEquals(ReturnCode.INCLUDE, loc2Code);
148
149    // fill the task slots for LOC3.
150    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
151    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
152
153    ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L));
154    // rejected for count
155    assertNotEquals(ReturnCode.INCLUDE, loc3Code);
156
157    // release the task slots for LOC3.
158    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
159    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
160
161    ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L));
162    assertEquals(ReturnCode.INCLUDE, loc3Code_2);
163  }
164
165  @Test
166  public void testRequestHeapSizeChecker() throws IOException {
167    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
168    SimpleRequestController.RequestHeapSizeChecker checker
169            = new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
170
171    // inner state is unchanged.
172    for (int i = 0; i != 10; ++i) {
173      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
174      assertEquals(ReturnCode.INCLUDE, code);
175      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
176      assertEquals(ReturnCode.INCLUDE, code);
177    }
178
179    // accept the data located on LOC1 region.
180    ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
181    assertEquals(ReturnCode.INCLUDE, acceptCode);
182    checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest);
183
184    // the sn server reachs the limit.
185    for (int i = 0; i != 10; ++i) {
186      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
187      assertNotEquals(ReturnCode.INCLUDE, code);
188      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
189      assertNotEquals(ReturnCode.INCLUDE, code);
190    }
191
192    // the request to sn2 server should be accepted.
193    for (int i = 0; i != 10; ++i) {
194      ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest);
195      assertEquals(ReturnCode.INCLUDE, code);
196    }
197
198    checker.reset();
199    for (int i = 0; i != 10; ++i) {
200      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
201      assertEquals(ReturnCode.INCLUDE, code);
202      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
203      assertEquals(ReturnCode.INCLUDE, code);
204    }
205  }
206
207  @Test
208  public void testRequestRowsChecker() throws IOException {
209    final long maxRowCount = 100;
210    SimpleRequestController.RequestRowsChecker checker
211      = new SimpleRequestController.RequestRowsChecker(maxRowCount);
212
213    final long heapSizeOfRow = 100; //unused
214    // inner state is unchanged.
215    for (int i = 0; i != 10; ++i) {
216      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
217      assertEquals(ReturnCode.INCLUDE, code);
218      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
219      assertEquals(ReturnCode.INCLUDE, code);
220    }
221
222    // accept the data located on LOC1 region.
223    for (int i = 0; i != maxRowCount; ++i) {
224      ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
225      assertEquals(ReturnCode.INCLUDE, acceptCode);
226      checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow);
227    }
228
229    // the sn server reachs the limit.
230    for (int i = 0; i != 10; ++i) {
231      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
232      assertNotEquals(ReturnCode.INCLUDE, code);
233      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
234      assertNotEquals(ReturnCode.INCLUDE, code);
235    }
236
237    // the request to sn2 server should be accepted.
238    for (int i = 0; i != 10; ++i) {
239      ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow);
240      assertEquals(ReturnCode.INCLUDE, code);
241    }
242
243    checker.reset();
244    for (int i = 0; i != 10; ++i) {
245      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
246      assertEquals(ReturnCode.INCLUDE, code);
247      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
248      assertEquals(ReturnCode.INCLUDE, code);
249    }
250  }
251
252  @Test
253  public void testSubmittedSizeChecker() {
254    final long maxHeapSizeSubmit = 2 * 1024 * 1024;
255    SimpleRequestController.SubmittedSizeChecker checker
256            = new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit);
257
258    for (int i = 0; i != 10; ++i) {
259      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
260      assertEquals(ReturnCode.INCLUDE, include);
261    }
262
263    for (int i = 0; i != 10; ++i) {
264      checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit);
265    }
266
267    for (int i = 0; i != 10; ++i) {
268      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
269      assertEquals(ReturnCode.END, include);
270    }
271    for (int i = 0; i != 10; ++i) {
272      ReturnCode include = checker.canTakeOperation(LOC2, 100000);
273      assertEquals(ReturnCode.END, include);
274    }
275    checker.reset();
276    for (int i = 0; i != 10; ++i) {
277      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
278      assertEquals(ReturnCode.INCLUDE, include);
279    }
280  }
281
282  @Test
283  public void testTaskCountChecker() throws InterruptedIOException {
284    long heapSizeOfRow = 12345;
285    int maxTotalConcurrentTasks = 100;
286    int maxConcurrentTasksPerServer = 2;
287    int maxConcurrentTasksPerRegion = 1;
288    AtomicLong tasksInProgress = new AtomicLong(0);
289    Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
290    Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
291    SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker(
292            maxTotalConcurrentTasks,
293            maxConcurrentTasksPerServer,
294            maxConcurrentTasksPerRegion,
295            tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
296
297    // inner state is unchanged.
298    for (int i = 0; i != 10; ++i) {
299      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
300      assertEquals(ReturnCode.INCLUDE, code);
301    }
302    // add LOC1 region.
303    ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
304    assertEquals(ReturnCode.INCLUDE, code);
305    checker.notifyFinal(code, LOC1, heapSizeOfRow);
306
307    // fill the task slots for LOC1.
308    taskCounterPerRegion.put(LOC1.getRegion().getRegionName(), new AtomicInteger(100));
309    taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100));
310
311    // the region was previously accepted, so it must be accpted now.
312    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
313      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
314      assertEquals(ReturnCode.INCLUDE, includeCode);
315      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
316    }
317
318    // fill the task slots for LOC3.
319    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
320    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
321
322    // no task slots.
323    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
324      ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
325      assertNotEquals(ReturnCode.INCLUDE, excludeCode);
326      checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow);
327    }
328
329    // release the tasks for LOC3.
330    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
331    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
332
333    // add LOC3 region.
334    ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow);
335    assertEquals(ReturnCode.INCLUDE, code3);
336    checker.notifyFinal(code3, LOC3, heapSizeOfRow);
337
338    // the region was previously accepted, so it must be accpted now.
339    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
340      ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
341      assertEquals(ReturnCode.INCLUDE, includeCode);
342      checker.notifyFinal(includeCode, LOC3, heapSizeOfRow);
343    }
344
345    checker.reset();
346    // the region was previously accepted,
347    // but checker have reseted and task slots for LOC1 is full.
348    // So it must be rejected now.
349    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
350      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
351      assertNotEquals(ReturnCode.INCLUDE, includeCode);
352      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
353    }
354  }
355
356  @Test
357  public void testWaitForMaximumCurrentTasks() throws Exception {
358    final AtomicInteger max = new AtomicInteger(0);
359    final CyclicBarrier barrier = new CyclicBarrier(2);
360    SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
361    final AtomicLong tasks = controller.tasksInProgress;
362    Runnable runnable = () -> {
363      try {
364        barrier.await();
365        controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
366      } catch (InterruptedIOException e) {
367        Assert.fail(e.getMessage());
368      } catch (InterruptedException | BrokenBarrierException e) {
369        e.printStackTrace();
370      }
371    };
372    // First test that our runnable thread only exits when tasks is zero.
373    Thread t = new Thread(runnable);
374    t.start();
375    barrier.await();
376    t.join();
377    // Now assert we stay running if max == zero and tasks is > 0.
378    barrier.reset();
379    tasks.set(1000000);
380    t = new Thread(runnable);
381    t.start();
382    barrier.await();
383    while (tasks.get() > 0) {
384      assertTrue(t.isAlive());
385      tasks.set(tasks.get() - 1);
386    }
387    t.join();
388  }
389}