/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link BackupCopyJob}. There are two types of copy operation: a
 * snapshot copy, which extends ExportSnapshot, and a copy of incremental log files, which extends
 * DistCp.
 */
@InterfaceAudience.Private
public class MapReduceBackupCopyJob implements BackupCopyJob {
  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";
  private static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupCopyJob.class);

  private Configuration conf;

  // Accumulated progress within the whole backup process for the copy operation
  private float progressDone = 0.1f;
  private long bytesCopied = 0;
  private static final float INIT_PROGRESS = 0.1f;

  // The percentage of the current copy task within the whole task, if multiple copies are
  // needed. The default value is 100%, meaning the whole backup is a single copy task.
  private float subTaskPercntgInWholeTask = 1f;

  public MapReduceBackupCopyJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Get the current copy task percentage within the whole task if multiple copies are needed.
   * @return the current copy task percentage
   */
  public float getSubTaskPercntgInWholeTask() {
    return subTaskPercntgInWholeTask;
  }

  /**
   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
   * be called before calling
   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
   */
  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
  }
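
  // Hypothetical usage sketch (no such caller exists in this class): a backup that needs two
  // DistCp passes could weight each pass before invoking copy(), so the reported progress
  // accumulates across both passes:
  //
  //   MapReduceBackupCopyJob copyJob = new MapReduceBackupCopyJob();
  //   copyJob.setConf(conf);
  //   copyJob.setSubTaskPercntgInWholeTask(0.5f); // first pass is half of the overall copy
  //   copyJob.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, firstPassArgs);
  //   copyJob.setSubTaskPercntgInWholeTask(0.5f); // second pass covers the remainder
  //   copyJob.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, secondPassArgs);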

  static class SnapshotCopy extends ExportSnapshot {
    private final BackupInfo backupInfo;
    private final TableName table;

    public SnapshotCopy(BackupInfo backupInfo, TableName table) {
      super();
      this.backupInfo = backupInfo;
      this.table = table;
    }

    public TableName getTable() {
      return this.table;
    }

    public BackupInfo getBackupInfo() {
      return this.backupInfo;
    }
  }

  /**
   * Update the ongoing backup with new progress.
   * @param backupInfo    backup info
   * @param backupManager backup manager
   * @param newProgress   progress
   * @param bytesCopied   bytes copied
   * @throws IOException if updating the backup system table fails
   */
  static void updateProgress(BackupInfo backupInfo, BackupManager backupManager, int newProgress,
    long bytesCopied) throws IOException {
    // compose the new backup progress data, using fake number for now
    String backupProgressData = newProgress + "%";

    backupInfo.setProgress(newProgress);
    backupManager.updateBackupInfo(backupInfo);
    LOG.debug("Backup progress data \"" + backupProgressData
      + "\" has been updated to backup system table for " + backupInfo.getBackupId());
  }

  /**
   * Extends DistCp (DistCpV2, MAPREDUCE-2765) to report progress to the backup system table
   * during backup. Overrides the execute() method to obtain the Job reference needed for progress
   * updates. Only the arguments "src1, [src2, [...]] dst" are supported; no other DistCp options.
   */
  class BackupDistCp extends DistCp {

    private final BackupInfo backupInfo;
    private final BackupManager backupManager;

    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
      BackupManager backupManager) throws Exception {
      super(conf, options);
      this.backupInfo = backupInfo;
      this.backupManager = backupManager;
    }

    @Override
    public Job execute() throws Exception {
      // reflection preparation for private methods and fields
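      // DistCp keeps its options in a private "inputOptions" field on Hadoop 2, and wraps them
      // in a private "context" field (a DistCpContext) on Hadoop 3; getInputOptionsField() below
      // resolves whichever field the DistCp version on the classpath provides.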
      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");

      Field fieldInputOptions = getInputOptionsField(classDistCp);
      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");

      methodCleanup.setAccessible(true);
      fieldInputOptions.setAccessible(true);
      fieldSubmitted.setAccessible(true);

      // execute() logic starts here
      assert fieldInputOptions.get(this) != null;

      Job job = null;
      try {
        List<Path> srcs = getSourcePaths(fieldInputOptions);

        long totalSrcLength = 0;
        for (Path aSrc : srcs) {
          totalSrcLength += BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
        }

        // Async call
        job = super.execute();
        // Update the copy progress in the system table every 0.5s if the progress value changed
        int progressReportFreq = MapReduceBackupCopyJob.this.getConf()
          .getInt("hbase.backup.progressreport.frequency", 500);
        float lastProgress = progressDone;
        while (!job.isComplete()) {
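          // Overall progress reserves INIT_PROGRESS (10%) for setup and scales the map progress
          // by this subtask's weight. For example, a single copy task
          // (subTaskPercntgInWholeTask = 1) at mapProgress() = 0.5 reports
          // 0.1 + 0.5 * 1 * (1 - 0.1) = 0.55, i.e. 55% overall.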
          float newProgress =
            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);

          if (newProgress > lastProgress) {
            BigDecimal progressData =
              new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);
            String newProgressStr = progressData + "%";
            LOG.info("Progress: " + newProgressStr);
            updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
            LOG.debug("Backup progress data updated to backup system table: \"Progress: "
              + newProgressStr + ".\"");
            lastProgress = newProgress;
          }
          Thread.sleep(progressReportFreq);
        }
        // update the progress data after the copy job completes
        float newProgress =
          progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
        BigDecimal progressData =
          new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);

        String newProgressStr = progressData + "%";
        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
          + " mapProgress: " + job.mapProgress());

        // accumulate the overall backup progress
        progressDone = newProgress;
        bytesCopied += totalSrcLength;

        updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
        LOG.debug("Backup progress data updated to backup system table: \"Progress: "
          + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
      } catch (Throwable t) {
        LOG.error(t.toString(), t);
        throw t;
      }

      String jobID = job.getJobID().toString();
      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);

      LOG.debug(
        "DistCp job-id: " + jobID + " completed: " + job.isComplete() + " " + job.isSuccessful());
      Counters ctrs = job.getCounters();
      LOG.debug(Objects.toString(ctrs));
      if (job.isComplete() && !job.isSuccessful()) {
        throw new Exception("DistCp job-id: " + jobID + " failed");
      }

      return job;
    }

    private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
      Field f = null;
      try {
        f = classDistCp.getDeclaredField("inputOptions");
      } catch (Exception e) {
        // Hadoop 3
        try {
          f = classDistCp.getDeclaredField("context");
        } catch (NoSuchFieldException | SecurityException e1) {
          throw new IOException(e1);
        }
      }
      return f;
    }

    @SuppressWarnings("unchecked")
    private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
      Object options;
      try {
        options = fieldInputOptions.get(this);
        if (options instanceof DistCpOptions) {
          return ((DistCpOptions) options).getSourcePaths();
        } else {
          // Hadoop 3
          Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
          Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
          methodGetSourcePaths.setAccessible(true);

          return (List<Path>) methodGetSourcePaths.invoke(options);
        }
      } catch (IllegalArgumentException | IllegalAccessException | ClassNotFoundException
        | NoSuchMethodException | SecurityException | InvocationTargetException e) {
        throw new IOException(e);
      }
    }

    @Override
    protected Path createInputFileListing(Job job) throws IOException {
      if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
        return super.createInputFileListing(job);
      }
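      // A preserve-levels depth was requested: write our own copy listing whose keys keep the
      // last N components of each source path (see getKey below), so DistCp recreates that part
      // of the directory layout at the destination instead of flattening the sources.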
      long totalBytesExpected = 0;
      int totalRecords = 0;
      Path fileListingPath = getFileListingPath();
      try (SequenceFile.Writer writer = getWriter(fileListingPath)) {
        List<Path> srcFiles = getSourceFiles();
        if (srcFiles.isEmpty()) {
          return fileListingPath;
        }
        totalRecords = srcFiles.size();
        FileSystem fs = srcFiles.get(0).getFileSystem(conf);
        for (Path path : srcFiles) {
          FileStatus fst = fs.getFileStatus(path);
          totalBytesExpected += fst.getLen();
          Text key = getKey(path);
          writer.append(key, new CopyListingFileStatus(fst));
        }

        // update the job's configuration
        Configuration cfg = job.getConfiguration();
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
        cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
        | IllegalAccessException | NoSuchMethodException | ClassNotFoundException
        | InvocationTargetException e) {
        throw new IOException(e);
      }
      return fileListingPath;
    }

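    /**
     * Builds the copy-listing key for a source path, keeping only its last
     * {@code num.levels.preserve} components. For example, with the level set to 2,
     * {@code /a/b/c/f} yields the key {@code /c/f}.
     */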
    private Text getKey(Path path) {
      int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
      int count = 0;
      String relPath = "";
      while (count++ < level) {
        relPath = Path.SEPARATOR + path.getName() + relPath;
        path = path.getParent();
      }
      return new Text(relPath);
    }

    private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
      IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
      ClassNotFoundException, InvocationTargetException, IOException {
      Field options = null;
      try {
        options = DistCp.class.getDeclaredField("inputOptions");
      } catch (NoSuchFieldException | SecurityException e) {
        // Hadoop 3
        options = DistCp.class.getDeclaredField("context");
      }
      options.setAccessible(true);
      return getSourcePaths(options);
    }

    private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
      FileSystem fs = pathToListFile.getFileSystem(conf);
      fs.delete(pathToListFile, false);
      return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
    }
  }

  /**
   * Do the backup copy based on the backup type.
   * @param context       The backup info
   * @param backupManager The backup manager
   * @param conf          The hadoop configuration
   * @param copyType      The backup copy type
   * @param options       Options for customized ExportSnapshot or DistCp
   * @return result code: 0 on success, non-zero on failure
   * @throws IOException if the copy fails
   */
  @Override
  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
    BackupType copyType, String[] options) throws IOException {
    int res = 0;

    try {
      if (copyType == BackupType.FULL) {
        SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
        LOG.debug("Doing SNAPSHOT_COPY");
        // Make a new instance of conf to be used by the snapshot copy class.
        snapshotCp.setConf(new Configuration(conf));
        res = snapshotCp.run(options);

      } else if (copyType == BackupType.INCREMENTAL) {
        LOG.debug("Doing COPY_TYPE_DISTCP");
        setSubTaskPercntgInWholeTask(1f);

        BackupDistCp distcp =
          new BackupDistCp(new Configuration(conf), null, context, backupManager);
        // Handle a special case where the source is a single file.
        // In this case, DistCp will not create the target dir. It just takes the
        // target as a file name and copies the source file to the target (as a file name).
        // We need to create the target dir before running DistCp.
        LOG.debug("DistCp options: " + Arrays.toString(options));
        Path dest = new Path(options[options.length - 1]);
        String[] newOptions = new String[options.length + 1];
        System.arraycopy(options, 0, newOptions, 1, options.length);
        newOptions[0] = "-async"; // run DistCp in async mode
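        // e.g. {"/src1", "/src2", "/dst"} becomes {"-async", "/src1", "/src2", "/dst"}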
        FileSystem destfs = dest.getFileSystem(conf);
        if (!destfs.exists(dest)) {
          destfs.mkdirs(dest);
        }
        res = distcp.run(newOptions);
      }
      return res;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  @Override
  public void cancel(String jobId) throws IOException {
    JobID id = JobID.forName(jobId);
    Cluster cluster = new Cluster(this.getConf());
    try {
      Job job = cluster.getJob(id);
      if (job == null) {
        LOG.error("No job found for " + id);
        // TODO: should this throw an exception?
        return;
      }
      if (job.isComplete() || job.isRetired()) {
        return;
      }

      job.killJob();
      LOG.debug("Killed copy job " + id);
    } catch (InterruptedException e) {
      throw new IOException(e);
    } finally {
      cluster.close();
    }
  }
}