/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
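 * <p>
 * At a high level the backup runs in three stages, mirroring the phase markers in
 * {@link #execute()}: PREPARE_INCREMENTAL (determine the WAL files to include), INCREMENTAL_COPY
 * (convert the WALs to HFiles and copy them to the backup destination), and INCR_BACKUP_COMPLETE
 * (persist the new log timestamps, copy any bulk-loaded HFiles and mark the backup as complete).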
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Check if a given path belongs to the active WAL directory
   * @param p path to check
   * @return true if the path is not an archived WAL file
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /*
   * Reads bulk load records from the backup table, iterates through the records and forms the
   * paths for the bulk loaded hfiles, then copies those hfiles to the backup destination. This
   * method does NOT clean up the entries in the bulk load system table; those entries should not
   * be cleaned until the backup is marked as complete.
   * @param sTableList list of tables to be backed up
   * @return the rowkeys of bulk loaded files
   */
  @SuppressWarnings("unchecked")
  protected List<byte[]> handleBulkLoad(List<TableName> sTableList) throws IOException {
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
    List<String> activeFiles = new ArrayList<>();
    List<String> archiveFiles = new ArrayList<>();
    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
      backupManager.readBulkloadRows(sTableList);
    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : map
      .entrySet()) {
      TableName srcTable = tblEntry.getKey();

      int srcIdx = getIndex(srcTable, sTableList);
      if (srcIdx < 0) {
        LOG.warn("Couldn't find " + srcTable + " in source table List");
        continue;
      }
      if (mapForSrc[srcIdx] == null) {
        mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
        srcTable.getQualifierAsString());
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry : tblEntry
        .getValue().entrySet()) {
        String regionName = regionEntry.getKey();
        Path regionDir = new Path(tblDir, regionName);
        // map from family to List of hfiles
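        // A bulk-loaded hfile may still be in the table's data directory, or it may already have
        // been moved to the HFile archive (for example, after a compaction), so both locations
        // are checked below.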
        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry : regionEntry.getValue()
          .entrySet()) {
          String fam = famEntry.getKey();
          Path famDir = new Path(regionDir, fam);
          List<Path> files;
          if (!mapForSrc[srcIdx].containsKey(Bytes.toBytes(fam))) {
            files = new ArrayList<>();
            mapForSrc[srcIdx].put(Bytes.toBytes(fam), files);
          } else {
            files = mapForSrc[srcIdx].get(Bytes.toBytes(fam));
          }
          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
          String tblName = srcTable.getQualifierAsString();
          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
          if (!tgtFs.mkdirs(tgtFam)) {
            throw new IOException("couldn't create " + tgtFam);
          }
          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
            String file = fileWithState.getFirst();
            int idx = file.lastIndexOf("/");
            String filename = file;
            if (idx > 0) {
              filename = file.substring(idx + 1);
            }
            Path p = new Path(famDir, filename);
            Path tgt = new Path(tgtFam, filename);
            Path archive = new Path(archiveDir, filename);
            if (fs.exists(p)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
                LOG.trace("copying " + p + " to " + tgt);
              }
              activeFiles.add(p.toString());
            } else if (fs.exists(archive)) {
              LOG.debug("copying archive " + archive + " to " + tgt);
              archiveFiles.add(archive.toString());
            }
            files.add(tgt);
          }
        }
      }
    }

    copyBulkLoadedFiles(activeFiles, archiveFiles);

    return pair.getSecond();
  }

  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    try {
      // Enable special mode of BackupDistCp
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
      // Copy active files
      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
      int attempt = 1;
      while (activeFiles.size() > 0) {
        LOG.info(
          "Copy " + activeFiles.size() + " active bulk loaded files. Attempt = " + attempt++);
        String[] toCopy = new String[activeFiles.size()];
        activeFiles.toArray(toCopy);
        // An active file can be archived during the copy operation;
        // we need to handle this properly.
        try {
          incrementalCopyHFiles(toCopy, tgtDest);
          break;
        } catch (IOException e) {
          // Check if some files got archived and update the active and archived lists.
          // When a file is moved from the active to the archive directory, the number of
          // active files decreases.
          int numOfActive = activeFiles.size();
          updateFileLists(activeFiles, archiveFiles);
          if (activeFiles.size() < numOfActive) {
            continue;
          }
          // if not - re-throw the exception
          throw e;
        }
      }
      // If the incremental copy fails for the archived files, we will have partially loaded files
      // in the backup destination (only the files from the active data directory).
      // That is OK, because the backup will be marked as FAILED and the data will be cleaned up.
      if (archiveFiles.size() > 0) {
        String[] toCopy = new String[archiveFiles.size()];
        archiveFiles.toArray(toCopy);
        incrementalCopyHFiles(toCopy, tgtDest);
      }
    } finally {
      // Disable special mode of BackupDistCp
      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
    }
  }

  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (newlyArchived.size() > 0) {
      activeFiles.removeAll(newlyArchived);
      archiveFiles.addAll(newlyArchived);
    }

    LOG.debug(newlyArchived.size() + " files have been archived.");
  }

  /**
   * @throws IOException                   If the execution of the backup fails
   * @throws ColumnFamilyMismatchException If the column families of the current table do not match
   *                                       the column families of the last full backup, in which
   *                                       case a full backup should be taken
   */
  @Override
  public void execute() throws IOException, ColumnFamilyMismatchException {
    try {
      Map<TableName, String> tablesToFullBackupIds = getFullBackupIds();
      verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds);

      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      // convert WALs to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // Set the overall backup status to complete. Here we make sure to complete the backup:
    // after this checkpoint, even if a cancel request comes in, the backup is allowed to finish.
    try {
      // Set on the manifest the previousTimestampMap, i.e. the timestamps from before the
      // current log roll.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
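      // Persisting the new timestamps records where this backup stopped, so the next incremental
      // backup can tell which WAL files have already been covered.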
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      List<byte[]> bulkLoadedRows = handleBulkLoad(backupInfo.getTableNames());

      // backup complete
      completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

      backupManager.deleteBulkLoadedRows(bulkLoadedRows);
    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }

  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    try {
      LOG.debug("Incremental copy of HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare parameters for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Incremental copy of HFiles failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy of HFiles from " + StringUtils.join(files, ',') + " to "
        + backupDest + " finished.");
    } finally {
      deleteBulkLoadDirectory();
    }
  }

  protected void deleteBulkLoadDirectory() throws IOException {
    // delete original bulk load directory on method exit
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

  protected void convertWALsToHFiles() throws IOException {
    // get incremental backup file list and prepare parameters for DistCp
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping it in the WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // WALPlayer reads all files in an arbitrary directory structure and creates
    // a map task for each file. We use ';' as the separator
    // because WAL file names contain ','.
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Cannot convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs)", ee);
    }
  }

  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }

  private Map<TableName, String> getFullBackupIds() throws IOException {
    // Ancestors are stored from newest to oldest, so we iterate backwards (oldest first); later
    // puts overwrite earlier ones, which leaves the most recent full backup id for each table.
    List<BackupManifest.BackupImage> images = getAncestors(backupInfo);
    Map<TableName, String> results = new HashMap<>();
    for (int i = images.size() - 1; i >= 0; i--) {
      BackupManifest.BackupImage image = images.get(i);
      if (image.getType() != BackupType.FULL) {
        continue;
      }

      for (TableName tn : image.getTableNames()) {
        results.put(tn, image.getBackupId());
      }
    }
    return results;
  }

  /**
   * Verifies that the current table descriptor CFs match the descriptor CFs of the last full
   * backup for the tables. This ensures CF compatibility across incremental backups.
   * If a mismatch is detected, a full table backup should be taken, rather than an incremental
   * one.
   */
  private void verifyCfCompatibility(Set<TableName> tables,
    Map<TableName, String> tablesToFullBackupId) throws IOException, ColumnFamilyMismatchException {
    ColumnFamilyMismatchException.ColumnFamilyMismatchExceptionBuilder exBuilder =
      ColumnFamilyMismatchException.newBuilder();
    try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
      for (TableName tn : tables) {
        String backupId = tablesToFullBackupId.get(tn);
        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(backupId);

        ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies();
        String snapshotName = fullBackupInfo.getSnapshotName(tn);
        Path root = HBackupFileSystem.getTableBackupPath(tn,
          new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId());
        Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root);

        FileSystem fs;
        try {
          fs = FileSystem.get(new URI(fullBackupInfo.getBackupRootDir()), conf);
        } catch (URISyntaxException e) {
          throw new IOException("Unable to get fs for backup " + fullBackupInfo.getBackupId(), e);
        }

        SnapshotProtos.SnapshotDescription snapshotDescription =
          SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir);
        SnapshotManifest manifest =
          SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription);

        ColumnFamilyDescriptor[] backupCfs = manifest.getTableDescriptor().getColumnFamilies();
        if (!areCfsCompatible(currentCfs, backupCfs)) {
          exBuilder.addMismatchedTable(tn, currentCfs, backupCfs);
        }
      }
    }

    ColumnFamilyMismatchException ex = exBuilder.build();
    if (!ex.getMismatchedTables().isEmpty()) {
      throw ex;
    }
  }

  private static boolean areCfsCompatible(ColumnFamilyDescriptor[] currentCfs,
    ColumnFamilyDescriptor[] backupCfs) {
    if (currentCfs.length != backupCfs.length) {
      return false;
    }

    for (int i = 0; i < backupCfs.length; i++) {
      String currentCf = currentCfs[i].getNameAsString();
      String backupCf = backupCfs[i].getNameAsString();

      if (!currentCf.equals(backupCf)) {
        return false;
      }
    }

    return true;
  }
}