/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link BackupCopyJob}. Two types of copy operation are supported:
 * copying from a snapshot, which extends {@link ExportSnapshot}, and copying incremental log
 * files, which extends DistCp.
 */
@InterfaceAudience.Private
public class MapReduceBackupCopyJob implements BackupCopyJob {
  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";

  // This prefix specifies the DistCp options to be used during backup copy
  public static final String BACKUP_COPY_OPTION_PREFIX = "hbase.backup.copy.";

  private static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupCopyJob.class);

  private Configuration conf;

  // Accumulated progress within the whole backup process for the copy operation
  private float progressDone = 0.1f;
  private long bytesCopied = 0;
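  // Fraction of the overall progress reserved for pre-copy setup; the copy itself scales the
  // remaining (1 - INIT_PROGRESS) share, as computed in BackupDistCp.execute().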
  private static final float INIT_PROGRESS = 0.1f;

  // The fraction of the whole copy work that the current copy task represents when multiple
  // copy tasks are needed. The default is 1.0 (100%), meaning a single copy task covers the
  // whole backup.
  private float subTaskPercntgInWholeTask = 1f;

  public MapReduceBackupCopyJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Get the current copy task percentage within the whole task if multiple copies are needed.
   * @return the current copy task percentage
   */
  public float getSubTaskPercntgInWholeTask() {
    return subTaskPercntgInWholeTask;
  }

  /**
   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
   * be called before calling
   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
   */
  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
  }

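  /**
   * {@link ExportSnapshot} subclass that carries the backup context (backup info and table) for
   * the snapshot being exported.
   */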
  static class SnapshotCopy extends ExportSnapshot {
    private BackupInfo backupInfo;
    private TableName table;

    public SnapshotCopy(BackupInfo backupInfo, TableName table) {
      super();
      this.backupInfo = backupInfo;
      this.table = table;
    }

    public TableName getTable() {
      return this.table;
    }

    public BackupInfo getBackupInfo() {
      return this.backupInfo;
    }
  }

  /**
   * Update the ongoing backup with new progress.
   * @param backupInfo    backup info
   * @param backupManager backup manager
   * @param newProgress   progress
   * @param bytesCopied   bytes copied
   * @throws IOException exception
   */
  static void updateProgress(BackupInfo backupInfo, BackupManager backupManager, int newProgress,
    long bytesCopied) throws IOException {
    // compose the new backup progress data, using fake number for now
    String backupProgressData = newProgress + "%";

    backupInfo.setProgress(newProgress);
    backupManager.updateBackupInfo(backupInfo);
    LOG.debug("Backup progress data \"" + backupProgressData
      + "\" has been updated to backup system table for " + backupInfo.getBackupId());
  }

  /**
   * Extends DistCp (DistCpV2, MAPREDUCE-2765) to report progress to the backup system table
   * during the copy. The execute() method is overridden to obtain the Job reference used for
   * progress updates. Only the argument form "src1 [src2 ...] dst" is supported; no other DistCp
   * options are accepted.
   */
  class BackupDistCp extends DistCp {

    private BackupInfo backupInfo;
    private BackupManager backupManager;

    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
      BackupManager backupManager) throws Exception {
      super(conf, options);
      this.backupInfo = backupInfo;
      this.backupManager = backupManager;
    }

    @Override
    public Job execute() throws Exception {

      // reflection preparation for private methods and fields
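      // DistCp keeps these members private, and their names differ across Hadoop versions
      // ("inputOptions" in Hadoop 2 vs. "context" in Hadoop 3), so they are resolved
      // reflectively instead of being referenced directly.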
      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");

      Field fieldInputOptions = getInputOptionsField(classDistCp);
      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");

      methodCleanup.setAccessible(true);
      fieldInputOptions.setAccessible(true);
      fieldSubmitted.setAccessible(true);

      // execute() logic starts here
      assert fieldInputOptions.get(this) != null;

      Job job = null;
      try {

        List<Path> srcs = getSourcePaths(fieldInputOptions);

        long totalSrcLgth = 0;
        for (Path aSrc : srcs) {
          totalSrcLgth += BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
        }

        // Async call
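        // With the -async switch, DistCp submits the job and returns without waiting, which
        // lets the loop below poll mapProgress() and feed the backup system table.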
        job = super.execute();
        // Update the copy progress in the system table whenever the value changes, polling
        // every hbase.backup.progressreport.frequency ms (default 500 ms)
        int progressReportFreq = MapReduceBackupCopyJob.this.getConf()
          .getInt("hbase.backup.progressreport.frequency", 500);
        float lastProgress = progressDone;
        while (!job.isComplete()) {
          float newProgress =
            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);

          if (newProgress > lastProgress) {

            BigDecimal progressData =
              new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);
            String newProgressStr = progressData + "%";
            LOG.info("Progress: " + newProgressStr);
            updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
            LOG.debug("Backup progress data updated to backup system table: \"Progress: "
              + newProgressStr + ".\"");
            lastProgress = newProgress;
          }
          Thread.sleep(progressReportFreq);
        }
        // update the progress data after copy job complete
        float newProgress =
          progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
        BigDecimal progressData =
          new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);

        String newProgressStr = progressData + "%";
        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
          + " mapProgress: " + job.mapProgress());

        // accumulate the overall backup progress
        progressDone = newProgress;
        bytesCopied += totalSrcLgth;

        updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
        LOG.debug("Backup progress data updated to backup system table: \"Progress: "
          + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
      } catch (Throwable t) {
        LOG.error(t.toString(), t);
        throw t;
      }

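      // Record the DistCp job id in the job configuration so the copy can be referenced
      // later, e.g. to cancel it via cancel(String).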
      String jobID = job.getJobID().toString();
      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);

      LOG.debug(
        "DistCp job-id: " + jobID + " completed: " + job.isComplete() + " " + job.isSuccessful());
      Counters ctrs = job.getCounters();
      LOG.debug(Objects.toString(ctrs));
      if (job.isComplete() && !job.isSuccessful()) {
        throw new Exception("DistCp job-id: " + jobID + " failed");
      }

      return job;
    }

    private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
      Field f = null;
      try {
        f = classDistCp.getDeclaredField("inputOptions");
      } catch (NoSuchFieldException | SecurityException e) {
        // Hadoop 3: the field was renamed to "context"
        try {
          f = classDistCp.getDeclaredField("context");
        } catch (NoSuchFieldException | SecurityException e1) {
          throw new IOException(e1);
        }
      }
      return f;
    }

    @SuppressWarnings("unchecked")
    private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
      Object options;
      try {
        options = fieldInputOptions.get(this);
        if (options instanceof DistCpOptions) {
          return ((DistCpOptions) options).getSourcePaths();
        } else {
          // Hadoop 3
          Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
          Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
          methodGetSourcePaths.setAccessible(true);

          return (List<Path>) methodGetSourcePaths.invoke(options);
        }
      } catch (IllegalArgumentException | IllegalAccessException | ClassNotFoundException
        | NoSuchMethodException | SecurityException | InvocationTargetException e) {
        throw new IOException(e);
      }
    }

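    /**
     * When NUMBER_OF_LEVELS_TO_PRESERVE_KEY is set, builds the copy listing manually so that each
     * entry's key keeps the configured number of trailing path components (see getKey(Path));
     * otherwise defers to DistCp's default listing.
     */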
    @Override
    protected Path createInputFileListing(Job job) throws IOException {

      if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
        return super.createInputFileListing(job);
      }
      long totalBytesExpected = 0;
      int totalRecords = 0;
      Path fileListingPath = getFileListingPath();
      try (SequenceFile.Writer writer = getWriter(fileListingPath)) {
        List<Path> srcFiles = getSourceFiles();
        if (srcFiles.isEmpty()) {
          return fileListingPath;
        }
        totalRecords = srcFiles.size();
        FileSystem fs = srcFiles.get(0).getFileSystem(conf);
        for (Path path : srcFiles) {
          FileStatus fst = fs.getFileStatus(path);
          totalBytesExpected += fst.getLen();
          Text key = getKey(path);
          writer.append(key, new CopyListingFileStatus(fst));
        }
        writer.close();

        // update the job's configuration
        Configuration cfg = job.getConfiguration();
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
        cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
        | IllegalAccessException | NoSuchMethodException | ClassNotFoundException
        | InvocationTargetException e) {
        throw new IOException(e);
      }
      return fileListingPath;
    }

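    // Builds the copy-listing key from the last "level" components of the source path: with
    // level = 2, "/a/b/c/file" becomes "/c/file", preserving that much directory structure at
    // the destination.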
    private Text getKey(Path path) {
      int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
      int count = 0;
      String relPath = "";
      while (count++ < level) {
        relPath = Path.SEPARATOR + path.getName() + relPath;
        path = path.getParent();
      }
      return new Text(relPath);
    }

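    // Reflectively reads DistCp's private options field ("inputOptions" on Hadoop 2, "context"
    // on Hadoop 3) to obtain the list of source paths for the listing.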
    private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
      IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
      ClassNotFoundException, InvocationTargetException, IOException {
      Field options = null;
      try {
        options = DistCp.class.getDeclaredField("inputOptions");
      } catch (NoSuchFieldException | SecurityException e) {
        options = DistCp.class.getDeclaredField("context");
      }
      options.setAccessible(true);
      return getSourcePaths(options);
    }

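    // Creates an uncompressed SequenceFile writer for the copy listing, removing any stale
    // listing file first.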
    private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
      FileSystem fs = pathToListFile.getFileSystem(conf);
      fs.delete(pathToListFile, false);
      return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
    }
  }

  /**
   * Do backup copy based on different types.
   * @param context       The backup info
   * @param backupManager The backup manager
   * @param conf          The hadoop configuration
   * @param copyType      The backup copy type
   * @param options       Options for customized ExportSnapshot or DistCp
   * @return result code of the underlying copy job (0 on success)
   * @throws IOException exception
   */
  @Override
  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
    BackupType copyType, String[] options) throws IOException {
    int res = 0;

    try {
      if (copyType == BackupType.FULL) {
        SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
        LOG.debug("Doing SNAPSHOT_COPY");
        // Make a new instance of conf to be used by the snapshot copy class.
        snapshotCp.setConf(new Configuration(conf));
        res = snapshotCp.run(options);

      } else if (copyType == BackupType.INCREMENTAL) {
        LOG.debug("Doing COPY_TYPE_DISTCP");
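        // The incremental copy runs as a single DistCp job, so it accounts for all of the
        // remaining copy work.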
        setSubTaskPercntgInWholeTask(1f);

        BackupDistCp distcp =
          new BackupDistCp(new Configuration(conf), null, context, backupManager);
        // Handle a special case where the source is a single file. In this case, distcp does
        // not create the target directory; it treats the target as a file name and copies the
        // source file to it. We therefore create the target directory before running distcp.
        LOG.debug("DistCp options: " + Arrays.toString(options));
        Path dest = new Path(options[options.length - 1]);
        String[] newOptions = new String[options.length + 1];
        System.arraycopy(options, 0, newOptions, 1, options.length);
        newOptions[0] = "-async"; // run DistCp in async mode
        FileSystem destfs = dest.getFileSystem(conf);
        if (!destfs.exists(dest)) {
          destfs.mkdirs(dest);
        }

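        // Place the switches derived from the configuration ahead of "-async" and the
        // source/target path arguments when assembling the final DistCp argument list.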
        List<String> distCpOptionsFromConf = parseDistCpOptions(conf);
        String[] finalOptions = new String[newOptions.length + distCpOptionsFromConf.size()];
        for (int i = 0; i < distCpOptionsFromConf.size(); i++) {
          finalOptions[i] = distCpOptionsFromConf.get(i);
        }
        System.arraycopy(newOptions, 0, finalOptions, distCpOptionsFromConf.size(),
          newOptions.length);
        res = distcp.run(finalOptions);
      }
      return res;

    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  @Override
  public void cancel(String jobId) throws IOException {
    JobID id = JobID.forName(jobId);
    Cluster cluster = new Cluster(this.getConf());
    try {
      Job job = cluster.getJob(id);
      if (job == null) {
        LOG.error("No job found for " + id);
        // TODO: should this throw an exception instead?
        return;
      }
      if (job.isComplete() || job.isRetired()) {
        return;
      }

      job.killJob();
      LOG.debug("Killed copy job " + id);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }

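  /**
   * Translates configuration entries under the "hbase.backup.copy." prefix back into DistCp
   * command-line switches. For example (assuming Hadoop's standard config label for the
   * bandwidth switch), setting hbase.backup.copy.distcp.map.bandwidth.mb=100 would contribute
   * "-bandwidth 100" to the DistCp arguments.
   */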
  protected static List<String> parseDistCpOptions(Configuration conf) {
    List<String> extraArgsFromConf = new ArrayList<>();

    for (DistCpOptionSwitch optionSwitch : DistCpOptionSwitch.values()) {
      String configLabel = BACKUP_COPY_OPTION_PREFIX + optionSwitch.getConfigLabel();
      if (conf.get(configLabel) != null) {
        if (optionSwitch.getOption().hasArg()) {
          extraArgsFromConf.add("-" + optionSwitch.getOption().getOpt());
          extraArgsFromConf.add(conf.get(configLabel));
        } else {
          boolean value = conf.getBoolean(configLabel, false);
          if (value) {
            extraArgsFromConf.add("-" + optionSwitch.getOption().getOpt());
          }
        }
      }
    }

    return extraArgsFromConf;
  }

}