/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Testcase for HBASE-20147.
 * <p>
 * Each test writes a batch of rows before the table becomes subject to serial replication (by
 * adding a serial peer, switching a peer to serial, or adding the table to a serial peer), then
 * writes another batch and checks the number and order of the replicated entries.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class);

  /** Number of rows written by each call to {@link #writeTestRows(TableName)}. */
  private static final int ROWS_PER_BATCH = 100;

  /** How long, in milliseconds, the background addPeer call is given to (wrongly) complete. */
  private static final long BLOCKED_CHECK_MILLIS = 5000;

  @Before
  public void setUp() throws IOException, StreamLacksCapabilityException {
    setupWALWriter();
  }

  /**
   * Writes {@link #ROWS_PER_BATCH} rows to the given table. Row {@code i} uses the 4-byte
   * encoding of {@code i} as both its row key and its value in column {@code CF:CQ}, so calling
   * this twice overwrites the same rows rather than adding new ones.
   * @param tableName the table to write to
   * @throws IOException if obtaining the table or any put fails
   */
  private void writeTestRows(TableName tableName) throws IOException {
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < ROWS_PER_BATCH; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
  }

  // make sure that we will start replication for the sequence id after move, that's what we want to
  // test here.
  private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
    moveRegion(region, rs);
    rollAllWALs();
  }

  /**
   * Waits up to 30 seconds until the replication source manager of the given region server tracks
   * exactly one WAL for {@code PEER_ID} under the prefix of the server's current WAL file.
   * @param rs the region server whose replication progress is polled
   * @throws Exception if the condition is not met in time or evaluating it fails
   */
  private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs) throws Exception {
    Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName());
    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        ReplicationSourceManager manager =
          ((Replication) rs.getReplicationSourceService()).getReplicationManager();
        return manager.getWALs().get(PEER_ID).get(logPrefix).size() == 1;
      }

      @Override
      public String explainFailure() throws Exception {
        return "Still not replicated to the current WAL file yet";
      }
    });
  }

  /**
   * Forces the table into {@code transientState}, starts {@code addPeer(true)} on a background
   * thread, verifies that the call is still blocked after {@link #BLOCKED_CHECK_MILLIS}, then
   * moves the table to {@code finalState} and waits for the call to finish.
   * <p>
   * The table state is always moved to {@code finalState}, even when the blocked-check fails, so
   * the background thread is not left waiting on a state that will never change. Any failure
   * raised by {@code addPeer} on the background thread is rethrown here instead of being lost
   * with the thread.
   * @param tableName      the table whose state is manipulated
   * @param transientState the in-transition state that is expected to block addPeer
   * @param finalState     the state that lets addPeer proceed
   * @throws Exception if changing the table state fails or the wait is interrupted
   */
  private void addPeerWhileTableInTransition(TableName tableName, TableState.State transientState,
    TableState.State finalState) throws Exception {
    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
    tsm.setTableState(tableName, transientState);
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Thread t = new Thread(() -> {
      try {
        addPeer(true);
      } catch (Throwable e) {
        // Hand the failure over to the test thread; throwing here would only kill this thread.
        failure.set(e);
      }
    });
    try {
      t.start();
      Thread.sleep(BLOCKED_CHECK_MILLIS);
      // we will wait on the table in transition so the thread should still be alive.
      assertTrue("addPeer should be blocked while " + tableName + " is " + transientState,
        t.isAlive());
    } finally {
      tsm.setTableState(tableName, finalState);
    }
    t.join();
    Throwable error = failure.get();
    if (error != null) {
      throw new AssertionError("addPeer failed on the background thread", error);
    }
  }

  @Test
  public void testAddPeer() throws Exception {
    TableName tableName = createTable();
    writeTestRows(tableName);
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
    moveRegionAndArchiveOldWals(region, rs);
    addPeer(true);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }

  @Test
  public void testChangeToSerial() throws Exception {
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);

    TableName tableName = createTable();
    writeTestRows(tableName);

    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
    moveRegionAndArchiveOldWals(region, rs);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    waitUntilReplicatedToTheCurrentWALFile(srcRs);

    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
    UTIL.getAdmin().enableReplicationPeer(PEER_ID);

    writeTestRows(tableName);
    // The peer was replicating before it became serial, so both batches are expected.
    waitUntilReplicationDone(2 * ROWS_PER_BATCH);
    checkOrder(2 * ROWS_PER_BATCH);
  }

  @Test
  public void testAddToSerialPeer() throws Exception {
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
        .setReplicateAllUserTables(false).setSerial(true).build();
    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);

    TableName tableName = createTable();
    writeTestRows(tableName);
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
    moveRegionAndArchiveOldWals(region, rs);
    // Note: this waits on the destination server, unlike testChangeToSerial which waits on the
    // source server.
    waitUntilReplicatedToTheCurrentWALFile(rs);
    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
      ReplicationPeerConfig.newBuilder(peerConfig)
        .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build());
    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }

  @Test
  public void testDisabledTable() throws Exception {
    TableName tableName = createTable();
    writeTestRows(tableName);
    UTIL.getAdmin().disableTable(tableName);
    rollAllWALs();
    addPeer(true);
    UTIL.getAdmin().enableTable(tableName);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }

  @Test
  public void testDisablingTable() throws Exception {
    TableName tableName = createTable();
    writeTestRows(tableName);
    UTIL.getAdmin().disableTable(tableName);
    rollAllWALs();
    addPeerWhileTableInTransition(tableName, TableState.State.DISABLING,
      TableState.State.DISABLED);
    UTIL.getAdmin().enableTable(tableName);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }

  @Test
  public void testEnablingTable() throws Exception {
    TableName tableName = createTable();
    writeTestRows(tableName);
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
    moveRegionAndArchiveOldWals(region, rs);
    addPeerWhileTableInTransition(tableName, TableState.State.ENABLING,
      TableState.State.ENABLED);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }
}