/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.regionserver;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Handle running each of the individual tasks for completing a backup procedure on a region server.
 * <p>
 * Tasks are handed to a daemon thread pool via {@link #submitTask(Callable)} and their futures are
 * remembered so that {@link #waitForOutstandingTasks()} can block on all of them. The list of
 * futures is a plain {@link ArrayList} with no synchronization of its own.
 */
@InterfaceAudience.Private
public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
  private static final Logger LOG = LoggerFactory.getLogger(LogRollBackupSubprocedurePool.class);

  /** Configuration key for the maximum number of backup tasks that can run concurrently */
  private static final String CONCURRENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
  /** Number of concurrent backup tasks used when the key above is not set */
  private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;

  private final ExecutorCompletionService<Void> taskPool;
  private final ThreadPoolExecutor executor;
  private volatile boolean aborted;
  private final List<Future<Void>> futures = new ArrayList<>();
  private final String name;

  /**
   * Build the pool and its backing executor.
   * @param name name of this pool; used in worker thread names and as the source of any
   *             {@link ForeignException} created by {@link #waitForOutstandingTasks()}
   * @param conf configuration supplying the idle-thread keep-alive (read from
   *             {@link LogRollRegionServerProcedureManager#BACKUP_TIMEOUT_MILLIS_KEY}) and the
   *             number of concurrent tasks
   */
  public LogRollBackupSubprocedurePool(String name, Configuration conf) {
    // configure the executor service
    long keepAliveMillis =
      conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
        LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
    // A non-positive setting would make the ThreadPoolExecutor constructor throw; run with at
    // least one worker instead.
    int threads =
      Math.max(1, conf.getInt(CONCURRENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS));
    this.name = name;
    // The work queue is unbounded, so the executor never grows beyond its core size: the core
    // size itself has to be the desired concurrency. The keep-alive value comes from a
    // milliseconds setting, so it is passed as milliseconds.
    executor = new ThreadPoolExecutor(threads, threads, keepAliveMillis, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<>(),
      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    // Let idle workers exit after the keep-alive; allowCoreThreadTimeOut(true) rejects a zero
    // keep-alive, hence the guard.
    if (keepAliveMillis > 0) {
      executor.allowCoreThreadTimeOut(true);
    }
    taskPool = new ExecutorCompletionService<>(executor);
  }

  /**
   * Submit a task to the pool. The returned future is recorded so that
   * {@link #waitForOutstandingTasks()} can wait on it.
   * @param task the task to run
   */
  public void submitTask(final Callable<Void> task) {
    Future<Void> f = this.taskPool.submit(task);
    futures.add(f);
  }

  /**
   * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
   * Whatever the outcome, every recorded task that is not done when this method exits is
   * cancelled with interruption.
   * @return {@code true} if every task completed normally, {@code false} if the wait was
   *         interrupted while this pool was not aborted (the interrupt flag is restored)
   * @throws ForeignException if a task failed (the task's own {@link ForeignException} is
   *                          rethrown as-is, any other cause is wrapped), or if the wait was
   *                          interrupted after this pool had been aborted
   */
  public boolean waitForOutstandingTasks() throws ForeignException {
    LOG.debug("Waiting for backup procedure to finish.");

    try {
      for (Future<Void> f : futures) {
        f.get();
      }
      return true;
    } catch (InterruptedException e) {
      if (aborted) {
        throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
          e);
      }
      // not aborted: preserve the interrupt for the caller and report failure via the return value
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      if (e.getCause() instanceof ForeignException) {
        throw (ForeignException) e.getCause();
      }
      throw new ForeignException(name, e.getCause());
    } finally {
      // close off remaining tasks
      for (Future<Void> f : futures) {
        if (!f.isDone()) {
          f.cancel(true);
        }
      }
    }
    return false;
  }

  /**
   * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
   * finish
   */
  @Override
  public void close() {
    executor.shutdown();
  }

  /**
   * Mark this pool as aborted and stop the executor immediately. Calls made after the pool is
   * already marked aborted do nothing.
   * @param why reason for the abort, included in the warning log
   * @param e   the error that triggered the abort, logged alongside the reason
   */
  @Override
  public void abort(String why, Throwable e) {
    if (this.aborted) {
      return;
    }

    this.aborted = true;
    LOG.warn("Aborting because: {}", why, e);
    this.executor.shutdownNow();
  }

  /**
   * @return {@code true} once {@link #abort(String, Throwable)} has been called
   */
  @Override
  public boolean isAborted() {
    return this.aborted;
  }
}