001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.atomic.AtomicInteger; 028import java.util.stream.Collectors; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Admin; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.client.ConnectionFactory; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.JVMClusterUtil; 053import org.apache.hadoop.hbase.wal.WAL; 054import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 055import org.junit.After; 056import org.junit.AfterClass; 057import org.junit.Before; 058import org.junit.BeforeClass; 059import org.junit.jupiter.api.AfterAll; 060import org.junit.jupiter.api.AfterEach; 061import org.junit.jupiter.api.BeforeAll; 062import org.junit.jupiter.api.BeforeEach; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 068import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 069import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 070 071/** 072 * This class is only a base for other integration-level replication tests. Do not add tests here. 073 * TestReplicationSmallTests is where tests that don't require bring machines up/down should go All 074 * other tests should have their own classes and extend this one 075 */ 076public class TestReplicationBase { 077 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); 078 protected static Connection connection1; 079 protected static Connection connection2; 080 protected static Configuration CONF_WITH_LOCALFS; 081 082 protected static Admin hbaseAdmin; 083 084 protected static Table htable1; 085 protected static Table htable2; 086 087 protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); 088 protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); 089 protected static Configuration CONF1 = UTIL1.getConfiguration(); 090 protected static Configuration CONF2 = UTIL2.getConfiguration(); 091 092 protected static int NUM_SLAVES1 = 1; 093 protected static int NUM_SLAVES2 = 1; 094 protected static final int NB_ROWS_IN_BATCH = 100; 095 protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10; 096 protected static final long SLEEP_TIME = 500; 097 protected static final int NB_RETRIES = 50; 098 protected static AtomicInteger replicateCount = new AtomicInteger(); 099 protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList(); 100 101 protected static final TableName tableName = TableName.valueOf("test"); 102 protected static final byte[] famName = Bytes.toBytes("f"); 103 protected static final byte[] row = Bytes.toBytes("row"); 104 protected static final byte[] noRepfamName = Bytes.toBytes("norep"); 105 protected static final String PEER_ID2 = "2"; 106 107 protected boolean isSerialPeer() { 108 return false; 109 } 110 111 protected boolean isSyncPeer() { 112 return false; 113 } 114 115 protected final void cleanUp() throws IOException, InterruptedException { 116 // Starting and stopping replication can make us miss new logs, 117 // rolling like this makes sure the most recent one gets added to the queue 118 for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) { 119 UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 120 } 121 int rowCount = UTIL1.countRows(tableName); 122 UTIL1.deleteTableData(tableName); 123 // truncating the table will send one Delete per row to the slave cluster 124 // in an async fashion, which is why we cannot just call deleteTableData on 125 // utility2 since late writes could make it to the slave in some way. 126 // Instead, we truncate the first table and wait for all the Deletes to 127 // make it to the slave. 128 Scan scan = new Scan(); 129 int lastCount = 0; 130 for (int i = 0; i < NB_RETRIES; i++) { 131 if (i == NB_RETRIES - 1) { 132 fail("Waited too much time for truncate"); 133 } 134 ResultScanner scanner = htable2.getScanner(scan); 135 Result[] res = scanner.next(rowCount); 136 scanner.close(); 137 if (res.length != 0) { 138 if (res.length < lastCount) { 139 i--; // Don't increment timeout if we make progress 140 } 141 lastCount = res.length; 142 LOG.info("Still got " + res.length + " rows"); 143 Thread.sleep(SLEEP_TIME); 144 } else { 145 break; 146 } 147 } 148 } 149 150 protected static void waitForReplication(int expectedRows, int retries) 151 throws IOException, InterruptedException { 152 waitForReplication(htable2, expectedRows, retries); 153 } 154 155 protected static void waitForReplication(Table table, int expectedRows, int retries) 156 throws IOException, InterruptedException { 157 Scan scan; 158 for (int i = 0; i < retries; i++) { 159 scan = new Scan(); 160 if (i == retries - 1) { 161 fail("Waited too much time for normal batch replication"); 162 } 163 int count = 0; 164 try (ResultScanner scanner = table.getScanner(scan)) { 165 while (scanner.next() != null) { 166 count++; 167 } 168 } 169 if (count != expectedRows) { 170 LOG.info("Only got " + count + " rows"); 171 Thread.sleep(SLEEP_TIME); 172 } else { 173 break; 174 } 175 } 176 } 177 178 protected static void loadData(String prefix, byte[] row) throws IOException { 179 loadData(prefix, row, famName); 180 } 181 182 protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException { 183 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); 184 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 185 Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i))); 186 put.addColumn(familyName, row, row); 187 puts.add(put); 188 } 189 htable1.put(puts); 190 } 191 192 protected static void setupConfig(HBaseTestingUtil util, String znodeParent) { 193 Configuration conf = util.getConfiguration(); 194 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); 195 // Disable directory sharing to prevent race conditions when tests run in parallel. 196 // Each test instance gets its own isolated directories to avoid one test's tearDown() 197 // deleting directories another parallel test is still using. 198 conf.setBoolean("hbase.test.disable-directory-sharing", true); 199 // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger 200 // sufficient number of events. But we don't want to go too low because 201 // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want 202 // more than one batch sent to the peer cluster for better testing. 203 conf.setInt("replication.source.size.capacity", 102400); 204 conf.setLong("replication.source.sleepforretries", 100); 205 conf.setInt("hbase.regionserver.maxlogs", 10); 206 conf.setLong("hbase.master.logcleaner.ttl", 10); 207 conf.setInt("zookeeper.recovery.retry", 1); 208 conf.setInt("zookeeper.recovery.retry.intervalmill", 10); 209 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 210 conf.setInt("replication.stats.thread.period.seconds", 5); 211 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); 212 conf.setLong("replication.sleep.before.failover", 2000); 213 conf.setInt("replication.source.maxretriesmultiplier", 10); 214 conf.setFloat("replication.source.ratio", 1.0f); 215 conf.setBoolean("replication.source.eof.autorecovery", true); 216 conf.setLong("hbase.serial.replication.waiting.ms", 100); 217 } 218 219 static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) { 220 setupConfig(util1, "/1"); 221 setupConfig(util2, "/2"); 222 223 Configuration conf2 = util2.getConfiguration(); 224 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 225 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); 226 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); 227 } 228 229 protected static void restartSourceCluster(int numSlaves) throws Exception { 230 Closeables.close(hbaseAdmin, true); 231 Closeables.close(htable1, true); 232 UTIL1.shutdownMiniHBaseCluster(); 233 UTIL1.restartHBaseCluster(numSlaves); 234 // Invalidate the cached connection state. 235 CONF1 = UTIL1.getConfiguration(); 236 hbaseAdmin = UTIL1.getAdmin(); 237 Connection connection1 = UTIL1.getConnection(); 238 htable1 = connection1.getTable(tableName); 239 } 240 241 static void restartTargetHBaseCluster(int numSlaves) throws Exception { 242 Closeables.close(htable2, true); 243 UTIL2.restartHBaseCluster(numSlaves); 244 // Invalidate the cached connection state 245 CONF2 = UTIL2.getConfiguration(); 246 htable2 = UTIL2.getConnection().getTable(tableName); 247 } 248 249 protected static void createTable(TableName tableName) throws IOException { 250 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 251 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) 252 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 253 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 254 UTIL1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 255 UTIL2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 256 UTIL1.waitUntilAllRegionsAssigned(tableName); 257 UTIL2.waitUntilAllRegionsAssigned(tableName); 258 } 259 260 private static void startClusters() throws Exception { 261 UTIL1.startMiniZKCluster(); 262 MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); 263 LOG.info("Setup first Zk"); 264 265 UTIL2.setZkCluster(miniZK); 266 LOG.info("Setup second Zk"); 267 268 CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1); 269 UTIL1.startMiniCluster(NUM_SLAVES1); 270 // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks 271 // as a component in deciding maximum number of parallel batches to send to the peer cluster. 272 UTIL2.startMiniCluster(NUM_SLAVES2); 273 274 connection1 = ConnectionFactory.createConnection(CONF1); 275 connection2 = ConnectionFactory.createConnection(CONF2); 276 hbaseAdmin = connection1.getAdmin(); 277 278 createTable(tableName); 279 htable1 = connection1.getTable(tableName); 280 htable2 = connection2.getTable(tableName); 281 } 282 283 @BeforeAll 284 @BeforeClass 285 public static void setUpBeforeClass() throws Exception { 286 configureClusters(UTIL1, UTIL2); 287 startClusters(); 288 } 289 290 private boolean peerExist(String peerId) throws IOException { 291 return peerExist(peerId, UTIL1); 292 } 293 294 private boolean peerExist(String peerId, HBaseTestingUtil util) throws IOException { 295 return util.getAdmin().listReplicationPeers().stream() 296 .anyMatch(p -> peerId.equals(p.getPeerId())); 297 } 298 299 // can be override in tests, in case you need to use zk based uri, or the old style uri 300 protected String getClusterKey(HBaseTestingUtil util) throws Exception { 301 return util.getRpcConnnectionURI(); 302 } 303 304 protected final void addPeer(String peerId, TableName tableName) throws Exception { 305 addPeer(peerId, tableName, UTIL1, UTIL2); 306 } 307 308 protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtil source, 309 HBaseTestingUtil target) throws Exception { 310 if (peerExist(peerId, source)) { 311 return; 312 } 313 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() 314 .setClusterKey(getClusterKey(target)).setSerial(isSerialPeer()) 315 .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName()); 316 if (isSyncPeer()) { 317 FileSystem fs2 = target.getTestFileSystem(); 318 // The remote wal dir is not important as we do not use it in DA state, here we only need to 319 // confirm that a sync peer in DA state can still replicate data to remote cluster 320 // asynchronously. 321 builder.setReplicateAllUserTables(false) 322 .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())) 323 .setRemoteWALDir(new Path("/RemoteWAL") 324 .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString()); 325 } 326 source.getAdmin().addReplicationPeer(peerId, builder.build()); 327 } 328 329 @Before 330 @BeforeEach 331 public void setUpBase() throws Exception { 332 addPeer(PEER_ID2, tableName); 333 } 334 335 protected final void removePeer(String peerId) throws Exception { 336 removePeer(peerId, UTIL1); 337 } 338 339 protected final void removePeer(String peerId, HBaseTestingUtil util) throws Exception { 340 if (peerExist(peerId, util)) { 341 util.getAdmin().removeReplicationPeer(peerId); 342 } 343 } 344 345 @After 346 @AfterEach 347 public void tearDownBase() throws Exception { 348 removePeer(PEER_ID2); 349 } 350 351 protected static void runSimplePutDeleteTest() throws IOException, InterruptedException { 352 Put put = new Put(row); 353 put.addColumn(famName, row, row); 354 355 htable1 = UTIL1.getConnection().getTable(tableName); 356 htable1.put(put); 357 358 Get get = new Get(row); 359 for (int i = 0; i < NB_RETRIES; i++) { 360 if (i == NB_RETRIES - 1) { 361 fail("Waited too much time for put replication"); 362 } 363 Result res = htable2.get(get); 364 if (res.isEmpty()) { 365 LOG.info("Row not available"); 366 Thread.sleep(SLEEP_TIME); 367 } else { 368 assertArrayEquals(row, res.value()); 369 break; 370 } 371 } 372 373 Delete del = new Delete(row); 374 htable1.delete(del); 375 376 get = new Get(row); 377 for (int i = 0; i < NB_RETRIES; i++) { 378 if (i == NB_RETRIES - 1) { 379 fail("Waited too much time for del replication"); 380 } 381 Result res = htable2.get(get); 382 if (res.size() >= 1) { 383 LOG.info("Row not deleted"); 384 Thread.sleep(SLEEP_TIME); 385 } else { 386 break; 387 } 388 } 389 } 390 391 protected static void runSmallBatchTest() throws IOException, InterruptedException { 392 // normal Batch tests 393 loadData("", row); 394 395 Scan scan = new Scan(); 396 397 ResultScanner scanner1 = htable1.getScanner(scan); 398 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH); 399 scanner1.close(); 400 assertEquals(NB_ROWS_IN_BATCH, res1.length); 401 402 waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); 403 } 404 405 protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException { 406 List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream() 407 .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList()); 408 for (ServerName rs : rses) { 409 util.getMiniHBaseCluster().stopRegionServer(rs); 410 } 411 } 412 413 @AfterAll 414 @AfterClass 415 public static void tearDownAfterClass() throws Exception { 416 if (htable2 != null) { 417 htable2.close(); 418 } 419 if (htable1 != null) { 420 htable1.close(); 421 } 422 if (hbaseAdmin != null) { 423 hbaseAdmin.close(); 424 } 425 426 if (connection2 != null) { 427 connection2.close(); 428 } 429 if (connection1 != null) { 430 connection1.close(); 431 } 432 UTIL2.shutdownMiniCluster(); 433 UTIL1.shutdownMiniCluster(); 434 } 435 436 /** 437 * Custom replication endpoint to keep track of replication status for tests. 438 */ 439 public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint { 440 public ReplicationEndpointTest() { 441 replicateCount.set(0); 442 } 443 444 @Override 445 public boolean replicate(ReplicateContext replicateContext) { 446 replicateCount.incrementAndGet(); 447 replicatedEntries.addAll(replicateContext.getEntries()); 448 449 return super.replicate(replicateContext); 450 } 451 } 452}