001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.impl;
019
import static org.apache.hadoop.hbase.backup.BackupInfo.withState;
import static org.apache.hadoop.hbase.backup.impl.BackupSystemTable.Order.NEW_TO_OLD;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupHFileCleaner;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupMasterObserver;
import org.apache.hadoop.hbase.backup.BackupObserver;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.master.BackupLogCleaner;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
053
054/**
055 * Handles backup requests, creates backup info records in backup system table to keep track of
056 * backup sessions, dispatches backup request.
057 */
058@InterfaceAudience.Private
059public class BackupManager implements Closeable {
060  // in seconds
061  public final static String BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY =
062    "hbase.backup.exclusive.op.timeout.seconds";
063  // In seconds
064  private final static int DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT = 3600;
065  private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class);
066
067  protected Configuration conf = null;
068  protected BackupInfo backupInfo = null;
069  protected BackupSystemTable systemTable;
070  protected final Connection conn;
071
072  /**
073   * Backup manager constructor.
074   * @param conn connection
075   * @param conf configuration
076   * @throws IOException exception
077   */
078  public BackupManager(Connection conn, Configuration conf) throws IOException {
079    if (
080      !conf.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
081        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
082    ) {
083      throw new BackupException("HBase backup is not enabled. Check your "
084        + BackupRestoreConstants.BACKUP_ENABLE_KEY + " setting.");
085    }
086    this.conf = conf;
087    this.conn = conn;
088    this.systemTable = new BackupSystemTable(conn);
089  }
090
091  /**
092   * Returns backup info
093   */
094  protected BackupInfo getBackupInfo() {
095    return backupInfo;
096  }
097
098  /**
099   * This method modifies the master's configuration in order to inject backup-related features
100   * (TESTs only)
101   * @param conf configuration
102   */
103  public static void decorateMasterConfiguration(Configuration conf) {
104    if (!isBackupEnabled(conf)) {
105      return;
106    }
107    // Add WAL archive cleaner plug-in
108    String plugins = conf.get(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
109    String cleanerClass = BackupLogCleaner.class.getCanonicalName();
110    if (!plugins.contains(cleanerClass)) {
111      conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
112    }
113
114    String classes = conf.get(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY);
115    String masterProcedureClass = LogRollMasterProcedureManager.class.getName();
116    if (classes == null) {
117      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, masterProcedureClass);
118    } else if (!classes.contains(masterProcedureClass)) {
119      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY,
120        classes + "," + masterProcedureClass);
121    }
122
123    plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
124    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
125      (plugins == null ? "" : plugins + ",") + BackupHFileCleaner.class.getName());
126
127    String observerClass = BackupMasterObserver.class.getName();
128    String masterCoProc = conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
129    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
130      (masterCoProc == null ? "" : masterCoProc + ",") + observerClass);
131
132    if (LOG.isDebugEnabled()) {
133      LOG.debug(
134        "Added log cleaner: {}. Added master procedure manager: {}."
135          + " Added master procedure manager: {}. Added master observer: {}",
136        cleanerClass, masterProcedureClass, BackupHFileCleaner.class.getName(), observerClass);
137    }
138  }
139
140  /**
141   * This method modifies the Region Server configuration in order to inject backup-related features
142   * TESTs only.
143   * @param conf configuration
144   */
145  public static void decorateRegionServerConfiguration(Configuration conf) {
146    if (!isBackupEnabled(conf)) {
147      return;
148    }
149
150    String classes = conf.get(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY);
151    String regionProcedureClass = LogRollRegionServerProcedureManager.class.getName();
152    if (classes == null) {
153      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionProcedureClass);
154    } else if (!classes.contains(regionProcedureClass)) {
155      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY,
156        classes + "," + regionProcedureClass);
157    }
158    String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
159    String regionObserverClass = BackupObserver.class.getName();
160    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
161      (coproc == null ? "" : coproc + ",") + regionObserverClass);
162    if (LOG.isDebugEnabled()) {
163      LOG.debug("Added region procedure manager: {}. Added region observer: {}",
164        regionProcedureClass, regionObserverClass);
165    }
166  }
167
168  public static boolean isBackupEnabled(Configuration conf) {
169    return conf.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
170      BackupRestoreConstants.BACKUP_ENABLE_DEFAULT);
171  }
172
173  /**
174   * Get configuration
175   */
176  Configuration getConf() {
177    return conf;
178  }
179
180  /**
181   * Stop all the work of backup.
182   */
183  @Override
184  public void close() {
185    if (systemTable != null) {
186      try {
187        systemTable.close();
188      } catch (Exception e) {
189        LOG.error(e.toString(), e);
190      }
191    }
192  }
193
194  /**
195   * Creates a backup info based on input backup request.
196   * @param backupId      backup id
197   * @param type          type
198   * @param tableList     table list
199   * @param targetRootDir root dir
200   * @param workers       number of parallel workers
201   * @param bandwidth     bandwidth per worker in MB per sec
202   * @throws BackupException exception
203   */
204  public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
205    String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify)
206    throws BackupException {
207    if (targetRootDir == null) {
208      throw new BackupException("Wrong backup request parameter: target backup root directory");
209    }
210
211    if (type == BackupType.FULL && (tableList == null || tableList.isEmpty())) {
212      // If table list is null for full backup, which means backup all tables. Then fill the table
213      // list with all user tables from meta. It no table available, throw the request exception.
214      List<TableDescriptor> htds = null;
215      try (Admin admin = conn.getAdmin()) {
216        htds = admin.listTableDescriptors();
217      } catch (Exception e) {
218        throw new BackupException(e);
219      }
220
221      if (htds == null) {
222        throw new BackupException("No table exists for full backup of all tables.");
223      } else {
224        tableList = new ArrayList<>();
225        for (TableDescriptor hTableDescriptor : htds) {
226          TableName tn = hTableDescriptor.getTableName();
227          if (tn.equals(BackupSystemTable.getTableName(conf))) {
228            // skip backup system table
229            continue;
230          }
231          tableList.add(hTableDescriptor.getTableName());
232        }
233
234        LOG.info("Full backup all the tables available in the cluster: {}", tableList);
235      }
236    }
237
238    // there are one or more tables in the table list
239    backupInfo = new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
240      targetRootDir);
241    backupInfo.setBandwidth(bandwidth);
242    backupInfo.setWorkers(workers);
243    backupInfo.setNoChecksumVerify(noChecksumVerify);
244    return backupInfo;
245  }
246
247  /**
248   * Check if any ongoing backup. Currently, we only reply on checking status in backup system
249   * table. We need to consider to handle the case of orphan records in the future. Otherwise, all
250   * the coming request will fail.
251   * @return the ongoing backup id if on going backup exists, otherwise null
252   * @throws IOException exception
253   */
254  private String getOngoingBackupId() throws IOException {
255    List<BackupInfo> sessions =
256      systemTable.getBackupHistory(NEW_TO_OLD, 1, withState(BackupState.RUNNING));
257    if (sessions.size() == 0) {
258      return null;
259    }
260    return sessions.get(0).getBackupId();
261  }
262
263  /**
264   * Start the backup manager service.
265   * @throws IOException exception
266   */
267  public void initialize() throws IOException {
268    String ongoingBackupId = this.getOngoingBackupId();
269    if (ongoingBackupId != null) {
270      LOG.info("There is a ongoing backup {}"
271        + ". Can not launch new backup until no ongoing backup remains.", ongoingBackupId);
272      throw new BackupException("There is ongoing backup seesion.");
273    }
274  }
275
276  public void setBackupInfo(BackupInfo backupInfo) {
277    this.backupInfo = backupInfo;
278  }
279
280  /*
281   * backup system table operations
282   */
283
284  /**
285   * Updates status (state) of a backup session in a persistent store
286   * @param context context
287   * @throws IOException exception
288   */
289  public void updateBackupInfo(BackupInfo context) throws IOException {
290    systemTable.updateBackupInfo(context);
291  }
292
293  /**
294   * Starts new backup session
295   * @throws IOException if active session already exists
296   */
297  public void startBackupSession() throws IOException {
298    long startTime = EnvironmentEdgeManager.currentTime();
299    long timeout = conf.getInt(BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY,
300      DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT) * 1000L;
301    long lastWarningOutputTime = 0;
302    while (EnvironmentEdgeManager.currentTime() - startTime < timeout) {
303      try {
304        systemTable.startBackupExclusiveOperation();
305        return;
306      } catch (IOException e) {
307        if (e instanceof ExclusiveOperationException) {
308          // sleep, then repeat
309          try {
310            Thread.sleep(1000);
311          } catch (InterruptedException e1) {
312            // Restore the interrupted status
313            Thread.currentThread().interrupt();
314          }
315          if (
316            lastWarningOutputTime == 0
317              || (EnvironmentEdgeManager.currentTime() - lastWarningOutputTime) > 60000
318          ) {
319            lastWarningOutputTime = EnvironmentEdgeManager.currentTime();
320            LOG.warn("Waiting to acquire backup exclusive lock for {}s",
321              +(lastWarningOutputTime - startTime) / 1000);
322          }
323        } else {
324          throw e;
325        }
326      }
327    }
328    throw new IOException(
329      "Failed to acquire backup system table exclusive lock after " + timeout / 1000 + "s");
330  }
331
332  /**
333   * Finishes active backup session
334   * @throws IOException if no active session
335   */
336  public void finishBackupSession() throws IOException {
337    systemTable.finishBackupExclusiveOperation();
338  }
339
340  /**
341   * Read the last backup start code (timestamp) of last successful backup. Will return null if
342   * there is no startcode stored in backup system table or the value is of length 0. These two
343   * cases indicate there is no successful backup completed so far.
344   * @return the timestamp of a last successful backup
345   * @throws IOException exception
346   */
347  public String readBackupStartCode() throws IOException {
348    return systemTable.readBackupStartCode(backupInfo.getBackupRootDir());
349  }
350
351  /**
352   * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
353   * @param startCode start code
354   * @throws IOException exception
355   */
356  public void writeBackupStartCode(Long startCode) throws IOException {
357    systemTable.writeBackupStartCode(startCode, backupInfo.getBackupRootDir());
358  }
359
360  /**
361   * Get the RS log information after the last log roll from backup system table.
362   * @return RS log info
363   * @throws IOException exception
364   */
365  public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOException {
366    return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
367  }
368
369  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
370    return systemTable.readBulkloadRows(tableList);
371  }
372
373  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
374    systemTable.deleteBulkLoadedRows(rows);
375  }
376
377  /**
378   * Get all backup information, ordered by descending start time. I.e. from newest to oldest.
379   */
380  public List<BackupInfo> getBackupHistory(BackupInfo.Filter... filters) throws IOException {
381    return systemTable.getBackupHistory(filters);
382  }
383
384  /**
385   * Write the current timestamps for each regionserver to backup system table after a successful
386   * full or incremental backup. Each table may have a different set of log timestamps. The saved
387   * timestamp is of the last log file that was backed up already.
388   * @param tables tables
389   * @throws IOException exception
390   */
391  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps)
392    throws IOException {
393    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
394  }
395
396  /**
397   * Read the timestamp for each region server log after the last successful backup. Each table has
398   * its own set of the timestamps.
399   * @return the timestamp for each region server. key: tableName value:
400   *         RegionServer,PreviousTimeStamp
401   * @throws IOException exception
402   */
403  public Map<TableName, Map<String, Long>> readLogTimestampMap() throws IOException {
404    return systemTable.readLogTimestampMap(backupInfo.getBackupRootDir());
405  }
406
407  /**
408   * Return the current tables covered by incremental backup.
409   * @return set of tableNames
410   * @throws IOException exception
411   */
412  public Set<TableName> getIncrementalBackupTableSet() throws IOException {
413    return systemTable.getIncrementalBackupTableSet(backupInfo.getBackupRootDir());
414  }
415
416  /**
417   * Adds set of tables to overall incremental backup table set
418   * @param tables tables
419   * @throws IOException exception
420   */
421  public void addIncrementalBackupTableSet(Set<TableName> tables) throws IOException {
422    systemTable.addIncrementalBackupTableSet(tables, backupInfo.getBackupRootDir());
423  }
424
425  public Connection getConnection() {
426    return conn;
427  }
428}