001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Map;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
035import org.apache.hadoop.hbase.backup.impl.BulkLoad;
036import org.apache.hadoop.hbase.backup.util.BackupUtils;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.CommonFSUtils;
044import org.apache.hadoop.hbase.util.HFileArchiveUtil;
045import org.apache.hadoop.hbase.util.HFileTestUtil;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050/**
051 * This test checks whether backups properly track & manage bulk files loads.
052 */
053@Category(LargeTests.class)
054public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestIncrementalBackupWithBulkLoad.class);
059
060  private static final String TEST_NAME = TestIncrementalBackupWithBulkLoad.class.getSimpleName();
061  private static final int ROWS_IN_BULK_LOAD = 100;
062
063  // implement all test cases in 1 test since incremental backup/restore has dependencies
064  @Test
065  public void TestIncBackupDeleteTable() throws Exception {
066    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
067      // The test starts with some data, and no bulk loaded rows.
068      int expectedRowCount = NB_ROWS_IN_BATCH;
069      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
070      assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());
071
072      // Bulk loads aren't tracked if the table isn't backed up yet
073      performBulkLoad("bulk1");
074      expectedRowCount += ROWS_IN_BULK_LOAD;
075      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
076      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
077
078      // Create a backup, bulk loads are now being tracked
079      String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR);
080      assertTrue(checkSucceeded(backup1));
081      performBulkLoad("bulk2");
082      expectedRowCount += ROWS_IN_BULK_LOAD;
083      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
084      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
085
086      // Truncating or deleting a table clears the tracked bulk loads (and all rows)
087      TEST_UTIL.truncateTable(table1).close();
088      expectedRowCount = 0;
089      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
090      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
091
092      // Creating a full backup clears the bulk loads (since they are captured in the snapshot)
093      performBulkLoad("bulk3");
094      expectedRowCount = ROWS_IN_BULK_LOAD;
095      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
096      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
097      String backup2 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR);
098      assertTrue(checkSucceeded(backup2));
099      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
100      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
101
102      // Creating an incremental backup clears the bulk loads
103      performBulkLoad("bulk4");
104      performBulkLoad("bulk5");
105      performBulkLoad("bulk6");
106      expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
107      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
108      assertEquals(3, systemTable.readBulkloadRows(List.of(table1)).size());
109      String backup3 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR);
110      assertTrue(checkSucceeded(backup3));
111      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
112      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
113      int rowCountAfterBackup3 = expectedRowCount;
114
115      // Doing another bulk load, to check that this data will disappear after a restore operation
116      performBulkLoad("bulk7");
117      expectedRowCount += ROWS_IN_BULK_LOAD;
118      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
119      List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1));
120      assertEquals(1, bulkloadsTemp.size());
121      BulkLoad bulk7 = bulkloadsTemp.get(0);
122
123      // Doing a restore. Overwriting the table implies clearing the bulk loads,
124      // but the loading of restored data involves loading bulk data, we expect 2 bulk loads
125      // associated with backup 3 (loading of full backup, loading of incremental backup).
126      BackupAdmin client = getBackupAdmin();
127      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup3, false,
128        new TableName[] { table1 }, new TableName[] { table1 }, true));
129      assertEquals(rowCountAfterBackup3, TEST_UTIL.countRows(table1));
130      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1));
131      assertEquals(2, bulkLoads.size());
132      assertFalse(bulkLoads.contains(bulk7));
133
134      // Check that we have data of all expected bulk loads
135      try (Table restoredTable = TEST_UTIL.getConnection().getTable(table1)) {
136        assertFalse(containsRowWithKey(restoredTable, "bulk1"));
137        assertFalse(containsRowWithKey(restoredTable, "bulk2"));
138        assertTrue(containsRowWithKey(restoredTable, "bulk3"));
139        assertTrue(containsRowWithKey(restoredTable, "bulk4"));
140        assertTrue(containsRowWithKey(restoredTable, "bulk5"));
141        assertTrue(containsRowWithKey(restoredTable, "bulk6"));
142        assertFalse(containsRowWithKey(restoredTable, "bulk7"));
143      }
144    }
145  }
146
147  private boolean containsRowWithKey(Table table, String rowKey) throws IOException {
148    byte[] data = Bytes.toBytes(rowKey);
149    Get get = new Get(data);
150    Result result = table.get(get);
151    return result.containsColumn(famName, qualName);
152  }
153
154  @Test
155  public void testUpdateFileListsRaceCondition() throws Exception {
156    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
157      // Test the race condition where files are archived during incremental backup
158      FileSystem fs = TEST_UTIL.getTestFileSystem();
159
160      String regionName = "region1";
161      String columnFamily = "cf";
162      String filename1 = "hfile1";
163      String filename2 = "hfile2";
164
165      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
166      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
167      Path activeFile1 =
168        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename1);
169      Path activeFile2 =
170        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename2);
171
172      fs.mkdirs(activeFile1.getParent());
173      fs.create(activeFile1).close();
174      fs.create(activeFile2).close();
175
176      List<String> activeFiles = new ArrayList<>();
177      activeFiles.add(activeFile1.toString());
178      activeFiles.add(activeFile2.toString());
179      List<String> archiveFiles = new ArrayList<>();
180
181      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), table1,
182        regionName, columnFamily);
183      Path archivedFile1 = new Path(archiveDir, filename1);
184      fs.mkdirs(archiveDir);
185      assertTrue("File should be moved to archive", fs.rename(activeFile1, archivedFile1));
186
187      TestBackupBase.IncrementalTableBackupClientForTest client =
188        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
189          "test_backup_id",
190          createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));
191
192      client.updateFileLists(activeFiles, archiveFiles);
193
194      assertEquals("Only one file should remain in active files", 1, activeFiles.size());
195      assertEquals("File2 should still be in active files", activeFile2.toString(),
196        activeFiles.get(0));
197      assertEquals("One file should be added to archive files", 1, archiveFiles.size());
198      assertEquals("Archived file should have correct path", archivedFile1.toString(),
199        archiveFiles.get(0));
200      systemTable.finishBackupExclusiveOperation();
201    }
202
203  }
204
205  @Test
206  public void testUpdateFileListsMissingArchivedFile() throws Exception {
207    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
208      // Test that IOException is thrown when file doesn't exist in archive location
209      FileSystem fs = TEST_UTIL.getTestFileSystem();
210
211      String regionName = "region2";
212      String columnFamily = "cf";
213      String filename = "missing_file";
214
215      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
216      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
217      Path activeFile =
218        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename);
219
220      fs.mkdirs(activeFile.getParent());
221      fs.create(activeFile).close();
222
223      List<String> activeFiles = new ArrayList<>();
224      activeFiles.add(activeFile.toString());
225      List<String> archiveFiles = new ArrayList<>();
226
227      // Delete the file but don't create it in archive location
228      fs.delete(activeFile, false);
229
230      TestBackupBase.IncrementalTableBackupClientForTest client =
231        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
232          "test_backup_id",
233          createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));
234
235      // This should throw IOException since file doesn't exist in archive
236      try {
237        client.updateFileLists(activeFiles, archiveFiles);
238        fail("Expected IOException to be thrown");
239      } catch (IOException e) {
240        // Expected
241      }
242      systemTable.finishBackupExclusiveOperation();
243    }
244  }
245
246  private void performBulkLoad(String keyPrefix) throws IOException {
247    FileSystem fs = TEST_UTIL.getTestFileSystem();
248    Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME);
249    Path hfilePath =
250      new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix);
251
252    HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName,
253      Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD);
254
255    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
256      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
257    assertFalse(result.isEmpty());
258  }
259}