001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.regionserver; 019 020import java.io.IOException; 021import java.util.concurrent.ThreadPoolExecutor; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 024import org.apache.hadoop.hbase.backup.impl.BackupManager; 025import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; 026import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 027import org.apache.hadoop.hbase.procedure.ProcedureCoordinationManager; 028import org.apache.hadoop.hbase.procedure.ProcedureMember; 029import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; 030import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; 031import org.apache.hadoop.hbase.procedure.Subprocedure; 032import org.apache.hadoop.hbase.procedure.SubprocedureFactory; 033import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinationManager; 034import org.apache.hadoop.hbase.regionserver.RegionServerServices; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.apache.zookeeper.KeeperException; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * This manager class handles the work dealing with distributed WAL roll request. 042 * <p> 043 * This provides the mechanism necessary to kick off a backup specific {@link Subprocedure} that is 044 * responsible by this region server. If any failures occur with the sub-procedure, the manager's 045 * procedure member notifies the procedure coordinator to abort all others. 046 * <p> 047 * On startup, requires {@link #start()} to be called. 048 * <p> 049 * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be called 050 */ 051@InterfaceAudience.Private 052public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager { 053 private static final Logger LOG = 054 LoggerFactory.getLogger(LogRollRegionServerProcedureManager.class); 055 056 /** Conf key for number of request threads to start backup on region servers */ 057 public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads"; 058 /** # of threads for backup work on the rs. */ 059 public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10; 060 061 public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout"; 062 public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000; 063 064 /** Conf key for millis between checks to see if backup work completed or if there are errors */ 065 public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency"; 066 /** Default amount of time to check for errors while regions finish backup work */ 067 private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500; 068 069 private RegionServerServices rss; 070 private ProcedureMemberRpcs memberRpcs; 071 private ProcedureMember member; 072 private boolean started = false; 073 074 /** 075 * Create a default backup procedure manager 076 */ 077 public LogRollRegionServerProcedureManager() { 078 } 079 080 /** 081 * Start accepting backup procedure requests. 082 */ 083 @Override 084 public void start() { 085 if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { 086 LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY 087 + " setting"); 088 return; 089 } 090 this.memberRpcs.start(rss.getServerName().toString(), member); 091 started = true; 092 LOG.info("Started region server backup manager."); 093 } 094 095 /** 096 * Close <tt>this</tt> and all running backup procedure tasks 097 * @param force forcefully stop all running tasks 098 * @throws IOException exception 099 */ 100 @Override 101 public void stop(boolean force) throws IOException { 102 if (!started) { 103 return; 104 } 105 String mode = force ? "abruptly" : "gracefully"; 106 LOG.info("Stopping RegionServerBackupManager " + mode + "."); 107 108 try { 109 this.member.close(); 110 } finally { 111 this.memberRpcs.close(); 112 } 113 } 114 115 /** 116 * If in a running state, creates the specified subprocedure for handling a backup procedure. 117 * @return Subprocedure to submit to the ProcedureMember. 118 */ 119 public Subprocedure buildSubprocedure(byte[] data) { 120 // don't run a backup if the parent is stop(ping) 121 if (rss.isStopping() || rss.isStopped()) { 122 throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName() 123 + ", because stopping/stopped!"); 124 } 125 126 LOG.info("Attempting to run a roll log procedure for backup."); 127 ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); 128 Configuration conf = rss.getConfiguration(); 129 long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); 130 long wakeMillis = 131 conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT); 132 133 LogRollBackupSubprocedurePool taskManager = 134 new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf); 135 return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis, 136 taskManager, data); 137 } 138 139 /** 140 * Build the actual backup procedure runner that will do all the 'hard' work 141 */ 142 public class BackupSubprocedureBuilder implements SubprocedureFactory { 143 @Override 144 public Subprocedure buildSubprocedure(String name, byte[] data) { 145 return LogRollRegionServerProcedureManager.this.buildSubprocedure(data); 146 } 147 } 148 149 @Override 150 public void initialize(RegionServerServices rss) throws KeeperException { 151 this.rss = rss; 152 if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { 153 LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY 154 + " setting"); 155 return; 156 } 157 ProcedureCoordinationManager coordManager = new ZKProcedureCoordinationManager(rss); 158 this.memberRpcs = coordManager 159 .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); 160 161 // read in the backup handler configuration properties 162 Configuration conf = rss.getConfiguration(); 163 long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); 164 int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT); 165 // create the actual cohort member 166 ThreadPoolExecutor pool = 167 ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); 168 this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder()); 169 } 170 171 @Override 172 public String getProcedureSignature() { 173 return "backup-proc"; 174 } 175}