001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
022
023import java.io.InterruptedIOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.TreeSet;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ConcurrentMap;
034import java.util.concurrent.ConcurrentSkipListMap;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicLong;
037import java.util.function.Consumer;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HRegionInfo;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.util.EnvironmentEdge;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.apache.yetus.audience.InterfaceStability;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * Holds back the requests if they reach any thresholds.
054 */
055@InterfaceAudience.Private
056@InterfaceStability.Evolving
057class SimpleRequestController implements RequestController {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class);
  /**
   * Configuration key: the maximum heap size for each request.
   */
  public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";

  /**
   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}
   * (4194304 = 4 * 1024 * 1024).
   */
  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;

  /**
   * Configuration key: the maximum number of rows for each request.
   */
  public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows";
  /**
   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}.
   */
  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048;

  /**
   * Configuration key: the maximum heap size of the rows taken in a single submit.
   */
  public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
  /**
   * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}; same as the
   * per-request heap size default.
   */
  static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
  /**
   * Number of tasks currently in progress. Incremented by
   * {@code incTaskCounters} and decremented by {@code decTaskCounters}; the
   * same object is also used as the monitor for wait/notifyAll by the
   * waiting methods of this class.
   */
  final AtomicLong tasksInProgress = new AtomicLong(0);
  /**
   * Number of tasks in progress per region, keyed by the region name bytes
   * (ordered with {@link Bytes#BYTES_COMPARATOR}).
   */
  final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion
          = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
  /**
   * Number of tasks in progress per region server.
   */
  final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
  /**
   * The number of tasks simultaneously executed on the cluster.
   */
  private final int maxTotalConcurrentTasks;

  /**
   * The maximum heap size for each request.
   */
  private final long maxHeapSizePerRequest;
  /**
   * The maximum number of rows for each request.
   */
  private final long maxRowsPerRequest;
  /**
   * The maximum heap size of the rows taken in a single submit; read from
   * {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
   */
  private final long maxHeapSizeSubmit;
  /**
   * The number of tasks we run in parallel on a single region. With 1 (the
   * default), we ensure that the ordering of the queries is respected: we
   * don't start a set of operations on a region before the previous one is
   * done. This also limits the pressure we put on the region server.
   */
  final int maxConcurrentTasksPerRegion;

  /**
   * The number of tasks simultaneously executed on a single region server.
   */
  final int maxConcurrentTasksPerServer;
  /**
   * While waiting, the servers that still have tasks are logged only when the
   * number of tasks in progress is at most this value.
   */
  private final int thresholdToLogUndoneTaskDetails;
  /**
   * Configuration key for {@code thresholdToLogUndoneTaskDetails}.
   */
  public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
      "hbase.client.threshold.log.details";
  /**
   * Default value of {@link #THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS}.
   */
  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
  /**
   * Configuration key for {@code thresholdToLogRegionDetails}.
   */
  public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
      "hbase.client.threshold.log.region.details";
  /**
   * Default value of {@link #THRESHOLD_TO_LOG_REGION_DETAILS}.
   */
  private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
  /**
   * While waiting, the regions that still have tasks are logged only when the
   * number of tasks in progress is at most this value.
   */
  private final int thresholdToLogRegionDetails;
