/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.jupiter.api.AfterAll;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Sync replication test base without BeforeAll method.
 * <p>
 * This class only registers the {@link AfterAll} cleanup; it declares no {@code BeforeAll} hook,
 * so {@link #startClusters()} has to be invoked explicitly by whoever uses this base.
 * @see SyncReplicationTestBase
 */
public class SyncReplicationTestBaseNoBeforeAll {

  /** ZooKeeper testing utility whose mini ZK cluster is shared by both HBase clusters. */
  protected static final HBaseZKTestingUtil ZK_UTIL = new HBaseZKTestingUtil();

  /** Testing utility for the first cluster (znode parent {@code /cluster1}). */
  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();

  /** Testing utility for the second cluster (znode parent {@code /cluster2}). */
  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();

  /** Table created on both clusters by {@link #startClusters()}. */
  protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");

  /** Column family of {@link #TABLE_NAME}. */
  protected static byte[] CF = Bytes.toBytes("cf");

  /** Column qualifier used by the write/verify helpers. */
  protected static byte[] CQ = Bytes.toBytes("cq");

  /** Id of the replication peer added on both clusters. */
  protected static String PEER_ID = "1";

  /** Qualified {@code remoteWALs} directory under the WAL root of cluster 1. */
  protected static Path REMOTE_WAL_DIR1;

  /** Qualified {@code remoteWALs} directory under the WAL root of cluster 2. */
  protected static Path REMOTE_WAL_DIR2;

  /**
   * Attaches the shared mini ZK cluster to the given utility and applies the configuration used
   * by the sync replication tests.
   * @param util     the testing utility to configure
   * @param zkParent the value to store under {@link HConstants#ZOOKEEPER_ZNODE_PARENT}
   */
  protected static void initTestingUtility(HBaseTestingUtil util, String zkParent) {
    util.setZkCluster(ZK_UTIL.getZkCluster());
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
  }

  /**
   * Starts the mini ZK cluster and both mini HBase clusters, creates {@link #TABLE_NAME} on each
   * of them, initializes {@link #REMOTE_WAL_DIR1} and {@link #REMOTE_WAL_DIR2}, and adds the peer
   * {@link #PEER_ID} in both directions. Each peer is configured with the remote WAL directory of
   * the <em>other</em> cluster.
   * @throws Exception if any cluster fails to start or any admin operation fails
   */
  protected static void startClusters() throws Exception {
    ZK_UTIL.startMiniZKCluster();
    initTestingUtility(UTIL1, "/cluster1");
    initTestingUtility(UTIL2, "/cluster2");
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
    UTIL1.startMiniCluster(option);
    UTIL2.startMiniCluster(option);
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
    UTIL1.getAdmin().createTable(td);
    UTIL2.getAdmin().createTable(td);
    FileSystem fs1 = UTIL1.getTestFileSystem();
    FileSystem fs2 = UTIL2.getTestFileSystem();
    REMOTE_WAL_DIR1 =
      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
        "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
    REMOTE_WAL_DIR2 =
      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
        "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
        .setReplicateAllUserTables(false)
        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
        .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build());
    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getRpcConnnectionURI())
        .setReplicateAllUserTables(false)
        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
        .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
  }

  /**
   * Removes the peer {@link #PEER_ID} (transiting it to
   * {@link SyncReplicationState#DOWNGRADE_ACTIVE} first when it is in another state) and shuts
   * down the mini cluster. Does nothing when the utility has no HBase cluster.
   * <p>
   * The mini cluster is shut down even when the peer cleanup throws, so that a failed cleanup
   * does not leave the cluster running.
   * @param util the testing utility whose cluster should be stopped
   * @throws Exception if the peer cleanup or the cluster shutdown fails
   */
  private static void shutdown(HBaseTestingUtil util) throws Exception {
    if (util.getHBaseCluster() == null) {
      return;
    }
    try {
      Admin admin = util.getAdmin();
      if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) {
        if (
          admin.getReplicationPeerSyncReplicationState(PEER_ID)
              != SyncReplicationState.DOWNGRADE_ACTIVE
        ) {
          admin.transitReplicationPeerSyncReplicationState(PEER_ID,
            SyncReplicationState.DOWNGRADE_ACTIVE);
        }
        admin.removeReplicationPeer(PEER_ID);
      }
    } finally {
      util.shutdownMiniCluster();
    }
  }

  /**
   * Shuts down both HBase clusters and then the mini ZK cluster.
   * <p>
   * Every step is attempted even when a previous one throws; if several steps fail, the exception
   * of the last failing step is the one that propagates.
   * @throws Exception if any of the shutdown steps fails
   */
  @AfterAll
  public static void tearDown() throws Exception {
    try {
      shutdown(UTIL1);
    } finally {
      try {
        shutdown(UTIL2);
      } finally {
        ZK_UTIL.shutdownMiniZKCluster();
      }
    }
  }

  /**
   * Puts one row per integer in {@code [start, end)} into {@link #TABLE_NAME}; the integer is
   * used both as the row key and as the cell value.
   * @param util  the cluster to write to
   * @param start first integer to write, inclusive
   * @param end   last integer to write, exclusive
   * @throws IOException if the table cannot be obtained or a put fails
   */
  protected final void write(HBaseTestingUtil util, int start, int end) throws IOException {
    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
      for (int i = start; i < end; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
  }

  /**
   * Reads the rows for {@code [start, end)} through the client {@link Table} and asserts that
   * each one holds its own integer as value.
   * @param util  the cluster to read from
   * @param start first integer to check, inclusive
   * @param end   last integer to check, exclusive
   * @throws IOException if the table cannot be obtained or a get fails
   */
  protected final void verify(HBaseTestingUtil util, int start, int end) throws IOException {
    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
      for (int i = start; i < end; i++) {
        assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
      }
    }
  }

  /**
   * Same check as {@link #verify(HBaseTestingUtil, int, int)}, but reads directly from the first
   * region of {@link #TABLE_NAME} instead of going through a client {@link Table}.
   * @param util  the cluster to read from
   * @param start first integer to check, inclusive
   * @param end   last integer to check, exclusive
   * @throws IOException if a region get fails
   */
  protected final void verifyThroughRegion(HBaseTestingUtil util, int start, int end)
    throws IOException {
    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    for (int i = start; i < end; i++) {
      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
    }
  }

  /**
   * Asserts that none of the rows for {@code [start, end)} exist in the first region of
   * {@link #TABLE_NAME}.
   * @param util  the cluster to read from
   * @param start first integer to check, inclusive
   * @param end   last integer to check, exclusive
   * @throws IOException if a region get fails
   */
  protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtil util, int start, int end)
    throws IOException {
    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    for (int i = start; i < end; i++) {
      assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty());
    }
  }

  /**
   * Waits up to 30 seconds until the row for {@code end - 1} is visible in the first region of
   * {@link #TABLE_NAME}.
   * @param util the cluster expected to receive the replicated data
   * @param end  exclusive upper bound of the written range; only row {@code end - 1} is polled
   * @throws Exception if the wait fails
   */
  protected final void waitUntilReplicationDone(HBaseTestingUtil util, int end) throws Exception {
    // The reject check is in RSRpcService so we can still read through HRegion
    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    util.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
      }

      @Override
      public String explainFailure() throws Exception {
        return "Replication has not been catched up yet";
      }
    });
  }

  /**
   * Writes {@code [start, end)} to {@code util1}, waits for the data to show up on {@code util2}
   * and verifies it there through the region.
   * @param util1 the cluster to write to
   * @param util2 the cluster expected to receive the data
   * @param start first integer to write, inclusive
   * @param end   last integer to write, exclusive
   * @throws Exception if writing, waiting or verification fails
   */
  protected final void writeAndVerifyReplication(HBaseTestingUtil util1, HBaseTestingUtil util2,
    int start, int end) throws Exception {
    write(util1, start, end);
    waitUntilReplicationDone(util2, end);
    verifyThroughRegion(util2, start, end);
  }

  /**
   * Resolves the remote WAL directory of a peer below the WAL root of the given master file
   * system.
   * @param mfs    master file system providing the WAL root directory
   * @param peerId the replication peer id
   * @return {@code <walRoot>/<ReplicationUtils.REMOTE_WAL_DIR_NAME>/<peerId>}
   */
  protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
    return getRemoteWALDir(remoteWALDir, peerId);
  }

  /**
   * Resolves the remote WAL directory of a peer below the given base directory.
   * @param remoteWALDir the base remote WAL directory
   * @param peerId       the replication peer id
   * @return {@code <remoteWALDir>/<peerId>}
   */
  protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) {
    return new Path(remoteWALDir, peerId);
  }

  /**
   * Resolves the replay directory of a peer below the given base directory.
   * @param remoteWALDir the base remote WAL directory
   * @param peerId       the replication peer id
   * @return {@code <remoteWALDir>/<peerId>-replay}
   */
  protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
    return new Path(remoteWALDir, peerId + "-replay");
  }

  /**
   * Asserts that a removed peer left nothing behind: querying its current and new sync
   * replication state from the peer storage must throw {@link ReplicationException}, and neither
   * its remote WAL directory nor its replay directory may exist.
   * @param peerId       the id of the removed peer
   * @param remoteWALDir the base remote WAL directory to inspect
   * @param utility      the cluster to inspect
   * @throws Exception if the peer storage or the file system cannot be accessed
   */
  protected final void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtil utility)
    throws Exception {
    ReplicationPeerStorage rps = ReplicationStorageFactory.getReplicationPeerStorage(
      utility.getTestFileSystem(), utility.getZooKeeperWatcher(), utility.getConfiguration());
    try {
      rps.getPeerSyncReplicationState(peerId);
      fail("Should throw exception when get the sync replication state of a removed peer.");
    } catch (ReplicationException expected) {
      // Expected: the state of a removed peer can no longer be read.
    }
    try {
      rps.getPeerNewSyncReplicationState(peerId);
      fail("Should throw exception when get the new sync replication state of a removed peer");
    } catch (ReplicationException expected) {
      // Expected: the new state of a removed peer can no longer be read.
    }
    // NOTE(review): this closes the FileSystem returned by getTestFileSystem(); confirm that the
    // instance is not shared with other users of the utility.
    try (FileSystem fs = utility.getTestFileSystem()) {
      assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId)));
      assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId)));
    }
  }

  /**
   * Asserts that the given error is a {@link DoNotRetryIOException} whose message contains both
   * the rejection text and the name of {@link #TABLE_NAME}.
   * @param error the error raised by the replication request
   */
  private void assertRejection(Throwable error) {
    assertThat(error, instanceOf(DoNotRetryIOException.class));
    assertTrue(error.getMessage().contains("Reject to apply to sink cluster"));
    assertTrue(error.getMessage().contains(TABLE_NAME.toString()));
  }

  /**
   * Sends the given WAL entries to the region server through its own async cluster connection
   * and blocks until the request completes.
   * @param regionServer the region server that both issues and receives the request
   * @param entries      the WAL entries to replicate
   * @throws Exception if the replication request fails
   */
  private void replicateEntries(HRegionServer regionServer, Entry[] entries) throws Exception {
    AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
    FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
      connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
  }

  /**
   * Sends ten empty WAL entries for {@link #TABLE_NAME} to the region server hosting the first
   * region of the table and checks whether the request is rejected.
   * @param utility           the cluster to send the request to
   * @param expectedRejection {@code true} if the request must fail with a rejection (see
   *                          {@link #assertRejection(Throwable)}), {@code false} if it must succeed
   * @throws Exception if the request fails in an unexpected way
   */
  protected final void verifyReplicationRequestRejection(HBaseTestingUtil utility,
    boolean expectedRejection) throws Exception {
    HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
    Entry[] entries = new Entry[10];
    for (int i = 0; i < entries.length; i++) {
      entries[i] =
        new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
    }
    if (!expectedRejection) {
      replicateEntries(regionServer, entries);
      return;
    }
    try {
      replicateEntries(regionServer, entries);
      fail("Should throw IOException when sync-replication state is in A or DA");
    } catch (RemoteException e) {
      // The rejection may arrive wrapped; unwrap it before checking.
      assertRejection(e.unwrapRemoteException());
    } catch (DoNotRetryIOException e) {
      assertRejection(e);
    }
  }

  /**
   * Waits up to 30 seconds until the given path no longer exists on the WAL file system of the
   * cluster's master.
   * @param util      the cluster whose WAL file system is polled
   * @param remoteWAL the path expected to disappear
   * @throws Exception if the wait fails
   */
  protected final void waitUntilDeleted(HBaseTestingUtil util, Path remoteWAL) throws Exception {
    MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem();
    util.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return !mfs.getWALFileSystem().exists(remoteWAL);
      }

      @Override
      public String explainFailure() throws Exception {
        return remoteWAL + " has not been deleted yet";
      }
    });
  }
}