001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertTrue; 021 022import java.io.IOException; 023import java.util.Collections; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 028import org.apache.hadoop.hbase.client.Put; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.client.Table; 031import org.apache.hadoop.hbase.client.TableState; 032import org.apache.hadoop.hbase.master.TableStateManager; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 035import org.apache.hadoop.hbase.replication.regionserver.Replication; 036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; 037import org.apache.hadoop.hbase.testclassification.LargeTests; 038import org.apache.hadoop.hbase.testclassification.ReplicationTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 041import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 042import org.junit.Before; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046 047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 048 049/** 050 * Testcase for HBASE-20147. 051 */ 052@Category({ ReplicationTests.class, LargeTests.class }) 053public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class); 058 059 @Before 060 public void setUp() throws IOException, StreamLacksCapabilityException { 061 setupWALWriter(); 062 } 063 064 // make sure that we will start replication for the sequence id after move, that's what we want to 065 // test here. 066 private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception { 067 moveRegion(region, rs); 068 rollAllWALs(); 069 } 070 071 private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs, final String oldWalName) 072 throws Exception { 073 Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName(); 074 String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName()); 075 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 076 077 @Override 078 public boolean evaluate() throws Exception { 079 ReplicationSourceManager manager = 080 ((Replication) rs.getReplicationSourceService()).getReplicationManager(); 081 // Make sure replication moves to the new file. 082 return (manager.getWALs().get(PEER_ID).get(logPrefix).size() == 1) 083 && !oldWalName.equals(manager.getWALs().get(PEER_ID).get(logPrefix).first()); 084 } 085 086 @Override 087 public String explainFailure() throws Exception { 088 return "Still not replicated to the current WAL file yet"; 089 } 090 }); 091 } 092 093 @Test 094 public void testAddPeer() throws Exception { 095 TableName tableName = createTable(); 096 try (Table table = UTIL.getConnection().getTable(tableName)) { 097 for (int i = 0; i < 100; i++) { 098 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 099 } 100 } 101 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 102 HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); 103 moveRegionAndArchiveOldWals(region, rs); 104 addPeer(true); 105 try (Table table = UTIL.getConnection().getTable(tableName)) { 106 for (int i = 0; i < 100; i++) { 107 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 108 } 109 } 110 waitUntilReplicationDone(100); 111 checkOrder(100); 112 } 113 114 @Test 115 public void testChangeToSerial() throws Exception { 116 ReplicationPeerConfig peerConfig = 117 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 118 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(); 119 UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); 120 121 TableName tableName = createTable(); 122 try (Table table = UTIL.getConnection().getTable(tableName)) { 123 for (int i = 0; i < 100; i++) { 124 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 125 } 126 } 127 128 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 129 HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName); 130 // Get the current wal file name 131 String walFileNameBeforeRollover = 132 ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName(); 133 134 HRegionServer rs = UTIL.getOtherRegionServer(srcRs); 135 moveRegionAndArchiveOldWals(region, rs); 136 waitUntilReplicationDone(100); 137 waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover); 138 139 UTIL.getAdmin().disableReplicationPeer(PEER_ID); 140 UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, 141 ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build()); 142 UTIL.getAdmin().enableReplicationPeer(PEER_ID); 143 144 try (Table table = UTIL.getConnection().getTable(tableName)) { 145 for (int i = 0; i < 100; i++) { 146 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 147 } 148 } 149 waitUntilReplicationDone(200); 150 checkOrder(200); 151 } 152 153 @Test 154 public void testAddToSerialPeer() throws Exception { 155 ReplicationPeerConfig peerConfig = 156 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 157 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()) 158 .setReplicateAllUserTables(false).setSerial(true).build(); 159 UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); 160 161 TableName tableName = createTable(); 162 try (Table table = UTIL.getConnection().getTable(tableName)) { 163 for (int i = 0; i < 100; i++) { 164 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 165 } 166 } 167 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 168 HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName); 169 HRegionServer rs = UTIL.getOtherRegionServer(srcRs); 170 171 // Get the current wal file name 172 String walFileNameBeforeRollover = 173 ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName(); 174 175 moveRegionAndArchiveOldWals(region, rs); 176 177 // Make sure that the replication done for the oldWal at source rs. 178 waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover); 179 180 UTIL.getAdmin().disableReplicationPeer(PEER_ID); 181 UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, 182 ReplicationPeerConfig.newBuilder(peerConfig) 183 .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build()); 184 UTIL.getAdmin().enableReplicationPeer(PEER_ID); 185 try (Table table = UTIL.getConnection().getTable(tableName)) { 186 for (int i = 0; i < 100; i++) { 187 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 188 } 189 } 190 waitUntilReplicationDone(100); 191 checkOrder(100); 192 } 193 194 @Test 195 public void testDisabledTable() throws Exception { 196 TableName tableName = createTable(); 197 try (Table table = UTIL.getConnection().getTable(tableName)) { 198 for (int i = 0; i < 100; i++) { 199 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 200 } 201 } 202 UTIL.getAdmin().disableTable(tableName); 203 rollAllWALs(); 204 addPeer(true); 205 UTIL.getAdmin().enableTable(tableName); 206 try (Table table = UTIL.getConnection().getTable(tableName)) { 207 for (int i = 0; i < 100; i++) { 208 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 209 } 210 } 211 waitUntilReplicationDone(100); 212 checkOrder(100); 213 } 214 215 @Test 216 public void testDisablingTable() throws Exception { 217 TableName tableName = createTable(); 218 try (Table table = UTIL.getConnection().getTable(tableName)) { 219 for (int i = 0; i < 100; i++) { 220 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 221 } 222 } 223 UTIL.getAdmin().disableTable(tableName); 224 rollAllWALs(); 225 TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager(); 226 tsm.setTableState(tableName, TableState.State.DISABLING); 227 Thread t = new Thread(() -> { 228 try { 229 addPeer(true); 230 } catch (IOException e) { 231 throw new RuntimeException(e); 232 } 233 }); 234 t.start(); 235 Thread.sleep(5000); 236 // we will wait on the disabling table so the thread should still be alive. 237 assertTrue(t.isAlive()); 238 tsm.setTableState(tableName, TableState.State.DISABLED); 239 t.join(); 240 UTIL.getAdmin().enableTable(tableName); 241 try (Table table = UTIL.getConnection().getTable(tableName)) { 242 for (int i = 0; i < 100; i++) { 243 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 244 } 245 } 246 waitUntilReplicationDone(100); 247 checkOrder(100); 248 } 249 250 @Test 251 public void testEnablingTable() throws Exception { 252 TableName tableName = createTable(); 253 try (Table table = UTIL.getConnection().getTable(tableName)) { 254 for (int i = 0; i < 100; i++) { 255 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 256 } 257 } 258 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 259 HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); 260 moveRegionAndArchiveOldWals(region, rs); 261 TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager(); 262 tsm.setTableState(tableName, TableState.State.ENABLING); 263 Thread t = new Thread(() -> { 264 try { 265 addPeer(true); 266 } catch (IOException e) { 267 throw new RuntimeException(e); 268 } 269 }); 270 t.start(); 271 Thread.sleep(5000); 272 // we will wait on the disabling table so the thread should still be alive. 273 assertTrue(t.isAlive()); 274 tsm.setTableState(tableName, TableState.State.ENABLED); 275 t.join(); 276 try (Table table = UTIL.getConnection().getTable(tableName)) { 277 for (int i = 0; i < 100; i++) { 278 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 279 } 280 } 281 waitUntilReplicationDone(100); 282 checkOrder(100); 283 } 284}