001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import java.io.IOException; 025import java.util.stream.Stream; 026import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 027import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 028import org.apache.hadoop.hbase.Waiter; 029import org.apache.hadoop.hbase.client.Get; 030import org.apache.hadoop.hbase.client.Put; 031import org.apache.hadoop.hbase.client.Result; 032import org.apache.hadoop.hbase.client.ResultScanner; 033import org.apache.hadoop.hbase.client.Scan; 034import org.apache.hadoop.hbase.testclassification.LargeTests; 035import org.apache.hadoop.hbase.testclassification.ReplicationTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.JVMClusterUtil; 038import org.junit.jupiter.api.BeforeEach; 039import org.junit.jupiter.api.Tag; 040import org.junit.jupiter.api.TestTemplate; 041import org.junit.jupiter.params.provider.Arguments; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * Test handling of changes to the number of a peer's regionservers. 047 */ 048@Tag(ReplicationTests.TAG) 049@Tag(LargeTests.TAG) 050@HBaseParameterizedTestTemplate(name = "{index}: serialPeer={0}, syncPeer={1}") 051public class TestReplicationChangingPeerRegionservers extends TestReplicationBase { 052 053 private static final Logger LOG = 054 LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class); 055 056 private boolean serialPeer; 057 058 private boolean syncPeer; 059 060 public TestReplicationChangingPeerRegionservers(boolean serialPeer, boolean syncPeer) { 061 this.serialPeer = serialPeer; 062 this.syncPeer = syncPeer; 063 } 064 065 @Override 066 protected boolean isSerialPeer() { 067 return serialPeer; 068 } 069 070 @Override 071 protected boolean isSyncPeer() { 072 return syncPeer; 073 } 074 075 public static Stream<Arguments> parameters() { 076 return Stream.of(Arguments.of(false, false), Arguments.of(false, true), 077 Arguments.of(true, false), Arguments.of(true, true)); 078 } 079 080 @BeforeEach 081 public void setUp() throws Exception { 082 // Starting and stopping replication can make us miss new logs, 083 // rolling like this makes sure the most recent one gets added to the queue 084 for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) { 085 UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 086 } 087 UTIL1.deleteTableData(tableName); 088 // truncating the table will send one Delete per row to the slave cluster 089 // in an async fashion, which is why we cannot just call deleteTableData on 090 // utility2 since late writes could make it to the slave in some way. 091 // Instead, we truncate the first table and wait for all the Deletes to 092 // make it to the slave. 093 Scan scan = new Scan(); 094 int lastCount = 0; 095 for (int i = 0; i < NB_RETRIES; i++) { 096 if (i == NB_RETRIES - 1) { 097 fail("Waited too much time for truncate"); 098 } 099 ResultScanner scanner = htable2.getScanner(scan); 100 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); 101 scanner.close(); 102 if (res.length != 0) { 103 if (res.length < lastCount) { 104 i--; // Don't increment timeout if we make progress 105 } 106 lastCount = res.length; 107 LOG.info("Still got " + res.length + " rows"); 108 Thread.sleep(SLEEP_TIME); 109 } else { 110 break; 111 } 112 } 113 } 114 115 @TestTemplate 116 public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException { 117 LOG.info("testSimplePutDelete"); 118 SingleProcessHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster(); 119 // This test wants two RS's up. We only run one generally so add one. 120 peerCluster.startRegionServer(); 121 Waiter.waitFor(peerCluster.getConfiguration(), 30000, new Waiter.Predicate<Exception>() { 122 @Override 123 public boolean evaluate() throws Exception { 124 return peerCluster.getLiveRegionServerThreads().size() > 1; 125 } 126 }); 127 int numRS = peerCluster.getRegionServerThreads().size(); 128 129 doPutTest(Bytes.toBytes(1)); 130 131 int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0; 132 peerCluster.stopRegionServer(rsToStop); 133 peerCluster.waitOnRegionServer(rsToStop); 134 135 // Sanity check 136 assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size()); 137 138 doPutTest(Bytes.toBytes(2)); 139 140 peerCluster.startRegionServer(); 141 142 // Sanity check 143 assertEquals(numRS, peerCluster.getRegionServerThreads().size()); 144 145 doPutTest(Bytes.toBytes(3)); 146 } 147 148 private void doPutTest(byte[] row) throws IOException, InterruptedException { 149 Put put = new Put(row); 150 put.addColumn(famName, row, row); 151 152 if (htable1 == null) { 153 htable1 = UTIL1.getConnection().getTable(tableName); 154 } 155 156 htable1.put(put); 157 158 Get get = new Get(row); 159 for (int i = 0; i < NB_RETRIES; i++) { 160 if (i == NB_RETRIES - 1) { 161 fail("Waited too much time for put replication"); 162 } 163 Result res = htable2.get(get); 164 if (res.isEmpty()) { 165 LOG.info("Row not available"); 166 Thread.sleep(SLEEP_TIME); 167 } else { 168 assertArrayEquals(res.value(), row); 169 break; 170 } 171 } 172 } 173}