001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup; 019 020import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE; 021import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup; 022import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup; 023import static org.apache.hadoop.hbase.backup.BackupType.FULL; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertFalse; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027 028import java.io.IOException; 029import java.nio.ByteBuffer; 030import java.time.Instant; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.List; 034import java.util.Map; 035import java.util.UUID; 036import java.util.stream.Stream; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.HBaseConfiguration; 042import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 043import org.apache.hadoop.hbase.KeyValue; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; 046import org.apache.hadoop.hbase.client.Admin; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 048import org.apache.hadoop.hbase.client.Connection; 049import org.apache.hadoop.hbase.client.ConnectionFactory; 050import org.apache.hadoop.hbase.client.Put; 051import org.apache.hadoop.hbase.client.Result; 052import org.apache.hadoop.hbase.client.Scan; 053import org.apache.hadoop.hbase.client.Table; 054import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 055import org.apache.hadoop.hbase.io.hfile.HFile; 056import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 057import org.apache.hadoop.hbase.testclassification.LargeTests; 058import org.apache.hadoop.hbase.testing.TestingHBaseCluster; 059import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption; 060import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.junit.jupiter.api.AfterAll; 063import org.junit.jupiter.api.BeforeAll; 064import org.junit.jupiter.api.BeforeEach; 065import org.junit.jupiter.api.Tag; 066import org.junit.jupiter.api.TestTemplate; 067import org.junit.jupiter.params.provider.Arguments; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071@Tag(LargeTests.TAG) 072@HBaseParameterizedTestTemplate(name = "{index}: useBulkLoad={0}") 073public class TestBackupRestoreWithModifications { 074 075 private static final Logger LOG = 076 LoggerFactory.getLogger(TestBackupRestoreWithModifications.class); 077 078 public boolean useBulkLoad; 079 080 public TestBackupRestoreWithModifications(boolean useBulkLoad) { 081 this.useBulkLoad = useBulkLoad; 082 } 083 084 public static Stream<Arguments> parameters() { 085 return Stream.of(Arguments.of(true), Arguments.of(false)); 086 } 087 088 private TableName sourceTable; 089 private TableName targetTable; 090 091 private List<TableName> allTables; 092 private static TestingHBaseCluster cluster; 093 private static final Path BACKUP_ROOT_DIR = new Path("backupIT"); 094 private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0"); 095 096 @BeforeAll 097 public static void beforeClass() throws Exception { 098 Configuration conf = HBaseConfiguration.create(); 099 enableBackup(conf); 100 cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build()); 101 cluster.start(); 102 } 103 104 @AfterAll 105 public static void afterClass() throws Exception { 106 cluster.stop(); 107 } 108 109 @BeforeEach 110 public void setUp() throws Exception { 111 sourceTable = TableName.valueOf("table-" + useBulkLoad); 112 targetTable = TableName.valueOf("another-table-" + useBulkLoad); 113 allTables = Arrays.asList(sourceTable, targetTable); 114 createTable(sourceTable); 115 createTable(targetTable); 116 } 117 118 @TestTemplate 119 public void testModificationsOnTable() throws Exception { 120 Instant timestamp = Instant.now(); 121 122 // load some data 123 load(sourceTable, timestamp, "data"); 124 125 String backupId = backup(FULL, allTables); 126 BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE); 127 assertTrue(backupInfo.getTables().contains(sourceTable)); 128 129 restore(backupId, sourceTable, targetTable); 130 validateDataEquals(sourceTable, "data"); 131 validateDataEquals(targetTable, "data"); 132 133 // load new data on the same timestamp 134 load(sourceTable, timestamp, "changed_data"); 135 136 backupId = backup(FULL, allTables); 137 backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE); 138 assertTrue(backupInfo.getTables().contains(sourceTable)); 139 140 restore(backupId, sourceTable, targetTable); 141 validateDataEquals(sourceTable, "changed_data"); 142 validateDataEquals(targetTable, "changed_data"); 143 } 144 145 private void createTable(TableName tableName) throws IOException { 146 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName) 147 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY)); 148 try (Connection connection = ConnectionFactory.createConnection(cluster.getConf()); 149 Admin admin = connection.getAdmin()) { 150 admin.createTable(builder.build()); 151 } 152 } 153 154 private void load(TableName tableName, Instant timestamp, String data) throws IOException { 155 if (useBulkLoad) { 156 hFileBulkLoad(tableName, timestamp, data); 157 } else { 158 putLoad(tableName, timestamp, data); 159 } 160 } 161 162 private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException { 163 LOG.info("Writing new data to HBase using normal Puts: {}", data); 164 try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) { 165 Table table = connection.getTable(sourceTable); 166 List<Put> puts = new ArrayList<>(); 167 for (int i = 0; i < 10; i++) { 168 Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli()); 169 put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data)); 170 puts.add(put); 171 172 if (i % 100 == 0) { 173 table.put(puts); 174 puts.clear(); 175 } 176 } 177 if (!puts.isEmpty()) { 178 table.put(puts); 179 } 180 connection.getAdmin().flush(tableName); 181 } 182 } 183 184 private void hFileBulkLoad(TableName tableName, Instant timestamp, String data) 185 throws IOException { 186 FileSystem fs = FileSystem.get(cluster.getConf()); 187 LOG.info("Writing new data to HBase using BulkLoad: {}", data); 188 // HFiles require this strict directory structure to allow to load them 189 Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID()); 190 fs.mkdirs(hFileRootPath); 191 Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY)); 192 fs.mkdirs(hFileFamilyPath); 193 try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf()) 194 .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID())) 195 .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes()) 196 .withColumnFamily(COLUMN_FAMILY).build()) 197 .create()) { 198 for (int i = 0; i < 10; i++) { 199 writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"), 200 timestamp.toEpochMilli(), Bytes.toBytes(data))); 201 } 202 } 203 Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result = 204 BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath); 205 assertFalse(result.isEmpty()); 206 } 207 208 private String backup(BackupType backupType, List<TableName> tables) throws IOException { 209 LOG.info("Creating the backup ..."); 210 211 try (Connection connection = ConnectionFactory.createConnection(cluster.getConf()); 212 BackupAdmin backupAdmin = new BackupAdminImpl(connection)) { 213 BackupRequest backupRequest = 214 new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString()) 215 .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build(); 216 return backupAdmin.backupTables(backupRequest); 217 } 218 219 } 220 221 private void restore(String backupId, TableName sourceTableName, TableName targetTableName) 222 throws IOException { 223 LOG.info("Restoring data ..."); 224 try (Connection connection = ConnectionFactory.createConnection(cluster.getConf()); 225 BackupAdmin backupAdmin = new BackupAdminImpl(connection)) { 226 RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId) 227 .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOverwrite(true) 228 .withFromTables(new TableName[] { sourceTableName }) 229 .withToTables(new TableName[] { targetTableName }).build(); 230 backupAdmin.restore(restoreRequest); 231 } 232 } 233 234 private void validateDataEquals(TableName tableName, String expectedData) throws IOException { 235 try (Connection connection = ConnectionFactory.createConnection(cluster.getConf()); 236 Table table = connection.getTable(tableName)) { 237 Scan scan = new Scan(); 238 scan.readAllVersions(); 239 scan.setRaw(true); 240 scan.setBatch(100); 241 242 for (Result sourceResult : table.getScanner(scan)) { 243 List<Cell> sourceCells = sourceResult.listCells(); 244 for (Cell cell : sourceCells) { 245 assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(), 246 cell.getValueOffset(), cell.getValueLength())); 247 } 248 } 249 } 250 } 251 252}