001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; 021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.File; 026import java.io.FileOutputStream; 027import java.io.IOException; 028import java.util.Arrays; 029import java.util.Date; 030import java.util.List; 031import java.util.Map; 032import java.util.Optional; 033import java.util.Random; 034import java.util.UUID; 035import java.util.concurrent.CountDownLatch; 036import java.util.concurrent.ExecutorService; 037import java.util.concurrent.Executors; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicInteger; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FSDataOutputStream; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.Cell; 044import org.apache.hadoop.hbase.CellBuilder; 045import org.apache.hadoop.hbase.CellBuilderFactory; 046import org.apache.hadoop.hbase.CellBuilderType; 047import org.apache.hadoop.hbase.HBaseClassTestRule; 048import org.apache.hadoop.hbase.HBaseTestingUtility; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.KeyValue; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.client.Admin; 053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 055import org.apache.hadoop.hbase.client.Connection; 056import org.apache.hadoop.hbase.client.ConnectionFactory; 057import org.apache.hadoop.hbase.client.Get; 058import org.apache.hadoop.hbase.client.Result; 059import org.apache.hadoop.hbase.client.Table; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 062import org.apache.hadoop.hbase.coprocessor.ObserverContext; 063import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 064import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 065import org.apache.hadoop.hbase.coprocessor.RegionObserver; 066import org.apache.hadoop.hbase.io.hfile.CacheConfig; 067import org.apache.hadoop.hbase.io.hfile.HFile; 068import org.apache.hadoop.hbase.io.hfile.HFileContext; 069import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 070import org.apache.hadoop.hbase.mob.MobConstants; 071import org.apache.hadoop.hbase.mob.MobFileName; 072import org.apache.hadoop.hbase.mob.MobUtils; 073import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; 074import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 075import org.apache.hadoop.hbase.replication.TestReplicationBase; 076import org.apache.hadoop.hbase.testclassification.MediumTests; 077import org.apache.hadoop.hbase.testclassification.ReplicationTests; 078import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; 079import org.apache.hadoop.hbase.util.Bytes; 080import org.apache.hadoop.hbase.util.CommonFSUtils; 081import org.apache.hadoop.hbase.util.Pair; 082import org.apache.hadoop.hdfs.MiniDFSCluster; 083import org.junit.After; 084import org.junit.Before; 085import org.junit.BeforeClass; 086import org.junit.ClassRule; 087import org.junit.Rule; 088import org.junit.Test; 089import org.junit.experimental.categories.Category; 090import org.junit.rules.TemporaryFolder; 091import org.junit.rules.TestName; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095/** 096 * Integration test for bulk load replication. Defines three clusters, with the following 097 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 098 * 2 and 3). 099 * 100 * For each of defined test clusters, it performs a bulk load, asserting values on bulk loaded file 101 * gets replicated to other two peers. Since we are doing 3 bulk loads, with the given replication 102 * topology all these bulk loads should get replicated only once on each peer. To assert this, 103 * this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each of the 104 * clusters. This CP counts the amount of times bulk load actually gets invoked, certifying 105 * we are not entering the infinite loop condition addressed by HBASE-22380. 106 */ 107@Category({ ReplicationTests.class, MediumTests.class}) 108public class TestBulkLoadReplication extends TestReplicationBase { 109 110 @ClassRule 111 public static final HBaseClassTestRule CLASS_RULE = 112 HBaseClassTestRule.forClass(TestBulkLoadReplication.class); 113 114 protected static final Logger LOG = 115 LoggerFactory.getLogger(TestBulkLoadReplication.class); 116 117 private static final String PEER1_CLUSTER_ID = "peer1"; 118 private static final String PEER2_CLUSTER_ID = "peer2"; 119 private static final String PEER3_CLUSTER_ID = "peer3"; 120 121 private static final String PEER_ID1 = "1"; 122 private static final String PEER_ID3 = "3"; 123 124 private static AtomicInteger BULK_LOADS_COUNT; 125 private static CountDownLatch BULK_LOAD_LATCH; 126 127 protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility(); 128 protected static final Configuration CONF3 = UTIL3.getConfiguration(); 129 130 private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir"); 131 132 private static Table htable3; 133 134 @Rule 135 public TestName name = new TestName(); 136 137 @ClassRule 138 public static TemporaryFolder testFolder = new TemporaryFolder(); 139 140 @BeforeClass 141 public static void setUpBeforeClass() throws Exception { 142 setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID); 143 setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID); 144 setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID); 145 setupConfig(UTIL3, "/3"); 146 TestReplicationBase.setUpBeforeClass(); 147 startThirdCluster(); 148 } 149 150 private static void startThirdCluster() throws Exception { 151 LOG.info("Setup Zk to same one from UTIL1 and UTIL2"); 152 UTIL3.setZkCluster(UTIL1.getZkCluster()); 153 UTIL3.startMiniCluster(NUM_SLAVES1); 154 155 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 156 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) 157 .setMobEnabled(true) 158 .setMobThreshold(4000) 159 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 160 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 161 162 Connection connection3 = ConnectionFactory.createConnection(CONF3); 163 try (Admin admin3 = connection3.getAdmin()) { 164 admin3.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 165 } 166 UTIL3.waitUntilAllRegionsAssigned(tableName); 167 htable3 = connection3.getTable(tableName); 168 } 169 170 @Before 171 @Override 172 public void setUpBase() throws Exception { 173 //"super.setUpBase()" already sets replication from 1->2, 174 //then on the subsequent lines, sets 2->1, 2->3 and 3->2. 175 //So we have following topology: "1 <-> 2 <->3" 176 super.setUpBase(); 177 ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1); 178 ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2); 179 ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3); 180 //adds cluster1 as a remote peer on cluster2 181 UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config); 182 //adds cluster3 as a remote peer on cluster2 183 UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config); 184 //adds cluster2 as a remote peer on cluster3 185 UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config); 186 setupCoprocessor(UTIL1, "cluster1"); 187 setupCoprocessor(UTIL2, "cluster2"); 188 setupCoprocessor(UTIL3, "cluster3"); 189 BULK_LOADS_COUNT = new AtomicInteger(0); 190 } 191 192 private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) { 193 return ReplicationPeerConfig.newBuilder() 194 .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build(); 195 } 196 197 private void setupCoprocessor(HBaseTestingUtility cluster, String name){ 198 cluster.getHBaseCluster().getRegions(tableName).forEach(r -> { 199 try { 200 TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost(). 201 findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 202 if(cp == null) { 203 r.getCoprocessorHost(). 204 load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0, 205 cluster.getConfiguration()); 206 cp = r.getCoprocessorHost(). 207 findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 208 cp.clusterName = cluster.getClusterKey(); 209 } 210 } catch (Exception e){ 211 LOG.error(e.getMessage(), e); 212 } 213 }); 214 } 215 216 @After 217 @Override 218 public void tearDownBase() throws Exception { 219 super.tearDownBase(); 220 UTIL2.getAdmin().removeReplicationPeer(PEER_ID1); 221 UTIL2.getAdmin().removeReplicationPeer(PEER_ID3); 222 UTIL3.getAdmin().removeReplicationPeer(PEER_ID2); 223 } 224 225 protected static void setupBulkLoadConfigsForCluster(Configuration config, 226 String clusterReplicationId) throws Exception { 227 config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 228 config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); 229 File sourceConfigFolder = testFolder.newFolder(clusterReplicationId); 230 File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() 231 + "/hbase-site.xml"); 232 config.writeXml(new FileOutputStream(sourceConfigFile)); 233 config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath()); 234 } 235 236 @Test 237 public void testBulkLoadReplicationActiveActive() throws Exception { 238 Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); 239 Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); 240 Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); 241 byte[] row = Bytes.toBytes("001"); 242 byte[] value = Bytes.toBytes("v1"); 243 assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, 244 peer2TestTable, peer3TestTable); 245 row = Bytes.toBytes("002"); 246 value = Bytes.toBytes("v2"); 247 assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable, 248 peer2TestTable, peer3TestTable); 249 row = Bytes.toBytes("003"); 250 value = Bytes.toBytes("v3"); 251 assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable, 252 peer2TestTable, peer3TestTable); 253 //Additional wait to make sure no extra bulk load happens 254 Thread.sleep(400); 255 //We have 3 bulk load events (1 initiated on each cluster). 256 //Each event gets 3 counts (the originator cluster, plus the two peers), 257 //so BULK_LOADS_COUNT expected value is 3 * 3 = 9. 258 assertEquals(9, BULK_LOADS_COUNT.get()); 259 } 260 261 @Test 262 public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception { 263 Path path = createMobFiles(UTIL3); 264 ColumnFamilyDescriptor descriptor = 265 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName); 266 ExecutorService pool = null; 267 try { 268 pool = Executors.newFixedThreadPool(1); 269 PartitionedMobCompactor compactor = 270 new PartitionedMobCompactor(UTIL3.getConfiguration(), UTIL3.getTestFileSystem(), tableName, 271 descriptor, pool); 272 BULK_LOAD_LATCH = new CountDownLatch(1); 273 BULK_LOADS_COUNT.set(0); 274 compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true); 275 assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS)); 276 Thread.sleep(400); 277 assertEquals(1, BULK_LOADS_COUNT.get()); 278 } finally { 279 if(pool != null && !pool.isTerminated()) { 280 pool.shutdownNow(); 281 } 282 } 283 } 284 285 286 protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value, 287 HBaseTestingUtility utility, Table...tables) throws Exception { 288 BULK_LOAD_LATCH = new CountDownLatch(3); 289 bulkLoadOnCluster(tableName, row, value, utility); 290 assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); 291 assertTableHasValue(tables[0], row, value); 292 assertTableHasValue(tables[1], row, value); 293 assertTableHasValue(tables[2], row, value); 294 } 295 296 protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value, 297 HBaseTestingUtility cluster) throws Exception { 298 String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration()); 299 copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster()); 300 BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration()); 301 bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR); 302 } 303 304 private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception { 305 Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f"); 306 cluster.getFileSystem().mkdirs(bulkLoadDir); 307 cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); 308 } 309 310 protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception { 311 Get get = new Get(row); 312 Result result = table.get(get); 313 assertTrue(result.advance()); 314 assertEquals(Bytes.toString(value), Bytes.toString(result.value())); 315 } 316 317 protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception { 318 Get get = new Get(row); 319 Result result = table.get(get); 320 assertTrue(result.isEmpty()); 321 } 322 323 private String createHFileForFamilies(byte[] row, byte[] value, 324 Configuration clusterConfig) throws IOException { 325 CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); 326 cellBuilder.setRow(row) 327 .setFamily(TestReplicationBase.famName) 328 .setQualifier(Bytes.toBytes("1")) 329 .setValue(value) 330 .setType(Cell.Type.Put); 331 332 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); 333 // TODO We need a way to do this without creating files 334 File hFileLocation = testFolder.newFile(); 335 FSDataOutputStream out = 336 new FSDataOutputStream(new FileOutputStream(hFileLocation), null); 337 try { 338 hFileFactory.withOutputStream(out); 339 hFileFactory.withFileContext(new HFileContextBuilder().build()); 340 HFile.Writer writer = hFileFactory.create(); 341 try { 342 writer.append(new KeyValue(cellBuilder.build())); 343 } finally { 344 writer.close(); 345 } 346 } finally { 347 out.close(); 348 } 349 return hFileLocation.getAbsoluteFile().getAbsolutePath(); 350 } 351 352 private Path createMobFiles(HBaseTestingUtility util) throws IOException { 353 Path testDir = CommonFSUtils.getRootDir(util.getConfiguration()); 354 Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); 355 Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f"); 356 HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); 357 MobFileName mobFileName = null; 358 byte[] mobFileStartRow = new byte[32]; 359 for (byte rowKey : Bytes.toBytes("01234")) { 360 mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()), 361 UUID.randomUUID().toString().replaceAll("-", "")); 362 StoreFileWriter mobFileWriter = 363 new StoreFileWriter.Builder(util.getConfiguration(), 364 new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta) 365 .withFilePath(new Path(basePath, mobFileName.getFileName())).build(); 366 long now = System.currentTimeMillis(); 367 try { 368 for (int i = 0; i < 10; i++) { 369 byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i)); 370 byte[] dummyData = new byte[5000]; 371 new Random().nextBytes(dummyData); 372 mobFileWriter.append( 373 new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData)); 374 } 375 } finally { 376 mobFileWriter.close(); 377 } 378 } 379 return basePath; 380 } 381 382 public static class BulkReplicationTestObserver implements RegionCoprocessor { 383 384 String clusterName; 385 AtomicInteger bulkLoadCounts = new AtomicInteger(); 386 387 @Override 388 public Optional<RegionObserver> getRegionObserver() { 389 return Optional.of(new RegionObserver() { 390 391 @Override 392 public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, 393 List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths) 394 throws IOException { 395 BULK_LOAD_LATCH.countDown(); 396 BULK_LOADS_COUNT.incrementAndGet(); 397 LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName, 398 bulkLoadCounts.addAndGet(1)); 399 } 400 }); 401 } 402 } 403}