/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for bulk load replication. Defines three clusters, with the following
 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 2
 * and 3). For each of the defined test clusters, it performs a bulk load and asserts that the
 * values in the bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk
 * loads, with the given replication topology all these bulk loads should get replicated only once
 * on each peer. To assert this, this test defines a postBulkLoadHFile coprocessor and adds it to
 * all test table regions, on each of the clusters. This CP counts the number of times a bulk load
 * actually gets invoked, certifying we are not entering the infinite loop condition addressed by
 * HBASE-22380.
 */
@Tag(ReplicationTests.TAG)
@Tag(LargeTests.TAG)
public class TestBulkLoadReplication extends TestReplicationBaseNoBeforeAll {

  protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class);

  private static final String PEER1_CLUSTER_ID = "peer1";
  private static final String PEER2_CLUSTER_ID = "peer2";
  private static final String PEER3_CLUSTER_ID = "peer3";

  private static final String PEER_ID1 = "1";
  private static final String PEER_ID3 = "3";

  /** Total number of bulk load events observed by the coprocessor; reset before each test. */
  private static AtomicInteger BULK_LOADS_COUNT;
  /** Released once the expected number of bulk load events for the current step was observed. */
  private static CountDownLatch BULK_LOAD_LATCH;

  protected static final HBaseTestingUtil UTIL3 = new HBaseTestingUtil();
  protected static final Configuration CONF3 = UTIL3.getConfiguration();

  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");

  private static File SOURCE_DIR;

  private static ReplicationQueueStorage queueStorage;

  /**
   * Configures and starts the three clusters, installs the counting coprocessor on every region of
   * the test table and wires the replication peers.
   */
  @BeforeAll
  public static void setUpBeforeAll() throws Exception {
    setupConfig(UTIL3, "/3");
    configureClusters(UTIL1, UTIL2);
    // SOURCE_DIR must be set before the per-cluster bulk load configs are written below.
    SOURCE_DIR = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile();
    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
    setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
    startClusters();
    startThirdCluster();
    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(),
      UTIL1.getConfiguration());
    setupCoprocessor(UTIL1);
    setupCoprocessor(UTIL2);
    setupCoprocessor(UTIL3);

    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
    // Setup following topology: "1 <-> 2 <-> 3", 1 -> 2 will be added by setUpBase method in parent
    // class
    // adds cluster1 as a remote peer on cluster2
    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
    // adds cluster3 as a remote peer on cluster2
    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
    // adds cluster2 as a remote peer on cluster3
    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
  }

  /** Shuts down the third mini cluster, which is started by this class. */
  @AfterAll
  public static void tearDownAfterAll() throws Exception {
    UTIL3.shutdownMiniCluster();
  }

  /**
   * Starts the third mini cluster on the ZooKeeper cluster of UTIL1 and creates the test table on
   * it.
   */
  private static void startThirdCluster() throws Exception {
    LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
    UTIL3.setZkCluster(UTIL1.getZkCluster());
    UTIL3.startMiniCluster(NUM_SLAVES1);

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMobEnabled(true)
        .setMobThreshold(4000).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    // Both the connection and the admin are closed here; the connection is created only for the
    // table creation and was previously leaked.
    try (Connection connection3 = ConnectionFactory.createConnection(CONF3);
      Admin admin3 = connection3.getAdmin()) {
      admin3.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL3.waitUntilAllRegionsAssigned(tableName);
  }

  /** Resets the global bulk load counter before each test. */
  @BeforeEach
  public void resetBulkLoadCount() throws Exception {
    // Removing the peers and adding them again causes the previously completed bulk load jobs to
    // be submitted again, so here we override the setUpBase and tearDownBase to not add/remove
    // peers between tests; the peers are added once in beforeAll.
    BULK_LOADS_COUNT = new AtomicInteger(0);
  }

  /**
   * Builds a non-serial replication peer config whose cluster key is the RPC connection URI of the
   * given cluster.
   */
  private static ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util)
    throws UnknownHostException {
    return ReplicationPeerConfig.newBuilder().setClusterKey(util.getRpcConnnectionURI())
      .setSerial(false).build();
  }

  /**
   * Loads {@link BulkReplicationTestObserver} on every region of the test table of the given
   * cluster, unless the region already has it.
   */
  private static void setupCoprocessor(HBaseTestingUtil cluster) throws IOException {
    for (HRegion region : cluster.getHBaseCluster().getRegions(tableName)) {
      BulkReplicationTestObserver cp =
        region.getCoprocessorHost().findCoprocessor(BulkReplicationTestObserver.class);
      if (cp != null) {
        // already installed on this region
        continue;
      }
      region.getCoprocessorHost().load(BulkReplicationTestObserver.class, 0,
        cluster.getConfiguration());
      cp = region.getCoprocessorHost().findCoprocessor(BulkReplicationTestObserver.class);
      cp.clusterName = cluster.getRpcConnnectionURI();
    }
  }

  /**
   * Enables bulk load replication in {@code config}, sets its replication cluster id and writes the
   * configuration as {@code hbase-site.xml} under {@code SOURCE_DIR/<clusterReplicationId>}.
   * Finally points {@code REPLICATION_CONF_DIR} at {@code SOURCE_DIR}.
   * @param config               the cluster configuration to update and dump
   * @param clusterReplicationId the replication cluster id, also used as the sub directory name
   */
  protected static void setupBulkLoadConfigsForCluster(Configuration config,
    String clusterReplicationId) throws Exception {
    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
    File sourceConfigFolder = new File(SOURCE_DIR, clusterReplicationId);
    sourceConfigFolder.mkdirs();
    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath(), "hbase-site.xml");
    // The file is written before REPLICATION_CONF_DIR is set, so the dumped config does not
    // contain that key.
    try (FileOutputStream out = new FileOutputStream(sourceConfigFile)) {
      config.writeXml(out);
    }
    config.set(REPLICATION_CONF_DIR, SOURCE_DIR.getAbsolutePath());
  }

  @Test
  public void testBulkLoadReplicationActiveActive() throws Exception {
    // Table handles are closed when the test finishes, whether it passes or fails.
    try (Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
      Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
      Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName)) {
      byte[] row = Bytes.toBytes("001");
      byte[] value = Bytes.toBytes("v1");
      assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable,
        peer3TestTable);
      row = Bytes.toBytes("002");
      value = Bytes.toBytes("v2");
      assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable, peer2TestTable,
        peer3TestTable);
      row = Bytes.toBytes("003");
      value = Bytes.toBytes("v3");
      assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable, peer2TestTable,
        peer3TestTable);
      // Additional wait to make sure no extra bulk load happens
      Thread.sleep(400);
      // We have 3 bulk load events (1 initiated on each cluster).
      // Each event gets 3 counts (the originator cluster, plus the two peers),
      // so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
      assertEquals(9, BULK_LOADS_COUNT.get());
    }
  }

  /**
   * Bulk loads a single cell on {@code utility}, waits up to one minute for three bulk load events
   * and asserts the value is readable from the first three of the given tables.
   * @param tableName table to bulk load into
   * @param row       row key of the loaded cell
   * @param value     value of the loaded cell
   * @param utility   cluster on which the bulk load is initiated
   * @param tables    the tables to verify; the first three entries are checked
   */
  protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
    HBaseTestingUtil utility, Table... tables) throws Exception {
    BULK_LOAD_LATCH = new CountDownLatch(3);
    bulkLoadOnCluster(tableName, row, value, utility);
    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
    assertTableHasValue(tables[0], row, value);
    assertTableHasValue(tables[1], row, value);
    assertTableHasValue(tables[2], row, value);
  }

  /**
   * Creates an HFile holding one cell for the replicated family, copies it to the HDFS of
   * {@code cluster} and bulk loads everything under {@code BULK_LOAD_BASE_DIR} into
   * {@code tableName}.
   */
  protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
    HBaseTestingUtil cluster) throws Exception {
    String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
    copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
  }

  /** Copies the local HFile into the "f" directory under {@code BULK_LOAD_BASE_DIR}. */
  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
    copyLocalFileToHdfsDir(bulkLoadFilePath, new Path(BULK_LOAD_BASE_DIR, "f"), cluster);
  }

  /** Creates {@code hdfsDir} on the given DFS cluster and copies the local file into it. */
  private static void copyLocalFileToHdfsDir(String localFilePath, Path hdfsDir,
    MiniDFSCluster cluster) throws IOException {
    cluster.getFileSystem().mkdirs(hdfsDir);
    cluster.getFileSystem().copyFromLocalFile(new Path(localFilePath), hdfsDir);
  }

  /** Asserts that {@code row} exists in {@code table} and its first cell holds {@code value}. */
  protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
    Get get = new Get(row);
    Result result = table.get(get);
    assertTrue(result.advance());
    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
  }

  /**
   * Asserts that {@code row} has no data in {@code table}. The {@code value} argument is not
   * inspected.
   */
  protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
    Get get = new Get(row);
    Result result = table.get(get);
    assertTrue(result.isEmpty());
  }

  /** Creates a local HFile holding one cell in the replicated family. */
  private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig)
    throws IOException {
    return createHFile(TestReplicationBase.famName, row, value, clusterConfig);
  }

  /**
   * Writes a local HFile named "hfile", in a fresh random directory, that contains a single Put
   * cell with qualifier "1" for the given family.
   * @param family        column family of the cell
   * @param row           row key of the cell
   * @param value         value of the cell
   * @param clusterConfig configuration used to obtain the HFile writer factory
   * @return absolute local path of the written HFile
   */
  private static String createHFile(byte[] family, byte[] row, byte[] value,
    Configuration clusterConfig) throws IOException {
    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
    cellBuilder.setRow(row).setFamily(family).setQualifier(Bytes.toBytes("1")).setValue(value)
      .setType(Cell.Type.Put);

    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
    // TODO We need a way to do this without creating files
    File randomDir = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile();
    randomDir.mkdirs();
    File hFileLocation = new File(randomDir, "hfile");
    // The writer is closed first (inner try), then the underlying stream (outer try).
    try (FSDataOutputStream out =
      new FSDataOutputStream(new FileOutputStream(hFileLocation), null)) {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      try (HFile.Writer writer = hFileFactory.create()) {
        writer.append(new KeyValue(cellBuilder.build()));
      }
    }
    return hFileLocation.getAbsoluteFile().getAbsolutePath();
  }

  /**
   * Region coprocessor that counts bulk load events, both globally (static counter and latch of the
   * enclosing test) and per coprocessor instance.
   */
  public static class BulkReplicationTestObserver implements RegionCoprocessor {

    String clusterName;
    AtomicInteger bulkLoadCounts = new AtomicInteger();

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(new RegionObserver() {

        @Override
        public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
          throws IOException {
          // Update the counter before releasing the latch, so that a thread woken up by the latch
          // never observes a count that misses this event.
          BULK_LOADS_COUNT.incrementAndGet();
          BULK_LOAD_LATCH.countDown();
          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
            bulkLoadCounts.addAndGet(1));
        }
      });
    }
  }

  @Test
  public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception {
    // Table handles are closed when the test finishes, whether it passes or fails.
    try (Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
      Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
      Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName)) {
      byte[] row = Bytes.toBytes("004");
      byte[] value = Bytes.toBytes("v4");
      assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable,
        peer3TestTable);
      // additional wait to make sure no extra bulk load happens
      Thread.sleep(400);
      // Only the originating cluster should have seen the bulk load, and no HFile references
      // should be queued for replication.
      assertEquals(1, BULK_LOADS_COUNT.get());
      assertEquals(0, queueStorage.getAllHFileRefs().size());
    }
  }

  /**
   * Bulk loads a single cell for the non-replicated family on {@code utility}, waits up to one
   * minute for one bulk load event, then asserts that only the first table has the value.
   */
  private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value,
    HBaseTestingUtil utility, Table... tables) throws Exception {
    BULK_LOAD_LATCH = new CountDownLatch(1);
    bulkLoadOnClusterForNoRepFamily(row, value, utility);
    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
    assertTableHasValue(tables[0], row, value);
    assertTableNotHasValue(tables[1], row, value);
    assertTableNotHasValue(tables[2], row, value);
  }

  /**
   * Creates an HFile for the non-replicated family, copies it to the HDFS of {@code cluster} and
   * bulk loads exactly that file through an explicit family-to-files mapping.
   */
  private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster)
    throws Exception {
    String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration());
    Path bulkLoadFilePath = new Path(bulkloadFile);
    copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster());
    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
    Map<byte[], List<Path>> family2Files = new HashMap<>();
    List<Path> files = new ArrayList<>();
    files.add(new Path(
      BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName()));
    family2Files.put(noRepfamName, files);
    bulkLoadHFilesTool.bulkLoad(tableName, family2Files);
  }

  /** Creates a local HFile holding one cell in the non-replicated family. */
  private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig)
    throws IOException {
    return createHFile(TestReplicationBase.noRepfamName, row, value, clusterConfig);
  }

  /**
   * Copies the local HFile into the directory named after the non-replicated family under
   * {@code BULK_LOAD_BASE_DIR}.
   */
  private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster)
    throws Exception {
    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/");
    copyLocalFileToHdfsDir(bulkLoadFilePath, bulkLoadDir, cluster);
  }

  /** Asserts that the value read for {@code row} from {@code table} differs from {@code value}. */
  private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException {
    Get get = new Get(row);
    Result result = table.get(get);
    assertNotEquals(Bytes.toString(value), Bytes.toString(result.value()));
  }
}