001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import static org.apache.hadoop.hbase.backup.BackupInfo.withRoot; 021import static org.apache.hadoop.hbase.backup.BackupInfo.withState; 022import static org.apache.hadoop.hbase.backup.BackupInfo.withType; 023 024import edu.umd.cs.findbugs.annotations.Nullable; 025import java.io.Closeable; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.nio.charset.StandardCharsets; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collection; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039import java.util.Objects; 040import java.util.Set; 041import java.util.TreeMap; 042import java.util.TreeSet; 043import java.util.function.Predicate; 044import java.util.stream.Collectors; 045import java.util.stream.Stream; 046import org.apache.commons.lang3.ArrayUtils; 047import org.apache.commons.lang3.StringUtils; 048import org.apache.hadoop.conf.Configuration; 049import 
org.apache.hadoop.fs.Path; 050import org.apache.hadoop.hbase.Cell; 051import org.apache.hadoop.hbase.CellUtil; 052import org.apache.hadoop.hbase.HBaseConfiguration; 053import org.apache.hadoop.hbase.NamespaceDescriptor; 054import org.apache.hadoop.hbase.NamespaceExistException; 055import org.apache.hadoop.hbase.ServerName; 056import org.apache.hadoop.hbase.TableExistsException; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.TableNotDisabledException; 059import org.apache.hadoop.hbase.backup.BackupInfo; 060import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 061import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 062import org.apache.hadoop.hbase.backup.BackupType; 063import org.apache.hadoop.hbase.client.Admin; 064import org.apache.hadoop.hbase.client.BufferedMutator; 065import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 066import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 067import org.apache.hadoop.hbase.client.Connection; 068import org.apache.hadoop.hbase.client.Delete; 069import org.apache.hadoop.hbase.client.Get; 070import org.apache.hadoop.hbase.client.Put; 071import org.apache.hadoop.hbase.client.Result; 072import org.apache.hadoop.hbase.client.ResultScanner; 073import org.apache.hadoop.hbase.client.Scan; 074import org.apache.hadoop.hbase.client.SnapshotDescription; 075import org.apache.hadoop.hbase.client.Table; 076import org.apache.hadoop.hbase.client.TableDescriptor; 077import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 078import org.apache.hadoop.hbase.util.Bytes; 079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 080import org.apache.yetus.audience.InterfaceAudience; 081import org.slf4j.Logger; 082import org.slf4j.LoggerFactory; 083 084import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 085import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 086import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 
087 088import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 090 091/** 092 * This class provides API to access backup system table<br> 093 * Backup system table schema:<br> 094 * <p> 095 * <ul> 096 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> 097 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> 098 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of 099 * include table; value=empty</li> 100 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL 101 * timestamp]</li> 102 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> 103 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file 104 * name</li> 105 * </ul> 106 * </p> 107 */ 108@InterfaceAudience.Private 109public final class BackupSystemTable implements Closeable { 110 111 private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); 112 113 static class WALItem { 114 String backupId; 115 String walFile; 116 String backupRoot; 117 118 WALItem(String backupId, String walFile, String backupRoot) { 119 this.backupId = backupId; 120 this.walFile = walFile; 121 this.backupRoot = backupRoot; 122 } 123 124 public String getBackupId() { 125 return backupId; 126 } 127 128 public String getWalFile() { 129 return walFile; 130 } 131 132 public String getBackupRoot() { 133 return backupRoot; 134 } 135 136 @Override 137 public String toString() { 138 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; 139 } 140 } 141 142 /** 143 * Backup system table (main) name 144 */ 145 private TableName tableName; 146 147 /** 148 * Backup System table name for bulk loaded files. 
We keep all bulk loaded file references in a 149 * separate table because we have to isolate general backup operations: create, merge etc from 150 * activity of RegionObserver, which controls process of a bulk loading 151 * {@link org.apache.hadoop.hbase.backup.BackupObserver} 152 */ 153 private TableName bulkLoadTableName; 154 155 /** 156 * Stores backup sessions (contexts) 157 */ 158 final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); 159 /** 160 * Stores other meta 161 */ 162 final static byte[] META_FAMILY = Bytes.toBytes("meta"); 163 final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); 164 /** 165 * Connection to HBase cluster, shared among all instances 166 */ 167 private final Connection connection; 168 169 private final static String BACKUP_INFO_PREFIX = "session:"; 170 private final static String START_CODE_ROW = "startcode:"; 171 private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); 172 private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); 173 174 private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); 175 private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); 176 177 private final static String INCR_BACKUP_SET = "incrbackupset:"; 178 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; 179 private final static String RS_LOG_TS_PREFIX = "rslogts:"; 180 181 private final static String BULK_LOAD_PREFIX = "bulk:"; 182 private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX); 183 private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row"); 184 private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row"); 185 186 final static byte[] TBL_COL = Bytes.toBytes("tbl"); 187 final static byte[] FAM_COL = Bytes.toBytes("fam"); 188 final static byte[] PATH_COL = Bytes.toBytes("path"); 189 190 private final static String SET_KEY_PREFIX = "backupset:"; 191 192 // separator between BULK_LOAD_PREFIX and ordinals 193 
private static final String BLK_LD_DELIM = ":";
private static final byte[] EMPTY_VALUE = new byte[] {};

// Safe delimiter in a string
private static final String NULL = "\u0000";

/**
 * Creates an accessor for the backup system tables that works through the given connection.
 * <p>
 * Both table names are resolved from the connection's configuration, then
 * {@code checkSystemTable()} is run before the constructor returns.
 * @param conn connection to the HBase cluster; it is stored as-is and is not closed by
 *             {@link #close()}
 * @throws IOException if checking, creating or waiting for the system tables fails
 */
public BackupSystemTable(Connection conn) throws IOException {
  this.connection = conn;
  Configuration conf = this.connection.getConfiguration();
  tableName = BackupSystemTable.getTableName(conf);
  bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
  checkSystemTable();
}

/**
 * Makes sure the namespace and both backup system tables are usable: each missing table is
 * created, {@code ensureTableEnabled} is invoked for each table, and finally the method waits
 * until both tables exist and are available.
 * @throws IOException if any admin operation fails or a table does not become available in time
 */
