/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
import static org.apache.hadoop.hbase.backup.BackupType.FULL;
import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that a backup taken on one cluster can be restored after the original tables have been
 * deleted, both back onto the original table name and onto a different target table.
 * <p>
 * Parameterized on {@code restoreToOtherTable}: {@code true} restores the backup into
 * {@code targetTable}, {@code false} restores it back into {@code sourceTable}.
 */
@Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: restoreToOtherTable={0}")
public class TestBackupRestoreOnEmptyEnvironment {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestBackupRestoreOnEmptyEnvironment.class);

  /** When true, the backup is restored into a table other than the one it was taken from. */
  public boolean restoreToOtherTable;

  public TestBackupRestoreOnEmptyEnvironment(boolean restoreToOtherTable) {
    this.restoreToOtherTable = restoreToOtherTable;
  }

  /** Supplies the two parameter combinations for the test template. */
  public static Stream<Arguments> parameters() {
    return Stream.of(Arguments.of(true), Arguments.of(false));
  }

  private TableName sourceTable;
  private TableName targetTable;

  private static TestingHBaseCluster cluster;
  private static Path BACKUP_ROOT_DIR;
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");

  @BeforeAll
  public static void beforeClass() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    enableBackup(conf);
    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
    cluster.start();
    BACKUP_ROOT_DIR = new Path(new Path(conf.get("fs.defaultFS")), new Path("/backupIT"));
  }

  @AfterAll
  public static void afterClass() throws Exception {
    cluster.stop();
  }

  @BeforeEach
  public void setUp() throws Exception {
    sourceTable = TableName.valueOf("table");
    targetTable = TableName.valueOf("another-table");
    createTable(sourceTable);
    createTable(targetTable);
  }

  @AfterEach
  public void removeTables() throws Exception {
    deleteTables();
  }

  @TestTemplate
  public void testRestoreToCorrectTable() throws Exception {
    Instant timestamp = Instant.now().minusSeconds(10);

    // load some data
    putLoad(sourceTable, timestamp, "data");

    String backupId = backup(FULL, Collections.singletonList(sourceTable));
    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    // Drop the tables so the restore happens on an "empty environment".
    LOG.info("Deleting the tables before restore ...");
    deleteTables();

    if (restoreToOtherTable) {
      restore(backupId, sourceTable, targetTable);
      validateDataEquals(targetTable, "data");
    } else {
      restore(backupId, sourceTable, sourceTable);
      validateDataEquals(sourceTable, "data");
    }
  }

  @TestTemplate
  public void testRestoreCorrectTableForIncremental() throws Exception {
    Instant timestamp = Instant.now().minusSeconds(10);

    // load some data
    putLoad(sourceTable, timestamp, "data");

    String backupId = backup(FULL, Collections.singletonList(sourceTable));
    verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);

    // some incremental data; later timestamp so it supersedes the full-backup cells
    putLoad(sourceTable, timestamp.plusMillis(1), "new_data");

    String backupId2 = backup(INCREMENTAL, Collections.singletonList(sourceTable));
    verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);

    // Drop the tables so the restore happens on an "empty environment".
    LOG.info("Deleting the tables before restore ...");
    deleteTables();

    if (restoreToOtherTable) {
      restore(backupId2, sourceTable, targetTable);
      validateDataEquals(targetTable, "new_data");
    } else {
      restore(backupId2, sourceTable, sourceTable);
      validateDataEquals(sourceTable, "new_data");
    }
  }

  /** Creates {@code tableName} with the single column family used by this test. */
  private void createTable(TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Admin admin = connection.getAdmin()) {
      admin.createTable(builder.build());
    }
  }

  /** Disables and deletes the source and target tables if they exist. */
  private void deleteTables() throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Admin admin = connection.getAdmin()) {
      for (TableName table : Arrays.asList(sourceTable, targetTable)) {
        if (admin.tableExists(table)) {
          admin.disableTable(table);
          admin.deleteTable(table);
        }
      }
    }
  }

  /**
   * Writes 10 rows with value {@code data} at the given timestamp to {@code tableName}, then
   * flushes the table so the data reaches HFiles (needed for backup to pick it up).
   */
  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
    LOG.info("Writing new data to HBase using normal Puts: {}", data);
    // BUG FIX: the original opened connection.getTable(sourceTable), silently ignoring the
    // tableName parameter. Also: Table and Admin are Closeable and are now closed.
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName);
      Admin admin = connection.getAdmin()) {
      List<Put> puts = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
        puts.add(put);

        // Batch in chunks of 100 (a no-op safeguard here since the loop writes only 10 rows).
        if (i % 100 == 0) {
          table.put(puts);
          puts.clear();
        }
      }
      if (!puts.isEmpty()) {
        table.put(puts);
      }
      admin.flush(tableName);
    }
  }

  /**
   * Takes a backup of {@code tables} of the given {@code backupType} under BACKUP_ROOT_DIR.
   * @return the id of the newly created backup
   */
  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
    LOG.info("Creating the backup ...");

    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      BackupRequest backupRequest =
        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
      return backupAdmin.backupTables(backupRequest);
    }
  }

  /** Restores {@code sourceTableName} from {@code backupId} into {@code targetTableName}. */
  private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
    throws IOException {
    LOG.info("Restoring data ...");
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
        .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOverwrite(true)
        .withFromTables(new TableName[] { sourceTableName })
        .withToTables(new TableName[] { targetTableName }).build();
      backupAdmin.restore(restoreRequest);
    }
  }

  /**
   * Asserts that every cell in {@code tableName} carries {@code expectedData}. Uses a raw scan so
   * all cell versions (including any restored delete markers' neighbors) are checked.
   */
  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName)) {
      Scan scan = new Scan();
      scan.setRaw(true);
      scan.setBatch(100);

      for (Result sourceResult : table.getScanner(scan)) {
        List<Cell> sourceCells = sourceResult.listCells();
        for (Cell cell : sourceCells) {
          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
            cell.getValueOffset(), cell.getValueLength()));
        }
      }
    }
  }
}