/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
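 * <p>
 * A minimal usage sketch (an illustration only, not part of this class's API surface):
 * incremental backups are normally requested through {@code BackupAdmin}, which dispatches to
 * this client. The table name and target root directory below are placeholders.
 *
 * <pre>
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *   BackupAdmin admin = new BackupAdminImpl(conn)) {
 *   BackupRequest request = new BackupRequest.Builder()
 *     .withBackupType(BackupType.INCREMENTAL)
 *     .withTableList(Collections.singletonList(TableName.valueOf("mytable")))
 *     .withTargetRootDir("hdfs://backup/root").build();
 *   String backupId = admin.backupTables(request);
 * }
 * </pre>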
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Checks whether a given path belongs to the active WAL directory.
   * @param p path to check
   * @return true if the path is an active (non-archived) WAL path
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /**
   * Reads bulk load records from the backup system table, iterates through the records, and forms
   * the paths for the bulk-loaded HFiles. Copies the bulk-loaded HFiles to the backup destination.
   * @param sTableList list of tables to be backed up
   * @return map of table to list of files
   */
  @SuppressWarnings("unchecked")
  protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList)
    throws IOException {
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
    List<String> activeFiles = new ArrayList<>();
    List<String> archiveFiles = new ArrayList<>();
    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
      backupManager.readBulkloadRows(sTableList);
    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : map
      .entrySet()) {
      TableName srcTable = tblEntry.getKey();

      int srcIdx = getIndex(srcTable, sTableList);
      if (srcIdx < 0) {
        LOG.warn("Couldn't find " + srcTable + " in source table List");
        continue;
      }
      if (mapForSrc[srcIdx] == null) {
        mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
        srcTable.getQualifierAsString());
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry : tblEntry
        .getValue().entrySet()) {
        String regionName = regionEntry.getKey();
        Path regionDir = new Path(tblDir, regionName);
        // map from family to List of hfiles
        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry : regionEntry.getValue()
          .entrySet()) {
          String fam = famEntry.getKey();
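          // famEntry's value lists the HFiles bulk loaded into this family. Each file may still
          // sit in the live family directory or may already have been moved to the HFile
          // archive, so both locations are checked below.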
          Path famDir = new Path(regionDir, fam);
          List<Path> files;
          if (!mapForSrc[srcIdx].containsKey(Bytes.toBytes(fam))) {
            files = new ArrayList<>();
            mapForSrc[srcIdx].put(Bytes.toBytes(fam), files);
          } else {
            files = mapForSrc[srcIdx].get(Bytes.toBytes(fam));
          }
          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
          String tblName = srcTable.getQualifierAsString();
          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
          if (!tgtFs.mkdirs(tgtFam)) {
            throw new IOException("couldn't create " + tgtFam);
          }
          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
            String file = fileWithState.getFirst();
            int idx = file.lastIndexOf("/");
            String filename = file;
            if (idx > 0) {
              filename = file.substring(idx + 1);
            }
            Path p = new Path(famDir, filename);
            Path tgt = new Path(tgtFam, filename);
            Path archive = new Path(archiveDir, filename);
            if (fs.exists(p)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
                LOG.trace("copying " + p + " to " + tgt);
              }
              activeFiles.add(p.toString());
            } else if (fs.exists(archive)) {
              LOG.debug("copying archive " + archive + " to " + tgt);
              archiveFiles.add(archive.toString());
            }
            files.add(tgt);
          }
        }
      }
    }

    copyBulkLoadedFiles(activeFiles, archiveFiles);
    backupManager.deleteBulkLoadedRows(pair.getSecond());
    return mapForSrc;
  }

  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    try {
      // Enable special mode of BackupDistCp
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
      // Copy active files
      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
      int attempt = 1;
      while (activeFiles.size() > 0) {
        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt = "
          + attempt++);
        String[] toCopy = new String[activeFiles.size()];
        activeFiles.toArray(toCopy);
        // An active file can be archived during the copy operation,
        // so we need to handle this properly
        try {
          incrementalCopyHFiles(toCopy, tgtDest);
          break;
        } catch (IOException e) {
          // Check if some files got archived and update the active and archived lists:
          // when a file is moved from the active data directory to the archive
          // directory, the number of active files decreases
          int numOfActive = activeFiles.size();
          updateFileLists(activeFiles, archiveFiles);
          if (activeFiles.size() < numOfActive) {
            continue;
          }
          // if no files were archived, re-throw the exception
          throw e;
        }
      }
      // If the incremental copy fails for the archived files, we will have partially loaded
      // files in the backup destination (only files from the active data directory).
      // That is OK, because the backup will be marked as FAILED and the data will be cleaned up.
      if (archiveFiles.size() > 0) {
        String[] toCopy = new String[archiveFiles.size()];
        archiveFiles.toArray(toCopy);
        incrementalCopyHFiles(toCopy, tgtDest);
      }
    } finally {
      // Disable special mode of BackupDistCp
      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
    }
  }
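  /**
   * Moves entries for files that no longer exist in the active data directory from
   * {@code activeFiles} to {@code archiveFiles}, on the assumption that they have been archived
   * in the meantime.
   */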
  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (newlyArchived.size() > 0) {
      activeFiles.removeAll(newlyArchived);
      archiveFiles.addAll(newlyArchived);
    }

    LOG.debug(newlyArchived.size() + " files have been archived.");
  }

  @Override
  public void execute() throws IOException {
    try {
      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // set overall backup status: complete. Here we make sure to complete the backup.
    // After this checkpoint, even if the cancel process starts, the backup is allowed to finish.
    try {
      // Set the previousTimestampMap, taken before the current log roll, in the manifest.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      handleBulkLoad(backupInfo.getTableNames());
      // backup complete
      completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);

    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }
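  /**
   * Copies the given files to the backup destination via the configured {@link BackupCopyJob}
   * (a DistCp-based job by default). The bulk-load staging directory is deleted on exit.
   * @param files      source files or directories to copy
   * @param backupDest destination directory under the backup root
   */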
  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    try {
      LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare params for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',') + " to " + backupDest
        + " finished.");
    } finally {
      deleteBulkLoadDirectory();
    }
  }

  protected void deleteBulkLoadDirectory() throws IOException {
    // delete original bulk load directory on method exit
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

  protected void convertWALsToHFiles() throws IOException {
    // get incremental backup file list and prepare parameters for DistCp
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping it in the WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }
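  /**
   * Runs a {@link WALPlayer} job that converts the WAL files in the given directories into HFiles
   * for the given tables, written under the bulk output directory.
   * @param dirPaths  WAL directories to convert
   * @param tableList tables whose edits should be converted
   */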
  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // WALPlayer reads all files in an arbitrary directory structure and creates
    // a map task for each file. We use ';' as the separator
    // because WAL file names contain ','.
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Cannot convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
    }
  }

  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }
}