private void checkSystemTable() throws IOException {
  try (Admin admin = connection.getAdmin()) {
    verifyNamespaceExists(admin);
    Configuration conf = connection.getConfiguration();
    if (!admin.tableExists(tableName)) {
      TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
      createSystemTable(admin, backupHTD);
    }
    ensureTableEnabled(admin, tableName);
    if (!admin.tableExists(bulkLoadTableName)) {
      TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
      createSystemTable(admin, blHTD);
    }
    ensureTableEnabled(admin, bulkLoadTableName);
    waitForSystemTable(admin, tableName);
    waitForSystemTable(admin, bulkLoadTableName);
  }
}

/**
 * Creates the table described by {@code descriptor}, tolerating a concurrent creation of the same
 * table.
 * @param admin      admin used to create the table
 * @param descriptor descriptor of the table to create
 * @throws IOException if table creation fails for a reason other than the table already existing
 */
private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException {
  try {
    admin.createTable(descriptor);
  } catch (TableExistsException e) {
    // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
    // so may be subject to race conditions where one caller succeeds in creating the
    // table and others fail because it now exists
    LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e);
  }
}

/**
 * Creates the namespace of the main backup table when it is not among the namespaces reported by
 * {@code admin}. A concurrent creation of the same namespace is tolerated.
 * @param admin admin used to list and create namespaces
 * @throws IOException if listing namespaces fails, or creation fails for a reason other than the
 *                     namespace already existing
 */
private void verifyNamespaceExists(Admin admin) throws IOException {
  String namespaceName = tableName.getNamespaceAsString();
  NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
  NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
  boolean exists = false;
  for (NamespaceDescriptor nsd : list) {
    if (nsd.getName().equals(ns.getName())) {
      exists = true;
      break;
    }
  }
  if (!exists) {
    try {
      admin.createNamespace(ns);
    } catch (NamespaceExistException e) {
      // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
      // so may be subject to race conditions where one caller succeeds in creating the
      // namespace and others fail because it now exists
      LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e);
    }
  }
}

/**
 * Blocks until the given table exists and is available, polling every 100 ms for at most 60
 * seconds.
 * @param admin     admin used to query the table state
 * @param tableName table to wait for
 * @throws InterruptedIOException if the waiting thread is interrupted; the thread's interrupt
 *                                status is restored before the exception is thrown
 * @throws IOException            if the table is still not available once the timeout elapsed,
 *                                or an admin call fails
 */
private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
  // Return fast if the table is available and avoid a log message
  if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
    return;
  }
  final long timeoutMs = 60000;
  final long startTime = EnvironmentEdgeManager.currentTime();
  LOG.debug("Backup table {} is not present and available, waiting for it to become so",
    tableName);
  while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      // Catching InterruptedException clears the interrupt flag; set it again so that callers
      // further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw (IOException) new InterruptedIOException().initCause(e);
    }
    if (EnvironmentEdgeManager.currentTime() - startTime > timeoutMs) {
      throw new IOException(
        "Failed to create backup system table " + tableName + " after " + timeoutMs + "ms");
    }
  }
  LOG.debug("Backup table {} exists and available", tableName);
}

/**
 * Does nothing; in particular the connection passed to the constructor is left open.
 */
@Override
public void close() {
  // do nothing
}

/**
 * Updates status (state) of a backup session in the backup system table.
 * @param info backup info
 * @throws IOException if the table operation fails
 */
public void updateBackupInfo(BackupInfo info) throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("update backup status in backup system table for: " + info.getBackupId()
      + " set status=" + info.getState());
  }
  try (Table table = connection.getTable(tableName)) {
    Put put = createPutForBackupInfo(info);
    table.put(put);
  }
}

/**
 * Scans the bulk load table with the scan built for {@code backupId} and collects the value of
 * the path column of every returned row.
 * @param backupId the backup Id
 * @return map of row keys (ordered by {@code Bytes.BYTES_COMPARATOR}) to path of bulk loaded
 *         hfile
 * @throws IOException if the table or scanner operation fails
 */
Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
  Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
  try (Table table = connection.getTable(bulkLoadTableName);
    ResultScanner scanner = table.getScanner(scan)) {
    Result res = null;
    Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    while ((res = scanner.next()) != null) {
      res.advance();
      List<Cell> cells = res.listCells();
      byte[] row = CellUtil.cloneRow(cells.get(0));
      for (Cell cell : cells) {
        if (
          CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
            BackupSystemTable.PATH_COL.length) == 0
        ) {
          map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
        }
      }
    }
    return map;
  }
}

/**
 * Deletes backup status from the backup system table.
 * @param backupId backup id
 * @throws IOException if the table operation fails
 */
public void deleteBackupInfo(String backupId) throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("delete backup status in backup system table for " + backupId);
  }
  try (Table table = connection.getTable(tableName)) {
    Delete del = createDeleteForBackupInfo(backupId);
    table.delete(del);
  }
}

/**
 * Registers a bulk load by writing the puts built by {@code createPutForBulkLoad} to the bulk
 * load table.
 * @param tableName     table name
 * @param region        the region receiving hfile
 * @param cfToHfilePath column family and associated hfiles
 * @throws IOException if writing to the bulk load table fails
 */
public void registerBulkLoad(TableName tableName, byte[] region,
  Map<byte[], List<Path>> cfToHfilePath) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
      cfToHfilePath.size());
  }
  try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
    List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
    bufferedMutator.mutate(puts);
    LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName);
  }
}

/**
 * Removes entries from the table that tracks all bulk loaded hfiles.
 * @param rows the row keys of the entries to be deleted
 * @throws IOException if writing the deletes to the bulk load table fails
 */
public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
  try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
    // Checked once up front so row keys are only stringified when they will be logged.
    final boolean debug = LOG.isDebugEnabled();
    List<Delete> deletes = new ArrayList<>(rows.size());
    for (byte[] row : rows) {
      deletes.add(new Delete(row));
      if (debug) {
        LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row));
      }
    }
    bufferedMutator.mutate(deletes);
    LOG.debug("Deleted {} bulk load entries.", rows.size());
  }
}

/**
 * Reads all registered bulk loads.
 * @return the bulk loads found by the scan that {@code createScanForOrigBulkLoadedFiles} builds
 *         for a {@code null} table
 * @throws IOException if the table or scanner operation fails
 */
public List<BulkLoad> readBulkloadRows() throws IOException {
  Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null);
  return processBulkLoadRowScan(scan);
}

/**
 * Reads the registered bulk loads for the given tables, one scan per table, in iteration order
 * of {@code tableList}.
 * @param tableList tables to read the bulk loads for
 * @return the concatenated bulk loads of all given tables
 * @throws IOException if a table or scanner operation fails
 */
