/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should
 * go. All other tests should have their own classes and extend this one.
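 * <p>
 * A minimal subclass sketch (the class and test names below are illustrative, not part of the
 * framework):
 *
 * <pre>
 * public class TestReplicationExample extends TestReplicationBase {
 *   &#64;Test
 *   public void testPutIsReplicated() throws Exception {
 *     // Write one row to the source cluster, then wait for it to show up on the peer.
 *     htable1.put(new Put(row).addColumn(famName, row, row));
 *     waitForReplication(1, NB_RETRIES);
 *   }
 * }
 * </pre>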
 */
public class TestReplicationBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
  protected static Connection connection1;
  protected static Connection connection2;
  protected static Configuration CONF_WITH_LOCALFS;

  protected static ReplicationAdmin admin;
  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static int NUM_SLAVES1 = 1;
  protected static int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;
  protected static AtomicInteger replicateCount = new AtomicInteger();
  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

  protected boolean isSerialPeer() {
    return false;
  }

  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
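    // Scan the peer table until the pending Deletes have drained. A shrinking row count means
    // replication is still making progress, so that pass is not charged against the retry budget.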
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void waitForReplication(int expectedRows, int retries)
    throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table table, int expectedRows, int retries)
    throws IOException, InterruptedException {
    for (int i = 0; i < retries; i++) {
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      int count = 0;
      try (ResultScanner scanner = table.getScanner(new Scan())) {
        while (scanner.next() != null) {
          count++;
        }
      }
      if (count != expectedRows) {
        LOG.info("Only got " + count + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // Keep each batch sent to the ReplicationEndpoint small enough to trigger a sufficient
    // number of shipping events, but not too small: HBaseInterClusterReplicationEndpoint
    // partitions entries into batches and we want more than one batch sent to the peer cluster
    // for better testing.
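    // The values below are aggressive, test-only tunings: small shipping batches, short retry
    // sleeps, and low log-retention thresholds so that rolling, retries, and failover all happen
    // within a single test run.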
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }

  static void configureClusters(HBaseTestingUtility util1, HBaseTestingUtility util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  }

  static void restartSourceCluster(int numSlaves) throws Exception {
    Closeables.close(hbaseAdmin, true);
    Closeables.close(htable1, true);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    htable1 = UTIL1.getConnection().getTable(tableName);
  }

  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    Closeables.close(htable2, true);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }

  protected static void createTable(TableName tableName) throws IOException {
    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
    UTIL1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL1.waitUntilAllRegionsAssigned(tableName);
    UTIL2.waitUntilAllRegionsAssigned(tableName);
  }

  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because the inter-cluster shipping logic uses the number of
    // sinks as a component in deciding the maximum number of parallel batches to send to the
    // peer cluster.
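    // NUM_SLAVES2 is 1 by default; subclasses that want more parallel shipping can raise it
    // before the clusters are started.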
    UTIL2.startMiniCluster(NUM_SLAVES2);

    connection1 = ConnectionFactory.createConnection(CONF1);
    connection2 = ConnectionFactory.createConnection(CONF2);
    admin = new ReplicationAdmin(CONF1);
    hbaseAdmin = connection1.getAdmin();

    createTable(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }

  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }

  protected final void addPeer(String peerId, TableName tableName) throws Exception {
    if (!peerExist(peerId)) {
      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
        .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
      hbaseAdmin.addReplicationPeer(peerId, builder.build());
    }
  }

  @Before
  public void setUpBase() throws Exception {
    addPeer(PEER_ID2, tableName);
  }

  protected final void removePeer(String peerId) throws Exception {
    if (peerExist(peerId)) {
      hbaseAdmin.removeReplicationPeer(peerId);
    }
  }

  @After
  public void tearDownBase() throws Exception {
    removePeer(PEER_ID2);
  }

  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal Batch tests
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (admin != null) {
      admin.close();
    }
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
    }

    if (connection2 != null) {
      connection2.close();
    }
    if (connection1 != null) {
      connection1.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }

  /**
   * Custom replication endpoint to keep track of replication status for tests.
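   * It counts {@code replicate()} invocations in {@code replicateCount} and records the shipped
   * WAL entries in {@code replicatedEntries}, so tests can assert on exactly what was shipped.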
   */
  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
    public ReplicationEndpointTest() {
      // A fresh endpoint instance means the peer was (re)started, so reset the counter.
      replicateCount.set(0);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      replicatedEntries.addAll(replicateContext.getEntries());

      return super.replicate(replicateContext);
    }
  }
}