/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * Verifies that a bulk load into a column family, table or namespace that a replication peer
 * excludes neither shows up on the peer cluster nor leaves an entry in the HFile refs kept by the
 * replication queue storage.
 */
@Tag(ReplicationTests.TAG)
@Tag(LargeTests.TAG)
public class TestBulkLoadReplicationHFileRefs extends TestReplicationBaseNoBeforeAll {

  private static final String PEER1_CLUSTER_ID = "peer1";
  private static final String PEER2_CLUSTER_ID = "peer2";

  private static final String REPLICATE_NAMESPACE = "replicate_ns";
  private static final String NO_REPLICATE_NAMESPACE = "no_replicate_ns";
  private static final TableName REPLICATE_TABLE =
    TableName.valueOf(REPLICATE_NAMESPACE, "replicate_table");
  private static final TableName NO_REPLICATE_TABLE =
    TableName.valueOf(NO_REPLICATE_NAMESPACE, "no_replicate_table");
  private static final byte[] CF_A = Bytes.toBytes("cfa");
  private static final byte[] CF_B = Bytes.toBytes("cfb");

  // Coordinates of the single cell written into every bulk loaded HFile; the same coordinates are
  // used when probing the peer cluster.
  private final byte[] row = Bytes.toBytes("r1");
  private final byte[] qualifier = Bytes.toBytes("q1");
  private final byte[] value = Bytes.toBytes("v1");

  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");

  private static Admin admin1;
  private static Admin admin2;

  private static ReplicationQueueStorage queueStorage;

  private static File sourceDir;

