/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
019package org.apache.hadoop.hbase.client;
020
021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeSet;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicLong;
036import java.util.function.Consumer;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.apache.yetus.audience.InterfaceStability;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046import org.apache.hadoop.hbase.util.Bytes;
047import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
048import org.apache.hadoop.hbase.util.EnvironmentEdge;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050
051/**
052 * Holds back the requests if they reach any thresholds.
053 */
054@InterfaceAudience.Private
055@InterfaceStability.Evolving
056class SimpleRequestController implements RequestController {
057  private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class);
058  /**
059   * The maximum heap size for each request.
060   */
061  public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
062
063  /**
064   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}.
065   */
066  @VisibleForTesting
067  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
068
069  /**
070   * The maximum number of rows for each request.
071   */
072  public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows";
073  /**
074   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}.
075   */
076  @VisibleForTesting
077  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048;
078
079  /**
080   * The maximum size of submit.
081   */
082  public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
083  /**
084   * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
085   */
086  @VisibleForTesting
087  static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
088  @VisibleForTesting
089  final AtomicLong tasksInProgress = new AtomicLong(0);
090  @VisibleForTesting
091  final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion
092          = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
093  @VisibleForTesting
094  final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
095  /**
096   * The number of tasks simultaneously executed on the cluster.
097   */
098  private final int maxTotalConcurrentTasks;
099
100  /**
101   * The maximum heap size for each request.
102   */
103  private final long maxHeapSizePerRequest;
104  /**
105   * The maximum number of rows for each request.
106   */
107  private final long maxRowsPerRequest;
108  private final long maxHeapSizeSubmit;
109  /**
110   * The number of tasks we run in parallel on a single region. With 1 (the
111   * default) , we ensure that the ordering of the queries is respected: we
112   * don't start a set of operations on a region before the previous one is
113   * done. As well, this limits the pressure we put on the region server.
114   */
115  @VisibleForTesting
116  final int maxConcurrentTasksPerRegion;
117
118  /**
119   * The number of task simultaneously executed on a single region server.
120   */
121  @VisibleForTesting
122  final int maxConcurrentTasksPerServer;
123  private final int thresholdToLogUndoneTaskDetails;
124  public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
125      "hbase.client.threshold.log.details";
126  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
127  public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
128      "hbase.client.threshold.log.region.details";
129  private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
130  private final int thresholdToLogRegionDetails;
131  SimpleRequestController(final Configuration conf) {
132    this.maxTotalConcurrentTasks = checkAndGet(conf,
133            HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
134            HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
135    this.maxConcurrentTasksPerServer = checkAndGet(conf,
136            HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
137            HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
138    this.maxConcurrentTasksPerRegion = checkAndGet(conf,
139            HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
140            HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
141    this.maxHeapSizePerRequest = checkAndGet(conf,
142            HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
143            DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
144    this.maxRowsPerRequest = checkAndGet(conf,
145            HBASE_CLIENT_MAX_PERREQUEST_ROWS,
146            DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS);
147    this.maxHeapSizeSubmit = checkAndGet(conf,
148            HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE,
149            DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
150    this.thresholdToLogUndoneTaskDetails = conf.getInt(
151          THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
152          DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
153    this.thresholdToLogRegionDetails = conf.getInt(
154          THRESHOLD_TO_LOG_REGION_DETAILS,
155          DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
156  }
157
158  private static int checkAndGet(Configuration conf, String key, int defaultValue) {
159    int value = conf.getInt(key, defaultValue);
160    if (value <= 0) {
161      throw new IllegalArgumentException(key + "=" + value);
162    }
163    return value;
164  }
165
166  private static long checkAndGet(Configuration conf, String key, long defaultValue) {
167    long value = conf.getLong(key, defaultValue);
168    if (value <= 0) {
169      throw new IllegalArgumentException(key + "=" + value);
170    }
171    return value;
172  }
173
174  @VisibleForTesting
175  static Checker newChecker(List<RowChecker> checkers) {
176    return new Checker() {
177      private boolean isEnd = false;
178
179      @Override
180      public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
181        if (isEnd) {
182          return ReturnCode.END;
183        }
184        long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
185        ReturnCode code = ReturnCode.INCLUDE;
186        for (RowChecker checker : checkers) {
187          switch (checker.canTakeOperation(loc, heapSizeOfRow)) {
188            case END:
189              isEnd = true;
190              code = ReturnCode.END;
191              break;
192            case SKIP:
193              code = ReturnCode.SKIP;
194              break;
195            case INCLUDE:
196            default:
197              break;
198          }
199          if (code == ReturnCode.END) {
200            break;
201          }
202        }
203        for (RowChecker checker : checkers) {
204          checker.notifyFinal(code, loc, heapSizeOfRow);
205        }
206        return code;
207      }
208
209      @Override
210      public void reset() throws InterruptedIOException {
211        isEnd = false;
212        InterruptedIOException e = null;
213        for (RowChecker checker : checkers) {
214          try {
215            checker.reset();
216          } catch (InterruptedIOException ex) {
217            e = ex;
218          }
219        }
220        if (e != null) {
221          throw e;
222        }
223      }
224    };
225  }
226
227  @Override
228  public Checker newChecker() {
229    List<RowChecker> checkers = new ArrayList<>(4);
230    checkers.add(new TaskCountChecker(maxTotalConcurrentTasks,
231            maxConcurrentTasksPerServer,
232            maxConcurrentTasksPerRegion,
233            tasksInProgress,
234            taskCounterPerServer,
235            taskCounterPerRegion));
236    checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest));
237    checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
238    checkers.add(new RequestRowsChecker(maxRowsPerRequest));
239    return newChecker(checkers);
240  }
241
242  @Override
243  public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
244    tasksInProgress.incrementAndGet();
245
246    computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
247
248    regions.forEach((regBytes)
249            -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet()
250    );
251  }
252
253  @Override
254  public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
255    regions.forEach(regBytes -> {
256      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
257      regionCnt.decrementAndGet();
258    });
259
260    taskCounterPerServer.get(sn).decrementAndGet();
261    tasksInProgress.decrementAndGet();
262    synchronized (tasksInProgress) {
263      tasksInProgress.notifyAll();
264    }
265  }
266
267  @Override
268  public long getNumberOfTasksInProgress() {
269    return tasksInProgress.get();
270  }
271
272  @Override
273  public void waitForMaximumCurrentTasks(long max, long id,
274    int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
275    assert max >= 0;
276    long lastLog = EnvironmentEdgeManager.currentTime();
277    long currentInProgress, oldInProgress = Long.MAX_VALUE;
278    while ((currentInProgress = tasksInProgress.get()) > max) {
279      if (oldInProgress != currentInProgress) { // Wait for in progress to change.
280        long now = EnvironmentEdgeManager.currentTime();
281        if (now > lastLog + periodToTrigger) {
282          lastLog = now;
283          if (trigger != null) {
284            trigger.accept(currentInProgress);
285          }
286          logDetailsOfUndoneTasks(currentInProgress);
287        }
288      }
289      oldInProgress = currentInProgress;
290      try {
291        synchronized (tasksInProgress) {
292          if (tasksInProgress.get() == oldInProgress) {
293            tasksInProgress.wait(10);
294          }
295        }
296      } catch (InterruptedException e) {
297        throw new InterruptedIOException("#" + id + ", interrupted." +
298            " currentNumberOfTask=" + currentInProgress);
299      }
300    }
301  }
302
303  private void logDetailsOfUndoneTasks(long taskInProgress) {
304    if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
305      ArrayList<ServerName> servers = new ArrayList<>();
306      for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
307        if (entry.getValue().get() > 0) {
308          servers.add(entry.getKey());
309        }
310      }
311      LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
312    }
313
314    if (taskInProgress <= thresholdToLogRegionDetails) {
315      ArrayList<String> regions = new ArrayList<>();
316      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
317        if (entry.getValue().get() > 0) {
318          regions.add(Bytes.toString(entry.getKey()));
319        }
320      }
321      LOG.info("Regions against which left over task(s) are processed: " + regions);
322    }
323  }
324
325  @Override
326  public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
327    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
328  }
329
330  /**
331   * limit the heapsize of total submitted data. Reduce the limit of heapsize
332   * for submitting quickly if there is no running task.
333   */
334  @VisibleForTesting
335  static class SubmittedSizeChecker implements RowChecker {
336
337    private final long maxHeapSizeSubmit;
338    private long heapSize = 0;
339
340    SubmittedSizeChecker(final long maxHeapSizeSubmit) {
341      this.maxHeapSizeSubmit = maxHeapSizeSubmit;
342    }
343
344    @Override
345    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
346      if (heapSize >= maxHeapSizeSubmit) {
347        return ReturnCode.END;
348      }
349      return ReturnCode.INCLUDE;
350    }
351
352    @Override
353    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
354      if (code == ReturnCode.INCLUDE) {
355        heapSize += heapSizeOfRow;
356      }
357    }
358
359    @Override
360    public void reset() {
361      heapSize = 0;
362    }
363  }
364
365  /**
366   * limit the max number of tasks in an AsyncProcess.
367   */
368  @VisibleForTesting
369  static class TaskCountChecker implements RowChecker {
370
371    private static final long MAX_WAITING_TIME = 1000; //ms
372    private final Set<RegionInfo> regionsIncluded = new HashSet<>();
373    private final Set<ServerName> serversIncluded = new HashSet<>();
374    private final int maxConcurrentTasksPerRegion;
375    private final int maxTotalConcurrentTasks;
376    private final int maxConcurrentTasksPerServer;
377    private final Map<byte[], AtomicInteger> taskCounterPerRegion;
378    private final Map<ServerName, AtomicInteger> taskCounterPerServer;
379    private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
380    private final AtomicLong tasksInProgress;
381
382    TaskCountChecker(final int maxTotalConcurrentTasks,
383            final int maxConcurrentTasksPerServer,
384            final int maxConcurrentTasksPerRegion,
385            final AtomicLong tasksInProgress,
386            final Map<ServerName, AtomicInteger> taskCounterPerServer,
387            final Map<byte[], AtomicInteger> taskCounterPerRegion) {
388      this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
389      this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
390      this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
391      this.taskCounterPerRegion = taskCounterPerRegion;
392      this.taskCounterPerServer = taskCounterPerServer;
393      this.tasksInProgress = tasksInProgress;
394    }
395
396    @Override
397    public void reset() throws InterruptedIOException {
398      // prevent the busy-waiting
399      waitForRegion();
400      regionsIncluded.clear();
401      serversIncluded.clear();
402      busyRegions.clear();
403    }
404
405    private void waitForRegion() throws InterruptedIOException {
406      if (busyRegions.isEmpty()) {
407        return;
408      }
409      EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
410      final long start = ee.currentTime();
411      while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
412        for (byte[] region : busyRegions) {
413          AtomicInteger count = taskCounterPerRegion.get(region);
414          if (count == null || count.get() < maxConcurrentTasksPerRegion) {
415            return;
416          }
417        }
418        try {
419          synchronized (tasksInProgress) {
420            tasksInProgress.wait(10);
421          }
422        } catch (InterruptedException e) {
423          throw new InterruptedIOException("Interrupted."
424                  + " tasksInProgress=" + tasksInProgress);
425        }
426      }
427    }
428
429    /**
430     * 1) check the regions is allowed. 2) check the concurrent tasks for
431     * regions. 3) check the total concurrent tasks. 4) check the concurrent
432     * tasks for server.
433     *
434     * @param loc the destination of data
435     * @param heapSizeOfRow the data size
436     * @return either Include {@link RequestController.ReturnCode} or skip
437     *         {@link RequestController.ReturnCode}
438     */
439    @Override
440    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
441      RegionInfo regionInfo = loc.getRegion();
442      if (regionsIncluded.contains(regionInfo)) {
443        // We already know what to do with this region.
444        return ReturnCode.INCLUDE;
445      }
446      AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegion().getRegionName());
447      if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
448        // Too many tasks on this region already.
449        return ReturnCode.SKIP;
450      }
451      int newServers = serversIncluded.size()
452              + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
453      if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
454        // Too many tasks.
455        return ReturnCode.SKIP;
456      }
457      AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
458      if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
459        // Too many tasks for this individual server
460        return ReturnCode.SKIP;
461      }
462      return ReturnCode.INCLUDE;
463    }
464
465    @Override
466    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
467      if (code == ReturnCode.INCLUDE) {
468        regionsIncluded.add(loc.getRegion());
469        serversIncluded.add(loc.getServerName());
470      }
471      busyRegions.add(loc.getRegion().getRegionName());
472    }
473  }
474
475  /**
476   * limit the number of rows for each request.
477   */
478  @VisibleForTesting
479  static class RequestRowsChecker implements RowChecker {
480
481    private final long maxRowsPerRequest;
482    private final Map<ServerName, Long> serverRows = new HashMap<>();
483
484    RequestRowsChecker(final long maxRowsPerRequest) {
485      this.maxRowsPerRequest = maxRowsPerRequest;
486    }
487
488    @Override
489    public void reset() {
490      serverRows.clear();
491    }
492
493    @Override
494    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
495      long currentRows = serverRows.containsKey(loc.getServerName())
496              ? serverRows.get(loc.getServerName()) : 0L;
497      // accept at least one row
498      if (currentRows == 0 || currentRows < maxRowsPerRequest) {
499        return ReturnCode.INCLUDE;
500      }
501      return ReturnCode.SKIP;
502    }
503
504    @Override
505    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
506      if (code == ReturnCode.INCLUDE) {
507        long currentRows = serverRows.containsKey(loc.getServerName())
508                ? serverRows.get(loc.getServerName()) : 0L;
509        serverRows.put(loc.getServerName(), currentRows + 1);
510      }
511    }
512  }
513
514  /**
515   * limit the heap size for each request.
516   */
517  @VisibleForTesting
518  static class RequestHeapSizeChecker implements RowChecker {
519
520    private final long maxHeapSizePerRequest;
521    private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
522
523    RequestHeapSizeChecker(final long maxHeapSizePerRequest) {
524      this.maxHeapSizePerRequest = maxHeapSizePerRequest;
525    }
526
527    @Override
528    public void reset() {
529      serverRequestSizes.clear();
530    }
531
532    @Override
533    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
534      // Is it ok for limit of request size?
535      long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
536              ? serverRequestSizes.get(loc.getServerName()) : 0L;
537      // accept at least one request
538      if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) {
539        return ReturnCode.INCLUDE;
540      }
541      return ReturnCode.SKIP;
542    }
543
544    @Override
545    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
546      if (code == ReturnCode.INCLUDE) {
547        long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
548                ? serverRequestSizes.get(loc.getServerName()) : 0L;
549        serverRequestSizes.put(loc.getServerName(), currentRequestSize + heapSizeOfRow);
550      }
551    }
552  }
553
554  /**
555   * Provide a way to control the flow of rows iteration.
556   */
557  @VisibleForTesting
558  interface RowChecker {
559
560    ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow);
561
562    /**
563     * Add the final ReturnCode to the checker. The ReturnCode may be reversed,
564     * so the checker need the final decision to update the inner state.
565     *
566     * @param code The final decision
567     * @param loc the destination of data
568     * @param heapSizeOfRow the data size
569     */
570    void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow);
571
572    /**
573     * Reset the inner state.
574     */
575    void reset() throws InterruptedIOException;
576  }
577}