001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.nio.charset.StandardCharsets; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.Objects; 034import java.util.Set; 035import java.util.TreeMap; 036import java.util.TreeSet; 037import java.util.stream.Collectors; 038import org.apache.commons.lang3.ArrayUtils; 039import org.apache.commons.lang3.StringUtils; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.Cell; 043import org.apache.hadoop.hbase.CellUtil; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.NamespaceDescriptor; 046import org.apache.hadoop.hbase.NamespaceExistException; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableExistsException; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.TableNotDisabledException; 051import org.apache.hadoop.hbase.backup.BackupInfo; 052import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 053import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 054import org.apache.hadoop.hbase.backup.BackupType; 055import org.apache.hadoop.hbase.backup.util.BackupUtils; 056import org.apache.hadoop.hbase.client.Admin; 057import org.apache.hadoop.hbase.client.BufferedMutator; 058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 060import org.apache.hadoop.hbase.client.Connection; 061import org.apache.hadoop.hbase.client.Delete; 062import org.apache.hadoop.hbase.client.Get; 063import org.apache.hadoop.hbase.client.Put; 064import org.apache.hadoop.hbase.client.Result; 065import org.apache.hadoop.hbase.client.ResultScanner; 066import org.apache.hadoop.hbase.client.Scan; 067import org.apache.hadoop.hbase.client.SnapshotDescription; 068import org.apache.hadoop.hbase.client.Table; 069import org.apache.hadoop.hbase.client.TableDescriptor; 070import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.hadoop.hbase.util.Pair; 074import org.apache.yetus.audience.InterfaceAudience; 075import org.slf4j.Logger; 076import org.slf4j.LoggerFactory; 077 078import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 079import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 080 081import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 083 084/** 085 * This class provides API to access backup system table<br> 086 * Backup system table schema:<br> 087 * <p> 088 * <ul> 089 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> 090 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> 091 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of 092 * include table; value=empty</li> 093 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL 094 * timestamp]</li> 095 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> 096 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file 097 * name</li> 098 * </ul> 099 * </p> 100 */ 101@InterfaceAudience.Private 102public final class BackupSystemTable implements Closeable { 103 104 private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); 105 106 static class WALItem { 107 String backupId; 108 String walFile; 109 String backupRoot; 110 111 WALItem(String backupId, String walFile, String backupRoot) { 112 this.backupId = backupId; 113 this.walFile = walFile; 114 this.backupRoot = backupRoot; 115 } 116 117 public String getBackupId() { 118 return backupId; 119 } 120 121 public String getWalFile() { 122 return walFile; 123 } 124 125 public String getBackupRoot() { 126 return backupRoot; 127 } 128 129 @Override 130 public String toString() { 131 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; 132 } 133 } 134 135 /** 136 * Backup system table (main) name 137 */ 138 private TableName tableName; 139 140 /** 141 * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a 142 * separate table because we have to isolate general backup operations: create, merge etc from 143 * activity of RegionObserver, which controls process of a bulk loading 144 * {@link org.apache.hadoop.hbase.backup.BackupObserver} 145 */ 146 private TableName bulkLoadTableName; 147 148 /** 149 * Stores backup sessions (contexts) 150 */ 151 final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); 152 /** 153 * Stores other meta 154 */ 155 final static byte[] META_FAMILY = Bytes.toBytes("meta"); 156 final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); 157 /** 158 * Connection to HBase cluster, shared among all instances 159 */ 160 private final Connection connection; 161 162 private final static String BACKUP_INFO_PREFIX = "session:"; 163 private final static String START_CODE_ROW = "startcode:"; 164 private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); 165 private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); 166 167 private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); 168 private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); 169 170 private final static String INCR_BACKUP_SET = "incrbackupset:"; 171 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; 172 private final static String RS_LOG_TS_PREFIX = "rslogts:"; 173 174 private final static String BULK_LOAD_PREFIX = "bulk:"; 175 private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX); 176 private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row"); 177 private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row"); 178 179 final static byte[] TBL_COL = Bytes.toBytes("tbl"); 180 final static byte[] FAM_COL = Bytes.toBytes("fam"); 181 final static byte[] PATH_COL = Bytes.toBytes("path"); 182 final static byte[] STATE_COL = Bytes.toBytes("state"); 183 // the two states a bulk loaded file can be 184 final static byte[] BL_PREPARE = Bytes.toBytes("R"); 185 final static byte[] BL_COMMIT = Bytes.toBytes("D"); 186 187 private final static String SET_KEY_PREFIX = "backupset:"; 188 189 // separator between BULK_LOAD_PREFIX and ordinals 190 private final static String BLK_LD_DELIM = ":"; 191 private final static byte[] EMPTY_VALUE = new byte[] {}; 192 193 // Safe delimiter in a string 194 private final static String NULL = "\u0000"; 195 196 public BackupSystemTable(Connection conn) throws IOException { 197 this.connection = conn; 198 Configuration conf = this.connection.getConfiguration(); 199 tableName = BackupSystemTable.getTableName(conf); 200 bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); 201 checkSystemTable(); 202 } 203 204 private void checkSystemTable() throws IOException { 205 try (Admin admin = connection.getAdmin()) { 206 verifyNamespaceExists(admin); 207 Configuration conf = connection.getConfiguration(); 208 if (!admin.tableExists(tableName)) { 209 TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf); 210 createSystemTable(admin, backupHTD); 211 } 212 ensureTableEnabled(admin, tableName); 213 if (!admin.tableExists(bulkLoadTableName)) { 214 TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); 215 createSystemTable(admin, blHTD); 216 } 217 ensureTableEnabled(admin, bulkLoadTableName); 218 waitForSystemTable(admin, tableName); 219 waitForSystemTable(admin, bulkLoadTableName); 220 } 221 } 222 223 private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException { 224 try { 225 admin.createTable(descriptor); 226 } catch (TableExistsException e) { 227 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 228 // so may be subject to race conditions where one caller succeeds in creating the 229 // table and others fail because it now exists 230 LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e); 231 } 232 } 233 234 private void verifyNamespaceExists(Admin admin) throws IOException { 235 String namespaceName = tableName.getNamespaceAsString(); 236 NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); 237 NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); 238 boolean exists = false; 239 for (NamespaceDescriptor nsd : list) { 240 if (nsd.getName().equals(ns.getName())) { 241 exists = true; 242 break; 243 } 244 } 245 if (!exists) { 246 try { 247 admin.createNamespace(ns); 248 } catch (NamespaceExistException e) { 249 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 250 // so may be subject to race conditions where one caller succeeds in creating the 251 // namespace and others fail because it now exists 252 LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e); 253 } 254 } 255 } 256 257 private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { 258 // Return fast if the table is available and avoid a log message 259 if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) { 260 return; 261 } 262 long TIMEOUT = 60000; 263 long startTime = EnvironmentEdgeManager.currentTime(); 264 LOG.debug("Backup table {} is not present and available, waiting for it to become so", 265 tableName); 266 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { 267 try { 268 Thread.sleep(100); 269 } catch (InterruptedException e) { 270 throw (IOException) new InterruptedIOException().initCause(e); 271 } 272 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { 273 throw new IOException( 274 "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms"); 275 } 276 } 277 LOG.debug("Backup table {} exists and available", tableName); 278 } 279 280 @Override 281 public void close() { 282 // do nothing 283 } 284 285 /** 286 * Updates status (state) of a backup session in backup system table table 287 * @param info backup info 288 * @throws IOException exception 289 */ 290 public void updateBackupInfo(BackupInfo info) throws IOException { 291 if (LOG.isTraceEnabled()) { 292 LOG.trace("update backup status in backup system table for: " + info.getBackupId() 293 + " set status=" + info.getState()); 294 } 295 try (Table table = connection.getTable(tableName)) { 296 Put put = createPutForBackupInfo(info); 297 table.put(put); 298 } 299 } 300 301 /* 302 * @param backupId the backup Id 303 * @return Map of rows to path of bulk loaded hfile 304 */ 305 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { 306 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 307 try (Table table = connection.getTable(bulkLoadTableName); 308 ResultScanner scanner = table.getScanner(scan)) { 309 Result res = null; 310 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 311 while ((res = scanner.next()) != null) { 312 res.advance(); 313 byte[] row = CellUtil.cloneRow(res.listCells().get(0)); 314 for (Cell cell : res.listCells()) { 315 if ( 316 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 317 BackupSystemTable.PATH_COL.length) == 0 318 ) { 319 map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); 320 } 321 } 322 } 323 return map; 324 } 325 } 326 327 /* 328 * Used during restore 329 * @param backupId the backup Id 330 * @param sTableList List of tables 331 * @return array of Map of family to List of Paths 332 */ 333 public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList) 334 throws IOException { 335 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 336 @SuppressWarnings("unchecked") 337 Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()]; 338 try (Table table = connection.getTable(bulkLoadTableName); 339 ResultScanner scanner = table.getScanner(scan)) { 340 Result res = null; 341 while ((res = scanner.next()) != null) { 342 res.advance(); 343 TableName tbl = null; 344 byte[] fam = null; 345 String path = null; 346 for (Cell cell : res.listCells()) { 347 if ( 348 CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, 349 BackupSystemTable.TBL_COL.length) == 0 350 ) { 351 tbl = TableName.valueOf(CellUtil.cloneValue(cell)); 352 } else if ( 353 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 354 BackupSystemTable.FAM_COL.length) == 0 355 ) { 356 fam = CellUtil.cloneValue(cell); 357 } else if ( 358 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 359 BackupSystemTable.PATH_COL.length) == 0 360 ) { 361 path = Bytes.toString(CellUtil.cloneValue(cell)); 362 } 363 } 364 int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList); 365 if (srcIdx == -1) { 366 // the table is not among the query 367 continue; 368 } 369 if (mapForSrc[srcIdx] == null) { 370 mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR); 371 } 372 List<Path> files; 373 if (!mapForSrc[srcIdx].containsKey(fam)) { 374 files = new ArrayList<Path>(); 375 mapForSrc[srcIdx].put(fam, files); 376 } else { 377 files = mapForSrc[srcIdx].get(fam); 378 } 379 files.add(new Path(path)); 380 if (LOG.isDebugEnabled()) { 381 LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); 382 } 383 } 384 385 return mapForSrc; 386 } 387 } 388 389 /** 390 * Deletes backup status from backup system table table 391 * @param backupId backup id 392 * @throws IOException exception 393 */ 394 public void deleteBackupInfo(String backupId) throws IOException { 395 if (LOG.isTraceEnabled()) { 396 LOG.trace("delete backup status in backup system table for " + backupId); 397 } 398 try (Table table = connection.getTable(tableName)) { 399 Delete del = createDeleteForBackupInfo(backupId); 400 table.delete(del); 401 } 402 } 403 404 /* 405 * For postBulkLoadHFile() hook. 406 * @param tabName table name 407 * @param region the region receiving hfile 408 * @param finalPaths family and associated hfiles 409 */ 410 public void writePathsPostBulkLoad(TableName tabName, byte[] region, 411 Map<byte[], List<Path>> finalPaths) throws IOException { 412 if (LOG.isDebugEnabled()) { 413 LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() 414 + " entries"); 415 } 416 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 417 List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); 418 bufferedMutator.mutate(puts); 419 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); 420 } 421 } 422 423 /* 424 * For preCommitStoreFile() hook 425 * @param tabName table name 426 * @param region the region receiving hfile 427 * @param family column family 428 * @param pairs list of paths for hfiles 429 */ 430 public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, 431 final List<Pair<Path, Path>> pairs) throws IOException { 432 if (LOG.isDebugEnabled()) { 433 LOG.debug( 434 "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries"); 435 } 436 try (Table table = connection.getTable(bulkLoadTableName)) { 437 List<Put> puts = 438 BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); 439 table.put(puts); 440 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); 441 } 442 } 443 444 /* 445 * Removes rows recording bulk loaded hfiles from backup table 446 * @param lst list of table names 447 * @param rows the rows to be deleted 448 */ 449 public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException { 450 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 451 List<Delete> lstDels = new ArrayList<>(); 452 for (byte[] row : rows) { 453 Delete del = new Delete(row); 454 lstDels.add(del); 455 LOG.debug("orig deleting the row: " + Bytes.toString(row)); 456 } 457 bufferedMutator.mutate(lstDels); 458 LOG.debug("deleted " + rows.size() + " original bulkload rows"); 459 } 460 } 461 462 /* 463 * Reads the rows from backup table recording bulk loaded hfiles 464 * @param tableList list of table names 465 * @return The keys of the Map are table, region and column family. Value of the map reflects 466 * whether the hfile was recorded by preCommitStoreFile hook (true) 467 */ 468 public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> 469 readBulkloadRows(List<TableName> tableList) throws IOException { 470 471 Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>(); 472 List<byte[]> rows = new ArrayList<>(); 473 for (TableName tTable : tableList) { 474 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable); 475 Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable); 476 try (Table table = connection.getTable(bulkLoadTableName); 477 ResultScanner scanner = table.getScanner(scan)) { 478 Result res = null; 479 while ((res = scanner.next()) != null) { 480 res.advance(); 481 String fam = null; 482 String path = null; 483 boolean raw = false; 484 byte[] row; 485 String region = null; 486 for (Cell cell : res.listCells()) { 487 row = CellUtil.cloneRow(cell); 488 rows.add(row); 489 String rowStr = Bytes.toString(row); 490 region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); 491 if ( 492 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 493 BackupSystemTable.FAM_COL.length) == 0 494 ) { 495 fam = Bytes.toString(CellUtil.cloneValue(cell)); 496 } else if ( 497 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 498 BackupSystemTable.PATH_COL.length) == 0 499 ) { 500 path = Bytes.toString(CellUtil.cloneValue(cell)); 501 } else if ( 502 CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, 503 BackupSystemTable.STATE_COL.length) == 0 504 ) { 505 byte[] state = CellUtil.cloneValue(cell); 506 if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) { 507 raw = true; 508 } else { 509 raw = false; 510 } 511 } 512 } 513 if (map.get(tTable) == null) { 514 map.put(tTable, new HashMap<>()); 515 tblMap = map.get(tTable); 516 } 517 if (tblMap.get(region) == null) { 518 tblMap.put(region, new HashMap<>()); 519 } 520 Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region); 521 if (famMap.get(fam) == null) { 522 famMap.put(fam, new ArrayList<>()); 523 } 524 famMap.get(fam).add(new Pair<>(path, raw)); 525 LOG.debug("found orig " + path + " for " + fam + " of table " + region); 526 } 527 } 528 } 529 return new Pair<>(map, rows); 530 } 531 532 /* 533 * @param sTableList List of tables 534 * @param maps array of Map of family to List of Paths 535 * @param backupId the backup Id 536 */ 537 public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps, 538 String backupId) throws IOException { 539 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 540 long ts = EnvironmentEdgeManager.currentTime(); 541 int cnt = 0; 542 List<Put> puts = new ArrayList<>(); 543 for (int idx = 0; idx < maps.length; idx++) { 544 Map<byte[], List<Path>> map = maps[idx]; 545 TableName tn = sTableList.get(idx); 546 547 if (map == null) { 548 continue; 549 } 550 551 for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) { 552 byte[] fam = entry.getKey(); 553 List<Path> paths = entry.getValue(); 554 for (Path p : paths) { 555 Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, 556 ts, cnt++); 557 puts.add(put); 558 } 559 } 560 } 561 if (!puts.isEmpty()) { 562 bufferedMutator.mutate(puts); 563 } 564 } 565 } 566 567 /** 568 * Reads backup status object (instance of backup info) from backup system table table 569 * @param backupId backup id 570 * @return Current status of backup session or null 571 */ 572 public BackupInfo readBackupInfo(String backupId) throws IOException { 573 if (LOG.isTraceEnabled()) { 574 LOG.trace("read backup status from backup system table for: " + backupId); 575 } 576 577 try (Table table = connection.getTable(tableName)) { 578 Get get = createGetForBackupInfo(backupId); 579 Result res = table.get(get); 580 if (res.isEmpty()) { 581 return null; 582 } 583 return resultToBackupInfo(res); 584 } 585 } 586 587 /** 588 * Read the last backup start code (timestamp) of last successful backup. Will return null if 589 * there is no start code stored on hbase or the value is of length 0. These two cases indicate 590 * there is no successful backup completed so far. 591 * @param backupRoot directory path to backup destination 592 * @return the timestamp of last successful backup 593 * @throws IOException exception 594 */ 595 public String readBackupStartCode(String backupRoot) throws IOException { 596 LOG.trace("read backup start code from backup system table"); 597 598 try (Table table = connection.getTable(tableName)) { 599 Get get = createGetForStartCode(backupRoot); 600 Result res = table.get(get); 601 if (res.isEmpty()) { 602 return null; 603 } 604 Cell cell = res.listCells().get(0); 605 byte[] val = CellUtil.cloneValue(cell); 606 if (val.length == 0) { 607 return null; 608 } 609 return new String(val, StandardCharsets.UTF_8); 610 } 611 } 612 613 /** 614 * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. 615 * @param startCode start code 616 * @param backupRoot root directory path to backup 617 * @throws IOException exception 618 */ 619 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { 620 if (LOG.isTraceEnabled()) { 621 LOG.trace("write backup start code to backup system table " + startCode); 622 } 623 try (Table table = connection.getTable(tableName)) { 624 Put put = createPutForStartCode(startCode.toString(), backupRoot); 625 table.put(put); 626 } 627 } 628 629 /** 630 * Exclusive operations are: create, delete, merge 631 * @throws IOException if a table operation fails or an active backup exclusive operation is 632 * already underway 633 */ 634 public void startBackupExclusiveOperation() throws IOException { 635 LOG.debug("Start new backup exclusive operation"); 636 637 try (Table table = connection.getTable(tableName)) { 638 Put put = createPutForStartBackupSession(); 639 // First try to put if row does not exist 640 if ( 641 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 642 .ifNotExists().thenPut(put) 643 ) { 644 // Row exists, try to put if value == ACTIVE_SESSION_NO 645 if ( 646 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 647 .ifEquals(ACTIVE_SESSION_NO).thenPut(put) 648 ) { 649 throw new ExclusiveOperationException(); 650 } 651 } 652 } 653 } 654 655 private Put createPutForStartBackupSession() { 656 Put put = new Put(ACTIVE_SESSION_ROW); 657 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); 658 return put; 659 } 660 661 public void finishBackupExclusiveOperation() throws IOException { 662 LOG.debug("Finish backup exclusive operation"); 663 664 try (Table table = connection.getTable(tableName)) { 665 Put put = createPutForStopBackupSession(); 666 if ( 667 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 668 .ifEquals(ACTIVE_SESSION_YES).thenPut(put) 669 ) { 670 throw new IOException("There is no active backup exclusive operation"); 671 } 672 } 673 } 674 675 private Put createPutForStopBackupSession() { 676 Put put = new Put(ACTIVE_SESSION_ROW); 677 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); 678 return put; 679 } 680 681 /** 682 * Get the Region Servers log information after the last log roll from backup system table. 683 * @param backupRoot root directory path to backup 684 * @return RS log info 685 * @throws IOException exception 686 */ 687 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) 688 throws IOException { 689 LOG.trace("read region server last roll log result to backup system table"); 690 691 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); 692 693 try (Table table = connection.getTable(tableName); 694 ResultScanner scanner = table.getScanner(scan)) { 695 Result res; 696 HashMap<String, Long> rsTimestampMap = new HashMap<>(); 697 while ((res = scanner.next()) != null) { 698 res.advance(); 699 Cell cell = res.current(); 700 byte[] row = CellUtil.cloneRow(cell); 701 String server = getServerNameForReadRegionServerLastLogRollResult(row); 702 byte[] data = CellUtil.cloneValue(cell); 703 rsTimestampMap.put(server, Bytes.toLong(data)); 704 } 705 return rsTimestampMap; 706 } 707 } 708 709 /** 710 * Writes Region Server last roll log result (timestamp) to backup system table table 711 * @param server Region Server name 712 * @param ts last log timestamp 713 * @param backupRoot root directory path to backup 714 * @throws IOException exception 715 */ 716 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) 717 throws IOException { 718 LOG.trace("write region server last roll log result to backup system table"); 719 720 try (Table table = connection.getTable(tableName)) { 721 Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); 722 table.put(put); 723 } 724 } 725 726 /** 727 * Get all completed backup information (in desc order by time) 728 * @param onlyCompleted true, if only successfully completed sessions 729 * @return history info of BackupCompleteData 730 * @throws IOException exception 731 */ 732 public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException { 733 LOG.trace("get backup history from backup system table"); 734 735 BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY; 736 ArrayList<BackupInfo> list = getBackupInfos(state); 737 return BackupUtils.sortHistoryListDesc(list); 738 } 739 740 /** 741 * Get all backups history 742 * @return list of backup info 743 * @throws IOException if getting the backup history fails 744 */ 745 public List<BackupInfo> getBackupHistory() throws IOException { 746 return getBackupHistory(false); 747 } 748 749 /** 750 * Get first n backup history records 751 * @param n number of records, if n== -1 - max number is ignored 752 * @return list of records 753 * @throws IOException if getting the backup history fails 754 */ 755 public List<BackupInfo> getHistory(int n) throws IOException { 756 List<BackupInfo> history = getBackupHistory(); 757 if (n == -1 || history.size() <= n) { 758 return history; 759 } 760 return Collections.unmodifiableList(history.subList(0, n)); 761 } 762 763 /** 764 * Get backup history records filtered by list of filters. 765 * @param n max number of records, if n == -1 , then max number is ignored 766 * @param filters list of filters 767 * @return backup records 768 * @throws IOException if getting the backup history fails 769 */ 770 public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException { 771 if (filters.length == 0) { 772 return getHistory(n); 773 } 774 775 List<BackupInfo> history = getBackupHistory(); 776 List<BackupInfo> result = new ArrayList<>(); 777 for (BackupInfo bi : history) { 778 if (n >= 0 && result.size() == n) { 779 break; 780 } 781 782 boolean passed = true; 783 for (int i = 0; i < filters.length; i++) { 784 if (!filters[i].apply(bi)) { 785 passed = false; 786 break; 787 } 788 } 789 if (passed) { 790 result.add(bi); 791 } 792 } 793 return result; 794 } 795 796 /* 797 * Retrieve TableName's for completed backup of given type 798 * @param type backup type 799 * @return List of table names 800 */ 801 public List<TableName> getTablesForBackupType(BackupType type) throws IOException { 802 Set<TableName> names = new HashSet<>(); 803 List<BackupInfo> infos = getBackupHistory(true); 804 for (BackupInfo info : infos) { 805 if (info.getType() == type) { 806 names.addAll(info.getTableNames()); 807 } 808 } 809 return new ArrayList<>(names); 810 } 811 812 /** 813 * Get history for backup destination 814 * @param backupRoot backup destination path 815 * @return List of backup info 816 * @throws IOException if getting the backup history fails 817 */ 818 public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException { 819 ArrayList<BackupInfo> history = getBackupHistory(false); 820 for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { 821 BackupInfo info = iterator.next(); 822 if (!backupRoot.equals(info.getBackupRootDir())) { 823 iterator.remove(); 824 } 825 } 826 return history; 827 } 828 829 /** 830 * Get history for a table 831 * @param name table name 832 * @return history for a table 833 * @throws IOException if getting the backup history fails 834 */ 835 public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException { 836 List<BackupInfo> history = getBackupHistory(); 837 List<BackupInfo> tableHistory = new ArrayList<>(); 838 for (BackupInfo info : history) { 839 List<TableName> tables = info.getTableNames(); 840 if (tables.contains(name)) { 841 tableHistory.add(info); 842 } 843 } 844 return tableHistory; 845 } 846 847 /** 848 * Goes through all backup history corresponding to the provided root folder, and collects all 849 * backup info mentioning each of the provided tables. 850 * @param set the tables for which to collect the {@code BackupInfo} 851 * @param backupRoot backup destination path to retrieve backup history for 852 * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at 853 * least one {@code BackupInfo} 854 * @throws IOException if getting the backup history fails 855 */ 856 public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, 857 String backupRoot) throws IOException { 858 List<BackupInfo> history = getBackupHistory(backupRoot); 859 Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>(); 860 for (BackupInfo info : history) { 861 List<TableName> tables = info.getTableNames(); 862 for (TableName tableName : tables) { 863 if (set.contains(tableName)) { 864 List<BackupInfo> list = 865 tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>()); 866 list.add(info); 867 } 868 } 869 } 870 return tableHistoryMap; 871 } 872 873 /** 874 * Get all backup sessions with a given state (in descending order by time) 875 * @param state backup session state 876 * @return history info of backup info objects 877 * @throws IOException exception 878 */ 879 public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException { 880 LOG.trace("get backup infos from backup system table"); 881 882 Scan scan = createScanForBackupHistory(); 883 ArrayList<BackupInfo> list = new ArrayList<>(); 884 885 try (Table table = connection.getTable(tableName); 886 ResultScanner scanner = table.getScanner(scan)) { 887 Result res; 888 while ((res = scanner.next()) != null) { 889 res.advance(); 890 BackupInfo context = cellToBackupInfo(res.current()); 891 if (state != BackupState.ANY && context.getState() != state) { 892 continue; 893 } 894 list.add(context); 895 } 896 return list; 897 } 898 } 899 900 /** 901 * Write the current timestamps for each regionserver to backup system table after a successful 902 * full or incremental backup. The saved timestamp is of the last log file that was backed up 903 * already. 904 * @param tables tables 905 * @param newTimestamps timestamps 906 * @param backupRoot root directory path to backup 907 * @throws IOException exception 908 */ 909 public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps, 910 String backupRoot) throws IOException { 911 if (LOG.isTraceEnabled()) { 912 LOG.trace("write RS log time stamps to backup system table for tables [" 913 + StringUtils.join(tables, ",") + "]"); 914 } 915 List<Put> puts = new ArrayList<>(); 916 for (TableName table : tables) { 917 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); 918 Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); 919 puts.add(put); 920 } 921 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) { 922 bufferedMutator.mutate(puts); 923 } 924 } 925 926 /** 927 * Read the timestamp for each region server log after the last successful backup. Each table has 928 * its own set of the timestamps. The info is stored for each table as a concatenated string of 929 * rs->timestapmp 930 * @param backupRoot root directory path to backup 931 * @return the timestamp for each region server. key: tableName value: 932 * RegionServer,PreviousTimeStamp 933 * @throws IOException exception 934 */ 935 public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot) 936 throws IOException { 937 if (LOG.isTraceEnabled()) { 938 LOG.trace("read RS log ts from backup system table for root=" + backupRoot); 939 } 940 941 Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>(); 942 943 Scan scan = createScanForReadLogTimestampMap(backupRoot); 944 try (Table table = connection.getTable(tableName); 945 ResultScanner scanner = table.getScanner(scan)) { 946 Result res; 947 while ((res = scanner.next()) != null) { 948 res.advance(); 949 Cell cell = res.current(); 950 byte[] row = CellUtil.cloneRow(cell); 951 String tabName = getTableNameForReadLogTimestampMap(row); 952 TableName tn = TableName.valueOf(tabName); 953 byte[] data = CellUtil.cloneValue(cell); 954 if (data == null) { 955 throw new IOException("Data of last backup data from backup system table " 956 + "is empty. Create a backup first."); 957 } 958 if (data != null && data.length > 0) { 959 HashMap<String, Long> lastBackup = 960 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); 961 tableTimestampMap.put(tn, lastBackup); 962 } 963 } 964 return tableTimestampMap; 965 } 966 } 967 968 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, 969 Map<String, Long> map) { 970 BackupProtos.TableServerTimestamp.Builder tstBuilder = 971 BackupProtos.TableServerTimestamp.newBuilder(); 972 tstBuilder 973 .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); 974 975 for (Entry<String, Long> entry : map.entrySet()) { 976 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); 977 HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); 978 ServerName sn = ServerName.parseServerName(entry.getKey()); 979 snBuilder.setHostName(sn.getHostname()); 980 snBuilder.setPort(sn.getPort()); 981 builder.setServerName(snBuilder.build()); 982 builder.setTimestamp(entry.getValue()); 983 tstBuilder.addServerTimestamp(builder.build()); 984 } 985 986 return tstBuilder.build(); 987 } 988 989 private HashMap<String, Long> 990 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { 991 992 HashMap<String, Long> map = new HashMap<>(); 993 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); 994 for (BackupProtos.ServerTimestamp st : list) { 995 ServerName sn = 996 org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); 997 map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); 998 } 999 return map; 1000 } 1001 1002 /** 1003 * Return the current tables covered by incremental backup. 1004 * @param backupRoot root directory path to backup 1005 * @return set of tableNames 1006 * @throws IOException exception 1007 */ 1008 public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { 1009 LOG.trace("get incremental backup table set from backup system table"); 1010 1011 TreeSet<TableName> set = new TreeSet<>(); 1012 1013 try (Table table = connection.getTable(tableName)) { 1014 Get get = createGetForIncrBackupTableSet(backupRoot); 1015 Result res = table.get(get); 1016 if (res.isEmpty()) { 1017 return set; 1018 } 1019 List<Cell> cells = res.listCells(); 1020 for (Cell cell : cells) { 1021 // qualifier = table name - we use table names as qualifiers 1022 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); 1023 } 1024 return set; 1025 } 1026 } 1027 1028 /** 1029 * Add tables to global incremental backup set 1030 * @param tables set of tables 1031 * @param backupRoot root directory path to backup 1032 * @throws IOException exception 1033 */ 1034 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) 1035 throws IOException { 1036 if (LOG.isTraceEnabled()) { 1037 LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot 1038 + " tables [" + StringUtils.join(tables, " ") + "]"); 1039 } 1040 if (LOG.isDebugEnabled()) { 1041 tables.forEach(table -> LOG.debug(Objects.toString(table))); 1042 } 1043 try (Table table = connection.getTable(tableName)) { 1044 Put put = createPutForIncrBackupTableSet(tables, backupRoot); 1045 table.put(put); 1046 } 1047 } 1048 1049 /** 1050 * Deletes incremental backup set for a backup destination 1051 * @param backupRoot backup root 1052 */ 1053 public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { 1054 if (LOG.isTraceEnabled()) { 1055 LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); 1056 } 1057 try (Table table = connection.getTable(tableName)) { 1058 Delete delete = createDeleteForIncrBackupTableSet(backupRoot); 1059 table.delete(delete); 1060 } 1061 } 1062 1063 /** 1064 * Checks if we have at least one backup session in backup system table This API is used by 1065 * BackupLogCleaner 1066 * @return true, if - at least one session exists in backup system table table 1067 * @throws IOException exception 1068 */ 1069 public boolean hasBackupSessions() throws IOException { 1070 LOG.trace("Has backup sessions from backup system table"); 1071 1072 boolean result = false; 1073 Scan scan = createScanForBackupHistory(); 1074 scan.setCaching(1); 1075 try (Table table = connection.getTable(tableName); 1076 ResultScanner scanner = table.getScanner(scan)) { 1077 if (scanner.next() != null) { 1078 result = true; 1079 } 1080 return result; 1081 } 1082 } 1083 1084 /** 1085 * BACKUP SETS 1086 */ 1087 1088 /** 1089 * Get backup set list 1090 * @return backup set list 1091 * @throws IOException if a table or scanner operation fails 1092 */ 1093 public List<String> listBackupSets() throws IOException { 1094 LOG.trace("Backup set list"); 1095 1096 List<String> list = new ArrayList<>(); 1097 try (Table table = connection.getTable(tableName)) { 1098 Scan scan = createScanForBackupSetList(); 1099 scan.readVersions(1); 1100 try (ResultScanner scanner = table.getScanner(scan)) { 1101 Result res; 1102 while ((res = scanner.next()) != null) { 1103 res.advance(); 1104 list.add(cellKeyToBackupSetName(res.current())); 1105 } 1106 return list; 1107 } 1108 } 1109 } 1110 1111 /** 1112 * Get backup set description (list of tables) 1113 * @param name set's name 1114 * @return list of tables in a backup set 1115 * @throws IOException if a table operation fails 1116 */ 1117 public List<TableName> describeBackupSet(String name) throws IOException { 1118 if (LOG.isTraceEnabled()) { 1119 LOG.trace(" Backup set describe: " + name); 1120 } 1121 try (Table table = connection.getTable(tableName)) { 1122 Get get = createGetForBackupSet(name); 1123 Result res = table.get(get); 1124 if (res.isEmpty()) { 1125 return null; 1126 } 1127 res.advance(); 1128 String[] tables = cellValueToBackupSet(res.current()); 1129 return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) 1130 .collect(Collectors.toList()); 1131 } 1132 } 1133 1134 /** 1135 * Add backup set (list of tables) 1136 * @param name set name 1137 * @param newTables list of tables, comma-separated 1138 * @throws IOException if a table operation fails 1139 */ 1140 public void addToBackupSet(String name, String[] newTables) throws IOException { 1141 if (LOG.isTraceEnabled()) { 1142 LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); 1143 } 1144 String[] union = null; 1145 try (Table table = connection.getTable(tableName)) { 1146 Get get = createGetForBackupSet(name); 1147 Result res = table.get(get); 1148 if (res.isEmpty()) { 1149 union = newTables; 1150 } else { 1151 res.advance(); 1152 String[] tables = cellValueToBackupSet(res.current()); 1153 union = merge(tables, newTables); 1154 } 1155 Put put = createPutForBackupSet(name, union); 1156 table.put(put); 1157 } 1158 } 1159 1160 /** 1161 * Remove tables from backup set (list of tables) 1162 * @param name set name 1163 * @param toRemove list of tables 1164 * @throws IOException if a table operation or deleting the backup set fails 1165 */ 1166 public void removeFromBackupSet(String name, String[] toRemove) throws IOException { 1167 if (LOG.isTraceEnabled()) { 1168 LOG.trace( 1169 " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); 1170 } 1171 String[] disjoint; 1172 String[] tables; 1173 try (Table table = connection.getTable(tableName)) { 1174 Get get = createGetForBackupSet(name); 1175 Result res = table.get(get); 1176 if (res.isEmpty()) { 1177 LOG.warn("Backup set '" + name + "' not found."); 1178 return; 1179 } else { 1180 res.advance(); 1181 tables = cellValueToBackupSet(res.current()); 1182 disjoint = disjoin(tables, toRemove); 1183 } 1184 if (disjoint.length > 0 && disjoint.length != tables.length) { 1185 Put put = createPutForBackupSet(name, disjoint); 1186 table.put(put); 1187 } else if (disjoint.length == tables.length) { 1188 LOG.warn("Backup set '" + name + "' does not contain tables [" 1189 + StringUtils.join(toRemove, " ") + "]"); 1190 } else { // disjoint.length == 0 and tables.length >0 1191 // Delete backup set 1192 LOG.info("Backup set '" + name + "' is empty. Deleting."); 1193 deleteBackupSet(name); 1194 } 1195 } 1196 } 1197 1198 private String[] merge(String[] existingTables, String[] newTables) { 1199 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1200 tables.addAll(Arrays.asList(newTables)); 1201 return tables.toArray(new String[0]); 1202 } 1203 1204 private String[] disjoin(String[] existingTables, String[] toRemove) { 1205 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1206 Arrays.asList(toRemove).forEach(table -> tables.remove(table)); 1207 return tables.toArray(new String[0]); 1208 } 1209 1210 /** 1211 * Delete backup set 1212 * @param name set's name 1213 * @throws IOException if getting or deleting the table fails 1214 */ 1215 public void deleteBackupSet(String name) throws IOException { 1216 if (LOG.isTraceEnabled()) { 1217 LOG.trace(" Backup set delete: " + name); 1218 } 1219 try (Table table = connection.getTable(tableName)) { 1220 Delete del = createDeleteForBackupSet(name); 1221 table.delete(del); 1222 } 1223 } 1224 1225 /** 1226 * Get backup system table descriptor 1227 * @return table's descriptor 1228 */ 1229 public static TableDescriptor getSystemTableDescriptor(Configuration conf) { 1230 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); 1231 1232 ColumnFamilyDescriptorBuilder colBuilder = 1233 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1234 1235 colBuilder.setMaxVersions(1); 1236 Configuration config = HBaseConfiguration.create(); 1237 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1238 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1239 colBuilder.setTimeToLive(ttl); 1240 1241 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1242 builder.setColumnFamily(colSessionsDesc); 1243 1244 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1245 colBuilder.setTimeToLive(ttl); 1246 builder.setColumnFamily(colBuilder.build()); 1247 return builder.build(); 1248 } 1249 1250 public static TableName getTableName(Configuration conf) { 1251 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1252 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); 1253 return TableName.valueOf(name); 1254 } 1255 1256 public static String getTableNameAsString(Configuration conf) { 1257 return getTableName(conf).getNameAsString(); 1258 } 1259 1260 public static String getSnapshotName(Configuration conf) { 1261 return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); 1262 } 1263 1264 /** 1265 * Get backup system table descriptor 1266 * @return table's descriptor 1267 */ 1268 public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { 1269 TableDescriptorBuilder builder = 1270 TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); 1271 1272 ColumnFamilyDescriptorBuilder colBuilder = 1273 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1274 colBuilder.setMaxVersions(1); 1275 Configuration config = HBaseConfiguration.create(); 1276 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1277 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1278 colBuilder.setTimeToLive(ttl); 1279 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1280 builder.setColumnFamily(colSessionsDesc); 1281 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1282 colBuilder.setTimeToLive(ttl); 1283 builder.setColumnFamily(colBuilder.build()); 1284 return builder.build(); 1285 } 1286 1287 public static TableName getTableNameForBulkLoadedData(Configuration conf) { 1288 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1289 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; 1290 return TableName.valueOf(name); 1291 } 1292 1293 /** 1294 * Creates Put operation for a given backup info object 1295 * @param context backup info 1296 * @return put operation 1297 * @throws IOException exception 1298 */ 1299 private Put createPutForBackupInfo(BackupInfo context) throws IOException { 1300 Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); 1301 put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), 1302 context.toByteArray()); 1303 return put; 1304 } 1305 1306 /** 1307 * Creates Get operation for a given backup id 1308 * @param backupId backup's ID 1309 * @return get operation 1310 * @throws IOException exception 1311 */ 1312 private Get createGetForBackupInfo(String backupId) throws IOException { 1313 Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); 1314 get.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1315 get.readVersions(1); 1316 return get; 1317 } 1318 1319 /** 1320 * Creates Delete operation for a given backup id 1321 * @param backupId backup's ID 1322 * @return delete operation 1323 */ 1324 private Delete createDeleteForBackupInfo(String backupId) { 1325 Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); 1326 del.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1327 return del; 1328 } 1329 1330 /** 1331 * Converts Result to BackupInfo 1332 * @param res HBase result 1333 * @return backup info instance 1334 * @throws IOException exception 1335 */ 1336 private BackupInfo resultToBackupInfo(Result res) throws IOException { 1337 res.advance(); 1338 Cell cell = res.current(); 1339 return cellToBackupInfo(cell); 1340 } 1341 1342 /** 1343 * Creates Get operation to retrieve start code from backup system table 1344 * @return get operation 1345 * @throws IOException exception 1346 */ 1347 private Get createGetForStartCode(String rootPath) throws IOException { 1348 Get get = new Get(rowkey(START_CODE_ROW, rootPath)); 1349 get.addFamily(BackupSystemTable.META_FAMILY); 1350 get.readVersions(1); 1351 return get; 1352 } 1353 1354 /** 1355 * Creates Put operation to store start code to backup system table 1356 * @return put operation 1357 */ 1358 private Put createPutForStartCode(String startCode, String rootPath) { 1359 Put put = new Put(rowkey(START_CODE_ROW, rootPath)); 1360 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), 1361 Bytes.toBytes(startCode)); 1362 return put; 1363 } 1364 1365 /** 1366 * Creates Get to retrieve incremental backup table set from backup system table 1367 * @return get operation 1368 * @throws IOException exception 1369 */ 1370 private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { 1371 Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); 1372 get.addFamily(BackupSystemTable.META_FAMILY); 1373 get.readVersions(1); 1374 return get; 1375 } 1376 1377 /** 1378 * Creates Put to store incremental backup table set 1379 * @param tables tables 1380 * @return put operation 1381 */ 1382 private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { 1383 Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); 1384 for (TableName table : tables) { 1385 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), 1386 EMPTY_VALUE); 1387 } 1388 return put; 1389 } 1390 1391 /** 1392 * Creates Delete for incremental backup table set 1393 * @param backupRoot backup root 1394 * @return delete operation 1395 */ 1396 private Delete createDeleteForIncrBackupTableSet(String backupRoot) { 1397 Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); 1398 delete.addFamily(BackupSystemTable.META_FAMILY); 1399 return delete; 1400 } 1401 1402 /** 1403 * Creates Scan operation to load backup history 1404 * @return scan operation 1405 */ 1406 private Scan createScanForBackupHistory() { 1407 Scan scan = new Scan(); 1408 byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); 1409 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1410 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1411 scan.withStartRow(startRow); 1412 scan.withStopRow(stopRow); 1413 scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1414 scan.readVersions(1); 1415 return scan; 1416 } 1417 1418 /** 1419 * Converts cell to backup info instance. 1420 * @param current current cell 1421 * @return backup backup info instance 1422 * @throws IOException exception 1423 */ 1424 private BackupInfo cellToBackupInfo(Cell current) throws IOException { 1425 byte[] data = CellUtil.cloneValue(current); 1426 return BackupInfo.fromByteArray(data); 1427 } 1428 1429 /** 1430 * Creates Put to write RS last roll log timestamp map 1431 * @param table table 1432 * @param smap map, containing RS:ts 1433 * @return put operation 1434 */ 1435 private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, 1436 String backupRoot) { 1437 Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); 1438 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); 1439 return put; 1440 } 1441 1442 /** 1443 * Creates Scan to load table-> { RS -> ts} map of maps 1444 * @return scan operation 1445 */ 1446 private Scan createScanForReadLogTimestampMap(String backupRoot) { 1447 Scan scan = new Scan(); 1448 scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL)); 1449 scan.addFamily(BackupSystemTable.META_FAMILY); 1450 1451 return scan; 1452 } 1453 1454 /** 1455 * Get table name from rowkey 1456 * @param cloneRow rowkey 1457 * @return table name 1458 */ 1459 private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { 1460 String s = Bytes.toString(cloneRow); 1461 int index = s.lastIndexOf(NULL); 1462 return s.substring(index + 1); 1463 } 1464 1465 /** 1466 * Creates Put to store RS last log result 1467 * @param server server name 1468 * @param timestamp log roll result (timestamp) 1469 * @return put operation 1470 */ 1471 private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, 1472 String backupRoot) { 1473 Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); 1474 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), 1475 Bytes.toBytes(timestamp)); 1476 return put; 1477 } 1478 1479 /** 1480 * Creates Scan operation to load last RS log roll results 1481 * @return scan operation 1482 */ 1483 private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { 1484 Scan scan = new Scan(); 1485 scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL)); 1486 scan.addFamily(BackupSystemTable.META_FAMILY); 1487 scan.readVersions(1); 1488 1489 return scan; 1490 } 1491 1492 /** 1493 * Get server's name from rowkey 1494 * @param row rowkey 1495 * @return server's name 1496 */ 1497 private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { 1498 String s = Bytes.toString(row); 1499 int index = s.lastIndexOf(NULL); 1500 return s.substring(index + 1); 1501 } 1502 1503 /* 1504 * Creates Put's for bulk load resulting from running LoadIncrementalHFiles 1505 */ 1506 static List<Put> createPutForCommittedBulkload(TableName table, byte[] region, 1507 Map<byte[], List<Path>> finalPaths) { 1508 List<Put> puts = new ArrayList<>(); 1509 for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) { 1510 for (Path path : entry.getValue()) { 1511 String file = path.toString(); 1512 int lastSlash = file.lastIndexOf("/"); 1513 String filename = file.substring(lastSlash + 1); 1514 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1515 Bytes.toString(region), BLK_LD_DELIM, filename)); 1516 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1517 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); 1518 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1519 put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); 1520 puts.add(put); 1521 LOG 1522 .debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); 1523 } 1524 } 1525 return puts; 1526 } 1527 1528 public static void snapshot(Connection conn) throws IOException { 1529 try (Admin admin = conn.getAdmin()) { 1530 Configuration conf = conn.getConfiguration(); 1531 admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); 1532 } 1533 } 1534 1535 public static void restoreFromSnapshot(Connection conn) throws IOException { 1536 Configuration conf = conn.getConfiguration(); 1537 LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); 1538 try (Admin admin = conn.getAdmin()) { 1539 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1540 if (snapshotExists(admin, snapshotName)) { 1541 admin.disableTable(BackupSystemTable.getTableName(conf)); 1542 admin.restoreSnapshot(snapshotName); 1543 admin.enableTable(BackupSystemTable.getTableName(conf)); 1544 LOG.debug("Done restoring backup system table"); 1545 } else { 1546 // Snapshot does not exists, i.e completeBackup failed after 1547 // deleting backup system table snapshot 1548 // In this case we log WARN and proceed 1549 LOG.warn( 1550 "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); 1551 } 1552 } 1553 } 1554 1555 private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { 1556 List<SnapshotDescription> list = admin.listSnapshots(); 1557 for (SnapshotDescription desc : list) { 1558 if (desc.getName().equals(snapshotName)) { 1559 return true; 1560 } 1561 } 1562 return false; 1563 } 1564 1565 public static boolean snapshotExists(Connection conn) throws IOException { 1566 return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); 1567 } 1568 1569 public static void deleteSnapshot(Connection conn) throws IOException { 1570 Configuration conf = conn.getConfiguration(); 1571 LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); 1572 try (Admin admin = conn.getAdmin()) { 1573 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1574 if (snapshotExists(admin, snapshotName)) { 1575 admin.deleteSnapshot(snapshotName); 1576 LOG.debug("Done deleting backup system table snapshot"); 1577 } else { 1578 LOG.error("Snapshot " + snapshotName + " does not exists"); 1579 } 1580 } 1581 } 1582 1583 /* 1584 * Creates Put's for bulk load resulting from running LoadIncrementalHFiles 1585 */ 1586 static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family, 1587 final List<Pair<Path, Path>> pairs) { 1588 List<Put> puts = new ArrayList<>(pairs.size()); 1589 for (Pair<Path, Path> pair : pairs) { 1590 Path path = pair.getSecond(); 1591 String file = path.toString(); 1592 int lastSlash = file.lastIndexOf("/"); 1593 String filename = file.substring(lastSlash + 1); 1594 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1595 Bytes.toString(region), BLK_LD_DELIM, filename)); 1596 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1597 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); 1598 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1599 put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE); 1600 puts.add(put); 1601 LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region)); 1602 } 1603 return puts; 1604 } 1605 1606 public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) { 1607 List<Delete> lstDels = new ArrayList<>(lst.size()); 1608 for (TableName table : lst) { 1609 Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM)); 1610 del.addFamily(BackupSystemTable.META_FAMILY); 1611 lstDels.add(del); 1612 } 1613 return lstDels; 1614 } 1615 1616 private Put createPutForDeleteOperation(String[] backupIdList) { 1617 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1618 Put put = new Put(DELETE_OP_ROW); 1619 put.addColumn(META_FAMILY, FAM_COL, value); 1620 return put; 1621 } 1622 1623 private Delete createDeleteForBackupDeleteOperation() { 1624 Delete delete = new Delete(DELETE_OP_ROW); 1625 delete.addFamily(META_FAMILY); 1626 return delete; 1627 } 1628 1629 private Get createGetForDeleteOperation() { 1630 Get get = new Get(DELETE_OP_ROW); 1631 get.addFamily(META_FAMILY); 1632 return get; 1633 } 1634 1635 public void startDeleteOperation(String[] backupIdList) throws IOException { 1636 if (LOG.isTraceEnabled()) { 1637 LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); 1638 } 1639 Put put = createPutForDeleteOperation(backupIdList); 1640 try (Table table = connection.getTable(tableName)) { 1641 table.put(put); 1642 } 1643 } 1644 1645 public void finishDeleteOperation() throws IOException { 1646 LOG.trace("Finsih delete operation for backup ids"); 1647 1648 Delete delete = createDeleteForBackupDeleteOperation(); 1649 try (Table table = connection.getTable(tableName)) { 1650 table.delete(delete); 1651 } 1652 } 1653 1654 public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { 1655 LOG.trace("Get delete operation for backup ids"); 1656 1657 Get get = createGetForDeleteOperation(); 1658 try (Table table = connection.getTable(tableName)) { 1659 Result res = table.get(get); 1660 if (res.isEmpty()) { 1661 return null; 1662 } 1663 Cell cell = res.listCells().get(0); 1664 byte[] val = CellUtil.cloneValue(cell); 1665 if (val.length == 0) { 1666 return null; 1667 } 1668 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1669 .toArray(String[]::new); 1670 } 1671 } 1672 1673 private Put createPutForMergeOperation(String[] backupIdList) { 1674 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1675 Put put = new Put(MERGE_OP_ROW); 1676 put.addColumn(META_FAMILY, FAM_COL, value); 1677 return put; 1678 } 1679 1680 public boolean isMergeInProgress() throws IOException { 1681 Get get = new Get(MERGE_OP_ROW); 1682 try (Table table = connection.getTable(tableName)) { 1683 Result res = table.get(get); 1684 return !res.isEmpty(); 1685 } 1686 } 1687 1688 private Put createPutForUpdateTablesForMerge(List<TableName> tables) { 1689 byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); 1690 Put put = new Put(MERGE_OP_ROW); 1691 put.addColumn(META_FAMILY, PATH_COL, value); 1692 return put; 1693 } 1694 1695 private Delete createDeleteForBackupMergeOperation() { 1696 Delete delete = new Delete(MERGE_OP_ROW); 1697 delete.addFamily(META_FAMILY); 1698 return delete; 1699 } 1700 1701 private Get createGetForMergeOperation() { 1702 Get get = new Get(MERGE_OP_ROW); 1703 get.addFamily(META_FAMILY); 1704 return get; 1705 } 1706 1707 public void startMergeOperation(String[] backupIdList) throws IOException { 1708 if (LOG.isTraceEnabled()) { 1709 LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); 1710 } 1711 Put put = createPutForMergeOperation(backupIdList); 1712 try (Table table = connection.getTable(tableName)) { 1713 table.put(put); 1714 } 1715 } 1716 1717 public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { 1718 if (LOG.isTraceEnabled()) { 1719 LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); 1720 } 1721 Put put = createPutForUpdateTablesForMerge(tables); 1722 try (Table table = connection.getTable(tableName)) { 1723 table.put(put); 1724 } 1725 } 1726 1727 public void finishMergeOperation() throws IOException { 1728 LOG.trace("Finish merge operation for backup ids"); 1729 1730 Delete delete = createDeleteForBackupMergeOperation(); 1731 try (Table table = connection.getTable(tableName)) { 1732 table.delete(delete); 1733 } 1734 } 1735 1736 public String[] getListOfBackupIdsFromMergeOperation() throws IOException { 1737 LOG.trace("Get backup ids for merge operation"); 1738 1739 Get get = createGetForMergeOperation(); 1740 try (Table table = connection.getTable(tableName)) { 1741 Result res = table.get(get); 1742 if (res.isEmpty()) { 1743 return null; 1744 } 1745 Cell cell = res.listCells().get(0); 1746 byte[] val = CellUtil.cloneValue(cell); 1747 if (val.length == 0) { 1748 return null; 1749 } 1750 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1751 .toArray(String[]::new); 1752 } 1753 } 1754 1755 static Scan createScanForOrigBulkLoadedFiles(TableName table) { 1756 Scan scan = new Scan(); 1757 byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); 1758 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1759 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1760 scan.withStartRow(startRow); 1761 scan.withStopRow(stopRow); 1762 scan.addFamily(BackupSystemTable.META_FAMILY); 1763 scan.readVersions(1); 1764 return scan; 1765 } 1766 1767 static String getTableNameFromOrigBulkLoadRow(String rowStr) { 1768 // format is bulk : namespace : table : region : file 1769 return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1); 1770 } 1771 1772 static String getRegionNameFromOrigBulkLoadRow(String rowStr) { 1773 // format is bulk : namespace : table : region : file 1774 List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr); 1775 Iterator<String> i = parts.iterator(); 1776 int idx = 3; 1777 if (parts.size() == 4) { 1778 // the table is in default namespace 1779 idx = 2; 1780 } 1781 String region = Iterators.get(i, idx); 1782 LOG.debug("bulk row string " + rowStr + " region " + region); 1783 return region; 1784 } 1785 1786 /* 1787 * Used to query bulk loaded hfiles which have been copied by incremental backup 1788 * @param backupId the backup Id. It can be null when querying for all tables 1789 * @return the Scan object 1790 */ 1791 static Scan createScanForBulkLoadedFiles(String backupId) { 1792 Scan scan = new Scan(); 1793 byte[] startRow = 1794 backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); 1795 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1796 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1797 scan.withStartRow(startRow); 1798 scan.withStopRow(stopRow); 1799 scan.addFamily(BackupSystemTable.META_FAMILY); 1800 scan.readVersions(1); 1801 return scan; 1802 } 1803 1804 static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, 1805 long ts, int idx) { 1806 Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); 1807 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); 1808 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); 1809 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p)); 1810 return put; 1811 } 1812 1813 /** 1814 * Creates Scan operation to load backup set list 1815 * @return scan operation 1816 */ 1817 private Scan createScanForBackupSetList() { 1818 Scan scan = new Scan(); 1819 byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); 1820 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1821 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1822 scan.withStartRow(startRow); 1823 scan.withStopRow(stopRow); 1824 scan.addFamily(BackupSystemTable.META_FAMILY); 1825 return scan; 1826 } 1827 1828 /** 1829 * Creates Get operation to load backup set content 1830 * @return get operation 1831 */ 1832 private Get createGetForBackupSet(String name) { 1833 Get get = new Get(rowkey(SET_KEY_PREFIX, name)); 1834 get.addFamily(BackupSystemTable.META_FAMILY); 1835 return get; 1836 } 1837 1838 /** 1839 * Creates Delete operation to delete backup set content 1840 * @param name backup set's name 1841 * @return delete operation 1842 */ 1843 private Delete createDeleteForBackupSet(String name) { 1844 Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); 1845 del.addFamily(BackupSystemTable.META_FAMILY); 1846 return del; 1847 } 1848 1849 /** 1850 * Creates Put operation to update backup set content 1851 * @param name backup set's name 1852 * @param tables list of tables 1853 * @return put operation 1854 */ 1855 private Put createPutForBackupSet(String name, String[] tables) { 1856 Put put = new Put(rowkey(SET_KEY_PREFIX, name)); 1857 byte[] value = convertToByteArray(tables); 1858 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); 1859 return put; 1860 } 1861 1862 private byte[] convertToByteArray(String[] tables) { 1863 return Bytes.toBytes(StringUtils.join(tables, ",")); 1864 } 1865 1866 /** 1867 * Converts cell to backup set list. 1868 * @param current current cell 1869 * @return backup set as array of table names 1870 */ 1871 private String[] cellValueToBackupSet(Cell current) { 1872 byte[] data = CellUtil.cloneValue(current); 1873 if (!ArrayUtils.isEmpty(data)) { 1874 return Bytes.toString(data).split(","); 1875 } 1876 return new String[0]; 1877 } 1878 1879 /** 1880 * Converts cell key to backup set name. 1881 * @param current current cell 1882 * @return backup set name 1883 */ 1884 private String cellKeyToBackupSetName(Cell current) { 1885 byte[] data = CellUtil.cloneRow(current); 1886 return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); 1887 } 1888 1889 private static byte[] rowkey(String s, String... other) { 1890 StringBuilder sb = new StringBuilder(s); 1891 for (String ss : other) { 1892 sb.append(ss); 1893 } 1894 return Bytes.toBytes(sb.toString()); 1895 } 1896 1897 private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException { 1898 if (!admin.isTableEnabled(tableName)) { 1899 try { 1900 admin.enableTable(tableName); 1901 } catch (TableNotDisabledException ignored) { 1902 LOG.info("Table {} is not disabled, ignoring enable request", tableName); 1903 } 1904 } 1905 } 1906}