001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import edu.umd.cs.findbugs.annotations.Nullable; 021import java.io.Closeable; 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.nio.charset.StandardCharsets; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.Objects; 036import java.util.Set; 037import java.util.TreeMap; 038import java.util.TreeSet; 039import java.util.stream.Collectors; 040import org.apache.commons.lang3.ArrayUtils; 041import org.apache.commons.lang3.StringUtils; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.Cell; 045import org.apache.hadoop.hbase.CellUtil; 046import org.apache.hadoop.hbase.HBaseConfiguration; 047import org.apache.hadoop.hbase.NamespaceDescriptor; 048import org.apache.hadoop.hbase.NamespaceExistException; 049import org.apache.hadoop.hbase.ServerName; 050import org.apache.hadoop.hbase.TableExistsException; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.TableNotDisabledException; 053import org.apache.hadoop.hbase.backup.BackupInfo; 054import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 055import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 056import org.apache.hadoop.hbase.backup.BackupType; 057import org.apache.hadoop.hbase.backup.util.BackupUtils; 058import org.apache.hadoop.hbase.client.Admin; 059import org.apache.hadoop.hbase.client.BufferedMutator; 060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 062import org.apache.hadoop.hbase.client.Connection; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Put; 066import org.apache.hadoop.hbase.client.Result; 067import org.apache.hadoop.hbase.client.ResultScanner; 068import org.apache.hadoop.hbase.client.Scan; 069import org.apache.hadoop.hbase.client.SnapshotDescription; 070import org.apache.hadoop.hbase.client.Table; 071import org.apache.hadoop.hbase.client.TableDescriptor; 072import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 073import org.apache.hadoop.hbase.util.Bytes; 074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 075import org.apache.yetus.audience.InterfaceAudience; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 080import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 081 082import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 084 085/** 086 * This class provides API to access backup system table<br> 087 * Backup system table schema:<br> 088 * <p> 089 * <ul> 090 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> 091 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> 092 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of 093 * include table; value=empty</li> 094 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL 095 * timestamp]</li> 096 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> 097 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file 098 * name</li> 099 * </ul> 100 * </p> 101 */ 102@InterfaceAudience.Private 103public final class BackupSystemTable implements Closeable { 104 105 private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); 106 107 static class WALItem { 108 String backupId; 109 String walFile; 110 String backupRoot; 111 112 WALItem(String backupId, String walFile, String backupRoot) { 113 this.backupId = backupId; 114 this.walFile = walFile; 115 this.backupRoot = backupRoot; 116 } 117 118 public String getBackupId() { 119 return backupId; 120 } 121 122 public String getWalFile() { 123 return walFile; 124 } 125 126 public String getBackupRoot() { 127 return backupRoot; 128 } 129 130 @Override 131 public String toString() { 132 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; 133 } 134 } 135 136 /** 137 * Backup system table (main) name 138 */ 139 private TableName tableName; 140 141 /** 142 * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a 143 * separate table because we have to isolate general backup operations: create, merge etc from 144 * activity of RegionObserver, which controls process of a bulk loading 145 * {@link org.apache.hadoop.hbase.backup.BackupObserver} 146 */ 147 private TableName bulkLoadTableName; 148 149 /** 150 * Stores backup sessions (contexts) 151 */ 152 final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); 153 /** 154 * Stores other meta 155 */ 156 final static byte[] META_FAMILY = Bytes.toBytes("meta"); 157 final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); 158 /** 159 * Connection to HBase cluster, shared among all instances 160 */ 161 private final Connection connection; 162 163 private final static String BACKUP_INFO_PREFIX = "session:"; 164 private final static String START_CODE_ROW = "startcode:"; 165 private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); 166 private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); 167 168 private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); 169 private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); 170 171 private final static String INCR_BACKUP_SET = "incrbackupset:"; 172 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; 173 private final static String RS_LOG_TS_PREFIX = "rslogts:"; 174 175 private final static String BULK_LOAD_PREFIX = "bulk:"; 176 private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX); 177 private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row"); 178 private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row"); 179 180 final static byte[] TBL_COL = Bytes.toBytes("tbl"); 181 final static byte[] FAM_COL = Bytes.toBytes("fam"); 182 final static byte[] PATH_COL = Bytes.toBytes("path"); 183 184 private final static String SET_KEY_PREFIX = "backupset:"; 185 186 // separator between BULK_LOAD_PREFIX and ordinals 187 private final static String BLK_LD_DELIM = ":"; 188 private final static byte[] EMPTY_VALUE = new byte[] {}; 189 190 // Safe delimiter in a string 191 private final static String NULL = "\u0000"; 192 193 public BackupSystemTable(Connection conn) throws IOException { 194 this.connection = conn; 195 Configuration conf = this.connection.getConfiguration(); 196 tableName = BackupSystemTable.getTableName(conf); 197 bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); 198 checkSystemTable(); 199 } 200 201 private void checkSystemTable() throws IOException { 202 try (Admin admin = connection.getAdmin()) { 203 verifyNamespaceExists(admin); 204 Configuration conf = connection.getConfiguration(); 205 if (!admin.tableExists(tableName)) { 206 TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf); 207 createSystemTable(admin, backupHTD); 208 } 209 ensureTableEnabled(admin, tableName); 210 if (!admin.tableExists(bulkLoadTableName)) { 211 TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); 212 createSystemTable(admin, blHTD); 213 } 214 ensureTableEnabled(admin, bulkLoadTableName); 215 waitForSystemTable(admin, tableName); 216 waitForSystemTable(admin, bulkLoadTableName); 217 } 218 } 219 220 private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException { 221 try { 222 admin.createTable(descriptor); 223 } catch (TableExistsException e) { 224 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 225 // so may be subject to race conditions where one caller succeeds in creating the 226 // table and others fail because it now exists 227 LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e); 228 } 229 } 230 231 private void verifyNamespaceExists(Admin admin) throws IOException { 232 String namespaceName = tableName.getNamespaceAsString(); 233 NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); 234 NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); 235 boolean exists = false; 236 for (NamespaceDescriptor nsd : list) { 237 if (nsd.getName().equals(ns.getName())) { 238 exists = true; 239 break; 240 } 241 } 242 if (!exists) { 243 try { 244 admin.createNamespace(ns); 245 } catch (NamespaceExistException e) { 246 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 247 // so may be subject to race conditions where one caller succeeds in creating the 248 // namespace and others fail because it now exists 249 LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e); 250 } 251 } 252 } 253 254 private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { 255 // Return fast if the table is available and avoid a log message 256 if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) { 257 return; 258 } 259 long TIMEOUT = 60000; 260 long startTime = EnvironmentEdgeManager.currentTime(); 261 LOG.debug("Backup table {} is not present and available, waiting for it to become so", 262 tableName); 263 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { 264 try { 265 Thread.sleep(100); 266 } catch (InterruptedException e) { 267 throw (IOException) new InterruptedIOException().initCause(e); 268 } 269 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { 270 throw new IOException( 271 "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms"); 272 } 273 } 274 LOG.debug("Backup table {} exists and available", tableName); 275 } 276 277 @Override 278 public void close() { 279 // do nothing 280 } 281 282 /** 283 * Updates status (state) of a backup session in backup system table table 284 * @param info backup info 285 * @throws IOException exception 286 */ 287 public void updateBackupInfo(BackupInfo info) throws IOException { 288 if (LOG.isTraceEnabled()) { 289 LOG.trace("update backup status in backup system table for: " + info.getBackupId() 290 + " set status=" + info.getState()); 291 } 292 try (Table table = connection.getTable(tableName)) { 293 Put put = createPutForBackupInfo(info); 294 table.put(put); 295 } 296 } 297 298 /* 299 * @param backupId the backup Id 300 * @return Map of rows to path of bulk loaded hfile 301 */ 302 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { 303 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 304 try (Table table = connection.getTable(bulkLoadTableName); 305 ResultScanner scanner = table.getScanner(scan)) { 306 Result res = null; 307 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 308 while ((res = scanner.next()) != null) { 309 res.advance(); 310 byte[] row = CellUtil.cloneRow(res.listCells().get(0)); 311 for (Cell cell : res.listCells()) { 312 if ( 313 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 314 BackupSystemTable.PATH_COL.length) == 0 315 ) { 316 map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); 317 } 318 } 319 } 320 return map; 321 } 322 } 323 324 /** 325 * Deletes backup status from backup system table table 326 * @param backupId backup id 327 * @throws IOException exception 328 */ 329 public void deleteBackupInfo(String backupId) throws IOException { 330 if (LOG.isTraceEnabled()) { 331 LOG.trace("delete backup status in backup system table for " + backupId); 332 } 333 try (Table table = connection.getTable(tableName)) { 334 Delete del = createDeleteForBackupInfo(backupId); 335 table.delete(del); 336 } 337 } 338 339 /** 340 * Registers a bulk load. 341 * @param tableName table name 342 * @param region the region receiving hfile 343 * @param cfToHfilePath column family and associated hfiles 344 */ 345 public void registerBulkLoad(TableName tableName, byte[] region, 346 Map<byte[], List<Path>> cfToHfilePath) throws IOException { 347 if (LOG.isDebugEnabled()) { 348 LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName, 349 cfToHfilePath.size()); 350 } 351 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 352 List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath); 353 bufferedMutator.mutate(puts); 354 LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName); 355 } 356 } 357 358 /** 359 * Removes entries from the table that tracks all bulk loaded hfiles. 360 * @param rows the row keys of the entries to be deleted 361 */ 362 public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException { 363 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 364 List<Delete> deletes = new ArrayList<>(); 365 for (byte[] row : rows) { 366 Delete del = new Delete(row); 367 deletes.add(del); 368 LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row)); 369 } 370 bufferedMutator.mutate(deletes); 371 LOG.debug("Deleted {} bulk load entries.", rows.size()); 372 } 373 } 374 375 /** 376 * Reads all registered bulk loads. 377 */ 378 public List<BulkLoad> readBulkloadRows() throws IOException { 379 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null); 380 return processBulkLoadRowScan(scan); 381 } 382 383 /** 384 * Reads the registered bulk loads for the given tables. 385 */ 386 public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException { 387 List<BulkLoad> result = new ArrayList<>(); 388 for (TableName table : tableList) { 389 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table); 390 result.addAll(processBulkLoadRowScan(scan)); 391 } 392 return result; 393 } 394 395 private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException { 396 List<BulkLoad> result = new ArrayList<>(); 397 try (Table bulkLoadTable = connection.getTable(bulkLoadTableName); 398 ResultScanner scanner = bulkLoadTable.getScanner(scan)) { 399 Result res; 400 while ((res = scanner.next()) != null) { 401 res.advance(); 402 TableName table = null; 403 String fam = null; 404 String path = null; 405 String region = null; 406 byte[] row = null; 407 for (Cell cell : res.listCells()) { 408 row = CellUtil.cloneRow(cell); 409 String rowStr = Bytes.toString(row); 410 region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); 411 if ( 412 CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, 413 BackupSystemTable.TBL_COL.length) == 0 414 ) { 415 table = TableName.valueOf(CellUtil.cloneValue(cell)); 416 } else if ( 417 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 418 BackupSystemTable.FAM_COL.length) == 0 419 ) { 420 fam = Bytes.toString(CellUtil.cloneValue(cell)); 421 } else if ( 422 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 423 BackupSystemTable.PATH_COL.length) == 0 424 ) { 425 path = Bytes.toString(CellUtil.cloneValue(cell)); 426 } 427 } 428 result.add(new BulkLoad(table, region, fam, path, row)); 429 LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path); 430 } 431 } 432 return result; 433 } 434 435 /** 436 * Reads backup status object (instance of backup info) from backup system table table 437 * @param backupId backup id 438 * @return Current status of backup session or null 439 */ 440 public BackupInfo readBackupInfo(String backupId) throws IOException { 441 if (LOG.isTraceEnabled()) { 442 LOG.trace("read backup status from backup system table for: " + backupId); 443 } 444 445 try (Table table = connection.getTable(tableName)) { 446 Get get = createGetForBackupInfo(backupId); 447 Result res = table.get(get); 448 if (res.isEmpty()) { 449 return null; 450 } 451 return resultToBackupInfo(res); 452 } 453 } 454 455 /** 456 * Read the last backup start code (timestamp) of last successful backup. Will return null if 457 * there is no start code stored on hbase or the value is of length 0. These two cases indicate 458 * there is no successful backup completed so far. 459 * @param backupRoot directory path to backup destination 460 * @return the timestamp of last successful backup 461 * @throws IOException exception 462 */ 463 public String readBackupStartCode(String backupRoot) throws IOException { 464 LOG.trace("read backup start code from backup system table"); 465 466 try (Table table = connection.getTable(tableName)) { 467 Get get = createGetForStartCode(backupRoot); 468 Result res = table.get(get); 469 if (res.isEmpty()) { 470 return null; 471 } 472 Cell cell = res.listCells().get(0); 473 byte[] val = CellUtil.cloneValue(cell); 474 if (val.length == 0) { 475 return null; 476 } 477 return new String(val, StandardCharsets.UTF_8); 478 } 479 } 480 481 /** 482 * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. 483 * @param startCode start code 484 * @param backupRoot root directory path to backup 485 * @throws IOException exception 486 */ 487 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { 488 if (LOG.isTraceEnabled()) { 489 LOG.trace("write backup start code to backup system table " + startCode); 490 } 491 try (Table table = connection.getTable(tableName)) { 492 Put put = createPutForStartCode(startCode.toString(), backupRoot); 493 table.put(put); 494 } 495 } 496 497 /** 498 * Exclusive operations are: create, delete, merge 499 * @throws IOException if a table operation fails or an active backup exclusive operation is 500 * already underway 501 */ 502 public void startBackupExclusiveOperation() throws IOException { 503 LOG.debug("Start new backup exclusive operation"); 504 505 try (Table table = connection.getTable(tableName)) { 506 Put put = createPutForStartBackupSession(); 507 // First try to put if row does not exist 508 if ( 509 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 510 .ifNotExists().thenPut(put) 511 ) { 512 // Row exists, try to put if value == ACTIVE_SESSION_NO 513 if ( 514 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 515 .ifEquals(ACTIVE_SESSION_NO).thenPut(put) 516 ) { 517 throw new ExclusiveOperationException(); 518 } 519 } 520 } 521 } 522 523 private Put createPutForStartBackupSession() { 524 Put put = new Put(ACTIVE_SESSION_ROW); 525 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); 526 return put; 527 } 528 529 public void finishBackupExclusiveOperation() throws IOException { 530 LOG.debug("Finish backup exclusive operation"); 531 532 try (Table table = connection.getTable(tableName)) { 533 Put put = createPutForStopBackupSession(); 534 if ( 535 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 536 .ifEquals(ACTIVE_SESSION_YES).thenPut(put) 537 ) { 538 throw new IOException("There is no active backup exclusive operation"); 539 } 540 } 541 } 542 543 private Put createPutForStopBackupSession() { 544 Put put = new Put(ACTIVE_SESSION_ROW); 545 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); 546 return put; 547 } 548 549 /** 550 * Get the Region Servers log information after the last log roll from backup system table. 551 * @param backupRoot root directory path to backup 552 * @return RS log info 553 * @throws IOException exception 554 */ 555 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) 556 throws IOException { 557 LOG.trace("read region server last roll log result to backup system table"); 558 559 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); 560 561 try (Table table = connection.getTable(tableName); 562 ResultScanner scanner = table.getScanner(scan)) { 563 Result res; 564 HashMap<String, Long> rsTimestampMap = new HashMap<>(); 565 while ((res = scanner.next()) != null) { 566 res.advance(); 567 Cell cell = res.current(); 568 byte[] row = CellUtil.cloneRow(cell); 569 String server = getServerNameForReadRegionServerLastLogRollResult(row); 570 byte[] data = CellUtil.cloneValue(cell); 571 rsTimestampMap.put(server, Bytes.toLong(data)); 572 } 573 return rsTimestampMap; 574 } 575 } 576 577 /** 578 * Writes Region Server last roll log result (timestamp) to backup system table table 579 * @param server Region Server name 580 * @param ts last log timestamp 581 * @param backupRoot root directory path to backup 582 * @throws IOException exception 583 */ 584 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) 585 throws IOException { 586 LOG.trace("write region server last roll log result to backup system table"); 587 588 try (Table table = connection.getTable(tableName)) { 589 Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); 590 table.put(put); 591 } 592 } 593 594 /** 595 * Get all completed backup information (in desc order by time) 596 * @param onlyCompleted true, if only successfully completed sessions 597 * @return history info of BackupCompleteData 598 * @throws IOException exception 599 */ 600 public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException { 601 LOG.trace("get backup history from backup system table"); 602 603 BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY; 604 ArrayList<BackupInfo> list = getBackupInfos(state); 605 return BackupUtils.sortHistoryListDesc(list); 606 } 607 608 /** 609 * Get all backups history 610 * @return list of backup info 611 * @throws IOException if getting the backup history fails 612 */ 613 public List<BackupInfo> getBackupHistory() throws IOException { 614 return getBackupHistory(false); 615 } 616 617 /** 618 * Get first n backup history records 619 * @param n number of records, if n== -1 - max number is ignored 620 * @return list of records 621 * @throws IOException if getting the backup history fails 622 */ 623 public List<BackupInfo> getHistory(int n) throws IOException { 624 List<BackupInfo> history = getBackupHistory(); 625 if (n == -1 || history.size() <= n) { 626 return history; 627 } 628 return Collections.unmodifiableList(history.subList(0, n)); 629 } 630 631 /** 632 * Get backup history records filtered by list of filters. 633 * @param n max number of records, if n == -1 , then max number is ignored 634 * @param filters list of filters 635 * @return backup records 636 * @throws IOException if getting the backup history fails 637 */ 638 public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException { 639 if (filters.length == 0) { 640 return getHistory(n); 641 } 642 643 List<BackupInfo> history = getBackupHistory(); 644 List<BackupInfo> result = new ArrayList<>(); 645 for (BackupInfo bi : history) { 646 if (n >= 0 && result.size() == n) { 647 break; 648 } 649 650 boolean passed = true; 651 for (int i = 0; i < filters.length; i++) { 652 if (!filters[i].apply(bi)) { 653 passed = false; 654 break; 655 } 656 } 657 if (passed) { 658 result.add(bi); 659 } 660 } 661 return result; 662 } 663 664 /** 665 * Retrieve all table names that are part of any known backup 666 */ 667 public Set<TableName> getTablesIncludedInBackups() throws IOException { 668 Set<TableName> names = new HashSet<>(); 669 List<BackupInfo> infos = getBackupHistory(true); 670 for (BackupInfo info : infos) { 671 // Incremental backups have the same tables as the preceding full backups 672 if (info.getType() == BackupType.FULL) { 673 names.addAll(info.getTableNames()); 674 } 675 } 676 return names; 677 } 678 679 /** 680 * Get history for backup destination 681 * @param backupRoot backup destination path 682 * @return List of backup info 683 * @throws IOException if getting the backup history fails 684 */ 685 public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException { 686 ArrayList<BackupInfo> history = getBackupHistory(false); 687 for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { 688 BackupInfo info = iterator.next(); 689 if (!backupRoot.equals(info.getBackupRootDir())) { 690 iterator.remove(); 691 } 692 } 693 return history; 694 } 695 696 /** 697 * Get history for a table 698 * @param name table name 699 * @return history for a table 700 * @throws IOException if getting the backup history fails 701 */ 702 public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException { 703 List<BackupInfo> history = getBackupHistory(); 704 List<BackupInfo> tableHistory = new ArrayList<>(); 705 for (BackupInfo info : history) { 706 List<TableName> tables = info.getTableNames(); 707 if (tables.contains(name)) { 708 tableHistory.add(info); 709 } 710 } 711 return tableHistory; 712 } 713 714 /** 715 * Goes through all backup history corresponding to the provided root folder, and collects all 716 * backup info mentioning each of the provided tables. 717 * @param set the tables for which to collect the {@code BackupInfo} 718 * @param backupRoot backup destination path to retrieve backup history for 719 * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at 720 * least one {@code BackupInfo} 721 * @throws IOException if getting the backup history fails 722 */ 723 public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, 724 String backupRoot) throws IOException { 725 List<BackupInfo> history = getBackupHistory(backupRoot); 726 Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>(); 727 for (BackupInfo info : history) { 728 List<TableName> tables = info.getTableNames(); 729 for (TableName tableName : tables) { 730 if (set.contains(tableName)) { 731 List<BackupInfo> list = 732 tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>()); 733 list.add(info); 734 } 735 } 736 } 737 return tableHistoryMap; 738 } 739 740 /** 741 * Get all backup sessions with a given state (in descending order by time) 742 * @param state backup session state 743 * @return history info of backup info objects 744 * @throws IOException exception 745 */ 746 public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException { 747 LOG.trace("get backup infos from backup system table"); 748 749 Scan scan = createScanForBackupHistory(); 750 ArrayList<BackupInfo> list = new ArrayList<>(); 751 752 try (Table table = connection.getTable(tableName); 753 ResultScanner scanner = table.getScanner(scan)) { 754 Result res; 755 while ((res = scanner.next()) != null) { 756 res.advance(); 757 BackupInfo context = cellToBackupInfo(res.current()); 758 if (state != BackupState.ANY && context.getState() != state) { 759 continue; 760 } 761 list.add(context); 762 } 763 return list; 764 } 765 } 766 767 /** 768 * Write the current timestamps for each regionserver to backup system table after a successful 769 * full or incremental backup. The saved timestamp is of the last log file that was backed up 770 * already. 771 * @param tables tables 772 * @param newTimestamps timestamps 773 * @param backupRoot root directory path to backup 774 * @throws IOException exception 775 */ 776 public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps, 777 String backupRoot) throws IOException { 778 if (LOG.isTraceEnabled()) { 779 LOG.trace("write RS log time stamps to backup system table for tables [" 780 + StringUtils.join(tables, ",") + "]"); 781 } 782 List<Put> puts = new ArrayList<>(); 783 for (TableName table : tables) { 784 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); 785 Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); 786 puts.add(put); 787 } 788 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) { 789 bufferedMutator.mutate(puts); 790 } 791 } 792 793 /** 794 * Read the timestamp for each region server log after the last successful backup. Each table has 795 * its own set of the timestamps. The info is stored for each table as a concatenated string of 796 * rs->timestapmp 797 * @param backupRoot root directory path to backup 798 * @return the timestamp for each region server. key: tableName value: 799 * RegionServer,PreviousTimeStamp 800 * @throws IOException exception 801 */ 802 public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot) 803 throws IOException { 804 if (LOG.isTraceEnabled()) { 805 LOG.trace("read RS log ts from backup system table for root=" + backupRoot); 806 } 807 808 Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>(); 809 810 Scan scan = createScanForReadLogTimestampMap(backupRoot); 811 try (Table table = connection.getTable(tableName); 812 ResultScanner scanner = table.getScanner(scan)) { 813 Result res; 814 while ((res = scanner.next()) != null) { 815 res.advance(); 816 Cell cell = res.current(); 817 byte[] row = CellUtil.cloneRow(cell); 818 String tabName = getTableNameForReadLogTimestampMap(row); 819 TableName tn = TableName.valueOf(tabName); 820 byte[] data = CellUtil.cloneValue(cell); 821 if (data == null) { 822 throw new IOException("Data of last backup data from backup system table " 823 + "is empty. Create a backup first."); 824 } 825 if (data != null && data.length > 0) { 826 HashMap<String, Long> lastBackup = 827 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); 828 tableTimestampMap.put(tn, lastBackup); 829 } 830 } 831 return tableTimestampMap; 832 } 833 } 834 835 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, 836 Map<String, Long> map) { 837 BackupProtos.TableServerTimestamp.Builder tstBuilder = 838 BackupProtos.TableServerTimestamp.newBuilder(); 839 tstBuilder 840 .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); 841 842 for (Entry<String, Long> entry : map.entrySet()) { 843 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); 844 HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); 845 ServerName sn = ServerName.parseServerName(entry.getKey()); 846 snBuilder.setHostName(sn.getHostname()); 847 snBuilder.setPort(sn.getPort()); 848 builder.setServerName(snBuilder.build()); 849 builder.setTimestamp(entry.getValue()); 850 tstBuilder.addServerTimestamp(builder.build()); 851 } 852 853 return tstBuilder.build(); 854 } 855 856 private HashMap<String, Long> 857 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { 858 859 HashMap<String, Long> map = new HashMap<>(); 860 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); 861 for (BackupProtos.ServerTimestamp st : list) { 862 ServerName sn = 863 org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); 864 map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); 865 } 866 return map; 867 } 868 869 /** 870 * Return the current tables covered by incremental backup. 871 * @param backupRoot root directory path to backup 872 * @return set of tableNames 873 * @throws IOException exception 874 */ 875 public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { 876 LOG.trace("get incremental backup table set from backup system table"); 877 878 TreeSet<TableName> set = new TreeSet<>(); 879 880 try (Table table = connection.getTable(tableName)) { 881 Get get = createGetForIncrBackupTableSet(backupRoot); 882 Result res = table.get(get); 883 if (res.isEmpty()) { 884 return set; 885 } 886 List<Cell> cells = res.listCells(); 887 for (Cell cell : cells) { 888 // qualifier = table name - we use table names as qualifiers 889 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); 890 } 891 return set; 892 } 893 } 894 895 /** 896 * Add tables to global incremental backup set 897 * @param tables set of tables 898 * @param backupRoot root directory path to backup 899 * @throws IOException exception 900 */ 901 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) 902 throws IOException { 903 if (LOG.isTraceEnabled()) { 904 LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot 905 + " tables [" + StringUtils.join(tables, " ") + "]"); 906 } 907 if (LOG.isDebugEnabled()) { 908 tables.forEach(table -> LOG.debug(Objects.toString(table))); 909 } 910 try (Table table = connection.getTable(tableName)) { 911 Put put = createPutForIncrBackupTableSet(tables, backupRoot); 912 table.put(put); 913 } 914 } 915 916 /** 917 * Deletes incremental backup set for a backup destination 918 * @param backupRoot backup root 919 */ 920 public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { 921 if (LOG.isTraceEnabled()) { 922 LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); 923 } 924 try (Table table = connection.getTable(tableName)) { 925 Delete delete = createDeleteForIncrBackupTableSet(backupRoot); 926 table.delete(delete); 927 } 928 } 929 930 /** 931 * Checks if we have at least one backup session in backup system table This API is used by 932 * BackupLogCleaner 933 * @return true, if - at least one session exists in backup system table table 934 * @throws IOException exception 935 */ 936 public boolean hasBackupSessions() throws IOException { 937 LOG.trace("Has backup sessions from backup system table"); 938 939 boolean result = false; 940 Scan scan = createScanForBackupHistory(); 941 scan.setCaching(1); 942 try (Table table = connection.getTable(tableName); 943 ResultScanner scanner = table.getScanner(scan)) { 944 if (scanner.next() != null) { 945 result = true; 946 } 947 return result; 948 } 949 } 950 951 /** 952 * BACKUP SETS 953 */ 954 955 /** 956 * Get backup set list 957 * @return backup set list 958 * @throws IOException if a table or scanner operation fails 959 */ 960 public List<String> listBackupSets() throws IOException { 961 LOG.trace("Backup set list"); 962 963 List<String> list = new ArrayList<>(); 964 try (Table table = connection.getTable(tableName)) { 965 Scan scan = createScanForBackupSetList(); 966 scan.readVersions(1); 967 try (ResultScanner scanner = table.getScanner(scan)) { 968 Result res; 969 while ((res = scanner.next()) != null) { 970 res.advance(); 971 list.add(cellKeyToBackupSetName(res.current())); 972 } 973 return list; 974 } 975 } 976 } 977 978 /** 979 * Get backup set description (list of tables) 980 * @param name set's name 981 * @return list of tables in a backup set 982 * @throws IOException if a table operation fails 983 */ 984 public List<TableName> describeBackupSet(String name) throws IOException { 985 if (LOG.isTraceEnabled()) { 986 LOG.trace(" Backup set describe: " + name); 987 } 988 try (Table table = connection.getTable(tableName)) { 989 Get get = createGetForBackupSet(name); 990 Result res = table.get(get); 991 if (res.isEmpty()) { 992 return null; 993 } 994 res.advance(); 995 String[] tables = cellValueToBackupSet(res.current()); 996 return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) 997 .collect(Collectors.toList()); 998 } 999 } 1000 1001 /** 1002 * Add backup set (list of tables) 1003 * @param name set name 1004 * @param newTables list of tables, comma-separated 1005 * @throws IOException if a table operation fails 1006 */ 1007 public void addToBackupSet(String name, String[] newTables) throws IOException { 1008 if (LOG.isTraceEnabled()) { 1009 LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); 1010 } 1011 String[] union = null; 1012 try (Table table = connection.getTable(tableName)) { 1013 Get get = createGetForBackupSet(name); 1014 Result res = table.get(get); 1015 if (res.isEmpty()) { 1016 union = newTables; 1017 } else { 1018 res.advance(); 1019 String[] tables = cellValueToBackupSet(res.current()); 1020 union = merge(tables, newTables); 1021 } 1022 Put put = createPutForBackupSet(name, union); 1023 table.put(put); 1024 } 1025 } 1026 1027 /** 1028 * Remove tables from backup set (list of tables) 1029 * @param name set name 1030 * @param toRemove list of tables 1031 * @throws IOException if a table operation or deleting the backup set fails 1032 */ 1033 public void removeFromBackupSet(String name, String[] toRemove) throws IOException { 1034 if (LOG.isTraceEnabled()) { 1035 LOG.trace( 1036 " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); 1037 } 1038 String[] disjoint; 1039 String[] tables; 1040 try (Table table = connection.getTable(tableName)) { 1041 Get get = createGetForBackupSet(name); 1042 Result res = table.get(get); 1043 if (res.isEmpty()) { 1044 LOG.warn("Backup set '" + name + "' not found."); 1045 return; 1046 } else { 1047 res.advance(); 1048 tables = cellValueToBackupSet(res.current()); 1049 disjoint = disjoin(tables, toRemove); 1050 } 1051 if (disjoint.length > 0 && disjoint.length != tables.length) { 1052 Put put = createPutForBackupSet(name, disjoint); 1053 table.put(put); 1054 } else if (disjoint.length == tables.length) { 1055 LOG.warn("Backup set '" + name + "' does not contain tables [" 1056 + StringUtils.join(toRemove, " ") + "]"); 1057 } else { // disjoint.length == 0 and tables.length >0 1058 // Delete backup set 1059 LOG.info("Backup set '" + name + "' is empty. Deleting."); 1060 deleteBackupSet(name); 1061 } 1062 } 1063 } 1064 1065 private String[] merge(String[] existingTables, String[] newTables) { 1066 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1067 tables.addAll(Arrays.asList(newTables)); 1068 return tables.toArray(new String[0]); 1069 } 1070 1071 private String[] disjoin(String[] existingTables, String[] toRemove) { 1072 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1073 Arrays.asList(toRemove).forEach(table -> tables.remove(table)); 1074 return tables.toArray(new String[0]); 1075 } 1076 1077 /** 1078 * Delete backup set 1079 * @param name set's name 1080 * @throws IOException if getting or deleting the table fails 1081 */ 1082 public void deleteBackupSet(String name) throws IOException { 1083 if (LOG.isTraceEnabled()) { 1084 LOG.trace(" Backup set delete: " + name); 1085 } 1086 try (Table table = connection.getTable(tableName)) { 1087 Delete del = createDeleteForBackupSet(name); 1088 table.delete(del); 1089 } 1090 } 1091 1092 /** 1093 * Get backup system table descriptor 1094 * @return table's descriptor 1095 */ 1096 public static TableDescriptor getSystemTableDescriptor(Configuration conf) { 1097 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); 1098 1099 ColumnFamilyDescriptorBuilder colBuilder = 1100 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1101 1102 colBuilder.setMaxVersions(1); 1103 Configuration config = HBaseConfiguration.create(); 1104 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1105 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1106 colBuilder.setTimeToLive(ttl); 1107 1108 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1109 builder.setColumnFamily(colSessionsDesc); 1110 1111 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1112 colBuilder.setTimeToLive(ttl); 1113 builder.setColumnFamily(colBuilder.build()); 1114 return builder.build(); 1115 } 1116 1117 public static TableName getTableName(Configuration conf) { 1118 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1119 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); 1120 return TableName.valueOf(name); 1121 } 1122 1123 public static String getTableNameAsString(Configuration conf) { 1124 return getTableName(conf).getNameAsString(); 1125 } 1126 1127 public static String getSnapshotName(Configuration conf) { 1128 return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); 1129 } 1130 1131 /** 1132 * Get backup system table descriptor 1133 * @return table's descriptor 1134 */ 1135 public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { 1136 TableDescriptorBuilder builder = 1137 TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); 1138 1139 ColumnFamilyDescriptorBuilder colBuilder = 1140 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1141 colBuilder.setMaxVersions(1); 1142 Configuration config = HBaseConfiguration.create(); 1143 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1144 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1145 colBuilder.setTimeToLive(ttl); 1146 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1147 builder.setColumnFamily(colSessionsDesc); 1148 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1149 colBuilder.setTimeToLive(ttl); 1150 builder.setColumnFamily(colBuilder.build()); 1151 return builder.build(); 1152 } 1153 1154 public static TableName getTableNameForBulkLoadedData(Configuration conf) { 1155 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1156 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; 1157 return TableName.valueOf(name); 1158 } 1159 1160 /** 1161 * Creates Put operation for a given backup info object 1162 * @param context backup info 1163 * @return put operation 1164 * @throws IOException exception 1165 */ 1166 private Put createPutForBackupInfo(BackupInfo context) throws IOException { 1167 Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); 1168 put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), 1169 context.toByteArray()); 1170 return put; 1171 } 1172 1173 /** 1174 * Creates Get operation for a given backup id 1175 * @param backupId backup's ID 1176 * @return get operation 1177 * @throws IOException exception 1178 */ 1179 private Get createGetForBackupInfo(String backupId) throws IOException { 1180 Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); 1181 get.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1182 get.readVersions(1); 1183 return get; 1184 } 1185 1186 /** 1187 * Creates Delete operation for a given backup id 1188 * @param backupId backup's ID 1189 * @return delete operation 1190 */ 1191 private Delete createDeleteForBackupInfo(String backupId) { 1192 Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); 1193 del.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1194 return del; 1195 } 1196 1197 /** 1198 * Converts Result to BackupInfo 1199 * @param res HBase result 1200 * @return backup info instance 1201 * @throws IOException exception 1202 */ 1203 private BackupInfo resultToBackupInfo(Result res) throws IOException { 1204 res.advance(); 1205 Cell cell = res.current(); 1206 return cellToBackupInfo(cell); 1207 } 1208 1209 /** 1210 * Creates Get operation to retrieve start code from backup system table 1211 * @return get operation 1212 * @throws IOException exception 1213 */ 1214 private Get createGetForStartCode(String rootPath) throws IOException { 1215 Get get = new Get(rowkey(START_CODE_ROW, rootPath)); 1216 get.addFamily(BackupSystemTable.META_FAMILY); 1217 get.readVersions(1); 1218 return get; 1219 } 1220 1221 /** 1222 * Creates Put operation to store start code to backup system table 1223 * @return put operation 1224 */ 1225 private Put createPutForStartCode(String startCode, String rootPath) { 1226 Put put = new Put(rowkey(START_CODE_ROW, rootPath)); 1227 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), 1228 Bytes.toBytes(startCode)); 1229 return put; 1230 } 1231 1232 /** 1233 * Creates Get to retrieve incremental backup table set from backup system table 1234 * @return get operation 1235 * @throws IOException exception 1236 */ 1237 private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { 1238 Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); 1239 get.addFamily(BackupSystemTable.META_FAMILY); 1240 get.readVersions(1); 1241 return get; 1242 } 1243 1244 /** 1245 * Creates Put to store incremental backup table set 1246 * @param tables tables 1247 * @return put operation 1248 */ 1249 private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { 1250 Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); 1251 for (TableName table : tables) { 1252 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), 1253 EMPTY_VALUE); 1254 } 1255 return put; 1256 } 1257 1258 /** 1259 * Creates Delete for incremental backup table set 1260 * @param backupRoot backup root 1261 * @return delete operation 1262 */ 1263 private Delete createDeleteForIncrBackupTableSet(String backupRoot) { 1264 Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); 1265 delete.addFamily(BackupSystemTable.META_FAMILY); 1266 return delete; 1267 } 1268 1269 /** 1270 * Creates Scan operation to load backup history 1271 * @return scan operation 1272 */ 1273 private Scan createScanForBackupHistory() { 1274 Scan scan = new Scan(); 1275 byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); 1276 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1277 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1278 scan.withStartRow(startRow); 1279 scan.withStopRow(stopRow); 1280 scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1281 scan.readVersions(1); 1282 return scan; 1283 } 1284 1285 /** 1286 * Converts cell to backup info instance. 1287 * @param current current cell 1288 * @return backup backup info instance 1289 * @throws IOException exception 1290 */ 1291 private BackupInfo cellToBackupInfo(Cell current) throws IOException { 1292 byte[] data = CellUtil.cloneValue(current); 1293 return BackupInfo.fromByteArray(data); 1294 } 1295 1296 /** 1297 * Creates Put to write RS last roll log timestamp map 1298 * @param table table 1299 * @param smap map, containing RS:ts 1300 * @return put operation 1301 */ 1302 private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, 1303 String backupRoot) { 1304 Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); 1305 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); 1306 return put; 1307 } 1308 1309 /** 1310 * Creates Scan to load table-> { RS -> ts} map of maps 1311 * @return scan operation 1312 */ 1313 private Scan createScanForReadLogTimestampMap(String backupRoot) { 1314 Scan scan = new Scan(); 1315 scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL)); 1316 scan.addFamily(BackupSystemTable.META_FAMILY); 1317 1318 return scan; 1319 } 1320 1321 /** 1322 * Get table name from rowkey 1323 * @param cloneRow rowkey 1324 * @return table name 1325 */ 1326 private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { 1327 String s = Bytes.toString(cloneRow); 1328 int index = s.lastIndexOf(NULL); 1329 return s.substring(index + 1); 1330 } 1331 1332 /** 1333 * Creates Put to store RS last log result 1334 * @param server server name 1335 * @param timestamp log roll result (timestamp) 1336 * @return put operation 1337 */ 1338 private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, 1339 String backupRoot) { 1340 Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); 1341 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), 1342 Bytes.toBytes(timestamp)); 1343 return put; 1344 } 1345 1346 /** 1347 * Creates Scan operation to load last RS log roll results 1348 * @return scan operation 1349 */ 1350 private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { 1351 Scan scan = new Scan(); 1352 scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL)); 1353 scan.addFamily(BackupSystemTable.META_FAMILY); 1354 scan.readVersions(1); 1355 1356 return scan; 1357 } 1358 1359 /** 1360 * Get server's name from rowkey 1361 * @param row rowkey 1362 * @return server's name 1363 */ 1364 private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { 1365 String s = Bytes.toString(row); 1366 int index = s.lastIndexOf(NULL); 1367 return s.substring(index + 1); 1368 } 1369 1370 /** 1371 * Creates Put's for bulk loads. 1372 */ 1373 private static List<Put> createPutForBulkLoad(TableName table, byte[] region, 1374 Map<byte[], List<Path>> columnFamilyToHFilePaths) { 1375 List<Put> puts = new ArrayList<>(); 1376 for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) { 1377 for (Path path : entry.getValue()) { 1378 String file = path.toString(); 1379 int lastSlash = file.lastIndexOf("/"); 1380 String filename = file.substring(lastSlash + 1); 1381 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1382 Bytes.toString(region), BLK_LD_DELIM, filename)); 1383 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1384 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); 1385 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1386 puts.add(put); 1387 LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region)); 1388 } 1389 } 1390 return puts; 1391 } 1392 1393 public static void snapshot(Connection conn) throws IOException { 1394 try (Admin admin = conn.getAdmin()) { 1395 Configuration conf = conn.getConfiguration(); 1396 admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); 1397 } 1398 } 1399 1400 public static void restoreFromSnapshot(Connection conn) throws IOException { 1401 Configuration conf = conn.getConfiguration(); 1402 LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); 1403 try (Admin admin = conn.getAdmin()) { 1404 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1405 if (snapshotExists(admin, snapshotName)) { 1406 admin.disableTable(BackupSystemTable.getTableName(conf)); 1407 admin.restoreSnapshot(snapshotName); 1408 admin.enableTable(BackupSystemTable.getTableName(conf)); 1409 LOG.debug("Done restoring backup system table"); 1410 } else { 1411 // Snapshot does not exists, i.e completeBackup failed after 1412 // deleting backup system table snapshot 1413 // In this case we log WARN and proceed 1414 LOG.warn( 1415 "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); 1416 } 1417 } 1418 } 1419 1420 private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { 1421 List<SnapshotDescription> list = admin.listSnapshots(); 1422 for (SnapshotDescription desc : list) { 1423 if (desc.getName().equals(snapshotName)) { 1424 return true; 1425 } 1426 } 1427 return false; 1428 } 1429 1430 public static boolean snapshotExists(Connection conn) throws IOException { 1431 return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); 1432 } 1433 1434 public static void deleteSnapshot(Connection conn) throws IOException { 1435 Configuration conf = conn.getConfiguration(); 1436 LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); 1437 try (Admin admin = conn.getAdmin()) { 1438 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1439 if (snapshotExists(admin, snapshotName)) { 1440 admin.deleteSnapshot(snapshotName); 1441 LOG.debug("Done deleting backup system table snapshot"); 1442 } else { 1443 LOG.error("Snapshot " + snapshotName + " does not exists"); 1444 } 1445 } 1446 } 1447 1448 private Put createPutForDeleteOperation(String[] backupIdList) { 1449 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1450 Put put = new Put(DELETE_OP_ROW); 1451 put.addColumn(META_FAMILY, FAM_COL, value); 1452 return put; 1453 } 1454 1455 private Delete createDeleteForBackupDeleteOperation() { 1456 Delete delete = new Delete(DELETE_OP_ROW); 1457 delete.addFamily(META_FAMILY); 1458 return delete; 1459 } 1460 1461 private Get createGetForDeleteOperation() { 1462 Get get = new Get(DELETE_OP_ROW); 1463 get.addFamily(META_FAMILY); 1464 return get; 1465 } 1466 1467 public void startDeleteOperation(String[] backupIdList) throws IOException { 1468 if (LOG.isTraceEnabled()) { 1469 LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); 1470 } 1471 Put put = createPutForDeleteOperation(backupIdList); 1472 try (Table table = connection.getTable(tableName)) { 1473 table.put(put); 1474 } 1475 } 1476 1477 public void finishDeleteOperation() throws IOException { 1478 LOG.trace("Finsih delete operation for backup ids"); 1479 1480 Delete delete = createDeleteForBackupDeleteOperation(); 1481 try (Table table = connection.getTable(tableName)) { 1482 table.delete(delete); 1483 } 1484 } 1485 1486 public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { 1487 LOG.trace("Get delete operation for backup ids"); 1488 1489 Get get = createGetForDeleteOperation(); 1490 try (Table table = connection.getTable(tableName)) { 1491 Result res = table.get(get); 1492 if (res.isEmpty()) { 1493 return null; 1494 } 1495 Cell cell = res.listCells().get(0); 1496 byte[] val = CellUtil.cloneValue(cell); 1497 if (val.length == 0) { 1498 return null; 1499 } 1500 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1501 .toArray(String[]::new); 1502 } 1503 } 1504 1505 private Put createPutForMergeOperation(String[] backupIdList) { 1506 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1507 Put put = new Put(MERGE_OP_ROW); 1508 put.addColumn(META_FAMILY, FAM_COL, value); 1509 return put; 1510 } 1511 1512 public boolean isMergeInProgress() throws IOException { 1513 Get get = new Get(MERGE_OP_ROW); 1514 try (Table table = connection.getTable(tableName)) { 1515 Result res = table.get(get); 1516 return !res.isEmpty(); 1517 } 1518 } 1519 1520 private Put createPutForUpdateTablesForMerge(List<TableName> tables) { 1521 byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); 1522 Put put = new Put(MERGE_OP_ROW); 1523 put.addColumn(META_FAMILY, PATH_COL, value); 1524 return put; 1525 } 1526 1527 private Delete createDeleteForBackupMergeOperation() { 1528 Delete delete = new Delete(MERGE_OP_ROW); 1529 delete.addFamily(META_FAMILY); 1530 return delete; 1531 } 1532 1533 private Get createGetForMergeOperation() { 1534 Get get = new Get(MERGE_OP_ROW); 1535 get.addFamily(META_FAMILY); 1536 return get; 1537 } 1538 1539 public void startMergeOperation(String[] backupIdList) throws IOException { 1540 if (LOG.isTraceEnabled()) { 1541 LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); 1542 } 1543 Put put = createPutForMergeOperation(backupIdList); 1544 try (Table table = connection.getTable(tableName)) { 1545 table.put(put); 1546 } 1547 } 1548 1549 public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { 1550 if (LOG.isTraceEnabled()) { 1551 LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); 1552 } 1553 Put put = createPutForUpdateTablesForMerge(tables); 1554 try (Table table = connection.getTable(tableName)) { 1555 table.put(put); 1556 } 1557 } 1558 1559 public void finishMergeOperation() throws IOException { 1560 LOG.trace("Finish merge operation for backup ids"); 1561 1562 Delete delete = createDeleteForBackupMergeOperation(); 1563 try (Table table = connection.getTable(tableName)) { 1564 table.delete(delete); 1565 } 1566 } 1567 1568 public String[] getListOfBackupIdsFromMergeOperation() throws IOException { 1569 LOG.trace("Get backup ids for merge operation"); 1570 1571 Get get = createGetForMergeOperation(); 1572 try (Table table = connection.getTable(tableName)) { 1573 Result res = table.get(get); 1574 if (res.isEmpty()) { 1575 return null; 1576 } 1577 Cell cell = res.listCells().get(0); 1578 byte[] val = CellUtil.cloneValue(cell); 1579 if (val.length == 0) { 1580 return null; 1581 } 1582 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1583 .toArray(String[]::new); 1584 } 1585 } 1586 1587 /** 1588 * Creates a scan to read all registered bulk loads for the given table, or for all tables if 1589 * {@code table} is {@code null}. 1590 */ 1591 static Scan createScanForOrigBulkLoadedFiles(@Nullable TableName table) { 1592 Scan scan = new Scan(); 1593 byte[] startRow = table == null 1594 ? BULK_LOAD_PREFIX_BYTES 1595 : rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); 1596 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1597 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1598 scan.withStartRow(startRow); 1599 scan.withStopRow(stopRow); 1600 scan.addFamily(BackupSystemTable.META_FAMILY); 1601 scan.readVersions(1); 1602 return scan; 1603 } 1604 1605 static String getTableNameFromOrigBulkLoadRow(String rowStr) { 1606 // format is bulk : namespace : table : region : file 1607 return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1); 1608 } 1609 1610 static String getRegionNameFromOrigBulkLoadRow(String rowStr) { 1611 // format is bulk : namespace : table : region : file 1612 List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr); 1613 Iterator<String> i = parts.iterator(); 1614 int idx = 3; 1615 if (parts.size() == 4) { 1616 // the table is in default namespace 1617 idx = 2; 1618 } 1619 String region = Iterators.get(i, idx); 1620 LOG.debug("bulk row string " + rowStr + " region " + region); 1621 return region; 1622 } 1623 1624 /* 1625 * Used to query bulk loaded hfiles which have been copied by incremental backup 1626 * @param backupId the backup Id. It can be null when querying for all tables 1627 * @return the Scan object 1628 * @deprecated This method is broken if a backupId is specified - see HBASE-28715 1629 */ 1630 static Scan createScanForBulkLoadedFiles(String backupId) { 1631 Scan scan = new Scan(); 1632 byte[] startRow = 1633 backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); 1634 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1635 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1636 scan.withStartRow(startRow); 1637 scan.withStopRow(stopRow); 1638 scan.addFamily(BackupSystemTable.META_FAMILY); 1639 scan.readVersions(1); 1640 return scan; 1641 } 1642 1643 /** 1644 * Creates Scan operation to load backup set list 1645 * @return scan operation 1646 */ 1647 private Scan createScanForBackupSetList() { 1648 Scan scan = new Scan(); 1649 byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); 1650 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1651 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1652 scan.withStartRow(startRow); 1653 scan.withStopRow(stopRow); 1654 scan.addFamily(BackupSystemTable.META_FAMILY); 1655 return scan; 1656 } 1657 1658 /** 1659 * Creates Get operation to load backup set content 1660 * @return get operation 1661 */ 1662 private Get createGetForBackupSet(String name) { 1663 Get get = new Get(rowkey(SET_KEY_PREFIX, name)); 1664 get.addFamily(BackupSystemTable.META_FAMILY); 1665 return get; 1666 } 1667 1668 /** 1669 * Creates Delete operation to delete backup set content 1670 * @param name backup set's name 1671 * @return delete operation 1672 */ 1673 private Delete createDeleteForBackupSet(String name) { 1674 Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); 1675 del.addFamily(BackupSystemTable.META_FAMILY); 1676 return del; 1677 } 1678 1679 /** 1680 * Creates Put operation to update backup set content 1681 * @param name backup set's name 1682 * @param tables list of tables 1683 * @return put operation 1684 */ 1685 private Put createPutForBackupSet(String name, String[] tables) { 1686 Put put = new Put(rowkey(SET_KEY_PREFIX, name)); 1687 byte[] value = convertToByteArray(tables); 1688 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); 1689 return put; 1690 } 1691 1692 private byte[] convertToByteArray(String[] tables) { 1693 return Bytes.toBytes(StringUtils.join(tables, ",")); 1694 } 1695 1696 /** 1697 * Converts cell to backup set list. 1698 * @param current current cell 1699 * @return backup set as array of table names 1700 */ 1701 private String[] cellValueToBackupSet(Cell current) { 1702 byte[] data = CellUtil.cloneValue(current); 1703 if (!ArrayUtils.isEmpty(data)) { 1704 return Bytes.toString(data).split(","); 1705 } 1706 return new String[0]; 1707 } 1708 1709 /** 1710 * Converts cell key to backup set name. 1711 * @param current current cell 1712 * @return backup set name 1713 */ 1714 private String cellKeyToBackupSetName(Cell current) { 1715 byte[] data = CellUtil.cloneRow(current); 1716 return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); 1717 } 1718 1719 private static byte[] rowkey(String s, String... other) { 1720 StringBuilder sb = new StringBuilder(s); 1721 for (String ss : other) { 1722 sb.append(ss); 1723 } 1724 return Bytes.toBytes(sb.toString()); 1725 } 1726 1727 private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException { 1728 if (!admin.isTableEnabled(tableName)) { 1729 try { 1730 admin.enableTable(tableName); 1731 } catch (TableNotDisabledException ignored) { 1732 LOG.info("Table {} is not disabled, ignoring enable request", tableName); 1733 } 1734 } 1735 } 1736}