001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.nio.charset.StandardCharsets; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.Objects; 034import java.util.Set; 035import java.util.TreeMap; 036import java.util.TreeSet; 037import java.util.stream.Collectors; 038import org.apache.commons.lang3.ArrayUtils; 039import org.apache.commons.lang3.StringUtils; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.Cell; 043import org.apache.hadoop.hbase.CellUtil; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.NamespaceDescriptor; 046import org.apache.hadoop.hbase.NamespaceExistException; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableExistsException; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.TableNotDisabledException; 051import org.apache.hadoop.hbase.backup.BackupInfo; 052import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 053import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 054import org.apache.hadoop.hbase.backup.BackupType; 055import org.apache.hadoop.hbase.backup.util.BackupUtils; 056import org.apache.hadoop.hbase.client.Admin; 057import org.apache.hadoop.hbase.client.BufferedMutator; 058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 060import org.apache.hadoop.hbase.client.Connection; 061import org.apache.hadoop.hbase.client.Delete; 062import org.apache.hadoop.hbase.client.Get; 063import org.apache.hadoop.hbase.client.Put; 064import org.apache.hadoop.hbase.client.Result; 065import org.apache.hadoop.hbase.client.ResultScanner; 066import org.apache.hadoop.hbase.client.Scan; 067import org.apache.hadoop.hbase.client.SnapshotDescription; 068import org.apache.hadoop.hbase.client.Table; 069import org.apache.hadoop.hbase.client.TableDescriptor; 070import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.yetus.audience.InterfaceAudience; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 078import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 079 080import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 082 083/** 084 * This class provides API to access backup system table<br> 085 * Backup system table schema:<br> 086 * <p> 087 * <ul> 088 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> 089 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> 090 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of 091 * include table; value=empty</li> 092 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL 093 * timestamp]</li> 094 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> 095 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file 096 * name</li> 097 * </ul> 098 * </p> 099 */ 100@InterfaceAudience.Private 101public final class BackupSystemTable implements Closeable { 102 103 private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); 104 105 static class WALItem { 106 String backupId; 107 String walFile; 108 String backupRoot; 109 110 WALItem(String backupId, String walFile, String backupRoot) { 111 this.backupId = backupId; 112 this.walFile = walFile; 113 this.backupRoot = backupRoot; 114 } 115 116 public String getBackupId() { 117 return backupId; 118 } 119 120 public String getWalFile() { 121 return walFile; 122 } 123 124 public String getBackupRoot() { 125 return backupRoot; 126 } 127 128 @Override 129 public String toString() { 130 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; 131 } 132 } 133 134 /** 135 * Backup system table (main) name 136 */ 137 private TableName tableName; 138 139 /** 140 * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a 141 * separate table because we have to isolate general backup operations: create, merge etc from 142 * activity of RegionObserver, which controls process of a bulk loading 143 * {@link org.apache.hadoop.hbase.backup.BackupObserver} 144 */ 145 private TableName bulkLoadTableName; 146 147 /** 148 * Stores backup sessions (contexts) 149 */ 150 final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); 151 /** 152 * Stores other meta 153 */ 154 final static byte[] META_FAMILY = Bytes.toBytes("meta"); 155 final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); 156 /** 157 * Connection to HBase cluster, shared among all instances 158 */ 159 private final Connection connection; 160 161 private final static String BACKUP_INFO_PREFIX = "session:"; 162 private final static String START_CODE_ROW = "startcode:"; 163 private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); 164 private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); 165 166 private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); 167 private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); 168 169 private final static String INCR_BACKUP_SET = "incrbackupset:"; 170 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; 171 private final static String RS_LOG_TS_PREFIX = "rslogts:"; 172 173 private final static String BULK_LOAD_PREFIX = "bulk:"; 174 private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX); 175 private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row"); 176 private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row"); 177 178 final static byte[] TBL_COL = Bytes.toBytes("tbl"); 179 final static byte[] FAM_COL = Bytes.toBytes("fam"); 180 final static byte[] PATH_COL = Bytes.toBytes("path"); 181 182 private final static String SET_KEY_PREFIX = "backupset:"; 183 184 // separator between BULK_LOAD_PREFIX and ordinals 185 private final static String BLK_LD_DELIM = ":"; 186 private final static byte[] EMPTY_VALUE = new byte[] {}; 187 188 // Safe delimiter in a string 189 private final static String NULL = "\u0000"; 190 191 public BackupSystemTable(Connection conn) throws IOException { 192 this.connection = conn; 193 Configuration conf = this.connection.getConfiguration(); 194 tableName = BackupSystemTable.getTableName(conf); 195 bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); 196 checkSystemTable(); 197 } 198 199 private void checkSystemTable() throws IOException { 200 try (Admin admin = connection.getAdmin()) { 201 verifyNamespaceExists(admin); 202 Configuration conf = connection.getConfiguration(); 203 if (!admin.tableExists(tableName)) { 204 TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf); 205 createSystemTable(admin, backupHTD); 206 } 207 ensureTableEnabled(admin, tableName); 208 if (!admin.tableExists(bulkLoadTableName)) { 209 TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); 210 createSystemTable(admin, blHTD); 211 } 212 ensureTableEnabled(admin, bulkLoadTableName); 213 waitForSystemTable(admin, tableName); 214 waitForSystemTable(admin, bulkLoadTableName); 215 } 216 } 217 218 private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException { 219 try { 220 admin.createTable(descriptor); 221 } catch (TableExistsException e) { 222 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 223 // so may be subject to race conditions where one caller succeeds in creating the 224 // table and others fail because it now exists 225 LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e); 226 } 227 } 228 229 private void verifyNamespaceExists(Admin admin) throws IOException { 230 String namespaceName = tableName.getNamespaceAsString(); 231 NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); 232 NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); 233 boolean exists = false; 234 for (NamespaceDescriptor nsd : list) { 235 if (nsd.getName().equals(ns.getName())) { 236 exists = true; 237 break; 238 } 239 } 240 if (!exists) { 241 try { 242 admin.createNamespace(ns); 243 } catch (NamespaceExistException e) { 244 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 245 // so may be subject to race conditions where one caller succeeds in creating the 246 // namespace and others fail because it now exists 247 LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e); 248 } 249 } 250 } 251 252 private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { 253 // Return fast if the table is available and avoid a log message 254 if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) { 255 return; 256 } 257 long TIMEOUT = 60000; 258 long startTime = EnvironmentEdgeManager.currentTime(); 259 LOG.debug("Backup table {} is not present and available, waiting for it to become so", 260 tableName); 261 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { 262 try { 263 Thread.sleep(100); 264 } catch (InterruptedException e) { 265 throw (IOException) new InterruptedIOException().initCause(e); 266 } 267 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { 268 throw new IOException( 269 "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms"); 270 } 271 } 272 LOG.debug("Backup table {} exists and available", tableName); 273 } 274 275 @Override 276 public void close() { 277 // do nothing 278 } 279 280 /** 281 * Updates status (state) of a backup session in backup system table table 282 * @param info backup info 283 * @throws IOException exception 284 */ 285 public void updateBackupInfo(BackupInfo info) throws IOException { 286 if (LOG.isTraceEnabled()) { 287 LOG.trace("update backup status in backup system table for: " + info.getBackupId() 288 + " set status=" + info.getState()); 289 } 290 try (Table table = connection.getTable(tableName)) { 291 Put put = createPutForBackupInfo(info); 292 table.put(put); 293 } 294 } 295 296 /* 297 * @param backupId the backup Id 298 * @return Map of rows to path of bulk loaded hfile 299 */ 300 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { 301 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 302 try (Table table = connection.getTable(bulkLoadTableName); 303 ResultScanner scanner = table.getScanner(scan)) { 304 Result res = null; 305 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 306 while ((res = scanner.next()) != null) { 307 res.advance(); 308 byte[] row = CellUtil.cloneRow(res.listCells().get(0)); 309 for (Cell cell : res.listCells()) { 310 if ( 311 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 312 BackupSystemTable.PATH_COL.length) == 0 313 ) { 314 map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); 315 } 316 } 317 } 318 return map; 319 } 320 } 321 322 /* 323 * Used during restore 324 * @param backupId the backup Id 325 * @param sTableList List of tables 326 * @return array of Map of family to List of Paths 327 */ 328 public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList) 329 throws IOException { 330 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 331 @SuppressWarnings("unchecked") 332 Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()]; 333 try (Table table = connection.getTable(bulkLoadTableName); 334 ResultScanner scanner = table.getScanner(scan)) { 335 Result res = null; 336 while ((res = scanner.next()) != null) { 337 res.advance(); 338 TableName tbl = null; 339 byte[] fam = null; 340 String path = null; 341 for (Cell cell : res.listCells()) { 342 if ( 343 CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, 344 BackupSystemTable.TBL_COL.length) == 0 345 ) { 346 tbl = TableName.valueOf(CellUtil.cloneValue(cell)); 347 } else if ( 348 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 349 BackupSystemTable.FAM_COL.length) == 0 350 ) { 351 fam = CellUtil.cloneValue(cell); 352 } else if ( 353 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 354 BackupSystemTable.PATH_COL.length) == 0 355 ) { 356 path = Bytes.toString(CellUtil.cloneValue(cell)); 357 } 358 } 359 int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList); 360 if (srcIdx == -1) { 361 // the table is not among the query 362 continue; 363 } 364 if (mapForSrc[srcIdx] == null) { 365 mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR); 366 } 367 List<Path> files; 368 if (!mapForSrc[srcIdx].containsKey(fam)) { 369 files = new ArrayList<Path>(); 370 mapForSrc[srcIdx].put(fam, files); 371 } else { 372 files = mapForSrc[srcIdx].get(fam); 373 } 374 files.add(new Path(path)); 375 if (LOG.isDebugEnabled()) { 376 LOG.debug("found bulk loaded file : {} {} {}", tbl, Bytes.toString(fam), path); 377 } 378 } 379 380 return mapForSrc; 381 } 382 } 383 384 /** 385 * Deletes backup status from backup system table table 386 * @param backupId backup id 387 * @throws IOException exception 388 */ 389 public void deleteBackupInfo(String backupId) throws IOException { 390 if (LOG.isTraceEnabled()) { 391 LOG.trace("delete backup status in backup system table for " + backupId); 392 } 393 try (Table table = connection.getTable(tableName)) { 394 Delete del = createDeleteForBackupInfo(backupId); 395 table.delete(del); 396 } 397 } 398 399 /** 400 * Registers a bulk load. 401 * @param tableName table name 402 * @param region the region receiving hfile 403 * @param cfToHfilePath column family and associated hfiles 404 */ 405 public void registerBulkLoad(TableName tableName, byte[] region, 406 Map<byte[], List<Path>> cfToHfilePath) throws IOException { 407 if (LOG.isDebugEnabled()) { 408 LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName, 409 cfToHfilePath.size()); 410 } 411 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 412 List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath); 413 bufferedMutator.mutate(puts); 414 LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName); 415 } 416 } 417 418 /** 419 * Removes entries from the table that tracks all bulk loaded hfiles. 420 * @param rows the row keys of the entries to be deleted 421 */ 422 public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException { 423 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 424 List<Delete> deletes = new ArrayList<>(); 425 for (byte[] row : rows) { 426 Delete del = new Delete(row); 427 deletes.add(del); 428 LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row)); 429 } 430 bufferedMutator.mutate(deletes); 431 LOG.debug("Deleted {} bulk load entries.", rows.size()); 432 } 433 } 434 435 /** 436 * Reads the rows from backup table recording bulk loaded hfiles 437 * @param tableList list of table names 438 */ 439 public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException { 440 List<BulkLoad> result = new ArrayList<>(); 441 for (TableName table : tableList) { 442 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table); 443 try (Table bulkLoadTable = connection.getTable(bulkLoadTableName); 444 ResultScanner scanner = bulkLoadTable.getScanner(scan)) { 445 Result res; 446 while ((res = scanner.next()) != null) { 447 res.advance(); 448 String fam = null; 449 String path = null; 450 String region = null; 451 byte[] row = null; 452 for (Cell cell : res.listCells()) { 453 row = CellUtil.cloneRow(cell); 454 String rowStr = Bytes.toString(row); 455 region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); 456 if ( 457 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 458 BackupSystemTable.FAM_COL.length) == 0 459 ) { 460 fam = Bytes.toString(CellUtil.cloneValue(cell)); 461 } else if ( 462 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 463 BackupSystemTable.PATH_COL.length) == 0 464 ) { 465 path = Bytes.toString(CellUtil.cloneValue(cell)); 466 } 467 } 468 result.add(new BulkLoad(table, region, fam, path, row)); 469 LOG.debug("found orig " + path + " for " + fam + " of table " + region); 470 } 471 } 472 } 473 return result; 474 } 475 476 /* 477 * @param sTableList List of tables 478 * @param maps array of Map of family to List of Paths 479 * @param backupId the backup Id 480 */ 481 public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps, 482 String backupId) throws IOException { 483 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 484 long ts = EnvironmentEdgeManager.currentTime(); 485 int cnt = 0; 486 List<Put> puts = new ArrayList<>(); 487 for (int idx = 0; idx < maps.length; idx++) { 488 Map<byte[], List<Path>> map = maps[idx]; 489 TableName tn = sTableList.get(idx); 490 491 if (map == null) { 492 continue; 493 } 494 495 for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) { 496 byte[] fam = entry.getKey(); 497 List<Path> paths = entry.getValue(); 498 for (Path p : paths) { 499 Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, 500 ts, cnt++); 501 puts.add(put); 502 } 503 } 504 } 505 if (!puts.isEmpty()) { 506 bufferedMutator.mutate(puts); 507 } 508 } 509 } 510 511 /** 512 * Reads backup status object (instance of backup info) from backup system table table 513 * @param backupId backup id 514 * @return Current status of backup session or null 515 */ 516 public BackupInfo readBackupInfo(String backupId) throws IOException { 517 if (LOG.isTraceEnabled()) { 518 LOG.trace("read backup status from backup system table for: " + backupId); 519 } 520 521 try (Table table = connection.getTable(tableName)) { 522 Get get = createGetForBackupInfo(backupId); 523 Result res = table.get(get); 524 if (res.isEmpty()) { 525 return null; 526 } 527 return resultToBackupInfo(res); 528 } 529 } 530 531 /** 532 * Read the last backup start code (timestamp) of last successful backup. Will return null if 533 * there is no start code stored on hbase or the value is of length 0. These two cases indicate 534 * there is no successful backup completed so far. 535 * @param backupRoot directory path to backup destination 536 * @return the timestamp of last successful backup 537 * @throws IOException exception 538 */ 539 public String readBackupStartCode(String backupRoot) throws IOException { 540 LOG.trace("read backup start code from backup system table"); 541 542 try (Table table = connection.getTable(tableName)) { 543 Get get = createGetForStartCode(backupRoot); 544 Result res = table.get(get); 545 if (res.isEmpty()) { 546 return null; 547 } 548 Cell cell = res.listCells().get(0); 549 byte[] val = CellUtil.cloneValue(cell); 550 if (val.length == 0) { 551 return null; 552 } 553 return new String(val, StandardCharsets.UTF_8); 554 } 555 } 556 557 /** 558 * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. 559 * @param startCode start code 560 * @param backupRoot root directory path to backup 561 * @throws IOException exception 562 */ 563 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { 564 if (LOG.isTraceEnabled()) { 565 LOG.trace("write backup start code to backup system table " + startCode); 566 } 567 try (Table table = connection.getTable(tableName)) { 568 Put put = createPutForStartCode(startCode.toString(), backupRoot); 569 table.put(put); 570 } 571 } 572 573 /** 574 * Exclusive operations are: create, delete, merge 575 * @throws IOException if a table operation fails or an active backup exclusive operation is 576 * already underway 577 */ 578 public void startBackupExclusiveOperation() throws IOException { 579 LOG.debug("Start new backup exclusive operation"); 580 581 try (Table table = connection.getTable(tableName)) { 582 Put put = createPutForStartBackupSession(); 583 // First try to put if row does not exist 584 if ( 585 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 586 .ifNotExists().thenPut(put) 587 ) { 588 // Row exists, try to put if value == ACTIVE_SESSION_NO 589 if ( 590 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 591 .ifEquals(ACTIVE_SESSION_NO).thenPut(put) 592 ) { 593 throw new ExclusiveOperationException(); 594 } 595 } 596 } 597 } 598 599 private Put createPutForStartBackupSession() { 600 Put put = new Put(ACTIVE_SESSION_ROW); 601 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); 602 return put; 603 } 604 605 public void finishBackupExclusiveOperation() throws IOException { 606 LOG.debug("Finish backup exclusive operation"); 607 608 try (Table table = connection.getTable(tableName)) { 609 Put put = createPutForStopBackupSession(); 610 if ( 611 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 612 .ifEquals(ACTIVE_SESSION_YES).thenPut(put) 613 ) { 614 throw new IOException("There is no active backup exclusive operation"); 615 } 616 } 617 } 618 619 private Put createPutForStopBackupSession() { 620 Put put = new Put(ACTIVE_SESSION_ROW); 621 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); 622 return put; 623 } 624 625 /** 626 * Get the Region Servers log information after the last log roll from backup system table. 627 * @param backupRoot root directory path to backup 628 * @return RS log info 629 * @throws IOException exception 630 */ 631 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) 632 throws IOException { 633 LOG.trace("read region server last roll log result to backup system table"); 634 635 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); 636 637 try (Table table = connection.getTable(tableName); 638 ResultScanner scanner = table.getScanner(scan)) { 639 Result res; 640 HashMap<String, Long> rsTimestampMap = new HashMap<>(); 641 while ((res = scanner.next()) != null) { 642 res.advance(); 643 Cell cell = res.current(); 644 byte[] row = CellUtil.cloneRow(cell); 645 String server = getServerNameForReadRegionServerLastLogRollResult(row); 646 byte[] data = CellUtil.cloneValue(cell); 647 rsTimestampMap.put(server, Bytes.toLong(data)); 648 } 649 return rsTimestampMap; 650 } 651 } 652 653 /** 654 * Writes Region Server last roll log result (timestamp) to backup system table table 655 * @param server Region Server name 656 * @param ts last log timestamp 657 * @param backupRoot root directory path to backup 658 * @throws IOException exception 659 */ 660 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) 661 throws IOException { 662 LOG.trace("write region server last roll log result to backup system table"); 663 664 try (Table table = connection.getTable(tableName)) { 665 Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); 666 table.put(put); 667 } 668 } 669 670 /** 671 * Get all completed backup information (in desc order by time) 672 * @param onlyCompleted true, if only successfully completed sessions 673 * @return history info of BackupCompleteData 674 * @throws IOException exception 675 */ 676 public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException { 677 LOG.trace("get backup history from backup system table"); 678 679 BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY; 680 ArrayList<BackupInfo> list = getBackupInfos(state); 681 return BackupUtils.sortHistoryListDesc(list); 682 } 683 684 /** 685 * Get all backups history 686 * @return list of backup info 687 * @throws IOException if getting the backup history fails 688 */ 689 public List<BackupInfo> getBackupHistory() throws IOException { 690 return getBackupHistory(false); 691 } 692 693 /** 694 * Get first n backup history records 695 * @param n number of records, if n== -1 - max number is ignored 696 * @return list of records 697 * @throws IOException if getting the backup history fails 698 */ 699 public List<BackupInfo> getHistory(int n) throws IOException { 700 List<BackupInfo> history = getBackupHistory(); 701 if (n == -1 || history.size() <= n) { 702 return history; 703 } 704 return Collections.unmodifiableList(history.subList(0, n)); 705 } 706 707 /** 708 * Get backup history records filtered by list of filters. 709 * @param n max number of records, if n == -1 , then max number is ignored 710 * @param filters list of filters 711 * @return backup records 712 * @throws IOException if getting the backup history fails 713 */ 714 public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException { 715 if (filters.length == 0) { 716 return getHistory(n); 717 } 718 719 List<BackupInfo> history = getBackupHistory(); 720 List<BackupInfo> result = new ArrayList<>(); 721 for (BackupInfo bi : history) { 722 if (n >= 0 && result.size() == n) { 723 break; 724 } 725 726 boolean passed = true; 727 for (int i = 0; i < filters.length; i++) { 728 if (!filters[i].apply(bi)) { 729 passed = false; 730 break; 731 } 732 } 733 if (passed) { 734 result.add(bi); 735 } 736 } 737 return result; 738 } 739 740 /** 741 * Retrieve all table names that are part of any known backup 742 */ 743 public Set<TableName> getTablesIncludedInBackups() throws IOException { 744 Set<TableName> names = new HashSet<>(); 745 List<BackupInfo> infos = getBackupHistory(true); 746 for (BackupInfo info : infos) { 747 // Incremental backups have the same tables as the preceding full backups 748 if (info.getType() == BackupType.FULL) { 749 names.addAll(info.getTableNames()); 750 } 751 } 752 return names; 753 } 754 755 /** 756 * Get history for backup destination 757 * @param backupRoot backup destination path 758 * @return List of backup info 759 * @throws IOException if getting the backup history fails 760 */ 761 public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException { 762 ArrayList<BackupInfo> history = getBackupHistory(false); 763 for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { 764 BackupInfo info = iterator.next(); 765 if (!backupRoot.equals(info.getBackupRootDir())) { 766 iterator.remove(); 767 } 768 } 769 return history; 770 } 771 772 /** 773 * Get history for a table 774 * @param name table name 775 * @return history for a table 776 * @throws IOException if getting the backup history fails 777 */ 778 public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException { 779 List<BackupInfo> history = getBackupHistory(); 780 List<BackupInfo> tableHistory = new ArrayList<>(); 781 for (BackupInfo info : history) { 782 List<TableName> tables = info.getTableNames(); 783 if (tables.contains(name)) { 784 tableHistory.add(info); 785 } 786 } 787 return tableHistory; 788 } 789 790 /** 791 * Goes through all backup history corresponding to the provided root folder, and collects all 792 * backup info mentioning each of the provided tables. 793 * @param set the tables for which to collect the {@code BackupInfo} 794 * @param backupRoot backup destination path to retrieve backup history for 795 * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at 796 * least one {@code BackupInfo} 797 * @throws IOException if getting the backup history fails 798 */ 799 public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, 800 String backupRoot) throws IOException { 801 List<BackupInfo> history = getBackupHistory(backupRoot); 802 Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>(); 803 for (BackupInfo info : history) { 804 List<TableName> tables = info.getTableNames(); 805 for (TableName tableName : tables) { 806 if (set.contains(tableName)) { 807 List<BackupInfo> list = 808 tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>()); 809 list.add(info); 810 } 811 } 812 } 813 return tableHistoryMap; 814 } 815 816 /** 817 * Get all backup sessions with a given state (in descending order by time) 818 * @param state backup session state 819 * @return history info of backup info objects 820 * @throws IOException exception 821 */ 822 public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException { 823 LOG.trace("get backup infos from backup system table"); 824 825 Scan scan = createScanForBackupHistory(); 826 ArrayList<BackupInfo> list = new ArrayList<>(); 827 828 try (Table table = connection.getTable(tableName); 829 ResultScanner scanner = table.getScanner(scan)) { 830 Result res; 831 while ((res = scanner.next()) != null) { 832 res.advance(); 833 BackupInfo context = cellToBackupInfo(res.current()); 834 if (state != BackupState.ANY && context.getState() != state) { 835 continue; 836 } 837 list.add(context); 838 } 839 return list; 840 } 841 } 842 843 /** 844 * Write the current timestamps for each regionserver to backup system table after a successful 845 * full or incremental backup. The saved timestamp is of the last log file that was backed up 846 * already. 847 * @param tables tables 848 * @param newTimestamps timestamps 849 * @param backupRoot root directory path to backup 850 * @throws IOException exception 851 */ 852 public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps, 853 String backupRoot) throws IOException { 854 if (LOG.isTraceEnabled()) { 855 LOG.trace("write RS log time stamps to backup system table for tables [" 856 + StringUtils.join(tables, ",") + "]"); 857 } 858 List<Put> puts = new ArrayList<>(); 859 for (TableName table : tables) { 860 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); 861 Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); 862 puts.add(put); 863 } 864 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) { 865 bufferedMutator.mutate(puts); 866 } 867 } 868 869 /** 870 * Read the timestamp for each region server log after the last successful backup. Each table has 871 * its own set of the timestamps. The info is stored for each table as a concatenated string of 872 * rs->timestapmp 873 * @param backupRoot root directory path to backup 874 * @return the timestamp for each region server. key: tableName value: 875 * RegionServer,PreviousTimeStamp 876 * @throws IOException exception 877 */ 878 public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot) 879 throws IOException { 880 if (LOG.isTraceEnabled()) { 881 LOG.trace("read RS log ts from backup system table for root=" + backupRoot); 882 } 883 884 Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>(); 885 886 Scan scan = createScanForReadLogTimestampMap(backupRoot); 887 try (Table table = connection.getTable(tableName); 888 ResultScanner scanner = table.getScanner(scan)) { 889 Result res; 890 while ((res = scanner.next()) != null) { 891 res.advance(); 892 Cell cell = res.current(); 893 byte[] row = CellUtil.cloneRow(cell); 894 String tabName = getTableNameForReadLogTimestampMap(row); 895 TableName tn = TableName.valueOf(tabName); 896 byte[] data = CellUtil.cloneValue(cell); 897 if (data == null) { 898 throw new IOException("Data of last backup data from backup system table " 899 + "is empty. Create a backup first."); 900 } 901 if (data != null && data.length > 0) { 902 HashMap<String, Long> lastBackup = 903 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); 904 tableTimestampMap.put(tn, lastBackup); 905 } 906 } 907 return tableTimestampMap; 908 } 909 } 910 911 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, 912 Map<String, Long> map) { 913 BackupProtos.TableServerTimestamp.Builder tstBuilder = 914 BackupProtos.TableServerTimestamp.newBuilder(); 915 tstBuilder 916 .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); 917 918 for (Entry<String, Long> entry : map.entrySet()) { 919 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); 920 HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); 921 ServerName sn = ServerName.parseServerName(entry.getKey()); 922 snBuilder.setHostName(sn.getHostname()); 923 snBuilder.setPort(sn.getPort()); 924 builder.setServerName(snBuilder.build()); 925 builder.setTimestamp(entry.getValue()); 926 tstBuilder.addServerTimestamp(builder.build()); 927 } 928 929 return tstBuilder.build(); 930 } 931 932 private HashMap<String, Long> 933 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { 934 935 HashMap<String, Long> map = new HashMap<>(); 936 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); 937 for (BackupProtos.ServerTimestamp st : list) { 938 ServerName sn = 939 org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); 940 map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); 941 } 942 return map; 943 } 944 945 /** 946 * Return the current tables covered by incremental backup. 947 * @param backupRoot root directory path to backup 948 * @return set of tableNames 949 * @throws IOException exception 950 */ 951 public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { 952 LOG.trace("get incremental backup table set from backup system table"); 953 954 TreeSet<TableName> set = new TreeSet<>(); 955 956 try (Table table = connection.getTable(tableName)) { 957 Get get = createGetForIncrBackupTableSet(backupRoot); 958 Result res = table.get(get); 959 if (res.isEmpty()) { 960 return set; 961 } 962 List<Cell> cells = res.listCells(); 963 for (Cell cell : cells) { 964 // qualifier = table name - we use table names as qualifiers 965 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); 966 } 967 return set; 968 } 969 } 970 971 /** 972 * Add tables to global incremental backup set 973 * @param tables set of tables 974 * @param backupRoot root directory path to backup 975 * @throws IOException exception 976 */ 977 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) 978 throws IOException { 979 if (LOG.isTraceEnabled()) { 980 LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot 981 + " tables [" + StringUtils.join(tables, " ") + "]"); 982 } 983 if (LOG.isDebugEnabled()) { 984 tables.forEach(table -> LOG.debug(Objects.toString(table))); 985 } 986 try (Table table = connection.getTable(tableName)) { 987 Put put = createPutForIncrBackupTableSet(tables, backupRoot); 988 table.put(put); 989 } 990 } 991 992 /** 993 * Deletes incremental backup set for a backup destination 994 * @param backupRoot backup root 995 */ 996 public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { 997 if (LOG.isTraceEnabled()) { 998 LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); 999 } 1000 try (Table table = connection.getTable(tableName)) { 1001 Delete delete = createDeleteForIncrBackupTableSet(backupRoot); 1002 table.delete(delete); 1003 } 1004 } 1005 1006 /** 1007 * Checks if we have at least one backup session in backup system table This API is used by 1008 * BackupLogCleaner 1009 * @return true, if - at least one session exists in backup system table table 1010 * @throws IOException exception 1011 */ 1012 public boolean hasBackupSessions() throws IOException { 1013 LOG.trace("Has backup sessions from backup system table"); 1014 1015 boolean result = false; 1016 Scan scan = createScanForBackupHistory(); 1017 scan.setCaching(1); 1018 try (Table table = connection.getTable(tableName); 1019 ResultScanner scanner = table.getScanner(scan)) { 1020 if (scanner.next() != null) { 1021 result = true; 1022 } 1023 return result; 1024 } 1025 } 1026 1027 /** 1028 * BACKUP SETS 1029 */ 1030 1031 /** 1032 * Get backup set list 1033 * @return backup set list 1034 * @throws IOException if a table or scanner operation fails 1035 */ 1036 public List<String> listBackupSets() throws IOException { 1037 LOG.trace("Backup set list"); 1038 1039 List<String> list = new ArrayList<>(); 1040 try (Table table = connection.getTable(tableName)) { 1041 Scan scan = createScanForBackupSetList(); 1042 scan.readVersions(1); 1043 try (ResultScanner scanner = table.getScanner(scan)) { 1044 Result res; 1045 while ((res = scanner.next()) != null) { 1046 res.advance(); 1047 list.add(cellKeyToBackupSetName(res.current())); 1048 } 1049 return list; 1050 } 1051 } 1052 } 1053 1054 /** 1055 * Get backup set description (list of tables) 1056 * @param name set's name 1057 * @return list of tables in a backup set 1058 * @throws IOException if a table operation fails 1059 */ 1060 public List<TableName> describeBackupSet(String name) throws IOException { 1061 if (LOG.isTraceEnabled()) { 1062 LOG.trace(" Backup set describe: " + name); 1063 } 1064 try (Table table = connection.getTable(tableName)) { 1065 Get get = createGetForBackupSet(name); 1066 Result res = table.get(get); 1067 if (res.isEmpty()) { 1068 return null; 1069 } 1070 res.advance(); 1071 String[] tables = cellValueToBackupSet(res.current()); 1072 return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) 1073 .collect(Collectors.toList()); 1074 } 1075 } 1076 1077 /** 1078 * Add backup set (list of tables) 1079 * @param name set name 1080 * @param newTables list of tables, comma-separated 1081 * @throws IOException if a table operation fails 1082 */ 1083 public void addToBackupSet(String name, String[] newTables) throws IOException { 1084 if (LOG.isTraceEnabled()) { 1085 LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); 1086 } 1087 String[] union = null; 1088 try (Table table = connection.getTable(tableName)) { 1089 Get get = createGetForBackupSet(name); 1090 Result res = table.get(get); 1091 if (res.isEmpty()) { 1092 union = newTables; 1093 } else { 1094 res.advance(); 1095 String[] tables = cellValueToBackupSet(res.current()); 1096 union = merge(tables, newTables); 1097 } 1098 Put put = createPutForBackupSet(name, union); 1099 table.put(put); 1100 } 1101 } 1102 1103 /** 1104 * Remove tables from backup set (list of tables) 1105 * @param name set name 1106 * @param toRemove list of tables 1107 * @throws IOException if a table operation or deleting the backup set fails 1108 */ 1109 public void removeFromBackupSet(String name, String[] toRemove) throws IOException { 1110 if (LOG.isTraceEnabled()) { 1111 LOG.trace( 1112 " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); 1113 } 1114 String[] disjoint; 1115 String[] tables; 1116 try (Table table = connection.getTable(tableName)) { 1117 Get get = createGetForBackupSet(name); 1118 Result res = table.get(get); 1119 if (res.isEmpty()) { 1120 LOG.warn("Backup set '" + name + "' not found."); 1121 return; 1122 } else { 1123 res.advance(); 1124 tables = cellValueToBackupSet(res.current()); 1125 disjoint = disjoin(tables, toRemove); 1126 } 1127 if (disjoint.length > 0 && disjoint.length != tables.length) { 1128 Put put = createPutForBackupSet(name, disjoint); 1129 table.put(put); 1130 } else if (disjoint.length == tables.length) { 1131 LOG.warn("Backup set '" + name + "' does not contain tables [" 1132 + StringUtils.join(toRemove, " ") + "]"); 1133 } else { // disjoint.length == 0 and tables.length >0 1134 // Delete backup set 1135 LOG.info("Backup set '" + name + "' is empty. Deleting."); 1136 deleteBackupSet(name); 1137 } 1138 } 1139 } 1140 1141 private String[] merge(String[] existingTables, String[] newTables) { 1142 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1143 tables.addAll(Arrays.asList(newTables)); 1144 return tables.toArray(new String[0]); 1145 } 1146 1147 private String[] disjoin(String[] existingTables, String[] toRemove) { 1148 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1149 Arrays.asList(toRemove).forEach(table -> tables.remove(table)); 1150 return tables.toArray(new String[0]); 1151 } 1152 1153 /** 1154 * Delete backup set 1155 * @param name set's name 1156 * @throws IOException if getting or deleting the table fails 1157 */ 1158 public void deleteBackupSet(String name) throws IOException { 1159 if (LOG.isTraceEnabled()) { 1160 LOG.trace(" Backup set delete: " + name); 1161 } 1162 try (Table table = connection.getTable(tableName)) { 1163 Delete del = createDeleteForBackupSet(name); 1164 table.delete(del); 1165 } 1166 } 1167 1168 /** 1169 * Get backup system table descriptor 1170 * @return table's descriptor 1171 */ 1172 public static TableDescriptor getSystemTableDescriptor(Configuration conf) { 1173 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); 1174 1175 ColumnFamilyDescriptorBuilder colBuilder = 1176 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1177 1178 colBuilder.setMaxVersions(1); 1179 Configuration config = HBaseConfiguration.create(); 1180 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1181 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1182 colBuilder.setTimeToLive(ttl); 1183 1184 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1185 builder.setColumnFamily(colSessionsDesc); 1186 1187 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1188 colBuilder.setTimeToLive(ttl); 1189 builder.setColumnFamily(colBuilder.build()); 1190 return builder.build(); 1191 } 1192 1193 public static TableName getTableName(Configuration conf) { 1194 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1195 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); 1196 return TableName.valueOf(name); 1197 } 1198 1199 public static String getTableNameAsString(Configuration conf) { 1200 return getTableName(conf).getNameAsString(); 1201 } 1202 1203 public static String getSnapshotName(Configuration conf) { 1204 return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); 1205 } 1206 1207 /** 1208 * Get backup system table descriptor 1209 * @return table's descriptor 1210 */ 1211 public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { 1212 TableDescriptorBuilder builder = 1213 TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); 1214 1215 ColumnFamilyDescriptorBuilder colBuilder = 1216 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1217 colBuilder.setMaxVersions(1); 1218 Configuration config = HBaseConfiguration.create(); 1219 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1220 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1221 colBuilder.setTimeToLive(ttl); 1222 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1223 builder.setColumnFamily(colSessionsDesc); 1224 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1225 colBuilder.setTimeToLive(ttl); 1226 builder.setColumnFamily(colBuilder.build()); 1227 return builder.build(); 1228 } 1229 1230 public static TableName getTableNameForBulkLoadedData(Configuration conf) { 1231 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1232 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; 1233 return TableName.valueOf(name); 1234 } 1235 1236 /** 1237 * Creates Put operation for a given backup info object 1238 * @param context backup info 1239 * @return put operation 1240 * @throws IOException exception 1241 */ 1242 private Put createPutForBackupInfo(BackupInfo context) throws IOException { 1243 Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); 1244 put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), 1245 context.toByteArray()); 1246 return put; 1247 } 1248 1249 /** 1250 * Creates Get operation for a given backup id 1251 * @param backupId backup's ID 1252 * @return get operation 1253 * @throws IOException exception 1254 */ 1255 private Get createGetForBackupInfo(String backupId) throws IOException { 1256 Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); 1257 get.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1258 get.readVersions(1); 1259 return get; 1260 } 1261 1262 /** 1263 * Creates Delete operation for a given backup id 1264 * @param backupId backup's ID 1265 * @return delete operation 1266 */ 1267 private Delete createDeleteForBackupInfo(String backupId) { 1268 Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); 1269 del.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1270 return del; 1271 } 1272 1273 /** 1274 * Converts Result to BackupInfo 1275 * @param res HBase result 1276 * @return backup info instance 1277 * @throws IOException exception 1278 */ 1279 private BackupInfo resultToBackupInfo(Result res) throws IOException { 1280 res.advance(); 1281 Cell cell = res.current(); 1282 return cellToBackupInfo(cell); 1283 } 1284 1285 /** 1286 * Creates Get operation to retrieve start code from backup system table 1287 * @return get operation 1288 * @throws IOException exception 1289 */ 1290 private Get createGetForStartCode(String rootPath) throws IOException { 1291 Get get = new Get(rowkey(START_CODE_ROW, rootPath)); 1292 get.addFamily(BackupSystemTable.META_FAMILY); 1293 get.readVersions(1); 1294 return get; 1295 } 1296 1297 /** 1298 * Creates Put operation to store start code to backup system table 1299 * @return put operation 1300 */ 1301 private Put createPutForStartCode(String startCode, String rootPath) { 1302 Put put = new Put(rowkey(START_CODE_ROW, rootPath)); 1303 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), 1304 Bytes.toBytes(startCode)); 1305 return put; 1306 } 1307 1308 /** 1309 * Creates Get to retrieve incremental backup table set from backup system table 1310 * @return get operation 1311 * @throws IOException exception 1312 */ 1313 private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { 1314 Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); 1315 get.addFamily(BackupSystemTable.META_FAMILY); 1316 get.readVersions(1); 1317 return get; 1318 } 1319 1320 /** 1321 * Creates Put to store incremental backup table set 1322 * @param tables tables 1323 * @return put operation 1324 */ 1325 private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { 1326 Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); 1327 for (TableName table : tables) { 1328 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), 1329 EMPTY_VALUE); 1330 } 1331 return put; 1332 } 1333 1334 /** 1335 * Creates Delete for incremental backup table set 1336 * @param backupRoot backup root 1337 * @return delete operation 1338 */ 1339 private Delete createDeleteForIncrBackupTableSet(String backupRoot) { 1340 Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); 1341 delete.addFamily(BackupSystemTable.META_FAMILY); 1342 return delete; 1343 } 1344 1345 /** 1346 * Creates Scan operation to load backup history 1347 * @return scan operation 1348 */ 1349 private Scan createScanForBackupHistory() { 1350 Scan scan = new Scan(); 1351 byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); 1352 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1353 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1354 scan.withStartRow(startRow); 1355 scan.withStopRow(stopRow); 1356 scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1357 scan.readVersions(1); 1358 return scan; 1359 } 1360 1361 /** 1362 * Converts cell to backup info instance. 1363 * @param current current cell 1364 * @return backup backup info instance 1365 * @throws IOException exception 1366 */ 1367 private BackupInfo cellToBackupInfo(Cell current) throws IOException { 1368 byte[] data = CellUtil.cloneValue(current); 1369 return BackupInfo.fromByteArray(data); 1370 } 1371 1372 /** 1373 * Creates Put to write RS last roll log timestamp map 1374 * @param table table 1375 * @param smap map, containing RS:ts 1376 * @return put operation 1377 */ 1378 private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, 1379 String backupRoot) { 1380 Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); 1381 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); 1382 return put; 1383 } 1384 1385 /** 1386 * Creates Scan to load table-> { RS -> ts} map of maps 1387 * @return scan operation 1388 */ 1389 private Scan createScanForReadLogTimestampMap(String backupRoot) { 1390 Scan scan = new Scan(); 1391 scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL)); 1392 scan.addFamily(BackupSystemTable.META_FAMILY); 1393 1394 return scan; 1395 } 1396 1397 /** 1398 * Get table name from rowkey 1399 * @param cloneRow rowkey 1400 * @return table name 1401 */ 1402 private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { 1403 String s = Bytes.toString(cloneRow); 1404 int index = s.lastIndexOf(NULL); 1405 return s.substring(index + 1); 1406 } 1407 1408 /** 1409 * Creates Put to store RS last log result 1410 * @param server server name 1411 * @param timestamp log roll result (timestamp) 1412 * @return put operation 1413 */ 1414 private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, 1415 String backupRoot) { 1416 Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); 1417 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), 1418 Bytes.toBytes(timestamp)); 1419 return put; 1420 } 1421 1422 /** 1423 * Creates Scan operation to load last RS log roll results 1424 * @return scan operation 1425 */ 1426 private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { 1427 Scan scan = new Scan(); 1428 scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL)); 1429 scan.addFamily(BackupSystemTable.META_FAMILY); 1430 scan.readVersions(1); 1431 1432 return scan; 1433 } 1434 1435 /** 1436 * Get server's name from rowkey 1437 * @param row rowkey 1438 * @return server's name 1439 */ 1440 private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { 1441 String s = Bytes.toString(row); 1442 int index = s.lastIndexOf(NULL); 1443 return s.substring(index + 1); 1444 } 1445 1446 /** 1447 * Creates Put's for bulk loads. 1448 */ 1449 private static List<Put> createPutForBulkLoad(TableName table, byte[] region, 1450 Map<byte[], List<Path>> columnFamilyToHFilePaths) { 1451 List<Put> puts = new ArrayList<>(); 1452 for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) { 1453 for (Path path : entry.getValue()) { 1454 String file = path.toString(); 1455 int lastSlash = file.lastIndexOf("/"); 1456 String filename = file.substring(lastSlash + 1); 1457 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1458 Bytes.toString(region), BLK_LD_DELIM, filename)); 1459 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1460 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); 1461 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1462 puts.add(put); 1463 LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region)); 1464 } 1465 } 1466 return puts; 1467 } 1468 1469 public static void snapshot(Connection conn) throws IOException { 1470 try (Admin admin = conn.getAdmin()) { 1471 Configuration conf = conn.getConfiguration(); 1472 admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); 1473 } 1474 } 1475 1476 public static void restoreFromSnapshot(Connection conn) throws IOException { 1477 Configuration conf = conn.getConfiguration(); 1478 LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); 1479 try (Admin admin = conn.getAdmin()) { 1480 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1481 if (snapshotExists(admin, snapshotName)) { 1482 admin.disableTable(BackupSystemTable.getTableName(conf)); 1483 admin.restoreSnapshot(snapshotName); 1484 admin.enableTable(BackupSystemTable.getTableName(conf)); 1485 LOG.debug("Done restoring backup system table"); 1486 } else { 1487 // Snapshot does not exists, i.e completeBackup failed after 1488 // deleting backup system table snapshot 1489 // In this case we log WARN and proceed 1490 LOG.warn( 1491 "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); 1492 } 1493 } 1494 } 1495 1496 private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { 1497 List<SnapshotDescription> list = admin.listSnapshots(); 1498 for (SnapshotDescription desc : list) { 1499 if (desc.getName().equals(snapshotName)) { 1500 return true; 1501 } 1502 } 1503 return false; 1504 } 1505 1506 public static boolean snapshotExists(Connection conn) throws IOException { 1507 return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); 1508 } 1509 1510 public static void deleteSnapshot(Connection conn) throws IOException { 1511 Configuration conf = conn.getConfiguration(); 1512 LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); 1513 try (Admin admin = conn.getAdmin()) { 1514 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1515 if (snapshotExists(admin, snapshotName)) { 1516 admin.deleteSnapshot(snapshotName); 1517 LOG.debug("Done deleting backup system table snapshot"); 1518 } else { 1519 LOG.error("Snapshot " + snapshotName + " does not exists"); 1520 } 1521 } 1522 } 1523 1524 private Put createPutForDeleteOperation(String[] backupIdList) { 1525 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1526 Put put = new Put(DELETE_OP_ROW); 1527 put.addColumn(META_FAMILY, FAM_COL, value); 1528 return put; 1529 } 1530 1531 private Delete createDeleteForBackupDeleteOperation() { 1532 Delete delete = new Delete(DELETE_OP_ROW); 1533 delete.addFamily(META_FAMILY); 1534 return delete; 1535 } 1536 1537 private Get createGetForDeleteOperation() { 1538 Get get = new Get(DELETE_OP_ROW); 1539 get.addFamily(META_FAMILY); 1540 return get; 1541 } 1542 1543 public void startDeleteOperation(String[] backupIdList) throws IOException { 1544 if (LOG.isTraceEnabled()) { 1545 LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); 1546 } 1547 Put put = createPutForDeleteOperation(backupIdList); 1548 try (Table table = connection.getTable(tableName)) { 1549 table.put(put); 1550 } 1551 } 1552 1553 public void finishDeleteOperation() throws IOException { 1554 LOG.trace("Finsih delete operation for backup ids"); 1555 1556 Delete delete = createDeleteForBackupDeleteOperation(); 1557 try (Table table = connection.getTable(tableName)) { 1558 table.delete(delete); 1559 } 1560 } 1561 1562 public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { 1563 LOG.trace("Get delete operation for backup ids"); 1564 1565 Get get = createGetForDeleteOperation(); 1566 try (Table table = connection.getTable(tableName)) { 1567 Result res = table.get(get); 1568 if (res.isEmpty()) { 1569 return null; 1570 } 1571 Cell cell = res.listCells().get(0); 1572 byte[] val = CellUtil.cloneValue(cell); 1573 if (val.length == 0) { 1574 return null; 1575 } 1576 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1577 .toArray(String[]::new); 1578 } 1579 } 1580 1581 private Put createPutForMergeOperation(String[] backupIdList) { 1582 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1583 Put put = new Put(MERGE_OP_ROW); 1584 put.addColumn(META_FAMILY, FAM_COL, value); 1585 return put; 1586 } 1587 1588 public boolean isMergeInProgress() throws IOException { 1589 Get get = new Get(MERGE_OP_ROW); 1590 try (Table table = connection.getTable(tableName)) { 1591 Result res = table.get(get); 1592 return !res.isEmpty(); 1593 } 1594 } 1595 1596 private Put createPutForUpdateTablesForMerge(List<TableName> tables) { 1597 byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); 1598 Put put = new Put(MERGE_OP_ROW); 1599 put.addColumn(META_FAMILY, PATH_COL, value); 1600 return put; 1601 } 1602 1603 private Delete createDeleteForBackupMergeOperation() { 1604 Delete delete = new Delete(MERGE_OP_ROW); 1605 delete.addFamily(META_FAMILY); 1606 return delete; 1607 } 1608 1609 private Get createGetForMergeOperation() { 1610 Get get = new Get(MERGE_OP_ROW); 1611 get.addFamily(META_FAMILY); 1612 return get; 1613 } 1614 1615 public void startMergeOperation(String[] backupIdList) throws IOException { 1616 if (LOG.isTraceEnabled()) { 1617 LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); 1618 } 1619 Put put = createPutForMergeOperation(backupIdList); 1620 try (Table table = connection.getTable(tableName)) { 1621 table.put(put); 1622 } 1623 } 1624 1625 public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { 1626 if (LOG.isTraceEnabled()) { 1627 LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); 1628 } 1629 Put put = createPutForUpdateTablesForMerge(tables); 1630 try (Table table = connection.getTable(tableName)) { 1631 table.put(put); 1632 } 1633 } 1634 1635 public void finishMergeOperation() throws IOException { 1636 LOG.trace("Finish merge operation for backup ids"); 1637 1638 Delete delete = createDeleteForBackupMergeOperation(); 1639 try (Table table = connection.getTable(tableName)) { 1640 table.delete(delete); 1641 } 1642 } 1643 1644 public String[] getListOfBackupIdsFromMergeOperation() throws IOException { 1645 LOG.trace("Get backup ids for merge operation"); 1646 1647 Get get = createGetForMergeOperation(); 1648 try (Table table = connection.getTable(tableName)) { 1649 Result res = table.get(get); 1650 if (res.isEmpty()) { 1651 return null; 1652 } 1653 Cell cell = res.listCells().get(0); 1654 byte[] val = CellUtil.cloneValue(cell); 1655 if (val.length == 0) { 1656 return null; 1657 } 1658 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1659 .toArray(String[]::new); 1660 } 1661 } 1662 1663 static Scan createScanForOrigBulkLoadedFiles(TableName table) { 1664 Scan scan = new Scan(); 1665 byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); 1666 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1667 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1668 scan.withStartRow(startRow); 1669 scan.withStopRow(stopRow); 1670 scan.addFamily(BackupSystemTable.META_FAMILY); 1671 scan.readVersions(1); 1672 return scan; 1673 } 1674 1675 static String getTableNameFromOrigBulkLoadRow(String rowStr) { 1676 // format is bulk : namespace : table : region : file 1677 return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1); 1678 } 1679 1680 static String getRegionNameFromOrigBulkLoadRow(String rowStr) { 1681 // format is bulk : namespace : table : region : file 1682 List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr); 1683 Iterator<String> i = parts.iterator(); 1684 int idx = 3; 1685 if (parts.size() == 4) { 1686 // the table is in default namespace 1687 idx = 2; 1688 } 1689 String region = Iterators.get(i, idx); 1690 LOG.debug("bulk row string " + rowStr + " region " + region); 1691 return region; 1692 } 1693 1694 /* 1695 * Used to query bulk loaded hfiles which have been copied by incremental backup 1696 * @param backupId the backup Id. It can be null when querying for all tables 1697 * @return the Scan object 1698 */ 1699 static Scan createScanForBulkLoadedFiles(String backupId) { 1700 Scan scan = new Scan(); 1701 byte[] startRow = 1702 backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); 1703 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1704 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1705 scan.withStartRow(startRow); 1706 scan.withStopRow(stopRow); 1707 scan.addFamily(BackupSystemTable.META_FAMILY); 1708 scan.readVersions(1); 1709 return scan; 1710 } 1711 1712 static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, 1713 long ts, int idx) { 1714 Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); 1715 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); 1716 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); 1717 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p)); 1718 return put; 1719 } 1720 1721 /** 1722 * Creates Scan operation to load backup set list 1723 * @return scan operation 1724 */ 1725 private Scan createScanForBackupSetList() { 1726 Scan scan = new Scan(); 1727 byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); 1728 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1729 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1730 scan.withStartRow(startRow); 1731 scan.withStopRow(stopRow); 1732 scan.addFamily(BackupSystemTable.META_FAMILY); 1733 return scan; 1734 } 1735 1736 /** 1737 * Creates Get operation to load backup set content 1738 * @return get operation 1739 */ 1740 private Get createGetForBackupSet(String name) { 1741 Get get = new Get(rowkey(SET_KEY_PREFIX, name)); 1742 get.addFamily(BackupSystemTable.META_FAMILY); 1743 return get; 1744 } 1745 1746 /** 1747 * Creates Delete operation to delete backup set content 1748 * @param name backup set's name 1749 * @return delete operation 1750 */ 1751 private Delete createDeleteForBackupSet(String name) { 1752 Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); 1753 del.addFamily(BackupSystemTable.META_FAMILY); 1754 return del; 1755 } 1756 1757 /** 1758 * Creates Put operation to update backup set content 1759 * @param name backup set's name 1760 * @param tables list of tables 1761 * @return put operation 1762 */ 1763 private Put createPutForBackupSet(String name, String[] tables) { 1764 Put put = new Put(rowkey(SET_KEY_PREFIX, name)); 1765 byte[] value = convertToByteArray(tables); 1766 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); 1767 return put; 1768 } 1769 1770 private byte[] convertToByteArray(String[] tables) { 1771 return Bytes.toBytes(StringUtils.join(tables, ",")); 1772 } 1773 1774 /** 1775 * Converts cell to backup set list. 1776 * @param current current cell 1777 * @return backup set as array of table names 1778 */ 1779 private String[] cellValueToBackupSet(Cell current) { 1780 byte[] data = CellUtil.cloneValue(current); 1781 if (!ArrayUtils.isEmpty(data)) { 1782 return Bytes.toString(data).split(","); 1783 } 1784 return new String[0]; 1785 } 1786 1787 /** 1788 * Converts cell key to backup set name. 1789 * @param current current cell 1790 * @return backup set name 1791 */ 1792 private String cellKeyToBackupSetName(Cell current) { 1793 byte[] data = CellUtil.cloneRow(current); 1794 return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); 1795 } 1796 1797 private static byte[] rowkey(String s, String... other) { 1798 StringBuilder sb = new StringBuilder(s); 1799 for (String ss : other) { 1800 sb.append(ss); 1801 } 1802 return Bytes.toBytes(sb.toString()); 1803 } 1804 1805 private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException { 1806 if (!admin.isTableEnabled(tableName)) { 1807 try { 1808 admin.enableTable(tableName); 1809 } catch (TableNotDisabledException ignored) { 1810 LOG.info("Table {} is not disabled, ignoring enable request", tableName); 1811 } 1812 } 1813 } 1814}