/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupMergeJob;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link BackupMergeJob}. Must be initialized with the configuration
 * of a backup destination cluster.
 */
@InterfaceAudience.Private
public class MapReduceBackupMergeJob implements BackupMergeJob {
  public static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupMergeJob.class);

  protected Tool player;
  protected Configuration conf;

  public MapReduceBackupMergeJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void run(String[] backupIds) throws IOException {
    String bulkOutputConfKey;

    // TODO : run player on remote cluster
    player = new MapReduceHFileSplitterJob();
    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
    // Player reads all files in arbitrary directory structure and creates
    // a Map task for each file
    String bids = StringUtils.join(backupIds, ",");

    if (LOG.isDebugEnabled()) {
      LOG.debug("Merge backup images " + bids);
    }

    List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
    boolean finishedTables = false;
    Connection conn = ConnectionFactory.createConnection(getConf());
    BackupSystemTable table = new BackupSystemTable(conn);
    FileSystem fs = null;

    try {

      // Get exclusive lock on backup system
      table.startBackupExclusiveOperation();
      // Start merge operation
      table.startMergeOperation(backupIds);

      // Select most recent backup id
      String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

      TableName[] tableNames = getTableNamesInBackupImages(backupIds);

      BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
      String backupRoot = bInfo.getBackupRootDir();
      Path backupRootPath = new Path(backupRoot);
      fs = backupRootPath.getFileSystem(conf);

      for (int i = 0; i < tableNames.length; i++) {
        LOG.info("Merge backup images for " + tableNames[i]);

        // Find input directories for table
        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
        String dirs = StringUtils.join(dirPaths, ",");

        // bulkOutputPath should be on the same filesystem as backupRoot
        Path tmpRestoreOutputDir = HBackupFileSystem.getBackupTmpDirPath(backupRoot);
        Path bulkOutputPath = BackupUtils.getBulkOutputDir(tmpRestoreOutputDir,
          BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
        // Delete content if exists
        if (fs.exists(bulkOutputPath)) {
          if (!fs.delete(bulkOutputPath, true)) {
            LOG.warn("Can not delete: " + bulkOutputPath);
          }
        }
        Configuration conf = getConf();
        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

        player.setConf(getConf());
        int result = player.run(playerArgs);
        if (!succeeded(result)) {
          throw new IOException("Can not merge backup images for " + dirs
            + " (check Hadoop/MR and HBase logs). Player return code =" + result);
        }
        // Add to processed table list
        processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
        LOG.debug("Merge Job finished:" + result);
      }
      List<TableName> tableList = toTableNameList(processedTableList);
      table.updateProcessedTablesForMerge(tableList);
      finishedTables = true;

      // PHASE 2 (modification of a backup file system)
      // Move existing mergedBackupId data into tmp directory
      // we will need it later in case of a failure
      Path tmpBackupDir =
        HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId);
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);

      if (!fs.rename(backupDirPath, tmpBackupDir)) {
        throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
      } else {
        LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
      }
      // Move new data into backup dest
      for (Pair<TableName, Path> tn : processedTableList) {
        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
      }
      // Update backup manifest
      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
      updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
      // Copy meta files back from tmp to backup dir
      copyMetaData(fs, tmpBackupDir, backupDirPath);
      // Delete tmp dir (Rename back during repair)
      if (!fs.delete(tmpBackupDir, true)) {
        // WARN and ignore
        LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
      }
      // Delete old data
      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
      // Finish merge session
      table.finishMergeOperation();
      // Release lock
      table.finishBackupExclusiveOperation();
    } catch (RuntimeException e) {

      throw e;
    } catch (Exception e) {
      LOG.error(e.toString(), e);
      if (!finishedTables) {
        // cleanup bulk directories and finish merge
        // merge MUST be repeated (no need for repair)
        if (fs != null) {
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
        }
        table.finishMergeOperation();
        table.finishBackupExclusiveOperation();
        throw new IOException("Backup merge operation failed, you should try it again", e);
      } else {
        // backup repair must be run
        throw new IOException(
          "Backup merge operation failed, run backup repair tool to restore system's integrity", e);
      }
    } finally {
      table.close();
      conn.close();
    }
  }

  /**
   * Copies the meta data of a backup session.
   * @param fs file system
   * @param tmpBackupDir temporary backup directory, where the meta data is located
   * @param backupDirPath new path for backup
   * @throws IOException exception
   */
  protected void copyMetaData(FileSystem fs, Path tmpBackupDir, Path backupDirPath)
    throws IOException {
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(tmpBackupDir, true);
    List<Path> toKeep = new ArrayList<Path>();
    while (it.hasNext()) {
      Path p = it.next().getPath();
      if (fs.isDirectory(p)) {
        continue;
      }
      // Keep meta
      String fileName = p.toString();
      if (
        fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0
          || fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0
      ) {
        toKeep.add(p);
      }
    }
    // Copy meta to destination
    for (Path p : toKeep) {
      Path newPath = convertToDest(p, backupDirPath);
      copyFile(fs, p, newPath);
    }
  }

  /**
   * Copies a file in DFS from p to newPath.
   * @param fs file system
   * @param p old path
   * @param newPath new path
   * @throws IOException exception
   */
  protected void copyFile(FileSystem fs, Path p, Path newPath) throws IOException {
    try (InputStream in = fs.open(p); OutputStream out = fs.create(newPath, true)) {
      IOUtils.copy(in, out);
    }
    boolean exists = fs.exists(newPath);
    if (!exists) {
      throw new IOException("Failed to copy meta file to: " + newPath);
    }
  }

  /**
   * Converts a path before copying.
   * @param p path
   * @param backupDirPath backup root
   * @return converted path
   */
  protected Path convertToDest(Path p, Path backupDirPath) {
    String backupId = backupDirPath.getName();
    Deque<String> stack = new ArrayDeque<String>();
    String name = null;
    while (true) {
      name = p.getName();
      if (!name.equals(backupId)) {
        stack.push(name);
        p = p.getParent();
      } else {
        break;
      }
    }
    Path newPath = new Path(backupDirPath.toString());
    while (!stack.isEmpty()) {
      newPath = new Path(newPath, stack.pop());
    }
    return newPath;
  }

  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<Path> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getSecond());
    }
    return list;
  }

  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<TableName> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getFirst());
    }
    return list;
  }

  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
    for (Path path : pathList) {
      if (!fs.delete(path, true)) {
        LOG.warn("Can't delete " + path);
      }
    }
  }

  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
    List<String> backupsToDelete) throws IllegalArgumentException, IOException {
    BackupManifest manifest =
      HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
    manifest.getBackupImage().removeAncestors(backupsToDelete);
    // save back
    manifest.store(conf);
  }

  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
    String backupRoot) throws IOException {
    // Delete from backup system table
    try (BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        table.deleteBackupInfo(backupId);
      }
    }

    // Delete from file system
    for (String backupId : backupIds) {
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);

      if (!fs.delete(backupDirPath, true)) {
        LOG.warn("Could not delete " + backupDirPath);
      }
    }
  }

  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
    List<String> list = new ArrayList<>();
    for (String id : backupIds) {
      if (id.equals(mergedBackupId)) {
        continue;
      }
      list.add(id);
    }
    return list;
  }

  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
    TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException {
    Path dest =
      new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));

    FileStatus[] fsts = fs.listStatus(bulkOutputPath);
    for (FileStatus fst : fsts) {
      if (fst.isDirectory()) {
        String family = fst.getPath().getName();
        Path newDst = new Path(dest, family);
        if (fs.exists(newDst)) {
          if (!fs.delete(newDst, true)) {
            throw new IOException("failed to delete :" + newDst);
          }
        } else {
          fs.mkdirs(dest);
        }
        boolean result = fs.rename(fst.getPath(), dest);
        LOG.debug("MoveData from " + fst.getPath() + " to " + dest + " result=" + result);
      }
    }
  }

  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
    Set<TableName> allSet = new HashSet<>();

    try (Connection conn = ConnectionFactory.createConnection(conf);
      BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        BackupInfo bInfo = table.readBackupInfo(backupId);

        allSet.addAll(bInfo.getTableNames());
      }
    }

    TableName[] ret = new TableName[allSet.size()];
    return allSet.toArray(ret);
  }

  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
    String[] backupIds) throws IOException {
    List<Path> dirs = new ArrayList<>();

    for (String backupId : backupIds) {
      Path fileBackupDirPath =
        new Path(HBackupFileSystem.getTableBackupDir(backupRoot, backupId, tableName));
      if (fs.exists(fileBackupDirPath)) {
        dirs.add(fileBackupDirPath);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("File: " + fileBackupDirPath + " does not exist.");
        }
      }
    }
    Path[] ret = new Path[dirs.size()];
    return dirs.toArray(ret);
  }
}
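
// Minimal usage sketch (kept as a trailing comment so the compilation unit is unchanged; it is
// not part of the upstream class, and the backup ids below are hypothetical placeholders).
// The job only needs a Configuration for the backup destination cluster before run() is
// invoked with the ids of the backup images to merge:
//
//   Configuration conf = HBaseConfiguration.create();
//   MapReduceBackupMergeJob mergeJob = new MapReduceBackupMergeJob();
//   mergeJob.setConf(conf);
//   mergeJob.run(new String[] { "backup_1700000000000", "backup_1700000100000" });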