001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import static org.apache.hadoop.hbase.backup.BackupInfo.withRoot; 021import static org.apache.hadoop.hbase.backup.BackupInfo.withState; 022import static org.apache.hadoop.hbase.backup.BackupInfo.withType; 023 024import edu.umd.cs.findbugs.annotations.Nullable; 025import java.io.Closeable; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.nio.charset.StandardCharsets; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collection; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039import java.util.Objects; 040import java.util.Set; 041import java.util.TreeMap; 042import java.util.TreeSet; 043import java.util.function.Predicate; 044import java.util.stream.Collectors; 045import java.util.stream.Stream; 046import org.apache.commons.lang3.ArrayUtils; 047import org.apache.commons.lang3.StringUtils; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.fs.Path; 050import org.apache.hadoop.hbase.Cell; 051import org.apache.hadoop.hbase.CellUtil; 052import org.apache.hadoop.hbase.HBaseConfiguration; 053import org.apache.hadoop.hbase.NamespaceDescriptor; 054import org.apache.hadoop.hbase.NamespaceExistException; 055import org.apache.hadoop.hbase.ServerName; 056import org.apache.hadoop.hbase.TableExistsException; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.TableNotDisabledException; 059import org.apache.hadoop.hbase.backup.BackupInfo; 060import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 061import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 062import org.apache.hadoop.hbase.backup.BackupType; 063import org.apache.hadoop.hbase.client.Admin; 064import org.apache.hadoop.hbase.client.BufferedMutator; 065import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 066import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 067import org.apache.hadoop.hbase.client.Connection; 068import org.apache.hadoop.hbase.client.Delete; 069import org.apache.hadoop.hbase.client.Get; 070import org.apache.hadoop.hbase.client.Put; 071import org.apache.hadoop.hbase.client.Result; 072import org.apache.hadoop.hbase.client.ResultScanner; 073import org.apache.hadoop.hbase.client.Scan; 074import org.apache.hadoop.hbase.client.SnapshotDescription; 075import org.apache.hadoop.hbase.client.Table; 076import org.apache.hadoop.hbase.client.TableDescriptor; 077import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 078import org.apache.hadoop.hbase.util.Bytes; 079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 080import org.apache.yetus.audience.InterfaceAudience; 081import org.slf4j.Logger; 082import org.slf4j.LoggerFactory; 083 084import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 085import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 086import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 087 088import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 090 091/** 092 * This class provides API to access backup system table<br> 093 * Backup system table schema:<br> 094 * <p> 095 * <ul> 096 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> 097 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> 098 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of 099 * include table; value=empty</li> 100 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL 101 * timestamp]</li> 102 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> 103 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file 104 * name</li> 105 * </ul> 106 * </p> 107 */ 108@InterfaceAudience.Private 109public final class BackupSystemTable implements Closeable { 110 111 private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); 112 113 static class WALItem { 114 String backupId; 115 String walFile; 116 String backupRoot; 117 118 WALItem(String backupId, String walFile, String backupRoot) { 119 this.backupId = backupId; 120 this.walFile = walFile; 121 this.backupRoot = backupRoot; 122 } 123 124 public String getBackupId() { 125 return backupId; 126 } 127 128 public String getWalFile() { 129 return walFile; 130 } 131 132 public String getBackupRoot() { 133 return backupRoot; 134 } 135 136 @Override 137 public String toString() { 138 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; 139 } 140 } 141 142 /** 143 * Backup system table (main) name 144 */ 145 private TableName tableName; 146 147 /** 148 * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a 149 * separate table because we have to isolate general backup operations: create, merge etc from 150 * activity of RegionObserver, which controls process of a bulk loading 151 * {@link org.apache.hadoop.hbase.backup.BackupObserver} 152 */ 153 private TableName bulkLoadTableName; 154 155 /** 156 * Stores backup sessions (contexts) 157 */ 158 final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); 159 /** 160 * Stores other meta 161 */ 162 final static byte[] META_FAMILY = Bytes.toBytes("meta"); 163 final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); 164 /** 165 * Connection to HBase cluster, shared among all instances 166 */ 167 private final Connection connection; 168 169 private final static String BACKUP_INFO_PREFIX = "session:"; 170 private final static String START_CODE_ROW = "startcode:"; 171 private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); 172 private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); 173 174 private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); 175 private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); 176 177 private final static String INCR_BACKUP_SET = "incrbackupset:"; 178 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; 179 private final static String RS_LOG_TS_PREFIX = "rslogts:"; 180 181 private final static String BULK_LOAD_PREFIX = "bulk:"; 182 private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX); 183 private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row"); 184 private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row"); 185 186 final static byte[] TBL_COL = Bytes.toBytes("tbl"); 187 final static byte[] FAM_COL = Bytes.toBytes("fam"); 188 final static byte[] PATH_COL = Bytes.toBytes("path"); 189 190 private final static String SET_KEY_PREFIX = "backupset:"; 191 192 // separator between BULK_LOAD_PREFIX and ordinals 193 private final static String BLK_LD_DELIM = ":"; 194 private final static byte[] EMPTY_VALUE = new byte[] {}; 195 196 // Safe delimiter in a string 197 private final static String NULL = "\u0000"; 198 199 public BackupSystemTable(Connection conn) throws IOException { 200 this.connection = conn; 201 Configuration conf = this.connection.getConfiguration(); 202 tableName = BackupSystemTable.getTableName(conf); 203 bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); 204 checkSystemTable(); 205 } 206 207 private void checkSystemTable() throws IOException { 208 try (Admin admin = connection.getAdmin()) { 209 verifyNamespaceExists(admin); 210 Configuration conf = connection.getConfiguration(); 211 if (!admin.tableExists(tableName)) { 212 TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf); 213 createSystemTable(admin, backupHTD); 214 } 215 ensureTableEnabled(admin, tableName); 216 if (!admin.tableExists(bulkLoadTableName)) { 217 TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); 218 createSystemTable(admin, blHTD); 219 } 220 ensureTableEnabled(admin, bulkLoadTableName); 221 waitForSystemTable(admin, tableName); 222 waitForSystemTable(admin, bulkLoadTableName); 223 } 224 } 225 226 private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException { 227 try { 228 admin.createTable(descriptor); 229 } catch (TableExistsException e) { 230 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 231 // so may be subject to race conditions where one caller succeeds in creating the 232 // table and others fail because it now exists 233 LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e); 234 } 235 } 236 237 private void verifyNamespaceExists(Admin admin) throws IOException { 238 String namespaceName = tableName.getNamespaceAsString(); 239 NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); 240 NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); 241 boolean exists = false; 242 for (NamespaceDescriptor nsd : list) { 243 if (nsd.getName().equals(ns.getName())) { 244 exists = true; 245 break; 246 } 247 } 248 if (!exists) { 249 try { 250 admin.createNamespace(ns); 251 } catch (NamespaceExistException e) { 252 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 253 // so may be subject to race conditions where one caller succeeds in creating the 254 // namespace and others fail because it now exists 255 LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e); 256 } 257 } 258 } 259 260 private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { 261 // Return fast if the table is available and avoid a log message 262 if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) { 263 return; 264 } 265 long TIMEOUT = 60000; 266 long startTime = EnvironmentEdgeManager.currentTime(); 267 LOG.debug("Backup table {} is not present and available, waiting for it to become so", 268 tableName); 269 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { 270 try { 271 Thread.sleep(100); 272 } catch (InterruptedException e) { 273 throw (IOException) new InterruptedIOException().initCause(e); 274 } 275 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { 276 throw new IOException( 277 "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms"); 278 } 279 } 280 LOG.debug("Backup table {} exists and available", tableName); 281 } 282 283 @Override 284 public void close() { 285 // do nothing 286 } 287 288 /** 289 * Updates status (state) of a backup session in backup system table table 290 * @param info backup info 291 * @throws IOException exception 292 */ 293 public void updateBackupInfo(BackupInfo info) throws IOException { 294 if (LOG.isTraceEnabled()) { 295 LOG.trace("update backup status in backup system table for: " + info.getBackupId() 296 + " set status=" + info.getState()); 297 } 298 try (Table table = connection.getTable(tableName)) { 299 Put put = createPutForBackupInfo(info); 300 table.put(put); 301 } 302 } 303 304 /* 305 * @param backupId the backup Id 306 * @return Map of rows to path of bulk loaded hfile 307 */ 308 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { 309 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 310 try (Table table = connection.getTable(bulkLoadTableName); 311 ResultScanner scanner = table.getScanner(scan)) { 312 Result res = null; 313 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 314 while ((res = scanner.next()) != null) { 315 res.advance(); 316 byte[] row = CellUtil.cloneRow(res.listCells().get(0)); 317 for (Cell cell : res.listCells()) { 318 if ( 319 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 320 BackupSystemTable.PATH_COL.length) == 0 321 ) { 322 map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); 323 } 324 } 325 } 326 return map; 327 } 328 } 329 330 /** 331 * Deletes backup status from backup system table table 332 * @param backupId backup id 333 * @throws IOException exception 334 */ 335 public void deleteBackupInfo(String backupId) throws IOException { 336 if (LOG.isTraceEnabled()) { 337 LOG.trace("delete backup status in backup system table for " + backupId); 338 } 339 try (Table table = connection.getTable(tableName)) { 340 Delete del = createDeleteForBackupInfo(backupId); 341 table.delete(del); 342 } 343 } 344 345 /** 346 * Registers a bulk load. 347 * @param tableName table name 348 * @param region the region receiving hfile 349 * @param cfToHfilePath column family and associated hfiles 350 */ 351 public void registerBulkLoad(TableName tableName, byte[] region, 352 Map<byte[], List<Path>> cfToHfilePath) throws IOException { 353 if (LOG.isDebugEnabled()) { 354 LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName, 355 cfToHfilePath.size()); 356 } 357 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 358 List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath); 359 bufferedMutator.mutate(puts); 360 LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName); 361 } 362 } 363 364 /** 365 * Removes entries from the table that tracks all bulk loaded hfiles. 366 * @param rows the row keys of the entries to be deleted 367 */ 368 public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException { 369 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 370 List<Delete> deletes = new ArrayList<>(); 371 for (byte[] row : rows) { 372 Delete del = new Delete(row); 373 deletes.add(del); 374 LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row)); 375 } 376 bufferedMutator.mutate(deletes); 377 LOG.debug("Deleted {} bulk load entries.", rows.size()); 378 } 379 } 380 381 /** 382 * Reads all registered bulk loads. 383 */ 384 public List<BulkLoad> readBulkloadRows() throws IOException { 385 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null); 386 return processBulkLoadRowScan(scan); 387 } 388 389 /** 390 * Reads the registered bulk loads for the given tables. 391 */ 392 public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException { 393 List<BulkLoad> result = new ArrayList<>(); 394 for (TableName table : tableList) { 395 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table); 396 result.addAll(processBulkLoadRowScan(scan)); 397 } 398 return result; 399 } 400 401 private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException { 402 List<BulkLoad> result = new ArrayList<>(); 403 try (Table bulkLoadTable = connection.getTable(bulkLoadTableName); 404 ResultScanner scanner = bulkLoadTable.getScanner(scan)) { 405 Result res; 406 while ((res = scanner.next()) != null) { 407 res.advance(); 408 TableName table = null; 409 String fam = null; 410 String path = null; 411 String region = null; 412 byte[] row = null; 413 for (Cell cell : res.listCells()) { 414 row = CellUtil.cloneRow(cell); 415 String rowStr = Bytes.toString(row); 416 region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); 417 if ( 418 CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, 419 BackupSystemTable.TBL_COL.length) == 0 420 ) { 421 table = TableName.valueOf(CellUtil.cloneValue(cell)); 422 } else if ( 423 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 424 BackupSystemTable.FAM_COL.length) == 0 425 ) { 426 fam = Bytes.toString(CellUtil.cloneValue(cell)); 427 } else if ( 428 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 429 BackupSystemTable.PATH_COL.length) == 0 430 ) { 431 path = Bytes.toString(CellUtil.cloneValue(cell)); 432 } 433 } 434 result.add(new BulkLoad(table, region, fam, path, row)); 435 LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path); 436 } 437 } 438 return result; 439 } 440 441 /** 442 * Reads backup status object (instance of backup info) from backup system table table 443 * @param backupId backup id 444 * @return Current status of backup session or null 445 */ 446 public BackupInfo readBackupInfo(String backupId) throws IOException { 447 if (LOG.isTraceEnabled()) { 448 LOG.trace("read backup status from backup system table for: " + backupId); 449 } 450 451 try (Table table = connection.getTable(tableName)) { 452 Get get = createGetForBackupInfo(backupId); 453 Result res = table.get(get); 454 if (res.isEmpty()) { 455 return null; 456 } 457 return resultToBackupInfo(res); 458 } 459 } 460 461 /** 462 * Read the last backup start code (timestamp) of last successful backup. Will return null if 463 * there is no start code stored on hbase or the value is of length 0. These two cases indicate 464 * there is no successful backup completed so far. 465 * @param backupRoot directory path to backup destination 466 * @return the timestamp of last successful backup 467 * @throws IOException exception 468 */ 469 public String readBackupStartCode(String backupRoot) throws IOException { 470 LOG.trace("read backup start code from backup system table"); 471 472 try (Table table = connection.getTable(tableName)) { 473 Get get = createGetForStartCode(backupRoot); 474 Result res = table.get(get); 475 if (res.isEmpty()) { 476 return null; 477 } 478 Cell cell = res.listCells().get(0); 479 byte[] val = CellUtil.cloneValue(cell); 480 if (val.length == 0) { 481 return null; 482 } 483 return new String(val, StandardCharsets.UTF_8); 484 } 485 } 486 487 /** 488 * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. 489 * @param startCode start code 490 * @param backupRoot root directory path to backup 491 * @throws IOException exception 492 */ 493 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { 494 if (LOG.isTraceEnabled()) { 495 LOG.trace("write backup start code to backup system table " + startCode); 496 } 497 try (Table table = connection.getTable(tableName)) { 498 Put put = createPutForStartCode(startCode.toString(), backupRoot); 499 table.put(put); 500 } 501 } 502 503 /** 504 * Exclusive operations are: create, delete, merge 505 * @throws IOException if a table operation fails or an active backup exclusive operation is 506 * already underway 507 */ 508 public void startBackupExclusiveOperation() throws IOException { 509 LOG.debug("Start new backup exclusive operation"); 510 511 try (Table table = connection.getTable(tableName)) { 512 Put put = createPutForStartBackupSession(); 513 // First try to put if row does not exist 514 if ( 515 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 516 .ifNotExists().thenPut(put) 517 ) { 518 // Row exists, try to put if value == ACTIVE_SESSION_NO 519 if ( 520 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 521 .ifEquals(ACTIVE_SESSION_NO).thenPut(put) 522 ) { 523 throw new ExclusiveOperationException(); 524 } 525 } 526 } 527 } 528 529 private Put createPutForStartBackupSession() { 530 Put put = new Put(ACTIVE_SESSION_ROW); 531 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); 532 return put; 533 } 534 535 public void finishBackupExclusiveOperation() throws IOException { 536 LOG.debug("Finish backup exclusive operation"); 537 538 try (Table table = connection.getTable(tableName)) { 539 Put put = createPutForStopBackupSession(); 540 if ( 541 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 542 .ifEquals(ACTIVE_SESSION_YES).thenPut(put) 543 ) { 544 throw new IOException("There is no active backup exclusive operation"); 545 } 546 } 547 } 548 549 private Put createPutForStopBackupSession() { 550 Put put = new Put(ACTIVE_SESSION_ROW); 551 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); 552 return put; 553 } 554 555 /** 556 * Get the Region Servers log information after the last log roll from backup system table. 557 * @param backupRoot root directory path to backup 558 * @return RS log info 559 * @throws IOException exception 560 */ 561 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) 562 throws IOException { 563 LOG.trace("read region server last roll log result to backup system table"); 564 565 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); 566 567 try (Table table = connection.getTable(tableName); 568 ResultScanner scanner = table.getScanner(scan)) { 569 Result res; 570 HashMap<String, Long> rsTimestampMap = new HashMap<>(); 571 while ((res = scanner.next()) != null) { 572 res.advance(); 573 Cell cell = res.current(); 574 byte[] row = CellUtil.cloneRow(cell); 575 String server = getServerNameForReadRegionServerLastLogRollResult(row); 576 byte[] data = CellUtil.cloneValue(cell); 577 rsTimestampMap.put(server, Bytes.toLong(data)); 578 } 579 return rsTimestampMap; 580 } 581 } 582 583 /** 584 * Writes Region Server last roll log result (timestamp) to backup system table table 585 * @param server Region Server name 586 * @param ts last log timestamp 587 * @param backupRoot root directory path to backup 588 * @throws IOException exception 589 */ 590 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) 591 throws IOException { 592 LOG.trace("write region server last roll log result to backup system table"); 593 594 try (Table table = connection.getTable(tableName)) { 595 Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); 596 table.put(put); 597 } 598 } 599 600 /** 601 * Retrieve all table names that are part of any known completed backup 602 */ 603 public Set<TableName> getTablesIncludedInBackups() throws IOException { 604 // Incremental backups have the same tables as the preceding full backups 605 List<BackupInfo> infos = 606 getBackupHistory(withState(BackupState.COMPLETE), withType(BackupType.FULL)); 607 return infos.stream().flatMap(info -> info.getTableNames().stream()) 608 .collect(Collectors.toSet()); 609 } 610 611 /** 612 * Goes through all backup history corresponding to the provided root folder, and collects all 613 * backup info mentioning each of the provided tables. 614 * @param set the tables for which to collect the {@code BackupInfo} 615 * @param backupRoot backup destination path to retrieve backup history for 616 * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at 617 * least one {@code BackupInfo} 618 * @throws IOException if getting the backup history fails 619 */ 620 public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, 621 String backupRoot) throws IOException { 622 List<BackupInfo> history = getBackupHistory(withRoot(backupRoot)); 623 Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>(); 624 for (BackupInfo info : history) { 625 List<TableName> tables = info.getTableNames(); 626 for (TableName tableName : tables) { 627 if (set.contains(tableName)) { 628 List<BackupInfo> list = 629 tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>()); 630 list.add(info); 631 } 632 } 633 } 634 return tableHistoryMap; 635 } 636 637 /** 638 * Get all backup information passing the given filters, ordered by descending backupId. I.e. from 639 * newest to oldest. 640 */ 641 public List<BackupInfo> getBackupHistory(BackupInfo.Filter... toInclude) throws IOException { 642 return getBackupHistory(Order.NEW_TO_OLD, Integer.MAX_VALUE, toInclude); 643 } 644 645 /** 646 * Retrieves the first n entries of the sorted, filtered list of backup infos. 647 * @param order desired ordering of the results. 648 * @param n number of entries to return 649 */ 650 public List<BackupInfo> getBackupHistory(Order order, int n, BackupInfo.Filter... toInclude) 651 throws IOException { 652 Preconditions.checkArgument(n >= 0, "n should be >= 0"); 653 LOG.trace("get backup infos from backup system table"); 654 655 if (n == 0) { 656 return Collections.emptyList(); 657 } 658 659 Predicate<BackupInfo> combinedPredicate = Stream.of(toInclude) 660 .map(filter -> (Predicate<BackupInfo>) filter).reduce(Predicate::and).orElse(x -> true); 661 662 Scan scan = createScanForBackupHistory(order); 663 List<BackupInfo> list = new ArrayList<>(); 664 665 try (Table table = connection.getTable(tableName); 666 ResultScanner scanner = table.getScanner(scan)) { 667 Result res; 668 while ((res = scanner.next()) != null) { 669 res.advance(); 670 BackupInfo context = cellToBackupInfo(res.current()); 671 if (combinedPredicate.test(context)) { 672 list.add(context); 673 if (list.size() == n) { 674 break; 675 } 676 } 677 } 678 return list; 679 } 680 } 681 682 /** 683 * Write the current timestamps for each regionserver to backup system table after a successful 684 * full or incremental backup. The saved timestamp is of the last log file that was backed up 685 * already. 686 * @param tables tables 687 * @param newTimestamps timestamps 688 * @param backupRoot root directory path to backup 689 * @throws IOException exception 690 */ 691 public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps, 692 String backupRoot) throws IOException { 693 if (LOG.isTraceEnabled()) { 694 LOG.trace("write RS log time stamps to backup system table for tables [" 695 + StringUtils.join(tables, ",") + "]"); 696 } 697 List<Put> puts = new ArrayList<>(); 698 for (TableName table : tables) { 699 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); 700 Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); 701 puts.add(put); 702 } 703 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) { 704 bufferedMutator.mutate(puts); 705 } 706 } 707 708 /** 709 * Read the timestamp for each region server log after the last successful backup. Each table has 710 * its own set of the timestamps. The info is stored for each table as a concatenated string of 711 * rs->timestapmp 712 * @param backupRoot root directory path to backup 713 * @return the timestamp for each region server. key: tableName value: 714 * RegionServer,PreviousTimeStamp 715 * @throws IOException exception 716 */ 717 public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot) 718 throws IOException { 719 if (LOG.isTraceEnabled()) { 720 LOG.trace("read RS log ts from backup system table for root=" + backupRoot); 721 } 722 723 Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>(); 724 725 Scan scan = createScanForReadLogTimestampMap(backupRoot); 726 try (Table table = connection.getTable(tableName); 727 ResultScanner scanner = table.getScanner(scan)) { 728 Result res; 729 while ((res = scanner.next()) != null) { 730 res.advance(); 731 Cell cell = res.current(); 732 byte[] row = CellUtil.cloneRow(cell); 733 String tabName = getTableNameForReadLogTimestampMap(row); 734 TableName tn = TableName.valueOf(tabName); 735 byte[] data = CellUtil.cloneValue(cell); 736 if (data == null) { 737 throw new IOException("Data of last backup data from backup system table " 738 + "is empty. Create a backup first."); 739 } 740 if (data != null && data.length > 0) { 741 HashMap<String, Long> lastBackup = 742 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); 743 tableTimestampMap.put(tn, lastBackup); 744 } 745 } 746 return tableTimestampMap; 747 } 748 } 749 750 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, 751 Map<String, Long> map) { 752 BackupProtos.TableServerTimestamp.Builder tstBuilder = 753 BackupProtos.TableServerTimestamp.newBuilder(); 754 tstBuilder 755 .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); 756 757 for (Entry<String, Long> entry : map.entrySet()) { 758 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); 759 HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); 760 ServerName sn = ServerName.parseServerName(entry.getKey()); 761 snBuilder.setHostName(sn.getHostname()); 762 snBuilder.setPort(sn.getPort()); 763 builder.setServerName(snBuilder.build()); 764 builder.setTimestamp(entry.getValue()); 765 tstBuilder.addServerTimestamp(builder.build()); 766 } 767 768 return tstBuilder.build(); 769 } 770 771 private HashMap<String, Long> 772 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { 773 774 HashMap<String, Long> map = new HashMap<>(); 775 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); 776 for (BackupProtos.ServerTimestamp st : list) { 777 ServerName sn = 778 org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); 779 map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); 780 } 781 return map; 782 } 783 784 /** 785 * Return the current tables covered by incremental backup. 786 * @param backupRoot root directory path to backup 787 * @return set of tableNames 788 * @throws IOException exception 789 */ 790 public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { 791 LOG.trace("get incremental backup table set from backup system table"); 792 793 TreeSet<TableName> set = new TreeSet<>(); 794 795 try (Table table = connection.getTable(tableName)) { 796 Get get = createGetForIncrBackupTableSet(backupRoot); 797 Result res = table.get(get); 798 if (res.isEmpty()) { 799 return set; 800 } 801 List<Cell> cells = res.listCells(); 802 for (Cell cell : cells) { 803 // qualifier = table name - we use table names as qualifiers 804 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); 805 } 806 return set; 807 } 808 } 809 810 /** 811 * Add tables to global incremental backup set 812 * @param tables set of tables 813 * @param backupRoot root directory path to backup 814 * @throws IOException exception 815 */ 816 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) 817 throws IOException { 818 if (LOG.isTraceEnabled()) { 819 LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot 820 + " tables [" + StringUtils.join(tables, " ") + "]"); 821 } 822 if (LOG.isDebugEnabled()) { 823 tables.forEach(table -> LOG.debug(Objects.toString(table))); 824 } 825 try (Table table = connection.getTable(tableName)) { 826 Put put = createPutForIncrBackupTableSet(tables, backupRoot); 827 table.put(put); 828 } 829 } 830 831 /** 832 * Deletes incremental backup set for a backup destination 833 * @param backupRoot backup root 834 */ 835 public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { 836 if (LOG.isTraceEnabled()) { 837 LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); 838 } 839 try (Table table = connection.getTable(tableName)) { 840 Delete delete = createDeleteForIncrBackupTableSet(backupRoot); 841 table.delete(delete); 842 } 843 } 844 845 /** 846 * Checks if we have at least one backup session in backup system table This API is used by 847 * BackupLogCleaner 848 * @return true, if at least one session exists in backup system table 849 * @throws IOException exception 850 */ 851 public boolean hasBackupSessions() throws IOException { 852 LOG.trace("Has backup sessions from backup system table"); 853 854 Scan scan = createScanForBackupHistory(Order.OLD_TO_NEW); 855 scan.setCaching(1); 856 try (Table table = connection.getTable(tableName); 857 ResultScanner scanner = table.getScanner(scan)) { 858 return scanner.next() != null; 859 } 860 } 861 862 /** 863 * BACKUP SETS 864 */ 865 866 /** 867 * Get backup set list 868 * @return backup set list 869 * @throws IOException if a table or scanner operation fails 870 */ 871 public List<String> listBackupSets() throws IOException { 872 LOG.trace("Backup set list"); 873 874 List<String> list = new ArrayList<>(); 875 try (Table table = connection.getTable(tableName)) { 876 Scan scan = createScanForBackupSetList(); 877 scan.readVersions(1); 878 try (ResultScanner scanner = table.getScanner(scan)) { 879 Result res; 880 while ((res = scanner.next()) != null) { 881 res.advance(); 882 list.add(cellKeyToBackupSetName(res.current())); 883 } 884 return list; 885 } 886 } 887 } 888 889 /** 890 * Get backup set description (list of tables) 891 * @param name set's name 892 * @return list of tables in a backup set 893 * @throws IOException if a table operation fails 894 */ 895 public List<TableName> describeBackupSet(String name) throws IOException { 896 if (LOG.isTraceEnabled()) { 897 LOG.trace(" Backup set describe: " + name); 898 } 899 try (Table table = connection.getTable(tableName)) { 900 Get get = createGetForBackupSet(name); 901 Result res = table.get(get); 902 if (res.isEmpty()) { 903 return null; 904 } 905 res.advance(); 906 String[] tables = cellValueToBackupSet(res.current()); 907 return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) 908 .collect(Collectors.toList()); 909 } 910 } 911 912 /** 913 * Add backup set (list of tables) 914 * @param name set name 915 * @param newTables list of tables, comma-separated 916 * @throws IOException if a table operation fails 917 */ 918 public void addToBackupSet(String name, String[] newTables) throws IOException { 919 if (LOG.isTraceEnabled()) { 920 LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); 921 } 922 String[] union = null; 923 try (Table table = connection.getTable(tableName)) { 924 Get get = createGetForBackupSet(name); 925 Result res = table.get(get); 926 if (res.isEmpty()) { 927 union = newTables; 928 } else { 929 res.advance(); 930 String[] tables = cellValueToBackupSet(res.current()); 931 union = merge(tables, newTables); 932 } 933 Put put = createPutForBackupSet(name, union); 934 table.put(put); 935 } 936 } 937 938 /** 939 * Remove tables from backup set (list of tables) 940 * @param name set name 941 * @param toRemove list of tables 942 * @throws IOException if a table operation or deleting the backup set fails 943 */ 944 public void removeFromBackupSet(String name, String[] toRemove) throws IOException { 945 if (LOG.isTraceEnabled()) { 946 LOG.trace( 947 " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); 948 } 949 String[] disjoint; 950 String[] tables; 951 try (Table table = connection.getTable(tableName)) { 952 Get get = createGetForBackupSet(name); 953 Result res = table.get(get); 954 if (res.isEmpty()) { 955 LOG.warn("Backup set '" + name + "' not found."); 956 return; 957 } else { 958 res.advance(); 959 tables = cellValueToBackupSet(res.current()); 960 disjoint = disjoin(tables, toRemove); 961 } 962 if (disjoint.length > 0 && disjoint.length != tables.length) { 963 Put put = createPutForBackupSet(name, disjoint); 964 table.put(put); 965 } else if (disjoint.length == tables.length) { 966 LOG.warn("Backup set '" + name + "' does not contain tables [" 967 + StringUtils.join(toRemove, " ") + "]"); 968 } else { // disjoint.length == 0 and tables.length >0 969 // Delete backup set 970 LOG.info("Backup set '" + name + "' is empty. Deleting."); 971 deleteBackupSet(name); 972 } 973 } 974 } 975 976 private String[] merge(String[] existingTables, String[] newTables) { 977 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 978 tables.addAll(Arrays.asList(newTables)); 979 return tables.toArray(new String[0]); 980 } 981 982 private String[] disjoin(String[] existingTables, String[] toRemove) { 983 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 984 Arrays.asList(toRemove).forEach(table -> tables.remove(table)); 985 return tables.toArray(new String[0]); 986 } 987 988 /** 989 * Delete backup set 990 * @param name set's name 991 * @throws IOException if getting or deleting the table fails 992 */ 993 public void deleteBackupSet(String name) throws IOException { 994 if (LOG.isTraceEnabled()) { 995 LOG.trace(" Backup set delete: " + name); 996 } 997 try (Table table = connection.getTable(tableName)) { 998 Delete del = createDeleteForBackupSet(name); 999 table.delete(del); 1000 } 1001 } 1002 1003 /** 1004 * Get backup system table descriptor 1005 * @return table's descriptor 1006 */ 1007 public static TableDescriptor getSystemTableDescriptor(Configuration conf) { 1008 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); 1009 1010 ColumnFamilyDescriptorBuilder colBuilder = 1011 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1012 1013 colBuilder.setMaxVersions(1); 1014 Configuration config = HBaseConfiguration.create(); 1015 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1016 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1017 colBuilder.setTimeToLive(ttl); 1018 1019 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1020 builder.setColumnFamily(colSessionsDesc); 1021 1022 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1023 colBuilder.setTimeToLive(ttl); 1024 builder.setColumnFamily(colBuilder.build()); 1025 return builder.build(); 1026 } 1027 1028 public static TableName getTableName(Configuration conf) { 1029 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1030 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); 1031 return TableName.valueOf(name); 1032 } 1033 1034 public static String getTableNameAsString(Configuration conf) { 1035 return getTableName(conf).getNameAsString(); 1036 } 1037 1038 public static String getSnapshotName(Configuration conf) { 1039 return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); 1040 } 1041 1042 /** 1043 * Get backup system table descriptor 1044 * @return table's descriptor 1045 */ 1046 public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { 1047 TableDescriptorBuilder builder = 1048 TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); 1049 1050 ColumnFamilyDescriptorBuilder colBuilder = 1051 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1052 colBuilder.setMaxVersions(1); 1053 Configuration config = HBaseConfiguration.create(); 1054 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1055 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1056 colBuilder.setTimeToLive(ttl); 1057 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1058 builder.setColumnFamily(colSessionsDesc); 1059 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1060 colBuilder.setTimeToLive(ttl); 1061 builder.setColumnFamily(colBuilder.build()); 1062 return builder.build(); 1063 } 1064 1065 public static TableName getTableNameForBulkLoadedData(Configuration conf) { 1066 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1067 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; 1068 return TableName.valueOf(name); 1069 } 1070 1071 /** 1072 * Creates Put operation for a given backup info object 1073 * @param context backup info 1074 * @return put operation 1075 * @throws IOException exception 1076 */ 1077 private Put createPutForBackupInfo(BackupInfo context) throws IOException { 1078 Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); 1079 put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), 1080 context.toByteArray()); 1081 return put; 1082 } 1083 1084 /** 1085 * Creates Get operation for a given backup id 1086 * @param backupId backup's ID 1087 * @return get operation 1088 * @throws IOException exception 1089 */ 1090 private Get createGetForBackupInfo(String backupId) throws IOException { 1091 Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); 1092 get.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1093 get.readVersions(1); 1094 return get; 1095 } 1096 1097 /** 1098 * Creates Delete operation for a given backup id 1099 * @param backupId backup's ID 1100 * @return delete operation 1101 */ 1102 private Delete createDeleteForBackupInfo(String backupId) { 1103 Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); 1104 del.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1105 return del; 1106 } 1107 1108 /** 1109 * Converts Result to BackupInfo 1110 * @param res HBase result 1111 * @return backup info instance 1112 * @throws IOException exception 1113 */ 1114 private BackupInfo resultToBackupInfo(Result res) throws IOException { 1115 res.advance(); 1116 Cell cell = res.current(); 1117 return cellToBackupInfo(cell); 1118 } 1119 1120 /** 1121 * Creates Get operation to retrieve start code from backup system table 1122 * @return get operation 1123 * @throws IOException exception 1124 */ 1125 private Get createGetForStartCode(String rootPath) throws IOException { 1126 Get get = new Get(rowkey(START_CODE_ROW, rootPath)); 1127 get.addFamily(BackupSystemTable.META_FAMILY); 1128 get.readVersions(1); 1129 return get; 1130 } 1131 1132 /** 1133 * Creates Put operation to store start code to backup system table 1134 * @return put operation 1135 */ 1136 private Put createPutForStartCode(String startCode, String rootPath) { 1137 Put put = new Put(rowkey(START_CODE_ROW, rootPath)); 1138 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), 1139 Bytes.toBytes(startCode)); 1140 return put; 1141 } 1142 1143 /** 1144 * Creates Get to retrieve incremental backup table set from backup system table 1145 * @return get operation 1146 * @throws IOException exception 1147 */ 1148 private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { 1149 Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); 1150 get.addFamily(BackupSystemTable.META_FAMILY); 1151 get.readVersions(1); 1152 return get; 1153 } 1154 1155 /** 1156 * Creates Put to store incremental backup table set 1157 * @param tables tables 1158 * @return put operation 1159 */ 1160 private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { 1161 Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); 1162 for (TableName table : tables) { 1163 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), 1164 EMPTY_VALUE); 1165 } 1166 return put; 1167 } 1168 1169 /** 1170 * Creates Delete for incremental backup table set 1171 * @param backupRoot backup root 1172 * @return delete operation 1173 */ 1174 private Delete createDeleteForIncrBackupTableSet(String backupRoot) { 1175 Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); 1176 delete.addFamily(BackupSystemTable.META_FAMILY); 1177 return delete; 1178 } 1179 1180 /** 1181 * Creates Scan operation to load backup history 1182 * @param order order of the scan results 1183 * @return scan operation 1184 */ 1185 private Scan createScanForBackupHistory(Order order) { 1186 Scan scan = new Scan(); 1187 byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); 1188 if (order == Order.NEW_TO_OLD) { 1189 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1190 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1191 scan.setReversed(true); 1192 scan.withStartRow(stopRow, false); 1193 scan.withStopRow(startRow); 1194 } else if (order == Order.OLD_TO_NEW) { 1195 scan.setStartStopRowForPrefixScan(startRow); 1196 } else { 1197 throw new IllegalArgumentException("Unsupported order: " + order); 1198 } 1199 scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1200 scan.readVersions(1); 1201 return scan; 1202 } 1203 1204 /** 1205 * Converts cell to backup info instance. 1206 * @param current current cell 1207 * @return backup backup info instance 1208 * @throws IOException exception 1209 */ 1210 private BackupInfo cellToBackupInfo(Cell current) throws IOException { 1211 byte[] data = CellUtil.cloneValue(current); 1212 return BackupInfo.fromByteArray(data); 1213 } 1214 1215 /** 1216 * Creates Put to write RS last roll log timestamp map 1217 * @param table table 1218 * @param smap map, containing RS:ts 1219 * @return put operation 1220 */ 1221 private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, 1222 String backupRoot) { 1223 Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); 1224 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); 1225 return put; 1226 } 1227 1228 /** 1229 * Creates Scan to load table-> { RS -> ts} map of maps 1230 * @return scan operation 1231 */ 1232 private Scan createScanForReadLogTimestampMap(String backupRoot) { 1233 Scan scan = new Scan(); 1234 scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL)); 1235 scan.addFamily(BackupSystemTable.META_FAMILY); 1236 1237 return scan; 1238 } 1239 1240 /** 1241 * Get table name from rowkey 1242 * @param cloneRow rowkey 1243 * @return table name 1244 */ 1245 private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { 1246 String s = Bytes.toString(cloneRow); 1247 int index = s.lastIndexOf(NULL); 1248 return s.substring(index + 1); 1249 } 1250 1251 /** 1252 * Creates Put to store RS last log result 1253 * @param server server name 1254 * @param timestamp log roll result (timestamp) 1255 * @return put operation 1256 */ 1257 private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, 1258 String backupRoot) { 1259 Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); 1260 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), 1261 Bytes.toBytes(timestamp)); 1262 return put; 1263 } 1264 1265 /** 1266 * Creates Scan operation to load last RS log roll results 1267 * @return scan operation 1268 */ 1269 private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { 1270 Scan scan = new Scan(); 1271 scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL)); 1272 scan.addFamily(BackupSystemTable.META_FAMILY); 1273 scan.readVersions(1); 1274 1275 return scan; 1276 } 1277 1278 /** 1279 * Get server's name from rowkey 1280 * @param row rowkey 1281 * @return server's name 1282 */ 1283 private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { 1284 String s = Bytes.toString(row); 1285 int index = s.lastIndexOf(NULL); 1286 return s.substring(index + 1); 1287 } 1288 1289 /** 1290 * Creates Put's for bulk loads. 1291 */ 1292 private static List<Put> createPutForBulkLoad(TableName table, byte[] region, 1293 Map<byte[], List<Path>> columnFamilyToHFilePaths) { 1294 List<Put> puts = new ArrayList<>(); 1295 for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) { 1296 for (Path path : entry.getValue()) { 1297 String file = path.toString(); 1298 int lastSlash = file.lastIndexOf("/"); 1299 String filename = file.substring(lastSlash + 1); 1300 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1301 Bytes.toString(region), BLK_LD_DELIM, filename)); 1302 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1303 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); 1304 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1305 puts.add(put); 1306 LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region)); 1307 } 1308 } 1309 return puts; 1310 } 1311 1312 public static void snapshot(Connection conn) throws IOException { 1313 try (Admin admin = conn.getAdmin()) { 1314 Configuration conf = conn.getConfiguration(); 1315 admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); 1316 } 1317 } 1318 1319 public static void restoreFromSnapshot(Connection conn) throws IOException { 1320 Configuration conf = conn.getConfiguration(); 1321 LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); 1322 try (Admin admin = conn.getAdmin()) { 1323 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1324 if (snapshotExists(admin, snapshotName)) { 1325 admin.restoreBackupSystemTable(snapshotName); 1326 LOG.debug("Done restoring backup system table"); 1327 } else { 1328 // Snapshot does not exists, i.e completeBackup failed after 1329 // deleting backup system table snapshot 1330 // In this case we log WARN and proceed 1331 LOG.warn( 1332 "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); 1333 } 1334 } 1335 } 1336 1337 private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { 1338 List<SnapshotDescription> list = admin.listSnapshots(); 1339 for (SnapshotDescription desc : list) { 1340 if (desc.getName().equals(snapshotName)) { 1341 return true; 1342 } 1343 } 1344 return false; 1345 } 1346 1347 public static boolean snapshotExists(Connection conn) throws IOException { 1348 return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); 1349 } 1350 1351 public static void deleteSnapshot(Connection conn) throws IOException { 1352 Configuration conf = conn.getConfiguration(); 1353 LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); 1354 try (Admin admin = conn.getAdmin()) { 1355 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1356 if (snapshotExists(admin, snapshotName)) { 1357 admin.deleteSnapshot(snapshotName); 1358 LOG.debug("Done deleting backup system table snapshot"); 1359 } else { 1360 LOG.error("Snapshot " + snapshotName + " does not exists"); 1361 } 1362 } 1363 } 1364 1365 private Put createPutForDeleteOperation(String[] backupIdList) { 1366 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1367 Put put = new Put(DELETE_OP_ROW); 1368 put.addColumn(META_FAMILY, FAM_COL, value); 1369 return put; 1370 } 1371 1372 private Delete createDeleteForBackupDeleteOperation() { 1373 Delete delete = new Delete(DELETE_OP_ROW); 1374 delete.addFamily(META_FAMILY); 1375 return delete; 1376 } 1377 1378 private Get createGetForDeleteOperation() { 1379 Get get = new Get(DELETE_OP_ROW); 1380 get.addFamily(META_FAMILY); 1381 return get; 1382 } 1383 1384 public void startDeleteOperation(String[] backupIdList) throws IOException { 1385 if (LOG.isTraceEnabled()) { 1386 LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); 1387 } 1388 Put put = createPutForDeleteOperation(backupIdList); 1389 try (Table table = connection.getTable(tableName)) { 1390 table.put(put); 1391 } 1392 } 1393 1394 public void finishDeleteOperation() throws IOException { 1395 LOG.trace("Finsih delete operation for backup ids"); 1396 1397 Delete delete = createDeleteForBackupDeleteOperation(); 1398 try (Table table = connection.getTable(tableName)) { 1399 table.delete(delete); 1400 } 1401 } 1402 1403 public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { 1404 LOG.trace("Get delete operation for backup ids"); 1405 1406 Get get = createGetForDeleteOperation(); 1407 try (Table table = connection.getTable(tableName)) { 1408 Result res = table.get(get); 1409 if (res.isEmpty()) { 1410 return null; 1411 } 1412 Cell cell = res.listCells().get(0); 1413 byte[] val = CellUtil.cloneValue(cell); 1414 if (val.length == 0) { 1415 return null; 1416 } 1417 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1418 .toArray(String[]::new); 1419 } 1420 } 1421 1422 private Put createPutForMergeOperation(String[] backupIdList) { 1423 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1424 Put put = new Put(MERGE_OP_ROW); 1425 put.addColumn(META_FAMILY, FAM_COL, value); 1426 return put; 1427 } 1428 1429 public boolean isMergeInProgress() throws IOException { 1430 Get get = new Get(MERGE_OP_ROW); 1431 try (Table table = connection.getTable(tableName)) { 1432 Result res = table.get(get); 1433 return !res.isEmpty(); 1434 } 1435 } 1436 1437 private Put createPutForUpdateTablesForMerge(List<TableName> tables) { 1438 byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); 1439 Put put = new Put(MERGE_OP_ROW); 1440 put.addColumn(META_FAMILY, PATH_COL, value); 1441 return put; 1442 } 1443 1444 private Delete createDeleteForBackupMergeOperation() { 1445 Delete delete = new Delete(MERGE_OP_ROW); 1446 delete.addFamily(META_FAMILY); 1447 return delete; 1448 } 1449 1450 private Get createGetForMergeOperation() { 1451 Get get = new Get(MERGE_OP_ROW); 1452 get.addFamily(META_FAMILY); 1453 return get; 1454 } 1455 1456 public void startMergeOperation(String[] backupIdList) throws IOException { 1457 if (LOG.isTraceEnabled()) { 1458 LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); 1459 } 1460 Put put = createPutForMergeOperation(backupIdList); 1461 try (Table table = connection.getTable(tableName)) { 1462 table.put(put); 1463 } 1464 } 1465 1466 public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { 1467 if (LOG.isTraceEnabled()) { 1468 LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); 1469 } 1470 Put put = createPutForUpdateTablesForMerge(tables); 1471 try (Table table = connection.getTable(tableName)) { 1472 table.put(put); 1473 } 1474 } 1475 1476 public void finishMergeOperation() throws IOException { 1477 LOG.trace("Finish merge operation for backup ids"); 1478 1479 Delete delete = createDeleteForBackupMergeOperation(); 1480 try (Table table = connection.getTable(tableName)) { 1481 table.delete(delete); 1482 } 1483 } 1484 1485 public String[] getListOfBackupIdsFromMergeOperation() throws IOException { 1486 LOG.trace("Get backup ids for merge operation"); 1487 1488 Get get = createGetForMergeOperation(); 1489 try (Table table = connection.getTable(tableName)) { 1490 Result res = table.get(get); 1491 if (res.isEmpty()) { 1492 return null; 1493 } 1494 Cell cell = res.listCells().get(0); 1495 byte[] val = CellUtil.cloneValue(cell); 1496 if (val.length == 0) { 1497 return null; 1498 } 1499 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1500 .toArray(String[]::new); 1501 } 1502 } 1503 1504 /** 1505 * Creates a scan to read all registered bulk loads for the given table, or for all tables if 1506 * {@code table} is {@code null}. 1507 */ 1508 static Scan createScanForOrigBulkLoadedFiles(@Nullable TableName table) { 1509 Scan scan = new Scan(); 1510 byte[] startRow = table == null 1511 ? BULK_LOAD_PREFIX_BYTES 1512 : rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); 1513 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1514 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1515 scan.withStartRow(startRow); 1516 scan.withStopRow(stopRow); 1517 scan.addFamily(BackupSystemTable.META_FAMILY); 1518 scan.readVersions(1); 1519 return scan; 1520 } 1521 1522 static String getTableNameFromOrigBulkLoadRow(String rowStr) { 1523 // format is bulk : namespace : table : region : file 1524 return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1); 1525 } 1526 1527 static String getRegionNameFromOrigBulkLoadRow(String rowStr) { 1528 // format is bulk : namespace : table : region : file 1529 List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr); 1530 Iterator<String> i = parts.iterator(); 1531 int idx = 3; 1532 if (parts.size() == 4) { 1533 // the table is in default namespace 1534 idx = 2; 1535 } 1536 String region = Iterators.get(i, idx); 1537 LOG.debug("bulk row string " + rowStr + " region " + region); 1538 return region; 1539 } 1540 1541 /* 1542 * Used to query bulk loaded hfiles which have been copied by incremental backup 1543 * @param backupId the backup Id. It can be null when querying for all tables 1544 * @return the Scan object 1545 * @deprecated This method is broken if a backupId is specified - see HBASE-28715 1546 */ 1547 static Scan createScanForBulkLoadedFiles(String backupId) { 1548 Scan scan = new Scan(); 1549 byte[] startRow = 1550 backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); 1551 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1552 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1553 scan.withStartRow(startRow); 1554 scan.withStopRow(stopRow); 1555 scan.addFamily(BackupSystemTable.META_FAMILY); 1556 scan.readVersions(1); 1557 return scan; 1558 } 1559 1560 /** 1561 * Creates Scan operation to load backup set list 1562 * @return scan operation 1563 */ 1564 private Scan createScanForBackupSetList() { 1565 Scan scan = new Scan(); 1566 byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); 1567 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1568 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1569 scan.withStartRow(startRow); 1570 scan.withStopRow(stopRow); 1571 scan.addFamily(BackupSystemTable.META_FAMILY); 1572 return scan; 1573 } 1574 1575 /** 1576 * Creates Get operation to load backup set content 1577 * @return get operation 1578 */ 1579 private Get createGetForBackupSet(String name) { 1580 Get get = new Get(rowkey(SET_KEY_PREFIX, name)); 1581 get.addFamily(BackupSystemTable.META_FAMILY); 1582 return get; 1583 } 1584 1585 /** 1586 * Creates Delete operation to delete backup set content 1587 * @param name backup set's name 1588 * @return delete operation 1589 */ 1590 private Delete createDeleteForBackupSet(String name) { 1591 Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); 1592 del.addFamily(BackupSystemTable.META_FAMILY); 1593 return del; 1594 } 1595 1596 /** 1597 * Creates Put operation to update backup set content 1598 * @param name backup set's name 1599 * @param tables list of tables 1600 * @return put operation 1601 */ 1602 private Put createPutForBackupSet(String name, String[] tables) { 1603 Put put = new Put(rowkey(SET_KEY_PREFIX, name)); 1604 byte[] value = convertToByteArray(tables); 1605 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); 1606 return put; 1607 } 1608 1609 private byte[] convertToByteArray(String[] tables) { 1610 return Bytes.toBytes(StringUtils.join(tables, ",")); 1611 } 1612 1613 /** 1614 * Converts cell to backup set list. 1615 * @param current current cell 1616 * @return backup set as array of table names 1617 */ 1618 private String[] cellValueToBackupSet(Cell current) { 1619 byte[] data = CellUtil.cloneValue(current); 1620 if (!ArrayUtils.isEmpty(data)) { 1621 return Bytes.toString(data).split(","); 1622 } 1623 return new String[0]; 1624 } 1625 1626 /** 1627 * Converts cell key to backup set name. 1628 * @param current current cell 1629 * @return backup set name 1630 */ 1631 private String cellKeyToBackupSetName(Cell current) { 1632 byte[] data = CellUtil.cloneRow(current); 1633 return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); 1634 } 1635 1636 private static byte[] rowkey(String s, String... other) { 1637 StringBuilder sb = new StringBuilder(s); 1638 for (String ss : other) { 1639 sb.append(ss); 1640 } 1641 return Bytes.toBytes(sb.toString()); 1642 } 1643 1644 private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException { 1645 if (!admin.isTableEnabled(tableName)) { 1646 try { 1647 admin.enableTable(tableName); 1648 } catch (TableNotDisabledException ignored) { 1649 LOG.info("Table {} is not disabled, ignoring enable request", tableName); 1650 } 1651 } 1652 } 1653 1654 public enum Order { 1655 /** 1656 * Old backups first, most recents last. I.e. sorted by ascending backupId. 1657 */ 1658 OLD_TO_NEW, 1659 /** 1660 * New backups first, oldest last. I.e. sorted by descending backupId. 1661 */ 1662 NEW_TO_OLD 1663 } 1664}