/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupMergeJob;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link BackupMergeJob}. Must be initialized with the configuration
 * of a backup destination cluster.
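 * <p>
 * A minimal direct-invocation sketch (the backup IDs below are placeholders; in practice the job
 * is typically driven by the backup merge command):
 *
 * <pre>
 * {@code
 * Configuration conf = ...; // configuration pointing at the backup destination cluster
 * BackupMergeJob mergeJob = new MapReduceBackupMergeJob();
 * mergeJob.setConf(conf);
 * mergeJob.run(new String[] { "backup_1700000000001", "backup_1700000000002" });
 * }
 * </pre>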
 */
@InterfaceAudience.Private
public class MapReduceBackupMergeJob implements BackupMergeJob {
  public static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupMergeJob.class);

  protected Tool player;
  protected Configuration conf;

  public MapReduceBackupMergeJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void run(String[] backupIds) throws IOException {
    String bulkOutputConfKey;

    // TODO: run player on remote cluster
    player = new MapReduceHFileSplitterJob();
    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
    // The player reads all files in an arbitrary directory structure and creates
    // a map task for each file
    String bids = StringUtils.join(backupIds, ",");

    if (LOG.isDebugEnabled()) {
      LOG.debug("Merge backup images " + bids);
    }

    List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
    boolean finishedTables = false;
    Connection conn = ConnectionFactory.createConnection(getConf());
    BackupSystemTable table = new BackupSystemTable(conn);
    FileSystem fs = null;

    try {

      // Get exclusive lock on backup system
      table.startBackupExclusiveOperation();
      // Start merge operation
      table.startMergeOperation(backupIds);

      // Select most recent backup id
      String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

      TableName[] tableNames = getTableNamesInBackupImages(backupIds);

      BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
      String backupRoot = bInfo.getBackupRootDir();
      Path backupRootPath = new Path(backupRoot);
      fs = backupRootPath.getFileSystem(conf);

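      // PHASE 1 (read-only with respect to existing backup images)
      // For each table, run the HFile splitter player over the input backup images and write
      // the merged files into a temporary bulk output directory. No existing backup image is
      // modified until all tables have been processed successfully.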
      for (int i = 0; i < tableNames.length; i++) {
        LOG.info("Merge backup images for " + tableNames[i]);

        // Find input directories for table
        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
        String dirs = StringUtils.join(dirPaths, ",");

        // bulkOutputPath should be on the same filesystem as backupRoot
        Path tmpRestoreOutputDir = HBackupFileSystem.getBackupTmpDirPath(backupRoot);
        Path bulkOutputPath = BackupUtils.getBulkOutputDir(tmpRestoreOutputDir,
          BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
        // Delete the output directory if it already exists
        if (fs.exists(bulkOutputPath)) {
          if (!fs.delete(bulkOutputPath, true)) {
            LOG.warn("Cannot delete: " + bulkOutputPath);
          }
        }
        Configuration conf = getConf();
        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

        player.setConf(getConf());
        int result = player.run(playerArgs);
        if (!succeeded(result)) {
          throw new IOException("Cannot merge backup images for " + dirs
            + " (check Hadoop/MR and HBase logs). Player return code = " + result);
        }
        // Add to processed table list
        processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
        LOG.debug("Merge job finished: " + result);
      }
      List<TableName> tableList = toTableNameList(processedTableList);
      table.updateProcessedTablesForMerge(tableList);
      finishedTables = true;

      // PHASE 2 (modification of a backup file system)
      // Move existing mergedBackupId data into tmp directory
      // we will need it later in case of a failure
      Path tmpBackupDir =
        HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId);
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);

      if (!fs.rename(backupDirPath, tmpBackupDir)) {
        throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
      } else {
        LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
      }
      // Move new data into backup dest
      for (Pair<TableName, Path> tn : processedTableList) {
        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
      }
      // Update backup manifest
      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
      updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
      // Copy meta files back from tmp to backup dir
      copyMetaData(fs, tmpBackupDir, backupDirPath);
      // Delete tmp dir (Rename back during repair)
      if (!fs.delete(tmpBackupDir, true)) {
        // WARN and ignore
        LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
      }
      // Delete old data
      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
      // Finish merge session
      table.finishMergeOperation();
      // Release lock
      table.finishBackupExclusiveOperation();
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      LOG.error(e.toString(), e);
      if (!finishedTables) {
        // cleanup bulk directories and finish merge
        // merge MUST be repeated (no need for repair)
        if (fs != null) {
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
        }
        table.finishMergeOperation();
        table.finishBackupExclusiveOperation();
        throw new IOException("Backup merge operation failed, you should try it again", e);
      } else {
        // backup repair must be run
        throw new IOException(
          "Backup merge operation failed, run backup repair tool to restore system's integrity", e);
      }
    } finally {
      table.close();
      conn.close();
    }
  }

  /**
   * Copies the metadata of a backup session.
   * @param fs            file system
   * @param tmpBackupDir  temporary backup directory where the metadata is located
   * @param backupDirPath destination backup directory
   * @throws IOException if scanning or copying the metadata fails
   */
  protected void copyMetaData(FileSystem fs, Path tmpBackupDir, Path backupDirPath)
    throws IOException {
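    // Recursively scan the temporary backup directory and remember only the table descriptor
    // and region info files; the actual data files are not copied back.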
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(tmpBackupDir, true);
    List<Path> toKeep = new ArrayList<>();
    while (it.hasNext()) {
      Path p = it.next().getPath();
      if (fs.isDirectory(p)) {
        continue;
      }
      // Keep meta
      String fileName = p.toString();
      if (
        fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0
          || fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0
      ) {
        toKeep.add(p);
      }
    }
    // Copy meta to destination
    for (Path p : toKeep) {
      Path newPath = convertToDest(p, backupDirPath);
      copyFile(fs, p, newPath);
    }
  }

  /**
   * Copies a file from {@code p} to {@code newPath} on the same file system.
   * @param fs      file system
   * @param p       source path
   * @param newPath destination path
   * @throws IOException if the copy fails or the destination file does not exist afterwards
   */
  protected void copyFile(FileSystem fs, Path p, Path newPath) throws IOException {
    try (InputStream in = fs.open(p); OutputStream out = fs.create(newPath, true)) {
      IOUtils.copy(in, out);
    }
    boolean exists = fs.exists(newPath);
    if (!exists) {
      throw new IOException("Failed to copy meta file to: " + newPath);
    }
  }

  /**
   * Rebases a path from the temporary backup directory onto the destination backup directory.
   * @param p             path under the temporary backup directory
   * @param backupDirPath destination backup directory (named after the backup id)
   * @return the corresponding path under {@code backupDirPath}
   */
  protected Path convertToDest(Path p, Path backupDirPath) {
    String backupId = backupDirPath.getName();
    Deque<String> stack = new ArrayDeque<>();
    String name = null;
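    // Walk up from p, collecting path components until we reach the directory named after the
    // backup id (the root of the temporary backup directory).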
    while (true) {
      name = p.getName();
      if (!name.equals(backupId)) {
        stack.push(name);
        p = p.getParent();
      } else {
        break;
      }
    }
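    // Re-apply the collected components, in order, on top of the destination backup directory.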
    Path newPath = new Path(backupDirPath.toString());
    while (!stack.isEmpty()) {
      newPath = new Path(newPath, stack.pop());
    }
    return newPath;
  }

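  /**
   * Extracts the bulk output paths from a list of (table name, bulk output path) pairs.
   */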
  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<Path> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getSecond());
    }
    return list;
  }

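  /**
   * Extracts the table names from a list of (table name, bulk output path) pairs.
   */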
  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<TableName> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getFirst());
    }
    return list;
  }

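  /**
   * Deletes the temporary bulk output directories created during the first phase of the merge.
   */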
  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
    for (Path path : pathList) {
      if (!fs.delete(path, true)) {
        LOG.warn("Can't delete " + path);
      }
    }
  }

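  /**
   * Updates the manifest of the merged backup by removing the merged-in backup ids from its list
   * of ancestors and storing the result back to the file system.
   */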
  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
    List<String> backupsToDelete) throws IllegalArgumentException, IOException {
    BackupManifest manifest =
      HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
    manifest.getBackupImage().removeAncestors(backupsToDelete);
    // save back
    manifest.store(conf);
  }

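  /**
   * Deletes the given backup images from both the backup system table and the file system.
   */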
  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
    String backupRoot) throws IOException {
    // Delete from backup system table
    try (BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        table.deleteBackupInfo(backupId);
      }
    }

    // Delete from file system
    for (String backupId : backupIds) {
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);

      if (!fs.delete(backupDirPath, true)) {
        LOG.warn("Could not delete " + backupDirPath);
      }
    }
  }

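  /**
   * Returns all backup ids except the one the images are merged into.
   */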
  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
    List<String> list = new ArrayList<>();
    for (String id : backupIds) {
      if (id.equals(mergedBackupId)) {
        continue;
      }
      list.add(id);
    }
    return list;
  }

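  /**
   * Moves the merged files for a table from its bulk output directory into the table directory of
   * the merged backup image, replacing any column family directories that are already there.
   */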
  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
    TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException {
    Path dest =
      new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));

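    // Each child directory of the bulk output path is a column family directory produced by the
    // player job; move it under the destination table directory.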
    FileStatus[] fsts = fs.listStatus(bulkOutputPath);
    for (FileStatus fst : fsts) {
      if (fst.isDirectory()) {
        String family = fst.getPath().getName();
        Path newDst = new Path(dest, family);
        if (fs.exists(newDst)) {
          if (!fs.delete(newDst, true)) {
            throw new IOException("Failed to delete: " + newDst);
          }
        } else {
          fs.mkdirs(dest);
        }
        boolean result = fs.rename(fst.getPath(), dest);
        LOG.debug("MoveData from " + fst.getPath() + " to " + dest + " result=" + result);
      }
    }
  }

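  /**
   * Returns the union of all table names contained in the given backup images.
   */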
  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
    Set<TableName> allSet = new HashSet<>();

    try (Connection conn = ConnectionFactory.createConnection(conf);
      BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        BackupInfo bInfo = table.readBackupInfo(backupId);

        allSet.addAll(bInfo.getTableNames());
      }
    }

    TableName[] ret = new TableName[allSet.size()];
    return allSet.toArray(ret);
  }

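  /**
   * Finds the existing table backup directories for the given table across all backup images
   * participating in the merge; these become the input directories of the player job.
   */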
  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
    String[] backupIds) throws IOException {
    List<Path> dirs = new ArrayList<>();

    for (String backupId : backupIds) {
      Path fileBackupDirPath =
        new Path(HBackupFileSystem.getTableBackupDir(backupRoot, backupId, tableName));
      if (fs.exists(fileBackupDirPath)) {
        dirs.add(fileBackupDirPath);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Directory " + fileBackupDirPath + " does not exist.");
        }
      }
    }
    Path[] ret = new Path[dirs.size()];
    return dirs.toArray(ret);
  }
}