/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

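  /**
   * Filters the given WAL file list, keeping only files that still exist on the file system or
   * belong to the active WAL directory. Files that cannot be found are skipped with a warning.
   * @param incrBackupFileList WAL files collected for this incremental backup
   * @return the filtered list of WAL files
   */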
  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Checks whether a given path belongs to the active WAL directory.
   * @param p path to check
   * @return true if the path is an active (not yet archived) WAL file
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

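  /**
   * Returns the position of the given table in the source table list.
   * @param tbl table to look up
   * @param sTableList source table list; if {@code null}, 0 is returned
   * @return the index of the table in the list, or -1 if it is not present
   */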
  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /**
   * Reads bulk load records from the backup system table, iterates through the records and builds
   * the paths of the bulk-loaded HFiles, then copies those HFiles to the backup destination.
   * @param sTableList list of tables to be backed up
   * @return an array, indexed by table, of maps from column family to the list of HFile paths
   */
  @SuppressWarnings("unchecked")
  protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList)
    throws IOException {
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
    List<String> activeFiles = new ArrayList<>();
    List<String> archiveFiles = new ArrayList<>();
    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
      backupManager.readBulkloadRows(sTableList);
    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : map
      .entrySet()) {
      TableName srcTable = tblEntry.getKey();

      int srcIdx = getIndex(srcTable, sTableList);
      if (srcIdx < 0) {
        LOG.warn("Couldn't find " + srcTable + " in source table List");
        continue;
      }
      if (mapForSrc[srcIdx] == null) {
        mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
        srcTable.getQualifierAsString());
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry : tblEntry
        .getValue().entrySet()) {
        String regionName = regionEntry.getKey();
        Path regionDir = new Path(tblDir, regionName);
        // map from family to List of hfiles
        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry : regionEntry.getValue()
          .entrySet()) {
          String fam = famEntry.getKey();
          Path famDir = new Path(regionDir, fam);
          List<Path> files;
          if (!mapForSrc[srcIdx].containsKey(Bytes.toBytes(fam))) {
            files = new ArrayList<>();
            mapForSrc[srcIdx].put(Bytes.toBytes(fam), files);
          } else {
            files = mapForSrc[srcIdx].get(Bytes.toBytes(fam));
          }
          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
          String tblName = srcTable.getQualifierAsString();
          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
          if (!tgtFs.mkdirs(tgtFam)) {
            throw new IOException("couldn't create " + tgtFam);
          }
          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
            String file = fileWithState.getFirst();
            int idx = file.lastIndexOf("/");
            String filename = file;
            if (idx > 0) {
              filename = file.substring(idx + 1);
            }
            Path p = new Path(famDir, filename);
            Path tgt = new Path(tgtFam, filename);
            Path archive = new Path(archiveDir, filename);
            if (fs.exists(p)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
                LOG.trace("copying " + p + " to " + tgt);
              }
              activeFiles.add(p.toString());
            } else if (fs.exists(archive)) {
              LOG.debug("copying archive " + archive + " to " + tgt);
              archiveFiles.add(archive.toString());
            }
            files.add(tgt);
          }
        }
      }
    }

    copyBulkLoadedFiles(activeFiles, archiveFiles);
    backupManager.deleteBulkLoadedRows(pair.getSecond());
    return mapForSrc;
  }

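  /**
   * Copies bulk-loaded HFiles to the backup destination. Active files are retried if some of them
   * get archived while the copy is in progress; archived files are copied afterwards.
   * @param activeFiles HFiles still located under the active data directory
   * @param archiveFiles HFiles already moved to the archive directory
   */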
  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    try {
      // Enable special mode of BackupDistCp
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
      // Copy active files
      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
      int attempt = 1;
      while (activeFiles.size() > 0) {
        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt =" + attempt++);
        String[] toCopy = new String[activeFiles.size()];
        activeFiles.toArray(toCopy);
        // An active file can be archived during the copy operation;
        // we need to handle this properly
        try {
          incrementalCopyHFiles(toCopy, tgtDest);
          break;
        } catch (IOException e) {
          // Check if some files got archived and update the active and archived lists.
          // When a file is moved from the active to the archive directory,
          // the number of active files decreases.
          int numOfActive = activeFiles.size();
          updateFileLists(activeFiles, archiveFiles);
          if (activeFiles.size() < numOfActive) {
            continue;
          }
          // if nothing got archived, re-throw the exception
          throw e;
        }
      }
      // If the incremental copy fails for archived files, the backup destination will contain
      // partially loaded files (only those from the active data directory). This is OK, because
      // the backup will be marked as FAILED and its data will be cleaned up.
      if (archiveFiles.size() > 0) {
        String[] toCopy = new String[archiveFiles.size()];
        archiveFiles.toArray(toCopy);
        incrementalCopyHFiles(toCopy, tgtDest);
      }
    } finally {
      // Disable special mode of BackupDistCp
      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
    }
  }

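  /**
   * Moves entries of {@code activeFiles} that no longer exist on the file system (i.e. that have
   * been archived since the list was built) into {@code archiveFiles}.
   */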
  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (newlyArchived.size() > 0) {
      activeFiles.removeAll(newlyArchived);
      archiveFiles.addAll(newlyArchived);
    }

    LOG.debug(newlyArchived.size() + " files have been archived.");
  }

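  /**
   * Runs the incremental backup: prepares the incremental WAL file set, copies table and region
   * info files, converts WALs to HFiles and copies them to the backup destination, then records
   * log timestamps, handles bulk-loaded files and marks the backup as complete. Any failure fails
   * the overall backup.
   */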
  @Override
  public void execute() throws IOException {
    try {
      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      // convert WALs to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // Set the overall backup status to complete. Here we make sure to finish the backup:
    // after this checkpoint, even if a cancel is requested, the backup is allowed to finish.
    try {
      // Set the previousTimestampMap which is before this current log roll to the manifest.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      handleBulkLoad(backupInfo.getTableNames());
      // backup complete
      completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);

    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }

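  /**
   * Copies the given files to the backup destination using the configured {@link BackupCopyJob}.
   * The bulk output directory is deleted when the copy finishes, whether it succeeds or fails.
   * @param files source files or directories to copy
   * @param backupDest backup destination directory
   * @throws IOException if the copy job fails or returns a non-zero code
   */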
  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    try {
      LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare params for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Incremental copy of HFiles failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',') + " to " + backupDest
        + " finished.");
    } finally {
      deleteBulkLoadDirectory();
    }
  }

  protected void deleteBulkLoadDirectory() throws IOException {
    // delete original bulk load directory on method exit
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

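  /**
   * Converts the WAL files collected for this incremental backup into HFiles by running
   * {@link WALPlayer} for all tables in the incremental backup set. Missing WAL files and
   * non-existent tables are skipped.
   */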
  protected void convertWALsToHFiles() throws IOException {
    // get incremental backup file list and prepare parameters for DistCp
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<String>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping it in the WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

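  /**
   * Runs {@link WALPlayer} over the given WAL files/directories to generate HFiles under the bulk
   * output directory for the listed tables.
   * @param dirPaths WAL files or directories to replay
   * @param tableList tables to generate HFiles for
   * @throws IOException if the WALPlayer job fails
   */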
  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // WALPlayer reads all files in an arbitrary directory structure and creates
    // a Map task for each file. We use ';' as the separator
    // because WAL file names contain ','.
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Can not convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
    }
  }

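  /**
   * Returns the bulk output directory for a single table:
   * {@code <bulkOutputDir>/<namespace>/<table>/data}.
   */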
  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

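  /**
   * Returns the temporary bulk output directory for this backup:
   * {@code <backupRootDir>/.tmp/<backupId>}.
   */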
  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }
}