001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.nio.charset.StandardCharsets; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.Objects; 034import java.util.Set; 035import java.util.TreeMap; 036import java.util.TreeSet; 037import java.util.stream.Collectors; 038import org.apache.commons.lang3.ArrayUtils; 039import org.apache.commons.lang3.StringUtils; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.Cell; 043import org.apache.hadoop.hbase.CellUtil; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.NamespaceDescriptor; 046import org.apache.hadoop.hbase.NamespaceExistException; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableExistsException; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.backup.BackupInfo; 051import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 052import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 053import org.apache.hadoop.hbase.backup.BackupType; 054import org.apache.hadoop.hbase.backup.util.BackupUtils; 055import org.apache.hadoop.hbase.client.Admin; 056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 057import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 058import org.apache.hadoop.hbase.client.Connection; 059import org.apache.hadoop.hbase.client.Delete; 060import org.apache.hadoop.hbase.client.Get; 061import org.apache.hadoop.hbase.client.Put; 062import org.apache.hadoop.hbase.client.Result; 063import org.apache.hadoop.hbase.client.ResultScanner; 064import org.apache.hadoop.hbase.client.Scan; 065import org.apache.hadoop.hbase.client.SnapshotDescription; 066import org.apache.hadoop.hbase.client.Table; 067import org.apache.hadoop.hbase.client.TableDescriptor; 068import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.hadoop.hbase.util.Pair; 072import org.apache.yetus.audience.InterfaceAudience; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 077import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 078 079import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 081 082/** 083 * This class provides API to access backup system table<br> 084 * Backup system table schema:<br> 085 * <p> 086 * <ul> 087 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> 088 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> 089 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li> 090 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-%3E last WAL 091 * timestamp]</li> 092 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> 093 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file 094 * name</li> 095 * </ul> 096 * </p> 097 */ 098@InterfaceAudience.Private 099public final class BackupSystemTable implements Closeable { 100 101 private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); 102 103 static class WALItem { 104 String backupId; 105 String walFile; 106 String backupRoot; 107 108 WALItem(String backupId, String walFile, String backupRoot) { 109 this.backupId = backupId; 110 this.walFile = walFile; 111 this.backupRoot = backupRoot; 112 } 113 114 public String getBackupId() { 115 return backupId; 116 } 117 118 public String getWalFile() { 119 return walFile; 120 } 121 122 public String getBackupRoot() { 123 return backupRoot; 124 } 125 126 @Override 127 public String toString() { 128 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; 129 } 130 } 131 132 /** 133 * Backup system table (main) name 134 */ 135 private TableName tableName; 136 137 /** 138 * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a 139 * separate table because we have to isolate general backup operations: create, merge etc from 140 * activity of RegionObserver, which controls process of a bulk loading 141 * {@link org.apache.hadoop.hbase.backup.BackupObserver} 142 */ 143 private TableName bulkLoadTableName; 144 145 /** 146 * Stores backup sessions (contexts) 147 */ 148 final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); 149 /** 150 * Stores other meta 151 */ 152 final static byte[] META_FAMILY = Bytes.toBytes("meta"); 153 final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); 154 /** 155 * Connection to HBase cluster, shared among all instances 156 */ 157 private final Connection connection; 158 159 private final static String BACKUP_INFO_PREFIX = "session:"; 160 private final static String START_CODE_ROW = "startcode:"; 161 private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); 162 private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); 163 164 private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); 165 private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); 166 167 private final static String INCR_BACKUP_SET = "incrbackupset:"; 168 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; 169 private final static String RS_LOG_TS_PREFIX = "rslogts:"; 170 171 private final static String BULK_LOAD_PREFIX = "bulk:"; 172 private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX); 173 private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row"); 174 private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row"); 175 176 final static byte[] TBL_COL = Bytes.toBytes("tbl"); 177 final static byte[] FAM_COL = Bytes.toBytes("fam"); 178 final static byte[] PATH_COL = Bytes.toBytes("path"); 179 final static byte[] STATE_COL = Bytes.toBytes("state"); 180 // the two states a bulk loaded file can be 181 final static byte[] BL_PREPARE = Bytes.toBytes("R"); 182 final static byte[] BL_COMMIT = Bytes.toBytes("D"); 183 184 private final static String SET_KEY_PREFIX = "backupset:"; 185 186 // separator between BULK_LOAD_PREFIX and ordinals 187 private final static String BLK_LD_DELIM = ":"; 188 private final static byte[] EMPTY_VALUE = new byte[] {}; 189 190 // Safe delimiter in a string 191 private final static String NULL = "\u0000"; 192 193 public BackupSystemTable(Connection conn) throws IOException { 194 this.connection = conn; 195 Configuration conf = this.connection.getConfiguration(); 196 tableName = BackupSystemTable.getTableName(conf); 197 bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); 198 checkSystemTable(); 199 } 200 201 private void checkSystemTable() throws IOException { 202 try (Admin admin = connection.getAdmin()) { 203 verifyNamespaceExists(admin); 204 Configuration conf = connection.getConfiguration(); 205 if (!admin.tableExists(tableName)) { 206 TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf); 207 createSystemTable(admin, backupHTD); 208 } 209 if (!admin.tableExists(bulkLoadTableName)) { 210 TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); 211 createSystemTable(admin, blHTD); 212 } 213 waitForSystemTable(admin, tableName); 214 waitForSystemTable(admin, bulkLoadTableName); 215 } 216 } 217 218 private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException { 219 try { 220 admin.createTable(descriptor); 221 } catch (TableExistsException e) { 222 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 223 // so may be subject to race conditions where one caller succeeds in creating the 224 // table and others fail because it now exists 225 LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e); 226 } 227 } 228 229 private void verifyNamespaceExists(Admin admin) throws IOException { 230 String namespaceName = tableName.getNamespaceAsString(); 231 NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); 232 NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); 233 boolean exists = false; 234 for (NamespaceDescriptor nsd : list) { 235 if (nsd.getName().equals(ns.getName())) { 236 exists = true; 237 break; 238 } 239 } 240 if (!exists) { 241 try { 242 admin.createNamespace(ns); 243 } catch (NamespaceExistException e) { 244 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 245 // so may be subject to race conditions where one caller succeeds in creating the 246 // namespace and others fail because it now exists 247 LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e); 248 } 249 } 250 } 251 252 private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { 253 // Return fast if the table is available and avoid a log message 254 if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) { 255 return; 256 } 257 long TIMEOUT = 60000; 258 long startTime = EnvironmentEdgeManager.currentTime(); 259 LOG.debug("Backup table {} is not present and available, waiting for it to become so", 260 tableName); 261 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { 262 try { 263 Thread.sleep(100); 264 } catch (InterruptedException e) { 265 throw (IOException) new InterruptedIOException().initCause(e); 266 } 267 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { 268 throw new IOException( 269 "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms"); 270 } 271 } 272 LOG.debug("Backup table {} exists and available", tableName); 273 } 274 275 @Override 276 public void close() { 277 // do nothing 278 } 279 280 /** 281 * Updates status (state) of a backup session in backup system table table 282 * @param info backup info 283 * @throws IOException exception 284 */ 285 public void updateBackupInfo(BackupInfo info) throws IOException { 286 if (LOG.isTraceEnabled()) { 287 LOG.trace("update backup status in backup system table for: " + info.getBackupId() 288 + " set status=" + info.getState()); 289 } 290 try (Table table = connection.getTable(tableName)) { 291 Put put = createPutForBackupInfo(info); 292 table.put(put); 293 } 294 } 295 296 /* 297 * @param backupId the backup Id 298 * @return Map of rows to path of bulk loaded hfile 299 */ 300 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { 301 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 302 try (Table table = connection.getTable(bulkLoadTableName); 303 ResultScanner scanner = table.getScanner(scan)) { 304 Result res = null; 305 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 306 while ((res = scanner.next()) != null) { 307 res.advance(); 308 byte[] row = CellUtil.cloneRow(res.listCells().get(0)); 309 for (Cell cell : res.listCells()) { 310 if ( 311 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 312 BackupSystemTable.PATH_COL.length) == 0 313 ) { 314 map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); 315 } 316 } 317 } 318 return map; 319 } 320 } 321 322 /* 323 * Used during restore 324 * @param backupId the backup Id 325 * @param sTableList List of tables 326 * @return array of Map of family to List of Paths 327 */ 328 public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList) 329 throws IOException { 330 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 331 @SuppressWarnings("unchecked") 332 Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()]; 333 try (Table table = connection.getTable(bulkLoadTableName); 334 ResultScanner scanner = table.getScanner(scan)) { 335 Result res = null; 336 while ((res = scanner.next()) != null) { 337 res.advance(); 338 TableName tbl = null; 339 byte[] fam = null; 340 String path = null; 341 for (Cell cell : res.listCells()) { 342 if ( 343 CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, 344 BackupSystemTable.TBL_COL.length) == 0 345 ) { 346 tbl = TableName.valueOf(CellUtil.cloneValue(cell)); 347 } else if ( 348 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 349 BackupSystemTable.FAM_COL.length) == 0 350 ) { 351 fam = CellUtil.cloneValue(cell); 352 } else if ( 353 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 354 BackupSystemTable.PATH_COL.length) == 0 355 ) { 356 path = Bytes.toString(CellUtil.cloneValue(cell)); 357 } 358 } 359 int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList); 360 if (srcIdx == -1) { 361 // the table is not among the query 362 continue; 363 } 364 if (mapForSrc[srcIdx] == null) { 365 mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR); 366 } 367 List<Path> files; 368 if (!mapForSrc[srcIdx].containsKey(fam)) { 369 files = new ArrayList<Path>(); 370 mapForSrc[srcIdx].put(fam, files); 371 } else { 372 files = mapForSrc[srcIdx].get(fam); 373 } 374 files.add(new Path(path)); 375 if (LOG.isDebugEnabled()) { 376 LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); 377 } 378 } 379 380 return mapForSrc; 381 } 382 } 383 384 /** 385 * Deletes backup status from backup system table table 386 * @param backupId backup id 387 * @throws IOException exception 388 */ 389 public void deleteBackupInfo(String backupId) throws IOException { 390 if (LOG.isTraceEnabled()) { 391 LOG.trace("delete backup status in backup system table for " + backupId); 392 } 393 try (Table table = connection.getTable(tableName)) { 394 Delete del = createDeleteForBackupInfo(backupId); 395 table.delete(del); 396 } 397 } 398 399 /* 400 * For postBulkLoadHFile() hook. 401 * @param tabName table name 402 * @param region the region receiving hfile 403 * @param finalPaths family and associated hfiles 404 */ 405 public void writePathsPostBulkLoad(TableName tabName, byte[] region, 406 Map<byte[], List<Path>> finalPaths) throws IOException { 407 if (LOG.isDebugEnabled()) { 408 LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() 409 + " entries"); 410 } 411 try (Table table = connection.getTable(bulkLoadTableName)) { 412 List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); 413 table.put(puts); 414 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); 415 } 416 } 417 418 /* 419 * For preCommitStoreFile() hook 420 * @param tabName table name 421 * @param region the region receiving hfile 422 * @param family column family 423 * @param pairs list of paths for hfiles 424 */ 425 public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, 426 final List<Pair<Path, Path>> pairs) throws IOException { 427 if (LOG.isDebugEnabled()) { 428 LOG.debug( 429 "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries"); 430 } 431 try (Table table = connection.getTable(bulkLoadTableName)) { 432 List<Put> puts = 433 BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); 434 table.put(puts); 435 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); 436 } 437 } 438 439 /* 440 * Removes rows recording bulk loaded hfiles from backup table 441 * @param lst list of table names 442 * @param rows the rows to be deleted 443 */ 444 public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException { 445 try (Table table = connection.getTable(bulkLoadTableName)) { 446 List<Delete> lstDels = new ArrayList<>(); 447 for (byte[] row : rows) { 448 Delete del = new Delete(row); 449 lstDels.add(del); 450 LOG.debug("orig deleting the row: " + Bytes.toString(row)); 451 } 452 table.delete(lstDels); 453 LOG.debug("deleted " + rows.size() + " original bulkload rows"); 454 } 455 } 456 457 /* 458 * Reads the rows from backup table recording bulk loaded hfiles 459 * @param tableList list of table names 460 * @return The keys of the Map are table, region and column family. Value of the map reflects 461 * whether the hfile was recorded by preCommitStoreFile hook (true) 462 */ 463 public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> 464 readBulkloadRows(List<TableName> tableList) throws IOException { 465 466 Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>(); 467 List<byte[]> rows = new ArrayList<>(); 468 for (TableName tTable : tableList) { 469 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable); 470 Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable); 471 try (Table table = connection.getTable(bulkLoadTableName); 472 ResultScanner scanner = table.getScanner(scan)) { 473 Result res = null; 474 while ((res = scanner.next()) != null) { 475 res.advance(); 476 String fam = null; 477 String path = null; 478 boolean raw = false; 479 byte[] row; 480 String region = null; 481 for (Cell cell : res.listCells()) { 482 row = CellUtil.cloneRow(cell); 483 rows.add(row); 484 String rowStr = Bytes.toString(row); 485 region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); 486 if ( 487 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 488 BackupSystemTable.FAM_COL.length) == 0 489 ) { 490 fam = Bytes.toString(CellUtil.cloneValue(cell)); 491 } else if ( 492 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 493 BackupSystemTable.PATH_COL.length) == 0 494 ) { 495 path = Bytes.toString(CellUtil.cloneValue(cell)); 496 } else if ( 497 CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, 498 BackupSystemTable.STATE_COL.length) == 0 499 ) { 500 byte[] state = CellUtil.cloneValue(cell); 501 if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) { 502 raw = true; 503 } else { 504 raw = false; 505 } 506 } 507 } 508 if (map.get(tTable) == null) { 509 map.put(tTable, new HashMap<>()); 510 tblMap = map.get(tTable); 511 } 512 if (tblMap.get(region) == null) { 513 tblMap.put(region, new HashMap<>()); 514 } 515 Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region); 516 if (famMap.get(fam) == null) { 517 famMap.put(fam, new ArrayList<>()); 518 } 519 famMap.get(fam).add(new Pair<>(path, raw)); 520 LOG.debug("found orig " + path + " for " + fam + " of table " + region); 521 } 522 } 523 } 524 return new Pair<>(map, rows); 525 } 526 527 /* 528 * @param sTableList List of tables 529 * @param maps array of Map of family to List of Paths 530 * @param backupId the backup Id 531 */ 532 public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps, 533 String backupId) throws IOException { 534 try (Table table = connection.getTable(bulkLoadTableName)) { 535 long ts = EnvironmentEdgeManager.currentTime(); 536 int cnt = 0; 537 List<Put> puts = new ArrayList<>(); 538 for (int idx = 0; idx < maps.length; idx++) { 539 Map<byte[], List<Path>> map = maps[idx]; 540 TableName tn = sTableList.get(idx); 541 542 if (map == null) { 543 continue; 544 } 545 546 for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) { 547 byte[] fam = entry.getKey(); 548 List<Path> paths = entry.getValue(); 549 for (Path p : paths) { 550 Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, 551 ts, cnt++); 552 puts.add(put); 553 } 554 } 555 } 556 if (!puts.isEmpty()) { 557 table.put(puts); 558 } 559 } 560 } 561 562 /** 563 * Reads backup status object (instance of backup info) from backup system table table 564 * @param backupId backup id 565 * @return Current status of backup session or null 566 */ 567 public BackupInfo readBackupInfo(String backupId) throws IOException { 568 if (LOG.isTraceEnabled()) { 569 LOG.trace("read backup status from backup system table for: " + backupId); 570 } 571 572 try (Table table = connection.getTable(tableName)) { 573 Get get = createGetForBackupInfo(backupId); 574 Result res = table.get(get); 575 if (res.isEmpty()) { 576 return null; 577 } 578 return resultToBackupInfo(res); 579 } 580 } 581 582 /** 583 * Read the last backup start code (timestamp) of last successful backup. Will return null if 584 * there is no start code stored on hbase or the value is of length 0. These two cases indicate 585 * there is no successful backup completed so far. 586 * @param backupRoot directory path to backup destination 587 * @return the timestamp of last successful backup 588 * @throws IOException exception 589 */ 590 public String readBackupStartCode(String backupRoot) throws IOException { 591 LOG.trace("read backup start code from backup system table"); 592 593 try (Table table = connection.getTable(tableName)) { 594 Get get = createGetForStartCode(backupRoot); 595 Result res = table.get(get); 596 if (res.isEmpty()) { 597 return null; 598 } 599 Cell cell = res.listCells().get(0); 600 byte[] val = CellUtil.cloneValue(cell); 601 if (val.length == 0) { 602 return null; 603 } 604 return new String(val, StandardCharsets.UTF_8); 605 } 606 } 607 608 /** 609 * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. 610 * @param startCode start code 611 * @param backupRoot root directory path to backup 612 * @throws IOException exception 613 */ 614 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { 615 if (LOG.isTraceEnabled()) { 616 LOG.trace("write backup start code to backup system table " + startCode); 617 } 618 try (Table table = connection.getTable(tableName)) { 619 Put put = createPutForStartCode(startCode.toString(), backupRoot); 620 table.put(put); 621 } 622 } 623 624 /** 625 * Exclusive operations are: create, delete, merge 626 * @throws IOException if a table operation fails or an active backup exclusive operation is 627 * already underway 628 */ 629 public void startBackupExclusiveOperation() throws IOException { 630 LOG.debug("Start new backup exclusive operation"); 631 632 try (Table table = connection.getTable(tableName)) { 633 Put put = createPutForStartBackupSession(); 634 // First try to put if row does not exist 635 if ( 636 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 637 .ifNotExists().thenPut(put) 638 ) { 639 // Row exists, try to put if value == ACTIVE_SESSION_NO 640 if ( 641 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 642 .ifEquals(ACTIVE_SESSION_NO).thenPut(put) 643 ) { 644 throw new ExclusiveOperationException(); 645 } 646 } 647 } 648 } 649 650 private Put createPutForStartBackupSession() { 651 Put put = new Put(ACTIVE_SESSION_ROW); 652 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); 653 return put; 654 } 655 656 public void finishBackupExclusiveOperation() throws IOException { 657 LOG.debug("Finish backup exclusive operation"); 658 659 try (Table table = connection.getTable(tableName)) { 660 Put put = createPutForStopBackupSession(); 661 if ( 662 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 663 .ifEquals(ACTIVE_SESSION_YES).thenPut(put) 664 ) { 665 throw new IOException("There is no active backup exclusive operation"); 666 } 667 } 668 } 669 670 private Put createPutForStopBackupSession() { 671 Put put = new Put(ACTIVE_SESSION_ROW); 672 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); 673 return put; 674 } 675 676 /** 677 * Get the Region Servers log information after the last log roll from backup system table. 678 * @param backupRoot root directory path to backup 679 * @return RS log info 680 * @throws IOException exception 681 */ 682 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) 683 throws IOException { 684 LOG.trace("read region server last roll log result to backup system table"); 685 686 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); 687 688 try (Table table = connection.getTable(tableName); 689 ResultScanner scanner = table.getScanner(scan)) { 690 Result res; 691 HashMap<String, Long> rsTimestampMap = new HashMap<>(); 692 while ((res = scanner.next()) != null) { 693 res.advance(); 694 Cell cell = res.current(); 695 byte[] row = CellUtil.cloneRow(cell); 696 String server = getServerNameForReadRegionServerLastLogRollResult(row); 697 byte[] data = CellUtil.cloneValue(cell); 698 rsTimestampMap.put(server, Bytes.toLong(data)); 699 } 700 return rsTimestampMap; 701 } 702 } 703 704 /** 705 * Writes Region Server last roll log result (timestamp) to backup system table table 706 * @param server Region Server name 707 * @param ts last log timestamp 708 * @param backupRoot root directory path to backup 709 * @throws IOException exception 710 */ 711 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) 712 throws IOException { 713 LOG.trace("write region server last roll log result to backup system table"); 714 715 try (Table table = connection.getTable(tableName)) { 716 Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); 717 table.put(put); 718 } 719 } 720 721 /** 722 * Get all completed backup information (in desc order by time) 723 * @param onlyCompleted true, if only successfully completed sessions 724 * @return history info of BackupCompleteData 725 * @throws IOException exception 726 */ 727 public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException { 728 LOG.trace("get backup history from backup system table"); 729 730 BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY; 731 ArrayList<BackupInfo> list = getBackupInfos(state); 732 return BackupUtils.sortHistoryListDesc(list); 733 } 734 735 /** 736 * Get all backups history 737 * @return list of backup info 738 * @throws IOException if getting the backup history fails 739 */ 740 public List<BackupInfo> getBackupHistory() throws IOException { 741 return getBackupHistory(false); 742 } 743 744 /** 745 * Get first n backup history records 746 * @param n number of records, if n== -1 - max number is ignored 747 * @return list of records 748 * @throws IOException if getting the backup history fails 749 */ 750 public List<BackupInfo> getHistory(int n) throws IOException { 751 List<BackupInfo> history = getBackupHistory(); 752 if (n == -1 || history.size() <= n) { 753 return history; 754 } 755 return Collections.unmodifiableList(history.subList(0, n)); 756 } 757 758 /** 759 * Get backup history records filtered by list of filters. 760 * @param n max number of records, if n == -1 , then max number is ignored 761 * @param filters list of filters 762 * @return backup records 763 * @throws IOException if getting the backup history fails 764 */ 765 public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException { 766 if (filters.length == 0) { 767 return getHistory(n); 768 } 769 770 List<BackupInfo> history = getBackupHistory(); 771 List<BackupInfo> result = new ArrayList<>(); 772 for (BackupInfo bi : history) { 773 if (n >= 0 && result.size() == n) { 774 break; 775 } 776 777 boolean passed = true; 778 for (int i = 0; i < filters.length; i++) { 779 if (!filters[i].apply(bi)) { 780 passed = false; 781 break; 782 } 783 } 784 if (passed) { 785 result.add(bi); 786 } 787 } 788 return result; 789 } 790 791 /* 792 * Retrieve TableName's for completed backup of given type 793 * @param type backup type 794 * @return List of table names 795 */ 796 public List<TableName> getTablesForBackupType(BackupType type) throws IOException { 797 Set<TableName> names = new HashSet<>(); 798 List<BackupInfo> infos = getBackupHistory(true); 799 for (BackupInfo info : infos) { 800 if (info.getType() == type) { 801 names.addAll(info.getTableNames()); 802 } 803 } 804 return new ArrayList<>(names); 805 } 806 807 /** 808 * Get history for backup destination 809 * @param backupRoot backup destination path 810 * @return List of backup info 811 * @throws IOException if getting the backup history fails 812 */ 813 public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException { 814 ArrayList<BackupInfo> history = getBackupHistory(false); 815 for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { 816 BackupInfo info = iterator.next(); 817 if (!backupRoot.equals(info.getBackupRootDir())) { 818 iterator.remove(); 819 } 820 } 821 return history; 822 } 823 824 /** 825 * Get history for a table 826 * @param name table name 827 * @return history for a table 828 * @throws IOException if getting the backup history fails 829 */ 830 public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException { 831 List<BackupInfo> history = getBackupHistory(); 832 List<BackupInfo> tableHistory = new ArrayList<>(); 833 for (BackupInfo info : history) { 834 List<TableName> tables = info.getTableNames(); 835 if (tables.contains(name)) { 836 tableHistory.add(info); 837 } 838 } 839 return tableHistory; 840 } 841 842 public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, 843 String backupRoot) throws IOException { 844 List<BackupInfo> history = getBackupHistory(backupRoot); 845 Map<TableName, ArrayList<BackupInfo>> tableHistoryMap = new HashMap<>(); 846 for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { 847 BackupInfo info = iterator.next(); 848 if (!backupRoot.equals(info.getBackupRootDir())) { 849 continue; 850 } 851 List<TableName> tables = info.getTableNames(); 852 for (TableName tableName : tables) { 853 if (set.contains(tableName)) { 854 ArrayList<BackupInfo> list = tableHistoryMap.get(tableName); 855 if (list == null) { 856 list = new ArrayList<>(); 857 tableHistoryMap.put(tableName, list); 858 } 859 list.add(info); 860 } 861 } 862 } 863 return tableHistoryMap; 864 } 865 866 /** 867 * Get all backup sessions with a given state (in descending order by time) 868 * @param state backup session state 869 * @return history info of backup info objects 870 * @throws IOException exception 871 */ 872 public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException { 873 LOG.trace("get backup infos from backup system table"); 874 875 Scan scan = createScanForBackupHistory(); 876 ArrayList<BackupInfo> list = new ArrayList<>(); 877 878 try (Table table = connection.getTable(tableName); 879 ResultScanner scanner = table.getScanner(scan)) { 880 Result res; 881 while ((res = scanner.next()) != null) { 882 res.advance(); 883 BackupInfo context = cellToBackupInfo(res.current()); 884 if (state != BackupState.ANY && context.getState() != state) { 885 continue; 886 } 887 list.add(context); 888 } 889 return list; 890 } 891 } 892 893 /** 894 * Write the current timestamps for each regionserver to backup system table after a successful 895 * full or incremental backup. The saved timestamp is of the last log file that was backed up 896 * already. 897 * @param tables tables 898 * @param newTimestamps timestamps 899 * @param backupRoot root directory path to backup 900 * @throws IOException exception 901 */ 902 public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps, 903 String backupRoot) throws IOException { 904 if (LOG.isTraceEnabled()) { 905 LOG.trace("write RS log time stamps to backup system table for tables [" 906 + StringUtils.join(tables, ",") + "]"); 907 } 908 List<Put> puts = new ArrayList<>(); 909 for (TableName table : tables) { 910 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); 911 Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); 912 puts.add(put); 913 } 914 try (Table table = connection.getTable(tableName)) { 915 table.put(puts); 916 } 917 } 918 919 /** 920 * Read the timestamp for each region server log after the last successful backup. Each table has 921 * its own set of the timestamps. The info is stored for each table as a concatenated string of 922 * rs-%3Etimestapmp 923 * @param backupRoot root directory path to backup 924 * @return the timestamp for each region server. key: tableName value: 925 * RegionServer,PreviousTimeStamp 926 * @throws IOException exception 927 */ 928 public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot) 929 throws IOException { 930 if (LOG.isTraceEnabled()) { 931 LOG.trace("read RS log ts from backup system table for root=" + backupRoot); 932 } 933 934 Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>(); 935 936 Scan scan = createScanForReadLogTimestampMap(backupRoot); 937 try (Table table = connection.getTable(tableName); 938 ResultScanner scanner = table.getScanner(scan)) { 939 Result res; 940 while ((res = scanner.next()) != null) { 941 res.advance(); 942 Cell cell = res.current(); 943 byte[] row = CellUtil.cloneRow(cell); 944 String tabName = getTableNameForReadLogTimestampMap(row); 945 TableName tn = TableName.valueOf(tabName); 946 byte[] data = CellUtil.cloneValue(cell); 947 if (data == null) { 948 throw new IOException("Data of last backup data from backup system table " 949 + "is empty. Create a backup first."); 950 } 951 if (data != null && data.length > 0) { 952 HashMap<String, Long> lastBackup = 953 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); 954 tableTimestampMap.put(tn, lastBackup); 955 } 956 } 957 return tableTimestampMap; 958 } 959 } 960 961 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, 962 Map<String, Long> map) { 963 BackupProtos.TableServerTimestamp.Builder tstBuilder = 964 BackupProtos.TableServerTimestamp.newBuilder(); 965 tstBuilder 966 .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); 967 968 for (Entry<String, Long> entry : map.entrySet()) { 969 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); 970 HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); 971 ServerName sn = ServerName.parseServerName(entry.getKey()); 972 snBuilder.setHostName(sn.getHostname()); 973 snBuilder.setPort(sn.getPort()); 974 builder.setServerName(snBuilder.build()); 975 builder.setTimestamp(entry.getValue()); 976 tstBuilder.addServerTimestamp(builder.build()); 977 } 978 979 return tstBuilder.build(); 980 } 981 982 private HashMap<String, Long> 983 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { 984 985 HashMap<String, Long> map = new HashMap<>(); 986 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); 987 for (BackupProtos.ServerTimestamp st : list) { 988 ServerName sn = 989 org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); 990 map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); 991 } 992 return map; 993 } 994 995 /** 996 * Return the current tables covered by incremental backup. 997 * @param backupRoot root directory path to backup 998 * @return set of tableNames 999 * @throws IOException exception 1000 */ 1001 public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { 1002 LOG.trace("get incremental backup table set from backup system table"); 1003 1004 TreeSet<TableName> set = new TreeSet<>(); 1005 1006 try (Table table = connection.getTable(tableName)) { 1007 Get get = createGetForIncrBackupTableSet(backupRoot); 1008 Result res = table.get(get); 1009 if (res.isEmpty()) { 1010 return set; 1011 } 1012 List<Cell> cells = res.listCells(); 1013 for (Cell cell : cells) { 1014 // qualifier = table name - we use table names as qualifiers 1015 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); 1016 } 1017 return set; 1018 } 1019 } 1020 1021 /** 1022 * Add tables to global incremental backup set 1023 * @param tables set of tables 1024 * @param backupRoot root directory path to backup 1025 * @throws IOException exception 1026 */ 1027 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) 1028 throws IOException { 1029 if (LOG.isTraceEnabled()) { 1030 LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot 1031 + " tables [" + StringUtils.join(tables, " ") + "]"); 1032 } 1033 if (LOG.isDebugEnabled()) { 1034 tables.forEach(table -> LOG.debug(Objects.toString(table))); 1035 } 1036 try (Table table = connection.getTable(tableName)) { 1037 Put put = createPutForIncrBackupTableSet(tables, backupRoot); 1038 table.put(put); 1039 } 1040 } 1041 1042 /** 1043 * Deletes incremental backup set for a backup destination 1044 * @param backupRoot backup root 1045 */ 1046 public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { 1047 if (LOG.isTraceEnabled()) { 1048 LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); 1049 } 1050 try (Table table = connection.getTable(tableName)) { 1051 Delete delete = createDeleteForIncrBackupTableSet(backupRoot); 1052 table.delete(delete); 1053 } 1054 } 1055 1056 /** 1057 * Checks if we have at least one backup session in backup system table This API is used by 1058 * BackupLogCleaner 1059 * @return true, if - at least one session exists in backup system table table 1060 * @throws IOException exception 1061 */ 1062 public boolean hasBackupSessions() throws IOException { 1063 LOG.trace("Has backup sessions from backup system table"); 1064 1065 boolean result = false; 1066 Scan scan = createScanForBackupHistory(); 1067 scan.setCaching(1); 1068 try (Table table = connection.getTable(tableName); 1069 ResultScanner scanner = table.getScanner(scan)) { 1070 if (scanner.next() != null) { 1071 result = true; 1072 } 1073 return result; 1074 } 1075 } 1076 1077 /** 1078 * BACKUP SETS 1079 */ 1080 1081 /** 1082 * Get backup set list 1083 * @return backup set list 1084 * @throws IOException if a table or scanner operation fails 1085 */ 1086 public List<String> listBackupSets() throws IOException { 1087 LOG.trace("Backup set list"); 1088 1089 List<String> list = new ArrayList<>(); 1090 try (Table table = connection.getTable(tableName)) { 1091 Scan scan = createScanForBackupSetList(); 1092 scan.readVersions(1); 1093 try (ResultScanner scanner = table.getScanner(scan)) { 1094 Result res; 1095 while ((res = scanner.next()) != null) { 1096 res.advance(); 1097 list.add(cellKeyToBackupSetName(res.current())); 1098 } 1099 return list; 1100 } 1101 } 1102 } 1103 1104 /** 1105 * Get backup set description (list of tables) 1106 * @param name set's name 1107 * @return list of tables in a backup set 1108 * @throws IOException if a table operation fails 1109 */ 1110 public List<TableName> describeBackupSet(String name) throws IOException { 1111 if (LOG.isTraceEnabled()) { 1112 LOG.trace(" Backup set describe: " + name); 1113 } 1114 try (Table table = connection.getTable(tableName)) { 1115 Get get = createGetForBackupSet(name); 1116 Result res = table.get(get); 1117 if (res.isEmpty()) { 1118 return null; 1119 } 1120 res.advance(); 1121 String[] tables = cellValueToBackupSet(res.current()); 1122 return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) 1123 .collect(Collectors.toList()); 1124 } 1125 } 1126 1127 /** 1128 * Add backup set (list of tables) 1129 * @param name set name 1130 * @param newTables list of tables, comma-separated 1131 * @throws IOException if a table operation fails 1132 */ 1133 public void addToBackupSet(String name, String[] newTables) throws IOException { 1134 if (LOG.isTraceEnabled()) { 1135 LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); 1136 } 1137 String[] union = null; 1138 try (Table table = connection.getTable(tableName)) { 1139 Get get = createGetForBackupSet(name); 1140 Result res = table.get(get); 1141 if (res.isEmpty()) { 1142 union = newTables; 1143 } else { 1144 res.advance(); 1145 String[] tables = cellValueToBackupSet(res.current()); 1146 union = merge(tables, newTables); 1147 } 1148 Put put = createPutForBackupSet(name, union); 1149 table.put(put); 1150 } 1151 } 1152 1153 /** 1154 * Remove tables from backup set (list of tables) 1155 * @param name set name 1156 * @param toRemove list of tables 1157 * @throws IOException if a table operation or deleting the backup set fails 1158 */ 1159 public void removeFromBackupSet(String name, String[] toRemove) throws IOException { 1160 if (LOG.isTraceEnabled()) { 1161 LOG.trace( 1162 " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); 1163 } 1164 String[] disjoint; 1165 String[] tables; 1166 try (Table table = connection.getTable(tableName)) { 1167 Get get = createGetForBackupSet(name); 1168 Result res = table.get(get); 1169 if (res.isEmpty()) { 1170 LOG.warn("Backup set '" + name + "' not found."); 1171 return; 1172 } else { 1173 res.advance(); 1174 tables = cellValueToBackupSet(res.current()); 1175 disjoint = disjoin(tables, toRemove); 1176 } 1177 if (disjoint.length > 0 && disjoint.length != tables.length) { 1178 Put put = createPutForBackupSet(name, disjoint); 1179 table.put(put); 1180 } else if (disjoint.length == tables.length) { 1181 LOG.warn("Backup set '" + name + "' does not contain tables [" 1182 + StringUtils.join(toRemove, " ") + "]"); 1183 } else { // disjoint.length == 0 and tables.length >0 1184 // Delete backup set 1185 LOG.info("Backup set '" + name + "' is empty. Deleting."); 1186 deleteBackupSet(name); 1187 } 1188 } 1189 } 1190 1191 private String[] merge(String[] existingTables, String[] newTables) { 1192 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1193 tables.addAll(Arrays.asList(newTables)); 1194 return tables.toArray(new String[0]); 1195 } 1196 1197 private String[] disjoin(String[] existingTables, String[] toRemove) { 1198 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1199 Arrays.asList(toRemove).forEach(table -> tables.remove(table)); 1200 return tables.toArray(new String[0]); 1201 } 1202 1203 /** 1204 * Delete backup set 1205 * @param name set's name 1206 * @throws IOException if getting or deleting the table fails 1207 */ 1208 public void deleteBackupSet(String name) throws IOException { 1209 if (LOG.isTraceEnabled()) { 1210 LOG.trace(" Backup set delete: " + name); 1211 } 1212 try (Table table = connection.getTable(tableName)) { 1213 Delete del = createDeleteForBackupSet(name); 1214 table.delete(del); 1215 } 1216 } 1217 1218 /** 1219 * Get backup system table descriptor 1220 * @return table's descriptor 1221 */ 1222 public static TableDescriptor getSystemTableDescriptor(Configuration conf) { 1223 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); 1224 1225 ColumnFamilyDescriptorBuilder colBuilder = 1226 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1227 1228 colBuilder.setMaxVersions(1); 1229 Configuration config = HBaseConfiguration.create(); 1230 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1231 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1232 colBuilder.setTimeToLive(ttl); 1233 1234 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1235 builder.setColumnFamily(colSessionsDesc); 1236 1237 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1238 colBuilder.setTimeToLive(ttl); 1239 builder.setColumnFamily(colBuilder.build()); 1240 return builder.build(); 1241 } 1242 1243 public static TableName getTableName(Configuration conf) { 1244 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1245 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); 1246 return TableName.valueOf(name); 1247 } 1248 1249 public static String getTableNameAsString(Configuration conf) { 1250 return getTableName(conf).getNameAsString(); 1251 } 1252 1253 public static String getSnapshotName(Configuration conf) { 1254 return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); 1255 } 1256 1257 /** 1258 * Get backup system table descriptor 1259 * @return table's descriptor 1260 */ 1261 public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { 1262 TableDescriptorBuilder builder = 1263 TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); 1264 1265 ColumnFamilyDescriptorBuilder colBuilder = 1266 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1267 colBuilder.setMaxVersions(1); 1268 Configuration config = HBaseConfiguration.create(); 1269 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1270 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1271 colBuilder.setTimeToLive(ttl); 1272 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1273 builder.setColumnFamily(colSessionsDesc); 1274 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1275 colBuilder.setTimeToLive(ttl); 1276 builder.setColumnFamily(colBuilder.build()); 1277 return builder.build(); 1278 } 1279 1280 public static TableName getTableNameForBulkLoadedData(Configuration conf) { 1281 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1282 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; 1283 return TableName.valueOf(name); 1284 } 1285 1286 /** 1287 * Creates Put operation for a given backup info object 1288 * @param context backup info 1289 * @return put operation 1290 * @throws IOException exception 1291 */ 1292 private Put createPutForBackupInfo(BackupInfo context) throws IOException { 1293 Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); 1294 put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), 1295 context.toByteArray()); 1296 return put; 1297 } 1298 1299 /** 1300 * Creates Get operation for a given backup id 1301 * @param backupId backup's ID 1302 * @return get operation 1303 * @throws IOException exception 1304 */ 1305 private Get createGetForBackupInfo(String backupId) throws IOException { 1306 Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); 1307 get.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1308 get.readVersions(1); 1309 return get; 1310 } 1311 1312 /** 1313 * Creates Delete operation for a given backup id 1314 * @param backupId backup's ID 1315 * @return delete operation 1316 */ 1317 private Delete createDeleteForBackupInfo(String backupId) { 1318 Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); 1319 del.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1320 return del; 1321 } 1322 1323 /** 1324 * Converts Result to BackupInfo 1325 * @param res HBase result 1326 * @return backup info instance 1327 * @throws IOException exception 1328 */ 1329 private BackupInfo resultToBackupInfo(Result res) throws IOException { 1330 res.advance(); 1331 Cell cell = res.current(); 1332 return cellToBackupInfo(cell); 1333 } 1334 1335 /** 1336 * Creates Get operation to retrieve start code from backup system table 1337 * @return get operation 1338 * @throws IOException exception 1339 */ 1340 private Get createGetForStartCode(String rootPath) throws IOException { 1341 Get get = new Get(rowkey(START_CODE_ROW, rootPath)); 1342 get.addFamily(BackupSystemTable.META_FAMILY); 1343 get.readVersions(1); 1344 return get; 1345 } 1346 1347 /** 1348 * Creates Put operation to store start code to backup system table 1349 * @return put operation 1350 */ 1351 private Put createPutForStartCode(String startCode, String rootPath) { 1352 Put put = new Put(rowkey(START_CODE_ROW, rootPath)); 1353 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), 1354 Bytes.toBytes(startCode)); 1355 return put; 1356 } 1357 1358 /** 1359 * Creates Get to retrieve incremental backup table set from backup system table 1360 * @return get operation 1361 * @throws IOException exception 1362 */ 1363 private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { 1364 Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); 1365 get.addFamily(BackupSystemTable.META_FAMILY); 1366 get.readVersions(1); 1367 return get; 1368 } 1369 1370 /** 1371 * Creates Put to store incremental backup table set 1372 * @param tables tables 1373 * @return put operation 1374 */ 1375 private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { 1376 Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); 1377 for (TableName table : tables) { 1378 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), 1379 EMPTY_VALUE); 1380 } 1381 return put; 1382 } 1383 1384 /** 1385 * Creates Delete for incremental backup table set 1386 * @param backupRoot backup root 1387 * @return delete operation 1388 */ 1389 private Delete createDeleteForIncrBackupTableSet(String backupRoot) { 1390 Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); 1391 delete.addFamily(BackupSystemTable.META_FAMILY); 1392 return delete; 1393 } 1394 1395 /** 1396 * Creates Scan operation to load backup history 1397 * @return scan operation 1398 */ 1399 private Scan createScanForBackupHistory() { 1400 Scan scan = new Scan(); 1401 byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); 1402 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1403 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1404 scan.withStartRow(startRow); 1405 scan.withStopRow(stopRow); 1406 scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1407 scan.readVersions(1); 1408 return scan; 1409 } 1410 1411 /** 1412 * Converts cell to backup info instance. 1413 * @param current current cell 1414 * @return backup backup info instance 1415 * @throws IOException exception 1416 */ 1417 private BackupInfo cellToBackupInfo(Cell current) throws IOException { 1418 byte[] data = CellUtil.cloneValue(current); 1419 return BackupInfo.fromByteArray(data); 1420 } 1421 1422 /** 1423 * Creates Put to write RS last roll log timestamp map 1424 * @param table table 1425 * @param smap map, containing RS:ts 1426 * @return put operation 1427 */ 1428 private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, 1429 String backupRoot) { 1430 Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); 1431 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); 1432 return put; 1433 } 1434 1435 /** 1436 * Creates Scan to load table-> { RS -> ts} map of maps 1437 * @return scan operation 1438 */ 1439 private Scan createScanForReadLogTimestampMap(String backupRoot) { 1440 Scan scan = new Scan(); 1441 scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL)); 1442 scan.addFamily(BackupSystemTable.META_FAMILY); 1443 1444 return scan; 1445 } 1446 1447 /** 1448 * Get table name from rowkey 1449 * @param cloneRow rowkey 1450 * @return table name 1451 */ 1452 private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { 1453 String s = Bytes.toString(cloneRow); 1454 int index = s.lastIndexOf(NULL); 1455 return s.substring(index + 1); 1456 } 1457 1458 /** 1459 * Creates Put to store RS last log result 1460 * @param server server name 1461 * @param timestamp log roll result (timestamp) 1462 * @return put operation 1463 */ 1464 private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, 1465 String backupRoot) { 1466 Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); 1467 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), 1468 Bytes.toBytes(timestamp)); 1469 return put; 1470 } 1471 1472 /** 1473 * Creates Scan operation to load last RS log roll results 1474 * @return scan operation 1475 */ 1476 private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { 1477 Scan scan = new Scan(); 1478 scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL)); 1479 scan.addFamily(BackupSystemTable.META_FAMILY); 1480 scan.readVersions(1); 1481 1482 return scan; 1483 } 1484 1485 /** 1486 * Get server's name from rowkey 1487 * @param row rowkey 1488 * @return server's name 1489 */ 1490 private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { 1491 String s = Bytes.toString(row); 1492 int index = s.lastIndexOf(NULL); 1493 return s.substring(index + 1); 1494 } 1495 1496 /* 1497 * Creates Put's for bulk load resulting from running LoadIncrementalHFiles 1498 */ 1499 static List<Put> createPutForCommittedBulkload(TableName table, byte[] region, 1500 Map<byte[], List<Path>> finalPaths) { 1501 List<Put> puts = new ArrayList<>(); 1502 for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) { 1503 for (Path path : entry.getValue()) { 1504 String file = path.toString(); 1505 int lastSlash = file.lastIndexOf("/"); 1506 String filename = file.substring(lastSlash + 1); 1507 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1508 Bytes.toString(region), BLK_LD_DELIM, filename)); 1509 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1510 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); 1511 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1512 put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); 1513 puts.add(put); 1514 LOG 1515 .debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); 1516 } 1517 } 1518 return puts; 1519 } 1520 1521 public static void snapshot(Connection conn) throws IOException { 1522 try (Admin admin = conn.getAdmin()) { 1523 Configuration conf = conn.getConfiguration(); 1524 admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); 1525 } 1526 } 1527 1528 public static void restoreFromSnapshot(Connection conn) throws IOException { 1529 Configuration conf = conn.getConfiguration(); 1530 LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); 1531 try (Admin admin = conn.getAdmin()) { 1532 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1533 if (snapshotExists(admin, snapshotName)) { 1534 admin.disableTable(BackupSystemTable.getTableName(conf)); 1535 admin.restoreSnapshot(snapshotName); 1536 admin.enableTable(BackupSystemTable.getTableName(conf)); 1537 LOG.debug("Done restoring backup system table"); 1538 } else { 1539 // Snapshot does not exists, i.e completeBackup failed after 1540 // deleting backup system table snapshot 1541 // In this case we log WARN and proceed 1542 LOG.warn( 1543 "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); 1544 } 1545 } 1546 } 1547 1548 private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { 1549 List<SnapshotDescription> list = admin.listSnapshots(); 1550 for (SnapshotDescription desc : list) { 1551 if (desc.getName().equals(snapshotName)) { 1552 return true; 1553 } 1554 } 1555 return false; 1556 } 1557 1558 public static boolean snapshotExists(Connection conn) throws IOException { 1559 return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); 1560 } 1561 1562 public static void deleteSnapshot(Connection conn) throws IOException { 1563 Configuration conf = conn.getConfiguration(); 1564 LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); 1565 try (Admin admin = conn.getAdmin()) { 1566 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1567 if (snapshotExists(admin, snapshotName)) { 1568 admin.deleteSnapshot(snapshotName); 1569 LOG.debug("Done deleting backup system table snapshot"); 1570 } else { 1571 LOG.error("Snapshot " + snapshotName + " does not exists"); 1572 } 1573 } 1574 } 1575 1576 /* 1577 * Creates Put's for bulk load resulting from running LoadIncrementalHFiles 1578 */ 1579 static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family, 1580 final List<Pair<Path, Path>> pairs) { 1581 List<Put> puts = new ArrayList<>(pairs.size()); 1582 for (Pair<Path, Path> pair : pairs) { 1583 Path path = pair.getSecond(); 1584 String file = path.toString(); 1585 int lastSlash = file.lastIndexOf("/"); 1586 String filename = file.substring(lastSlash + 1); 1587 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1588 Bytes.toString(region), BLK_LD_DELIM, filename)); 1589 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1590 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); 1591 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1592 put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE); 1593 puts.add(put); 1594 LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region)); 1595 } 1596 return puts; 1597 } 1598 1599 public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) { 1600 List<Delete> lstDels = new ArrayList<>(lst.size()); 1601 for (TableName table : lst) { 1602 Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM)); 1603 del.addFamily(BackupSystemTable.META_FAMILY); 1604 lstDels.add(del); 1605 } 1606 return lstDels; 1607 } 1608 1609 private Put createPutForDeleteOperation(String[] backupIdList) { 1610 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1611 Put put = new Put(DELETE_OP_ROW); 1612 put.addColumn(META_FAMILY, FAM_COL, value); 1613 return put; 1614 } 1615 1616 private Delete createDeleteForBackupDeleteOperation() { 1617 Delete delete = new Delete(DELETE_OP_ROW); 1618 delete.addFamily(META_FAMILY); 1619 return delete; 1620 } 1621 1622 private Get createGetForDeleteOperation() { 1623 Get get = new Get(DELETE_OP_ROW); 1624 get.addFamily(META_FAMILY); 1625 return get; 1626 } 1627 1628 public void startDeleteOperation(String[] backupIdList) throws IOException { 1629 if (LOG.isTraceEnabled()) { 1630 LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); 1631 } 1632 Put put = createPutForDeleteOperation(backupIdList); 1633 try (Table table = connection.getTable(tableName)) { 1634 table.put(put); 1635 } 1636 } 1637 1638 public void finishDeleteOperation() throws IOException { 1639 LOG.trace("Finsih delete operation for backup ids"); 1640 1641 Delete delete = createDeleteForBackupDeleteOperation(); 1642 try (Table table = connection.getTable(tableName)) { 1643 table.delete(delete); 1644 } 1645 } 1646 1647 public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { 1648 LOG.trace("Get delete operation for backup ids"); 1649 1650 Get get = createGetForDeleteOperation(); 1651 try (Table table = connection.getTable(tableName)) { 1652 Result res = table.get(get); 1653 if (res.isEmpty()) { 1654 return null; 1655 } 1656 Cell cell = res.listCells().get(0); 1657 byte[] val = CellUtil.cloneValue(cell); 1658 if (val.length == 0) { 1659 return null; 1660 } 1661 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1662 .toArray(String[]::new); 1663 } 1664 } 1665 1666 private Put createPutForMergeOperation(String[] backupIdList) { 1667 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1668 Put put = new Put(MERGE_OP_ROW); 1669 put.addColumn(META_FAMILY, FAM_COL, value); 1670 return put; 1671 } 1672 1673 public boolean isMergeInProgress() throws IOException { 1674 Get get = new Get(MERGE_OP_ROW); 1675 try (Table table = connection.getTable(tableName)) { 1676 Result res = table.get(get); 1677 return !res.isEmpty(); 1678 } 1679 } 1680 1681 private Put createPutForUpdateTablesForMerge(List<TableName> tables) { 1682 byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); 1683 Put put = new Put(MERGE_OP_ROW); 1684 put.addColumn(META_FAMILY, PATH_COL, value); 1685 return put; 1686 } 1687 1688 private Delete createDeleteForBackupMergeOperation() { 1689 Delete delete = new Delete(MERGE_OP_ROW); 1690 delete.addFamily(META_FAMILY); 1691 return delete; 1692 } 1693 1694 private Get createGetForMergeOperation() { 1695 Get get = new Get(MERGE_OP_ROW); 1696 get.addFamily(META_FAMILY); 1697 return get; 1698 } 1699 1700 public void startMergeOperation(String[] backupIdList) throws IOException { 1701 if (LOG.isTraceEnabled()) { 1702 LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); 1703 } 1704 Put put = createPutForMergeOperation(backupIdList); 1705 try (Table table = connection.getTable(tableName)) { 1706 table.put(put); 1707 } 1708 } 1709 1710 public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { 1711 if (LOG.isTraceEnabled()) { 1712 LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); 1713 } 1714 Put put = createPutForUpdateTablesForMerge(tables); 1715 try (Table table = connection.getTable(tableName)) { 1716 table.put(put); 1717 } 1718 } 1719 1720 public void finishMergeOperation() throws IOException { 1721 LOG.trace("Finish merge operation for backup ids"); 1722 1723 Delete delete = createDeleteForBackupMergeOperation(); 1724 try (Table table = connection.getTable(tableName)) { 1725 table.delete(delete); 1726 } 1727 } 1728 1729 public String[] getListOfBackupIdsFromMergeOperation() throws IOException { 1730 LOG.trace("Get backup ids for merge operation"); 1731 1732 Get get = createGetForMergeOperation(); 1733 try (Table table = connection.getTable(tableName)) { 1734 Result res = table.get(get); 1735 if (res.isEmpty()) { 1736 return null; 1737 } 1738 Cell cell = res.listCells().get(0); 1739 byte[] val = CellUtil.cloneValue(cell); 1740 if (val.length == 0) { 1741 return null; 1742 } 1743 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1744 .toArray(String[]::new); 1745 } 1746 } 1747 1748 static Scan createScanForOrigBulkLoadedFiles(TableName table) { 1749 Scan scan = new Scan(); 1750 byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); 1751 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1752 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1753 scan.withStartRow(startRow); 1754 scan.withStopRow(stopRow); 1755 scan.addFamily(BackupSystemTable.META_FAMILY); 1756 scan.readVersions(1); 1757 return scan; 1758 } 1759 1760 static String getTableNameFromOrigBulkLoadRow(String rowStr) { 1761 // format is bulk : namespace : table : region : file 1762 return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1); 1763 } 1764 1765 static String getRegionNameFromOrigBulkLoadRow(String rowStr) { 1766 // format is bulk : namespace : table : region : file 1767 List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr); 1768 Iterator<String> i = parts.iterator(); 1769 int idx = 3; 1770 if (parts.size() == 4) { 1771 // the table is in default namespace 1772 idx = 2; 1773 } 1774 String region = Iterators.get(i, idx); 1775 LOG.debug("bulk row string " + rowStr + " region " + region); 1776 return region; 1777 } 1778 1779 /* 1780 * Used to query bulk loaded hfiles which have been copied by incremental backup 1781 * @param backupId the backup Id. It can be null when querying for all tables 1782 * @return the Scan object 1783 */ 1784 static Scan createScanForBulkLoadedFiles(String backupId) { 1785 Scan scan = new Scan(); 1786 byte[] startRow = 1787 backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); 1788 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1789 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1790 scan.withStartRow(startRow); 1791 scan.withStopRow(stopRow); 1792 scan.addFamily(BackupSystemTable.META_FAMILY); 1793 scan.readVersions(1); 1794 return scan; 1795 } 1796 1797 static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, 1798 long ts, int idx) { 1799 Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); 1800 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); 1801 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); 1802 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p)); 1803 return put; 1804 } 1805 1806 /** 1807 * Creates Scan operation to load backup set list 1808 * @return scan operation 1809 */ 1810 private Scan createScanForBackupSetList() { 1811 Scan scan = new Scan(); 1812 byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); 1813 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1814 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1815 scan.withStartRow(startRow); 1816 scan.withStopRow(stopRow); 1817 scan.addFamily(BackupSystemTable.META_FAMILY); 1818 return scan; 1819 } 1820 1821 /** 1822 * Creates Get operation to load backup set content 1823 * @return get operation 1824 */ 1825 private Get createGetForBackupSet(String name) { 1826 Get get = new Get(rowkey(SET_KEY_PREFIX, name)); 1827 get.addFamily(BackupSystemTable.META_FAMILY); 1828 return get; 1829 } 1830 1831 /** 1832 * Creates Delete operation to delete backup set content 1833 * @param name backup set's name 1834 * @return delete operation 1835 */ 1836 private Delete createDeleteForBackupSet(String name) { 1837 Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); 1838 del.addFamily(BackupSystemTable.META_FAMILY); 1839 return del; 1840 } 1841 1842 /** 1843 * Creates Put operation to update backup set content 1844 * @param name backup set's name 1845 * @param tables list of tables 1846 * @return put operation 1847 */ 1848 private Put createPutForBackupSet(String name, String[] tables) { 1849 Put put = new Put(rowkey(SET_KEY_PREFIX, name)); 1850 byte[] value = convertToByteArray(tables); 1851 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); 1852 return put; 1853 } 1854 1855 private byte[] convertToByteArray(String[] tables) { 1856 return Bytes.toBytes(StringUtils.join(tables, ",")); 1857 } 1858 1859 /** 1860 * Converts cell to backup set list. 1861 * @param current current cell 1862 * @return backup set as array of table names 1863 */ 1864 private String[] cellValueToBackupSet(Cell current) { 1865 byte[] data = CellUtil.cloneValue(current); 1866 if (!ArrayUtils.isEmpty(data)) { 1867 return Bytes.toString(data).split(","); 1868 } 1869 return new String[0]; 1870 } 1871 1872 /** 1873 * Converts cell key to backup set name. 1874 * @param current current cell 1875 * @return backup set name 1876 */ 1877 private String cellKeyToBackupSetName(Cell current) { 1878 byte[] data = CellUtil.cloneRow(current); 1879 return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); 1880 } 1881 1882 private static byte[] rowkey(String s, String... other) { 1883 StringBuilder sb = new StringBuilder(s); 1884 for (String ss : other) { 1885 sb.append(ss); 1886 } 1887 return Bytes.toBytes(sb.toString()); 1888 } 1889}