001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.nio.charset.StandardCharsets; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.Objects; 034import java.util.Set; 035import java.util.TreeMap; 036import java.util.TreeSet; 037import java.util.stream.Collectors; 038import org.apache.commons.lang3.ArrayUtils; 039import org.apache.commons.lang3.StringUtils; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.Cell; 043import org.apache.hadoop.hbase.CellUtil; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.NamespaceDescriptor; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.backup.BackupInfo; 049import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 050import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 051import org.apache.hadoop.hbase.backup.BackupType; 052import org.apache.hadoop.hbase.backup.util.BackupUtils; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 056import org.apache.hadoop.hbase.client.Connection; 057import org.apache.hadoop.hbase.client.Delete; 058import org.apache.hadoop.hbase.client.Get; 059import org.apache.hadoop.hbase.client.Put; 060import org.apache.hadoop.hbase.client.Result; 061import org.apache.hadoop.hbase.client.ResultScanner; 062import org.apache.hadoop.hbase.client.Scan; 063import org.apache.hadoop.hbase.client.SnapshotDescription; 064import org.apache.hadoop.hbase.client.Table; 065import org.apache.hadoop.hbase.client.TableDescriptor; 066import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 067import org.apache.hadoop.hbase.util.Bytes; 068import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 069import org.apache.hadoop.hbase.util.Pair; 070import org.apache.yetus.audience.InterfaceAudience; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 075import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 076 077import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 079 080/** 081 * This class provides API to access backup system table<br> 082 * Backup system table schema:<br> 083 * <p> 084 * <ul> 085 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> 086 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> 087 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li> 088 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL 089 * timestamp]</li> 090 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> 091 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file 092 * name</li> 093 * </ul> 094 * </p> 095 */ 096@InterfaceAudience.Private 097public final class BackupSystemTable implements Closeable { 098 099 private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); 100 101 static class WALItem { 102 String backupId; 103 String walFile; 104 String backupRoot; 105 106 WALItem(String backupId, String walFile, String backupRoot) { 107 this.backupId = backupId; 108 this.walFile = walFile; 109 this.backupRoot = backupRoot; 110 } 111 112 public String getBackupId() { 113 return backupId; 114 } 115 116 public String getWalFile() { 117 return walFile; 118 } 119 120 public String getBackupRoot() { 121 return backupRoot; 122 } 123 124 @Override 125 public String toString() { 126 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; 127 } 128 } 129 130 /** 131 * Backup system table (main) name 132 */ 133 private TableName tableName; 134 135 /** 136 * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a 137 * separate table because we have to isolate general backup operations: create, merge etc from 138 * activity of RegionObserver, which controls process of a bulk loading 139 * {@link org.apache.hadoop.hbase.backup.BackupObserver} 140 */ 141 private TableName bulkLoadTableName; 142 143 /** 144 * Stores backup sessions (contexts) 145 */ 146 final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); 147 /** 148 * Stores other meta 149 */ 150 final static byte[] META_FAMILY = Bytes.toBytes("meta"); 151 final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); 152 /** 153 * Connection to HBase cluster, shared among all instances 154 */ 155 private final Connection connection; 156 157 private final static String BACKUP_INFO_PREFIX = "session:"; 158 private final static String START_CODE_ROW = "startcode:"; 159 private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); 160 private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); 161 162 private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); 163 private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); 164 165 private final static String INCR_BACKUP_SET = "incrbackupset:"; 166 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; 167 private final static String RS_LOG_TS_PREFIX = "rslogts:"; 168 169 private final static String BULK_LOAD_PREFIX = "bulk:"; 170 private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX); 171 private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row"); 172 private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row"); 173 174 final static byte[] TBL_COL = Bytes.toBytes("tbl"); 175 final static byte[] FAM_COL = Bytes.toBytes("fam"); 176 final static byte[] PATH_COL = Bytes.toBytes("path"); 177 final static byte[] STATE_COL = Bytes.toBytes("state"); 178 // the two states a bulk loaded file can be 179 final static byte[] BL_PREPARE = Bytes.toBytes("R"); 180 final static byte[] BL_COMMIT = Bytes.toBytes("D"); 181 182 private final static String SET_KEY_PREFIX = "backupset:"; 183 184 // separator between BULK_LOAD_PREFIX and ordinals 185 private final static String BLK_LD_DELIM = ":"; 186 private final static byte[] EMPTY_VALUE = new byte[] {}; 187 188 // Safe delimiter in a string 189 private final static String NULL = "\u0000"; 190 191 public BackupSystemTable(Connection conn) throws IOException { 192 this.connection = conn; 193 Configuration conf = this.connection.getConfiguration(); 194 tableName = BackupSystemTable.getTableName(conf); 195 bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); 196 checkSystemTable(); 197 } 198 199 private void checkSystemTable() throws IOException { 200 try (Admin admin = connection.getAdmin()) { 201 verifyNamespaceExists(admin); 202 Configuration conf = connection.getConfiguration(); 203 if (!admin.tableExists(tableName)) { 204 TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf); 205 admin.createTable(backupHTD); 206 } 207 if (!admin.tableExists(bulkLoadTableName)) { 208 TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); 209 admin.createTable(blHTD); 210 } 211 waitForSystemTable(admin, tableName); 212 waitForSystemTable(admin, bulkLoadTableName); 213 } 214 } 215 216 private void verifyNamespaceExists(Admin admin) throws IOException { 217 String namespaceName = tableName.getNamespaceAsString(); 218 NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); 219 NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); 220 boolean exists = false; 221 for (NamespaceDescriptor nsd : list) { 222 if (nsd.getName().equals(ns.getName())) { 223 exists = true; 224 break; 225 } 226 } 227 if (!exists) { 228 admin.createNamespace(ns); 229 } 230 } 231 232 private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { 233 // Return fast if the table is available and avoid a log message 234 if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) { 235 return; 236 } 237 long TIMEOUT = 60000; 238 long startTime = EnvironmentEdgeManager.currentTime(); 239 LOG.debug("Backup table {} is not present and available, waiting for it to become so", 240 tableName); 241 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { 242 try { 243 Thread.sleep(100); 244 } catch (InterruptedException e) { 245 throw (IOException) new InterruptedIOException().initCause(e); 246 } 247 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { 248 throw new IOException( 249 "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms"); 250 } 251 } 252 LOG.debug("Backup table {} exists and available", tableName); 253 } 254 255 @Override 256 public void close() { 257 // do nothing 258 } 259 260 /** 261 * Updates status (state) of a backup session in backup system table table 262 * @param info backup info 263 * @throws IOException exception 264 */ 265 public void updateBackupInfo(BackupInfo info) throws IOException { 266 if (LOG.isTraceEnabled()) { 267 LOG.trace("update backup status in backup system table for: " + info.getBackupId() 268 + " set status=" + info.getState()); 269 } 270 try (Table table = connection.getTable(tableName)) { 271 Put put = createPutForBackupInfo(info); 272 table.put(put); 273 } 274 } 275 276 /* 277 * @param backupId the backup Id 278 * @return Map of rows to path of bulk loaded hfile 279 */ 280 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { 281 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 282 try (Table table = connection.getTable(bulkLoadTableName); 283 ResultScanner scanner = table.getScanner(scan)) { 284 Result res = null; 285 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 286 while ((res = scanner.next()) != null) { 287 res.advance(); 288 byte[] row = CellUtil.cloneRow(res.listCells().get(0)); 289 for (Cell cell : res.listCells()) { 290 if ( 291 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 292 BackupSystemTable.PATH_COL.length) == 0 293 ) { 294 map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); 295 } 296 } 297 } 298 return map; 299 } 300 } 301 302 /* 303 * Used during restore 304 * @param backupId the backup Id 305 * @param sTableList List of tables 306 * @return array of Map of family to List of Paths 307 */ 308 public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList) 309 throws IOException { 310 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 311 @SuppressWarnings("unchecked") 312 Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()]; 313 try (Table table = connection.getTable(bulkLoadTableName); 314 ResultScanner scanner = table.getScanner(scan)) { 315 Result res = null; 316 while ((res = scanner.next()) != null) { 317 res.advance(); 318 TableName tbl = null; 319 byte[] fam = null; 320 String path = null; 321 for (Cell cell : res.listCells()) { 322 if ( 323 CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, 324 BackupSystemTable.TBL_COL.length) == 0 325 ) { 326 tbl = TableName.valueOf(CellUtil.cloneValue(cell)); 327 } else if ( 328 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 329 BackupSystemTable.FAM_COL.length) == 0 330 ) { 331 fam = CellUtil.cloneValue(cell); 332 } else if ( 333 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 334 BackupSystemTable.PATH_COL.length) == 0 335 ) { 336 path = Bytes.toString(CellUtil.cloneValue(cell)); 337 } 338 } 339 int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList); 340 if (srcIdx == -1) { 341 // the table is not among the query 342 continue; 343 } 344 if (mapForSrc[srcIdx] == null) { 345 mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR); 346 } 347 List<Path> files; 348 if (!mapForSrc[srcIdx].containsKey(fam)) { 349 files = new ArrayList<Path>(); 350 mapForSrc[srcIdx].put(fam, files); 351 } else { 352 files = mapForSrc[srcIdx].get(fam); 353 } 354 files.add(new Path(path)); 355 if (LOG.isDebugEnabled()) { 356 LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); 357 } 358 } 359 360 return mapForSrc; 361 } 362 } 363 364 /** 365 * Deletes backup status from backup system table table 366 * @param backupId backup id 367 * @throws IOException exception 368 */ 369 public void deleteBackupInfo(String backupId) throws IOException { 370 if (LOG.isTraceEnabled()) { 371 LOG.trace("delete backup status in backup system table for " + backupId); 372 } 373 try (Table table = connection.getTable(tableName)) { 374 Delete del = createDeleteForBackupInfo(backupId); 375 table.delete(del); 376 } 377 } 378 379 /* 380 * For postBulkLoadHFile() hook. 381 * @param tabName table name 382 * @param region the region receiving hfile 383 * @param finalPaths family and associated hfiles 384 */ 385 public void writePathsPostBulkLoad(TableName tabName, byte[] region, 386 Map<byte[], List<Path>> finalPaths) throws IOException { 387 if (LOG.isDebugEnabled()) { 388 LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() 389 + " entries"); 390 } 391 try (Table table = connection.getTable(bulkLoadTableName)) { 392 List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); 393 table.put(puts); 394 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); 395 } 396 } 397 398 /* 399 * For preCommitStoreFile() hook 400 * @param tabName table name 401 * @param region the region receiving hfile 402 * @param family column family 403 * @param pairs list of paths for hfiles 404 */ 405 public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, 406 final List<Pair<Path, Path>> pairs) throws IOException { 407 if (LOG.isDebugEnabled()) { 408 LOG.debug( 409 "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries"); 410 } 411 try (Table table = connection.getTable(bulkLoadTableName)) { 412 List<Put> puts = 413 BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); 414 table.put(puts); 415 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); 416 } 417 } 418 419 /* 420 * Removes rows recording bulk loaded hfiles from backup table 421 * @param lst list of table names 422 * @param rows the rows to be deleted 423 */ 424 public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException { 425 try (Table table = connection.getTable(bulkLoadTableName)) { 426 List<Delete> lstDels = new ArrayList<>(); 427 for (byte[] row : rows) { 428 Delete del = new Delete(row); 429 lstDels.add(del); 430 LOG.debug("orig deleting the row: " + Bytes.toString(row)); 431 } 432 table.delete(lstDels); 433 LOG.debug("deleted " + rows.size() + " original bulkload rows"); 434 } 435 } 436 437 /* 438 * Reads the rows from backup table recording bulk loaded hfiles 439 * @param tableList list of table names 440 * @return The keys of the Map are table, region and column family. Value of the map reflects 441 * whether the hfile was recorded by preCommitStoreFile hook (true) 442 */ 443 public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> 444 readBulkloadRows(List<TableName> tableList) throws IOException { 445 446 Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>(); 447 List<byte[]> rows = new ArrayList<>(); 448 for (TableName tTable : tableList) { 449 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable); 450 Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable); 451 try (Table table = connection.getTable(bulkLoadTableName); 452 ResultScanner scanner = table.getScanner(scan)) { 453 Result res = null; 454 while ((res = scanner.next()) != null) { 455 res.advance(); 456 String fam = null; 457 String path = null; 458 boolean raw = false; 459 byte[] row; 460 String region = null; 461 for (Cell cell : res.listCells()) { 462 row = CellUtil.cloneRow(cell); 463 rows.add(row); 464 String rowStr = Bytes.toString(row); 465 region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); 466 if ( 467 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 468 BackupSystemTable.FAM_COL.length) == 0 469 ) { 470 fam = Bytes.toString(CellUtil.cloneValue(cell)); 471 } else if ( 472 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 473 BackupSystemTable.PATH_COL.length) == 0 474 ) { 475 path = Bytes.toString(CellUtil.cloneValue(cell)); 476 } else if ( 477 CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, 478 BackupSystemTable.STATE_COL.length) == 0 479 ) { 480 byte[] state = CellUtil.cloneValue(cell); 481 if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) { 482 raw = true; 483 } else { 484 raw = false; 485 } 486 } 487 } 488 if (map.get(tTable) == null) { 489 map.put(tTable, new HashMap<>()); 490 tblMap = map.get(tTable); 491 } 492 if (tblMap.get(region) == null) { 493 tblMap.put(region, new HashMap<>()); 494 } 495 Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region); 496 if (famMap.get(fam) == null) { 497 famMap.put(fam, new ArrayList<>()); 498 } 499 famMap.get(fam).add(new Pair<>(path, raw)); 500 LOG.debug("found orig " + path + " for " + fam + " of table " + region); 501 } 502 } 503 } 504 return new Pair<>(map, rows); 505 } 506 507 /* 508 * @param sTableList List of tables 509 * @param maps array of Map of family to List of Paths 510 * @param backupId the backup Id 511 */ 512 public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps, 513 String backupId) throws IOException { 514 try (Table table = connection.getTable(bulkLoadTableName)) { 515 long ts = EnvironmentEdgeManager.currentTime(); 516 int cnt = 0; 517 List<Put> puts = new ArrayList<>(); 518 for (int idx = 0; idx < maps.length; idx++) { 519 Map<byte[], List<Path>> map = maps[idx]; 520 TableName tn = sTableList.get(idx); 521 522 if (map == null) { 523 continue; 524 } 525 526 for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) { 527 byte[] fam = entry.getKey(); 528 List<Path> paths = entry.getValue(); 529 for (Path p : paths) { 530 Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, 531 ts, cnt++); 532 puts.add(put); 533 } 534 } 535 } 536 if (!puts.isEmpty()) { 537 table.put(puts); 538 } 539 } 540 } 541 542 /** 543 * Reads backup status object (instance of backup info) from backup system table table 544 * @param backupId backup id 545 * @return Current status of backup session or null 546 */ 547 public BackupInfo readBackupInfo(String backupId) throws IOException { 548 if (LOG.isTraceEnabled()) { 549 LOG.trace("read backup status from backup system table for: " + backupId); 550 } 551 552 try (Table table = connection.getTable(tableName)) { 553 Get get = createGetForBackupInfo(backupId); 554 Result res = table.get(get); 555 if (res.isEmpty()) { 556 return null; 557 } 558 return resultToBackupInfo(res); 559 } 560 } 561 562 /** 563 * Read the last backup start code (timestamp) of last successful backup. Will return null if 564 * there is no start code stored on hbase or the value is of length 0. These two cases indicate 565 * there is no successful backup completed so far. 566 * @param backupRoot directory path to backup destination 567 * @return the timestamp of last successful backup 568 * @throws IOException exception 569 */ 570 public String readBackupStartCode(String backupRoot) throws IOException { 571 LOG.trace("read backup start code from backup system table"); 572 573 try (Table table = connection.getTable(tableName)) { 574 Get get = createGetForStartCode(backupRoot); 575 Result res = table.get(get); 576 if (res.isEmpty()) { 577 return null; 578 } 579 Cell cell = res.listCells().get(0); 580 byte[] val = CellUtil.cloneValue(cell); 581 if (val.length == 0) { 582 return null; 583 } 584 return new String(val, StandardCharsets.UTF_8); 585 } 586 } 587 588 /** 589 * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. 590 * @param startCode start code 591 * @param backupRoot root directory path to backup 592 * @throws IOException exception 593 */ 594 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { 595 if (LOG.isTraceEnabled()) { 596 LOG.trace("write backup start code to backup system table " + startCode); 597 } 598 try (Table table = connection.getTable(tableName)) { 599 Put put = createPutForStartCode(startCode.toString(), backupRoot); 600 table.put(put); 601 } 602 } 603 604 /** 605 * Exclusive operations are: create, delete, merge 606 * @throws IOException if a table operation fails or an active backup exclusive operation is 607 * already underway 608 */ 609 public void startBackupExclusiveOperation() throws IOException { 610 LOG.debug("Start new backup exclusive operation"); 611 612 try (Table table = connection.getTable(tableName)) { 613 Put put = createPutForStartBackupSession(); 614 // First try to put if row does not exist 615 if ( 616 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 617 .ifNotExists().thenPut(put) 618 ) { 619 // Row exists, try to put if value == ACTIVE_SESSION_NO 620 if ( 621 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 622 .ifEquals(ACTIVE_SESSION_NO).thenPut(put) 623 ) { 624 throw new ExclusiveOperationException(); 625 } 626 } 627 } 628 } 629 630 private Put createPutForStartBackupSession() { 631 Put put = new Put(ACTIVE_SESSION_ROW); 632 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); 633 return put; 634 } 635 636 public void finishBackupExclusiveOperation() throws IOException { 637 LOG.debug("Finish backup exclusive operation"); 638 639 try (Table table = connection.getTable(tableName)) { 640 Put put = createPutForStopBackupSession(); 641 if ( 642 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 643 .ifEquals(ACTIVE_SESSION_YES).thenPut(put) 644 ) { 645 throw new IOException("There is no active backup exclusive operation"); 646 } 647 } 648 } 649 650 private Put createPutForStopBackupSession() { 651 Put put = new Put(ACTIVE_SESSION_ROW); 652 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); 653 return put; 654 } 655 656 /** 657 * Get the Region Servers log information after the last log roll from backup system table. 658 * @param backupRoot root directory path to backup 659 * @return RS log info 660 * @throws IOException exception 661 */ 662 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) 663 throws IOException { 664 LOG.trace("read region server last roll log result to backup system table"); 665 666 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); 667 668 try (Table table = connection.getTable(tableName); 669 ResultScanner scanner = table.getScanner(scan)) { 670 Result res; 671 HashMap<String, Long> rsTimestampMap = new HashMap<>(); 672 while ((res = scanner.next()) != null) { 673 res.advance(); 674 Cell cell = res.current(); 675 byte[] row = CellUtil.cloneRow(cell); 676 String server = getServerNameForReadRegionServerLastLogRollResult(row); 677 byte[] data = CellUtil.cloneValue(cell); 678 rsTimestampMap.put(server, Bytes.toLong(data)); 679 } 680 return rsTimestampMap; 681 } 682 } 683 684 /** 685 * Writes Region Server last roll log result (timestamp) to backup system table table 686 * @param server Region Server name 687 * @param ts last log timestamp 688 * @param backupRoot root directory path to backup 689 * @throws IOException exception 690 */ 691 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) 692 throws IOException { 693 LOG.trace("write region server last roll log result to backup system table"); 694 695 try (Table table = connection.getTable(tableName)) { 696 Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); 697 table.put(put); 698 } 699 } 700 701 /** 702 * Get all completed backup information (in desc order by time) 703 * @param onlyCompleted true, if only successfully completed sessions 704 * @return history info of BackupCompleteData 705 * @throws IOException exception 706 */ 707 public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException { 708 LOG.trace("get backup history from backup system table"); 709 710 BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY; 711 ArrayList<BackupInfo> list = getBackupInfos(state); 712 return BackupUtils.sortHistoryListDesc(list); 713 } 714 715 /** 716 * Get all backups history 717 * @return list of backup info 718 * @throws IOException if getting the backup history fails 719 */ 720 public List<BackupInfo> getBackupHistory() throws IOException { 721 return getBackupHistory(false); 722 } 723 724 /** 725 * Get first n backup history records 726 * @param n number of records, if n== -1 - max number is ignored 727 * @return list of records 728 * @throws IOException if getting the backup history fails 729 */ 730 public List<BackupInfo> getHistory(int n) throws IOException { 731 List<BackupInfo> history = getBackupHistory(); 732 if (n == -1 || history.size() <= n) { 733 return history; 734 } 735 return Collections.unmodifiableList(history.subList(0, n)); 736 } 737 738 /** 739 * Get backup history records filtered by list of filters. 740 * @param n max number of records, if n == -1 , then max number is ignored 741 * @param filters list of filters 742 * @return backup records 743 * @throws IOException if getting the backup history fails 744 */ 745 public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException { 746 if (filters.length == 0) { 747 return getHistory(n); 748 } 749 750 List<BackupInfo> history = getBackupHistory(); 751 List<BackupInfo> result = new ArrayList<>(); 752 for (BackupInfo bi : history) { 753 if (n >= 0 && result.size() == n) { 754 break; 755 } 756 757 boolean passed = true; 758 for (int i = 0; i < filters.length; i++) { 759 if (!filters[i].apply(bi)) { 760 passed = false; 761 break; 762 } 763 } 764 if (passed) { 765 result.add(bi); 766 } 767 } 768 return result; 769 } 770 771 /* 772 * Retrieve TableName's for completed backup of given type 773 * @param type backup type 774 * @return List of table names 775 */ 776 public List<TableName> getTablesForBackupType(BackupType type) throws IOException { 777 Set<TableName> names = new HashSet<>(); 778 List<BackupInfo> infos = getBackupHistory(true); 779 for (BackupInfo info : infos) { 780 if (info.getType() == type) { 781 names.addAll(info.getTableNames()); 782 } 783 } 784 return new ArrayList<>(names); 785 } 786 787 /** 788 * Get history for backup destination 789 * @param backupRoot backup destination path 790 * @return List of backup info 791 * @throws IOException if getting the backup history fails 792 */ 793 public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException { 794 ArrayList<BackupInfo> history = getBackupHistory(false); 795 for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { 796 BackupInfo info = iterator.next(); 797 if (!backupRoot.equals(info.getBackupRootDir())) { 798 iterator.remove(); 799 } 800 } 801 return history; 802 } 803 804 /** 805 * Get history for a table 806 * @param name table name 807 * @return history for a table 808 * @throws IOException if getting the backup history fails 809 */ 810 public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException { 811 List<BackupInfo> history = getBackupHistory(); 812 List<BackupInfo> tableHistory = new ArrayList<>(); 813 for (BackupInfo info : history) { 814 List<TableName> tables = info.getTableNames(); 815 if (tables.contains(name)) { 816 tableHistory.add(info); 817 } 818 } 819 return tableHistory; 820 } 821 822 public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, 823 String backupRoot) throws IOException { 824 List<BackupInfo> history = getBackupHistory(backupRoot); 825 Map<TableName, ArrayList<BackupInfo>> tableHistoryMap = new HashMap<>(); 826 for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { 827 BackupInfo info = iterator.next(); 828 if (!backupRoot.equals(info.getBackupRootDir())) { 829 continue; 830 } 831 List<TableName> tables = info.getTableNames(); 832 for (TableName tableName : tables) { 833 if (set.contains(tableName)) { 834 ArrayList<BackupInfo> list = tableHistoryMap.get(tableName); 835 if (list == null) { 836 list = new ArrayList<>(); 837 tableHistoryMap.put(tableName, list); 838 } 839 list.add(info); 840 } 841 } 842 } 843 return tableHistoryMap; 844 } 845 846 /** 847 * Get all backup sessions with a given state (in descending order by time) 848 * @param state backup session state 849 * @return history info of backup info objects 850 * @throws IOException exception 851 */ 852 public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException { 853 LOG.trace("get backup infos from backup system table"); 854 855 Scan scan = createScanForBackupHistory(); 856 ArrayList<BackupInfo> list = new ArrayList<>(); 857 858 try (Table table = connection.getTable(tableName); 859 ResultScanner scanner = table.getScanner(scan)) { 860 Result res; 861 while ((res = scanner.next()) != null) { 862 res.advance(); 863 BackupInfo context = cellToBackupInfo(res.current()); 864 if (state != BackupState.ANY && context.getState() != state) { 865 continue; 866 } 867 list.add(context); 868 } 869 return list; 870 } 871 } 872 873 /** 874 * Write the current timestamps for each regionserver to backup system table after a successful 875 * full or incremental backup. The saved timestamp is of the last log file that was backed up 876 * already. 877 * @param tables tables 878 * @param newTimestamps timestamps 879 * @param backupRoot root directory path to backup 880 * @throws IOException exception 881 */ 882 public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps, 883 String backupRoot) throws IOException { 884 if (LOG.isTraceEnabled()) { 885 LOG.trace("write RS log time stamps to backup system table for tables [" 886 + StringUtils.join(tables, ",") + "]"); 887 } 888 List<Put> puts = new ArrayList<>(); 889 for (TableName table : tables) { 890 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); 891 Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); 892 puts.add(put); 893 } 894 try (Table table = connection.getTable(tableName)) { 895 table.put(puts); 896 } 897 } 898 899 /** 900 * Read the timestamp for each region server log after the last successful backup. Each table has 901 * its own set of the timestamps. The info is stored for each table as a concatenated string of 902 * rs->timestapmp 903 * @param backupRoot root directory path to backup 904 * @return the timestamp for each region server. key: tableName value: 905 * RegionServer,PreviousTimeStamp 906 * @throws IOException exception 907 */ 908 public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot) 909 throws IOException { 910 if (LOG.isTraceEnabled()) { 911 LOG.trace("read RS log ts from backup system table for root=" + backupRoot); 912 } 913 914 Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>(); 915 916 Scan scan = createScanForReadLogTimestampMap(backupRoot); 917 try (Table table = connection.getTable(tableName); 918 ResultScanner scanner = table.getScanner(scan)) { 919 Result res; 920 while ((res = scanner.next()) != null) { 921 res.advance(); 922 Cell cell = res.current(); 923 byte[] row = CellUtil.cloneRow(cell); 924 String tabName = getTableNameForReadLogTimestampMap(row); 925 TableName tn = TableName.valueOf(tabName); 926 byte[] data = CellUtil.cloneValue(cell); 927 if (data == null) { 928 throw new IOException("Data of last backup data from backup system table " 929 + "is empty. Create a backup first."); 930 } 931 if (data != null && data.length > 0) { 932 HashMap<String, Long> lastBackup = 933 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); 934 tableTimestampMap.put(tn, lastBackup); 935 } 936 } 937 return tableTimestampMap; 938 } 939 } 940 941 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, 942 Map<String, Long> map) { 943 BackupProtos.TableServerTimestamp.Builder tstBuilder = 944 BackupProtos.TableServerTimestamp.newBuilder(); 945 tstBuilder 946 .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); 947 948 for (Entry<String, Long> entry : map.entrySet()) { 949 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); 950 HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); 951 ServerName sn = ServerName.parseServerName(entry.getKey()); 952 snBuilder.setHostName(sn.getHostname()); 953 snBuilder.setPort(sn.getPort()); 954 builder.setServerName(snBuilder.build()); 955 builder.setTimestamp(entry.getValue()); 956 tstBuilder.addServerTimestamp(builder.build()); 957 } 958 959 return tstBuilder.build(); 960 } 961 962 private HashMap<String, Long> 963 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { 964 965 HashMap<String, Long> map = new HashMap<>(); 966 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); 967 for (BackupProtos.ServerTimestamp st : list) { 968 ServerName sn = 969 org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); 970 map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); 971 } 972 return map; 973 } 974 975 /** 976 * Return the current tables covered by incremental backup. 977 * @param backupRoot root directory path to backup 978 * @return set of tableNames 979 * @throws IOException exception 980 */ 981 public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { 982 LOG.trace("get incremental backup table set from backup system table"); 983 984 TreeSet<TableName> set = new TreeSet<>(); 985 986 try (Table table = connection.getTable(tableName)) { 987 Get get = createGetForIncrBackupTableSet(backupRoot); 988 Result res = table.get(get); 989 if (res.isEmpty()) { 990 return set; 991 } 992 List<Cell> cells = res.listCells(); 993 for (Cell cell : cells) { 994 // qualifier = table name - we use table names as qualifiers 995 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); 996 } 997 return set; 998 } 999 } 1000 1001 /** 1002 * Add tables to global incremental backup set 1003 * @param tables set of tables 1004 * @param backupRoot root directory path to backup 1005 * @throws IOException exception 1006 */ 1007 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) 1008 throws IOException { 1009 if (LOG.isTraceEnabled()) { 1010 LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot 1011 + " tables [" + StringUtils.join(tables, " ") + "]"); 1012 } 1013 if (LOG.isDebugEnabled()) { 1014 tables.forEach(table -> LOG.debug(Objects.toString(table))); 1015 } 1016 try (Table table = connection.getTable(tableName)) { 1017 Put put = createPutForIncrBackupTableSet(tables, backupRoot); 1018 table.put(put); 1019 } 1020 } 1021 1022 /** 1023 * Deletes incremental backup set for a backup destination 1024 * @param backupRoot backup root 1025 */ 1026 public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { 1027 if (LOG.isTraceEnabled()) { 1028 LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); 1029 } 1030 try (Table table = connection.getTable(tableName)) { 1031 Delete delete = createDeleteForIncrBackupTableSet(backupRoot); 1032 table.delete(delete); 1033 } 1034 } 1035 1036 /** 1037 * Checks if we have at least one backup session in backup system table This API is used by 1038 * BackupLogCleaner 1039 * @return true, if - at least one session exists in backup system table table 1040 * @throws IOException exception 1041 */ 1042 public boolean hasBackupSessions() throws IOException { 1043 LOG.trace("Has backup sessions from backup system table"); 1044 1045 boolean result = false; 1046 Scan scan = createScanForBackupHistory(); 1047 scan.setCaching(1); 1048 try (Table table = connection.getTable(tableName); 1049 ResultScanner scanner = table.getScanner(scan)) { 1050 if (scanner.next() != null) { 1051 result = true; 1052 } 1053 return result; 1054 } 1055 } 1056 1057 /** 1058 * BACKUP SETS 1059 */ 1060 1061 /** 1062 * Get backup set list 1063 * @return backup set list 1064 * @throws IOException if a table or scanner operation fails 1065 */ 1066 public List<String> listBackupSets() throws IOException { 1067 LOG.trace("Backup set list"); 1068 1069 List<String> list = new ArrayList<>(); 1070 try (Table table = connection.getTable(tableName)) { 1071 Scan scan = createScanForBackupSetList(); 1072 scan.readVersions(1); 1073 try (ResultScanner scanner = table.getScanner(scan)) { 1074 Result res; 1075 while ((res = scanner.next()) != null) { 1076 res.advance(); 1077 list.add(cellKeyToBackupSetName(res.current())); 1078 } 1079 return list; 1080 } 1081 } 1082 } 1083 1084 /** 1085 * Get backup set description (list of tables) 1086 * @param name set's name 1087 * @return list of tables in a backup set 1088 * @throws IOException if a table operation fails 1089 */ 1090 public List<TableName> describeBackupSet(String name) throws IOException { 1091 if (LOG.isTraceEnabled()) { 1092 LOG.trace(" Backup set describe: " + name); 1093 } 1094 try (Table table = connection.getTable(tableName)) { 1095 Get get = createGetForBackupSet(name); 1096 Result res = table.get(get); 1097 if (res.isEmpty()) { 1098 return null; 1099 } 1100 res.advance(); 1101 String[] tables = cellValueToBackupSet(res.current()); 1102 return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) 1103 .collect(Collectors.toList()); 1104 } 1105 } 1106 1107 /** 1108 * Add backup set (list of tables) 1109 * @param name set name 1110 * @param newTables list of tables, comma-separated 1111 * @throws IOException if a table operation fails 1112 */ 1113 public void addToBackupSet(String name, String[] newTables) throws IOException { 1114 if (LOG.isTraceEnabled()) { 1115 LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); 1116 } 1117 String[] union = null; 1118 try (Table table = connection.getTable(tableName)) { 1119 Get get = createGetForBackupSet(name); 1120 Result res = table.get(get); 1121 if (res.isEmpty()) { 1122 union = newTables; 1123 } else { 1124 res.advance(); 1125 String[] tables = cellValueToBackupSet(res.current()); 1126 union = merge(tables, newTables); 1127 } 1128 Put put = createPutForBackupSet(name, union); 1129 table.put(put); 1130 } 1131 } 1132 1133 /** 1134 * Remove tables from backup set (list of tables) 1135 * @param name set name 1136 * @param toRemove list of tables 1137 * @throws IOException if a table operation or deleting the backup set fails 1138 */ 1139 public void removeFromBackupSet(String name, String[] toRemove) throws IOException { 1140 if (LOG.isTraceEnabled()) { 1141 LOG.trace( 1142 " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); 1143 } 1144 String[] disjoint; 1145 String[] tables; 1146 try (Table table = connection.getTable(tableName)) { 1147 Get get = createGetForBackupSet(name); 1148 Result res = table.get(get); 1149 if (res.isEmpty()) { 1150 LOG.warn("Backup set '" + name + "' not found."); 1151 return; 1152 } else { 1153 res.advance(); 1154 tables = cellValueToBackupSet(res.current()); 1155 disjoint = disjoin(tables, toRemove); 1156 } 1157 if (disjoint.length > 0 && disjoint.length != tables.length) { 1158 Put put = createPutForBackupSet(name, disjoint); 1159 table.put(put); 1160 } else if (disjoint.length == tables.length) { 1161 LOG.warn("Backup set '" + name + "' does not contain tables [" 1162 + StringUtils.join(toRemove, " ") + "]"); 1163 } else { // disjoint.length == 0 and tables.length >0 1164 // Delete backup set 1165 LOG.info("Backup set '" + name + "' is empty. Deleting."); 1166 deleteBackupSet(name); 1167 } 1168 } 1169 } 1170 1171 private String[] merge(String[] existingTables, String[] newTables) { 1172 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1173 tables.addAll(Arrays.asList(newTables)); 1174 return tables.toArray(new String[0]); 1175 } 1176 1177 private String[] disjoin(String[] existingTables, String[] toRemove) { 1178 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1179 Arrays.asList(toRemove).forEach(table -> tables.remove(table)); 1180 return tables.toArray(new String[0]); 1181 } 1182 1183 /** 1184 * Delete backup set 1185 * @param name set's name 1186 * @throws IOException if getting or deleting the table fails 1187 */ 1188 public void deleteBackupSet(String name) throws IOException { 1189 if (LOG.isTraceEnabled()) { 1190 LOG.trace(" Backup set delete: " + name); 1191 } 1192 try (Table table = connection.getTable(tableName)) { 1193 Delete del = createDeleteForBackupSet(name); 1194 table.delete(del); 1195 } 1196 } 1197 1198 /** 1199 * Get backup system table descriptor 1200 * @return table's descriptor 1201 */ 1202 public static TableDescriptor getSystemTableDescriptor(Configuration conf) { 1203 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); 1204 1205 ColumnFamilyDescriptorBuilder colBuilder = 1206 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1207 1208 colBuilder.setMaxVersions(1); 1209 Configuration config = HBaseConfiguration.create(); 1210 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1211 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1212 colBuilder.setTimeToLive(ttl); 1213 1214 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1215 builder.setColumnFamily(colSessionsDesc); 1216 1217 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1218 colBuilder.setTimeToLive(ttl); 1219 builder.setColumnFamily(colBuilder.build()); 1220 return builder.build(); 1221 } 1222 1223 public static TableName getTableName(Configuration conf) { 1224 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1225 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); 1226 return TableName.valueOf(name); 1227 } 1228 1229 public static String getTableNameAsString(Configuration conf) { 1230 return getTableName(conf).getNameAsString(); 1231 } 1232 1233 public static String getSnapshotName(Configuration conf) { 1234 return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); 1235 } 1236 1237 /** 1238 * Get backup system table descriptor 1239 * @return table's descriptor 1240 */ 1241 public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { 1242 TableDescriptorBuilder builder = 1243 TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); 1244 1245 ColumnFamilyDescriptorBuilder colBuilder = 1246 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1247 colBuilder.setMaxVersions(1); 1248 Configuration config = HBaseConfiguration.create(); 1249 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1250 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1251 colBuilder.setTimeToLive(ttl); 1252 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1253 builder.setColumnFamily(colSessionsDesc); 1254 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1255 colBuilder.setTimeToLive(ttl); 1256 builder.setColumnFamily(colBuilder.build()); 1257 return builder.build(); 1258 } 1259 1260 public static TableName getTableNameForBulkLoadedData(Configuration conf) { 1261 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1262 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; 1263 return TableName.valueOf(name); 1264 } 1265 1266 /** 1267 * Creates Put operation for a given backup info object 1268 * @param context backup info 1269 * @return put operation 1270 * @throws IOException exception 1271 */ 1272 private Put createPutForBackupInfo(BackupInfo context) throws IOException { 1273 Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); 1274 put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), 1275 context.toByteArray()); 1276 return put; 1277 } 1278 1279 /** 1280 * Creates Get operation for a given backup id 1281 * @param backupId backup's ID 1282 * @return get operation 1283 * @throws IOException exception 1284 */ 1285 private Get createGetForBackupInfo(String backupId) throws IOException { 1286 Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); 1287 get.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1288 get.readVersions(1); 1289 return get; 1290 } 1291 1292 /** 1293 * Creates Delete operation for a given backup id 1294 * @param backupId backup's ID 1295 * @return delete operation 1296 */ 1297 private Delete createDeleteForBackupInfo(String backupId) { 1298 Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); 1299 del.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1300 return del; 1301 } 1302 1303 /** 1304 * Converts Result to BackupInfo 1305 * @param res HBase result 1306 * @return backup info instance 1307 * @throws IOException exception 1308 */ 1309 private BackupInfo resultToBackupInfo(Result res) throws IOException { 1310 res.advance(); 1311 Cell cell = res.current(); 1312 return cellToBackupInfo(cell); 1313 } 1314 1315 /** 1316 * Creates Get operation to retrieve start code from backup system table 1317 * @return get operation 1318 * @throws IOException exception 1319 */ 1320 private Get createGetForStartCode(String rootPath) throws IOException { 1321 Get get = new Get(rowkey(START_CODE_ROW, rootPath)); 1322 get.addFamily(BackupSystemTable.META_FAMILY); 1323 get.readVersions(1); 1324 return get; 1325 } 1326 1327 /** 1328 * Creates Put operation to store start code to backup system table 1329 * @return put operation 1330 */ 1331 private Put createPutForStartCode(String startCode, String rootPath) { 1332 Put put = new Put(rowkey(START_CODE_ROW, rootPath)); 1333 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), 1334 Bytes.toBytes(startCode)); 1335 return put; 1336 } 1337 1338 /** 1339 * Creates Get to retrieve incremental backup table set from backup system table 1340 * @return get operation 1341 * @throws IOException exception 1342 */ 1343 private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { 1344 Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); 1345 get.addFamily(BackupSystemTable.META_FAMILY); 1346 get.readVersions(1); 1347 return get; 1348 } 1349 1350 /** 1351 * Creates Put to store incremental backup table set 1352 * @param tables tables 1353 * @return put operation 1354 */ 1355 private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { 1356 Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); 1357 for (TableName table : tables) { 1358 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), 1359 EMPTY_VALUE); 1360 } 1361 return put; 1362 } 1363 1364 /** 1365 * Creates Delete for incremental backup table set 1366 * @param backupRoot backup root 1367 * @return delete operation 1368 */ 1369 private Delete createDeleteForIncrBackupTableSet(String backupRoot) { 1370 Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); 1371 delete.addFamily(BackupSystemTable.META_FAMILY); 1372 return delete; 1373 } 1374 1375 /** 1376 * Creates Scan operation to load backup history 1377 * @return scan operation 1378 */ 1379 private Scan createScanForBackupHistory() { 1380 Scan scan = new Scan(); 1381 byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); 1382 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1383 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1384 scan.withStartRow(startRow); 1385 scan.withStopRow(stopRow); 1386 scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1387 scan.readVersions(1); 1388 return scan; 1389 } 1390 1391 /** 1392 * Converts cell to backup info instance. 1393 * @param current current cell 1394 * @return backup backup info instance 1395 * @throws IOException exception 1396 */ 1397 private BackupInfo cellToBackupInfo(Cell current) throws IOException { 1398 byte[] data = CellUtil.cloneValue(current); 1399 return BackupInfo.fromByteArray(data); 1400 } 1401 1402 /** 1403 * Creates Put to write RS last roll log timestamp map 1404 * @param table table 1405 * @param smap map, containing RS:ts 1406 * @return put operation 1407 */ 1408 private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, 1409 String backupRoot) { 1410 Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); 1411 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); 1412 return put; 1413 } 1414 1415 /** 1416 * Creates Scan to load table-> { RS -> ts} map of maps 1417 * @return scan operation 1418 */ 1419 private Scan createScanForReadLogTimestampMap(String backupRoot) { 1420 Scan scan = new Scan(); 1421 byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot); 1422 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1423 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1424 scan.withStartRow(startRow); 1425 scan.withStopRow(stopRow); 1426 scan.addFamily(BackupSystemTable.META_FAMILY); 1427 1428 return scan; 1429 } 1430 1431 /** 1432 * Get table name from rowkey 1433 * @param cloneRow rowkey 1434 * @return table name 1435 */ 1436 private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { 1437 String s = Bytes.toString(cloneRow); 1438 int index = s.lastIndexOf(NULL); 1439 return s.substring(index + 1); 1440 } 1441 1442 /** 1443 * Creates Put to store RS last log result 1444 * @param server server name 1445 * @param timestamp log roll result (timestamp) 1446 * @return put operation 1447 */ 1448 private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, 1449 String backupRoot) { 1450 Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); 1451 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), 1452 Bytes.toBytes(timestamp)); 1453 return put; 1454 } 1455 1456 /** 1457 * Creates Scan operation to load last RS log roll results 1458 * @return scan operation 1459 */ 1460 private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { 1461 Scan scan = new Scan(); 1462 byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot); 1463 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1464 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1465 scan.withStartRow(startRow); 1466 scan.withStopRow(stopRow); 1467 scan.addFamily(BackupSystemTable.META_FAMILY); 1468 scan.readVersions(1); 1469 1470 return scan; 1471 } 1472 1473 /** 1474 * Get server's name from rowkey 1475 * @param row rowkey 1476 * @return server's name 1477 */ 1478 private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { 1479 String s = Bytes.toString(row); 1480 int index = s.lastIndexOf(NULL); 1481 return s.substring(index + 1); 1482 } 1483 1484 /* 1485 * Creates Put's for bulk load resulting from running LoadIncrementalHFiles 1486 */ 1487 static List<Put> createPutForCommittedBulkload(TableName table, byte[] region, 1488 Map<byte[], List<Path>> finalPaths) { 1489 List<Put> puts = new ArrayList<>(); 1490 for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) { 1491 for (Path path : entry.getValue()) { 1492 String file = path.toString(); 1493 int lastSlash = file.lastIndexOf("/"); 1494 String filename = file.substring(lastSlash + 1); 1495 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1496 Bytes.toString(region), BLK_LD_DELIM, filename)); 1497 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1498 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); 1499 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1500 put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); 1501 puts.add(put); 1502 LOG 1503 .debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); 1504 } 1505 } 1506 return puts; 1507 } 1508 1509 public static void snapshot(Connection conn) throws IOException { 1510 try (Admin admin = conn.getAdmin()) { 1511 Configuration conf = conn.getConfiguration(); 1512 admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); 1513 } 1514 } 1515 1516 public static void restoreFromSnapshot(Connection conn) throws IOException { 1517 Configuration conf = conn.getConfiguration(); 1518 LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); 1519 try (Admin admin = conn.getAdmin()) { 1520 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1521 if (snapshotExists(admin, snapshotName)) { 1522 admin.disableTable(BackupSystemTable.getTableName(conf)); 1523 admin.restoreSnapshot(snapshotName); 1524 admin.enableTable(BackupSystemTable.getTableName(conf)); 1525 LOG.debug("Done restoring backup system table"); 1526 } else { 1527 // Snapshot does not exists, i.e completeBackup failed after 1528 // deleting backup system table snapshot 1529 // In this case we log WARN and proceed 1530 LOG.warn( 1531 "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); 1532 } 1533 } 1534 } 1535 1536 private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { 1537 List<SnapshotDescription> list = admin.listSnapshots(); 1538 for (SnapshotDescription desc : list) { 1539 if (desc.getName().equals(snapshotName)) { 1540 return true; 1541 } 1542 } 1543 return false; 1544 } 1545 1546 public static boolean snapshotExists(Connection conn) throws IOException { 1547 return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); 1548 } 1549 1550 public static void deleteSnapshot(Connection conn) throws IOException { 1551 Configuration conf = conn.getConfiguration(); 1552 LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); 1553 try (Admin admin = conn.getAdmin()) { 1554 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1555 if (snapshotExists(admin, snapshotName)) { 1556 admin.deleteSnapshot(snapshotName); 1557 LOG.debug("Done deleting backup system table snapshot"); 1558 } else { 1559 LOG.error("Snapshot " + snapshotName + " does not exists"); 1560 } 1561 } 1562 } 1563 1564 /* 1565 * Creates Put's for bulk load resulting from running LoadIncrementalHFiles 1566 */ 1567 static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family, 1568 final List<Pair<Path, Path>> pairs) { 1569 List<Put> puts = new ArrayList<>(pairs.size()); 1570 for (Pair<Path, Path> pair : pairs) { 1571 Path path = pair.getSecond(); 1572 String file = path.toString(); 1573 int lastSlash = file.lastIndexOf("/"); 1574 String filename = file.substring(lastSlash + 1); 1575 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1576 Bytes.toString(region), BLK_LD_DELIM, filename)); 1577 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1578 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); 1579 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1580 put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE); 1581 puts.add(put); 1582 LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region)); 1583 } 1584 return puts; 1585 } 1586 1587 public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) { 1588 List<Delete> lstDels = new ArrayList<>(lst.size()); 1589 for (TableName table : lst) { 1590 Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM)); 1591 del.addFamily(BackupSystemTable.META_FAMILY); 1592 lstDels.add(del); 1593 } 1594 return lstDels; 1595 } 1596 1597 private Put createPutForDeleteOperation(String[] backupIdList) { 1598 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1599 Put put = new Put(DELETE_OP_ROW); 1600 put.addColumn(META_FAMILY, FAM_COL, value); 1601 return put; 1602 } 1603 1604 private Delete createDeleteForBackupDeleteOperation() { 1605 Delete delete = new Delete(DELETE_OP_ROW); 1606 delete.addFamily(META_FAMILY); 1607 return delete; 1608 } 1609 1610 private Get createGetForDeleteOperation() { 1611 Get get = new Get(DELETE_OP_ROW); 1612 get.addFamily(META_FAMILY); 1613 return get; 1614 } 1615 1616 public void startDeleteOperation(String[] backupIdList) throws IOException { 1617 if (LOG.isTraceEnabled()) { 1618 LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); 1619 } 1620 Put put = createPutForDeleteOperation(backupIdList); 1621 try (Table table = connection.getTable(tableName)) { 1622 table.put(put); 1623 } 1624 } 1625 1626 public void finishDeleteOperation() throws IOException { 1627 LOG.trace("Finsih delete operation for backup ids"); 1628 1629 Delete delete = createDeleteForBackupDeleteOperation(); 1630 try (Table table = connection.getTable(tableName)) { 1631 table.delete(delete); 1632 } 1633 } 1634 1635 public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { 1636 LOG.trace("Get delete operation for backup ids"); 1637 1638 Get get = createGetForDeleteOperation(); 1639 try (Table table = connection.getTable(tableName)) { 1640 Result res = table.get(get); 1641 if (res.isEmpty()) { 1642 return null; 1643 } 1644 Cell cell = res.listCells().get(0); 1645 byte[] val = CellUtil.cloneValue(cell); 1646 if (val.length == 0) { 1647 return null; 1648 } 1649 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1650 .toArray(String[]::new); 1651 } 1652 } 1653 1654 private Put createPutForMergeOperation(String[] backupIdList) { 1655 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1656 Put put = new Put(MERGE_OP_ROW); 1657 put.addColumn(META_FAMILY, FAM_COL, value); 1658 return put; 1659 } 1660 1661 public boolean isMergeInProgress() throws IOException { 1662 Get get = new Get(MERGE_OP_ROW); 1663 try (Table table = connection.getTable(tableName)) { 1664 Result res = table.get(get); 1665 return !res.isEmpty(); 1666 } 1667 } 1668 1669 private Put createPutForUpdateTablesForMerge(List<TableName> tables) { 1670 byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); 1671 Put put = new Put(MERGE_OP_ROW); 1672 put.addColumn(META_FAMILY, PATH_COL, value); 1673 return put; 1674 } 1675 1676 private Delete createDeleteForBackupMergeOperation() { 1677 Delete delete = new Delete(MERGE_OP_ROW); 1678 delete.addFamily(META_FAMILY); 1679 return delete; 1680 } 1681 1682 private Get createGetForMergeOperation() { 1683 Get get = new Get(MERGE_OP_ROW); 1684 get.addFamily(META_FAMILY); 1685 return get; 1686 } 1687 1688 public void startMergeOperation(String[] backupIdList) throws IOException { 1689 if (LOG.isTraceEnabled()) { 1690 LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); 1691 } 1692 Put put = createPutForMergeOperation(backupIdList); 1693 try (Table table = connection.getTable(tableName)) { 1694 table.put(put); 1695 } 1696 } 1697 1698 public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { 1699 if (LOG.isTraceEnabled()) { 1700 LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); 1701 } 1702 Put put = createPutForUpdateTablesForMerge(tables); 1703 try (Table table = connection.getTable(tableName)) { 1704 table.put(put); 1705 } 1706 } 1707 1708 public void finishMergeOperation() throws IOException { 1709 LOG.trace("Finish merge operation for backup ids"); 1710 1711 Delete delete = createDeleteForBackupMergeOperation(); 1712 try (Table table = connection.getTable(tableName)) { 1713 table.delete(delete); 1714 } 1715 } 1716 1717 public String[] getListOfBackupIdsFromMergeOperation() throws IOException { 1718 LOG.trace("Get backup ids for merge operation"); 1719 1720 Get get = createGetForMergeOperation(); 1721 try (Table table = connection.getTable(tableName)) { 1722 Result res = table.get(get); 1723 if (res.isEmpty()) { 1724 return null; 1725 } 1726 Cell cell = res.listCells().get(0); 1727 byte[] val = CellUtil.cloneValue(cell); 1728 if (val.length == 0) { 1729 return null; 1730 } 1731 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1732 .toArray(String[]::new); 1733 } 1734 } 1735 1736 static Scan createScanForOrigBulkLoadedFiles(TableName table) { 1737 Scan scan = new Scan(); 1738 byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); 1739 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1740 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1741 scan.withStartRow(startRow); 1742 scan.withStopRow(stopRow); 1743 scan.addFamily(BackupSystemTable.META_FAMILY); 1744 scan.readVersions(1); 1745 return scan; 1746 } 1747 1748 static String getTableNameFromOrigBulkLoadRow(String rowStr) { 1749 // format is bulk : namespace : table : region : file 1750 return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1); 1751 } 1752 1753 static String getRegionNameFromOrigBulkLoadRow(String rowStr) { 1754 // format is bulk : namespace : table : region : file 1755 List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr); 1756 Iterator<String> i = parts.iterator(); 1757 int idx = 3; 1758 if (parts.size() == 4) { 1759 // the table is in default namespace 1760 idx = 2; 1761 } 1762 String region = Iterators.get(i, idx); 1763 LOG.debug("bulk row string " + rowStr + " region " + region); 1764 return region; 1765 } 1766 1767 /* 1768 * Used to query bulk loaded hfiles which have been copied by incremental backup 1769 * @param backupId the backup Id. It can be null when querying for all tables 1770 * @return the Scan object 1771 */ 1772 static Scan createScanForBulkLoadedFiles(String backupId) { 1773 Scan scan = new Scan(); 1774 byte[] startRow = 1775 backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); 1776 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1777 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1778 scan.withStartRow(startRow); 1779 scan.withStopRow(stopRow); 1780 scan.addFamily(BackupSystemTable.META_FAMILY); 1781 scan.readVersions(1); 1782 return scan; 1783 } 1784 1785 static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, 1786 long ts, int idx) { 1787 Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); 1788 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); 1789 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); 1790 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p)); 1791 return put; 1792 } 1793 1794 /** 1795 * Creates Scan operation to load backup set list 1796 * @return scan operation 1797 */ 1798 private Scan createScanForBackupSetList() { 1799 Scan scan = new Scan(); 1800 byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); 1801 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1802 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1803 scan.withStartRow(startRow); 1804 scan.withStopRow(stopRow); 1805 scan.addFamily(BackupSystemTable.META_FAMILY); 1806 return scan; 1807 } 1808 1809 /** 1810 * Creates Get operation to load backup set content 1811 * @return get operation 1812 */ 1813 private Get createGetForBackupSet(String name) { 1814 Get get = new Get(rowkey(SET_KEY_PREFIX, name)); 1815 get.addFamily(BackupSystemTable.META_FAMILY); 1816 return get; 1817 } 1818 1819 /** 1820 * Creates Delete operation to delete backup set content 1821 * @param name backup set's name 1822 * @return delete operation 1823 */ 1824 private Delete createDeleteForBackupSet(String name) { 1825 Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); 1826 del.addFamily(BackupSystemTable.META_FAMILY); 1827 return del; 1828 } 1829 1830 /** 1831 * Creates Put operation to update backup set content 1832 * @param name backup set's name 1833 * @param tables list of tables 1834 * @return put operation 1835 */ 1836 private Put createPutForBackupSet(String name, String[] tables) { 1837 Put put = new Put(rowkey(SET_KEY_PREFIX, name)); 1838 byte[] value = convertToByteArray(tables); 1839 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); 1840 return put; 1841 } 1842 1843 private byte[] convertToByteArray(String[] tables) { 1844 return Bytes.toBytes(StringUtils.join(tables, ",")); 1845 } 1846 1847 /** 1848 * Converts cell to backup set list. 1849 * @param current current cell 1850 * @return backup set as array of table names 1851 */ 1852 private String[] cellValueToBackupSet(Cell current) { 1853 byte[] data = CellUtil.cloneValue(current); 1854 if (!ArrayUtils.isEmpty(data)) { 1855 return Bytes.toString(data).split(","); 1856 } 1857 return new String[0]; 1858 } 1859 1860 /** 1861 * Converts cell key to backup set name. 1862 * @param current current cell 1863 * @return backup set name 1864 */ 1865 private String cellKeyToBackupSetName(Cell current) { 1866 byte[] data = CellUtil.cloneRow(current); 1867 return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); 1868 } 1869 1870 private static byte[] rowkey(String s, String... other) { 1871 StringBuilder sb = new StringBuilder(s); 1872 for (String ss : other) { 1873 sb.append(ss); 1874 } 1875 return Bytes.toBytes(sb.toString()); 1876 } 1877}