/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should go.
 * All other tests should have their own classes and extend this one.
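 * <p>
 * A minimal sketch of how a subclass typically uses this base (the class and test names below
 * are illustrative, not tests that exist in the code base):
 *
 * <pre>
 * public class TestMyReplicationFeature extends TestReplicationBase {
 *   {@literal @}Test
 *   public void testPutsAndDeletesReplicate() throws Exception {
 *     // PEER_ID2 has already been added by setUpBase(); just drive traffic
 *     // through htable1 and assert on htable2.
 *     runSimplePutDeleteTest();
 *     runSmallBatchTest();
 *   }
 * }
 * </pre>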
 */
public class TestReplicationBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);

  protected static Configuration CONF_WITH_LOCALFS;

  protected static ReplicationAdmin admin;
  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static int NUM_SLAVES1 = 1;
  protected static final int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

  protected boolean isSerialPeer() {
    return false;
  }

  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster asynchronously,
    // which is why we cannot just call deleteTableData on UTIL2: late writes could still
    // make it to the slave. Instead, we truncate the first table and wait for all the
    // Deletes to make it to the slave.
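    // Poll the slave table until it is empty. Note the i-- below: a retry is only consumed
    // while the row count is not shrinking, so slow-but-steady replication of the Deletes
    // does not exhaust the retry budget.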
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't consume a retry if we're making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void waitForReplication(int expectedRows, int retries)
      throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table htable2, int expectedRows, int retries)
      throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(expectedRows);
      scanner.close();
      if (res.length != expectedRows) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // We don't want too many edits in each batch sent to the ReplicationEndpoint, so that a
    // sufficient number of replication events is triggered. But we don't want to go too low
    // either, because HBaseInterClusterReplicationEndpoint partitions entries into batches
    // and we want more than one batch sent to the peer cluster for better testing.
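    // The aggressive values below (100 KB source batch capacity, 100 ms retry sleeps, fast
    // log rolling and cleaning) exist purely to make replication events frequent and quick
    // in a mini-cluster; they are not suggestions for production tuning.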
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }

  static void configureClusters(HBaseTestingUtility util1, HBaseTestingUtility util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  }

  static void restartSourceCluster(int numSlaves) throws Exception {
    IOUtils.closeQuietly(hbaseAdmin, htable1);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    Connection connection1 = UTIL1.getConnection();
    htable1 = connection1.getTable(tableName);
  }

  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    IOUtils.closeQuietly(htable2);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }

  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because the inter-cluster shipping logic uses the number
    // of sinks as a component in deciding the maximum number of parallel batches to send to
    // the peer cluster.
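    // NUM_SLAVES2 is only 1 in this base class; tests that really need multiple sinks can
    // restart the target cluster with more region servers via restartTargetHBaseCluster(n).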
    UTIL2.startMiniCluster(NUM_SLAVES2);

    admin = new ReplicationAdmin(CONF1);
    hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL1.waitUntilAllRegionsAssigned(tableName);
    UTIL2.waitUntilAllRegionsAssigned(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }

  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }

  @Before
  public void setUpBase() throws Exception {
    if (!peerExist(PEER_ID2)) {
      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build();
      hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
    }
  }

  @After
  public void tearDownBase() throws Exception {
    if (peerExist(PEER_ID2)) {
      hbaseAdmin.removeReplicationPeer(PEER_ID2);
    }
  }
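  // setUpBase() honors isSerialPeer() when it creates PEER_ID2, so a subclass that wants a
  // serial replication peer only needs to override that hook, e.g. (illustrative):
  //
  //   @Override
  //   protected boolean isSerialPeer() {
  //     return true;
  //   }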
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // Normal batch test: load a batch of rows on the source, verify it locally, then wait
    // for the whole batch to show up on the slave.
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (admin != null) {
      admin.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }
}