/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Map-Reduce implementation of {@link BackupCopyJob}. There are two types of copy operation:
 * copying from a snapshot, built by extending ExportSnapshot, and copying incremental log files,
 * built by extending DistCp.
 */
@InterfaceAudience.Private
public class MapReduceBackupCopyJob implements BackupCopyJob {
  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";

  // This prefix specifies the DistCp options to be used during backup copy
  public static final String BACKUP_COPY_OPTION_PREFIX = "hbase.backup.copy.";

  private static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupCopyJob.class);

  private Configuration conf;

  // Accumulated progress within the whole backup process for the copy operation
  private float progressDone = 0.1f;
  private long bytesCopied = 0;
  private static final float INIT_PROGRESS = 0.1f;

  // The percentage of the current copy task within the whole task, if multiple copies are
  // needed. The default value is 100%, which means there is only one copy task for the whole
  // backup.
  private float subTaskPercntgInWholeTask = 1f;
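
  // Worked example of the progress model (illustrative numbers): with progressDone at its initial
  // value 0.1, INIT_PROGRESS = 0.1 and subTaskPercntgInWholeTask = 1.0, a DistCp map progress of
  // 0.5 is reported as 0.1 + 0.5 * 1.0 * (1 - 0.1) = 0.55, i.e. 55.0% in the backup system table.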

  public MapReduceBackupCopyJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Get the current copy task percentage within the whole task if multiple copies are needed.
   * @return the current copy task percentage
   */
  public float getSubTaskPercntgInWholeTask() {
    return subTaskPercntgInWholeTask;
  }

  /**
   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
   * be called before calling
   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
   */
  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
  }

  static class SnapshotCopy extends ExportSnapshot {
    private BackupInfo backupInfo;
    private TableName table;

    public SnapshotCopy(BackupInfo backupInfo, TableName table) {
      super();
      this.backupInfo = backupInfo;
      this.table = table;
    }

    public TableName getTable() {
      return this.table;
    }

    public BackupInfo getBackupInfo() {
      return this.backupInfo;
    }
  }

  /**
   * Update the ongoing backup with new progress.
   * @param backupInfo    backup info
   * @param backupManager backup manager
   * @param newProgress   progress
   * @param bytesCopied   bytes copied
   * @throws IOException exception
   */
  static void updateProgress(BackupInfo backupInfo, BackupManager backupManager, int newProgress,
    long bytesCopied) throws IOException {
    // compose the new backup progress data, using a fake number for now
    String backupProgressData = newProgress + "%";

    backupInfo.setProgress(newProgress);
    backupManager.updateBackupInfo(backupInfo);
    LOG.debug("Backup progress data \"" + backupProgressData
      + "\" has been updated to backup system table for " + backupInfo.getBackupId());
  }

  /**
   * Extends DistCp to update progress in the backup system table during backup. Uses DistCpV2
   * (MAPREDUCE-2765): it simply overrides the execute() method to obtain the Job reference for
   * progress updating. Only the arguments "src1, [src2, [...]] dst" are supported; no other
   * DistCp options.
   */
  class BackupDistCp extends DistCp {

    private BackupInfo backupInfo;
    private BackupManager backupManager;

    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
      BackupManager backupManager) throws Exception {
      super(conf, options);
      this.backupInfo = backupInfo;
      this.backupManager = backupManager;
    }

    @Override
    public Job execute() throws Exception {

      // reflection preparation for private methods and fields
      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");

      Field fieldInputOptions = getInputOptionsField(classDistCp);
      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");

      methodCleanup.setAccessible(true);
      fieldInputOptions.setAccessible(true);
      fieldSubmitted.setAccessible(true);

      // execute() logic starts here
      assert fieldInputOptions.get(this) != null;

      Job job = null;
      try {

        List<Path> srcs = getSourcePaths(fieldInputOptions);

        long totalSrcLength = 0;
        for (Path aSrc : srcs) {
          totalSrcLength += BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
        }

        // Async call
        job = super.execute();
        // Update the copy progress to the system table every 0.5s if the progress value changed
        int progressReportFreq = MapReduceBackupCopyJob.this.getConf()
          .getInt("hbase.backup.progressreport.frequency", 500);
        float lastProgress = progressDone;
        while (!job.isComplete()) {
          float newProgress =
            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);

          if (newProgress > lastProgress) {

            BigDecimal progressData =
              new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);
            String newProgressStr = progressData + "%";
            LOG.info("Progress: " + newProgressStr);
            updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
            LOG.debug("Backup progress data updated to backup system table: \"Progress: "
              + newProgressStr + ".\"");
            lastProgress = newProgress;
          }
          Thread.sleep(progressReportFreq);
        }
        // update the progress data after the copy job completes
        float newProgress =
          progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
        BigDecimal progressData =
          new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);

        String newProgressStr = progressData + "%";
        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
          + " mapProgress: " + job.mapProgress());

        // accumulate the overall backup progress
        progressDone = newProgress;
        bytesCopied += totalSrcLength;

        updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
        LOG.debug("Backup progress data updated to backup system table: \"Progress: "
          + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
      } catch (Throwable t) {
        LOG.error(t.toString(), t);
        throw t;
      }

      String jobID = job.getJobID().toString();
      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);

      LOG.debug(
        "DistCp job-id: " + jobID + " completed: " + job.isComplete() + " " + job.isSuccessful());
      Counters ctrs = job.getCounters();
      LOG.debug(Objects.toString(ctrs));
      if (job.isComplete() && !job.isSuccessful()) {
        throw new Exception("DistCp job-id: " + jobID + " failed");
      }

      return job;
    }
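
    // Hadoop 2 DistCp keeps its options in a private "inputOptions" field; Hadoop 3 replaced it
    // with a "context" field holding a DistCpContext. The reflection helpers below try both
    // names so the same code runs against either version.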
    private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
      Field f = null;
      try {
        f = classDistCp.getDeclaredField("inputOptions");
      } catch (Exception e) {
        // Hadoop 3
        try {
          f = classDistCp.getDeclaredField("context");
        } catch (NoSuchFieldException | SecurityException e1) {
          throw new IOException(e1);
        }
      }
      return f;
    }

    @SuppressWarnings("unchecked")
    private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
      Object options;
      try {
        options = fieldInputOptions.get(this);
        if (options instanceof DistCpOptions) {
          return ((DistCpOptions) options).getSourcePaths();
        } else {
          // Hadoop 3
          Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
          Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
          methodGetSourcePaths.setAccessible(true);

          return (List<Path>) methodGetSourcePaths.invoke(options);
        }
      } catch (IllegalArgumentException | IllegalAccessException | ClassNotFoundException
        | NoSuchMethodException | SecurityException | InvocationTargetException e) {
        throw new IOException(e);
      }
    }

    @Override
    protected Path createInputFileListing(Job job) throws IOException {

      if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
        return super.createInputFileListing(job);
      }
      long totalBytesExpected = 0;
      int totalRecords = 0;
      Path fileListingPath = getFileListingPath();
      try (SequenceFile.Writer writer = getWriter(fileListingPath)) {
        List<Path> srcFiles = getSourceFiles();
        if (srcFiles.isEmpty()) {
          return fileListingPath;
        }
        totalRecords = srcFiles.size();
        FileSystem fs = srcFiles.get(0).getFileSystem(conf);
        for (Path path : srcFiles) {
          FileStatus fst = fs.getFileStatus(path);
          totalBytesExpected += fst.getLen();
          Text key = getKey(path);
          writer.append(key, new CopyListingFileStatus(fst));
        }
        writer.close();

        // update the job's configuration
        Configuration cfg = job.getConfiguration();
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
        cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
        | IllegalAccessException | NoSuchMethodException | ClassNotFoundException
        | InvocationTargetException e) {
        throw new IOException(e);
      }
      return fileListingPath;
    }

    private Text getKey(Path path) {
      int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
      int count = 0;
      String relPath = "";
      while (count++ < level) {
        relPath = Path.SEPARATOR + path.getName() + relPath;
        path = path.getParent();
      }
      return new Text(relPath);
    }
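
    // Example (hypothetical path): with num.levels.preserve = 2, a source file
    // hdfs://cluster/user/hbase/oldWALs/wal.123 produces the key "/oldWALs/wal.123", so the last
    // two path components are preserved under the target directory.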

    private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
      IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
      ClassNotFoundException, InvocationTargetException, IOException {
      Field options = null;
      try {
        options = DistCp.class.getDeclaredField("inputOptions");
      } catch (NoSuchFieldException | SecurityException e) {
        options = DistCp.class.getDeclaredField("context");
      }
      options.setAccessible(true);
      return getSourcePaths(options);
    }

    private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
      FileSystem fs = pathToListFile.getFileSystem(conf);
      fs.delete(pathToListFile, false);
      return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
    }

  }

  /**
   * Do backup copy based on different types.
   * @param context       The backup info
   * @param backupManager The backup manager
   * @param conf          The hadoop configuration
   * @param copyType      The backup copy type
   * @param options       Options for customized ExportSnapshot or DistCp
   * @return the result code of the underlying ExportSnapshot or DistCp job (0 on success)
   * @throws IOException exception
   */
  @Override
  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
    BackupType copyType, String[] options) throws IOException {
    int res = 0;

    try {
      if (copyType == BackupType.FULL) {
        SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
        LOG.debug("Doing SNAPSHOT_COPY");
        // Make a new instance of conf to be used by the snapshot copy class.
        snapshotCp.setConf(new Configuration(conf));
        res = snapshotCp.run(options);

      } else if (copyType == BackupType.INCREMENTAL) {
        LOG.debug("Doing COPY_TYPE_DISTCP");
        setSubTaskPercntgInWholeTask(1f);

        BackupDistCp distcp =
          new BackupDistCp(new Configuration(conf), null, context, backupManager);
        // Handle a special case where the source is a single file.
        // In that case, DistCp will not create the target dir. It just takes the
        // target as a file name and copies the source file to the target (as a file name).
        // We need to create the target dir before running DistCp.
        LOG.debug("DistCp options: " + Arrays.toString(options));
        Path dest = new Path(options[options.length - 1]);
        String[] newOptions = new String[options.length + 1];
        System.arraycopy(options, 0, newOptions, 1, options.length);
        newOptions[0] = "-async"; // run DistCp in async mode
        FileSystem destfs = dest.getFileSystem(conf);
        if (!destfs.exists(dest)) {
          destfs.mkdirs(dest);
        }

        List<String> distCpOptionsFromConf = parseDistCpOptions(conf);
        String[] finalOptions = new String[newOptions.length + distCpOptionsFromConf.size()];
        for (int i = 0; i < distCpOptionsFromConf.size(); i++) {
          finalOptions[i] = distCpOptionsFromConf.get(i);
        }
        System.arraycopy(newOptions, 0, finalOptions, distCpOptionsFromConf.size(),
          newOptions.length);
        res = distcp.run(finalOptions);
      }
      return res;

    } catch (Exception e) {
      throw new IOException(e);
    }
  }
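
  // Illustrative example: setting a DistCp switch in the configuration under
  // BACKUP_COPY_OPTION_PREFIX ("hbase.backup.copy." + the switch's DistCp config label) makes
  // parseDistCpOptions() below emit the corresponding flag, e.g. ["-bandwidth", "<value>"], so an
  // incremental copy of src -> dst ultimately runs DistCp as: -bandwidth <value> -async src dst.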

  @Override
  public void cancel(String jobId) throws IOException {
    JobID id = JobID.forName(jobId);
    Cluster cluster = new Cluster(this.getConf());
    try {
      Job job = cluster.getJob(id);
      if (job == null) {
        LOG.error("No job found for " + id);
        // TODO: should we throw an exception?
        return;
      }
      if (job.isComplete() || job.isRetired()) {
        return;
      }

      job.killJob();
      LOG.debug("Killed copy job " + id);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }

  protected static List<String> parseDistCpOptions(Configuration conf) {
    List<String> extraArgsFromConf = new ArrayList<>();

    for (DistCpOptionSwitch optionSwitch : DistCpOptionSwitch.values()) {
      String configLabel = BACKUP_COPY_OPTION_PREFIX + optionSwitch.getConfigLabel();
      if (conf.get(configLabel) != null) {
        if (optionSwitch.getOption().hasArg()) {
          extraArgsFromConf.add("-" + optionSwitch.getOption().getOpt());
          extraArgsFromConf.add(conf.get(configLabel));
        } else {
          boolean value = conf.getBoolean(configLabel, false);
          if (value) {
            extraArgsFromConf.add("-" + optionSwitch.getOption().getOpt());
          }
        }
      }
    }

    return extraArgsFromConf;
  }

}