001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.backup.regionserver;
020
021import java.io.Closeable;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.Future;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.errorhandling.ForeignException;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * Handle running each of the individual tasks for completing a backup procedure on a region
042 * server.
043 */
044@InterfaceAudience.Private
045public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
046  private static final Logger LOG = LoggerFactory.getLogger(LogRollBackupSubprocedurePool.class);
047
048  /** Maximum number of concurrent snapshot region tasks that can run concurrently */
049  private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
050  private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;
051
052  private final ExecutorCompletionService<Void> taskPool;
053  private final ThreadPoolExecutor executor;
054  private volatile boolean aborted;
055  private final List<Future<Void>> futures = new ArrayList<>();
056  private final String name;
057
058  public LogRollBackupSubprocedurePool(String name, Configuration conf) {
059    // configure the executor service
060    long keepAlive =
061        conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
062          LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
063    int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
064    this.name = name;
065    executor =
066        new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
067            new LinkedBlockingQueue<>(),
068            Threads.newDaemonThreadFactory("rs(" + name + ")-backup"));
069    taskPool = new ExecutorCompletionService<>(executor);
070  }
071
072  /**
073   * Submit a task to the pool.
074   */
075  public void submitTask(final Callable<Void> task) {
076    Future<Void> f = this.taskPool.submit(task);
077    futures.add(f);
078  }
079
080  /**
081   * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
082   * @return <tt>true</tt> on success, <tt>false</tt> otherwise
083   * @throws ForeignException exception
084   */
085  public boolean waitForOutstandingTasks() throws ForeignException {
086    LOG.debug("Waiting for backup procedure to finish.");
087
088    try {
089      for (Future<Void> f : futures) {
090        f.get();
091      }
092      return true;
093    } catch (InterruptedException e) {
094      if (aborted) {
095        throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
096            e);
097      }
098      Thread.currentThread().interrupt();
099    } catch (ExecutionException e) {
100      if (e.getCause() instanceof ForeignException) {
101        throw (ForeignException) e.getCause();
102      }
103      throw new ForeignException(name, e.getCause());
104    } finally {
105      // close off remaining tasks
106      for (Future<Void> f : futures) {
107        if (!f.isDone()) {
108          f.cancel(true);
109        }
110      }
111    }
112    return false;
113  }
114
115  /**
116   * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
117   * finish
118   */
119  @Override
120  public void close() {
121    executor.shutdown();
122  }
123
124  @Override
125  public void abort(String why, Throwable e) {
126    if (this.aborted) {
127      return;
128    }
129
130    this.aborted = true;
131    LOG.warn("Aborting because: " + why, e);
132    this.executor.shutdownNow();
133  }
134
135  @Override
136  public boolean isAborted() {
137    return this.aborted;
138  }
139}