001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Map; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; 035import org.apache.hadoop.hbase.backup.impl.BulkLoad; 036import org.apache.hadoop.hbase.backup.util.BackupUtils; 037import org.apache.hadoop.hbase.client.Get; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.testclassification.LargeTests; 041import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.CommonFSUtils; 044import org.apache.hadoop.hbase.util.HFileArchiveUtil; 045import org.apache.hadoop.hbase.util.HFileTestUtil; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049 050/** 051 * This test checks whether backups properly track & manage bulk files loads. 052 */ 053@Category(LargeTests.class) 054public class TestIncrementalBackupWithBulkLoad extends TestBackupBase { 055 056 @ClassRule 057 public static final HBaseClassTestRule CLASS_RULE = 058 HBaseClassTestRule.forClass(TestIncrementalBackupWithBulkLoad.class); 059 060 private static final String TEST_NAME = TestIncrementalBackupWithBulkLoad.class.getSimpleName(); 061 private static final int ROWS_IN_BULK_LOAD = 100; 062 063 // implement all test cases in 1 test since incremental backup/restore has dependencies 064 @Test 065 public void TestIncBackupDeleteTable() throws Exception { 066 try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { 067 // The test starts with some data, and no bulk loaded rows. 068 int expectedRowCount = NB_ROWS_IN_BATCH; 069 assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); 070 assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty()); 071 072 // Bulk loads aren't tracked if the table isn't backed up yet 073 performBulkLoad("bulk1"); 074 expectedRowCount += ROWS_IN_BULK_LOAD; 075 assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); 076 assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); 077 078 // Create a backup, bulk loads are now being tracked 079 String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR); 080 assertTrue(checkSucceeded(backup1)); 081 performBulkLoad("bulk2"); 082 expectedRowCount += ROWS_IN_BULK_LOAD; 083 assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); 084 assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size()); 085 086 // Truncating or deleting a table clears the tracked bulk loads (and all rows) 087 TEST_UTIL.truncateTable(table1).close(); 088 expectedRowCount = 0; 089 assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); 090 assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); 091 092 // Creating a full backup clears the bulk loads (since they are captured in the snapshot) 093 performBulkLoad("bulk3"); 094 expectedRowCount = ROWS_IN_BULK_LOAD; 095 assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); 096 assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size()); 097 String backup2 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR); 098 assertTrue(checkSucceeded(backup2)); 099 assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); 100 assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); 101 102 // Creating an incremental backup clears the bulk loads 103 performBulkLoad("bulk4"); 104 performBulkLoad("bulk5"); 105 performBulkLoad("bulk6"); 106 expectedRowCount += 3 * ROWS_IN_BULK_LOAD; 107 assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); 108 assertEquals(3, systemTable.readBulkloadRows(List.of(table1)).size()); 109 String backup3 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR); 110 assertTrue(checkSucceeded(backup3)); 111 assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); 112 assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); 113 int rowCountAfterBackup3 = expectedRowCount; 114 115 // Doing another bulk load, to check that this data will disappear after a restore operation 116 performBulkLoad("bulk7"); 117 expectedRowCount += ROWS_IN_BULK_LOAD; 118 assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); 119 List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1)); 120 assertEquals(1, bulkloadsTemp.size()); 121 BulkLoad bulk7 = bulkloadsTemp.get(0); 122 123 // Doing a restore. Overwriting the table implies clearing the bulk loads, 124 // but the loading of restored data involves loading bulk data, we expect 2 bulk loads 125 // associated with backup 3 (loading of full backup, loading of incremental backup). 126 BackupAdmin client = getBackupAdmin(); 127 client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup3, false, 128 new TableName[] { table1 }, new TableName[] { table1 }, true)); 129 assertEquals(rowCountAfterBackup3, TEST_UTIL.countRows(table1)); 130 List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1)); 131 assertEquals(2, bulkLoads.size()); 132 assertFalse(bulkLoads.contains(bulk7)); 133 134 // Check that we have data of all expected bulk loads 135 try (Table restoredTable = TEST_UTIL.getConnection().getTable(table1)) { 136 assertFalse(containsRowWithKey(restoredTable, "bulk1")); 137 assertFalse(containsRowWithKey(restoredTable, "bulk2")); 138 assertTrue(containsRowWithKey(restoredTable, "bulk3")); 139 assertTrue(containsRowWithKey(restoredTable, "bulk4")); 140 assertTrue(containsRowWithKey(restoredTable, "bulk5")); 141 assertTrue(containsRowWithKey(restoredTable, "bulk6")); 142 assertFalse(containsRowWithKey(restoredTable, "bulk7")); 143 } 144 } 145 } 146 147 private boolean containsRowWithKey(Table table, String rowKey) throws IOException { 148 byte[] data = Bytes.toBytes(rowKey); 149 Get get = new Get(data); 150 Result result = table.get(get); 151 return result.containsColumn(famName, qualName); 152 } 153 154 @Test 155 public void testUpdateFileListsRaceCondition() throws Exception { 156 try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { 157 // Test the race condition where files are archived during incremental backup 158 FileSystem fs = TEST_UTIL.getTestFileSystem(); 159 160 String regionName = "region1"; 161 String columnFamily = "cf"; 162 String filename1 = "hfile1"; 163 String filename2 = "hfile2"; 164 165 Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()); 166 Path tableDir = CommonFSUtils.getTableDir(rootDir, table1); 167 Path activeFile1 = 168 new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename1); 169 Path activeFile2 = 170 new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename2); 171 172 fs.mkdirs(activeFile1.getParent()); 173 fs.create(activeFile1).close(); 174 fs.create(activeFile2).close(); 175 176 List<String> activeFiles = new ArrayList<>(); 177 activeFiles.add(activeFile1.toString()); 178 activeFiles.add(activeFile2.toString()); 179 List<String> archiveFiles = new ArrayList<>(); 180 181 Path archiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), table1, 182 regionName, columnFamily); 183 Path archivedFile1 = new Path(archiveDir, filename1); 184 fs.mkdirs(archiveDir); 185 assertTrue("File should be moved to archive", fs.rename(activeFile1, archivedFile1)); 186 187 TestBackupBase.IncrementalTableBackupClientForTest client = 188 new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(), 189 "test_backup_id", 190 createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR)); 191 192 client.updateFileLists(activeFiles, archiveFiles); 193 194 assertEquals("Only one file should remain in active files", 1, activeFiles.size()); 195 assertEquals("File2 should still be in active files", activeFile2.toString(), 196 activeFiles.get(0)); 197 assertEquals("One file should be added to archive files", 1, archiveFiles.size()); 198 assertEquals("Archived file should have correct path", archivedFile1.toString(), 199 archiveFiles.get(0)); 200 systemTable.finishBackupExclusiveOperation(); 201 } 202 203 } 204 205 @Test 206 public void testUpdateFileListsMissingArchivedFile() throws Exception { 207 try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { 208 // Test that IOException is thrown when file doesn't exist in archive location 209 FileSystem fs = TEST_UTIL.getTestFileSystem(); 210 211 String regionName = "region2"; 212 String columnFamily = "cf"; 213 String filename = "missing_file"; 214 215 Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()); 216 Path tableDir = CommonFSUtils.getTableDir(rootDir, table1); 217 Path activeFile = 218 new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename); 219 220 fs.mkdirs(activeFile.getParent()); 221 fs.create(activeFile).close(); 222 223 List<String> activeFiles = new ArrayList<>(); 224 activeFiles.add(activeFile.toString()); 225 List<String> archiveFiles = new ArrayList<>(); 226 227 // Delete the file but don't create it in archive location 228 fs.delete(activeFile, false); 229 230 TestBackupBase.IncrementalTableBackupClientForTest client = 231 new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(), 232 "test_backup_id", 233 createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR)); 234 235 // This should throw IOException since file doesn't exist in archive 236 try { 237 client.updateFileLists(activeFiles, archiveFiles); 238 fail("Expected IOException to be thrown"); 239 } catch (IOException e) { 240 // Expected 241 } 242 systemTable.finishBackupExclusiveOperation(); 243 } 244 } 245 246 private void performBulkLoad(String keyPrefix) throws IOException { 247 FileSystem fs = TEST_UTIL.getTestFileSystem(); 248 Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME); 249 Path hfilePath = 250 new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix); 251 252 HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName, 253 Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD); 254 255 Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result = 256 BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory); 257 assertFalse(result.isEmpty()); 258 } 259}