001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeSet;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicLong;
036import java.util.function.Consumer;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionInfo;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.EnvironmentEdge;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.apache.yetus.audience.InterfaceStability;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * Holds back the requests if they reach any thresholds.
052 */
053@InterfaceAudience.Private
054@InterfaceStability.Evolving
055class SimpleRequestController implements RequestController {
056  private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class);
057  /**
058   * The maximum heap size for each request.
059   */
060  public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE =
061    "hbase.client.max.perrequest.heapsize";
062
063  /**
064   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}.
065   */
066  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
067
068  /**
069   * The maximum number of rows for each request.
070   */
071  public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows";
072  /**
073   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}.
074   */
075  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048;
076
  /**
   * The maximum total heap size of the data submitted at once.
   */
080  public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
081  /**
082   * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
083   */
084  static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE =
085    DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
086  final AtomicLong tasksInProgress = new AtomicLong(0);
087  final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
088    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
089  final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
090  /**
091   * The number of tasks simultaneously executed on the cluster.
092   */
093  private final int maxTotalConcurrentTasks;
094
095  /**
096   * The maximum heap size for each request.
097   */
098  private final long maxHeapSizePerRequest;
099  /**
100   * The maximum number of rows for each request.
101   */
102  private final long maxRowsPerRequest;
103  private final long maxHeapSizeSubmit;
  /**
   * The number of tasks we run in parallel on a single region. With 1 (the default), we ensure
   * that the ordering of the queries is respected: we don't start a set of operations on a region
   * before the previous one is done. This also limits the pressure we put on the region server.
   */
109  final int maxConcurrentTasksPerRegion;
110
  /**
   * The number of tasks simultaneously executed on a single region server.
   */
