001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.regionserver;
019
020import java.io.Closeable;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.Callable;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.ExecutorCompletionService;
026import java.util.concurrent.Future;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Abortable;
032import org.apache.hadoop.hbase.errorhandling.ForeignException;
033import org.apache.hadoop.hbase.util.Threads;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
039
040/**
041 * Handle running each of the individual tasks for completing a backup procedure on a region server.
042 */
043@InterfaceAudience.Private
044public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
045  private static final Logger LOG = LoggerFactory.getLogger(LogRollBackupSubprocedurePool.class);
046
047  /** Maximum number of concurrent snapshot region tasks that can run concurrently */
048  private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
049  private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;
050
051  private final ExecutorCompletionService<Void> taskPool;
052  private final ThreadPoolExecutor executor;
053  private volatile boolean aborted;
054  private final List<Future<Void>> futures = new ArrayList<>();
055  private final String name;
056
057  public LogRollBackupSubprocedurePool(String name, Configuration conf) {
058    // configure the executor service
059    long keepAlive = conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
060      LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
061    int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
062    this.name = name;
063    executor =
064      new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
065        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d").setDaemon(true)
066          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
067    taskPool = new ExecutorCompletionService<>(executor);
068  }
069
070  /**
071   * Submit a task to the pool.
072   */
073  public void submitTask(final Callable<Void> task) {
074    Future<Void> f = this.taskPool.submit(task);
075    futures.add(f);
076  }
077
078  /**
079   * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
080   * @return <tt>true</tt> on success, <tt>false</tt> otherwise
081   * @throws ForeignException exception
082   */
083  public boolean waitForOutstandingTasks() throws ForeignException {
084    LOG.debug("Waiting for backup procedure to finish.");
085
086    try {
087      for (Future<Void> f : futures) {
088        f.get();
089      }
090      return true;
091    } catch (InterruptedException e) {
092      if (aborted) {
093        throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
094          e);
095      }
096      Thread.currentThread().interrupt();
097    } catch (ExecutionException e) {
098      if (e.getCause() instanceof ForeignException) {
099        throw (ForeignException) e.getCause();
100      }
101      throw new ForeignException(name, e.getCause());
102    } finally {
103      // close off remaining tasks
104      for (Future<Void> f : futures) {
105        if (!f.isDone()) {
106          f.cancel(true);
107        }
108      }
109    }
110    return false;
111  }
112
113  /**
114   * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
115   * finish
116   */
117  @Override
118  public void close() {
119    executor.shutdown();
120  }
121
122  @Override
123  public void abort(String why, Throwable e) {
124    if (this.aborted) {
125      return;
126    }
127
128    this.aborted = true;
129    LOG.warn("Aborting because: " + why, e);
130    this.executor.shutdownNow();
131  }
132
133  @Override
134  public boolean isAborted() {
135    return this.aborted;
136  }
137}