001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication; 020 021import static org.junit.Assert.assertArrayEquals; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.List; 029import java.util.NavigableMap; 030import java.util.TreeMap; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.client.ConnectionFactory; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.JVMClusterUtil; 053import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 054import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 055import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 056import org.junit.AfterClass; 057import org.junit.BeforeClass; 058import org.junit.runners.Parameterized.Parameter; 059import org.junit.runners.Parameterized.Parameters; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * This class is only a base for other integration-level replication tests. 065 * Do not add tests here. 066 * TestReplicationSmallTests is where tests that don't require bring machines up/down should go 067 * All other tests should have their own classes and extend this one 068 */ 069public class TestReplicationBase { 070/* 071 { 072 ((Log4JLogger) ReplicationSource.LOG).getLogger().setLevel(Level.ALL); 073 }*/ 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); 076 077 protected static Configuration conf1 = HBaseConfiguration.create(); 078 protected static Configuration conf2; 079 protected static Configuration CONF_WITH_LOCALFS; 080 081 protected static ZKWatcher zkw1; 082 protected static ZKWatcher zkw2; 083 084 protected static ReplicationAdmin admin; 085 protected static Admin hbaseAdmin; 086 087 protected static Table htable1; 088 protected static Table htable2; 089 protected static NavigableMap<byte[], Integer> scopes; 090 091 protected static HBaseTestingUtility utility1; 092 protected static HBaseTestingUtility utility2; 093 protected static final int NB_ROWS_IN_BATCH = 100; 094 protected static final int NB_ROWS_IN_BIG_BATCH = 095 NB_ROWS_IN_BATCH * 10; 096 protected static final long SLEEP_TIME = 500; 097 protected static final int NB_RETRIES = 10; 098 099 protected static final TableName tableName = TableName.valueOf("test"); 100 protected static final byte[] famName = Bytes.toBytes("f"); 101 protected static final byte[] row = Bytes.toBytes("row"); 102 protected static final byte[] noRepfamName = Bytes.toBytes("norep"); 103 104 @Parameter 105 public static boolean seperateOldWALs; 106 107 @Parameters 108 public static List<Boolean> params() { 109 return Arrays.asList(false, true); 110 } 111 112 protected final void cleanUp() throws IOException, InterruptedException { 113 // Starting and stopping replication can make us miss new logs, 114 // rolling like this makes sure the most recent one gets added to the queue 115 for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() 116 .getRegionServerThreads()) { 117 utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 118 } 119 int rowCount = utility1.countRows(tableName); 120 utility1.deleteTableData(tableName); 121 // truncating the table will send one Delete per row to the slave cluster 122 // in an async fashion, which is why we cannot just call deleteTableData on 123 // utility2 since late writes could make it to the slave in some way. 124 // Instead, we truncate the first table and wait for all the Deletes to 125 // make it to the slave. 126 Scan scan = new Scan(); 127 int lastCount = 0; 128 for (int i = 0; i < NB_RETRIES; i++) { 129 if (i == NB_RETRIES - 1) { 130 fail("Waited too much time for truncate"); 131 } 132 ResultScanner scanner = htable2.getScanner(scan); 133 Result[] res = scanner.next(rowCount); 134 scanner.close(); 135 if (res.length != 0) { 136 if (res.length < lastCount) { 137 i--; // Don't increment timeout if we make progress 138 } 139 lastCount = res.length; 140 LOG.info("Still got " + res.length + " rows"); 141 Thread.sleep(SLEEP_TIME); 142 } else { 143 break; 144 } 145 } 146 } 147 148 protected static void waitForReplication(int expectedRows, int retries) 149 throws IOException, InterruptedException { 150 Scan scan; 151 for (int i = 0; i < retries; i++) { 152 scan = new Scan(); 153 if (i== retries -1) { 154 fail("Waited too much time for normal batch replication"); 155 } 156 ResultScanner scanner = htable2.getScanner(scan); 157 Result[] res = scanner.next(expectedRows); 158 scanner.close(); 159 if (res.length != expectedRows) { 160 LOG.info("Only got " + res.length + " rows"); 161 Thread.sleep(SLEEP_TIME); 162 } else { 163 break; 164 } 165 } 166 } 167 168 protected static void loadData(String prefix, byte[] row) throws IOException { 169 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); 170 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 171 Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i))); 172 put.addColumn(famName, row, row); 173 puts.add(put); 174 } 175 htable1.put(puts); 176 } 177 178 @BeforeClass 179 public static void setUpBeforeClass() throws Exception { 180 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 181 // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger 182 // sufficient number of events. But we don't want to go too low because 183 // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want 184 // more than one batch sent to the peer cluster for better testing. 185 conf1.setInt("replication.source.size.capacity", 102400); 186 conf1.setLong("replication.source.sleepforretries", 100); 187 conf1.setInt("hbase.regionserver.maxlogs", 10); 188 conf1.setLong("hbase.master.logcleaner.ttl", 10); 189 conf1.setInt("zookeeper.recovery.retry", 1); 190 conf1.setInt("zookeeper.recovery.retry.intervalmill", 10); 191 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 192 conf1.setInt("replication.stats.thread.period.seconds", 5); 193 conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); 194 conf1.setLong("replication.sleep.before.failover", 2000); 195 conf1.setInt("replication.source.maxretriesmultiplier", 10); 196 conf1.setFloat("replication.source.ratio", 1.0f); 197 conf1.setBoolean("replication.source.eof.autorecovery", true); 198 199 // Parameter config 200 conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, seperateOldWALs); 201 202 utility1 = new HBaseTestingUtility(conf1); 203 utility1.startMiniZKCluster(); 204 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 205 // Have to reget conf1 in case zk cluster location different 206 // than default 207 conf1 = utility1.getConfiguration(); 208 zkw1 = new ZKWatcher(conf1, "cluster1", null, true); 209 admin = new ReplicationAdmin(conf1); 210 LOG.info("Setup first Zk"); 211 212 // Base conf2 on conf1 so it gets the right zk cluster. 213 conf2 = HBaseConfiguration.create(conf1); 214 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 215 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); 216 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); 217 218 utility2 = new HBaseTestingUtility(conf2); 219 utility2.setZkCluster(miniZK); 220 zkw2 = new ZKWatcher(conf2, "cluster2", null, true); 221 LOG.info("Setup second Zk"); 222 223 CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); 224 utility1.startMiniCluster(2); 225 // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks 226 // as a component in deciding maximum number of parallel batches to send to the peer cluster. 227 utility2.startMiniCluster(4); 228 229 ReplicationPeerConfig rpc = 230 ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build(); 231 hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin(); 232 hbaseAdmin.addReplicationPeer("2", rpc); 233 234 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 235 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) 236 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 237 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 238 scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 239 for (ColumnFamilyDescriptor f : table.getColumnFamilies()) { 240 scopes.put(f.getName(), f.getScope()); 241 } 242 Connection connection1 = ConnectionFactory.createConnection(conf1); 243 Connection connection2 = ConnectionFactory.createConnection(conf2); 244 try (Admin admin1 = connection1.getAdmin()) { 245 admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 246 } 247 try (Admin admin2 = connection2.getAdmin()) { 248 admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 249 } 250 utility1.waitUntilAllRegionsAssigned(tableName); 251 utility2.waitUntilAllRegionsAssigned(tableName); 252 htable1 = connection1.getTable(tableName); 253 htable2 = connection2.getTable(tableName); 254 } 255 256 protected static void runSimplePutDeleteTest() throws IOException, InterruptedException { 257 Put put = new Put(row); 258 put.addColumn(famName, row, row); 259 260 htable1 = utility1.getConnection().getTable(tableName); 261 htable1.put(put); 262 263 Get get = new Get(row); 264 for (int i = 0; i < NB_RETRIES; i++) { 265 if (i == NB_RETRIES - 1) { 266 fail("Waited too much time for put replication"); 267 } 268 Result res = htable2.get(get); 269 if (res.isEmpty()) { 270 LOG.info("Row not available"); 271 Thread.sleep(SLEEP_TIME); 272 } else { 273 assertArrayEquals(row, res.value()); 274 break; 275 } 276 } 277 278 Delete del = new Delete(row); 279 htable1.delete(del); 280 281 get = new Get(row); 282 for (int i = 0; i < NB_RETRIES; i++) { 283 if (i == NB_RETRIES - 1) { 284 fail("Waited too much time for del replication"); 285 } 286 Result res = htable2.get(get); 287 if (res.size() >= 1) { 288 LOG.info("Row not deleted"); 289 Thread.sleep(SLEEP_TIME); 290 } else { 291 break; 292 } 293 } 294 } 295 296 protected static void runSmallBatchTest() throws IOException, InterruptedException { 297 // normal Batch tests 298 loadData("", row); 299 300 Scan scan = new Scan(); 301 302 ResultScanner scanner1 = htable1.getScanner(scan); 303 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH); 304 scanner1.close(); 305 assertEquals(NB_ROWS_IN_BATCH, res1.length); 306 307 waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); 308 } 309 310 @AfterClass 311 public static void tearDownAfterClass() throws Exception { 312 htable2.close(); 313 htable1.close(); 314 admin.close(); 315 utility2.shutdownMiniCluster(); 316 utility1.shutdownMiniCluster(); 317 } 318}