001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.List; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.MiniHBaseCluster; 028import org.apache.hadoop.hbase.Waiter; 029import org.apache.hadoop.hbase.client.Get; 030import org.apache.hadoop.hbase.client.Put; 031import org.apache.hadoop.hbase.client.Result; 032import org.apache.hadoop.hbase.client.ResultScanner; 033import org.apache.hadoop.hbase.client.Scan; 034import org.apache.hadoop.hbase.testclassification.LargeTests; 035import org.apache.hadoop.hbase.testclassification.ReplicationTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.JVMClusterUtil; 038import org.junit.Before; 039import org.junit.ClassRule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.junit.runner.RunWith; 043import org.junit.runners.Parameterized; 044import org.junit.runners.Parameterized.Parameter; 045import org.junit.runners.Parameterized.Parameters; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 050 051/** 052 * Test handling of changes to the number of a peer's regionservers. 053 */ 054@RunWith(Parameterized.class) 055@Category({ ReplicationTests.class, LargeTests.class }) 056public class TestReplicationChangingPeerRegionservers extends TestReplicationBase { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestReplicationChangingPeerRegionservers.class); 061 062 private static final Logger LOG = 063 LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class); 064 065 @SuppressWarnings("checkstyle:VisibilityModifier") 066 @Parameter 067 public boolean serialPeer; 068 069 @Override 070 protected boolean isSerialPeer() { 071 return serialPeer; 072 } 073 074 @Parameters(name = "{index}: serialPeer={0}") 075 public static List<Boolean> parameters() { 076 return ImmutableList.of(true, false); 077 } 078 079 @Before 080 public void setUp() throws Exception { 081 // Starting and stopping replication can make us miss new logs, 082 // rolling like this makes sure the most recent one gets added to the queue 083 for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) { 084 UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 085 } 086 UTIL1.deleteTableData(tableName); 087 // truncating the table will send one Delete per row to the slave cluster 088 // in an async fashion, which is why we cannot just call deleteTableData on 089 // utility2 since late writes could make it to the slave in some way. 090 // Instead, we truncate the first table and wait for all the Deletes to 091 // make it to the slave. 092 Scan scan = new Scan(); 093 int lastCount = 0; 094 for (int i = 0; i < NB_RETRIES; i++) { 095 if (i == NB_RETRIES - 1) { 096 fail("Waited too much time for truncate"); 097 } 098 ResultScanner scanner = htable2.getScanner(scan); 099 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); 100 scanner.close(); 101 if (res.length != 0) { 102 if (res.length < lastCount) { 103 i--; // Don't increment timeout if we make progress 104 } 105 lastCount = res.length; 106 LOG.info("Still got " + res.length + " rows"); 107 Thread.sleep(SLEEP_TIME); 108 } else { 109 break; 110 } 111 } 112 } 113 114 @Test 115 public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException { 116 LOG.info("testSimplePutDelete"); 117 MiniHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster(); 118 // This test wants two RS's up. We only run one generally so add one. 119 peerCluster.startRegionServer(); 120 Waiter.waitFor(peerCluster.getConfiguration(), 30000, new Waiter.Predicate<Exception>() { 121 @Override 122 public boolean evaluate() throws Exception { 123 return peerCluster.getLiveRegionServerThreads().size() > 1; 124 } 125 }); 126 int numRS = peerCluster.getRegionServerThreads().size(); 127 128 doPutTest(Bytes.toBytes(1)); 129 130 int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0; 131 peerCluster.stopRegionServer(rsToStop); 132 peerCluster.waitOnRegionServer(rsToStop); 133 134 // Sanity check 135 assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size()); 136 137 doPutTest(Bytes.toBytes(2)); 138 139 peerCluster.startRegionServer(); 140 141 // Sanity check 142 assertEquals(numRS, peerCluster.getRegionServerThreads().size()); 143 144 doPutTest(Bytes.toBytes(3)); 145 } 146 147 private void doPutTest(byte[] row) throws IOException, InterruptedException { 148 Put put = new Put(row); 149 put.addColumn(famName, row, row); 150 151 if (htable1 == null) { 152 htable1 = UTIL1.getConnection().getTable(tableName); 153 } 154 155 htable1.put(put); 156 157 Get get = new Get(row); 158 for (int i = 0; i < NB_RETRIES; i++) { 159 if (i == NB_RETRIES - 1) { 160 fail("Waited too much time for put replication"); 161 } 162 Result res = htable2.get(get); 163 if (res.isEmpty()) { 164 LOG.info("Row not available"); 165 Thread.sleep(SLEEP_TIME); 166 } else { 167 assertArrayEquals(res.value(), row); 168 break; 169 } 170 } 171 } 172}