/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds back requests when any of the configured thresholds (task counts, heap sizes, row counts)
 * is reached.
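 * <p>
 * A minimal usage sketch (illustrative only; the configuration values and the way rows are fed to
 * the checker are assumptions, not prescribed usage):
 *
 * <pre>
 * {@code
 * Configuration conf = HBaseConfiguration.create();
 * // Hypothetical tuning values, not recommendations.
 * conf.setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 4L * 1024 * 1024);
 * conf.setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, 2048);
 * SimpleRequestController controller = new SimpleRequestController(conf);
 * RequestController.Checker checker = controller.newChecker();
 * // For each candidate row, ask the checker whether it fits into the request being assembled:
 * // ReturnCode code = checker.canTakeRow(location, row);
 * }
 * </pre>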
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class SimpleRequestController implements RequestController {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class);
  /**
   * The maximum heap size for each request.
   */
  public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE =
    "hbase.client.max.perrequest.heapsize";

  /**
   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}: 4 MiB.
   */
  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;

  /**
   * The maximum number of rows for each request.
   */
  public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows";
  /**
   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}.
   */
  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048;

  /**
   * The maximum heap size of data taken by a single submit.
   */
  public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
  /**
   * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
   */
  static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE =
    DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
  final AtomicLong tasksInProgress = new AtomicLong(0);
  final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
  final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
  /**
   * The maximum number of tasks simultaneously executed on the cluster.
   */
  private final int maxTotalConcurrentTasks;

  /**
   * The maximum heap size for each request.
   */
  private final long maxHeapSizePerRequest;
  /**
   * The maximum number of rows for each request.
   */
  private final long maxRowsPerRequest;
  private final long maxHeapSizeSubmit;
  /**
   * The number of tasks we run in parallel on a single region. With 1 (the default), we ensure
   * that the ordering of the queries is respected: we don't start a new set of operations on a
   * region before the previous one is done. This also limits the pressure we put on the region
   * server.
   */
  final int maxConcurrentTasksPerRegion;

  /**
   * The maximum number of tasks simultaneously executed on a single region server.
   */
  final int maxConcurrentTasksPerServer;
  private final int thresholdToLogUndoneTaskDetails;
  public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
    "hbase.client.threshold.log.details";
  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
  public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
    "hbase.client.threshold.log.region.details";
  private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
  private final int thresholdToLogRegionDetails;

  SimpleRequestController(final Configuration conf) {
    this.maxTotalConcurrentTasks = checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
    this.maxConcurrentTasksPerServer =
      checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
        HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
    this.maxConcurrentTasksPerRegion =
      checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
        HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
    this.maxHeapSizePerRequest = checkAndGet(conf, HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
      DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
    this.maxRowsPerRequest =
      checkAndGet(conf, HBASE_CLIENT_MAX_PERREQUEST_ROWS, DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS);
    this.maxHeapSizeSubmit =
      checkAndGet(conf, HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
    this.thresholdToLogUndoneTaskDetails = conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
      DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
    this.thresholdToLogRegionDetails =
      conf.getInt(THRESHOLD_TO_LOG_REGION_DETAILS, DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
  }

  private static int checkAndGet(Configuration conf, String key, int defaultValue) {
    int value = conf.getInt(key, defaultValue);
    if (value <= 0) {
      throw new IllegalArgumentException(key + "=" + value);
    }
    return value;
  }

  private static long checkAndGet(Configuration conf, String key, long defaultValue) {
    long value = conf.getLong(key, defaultValue);
    if (value <= 0) {
      throw new IllegalArgumentException(key + "=" + value);
    }
    return value;
  }

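  /**
   * Composes the given checkers into a single {@link Checker}: a row is taken only if no checker
   * vetoes it, and every checker is then notified of the final decision so it can update its
   * inner state.
   * <p>
   * An illustrative sketch (the checkers and limits below are arbitrary, not defaults):
   *
   * <pre>
   * {@code
   * List<RowChecker> checkers = new ArrayList<>();
   * checkers.add(new RequestHeapSizeChecker(4L * 1024 * 1024));
   * checkers.add(new RequestRowsChecker(2048));
   * Checker checker = newChecker(checkers);
   * }
   * </pre>
   */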
  static Checker newChecker(List<RowChecker> checkers) {
    return new Checker() {
      private boolean isEnd = false;

      @Override
      public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
        if (isEnd) {
          return ReturnCode.END;
        }
        long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
        ReturnCode code = ReturnCode.INCLUDE;
        for (RowChecker checker : checkers) {
          switch (checker.canTakeOperation(loc, heapSizeOfRow)) {
            case END:
              isEnd = true;
              code = ReturnCode.END;
              break;
            case SKIP:
              code = ReturnCode.SKIP;
              break;
            case INCLUDE:
            default:
              break;
          }
          if (code == ReturnCode.END) {
            break;
          }
        }
        for (RowChecker checker : checkers) {
          checker.notifyFinal(code, loc, heapSizeOfRow);
        }
        return code;
      }

      @Override
      public void reset() throws InterruptedIOException {
        isEnd = false;
        InterruptedIOException e = null;
        for (RowChecker checker : checkers) {
          try {
            checker.reset();
          } catch (InterruptedIOException ex) {
            e = ex;
          }
        }
        if (e != null) {
          throw e;
        }
      }
    };
  }

  @Override
  public Checker newChecker() {
    List<RowChecker> checkers = new ArrayList<>(4);
    checkers.add(new TaskCountChecker(maxTotalConcurrentTasks, maxConcurrentTasksPerServer,
      maxConcurrentTasksPerRegion, tasksInProgress, taskCounterPerServer, taskCounterPerRegion));
    checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest));
    checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
    checkers.add(new RequestRowsChecker(maxRowsPerRequest));
    return newChecker(checkers);
  }

  @Override
  public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
    tasksInProgress.incrementAndGet();

    computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();

    regions
      .forEach((regBytes) -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new)
        .incrementAndGet());
  }

  @Override
  public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
    regions.forEach(regBytes -> {
      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
      regionCnt.decrementAndGet();
    });

    taskCounterPerServer.get(sn).decrementAndGet();
    tasksInProgress.decrementAndGet();
    // Wake up any thread parked on tasksInProgress, e.g. in waitForMaximumCurrentTasks() or
    // TaskCountChecker#waitForRegion().
    synchronized (tasksInProgress) {
      tasksInProgress.notifyAll();
    }
  }

  @Override
  public long getNumberOfTasksInProgress() {
    return tasksInProgress.get();
  }

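  // Illustrative use of the trigger parameter of waitForMaximumCurrentTasks (a sketch; the id and
  // the 10-second period are placeholders):
  // waitForMaximumCurrentTasks(0, id, 10000,
  //   inFlight -> LOG.info("#" + id + ", waiting for " + inFlight + " task(s) to finish"));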
  @Override
  public void waitForMaximumCurrentTasks(long max, long id, int periodToTrigger,
    Consumer<Long> trigger) throws InterruptedIOException {
    assert max >= 0;
    long lastLog = EnvironmentEdgeManager.currentTime();
    long currentInProgress, oldInProgress = Long.MAX_VALUE;
    while ((currentInProgress = tasksInProgress.get()) > max) {
      if (oldInProgress != currentInProgress) { // Only log when the in-progress count has changed.
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + periodToTrigger) {
          lastLog = now;
          if (trigger != null) {
            trigger.accept(currentInProgress);
          }
          logDetailsOfUndoneTasks(currentInProgress);
        }
      }
      oldInProgress = currentInProgress;
      try {
        synchronized (tasksInProgress) {
          if (tasksInProgress.get() == oldInProgress) {
            tasksInProgress.wait(10);
          }
        }
      } catch (InterruptedException e) {
        throw new InterruptedIOException(
          "#" + id + ", interrupted." + " currentNumberOfTask=" + currentInProgress);
      }
    }
  }

  private void logDetailsOfUndoneTasks(long taskInProgress) {
    if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
      ArrayList<ServerName> servers = new ArrayList<>();
      for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
        if (entry.getValue().get() > 0) {
          servers.add(entry.getKey());
        }
      }
      LOG.info("Left over " + taskInProgress + " task(s) are still being processed on server(s): "
        + servers);
    }

    if (taskInProgress <= thresholdToLogRegionDetails) {
      ArrayList<String> regions = new ArrayList<>();
      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
        if (entry.getValue().get() > 0) {
          regions.add(Bytes.toString(entry.getKey()));
        }
      }
      LOG.info("Regions with left over task(s) still being processed: " + regions);
    }
  }

  @Override
  public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger)
    throws InterruptedIOException {
    // Wait until fewer than maxTotalConcurrentTasks tasks are in flight, i.e. at least one slot
    // is free.
    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
  }

  /**
   * Limits the heap size of the total submitted data. Reduce this limit to submit more quickly
   * when there is no running task.
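   * <p>
   * An illustrative sketch of the accounting (the limit and sizes are arbitrary values and
   * {@code loc} stands for any HRegionLocation, which this checker ignores):
   *
   * <pre>
   * {@code
   * SubmittedSizeChecker checker = new SubmittedSizeChecker(1024);
   * checker.canTakeOperation(loc, 512);                 // INCLUDE: nothing accepted yet
   * checker.notifyFinal(ReturnCode.INCLUDE, loc, 512);
   * checker.notifyFinal(ReturnCode.INCLUDE, loc, 512);
   * checker.canTakeOperation(loc, 1);                   // END: 1024 bytes already accepted
   * }
   * </pre>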
   */
  static class SubmittedSizeChecker implements RowChecker {

    private final long maxHeapSizeSubmit;
    private long heapSize = 0;

    SubmittedSizeChecker(final long maxHeapSizeSubmit) {
      this.maxHeapSizeSubmit = maxHeapSizeSubmit;
    }

    @Override
    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
      if (heapSize >= maxHeapSizeSubmit) {
        return ReturnCode.END;
      }
      return ReturnCode.INCLUDE;
    }

    @Override
    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
      if (code == ReturnCode.INCLUDE) {
        heapSize += heapSizeOfRow;
      }
    }

    @Override
    public void reset() {
      heapSize = 0;
    }
  }

  /**
   * Limits the maximum number of tasks in an AsyncProcess.
   */
  static class TaskCountChecker implements RowChecker {

    private static final long MAX_WAITING_TIME = 1000; // ms
    private final Set<RegionInfo> regionsIncluded = new HashSet<>();
    private final Set<ServerName> serversIncluded = new HashSet<>();
    private final int maxConcurrentTasksPerRegion;
    private final int maxTotalConcurrentTasks;
    private final int maxConcurrentTasksPerServer;
    private final Map<byte[], AtomicInteger> taskCounterPerRegion;
    private final Map<ServerName, AtomicInteger> taskCounterPerServer;
    private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    private final AtomicLong tasksInProgress;

    TaskCountChecker(final int maxTotalConcurrentTasks, final int maxConcurrentTasksPerServer,
      final int maxConcurrentTasksPerRegion, final AtomicLong tasksInProgress,
      final Map<ServerName, AtomicInteger> taskCounterPerServer,
      final Map<byte[], AtomicInteger> taskCounterPerRegion) {
      this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
      this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
      this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
      this.taskCounterPerRegion = taskCounterPerRegion;
      this.taskCounterPerServer = taskCounterPerServer;
      this.tasksInProgress = tasksInProgress;
    }

    @Override
    public void reset() throws InterruptedIOException {
      // Wait for previously busy regions to free up, so callers don't busy-wait resubmitting
      // skipped rows.
      waitForRegion();
      regionsIncluded.clear();
      serversIncluded.clear();
      busyRegions.clear();
    }

    private void waitForRegion() throws InterruptedIOException {
      if (busyRegions.isEmpty()) {
        return;
      }
      EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
      final long start = ee.currentTime();
      while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
        for (byte[] region : busyRegions) {
          AtomicInteger count = taskCounterPerRegion.get(region);
          if (count == null || count.get() < maxConcurrentTasksPerRegion) {
            return;
          }
        }
        try {
          synchronized (tasksInProgress) {
            tasksInProgress.wait(10);
          }
        } catch (InterruptedException e) {
          throw new InterruptedIOException("Interrupted." + " tasksInProgress=" + tasksInProgress);
        }
      }
    }

    /**
     * Checks, in order: 1) whether the region is already included, 2) the concurrent tasks for
     * the region, 3) the total concurrent tasks, and 4) the concurrent tasks for the server.
     * @param loc           the destination of the data
     * @param heapSizeOfRow the data size
     * @return {@link RequestController.ReturnCode#INCLUDE} or
     *         {@link RequestController.ReturnCode#SKIP}
     */
    @Override
    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
      RegionInfo regionInfo = loc.getRegion();
      if (regionsIncluded.contains(regionInfo)) {
        // We already know what to do with this region.
        return ReturnCode.INCLUDE;
      }
      AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegion().getRegionName());
      if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
        // Too many tasks on this region already.
        return ReturnCode.SKIP;
      }
      int newServers =
        serversIncluded.size() + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
      if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
        // Too many tasks.
        return ReturnCode.SKIP;
      }
      AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
      if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
        // Too many tasks for this individual server
        return ReturnCode.SKIP;
      }
      return ReturnCode.INCLUDE;
    }

    @Override
    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
      if (code == ReturnCode.INCLUDE) {
        regionsIncluded.add(loc.getRegion());
        serversIncluded.add(loc.getServerName());
      }
      busyRegions.add(loc.getRegion().getRegionName());
    }
  }

  /**
   * Limits the number of rows taken per server for each request.
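   * <p>
   * An illustrative sketch (the limit is arbitrary and {@code loc} stands for any
   * HRegionLocation; rows are counted per destination server):
   *
   * <pre>
   * {@code
   * RequestRowsChecker checker = new RequestRowsChecker(2);
   * checker.canTakeOperation(loc, 0);                   // INCLUDE: no row taken for this server
   * checker.notifyFinal(ReturnCode.INCLUDE, loc, 0);
   * checker.canTakeOperation(loc, 0);                   // INCLUDE: 1 < 2
   * checker.notifyFinal(ReturnCode.INCLUDE, loc, 0);
   * checker.canTakeOperation(loc, 0);                   // SKIP: 2 rows already taken
   * }
   * </pre>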
   */
  static class RequestRowsChecker implements RowChecker {

    private final long maxRowsPerRequest;
    private final Map<ServerName, Long> serverRows = new HashMap<>();

    RequestRowsChecker(final long maxRowsPerRequest) {
      this.maxRowsPerRequest = maxRowsPerRequest;
    }

    @Override
    public void reset() {
      serverRows.clear();
    }

    @Override
    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
      long currentRows =
        serverRows.containsKey(loc.getServerName()) ? serverRows.get(loc.getServerName()) : 0L;
      // accept at least one row
      if (currentRows == 0 || currentRows < maxRowsPerRequest) {
        return ReturnCode.INCLUDE;
      }
      return ReturnCode.SKIP;
    }

    @Override
    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
      if (code == ReturnCode.INCLUDE) {
        long currentRows =
          serverRows.containsKey(loc.getServerName()) ? serverRows.get(loc.getServerName()) : 0L;
        serverRows.put(loc.getServerName(), currentRows + 1);
      }
    }
  }

  /**
   * Limits the heap size taken per server for each request.
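   * <p>
   * An illustrative sketch (the limit and sizes are arbitrary values and {@code loc} stands for
   * any HRegionLocation; sizes are tracked per destination server):
   *
   * <pre>
   * {@code
   * RequestHeapSizeChecker checker = new RequestHeapSizeChecker(10);
   * checker.canTakeOperation(loc, 20);                  // INCLUDE: the first row always fits
   * checker.notifyFinal(ReturnCode.INCLUDE, loc, 20);
   * checker.canTakeOperation(loc, 1);                   // SKIP: 20 + 1 exceeds the limit of 10
   * }
   * </pre>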
   */
  static class RequestHeapSizeChecker implements RowChecker {

    private final long maxHeapSizePerRequest;
    private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();

    RequestHeapSizeChecker(final long maxHeapSizePerRequest) {
      this.maxHeapSizePerRequest = maxHeapSizePerRequest;
    }

    @Override
    public void reset() {
      serverRequestSizes.clear();
    }

    @Override
    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
      // Does adding this row keep us within the per-request size limit for this server?
      long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
        ? serverRequestSizes.get(loc.getServerName())
        : 0L;
      // accept at least one request
      if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) {
        return ReturnCode.INCLUDE;
      }
      return ReturnCode.SKIP;
    }

    @Override
    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
      if (code == ReturnCode.INCLUDE) {
        long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
          ? serverRequestSizes.get(loc.getServerName())
          : 0L;
        serverRequestSizes.put(loc.getServerName(), currentRequestSize + heapSizeOfRow);
      }
    }
  }

  /**
   * Provides a way to control the flow of row iteration.
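   * <p>
   * A minimal sketch of an implementation (illustrative only; it never limits anything):
   *
   * <pre>
   * {@code
   * RowChecker alwaysInclude = new RowChecker() {
   *   public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
   *     return ReturnCode.INCLUDE;
   *   }
   *
   *   public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
   *   }
   *
   *   public void reset() {
   *   }
   * };
   * }
   * </pre>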
   */
  interface RowChecker {

    ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow);

    /**
     * Reports the final ReturnCode to the checker. The checker's own decision may be overridden
     * by the other checkers, so it needs the final decision to update its inner state.
     * @param code          the final decision
     * @param loc           the destination of the data
     * @param heapSizeOfRow the data size
     */
    void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow);

    /**
     * Reset the inner state.
     */
    void reset() throws InterruptedIOException;
  }
}