001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.io.InterruptedIOException; 027import java.util.Arrays; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.concurrent.BrokenBarrierException; 031import java.util.concurrent.CyclicBarrier; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicLong; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.HRegionInfo; 039import org.apache.hadoop.hbase.HRegionLocation; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.RequestController.ReturnCode; 043import org.apache.hadoop.hbase.testclassification.ClientTests; 044import org.apache.hadoop.hbase.testclassification.SmallTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.junit.Assert; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050 051@Category({ClientTests.class, SmallTests.class}) 052public class TestSimpleRequestController { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestSimpleRequestController.class); 057 058 private static final TableName DUMMY_TABLE 059 = TableName.valueOf("DUMMY_TABLE"); 060 private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1"); 061 private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); 062 private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); 063 private static final ServerName SN = ServerName.valueOf("s1,1,1"); 064 private static final ServerName SN2 = ServerName.valueOf("s2,2,2"); 065 private static final HRegionInfo HRI1 066 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); 067 private static final HRegionInfo HRI2 068 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); 069 private static final HRegionInfo HRI3 070 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); 071 private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN); 072 private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN); 073 private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2); 074 075 @Test 076 public void testIllegalRequestHeapSize() { 077 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); 078 } 079 080 @Test 081 public void testIllegalRsTasks() { 082 testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1); 083 } 084 085 @Test 086 public void testIllegalRegionTasks() { 087 testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1); 088 } 089 090 @Test 091 public void testIllegalSubmittedSize() { 092 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1); 093 } 094 095 @Test 096 public void testIllegalRequestRows() { 097 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1); 098 } 099 100 private void testIllegalArgument(String key, long value) { 101 Configuration conf = HBaseConfiguration.create(); 102 conf.setLong(key, value); 103 try { 104 SimpleRequestController controller = new SimpleRequestController(conf); 105 fail("The " + key + " must be bigger than zero"); 106 } catch (IllegalArgumentException e) { 107 } 108 } 109 110 private static Put createPut(long maxHeapSizePerRequest) { 111 return new Put(Bytes.toBytes("row")) { 112 @Override 113 public long heapSize() { 114 return maxHeapSizePerRequest; 115 } 116 }; 117 } 118 119 @Test 120 public void testTaskCheckerHost() throws IOException { 121 final int maxTotalConcurrentTasks = 100; 122 final int maxConcurrentTasksPerServer = 2; 123 final int maxConcurrentTasksPerRegion = 1; 124 final AtomicLong tasksInProgress = new AtomicLong(0); 125 final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>(); 126 final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>(); 127 SimpleRequestController.TaskCountChecker countChecker = 128 new SimpleRequestController.TaskCountChecker( 129 maxTotalConcurrentTasks, 130 maxConcurrentTasksPerServer, 131 maxConcurrentTasksPerRegion, 132 tasksInProgress, taskCounterPerServer, taskCounterPerRegion); 133 final long maxHeapSizePerRequest = 2 * 1024 * 1024; 134 // unlimiited 135 SimpleRequestController.RequestHeapSizeChecker sizeChecker = 136 new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest); 137 RequestController.Checker checker = 138 SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker)); 139 ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest)); 140 assertEquals(ReturnCode.INCLUDE, loc1Code); 141 142 ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest)); 143 // rejected for size 144 assertNotEquals(ReturnCode.INCLUDE, loc1Code_2); 145 146 ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest)); 147 // rejected for size 148 assertNotEquals(ReturnCode.INCLUDE, loc2Code); 149 150 // fill the task slots for LOC3. 151 taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100)); 152 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100)); 153 154 ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L)); 155 // rejected for count 156 assertNotEquals(ReturnCode.INCLUDE, loc3Code); 157 158 // release the task slots for LOC3. 159 taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0)); 160 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0)); 161 162 ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L)); 163 assertEquals(ReturnCode.INCLUDE, loc3Code_2); 164 } 165 166 @Test 167 public void testRequestHeapSizeChecker() throws IOException { 168 final long maxHeapSizePerRequest = 2 * 1024 * 1024; 169 SimpleRequestController.RequestHeapSizeChecker checker 170 = new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest); 171 172 // inner state is unchanged. 173 for (int i = 0; i != 10; ++i) { 174 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 175 assertEquals(ReturnCode.INCLUDE, code); 176 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); 177 assertEquals(ReturnCode.INCLUDE, code); 178 } 179 180 // accept the data located on LOC1 region. 181 ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 182 assertEquals(ReturnCode.INCLUDE, acceptCode); 183 checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest); 184 185 // the sn server reachs the limit. 186 for (int i = 0; i != 10; ++i) { 187 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 188 assertNotEquals(ReturnCode.INCLUDE, code); 189 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); 190 assertNotEquals(ReturnCode.INCLUDE, code); 191 } 192 193 // the request to sn2 server should be accepted. 194 for (int i = 0; i != 10; ++i) { 195 ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest); 196 assertEquals(ReturnCode.INCLUDE, code); 197 } 198 199 checker.reset(); 200 for (int i = 0; i != 10; ++i) { 201 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 202 assertEquals(ReturnCode.INCLUDE, code); 203 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); 204 assertEquals(ReturnCode.INCLUDE, code); 205 } 206 } 207 208 @Test 209 public void testRequestRowsChecker() throws IOException { 210 final long maxRowCount = 100; 211 SimpleRequestController.RequestRowsChecker checker 212 = new SimpleRequestController.RequestRowsChecker(maxRowCount); 213 214 final long heapSizeOfRow = 100; //unused 215 // inner state is unchanged. 216 for (int i = 0; i != 10; ++i) { 217 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 218 assertEquals(ReturnCode.INCLUDE, code); 219 code = checker.canTakeOperation(LOC2, heapSizeOfRow); 220 assertEquals(ReturnCode.INCLUDE, code); 221 } 222 223 // accept the data located on LOC1 region. 224 for (int i = 0; i != maxRowCount; ++i) { 225 ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow); 226 assertEquals(ReturnCode.INCLUDE, acceptCode); 227 checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow); 228 } 229 230 // the sn server reachs the limit. 231 for (int i = 0; i != 10; ++i) { 232 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 233 assertNotEquals(ReturnCode.INCLUDE, code); 234 code = checker.canTakeOperation(LOC2, heapSizeOfRow); 235 assertNotEquals(ReturnCode.INCLUDE, code); 236 } 237 238 // the request to sn2 server should be accepted. 239 for (int i = 0; i != 10; ++i) { 240 ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow); 241 assertEquals(ReturnCode.INCLUDE, code); 242 } 243 244 checker.reset(); 245 for (int i = 0; i != 10; ++i) { 246 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 247 assertEquals(ReturnCode.INCLUDE, code); 248 code = checker.canTakeOperation(LOC2, heapSizeOfRow); 249 assertEquals(ReturnCode.INCLUDE, code); 250 } 251 } 252 253 @Test 254 public void testSubmittedSizeChecker() { 255 final long maxHeapSizeSubmit = 2 * 1024 * 1024; 256 SimpleRequestController.SubmittedSizeChecker checker 257 = new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit); 258 259 for (int i = 0; i != 10; ++i) { 260 ReturnCode include = checker.canTakeOperation(LOC1, 100000); 261 assertEquals(ReturnCode.INCLUDE, include); 262 } 263 264 for (int i = 0; i != 10; ++i) { 265 checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit); 266 } 267 268 for (int i = 0; i != 10; ++i) { 269 ReturnCode include = checker.canTakeOperation(LOC1, 100000); 270 assertEquals(ReturnCode.END, include); 271 } 272 for (int i = 0; i != 10; ++i) { 273 ReturnCode include = checker.canTakeOperation(LOC2, 100000); 274 assertEquals(ReturnCode.END, include); 275 } 276 checker.reset(); 277 for (int i = 0; i != 10; ++i) { 278 ReturnCode include = checker.canTakeOperation(LOC1, 100000); 279 assertEquals(ReturnCode.INCLUDE, include); 280 } 281 } 282 283 @Test 284 public void testTaskCountChecker() throws InterruptedIOException { 285 long heapSizeOfRow = 12345; 286 int maxTotalConcurrentTasks = 100; 287 int maxConcurrentTasksPerServer = 2; 288 int maxConcurrentTasksPerRegion = 1; 289 AtomicLong tasksInProgress = new AtomicLong(0); 290 Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>(); 291 Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>(); 292 SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker( 293 maxTotalConcurrentTasks, 294 maxConcurrentTasksPerServer, 295 maxConcurrentTasksPerRegion, 296 tasksInProgress, taskCounterPerServer, taskCounterPerRegion); 297 298 // inner state is unchanged. 299 for (int i = 0; i != 10; ++i) { 300 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 301 assertEquals(ReturnCode.INCLUDE, code); 302 } 303 // add LOC1 region. 304 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 305 assertEquals(ReturnCode.INCLUDE, code); 306 checker.notifyFinal(code, LOC1, heapSizeOfRow); 307 308 // fill the task slots for LOC1. 309 taskCounterPerRegion.put(LOC1.getRegionInfo().getRegionName(), new AtomicInteger(100)); 310 taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100)); 311 312 // the region was previously accepted, so it must be accpted now. 313 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 314 ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow); 315 assertEquals(ReturnCode.INCLUDE, includeCode); 316 checker.notifyFinal(includeCode, LOC1, heapSizeOfRow); 317 } 318 319 // fill the task slots for LOC3. 320 taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100)); 321 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100)); 322 323 // no task slots. 324 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 325 ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow); 326 assertNotEquals(ReturnCode.INCLUDE, excludeCode); 327 checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow); 328 } 329 330 // release the tasks for LOC3. 331 taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0)); 332 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0)); 333 334 // add LOC3 region. 335 ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow); 336 assertEquals(ReturnCode.INCLUDE, code3); 337 checker.notifyFinal(code3, LOC3, heapSizeOfRow); 338 339 // the region was previously accepted, so it must be accpted now. 340 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 341 ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow); 342 assertEquals(ReturnCode.INCLUDE, includeCode); 343 checker.notifyFinal(includeCode, LOC3, heapSizeOfRow); 344 } 345 346 checker.reset(); 347 // the region was previously accepted, 348 // but checker have reseted and task slots for LOC1 is full. 349 // So it must be rejected now. 350 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 351 ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow); 352 assertNotEquals(ReturnCode.INCLUDE, includeCode); 353 checker.notifyFinal(includeCode, LOC1, heapSizeOfRow); 354 } 355 } 356 357 @Test 358 public void testWaitForMaximumCurrentTasks() throws Exception { 359 final AtomicInteger max = new AtomicInteger(0); 360 final CyclicBarrier barrier = new CyclicBarrier(2); 361 SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create()); 362 final AtomicLong tasks = controller.tasksInProgress; 363 Runnable runnable = () -> { 364 try { 365 barrier.await(); 366 controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null); 367 } catch (InterruptedIOException e) { 368 Assert.fail(e.getMessage()); 369 } catch (InterruptedException | BrokenBarrierException e) { 370 e.printStackTrace(); 371 } 372 }; 373 // First test that our runnable thread only exits when tasks is zero. 374 Thread t = new Thread(runnable); 375 t.start(); 376 barrier.await(); 377 t.join(); 378 // Now assert we stay running if max == zero and tasks is > 0. 379 barrier.reset(); 380 tasks.set(1000000); 381 t = new Thread(runnable); 382 t.start(); 383 barrier.await(); 384 while (tasks.get() > 0) { 385 assertTrue(t.isAlive()); 386 tasks.set(tasks.get() - 1); 387 } 388 t.join(); 389 } 390}