001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
021import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
022import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
023import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
024import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
025import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
026import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
027import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
028import static org.junit.jupiter.api.Assertions.assertEquals;
029import static org.junit.jupiter.api.Assertions.assertFalse;
030import static org.junit.jupiter.api.Assertions.assertTrue;
031
032import java.io.IOException;
033import java.util.ArrayList;
034import java.util.List;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
049import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
050import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.ReplicationTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.wal.WAL;
055import org.junit.jupiter.api.AfterAll;
056import org.junit.jupiter.api.BeforeAll;
057import org.junit.jupiter.api.Tag;
058import org.junit.jupiter.api.Test;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
/**
 * This test creates two mini HBase clusters. The first cluster has the
 * "hbase.regionserver.replication.marker.enabled" conf key enabled, which creates a
 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} that generates
 * marker rows to be replicated to the sink cluster. The second cluster has the
 * "hbase.regionserver.replication.sink.tracker.enabled" conf key enabled, which persists the
 * marker rows coming from the peer cluster to the REPLICATION.SINK_TRACKER table.
 **/
070@Tag(ReplicationTests.TAG)
071@Tag(MediumTests.TAG)
072public class TestReplicationMarker {
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class);
075
076  private static Configuration conf1;
077  private static Configuration conf2;
078  private static HBaseTestingUtil utility1;
079  private static HBaseTestingUtil utility2;
080
081  @BeforeAll
082  public static void setUpBeforeClass() throws Exception {
083    conf1 = HBaseConfiguration.create();
084    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
085    conf2 = new Configuration(conf1);
086    // Run the replication marker chore in cluster1.
087    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
088    conf1.setLong(REPLICATION_MARKER_CHORE_DURATION_KEY, 1000); // 1 sec
089    utility1 = new HBaseTestingUtil(conf1);
090
091    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
092    // Enable the replication sink tracker for cluster 2
093    conf2.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true);
094    utility2 = new HBaseTestingUtil(conf2);
095
096    // Start cluster 2 first so that hbase:replicationsinktracker table gets created first.
097    utility2.startMiniCluster(1);
098    waitForReplicationTrackerTableCreation();
099
100    // Start cluster1
101    utility1.startMiniCluster(1);
102    Admin admin1 = utility1.getAdmin();
103    ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
104    rpcBuilder.setClusterKey(utility2.getRpcConnnectionURI());
105    admin1.addReplicationPeer("1", rpcBuilder.build());
106
107    ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
108      .getReplicationSourceService().getReplicationManager();
109    // Wait until the peer gets established.
110    Waiter.waitFor(conf1, 10000, () -> manager.getSources().size() == 1);
111  }
112
113  private static void waitForReplicationTrackerTableCreation() {
114    Waiter.waitFor(conf2, 10000,
115      () -> utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
116  }
117
118  @AfterAll
119  public static void tearDown() throws Exception {
120    utility1.shutdownMiniCluster();
121    utility2.shutdownMiniCluster();
122  }
123
124  @Test
125  public void testReplicationMarkerRow() throws Exception {
126    // We have configured ReplicationTrackerChore to run every second. Sleeping so that it will
127    // create enough sentinel rows.
128    Thread.sleep(5000);
129    WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null);
130    String walName1ForCluster1 = ((AbstractFSWAL<?>) wal1).getCurrentFileName().getName();
131    String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname();
132    // Since we sync the marker edits while appending to wal, all the edits should be visible
133    // to Replication threads immediately.
134    assertTrue(getReplicatedEntries() >= 5);
135    // Force log roll.
136    wal1.rollWriter(true);
137    String walName2ForCluster1 = ((AbstractFSWAL<?>) wal1).getCurrentFileName().getName();
138    Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection();
139    // Sleep for 5 more seconds to get marker rows with new wal name.
140    Thread.sleep(5000);
141    // Wait for cluster 2 to have atleast 8 tracker rows from cluster1.
142    utility2.waitFor(5000, () -> getTableCount(connection2) >= 8);
143    // Get replication marker rows from cluster2
144    List<ReplicationSinkTrackerRow> list = getRows(connection2);
145    for (ReplicationSinkTrackerRow desc : list) {
146      // All the tracker rows should have same region server name i.e. rs of cluster1
147      assertEquals(rs1Name, desc.getRegionServerName());
148      // All the tracker rows will have either wal1 or wal2 name.
149      assertTrue(walName1ForCluster1.equals(desc.getWalName())
150        || walName2ForCluster1.equals(desc.getWalName()));
151    }
152
153    // This table shouldn't exist on cluster1 since
154    // hbase.regionserver.replication.sink.tracker.enabled is not enabled on this cluster.
155    assertFalse(utility1.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
156    // This table shouldn't exist on cluster1 since
157    // hbase.regionserver.replication.sink.tracker.enabled is enabled on this cluster.
158    assertTrue(utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
159  }
160
161  /*
162   * Get rows for replication sink tracker table.
163   */
164  private List<ReplicationSinkTrackerRow> getRows(Connection connection) throws IOException {
165    List<ReplicationSinkTrackerRow> list = new ArrayList<>();
166    Scan scan = new Scan();
167    Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
168    ResultScanner scanner = table.getScanner(scan);
169
170    Result r;
171    while ((r = scanner.next()) != null) {
172      List<Cell> cells = r.listCells();
173      list.add(getPayload(cells));
174    }
175    return list;
176  }
177
178  private ReplicationSinkTrackerRow getPayload(List<Cell> cells) {
179    String rsName = null, walName = null;
180    Long offset = null;
181    long timestamp = 0L;
182    for (Cell cell : cells) {
183      byte[] qualifier = CellUtil.cloneQualifier(cell);
184      byte[] value = CellUtil.cloneValue(cell);
185
186      if (Bytes.equals(RS_COLUMN, qualifier)) {
187        rsName = Bytes.toString(value);
188      } else if (Bytes.equals(WAL_NAME_COLUMN, qualifier)) {
189        walName = Bytes.toString(value);
190      } else if (Bytes.equals(TIMESTAMP_COLUMN, qualifier)) {
191        timestamp = Bytes.toLong(value);
192      } else if (Bytes.equals(OFFSET_COLUMN, qualifier)) {
193        offset = Bytes.toLong(value);
194      }
195    }
196    ReplicationSinkTrackerRow row =
197      new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset);
198    return row;
199  }
200
201  static class ReplicationSinkTrackerRow {
202    private String region_server_name;
203    private String wal_name;
204    private long timestamp;
205    private long offset;
206
207    public ReplicationSinkTrackerRow(String region_server_name, String wal_name, long timestamp,
208      long offset) {
209      this.region_server_name = region_server_name;
210      this.wal_name = wal_name;
211      this.timestamp = timestamp;
212      this.offset = offset;
213    }
214
215    public String getRegionServerName() {
216      return region_server_name;
217    }
218
219    public String getWalName() {
220      return wal_name;
221    }
222
223    public long getTimestamp() {
224      return timestamp;
225    }
226
227    public long getOffset() {
228      return offset;
229    }
230
231    @Override
232    public String toString() {
233      return "ReplicationSinkTrackerRow{" + "region_server_name='" + region_server_name + '\''
234        + ", wal_name='" + wal_name + '\'' + ", timestamp=" + timestamp + ", offset=" + offset
235        + '}';
236    }
237  }
238
239  private int getTableCount(Connection connection) throws Exception {
240    Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
241    ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
242    int count = 0;
243    while (resultScanner.next() != null) {
244      count++;
245    }
246    LOG.info("Table count: " + count);
247    return count;
248  }
249
250  /*
251   * Return replicated entries from cluster1.
252   */
253  private long getReplicatedEntries() {
254    ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
255      .getReplicationSourceService().getReplicationManager();
256    List<ReplicationSourceInterface> sources = manager.getSources();
257    assertEquals(1, sources.size());
258    ReplicationSource source = (ReplicationSource) sources.get(0);
259    return source.getTotalReplicatedEdits();
260  }
261}