001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.atomic.AtomicInteger; 028import java.util.stream.Collectors; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Admin; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.client.ConnectionFactory; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.JVMClusterUtil; 053import org.apache.hadoop.hbase.wal.WAL; 054import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 055import org.junit.jupiter.api.AfterAll; 056import org.junit.jupiter.api.AfterEach; 057import org.junit.jupiter.api.BeforeEach; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 062import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 063import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 064import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 065 066/** 067 * Replication test base class without BeforeAll method, as in some tests we need to do some changes 068 * before starting clusters. 069 * @see TestReplicationBase 070 */ 071public class TestReplicationBaseNoBeforeAll { 072 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBaseNoBeforeAll.class); 073 protected static Connection connection1; 074 protected static Connection connection2; 075 protected static Configuration CONF_WITH_LOCALFS; 076 077 protected static Admin hbaseAdmin; 078 079 protected static Table htable1; 080 protected static Table htable2; 081 082 protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); 083 protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); 084 protected static Configuration CONF1 = UTIL1.getConfiguration(); 085 protected static Configuration CONF2 = UTIL2.getConfiguration(); 086 087 protected static int NUM_SLAVES1 = 1; 088 protected static int NUM_SLAVES2 = 1; 089 protected static final int NB_ROWS_IN_BATCH = 100; 090 protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10; 091 protected static final long SLEEP_TIME = 500; 092 protected static final int NB_RETRIES = 50; 093 protected static AtomicInteger replicateCount = new AtomicInteger(); 094 protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList(); 095 096 protected static final TableName tableName = TableName.valueOf("test"); 097 protected static final byte[] famName = Bytes.toBytes("f"); 098 protected static final byte[] row = Bytes.toBytes("row"); 099 protected static final byte[] noRepfamName = Bytes.toBytes("norep"); 100 protected static final String PEER_ID2 = "2"; 101 102 protected boolean isSerialPeer() { 103 return false; 104 } 105 106 protected boolean isSyncPeer() { 107 return false; 108 } 109 110 protected final void cleanUp() throws IOException, InterruptedException { 111 // Starting and stopping replication can make us miss new logs, 112 // rolling like this makes sure the most recent one gets added to the queue 113 for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) { 114 UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 115 } 116 int rowCount = UTIL1.countRows(tableName); 117 UTIL1.deleteTableData(tableName); 118 // truncating the table will send one Delete per row to the slave cluster 119 // in an async fashion, which is why we cannot just call deleteTableData on 120 // utility2 since late writes could make it to the slave in some way. 121 // Instead, we truncate the first table and wait for all the Deletes to 122 // make it to the slave. 123 Scan scan = new Scan(); 124 int lastCount = 0; 125 for (int i = 0; i < NB_RETRIES; i++) { 126 if (i == NB_RETRIES - 1) { 127 fail("Waited too much time for truncate"); 128 } 129 ResultScanner scanner = htable2.getScanner(scan); 130 Result[] res = scanner.next(rowCount); 131 scanner.close(); 132 if (res.length != 0) { 133 if (res.length < lastCount) { 134 i--; // Don't increment timeout if we make progress 135 } 136 lastCount = res.length; 137 LOG.info("Still got " + res.length + " rows"); 138 Thread.sleep(SLEEP_TIME); 139 } else { 140 break; 141 } 142 } 143 } 144 145 protected static void waitForReplication(int expectedRows, int retries) 146 throws IOException, InterruptedException { 147 waitForReplication(htable2, expectedRows, retries); 148 } 149 150 protected static void waitForReplication(Table table, int expectedRows, int retries) 151 throws IOException, InterruptedException { 152 Scan scan; 153 for (int i = 0; i < retries; i++) { 154 scan = new Scan(); 155 if (i == retries - 1) { 156 fail("Waited too much time for normal batch replication"); 157 } 158 int count = 0; 159 try (ResultScanner scanner = table.getScanner(scan)) { 160 while (scanner.next() != null) { 161 count++; 162 } 163 } 164 if (count != expectedRows) { 165 LOG.info("Only got " + count + " rows"); 166 Thread.sleep(SLEEP_TIME); 167 } else { 168 break; 169 } 170 } 171 } 172 173 protected static void loadData(String prefix, byte[] row) throws IOException { 174 loadData(prefix, row, famName); 175 } 176 177 protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException { 178 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); 179 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 180 Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i))); 181 put.addColumn(familyName, row, row); 182 puts.add(put); 183 } 184 htable1.put(puts); 185 } 186 187 protected static void setupConfig(HBaseTestingUtil util, String znodeParent) { 188 Configuration conf = util.getConfiguration(); 189 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); 190 // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger 191 // sufficient number of events. But we don't want to go too low because 192 // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want 193 // more than one batch sent to the peer cluster for better testing. 194 conf.setInt("replication.source.size.capacity", 102400); 195 conf.setLong("replication.source.sleepforretries", 100); 196 conf.setInt("hbase.regionserver.maxlogs", 10); 197 conf.setLong("hbase.master.logcleaner.ttl", 10); 198 conf.setInt("zookeeper.recovery.retry", 1); 199 conf.setInt("zookeeper.recovery.retry.intervalmill", 10); 200 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 201 conf.setInt("replication.stats.thread.period.seconds", 5); 202 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); 203 conf.setLong("replication.sleep.before.failover", 2000); 204 conf.setInt("replication.source.maxretriesmultiplier", 10); 205 conf.setFloat("replication.source.ratio", 1.0f); 206 conf.setBoolean("replication.source.eof.autorecovery", true); 207 conf.setLong("hbase.serial.replication.waiting.ms", 100); 208 } 209 210 protected static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) { 211 setupConfig(util1, "/1"); 212 setupConfig(util2, "/2"); 213 214 Configuration conf2 = util2.getConfiguration(); 215 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 216 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); 217 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); 218 } 219 220 protected static void restartSourceCluster(int numSlaves) throws Exception { 221 Closeables.close(hbaseAdmin, true); 222 Closeables.close(htable1, true); 223 Closeables.close(connection1, true); 224 UTIL1.shutdownMiniHBaseCluster(); 225 UTIL1.restartHBaseCluster(numSlaves); 226 // Invalidate the cached connection state. 227 CONF1 = UTIL1.getConfiguration(); 228 connection1 = ConnectionFactory.createConnection(CONF1); 229 hbaseAdmin = connection1.getAdmin(); 230 htable1 = connection1.getTable(tableName); 231 } 232 233 static void restartTargetHBaseCluster(int numSlaves) throws Exception { 234 Closeables.close(htable2, true); 235 Closeables.close(connection2, true); 236 UTIL2.restartHBaseCluster(numSlaves); 237 // Invalidate the cached connection state 238 CONF2 = UTIL2.getConfiguration(); 239 connection2 = ConnectionFactory.createConnection(CONF2); 240 htable2 = connection2.getTable(tableName); 241 } 242 243 protected static void createTable(TableName tableName) throws IOException { 244 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 245 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) 246 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 247 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 248 UTIL1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 249 UTIL2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 250 UTIL1.waitUntilAllRegionsAssigned(tableName); 251 UTIL2.waitUntilAllRegionsAssigned(tableName); 252 } 253 254 protected static void startClusters() throws Exception { 255 UTIL1.startMiniZKCluster(); 256 MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); 257 LOG.info("Setup first Zk"); 258 259 UTIL2.setZkCluster(miniZK); 260 LOG.info("Setup second Zk"); 261 262 CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1); 263 UTIL1.startMiniCluster(NUM_SLAVES1); 264 // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks 265 // as a component in deciding maximum number of parallel batches to send to the peer cluster. 266 UTIL2.startMiniCluster(NUM_SLAVES2); 267 268 connection1 = ConnectionFactory.createConnection(CONF1); 269 connection2 = ConnectionFactory.createConnection(CONF2); 270 hbaseAdmin = connection1.getAdmin(); 271 272 createTable(tableName); 273 htable1 = connection1.getTable(tableName); 274 htable2 = connection2.getTable(tableName); 275 } 276 277 private boolean peerExist(String peerId, HBaseTestingUtil util) throws IOException { 278 return util.getAdmin().listReplicationPeers().stream() 279 .anyMatch(p -> peerId.equals(p.getPeerId())); 280 } 281 282 // can be overridden in tests, in case you need to use zk based uri, or the old style uri 283 protected String getClusterKey(HBaseTestingUtil util) throws Exception { 284 return util.getRpcConnnectionURI(); 285 } 286 287 protected final void addPeer(String peerId, TableName tableName) throws Exception { 288 addPeer(peerId, tableName, UTIL1, UTIL2); 289 } 290 291 protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtil source, 292 HBaseTestingUtil target) throws Exception { 293 if (peerExist(peerId, source)) { 294 return; 295 } 296 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() 297 .setClusterKey(getClusterKey(target)).setSerial(isSerialPeer()) 298 .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName()); 299 if (isSyncPeer()) { 300 FileSystem fs2 = target.getTestFileSystem(); 301 // The remote wal dir is not important as we do not use it in DA state, here we only need to 302 // confirm that a sync peer in DA state can still replicate data to remote cluster 303 // asynchronously. 304 builder.setReplicateAllUserTables(false) 305 .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())) 306 .setRemoteWALDir(new Path("/RemoteWAL") 307 .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString()); 308 } 309 source.getAdmin().addReplicationPeer(peerId, builder.build()); 310 } 311 312 @BeforeEach 313 public void setUpBase() throws Exception { 314 addPeer(PEER_ID2, tableName); 315 } 316 317 protected final void removePeer(String peerId) throws Exception { 318 removePeer(peerId, UTIL1); 319 } 320 321 protected final void removePeer(String peerId, HBaseTestingUtil util) throws Exception { 322 if (peerExist(peerId, util)) { 323 util.getAdmin().removeReplicationPeer(peerId); 324 } 325 } 326 327 @AfterEach 328 public void tearDownBase() throws Exception { 329 removePeer(PEER_ID2); 330 } 331 332 protected static void runSimplePutDeleteTest() throws IOException, InterruptedException { 333 Put put = new Put(row); 334 put.addColumn(famName, row, row); 335 336 htable1 = UTIL1.getConnection().getTable(tableName); 337 htable1.put(put); 338 339 Get get = new Get(row); 340 for (int i = 0; i < NB_RETRIES; i++) { 341 if (i == NB_RETRIES - 1) { 342 fail("Waited too much time for put replication"); 343 } 344 Result res = htable2.get(get); 345 if (res.isEmpty()) { 346 LOG.info("Row not available"); 347 Thread.sleep(SLEEP_TIME); 348 } else { 349 assertArrayEquals(row, res.value()); 350 break; 351 } 352 } 353 354 Delete del = new Delete(row); 355 htable1.delete(del); 356 357 get = new Get(row); 358 for (int i = 0; i < NB_RETRIES; i++) { 359 if (i == NB_RETRIES - 1) { 360 fail("Waited too much time for del replication"); 361 } 362 Result res = htable2.get(get); 363 if (res.size() >= 1) { 364 LOG.info("Row not deleted"); 365 Thread.sleep(SLEEP_TIME); 366 } else { 367 break; 368 } 369 } 370 } 371 372 protected static void runSmallBatchTest() throws IOException, InterruptedException { 373 // normal Batch tests 374 loadData("", row); 375 376 Scan scan = new Scan(); 377 378 ResultScanner scanner1 = htable1.getScanner(scan); 379 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH); 380 scanner1.close(); 381 assertEquals(NB_ROWS_IN_BATCH, res1.length); 382 383 waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); 384 } 385 386 protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException { 387 List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream() 388 .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList()); 389 for (ServerName rs : rses) { 390 util.getMiniHBaseCluster().stopRegionServer(rs); 391 } 392 } 393 394 @AfterAll 395 public static void tearDownAfterClass() throws Exception { 396 if (htable2 != null) { 397 htable2.close(); 398 } 399 if (htable1 != null) { 400 htable1.close(); 401 } 402 if (hbaseAdmin != null) { 403 hbaseAdmin.close(); 404 } 405 406 if (connection2 != null) { 407 connection2.close(); 408 } 409 if (connection1 != null) { 410 connection1.close(); 411 } 412 UTIL2.shutdownMiniCluster(); 413 UTIL1.shutdownMiniCluster(); 414 } 415 416 /** 417 * Custom replication endpoint to keep track of replication status for tests. 418 */ 419 public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint { 420 public ReplicationEndpointTest() { 421 replicateCount.set(0); 422 } 423 424 @Override 425 public boolean replicate(ReplicateContext replicateContext) { 426 replicateCount.incrementAndGet(); 427 replicatedEntries.addAll(replicateContext.getEntries()); 428 429 return super.replicate(replicateContext); 430 } 431 } 432}