public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException {
  List<BulkLoad> result = new ArrayList<>();
  for (TableName table : tableList) {
    Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
    result.addAll(processBulkLoadRowScan(scan));
  }
  return result;
}

/**
 * Runs {@code scan} against the bulk load table and turns every returned row into one
 * {@code BulkLoad}, taking table, family and path from the matching columns and the region name
 * from the row key.
 * @param scan scan to run against the bulk load table
 * @return one {@code BulkLoad} per row returned by the scan
 * @throws IOException if the table or scanner operation fails
 */
private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException {
  List<BulkLoad> result = new ArrayList<>();
  try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
    ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
    Result res;
    while ((res = scanner.next()) != null) {
      res.advance();
      TableName table = null;
      String fam = null;
      String path = null;
      String region = null;
      byte[] row = null;
      for (Cell cell : res.listCells()) {
        if (row == null) {
          // A Result holds the cells of a single row, so the row key and the region name
          // derived from it are resolved on the first cell only instead of once per cell.
          row = CellUtil.cloneRow(cell);
          region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(Bytes.toString(row));
        }
        if (
          CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
            BackupSystemTable.TBL_COL.length) == 0
        ) {
          table = TableName.valueOf(CellUtil.cloneValue(cell));
        } else if (
          CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
            BackupSystemTable.FAM_COL.length) == 0
        ) {
          fam = Bytes.toString(CellUtil.cloneValue(cell));
        } else if (
          CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
            BackupSystemTable.PATH_COL.length) == 0
        ) {
          path = Bytes.toString(CellUtil.cloneValue(cell));
        }
      }
      result.add(new BulkLoad(table, region, fam, path, row));
      LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path);
    }
  }
  return result;
}

/**
 * Reads backup status object (instance of backup info) from the backup system table.
 * @param backupId backup id
 * @return current status of the backup session, or {@code null} if no row is stored for
 *         {@code backupId}
 * @throws IOException if the table operation fails
 */
public BackupInfo readBackupInfo(String backupId) throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("read backup status from backup system table for: " + backupId);
  }

  try (Table table = connection.getTable(tableName)) {
    Get get = createGetForBackupInfo(backupId);
    Result res = table.get(get);
    if (res.isEmpty()) {
      return null;
    }
    return resultToBackupInfo(res);
  }
}

/**
 * Marks a backup exclusive operation as started. Exclusive operations are: create, delete, merge.
 * <p>
 * The active-session cell is set to "yes" with a check-and-mutate: first only if the cell does
 * not exist yet, and if that fails, only if its current value is "no".
 * @throws IOException if a table operation fails or an active backup exclusive operation is
 *                     already underway
 */
public void startBackupExclusiveOperation() throws IOException {
  LOG.debug("Start new backup exclusive operation");

  try (Table table = connection.getTable(tableName)) {
    Put put = createPutForStartBackupSession();
    // First try to put if row does not exist
    if (
      !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
        .ifNotExists().thenPut(put)
    ) {
      // Row exists, try to put if value == ACTIVE_SESSION_NO
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
      ) {
        throw new ExclusiveOperationException();
      }
    }
  }
}

/**
 * Builds the put that sets the active-session cell to "yes".
 */
private Put createPutForStartBackupSession() {
  Put put = new Put(ACTIVE_SESSION_ROW);
  put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
  return put;
}

/**
 * Marks the running backup exclusive operation as finished by switching the active-session cell
 * from "yes" to "no" with a check-and-mutate.
 * @throws IOException if a table operation fails or the cell does not currently hold "yes", i.e.
 *                     there is no active backup exclusive operation
 */
public void finishBackupExclusiveOperation() throws IOException {
  LOG.debug("Finish backup exclusive operation");

  try (Table table = connection.getTable(tableName)) {
    Put put = createPutForStopBackupSession();
    if (
      !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
        .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
    ) {
      throw new IOException("There is no active backup exclusive operation");
    }
  }
}

