001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; 021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNotEquals; 024import static org.junit.Assert.assertTrue; 025 026import java.io.File; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.net.UnknownHostException; 030import java.util.ArrayList; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Optional; 035import java.util.concurrent.CountDownLatch; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicInteger; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FSDataOutputStream; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.CellBuilderType; 043import org.apache.hadoop.hbase.ExtendedCellBuilder; 044import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseTestingUtil; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.KeyValue; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.client.Admin; 051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 052import org.apache.hadoop.hbase.client.Connection; 053import org.apache.hadoop.hbase.client.ConnectionFactory; 054import org.apache.hadoop.hbase.client.Get; 055import org.apache.hadoop.hbase.client.Result; 056import org.apache.hadoop.hbase.client.Table; 057import org.apache.hadoop.hbase.client.TableDescriptor; 058import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 059import org.apache.hadoop.hbase.coprocessor.ObserverContext; 060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 062import org.apache.hadoop.hbase.coprocessor.RegionObserver; 063import org.apache.hadoop.hbase.io.hfile.HFile; 064import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 065import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 066import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 067import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 068import org.apache.hadoop.hbase.replication.TestReplicationBase; 069import org.apache.hadoop.hbase.testclassification.MediumTests; 070import org.apache.hadoop.hbase.testclassification.ReplicationTests; 071import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; 072import org.apache.hadoop.hbase.util.Bytes; 073import org.apache.hadoop.hbase.util.Pair; 074import org.apache.hadoop.hdfs.MiniDFSCluster; 075import org.junit.Before; 076import org.junit.BeforeClass; 077import org.junit.ClassRule; 078import org.junit.Rule; 079import org.junit.Test; 080import org.junit.experimental.categories.Category; 081import org.junit.rules.TemporaryFolder; 082import org.junit.rules.TestName; 083import org.slf4j.Logger; 084import org.slf4j.LoggerFactory; 085 086/** 087 * Integration test for bulk load replication. Defines three clusters, with the following 088 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 2 089 * and 3). For each of defined test clusters, it performs a bulk load, asserting values on bulk 090 * loaded file gets replicated to other two peers. Since we are doing 3 bulk loads, with the given 091 * replication topology all these bulk loads should get replicated only once on each peer. To assert 092 * this, this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each 093 * of the clusters. This CP counts the amount of times bulk load actually gets invoked, certifying 094 * we are not entering the infinite loop condition addressed by HBASE-22380. 095 */ 096@Category({ ReplicationTests.class, MediumTests.class }) 097public class TestBulkLoadReplication extends TestReplicationBase { 098 099 @ClassRule 100 public static final HBaseClassTestRule CLASS_RULE = 101 HBaseClassTestRule.forClass(TestBulkLoadReplication.class); 102 103 protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class); 104 105 private static final String PEER1_CLUSTER_ID = "peer1"; 106 private static final String PEER2_CLUSTER_ID = "peer2"; 107 private static final String PEER3_CLUSTER_ID = "peer3"; 108 109 private static final String PEER_ID1 = "1"; 110 private static final String PEER_ID3 = "3"; 111 112 private static AtomicInteger BULK_LOADS_COUNT; 113 private static CountDownLatch BULK_LOAD_LATCH; 114 115 protected static final HBaseTestingUtil UTIL3 = new HBaseTestingUtil(); 116 protected static final Configuration CONF3 = UTIL3.getConfiguration(); 117 118 private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir"); 119 120 private static Table htable3; 121 122 @Rule 123 public TestName name = new TestName(); 124 125 @ClassRule 126 public static TemporaryFolder testFolder = new TemporaryFolder(); 127 128 private static ReplicationQueueStorage queueStorage; 129 130 private static boolean replicationPeersAdded = false; 131 132 @BeforeClass 133 public static void setUpBeforeClass() throws Exception { 134 setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID); 135 setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID); 136 setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID); 137 setupConfig(UTIL3, "/3"); 138 TestReplicationBase.setUpBeforeClass(); 139 startThirdCluster(); 140 queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(), 141 UTIL1.getConfiguration()); 142 } 143 144 private static void startThirdCluster() throws Exception { 145 LOG.info("Setup Zk to same one from UTIL1 and UTIL2"); 146 UTIL3.setZkCluster(UTIL1.getZkCluster()); 147 UTIL3.startMiniCluster(NUM_SLAVES1); 148 149 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 150 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMobEnabled(true) 151 .setMobThreshold(4000).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 152 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 153 154 Connection connection3 = ConnectionFactory.createConnection(CONF3); 155 try (Admin admin3 = connection3.getAdmin()) { 156 admin3.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 157 } 158 UTIL3.waitUntilAllRegionsAssigned(tableName); 159 htable3 = connection3.getTable(tableName); 160 } 161 162 @Before 163 @Override 164 public void setUpBase() throws Exception { 165 // removing the peer and adding again causing the previously completed bulk load jobs getting 166 // submitted again, adding a check to add the peers only once. 167 if (!replicationPeersAdded) { 168 // "super.setUpBase()" already sets replication from 1->2, 169 // then on the subsequent lines, sets 2->1, 2->3 and 3->2. 170 // So we have following topology: "1 <-> 2 <->3" 171 super.setUpBase(); 172 ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1); 173 ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2); 174 ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3); 175 // adds cluster1 as a remote peer on cluster2 176 UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config); 177 // adds cluster3 as a remote peer on cluster2 178 UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config); 179 // adds cluster2 as a remote peer on cluster3 180 UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config); 181 setupCoprocessor(UTIL1); 182 setupCoprocessor(UTIL2); 183 setupCoprocessor(UTIL3); 184 replicationPeersAdded = true; 185 } 186 BULK_LOADS_COUNT = new AtomicInteger(0); 187 } 188 189 private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util) 190 throws UnknownHostException { 191 return ReplicationPeerConfig.newBuilder().setClusterKey(util.getRpcConnnectionURI()) 192 .setSerial(isSerialPeer()).build(); 193 } 194 195 private void setupCoprocessor(HBaseTestingUtil cluster) { 196 cluster.getHBaseCluster().getRegions(tableName).forEach(r -> { 197 try { 198 TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost() 199 .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 200 if (cp == null) { 201 r.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0, 202 cluster.getConfiguration()); 203 cp = r.getCoprocessorHost() 204 .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 205 cp.clusterName = cluster.getRpcConnnectionURI(); 206 } 207 } catch (Exception e) { 208 LOG.error(e.getMessage(), e); 209 } 210 }); 211 } 212 213 protected static void setupBulkLoadConfigsForCluster(Configuration config, 214 String clusterReplicationId) throws Exception { 215 config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 216 config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); 217 File sourceConfigFolder = testFolder.newFolder(clusterReplicationId); 218 File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml"); 219 config.writeXml(new FileOutputStream(sourceConfigFile)); 220 config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath()); 221 } 222 223 @Test 224 public void testBulkLoadReplicationActiveActive() throws Exception { 225 Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); 226 Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); 227 Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); 228 byte[] row = Bytes.toBytes("001"); 229 byte[] value = Bytes.toBytes("v1"); 230 assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable, 231 peer3TestTable); 232 row = Bytes.toBytes("002"); 233 value = Bytes.toBytes("v2"); 234 assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable, peer2TestTable, 235 peer3TestTable); 236 row = Bytes.toBytes("003"); 237 value = Bytes.toBytes("v3"); 238 assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable, peer2TestTable, 239 peer3TestTable); 240 // Additional wait to make sure no extra bulk load happens 241 Thread.sleep(400); 242 // We have 3 bulk load events (1 initiated on each cluster). 243 // Each event gets 3 counts (the originator cluster, plus the two peers), 244 // so BULK_LOADS_COUNT expected value is 3 * 3 = 9. 245 assertEquals(9, BULK_LOADS_COUNT.get()); 246 } 247 248 protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value, 249 HBaseTestingUtil utility, Table... tables) throws Exception { 250 BULK_LOAD_LATCH = new CountDownLatch(3); 251 bulkLoadOnCluster(tableName, row, value, utility); 252 assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); 253 assertTableHasValue(tables[0], row, value); 254 assertTableHasValue(tables[1], row, value); 255 assertTableHasValue(tables[2], row, value); 256 } 257 258 protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value, 259 HBaseTestingUtil cluster) throws Exception { 260 String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration()); 261 copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster()); 262 BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration()); 263 bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR); 264 } 265 266 private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception { 267 Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f"); 268 cluster.getFileSystem().mkdirs(bulkLoadDir); 269 cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); 270 } 271 272 protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception { 273 Get get = new Get(row); 274 Result result = table.get(get); 275 assertTrue(result.advance()); 276 assertEquals(Bytes.toString(value), Bytes.toString(result.value())); 277 } 278 279 protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception { 280 Get get = new Get(row); 281 Result result = table.get(get); 282 assertTrue(result.isEmpty()); 283 } 284 285 private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig) 286 throws IOException { 287 ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY); 288 cellBuilder.setRow(row).setFamily(TestReplicationBase.famName).setQualifier(Bytes.toBytes("1")) 289 .setValue(value).setType(Cell.Type.Put); 290 291 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); 292 // TODO We need a way to do this without creating files 293 File hFileLocation = testFolder.newFile(); 294 FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); 295 try { 296 hFileFactory.withOutputStream(out); 297 hFileFactory.withFileContext(new HFileContextBuilder().build()); 298 HFile.Writer writer = hFileFactory.create(); 299 try { 300 writer.append(new KeyValue(cellBuilder.build())); 301 } finally { 302 writer.close(); 303 } 304 } finally { 305 out.close(); 306 } 307 return hFileLocation.getAbsoluteFile().getAbsolutePath(); 308 } 309 310 public static class BulkReplicationTestObserver implements RegionCoprocessor { 311 312 String clusterName; 313 AtomicInteger bulkLoadCounts = new AtomicInteger(); 314 315 @Override 316 public Optional<RegionObserver> getRegionObserver() { 317 return Optional.of(new RegionObserver() { 318 319 @Override 320 public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, 321 List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths) 322 throws IOException { 323 BULK_LOAD_LATCH.countDown(); 324 BULK_LOADS_COUNT.incrementAndGet(); 325 LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName, 326 bulkLoadCounts.addAndGet(1)); 327 } 328 }); 329 } 330 } 331 332 @Test 333 public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception { 334 Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); 335 Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); 336 Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); 337 byte[] row = Bytes.toBytes("004"); 338 byte[] value = Bytes.toBytes("v4"); 339 assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable, 340 peer3TestTable); 341 // additional wait to make sure no extra bulk load happens 342 Thread.sleep(400); 343 assertEquals(1, BULK_LOADS_COUNT.get()); 344 assertEquals(0, queueStorage.getAllHFileRefs().size()); 345 } 346 347 private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value, 348 HBaseTestingUtil utility, Table... tables) throws Exception { 349 BULK_LOAD_LATCH = new CountDownLatch(1); 350 bulkLoadOnClusterForNoRepFamily(row, value, utility); 351 assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); 352 assertTableHasValue(tables[0], row, value); 353 assertTableNotHasValue(tables[1], row, value); 354 assertTableNotHasValue(tables[2], row, value); 355 } 356 357 private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster) 358 throws Exception { 359 String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration()); 360 Path bulkLoadFilePath = new Path(bulkloadFile); 361 copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster()); 362 BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration()); 363 Map<byte[], List<Path>> family2Files = new HashMap<>(); 364 List<Path> files = new ArrayList<>(); 365 files.add(new Path( 366 BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName())); 367 family2Files.put(noRepfamName, files); 368 bulkLoadHFilesTool.bulkLoad(tableName, family2Files); 369 } 370 371 private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig) 372 throws IOException { 373 ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY); 374 cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName) 375 .setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put); 376 377 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); 378 // TODO We need a way to do this without creating files 379 File hFileLocation = testFolder.newFile(); 380 FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); 381 try { 382 hFileFactory.withOutputStream(out); 383 hFileFactory.withFileContext(new HFileContextBuilder().build()); 384 HFile.Writer writer = hFileFactory.create(); 385 try { 386 writer.append(new KeyValue(cellBuilder.build())); 387 } finally { 388 writer.close(); 389 } 390 } finally { 391 out.close(); 392 } 393 return hFileLocation.getAbsoluteFile().getAbsolutePath(); 394 } 395 396 private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster) 397 throws Exception { 398 Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/"); 399 cluster.getFileSystem().mkdirs(bulkLoadDir); 400 cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); 401 } 402 403 private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException { 404 Get get = new Get(row); 405 Result result = table.get(get); 406 assertNotEquals(Bytes.toString(value), Bytes.toString(result.value())); 407 } 408}