114  final int maxConcurrentTasksPerServer;
115  private final int thresholdToLogUndoneTaskDetails;
116  public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
117    "hbase.client.threshold.log.details";
118  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
119  public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
120    "hbase.client.threshold.log.region.details";
121  private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
122  private final int thresholdToLogRegionDetails;
123
124  SimpleRequestController(final Configuration conf) {
125    this.maxTotalConcurrentTasks = checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
126      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
127    this.maxConcurrentTasksPerServer =
128      checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
129        HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
130    this.maxConcurrentTasksPerRegion =
131      checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
132        HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
133    this.maxHeapSizePerRequest = checkAndGet(conf, HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
134      DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
135    this.maxRowsPerRequest =
136      checkAndGet(conf, HBASE_CLIENT_MAX_PERREQUEST_ROWS, DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS);
137    this.maxHeapSizeSubmit =
138      checkAndGet(conf, HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
139    this.thresholdToLogUndoneTaskDetails = conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
140      DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
141    this.thresholdToLogRegionDetails =
142      conf.getInt(THRESHOLD_TO_LOG_REGION_DETAILS, DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
143  }
144
145  private static int checkAndGet(Configuration conf, String key, int defaultValue) {
146    int value = conf.getInt(key, defaultValue);
147    if (value <= 0) {
148      throw new IllegalArgumentException(key + "=" + value);
149    }
150    return value;
151  }
152
153  private static long checkAndGet(Configuration conf, String key, long defaultValue) {
154    long value = conf.getLong(key, defaultValue);
155    if (value <= 0) {
156      throw new IllegalArgumentException(key + "=" + value);
157    }
158    return value;
159  }
160
161  static Checker newChecker(List<RowChecker> checkers) {
162    return new Checker() {
163      private boolean isEnd = false;
164
165      @Override
166      public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
167        if (isEnd) {
168          return ReturnCode.END;
169        }
170        long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
171        ReturnCode code = ReturnCode.INCLUDE;
172        for (RowChecker checker : checkers) {
173          switch (checker.canTakeOperation(loc, heapSizeOfRow)) {
174            case END:
175              isEnd = true;
176              code = ReturnCode.END;
177              break;
178            case SKIP:
179              code = ReturnCode.SKIP;
180              break;
181            case INCLUDE:
182            default:
183              break;
184          }
185          if (code == ReturnCode.END) {
186            break;
187          }
188        }
189        for (RowChecker checker : checkers) {
190          checker.notifyFinal(code, loc, heapSizeOfRow);
191        }
192        return code;
193      }
194
195      @Override
196      public void reset() throws InterruptedIOException {
197        isEnd = false;
198        InterruptedIOException e = null;
199        for (RowChecker checker : checkers) {
200          try {
201            checker.reset();
202          } catch (InterruptedIOException ex) {
203            e = ex;
204          }
205        }
206        if (e != null) {
207          throw e;
208        }
209      }
210    };
211  }
212
213  @Override
214  public Checker newChecker() {
215    List<RowChecker> checkers = new ArrayList<>(4);
216    checkers.add(new TaskCountChecker(maxTotalConcurrentTasks, maxConcurrentTasksPerServer,
217      maxConcurrentTasksPerRegion, tasksInProgress, taskCounterPerServer, taskCounterPerRegion));
218    checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest));
219    checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
220    checkers.add(new RequestRowsChecker(maxRowsPerRequest));
221    return newChecker(checkers);
222  }
223
224  @Override
225  public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
226    tasksInProgress.incrementAndGet();
227
228    computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
229
230    regions
231      .forEach((regBytes) -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new)
232        .incrementAndGet());
233  }
234
235  @Override
236  public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
237    regions.forEach(regBytes -> {
238      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
239      regionCnt.decrementAndGet();
240    });
241
242    taskCounterPerServer.get(sn).decrementAndGet();
243    tasksInProgress.decrementAndGet();
244    synchronized (tasksInProgress) {
245      tasksInProgress.notifyAll();
246    }
247  }
248
249  @Override
250  public long getNumberOfTasksInProgress() {
251    return tasksInProgress.get();
252  }
253
254  @Override
255  public void waitForMaximumCurrentTasks(long max, long id, int periodToTrigger,
256    Consumer<Long> trigger) throws InterruptedIOException {
257    assert max >= 0;
258    long lastLog = EnvironmentEdgeManager.currentTime();
259    long currentInProgress, oldInProgress = Long.MAX_VALUE;
260    while ((currentInProgress = tasksInProgress.get()) > max) {
261      if (oldInProgress != currentInProgress) { // Wait for in progress to change.
262        long now = EnvironmentEdgeManager.currentTime();
263        if (now > lastLog + periodToTrigger) {
264          lastLog = now;
265          if (trigger != null) {
266            trigger.accept(currentInProgress);
267          }
268          logDetailsOfUndoneTasks(currentInProgress);
269        }
270      }
271      oldInProgress = currentInProgress;
272      try {
273        synchronized (tasksInProgress) {
274          if (tasksInProgress.get() == oldInProgress) {
275            tasksInProgress.wait(10);
276          }
277        }
278      } catch (InterruptedException e) {
279        throw new InterruptedIOException(
280          "#" + id + ", interrupted." + " currentNumberOfTask=" + currentInProgress);
281      }
282    }
283  }
284
285  private void logDetailsOfUndoneTasks(long taskInProgress) {
286    if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
287      ArrayList<ServerName> servers = new ArrayList<>();
288      for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
289        if (entry.getValue().get() > 0) {
290          servers.add(entry.getKey());
291        }
292      }
293      LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
294    }
295
296    if (taskInProgress <= thresholdToLogRegionDetails) {
297      ArrayList<String> regions = new ArrayList<>();
298      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
299        if (entry.getValue().get() > 0) {
300          regions.add(Bytes.toString(entry.getKey()));
301        }
302      }
303      LOG.info("Regions against which left over task(s) are processed: " + regions);
304    }
305  }
306
307  @Override
308  public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger)
309    throws InterruptedIOException {
310    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
311  }
312
313  /**
314   * limit the heapsize of total submitted data. Reduce the limit of heapsize for submitting quickly
315   * if there is no running task.
316   */
317  static class SubmittedSizeChecker implements RowChecker {
318
319    private final long maxHeapSizeSubmit;
320    private long heapSize = 0;
321
322    SubmittedSizeChecker(final long maxHeapSizeSubmit) {
323      this.maxHeapSizeSubmit = maxHeapSizeSubmit;
324    }
325
326    @Override
327    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
328      if (heapSize >= maxHeapSizeSubmit) {
329        return ReturnCode.END;
330      }
331      return ReturnCode.INCLUDE;
332    }
333
334    @Override
335    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
336      if (code == ReturnCode.INCLUDE) {
337        heapSize += heapSizeOfRow;
338      }
339    }
340
341    @Override
342    public void reset() {
343      heapSize = 0;
344    }
345  }
346
347  /**
348   * limit the max number of tasks in an AsyncProcess.
349   */
350  static class TaskCountChecker implements RowChecker {
351
352    private static final long MAX_WAITING_TIME = 1000; // ms
353    private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
354    private final Set<ServerName> serversIncluded = new HashSet<>();
355    private final int maxConcurrentTasksPerRegion;
356    private final int maxTotalConcurrentTasks;
357    private final int maxConcurrentTasksPerServer;
358    private final Map<byte[], AtomicInteger> taskCounterPerRegion;
359    private final Map<ServerName, AtomicInteger> taskCounterPerServer;
360    private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
361    private final AtomicLong tasksInProgress;
362
363    TaskCountChecker(final int maxTotalConcurrentTasks, final int maxConcurrentTasksPerServer,
364      final int maxConcurrentTasksPerRegion, final AtomicLong tasksInProgress,
365      final Map<ServerName, AtomicInteger> taskCounterPerServer,
366      final Map<byte[], AtomicInteger> taskCounterPerRegion) {
367      this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
368      this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
369      this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
370      this.taskCounterPerRegion = taskCounterPerRegion;
371      this.taskCounterPerServer = taskCounterPerServer;
372      this.tasksInProgress = tasksInProgress;
373    }
374
375    @Override
376    public void reset() throws InterruptedIOException {
377      // prevent the busy-waiting
378      waitForRegion();
379      regionsIncluded.clear();
380      serversIncluded.clear();
381      busyRegions.clear();
382    }
383
384    private void waitForRegion() throws InterruptedIOException {
385      if (busyRegions.isEmpty()) {
386        return;
387      }
388      EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
389      final long start = ee.currentTime();
390      while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
391        for (byte[] region : busyRegions) {
392          AtomicInteger count = taskCounterPerRegion.get(region);
393          if (count == null || count.get() < maxConcurrentTasksPerRegion) {
394            return;
395          }
396        }
397        try {
398          synchronized (tasksInProgress) {
399            tasksInProgress.wait(10);
400          }
401        } catch (InterruptedException e) {
402          throw new InterruptedIOException("Interrupted." + " tasksInProgress=" + tasksInProgress);
403        }
404      }
405    }
406
407    /**
408     * 1) check the regions is allowed. 2) check the concurrent tasks for regions. 3) check the
409     * total concurrent tasks. 4) check the concurrent tasks for server.
410     * @param loc           the destination of data
411     * @param heapSizeOfRow the data size
412     * @return either Include {@link RequestController.ReturnCode} or skip
413     *         {@link RequestController.ReturnCode}
414     */
415    @Override
416    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
417
418      HRegionInfo regionInfo = loc.getRegionInfo();
419      if (regionsIncluded.contains(regionInfo)) {
420        // We already know what to do with this region.
421        return ReturnCode.INCLUDE;
422      }
423      AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
424      if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
425        // Too many tasks on this region already.
426        return ReturnCode.SKIP;
427      }
428      int newServers =
429        serversIncluded.size() + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
430      if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
431        // Too many tasks.
432        return ReturnCode.SKIP;
433      }
434      AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
435      if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
436        // Too many tasks for this individual server
437        return ReturnCode.SKIP;
438      }
439      return ReturnCode.INCLUDE;
440    }
441
442    @Override
443    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
444      if (code == ReturnCode.INCLUDE) {
445        regionsIncluded.add(loc.getRegionInfo());
446        serversIncluded.add(loc.getServerName());
447      }
448      busyRegions.add(loc.getRegionInfo().getRegionName());
449    }
450  }
451
452  /**
453   * limit the number of rows for each request.
454   */
455  static class RequestRowsChecker implements RowChecker {
456
457    private final long maxRowsPerRequest;
458    private final Map<ServerName, Long> serverRows = new HashMap<>();
459
460    RequestRowsChecker(final long maxRowsPerRequest) {
461      this.maxRowsPerRequest = maxRowsPerRequest;
462    }
463
464    @Override
465    public void reset() {
466      serverRows.clear();
467    }
468
469    @Override
470    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
471      long currentRows =
472        serverRows.containsKey(loc.getServerName()) ? serverRows.get(loc.getServerName()) : 0L;
473      // accept at least one row
474      if (currentRows == 0 || currentRows < maxRowsPerRequest) {
475        return ReturnCode.INCLUDE;
476      }
477      return ReturnCode.SKIP;
478    }
479
480    @Override
481    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
482      if (code == ReturnCode.INCLUDE) {
483        long currentRows =
484          serverRows.containsKey(loc.getServerName()) ? serverRows.get(loc.getServerName()) : 0L;
485        serverRows.put(loc.getServerName(), currentRows + 1);
486      }
487    }
488  }
489
490  /**
491   * limit the heap size for each request.
492   */
493  static class RequestHeapSizeChecker implements RowChecker {
494
495    private final long maxHeapSizePerRequest;
496    private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
497
498    RequestHeapSizeChecker(final long maxHeapSizePerRequest) {
499      this.maxHeapSizePerRequest = maxHeapSizePerRequest;
500    }
501
502    @Override
503    public void reset() {
504      serverRequestSizes.clear();
505    }
506
507    @Override
508    public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
509      // Is it ok for limit of request size?
510      long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
511        ? serverRequestSizes.get(loc.getServerName())
512        : 0L;
513      // accept at least one request
514      if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) {
515        return ReturnCode.INCLUDE;
516      }
517      return ReturnCode.SKIP;
518    }
519
520    @Override
521    public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
522      if (code == ReturnCode.INCLUDE) {
523        long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
524          ? serverRequestSizes.get(loc.getServerName())
525          : 0L;
526        serverRequestSizes.put(loc.getServerName(), currentRequestSize + heapSizeOfRow);
527      }
528    }
529  }
530
  /**
   * Provide a way to control the flow of rows iteration. Each implementation enforces one limit;
   * several of them are combined by {@code newChecker(List)}, which asks every checker for a
   * verdict and then reports the final decision back to all of them.
   */
  interface RowChecker {

    /**
     * Tell whether this checker accepts a row of the given size for the given destination.
     * Implementations in this file do not update their state here; they do so in
     * {@link #notifyFinal(ReturnCode, HRegionLocation, long)}.
     * @param loc           the destination of data
     * @param heapSizeOfRow the data size
     * @return this checker's verdict for the row
     */
    ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow);

    /**
     * Add the final ReturnCode to the checker. The ReturnCode may be reversed, so the checker
     * needs the final decision to update its inner state.
     * @param code          The final decision
     * @param loc           the destination of data
     * @param heapSizeOfRow the data size
     */
    void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow);

    /**
     * Reset the inner state.
     * @throws InterruptedIOException if the checker is interrupted while resetting
     */
    void reset() throws InterruptedIOException;
  }
552}