001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 021 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.TreeSet; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033import java.util.concurrent.ConcurrentSkipListMap; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.concurrent.atomic.AtomicLong; 036import java.util.function.Consumer; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.HRegionLocation; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.EnvironmentEdge; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.apache.yetus.audience.InterfaceStability; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Holds back the requests if they reach any thresholds. 051 */ 052@InterfaceAudience.Private 053@InterfaceStability.Evolving 054class SimpleRequestController implements RequestController { 055 private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class); 056 /** 057 * The maximum heap size for each request. 058 */ 059 public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 060 "hbase.client.max.perrequest.heapsize"; 061 062 /** 063 * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}. 064 */ 065 static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304; 066 067 /** 068 * The maximum number of rows for each request. 069 */ 070 public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows"; 071 /** 072 * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}. 073 */ 074 static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048; 075 076 /** 077 * The maximum size of submit. 078 */ 079 public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize"; 080 /** 081 * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}. 082 */ 083 static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = 084 DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE; 085 final AtomicLong tasksInProgress = new AtomicLong(0); 086 final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion = 087 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 088 final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>(); 089 /** 090 * The number of tasks simultaneously executed on the cluster. 091 */ 092 private final int maxTotalConcurrentTasks; 093 094 /** 095 * The maximum heap size for each request. 096 */ 097 private final long maxHeapSizePerRequest; 098 /** 099 * The maximum number of rows for each request. 100 */ 101 private final long maxRowsPerRequest; 102 private final long maxHeapSizeSubmit; 103 /** 104 * The number of tasks we run in parallel on a single region. With 1 (the default) , we ensure 105 * that the ordering of the queries is respected: we don't start a set of operations on a region 106 * before the previous one is done. As well, this limits the pressure we put on the region server. 107 */ 108 final int maxConcurrentTasksPerRegion; 109 110 /** 111 * The number of task simultaneously executed on a single region server. 112 */ 113 final int maxConcurrentTasksPerServer; 114 private final int thresholdToLogUndoneTaskDetails; 115 public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 116 "hbase.client.threshold.log.details"; 117 private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; 118 public static final String THRESHOLD_TO_LOG_REGION_DETAILS = 119 "hbase.client.threshold.log.region.details"; 120 private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2; 121 private final int thresholdToLogRegionDetails; 122 123 SimpleRequestController(final Configuration conf) { 124 this.maxTotalConcurrentTasks = checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 125 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); 126 this.maxConcurrentTasksPerServer = 127 checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, 128 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); 129 this.maxConcurrentTasksPerRegion = 130 checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, 131 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); 132 this.maxHeapSizePerRequest = checkAndGet(conf, HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 133 DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); 134 this.maxRowsPerRequest = 135 checkAndGet(conf, HBASE_CLIENT_MAX_PERREQUEST_ROWS, DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS); 136 this.maxHeapSizeSubmit = 137 checkAndGet(conf, HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE); 138 this.thresholdToLogUndoneTaskDetails = conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, 139 DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); 140 this.thresholdToLogRegionDetails = 141 conf.getInt(THRESHOLD_TO_LOG_REGION_DETAILS, DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS); 142 } 143 144 private static int checkAndGet(Configuration conf, String key, int defaultValue) { 145 int value = conf.getInt(key, defaultValue); 146 if (value <= 0) { 147 throw new IllegalArgumentException(key + "=" + value); 148 } 149 return value; 150 } 151 152 private static long checkAndGet(Configuration conf, String key, long defaultValue) { 153 long value = conf.getLong(key, defaultValue); 154 if (value <= 0) { 155 throw new IllegalArgumentException(key + "=" + value); 156 } 157 return value; 158 } 159 160 static Checker newChecker(List<RowChecker> checkers) { 161 return new Checker() { 162 private boolean isEnd = false; 163 164 @Override 165 public ReturnCode canTakeRow(HRegionLocation loc, Row row) { 166 if (isEnd) { 167 return ReturnCode.END; 168 } 169 long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0; 170 ReturnCode code = ReturnCode.INCLUDE; 171 for (RowChecker checker : checkers) { 172 switch (checker.canTakeOperation(loc, heapSizeOfRow)) { 173 case END: 174 isEnd = true; 175 code = ReturnCode.END; 176 break; 177 case SKIP: 178 code = ReturnCode.SKIP; 179 break; 180 case INCLUDE: 181 default: 182 break; 183 } 184 if (code == ReturnCode.END) { 185 break; 186 } 187 } 188 for (RowChecker checker : checkers) { 189 checker.notifyFinal(code, loc, heapSizeOfRow); 190 } 191 return code; 192 } 193 194 @Override 195 public void reset() throws InterruptedIOException { 196 isEnd = false; 197 InterruptedIOException e = null; 198 for (RowChecker checker : checkers) { 199 try { 200 checker.reset(); 201 } catch (InterruptedIOException ex) { 202 e = ex; 203 } 204 } 205 if (e != null) { 206 throw e; 207 } 208 } 209 }; 210 } 211 212 @Override 213 public Checker newChecker() { 214 List<RowChecker> checkers = new ArrayList<>(4); 215 checkers.add(new TaskCountChecker(maxTotalConcurrentTasks, maxConcurrentTasksPerServer, 216 maxConcurrentTasksPerRegion, tasksInProgress, taskCounterPerServer, taskCounterPerRegion)); 217 checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest)); 218 checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit)); 219 checkers.add(new RequestRowsChecker(maxRowsPerRequest)); 220 return newChecker(checkers); 221 } 222 223 @Override 224 public void incTaskCounters(Collection<byte[]> regions, ServerName sn) { 225 tasksInProgress.incrementAndGet(); 226 227 computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet(); 228 229 regions 230 .forEach((regBytes) -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new) 231 .incrementAndGet()); 232 } 233 234 @Override 235 public void decTaskCounters(Collection<byte[]> regions, ServerName sn) { 236 regions.forEach(regBytes -> { 237 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); 238 regionCnt.decrementAndGet(); 239 }); 240 241 taskCounterPerServer.get(sn).decrementAndGet(); 242 tasksInProgress.decrementAndGet(); 243 synchronized (tasksInProgress) { 244 tasksInProgress.notifyAll(); 245 } 246 } 247 248 @Override 249 public long getNumberOfTasksInProgress() { 250 return tasksInProgress.get(); 251 } 252 253 @Override 254 public void waitForMaximumCurrentTasks(long max, long id, int periodToTrigger, 255 Consumer<Long> trigger) throws InterruptedIOException { 256 assert max >= 0; 257 long lastLog = EnvironmentEdgeManager.currentTime(); 258 long currentInProgress, oldInProgress = Long.MAX_VALUE; 259 while ((currentInProgress = tasksInProgress.get()) > max) { 260 if (oldInProgress != currentInProgress) { // Wait for in progress to change. 261 long now = EnvironmentEdgeManager.currentTime(); 262 if (now > lastLog + periodToTrigger) { 263 lastLog = now; 264 if (trigger != null) { 265 trigger.accept(currentInProgress); 266 } 267 logDetailsOfUndoneTasks(currentInProgress); 268 } 269 } 270 oldInProgress = currentInProgress; 271 try { 272 synchronized (tasksInProgress) { 273 if (tasksInProgress.get() == oldInProgress) { 274 tasksInProgress.wait(10); 275 } 276 } 277 } catch (InterruptedException e) { 278 throw new InterruptedIOException( 279 "#" + id + ", interrupted." + " currentNumberOfTask=" + currentInProgress); 280 } 281 } 282 } 283 284 private void logDetailsOfUndoneTasks(long taskInProgress) { 285 if (taskInProgress <= thresholdToLogUndoneTaskDetails) { 286 ArrayList<ServerName> servers = new ArrayList<>(); 287 for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) { 288 if (entry.getValue().get() > 0) { 289 servers.add(entry.getKey()); 290 } 291 } 292 LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers); 293 } 294 295 if (taskInProgress <= thresholdToLogRegionDetails) { 296 ArrayList<String> regions = new ArrayList<>(); 297 for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) { 298 if (entry.getValue().get() > 0) { 299 regions.add(Bytes.toString(entry.getKey())); 300 } 301 } 302 LOG.info("Regions against which left over task(s) are processed: " + regions); 303 } 304 } 305 306 @Override 307 public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) 308 throws InterruptedIOException { 309 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger); 310 } 311 312 /** 313 * limit the heapsize of total submitted data. Reduce the limit of heapsize for submitting quickly 314 * if there is no running task. 315 */ 316 static class SubmittedSizeChecker implements RowChecker { 317 318 private final long maxHeapSizeSubmit; 319 private long heapSize = 0; 320 321 SubmittedSizeChecker(final long maxHeapSizeSubmit) { 322 this.maxHeapSizeSubmit = maxHeapSizeSubmit; 323 } 324 325 @Override 326 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) { 327 if (heapSize >= maxHeapSizeSubmit) { 328 return ReturnCode.END; 329 } 330 return ReturnCode.INCLUDE; 331 } 332 333 @Override 334 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) { 335 if (code == ReturnCode.INCLUDE) { 336 heapSize += heapSizeOfRow; 337 } 338 } 339 340 @Override 341 public void reset() { 342 heapSize = 0; 343 } 344 } 345 346 /** 347 * limit the max number of tasks in an AsyncProcess. 348 */ 349 static class TaskCountChecker implements RowChecker { 350 351 private static final long MAX_WAITING_TIME = 1000; // ms 352 private final Set<RegionInfo> regionsIncluded = new HashSet<>(); 353 private final Set<ServerName> serversIncluded = new HashSet<>(); 354 private final int maxConcurrentTasksPerRegion; 355 private final int maxTotalConcurrentTasks; 356 private final int maxConcurrentTasksPerServer; 357 private final Map<byte[], AtomicInteger> taskCounterPerRegion; 358 private final Map<ServerName, AtomicInteger> taskCounterPerServer; 359 private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); 360 private final AtomicLong tasksInProgress; 361 362 TaskCountChecker(final int maxTotalConcurrentTasks, final int maxConcurrentTasksPerServer, 363 final int maxConcurrentTasksPerRegion, final AtomicLong tasksInProgress, 364 final Map<ServerName, AtomicInteger> taskCounterPerServer, 365 final Map<byte[], AtomicInteger> taskCounterPerRegion) { 366 this.maxTotalConcurrentTasks = maxTotalConcurrentTasks; 367 this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion; 368 this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer; 369 this.taskCounterPerRegion = taskCounterPerRegion; 370 this.taskCounterPerServer = taskCounterPerServer; 371 this.tasksInProgress = tasksInProgress; 372 } 373 374 @Override 375 public void reset() throws InterruptedIOException { 376 // prevent the busy-waiting 377 waitForRegion(); 378 regionsIncluded.clear(); 379 serversIncluded.clear(); 380 busyRegions.clear(); 381 } 382 383 private void waitForRegion() throws InterruptedIOException { 384 if (busyRegions.isEmpty()) { 385 return; 386 } 387 EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 388 final long start = ee.currentTime(); 389 while ((ee.currentTime() - start) <= MAX_WAITING_TIME) { 390 for (byte[] region : busyRegions) { 391 AtomicInteger count = taskCounterPerRegion.get(region); 392 if (count == null || count.get() < maxConcurrentTasksPerRegion) { 393 return; 394 } 395 } 396 try { 397 synchronized (tasksInProgress) { 398 tasksInProgress.wait(10); 399 } 400 } catch (InterruptedException e) { 401 throw new InterruptedIOException("Interrupted." + " tasksInProgress=" + tasksInProgress); 402 } 403 } 404 } 405 406 /** 407 * 1) check the regions is allowed. 2) check the concurrent tasks for regions. 3) check the 408 * total concurrent tasks. 4) check the concurrent tasks for server. 409 * @param loc the destination of data 410 * @param heapSizeOfRow the data size 411 * @return either Include {@link RequestController.ReturnCode} or skip 412 * {@link RequestController.ReturnCode} 413 */ 414 @Override 415 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) { 416 RegionInfo regionInfo = loc.getRegion(); 417 if (regionsIncluded.contains(regionInfo)) { 418 // We already know what to do with this region. 419 return ReturnCode.INCLUDE; 420 } 421 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegion().getRegionName()); 422 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { 423 // Too many tasks on this region already. 424 return ReturnCode.SKIP; 425 } 426 int newServers = 427 serversIncluded.size() + (serversIncluded.contains(loc.getServerName()) ? 0 : 1); 428 if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) { 429 // Too many tasks. 430 return ReturnCode.SKIP; 431 } 432 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); 433 if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) { 434 // Too many tasks for this individual server 435 return ReturnCode.SKIP; 436 } 437 return ReturnCode.INCLUDE; 438 } 439 440 @Override 441 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) { 442 if (code == ReturnCode.INCLUDE) { 443 regionsIncluded.add(loc.getRegion()); 444 serversIncluded.add(loc.getServerName()); 445 } 446 busyRegions.add(loc.getRegion().getRegionName()); 447 } 448 } 449 450 /** 451 * limit the number of rows for each request. 452 */ 453 static class RequestRowsChecker implements RowChecker { 454 455 private final long maxRowsPerRequest; 456 private final Map<ServerName, Long> serverRows = new HashMap<>(); 457 458 RequestRowsChecker(final long maxRowsPerRequest) { 459 this.maxRowsPerRequest = maxRowsPerRequest; 460 } 461 462 @Override 463 public void reset() { 464 serverRows.clear(); 465 } 466 467 @Override 468 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) { 469 long currentRows = 470 serverRows.containsKey(loc.getServerName()) ? serverRows.get(loc.getServerName()) : 0L; 471 // accept at least one row 472 if (currentRows == 0 || currentRows < maxRowsPerRequest) { 473 return ReturnCode.INCLUDE; 474 } 475 return ReturnCode.SKIP; 476 } 477 478 @Override 479 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) { 480 if (code == ReturnCode.INCLUDE) { 481 long currentRows = 482 serverRows.containsKey(loc.getServerName()) ? serverRows.get(loc.getServerName()) : 0L; 483 serverRows.put(loc.getServerName(), currentRows + 1); 484 } 485 } 486 } 487 488 /** 489 * limit the heap size for each request. 490 */ 491 static class RequestHeapSizeChecker implements RowChecker { 492 493 private final long maxHeapSizePerRequest; 494 private final Map<ServerName, Long> serverRequestSizes = new HashMap<>(); 495 496 RequestHeapSizeChecker(final long maxHeapSizePerRequest) { 497 this.maxHeapSizePerRequest = maxHeapSizePerRequest; 498 } 499 500 @Override 501 public void reset() { 502 serverRequestSizes.clear(); 503 } 504 505 @Override 506 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) { 507 // Is it ok for limit of request size? 508 long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) 509 ? serverRequestSizes.get(loc.getServerName()) 510 : 0L; 511 // accept at least one request 512 if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) { 513 return ReturnCode.INCLUDE; 514 } 515 return ReturnCode.SKIP; 516 } 517 518 @Override 519 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) { 520 if (code == ReturnCode.INCLUDE) { 521 long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) 522 ? serverRequestSizes.get(loc.getServerName()) 523 : 0L; 524 serverRequestSizes.put(loc.getServerName(), currentRequestSize + heapSizeOfRow); 525 } 526 } 527 } 528 529 /** 530 * Provide a way to control the flow of rows iteration. 531 */ 532 interface RowChecker { 533 534 ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow); 535 536 /** 537 * Add the final ReturnCode to the checker. The ReturnCode may be reversed, so the checker need 538 * the final decision to update the inner state. 539 * @param code The final decision 540 * @param loc the destination of data 541 * @param heapSizeOfRow the data size 542 */ 543 void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow); 544 545 /** 546 * Reset the inner state. 547 */ 548 void reset() throws InterruptedIOException; 549 } 550}