/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupMergeJob;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link BackupMergeJob}. Must be initialized with the configuration
 * of a backup destination cluster.
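 * <p>
 * A minimal usage sketch (the configuration value and backup ids below are hypothetical):
 *
 * <pre>
 * Configuration conf = ...; // configuration of the backup destination cluster
 * BackupMergeJob mergeJob = new MapReduceBackupMergeJob();
 * mergeJob.setConf(conf);
 * mergeJob.run(new String[] { "backup_1000000000001", "backup_1000000000002" });
 * </pre>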
 */
@InterfaceAudience.Private
public class MapReduceBackupMergeJob implements BackupMergeJob {
  public static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupMergeJob.class);

  protected Tool player;
  protected Configuration conf;

  public MapReduceBackupMergeJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void run(String[] backupIds) throws IOException {
    String bulkOutputConfKey;

    // TODO: run player on remote cluster
    player = new MapReduceHFileSplitterJob();
    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
    // The player reads all files in an arbitrary directory structure and creates
    // a Map task for each file
    String bids = StringUtils.join(backupIds, ",");

    if (LOG.isDebugEnabled()) {
      LOG.debug("Merge backup images " + bids);
    }

    List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
    boolean finishedTables = false;
    Connection conn = ConnectionFactory.createConnection(getConf());
    BackupSystemTable table = new BackupSystemTable(conn);
    FileSystem fs = FileSystem.get(getConf());

    try {

      // Get exclusive lock on backup system
      table.startBackupExclusiveOperation();
      // Start merge operation
      table.startMergeOperation(backupIds);

      // Select most recent backup id
      String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

      TableName[] tableNames = getTableNamesInBackupImages(backupIds);

      BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
      String backupRoot = bInfo.getBackupRootDir();

      for (int i = 0; i < tableNames.length; i++) {
        LOG.info("Merge backup images for " + tableNames[i]);

        // Find input directories for table
        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
        String dirs = StringUtils.join(dirPaths, ",");

        Path bulkOutputPath = BackupUtils.getBulkOutputDir(
          BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
        // Delete existing output content, if any
        if (fs.exists(bulkOutputPath)) {
          if (!fs.delete(bulkOutputPath, true)) {
            LOG.warn("Cannot delete: " + bulkOutputPath);
          }
        }
        Configuration conf = getConf();
        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

        player.setConf(getConf());
        int result = player.run(playerArgs);
        if (!succeeded(result)) {
          throw new IOException("Cannot merge backup images for " + dirs
            + " (check Hadoop/MR and HBase logs). Player return code=" + result);
        }
        // Add to processed table list
        processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
        LOG.debug("Merge job finished: " + result);
      }
      List<TableName> tableList = toTableNameList(processedTableList);
      table.updateProcessedTablesForMerge(tableList);
      finishedTables = true;

      // PHASE 2 (modification of the backup file system)
      // Move the existing mergedBackupId data into a tmp directory;
      // we will need it later in case of a failure
      Path tmpBackupDir =
        HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId);
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);

      if (!fs.rename(backupDirPath, tmpBackupDir)) {
        throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
      } else {
        LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
      }
      // Move new data into the backup destination
      for (Pair<TableName, Path> tn : processedTableList) {
        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
      }
      // Update backup manifest
      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
      updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
      // Copy meta files back from the tmp dir to the backup dir
      copyMetaData(fs, tmpBackupDir, backupDirPath);
      // Delete the tmp dir (it is renamed back during repair)
      if (!fs.delete(tmpBackupDir, true)) {
        // WARN and ignore
        LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
      }
      // Delete old data
      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
      // Finish merge session
      table.finishMergeOperation();
      // Release lock
      table.finishBackupExclusiveOperation();
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      LOG.error(e.toString(), e);
      if (!finishedTables) {
        // Clean up bulk directories and finish merge;
        // the merge MUST be repeated (no need for repair)
        cleanupBulkLoadDirs(fs, toPathList(processedTableList));
        table.finishMergeOperation();
        table.finishBackupExclusiveOperation();
        throw new IOException("Backup merge operation failed, you should try it again", e);
      } else {
        // Backup repair must be run
        throw new IOException(
          "Backup merge operation failed, run the backup repair tool to restore system's integrity",
          e);
      }
    } finally {
      table.close();
      conn.close();
    }
  }

  /**
   * Copy the meta data files of a backup session.
   * @param fs            file system
   * @param tmpBackupDir  temp backup directory, where the meta files are located
   * @param backupDirPath new path for backup
   * @throws IOException exception
   */
  protected void copyMetaData(FileSystem fs, Path tmpBackupDir, Path backupDirPath)
    throws IOException {
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(tmpBackupDir, true);
    List<Path> toKeep = new ArrayList<Path>();
    while (it.hasNext()) {
      Path p = it.next().getPath();
      if (fs.isDirectory(p)) {
        continue;
      }
      // Keep meta files (table descriptors and region info)
      String fileName = p.toString();
      if (
        fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0
          || fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0
      ) {
        toKeep.add(p);
      }
    }
    // Copy meta files to the destination
    for (Path p : toKeep) {
      Path newPath = convertToDest(p, backupDirPath);
      copyFile(fs, p, newPath);
    }
  }

  /**
   * Copy file in DFS from p to newPath
   * @param fs      file system
   * @param p       old path
   * @param newPath new path
   * @throws IOException exception
   */
  protected void copyFile(FileSystem fs, Path p, Path newPath) throws IOException {
    try (InputStream in = fs.open(p); OutputStream out = fs.create(newPath, true)) {
      IOUtils.copy(in, out);
    }
    boolean exists = fs.exists(newPath);
    if (!exists) {
      throw new IOException("Failed to copy meta file to: " + newPath);
    }
  }

  /**
   * Converts a path under the tmp backup directory to the corresponding path under the
   * destination backup directory, preserving the layout below the backup id directory.
   * @param p             path under the tmp backup directory
   * @param backupDirPath destination backup directory (named after the backup id)
   * @return converted path
   */
  protected Path convertToDest(Path p, Path backupDirPath) {
    String backupId = backupDirPath.getName();
    Deque<String> stack = new ArrayDeque<String>();
    String name = null;
    // Walk up from p, collecting path components until the backup id directory is reached
    while (true) {
      name = p.getName();
      if (!name.equals(backupId)) {
        stack.push(name);
        p = p.getParent();
      } else {
        break;
      }
    }
    // Re-append the collected components under the destination backup directory
    Path newPath = new Path(backupDirPath.toString());
    while (!stack.isEmpty()) {
      newPath = new Path(newPath, stack.pop());
    }
    return newPath;
  }

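  /** Returns the bulk output paths from the processed table list. */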
  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<Path> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getSecond());
    }
    return list;
  }

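  /** Returns the table names from the processed table list. */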
  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<TableName> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getFirst());
    }
    return list;
  }

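  /** Deletes the temporary bulk load output directories; failures are logged and ignored. */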
  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
    for (Path path : pathList) {
      if (!fs.delete(path, true)) {
        LOG.warn("Can't delete " + path);
      }
    }
  }

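  /**
   * Removes the merged-in backup ids from the ancestor list of the merged backup's manifest and
   * stores the manifest back.
   */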
  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
    List<String> backupsToDelete) throws IllegalArgumentException, IOException {
    BackupManifest manifest =
      HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
    manifest.getBackupImage().removeAncestors(backupsToDelete);
    // save back
    manifest.store(conf);
  }

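  /**
   * Deletes the given backup images: removes their records from the backup system table and
   * deletes their directories in the backup destination file system.
   */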
  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
    String backupRoot) throws IOException {
    // Delete from backup system table
    try (BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        table.deleteBackupInfo(backupId);
      }
    }

    // Delete from file system
    for (String backupId : backupIds) {
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);

      if (!fs.delete(backupDirPath, true)) {
        LOG.warn("Could not delete " + backupDirPath);
      }
    }
  }

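  /** Returns all backup ids from the merge set except the merged (most recent) backup id. */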
  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
    List<String> list = new ArrayList<>();
    for (String id : backupIds) {
      if (id.equals(mergedBackupId)) {
        continue;
      }
      list.add(id);
    }
    return list;
  }

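  /**
   * Moves the bulk loader output for a table (one directory per column family) into the merged
   * backup's table directory, replacing any existing family data.
   */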
  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
    TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException {
    Path dest =
      new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));

    FileStatus[] fsts = fs.listStatus(bulkOutputPath);
    for (FileStatus fst : fsts) {
      if (fst.isDirectory()) {
        // Each subdirectory of the bulk output corresponds to a column family
        String family = fst.getPath().getName();
        Path newDst = new Path(dest, family);
        if (fs.exists(newDst)) {
          if (!fs.delete(newDst, true)) {
            throw new IOException("Failed to delete: " + newDst);
          }
        } else {
          fs.mkdirs(dest);
        }
        boolean result = fs.rename(fst.getPath(), dest);
        LOG.debug("MoveData from " + fst.getPath() + " to " + dest + " result=" + result);
      }
    }
  }

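  /**
   * Returns the union of all table names contained in the given backup images, as recorded in the
   * backup system table.
   */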
  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
    Set<TableName> allSet = new HashSet<>();

    try (Connection conn = ConnectionFactory.createConnection(conf);
      BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        BackupInfo bInfo = table.readBackupInfo(backupId);

        allSet.addAll(bInfo.getTableNames());
      }
    }

    TableName[] ret = new TableName[allSet.size()];
    return allSet.toArray(ret);
  }

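  /**
   * Returns the table backup directories that exist for the given table across the backup images
   * being merged.
   */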
  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
    String[] backupIds) throws IOException {
    List<Path> dirs = new ArrayList<>();

    for (String backupId : backupIds) {
      Path fileBackupDirPath =
        new Path(HBackupFileSystem.getTableBackupDir(backupRoot, backupId, tableName));
      if (fs.exists(fileBackupDirPath)) {
        dirs.add(fileBackupDirPath);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Directory " + fileBackupDirPath + " does not exist.");
        }
      }
    }
    Path[] ret = new Path[dirs.size()];
    return dirs.toArray(ret);
  }
}