  /**
   * Configures both clusters for bulk load replication, starts them, and creates the replicated
   * and non-replicated namespaces on each side.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    sourceDir = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile();
    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
    startClusters();
    admin1 = UTIL1.getConnection().getAdmin();
    admin2 = UTIL2.getConnection().getAdmin();

    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(),
      UTIL1.getConfiguration());

    admin1.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
    admin2.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
    admin1.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
    admin2.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
  }

  /**
   * Enables bulk load replication on {@code config}, tags it with the given replication cluster
   * id, and writes the configuration as {@code hbase-site.xml} into a sub directory of
   * {@code sourceDir} named after that id.
   * @param config               the cluster configuration to update and serialize
   * @param clusterReplicationId value stored under {@code REPLICATION_CLUSTER_ID}; also the name
   *                             of the directory that receives the written configuration
   */
  protected static void setupBulkLoadConfigsForCluster(Configuration config,
    String clusterReplicationId) throws Exception {
    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
    File sourceConfigFolder = new File(sourceDir, clusterReplicationId);
    sourceConfigFolder.mkdirs();
    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath(), "hbase-site.xml");
    // Close the stream once the XML is written so the file handle is not leaked.
    try (FileOutputStream xmlOut = new FileOutputStream(sourceConfigFile)) {
      config.writeXml(xmlOut);
    }
    // Set after writing, so the serialized file does not contain the conf dir entry itself.
    config.set(REPLICATION_CONF_DIR, sourceDir.getAbsolutePath());
  }

  /** Starts every test without any replication peer on the source cluster. */
  @BeforeEach
  public void setUp() throws Exception {
    removeAllPeers();
  }

  /** Removes all peers from the source cluster and drops every table on both clusters. */
  @AfterEach
  public void teardown() throws Exception {
    removeAllPeers();
    for (TableName tableName : admin1.listTableNames()) {
      UTIL1.deleteTable(tableName);
    }
    for (TableName tableName : admin2.listTableNames()) {
      UTIL2.deleteTable(tableName);
    }
  }

  /** A bulk load into an excluded column family must not be replicated or tracked. */
  @Test
  public void testWhenExcludeCF() throws Exception {
    // Create table in source and remote clusters.
    createTableOnClusters(REPLICATE_TABLE, CF_A, CF_B);
    // Add peer, setReplicateAllUserTables true, but exclude CF_B.
    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
    excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B)));
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
        .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build();
    admin1.addReplicationPeer(PEER_ID2, peerConfig);
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
    assertFalse(peerConfig.needToReplicate(REPLICATE_TABLE, CF_B));

    assertEquals(0, queueStorage.getAllHFileRefs().size());

    // Bulk load data into the CF that is not replicated.
    bulkLoadOnCluster(REPLICATE_TABLE, CF_B);
    Threads.sleep(1000);

    // Cannot get data from remote cluster
    assertNoDataOnPeer(REPLICATE_TABLE, CF_B);
    // The extra HFile is never added to the HFileRefs
    assertEquals(0, queueStorage.getAllHFileRefs().size());
  }

  /** A bulk load into an excluded table must not be replicated or tracked. */
  @Test
  public void testWhenExcludeTable() throws Exception {
    // Create 2 tables in source and remote clusters.
    createTableOnClusters(REPLICATE_TABLE, CF_A);
    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);

    // Add peer, setReplicateAllUserTables true, but exclude one table.
    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
    excludeTableCFs.put(NO_REPLICATE_TABLE, null);
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
        .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build();
    admin1.addReplicationPeer(PEER_ID2, peerConfig);
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));

    assertEquals(0, queueStorage.getAllHFileRefs().size());

    // Bulk load data into the table that is not replicated.
    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
    Threads.sleep(1000);

    // Cannot get data from remote cluster
    assertNoDataOnPeer(NO_REPLICATE_TABLE, CF_A);

    // The extra HFile is never added to the HFileRefs
    assertEquals(0, queueStorage.getAllHFileRefs().size());
  }

  /** A bulk load into a table of an excluded namespace must not be replicated or tracked. */
  @Test
  public void testWhenExcludeNamespace() throws Exception {
    // Create 2 tables in source and remote clusters.
    createTableOnClusters(REPLICATE_TABLE, CF_A);
    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);

    // Add peer, setReplicateAllUserTables true, but exclude one namespace.
    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL2.getRpcConnnectionURI()).setReplicateAllUserTables(true)
      .setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE)).build();
    admin1.addReplicationPeer(PEER_ID2, peerConfig);
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));

    assertEquals(0, queueStorage.getAllHFileRefs().size());

    // Bulk load data into the table of the namespace that is not replicated.
    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
    Threads.sleep(1000);

    // Cannot get data from remote cluster. The lookup must use the row key that was actually
    // bulk loaded, otherwise the check passes regardless of what got replicated.
    assertNoDataOnPeer(NO_REPLICATE_TABLE, CF_A);

    // The extra HFile is never added to the HFileRefs
    assertEquals(0, queueStorage.getAllHFileRefs().size());
  }

  /**
   * Writes a one-cell HFile for {@code family}, stages it under {@code BULK_LOAD_BASE_DIR} on the
   * source cluster's DFS and bulk loads that directory into {@code tableName}.
   * @param tableName table on the source cluster that receives the bulk load
   * @param family    column family of the generated cell; also names the staging sub directory
   */
  protected void bulkLoadOnCluster(TableName tableName, byte[] family) throws Exception {
    String bulkLoadFilePath = createHFileForFamilies(family);
    copyToHdfs(family, bulkLoadFilePath, UTIL1.getDFSCluster());
    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(UTIL1.getConfiguration());
    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
  }

  /**
   * Creates a local HFile named {@code hfile} in a fresh random directory, containing a single Put
   * cell built from {@code row}, {@code family}, {@code qualifier} and {@code value}.
   * @param family column family of the cell written to the file
   * @return absolute local path of the written HFile
   */
  private String createHFileForFamilies(byte[] family) throws IOException {
    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
    cellBuilder.setRow(row).setFamily(family).setQualifier(qualifier).setValue(value)
      .setType(Cell.Type.Put);

    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(UTIL1.getConfiguration());
    File randomDir = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile();
    randomDir.mkdirs();
    File hFileLocation = new File(randomDir, "hfile");
    // Resources close in reverse order: the writer first, then the underlying stream.
    try (FSDataOutputStream out =
      new FSDataOutputStream(new FileOutputStream(hFileLocation), null)) {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      try (HFile.Writer writer = hFileFactory.create()) {
        writer.append(new KeyValue(cellBuilder.build()));
      }
    }
    return hFileLocation.getAbsoluteFile().getAbsolutePath();
  }

  /**
   * Copies a local HFile into {@code BULK_LOAD_BASE_DIR/<family>} on the given DFS cluster,
   * creating the directory first.
   * @param family           column family whose name is used as the target sub directory
   * @param bulkLoadFilePath local path of the HFile to copy
   * @param cluster          DFS cluster that receives the file
   */
  private void copyToHdfs(byte[] family, String bulkLoadFilePath, MiniDFSCluster cluster)
    throws Exception {
    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, Bytes.toString(family));
    cluster.getFileSystem().mkdirs(bulkLoadDir);
    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
  }

  /**
   * Creates {@code tableName} on both clusters with the given column families, each using
   * {@code REPLICATION_SCOPE_GLOBAL}.
   * @param tableName name of the table to create
   * @param cfs       column families of the table
   */
  private void createTableOnClusters(TableName tableName, byte[]... cfs) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] cf : cfs) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
    }
    TableDescriptor td = builder.build();
    admin1.createTable(td);
    admin2.createTable(td);
  }

  /** Removes every replication peer currently registered on the source cluster. */
  private static void removeAllPeers() throws IOException {
    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
      admin1.removeReplicationPeer(peer.getPeerId());
    }
  }

  /**
   * Asserts that the peer cluster holds no value for the bulk loaded cell ({@code row} /
   * {@code qualifier}) in the given table and family. The table handle is closed afterwards.
   * @param tableName table to read on the peer cluster
   * @param family    column family expected to be empty for the bulk loaded row
   */
  private void assertNoDataOnPeer(TableName tableName, byte[] family) throws IOException {
    try (Table peerTable = UTIL2.getConnection().getTable(tableName)) {
      Result result = peerTable.get(new Get(row));
      assertTrue(Bytes.equals(null, result.getValue(family, qualifier)));
    }
  }
}