001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.io.InterruptedIOException; 027import java.util.Arrays; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.concurrent.BrokenBarrierException; 031import java.util.concurrent.CyclicBarrier; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicLong; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.RequestController.ReturnCode; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.SmallTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.Assert; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049 050@Category({ClientTests.class, SmallTests.class}) 051public class TestSimpleRequestController { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestSimpleRequestController.class); 056 057 private static final TableName DUMMY_TABLE 058 = TableName.valueOf("DUMMY_TABLE"); 059 private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1"); 060 private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); 061 private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); 062 private static final ServerName SN = ServerName.valueOf("s1,1,1"); 063 private static final ServerName SN2 = ServerName.valueOf("s2,2,2"); 064 private static final RegionInfo HRI1 = RegionInfoBuilder.newBuilder(DUMMY_TABLE) 065 .setStartKey(DUMMY_BYTES_1).setEndKey(DUMMY_BYTES_2).setRegionId(1).build(); 066 private static final RegionInfo HRI2 = RegionInfoBuilder.newBuilder(DUMMY_TABLE) 067 .setStartKey(DUMMY_BYTES_2).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(2).build(); 068 private static final RegionInfo HRI3 = RegionInfoBuilder.newBuilder(DUMMY_TABLE) 069 .setStartKey(DUMMY_BYTES_3).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(3).build(); 070 private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN); 071 private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN); 072 private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2); 073 074 @Test 075 public void testIllegalRequestHeapSize() { 076 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); 077 } 078 079 @Test 080 public void testIllegalRsTasks() { 081 testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1); 082 } 083 084 @Test 085 public void testIllegalRegionTasks() { 086 testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1); 087 } 088 089 @Test 090 public void testIllegalSubmittedSize() { 091 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1); 092 } 093 094 @Test 095 public void testIllegalRequestRows() { 096 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1); 097 } 098 099 private void testIllegalArgument(String key, long value) { 100 Configuration conf = HBaseConfiguration.create(); 101 conf.setLong(key, value); 102 try { 103 new SimpleRequestController(conf); 104 fail("The " + key + " must be bigger than zero"); 105 } catch (IllegalArgumentException e) { 106 } 107 } 108 109 private static Put createPut(long maxHeapSizePerRequest) { 110 return new Put(Bytes.toBytes("row")) { 111 @Override 112 public long heapSize() { 113 return maxHeapSizePerRequest; 114 } 115 }; 116 } 117 118 @Test 119 public void testTaskCheckerHost() throws IOException { 120 final int maxTotalConcurrentTasks = 100; 121 final int maxConcurrentTasksPerServer = 2; 122 final int maxConcurrentTasksPerRegion = 1; 123 final AtomicLong tasksInProgress = new AtomicLong(0); 124 final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>(); 125 final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>(); 126 SimpleRequestController.TaskCountChecker countChecker = 127 new SimpleRequestController.TaskCountChecker( 128 maxTotalConcurrentTasks, 129 maxConcurrentTasksPerServer, 130 maxConcurrentTasksPerRegion, 131 tasksInProgress, taskCounterPerServer, taskCounterPerRegion); 132 final long maxHeapSizePerRequest = 2 * 1024 * 1024; 133 // unlimiited 134 SimpleRequestController.RequestHeapSizeChecker sizeChecker = 135 new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest); 136 RequestController.Checker checker = 137 SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker)); 138 ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest)); 139 assertEquals(ReturnCode.INCLUDE, loc1Code); 140 141 ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest)); 142 // rejected for size 143 assertNotEquals(ReturnCode.INCLUDE, loc1Code_2); 144 145 ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest)); 146 // rejected for size 147 assertNotEquals(ReturnCode.INCLUDE, loc2Code); 148 149 // fill the task slots for LOC3. 150 taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100)); 151 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100)); 152 153 ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L)); 154 // rejected for count 155 assertNotEquals(ReturnCode.INCLUDE, loc3Code); 156 157 // release the task slots for LOC3. 158 taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0)); 159 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0)); 160 161 ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L)); 162 assertEquals(ReturnCode.INCLUDE, loc3Code_2); 163 } 164 165 @Test 166 public void testRequestHeapSizeChecker() throws IOException { 167 final long maxHeapSizePerRequest = 2 * 1024 * 1024; 168 SimpleRequestController.RequestHeapSizeChecker checker 169 = new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest); 170 171 // inner state is unchanged. 172 for (int i = 0; i != 10; ++i) { 173 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 174 assertEquals(ReturnCode.INCLUDE, code); 175 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); 176 assertEquals(ReturnCode.INCLUDE, code); 177 } 178 179 // accept the data located on LOC1 region. 180 ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 181 assertEquals(ReturnCode.INCLUDE, acceptCode); 182 checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest); 183 184 // the sn server reachs the limit. 185 for (int i = 0; i != 10; ++i) { 186 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 187 assertNotEquals(ReturnCode.INCLUDE, code); 188 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); 189 assertNotEquals(ReturnCode.INCLUDE, code); 190 } 191 192 // the request to sn2 server should be accepted. 193 for (int i = 0; i != 10; ++i) { 194 ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest); 195 assertEquals(ReturnCode.INCLUDE, code); 196 } 197 198 checker.reset(); 199 for (int i = 0; i != 10; ++i) { 200 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 201 assertEquals(ReturnCode.INCLUDE, code); 202 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); 203 assertEquals(ReturnCode.INCLUDE, code); 204 } 205 } 206 207 @Test 208 public void testRequestRowsChecker() throws IOException { 209 final long maxRowCount = 100; 210 SimpleRequestController.RequestRowsChecker checker 211 = new SimpleRequestController.RequestRowsChecker(maxRowCount); 212 213 final long heapSizeOfRow = 100; //unused 214 // inner state is unchanged. 215 for (int i = 0; i != 10; ++i) { 216 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 217 assertEquals(ReturnCode.INCLUDE, code); 218 code = checker.canTakeOperation(LOC2, heapSizeOfRow); 219 assertEquals(ReturnCode.INCLUDE, code); 220 } 221 222 // accept the data located on LOC1 region. 223 for (int i = 0; i != maxRowCount; ++i) { 224 ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow); 225 assertEquals(ReturnCode.INCLUDE, acceptCode); 226 checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow); 227 } 228 229 // the sn server reachs the limit. 230 for (int i = 0; i != 10; ++i) { 231 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 232 assertNotEquals(ReturnCode.INCLUDE, code); 233 code = checker.canTakeOperation(LOC2, heapSizeOfRow); 234 assertNotEquals(ReturnCode.INCLUDE, code); 235 } 236 237 // the request to sn2 server should be accepted. 238 for (int i = 0; i != 10; ++i) { 239 ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow); 240 assertEquals(ReturnCode.INCLUDE, code); 241 } 242 243 checker.reset(); 244 for (int i = 0; i != 10; ++i) { 245 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 246 assertEquals(ReturnCode.INCLUDE, code); 247 code = checker.canTakeOperation(LOC2, heapSizeOfRow); 248 assertEquals(ReturnCode.INCLUDE, code); 249 } 250 } 251 252 @Test 253 public void testSubmittedSizeChecker() { 254 final long maxHeapSizeSubmit = 2 * 1024 * 1024; 255 SimpleRequestController.SubmittedSizeChecker checker 256 = new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit); 257 258 for (int i = 0; i != 10; ++i) { 259 ReturnCode include = checker.canTakeOperation(LOC1, 100000); 260 assertEquals(ReturnCode.INCLUDE, include); 261 } 262 263 for (int i = 0; i != 10; ++i) { 264 checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit); 265 } 266 267 for (int i = 0; i != 10; ++i) { 268 ReturnCode include = checker.canTakeOperation(LOC1, 100000); 269 assertEquals(ReturnCode.END, include); 270 } 271 for (int i = 0; i != 10; ++i) { 272 ReturnCode include = checker.canTakeOperation(LOC2, 100000); 273 assertEquals(ReturnCode.END, include); 274 } 275 checker.reset(); 276 for (int i = 0; i != 10; ++i) { 277 ReturnCode include = checker.canTakeOperation(LOC1, 100000); 278 assertEquals(ReturnCode.INCLUDE, include); 279 } 280 } 281 282 @Test 283 public void testTaskCountChecker() throws InterruptedIOException { 284 long heapSizeOfRow = 12345; 285 int maxTotalConcurrentTasks = 100; 286 int maxConcurrentTasksPerServer = 2; 287 int maxConcurrentTasksPerRegion = 1; 288 AtomicLong tasksInProgress = new AtomicLong(0); 289 Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>(); 290 Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>(); 291 SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker( 292 maxTotalConcurrentTasks, 293 maxConcurrentTasksPerServer, 294 maxConcurrentTasksPerRegion, 295 tasksInProgress, taskCounterPerServer, taskCounterPerRegion); 296 297 // inner state is unchanged. 298 for (int i = 0; i != 10; ++i) { 299 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 300 assertEquals(ReturnCode.INCLUDE, code); 301 } 302 // add LOC1 region. 303 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 304 assertEquals(ReturnCode.INCLUDE, code); 305 checker.notifyFinal(code, LOC1, heapSizeOfRow); 306 307 // fill the task slots for LOC1. 308 taskCounterPerRegion.put(LOC1.getRegion().getRegionName(), new AtomicInteger(100)); 309 taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100)); 310 311 // the region was previously accepted, so it must be accpted now. 312 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 313 ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow); 314 assertEquals(ReturnCode.INCLUDE, includeCode); 315 checker.notifyFinal(includeCode, LOC1, heapSizeOfRow); 316 } 317 318 // fill the task slots for LOC3. 319 taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100)); 320 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100)); 321 322 // no task slots. 323 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 324 ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow); 325 assertNotEquals(ReturnCode.INCLUDE, excludeCode); 326 checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow); 327 } 328 329 // release the tasks for LOC3. 330 taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0)); 331 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0)); 332 333 // add LOC3 region. 334 ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow); 335 assertEquals(ReturnCode.INCLUDE, code3); 336 checker.notifyFinal(code3, LOC3, heapSizeOfRow); 337 338 // the region was previously accepted, so it must be accpted now. 339 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 340 ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow); 341 assertEquals(ReturnCode.INCLUDE, includeCode); 342 checker.notifyFinal(includeCode, LOC3, heapSizeOfRow); 343 } 344 345 checker.reset(); 346 // the region was previously accepted, 347 // but checker have reseted and task slots for LOC1 is full. 348 // So it must be rejected now. 349 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 350 ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow); 351 assertNotEquals(ReturnCode.INCLUDE, includeCode); 352 checker.notifyFinal(includeCode, LOC1, heapSizeOfRow); 353 } 354 } 355 356 @Test 357 public void testWaitForMaximumCurrentTasks() throws Exception { 358 final AtomicInteger max = new AtomicInteger(0); 359 final CyclicBarrier barrier = new CyclicBarrier(2); 360 SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create()); 361 final AtomicLong tasks = controller.tasksInProgress; 362 Runnable runnable = () -> { 363 try { 364 barrier.await(); 365 controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null); 366 } catch (InterruptedIOException e) { 367 Assert.fail(e.getMessage()); 368 } catch (InterruptedException | BrokenBarrierException e) { 369 e.printStackTrace(); 370 } 371 }; 372 // First test that our runnable thread only exits when tasks is zero. 373 Thread t = new Thread(runnable); 374 t.start(); 375 barrier.await(); 376 t.join(); 377 // Now assert we stay running if max == zero and tasks is > 0. 378 barrier.reset(); 379 tasks.set(1000000); 380 t = new Thread(runnable); 381 t.start(); 382 barrier.await(); 383 while (tasks.get() > 0) { 384 assertTrue(t.isAlive()); 385 tasks.set(tasks.get() - 1); 386 } 387 t.join(); 388 } 389}