001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
import static org.apache.hadoop.hbase.backup.BackupType.FULL;
import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
071
072@Tag(LargeTests.TAG)
073@HBaseParameterizedTestTemplate(name = "{index}: useBulkLoad={0}")
074public class TestIncrementalBackupMergeWithBulkLoad {
075
076  private static final Logger LOG =
077    LoggerFactory.getLogger(TestIncrementalBackupMergeWithBulkLoad.class);
078
079  public boolean useBulkLoad;
080
081  public TestIncrementalBackupMergeWithBulkLoad(boolean useBulkLoad) {
082    this.useBulkLoad = useBulkLoad;
083  }
084
085  public static Stream<Arguments> parameters() {
086    return Stream.of(Arguments.of(true), Arguments.of(false));
087  }
088
089  private TableName sourceTable;
090  private TableName targetTable;
091
092  private List<TableName> allTables;
093  private static TestingHBaseCluster cluster;
094  private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
095  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");
096
097  @BeforeAll
098  public static void beforeClass() throws Exception {
099    Configuration conf = HBaseConfiguration.create();
100    enableBackup(conf);
101    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
102    cluster.start();
103  }
104
105  @AfterAll
106  public static void afterClass() throws Exception {
107    cluster.stop();
108  }
109
110  @BeforeEach
111  public void setUp() throws Exception {
112    sourceTable = TableName.valueOf("table-" + useBulkLoad);
113    targetTable = TableName.valueOf("another-table-" + useBulkLoad);
114    allTables = Arrays.asList(sourceTable, targetTable);
115    createTable(sourceTable);
116    createTable(targetTable);
117  }
118
119  @TestTemplate
120  public void testMergeContainingBulkloadedHfiles() throws Exception {
121    Instant timestamp = Instant.now();
122
123    String backupId = backup(FULL, allTables);
124    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
125    assertTrue(backupInfo.getTables().contains(sourceTable));
126
127    // load some data
128    load(sourceTable, timestamp, "data");
129
130    String backupId1 = backup(INCREMENTAL, allTables);
131    backupInfo = verifyBackup(cluster.getConf(), backupId1, INCREMENTAL, COMPLETE);
132    assertTrue(backupInfo.getTables().contains(sourceTable));
133
134    String backupId2 = backup(INCREMENTAL, allTables);
135    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
136    assertTrue(backupInfo.getTables().contains(sourceTable));
137
138    merge(new String[] { backupId1, backupId2 });
139    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
140    assertTrue(backupInfo.getTables().contains(sourceTable));
141    validateDataEquals(sourceTable, "data");
142  }
143
144  private void createTable(TableName tableName) throws IOException {
145    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
146      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
147    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
148      Admin admin = connection.getAdmin()) {
149      admin.createTable(builder.build());
150    }
151  }
152
153  private void load(TableName tableName, Instant timestamp, String data) throws IOException {
154    if (useBulkLoad) {
155      hFileBulkLoad(tableName, timestamp, data);
156    } else {
157      putLoad(tableName, timestamp, data);
158    }
159  }
160
161  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
162    LOG.info("Writing new data to HBase using normal Puts: {}", data);
163    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) {
164      Table table = connection.getTable(sourceTable);
165      List<Put> puts = new ArrayList<>();
166      for (int i = 0; i < 10; i++) {
167        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
168        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
169        puts.add(put);
170
171        if (i % 100 == 0) {
172          table.put(puts);
173          puts.clear();
174        }
175      }
176      if (!puts.isEmpty()) {
177        table.put(puts);
178      }
179      connection.getAdmin().flush(tableName);
180    }
181  }
182
183  private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
184    throws IOException {
185    FileSystem fs = FileSystem.get(cluster.getConf());
186    LOG.info("Writing new data to HBase using BulkLoad: {}", data);
187    // HFiles require this strict directory structure to allow to load them
188    Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
189    fs.mkdirs(hFileRootPath);
190    Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
191    fs.mkdirs(hFileFamilyPath);
192    try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
193      .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
194      .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
195        .withColumnFamily(COLUMN_FAMILY).build())
196      .create()) {
197      for (int i = 0; i < 10; i++) {
198        writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
199          timestamp.toEpochMilli(), Bytes.toBytes(data)));
200      }
201    }
202    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
203      BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
204    assertFalse(result.isEmpty());
205  }
206
207  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
208    LOG.info("Creating the backup ...");
209
210    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
211      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
212      BackupRequest backupRequest =
213        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
214          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
215      return backupAdmin.backupTables(backupRequest);
216    }
217  }
218
219  private void merge(String[] backupIds) throws IOException {
220    LOG.info("Merging the backups ...");
221
222    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
223      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
224      backupAdmin.mergeBackups(backupIds);
225    }
226  }
227
228  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
229    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
230      Table table = connection.getTable(tableName)) {
231      Scan scan = new Scan();
232      scan.readAllVersions();
233      scan.setRaw(true);
234      scan.setBatch(100);
235
236      for (Result sourceResult : table.getScanner(scan)) {
237        List<Cell> sourceCells = sourceResult.listCells();
238        for (Cell cell : sourceCells) {
239          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
240            cell.getValueOffset(), cell.getValueLength()));
241        }
242      }
243    }
244  }
245
246}