/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should go.
 * All other tests should have their own classes and extend this one.
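 * <p>
 * A minimal sketch of the intended usage follows; the subclass and its test are hypothetical,
 * shown only for illustration:
 *
 * <pre>
 * public class TestReplicationSomething extends TestReplicationBase {
 *
 *   &#64;Override
 *   protected boolean isSerialPeer() {
 *     return true; // opt this suite in to serial replication; the base default is false
 *   }
 *
 *   &#64;Test
 *   public void testPutsAreReplicated() throws Exception {
 *     // setUpBase() has already added peer PEER_ID2 pointing at UTIL2,
 *     // so the helpers from this base class can be called directly.
 *     runSimplePutDeleteTest();
 *   }
 * }
 * </pre>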
 */
public class TestReplicationBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);

  protected static Configuration CONF_WITH_LOCALFS;

  protected static ReplicationAdmin admin;
  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static final int NUM_SLAVES1 = 1;
  protected static final int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

  protected boolean isSerialPeer() {
    return false;
  }

  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
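    // The loop below polls the slave table until it reads back empty; the retry
    // counter is held back (i--) whenever the row count shrinks, so the timeout
    // only applies while no progress is being made.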
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void waitForReplication(int expectedRows, int retries)
      throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table htable2, int expectedRows, int retries)
      throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(expectedRows);
      scanner.close();
      if (res.length != expectedRows) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // Keep the number of edits per batch sent to the ReplicationEndpoint low enough
    // to trigger a sufficient number of events. But we don't want to go too low either,
    // because HBaseInterClusterReplicationEndpoint partitions entries into batches and
    // we want more than one batch sent to the peer cluster for better testing.
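    // The values below cap shipments at 100 KB and shorten the various sleeps and
    // retry intervals, so replication traffic (and recovery from failures) shows
    // up quickly in tests.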
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }

  static void configureClusters(HBaseTestingUtility util1, HBaseTestingUtility util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  }

  static void restartSourceCluster(int numSlaves) throws Exception {
    IOUtils.closeQuietly(hbaseAdmin, htable1);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    Connection connection1 = UTIL1.getConnection();
    htable1 = connection1.getTable(tableName);
  }

  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    IOUtils.closeQuietly(htable2);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }

  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
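    // (NUM_SLAVES2 is 1 here; suites that need more sinks can restart the target
    // cluster with more servers via restartTargetHBaseCluster.)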
    UTIL2.startMiniCluster(NUM_SLAVES2);

    admin = new ReplicationAdmin(CONF1);
    hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL1.waitUntilAllRegionsAssigned(tableName);
    UTIL2.waitUntilAllRegionsAssigned(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }

  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }

  @Before
  public void setUpBase() throws Exception {
    if (!peerExist(PEER_ID2)) {
      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build();
      hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
    }
  }

  @After
  public void tearDownBase() throws Exception {
    if (peerExist(PEER_ID2)) {
      hbaseAdmin.removeReplicationPeer(PEER_ID2);
    }
  }

  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal Batch tests
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (admin != null) {
      admin.close();
    }

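    // UTIL2 shares the mini ZooKeeper cluster owned by UTIL1, so shut down the
    // peer cluster first and the source cluster (with its ZK) last.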
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }
}