001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.regex.Pattern; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HBaseZKTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.StartTestingClusterOption; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.AsyncClusterConnection; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.client.TableDescriptor; 047import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 048import org.apache.hadoop.hbase.master.MasterFileSystem; 049import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; 050import org.apache.hadoop.hbase.regionserver.HRegion; 051import org.apache.hadoop.hbase.regionserver.HRegionServer; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.FutureUtils; 054import org.apache.hadoop.hbase.wal.WAL.Entry; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.apache.hadoop.hbase.wal.WALKeyImpl; 057import org.apache.hadoop.ipc.RemoteException; 058import org.junit.AfterClass; 059import org.junit.BeforeClass; 060 061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 062 063/** 064 * Base class for testing sync replication. 065 */ 066public class SyncReplicationTestBase { 067 068 protected static final HBaseZKTestingUtil ZK_UTIL = new HBaseZKTestingUtil(); 069 070 protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); 071 072 protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); 073 074 protected static TableName TABLE_NAME = TableName.valueOf("SyncRep"); 075 076 protected static byte[] CF = Bytes.toBytes("cf"); 077 078 protected static byte[] CQ = Bytes.toBytes("cq"); 079 080 protected static String PEER_ID = "1"; 081 082 protected static Path REMOTE_WAL_DIR1; 083 084 protected static Path REMOTE_WAL_DIR2; 085 086 protected static void initTestingUtility(HBaseTestingUtil util, String zkParent) { 087 util.setZkCluster(ZK_UTIL.getZkCluster()); 088 Configuration conf = util.getConfiguration(); 089 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent); 090 conf.setInt("replication.source.size.capacity", 102400); 091 conf.setLong("replication.source.sleepforretries", 100); 092 conf.setInt("hbase.regionserver.maxlogs", 10); 093 conf.setLong("hbase.master.logcleaner.ttl", 10); 094 conf.setInt("zookeeper.recovery.retry", 1); 095 conf.setInt("zookeeper.recovery.retry.intervalmill", 10); 096 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 097 conf.setInt("replication.stats.thread.period.seconds", 5); 098 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); 099 conf.setLong("replication.sleep.before.failover", 2000); 100 conf.setInt("replication.source.maxretriesmultiplier", 10); 101 conf.setFloat("replication.source.ratio", 1.0f); 102 conf.setBoolean("replication.source.eof.autorecovery", true); 103 } 104 105 @BeforeClass 106 public static void setUp() throws Exception { 107 ZK_UTIL.startMiniZKCluster(); 108 initTestingUtility(UTIL1, "/cluster1"); 109 initTestingUtility(UTIL2, "/cluster2"); 110 StartTestingClusterOption option = 111 StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build(); 112 UTIL1.startMiniCluster(option); 113 UTIL2.startMiniCluster(option); 114 TableDescriptor td = 115 TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder 116 .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); 117 UTIL1.getAdmin().createTable(td); 118 UTIL2.getAdmin().createTable(td); 119 FileSystem fs1 = UTIL1.getTestFileSystem(); 120 FileSystem fs2 = UTIL2.getTestFileSystem(); 121 REMOTE_WAL_DIR1 = 122 new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), 123 "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory()); 124 REMOTE_WAL_DIR2 = 125 new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), 126 "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory()); 127 UTIL1.getAdmin().addReplicationPeer(PEER_ID, 128 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()) 129 .setReplicateAllUserTables(false) 130 .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) 131 .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build()); 132 UTIL2.getAdmin().addReplicationPeer(PEER_ID, 133 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getRpcConnnectionURI()) 134 .setReplicateAllUserTables(false) 135 .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) 136 .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build()); 137 } 138 139 private static void shutdown(HBaseTestingUtil util) throws Exception { 140 if (util.getHBaseCluster() == null) { 141 return; 142 } 143 Admin admin = util.getAdmin(); 144 if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) { 145 if ( 146 admin.getReplicationPeerSyncReplicationState(PEER_ID) 147 != SyncReplicationState.DOWNGRADE_ACTIVE 148 ) { 149 admin.transitReplicationPeerSyncReplicationState(PEER_ID, 150 SyncReplicationState.DOWNGRADE_ACTIVE); 151 } 152 admin.removeReplicationPeer(PEER_ID); 153 } 154 util.shutdownMiniCluster(); 155 } 156 157 @AfterClass 158 public static void tearDown() throws Exception { 159 shutdown(UTIL1); 160 shutdown(UTIL2); 161 ZK_UTIL.shutdownMiniZKCluster(); 162 } 163 164 protected final void write(HBaseTestingUtil util, int start, int end) throws IOException { 165 try (Table table = util.getConnection().getTable(TABLE_NAME)) { 166 for (int i = start; i < end; i++) { 167 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 168 } 169 } 170 } 171 172 protected final void verify(HBaseTestingUtil util, int start, int end) throws IOException { 173 try (Table table = util.getConnection().getTable(TABLE_NAME)) { 174 for (int i = start; i < end; i++) { 175 assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ))); 176 } 177 } 178 } 179 180 protected final void verifyThroughRegion(HBaseTestingUtil util, int start, int end) 181 throws IOException { 182 HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 183 for (int i = start; i < end; i++) { 184 assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ))); 185 } 186 } 187 188 protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtil util, int start, int end) 189 throws IOException { 190 HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 191 for (int i = start; i < end; i++) { 192 assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty()); 193 } 194 } 195 196 protected final void waitUntilReplicationDone(HBaseTestingUtil util, int end) throws Exception { 197 // The reject check is in RSRpcService so we can still read through HRegion 198 HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 199 util.waitFor(30000, new ExplainingPredicate<Exception>() { 200 201 @Override 202 public boolean evaluate() throws Exception { 203 return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty(); 204 } 205 206 @Override 207 public String explainFailure() throws Exception { 208 return "Replication has not been catched up yet"; 209 } 210 }); 211 } 212 213 protected final void writeAndVerifyReplication(HBaseTestingUtil util1, HBaseTestingUtil util2, 214 int start, int end) throws Exception { 215 write(util1, start, end); 216 waitUntilReplicationDone(util2, end); 217 verifyThroughRegion(util2, start, end); 218 } 219 220 protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) { 221 Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); 222 return getRemoteWALDir(remoteWALDir, peerId); 223 } 224 225 protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) { 226 return new Path(remoteWALDir, peerId); 227 } 228 229 protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) { 230 return new Path(remoteWALDir, peerId + "-replay"); 231 } 232 233 protected final void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtil utility) 234 throws Exception { 235 ReplicationPeerStorage rps = ReplicationStorageFactory.getReplicationPeerStorage( 236 utility.getTestFileSystem(), utility.getZooKeeperWatcher(), utility.getConfiguration()); 237 try { 238 rps.getPeerSyncReplicationState(peerId); 239 fail("Should throw exception when get the sync replication state of a removed peer."); 240 } catch (ReplicationException e) { 241 // ignore. 242 } 243 try { 244 rps.getPeerNewSyncReplicationState(peerId); 245 fail("Should throw exception when get the new sync replication state of a removed peer"); 246 } catch (ReplicationException e) { 247 // ignore. 248 } 249 try (FileSystem fs = utility.getTestFileSystem()) { 250 assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId))); 251 assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId))); 252 } 253 } 254 255 private void assertRejection(Throwable error) { 256 assertThat(error, instanceOf(DoNotRetryIOException.class)); 257 assertTrue(error.getMessage().contains("Reject to apply to sink cluster")); 258 assertTrue(error.getMessage().contains(TABLE_NAME.toString())); 259 } 260 261 protected final void verifyReplicationRequestRejection(HBaseTestingUtil utility, 262 boolean expectedRejection) throws Exception { 263 HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME); 264 AsyncClusterConnection connection = regionServer.getAsyncClusterConnection(); 265 Entry[] entries = new Entry[10]; 266 for (int i = 0; i < entries.length; i++) { 267 entries[i] = 268 new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit()); 269 } 270 if (!expectedRejection) { 271 FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry( 272 connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null, 273 HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT)); 274 } else { 275 try { 276 FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry( 277 connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null, 278 HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT)); 279 fail("Should throw IOException when sync-replication state is in A or DA"); 280 } catch (RemoteException e) { 281 assertRejection(e.unwrapRemoteException()); 282 } catch (DoNotRetryIOException e) { 283 assertRejection(e); 284 } 285 } 286 } 287 288 protected final void waitUntilDeleted(HBaseTestingUtil util, Path remoteWAL) throws Exception { 289 MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem(); 290 util.waitFor(30000, new ExplainingPredicate<Exception>() { 291 292 @Override 293 public boolean evaluate() throws Exception { 294 return !mfs.getWALFileSystem().exists(remoteWAL); 295 } 296 297 @Override 298 public String explainFailure() throws Exception { 299 return remoteWAL + " has not been deleted yet"; 300 } 301 }); 302 } 303}