/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupInfo.withState;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupHFileCleaner;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupMasterObserver;
import org.apache.hadoop.hbase.backup.BackupObserver;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.master.BackupLogCleaner;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles backup requests: creates backup info records in the backup system table to keep track
 * of backup sessions, and dispatches backup requests.
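 * <p>
 * A minimal usage sketch (illustrative; real callers such as {@code TableBackupClient} drive the
 * full backup flow):
 * </p>
 * <pre>{@code
 * try (BackupManager manager = new BackupManager(conn, conf)) {
 *   BackupInfo info = manager.createBackupInfo(backupId, BackupType.FULL, tables, rootDir,
 *     workers, bandwidth, false);
 *   manager.startBackupSession();
 *   manager.setBackupInfo(info);
 *   // ... perform the backup, updating info along the way ...
 *   manager.finishBackupSession();
 * }
 * }</pre>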
 */
@InterfaceAudience.Private
public class BackupManager implements Closeable {
  // Timeout (in seconds) for acquiring the backup exclusive operation lock
  public final static String BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY =
    "hbase.backup.exclusive.op.timeout.seconds";
  // Default timeout, in seconds
  private final static int DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT = 3600;
  private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class);

  protected Configuration conf = null;
  protected BackupInfo backupInfo = null;
  protected BackupSystemTable systemTable;
  protected final Connection conn;

  /**
   * Backup manager constructor.
   * @param conn connection
   * @param conf configuration
   * @throws IOException exception
   */
  public BackupManager(Connection conn, Configuration conf) throws IOException {
    if (
      !conf.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
    ) {
      throw new BackupException("HBase backup is not enabled. Check your "
        + BackupRestoreConstants.BACKUP_ENABLE_KEY + " setting.");
    }
    this.conf = conf;
    this.conn = conn;
    this.systemTable = new BackupSystemTable(conn);
  }

  /**
   * Returns backup info
   */
  protected BackupInfo getBackupInfo() {
    return backupInfo;
  }

  /**
   * This method modifies the master's configuration in order to inject backup-related features
   * (for tests only).
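   * <pre>{@code
   * // Illustrative test setup, before starting a mini cluster:
   * conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
   * BackupManager.decorateMasterConfiguration(conf);
   * }</pre>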
   * @param conf configuration
   */
  public static void decorateMasterConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }
    // Add WAL archive cleaner plug-in
    String plugins = conf.get(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
    String cleanerClass = BackupLogCleaner.class.getCanonicalName();
    if (plugins == null) {
      // Guard against a missing setting; the key normally has a default from hbase-default.xml
      conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, cleanerClass);
    } else if (!plugins.contains(cleanerClass)) {
      conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
    }

    String classes = conf.get(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY);
    String masterProcedureClass = LogRollMasterProcedureManager.class.getName();
    if (classes == null) {
      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, masterProcedureClass);
    } else if (!classes.contains(masterProcedureClass)) {
      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY,
        classes + "," + masterProcedureClass);
    }

    plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      (plugins == null ? "" : plugins + ",") + BackupHFileCleaner.class.getName());

    String observerClass = BackupMasterObserver.class.getName();
    String masterCoProc = conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
      (masterCoProc == null ? "" : masterCoProc + ",") + observerClass);

    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Added log cleaner: {}. Added master procedure manager: {}."
          + " Added hfile cleaner: {}. Added master observer: {}",
        cleanerClass, masterProcedureClass, BackupHFileCleaner.class.getName(), observerClass);
    }
  }

  /**
   * This method modifies the region server's configuration in order to inject backup-related
   * features (for tests only).
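   * <pre>{@code
   * // Illustrative: typically paired with decorateMasterConfiguration in test setup
   * BackupManager.decorateRegionServerConfiguration(conf);
   * }</pre>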
   * @param conf configuration
   */
  public static void decorateRegionServerConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }

    String classes = conf.get(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY);
    String regionProcedureClass = LogRollRegionServerProcedureManager.class.getName();
    if (classes == null) {
      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionProcedureClass);
    } else if (!classes.contains(regionProcedureClass)) {
      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY,
        classes + "," + regionProcedureClass);
    }
    String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
    String regionObserverClass = BackupObserver.class.getName();
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      (coproc == null ? "" : coproc + ",") + regionObserverClass);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added region procedure manager: {}. Added region observer: {}",
        regionProcedureClass, regionObserverClass);
    }
  }

  public static boolean isBackupEnabled(Configuration conf) {
    return conf.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
      BackupRestoreConstants.BACKUP_ENABLE_DEFAULT);
  }

  /**
   * Get configuration
   */
  Configuration getConf() {
    return conf;
  }

  /**
   * Stops all backup work and closes the backup system table.
   */
  @Override
  public void close() {
    if (systemTable != null) {
      try {
        systemTable.close();
      } catch (Exception e) {
        LOG.error(e.toString(), e);
      }
    }
  }

  /**
   * Creates a backup info based on input backup request.
   * @param backupId         backup id
   * @param type             backup type (full or incremental)
   * @param tableList        list of tables to back up
   * @param targetRootDir    backup destination root directory
   * @param workers          number of parallel workers
   * @param bandwidth        bandwidth per worker in MB per sec
   * @param noChecksumVerify whether to skip checksum verification of the copied files
   * @throws BackupException exception
   */
  public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
    String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify)
    throws BackupException {
    if (targetRootDir == null) {
      throw new BackupException("Wrong backup request parameter: target backup root directory");
    }

    if (type == BackupType.FULL && (tableList == null || tableList.isEmpty())) {
      // A null or empty table list for a full backup means "back up all tables", so fill the
      // table list with all user tables from meta. If no table is available, throw an exception.
      List<TableDescriptor> htds = null;
      try (Admin admin = conn.getAdmin()) {
        htds = admin.listTableDescriptors();
      } catch (Exception e) {
        throw new BackupException(e);
      }

      if (htds == null) {
        throw new BackupException("No table exists for full backup of all tables.");
      } else {
        tableList = new ArrayList<>();
        for (TableDescriptor hTableDescriptor : htds) {
          TableName tn = hTableDescriptor.getTableName();
          if (tn.equals(BackupSystemTable.getTableName(conf))) {
            // skip backup system table
            continue;
          }
          tableList.add(tn);
        }

        LOG.info("Full backup of all tables available in the cluster: {}", tableList);
      }
    }

    // there are one or more tables in the table list
    backupInfo = new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
      targetRootDir);
    backupInfo.setBandwidth(bandwidth);
    backupInfo.setWorkers(workers);
    backupInfo.setNoChecksumVerify(noChecksumVerify);
    return backupInfo;
  }

  /**
   * Checks whether there is an ongoing backup. Currently, we rely only on the backup state
   * recorded in the backup system table. We need to consider handling orphan records in the
   * future; otherwise, all incoming requests will fail.
   * @return the ongoing backup id if an ongoing backup exists, otherwise null
   * @throws IOException exception
   */
  private String getOngoingBackupId() throws IOException {
    List<BackupInfo> sessions = systemTable.getBackupInfos(withState(BackupState.RUNNING));
    if (sessions.isEmpty()) {
      return null;
    }
    return sessions.get(0).getBackupId();
  }

  /**
   * Start the backup manager service.
   * @throws IOException exception
   */
  public void initialize() throws IOException {
    String ongoingBackupId = this.getOngoingBackupId();
    if (ongoingBackupId != null) {
      LOG.info("There is an ongoing backup {}. "
        + "Cannot launch a new backup until the ongoing one completes.", ongoingBackupId);
      throw new BackupException("There is an ongoing backup session.");
    }
  }

  public void setBackupInfo(BackupInfo backupInfo) {
    this.backupInfo = backupInfo;
  }

  /*
   * backup system table operations
   */

  /**
   * Updates the status (state) of a backup session in the persistent store.
   * @param context context
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo context) throws IOException {
    systemTable.updateBackupInfo(context);
  }

  /**
   * Starts a new backup session, blocking until the backup exclusive lock is acquired or the
   * configured timeout expires.
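   * <p>
   * The acquisition timeout can be tuned via configuration (illustrative; the value is in
   * seconds):
   * </p>
   * <pre>{@code
   * conf.setInt(BackupManager.BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY, 7200);
   * }</pre>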
   * @throws IOException if the exclusive lock cannot be acquired within the configured timeout
   */
  public void startBackupSession() throws IOException {
    long startTime = EnvironmentEdgeManager.currentTime();
    long timeout = conf.getInt(BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY,
      DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT) * 1000L;
    long lastWarningOutputTime = 0;
    while (EnvironmentEdgeManager.currentTime() - startTime < timeout) {
      try {
        systemTable.startBackupExclusiveOperation();
        return;
      } catch (IOException e) {
        if (e instanceof ExclusiveOperationException) {
          // sleep, then repeat
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e1) {
            // Restore the interrupted status
            Thread.currentThread().interrupt();
          }
          if (
            lastWarningOutputTime == 0
              || (EnvironmentEdgeManager.currentTime() - lastWarningOutputTime) > 60000
          ) {
            lastWarningOutputTime = EnvironmentEdgeManager.currentTime();
            LOG.warn("Waiting to acquire backup exclusive lock for {}s",
              (lastWarningOutputTime - startTime) / 1000);
          }
        } else {
          throw e;
        }
      }
    }
    throw new IOException(
      "Failed to acquire backup system table exclusive lock after " + timeout / 1000 + "s");
  }

  /**
   * Finishes the active backup session
   * @throws IOException if no active session
   */
  public void finishBackupSession() throws IOException {
    systemTable.finishBackupExclusiveOperation();
  }

  /**
   * Reads the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in the backup system table or the stored value has length 0. Both cases
   * indicate that no backup has completed successfully so far.
   * @return the timestamp of the last successful backup, or null if none exists
   * @throws IOException exception
   */
  public String readBackupStartCode() throws IOException {
    return systemTable.readBackupStartCode(backupInfo.getBackupRootDir());
  }

  /**
   * Writes the start code (timestamp) to the backup system table. If null is passed in, a
   * zero-length byte array is written.
   * @param startCode start code
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode) throws IOException {
    systemTable.writeBackupStartCode(startCode, backupInfo.getBackupRootDir());
  }

  /**
   * Gets the region server log information recorded after the last log roll from the backup
   * system table.
   * @return map from region server name to the timestamp of its last log roll
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOException {
    return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
  }

  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
    return systemTable.readBulkloadRows(tableList);
  }

  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
    systemTable.deleteBulkLoadedRows(rows);
  }

  /**
   * Gets all backup information, ordered by descending start time, i.e. from newest to oldest.
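   * <pre>{@code
   * // Illustrative: fetch only completed backup sessions
   * List<BackupInfo> completed = manager.getBackupHistory(withState(BackupState.COMPLETE));
   * }</pre>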
   */
  public List<BackupInfo> getBackupHistory(BackupInfo.Filter... filters) throws IOException {
    return systemTable.getBackupHistory(filters);
  }

  /**
   * Writes the current timestamps for each region server to the backup system table after a
   * successful full or incremental backup. Each table may have a different set of log timestamps.
   * The saved timestamp is that of the last log file which has already been backed up.
   * @param tables        tables covered by this backup
   * @param newTimestamps map from region server name to the new log roll timestamp
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps)
    throws IOException {
    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
  }

  /**
   * Reads the timestamp of each region server log after the last successful backup. Each table
   * has its own set of timestamps.
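   * <p>
   * Shape of the returned map (illustrative values):
   * </p>
   * <pre>
   * { "table1" : { "rs1.example.com,16020,1680000000000" : 1680001234567,
   *                "rs2.example.com,16020,1680000000000" : 1680001234890 } }
   * </pre>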
   * @return per-table map from region server name to the previous (last backed-up) log timestamp
   * @throws IOException exception
   */
  public Map<TableName, Map<String, Long>> readLogTimestampMap() throws IOException {
    return systemTable.readLogTimestampMap(backupInfo.getBackupRootDir());
  }

  /**
   * Returns the current set of tables covered by incremental backup.
   * @return set of table names
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet() throws IOException {
    return systemTable.getIncrementalBackupTableSet(backupInfo.getBackupRootDir());
  }

  /**
   * Adds a set of tables to the overall incremental backup table set
   * @param tables tables
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables) throws IOException {
    systemTable.addIncrementalBackupTableSet(tables, backupInfo.getBackupRootDir());
  }

  public Connection getConnection() {
    return conn;
  }
}