/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
import static org.apache.hadoop.hbase.backup.BackupType.FULL;
import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that merging incremental backups produces a consistent backup image regardless of
 * whether the underlying data was written with ordinary {@link Put}s or bulk-loaded HFiles.
 * Parameterized on {@code useBulkLoad} (see {@link #parameters()}).
 */
@Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: useBulkLoad={0}")
public class TestIncrementalBackupMergeWithBulkLoad {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestIncrementalBackupMergeWithBulkLoad.class);

  // Public (not private) because the parameterized test template injects it via the constructor.
  public boolean useBulkLoad;

  public TestIncrementalBackupMergeWithBulkLoad(boolean useBulkLoad) {
    this.useBulkLoad = useBulkLoad;
  }

  /** Parameter source: run every test once with bulk load and once with normal Puts. */
  public static Stream<Arguments> parameters() {
    return Stream.of(Arguments.of(true), Arguments.of(false));
  }

  private TableName sourceTable;
  private TableName targetTable;

  private List<TableName> allTables;
  private static TestingHBaseCluster cluster;
  private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");

  @BeforeAll
  public static void beforeClass() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    enableBackup(conf);
    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
    cluster.start();
  }

  @AfterAll
  public static void afterClass() throws Exception {
    // Guard against beforeClass() having failed before the cluster was assigned.
    if (cluster != null) {
      cluster.stop();
    }
  }

  @BeforeEach
  public void setUp() throws Exception {
    // Table names are suffixed per parameter so the two parameterized runs do not collide.
    sourceTable = TableName.valueOf("table-" + useBulkLoad);
    targetTable = TableName.valueOf("another-table-" + useBulkLoad);
    allTables = Arrays.asList(sourceTable, targetTable);
    createTable(sourceTable);
    createTable(targetTable);
  }

  /**
   * Full backup, two incremental backups (the first containing new data), then a merge of the
   * two incrementals. The merged backup must still cover the source table and contain the data.
   */
  @TestTemplate
  public void testMergeContainingBulkloadedHfiles() throws Exception {
    Instant timestamp = Instant.now();

    String backupId = backup(FULL, allTables);
    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    // load some data
    load(sourceTable, timestamp, "data");

    String backupId1 = backup(INCREMENTAL, allTables);
    backupInfo = verifyBackup(cluster.getConf(), backupId1, INCREMENTAL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    String backupId2 = backup(INCREMENTAL, allTables);
    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    merge(new String[] { backupId1, backupId2 });
    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));
    validateDataEquals(sourceTable, "data");
  }

  /** Creates {@code tableName} with the single test column family. */
  private void createTable(TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Admin admin = connection.getAdmin()) {
      admin.createTable(builder.build());
    }
  }

  /** Dispatches to the write path selected by the {@code useBulkLoad} parameter. */
  private void load(TableName tableName, Instant timestamp, String data) throws IOException {
    if (useBulkLoad) {
      hFileBulkLoad(tableName, timestamp, data);
    } else {
      putLoad(tableName, timestamp, data);
    }
  }

  /**
   * Writes 10 rows to {@code tableName} with ordinary Puts at the given timestamp, then flushes
   * the table so the data lands in HFiles and is picked up by the incremental backup.
   */
  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
    LOG.info("Writing new data to HBase using normal Puts: {}", data);
    // FIX: the original wrote to the hard-coded sourceTable, ignoring the tableName parameter,
    // and leaked the Table and Admin instances. All three are corrected here.
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName);
      Admin admin = connection.getAdmin()) {
      List<Put> puts = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
        puts.add(put);

        // Flush in bounded batches; keeps memory flat if the row count is ever increased.
        if (puts.size() >= 100) {
          table.put(puts);
          puts.clear();
        }
      }
      if (!puts.isEmpty()) {
        table.put(puts);
      }
      admin.flush(tableName);
    }
  }

  /**
   * Writes 10 rows into a freshly generated HFile and bulk-loads it into {@code tableName}.
   * The staging directory is deleted afterwards, pass or fail.
   */
  private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
    throws IOException {
    FileSystem fs = FileSystem.get(cluster.getConf());
    LOG.info("Writing new data to HBase using BulkLoad: {}", data);
    // HFiles require this strict directory structure (root/family/file) to be loadable.
    Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
    try {
      fs.mkdirs(hFileRootPath);
      Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
      fs.mkdirs(hFileFamilyPath);
      try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
        .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
        .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
          .withColumnFamily(COLUMN_FAMILY).build())
        .create()) {
        for (int i = 0; i < 10; i++) {
          writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
            timestamp.toEpochMilli(), Bytes.toBytes(data)));
        }
      }
      Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
        BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
      assertFalse(result.isEmpty());
    } finally {
      // FIX: the original leaked the staging directory on every run.
      fs.delete(hFileRootPath, true);
    }
  }

  /**
   * Runs a backup of the given type over {@code tables} and returns the new backup id.
   */
  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
    LOG.info("Creating the backup ...");

    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      BackupRequest backupRequest =
        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
      return backupAdmin.backupTables(backupRequest);
    }
  }

  /** Merges the given incremental backup ids into the most recent one. */
  private void merge(String[] backupIds) throws IOException {
    LOG.info("Merging the backups ...");

    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      backupAdmin.mergeBackups(backupIds);
    }
  }

  /**
   * Raw-scans {@code tableName} (all versions, including delete markers) and asserts that every
   * cell's value equals {@code expectedData}.
   */
  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName)) {
      Scan scan = new Scan();
      scan.readAllVersions();
      scan.setRaw(true);
      scan.setBatch(100);

      for (Result sourceResult : table.getScanner(scan)) {
        List<Cell> sourceCells = sourceResult.listCells();
        for (Cell cell : sourceCells) {
          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
            cell.getValueOffset(), cell.getValueLength()));
        }
      }
    }
  }

}