001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
020import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
021import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
022import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
023import static org.apache.hadoop.hbase.backup.BackupType.FULL;
024import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
025import static org.junit.jupiter.api.Assertions.assertEquals;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027
028import java.io.IOException;
029import java.time.Instant;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Collections;
033import java.util.List;
034import java.util.stream.Stream;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.testclassification.LargeTests;
052import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
053import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.junit.jupiter.api.AfterAll;
056import org.junit.jupiter.api.AfterEach;
057import org.junit.jupiter.api.BeforeAll;
058import org.junit.jupiter.api.BeforeEach;
059import org.junit.jupiter.api.Tag;
060import org.junit.jupiter.api.TestTemplate;
061import org.junit.jupiter.params.provider.Arguments;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065@Tag(LargeTests.TAG)
066@HBaseParameterizedTestTemplate(name = "{index}: restoreToOtherTable={0}")
067public class TestBackupRestoreOnEmptyEnvironment {
068
069  private static final Logger LOG =
070    LoggerFactory.getLogger(TestBackupRestoreOnEmptyEnvironment.class);
071
072  public boolean restoreToOtherTable;
073
074  public TestBackupRestoreOnEmptyEnvironment(boolean restoreToOtherTable) {
075    this.restoreToOtherTable = restoreToOtherTable;
076  }
077
078  public static Stream<Arguments> parameters() {
079    return Stream.of(Arguments.of(true), Arguments.of(false));
080  }
081
082  private TableName sourceTable;
083  private TableName targetTable;
084
085  private static TestingHBaseCluster cluster;
086  private static Path BACKUP_ROOT_DIR;
087  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");
088
089  @BeforeAll
090  public static void beforeClass() throws Exception {
091    Configuration conf = HBaseConfiguration.create();
092    enableBackup(conf);
093    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
094    cluster.start();
095    BACKUP_ROOT_DIR = new Path(new Path(conf.get("fs.defaultFS")), new Path("/backupIT"));
096  }
097
098  @AfterAll
099  public static void afterClass() throws Exception {
100    cluster.stop();
101  }
102
103  @BeforeEach
104  public void setUp() throws Exception {
105    sourceTable = TableName.valueOf("table");
106    targetTable = TableName.valueOf("another-table");
107    createTable(sourceTable);
108    createTable(targetTable);
109  }
110
111  @AfterEach
112  public void removeTables() throws Exception {
113    deleteTables();
114  }
115
116  @TestTemplate
117  public void testRestoreToCorrectTable() throws Exception {
118    Instant timestamp = Instant.now().minusSeconds(10);
119
120    // load some data
121    putLoad(sourceTable, timestamp, "data");
122
123    String backupId = backup(FULL, Collections.singletonList(sourceTable));
124    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
125    assertTrue(backupInfo.getTables().contains(sourceTable));
126
127    LOG.info("Deleting the tables before restore ...");
128    deleteTables();
129
130    if (restoreToOtherTable) {
131      restore(backupId, sourceTable, targetTable);
132      validateDataEquals(targetTable, "data");
133    } else {
134      restore(backupId, sourceTable, sourceTable);
135      validateDataEquals(sourceTable, "data");
136    }
137
138  }
139
140  @TestTemplate
141  public void testRestoreCorrectTableForIncremental() throws Exception {
142    Instant timestamp = Instant.now().minusSeconds(10);
143
144    // load some data
145    putLoad(sourceTable, timestamp, "data");
146
147    String backupId = backup(FULL, Collections.singletonList(sourceTable));
148    verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
149
150    // some incremental data
151    putLoad(sourceTable, timestamp.plusMillis(1), "new_data");
152
153    String backupId2 = backup(INCREMENTAL, Collections.singletonList(sourceTable));
154    verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
155
156    LOG.info("Deleting the tables before restore ...");
157    deleteTables();
158
159    if (restoreToOtherTable) {
160      restore(backupId2, sourceTable, targetTable);
161      validateDataEquals(targetTable, "new_data");
162    } else {
163      restore(backupId2, sourceTable, sourceTable);
164      validateDataEquals(sourceTable, "new_data");
165    }
166
167  }
168
169  private void createTable(TableName tableName) throws IOException {
170    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
171      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
172    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
173      Admin admin = connection.getAdmin()) {
174      admin.createTable(builder.build());
175    }
176  }
177
178  private void deleteTables() throws IOException {
179    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
180      Admin admin = connection.getAdmin()) {
181      for (TableName table : Arrays.asList(sourceTable, targetTable)) {
182        if (admin.tableExists(table)) {
183          admin.disableTable(table);
184          admin.deleteTable(table);
185        }
186      }
187    }
188  }
189
190  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
191    LOG.info("Writing new data to HBase using normal Puts: {}", data);
192    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) {
193      Table table = connection.getTable(sourceTable);
194      List<Put> puts = new ArrayList<>();
195      for (int i = 0; i < 10; i++) {
196        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
197        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
198        puts.add(put);
199
200        if (i % 100 == 0) {
201          table.put(puts);
202          puts.clear();
203        }
204      }
205      if (!puts.isEmpty()) {
206        table.put(puts);
207      }
208      connection.getAdmin().flush(tableName);
209    }
210  }
211
212  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
213    LOG.info("Creating the backup ...");
214
215    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
216      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
217      BackupRequest backupRequest =
218        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
219          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
220      return backupAdmin.backupTables(backupRequest);
221    }
222
223  }
224
225  private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
226    throws IOException {
227    LOG.info("Restoring data ...");
228    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
229      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
230      RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
231        .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOverwrite(true)
232        .withFromTables(new TableName[] { sourceTableName })
233        .withToTables(new TableName[] { targetTableName }).build();
234      backupAdmin.restore(restoreRequest);
235    }
236  }
237
238  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
239    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
240      Table table = connection.getTable(tableName)) {
241      Scan scan = new Scan();
242      scan.setRaw(true);
243      scan.setBatch(100);
244
245      for (Result sourceResult : table.getScanner(scan)) {
246        List<Cell> sourceCells = sourceResult.listCells();
247        for (Cell cell : sourceCells) {
248          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
249            cell.getValueOffset(), cell.getValueLength()));
250        }
251      }
252    }
253  }
254}