001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
import static org.apache.hadoop.hbase.backup.BackupType.FULL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
070
071@Tag(LargeTests.TAG)
072@HBaseParameterizedTestTemplate(name = "{index}: useBulkLoad={0}")
073public class TestBackupRestoreWithModifications {
074
075  private static final Logger LOG =
076    LoggerFactory.getLogger(TestBackupRestoreWithModifications.class);
077
078  public boolean useBulkLoad;
079
080  public TestBackupRestoreWithModifications(boolean useBulkLoad) {
081    this.useBulkLoad = useBulkLoad;
082  }
083
084  public static Stream<Arguments> parameters() {
085    return Stream.of(Arguments.of(true), Arguments.of(false));
086  }
087
088  private TableName sourceTable;
089  private TableName targetTable;
090
091  private List<TableName> allTables;
092  private static TestingHBaseCluster cluster;
093  private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
094  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");
095
096  @BeforeAll
097  public static void beforeClass() throws Exception {
098    Configuration conf = HBaseConfiguration.create();
099    enableBackup(conf);
100    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
101    cluster.start();
102  }
103
104  @AfterAll
105  public static void afterClass() throws Exception {
106    cluster.stop();
107  }
108
109  @BeforeEach
110  public void setUp() throws Exception {
111    sourceTable = TableName.valueOf("table-" + useBulkLoad);
112    targetTable = TableName.valueOf("another-table-" + useBulkLoad);
113    allTables = Arrays.asList(sourceTable, targetTable);
114    createTable(sourceTable);
115    createTable(targetTable);
116  }
117
118  @TestTemplate
119  public void testModificationsOnTable() throws Exception {
120    Instant timestamp = Instant.now();
121
122    // load some data
123    load(sourceTable, timestamp, "data");
124
125    String backupId = backup(FULL, allTables);
126    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
127    assertTrue(backupInfo.getTables().contains(sourceTable));
128
129    restore(backupId, sourceTable, targetTable);
130    validateDataEquals(sourceTable, "data");
131    validateDataEquals(targetTable, "data");
132
133    // load new data on the same timestamp
134    load(sourceTable, timestamp, "changed_data");
135
136    backupId = backup(FULL, allTables);
137    backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
138    assertTrue(backupInfo.getTables().contains(sourceTable));
139
140    restore(backupId, sourceTable, targetTable);
141    validateDataEquals(sourceTable, "changed_data");
142    validateDataEquals(targetTable, "changed_data");
143  }
144
145  private void createTable(TableName tableName) throws IOException {
146    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
147      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
148    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
149      Admin admin = connection.getAdmin()) {
150      admin.createTable(builder.build());
151    }
152  }
153
154  private void load(TableName tableName, Instant timestamp, String data) throws IOException {
155    if (useBulkLoad) {
156      hFileBulkLoad(tableName, timestamp, data);
157    } else {
158      putLoad(tableName, timestamp, data);
159    }
160  }
161
162  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
163    LOG.info("Writing new data to HBase using normal Puts: {}", data);
164    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) {
165      Table table = connection.getTable(sourceTable);
166      List<Put> puts = new ArrayList<>();
167      for (int i = 0; i < 10; i++) {
168        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
169        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
170        puts.add(put);
171
172        if (i % 100 == 0) {
173          table.put(puts);
174          puts.clear();
175        }
176      }
177      if (!puts.isEmpty()) {
178        table.put(puts);
179      }
180      connection.getAdmin().flush(tableName);
181    }
182  }
183
184  private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
185    throws IOException {
186    FileSystem fs = FileSystem.get(cluster.getConf());
187    LOG.info("Writing new data to HBase using BulkLoad: {}", data);
188    // HFiles require this strict directory structure to allow to load them
189    Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
190    fs.mkdirs(hFileRootPath);
191    Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
192    fs.mkdirs(hFileFamilyPath);
193    try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
194      .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
195      .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
196        .withColumnFamily(COLUMN_FAMILY).build())
197      .create()) {
198      for (int i = 0; i < 10; i++) {
199        writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
200          timestamp.toEpochMilli(), Bytes.toBytes(data)));
201      }
202    }
203    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
204      BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
205    assertFalse(result.isEmpty());
206  }
207
208  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
209    LOG.info("Creating the backup ...");
210
211    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
212      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
213      BackupRequest backupRequest =
214        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
215          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
216      return backupAdmin.backupTables(backupRequest);
217    }
218
219  }
220
221  private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
222    throws IOException {
223    LOG.info("Restoring data ...");
224    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
225      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
226      RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
227        .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOverwrite(true)
228        .withFromTables(new TableName[] { sourceTableName })
229        .withToTables(new TableName[] { targetTableName }).build();
230      backupAdmin.restore(restoreRequest);
231    }
232  }
233
234  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
235    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
236      Table table = connection.getTable(tableName)) {
237      Scan scan = new Scan();
238      scan.readAllVersions();
239      scan.setRaw(true);
240      scan.setBatch(100);
241
242      for (Result sourceResult : table.getScanner(scan)) {
243        List<Cell> sourceCells = sourceResult.listCells();
244        for (Cell cell : sourceCells) {
245          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
246            cell.getValueOffset(), cell.getValueLength()));
247        }
248      }
249    }
250  }
251
252}