001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import static org.apache.hadoop.hbase.backup.BackupInfo.withRoot; 021import static org.apache.hadoop.hbase.backup.BackupInfo.withState; 022import static org.apache.hadoop.hbase.backup.BackupInfo.withType; 023 024import edu.umd.cs.findbugs.annotations.Nullable; 025import java.io.Closeable; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.nio.charset.StandardCharsets; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collection; 032import java.util.Collections; 033import java.util.Comparator; 034import java.util.HashMap; 035import java.util.HashSet; 036import java.util.Iterator; 037import java.util.List; 038import java.util.Map; 039import java.util.Map.Entry; 040import java.util.Objects; 041import java.util.Set; 042import java.util.TreeMap; 043import java.util.TreeSet; 044import java.util.function.Predicate; 045import java.util.stream.Collectors; 046import java.util.stream.Stream; 047import org.apache.commons.lang3.ArrayUtils; 048import org.apache.commons.lang3.StringUtils; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.hbase.Cell; 052import org.apache.hadoop.hbase.CellUtil; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.NamespaceDescriptor; 055import org.apache.hadoop.hbase.NamespaceExistException; 056import org.apache.hadoop.hbase.ServerName; 057import org.apache.hadoop.hbase.TableExistsException; 058import org.apache.hadoop.hbase.TableName; 059import org.apache.hadoop.hbase.TableNotDisabledException; 060import org.apache.hadoop.hbase.backup.BackupInfo; 061import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 062import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 063import org.apache.hadoop.hbase.backup.BackupType; 064import org.apache.hadoop.hbase.client.Admin; 065import org.apache.hadoop.hbase.client.BufferedMutator; 066import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 067import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 068import org.apache.hadoop.hbase.client.Connection; 069import org.apache.hadoop.hbase.client.Delete; 070import org.apache.hadoop.hbase.client.Get; 071import org.apache.hadoop.hbase.client.Put; 072import org.apache.hadoop.hbase.client.Result; 073import org.apache.hadoop.hbase.client.ResultScanner; 074import org.apache.hadoop.hbase.client.Scan; 075import org.apache.hadoop.hbase.client.SnapshotDescription; 076import org.apache.hadoop.hbase.client.Table; 077import org.apache.hadoop.hbase.client.TableDescriptor; 078import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 079import org.apache.hadoop.hbase.util.Bytes; 080import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 081import org.apache.yetus.audience.InterfaceAudience; 082import org.slf4j.Logger; 083import org.slf4j.LoggerFactory; 084 085import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 086import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 087 088import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 090 091/** 092 * This class provides API to access backup system table<br> 093 * Backup system table schema:<br> 094 * <p> 095 * <ul> 096 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> 097 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> 098 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of 099 * include table; value=empty</li> 100 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL 101 * timestamp]</li> 102 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> 103 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file 104 * name</li> 105 * </ul> 106 * </p> 107 */ 108@InterfaceAudience.Private 109public final class BackupSystemTable implements Closeable { 110 111 private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); 112 113 static class WALItem { 114 String backupId; 115 String walFile; 116 String backupRoot; 117 118 WALItem(String backupId, String walFile, String backupRoot) { 119 this.backupId = backupId; 120 this.walFile = walFile; 121 this.backupRoot = backupRoot; 122 } 123 124 public String getBackupId() { 125 return backupId; 126 } 127 128 public String getWalFile() { 129 return walFile; 130 } 131 132 public String getBackupRoot() { 133 return backupRoot; 134 } 135 136 @Override 137 public String toString() { 138 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; 139 } 140 } 141 142 /** 143 * Backup system table (main) name 144 */ 145 private TableName tableName; 146 147 /** 148 * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a 149 * separate table because we have to isolate general backup operations: create, merge etc from 150 * activity of RegionObserver, which controls process of a bulk loading 151 * {@link org.apache.hadoop.hbase.backup.BackupObserver} 152 */ 153 private TableName bulkLoadTableName; 154 155 /** 156 * Stores backup sessions (contexts) 157 */ 158 final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); 159 /** 160 * Stores other meta 161 */ 162 final static byte[] META_FAMILY = Bytes.toBytes("meta"); 163 final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); 164 /** 165 * Connection to HBase cluster, shared among all instances 166 */ 167 private final Connection connection; 168 169 private final static String BACKUP_INFO_PREFIX = "session:"; 170 private final static String START_CODE_ROW = "startcode:"; 171 private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); 172 private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); 173 174 private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); 175 private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); 176 177 private final static String INCR_BACKUP_SET = "incrbackupset:"; 178 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; 179 private final static String RS_LOG_TS_PREFIX = "rslogts:"; 180 181 private final static String BULK_LOAD_PREFIX = "bulk:"; 182 private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX); 183 private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row"); 184 private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row"); 185 186 final static byte[] TBL_COL = Bytes.toBytes("tbl"); 187 final static byte[] FAM_COL = Bytes.toBytes("fam"); 188 final static byte[] PATH_COL = Bytes.toBytes("path"); 189 190 private final static String SET_KEY_PREFIX = "backupset:"; 191 192 // separator between BULK_LOAD_PREFIX and ordinals 193 private final static String BLK_LD_DELIM = ":"; 194 private final static byte[] EMPTY_VALUE = new byte[] {}; 195 196 // Safe delimiter in a string 197 private final static String NULL = "\u0000"; 198 199 public BackupSystemTable(Connection conn) throws IOException { 200 this.connection = conn; 201 Configuration conf = this.connection.getConfiguration(); 202 tableName = BackupSystemTable.getTableName(conf); 203 bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); 204 checkSystemTable(); 205 } 206 207 private void checkSystemTable() throws IOException { 208 try (Admin admin = connection.getAdmin()) { 209 verifyNamespaceExists(admin); 210 Configuration conf = connection.getConfiguration(); 211 if (!admin.tableExists(tableName)) { 212 TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf); 213 createSystemTable(admin, backupHTD); 214 } 215 ensureTableEnabled(admin, tableName); 216 if (!admin.tableExists(bulkLoadTableName)) { 217 TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); 218 createSystemTable(admin, blHTD); 219 } 220 ensureTableEnabled(admin, bulkLoadTableName); 221 waitForSystemTable(admin, tableName); 222 waitForSystemTable(admin, bulkLoadTableName); 223 } 224 } 225 226 private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException { 227 try { 228 admin.createTable(descriptor); 229 } catch (TableExistsException e) { 230 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 231 // so may be subject to race conditions where one caller succeeds in creating the 232 // table and others fail because it now exists 233 LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e); 234 } 235 } 236 237 private void verifyNamespaceExists(Admin admin) throws IOException { 238 String namespaceName = tableName.getNamespaceAsString(); 239 NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); 240 NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); 241 boolean exists = false; 242 for (NamespaceDescriptor nsd : list) { 243 if (nsd.getName().equals(ns.getName())) { 244 exists = true; 245 break; 246 } 247 } 248 if (!exists) { 249 try { 250 admin.createNamespace(ns); 251 } catch (NamespaceExistException e) { 252 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 253 // so may be subject to race conditions where one caller succeeds in creating the 254 // namespace and others fail because it now exists 255 LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e); 256 } 257 } 258 } 259 260 private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { 261 // Return fast if the table is available and avoid a log message 262 if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) { 263 return; 264 } 265 long TIMEOUT = 60000; 266 long startTime = EnvironmentEdgeManager.currentTime(); 267 LOG.debug("Backup table {} is not present and available, waiting for it to become so", 268 tableName); 269 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { 270 try { 271 Thread.sleep(100); 272 } catch (InterruptedException e) { 273 throw (IOException) new InterruptedIOException().initCause(e); 274 } 275 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { 276 throw new IOException( 277 "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms"); 278 } 279 } 280 LOG.debug("Backup table {} exists and available", tableName); 281 } 282 283 @Override 284 public void close() { 285 // do nothing 286 } 287 288 /** 289 * Updates status (state) of a backup session in backup system table table 290 * @param info backup info 291 * @throws IOException exception 292 */ 293 public void updateBackupInfo(BackupInfo info) throws IOException { 294 if (LOG.isTraceEnabled()) { 295 LOG.trace("update backup status in backup system table for: " + info.getBackupId() 296 + " set status=" + info.getState()); 297 } 298 try (Table table = connection.getTable(tableName)) { 299 Put put = createPutForBackupInfo(info); 300 table.put(put); 301 } 302 } 303 304 /* 305 * @param backupId the backup Id 306 * @return Map of rows to path of bulk loaded hfile 307 */ 308 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { 309 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 310 try (Table table = connection.getTable(bulkLoadTableName); 311 ResultScanner scanner = table.getScanner(scan)) { 312 Result res = null; 313 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 314 while ((res = scanner.next()) != null) { 315 res.advance(); 316 byte[] row = CellUtil.cloneRow(res.listCells().get(0)); 317 for (Cell cell : res.listCells()) { 318 if ( 319 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 320 BackupSystemTable.PATH_COL.length) == 0 321 ) { 322 map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); 323 } 324 } 325 } 326 return map; 327 } 328 } 329 330 /** 331 * Deletes backup status from backup system table table 332 * @param backupId backup id 333 * @throws IOException exception 334 */ 335 public void deleteBackupInfo(String backupId) throws IOException { 336 if (LOG.isTraceEnabled()) { 337 LOG.trace("delete backup status in backup system table for " + backupId); 338 } 339 try (Table table = connection.getTable(tableName)) { 340 Delete del = createDeleteForBackupInfo(backupId); 341 table.delete(del); 342 } 343 } 344 345 /** 346 * Registers a bulk load. 347 * @param tableName table name 348 * @param region the region receiving hfile 349 * @param cfToHfilePath column family and associated hfiles 350 */ 351 public void registerBulkLoad(TableName tableName, byte[] region, 352 Map<byte[], List<Path>> cfToHfilePath) throws IOException { 353 if (LOG.isDebugEnabled()) { 354 LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName, 355 cfToHfilePath.size()); 356 } 357 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 358 List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath); 359 bufferedMutator.mutate(puts); 360 LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName); 361 } 362 } 363 364 /** 365 * Removes entries from the table that tracks all bulk loaded hfiles. 366 * @param rows the row keys of the entries to be deleted 367 */ 368 public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException { 369 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 370 List<Delete> deletes = new ArrayList<>(); 371 for (byte[] row : rows) { 372 Delete del = new Delete(row); 373 deletes.add(del); 374 LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row)); 375 } 376 bufferedMutator.mutate(deletes); 377 LOG.debug("Deleted {} bulk load entries.", rows.size()); 378 } 379 } 380 381 /** 382 * Reads all registered bulk loads. 383 */ 384 public List<BulkLoad> readBulkloadRows() throws IOException { 385 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null); 386 return processBulkLoadRowScan(scan); 387 } 388 389 /** 390 * Reads the registered bulk loads for the given tables. 391 */ 392 public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException { 393 List<BulkLoad> result = new ArrayList<>(); 394 for (TableName table : tableList) { 395 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table); 396 result.addAll(processBulkLoadRowScan(scan)); 397 } 398 return result; 399 } 400 401 private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException { 402 List<BulkLoad> result = new ArrayList<>(); 403 try (Table bulkLoadTable = connection.getTable(bulkLoadTableName); 404 ResultScanner scanner = bulkLoadTable.getScanner(scan)) { 405 Result res; 406 while ((res = scanner.next()) != null) { 407 res.advance(); 408 TableName table = null; 409 String fam = null; 410 String path = null; 411 String region = null; 412 byte[] row = null; 413 for (Cell cell : res.listCells()) { 414 row = CellUtil.cloneRow(cell); 415 String rowStr = Bytes.toString(row); 416 region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); 417 if ( 418 CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, 419 BackupSystemTable.TBL_COL.length) == 0 420 ) { 421 table = TableName.valueOf(CellUtil.cloneValue(cell)); 422 } else if ( 423 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 424 BackupSystemTable.FAM_COL.length) == 0 425 ) { 426 fam = Bytes.toString(CellUtil.cloneValue(cell)); 427 } else if ( 428 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 429 BackupSystemTable.PATH_COL.length) == 0 430 ) { 431 path = Bytes.toString(CellUtil.cloneValue(cell)); 432 } 433 } 434 result.add(new BulkLoad(table, region, fam, path, row)); 435 LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path); 436 } 437 } 438 return result; 439 } 440 441 /** 442 * Reads backup status object (instance of backup info) from backup system table table 443 * @param backupId backup id 444 * @return Current status of backup session or null 445 */ 446 public BackupInfo readBackupInfo(String backupId) throws IOException { 447 if (LOG.isTraceEnabled()) { 448 LOG.trace("read backup status from backup system table for: " + backupId); 449 } 450 451 try (Table table = connection.getTable(tableName)) { 452 Get get = createGetForBackupInfo(backupId); 453 Result res = table.get(get); 454 if (res.isEmpty()) { 455 return null; 456 } 457 return resultToBackupInfo(res); 458 } 459 } 460 461 /** 462 * Read the last backup start code (timestamp) of last successful backup. Will return null if 463 * there is no start code stored on hbase or the value is of length 0. These two cases indicate 464 * there is no successful backup completed so far. 465 * @param backupRoot directory path to backup destination 466 * @return the timestamp of last successful backup 467 * @throws IOException exception 468 */ 469 public String readBackupStartCode(String backupRoot) throws IOException { 470 LOG.trace("read backup start code from backup system table"); 471 472 try (Table table = connection.getTable(tableName)) { 473 Get get = createGetForStartCode(backupRoot); 474 Result res = table.get(get); 475 if (res.isEmpty()) { 476 return null; 477 } 478 Cell cell = res.listCells().get(0); 479 byte[] val = CellUtil.cloneValue(cell); 480 if (val.length == 0) { 481 return null; 482 } 483 return new String(val, StandardCharsets.UTF_8); 484 } 485 } 486 487 /** 488 * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. 489 * @param startCode start code 490 * @param backupRoot root directory path to backup 491 * @throws IOException exception 492 */ 493 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { 494 if (LOG.isTraceEnabled()) { 495 LOG.trace("write backup start code to backup system table " + startCode); 496 } 497 try (Table table = connection.getTable(tableName)) { 498 Put put = createPutForStartCode(startCode.toString(), backupRoot); 499 table.put(put); 500 } 501 } 502 503 /** 504 * Exclusive operations are: create, delete, merge 505 * @throws IOException if a table operation fails or an active backup exclusive operation is 506 * already underway 507 */ 508 public void startBackupExclusiveOperation() throws IOException { 509 LOG.debug("Start new backup exclusive operation"); 510 511 try (Table table = connection.getTable(tableName)) { 512 Put put = createPutForStartBackupSession(); 513 // First try to put if row does not exist 514 if ( 515 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 516 .ifNotExists().thenPut(put) 517 ) { 518 // Row exists, try to put if value == ACTIVE_SESSION_NO 519 if ( 520 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 521 .ifEquals(ACTIVE_SESSION_NO).thenPut(put) 522 ) { 523 throw new ExclusiveOperationException(); 524 } 525 } 526 } 527 } 528 529 private Put createPutForStartBackupSession() { 530 Put put = new Put(ACTIVE_SESSION_ROW); 531 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); 532 return put; 533 } 534 535 public void finishBackupExclusiveOperation() throws IOException { 536 LOG.debug("Finish backup exclusive operation"); 537 538 try (Table table = connection.getTable(tableName)) { 539 Put put = createPutForStopBackupSession(); 540 if ( 541 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 542 .ifEquals(ACTIVE_SESSION_YES).thenPut(put) 543 ) { 544 throw new IOException("There is no active backup exclusive operation"); 545 } 546 } 547 } 548 549 private Put createPutForStopBackupSession() { 550 Put put = new Put(ACTIVE_SESSION_ROW); 551 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); 552 return put; 553 } 554 555 /** 556 * Get the Region Servers log information after the last log roll from backup system table. 557 * @param backupRoot root directory path to backup 558 * @return RS log info 559 * @throws IOException exception 560 */ 561 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) 562 throws IOException { 563 LOG.trace("read region server last roll log result to backup system table"); 564 565 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); 566 567 try (Table table = connection.getTable(tableName); 568 ResultScanner scanner = table.getScanner(scan)) { 569 Result res; 570 HashMap<String, Long> rsTimestampMap = new HashMap<>(); 571 while ((res = scanner.next()) != null) { 572 res.advance(); 573 Cell cell = res.current(); 574 byte[] row = CellUtil.cloneRow(cell); 575 String server = getServerNameForReadRegionServerLastLogRollResult(row); 576 byte[] data = CellUtil.cloneValue(cell); 577 rsTimestampMap.put(server, Bytes.toLong(data)); 578 } 579 return rsTimestampMap; 580 } 581 } 582 583 /** 584 * Writes Region Server last roll log result (timestamp) to backup system table table 585 * @param server Region Server name 586 * @param ts last log timestamp 587 * @param backupRoot root directory path to backup 588 * @throws IOException exception 589 */ 590 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) 591 throws IOException { 592 LOG.trace("write region server last roll log result to backup system table"); 593 594 try (Table table = connection.getTable(tableName)) { 595 Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); 596 table.put(put); 597 } 598 } 599 600 /** 601 * Get all backup information passing the given filters, ordered by descending start time. I.e. 602 * from newest to oldest. 603 */ 604 public List<BackupInfo> getBackupHistory(BackupInfo.Filter... toInclude) throws IOException { 605 LOG.trace("get backup history from backup system table"); 606 607 List<BackupInfo> list = getBackupInfos(toInclude); 608 list.sort(Comparator.comparing(BackupInfo::getStartTs).reversed()); 609 return list; 610 } 611 612 /** 613 * Retrieve all table names that are part of any known completed backup 614 */ 615 public Set<TableName> getTablesIncludedInBackups() throws IOException { 616 // Incremental backups have the same tables as the preceding full backups 617 List<BackupInfo> infos = 618 getBackupInfos(withState(BackupState.COMPLETE), withType(BackupType.FULL)); 619 return infos.stream().flatMap(info -> info.getTableNames().stream()) 620 .collect(Collectors.toSet()); 621 } 622 623 /** 624 * Goes through all backup history corresponding to the provided root folder, and collects all 625 * backup info mentioning each of the provided tables. 626 * @param set the tables for which to collect the {@code BackupInfo} 627 * @param backupRoot backup destination path to retrieve backup history for 628 * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at 629 * least one {@code BackupInfo} 630 * @throws IOException if getting the backup history fails 631 */ 632 public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, 633 String backupRoot) throws IOException { 634 List<BackupInfo> history = getBackupHistory(withRoot(backupRoot)); 635 Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>(); 636 for (BackupInfo info : history) { 637 List<TableName> tables = info.getTableNames(); 638 for (TableName tableName : tables) { 639 if (set.contains(tableName)) { 640 List<BackupInfo> list = 641 tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>()); 642 list.add(info); 643 } 644 } 645 } 646 return tableHistoryMap; 647 } 648 649 /** 650 * Get all backup infos passing the given filters (ordered by ascending backup id) 651 */ 652 public List<BackupInfo> getBackupInfos(BackupInfo.Filter... toInclude) throws IOException { 653 return getBackupInfos(Integer.MAX_VALUE, toInclude); 654 } 655 656 /** 657 * Get the first n backup infos passing the given filters (ordered by ascending backup id) 658 */ 659 public List<BackupInfo> getBackupInfos(int n, BackupInfo.Filter... toInclude) throws IOException { 660 LOG.trace("get backup infos from backup system table"); 661 662 if (n <= 0) { 663 return Collections.emptyList(); 664 } 665 666 Predicate<BackupInfo> combinedPredicate = Stream.of(toInclude) 667 .map(filter -> (Predicate<BackupInfo>) filter).reduce(Predicate::and).orElse(x -> true); 668 669 Scan scan = createScanForBackupHistory(); 670 List<BackupInfo> list = new ArrayList<>(); 671 672 try (Table table = connection.getTable(tableName); 673 ResultScanner scanner = table.getScanner(scan)) { 674 Result res; 675 while ((res = scanner.next()) != null) { 676 res.advance(); 677 BackupInfo context = cellToBackupInfo(res.current()); 678 if (combinedPredicate.test(context)) { 679 list.add(context); 680 if (list.size() == n) { 681 break; 682 } 683 } 684 } 685 return list; 686 } 687 } 688 689 /** 690 * Write the current timestamps for each regionserver to backup system table after a successful 691 * full or incremental backup. The saved timestamp is of the last log file that was backed up 692 * already. 693 * @param tables tables 694 * @param newTimestamps timestamps 695 * @param backupRoot root directory path to backup 696 * @throws IOException exception 697 */ 698 public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps, 699 String backupRoot) throws IOException { 700 if (LOG.isTraceEnabled()) { 701 LOG.trace("write RS log time stamps to backup system table for tables [" 702 + StringUtils.join(tables, ",") + "]"); 703 } 704 List<Put> puts = new ArrayList<>(); 705 for (TableName table : tables) { 706 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); 707 Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); 708 puts.add(put); 709 } 710 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) { 711 bufferedMutator.mutate(puts); 712 } 713 } 714 715 /** 716 * Read the timestamp for each region server log after the last successful backup. Each table has 717 * its own set of the timestamps. The info is stored for each table as a concatenated string of 718 * rs->timestapmp 719 * @param backupRoot root directory path to backup 720 * @return the timestamp for each region server. key: tableName value: 721 * RegionServer,PreviousTimeStamp 722 * @throws IOException exception 723 */ 724 public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot) 725 throws IOException { 726 if (LOG.isTraceEnabled()) { 727 LOG.trace("read RS log ts from backup system table for root=" + backupRoot); 728 } 729 730 Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>(); 731 732 Scan scan = createScanForReadLogTimestampMap(backupRoot); 733 try (Table table = connection.getTable(tableName); 734 ResultScanner scanner = table.getScanner(scan)) { 735 Result res; 736 while ((res = scanner.next()) != null) { 737 res.advance(); 738 Cell cell = res.current(); 739 byte[] row = CellUtil.cloneRow(cell); 740 String tabName = getTableNameForReadLogTimestampMap(row); 741 TableName tn = TableName.valueOf(tabName); 742 byte[] data = CellUtil.cloneValue(cell); 743 if (data == null) { 744 throw new IOException("Data of last backup data from backup system table " 745 + "is empty. Create a backup first."); 746 } 747 if (data != null && data.length > 0) { 748 HashMap<String, Long> lastBackup = 749 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); 750 tableTimestampMap.put(tn, lastBackup); 751 } 752 } 753 return tableTimestampMap; 754 } 755 } 756 757 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, 758 Map<String, Long> map) { 759 BackupProtos.TableServerTimestamp.Builder tstBuilder = 760 BackupProtos.TableServerTimestamp.newBuilder(); 761 tstBuilder 762 .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); 763 764 for (Entry<String, Long> entry : map.entrySet()) { 765 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); 766 HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); 767 ServerName sn = ServerName.parseServerName(entry.getKey()); 768 snBuilder.setHostName(sn.getHostname()); 769 snBuilder.setPort(sn.getPort()); 770 builder.setServerName(snBuilder.build()); 771 builder.setTimestamp(entry.getValue()); 772 tstBuilder.addServerTimestamp(builder.build()); 773 } 774 775 return tstBuilder.build(); 776 } 777 778 private HashMap<String, Long> 779 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { 780 781 HashMap<String, Long> map = new HashMap<>(); 782 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); 783 for (BackupProtos.ServerTimestamp st : list) { 784 ServerName sn = 785 org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); 786 map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); 787 } 788 return map; 789 } 790 791 /** 792 * Return the current tables covered by incremental backup. 793 * @param backupRoot root directory path to backup 794 * @return set of tableNames 795 * @throws IOException exception 796 */ 797 public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { 798 LOG.trace("get incremental backup table set from backup system table"); 799 800 TreeSet<TableName> set = new TreeSet<>(); 801 802 try (Table table = connection.getTable(tableName)) { 803 Get get = createGetForIncrBackupTableSet(backupRoot); 804 Result res = table.get(get); 805 if (res.isEmpty()) { 806 return set; 807 } 808 List<Cell> cells = res.listCells(); 809 for (Cell cell : cells) { 810 // qualifier = table name - we use table names as qualifiers 811 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); 812 } 813 return set; 814 } 815 } 816 817 /** 818 * Add tables to global incremental backup set 819 * @param tables set of tables 820 * @param backupRoot root directory path to backup 821 * @throws IOException exception 822 */ 823 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) 824 throws IOException { 825 if (LOG.isTraceEnabled()) { 826 LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot 827 + " tables [" + StringUtils.join(tables, " ") + "]"); 828 } 829 if (LOG.isDebugEnabled()) { 830 tables.forEach(table -> LOG.debug(Objects.toString(table))); 831 } 832 try (Table table = connection.getTable(tableName)) { 833 Put put = createPutForIncrBackupTableSet(tables, backupRoot); 834 table.put(put); 835 } 836 } 837 838 /** 839 * Deletes incremental backup set for a backup destination 840 * @param backupRoot backup root 841 */ 842 public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { 843 if (LOG.isTraceEnabled()) { 844 LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); 845 } 846 try (Table table = connection.getTable(tableName)) { 847 Delete delete = createDeleteForIncrBackupTableSet(backupRoot); 848 table.delete(delete); 849 } 850 } 851 852 /** 853 * Checks if we have at least one backup session in backup system table This API is used by 854 * BackupLogCleaner 855 * @return true, if - at least one session exists in backup system table table 856 * @throws IOException exception 857 */ 858 public boolean hasBackupSessions() throws IOException { 859 LOG.trace("Has backup sessions from backup system table"); 860 861 boolean result = false; 862 Scan scan = createScanForBackupHistory(); 863 scan.setCaching(1); 864 try (Table table = connection.getTable(tableName); 865 ResultScanner scanner = table.getScanner(scan)) { 866 if (scanner.next() != null) { 867 result = true; 868 } 869 return result; 870 } 871 } 872 873 /** 874 * BACKUP SETS 875 */ 876 877 /** 878 * Get backup set list 879 * @return backup set list 880 * @throws IOException if a table or scanner operation fails 881 */ 882 public List<String> listBackupSets() throws IOException { 883 LOG.trace("Backup set list"); 884 885 List<String> list = new ArrayList<>(); 886 try (Table table = connection.getTable(tableName)) { 887 Scan scan = createScanForBackupSetList(); 888 scan.readVersions(1); 889 try (ResultScanner scanner = table.getScanner(scan)) { 890 Result res; 891 while ((res = scanner.next()) != null) { 892 res.advance(); 893 list.add(cellKeyToBackupSetName(res.current())); 894 } 895 return list; 896 } 897 } 898 } 899 900 /** 901 * Get backup set description (list of tables) 902 * @param name set's name 903 * @return list of tables in a backup set 904 * @throws IOException if a table operation fails 905 */ 906 public List<TableName> describeBackupSet(String name) throws IOException { 907 if (LOG.isTraceEnabled()) { 908 LOG.trace(" Backup set describe: " + name); 909 } 910 try (Table table = connection.getTable(tableName)) { 911 Get get = createGetForBackupSet(name); 912 Result res = table.get(get); 913 if (res.isEmpty()) { 914 return null; 915 } 916 res.advance(); 917 String[] tables = cellValueToBackupSet(res.current()); 918 return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) 919 .collect(Collectors.toList()); 920 } 921 } 922 923 /** 924 * Add backup set (list of tables) 925 * @param name set name 926 * @param newTables list of tables, comma-separated 927 * @throws IOException if a table operation fails 928 */ 929 public void addToBackupSet(String name, String[] newTables) throws IOException { 930 if (LOG.isTraceEnabled()) { 931 LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); 932 } 933 String[] union = null; 934 try (Table table = connection.getTable(tableName)) { 935 Get get = createGetForBackupSet(name); 936 Result res = table.get(get); 937 if (res.isEmpty()) { 938 union = newTables; 939 } else { 940 res.advance(); 941 String[] tables = cellValueToBackupSet(res.current()); 942 union = merge(tables, newTables); 943 } 944 Put put = createPutForBackupSet(name, union); 945 table.put(put); 946 } 947 } 948 949 /** 950 * Remove tables from backup set (list of tables) 951 * @param name set name 952 * @param toRemove list of tables 953 * @throws IOException if a table operation or deleting the backup set fails 954 */ 955 public void removeFromBackupSet(String name, String[] toRemove) throws IOException { 956 if (LOG.isTraceEnabled()) { 957 LOG.trace( 958 " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); 959 } 960 String[] disjoint; 961 String[] tables; 962 try (Table table = connection.getTable(tableName)) { 963 Get get = createGetForBackupSet(name); 964 Result res = table.get(get); 965 if (res.isEmpty()) { 966 LOG.warn("Backup set '" + name + "' not found."); 967 return; 968 } else { 969 res.advance(); 970 tables = cellValueToBackupSet(res.current()); 971 disjoint = disjoin(tables, toRemove); 972 } 973 if (disjoint.length > 0 && disjoint.length != tables.length) { 974 Put put = createPutForBackupSet(name, disjoint); 975 table.put(put); 976 } else if (disjoint.length == tables.length) { 977 LOG.warn("Backup set '" + name + "' does not contain tables [" 978 + StringUtils.join(toRemove, " ") + "]"); 979 } else { // disjoint.length == 0 and tables.length >0 980 // Delete backup set 981 LOG.info("Backup set '" + name + "' is empty. Deleting."); 982 deleteBackupSet(name); 983 } 984 } 985 } 986 987 private String[] merge(String[] existingTables, String[] newTables) { 988 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 989 tables.addAll(Arrays.asList(newTables)); 990 return tables.toArray(new String[0]); 991 } 992 993 private String[] disjoin(String[] existingTables, String[] toRemove) { 994 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 995 Arrays.asList(toRemove).forEach(table -> tables.remove(table)); 996 return tables.toArray(new String[0]); 997 } 998 999 /** 1000 * Delete backup set 1001 * @param name set's name 1002 * @throws IOException if getting or deleting the table fails 1003 */ 1004 public void deleteBackupSet(String name) throws IOException { 1005 if (LOG.isTraceEnabled()) { 1006 LOG.trace(" Backup set delete: " + name); 1007 } 1008 try (Table table = connection.getTable(tableName)) { 1009 Delete del = createDeleteForBackupSet(name); 1010 table.delete(del); 1011 } 1012 } 1013 1014 /** 1015 * Get backup system table descriptor 1016 * @return table's descriptor 1017 */ 1018 public static TableDescriptor getSystemTableDescriptor(Configuration conf) { 1019 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); 1020 1021 ColumnFamilyDescriptorBuilder colBuilder = 1022 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1023 1024 colBuilder.setMaxVersions(1); 1025 Configuration config = HBaseConfiguration.create(); 1026 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1027 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1028 colBuilder.setTimeToLive(ttl); 1029 1030 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1031 builder.setColumnFamily(colSessionsDesc); 1032 1033 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1034 colBuilder.setTimeToLive(ttl); 1035 builder.setColumnFamily(colBuilder.build()); 1036 return builder.build(); 1037 } 1038 1039 public static TableName getTableName(Configuration conf) { 1040 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1041 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); 1042 return TableName.valueOf(name); 1043 } 1044 1045 public static String getTableNameAsString(Configuration conf) { 1046 return getTableName(conf).getNameAsString(); 1047 } 1048 1049 public static String getSnapshotName(Configuration conf) { 1050 return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); 1051 } 1052 1053 /** 1054 * Get backup system table descriptor 1055 * @return table's descriptor 1056 */ 1057 public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { 1058 TableDescriptorBuilder builder = 1059 TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); 1060 1061 ColumnFamilyDescriptorBuilder colBuilder = 1062 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1063 colBuilder.setMaxVersions(1); 1064 Configuration config = HBaseConfiguration.create(); 1065 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1066 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1067 colBuilder.setTimeToLive(ttl); 1068 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1069 builder.setColumnFamily(colSessionsDesc); 1070 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1071 colBuilder.setTimeToLive(ttl); 1072 builder.setColumnFamily(colBuilder.build()); 1073 return builder.build(); 1074 } 1075 1076 public static TableName getTableNameForBulkLoadedData(Configuration conf) { 1077 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1078 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; 1079 return TableName.valueOf(name); 1080 } 1081 1082 /** 1083 * Creates Put operation for a given backup info object 1084 * @param context backup info 1085 * @return put operation 1086 * @throws IOException exception 1087 */ 1088 private Put createPutForBackupInfo(BackupInfo context) throws IOException { 1089 Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); 1090 put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), 1091 context.toByteArray()); 1092 return put; 1093 } 1094 1095 /** 1096 * Creates Get operation for a given backup id 1097 * @param backupId backup's ID 1098 * @return get operation 1099 * @throws IOException exception 1100 */ 1101 private Get createGetForBackupInfo(String backupId) throws IOException { 1102 Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); 1103 get.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1104 get.readVersions(1); 1105 return get; 1106 } 1107 1108 /** 1109 * Creates Delete operation for a given backup id 1110 * @param backupId backup's ID 1111 * @return delete operation 1112 */ 1113 private Delete createDeleteForBackupInfo(String backupId) { 1114 Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); 1115 del.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1116 return del; 1117 } 1118 1119 /** 1120 * Converts Result to BackupInfo 1121 * @param res HBase result 1122 * @return backup info instance 1123 * @throws IOException exception 1124 */ 1125 private BackupInfo resultToBackupInfo(Result res) throws IOException { 1126 res.advance(); 1127 Cell cell = res.current(); 1128 return cellToBackupInfo(cell); 1129 } 1130 1131 /** 1132 * Creates Get operation to retrieve start code from backup system table 1133 * @return get operation 1134 * @throws IOException exception 1135 */ 1136 private Get createGetForStartCode(String rootPath) throws IOException { 1137 Get get = new Get(rowkey(START_CODE_ROW, rootPath)); 1138 get.addFamily(BackupSystemTable.META_FAMILY); 1139 get.readVersions(1); 1140 return get; 1141 } 1142 1143 /** 1144 * Creates Put operation to store start code to backup system table 1145 * @return put operation 1146 */ 1147 private Put createPutForStartCode(String startCode, String rootPath) { 1148 Put put = new Put(rowkey(START_CODE_ROW, rootPath)); 1149 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), 1150 Bytes.toBytes(startCode)); 1151 return put; 1152 } 1153 1154 /** 1155 * Creates Get to retrieve incremental backup table set from backup system table 1156 * @return get operation 1157 * @throws IOException exception 1158 */ 1159 private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { 1160 Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); 1161 get.addFamily(BackupSystemTable.META_FAMILY); 1162 get.readVersions(1); 1163 return get; 1164 } 1165 1166 /** 1167 * Creates Put to store incremental backup table set 1168 * @param tables tables 1169 * @return put operation 1170 */ 1171 private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { 1172 Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); 1173 for (TableName table : tables) { 1174 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), 1175 EMPTY_VALUE); 1176 } 1177 return put; 1178 } 1179 1180 /** 1181 * Creates Delete for incremental backup table set 1182 * @param backupRoot backup root 1183 * @return delete operation 1184 */ 1185 private Delete createDeleteForIncrBackupTableSet(String backupRoot) { 1186 Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); 1187 delete.addFamily(BackupSystemTable.META_FAMILY); 1188 return delete; 1189 } 1190 1191 /** 1192 * Creates Scan operation to load backup history 1193 * @return scan operation 1194 */ 1195 private Scan createScanForBackupHistory() { 1196 Scan scan = new Scan(); 1197 byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); 1198 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1199 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1200 scan.withStartRow(startRow); 1201 scan.withStopRow(stopRow); 1202 scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1203 scan.readVersions(1); 1204 return scan; 1205 } 1206 1207 /** 1208 * Converts cell to backup info instance. 1209 * @param current current cell 1210 * @return backup backup info instance 1211 * @throws IOException exception 1212 */ 1213 private BackupInfo cellToBackupInfo(Cell current) throws IOException { 1214 byte[] data = CellUtil.cloneValue(current); 1215 return BackupInfo.fromByteArray(data); 1216 } 1217 1218 /** 1219 * Creates Put to write RS last roll log timestamp map 1220 * @param table table 1221 * @param smap map, containing RS:ts 1222 * @return put operation 1223 */ 1224 private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, 1225 String backupRoot) { 1226 Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); 1227 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); 1228 return put; 1229 } 1230 1231 /** 1232 * Creates Scan to load table-> { RS -> ts} map of maps 1233 * @return scan operation 1234 */ 1235 private Scan createScanForReadLogTimestampMap(String backupRoot) { 1236 Scan scan = new Scan(); 1237 scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL)); 1238 scan.addFamily(BackupSystemTable.META_FAMILY); 1239 1240 return scan; 1241 } 1242 1243 /** 1244 * Get table name from rowkey 1245 * @param cloneRow rowkey 1246 * @return table name 1247 */ 1248 private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { 1249 String s = Bytes.toString(cloneRow); 1250 int index = s.lastIndexOf(NULL); 1251 return s.substring(index + 1); 1252 } 1253 1254 /** 1255 * Creates Put to store RS last log result 1256 * @param server server name 1257 * @param timestamp log roll result (timestamp) 1258 * @return put operation 1259 */ 1260 private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, 1261 String backupRoot) { 1262 Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); 1263 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), 1264 Bytes.toBytes(timestamp)); 1265 return put; 1266 } 1267 1268 /** 1269 * Creates Scan operation to load last RS log roll results 1270 * @return scan operation 1271 */ 1272 private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { 1273 Scan scan = new Scan(); 1274 scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL)); 1275 scan.addFamily(BackupSystemTable.META_FAMILY); 1276 scan.readVersions(1); 1277 1278 return scan; 1279 } 1280 1281 /** 1282 * Get server's name from rowkey 1283 * @param row rowkey 1284 * @return server's name 1285 */ 1286 private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { 1287 String s = Bytes.toString(row); 1288 int index = s.lastIndexOf(NULL); 1289 return s.substring(index + 1); 1290 } 1291 1292 /** 1293 * Creates Put's for bulk loads. 1294 */ 1295 private static List<Put> createPutForBulkLoad(TableName table, byte[] region, 1296 Map<byte[], List<Path>> columnFamilyToHFilePaths) { 1297 List<Put> puts = new ArrayList<>(); 1298 for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) { 1299 for (Path path : entry.getValue()) { 1300 String file = path.toString(); 1301 int lastSlash = file.lastIndexOf("/"); 1302 String filename = file.substring(lastSlash + 1); 1303 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1304 Bytes.toString(region), BLK_LD_DELIM, filename)); 1305 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1306 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); 1307 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1308 puts.add(put); 1309 LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region)); 1310 } 1311 } 1312 return puts; 1313 } 1314 1315 public static void snapshot(Connection conn) throws IOException { 1316 try (Admin admin = conn.getAdmin()) { 1317 Configuration conf = conn.getConfiguration(); 1318 admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); 1319 } 1320 } 1321 1322 public static void restoreFromSnapshot(Connection conn) throws IOException { 1323 Configuration conf = conn.getConfiguration(); 1324 LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); 1325 try (Admin admin = conn.getAdmin()) { 1326 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1327 if (snapshotExists(admin, snapshotName)) { 1328 admin.restoreBackupSystemTable(snapshotName); 1329 LOG.debug("Done restoring backup system table"); 1330 } else { 1331 // Snapshot does not exists, i.e completeBackup failed after 1332 // deleting backup system table snapshot 1333 // In this case we log WARN and proceed 1334 LOG.warn( 1335 "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); 1336 } 1337 } 1338 } 1339 1340 private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { 1341 List<SnapshotDescription> list = admin.listSnapshots(); 1342 for (SnapshotDescription desc : list) { 1343 if (desc.getName().equals(snapshotName)) { 1344 return true; 1345 } 1346 } 1347 return false; 1348 } 1349 1350 public static boolean snapshotExists(Connection conn) throws IOException { 1351 return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); 1352 } 1353 1354 public static void deleteSnapshot(Connection conn) throws IOException { 1355 Configuration conf = conn.getConfiguration(); 1356 LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); 1357 try (Admin admin = conn.getAdmin()) { 1358 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1359 if (snapshotExists(admin, snapshotName)) { 1360 admin.deleteSnapshot(snapshotName); 1361 LOG.debug("Done deleting backup system table snapshot"); 1362 } else { 1363 LOG.error("Snapshot " + snapshotName + " does not exists"); 1364 } 1365 } 1366 } 1367 1368 private Put createPutForDeleteOperation(String[] backupIdList) { 1369 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1370 Put put = new Put(DELETE_OP_ROW); 1371 put.addColumn(META_FAMILY, FAM_COL, value); 1372 return put; 1373 } 1374 1375 private Delete createDeleteForBackupDeleteOperation() { 1376 Delete delete = new Delete(DELETE_OP_ROW); 1377 delete.addFamily(META_FAMILY); 1378 return delete; 1379 } 1380 1381 private Get createGetForDeleteOperation() { 1382 Get get = new Get(DELETE_OP_ROW); 1383 get.addFamily(META_FAMILY); 1384 return get; 1385 } 1386 1387 public void startDeleteOperation(String[] backupIdList) throws IOException { 1388 if (LOG.isTraceEnabled()) { 1389 LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); 1390 } 1391 Put put = createPutForDeleteOperation(backupIdList); 1392 try (Table table = connection.getTable(tableName)) { 1393 table.put(put); 1394 } 1395 } 1396 1397 public void finishDeleteOperation() throws IOException { 1398 LOG.trace("Finsih delete operation for backup ids"); 1399 1400 Delete delete = createDeleteForBackupDeleteOperation(); 1401 try (Table table = connection.getTable(tableName)) { 1402 table.delete(delete); 1403 } 1404 } 1405 1406 public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { 1407 LOG.trace("Get delete operation for backup ids"); 1408 1409 Get get = createGetForDeleteOperation(); 1410 try (Table table = connection.getTable(tableName)) { 1411 Result res = table.get(get); 1412 if (res.isEmpty()) { 1413 return null; 1414 } 1415 Cell cell = res.listCells().get(0); 1416 byte[] val = CellUtil.cloneValue(cell); 1417 if (val.length == 0) { 1418 return null; 1419 } 1420 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1421 .toArray(String[]::new); 1422 } 1423 } 1424 1425 private Put createPutForMergeOperation(String[] backupIdList) { 1426 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1427 Put put = new Put(MERGE_OP_ROW); 1428 put.addColumn(META_FAMILY, FAM_COL, value); 1429 return put; 1430 } 1431 1432 public boolean isMergeInProgress() throws IOException { 1433 Get get = new Get(MERGE_OP_ROW); 1434 try (Table table = connection.getTable(tableName)) { 1435 Result res = table.get(get); 1436 return !res.isEmpty(); 1437 } 1438 } 1439 1440 private Put createPutForUpdateTablesForMerge(List<TableName> tables) { 1441 byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); 1442 Put put = new Put(MERGE_OP_ROW); 1443 put.addColumn(META_FAMILY, PATH_COL, value); 1444 return put; 1445 } 1446 1447 private Delete createDeleteForBackupMergeOperation() { 1448 Delete delete = new Delete(MERGE_OP_ROW); 1449 delete.addFamily(META_FAMILY); 1450 return delete; 1451 } 1452 1453 private Get createGetForMergeOperation() { 1454 Get get = new Get(MERGE_OP_ROW); 1455 get.addFamily(META_FAMILY); 1456 return get; 1457 } 1458 1459 public void startMergeOperation(String[] backupIdList) throws IOException { 1460 if (LOG.isTraceEnabled()) { 1461 LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); 1462 } 1463 Put put = createPutForMergeOperation(backupIdList); 1464 try (Table table = connection.getTable(tableName)) { 1465 table.put(put); 1466 } 1467 } 1468 1469 public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { 1470 if (LOG.isTraceEnabled()) { 1471 LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); 1472 } 1473 Put put = createPutForUpdateTablesForMerge(tables); 1474 try (Table table = connection.getTable(tableName)) { 1475 table.put(put); 1476 } 1477 } 1478 1479 public void finishMergeOperation() throws IOException { 1480 LOG.trace("Finish merge operation for backup ids"); 1481 1482 Delete delete = createDeleteForBackupMergeOperation(); 1483 try (Table table = connection.getTable(tableName)) { 1484 table.delete(delete); 1485 } 1486 } 1487 1488 public String[] getListOfBackupIdsFromMergeOperation() throws IOException { 1489 LOG.trace("Get backup ids for merge operation"); 1490 1491 Get get = createGetForMergeOperation(); 1492 try (Table table = connection.getTable(tableName)) { 1493 Result res = table.get(get); 1494 if (res.isEmpty()) { 1495 return null; 1496 } 1497 Cell cell = res.listCells().get(0); 1498 byte[] val = CellUtil.cloneValue(cell); 1499 if (val.length == 0) { 1500 return null; 1501 } 1502 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1503 .toArray(String[]::new); 1504 } 1505 } 1506 1507 /** 1508 * Creates a scan to read all registered bulk loads for the given table, or for all tables if 1509 * {@code table} is {@code null}. 1510 */ 1511 static Scan createScanForOrigBulkLoadedFiles(@Nullable TableName table) { 1512 Scan scan = new Scan(); 1513 byte[] startRow = table == null 1514 ? BULK_LOAD_PREFIX_BYTES 1515 : rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); 1516 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1517 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1518 scan.withStartRow(startRow); 1519 scan.withStopRow(stopRow); 1520 scan.addFamily(BackupSystemTable.META_FAMILY); 1521 scan.readVersions(1); 1522 return scan; 1523 } 1524 1525 static String getTableNameFromOrigBulkLoadRow(String rowStr) { 1526 // format is bulk : namespace : table : region : file 1527 return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1); 1528 } 1529 1530 static String getRegionNameFromOrigBulkLoadRow(String rowStr) { 1531 // format is bulk : namespace : table : region : file 1532 List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr); 1533 Iterator<String> i = parts.iterator(); 1534 int idx = 3; 1535 if (parts.size() == 4) { 1536 // the table is in default namespace 1537 idx = 2; 1538 } 1539 String region = Iterators.get(i, idx); 1540 LOG.debug("bulk row string " + rowStr + " region " + region); 1541 return region; 1542 } 1543 1544 /* 1545 * Used to query bulk loaded hfiles which have been copied by incremental backup 1546 * @param backupId the backup Id. It can be null when querying for all tables 1547 * @return the Scan object 1548 * @deprecated This method is broken if a backupId is specified - see HBASE-28715 1549 */ 1550 static Scan createScanForBulkLoadedFiles(String backupId) { 1551 Scan scan = new Scan(); 1552 byte[] startRow = 1553 backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); 1554 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1555 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1556 scan.withStartRow(startRow); 1557 scan.withStopRow(stopRow); 1558 scan.addFamily(BackupSystemTable.META_FAMILY); 1559 scan.readVersions(1); 1560 return scan; 1561 } 1562 1563 /** 1564 * Creates Scan operation to load backup set list 1565 * @return scan operation 1566 */ 1567 private Scan createScanForBackupSetList() { 1568 Scan scan = new Scan(); 1569 byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); 1570 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1571 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1572 scan.withStartRow(startRow); 1573 scan.withStopRow(stopRow); 1574 scan.addFamily(BackupSystemTable.META_FAMILY); 1575 return scan; 1576 } 1577 1578 /** 1579 * Creates Get operation to load backup set content 1580 * @return get operation 1581 */ 1582 private Get createGetForBackupSet(String name) { 1583 Get get = new Get(rowkey(SET_KEY_PREFIX, name)); 1584 get.addFamily(BackupSystemTable.META_FAMILY); 1585 return get; 1586 } 1587 1588 /** 1589 * Creates Delete operation to delete backup set content 1590 * @param name backup set's name 1591 * @return delete operation 1592 */ 1593 private Delete createDeleteForBackupSet(String name) { 1594 Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); 1595 del.addFamily(BackupSystemTable.META_FAMILY); 1596 return del; 1597 } 1598 1599 /** 1600 * Creates Put operation to update backup set content 1601 * @param name backup set's name 1602 * @param tables list of tables 1603 * @return put operation 1604 */ 1605 private Put createPutForBackupSet(String name, String[] tables) { 1606 Put put = new Put(rowkey(SET_KEY_PREFIX, name)); 1607 byte[] value = convertToByteArray(tables); 1608 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); 1609 return put; 1610 } 1611 1612 private byte[] convertToByteArray(String[] tables) { 1613 return Bytes.toBytes(StringUtils.join(tables, ",")); 1614 } 1615 1616 /** 1617 * Converts cell to backup set list. 1618 * @param current current cell 1619 * @return backup set as array of table names 1620 */ 1621 private String[] cellValueToBackupSet(Cell current) { 1622 byte[] data = CellUtil.cloneValue(current); 1623 if (!ArrayUtils.isEmpty(data)) { 1624 return Bytes.toString(data).split(","); 1625 } 1626 return new String[0]; 1627 } 1628 1629 /** 1630 * Converts cell key to backup set name. 1631 * @param current current cell 1632 * @return backup set name 1633 */ 1634 private String cellKeyToBackupSetName(Cell current) { 1635 byte[] data = CellUtil.cloneRow(current); 1636 return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); 1637 } 1638 1639 private static byte[] rowkey(String s, String... other) { 1640 StringBuilder sb = new StringBuilder(s); 1641 for (String ss : other) { 1642 sb.append(ss); 1643 } 1644 return Bytes.toBytes(sb.toString()); 1645 } 1646 1647 private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException { 1648 if (!admin.isTableEnabled(tableName)) { 1649 try { 1650 admin.enableTable(tableName); 1651 } catch (TableNotDisabledException ignored) { 1652 LOG.info("Table {} is not disabled, ignoring enable request", tableName); 1653 } 1654 } 1655 } 1656}