001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.regionserver;
019
020import java.io.IOException;
021import java.util.concurrent.ThreadPoolExecutor;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
024import org.apache.hadoop.hbase.backup.impl.BackupManager;
025import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
026import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
027import org.apache.hadoop.hbase.procedure.ProcedureCoordinationManager;
028import org.apache.hadoop.hbase.procedure.ProcedureMember;
029import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
030import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
031import org.apache.hadoop.hbase.procedure.Subprocedure;
032import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
033import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinationManager;
034import org.apache.hadoop.hbase.regionserver.RegionServerServices;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.zookeeper.KeeperException;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * This manager class handles the work dealing with distributed WAL roll request.
042 * <p>
043 * This provides the mechanism necessary to kick off a backup specific {@link Subprocedure} that is
044 * responsible by this region server. If any failures occur with the sub-procedure, the manager's
045 * procedure member notifies the procedure coordinator to abort all others.
046 * <p>
047 * On startup, requires {@link #start()} to be called.
048 * <p>
049 * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be called
050 */
051@InterfaceAudience.Private
052public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager {
053  private static final Logger LOG =
054    LoggerFactory.getLogger(LogRollRegionServerProcedureManager.class);
055
056  /** Conf key for number of request threads to start backup on region servers */
057  public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
058  /** # of threads for backup work on the rs. */
059  public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
060
061  public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
062  public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
063
064  /** Conf key for millis between checks to see if backup work completed or if there are errors */
065  public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
066  /** Default amount of time to check for errors while regions finish backup work */
067  private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
068
069  private RegionServerServices rss;
070  private ProcedureMemberRpcs memberRpcs;
071  private ProcedureMember member;
072  private boolean started = false;
073
074  /**
075   * Create a default backup procedure manager
076   */
077  public LogRollRegionServerProcedureManager() {
078  }
079
080  /**
081   * Start accepting backup procedure requests.
082   */
083  @Override
084  public void start() {
085    if (!BackupManager.isBackupEnabled(rss.getConfiguration())) {
086      LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
087        + " setting");
088      return;
089    }
090    this.memberRpcs.start(rss.getServerName().toString(), member);
091    started = true;
092    LOG.info("Started region server backup manager.");
093  }
094
095  /**
096   * Close <tt>this</tt> and all running backup procedure tasks
097   * @param force forcefully stop all running tasks
098   * @throws IOException exception
099   */
100  @Override
101  public void stop(boolean force) throws IOException {
102    if (!started) {
103      return;
104    }
105    String mode = force ? "abruptly" : "gracefully";
106    LOG.info("Stopping RegionServerBackupManager " + mode + ".");
107
108    try {
109      this.member.close();
110    } finally {
111      this.memberRpcs.close();
112    }
113  }
114
115  /**
116   * If in a running state, creates the specified subprocedure for handling a backup procedure.
117   * @return Subprocedure to submit to the ProcedureMember.
118   */
119  public Subprocedure buildSubprocedure(byte[] data) {
120    // don't run a backup if the parent is stop(ping)
121    if (rss.isStopping() || rss.isStopped()) {
122      throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
123        + ", because stopping/stopped!");
124    }
125
126    LOG.info("Attempting to run a roll log procedure for backup.");
127    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
128    Configuration conf = rss.getConfiguration();
129    long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
130    long wakeMillis =
131      conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
132
133    LogRollBackupSubprocedurePool taskManager =
134      new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf);
135    return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis,
136      taskManager, data);
137  }
138
139  /**
140   * Build the actual backup procedure runner that will do all the 'hard' work
141   */
142  public class BackupSubprocedureBuilder implements SubprocedureFactory {
143    @Override
144    public Subprocedure buildSubprocedure(String name, byte[] data) {
145      return LogRollRegionServerProcedureManager.this.buildSubprocedure(data);
146    }
147  }
148
149  @Override
150  public void initialize(RegionServerServices rss) throws KeeperException {
151    this.rss = rss;
152    if (!BackupManager.isBackupEnabled(rss.getConfiguration())) {
153      LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
154        + " setting");
155      return;
156    }
157    ProcedureCoordinationManager coordManager = new ZKProcedureCoordinationManager(rss);
158    this.memberRpcs = coordManager
159      .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
160
161    // read in the backup handler configuration properties
162    Configuration conf = rss.getConfiguration();
163    long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
164    int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
165    // create the actual cohort member
166    ThreadPoolExecutor pool =
167      ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
168    this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
169  }
170
171  @Override
172  public String getProcedureSignature() {
173    return "backup-proc";
174  }
175}