124  SimpleRequestController(final Configuration conf) {
125    this.maxTotalConcurrentTasks = checkAndGet(conf,
126            HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
127            HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
128    this.maxConcurrentTasksPerServer = checkAndGet(conf,
129            HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
130            HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
131    this.maxConcurrentTasksPerRegion = checkAndGet(conf,
132            HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
133            HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
134    this.maxHeapSizePerRequest = checkAndGet(conf,
135            HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
136            DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
137    this.maxRowsPerRequest = checkAndGet(conf,
138            HBASE_CLIENT_MAX_PERREQUEST_ROWS,
139            DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS);
140    this.maxHeapSizeSubmit = checkAndGet(conf,
141            HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE,
142            DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
143    this.thresholdToLogUndoneTaskDetails = conf.getInt(
144          THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
145          DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
146    this.thresholdToLogRegionDetails = conf.getInt(
147          THRESHOLD_TO_LOG_REGION_DETAILS,
148          DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
149  }
150
151  private static int checkAndGet(Configuration conf, String key, int defaultValue) {
152    int value = conf.getInt(key, defaultValue);
153    if (value <= 0) {
154      throw new IllegalArgumentException(key + "=" + value);
155    }
156    return value;
157  }
158
159  private static long checkAndGet(Configuration conf, String key, long defaultValue) {
160    long value = conf.getLong(key, defaultValue);
161    if (value <= 0) {
162      throw new IllegalArgumentException(key + "=" + value);
163    }
164    return value;
165  }
166
167  static Checker newChecker(List<RowChecker> checkers) {
168    return new Checker() {
169      private boolean isEnd = false;
170
171      @Override
172      public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
173        if (isEnd) {
174          return ReturnCode.END;
175        }
176        long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
177        ReturnCode code = ReturnCode.INCLUDE;
178        for (RowChecker checker : checkers) {
179          switch (checker.canTakeOperation(loc, heapSizeOfRow)) {
180            case END:
181              isEnd = true;
182              code = ReturnCode.END;
183              break;
184            case SKIP:
185              code = ReturnCode.SKIP;
186              break;
187            case INCLUDE:
188            default:
189              break;
190          }
191          if (code == ReturnCode.END) {
192            break;
193          }
194        }
195        for (RowChecker checker : checkers) {
196          checker.notifyFinal(code, loc, heapSizeOfRow);
197        }
198        return code;
199      }
200
201      @Override
202      public void reset() throws InterruptedIOException {
203        isEnd = false;
204        InterruptedIOException e = null;
205        for (RowChecker checker : checkers) {
206          try {
207            checker.reset();
208          } catch (InterruptedIOException ex) {
209            e = ex;
210          }
211        }
212        if (e != null) {
213          throw e;
214        }
215      }
216    };
217  }
218
219  @Override
220  public Checker newChecker() {
221    List<RowChecker> checkers = new ArrayList<>(4);
222    checkers.add(new TaskCountChecker(maxTotalConcurrentTasks,
223            maxConcurrentTasksPerServer,
224            maxConcurrentTasksPerRegion,
225            tasksInProgress,
226            taskCounterPerServer,
227            taskCounterPerRegion));
228    checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest));
229    checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
230    checkers.add(new RequestRowsChecker(maxRowsPerRequest));
231    return newChecker(checkers);
232  }
233
234  @Override
235  public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
236    tasksInProgress.incrementAndGet();
237
238    computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
239
240    regions.forEach((regBytes)
241            -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet()
242    );
243  }
244
245  @Override
246  public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
247    regions.forEach(regBytes -> {
248      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
249      regionCnt.decrementAndGet();
250    });
251
252    taskCounterPerServer.get(sn).decrementAndGet();
253    tasksInProgress.decrementAndGet();
254    synchronized (tasksInProgress) {
255      tasksInProgress.notifyAll();
256    }
257  }
258
259  @Override
260  public long getNumberOfTasksInProgress() {
261    return tasksInProgress.get();
262  }
263
264  @Override
265  public void waitForMaximumCurrentTasks(long max, long id,
266    int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
267    assert max >= 0;
268    long lastLog = EnvironmentEdgeManager.currentTime();
269    long currentInProgress, oldInProgress = Long.MAX_VALUE;
270    while ((currentInProgress = tasksInProgress.get()) > max) {
271      if (oldInProgress != currentInProgress) { // Wait for in progress to change.
272        long now = EnvironmentEdgeManager.currentTime();
273        if (now > lastLog + periodToTrigger) {
274          lastLog = now;
275          if (trigger != null) {
276            trigger.accept(currentInProgress);
277          }
278          logDetailsOfUndoneTasks(currentInProgress);
279        }
280      }
281      oldInProgress = currentInProgress;
282      try {
283        synchronized (tasksInProgress) {
284          if (tasksInProgress.get() == oldInProgress) {
285            tasksInProgress.wait(10);
286          }
287        }
288      } catch (InterruptedException e) {
289        throw new InterruptedIOException("#" + id + ", interrupted." +
290            " currentNumberOfTask=" + currentInProgress);
291      }
292    }
293  }
294
295  private void logDetailsOfUndoneTasks(long taskInProgress) {
296    if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
297      ArrayList<ServerName> servers = new ArrayList<>();
298      for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
299        if (entry.getValue().get() > 0) {
300          servers.add(entry.getKey());
301        }
302      }
303      LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
304    }
305
306    if (taskInProgress <= thresholdToLogRegionDetails) {
307      ArrayList<String> regions = new ArrayList<>();
308      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
309        if (entry.getValue().get() > 0) {
310          regions.add(Bytes.toString(entry.getKey()));
311        }
312      }
313      LOG.info("Regions against which left over task(s) are processed: " + regions);
314    }
315  }
316
317  @Override
318  public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
319    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
320  }
321
322  /**
323   * limit the heapsize of total submitted data. Reduce the limit of heapsize
324   * for submitting quickly if there is no running task.
325   */
326  static class SubmittedSizeChecker implements RowChecker {
327
328    private final long maxHeapSizeSubmit;
329    private long heapSize = 0;
330
331    SubmittedSizeChecker(final long maxHeapSizeSubmit) {
332      this.maxHeapSizeSubmit = maxHeapSizeSubmit;
333    }
334
335    @Override
336    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
337      if (heapSize >= maxHeapSizeSubmit) {
338        return ReturnCode.END;
339      }
340      return ReturnCode.INCLUDE;
341    }
342
343    @Override
344    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
345      if (code == ReturnCode.INCLUDE) {
346        heapSize += heapSizeOfRow;
347      }
348    }
349
350    @Override
351    public void reset() {
352      heapSize = 0;
353    }
354  }
355
356  /**
357   * limit the max number of tasks in an AsyncProcess.
358   */
359  static class TaskCountChecker implements RowChecker {
360
361    private static final long MAX_WAITING_TIME = 1000; //ms
362    private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
363    private final Set<ServerName> serversIncluded = new HashSet<>();
364    private final int maxConcurrentTasksPerRegion;
365    private final int maxTotalConcurrentTasks;
366    private final int maxConcurrentTasksPerServer;
367    private final Map<byte[], AtomicInteger> taskCounterPerRegion;
368    private final Map<ServerName, AtomicInteger> taskCounterPerServer;
369    private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
370    private final AtomicLong tasksInProgress;
371
372    TaskCountChecker(final int maxTotalConcurrentTasks,
373            final int maxConcurrentTasksPerServer,
374            final int maxConcurrentTasksPerRegion,
375            final AtomicLong tasksInProgress,
376            final Map<ServerName, AtomicInteger> taskCounterPerServer,
377            final Map<byte[], AtomicInteger> taskCounterPerRegion) {
378      this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
379      this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
380      this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
381      this.taskCounterPerRegion = taskCounterPerRegion;
382      this.taskCounterPerServer = taskCounterPerServer;
383      this.tasksInProgress = tasksInProgress;
384    }
385
386    @Override
387    public void reset() throws InterruptedIOException {
388      // prevent the busy-waiting
389      waitForRegion();
390      regionsIncluded.clear();
391      serversIncluded.clear();
392      busyRegions.clear();
393    }
394
395    private void waitForRegion() throws InterruptedIOException {
396      if (busyRegions.isEmpty()) {
397        return;
398      }
399      EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
400      final long start = ee.currentTime();
401      while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
402        for (byte[] region : busyRegions) {
403          AtomicInteger count = taskCounterPerRegion.get(region);
404          if (count == null || count.get() < maxConcurrentTasksPerRegion) {
405            return;
406          }
407        }
408        try {
409          synchronized (tasksInProgress) {
410            tasksInProgress.wait(10);
411          }
412        } catch (InterruptedException e) {
413          throw new InterruptedIOException("Interrupted."
414                  + " tasksInProgress=" + tasksInProgress);
415        }
416      }
417    }
418
419    /**
420     * 1) check the regions is allowed. 2) check the concurrent tasks for
421     * regions. 3) check the total concurrent tasks. 4) check the concurrent
422     * tasks for server.
423     *
424     * @param loc the destination of data
425     * @param heapSizeOfRow the data size
426     * @return either Include {@link RequestController.ReturnCode} or skip
427     *         {@link RequestController.ReturnCode}
428     */
429    @Override
430    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
431
432      HRegionInfo regionInfo = loc.getRegionInfo();
433      if (regionsIncluded.contains(regionInfo)) {
434        // We already know what to do with this region.
435        return ReturnCode.INCLUDE;
436      }
437      AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
438      if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
439        // Too many tasks on this region already.
440        return ReturnCode.SKIP;
441      }
442      int newServers = serversIncluded.size()
443              + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
444      if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
445        // Too many tasks.
446        return ReturnCode.SKIP;
447      }
448      AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
449      if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
450        // Too many tasks for this individual server
451        return ReturnCode.SKIP;
452      }
453      return ReturnCode.INCLUDE;
454    }
455
456    @Override
457    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
458      if (code == ReturnCode.INCLUDE) {
459        regionsIncluded.add(loc.getRegionInfo());
460        serversIncluded.add(loc.getServerName());
461      }
462      busyRegions.add(loc.getRegionInfo().getRegionName());
463    }
464  }
465
466  /**
467   * limit the number of rows for each request.
468   */
469  static class RequestRowsChecker implements RowChecker {
470
471    private final long maxRowsPerRequest;
472    private final Map<ServerName, Long> serverRows = new HashMap<>();
473
474    RequestRowsChecker(final long maxRowsPerRequest) {
475      this.maxRowsPerRequest = maxRowsPerRequest;
476    }
477
478    @Override
479    public void reset() {
480      serverRows.clear();
481    }
482
483    @Override
484    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
485      long currentRows = serverRows.containsKey(loc.getServerName())
486              ? serverRows.get(loc.getServerName()) : 0L;
487      // accept at least one row
488      if (currentRows == 0 || currentRows < maxRowsPerRequest) {
489        return ReturnCode.INCLUDE;
490      }
491      return ReturnCode.SKIP;
492    }
493
494    @Override
495    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
496      if (code == ReturnCode.INCLUDE) {
497        long currentRows = serverRows.containsKey(loc.getServerName())
498                ? serverRows.get(loc.getServerName()) : 0L;
499        serverRows.put(loc.getServerName(), currentRows + 1);
500      }
501    }
502  }
503
504  /**
505   * limit the heap size for each request.
506   */
507  static class RequestHeapSizeChecker implements RowChecker {
508
509    private final long maxHeapSizePerRequest;
510    private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
511
512    RequestHeapSizeChecker(final long maxHeapSizePerRequest) {
513      this.maxHeapSizePerRequest = maxHeapSizePerRequest;
514    }
515
516    @Override
517    public void reset() {
518      serverRequestSizes.clear();
519    }
520
521    @Override
522    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
523      // Is it ok for limit of request size?
524      long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
525              ? serverRequestSizes.get(loc.getServerName()) : 0L;
526      // accept at least one request
527      if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) {
528        return ReturnCode.INCLUDE;
529      }
530      return ReturnCode.SKIP;
531    }
532
533    @Override
534    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
535      if (code == ReturnCode.INCLUDE) {
536        long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
537                ? serverRequestSizes.get(loc.getServerName()) : 0L;
538        serverRequestSizes.put(loc.getServerName(), currentRequestSize + heapSizeOfRow);
539      }
540    }
541  }
542
  /**
   * Provide a way to control the flow of rows iteration.
   * For each row, {@code canTakeOperation} is asked for a vote and
   * {@code notifyFinal} is then told the decision that was actually taken.
   */
  interface RowChecker {

    /**
     * Vote on whether a row may be taken. Implementations in this file do not
     * update their state here; they do so in {@link #notifyFinal}.
     *
     * @param loc the destination of data
     * @param heapSizeOfRow the data size
     * @return the vote of this checker for the row
     */
    ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow);

    /**
     * Add the final ReturnCode to the checker. The final ReturnCode may differ
     * from the one this checker returned, so the checker needs the final
     * decision to update its inner state.
     *
     * @param code The final decision
     * @param loc the destination of data
     * @param heapSizeOfRow the data size
     */
    void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow);

    /**
     * Reset the inner state.
     *
     * @throws InterruptedIOException if the checker is interrupted while resetting
     */
    void reset() throws InterruptedIOException;
  }
565}