001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
047
048/**
 * This test checks whether backups properly track and manage bulk file loads.
050 */
051@Tag(LargeTests.TAG)
052public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
053
054  private static final String TEST_NAME = TestIncrementalBackupWithBulkLoad.class.getSimpleName();
055  private static final int ROWS_IN_BULK_LOAD = 100;
056
057  // implement all test cases in 1 test since incremental backup/restore has dependencies
058  @Test
059  public void TestIncBackupDeleteTable() throws Exception {
060    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
061      // The test starts with some data, and no bulk loaded rows.
062      int expectedRowCount = NB_ROWS_IN_BATCH;
063      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
064      assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());
065
066      // Bulk loads aren't tracked if the table isn't backed up yet
067      performBulkLoad("bulk1");
068      expectedRowCount += ROWS_IN_BULK_LOAD;
069      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
070      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
071
072      // Create a backup, bulk loads are now being tracked
073      String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR);
074      assertTrue(checkSucceeded(backup1));
075      performBulkLoad("bulk2");
076      expectedRowCount += ROWS_IN_BULK_LOAD;
077      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
078      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
079
080      // Truncating or deleting a table clears the tracked bulk loads (and all rows)
081      TEST_UTIL.truncateTable(table1).close();
082      expectedRowCount = 0;
083      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
084      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
085
086      // Creating a full backup clears the bulk loads (since they are captured in the snapshot)
087      performBulkLoad("bulk3");
088      expectedRowCount = ROWS_IN_BULK_LOAD;
089      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
090      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
091      String backup2 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR);
092      assertTrue(checkSucceeded(backup2));
093      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
094      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
095
096      // Creating an incremental backup clears the bulk loads
097      performBulkLoad("bulk4");
098      performBulkLoad("bulk5");
099      performBulkLoad("bulk6");
100      expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
101      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
102      assertEquals(3, systemTable.readBulkloadRows(List.of(table1)).size());
103      String backup3 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR);
104      assertTrue(checkSucceeded(backup3));
105      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
106      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
107      int rowCountAfterBackup3 = expectedRowCount;
108
109      // Doing another bulk load, to check that this data will disappear after a restore operation
110      performBulkLoad("bulk7");
111      expectedRowCount += ROWS_IN_BULK_LOAD;
112      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
113      List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1));
114      assertEquals(1, bulkloadsTemp.size());
115      BulkLoad bulk7 = bulkloadsTemp.get(0);
116
117      // Doing a restore. Overwriting the table implies clearing the bulk loads,
118      // but the loading of restored data involves loading bulk data, we expect 2 bulk loads
119      // associated with backup 3 (loading of full backup, loading of incremental backup).
120      BackupAdmin client = getBackupAdmin();
121      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup3, false,
122        new TableName[] { table1 }, new TableName[] { table1 }, true));
123      assertEquals(rowCountAfterBackup3, TEST_UTIL.countRows(table1));
124      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1));
125      assertEquals(2, bulkLoads.size());
126      assertFalse(bulkLoads.contains(bulk7));
127
128      // Check that we have data of all expected bulk loads
129      try (Table restoredTable = TEST_UTIL.getConnection().getTable(table1)) {
130        assertFalse(containsRowWithKey(restoredTable, "bulk1"));
131        assertFalse(containsRowWithKey(restoredTable, "bulk2"));
132        assertTrue(containsRowWithKey(restoredTable, "bulk3"));
133        assertTrue(containsRowWithKey(restoredTable, "bulk4"));
134        assertTrue(containsRowWithKey(restoredTable, "bulk5"));
135        assertTrue(containsRowWithKey(restoredTable, "bulk6"));
136        assertFalse(containsRowWithKey(restoredTable, "bulk7"));
137      }
138    }
139  }
140
141  private boolean containsRowWithKey(Table table, String rowKey) throws IOException {
142    byte[] data = Bytes.toBytes(rowKey);
143    Get get = new Get(data);
144    Result result = table.get(get);
145    return result.containsColumn(famName, qualName);
146  }
147
148  @Test
149  public void testUpdateFileListsRaceCondition() throws Exception {
150    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
151      // Test the race condition where files are archived during incremental backup
152      FileSystem fs = TEST_UTIL.getTestFileSystem();
153
154      String regionName = "region1";
155      String columnFamily = "cf";
156      String filename1 = "hfile1";
157      String filename2 = "hfile2";
158
159      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
160      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
161      Path activeFile1 =
162        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename1);
163      Path activeFile2 =
164        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename2);
165
166      fs.mkdirs(activeFile1.getParent());
167      fs.create(activeFile1).close();
168      fs.create(activeFile2).close();
169
170      List<String> activeFiles = new ArrayList<>();
171      activeFiles.add(activeFile1.toString());
172      activeFiles.add(activeFile2.toString());
173      List<String> archiveFiles = new ArrayList<>();
174
175      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), table1,
176        regionName, columnFamily);
177      Path archivedFile1 = new Path(archiveDir, filename1);
178      fs.mkdirs(archiveDir);
179      assertTrue(fs.rename(activeFile1, archivedFile1), "File should be moved to archive");
180
181      TestBackupBase.IncrementalTableBackupClientForTest client =
182        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
183          "test_backup_id",
184          createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));
185
186      client.updateFileLists(activeFiles, archiveFiles);
187
188      assertEquals(1, activeFiles.size(), "Only one file should remain in active files");
189      assertEquals(activeFile2.toString(), activeFiles.get(0),
190        "File2 should still be in active files");
191      assertEquals(1, archiveFiles.size(), "One file should be added to archive files");
192      assertEquals(archivedFile1.toString(), archiveFiles.get(0),
193        "Archived file should have correct path");
194      systemTable.finishBackupExclusiveOperation();
195    }
196
197  }
198
199  @Test
200  public void testUpdateFileListsMissingArchivedFile() throws Exception {
201    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
202      // Test that IOException is thrown when file doesn't exist in archive location
203      FileSystem fs = TEST_UTIL.getTestFileSystem();
204
205      String regionName = "region2";
206      String columnFamily = "cf";
207      String filename = "missing_file";
208
209      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
210      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
211      Path activeFile =
212        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename);
213
214      fs.mkdirs(activeFile.getParent());
215      fs.create(activeFile).close();
216
217      List<String> activeFiles = new ArrayList<>();
218      activeFiles.add(activeFile.toString());
219      List<String> archiveFiles = new ArrayList<>();
220
221      // Delete the file but don't create it in archive location
222      fs.delete(activeFile, false);
223
224      TestBackupBase.IncrementalTableBackupClientForTest client =
225        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
226          "test_backup_id",
227          createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));
228
229      // This should throw IOException since file doesn't exist in archive
230      try {
231        client.updateFileLists(activeFiles, archiveFiles);
232        fail("Expected IOException to be thrown");
233      } catch (IOException e) {
234        // Expected
235      }
236      systemTable.finishBackupExclusiveOperation();
237    }
238  }
239
240  private void performBulkLoad(String keyPrefix) throws IOException {
241    FileSystem fs = TEST_UTIL.getTestFileSystem();
242    Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME);
243    Path hfilePath =
244      new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix);
245
246    HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName,
247      Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD);
248
249    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
250      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
251    assertFalse(result.isEmpty());
252  }
253}