001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN; 021import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY; 022import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; 023import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN; 024import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN; 025import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN; 026import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; 027import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; 028import static org.junit.jupiter.api.Assertions.assertEquals; 029import static org.junit.jupiter.api.Assertions.assertFalse; 030import static org.junit.jupiter.api.Assertions.assertTrue; 031 032import java.io.IOException; 033import java.util.ArrayList; 034import java.util.List; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CellUtil; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.Waiter; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 049import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 050import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.ReplicationTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.wal.WAL; 055import org.junit.jupiter.api.AfterAll; 056import org.junit.jupiter.api.BeforeAll; 057import org.junit.jupiter.api.Tag; 058import org.junit.jupiter.api.Test; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * This test creates 2 mini hbase cluster. One cluster with 064 * "hbase.regionserver.replication.marker.enabled" conf key. This will create 065 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} which will create 066 * marker rows to be replicated to sink cluster. Second cluster with 067 * "hbase.regionserver.replication.sink.tracker.enabled" conf key enabled. This will persist the 068 * marker rows coming from peer cluster to persist to REPLICATION.SINK_TRACKER table. 069 **/ 070@Tag(ReplicationTests.TAG) 071@Tag(MediumTests.TAG) 072public class TestReplicationMarker { 073 074 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class); 075 076 private static Configuration conf1; 077 private static Configuration conf2; 078 private static HBaseTestingUtil utility1; 079 private static HBaseTestingUtil utility2; 080 081 @BeforeAll 082 public static void setUpBeforeClass() throws Exception { 083 conf1 = HBaseConfiguration.create(); 084 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 085 conf2 = new Configuration(conf1); 086 // Run the replication marker chore in cluster1. 087 conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); 088 conf1.setLong(REPLICATION_MARKER_CHORE_DURATION_KEY, 1000); // 1 sec 089 utility1 = new HBaseTestingUtil(conf1); 090 091 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 092 // Enable the replication sink tracker for cluster 2 093 conf2.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true); 094 utility2 = new HBaseTestingUtil(conf2); 095 096 // Start cluster 2 first so that hbase:replicationsinktracker table gets created first. 097 utility2.startMiniCluster(1); 098 waitForReplicationTrackerTableCreation(); 099 100 // Start cluster1 101 utility1.startMiniCluster(1); 102 Admin admin1 = utility1.getAdmin(); 103 ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder(); 104 rpcBuilder.setClusterKey(utility2.getRpcConnnectionURI()); 105 admin1.addReplicationPeer("1", rpcBuilder.build()); 106 107 ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0) 108 .getReplicationSourceService().getReplicationManager(); 109 // Wait until the peer gets established. 110 Waiter.waitFor(conf1, 10000, () -> manager.getSources().size() == 1); 111 } 112 113 private static void waitForReplicationTrackerTableCreation() { 114 Waiter.waitFor(conf2, 10000, 115 () -> utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME)); 116 } 117 118 @AfterAll 119 public static void tearDown() throws Exception { 120 utility1.shutdownMiniCluster(); 121 utility2.shutdownMiniCluster(); 122 } 123 124 @Test 125 public void testReplicationMarkerRow() throws Exception { 126 // We have configured ReplicationTrackerChore to run every second. Sleeping so that it will 127 // create enough sentinel rows. 128 Thread.sleep(5000); 129 WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null); 130 String walName1ForCluster1 = ((AbstractFSWAL<?>) wal1).getCurrentFileName().getName(); 131 String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname(); 132 // Since we sync the marker edits while appending to wal, all the edits should be visible 133 // to Replication threads immediately. 134 assertTrue(getReplicatedEntries() >= 5); 135 // Force log roll. 136 wal1.rollWriter(true); 137 String walName2ForCluster1 = ((AbstractFSWAL<?>) wal1).getCurrentFileName().getName(); 138 Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection(); 139 // Sleep for 5 more seconds to get marker rows with new wal name. 140 Thread.sleep(5000); 141 // Wait for cluster 2 to have atleast 8 tracker rows from cluster1. 142 utility2.waitFor(5000, () -> getTableCount(connection2) >= 8); 143 // Get replication marker rows from cluster2 144 List<ReplicationSinkTrackerRow> list = getRows(connection2); 145 for (ReplicationSinkTrackerRow desc : list) { 146 // All the tracker rows should have same region server name i.e. rs of cluster1 147 assertEquals(rs1Name, desc.getRegionServerName()); 148 // All the tracker rows will have either wal1 or wal2 name. 149 assertTrue(walName1ForCluster1.equals(desc.getWalName()) 150 || walName2ForCluster1.equals(desc.getWalName())); 151 } 152 153 // This table shouldn't exist on cluster1 since 154 // hbase.regionserver.replication.sink.tracker.enabled is not enabled on this cluster. 155 assertFalse(utility1.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME)); 156 // This table shouldn't exist on cluster1 since 157 // hbase.regionserver.replication.sink.tracker.enabled is enabled on this cluster. 158 assertTrue(utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME)); 159 } 160 161 /* 162 * Get rows for replication sink tracker table. 163 */ 164 private List<ReplicationSinkTrackerRow> getRows(Connection connection) throws IOException { 165 List<ReplicationSinkTrackerRow> list = new ArrayList<>(); 166 Scan scan = new Scan(); 167 Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME); 168 ResultScanner scanner = table.getScanner(scan); 169 170 Result r; 171 while ((r = scanner.next()) != null) { 172 List<Cell> cells = r.listCells(); 173 list.add(getPayload(cells)); 174 } 175 return list; 176 } 177 178 private ReplicationSinkTrackerRow getPayload(List<Cell> cells) { 179 String rsName = null, walName = null; 180 Long offset = null; 181 long timestamp = 0L; 182 for (Cell cell : cells) { 183 byte[] qualifier = CellUtil.cloneQualifier(cell); 184 byte[] value = CellUtil.cloneValue(cell); 185 186 if (Bytes.equals(RS_COLUMN, qualifier)) { 187 rsName = Bytes.toString(value); 188 } else if (Bytes.equals(WAL_NAME_COLUMN, qualifier)) { 189 walName = Bytes.toString(value); 190 } else if (Bytes.equals(TIMESTAMP_COLUMN, qualifier)) { 191 timestamp = Bytes.toLong(value); 192 } else if (Bytes.equals(OFFSET_COLUMN, qualifier)) { 193 offset = Bytes.toLong(value); 194 } 195 } 196 ReplicationSinkTrackerRow row = 197 new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset); 198 return row; 199 } 200 201 static class ReplicationSinkTrackerRow { 202 private String region_server_name; 203 private String wal_name; 204 private long timestamp; 205 private long offset; 206 207 public ReplicationSinkTrackerRow(String region_server_name, String wal_name, long timestamp, 208 long offset) { 209 this.region_server_name = region_server_name; 210 this.wal_name = wal_name; 211 this.timestamp = timestamp; 212 this.offset = offset; 213 } 214 215 public String getRegionServerName() { 216 return region_server_name; 217 } 218 219 public String getWalName() { 220 return wal_name; 221 } 222 223 public long getTimestamp() { 224 return timestamp; 225 } 226 227 public long getOffset() { 228 return offset; 229 } 230 231 @Override 232 public String toString() { 233 return "ReplicationSinkTrackerRow{" + "region_server_name='" + region_server_name + '\'' 234 + ", wal_name='" + wal_name + '\'' + ", timestamp=" + timestamp + ", offset=" + offset 235 + '}'; 236 } 237 } 238 239 private int getTableCount(Connection connection) throws Exception { 240 Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME); 241 ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM)); 242 int count = 0; 243 while (resultScanner.next() != null) { 244 count++; 245 } 246 LOG.info("Table count: " + count); 247 return count; 248 } 249 250 /* 251 * Return replicated entries from cluster1. 252 */ 253 private long getReplicatedEntries() { 254 ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0) 255 .getReplicationSourceService().getReplicationManager(); 256 List<ReplicationSourceInterface> sources = manager.getSources(); 257 assertEquals(1, sources.size()); 258 ReplicationSource source = (ReplicationSource) sources.get(0); 259 return source.getTotalReplicatedEdits(); 260 } 261}