private Put createPutForStopBackupSession() { 508 Put put = new Put(ACTIVE_SESSION_ROW); 509 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); 510 return put; 511 } 512 513 /** 514 * Get the Region Servers log information after the last log roll from backup system table. 515 * @param backupRoot root directory path to backup 516 * @return RS log info 517 * @throws IOException exception 518 */ 519 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) 520 throws IOException { 521 LOG.trace("read region server last roll log result to backup system table"); 522 523 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); 524 525 try (Table table = connection.getTable(tableName); 526 ResultScanner scanner = table.getScanner(scan)) { 527 Result res; 528 HashMap<String, Long> rsTimestampMap = new HashMap<>(); 529 while ((res = scanner.next()) != null) { 530 res.advance(); 531 Cell cell = res.current(); 532 byte[] row = CellUtil.cloneRow(cell); 533 String server = getServerNameForReadRegionServerLastLogRollResult(row); 534 byte[] data = CellUtil.cloneValue(cell); 535 rsTimestampMap.put(server, Bytes.toLong(data)); 536 } 537 return rsTimestampMap; 538 } 539 } 540 541 /** 542 * Writes Region Server last roll log result (timestamp) to backup system table table 543 * @param server Region Server name 544 * @param ts last log timestamp 545 * @param backupRoot root directory path to backup 546 * @throws IOException exception 547 */ 548 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) 549 throws IOException { 550 LOG.trace("write region server last roll log result to backup system table"); 551 552 try (Table table = connection.getTable(tableName)) { 553 Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); 554 table.put(put); 555 } 556 } 557 558 /** 559 * Retrieve all table names that are part of any known completed backup 560 */ 561 public Set<TableName> 
getTablesIncludedInBackups() throws IOException { 562 // Incremental backups have the same tables as the preceding full backups 563 List<BackupInfo> infos = 564 getBackupHistory(withState(BackupState.COMPLETE), withType(BackupType.FULL)); 565 return infos.stream().flatMap(info -> info.getTableNames().stream()) 566 .collect(Collectors.toSet()); 567 } 568 569 /** 570 * Goes through all backup history corresponding to the provided root folder, and collects all 571 * backup info mentioning each of the provided tables. 572 * @param set the tables for which to collect the {@code BackupInfo} 573 * @param backupRoot backup destination path to retrieve backup history for 574 * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at 575 * least one {@code BackupInfo} 576 * @throws IOException if getting the backup history fails 577 */ 578 public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, 579 String backupRoot) throws IOException { 580 List<BackupInfo> history = getBackupHistory(withRoot(backupRoot)); 581 Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>(); 582 for (BackupInfo info : history) { 583 List<TableName> tables = info.getTableNames(); 584 for (TableName tableName : tables) { 585 if (set.contains(tableName)) { 586 List<BackupInfo> list = 587 tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>()); 588 list.add(info); 589 } 590 } 591 } 592 return tableHistoryMap; 593 } 594 595 /** 596 * Get all backup information passing the given filters, ordered by descending backupId. I.e. from 597 * newest to oldest. 598 */ 599 public List<BackupInfo> getBackupHistory(BackupInfo.Filter... toInclude) throws IOException { 600 return getBackupHistory(Order.NEW_TO_OLD, Integer.MAX_VALUE, toInclude); 601 } 602 603 /** 604 * Retrieves the first n entries of the sorted, filtered list of backup infos. 605 * @param order desired ordering of the results. 
606 * @param n number of entries to return 607 */ 608 public List<BackupInfo> getBackupHistory(Order order, int n, BackupInfo.Filter... toInclude) 609 throws IOException { 610 Preconditions.checkArgument(n >= 0, "n should be >= 0"); 611 LOG.trace("get backup infos from backup system table"); 612 613 if (n == 0) { 614 return Collections.emptyList(); 615 } 616 617 Predicate<BackupInfo> combinedPredicate = Stream.of(toInclude) 618 .map(filter -> (Predicate<BackupInfo>) filter).reduce(Predicate::and).orElse(x -> true); 619 620 Scan scan = createScanForBackupHistory(order); 621 List<BackupInfo> list = new ArrayList<>(); 622 623 try (Table table = connection.getTable(tableName); 624 ResultScanner scanner = table.getScanner(scan)) { 625 Result res; 626 while ((res = scanner.next()) != null) { 627 res.advance(); 628 BackupInfo context = cellToBackupInfo(res.current()); 629 if (combinedPredicate.test(context)) { 630 list.add(context); 631 if (list.size() == n) { 632 break; 633 } 634 } 635 } 636 return list; 637 } 638 } 639 640 /** 641 * Write the current timestamps for each regionserver to backup system table after a successful 642 * full or incremental backup. The saved timestamp is of the last log file that was backed up 643 * already. 
644 * @param tables tables 645 * @param newTimestamps timestamps 646 * @param backupRoot root directory path to backup 647 * @throws IOException exception 648 */ 649 public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps, 650 String backupRoot) throws IOException { 651 if (LOG.isTraceEnabled()) { 652 LOG.trace("write RS log time stamps to backup system table for tables [" 653 + StringUtils.join(tables, ",") + "]"); 654 } 655 List<Put> puts = new ArrayList<>(); 656 for (TableName table : tables) { 657 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); 658 Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); 659 puts.add(put); 660 } 661 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) { 662 bufferedMutator.mutate(puts); 663 } 664 } 665 666 /** 667 * Read the timestamp for each region server log after the last successful backup. Each table has 668 * its own set of the timestamps. The info is stored for each table as a concatenated string of 669 * rs->timestapmp 670 * @param backupRoot root directory path to backup 671 * @return the timestamp for each region server. 
key: tableName value: 672 * RegionServer,PreviousTimeStamp 673 * @throws IOException exception 674 */ 675 public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot) 676 throws IOException { 677 if (LOG.isTraceEnabled()) { 678 LOG.trace("read RS log ts from backup system table for root=" + backupRoot); 679 } 680 681 Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>(); 682 683 Scan scan = createScanForReadLogTimestampMap(backupRoot); 684 try (Table table = connection.getTable(tableName); 685 ResultScanner scanner = table.getScanner(scan)) { 686 Result res; 687 while ((res = scanner.next()) != null) { 688 res.advance(); 689 Cell cell = res.current(); 690 byte[] row = CellUtil.cloneRow(cell); 691 String tabName = getTableNameForReadLogTimestampMap(row); 692 TableName tn = TableName.valueOf(tabName); 693 byte[] data = CellUtil.cloneValue(cell); 694 if (data == null) { 695 throw new IOException("Data of last backup data from backup system table " 696 + "is empty. 
Create a backup first."); 697 } 698 if (data != null && data.length > 0) { 699 HashMap<String, Long> lastBackup = 700 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); 701 tableTimestampMap.put(tn, lastBackup); 702 } 703 } 704 return tableTimestampMap; 705 } 706 } 707 708 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, 709 Map<String, Long> map) { 710 BackupProtos.TableServerTimestamp.Builder tstBuilder = 711 BackupProtos.TableServerTimestamp.newBuilder(); 712 tstBuilder 713 .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); 714 715 for (Entry<String, Long> entry : map.entrySet()) { 716 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); 717 HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); 718 ServerName sn = ServerName.parseServerName(entry.getKey()); 719 snBuilder.setHostName(sn.getHostname()); 720 snBuilder.setPort(sn.getPort()); 721 builder.setServerName(snBuilder.build()); 722 builder.setTimestamp(entry.getValue()); 723 tstBuilder.addServerTimestamp(builder.build()); 724 } 725 726 return tstBuilder.build(); 727 } 728 729 private HashMap<String, Long> 730 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { 731 732 HashMap<String, Long> map = new HashMap<>(); 733 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); 734 for (BackupProtos.ServerTimestamp st : list) { 735 ServerName sn = 736 org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); 737 map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); 738 } 739 return map; 740 } 741 742 /** 743 * Return the current tables covered by incremental backup. 
744 * @param backupRoot root directory path to backup 745 * @return set of tableNames 746 * @throws IOException exception 747 */ 748 public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { 749 LOG.trace("get incremental backup table set from backup system table"); 750 751 TreeSet<TableName> set = new TreeSet<>(); 752 753 try (Table table = connection.getTable(tableName)) { 754 Get get = createGetForIncrBackupTableSet(backupRoot); 755 Result res = table.get(get); 756 if (res.isEmpty()) { 757 return set; 758 } 759 List<Cell> cells = res.listCells(); 760 for (Cell cell : cells) { 761 // qualifier = table name - we use table names as qualifiers 762 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); 763 } 764 return set; 765 } 766 } 767 768 /** 769 * Add tables to global incremental backup set 770 * @param tables set of tables 771 * @param backupRoot root directory path to backup 772 * @throws IOException exception 773 */ 774 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) 775 throws IOException { 776 if (LOG.isTraceEnabled()) { 777 LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot 778 + " tables [" + StringUtils.join(tables, " ") + "]"); 779 } 780 if (LOG.isDebugEnabled()) { 781 tables.forEach(table -> LOG.debug(Objects.toString(table))); 782 } 783 try (Table table = connection.getTable(tableName)) { 784 Put put = createPutForIncrBackupTableSet(tables, backupRoot); 785 table.put(put); 786 } 787 } 788 789 /** 790 * Deletes incremental backup set for a backup destination 791 * @param backupRoot backup root 792 */ 793 public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { 794 if (LOG.isTraceEnabled()) { 795 LOG.trace("Delete incremental backup table set to backup system table. 
ROOT=" + backupRoot); 796 } 797 try (Table table = connection.getTable(tableName)) { 798 Delete delete = createDeleteForIncrBackupTableSet(backupRoot); 799 table.delete(delete); 800 } 801 } 802 803 /** 804 * Checks if we have at least one backup session in backup system table This API is used by 805 * BackupLogCleaner 806 * @return true, if at least one session exists in backup system table 807 * @throws IOException exception 808 */ 809 public boolean hasBackupSessions() throws IOException { 810 LOG.trace("Has backup sessions from backup system table"); 811 812 Scan scan = createScanForBackupHistory(Order.OLD_TO_NEW); 813 scan.setCaching(1); 814 try (Table table = connection.getTable(tableName); 815 ResultScanner scanner = table.getScanner(scan)) { 816 return scanner.next() != null; 817 } 818 } 819 820 /** 821 * BACKUP SETS 822 */ 823 824 /** 825 * Get backup set list 826 * @return backup set list 827 * @throws IOException if a table or scanner operation fails 828 */ 829 public List<String> listBackupSets() throws IOException { 830 LOG.trace("Backup set list"); 831 832 List<String> list = new ArrayList<>(); 833 try (Table table = connection.getTable(tableName)) { 834 Scan scan = createScanForBackupSetList(); 835 scan.readVersions(1); 836 try (ResultScanner scanner = table.getScanner(scan)) { 837 Result res; 838 while ((res = scanner.next()) != null) { 839 res.advance(); 840 list.add(cellKeyToBackupSetName(res.current())); 841 } 842 return list; 843 } 844 } 845 } 846 847 /** 848 * Get backup set description (list of tables) 849 * @param name set's name 850 * @return list of tables in a backup set 851 * @throws IOException if a table operation fails 852 */ 853 public List<TableName> describeBackupSet(String name) throws IOException { 854 if (LOG.isTraceEnabled()) { 855 LOG.trace(" Backup set describe: " + name); 856 } 857 try (Table table = connection.getTable(tableName)) { 858 Get get = createGetForBackupSet(name); 859 Result res = table.get(get); 860 if 
(res.isEmpty()) { 861 return null; 862 } 863 res.advance(); 864 String[] tables = cellValueToBackupSet(res.current()); 865 return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) 866 .collect(Collectors.toList()); 867 } 868 } 869 870 /** 871 * Add backup set (list of tables) 872 * @param name set name 873 * @param newTables list of tables, comma-separated 874 * @throws IOException if a table operation fails 875 */ 876 public void addToBackupSet(String name, String[] newTables) throws IOException { 877 if (LOG.isTraceEnabled()) { 878 LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); 879 } 880 String[] union = null; 881 try (Table table = connection.getTable(tableName)) { 882 Get get = createGetForBackupSet(name); 883 Result res = table.get(get); 884 if (res.isEmpty()) { 885 union = newTables; 886 } else { 887 res.advance(); 888 String[] tables = cellValueToBackupSet(res.current()); 889 union = merge(tables, newTables); 890 } 891 Put put = createPutForBackupSet(name, union); 892 table.put(put); 893 } 894 } 895 896 /** 897 * Remove tables from backup set (list of tables) 898 * @param name set name 899 * @param toRemove list of tables 900 * @throws IOException if a table operation or deleting the backup set fails 901 */ 902 public void removeFromBackupSet(String name, String[] toRemove) throws IOException { 903 if (LOG.isTraceEnabled()) { 904 LOG.trace( 905 " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); 906 } 907 String[] disjoint; 908 String[] tables; 909 try (Table table = connection.getTable(tableName)) { 910 Get get = createGetForBackupSet(name); 911 Result res = table.get(get); 912 if (res.isEmpty()) { 913 LOG.warn("Backup set '" + name + "' not found."); 914 return; 915 } else { 916 res.advance(); 917 tables = cellValueToBackupSet(res.current()); 918 disjoint = disjoin(tables, toRemove); 919 } 920 if (disjoint.length > 0 && disjoint.length != tables.length) 
{ 921 Put put = createPutForBackupSet(name, disjoint); 922 table.put(put); 923 } else if (disjoint.length == tables.length) { 924 LOG.warn("Backup set '" + name + "' does not contain tables [" 925 + StringUtils.join(toRemove, " ") + "]"); 926 } else { // disjoint.length == 0 and tables.length >0 927 // Delete backup set 928 LOG.info("Backup set '" + name + "' is empty. Deleting."); 929 deleteBackupSet(name); 930 } 931 } 932 } 933 934 private String[] merge(String[] existingTables, String[] newTables) { 935 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 936 tables.addAll(Arrays.asList(newTables)); 937 return tables.toArray(new String[0]); 938 } 939 940 private String[] disjoin(String[] existingTables, String[] toRemove) { 941 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 942 Arrays.asList(toRemove).forEach(table -> tables.remove(table)); 943 return tables.toArray(new String[0]); 944 } 945 946 /** 947 * Delete backup set 948 * @param name set's name 949 * @throws IOException if getting or deleting the table fails 950 */ 951 public void deleteBackupSet(String name) throws IOException { 952 if (LOG.isTraceEnabled()) { 953 LOG.trace(" Backup set delete: " + name); 954 } 955 try (Table table = connection.getTable(tableName)) { 956 Delete del = createDeleteForBackupSet(name); 957 table.delete(del); 958 } 959 } 960 961 /** 962 * Get backup system table descriptor 963 * @return table's descriptor 964 */ 965 public static TableDescriptor getSystemTableDescriptor(Configuration conf) { 966 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); 967 968 ColumnFamilyDescriptorBuilder colBuilder = 969 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 970 971 colBuilder.setMaxVersions(1); 972 Configuration config = HBaseConfiguration.create(); 973 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 974 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 975 
colBuilder.setTimeToLive(ttl); 976 977 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 978 builder.setColumnFamily(colSessionsDesc); 979 980 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 981 colBuilder.setTimeToLive(ttl); 982 builder.setColumnFamily(colBuilder.build()); 983 return builder.build(); 984 } 985 986 public static TableName getTableName(Configuration conf) { 987 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 988 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); 989 return TableName.valueOf(name); 990 } 991 992 public static String getTableNameAsString(Configuration conf) { 993 return getTableName(conf).getNameAsString(); 994 } 995 996 public static String getSnapshotName(Configuration conf) { 997 return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); 998 } 999 1000 /** 1001 * Get backup system table descriptor 1002 * @return table's descriptor 1003 */ 1004 public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { 1005 TableDescriptorBuilder builder = 1006 TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); 1007 1008 ColumnFamilyDescriptorBuilder colBuilder = 1009 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1010 colBuilder.setMaxVersions(1); 1011 Configuration config = HBaseConfiguration.create(); 1012 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1013 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1014 colBuilder.setTimeToLive(ttl); 1015 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1016 builder.setColumnFamily(colSessionsDesc); 1017 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1018 colBuilder.setTimeToLive(ttl); 1019 builder.setColumnFamily(colBuilder.build()); 1020 return builder.build(); 1021 } 1022 1023 public static TableName getTableNameForBulkLoadedData(Configuration conf) { 1024 String name = 
conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
    return TableName.valueOf(name);
  }

  /**
   * Builds the Put that stores a serialized backup info under its backup id.
   * @param context backup info
   * @return put operation
   * @throws IOException if the backup info cannot be serialized
   */
  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
    return new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()))
      .addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
        context.toByteArray());
  }

  /**
   * Builds the Get that reads the latest version of the backup info row for a backup id.
   * @param backupId backup's ID
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForBackupInfo(String backupId) throws IOException {
    return new Get(rowkey(BACKUP_INFO_PREFIX, backupId))
      .addFamily(BackupSystemTable.SESSIONS_FAMILY)
      .readVersions(1);
  }

  /**
   * Builds the Delete that removes the sessions family of the backup info row for a backup id.
   * @param backupId backup's ID
   * @return delete operation
   */
  private Delete createDeleteForBackupInfo(String backupId) {
    return new Delete(rowkey(BACKUP_INFO_PREFIX, backupId))
      .addFamily(BackupSystemTable.SESSIONS_FAMILY);
  }

  /**
   * Decodes the first cell of a result into a BackupInfo.
   * @param res HBase result
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo resultToBackupInfo(Result res) throws IOException {
    res.advance();
    return cellToBackupInfo(res.current());
  }

  /**
   * Creates Get to retrieve incremental backup table set from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForIncrBackupTableSet(String backupRoot)
throws IOException { 1084 Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); 1085 get.addFamily(BackupSystemTable.META_FAMILY); 1086 get.readVersions(1); 1087 return get; 1088 } 1089 1090 /** 1091 * Creates Put to store incremental backup table set 1092 * @param tables tables 1093 * @return put operation 1094 */ 1095 private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { 1096 Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); 1097 for (TableName table : tables) { 1098 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), 1099 EMPTY_VALUE); 1100 } 1101 return put; 1102 } 1103 1104 /** 1105 * Creates Delete for incremental backup table set 1106 * @param backupRoot backup root 1107 * @return delete operation 1108 */ 1109 private Delete createDeleteForIncrBackupTableSet(String backupRoot) { 1110 Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); 1111 delete.addFamily(BackupSystemTable.META_FAMILY); 1112 return delete; 1113 } 1114 1115 /** 1116 * Creates Scan operation to load backup history 1117 * @param order order of the scan results 1118 * @return scan operation 1119 */ 1120 private Scan createScanForBackupHistory(Order order) { 1121 Scan scan = new Scan(); 1122 byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); 1123 if (order == Order.NEW_TO_OLD) { 1124 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1125 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1126 scan.setReversed(true); 1127 scan.withStartRow(stopRow, false); 1128 scan.withStopRow(startRow); 1129 } else if (order == Order.OLD_TO_NEW) { 1130 scan.setStartStopRowForPrefixScan(startRow); 1131 } else { 1132 throw new IllegalArgumentException("Unsupported order: " + order); 1133 } 1134 scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1135 scan.readVersions(1); 1136 return scan; 1137 } 1138 1139 /** 1140 * Converts cell to backup info instance. 
* @param current current cell
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
    return BackupInfo.fromByteArray(CellUtil.cloneValue(current));
  }

  /**
   * Builds the Put that stores the serialized region-server log timestamp map of one table under
   * a backup root.
   * @param table      table the map belongs to
   * @param smap       serialized map, containing RS:ts
   * @param backupRoot backup root the row is keyed by
   * @return put operation
   */
  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
    String backupRoot) {
    byte[] row = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString());
    return new Put(row).addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"),
      smap);
  }

  /**
   * Builds the prefix Scan that loads the table -> { RS -> ts } rows of a backup root.
   * @param backupRoot backup root the rows are keyed by
   * @return scan operation
   */
  private Scan createScanForReadLogTimestampMap(String backupRoot) {
    return new Scan()
      .setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL))
      .addFamily(BackupSystemTable.META_FAMILY);
  }

  /**
   * Extracts the table name from a log-timestamp-map row key: everything after the last
   * {@code NULL} separator.
   * @param cloneRow rowkey
   * @return table name
   */
  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
    String key = Bytes.toString(cloneRow);
    return key.substring(key.lastIndexOf(NULL) + 1);
  }

  /**
   * Creates Put to store RS last log result
   * @param server    server name
   * @param timestamp log roll result (timestamp)
   * @return put operation
   */
  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
    String backupRoot) {
    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
      Bytes.toBytes(timestamp));
    return put;
