/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * Holds back the requests if they reach any thresholds.
 * <p>
 * The controller keeps three counters (total tasks, tasks per server, tasks per region) that are
 * updated through {@link #incTaskCounters(Collection, ServerName)} and
 * {@link #decTaskCounters(Collection, ServerName)}. The {@link Checker} returned by
 * {@link #newChecker()} consults those counters, plus per-request row and heap-size limits, to
 * decide whether a row may be included in the request being built.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class SimpleRequestController implements RequestController {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class);
  /**
   * The maximum heap size for each request.
   */
  public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE =
      "hbase.client.max.perrequest.heapsize";

  /**
   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}.
   */
  @VisibleForTesting
  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;

  /**
   * The maximum number of rows for each request.
   */
  public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows";
  /**
   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}.
   */
  @VisibleForTesting
  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048;

  /**
   * The maximum size of submit.
   */
  public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
  /**
   * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
   */
  @VisibleForTesting
  static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE =
      DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;

  /**
   * The number of tasks currently in progress. It also serves as the monitor on which waiters
   * block until a task finishes.
   */
  @VisibleForTesting
  final AtomicLong tasksInProgress = new AtomicLong(0);
  /**
   * The number of tasks in progress per region, keyed by the region name bytes.
   */
  @VisibleForTesting
  final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion
      = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
  /**
   * The number of tasks in progress per region server.
   */
  @VisibleForTesting
  final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
  /**
   * The number of tasks simultaneously executed on the cluster.
   */
  private final int maxTotalConcurrentTasks;

  /**
   * The maximum heap size for each request.
   */
  private final long maxHeapSizePerRequest;
  /**
   * The maximum number of rows for each request.
   */
  private final long maxRowsPerRequest;
  /**
   * The maximum heap size of the data accepted by a single submit.
   */
  private final long maxHeapSizeSubmit;
  /**
   * The number of tasks we run in parallel on a single region. With 1 (the
   * default), we ensure that the ordering of the queries is respected: we
   * don't start a set of operations on a region before the previous one is
   * done. As well, this limits the pressure we put on the region server.
   */
  @VisibleForTesting
  final int maxConcurrentTasksPerRegion;

  /**
   * The number of tasks simultaneously executed on a single region server.
   */
  @VisibleForTesting
  final int maxConcurrentTasksPerServer;
  /**
   * Server details are logged only when the number of unfinished tasks is at or below this value.
   */
  private final int thresholdToLogUndoneTaskDetails;
  public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
      "hbase.client.threshold.log.details";
  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
  public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
      "hbase.client.threshold.log.region.details";
  private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
  /**
   * Region details are logged only when the number of unfinished tasks is at or below this value.
   */
  private final int thresholdToLogRegionDetails;

  /**
   * Reads every limit from the configuration.
   *
   * @param conf the configuration to read the limits from
   * @throws IllegalArgumentException if any of the task, row or heap-size limits is not positive
   */
  SimpleRequestController(final Configuration conf) {
    this.maxTotalConcurrentTasks = checkAndGet(conf,
        HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
    this.maxConcurrentTasksPerServer = checkAndGet(conf,
        HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
        HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
    this.maxConcurrentTasksPerRegion = checkAndGet(conf,
        HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
        HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
    this.maxHeapSizePerRequest = checkAndGet(conf,
        HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
        DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
    this.maxRowsPerRequest = checkAndGet(conf,
        HBASE_CLIENT_MAX_PERREQUEST_ROWS,
        DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS);
    this.maxHeapSizeSubmit = checkAndGet(conf,
        HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE,
        DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
    // The logging thresholds are not validated: any int value is accepted.
    this.thresholdToLogUndoneTaskDetails = conf.getInt(
        THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
        DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
    this.thresholdToLogRegionDetails = conf.getInt(
        THRESHOLD_TO_LOG_REGION_DETAILS,
        DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
  }

  /**
   * Reads an int setting and rejects non-positive values.
   *
   * @param conf the configuration to read from
   * @param key the name of the setting
   * @param defaultValue the value used when the setting is absent
   * @return the configured (or default) value, always greater than zero
   * @throws IllegalArgumentException if the value is zero or negative
   */
  private static int checkAndGet(Configuration conf, String key, int defaultValue) {
    int value = conf.getInt(key, defaultValue);
    if (value <= 0) {
      throw new IllegalArgumentException(key + "=" + value);
    }
    return value;
  }

  /**
   * Reads a long setting and rejects non-positive values.
   *
   * @param conf the configuration to read from
   * @param key the name of the setting
   * @param defaultValue the value used when the setting is absent
   * @return the configured (or default) value, always greater than zero
   * @throws IllegalArgumentException if the value is zero or negative
   */
  private static long checkAndGet(Configuration conf, String key, long defaultValue) {
    long value = conf.getLong(key, defaultValue);
    if (value <= 0) {
      throw new IllegalArgumentException(key + "=" + value);
    }
    return value;
  }

  /**
   * Combines several {@link RowChecker}s into a single {@link Checker}.
   * <p>
   * For each row the checkers are consulted in order. {@code SKIP} from any checker makes the
   * final decision {@code SKIP} unless a later checker answers {@code END}; the first
   * {@code END} stops the consultation and makes every following call return {@code END} until
   * {@link Checker#reset()} is invoked. All checkers are then notified of the final decision.
   *
   * @param checkers the checkers to consult, in order
   * @return a checker that aggregates the decisions of {@code checkers}
   */
  @VisibleForTesting
  static Checker newChecker(List<RowChecker> checkers) {
    return new Checker() {
      private boolean isEnd = false;

      @Override
      public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
        if (isEnd) {
          return ReturnCode.END;
        }
        // Only mutations carry a heap size; every other row counts as zero bytes.
        long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
        ReturnCode code = ReturnCode.INCLUDE;
        for (RowChecker checker : checkers) {
          switch (checker.canTakeOperation(loc, heapSizeOfRow)) {
            case END:
              isEnd = true;
              code = ReturnCode.END;
              break;
            case SKIP:
              code = ReturnCode.SKIP;
              break;
            case INCLUDE:
            default:
              break;
          }
          // The break statements above only leave the switch, so leave the loop explicitly.
          if (code == ReturnCode.END) {
            break;
          }
        }
        // Every checker learns the final decision, including those that were not consulted.
        for (RowChecker checker : checkers) {
          checker.notifyFinal(code, loc, heapSizeOfRow);
        }
        return code;
      }

      @Override
      public void reset() throws InterruptedIOException {
        isEnd = false;
        // Reset every checker even if one of them is interrupted; the last failure is rethrown.
        InterruptedIOException e = null;
        for (RowChecker checker : checkers) {
          try {
            checker.reset();
          } catch (InterruptedIOException ex) {
            e = ex;
          }
        }
        if (e != null) {
          throw e;
        }
      }
    };
  }

  /**
   * Creates a checker enforcing, in order: the concurrent task limits, the per-request heap
   * size, the per-submit heap size and the per-request row count.
   */
  @Override
  public Checker newChecker() {
    List<RowChecker> checkers = new ArrayList<>(4);
    checkers.add(new TaskCountChecker(maxTotalConcurrentTasks,
        maxConcurrentTasksPerServer,
        maxConcurrentTasksPerRegion,
        tasksInProgress,
        taskCounterPerServer,
        taskCounterPerRegion));
    checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest));
    checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
    checkers.add(new RequestRowsChecker(maxRowsPerRequest));
    return newChecker(checkers);
  }

  /**
   * Records the start of one task: increments the total counter, the counter of the server and
   * the counter of every region involved, creating missing counters on the fly.
   */
  @Override
  public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
    tasksInProgress.incrementAndGet();

    computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();

    regions.forEach((regBytes)
        -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet()
    );
  }

  /**
   * Records the end of one task and wakes up every thread waiting on {@link #tasksInProgress}.
   * <p>
   * The region and server counters are looked up without a null check, so a call that is not
   * matched by an earlier {@link #incTaskCounters(Collection, ServerName)} for the same regions
   * and server fails with a {@link NullPointerException}.
   */
  @Override
  public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
    regions.forEach(regBytes -> {
      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
      regionCnt.decrementAndGet();
    });

    taskCounterPerServer.get(sn).decrementAndGet();
    tasksInProgress.decrementAndGet();
    synchronized (tasksInProgress) {
      tasksInProgress.notifyAll();
    }
  }

  @Override
  public long getNumberOfTasksInProgress() {
    return tasksInProgress.get();
  }

  /**
   * Blocks until the number of tasks in progress is at most {@code max}.
   * <p>
   * While waiting, whenever the number of tasks has changed since the previous iteration and
   * more than {@code periodToTrigger} milliseconds have elapsed since the last report, the
   * {@code trigger} (if any) is invoked with the current number of tasks and the details of the
   * unfinished tasks are logged.
   *
   * @param max the number of in-progress tasks at or below which the method returns
   * @param id an identifier used only in the interruption message
   * @param periodToTrigger the minimum delay, in milliseconds, between two reports
   * @param trigger a callback receiving the current number of tasks; may be null
   * @throws InterruptedIOException if the waiting thread is interrupted
   */
  @Override
  public void waitForMaximumCurrentTasks(long max, long id,
      int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
    assert max >= 0;
    long lastLog = EnvironmentEdgeManager.currentTime();
    long currentInProgress, oldInProgress = Long.MAX_VALUE;
    while ((currentInProgress = tasksInProgress.get()) > max) {
      if (oldInProgress != currentInProgress) { // Wait for in progress to change.
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + periodToTrigger) {
          lastLog = now;
          if (trigger != null) {
            trigger.accept(currentInProgress);
          }
          logDetailsOfUndoneTasks(currentInProgress);
        }
      }
      oldInProgress = currentInProgress;
      try {
        synchronized (tasksInProgress) {
          // Re-check under the monitor so a notification sent in between is not missed.
          if (tasksInProgress.get() == oldInProgress) {
            tasksInProgress.wait(10);
          }
        }
      } catch (InterruptedException e) {
        // Keep the original interruption as the cause so it is not lost in stack traces.
        InterruptedIOException iioe = new InterruptedIOException("#" + id + ", interrupted."
            + " currentNumberOfTask=" + currentInProgress);
        iioe.initCause(e);
        throw iioe;
      }
    }
  }

  /**
   * Logs which servers and regions still have unfinished tasks. Each list is logged only when
   * the number of unfinished tasks is at or below the corresponding configured threshold.
   *
   * @param taskInProgress the current number of unfinished tasks
   */
  private void logDetailsOfUndoneTasks(long taskInProgress) {
    if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
      List<ServerName> servers = new ArrayList<>();
      for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
        if (entry.getValue().get() > 0) {
          servers.add(entry.getKey());
        }
      }
      LOG.info("Left over {} task(s) are processed on server(s): {}", taskInProgress, servers);
    }

    if (taskInProgress <= thresholdToLogRegionDetails) {
      List<String> regions = new ArrayList<>();
      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
        if (entry.getValue().get() > 0) {
          regions.add(Bytes.toString(entry.getKey()));
        }
      }
      LOG.info("Regions against which left over task(s) are processed: {}", regions);
    }
  }

  /**
   * Blocks until the number of tasks in progress is below the configured total task limit.
   */
  @Override
  public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger)
      throws InterruptedIOException {
    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
  }

  /**
   * limit the heapsize of total submitted data. Reduce the limit of heapsize
   * for submitting quickly if there is no running task.
   */
  @VisibleForTesting
  static class SubmittedSizeChecker implements RowChecker {

    private final long maxHeapSizeSubmit;
    /** The heap size of the rows included since the last reset. */
    private long heapSize = 0;

    SubmittedSizeChecker(final long maxHeapSizeSubmit) {
      this.maxHeapSizeSubmit = maxHeapSizeSubmit;
    }

    /**
     * Ends the iteration once the accumulated heap size has reached the limit. The size of the
     * candidate row is not taken into account, so the limit can be exceeded by one row.
     */
    @Override
    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
      if (heapSize >= maxHeapSizeSubmit) {
        return ReturnCode.END;
      }
      return ReturnCode.INCLUDE;
    }

    @Override
    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
      if (code == ReturnCode.INCLUDE) {
        heapSize += heapSizeOfRow;
      }
    }

    @Override
    public void reset() {
      heapSize = 0;
    }
  }

  /**
   * limit the max number of tasks in an AsyncProcess.
   */
  @VisibleForTesting
  static class TaskCountChecker implements RowChecker {

    private static final long MAX_WAITING_TIME = 1000; //ms
    private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
    private final Set<ServerName> serversIncluded = new HashSet<>();
    private final int maxConcurrentTasksPerRegion;
    private final int maxTotalConcurrentTasks;
    private final int maxConcurrentTasksPerServer;
    private final Map<byte[], AtomicInteger> taskCounterPerRegion;
    private final Map<ServerName, AtomicInteger> taskCounterPerServer;
    /** The names of all regions seen since the last reset, whatever the decision was. */
    private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    private final AtomicLong tasksInProgress;

    TaskCountChecker(final int maxTotalConcurrentTasks,
        final int maxConcurrentTasksPerServer,
        final int maxConcurrentTasksPerRegion,
        final AtomicLong tasksInProgress,
        final Map<ServerName, AtomicInteger> taskCounterPerServer,
        final Map<byte[], AtomicInteger> taskCounterPerRegion) {
      this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
      this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
      this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
      this.taskCounterPerRegion = taskCounterPerRegion;
      this.taskCounterPerServer = taskCounterPerServer;
      this.tasksInProgress = tasksInProgress;
    }

    @Override
    public void reset() throws InterruptedIOException {
      // prevent the busy-waiting
      waitForRegion();
      regionsIncluded.clear();
      serversIncluded.clear();
      busyRegions.clear();
    }

    /**
     * Waits, for at most {@link #MAX_WAITING_TIME} milliseconds, until at least one of the
     * regions seen since the last reset has room for another task. Returns immediately when no
     * region has been seen.
     *
     * @throws InterruptedIOException if the waiting thread is interrupted
     */
    private void waitForRegion() throws InterruptedIOException {
      if (busyRegions.isEmpty()) {
        return;
      }
      EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
      final long start = ee.currentTime();
      while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
        for (byte[] region : busyRegions) {
          AtomicInteger count = taskCounterPerRegion.get(region);
          if (count == null || count.get() < maxConcurrentTasksPerRegion) {
            return;
          }
        }
        try {
          synchronized (tasksInProgress) {
            tasksInProgress.wait(10);
          }
        } catch (InterruptedException e) {
          // Keep the original interruption as the cause so it is not lost in stack traces.
          InterruptedIOException iioe = new InterruptedIOException("Interrupted."
              + " tasksInProgress=" + tasksInProgress);
          iioe.initCause(e);
          throw iioe;
        }
      }
    }

    /**
     * 1) check whether the region is already allowed. 2) check the concurrent tasks for
     * the region. 3) check the total concurrent tasks. 4) check the concurrent
     * tasks for the server.
     *
     * @param loc the destination of data
     * @param heapSizeOfRow the data size
     * @return either Include {@link RequestController.ReturnCode} or skip
     *         {@link RequestController.ReturnCode}
     */
    @Override
    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {

      HRegionInfo regionInfo = loc.getRegionInfo();
      if (regionsIncluded.contains(regionInfo)) {
        // We already know what to do with this region.
        return ReturnCode.INCLUDE;
      }
      AtomicInteger regionCnt = taskCounterPerRegion.get(regionInfo.getRegionName());
      if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
        // Too many tasks on this region already.
        return ReturnCode.SKIP;
      }
      // Count the servers this request would target if the row were accepted.
      int newServers = serversIncluded.size()
          + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
      if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
        // Too many tasks.
        return ReturnCode.SKIP;
      }
      AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
      if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
        // Too many tasks for this individual server
        return ReturnCode.SKIP;
      }
      return ReturnCode.INCLUDE;
    }

    @Override
    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
      if (code == ReturnCode.INCLUDE) {
        regionsIncluded.add(loc.getRegionInfo());
        serversIncluded.add(loc.getServerName());
      }
      // Remembered regardless of the decision; reset() waits on these regions.
      busyRegions.add(loc.getRegionInfo().getRegionName());
    }
  }

  /**
   * limit the number of rows for each request.
   */
  @VisibleForTesting
  static class RequestRowsChecker implements RowChecker {

    private final long maxRowsPerRequest;
    /** The number of rows included per server since the last reset. */
    private final Map<ServerName, Long> serverRows = new HashMap<>();

    RequestRowsChecker(final long maxRowsPerRequest) {
      this.maxRowsPerRequest = maxRowsPerRequest;
    }

    @Override
    public void reset() {
      serverRows.clear();
    }

    @Override
    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
      long currentRows = serverRows.getOrDefault(loc.getServerName(), 0L);
      // accept at least one row
      if (currentRows == 0 || currentRows < maxRowsPerRequest) {
        return ReturnCode.INCLUDE;
      }
      return ReturnCode.SKIP;
    }

    @Override
    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
      if (code == ReturnCode.INCLUDE) {
        serverRows.merge(loc.getServerName(), 1L, Long::sum);
      }
    }
  }

  /**
   * limit the heap size for each request.
   */
  @VisibleForTesting
  static class RequestHeapSizeChecker implements RowChecker {

    private final long maxHeapSizePerRequest;
    /** The heap size of the rows included per server since the last reset. */
    private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();

    RequestHeapSizeChecker(final long maxHeapSizePerRequest) {
      this.maxHeapSizePerRequest = maxHeapSizePerRequest;
    }

    @Override
    public void reset() {
      serverRequestSizes.clear();
    }

    @Override
    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
      // Is it ok for limit of request size?
      long currentRequestSize = serverRequestSizes.getOrDefault(loc.getServerName(), 0L);
      // accept at least one request
      if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) {
        return ReturnCode.INCLUDE;
      }
      return ReturnCode.SKIP;
    }

    @Override
    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
      if (code == ReturnCode.INCLUDE) {
        serverRequestSizes.merge(loc.getServerName(), heapSizeOfRow, Long::sum);
      }
    }
  }

  /**
   * Provide a way to control the flow of rows iteration.
   */
  @VisibleForTesting
  interface RowChecker {

    /**
     * Gives this checker's own decision for a row, before the decisions are combined.
     *
     * @param loc the destination of data
     * @param heapSizeOfRow the data size
     * @return the decision of this checker
     */
    ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow);

    /**
     * Add the final ReturnCode to the checker. The ReturnCode may be reversed,
     * so the checker needs the final decision to update its inner state.
     *
     * @param code The final decision
     * @param loc the destination of data
     * @param heapSizeOfRow the data size
     */
    void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow);

    /**
     * Reset the inner state.
     */
    void reset() throws InterruptedIOException;
  }
}