001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; 021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertNotEquals; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025 026import java.io.File; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.net.UnknownHostException; 030import java.util.ArrayList; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Optional; 035import java.util.concurrent.CountDownLatch; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicInteger; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FSDataOutputStream; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.CellBuilderType; 043import org.apache.hadoop.hbase.ExtendedCellBuilder; 044import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 045import org.apache.hadoop.hbase.HBaseTestingUtil; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.Admin; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 051import org.apache.hadoop.hbase.client.Connection; 052import org.apache.hadoop.hbase.client.ConnectionFactory; 053import org.apache.hadoop.hbase.client.Get; 054import org.apache.hadoop.hbase.client.Result; 055import org.apache.hadoop.hbase.client.Table; 056import org.apache.hadoop.hbase.client.TableDescriptor; 057import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 058import org.apache.hadoop.hbase.coprocessor.ObserverContext; 059import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 061import org.apache.hadoop.hbase.coprocessor.RegionObserver; 062import org.apache.hadoop.hbase.io.hfile.HFile; 063import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 064import org.apache.hadoop.hbase.regionserver.HRegion; 065import org.apache.hadoop.hbase.testclassification.LargeTests; 066import org.apache.hadoop.hbase.testclassification.ReplicationTests; 067import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.hbase.util.Pair; 070import org.apache.hadoop.hdfs.MiniDFSCluster; 071import org.junit.jupiter.api.AfterAll; 072import org.junit.jupiter.api.AfterEach; 073import org.junit.jupiter.api.BeforeAll; 074import org.junit.jupiter.api.BeforeEach; 075import org.junit.jupiter.api.Tag; 076import org.junit.jupiter.api.Test; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080/** 081 * Integration test for bulk load replication. Defines three clusters, with the following 082 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 2 083 * and 3). For each of defined test clusters, it performs a bulk load, asserting values on bulk 084 * loaded file gets replicated to other two peers. Since we are doing 3 bulk loads, with the given 085 * replication topology all these bulk loads should get replicated only once on each peer. To assert 086 * this, this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each 087 * of the clusters. This CP counts the amount of times bulk load actually gets invoked, certifying 088 * we are not entering the infinite loop condition addressed by HBASE-22380. 089 */ 090@Tag(ReplicationTests.TAG) 091@Tag(LargeTests.TAG) 092public class TestBulkLoadReplication extends TestReplicationBaseNoBeforeAll { 093 094 protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class); 095 096 private static final String PEER1_CLUSTER_ID = "peer1"; 097 private static final String PEER2_CLUSTER_ID = "peer2"; 098 private static final String PEER3_CLUSTER_ID = "peer3"; 099 100 private static final String PEER_ID1 = "1"; 101 private static final String PEER_ID3 = "3"; 102 103 private static AtomicInteger BULK_LOADS_COUNT; 104 private static CountDownLatch BULK_LOAD_LATCH; 105 106 protected static final HBaseTestingUtil UTIL3 = new HBaseTestingUtil(); 107 protected static final Configuration CONF3 = UTIL3.getConfiguration(); 108 109 private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir"); 110 111 private static File SOURCE_DIR; 112 113 private static ReplicationQueueStorage queueStorage; 114 115 @BeforeAll 116 public static void setUpBeforeAll() throws Exception { 117 setupConfig(UTIL3, "/3"); 118 configureClusters(UTIL1, UTIL2); 119 SOURCE_DIR = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile(); 120 setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID); 121 setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID); 122 setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID); 123 startClusters(); 124 startThirdCluster(); 125 queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(), 126 UTIL1.getConfiguration()); 127 setupCoprocessor(UTIL1); 128 setupCoprocessor(UTIL2); 129 setupCoprocessor(UTIL3); 130 131 ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1); 132 ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2); 133 ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3); 134 // Set up following topology: "1 <-> 2 <-> 3" 135 UTIL1.getAdmin().addReplicationPeer(PEER_ID2, peer2Config); 136 UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config); 137 // adds cluster3 as a remote peer on cluster2 138 UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config); 139 // adds cluster2 as a remote peer on cluster3 140 UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config); 141 } 142 143 @AfterAll 144 public static void tearDownAfterAll() throws Exception { 145 UTIL3.shutdownMiniCluster(); 146 } 147 148 private static void startThirdCluster() throws Exception { 149 LOG.info("Setup Zk to same one from UTIL1 and UTIL2"); 150 UTIL3.setZkCluster(UTIL1.getZkCluster()); 151 UTIL3.startMiniCluster(NUM_SLAVES1); 152 153 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 154 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMobEnabled(true) 155 .setMobThreshold(4000).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 156 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 157 158 Connection connection3 = ConnectionFactory.createConnection(CONF3); 159 try (Admin admin3 = connection3.getAdmin()) { 160 admin3.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 161 } 162 UTIL3.waitUntilAllRegionsAssigned(tableName); 163 } 164 165 @BeforeEach 166 @Override 167 public void setUpBase() throws Exception { 168 // Removing the peer and adding it back causes previously completed bulk-load jobs to be 169 // resubmitted. Override setUpBase/tearDownBase so we do not add/remove peers between tests; 170 // peers are added once in @BeforeAll. 171 BULK_LOADS_COUNT = new AtomicInteger(0); 172 } 173 174 @AfterEach 175 @Override 176 public void tearDownBase() throws Exception { 177 // do not remove PEER_ID2 178 } 179 180 private static ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util) 181 throws UnknownHostException { 182 return ReplicationPeerConfig.newBuilder().setClusterKey(util.getRpcConnnectionURI()) 183 .setSerial(false).build(); 184 } 185 186 private static void setupCoprocessor(HBaseTestingUtil cluster) throws IOException { 187 for (HRegion region : cluster.getHBaseCluster().getRegions(tableName)) { 188 TestBulkLoadReplication.BulkReplicationTestObserver cp = region.getCoprocessorHost() 189 .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 190 if (cp == null) { 191 region.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 192 0, cluster.getConfiguration()); 193 cp = region.getCoprocessorHost() 194 .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 195 cp.clusterName = cluster.getRpcConnnectionURI(); 196 } 197 } 198 } 199 200 protected static void setupBulkLoadConfigsForCluster(Configuration config, 201 String clusterReplicationId) throws Exception { 202 config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 203 config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); 204 File sourceConfigFolder = new File(SOURCE_DIR, clusterReplicationId); 205 sourceConfigFolder.mkdirs(); 206 File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath(), "hbase-site.xml"); 207 try (FileOutputStream out = new FileOutputStream(sourceConfigFile)) { 208 config.writeXml(out); 209 } 210 config.set(REPLICATION_CONF_DIR, SOURCE_DIR.getAbsolutePath()); 211 } 212 213 @Test 214 public void testBulkLoadReplicationActiveActive() throws Exception { 215 Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); 216 Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); 217 Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); 218 byte[] row = Bytes.toBytes("001"); 219 byte[] value = Bytes.toBytes("v1"); 220 assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable, 221 peer3TestTable); 222 row = Bytes.toBytes("002"); 223 value = Bytes.toBytes("v2"); 224 assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable, peer2TestTable, 225 peer3TestTable); 226 row = Bytes.toBytes("003"); 227 value = Bytes.toBytes("v3"); 228 assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable, peer2TestTable, 229 peer3TestTable); 230 // Additional wait to make sure no extra bulk load happens 231 Thread.sleep(400); 232 // We have 3 bulk load events (1 initiated on each cluster). 233 // Each event gets 3 counts (the originator cluster, plus the two peers), 234 // so BULK_LOADS_COUNT expected value is 3 * 3 = 9. 235 assertEquals(9, BULK_LOADS_COUNT.get()); 236 } 237 238 protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value, 239 HBaseTestingUtil utility, Table... tables) throws Exception { 240 BULK_LOAD_LATCH = new CountDownLatch(3); 241 bulkLoadOnCluster(tableName, row, value, utility); 242 assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); 243 assertTableHasValue(tables[0], row, value); 244 assertTableHasValue(tables[1], row, value); 245 assertTableHasValue(tables[2], row, value); 246 } 247 248 protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value, 249 HBaseTestingUtil cluster) throws Exception { 250 String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration()); 251 copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster()); 252 BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration()); 253 bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR); 254 } 255 256 private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception { 257 Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f"); 258 cluster.getFileSystem().mkdirs(bulkLoadDir); 259 cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); 260 } 261 262 protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception { 263 Get get = new Get(row); 264 Result result = table.get(get); 265 assertTrue(result.advance()); 266 assertEquals(Bytes.toString(value), Bytes.toString(result.value())); 267 } 268 269 protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception { 270 Get get = new Get(row); 271 Result result = table.get(get); 272 assertTrue(result.isEmpty()); 273 } 274 275 private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig) 276 throws IOException { 277 ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY); 278 cellBuilder.setRow(row).setFamily(TestReplicationBase.famName).setQualifier(Bytes.toBytes("1")) 279 .setValue(value).setType(Cell.Type.Put); 280 281 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); 282 // TODO We need a way to do this without creating files 283 File randomDir = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile(); 284 randomDir.mkdirs(); 285 File hFileLocation = new File(randomDir, "hfile"); 286 FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); 287 try { 288 hFileFactory.withOutputStream(out); 289 hFileFactory.withFileContext(new HFileContextBuilder().build()); 290 HFile.Writer writer = hFileFactory.create(); 291 try { 292 writer.append(new KeyValue(cellBuilder.build())); 293 } finally { 294 writer.close(); 295 } 296 } finally { 297 out.close(); 298 } 299 return hFileLocation.getAbsoluteFile().getAbsolutePath(); 300 } 301 302 public static class BulkReplicationTestObserver implements RegionCoprocessor { 303 304 String clusterName; 305 AtomicInteger bulkLoadCounts = new AtomicInteger(); 306 307 @Override 308 public Optional<RegionObserver> getRegionObserver() { 309 return Optional.of(new RegionObserver() { 310 311 @Override 312 public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, 313 List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths) 314 throws IOException { 315 BULK_LOAD_LATCH.countDown(); 316 BULK_LOADS_COUNT.incrementAndGet(); 317 LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName, 318 bulkLoadCounts.addAndGet(1)); 319 } 320 }); 321 } 322 } 323 324 @Test 325 public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception { 326 Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); 327 Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); 328 Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); 329 byte[] row = Bytes.toBytes("004"); 330 byte[] value = Bytes.toBytes("v4"); 331 assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable, 332 peer3TestTable); 333 // additional wait to make sure no extra bulk load happens 334 Thread.sleep(400); 335 assertEquals(1, BULK_LOADS_COUNT.get()); 336 assertEquals(0, queueStorage.getAllHFileRefs().size()); 337 } 338 339 private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value, 340 HBaseTestingUtil utility, Table... tables) throws Exception { 341 BULK_LOAD_LATCH = new CountDownLatch(1); 342 bulkLoadOnClusterForNoRepFamily(row, value, utility); 343 assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); 344 assertTableHasValue(tables[0], row, value); 345 assertTableNotHasValue(tables[1], row, value); 346 assertTableNotHasValue(tables[2], row, value); 347 } 348 349 private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster) 350 throws Exception { 351 String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration()); 352 Path bulkLoadFilePath = new Path(bulkloadFile); 353 copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster()); 354 BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration()); 355 Map<byte[], List<Path>> family2Files = new HashMap<>(); 356 List<Path> files = new ArrayList<>(); 357 files.add(new Path( 358 BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName())); 359 family2Files.put(noRepfamName, files); 360 bulkLoadHFilesTool.bulkLoad(tableName, family2Files); 361 } 362 363 private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig) 364 throws IOException { 365 ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY); 366 cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName) 367 .setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put); 368 369 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); 370 // TODO We need a way to do this without creating files 371 File randomDir = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile(); 372 randomDir.mkdirs(); 373 File hFileLocation = new File(randomDir, "hfile"); 374 FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); 375 try { 376 hFileFactory.withOutputStream(out); 377 hFileFactory.withFileContext(new HFileContextBuilder().build()); 378 HFile.Writer writer = hFileFactory.create(); 379 try { 380 writer.append(new KeyValue(cellBuilder.build())); 381 } finally { 382 writer.close(); 383 } 384 } finally { 385 out.close(); 386 } 387 return hFileLocation.getAbsoluteFile().getAbsolutePath(); 388 } 389 390 private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster) 391 throws Exception { 392 Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/"); 393 cluster.getFileSystem().mkdirs(bulkLoadDir); 394 cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); 395 } 396 397 private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException { 398 Get get = new Get(row); 399 Result result = table.get(get); 400 assertNotEquals(Bytes.toString(value), Bytes.toString(result.value())); 401 } 402}