/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Map-Reduce implementation of {@link BackupCopyJob}. There are two types of copy operation:
 * copying from a snapshot, which is based on extending ExportSnapshot's functionality, and
 * copying incremental log files, which is based on extending DistCp's functionality.
 */
@InterfaceAudience.Private
public class MapReduceBackupCopyJob implements BackupCopyJob {
  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";
  private static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupCopyJob.class);

  private Configuration conf;

  // Accumulated progress within the whole backup process for the copy operation
  private float progressDone = 0.1f;
  private long bytesCopied = 0;
  private static final float INIT_PROGRESS = 0.1f;

  // The percentage of the current copy task within the whole task, if multiple copies are
  // needed. The default value is 100%, which means there is only one copy task for the
  // whole backup.
  private float subTaskPercntgInWholeTask = 1f;

  public MapReduceBackupCopyJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Get the current copy task percentage within the whole task if multiple copies are needed.
   * @return the current copy task percentage
   */
  public float getSubTaskPercntgInWholeTask() {
    return subTaskPercntgInWholeTask;
  }

  /**
   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
   * be called before calling
   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
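   * <p>For example, if an incremental backup needed two DistCp passes of roughly equal size,
   * a caller might (hypothetically) weight each pass at 50% of the overall copy:
   * <pre>{@code
   * copyJob.setSubTaskPercntgInWholeTask(0.5f);
   * copyJob.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, firstBatch);
   * copyJob.setSubTaskPercntgInWholeTask(0.5f);
   * copyJob.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, secondBatch);
   * }</pre>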
   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
   */
  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
  }

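  // Thin ExportSnapshot extension that records which backup session and table a
  // snapshot copy belongs to; used by the full-backup path in copy() below.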
  static class SnapshotCopy extends ExportSnapshot {
    private BackupInfo backupInfo;
    private TableName table;

    public SnapshotCopy(BackupInfo backupInfo, TableName table) {
      super();
      this.backupInfo = backupInfo;
      this.table = table;
    }

    public TableName getTable() {
      return this.table;
    }

    public BackupInfo getBackupInfo() {
      return this.backupInfo;
    }
  }

  /**
   * Update the ongoing backup with new progress.
   * @param backupInfo backup info
   * @param backupManager backup manager
   * @param newProgress progress
   * @param bytesCopied bytes copied
   * @throws IOException exception
   */
  static void updateProgress(BackupInfo backupInfo, BackupManager backupManager,
      int newProgress, long bytesCopied) throws IOException {
    // compose the new backup progress data, using fake number for now
    String backupProgressData = newProgress + "%";

    backupInfo.setProgress(newProgress);
    backupManager.updateBackupInfo(backupInfo);
    LOG.debug("Backup progress data \"" + backupProgressData
        + "\" has been updated to backup system table for " + backupInfo.getBackupId());
  }

  /**
   * Extends DistCp to update progress in the backup system table during backup, using
   * DistCpV2 (MAPREDUCE-2765). It overrides the execute() method to obtain the Job
   * reference for progress updating. Only the arguments "src1, [src2, [...]] dst" are
   * supported; no other DistCp options are accepted.
   */
  class BackupDistCp extends DistCp {

    private BackupInfo backupInfo;
    private BackupManager backupManager;

    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
        BackupManager backupManager) throws Exception {
      super(conf, options);
      this.backupInfo = backupInfo;
      this.backupManager = backupManager;
    }

    @Override
    public Job execute() throws Exception {

      // reflection preparation for private methods and fields
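      // (these are private in DistCp, and the options field is named "inputOptions" on
      // Hadoop 2 but "context" on Hadoop 3, so it is resolved via getInputOptionsField())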
      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");

      Field fieldInputOptions = getInputOptionsField(classDistCp);
      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");

      methodCleanup.setAccessible(true);
      fieldInputOptions.setAccessible(true);
      fieldSubmitted.setAccessible(true);

      // execute() logic starts here
      assert fieldInputOptions.get(this) != null;

      Job job = null;
      try {

        List<Path> srcs = getSourcePaths(fieldInputOptions);

        long totalSrcLgth = 0;
        for (Path aSrc : srcs) {
          totalSrcLgth +=
              BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
        }

        // Async call
        job = super.execute();
        // Update the copy progress in the system table whenever the progress value changes,
        // polling at the configured interval (default 500 ms)
        int progressReportFreq =
            MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency",
              500);
        float lastProgress = progressDone;
        while (!job.isComplete()) {
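          // Overall progress = progress already accumulated before this job, plus this
          // job's map progress scaled by this subtask's share of the whole copy and by
          // the portion of the backup remaining after the initial INIT_PROGRESS phase.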
          float newProgress =
              progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);

          if (newProgress > lastProgress) {

            BigDecimal progressData =
                new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);
            String newProgressStr = progressData + "%";
            LOG.info("Progress: " + newProgressStr);
            updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
            LOG.debug("Backup progress data updated to backup system table: \"Progress: "
                + newProgressStr + ".\"");
            lastProgress = newProgress;
          }
          Thread.sleep(progressReportFreq);
        }
        // update the progress data after the copy job completes
        float newProgress =
            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
        BigDecimal progressData =
            new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);

        String newProgressStr = progressData + "%";
        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
            + " mapProgress: " + job.mapProgress());

        // accumulate the overall backup progress
        progressDone = newProgress;
        bytesCopied += totalSrcLgth;

        updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
        LOG.debug("Backup progress data updated to backup system table: \"Progress: "
            + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
      } catch (Throwable t) {
        LOG.error(t.toString(), t);
        throw t;
      }

      String jobID = job.getJobID().toString();
      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);

      LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " "
          + job.isSuccessful());
      Counters ctrs = job.getCounters();
      LOG.debug(Objects.toString(ctrs));
      if (job.isComplete() && !job.isSuccessful()) {
        throw new Exception("DistCp job-id: " + jobID + " failed");
      }

      return job;
    }
    private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
      Field f = null;
      try {
        f = classDistCp.getDeclaredField("inputOptions");
      } catch (Exception e) {
        // Hadoop 3
        try {
          f = classDistCp.getDeclaredField("context");
        } catch (NoSuchFieldException | SecurityException e1) {
          throw new IOException(e1);
        }
      }
      return f;
    }

    @SuppressWarnings("unchecked")
    private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
      Object options;
      try {
        options = fieldInputOptions.get(this);
        if (options instanceof DistCpOptions) {
          return ((DistCpOptions) options).getSourcePaths();
        } else {
          // Hadoop 3
          Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
          Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
          methodGetSourcePaths.setAccessible(true);

          return (List<Path>) methodGetSourcePaths.invoke(options);
        }
      } catch (IllegalArgumentException | IllegalAccessException |
                ClassNotFoundException | NoSuchMethodException |
                SecurityException | InvocationTargetException e) {
        throw new IOException(e);
      }
    }

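    // When NUMBER_OF_LEVELS_TO_PRESERVE_KEY is set, build the DistCp copy listing
    // manually so that each entry's key keeps the configured number of trailing path
    // levels (see getKey()); otherwise defer to DistCp's default listing.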
    @Override
    protected Path createInputFileListing(Job job) throws IOException {
      if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
        return super.createInputFileListing(job);
      }
      long totalBytesExpected = 0;
      int totalRecords = 0;
      Path fileListingPath = getFileListingPath();
      try (SequenceFile.Writer writer = getWriter(fileListingPath)) {
        List<Path> srcFiles = getSourceFiles();
        if (srcFiles.isEmpty()) {
          return fileListingPath;
        }
        totalRecords = srcFiles.size();
        FileSystem fs = srcFiles.get(0).getFileSystem(conf);
        for (Path path : srcFiles) {
          FileStatus fst = fs.getFileStatus(path);
          totalBytesExpected += fst.getLen();
          Text key = getKey(path);
          writer.append(key, new CopyListingFileStatus(fst));
        }

        // update the job's configuration
        Configuration cfg = job.getConfiguration();
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
        cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
          | IllegalAccessException | NoSuchMethodException | ClassNotFoundException
          | InvocationTargetException e) {
        throw new IOException(e);
      }
      return fileListingPath;
    }

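    // Builds the copy-listing key from the last "level" components of the path; e.g.
    // with level = 2 and path /a/b/c this returns "/b/c", so that much of the
    // directory structure is preserved at the destination.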
    private Text getKey(Path path) {
      int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
      int count = 0;
      String relPath = "";
      while (count++ < level) {
        relPath = Path.SEPARATOR + path.getName() + relPath;
        path = path.getParent();
      }
      return new Text(relPath);
    }

    private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
        IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
        ClassNotFoundException, InvocationTargetException, IOException {
      Field options = null;
      try {
        options = DistCp.class.getDeclaredField("inputOptions");
      } catch (NoSuchFieldException | SecurityException e) {
        options = DistCp.class.getDeclaredField("context");
      }
      options.setAccessible(true);
      return getSourcePaths(options);
    }

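    // Creates the uncompressed SequenceFile of (Text key, CopyListingFileStatus value)
    // entries that DistCp expects as its copy listing, replacing any existing listing file.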
    private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
      FileSystem fs = pathToListFile.getFileSystem(conf);
      fs.delete(pathToListFile, false);
      return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
    }
  }

  /**
   * Do backup copy based on different types.
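   * <p>A minimal invocation sketch (the paths here are hypothetical):
   * <pre>{@code
   * BackupCopyJob copyJob = new MapReduceBackupCopyJob();
   * copyJob.setConf(conf);
   * // an incremental copy takes "src1 [src2 ...] dst" and no other options
   * int res = copyJob.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL,
   *     new String[] { "hdfs://src/WALs", "hdfs://dest/backup/WALs" });
   * }</pre>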
   * @param context The backup info
   * @param backupManager The backup manager
   * @param conf The hadoop configuration
   * @param copyType The backup copy type
   * @param options Options for customized ExportSnapshot or DistCp
   * @return result code returned by the underlying ExportSnapshot or DistCp job
   * @throws IOException exception
   */
  @Override
  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
      BackupType copyType, String[] options) throws IOException {
    int res = 0;

    try {
      if (copyType == BackupType.FULL) {
        SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
        LOG.debug("Doing SNAPSHOT_COPY");
        // Make a new instance of conf to be used by the snapshot copy class.
        snapshotCp.setConf(new Configuration(conf));
        res = snapshotCp.run(options);

      } else if (copyType == BackupType.INCREMENTAL) {
        LOG.debug("Doing COPY_TYPE_DISTCP");
        setSubTaskPercntgInWholeTask(1f);

        BackupDistCp distcp =
            new BackupDistCp(new Configuration(conf), null, context, backupManager);
        // Handle a special case where the source is a single file.
        // In this case, distcp will not create the target dir. It just takes the
        // target as a file name and copies the source file to the target (as a file name).
        // We need to create the target dir before running distcp.
        LOG.debug("DistCp options: " + Arrays.toString(options));
        Path dest = new Path(options[options.length - 1]);
        String[] newOptions = new String[options.length + 1];
        System.arraycopy(options, 0, newOptions, 1, options.length);
        newOptions[0] = "-async"; // run DistCp in async mode so execute() can poll progress
        FileSystem destfs = dest.getFileSystem(conf);
        if (!destfs.exists(dest)) {
          destfs.mkdirs(dest);
        }
        res = distcp.run(newOptions);
      }
      return res;

    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  @Override
  public void cancel(String jobId) throws IOException {
    JobID id = JobID.forName(jobId);
    Cluster cluster = new Cluster(this.getConf());
    try {
      Job job = cluster.getJob(id);
      if (job == null) {
        LOG.error("No job found for " + id);
        // should we throw an exception?
        return;
      }
      if (job.isComplete() || job.isRetired()) {
        return;
      }

      job.killJob();
      LOG.debug("Killed copy job " + id);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }

}