001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotEquals; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.junit.jupiter.api.Assertions.fail; 024 025import java.io.IOException; 026import java.io.InterruptedIOException; 027import java.util.Arrays; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.TreeMap; 031import java.util.concurrent.BrokenBarrierException; 032import java.util.concurrent.CyclicBarrier; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.RequestController.ReturnCode; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.SmallTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.jupiter.api.Tag; 046import org.junit.jupiter.api.Test; 047 048@Tag(ClientTests.TAG) 049@Tag(SmallTests.TAG) 050public class TestSimpleRequestController { 051 052 private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); 053 private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1"); 054 private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); 055 private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); 056 private static final ServerName SN = ServerName.valueOf("s1,1,1"); 057 private static final ServerName SN2 = ServerName.valueOf("s2,2,2"); 058 private static final RegionInfo HRI1 = RegionInfoBuilder.newBuilder(DUMMY_TABLE) 059 .setStartKey(DUMMY_BYTES_1).setEndKey(DUMMY_BYTES_2).setRegionId(1).build(); 060 private static final RegionInfo HRI2 = RegionInfoBuilder.newBuilder(DUMMY_TABLE) 061 .setStartKey(DUMMY_BYTES_2).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(2).build(); 062 private static final RegionInfo HRI3 = RegionInfoBuilder.newBuilder(DUMMY_TABLE) 063 .setStartKey(DUMMY_BYTES_3).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(3).build(); 064 private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN); 065 private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN); 066 private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2); 067 068 @Test 069 public void testIllegalRequestHeapSize() { 070 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); 071 } 072 073 @Test 074 public void testIllegalRsTasks() { 075 testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1); 076 } 077 078 @Test 079 public void testIllegalRegionTasks() { 080 testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1); 081 } 082 083 @Test 084 public void testIllegalSubmittedSize() { 085 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1); 086 } 087 088 @Test 089 public void testIllegalRequestRows() { 090 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1); 091 } 092 093 private void testIllegalArgument(String key, long value) { 094 Configuration conf = HBaseConfiguration.create(); 095 conf.setLong(key, value); 096 try { 097 new SimpleRequestController(conf); 098 fail("The " + key + " must be bigger than zero"); 099 } catch (IllegalArgumentException e) { 100 // Expected 101 } 102 } 103 104 private static Put createPut(long maxHeapSizePerRequest) { 105 return new Put(Bytes.toBytes("row")) { 106 @Override 107 public long heapSize() { 108 return maxHeapSizePerRequest; 109 } 110 }; 111 } 112 113 @Test 114 public void testTaskCheckerHost() throws IOException { 115 final int maxTotalConcurrentTasks = 100; 116 final int maxConcurrentTasksPerServer = 2; 117 final int maxConcurrentTasksPerRegion = 1; 118 final AtomicLong tasksInProgress = new AtomicLong(0); 119 final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>(); 120 final Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR); 121 SimpleRequestController.TaskCountChecker countChecker = 122 new SimpleRequestController.TaskCountChecker(maxTotalConcurrentTasks, 123 maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion, tasksInProgress, 124 taskCounterPerServer, taskCounterPerRegion); 125 final long maxHeapSizePerRequest = 2 * 1024 * 1024; 126 // unlimiited 127 SimpleRequestController.RequestHeapSizeChecker sizeChecker = 128 new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest); 129 RequestController.Checker checker = 130 SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker)); 131 ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest)); 132 assertEquals(ReturnCode.INCLUDE, loc1Code); 133 134 ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest)); 135 // rejected for size 136 assertNotEquals(ReturnCode.INCLUDE, loc1Code_2); 137 138 ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest)); 139 // rejected for size 140 assertNotEquals(ReturnCode.INCLUDE, loc2Code); 141 142 // fill the task slots for LOC3. 143 taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100)); 144 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100)); 145 146 ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L)); 147 // rejected for count 148 assertNotEquals(ReturnCode.INCLUDE, loc3Code); 149 150 // release the task slots for LOC3. 151 taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0)); 152 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0)); 153 154 ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L)); 155 assertEquals(ReturnCode.INCLUDE, loc3Code_2); 156 } 157 158 @Test 159 public void testRequestHeapSizeChecker() throws IOException { 160 final long maxHeapSizePerRequest = 2 * 1024 * 1024; 161 SimpleRequestController.RequestHeapSizeChecker checker = 162 new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest); 163 164 // inner state is unchanged. 165 for (int i = 0; i != 10; ++i) { 166 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 167 assertEquals(ReturnCode.INCLUDE, code); 168 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); 169 assertEquals(ReturnCode.INCLUDE, code); 170 } 171 172 // accept the data located on LOC1 region. 173 ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 174 assertEquals(ReturnCode.INCLUDE, acceptCode); 175 checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest); 176 177 // the sn server reachs the limit. 178 for (int i = 0; i != 10; ++i) { 179 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 180 assertNotEquals(ReturnCode.INCLUDE, code); 181 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); 182 assertNotEquals(ReturnCode.INCLUDE, code); 183 } 184 185 // the request to sn2 server should be accepted. 186 for (int i = 0; i != 10; ++i) { 187 ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest); 188 assertEquals(ReturnCode.INCLUDE, code); 189 } 190 191 checker.reset(); 192 for (int i = 0; i != 10; ++i) { 193 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest); 194 assertEquals(ReturnCode.INCLUDE, code); 195 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest); 196 assertEquals(ReturnCode.INCLUDE, code); 197 } 198 } 199 200 @Test 201 public void testRequestRowsChecker() throws IOException { 202 final long maxRowCount = 100; 203 SimpleRequestController.RequestRowsChecker checker = 204 new SimpleRequestController.RequestRowsChecker(maxRowCount); 205 206 final long heapSizeOfRow = 100; // unused 207 // inner state is unchanged. 208 for (int i = 0; i != 10; ++i) { 209 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 210 assertEquals(ReturnCode.INCLUDE, code); 211 code = checker.canTakeOperation(LOC2, heapSizeOfRow); 212 assertEquals(ReturnCode.INCLUDE, code); 213 } 214 215 // accept the data located on LOC1 region. 216 for (int i = 0; i != maxRowCount; ++i) { 217 ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow); 218 assertEquals(ReturnCode.INCLUDE, acceptCode); 219 checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow); 220 } 221 222 // the sn server reachs the limit. 223 for (int i = 0; i != 10; ++i) { 224 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 225 assertNotEquals(ReturnCode.INCLUDE, code); 226 code = checker.canTakeOperation(LOC2, heapSizeOfRow); 227 assertNotEquals(ReturnCode.INCLUDE, code); 228 } 229 230 // the request to sn2 server should be accepted. 231 for (int i = 0; i != 10; ++i) { 232 ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow); 233 assertEquals(ReturnCode.INCLUDE, code); 234 } 235 236 checker.reset(); 237 for (int i = 0; i != 10; ++i) { 238 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 239 assertEquals(ReturnCode.INCLUDE, code); 240 code = checker.canTakeOperation(LOC2, heapSizeOfRow); 241 assertEquals(ReturnCode.INCLUDE, code); 242 } 243 } 244 245 @Test 246 public void testSubmittedSizeChecker() { 247 final long maxHeapSizeSubmit = 2 * 1024 * 1024; 248 SimpleRequestController.SubmittedSizeChecker checker = 249 new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit); 250 251 for (int i = 0; i != 10; ++i) { 252 ReturnCode include = checker.canTakeOperation(LOC1, 100000); 253 assertEquals(ReturnCode.INCLUDE, include); 254 } 255 256 for (int i = 0; i != 10; ++i) { 257 checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit); 258 } 259 260 for (int i = 0; i != 10; ++i) { 261 ReturnCode include = checker.canTakeOperation(LOC1, 100000); 262 assertEquals(ReturnCode.END, include); 263 } 264 for (int i = 0; i != 10; ++i) { 265 ReturnCode include = checker.canTakeOperation(LOC2, 100000); 266 assertEquals(ReturnCode.END, include); 267 } 268 checker.reset(); 269 for (int i = 0; i != 10; ++i) { 270 ReturnCode include = checker.canTakeOperation(LOC1, 100000); 271 assertEquals(ReturnCode.INCLUDE, include); 272 } 273 } 274 275 @Test 276 public void testTaskCountChecker() throws InterruptedIOException { 277 long heapSizeOfRow = 12345; 278 int maxTotalConcurrentTasks = 100; 279 int maxConcurrentTasksPerServer = 2; 280 int maxConcurrentTasksPerRegion = 1; 281 AtomicLong tasksInProgress = new AtomicLong(0); 282 Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>(); 283 Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR); 284 SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker( 285 maxTotalConcurrentTasks, maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion, 286 tasksInProgress, taskCounterPerServer, taskCounterPerRegion); 287 288 // inner state is unchanged. 289 for (int i = 0; i != 10; ++i) { 290 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 291 assertEquals(ReturnCode.INCLUDE, code); 292 } 293 // add LOC1 region. 294 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow); 295 assertEquals(ReturnCode.INCLUDE, code); 296 checker.notifyFinal(code, LOC1, heapSizeOfRow); 297 298 // fill the task slots for LOC1. 299 taskCounterPerRegion.put(LOC1.getRegion().getRegionName(), new AtomicInteger(100)); 300 taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100)); 301 302 // the region was previously accepted, so it must be accpted now. 303 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 304 ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow); 305 assertEquals(ReturnCode.INCLUDE, includeCode); 306 checker.notifyFinal(includeCode, LOC1, heapSizeOfRow); 307 } 308 309 // fill the task slots for LOC3. 310 taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100)); 311 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100)); 312 313 // no task slots. 314 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 315 ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow); 316 assertNotEquals(ReturnCode.INCLUDE, excludeCode); 317 checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow); 318 } 319 320 // release the tasks for LOC3. 321 taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0)); 322 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0)); 323 324 // add LOC3 region. 325 ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow); 326 assertEquals(ReturnCode.INCLUDE, code3); 327 checker.notifyFinal(code3, LOC3, heapSizeOfRow); 328 329 // the region was previously accepted, so it must be accpted now. 330 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 331 ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow); 332 assertEquals(ReturnCode.INCLUDE, includeCode); 333 checker.notifyFinal(includeCode, LOC3, heapSizeOfRow); 334 } 335 336 checker.reset(); 337 // the region was previously accepted, 338 // but checker have reseted and task slots for LOC1 is full. 339 // So it must be rejected now. 340 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { 341 ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow); 342 assertNotEquals(ReturnCode.INCLUDE, includeCode); 343 checker.notifyFinal(includeCode, LOC1, heapSizeOfRow); 344 } 345 } 346 347 @Test 348 public void testWaitForMaximumCurrentTasks() throws Exception { 349 final AtomicInteger max = new AtomicInteger(0); 350 final CyclicBarrier barrier = new CyclicBarrier(2); 351 SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create()); 352 final AtomicLong tasks = controller.tasksInProgress; 353 Runnable runnable = () -> { 354 try { 355 barrier.await(); 356 controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null); 357 } catch (InterruptedIOException | InterruptedException | BrokenBarrierException e) { 358 fail(e.getMessage()); 359 } 360 }; 361 // First test that our runnable thread only exits when tasks is zero. 362 Thread t = new Thread(runnable); 363 t.start(); 364 barrier.await(); 365 t.join(); 366 // Now assert we stay running if max == zero and tasks is > 0. 367 barrier.reset(); 368 tasks.set(1000000); 369 t = new Thread(runnable); 370 t.start(); 371 barrier.await(); 372 while (tasks.get() > 0) { 373 assertTrue(t.isAlive()); 374 tasks.set(tasks.get() - 1); 375 } 376 t.join(); 377 } 378}