001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication; 020 021import static org.junit.Assert.assertArrayEquals; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.NavigableMap; 029import java.util.TreeMap; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 038import org.apache.hadoop.hbase.client.Connection; 039import org.apache.hadoop.hbase.client.ConnectionFactory; 040import org.apache.hadoop.hbase.client.Delete; 041import org.apache.hadoop.hbase.client.Get; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.ResultScanner; 045import org.apache.hadoop.hbase.client.Scan; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.client.TableDescriptor; 048import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 049import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.JVMClusterUtil; 052import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 053import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 054import org.junit.After; 055import org.junit.AfterClass; 056import org.junit.Before; 057import org.junit.BeforeClass; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061/** 062 * This class is only a base for other integration-level replication tests. 063 * Do not add tests here. 064 * TestReplicationSmallTests is where tests that don't require bring machines up/down should go 065 * All other tests should have their own classes and extend this one 066 */ 067public class TestReplicationBase { 068 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); 069 070 protected static Configuration conf1 = HBaseConfiguration.create(); 071 protected static Configuration conf2; 072 protected static Configuration CONF_WITH_LOCALFS; 073 074 protected static ZKWatcher zkw1; 075 protected static ZKWatcher zkw2; 076 077 protected static ReplicationAdmin admin; 078 protected static Admin hbaseAdmin; 079 080 protected static Table htable1; 081 protected static Table htable2; 082 protected static NavigableMap<byte[], Integer> scopes; 083 084 protected static HBaseTestingUtility utility1; 085 protected static HBaseTestingUtility utility2; 086 protected static final int NB_ROWS_IN_BATCH = 100; 087 protected static final int NB_ROWS_IN_BIG_BATCH = 088 NB_ROWS_IN_BATCH * 10; 089 protected static final long SLEEP_TIME = 500; 090 protected static final int NB_RETRIES = 50; 091 092 protected static final TableName tableName = TableName.valueOf("test"); 093 protected static final byte[] famName = Bytes.toBytes("f"); 094 protected static final byte[] row = Bytes.toBytes("row"); 095 protected static final byte[] noRepfamName = Bytes.toBytes("norep"); 096 protected static final String PEER_ID2 = "2"; 097 098 protected boolean isSerialPeer() { 099 return false; 100 } 101 102 protected final void cleanUp() throws IOException, InterruptedException { 103 // Starting and stopping replication can make us miss new logs, 104 // rolling like this makes sure the most recent one gets added to the queue 105 for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() 106 .getRegionServerThreads()) { 107 utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 108 } 109 int rowCount = utility1.countRows(tableName); 110 utility1.deleteTableData(tableName); 111 // truncating the table will send one Delete per row to the slave cluster 112 // in an async fashion, which is why we cannot just call deleteTableData on 113 // utility2 since late writes could make it to the slave in some way. 114 // Instead, we truncate the first table and wait for all the Deletes to 115 // make it to the slave. 116 Scan scan = new Scan(); 117 int lastCount = 0; 118 for (int i = 0; i < NB_RETRIES; i++) { 119 if (i == NB_RETRIES - 1) { 120 fail("Waited too much time for truncate"); 121 } 122 ResultScanner scanner = htable2.getScanner(scan); 123 Result[] res = scanner.next(rowCount); 124 scanner.close(); 125 if (res.length != 0) { 126 if (res.length < lastCount) { 127 i--; // Don't increment timeout if we make progress 128 } 129 lastCount = res.length; 130 LOG.info("Still got " + res.length + " rows"); 131 Thread.sleep(SLEEP_TIME); 132 } else { 133 break; 134 } 135 } 136 } 137 138 protected static void waitForReplication(int expectedRows, int retries) 139 throws IOException, InterruptedException { 140 Scan scan; 141 for (int i = 0; i < retries; i++) { 142 scan = new Scan(); 143 if (i== retries -1) { 144 fail("Waited too much time for normal batch replication"); 145 } 146 ResultScanner scanner = htable2.getScanner(scan); 147 Result[] res = scanner.next(expectedRows); 148 scanner.close(); 149 if (res.length != expectedRows) { 150 LOG.info("Only got " + res.length + " rows"); 151 Thread.sleep(SLEEP_TIME); 152 } else { 153 break; 154 } 155 } 156 } 157 158 protected static void loadData(String prefix, byte[] row) throws IOException { 159 loadData(prefix, row, famName); 160 } 161 162 protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException { 163 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); 164 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 165 Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i))); 166 put.addColumn(familyName, row, row); 167 puts.add(put); 168 } 169 htable1.put(puts); 170 } 171 172 @BeforeClass 173 public static void setUpBeforeClass() throws Exception { 174 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 175 // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger 176 // sufficient number of events. But we don't want to go too low because 177 // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want 178 // more than one batch sent to the peer cluster for better testing. 179 conf1.setInt("replication.source.size.capacity", 102400); 180 conf1.setLong("replication.source.sleepforretries", 100); 181 conf1.setInt("hbase.regionserver.maxlogs", 10); 182 conf1.setLong("hbase.master.logcleaner.ttl", 10); 183 conf1.setInt("zookeeper.recovery.retry", 1); 184 conf1.setInt("zookeeper.recovery.retry.intervalmill", 10); 185 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 186 conf1.setInt("replication.stats.thread.period.seconds", 5); 187 conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); 188 conf1.setLong("replication.sleep.before.failover", 2000); 189 conf1.setInt("replication.source.maxretriesmultiplier", 10); 190 conf1.setFloat("replication.source.ratio", 1.0f); 191 conf1.setBoolean("replication.source.eof.autorecovery", true); 192 conf1.setLong("hbase.serial.replication.waiting.ms", 100); 193 194 utility1 = new HBaseTestingUtility(conf1); 195 utility1.startMiniZKCluster(); 196 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 197 // Have to reget conf1 in case zk cluster location different 198 // than default 199 conf1 = utility1.getConfiguration(); 200 zkw1 = new ZKWatcher(conf1, "cluster1", null, true); 201 admin = new ReplicationAdmin(conf1); 202 LOG.info("Setup first Zk"); 203 204 // Base conf2 on conf1 so it gets the right zk cluster. 205 conf2 = HBaseConfiguration.create(conf1); 206 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 207 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); 208 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); 209 210 utility2 = new HBaseTestingUtility(conf2); 211 utility2.setZkCluster(miniZK); 212 zkw2 = new ZKWatcher(conf2, "cluster2", null, true); 213 LOG.info("Setup second Zk"); 214 215 CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); 216 utility1.startMiniCluster(2); 217 // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks 218 // as a component in deciding maximum number of parallel batches to send to the peer cluster. 219 utility2.startMiniCluster(4); 220 221 hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin(); 222 223 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 224 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) 225 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 226 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 227 scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 228 for (ColumnFamilyDescriptor f : table.getColumnFamilies()) { 229 scopes.put(f.getName(), f.getScope()); 230 } 231 Connection connection1 = ConnectionFactory.createConnection(conf1); 232 Connection connection2 = ConnectionFactory.createConnection(conf2); 233 try (Admin admin1 = connection1.getAdmin()) { 234 admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 235 } 236 try (Admin admin2 = connection2.getAdmin()) { 237 admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 238 } 239 utility1.waitUntilAllRegionsAssigned(tableName); 240 utility2.waitUntilAllRegionsAssigned(tableName); 241 htable1 = connection1.getTable(tableName); 242 htable2 = connection2.getTable(tableName); 243 } 244 245 private boolean peerExist(String peerId) throws IOException { 246 return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId())); 247 } 248 249 @Before 250 public void setUpBase() throws Exception { 251 if (!peerExist(PEER_ID2)) { 252 ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() 253 .setClusterKey(utility2.getClusterKey()).setSerial(isSerialPeer()).build(); 254 hbaseAdmin.addReplicationPeer(PEER_ID2, rpc); 255 } 256 } 257 258 @After 259 public void tearDownBase() throws Exception { 260 if (peerExist(PEER_ID2)) { 261 hbaseAdmin.removeReplicationPeer(PEER_ID2); 262 } 263 } 264 265 protected static void runSimplePutDeleteTest() throws IOException, InterruptedException { 266 Put put = new Put(row); 267 put.addColumn(famName, row, row); 268 269 htable1 = utility1.getConnection().getTable(tableName); 270 htable1.put(put); 271 272 Get get = new Get(row); 273 for (int i = 0; i < NB_RETRIES; i++) { 274 if (i == NB_RETRIES - 1) { 275 fail("Waited too much time for put replication"); 276 } 277 Result res = htable2.get(get); 278 if (res.isEmpty()) { 279 LOG.info("Row not available"); 280 Thread.sleep(SLEEP_TIME); 281 } else { 282 assertArrayEquals(row, res.value()); 283 break; 284 } 285 } 286 287 Delete del = new Delete(row); 288 htable1.delete(del); 289 290 get = new Get(row); 291 for (int i = 0; i < NB_RETRIES; i++) { 292 if (i == NB_RETRIES - 1) { 293 fail("Waited too much time for del replication"); 294 } 295 Result res = htable2.get(get); 296 if (res.size() >= 1) { 297 LOG.info("Row not deleted"); 298 Thread.sleep(SLEEP_TIME); 299 } else { 300 break; 301 } 302 } 303 } 304 305 protected static void runSmallBatchTest() throws IOException, InterruptedException { 306 // normal Batch tests 307 loadData("", row); 308 309 Scan scan = new Scan(); 310 311 ResultScanner scanner1 = htable1.getScanner(scan); 312 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH); 313 scanner1.close(); 314 assertEquals(NB_ROWS_IN_BATCH, res1.length); 315 316 waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); 317 } 318 319 @AfterClass 320 public static void tearDownAfterClass() throws Exception { 321 htable2.close(); 322 htable1.close(); 323 admin.close(); 324 utility2.shutdownMiniCluster(); 325 utility1.shutdownMiniCluster(); 326 } 327}