/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Testcase for HBASE-20147.
 * <p>
 * Every test writes a first batch of rows before the table becomes subject to serial replication
 * (a serial peer is added, an existing peer is switched to serial, or the table is added to a
 * serial peer), writes a second batch afterwards, and then checks the number and order of the
 * replicated entries.
 */
@Tag(ReplicationTests.TAG)
@Tag(LargeTests.TAG)
public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {

  /** Number of rows written by a single call to {@link #writeTestRows(TableName)}. */
  private static final int ROWS_PER_BATCH = 100;

  /**
   * Prepares the WAL writer provided by the base class before each test.
   */
  @BeforeEach
  public void setUp() throws IOException, StreamLacksCapabilityException {
    setupWALWriter();
  }

  /**
   * Puts {@link #ROWS_PER_BATCH} rows into the given table. Row {@code i} uses
   * {@code Bytes.toBytes(i)} as both the row key and the value of column {@code CF:CQ}, so calling
   * this twice writes the same row keys again.
   * @param tableName the table to write to
   * @throws IOException if obtaining the table or any put fails
   */
  private void writeTestRows(TableName tableName) throws IOException {
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < ROWS_PER_BATCH; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
  }

  /**
   * Moves the region to the given region server and then rolls all WALs.
   * <p>
   * Make sure that we will start replication for the sequence id after move, that's what we want to
   * test here.
   * @param region the region to move
   * @param rs     the destination region server
   */
  private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
    moveRegion(region, rs);
    rollAllWALs();
  }

  /**
   * Waits (for up to 30 seconds) until the replication source manager of {@code rs} tracks exactly
   * one WAL for the current WAL prefix in the queue of {@code PEER_ID}, and that WAL is not
   * {@code oldWalName}.
   * @param rs         the region server whose replication queue is inspected
   * @param oldWalName name of the WAL file that was current before the WALs were rolled
   */
  private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs, final String oldWalName)
    throws Exception {
    Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName());
    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        ReplicationSourceManager manager =
          ((Replication) rs.getReplicationSourceService()).getReplicationManager();
        // Make sure replication moves to the new file.
        ReplicationQueueId queueId = new ReplicationQueueId(rs.getServerName(), PEER_ID);
        return (manager.getWALs().get(queueId).get(logPrefix).size() == 1)
          && !oldWalName.equals(manager.getWALs().get(queueId).get(logPrefix).first());
      }

      @Override
      public String explainFailure() throws Exception {
        return "Still not replicated to the current WAL file yet";
      }
    });
  }

  /**
   * Starts a thread that calls {@code addPeer(true)}. Anything thrown by that call is stored in
   * {@code failure} instead of being lost in the background thread.
   * @param failure receives the exception thrown by {@code addPeer(true)}, if any
   * @return the already started thread
   */
  private Thread addPeerInBackground(AtomicReference<Throwable> failure) {
    Thread t = new Thread(() -> {
      try {
        addPeer(true);
      } catch (IOException | RuntimeException e) {
        failure.set(e);
      }
    });
    t.start();
    return t;
  }

  /**
   * Joins the thread started by {@link #addPeerInBackground(AtomicReference)} and fails the calling
   * test if the background call threw.
   * @param t       the background thread
   * @param failure the holder that was passed to {@link #addPeerInBackground(AtomicReference)}
   */
  private void joinAddPeer(Thread t, AtomicReference<Throwable> failure)
    throws InterruptedException {
    t.join();
    Throwable error = failure.get();
    if (error != null) {
      throw new AssertionError("Adding the replication peer in the background failed", error);
    }
  }

  /**
   * Writes a batch, moves the region and rolls the WALs, adds the peer, writes a second batch, and
   * expects exactly one batch to be replicated in order.
   */
  @Test
  public void testAddPeer() throws Exception {
    TableName tableName = createTable();
    writeTestRows(tableName);
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
    moveRegionAndArchiveOldWals(region, rs);
    addPeer(true);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }

  /**
   * Replicates a first batch through a peer created without the serial flag, then updates the peer
   * config with {@code setSerial(true)}, writes a second batch, and expects both batches to be
   * replicated in order.
   */
  @Test
  public void testChangeToSerial() throws Exception {
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);

    TableName tableName = createTable();
    writeTestRows(tableName);

    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
    // Get the current wal file name
    String walFileNameBeforeRollover =
      ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName();

    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
    moveRegionAndArchiveOldWals(region, rs);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover);

    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
    UTIL.getAdmin().enableReplicationPeer(PEER_ID);

    writeTestRows(tableName);
    waitUntilReplicationDone(2 * ROWS_PER_BATCH);
    checkOrder(2 * ROWS_PER_BATCH);
  }

  /**
   * Creates a serial peer with {@code setReplicateAllUserTables(false)}, writes a first batch,
   * then adds the table to the peer through the table-CFs map, writes a second batch, and expects
   * exactly one batch to be replicated in order.
   */
  @Test
  public void testAddToSerialPeer() throws Exception {
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
        .setReplicateAllUserTables(false).setSerial(true).build();
    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);

    TableName tableName = createTable();
    writeTestRows(tableName);
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);

    // Get the current wal file name
    String walFileNameBeforeRollover =
      ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName();

    moveRegionAndArchiveOldWals(region, rs);

    // Make sure that the replication done for the oldWal at source rs.
    waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover);

    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
      ReplicationPeerConfig.newBuilder(peerConfig)
        .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build());
    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }

  /**
   * Adds the peer while the table is disabled, then enables the table, writes a second batch, and
   * expects exactly one batch to be replicated in order.
   */
  @Test
  public void testDisabledTable() throws Exception {
    TableName tableName = createTable();
    writeTestRows(tableName);
    UTIL.getAdmin().disableTable(tableName);
    rollAllWALs();
    addPeer(true);
    UTIL.getAdmin().enableTable(tableName);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }

  /**
   * Forces the table state to DISABLING and checks that adding the peer has not returned after
   * five seconds; once the state is set to DISABLED the call is expected to complete, after which
   * the table is enabled, a second batch is written, and exactly one batch must be replicated in
   * order.
   */
  @Test
  public void testDisablingTable() throws Exception {
    TableName tableName = createTable();
    writeTestRows(tableName);
    UTIL.getAdmin().disableTable(tableName);
    rollAllWALs();
    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
    tsm.setTableState(tableName, TableState.State.DISABLING);
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Thread t = addPeerInBackground(failure);
    Thread.sleep(5000);
    // we will wait on the disabling table so the thread should still be alive.
    assertTrue(t.isAlive(),
      "addPeer should still be waiting on the DISABLING table, failure: " + failure.get());
    tsm.setTableState(tableName, TableState.State.DISABLED);
    joinAddPeer(t, failure);
    UTIL.getAdmin().enableTable(tableName);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }

  /**
   * Forces the table state to ENABLING and checks that adding the peer has not returned after five
   * seconds; once the state is set to ENABLED the call is expected to complete, after which a
   * second batch is written and exactly one batch must be replicated in order.
   */
  @Test
  public void testEnablingTable() throws Exception {
    TableName tableName = createTable();
    writeTestRows(tableName);
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
    moveRegionAndArchiveOldWals(region, rs);
    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
    tsm.setTableState(tableName, TableState.State.ENABLING);
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Thread t = addPeerInBackground(failure);
    Thread.sleep(5000);
    // we will wait on the enabling table so the thread should still be alive.
    assertTrue(t.isAlive(),
      "addPeer should still be waiting on the ENABLING table, failure: " + failure.get());
    tsm.setTableState(tableName, TableState.State.ENABLED);
    joinAddPeer(t, failure);
    writeTestRows(tableName);
    waitUntilReplicationDone(ROWS_PER_BATCH);
    checkOrder(ROWS_PER_BATCH);
  }
}