/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * This test checks whether backups properly track and manage bulk file loads.
 */
@Tag(LargeTests.TAG)
public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {

  private static final String TEST_NAME = TestIncrementalBackupWithBulkLoad.class.getSimpleName();
  private static final int ROWS_IN_BULK_LOAD = 100;

  /**
   * Exercises the full bulk-load tracking lifecycle in a single method: bulk loads before the
   * first backup are not tracked; loads after a full backup are tracked; truncation, full
   * backups and incremental backups clear the tracked loads; and a restore discards bulk-loaded
   * data that was never captured in a backup.
   */
  // implement all test cases in 1 test since incremental backup/restore has dependencies
  @Test
  public void testIncBackupDeleteTable() throws Exception {
    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
      // The test starts with some data, and no bulk loaded rows.
      int expectedRowCount = NB_ROWS_IN_BATCH;
      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
      assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());

      // Bulk loads aren't tracked if the table isn't backed up yet
      performBulkLoad("bulk1");
      expectedRowCount += ROWS_IN_BULK_LOAD;
      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());

      // Create a backup, bulk loads are now being tracked
      String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR);
      assertTrue(checkSucceeded(backup1));
      performBulkLoad("bulk2");
      expectedRowCount += ROWS_IN_BULK_LOAD;
      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());

      // Truncating or deleting a table clears the tracked bulk loads (and all rows)
      TEST_UTIL.truncateTable(table1).close();
      expectedRowCount = 0;
      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());

      // Creating a full backup clears the bulk loads (since they are captured in the snapshot)
      performBulkLoad("bulk3");
      expectedRowCount = ROWS_IN_BULK_LOAD;
      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
      String backup2 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR);
      assertTrue(checkSucceeded(backup2));
      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());

      // Creating an incremental backup clears the bulk loads
      performBulkLoad("bulk4");
      performBulkLoad("bulk5");
      performBulkLoad("bulk6");
      expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
      assertEquals(3, systemTable.readBulkloadRows(List.of(table1)).size());
      String backup3 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR);
      assertTrue(checkSucceeded(backup3));
      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
      int rowCountAfterBackup3 = expectedRowCount;

      // Doing another bulk load, to check that this data will disappear after a restore operation
      performBulkLoad("bulk7");
      expectedRowCount += ROWS_IN_BULK_LOAD;
      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
      List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1));
      assertEquals(1, bulkloadsTemp.size());
      BulkLoad bulk7 = bulkloadsTemp.get(0);

      // Doing a restore. Overwriting the table implies clearing the bulk loads,
      // but the loading of restored data involves loading bulk data, we expect 2 bulk loads
      // associated with backup 3 (loading of full backup, loading of incremental backup).
      BackupAdmin client = getBackupAdmin();
      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup3, false,
        new TableName[] { table1 }, new TableName[] { table1 }, true));
      assertEquals(rowCountAfterBackup3, TEST_UTIL.countRows(table1));
      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1));
      assertEquals(2, bulkLoads.size());
      assertFalse(bulkLoads.contains(bulk7));

      // Check that we have data of all expected bulk loads
      try (Table restoredTable = TEST_UTIL.getConnection().getTable(table1)) {
        assertFalse(containsRowWithKey(restoredTable, "bulk1"));
        assertFalse(containsRowWithKey(restoredTable, "bulk2"));
        assertTrue(containsRowWithKey(restoredTable, "bulk3"));
        assertTrue(containsRowWithKey(restoredTable, "bulk4"));
        assertTrue(containsRowWithKey(restoredTable, "bulk5"));
        assertTrue(containsRowWithKey(restoredTable, "bulk6"));
        assertFalse(containsRowWithKey(restoredTable, "bulk7"));
      }
    }
  }

  /**
   * Returns whether {@code table} contains a row with key {@code rowKey} that has a value for
   * the test column family/qualifier.
   */
  private boolean containsRowWithKey(Table table, String rowKey) throws IOException {
    byte[] data = Bytes.toBytes(rowKey);
    Get get = new Get(data);
    Result result = table.get(get);
    return result.containsColumn(famName, qualName);
  }

  /**
   * Tests the race where an HFile is archived (moved out of the active store directory) while an
   * incremental backup is collecting its file list: {@code updateFileLists} must move the
   * now-archived file from the active list to the archive list.
   */
  @Test
  public void testUpdateFileListsRaceCondition() throws Exception {
    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
      // Test the race condition where files are archived during incremental backup
      FileSystem fs = TEST_UTIL.getTestFileSystem();

      String regionName = "region1";
      String columnFamily = "cf";
      String filename1 = "hfile1";
      String filename2 = "hfile2";

      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
      Path activeFile1 =
        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename1);
      Path activeFile2 =
        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename2);

      fs.mkdirs(activeFile1.getParent());
      fs.create(activeFile1).close();
      fs.create(activeFile2).close();

      List<String> activeFiles = new ArrayList<>();
      activeFiles.add(activeFile1.toString());
      activeFiles.add(activeFile2.toString());
      List<String> archiveFiles = new ArrayList<>();

      // Simulate the archiver moving file1 to the archive location mid-backup
      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), table1,
        regionName, columnFamily);
      Path archivedFile1 = new Path(archiveDir, filename1);
      fs.mkdirs(archiveDir);
      assertTrue(fs.rename(activeFile1, archivedFile1), "File should be moved to archive");

      TestBackupBase.IncrementalTableBackupClientForTest client =
        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
          "test_backup_id",
          createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));

      client.updateFileLists(activeFiles, archiveFiles);

      assertEquals(1, activeFiles.size(), "Only one file should remain in active files");
      assertEquals(activeFile2.toString(), activeFiles.get(0),
        "File2 should still be in active files");
      assertEquals(1, archiveFiles.size(), "One file should be added to archive files");
      assertEquals(archivedFile1.toString(), archiveFiles.get(0),
        "Archived file should have correct path");
      systemTable.finishBackupExclusiveOperation();
    }
  }

  /**
   * Tests that {@code updateFileLists} throws {@link IOException} when a file has vanished from
   * the active store directory but is not present in the archive location either.
   */
  @Test
  public void testUpdateFileListsMissingArchivedFile() throws Exception {
    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
      // Test that IOException is thrown when file doesn't exist in archive location
      FileSystem fs = TEST_UTIL.getTestFileSystem();

      String regionName = "region2";
      String columnFamily = "cf";
      String filename = "missing_file";

      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
      Path activeFile =
        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename);

      fs.mkdirs(activeFile.getParent());
      fs.create(activeFile).close();

      List<String> activeFiles = new ArrayList<>();
      activeFiles.add(activeFile.toString());
      List<String> archiveFiles = new ArrayList<>();

      // Delete the file but don't create it in archive location
      fs.delete(activeFile, false);

      TestBackupBase.IncrementalTableBackupClientForTest client =
        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
          "test_backup_id",
          createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));

      // This should throw IOException since file doesn't exist in archive
      assertThrows(IOException.class, () -> client.updateFileLists(activeFiles, archiveFiles));
      systemTable.finishBackupExclusiveOperation();
    }
  }

  /**
   * Creates an HFile holding {@link #ROWS_IN_BULK_LOAD} rows whose keys start with
   * {@code keyPrefix} and bulk loads it into {@code table1}, asserting the load produced results.
   */
  private void performBulkLoad(String keyPrefix) throws IOException {
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME);
    Path hfilePath =
      new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix);

    HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName,
      Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD);

    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
    assertFalse(result.isEmpty());
  }
}