001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.client.Delete; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.Result; 043import org.apache.hadoop.hbase.client.ResultScanner; 044import org.apache.hadoop.hbase.client.Scan; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.client.TableDescriptor; 047import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 048import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.JVMClusterUtil; 051import org.apache.hadoop.hbase.wal.WAL; 052import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 053import org.junit.After; 054import org.junit.AfterClass; 055import org.junit.Before; 056import org.junit.BeforeClass; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 062import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 063import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 064 065/** 066 * This class is only a base for other integration-level replication tests. Do not add tests here. 067 * TestReplicationSmallTests is where tests that don't require bring machines up/down should go All 068 * other tests should have their own classes and extend this one 069 */ 070public class TestReplicationBase { 071 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); 072 protected static Connection connection1; 073 protected static Connection connection2; 074 protected static Configuration CONF_WITH_LOCALFS; 075 076 protected static Admin hbaseAdmin; 077 078 protected static Table htable1; 079 protected static Table htable2; 080 081 protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); 082 protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); 083 protected static Configuration CONF1 = UTIL1.getConfiguration(); 084 protected static Configuration CONF2 = UTIL2.getConfiguration(); 085 086 protected static int NUM_SLAVES1 = 1; 087 protected static int NUM_SLAVES2 = 1; 088 protected static final int NB_ROWS_IN_BATCH = 100; 089 protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10; 090 protected static final long SLEEP_TIME = 500; 091 protected static final int NB_RETRIES = 50; 092 protected static AtomicInteger replicateCount = new AtomicInteger(); 093 protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList(); 094 095 protected static final TableName tableName = TableName.valueOf("test"); 096 protected static final byte[] famName = Bytes.toBytes("f"); 097 protected static final byte[] row = Bytes.toBytes("row"); 098 protected static final byte[] noRepfamName = Bytes.toBytes("norep"); 099 protected static final String PEER_ID2 = "2"; 100 101 protected boolean isSerialPeer() { 102 return false; 103 } 104 105 protected boolean isSyncPeer() { 106 return false; 107 } 108 109 protected final void cleanUp() throws IOException, InterruptedException { 110 // Starting and stopping replication can make us miss new logs, 111 // rolling like this makes sure the most recent one gets added to the queue 112 for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) { 113 UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 114 } 115 int rowCount = UTIL1.countRows(tableName); 116 UTIL1.deleteTableData(tableName); 117 // truncating the table will send one Delete per row to the slave cluster 118 // in an async fashion, which is why we cannot just call deleteTableData on 119 // utility2 since late writes could make it to the slave in some way. 120 // Instead, we truncate the first table and wait for all the Deletes to 121 // make it to the slave. 122 Scan scan = new Scan(); 123 int lastCount = 0; 124 for (int i = 0; i < NB_RETRIES; i++) { 125 if (i == NB_RETRIES - 1) { 126 fail("Waited too much time for truncate"); 127 } 128 ResultScanner scanner = htable2.getScanner(scan); 129 Result[] res = scanner.next(rowCount); 130 scanner.close(); 131 if (res.length != 0) { 132 if (res.length < lastCount) { 133 i--; // Don't increment timeout if we make progress 134 } 135 lastCount = res.length; 136 LOG.info("Still got " + res.length + " rows"); 137 Thread.sleep(SLEEP_TIME); 138 } else { 139 break; 140 } 141 } 142 } 143 144 protected static void waitForReplication(int expectedRows, int retries) 145 throws IOException, InterruptedException { 146 waitForReplication(htable2, expectedRows, retries); 147 } 148 149 protected static void waitForReplication(Table table, int expectedRows, int retries) 150 throws IOException, InterruptedException { 151 Scan scan; 152 for (int i = 0; i < retries; i++) { 153 scan = new Scan(); 154 if (i == retries - 1) { 155 fail("Waited too much time for normal batch replication"); 156 } 157 int count = 0; 158 try (ResultScanner scanner = table.getScanner(scan)) { 159 while (scanner.next() != null) { 160 count++; 161 } 162 } 163 if (count != expectedRows) { 164 LOG.info("Only got " + count + " rows"); 165 Thread.sleep(SLEEP_TIME); 166 } else { 167 break; 168 } 169 } 170 } 171 172 protected static void loadData(String prefix, byte[] row) throws IOException { 173 loadData(prefix, row, famName); 174 } 175 176 protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException { 177 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); 178 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 179 Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i))); 180 put.addColumn(familyName, row, row); 181 puts.add(put); 182 } 183 htable1.put(puts); 184 } 185 186 protected static void setupConfig(HBaseTestingUtil util, String znodeParent) { 187 Configuration conf = util.getConfiguration(); 188 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); 189 // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger 190 // sufficient number of events. But we don't want to go too low because 191 // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want 192 // more than one batch sent to the peer cluster for better testing. 193 conf.setInt("replication.source.size.capacity", 102400); 194 conf.setLong("replication.source.sleepforretries", 100); 195 conf.setInt("hbase.regionserver.maxlogs", 10); 196 conf.setLong("hbase.master.logcleaner.ttl", 10); 197 conf.setInt("zookeeper.recovery.retry", 1); 198 conf.setInt("zookeeper.recovery.retry.intervalmill", 10); 199 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 200 conf.setInt("replication.stats.thread.period.seconds", 5); 201 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); 202 conf.setLong("replication.sleep.before.failover", 2000); 203 conf.setInt("replication.source.maxretriesmultiplier", 10); 204 conf.setFloat("replication.source.ratio", 1.0f); 205 conf.setBoolean("replication.source.eof.autorecovery", true); 206 conf.setLong("hbase.serial.replication.waiting.ms", 100); 207 } 208 209 static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) { 210 setupConfig(util1, "/1"); 211 setupConfig(util2, "/2"); 212 213 Configuration conf2 = util2.getConfiguration(); 214 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 215 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); 216 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); 217 } 218 219 protected static void restartSourceCluster(int numSlaves) throws Exception { 220 Closeables.close(hbaseAdmin, true); 221 Closeables.close(htable1, true); 222 UTIL1.shutdownMiniHBaseCluster(); 223 UTIL1.restartHBaseCluster(numSlaves); 224 // Invalidate the cached connection state. 225 CONF1 = UTIL1.getConfiguration(); 226 hbaseAdmin = UTIL1.getAdmin(); 227 Connection connection1 = UTIL1.getConnection(); 228 htable1 = connection1.getTable(tableName); 229 } 230 231 static void restartTargetHBaseCluster(int numSlaves) throws Exception { 232 Closeables.close(htable2, true); 233 UTIL2.restartHBaseCluster(numSlaves); 234 // Invalidate the cached connection state 235 CONF2 = UTIL2.getConfiguration(); 236 htable2 = UTIL2.getConnection().getTable(tableName); 237 } 238 239 protected static void createTable(TableName tableName) throws IOException { 240 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 241 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) 242 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 243 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 244 UTIL1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 245 UTIL2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 246 UTIL1.waitUntilAllRegionsAssigned(tableName); 247 UTIL2.waitUntilAllRegionsAssigned(tableName); 248 } 249 250 private static void startClusters() throws Exception { 251 UTIL1.startMiniZKCluster(); 252 MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); 253 LOG.info("Setup first Zk"); 254 255 UTIL2.setZkCluster(miniZK); 256 LOG.info("Setup second Zk"); 257 258 CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1); 259 UTIL1.startMiniCluster(NUM_SLAVES1); 260 // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks 261 // as a component in deciding maximum number of parallel batches to send to the peer cluster. 262 UTIL2.startMiniCluster(NUM_SLAVES2); 263 264 connection1 = ConnectionFactory.createConnection(CONF1); 265 connection2 = ConnectionFactory.createConnection(CONF2); 266 hbaseAdmin = connection1.getAdmin(); 267 268 createTable(tableName); 269 htable1 = connection1.getTable(tableName); 270 htable2 = connection2.getTable(tableName); 271 } 272 273 @BeforeClass 274 public static void setUpBeforeClass() throws Exception { 275 configureClusters(UTIL1, UTIL2); 276 startClusters(); 277 } 278 279 private boolean peerExist(String peerId) throws IOException { 280 return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId())); 281 } 282 283 protected final void addPeer(String peerId, TableName tableName) throws Exception { 284 if (!peerExist(peerId)) { 285 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() 286 .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()) 287 .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName()); 288 if (isSyncPeer()) { 289 FileSystem fs2 = UTIL2.getTestFileSystem(); 290 // The remote wal dir is not important as we do not use it in DA state, here we only need to 291 // confirm that a sync peer in DA state can still replicate data to remote cluster 292 // asynchronously. 293 builder.setReplicateAllUserTables(false) 294 .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())) 295 .setRemoteWALDir(new Path("/RemoteWAL") 296 .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString()); 297 } 298 hbaseAdmin.addReplicationPeer(peerId, builder.build()); 299 } 300 } 301 302 @Before 303 public void setUpBase() throws Exception { 304 addPeer(PEER_ID2, tableName); 305 } 306 307 protected final void removePeer(String peerId) throws Exception { 308 if (peerExist(peerId)) { 309 hbaseAdmin.removeReplicationPeer(peerId); 310 } 311 } 312 313 @After 314 public void tearDownBase() throws Exception { 315 removePeer(PEER_ID2); 316 } 317 318 protected static void runSimplePutDeleteTest() throws IOException, InterruptedException { 319 Put put = new Put(row); 320 put.addColumn(famName, row, row); 321 322 htable1 = UTIL1.getConnection().getTable(tableName); 323 htable1.put(put); 324 325 Get get = new Get(row); 326 for (int i = 0; i < NB_RETRIES; i++) { 327 if (i == NB_RETRIES - 1) { 328 fail("Waited too much time for put replication"); 329 } 330 Result res = htable2.get(get); 331 if (res.isEmpty()) { 332 LOG.info("Row not available"); 333 Thread.sleep(SLEEP_TIME); 334 } else { 335 assertArrayEquals(row, res.value()); 336 break; 337 } 338 } 339 340 Delete del = new Delete(row); 341 htable1.delete(del); 342 343 get = new Get(row); 344 for (int i = 0; i < NB_RETRIES; i++) { 345 if (i == NB_RETRIES - 1) { 346 fail("Waited too much time for del replication"); 347 } 348 Result res = htable2.get(get); 349 if (res.size() >= 1) { 350 LOG.info("Row not deleted"); 351 Thread.sleep(SLEEP_TIME); 352 } else { 353 break; 354 } 355 } 356 } 357 358 protected static void runSmallBatchTest() throws IOException, InterruptedException { 359 // normal Batch tests 360 loadData("", row); 361 362 Scan scan = new Scan(); 363 364 ResultScanner scanner1 = htable1.getScanner(scan); 365 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH); 366 scanner1.close(); 367 assertEquals(NB_ROWS_IN_BATCH, res1.length); 368 369 waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); 370 } 371 372 @AfterClass 373 public static void tearDownAfterClass() throws Exception { 374 if (htable2 != null) { 375 htable2.close(); 376 } 377 if (htable1 != null) { 378 htable1.close(); 379 } 380 if (hbaseAdmin != null) { 381 hbaseAdmin.close(); 382 } 383 384 if (connection2 != null) { 385 connection2.close(); 386 } 387 if (connection1 != null) { 388 connection1.close(); 389 } 390 UTIL2.shutdownMiniCluster(); 391 UTIL1.shutdownMiniCluster(); 392 } 393 394 /** 395 * Custom replication endpoint to keep track of replication status for tests. 396 */ 397 public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint { 398 public ReplicationEndpointTest() { 399 replicateCount.set(0); 400 } 401 402 @Override 403 public boolean replicate(ReplicateContext replicateContext) { 404 replicateCount.incrementAndGet(); 405 replicatedEntries.addAll(replicateContext.getEntries()); 406 407 return super.replicate(replicateContext); 408 } 409 } 410}