001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeSet;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicLong;
036import java.util.function.Consumer;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionInfo;
041import org.apache.hadoop.hbase.HRegionLocation;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.apache.yetus.audience.InterfaceStability;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047import org.apache.hadoop.hbase.util.Bytes;
048import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
049import org.apache.hadoop.hbase.util.EnvironmentEdge;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051
052/**
053 * Holds back the requests if they reach any thresholds.
054 */
055@InterfaceAudience.Private
056@InterfaceStability.Evolving
057class SimpleRequestController implements RequestController {
058  private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class);
059  /**
060   * The maximum heap size for each request.
061   */
062  public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
063
064  /**
065   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}.
066   */
067  @VisibleForTesting
068  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
069
070  /**
071   * The maximum number of rows for each request.
072   */
073  public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows";
074  /**
075   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}.
076   */
077  @VisibleForTesting
078  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048;
079
080  /**
081   * The maximum size of submit.
082   */
083  public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
084  /**
085   * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
086   */
087  @VisibleForTesting
088  static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
089  @VisibleForTesting
090  final AtomicLong tasksInProgress = new AtomicLong(0);
091  @VisibleForTesting
092  final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion
093          = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
094  @VisibleForTesting
095  final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
096  /**
097   * The number of tasks simultaneously executed on the cluster.
098   */
099  private final int maxTotalConcurrentTasks;
100
101  /**
102   * The maximum heap size for each request.
103   */
104  private final long maxHeapSizePerRequest;
105  /**
106   * The maximum number of rows for each request.
107   */
108  private final long maxRowsPerRequest;
109  private final long maxHeapSizeSubmit;
110  /**
111   * The number of tasks we run in parallel on a single region. With 1 (the
112   * default) , we ensure that the ordering of the queries is respected: we
113   * don't start a set of operations on a region before the previous one is
114   * done. As well, this limits the pressure we put on the region server.
115   */
116  @VisibleForTesting
117  final int maxConcurrentTasksPerRegion;
118
119  /**
120   * The number of task simultaneously executed on a single region server.
121   */
122  @VisibleForTesting
123  final int maxConcurrentTasksPerServer;
124  private final int thresholdToLogUndoneTaskDetails;
125  public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
126      "hbase.client.threshold.log.details";
127  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
128  public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
129      "hbase.client.threshold.log.region.details";
130  private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
131  private final int thresholdToLogRegionDetails;
132  SimpleRequestController(final Configuration conf) {
133    this.maxTotalConcurrentTasks = checkAndGet(conf,
134            HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
135            HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
136    this.maxConcurrentTasksPerServer = checkAndGet(conf,
137            HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
138            HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
139    this.maxConcurrentTasksPerRegion = checkAndGet(conf,
140            HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
141            HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
142    this.maxHeapSizePerRequest = checkAndGet(conf,
143            HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
144            DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
145    this.maxRowsPerRequest = checkAndGet(conf,
146            HBASE_CLIENT_MAX_PERREQUEST_ROWS,
147            DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS);
148    this.maxHeapSizeSubmit = checkAndGet(conf,
149            HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE,
150            DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
151    this.thresholdToLogUndoneTaskDetails = conf.getInt(
152          THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
153          DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
154    this.thresholdToLogRegionDetails = conf.getInt(
155          THRESHOLD_TO_LOG_REGION_DETAILS,
156          DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
157  }
158
159  private static int checkAndGet(Configuration conf, String key, int defaultValue) {
160    int value = conf.getInt(key, defaultValue);
161    if (value <= 0) {
162      throw new IllegalArgumentException(key + "=" + value);
163    }
164    return value;
165  }
166
167  private static long checkAndGet(Configuration conf, String key, long defaultValue) {
168    long value = conf.getLong(key, defaultValue);
169    if (value <= 0) {
170      throw new IllegalArgumentException(key + "=" + value);
171    }
172    return value;
173  }
174
175  @VisibleForTesting
176  static Checker newChecker(List<RowChecker> checkers) {
177    return new Checker() {
178      private boolean isEnd = false;
179
180      @Override
181      public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
182        if (isEnd) {
183          return ReturnCode.END;
184        }
185        long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
186        ReturnCode code = ReturnCode.INCLUDE;
187        for (RowChecker checker : checkers) {
188          switch (checker.canTakeOperation(loc, heapSizeOfRow)) {
189            case END:
190              isEnd = true;
191              code = ReturnCode.END;
192              break;
193            case SKIP:
194              code = ReturnCode.SKIP;
195              break;
196            case INCLUDE:
197            default:
198              break;
199          }
200          if (code == ReturnCode.END) {
201            break;
202          }
203        }
204        for (RowChecker checker : checkers) {
205          checker.notifyFinal(code, loc, heapSizeOfRow);
206        }
207        return code;
208      }
209
210      @Override
211      public void reset() throws InterruptedIOException {
212        isEnd = false;
213        InterruptedIOException e = null;
214        for (RowChecker checker : checkers) {
215          try {
216            checker.reset();
217          } catch (InterruptedIOException ex) {
218            e = ex;
219          }
220        }
221        if (e != null) {
222          throw e;
223        }
224      }
225    };
226  }
227
228  @Override
229  public Checker newChecker() {
230    List<RowChecker> checkers = new ArrayList<>(4);
231    checkers.add(new TaskCountChecker(maxTotalConcurrentTasks,
232            maxConcurrentTasksPerServer,
233            maxConcurrentTasksPerRegion,
234            tasksInProgress,
235            taskCounterPerServer,
236            taskCounterPerRegion));
237    checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest));
238    checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
239    checkers.add(new RequestRowsChecker(maxRowsPerRequest));
240    return newChecker(checkers);
241  }
242
243  @Override
244  public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
245    tasksInProgress.incrementAndGet();
246
247    computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
248
249    regions.forEach((regBytes)
250            -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet()
251    );
252  }
253
254  @Override
255  public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
256    regions.forEach(regBytes -> {
257      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
258      regionCnt.decrementAndGet();
259    });
260
261    taskCounterPerServer.get(sn).decrementAndGet();
262    tasksInProgress.decrementAndGet();
263    synchronized (tasksInProgress) {
264      tasksInProgress.notifyAll();
265    }
266  }
267
268  @Override
269  public long getNumberOfTasksInProgress() {
270    return tasksInProgress.get();
271  }
272
273  @Override
274  public void waitForMaximumCurrentTasks(long max, long id,
275    int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
276    assert max >= 0;
277    long lastLog = EnvironmentEdgeManager.currentTime();
278    long currentInProgress, oldInProgress = Long.MAX_VALUE;
279    while ((currentInProgress = tasksInProgress.get()) > max) {
280      if (oldInProgress != currentInProgress) { // Wait for in progress to change.
281        long now = EnvironmentEdgeManager.currentTime();
282        if (now > lastLog + periodToTrigger) {
283          lastLog = now;
284          if (trigger != null) {
285            trigger.accept(currentInProgress);
286          }
287          logDetailsOfUndoneTasks(currentInProgress);
288        }
289      }
290      oldInProgress = currentInProgress;
291      try {
292        synchronized (tasksInProgress) {
293          if (tasksInProgress.get() == oldInProgress) {
294            tasksInProgress.wait(10);
295          }
296        }
297      } catch (InterruptedException e) {
298        throw new InterruptedIOException("#" + id + ", interrupted." +
299            " currentNumberOfTask=" + currentInProgress);
300      }
301    }
302  }
303
304  private void logDetailsOfUndoneTasks(long taskInProgress) {
305    if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
306      ArrayList<ServerName> servers = new ArrayList<>();
307      for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
308        if (entry.getValue().get() > 0) {
309          servers.add(entry.getKey());
310        }
311      }
312      LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
313    }
314
315    if (taskInProgress <= thresholdToLogRegionDetails) {
316      ArrayList<String> regions = new ArrayList<>();
317      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
318        if (entry.getValue().get() > 0) {
319          regions.add(Bytes.toString(entry.getKey()));
320        }
321      }
322      LOG.info("Regions against which left over task(s) are processed: " + regions);
323    }
324  }
325
326  @Override
327  public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
328    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
329  }
330
331  /**
332   * limit the heapsize of total submitted data. Reduce the limit of heapsize
333   * for submitting quickly if there is no running task.
334   */
335  @VisibleForTesting
336  static class SubmittedSizeChecker implements RowChecker {
337
338    private final long maxHeapSizeSubmit;
339    private long heapSize = 0;
340
341    SubmittedSizeChecker(final long maxHeapSizeSubmit) {
342      this.maxHeapSizeSubmit = maxHeapSizeSubmit;
343    }
344
345    @Override
346    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
347      if (heapSize >= maxHeapSizeSubmit) {
348        return ReturnCode.END;
349      }
350      return ReturnCode.INCLUDE;
351    }
352
353    @Override
354    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
355      if (code == ReturnCode.INCLUDE) {
356        heapSize += heapSizeOfRow;
357      }
358    }
359
360    @Override
361    public void reset() {
362      heapSize = 0;
363    }
364  }
365
366  /**
367   * limit the max number of tasks in an AsyncProcess.
368   */
369  @VisibleForTesting
370  static class TaskCountChecker implements RowChecker {
371
372    private static final long MAX_WAITING_TIME = 1000; //ms
373    private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
374    private final Set<ServerName> serversIncluded = new HashSet<>();
375    private final int maxConcurrentTasksPerRegion;
376    private final int maxTotalConcurrentTasks;
377    private final int maxConcurrentTasksPerServer;
378    private final Map<byte[], AtomicInteger> taskCounterPerRegion;
379    private final Map<ServerName, AtomicInteger> taskCounterPerServer;
380    private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
381    private final AtomicLong tasksInProgress;
382
383    TaskCountChecker(final int maxTotalConcurrentTasks,
384            final int maxConcurrentTasksPerServer,
385            final int maxConcurrentTasksPerRegion,
386            final AtomicLong tasksInProgress,
387            final Map<ServerName, AtomicInteger> taskCounterPerServer,
388            final Map<byte[], AtomicInteger> taskCounterPerRegion) {
389      this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
390      this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
391      this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
392      this.taskCounterPerRegion = taskCounterPerRegion;
393      this.taskCounterPerServer = taskCounterPerServer;
394      this.tasksInProgress = tasksInProgress;
395    }
396
397    @Override
398    public void reset() throws InterruptedIOException {
399      // prevent the busy-waiting
400      waitForRegion();
401      regionsIncluded.clear();
402      serversIncluded.clear();
403      busyRegions.clear();
404    }
405
406    private void waitForRegion() throws InterruptedIOException {
407      if (busyRegions.isEmpty()) {
408        return;
409      }
410      EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
411      final long start = ee.currentTime();
412      while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
413        for (byte[] region : busyRegions) {
414          AtomicInteger count = taskCounterPerRegion.get(region);
415          if (count == null || count.get() < maxConcurrentTasksPerRegion) {
416            return;
417          }
418        }
419        try {
420          synchronized (tasksInProgress) {
421            tasksInProgress.wait(10);
422          }
423        } catch (InterruptedException e) {
424          throw new InterruptedIOException("Interrupted."
425                  + " tasksInProgress=" + tasksInProgress);
426        }
427      }
428    }
429
430    /**
431     * 1) check the regions is allowed. 2) check the concurrent tasks for
432     * regions. 3) check the total concurrent tasks. 4) check the concurrent
433     * tasks for server.
434     *
435     * @param loc the destination of data
436     * @param heapSizeOfRow the data size
437     * @return either Include {@link RequestController.ReturnCode} or skip
438     *         {@link RequestController.ReturnCode}
439     */
440    @Override
441    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
442
443      HRegionInfo regionInfo = loc.getRegionInfo();
444      if (regionsIncluded.contains(regionInfo)) {
445        // We already know what to do with this region.
446        return ReturnCode.INCLUDE;
447      }
448      AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
449      if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
450        // Too many tasks on this region already.
451        return ReturnCode.SKIP;
452      }
453      int newServers = serversIncluded.size()
454              + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
455      if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
456        // Too many tasks.
457        return ReturnCode.SKIP;
458      }
459      AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
460      if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
461        // Too many tasks for this individual server
462        return ReturnCode.SKIP;
463      }
464      return ReturnCode.INCLUDE;
465    }
466
467    @Override
468    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
469      if (code == ReturnCode.INCLUDE) {
470        regionsIncluded.add(loc.getRegionInfo());
471        serversIncluded.add(loc.getServerName());
472      }
473      busyRegions.add(loc.getRegionInfo().getRegionName());
474    }
475  }
476
477  /**
478   * limit the number of rows for each request.
479   */
480  @VisibleForTesting
481  static class RequestRowsChecker implements RowChecker {
482
483    private final long maxRowsPerRequest;
484    private final Map<ServerName, Long> serverRows = new HashMap<>();
485
486    RequestRowsChecker(final long maxRowsPerRequest) {
487      this.maxRowsPerRequest = maxRowsPerRequest;
488    }
489
490    @Override
491    public void reset() {
492      serverRows.clear();
493    }
494
495    @Override
496    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
497      long currentRows = serverRows.containsKey(loc.getServerName())
498              ? serverRows.get(loc.getServerName()) : 0L;
499      // accept at least one row
500      if (currentRows == 0 || currentRows < maxRowsPerRequest) {
501        return ReturnCode.INCLUDE;
502      }
503      return ReturnCode.SKIP;
504    }
505
506    @Override
507    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
508      if (code == ReturnCode.INCLUDE) {
509        long currentRows = serverRows.containsKey(loc.getServerName())
510                ? serverRows.get(loc.getServerName()) : 0L;
511        serverRows.put(loc.getServerName(), currentRows + 1);
512      }
513    }
514  }
515
516  /**
517   * limit the heap size for each request.
518   */
519  @VisibleForTesting
520  static class RequestHeapSizeChecker implements RowChecker {
521
522    private final long maxHeapSizePerRequest;
523    private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
524
525    RequestHeapSizeChecker(final long maxHeapSizePerRequest) {
526      this.maxHeapSizePerRequest = maxHeapSizePerRequest;
527    }
528
529    @Override
530    public void reset() {
531      serverRequestSizes.clear();
532    }
533
534    @Override
535    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
536      // Is it ok for limit of request size?
537      long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
538              ? serverRequestSizes.get(loc.getServerName()) : 0L;
539      // accept at least one request
540      if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) {
541        return ReturnCode.INCLUDE;
542      }
543      return ReturnCode.SKIP;
544    }
545
546    @Override
547    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
548      if (code == ReturnCode.INCLUDE) {
549        long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
550                ? serverRequestSizes.get(loc.getServerName()) : 0L;
551        serverRequestSizes.put(loc.getServerName(), currentRequestSize + heapSizeOfRow);
552      }
553    }
554  }
555
556  /**
557   * Provide a way to control the flow of rows iteration.
558   */
559  @VisibleForTesting
560  interface RowChecker {
561
562    ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow);
563
564    /**
565     * Add the final ReturnCode to the checker. The ReturnCode may be reversed,
566     * so the checker need the final decision to update the inner state.
567     *
568     * @param code The final decision
569     * @param loc the destination of data
570     * @param heapSizeOfRow the data size
571     */
572    void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow);
573
574    /**
575     * Reset the inner state.
576     */
577    void reset() throws InterruptedIOException;
578  }
579}