/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupHFileCleaner;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupMasterObserver;
import org.apache.hadoop.hbase.backup.BackupObserver;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.master.BackupLogCleaner;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles backup requests: creates backup info records in the backup system table to keep track
 * of backup sessions, and dispatches backup requests.
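 * <p>
 * A minimal lifecycle sketch (illustrative only; {@code connection}, {@code conf},
 * {@code backupId}, {@code tables} and {@code rootDir} are assumed to exist, and backup must be
 * enabled via {@code hbase.backup.enable}):
 *
 * <pre>{@code
 * try (BackupManager manager = new BackupManager(connection, conf)) {
 *   manager.initialize(); // fails if another backup session is ongoing
 *   BackupInfo info =
 *     manager.createBackupInfo(backupId, BackupType.FULL, tables, rootDir, 3, 100L, false);
 *   manager.setBackupInfo(info);
 * }
 * }</pre>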
 */
@InterfaceAudience.Private
public class BackupManager implements Closeable {
  // Timeout for acquiring the backup exclusive-operation lock, in seconds
  public final static String BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY =
    "hbase.backup.exclusive.op.timeout.seconds";
  // Default timeout, in seconds
  private final static int DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT = 3600;
  private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class);

  protected Configuration conf = null;
  protected BackupInfo backupInfo = null;
  protected BackupSystemTable systemTable;
  protected final Connection conn;

  /**
   * Backup manager constructor.
   * @param conn connection
   * @param conf configuration
   * @throws IOException exception
   */
  public BackupManager(Connection conn, Configuration conf) throws IOException {
    if (
      !conf.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
    ) {
      throw new BackupException("HBase backup is not enabled. Check your "
        + BackupRestoreConstants.BACKUP_ENABLE_KEY + " setting.");
    }
    this.conf = conf;
    this.conn = conn;
    this.systemTable = new BackupSystemTable(conn);
  }

  /**
   * Returns backup info
   */
  protected BackupInfo getBackupInfo() {
    return backupInfo;
  }

  /**
   * Modifies the master's configuration to inject backup-related features (tests only).
   * @param conf configuration
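   * <p>
   * Minimal test-setup sketch (illustrative; assumes an otherwise default configuration):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
   * BackupManager.decorateMasterConfiguration(conf);
   * }</pre>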
   */
  public static void decorateMasterConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }
    // Add WAL archive cleaner plug-in
    String plugins = conf.get(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
    String cleanerClass = BackupLogCleaner.class.getCanonicalName();
    if (!plugins.contains(cleanerClass)) {
      conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
    }

    String classes = conf.get(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY);
    String masterProcedureClass = LogRollMasterProcedureManager.class.getName();
    if (classes == null) {
      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, masterProcedureClass);
    } else if (!classes.contains(masterProcedureClass)) {
      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY,
        classes + "," + masterProcedureClass);
    }

    plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      (plugins == null ? "" : plugins + ",") + BackupHFileCleaner.class.getName());

    String observerClass = BackupMasterObserver.class.getName();
    String masterCoProc = conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
      (masterCoProc == null ? "" : masterCoProc + ",") + observerClass);

    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Added log cleaner: {}. Added master procedure manager: {}."
          + " Added HFile cleaner: {}. Added master observer: {}",
        cleanerClass, masterProcedureClass, BackupHFileCleaner.class.getName(), observerClass);
    }
  }

  /**
   * Modifies the region server configuration to inject backup-related features (tests only).
   * @param conf configuration
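   * <p>
   * Used the same way as {@link #decorateMasterConfiguration(Configuration)}; an illustrative
   * sketch:
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
   * BackupManager.decorateRegionServerConfiguration(conf);
   * }</pre>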
   */
  public static void decorateRegionServerConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }

    String classes = conf.get(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY);
    String regionProcedureClass = LogRollRegionServerProcedureManager.class.getName();
    if (classes == null) {
      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionProcedureClass);
    } else if (!classes.contains(regionProcedureClass)) {
      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY,
        classes + "," + regionProcedureClass);
    }
    String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
    String regionObserverClass = BackupObserver.class.getName();
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      (coproc == null ? "" : coproc + ",") + regionObserverClass);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added region procedure manager: {}. Added region observer: {}",
        regionProcedureClass, regionObserverClass);
    }
  }

  public static boolean isBackupEnabled(Configuration conf) {
    return conf.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
      BackupRestoreConstants.BACKUP_ENABLE_DEFAULT);
  }

  /**
   * Get configuration
   */
  Configuration getConf() {
    return conf;
  }

  /**
   * Stops the backup manager and releases its resources.
   */
  @Override
  public void close() {
    if (systemTable != null) {
      try {
        systemTable.close();
      } catch (Exception e) {
        LOG.error(e.toString(), e);
      }
    }
  }

  /**
   * Creates a backup info record based on the input backup request.
   * @param backupId         backup id
   * @param type             backup type
   * @param tableList        table list
   * @param targetRootDir    root dir
   * @param workers          number of parallel workers
   * @param bandwidth        bandwidth per worker in MB per sec
   * @param noChecksumVerify if true, skip checksum verification
   * @throws BackupException exception
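   * <p>
   * Illustrative call (all values are assumptions, not defaults; {@code tables} is an existing
   * {@code List<TableName>}):
   *
   * <pre>{@code
   * BackupInfo info = manager.createBackupInfo("backup_" + EnvironmentEdgeManager.currentTime(),
   *   BackupType.FULL, tables, "hdfs://namenode/backup", 3, 100L, false);
   * }</pre>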
   */
  public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
    String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify)
    throws BackupException {
    if (targetRootDir == null) {
      throw new BackupException("Wrong backup request parameter: target backup root directory");
    }

    if (type == BackupType.FULL && (tableList == null || tableList.isEmpty())) {
      // A null or empty table list for a full backup means "back up all tables": fill the table
      // list with all user tables from meta. If no table is available, reject the request.
      List<TableDescriptor> htds = null;
      try (Admin admin = conn.getAdmin()) {
        htds = admin.listTableDescriptors();
      } catch (Exception e) {
        throw new BackupException(e);
      }

      if (htds == null) {
        throw new BackupException("No table exists for full backup of all tables.");
      } else {
        tableList = new ArrayList<>();
        for (TableDescriptor hTableDescriptor : htds) {
          TableName tn = hTableDescriptor.getTableName();
          if (tn.equals(BackupSystemTable.getTableName(conf))) {
            // skip backup system table
            continue;
          }
          tableList.add(tn);
        }

        LOG.info("Full backup of all available tables in the cluster: {}", tableList);
      }
    }

    // there are one or more tables in the table list
    backupInfo = new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
      targetRootDir);
    backupInfo.setBandwidth(bandwidth);
    backupInfo.setWorkers(workers);
    backupInfo.setNoChecksumVerify(noChecksumVerify);
    return backupInfo;
  }

  /**
   * Checks whether there is an ongoing backup. Currently we rely only on the status recorded in
   * the backup system table. Orphan records will need to be handled in the future; otherwise all
   * incoming requests will fail.
   * @return the ongoing backup id if an ongoing backup exists, otherwise null
   * @throws IOException exception
   */
  private String getOngoingBackupId() throws IOException {
    ArrayList<BackupInfo> sessions = systemTable.getBackupInfos(BackupState.RUNNING);
    if (sessions.isEmpty()) {
      return null;
    }
    return sessions.get(0).getBackupId();
  }

  /**
   * Start the backup manager service.
   * @throws IOException exception
   */
  public void initialize() throws IOException {
    String ongoingBackupId = this.getOngoingBackupId();
    if (ongoingBackupId != null) {
      LOG.info("There is an ongoing backup {}."
        + " Cannot launch a new backup until no ongoing backup remains.", ongoingBackupId);
      throw new BackupException("There is an ongoing backup session.");
    }
  }

  public void setBackupInfo(BackupInfo backupInfo) {
    this.backupInfo = backupInfo;
  }

  /*
   * backup system table operations
   */

  /**
   * Updates status (state) of a backup session in a persistent store
   * @param context context
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo context) throws IOException {
    systemTable.updateBackupInfo(context);
  }

  /**
   * Starts a new backup session.
   * @throws IOException if an active session already exists
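   * <p>
   * Illustrative call pattern (a sketch, not taken from this codebase):
   *
   * <pre>{@code
   * manager.startBackupSession();
   * try {
   *   // perform the backup work
   * } finally {
   *   manager.finishBackupSession();
   * }
   * }</pre>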
   */
  public void startBackupSession() throws IOException {
    long startTime = EnvironmentEdgeManager.currentTime();
    long timeout = conf.getInt(BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY,
      DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT) * 1000L;
    long lastWarningOutputTime = 0;
    while (EnvironmentEdgeManager.currentTime() - startTime < timeout) {
      try {
        systemTable.startBackupExclusiveOperation();
        return;
      } catch (IOException e) {
        if (e instanceof ExclusiveOperationException) {
          // sleep, then repeat
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e1) {
            // Restore the interrupted status
            Thread.currentThread().interrupt();
          }
          if (
            lastWarningOutputTime == 0
              || (EnvironmentEdgeManager.currentTime() - lastWarningOutputTime) > 60000
          ) {
            lastWarningOutputTime = EnvironmentEdgeManager.currentTime();
            LOG.warn("Waiting to acquire backup exclusive lock for {}s",
              (lastWarningOutputTime - startTime) / 1000);
          }
        } else {
          throw e;
        }
      }
    }
    throw new IOException(
      "Failed to acquire backup system table exclusive lock after " + timeout / 1000 + "s");
  }

  /**
   * Finishes the active backup session.
   * @throws IOException if there is no active session
   */
  public void finishBackupSession() throws IOException {
    systemTable.finishBackupExclusiveOperation();
  }

  /**
   * Reads the start code (timestamp) of the last successful backup. Returns null if no start code
   * is stored in the backup system table or the value has length 0. Both cases indicate that no
   * successful backup has completed so far.
   * @return the timestamp of the last successful backup, or null
   * @throws IOException exception
   */
  public String readBackupStartCode() throws IOException {
    return systemTable.readBackupStartCode(backupInfo.getBackupRootDir());
  }

  /**
   * Writes the start code (timestamp) to the backup system table. If null is passed, a zero-length
   * value is written.
   * @param startCode start code
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode) throws IOException {
    systemTable.writeBackupStartCode(startCode, backupInfo.getBackupRootDir());
  }

  /**
   * Get the RS log information after the last log roll from backup system table.
   * @return RS log info
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOException {
    return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
  }

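  /**
   * Reads the bulk-load records registered in the backup system table for the given tables.
   * @param tableList list of tables to read bulk-load records for
   * @return list of bulk-load records
   * @throws IOException exception
   */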
  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
    return systemTable.readBulkloadRows(tableList);
  }

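  /**
   * Deletes the given bulk-load records from the backup system table.
   * @param rows row keys of the bulk-load records to delete
   * @throws IOException exception
   */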
  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
    systemTable.deleteBulkLoadedRows(rows);
  }

  /**
   * Get all completed backup information (in descending order by time).
   * @return list of completed backup info records
   * @throws IOException exception
   */
  public List<BackupInfo> getBackupHistory() throws IOException {
    return systemTable.getBackupHistory();
  }

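  /**
   * Get backup history, optionally restricted to completed backups only.
   * @param completed if true, return only completed backups
   * @throws IOException exception
   */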
  public ArrayList<BackupInfo> getBackupHistory(boolean completed) throws IOException {
    return systemTable.getBackupHistory(completed);
  }

  /**
   * Write the current timestamps for each region server to the backup system table after a
   * successful full or incremental backup. Each table may have a different set of log timestamps.
   * The saved timestamp is that of the last log file already backed up.
   * @param tables        tables
   * @param newTimestamps map from region server to the timestamp of its last backed-up log
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps)
    throws IOException {
    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table
   * has its own set of timestamps.
   * @return per-table map, keyed by table name, of region server to the previous timestamp
   * @throws IOException exception
   */
  public Map<TableName, Map<String, Long>> readLogTimestampMap() throws IOException {
    return systemTable.readLogTimestampMap(backupInfo.getBackupRootDir());
  }

  /**
   * Return the current tables covered by incremental backup.
   * @return set of tableNames
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet() throws IOException {
    return systemTable.getIncrementalBackupTableSet(backupInfo.getBackupRootDir());
  }

  /**
   * Adds set of tables to overall incremental backup table set
   * @param tables tables
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables) throws IOException {
    systemTable.addIncrementalBackupTableSet(tables, backupInfo.getBackupRootDir());
  }

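  /**
   * Returns the connection this backup manager was created with.
   */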
  public Connection getConnection() {
    return conn;
  }
}