1198 } 1199 1200 /** 1201 * Creates Scan operation to load last RS log roll results 1202 * @return scan operation 1203 */ 1204 private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { 1205 Scan scan = new Scan(); 1206 scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL)); 1207 scan.addFamily(BackupSystemTable.META_FAMILY); 1208 scan.readVersions(1); 1209 1210 return scan; 1211 } 1212 1213 /** 1214 * Get server's name from rowkey 1215 * @param row rowkey 1216 * @return server's name 1217 */ 1218 private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { 1219 String s = Bytes.toString(row); 1220 int index = s.lastIndexOf(NULL); 1221 return s.substring(index + 1); 1222 } 1223 1224 /** 1225 * Creates Put's for bulk loads. 1226 */ 1227 private static List<Put> createPutForBulkLoad(TableName table, byte[] region, 1228 Map<byte[], List<Path>> columnFamilyToHFilePaths) { 1229 List<Put> puts = new ArrayList<>(); 1230 for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) { 1231 for (Path path : entry.getValue()) { 1232 String file = path.toString(); 1233 int lastSlash = file.lastIndexOf("/"); 1234 String filename = file.substring(lastSlash + 1); 1235 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1236 Bytes.toString(region), BLK_LD_DELIM, filename)); 1237 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1238 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); 1239 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1240 puts.add(put); 1241 LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region)); 1242 } 1243 } 1244 return puts; 1245 } 1246 1247 public static void snapshot(Connection conn) throws IOException { 1248 try (Admin admin = conn.getAdmin()) { 1249 Configuration conf = conn.getConfiguration(); 1250 admin.snapshot(BackupSystemTable.getSnapshotName(conf), 
BackupSystemTable.getTableName(conf)); 1251 } 1252 } 1253 1254 public static void restoreFromSnapshot(Connection conn) throws IOException { 1255 Configuration conf = conn.getConfiguration(); 1256 LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); 1257 try (Admin admin = conn.getAdmin()) { 1258 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1259 if (snapshotExists(admin, snapshotName)) { 1260 admin.restoreBackupSystemTable(snapshotName); 1261 LOG.debug("Done restoring backup system table"); 1262 } else { 1263 // Snapshot does not exists, i.e completeBackup failed after 1264 // deleting backup system table snapshot 1265 // In this case we log WARN and proceed 1266 LOG.warn( 1267 "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); 1268 } 1269 } 1270 } 1271 1272 private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { 1273 List<SnapshotDescription> list = admin.listSnapshots(); 1274 for (SnapshotDescription desc : list) { 1275 if (desc.getName().equals(snapshotName)) { 1276 return true; 1277 } 1278 } 1279 return false; 1280 } 1281 1282 public static boolean snapshotExists(Connection conn) throws IOException { 1283 return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); 1284 } 1285 1286 public static void deleteSnapshot(Connection conn) throws IOException { 1287 Configuration conf = conn.getConfiguration(); 1288 LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); 1289 try (Admin admin = conn.getAdmin()) { 1290 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1291 if (snapshotExists(admin, snapshotName)) { 1292 admin.deleteSnapshot(snapshotName); 1293 LOG.debug("Done deleting backup system table snapshot"); 1294 } else { 1295 LOG.error("Snapshot " + snapshotName + " does not exists"); 1296 } 1297 } 1298 } 1299 1300 private Put createPutForDeleteOperation(String[] 
backupIdList) { 1301 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1302 Put put = new Put(DELETE_OP_ROW); 1303 put.addColumn(META_FAMILY, FAM_COL, value); 1304 return put; 1305 } 1306 1307 private Delete createDeleteForBackupDeleteOperation() { 1308 Delete delete = new Delete(DELETE_OP_ROW); 1309 delete.addFamily(META_FAMILY); 1310 return delete; 1311 } 1312 1313 private Get createGetForDeleteOperation() { 1314 Get get = new Get(DELETE_OP_ROW); 1315 get.addFamily(META_FAMILY); 1316 return get; 1317 } 1318 1319 public void startDeleteOperation(String[] backupIdList) throws IOException { 1320 if (LOG.isTraceEnabled()) { 1321 LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); 1322 } 1323 Put put = createPutForDeleteOperation(backupIdList); 1324 try (Table table = connection.getTable(tableName)) { 1325 table.put(put); 1326 } 1327 } 1328 1329 public void finishDeleteOperation() throws IOException { 1330 LOG.trace("Finsih delete operation for backup ids"); 1331 1332 Delete delete = createDeleteForBackupDeleteOperation(); 1333 try (Table table = connection.getTable(tableName)) { 1334 table.delete(delete); 1335 } 1336 } 1337 1338 public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { 1339 LOG.trace("Get delete operation for backup ids"); 1340 1341 Get get = createGetForDeleteOperation(); 1342 try (Table table = connection.getTable(tableName)) { 1343 Result res = table.get(get); 1344 if (res.isEmpty()) { 1345 return null; 1346 } 1347 Cell cell = res.listCells().get(0); 1348 byte[] val = CellUtil.cloneValue(cell); 1349 if (val.length == 0) { 1350 return null; 1351 } 1352 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1353 .toArray(String[]::new); 1354 } 1355 } 1356 1357 private Put createPutForMergeOperation(String[] backupIdList) { 1358 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1359 Put put = new Put(MERGE_OP_ROW); 1360 
put.addColumn(META_FAMILY, FAM_COL, value); 1361 return put; 1362 } 1363 1364 public boolean isMergeInProgress() throws IOException { 1365 Get get = new Get(MERGE_OP_ROW); 1366 try (Table table = connection.getTable(tableName)) { 1367 Result res = table.get(get); 1368 return !res.isEmpty(); 1369 } 1370 } 1371 1372 private Put createPutForUpdateTablesForMerge(List<TableName> tables) { 1373 byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); 1374 Put put = new Put(MERGE_OP_ROW); 1375 put.addColumn(META_FAMILY, PATH_COL, value); 1376 return put; 1377 } 1378 1379 private Delete createDeleteForBackupMergeOperation() { 1380 Delete delete = new Delete(MERGE_OP_ROW); 1381 delete.addFamily(META_FAMILY); 1382 return delete; 1383 } 1384 1385 private Get createGetForMergeOperation() { 1386 Get get = new Get(MERGE_OP_ROW); 1387 get.addFamily(META_FAMILY); 1388 return get; 1389 } 1390 1391 public void startMergeOperation(String[] backupIdList) throws IOException { 1392 if (LOG.isTraceEnabled()) { 1393 LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); 1394 } 1395 Put put = createPutForMergeOperation(backupIdList); 1396 try (Table table = connection.getTable(tableName)) { 1397 table.put(put); 1398 } 1399 } 1400 1401 public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { 1402 if (LOG.isTraceEnabled()) { 1403 LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); 1404 } 1405 Put put = createPutForUpdateTablesForMerge(tables); 1406 try (Table table = connection.getTable(tableName)) { 1407 table.put(put); 1408 } 1409 } 1410 1411 public void finishMergeOperation() throws IOException { 1412 LOG.trace("Finish merge operation for backup ids"); 1413 1414 Delete delete = createDeleteForBackupMergeOperation(); 1415 try (Table table = connection.getTable(tableName)) { 1416 table.delete(delete); 1417 } 1418 } 1419 1420 public String[] getListOfBackupIdsFromMergeOperation() throws IOException { 1421 
LOG.trace("Get backup ids for merge operation"); 1422 1423 Get get = createGetForMergeOperation(); 1424 try (Table table = connection.getTable(tableName)) { 1425 Result res = table.get(get); 1426 if (res.isEmpty()) { 1427 return null; 1428 } 1429 Cell cell = res.listCells().get(0); 1430 byte[] val = CellUtil.cloneValue(cell); 1431 if (val.length == 0) { 1432 return null; 1433 } 1434 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1435 .toArray(String[]::new); 1436 } 1437 } 1438 1439 /** 1440 * Creates a scan to read all registered bulk loads for the given table, or for all tables if 1441 * {@code table} is {@code null}. 1442 */ 1443 static Scan createScanForOrigBulkLoadedFiles(@Nullable TableName table) { 1444 Scan scan = new Scan(); 1445 byte[] startRow = table == null 1446 ? BULK_LOAD_PREFIX_BYTES 1447 : rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); 1448 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1449 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1450 scan.withStartRow(startRow); 1451 scan.withStopRow(stopRow); 1452 scan.addFamily(BackupSystemTable.META_FAMILY); 1453 scan.readVersions(1); 1454 return scan; 1455 } 1456 1457 static String getTableNameFromOrigBulkLoadRow(String rowStr) { 1458 // format is bulk : namespace : table : region : file 1459 return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1); 1460 } 1461 1462 static String getRegionNameFromOrigBulkLoadRow(String rowStr) { 1463 // format is bulk : namespace : table : region : file 1464 List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr); 1465 Iterator<String> i = parts.iterator(); 1466 int idx = 3; 1467 if (parts.size() == 4) { 1468 // the table is in default namespace 1469 idx = 2; 1470 } 1471 String region = Iterators.get(i, idx); 1472 LOG.debug("bulk row string " + rowStr + " region " + region); 1473 return region; 1474 } 1475 1476 /* 1477 * Used to query bulk loaded 
hfiles which have been copied by incremental backup 1478 * @param backupId the backup Id. It can be null when querying for all tables 1479 * @return the Scan object 1480 * @deprecated This method is broken if a backupId is specified - see HBASE-28715 1481 */ 1482 static Scan createScanForBulkLoadedFiles(String backupId) { 1483 Scan scan = new Scan(); 1484 byte[] startRow = 1485 backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); 1486 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1487 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1488 scan.withStartRow(startRow); 1489 scan.withStopRow(stopRow); 1490 scan.addFamily(BackupSystemTable.META_FAMILY); 1491 scan.readVersions(1); 1492 return scan; 1493 } 1494 1495 /** 1496 * Creates Scan operation to load backup set list 1497 * @return scan operation 1498 */ 1499 private Scan createScanForBackupSetList() { 1500 Scan scan = new Scan(); 1501 byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); 1502 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1503 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1504 scan.withStartRow(startRow); 1505 scan.withStopRow(stopRow); 1506 scan.addFamily(BackupSystemTable.META_FAMILY); 1507 return scan; 1508 } 1509 1510 /** 1511 * Creates Get operation to load backup set content 1512 * @return get operation 1513 */ 1514 private Get createGetForBackupSet(String name) { 1515 Get get = new Get(rowkey(SET_KEY_PREFIX, name)); 1516 get.addFamily(BackupSystemTable.META_FAMILY); 1517 return get; 1518 } 1519 1520 /** 1521 * Creates Delete operation to delete backup set content 1522 * @param name backup set's name 1523 * @return delete operation 1524 */ 1525 private Delete createDeleteForBackupSet(String name) { 1526 Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); 1527 del.addFamily(BackupSystemTable.META_FAMILY); 1528 return del; 1529 } 1530 1531 /** 1532 * Creates Put operation to update 
backup set content 1533 * @param name backup set's name 1534 * @param tables list of tables 1535 * @return put operation 1536 */ 1537 private Put createPutForBackupSet(String name, String[] tables) { 1538 Put put = new Put(rowkey(SET_KEY_PREFIX, name)); 1539 byte[] value = convertToByteArray(tables); 1540 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); 1541 return put; 1542 } 1543 1544 private byte[] convertToByteArray(String[] tables) { 1545 return Bytes.toBytes(StringUtils.join(tables, ",")); 1546 } 1547 1548 /** 1549 * Converts cell to backup set list. 1550 * @param current current cell 1551 * @return backup set as array of table names 1552 */ 1553 private String[] cellValueToBackupSet(Cell current) { 1554 byte[] data = CellUtil.cloneValue(current); 1555 if (!ArrayUtils.isEmpty(data)) { 1556 return Bytes.toString(data).split(","); 1557 } 1558 return new String[0]; 1559 } 1560 1561 /** 1562 * Converts cell key to backup set name. 1563 * @param current current cell 1564 * @return backup set name 1565 */ 1566 private String cellKeyToBackupSetName(Cell current) { 1567 byte[] data = CellUtil.cloneRow(current); 1568 return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); 1569 } 1570 1571 private static byte[] rowkey(String s, String... other) { 1572 StringBuilder sb = new StringBuilder(s); 1573 for (String ss : other) { 1574 sb.append(ss); 1575 } 1576 return Bytes.toBytes(sb.toString()); 1577 } 1578 1579 private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException { 1580 if (!admin.isTableEnabled(tableName)) { 1581 try { 1582 admin.enableTable(tableName); 1583 } catch (TableNotDisabledException ignored) { 1584 LOG.info("Table {} is not disabled, ignoring enable request", tableName); 1585 } 1586 } 1587 } 1588 1589 public enum Order { 1590 /** 1591 * Old backups first, most recents last. I.e. sorted by ascending backupId. 1592 */ 1593 OLD_TO_NEW, 1594 /** 1595 * New backups first, oldest last. I.e. 
sorted by descending backupId. 1596 */ 1597 NEW_TO_OLD 1598 } 1599}