001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseConfiguration;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.HColumnDescriptor;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.HTableDescriptor;
032import org.apache.hadoop.hbase.HTestConst;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.ResultScanner;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
040import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.testclassification.ReplicationTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.hbase.util.Threads;
046import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
047import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
048import org.junit.AfterClass;
049import org.junit.Assert;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Rule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.junit.rules.TestName;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059@Category({ ReplicationTests.class, LargeTests.class })
060public class TestGlobalReplicationThrottler {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestGlobalReplicationThrottler.class);
065
066  private static final Logger LOG = LoggerFactory.getLogger(TestGlobalReplicationThrottler.class);
067  private static Configuration conf1;
068  private static Configuration conf2;
069
070  private static HBaseTestingUtility utility1;
071  private static HBaseTestingUtility utility2;
072
073  private static final byte[] famName = Bytes.toBytes("f");
074  private static final byte[] VALUE = Bytes.toBytes("v");
075  private static final byte[] ROW = Bytes.toBytes("r");
076  private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
077
078  @Rule
079  public TestName name = new TestName();
080
081  @BeforeClass
082  public static void setUpBeforeClass() throws Exception {
083    conf1 = HBaseConfiguration.create();
084    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
085    conf1.setLong("replication.source.sleepforretries", 100);
086    // Each WAL is about 120 bytes
087    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200);
088    conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
089
090    utility1 = new HBaseTestingUtility(conf1);
091    utility1.startMiniZKCluster();
092    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
093    new ZKWatcher(conf1, "cluster1", null, true);
094
095    conf2 = new Configuration(conf1);
096    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
097
098    utility2 = new HBaseTestingUtility(conf2);
099    utility2.setZkCluster(miniZK);
100    new ZKWatcher(conf2, "cluster2", null, true);
101
102    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
103    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
104    rpc.setClusterKey(utility2.getClusterKey());
105
106    utility1.startMiniCluster(1, 1);
107    utility2.startMiniCluster(1, 1);
108
109    admin1.addPeer("peer1", rpc, null);
110    admin1.addPeer("peer2", rpc, null);
111    admin1.addPeer("peer3", rpc, null);
112  }
113
114  @AfterClass
115  public static void tearDownAfterClass() throws Exception {
116    utility2.shutdownMiniCluster();
117    utility1.shutdownMiniCluster();
118  }
119
120
121  volatile private boolean testQuotaPass = false;
122  volatile private boolean testQuotaNonZero = false;
123  @Test
124  public void testQuota() throws IOException {
125    final TableName tableName = TableName.valueOf(name.getMethodName());
126    HTableDescriptor table = new HTableDescriptor(tableName);
127    HColumnDescriptor fam = new HColumnDescriptor(famName);
128    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
129    table.addFamily(fam);
130    utility1.getAdmin().createTable(table);
131    utility2.getAdmin().createTable(table);
132
133    Thread watcher = new Thread(()->{
134      Replication replication = (Replication)utility1.getMiniHBaseCluster()
135          .getRegionServer(0).getReplicationSourceService();
136      AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed();
137      testQuotaPass = true;
138      while (!Thread.interrupted()) {
139        long size = bufferUsed.get();
140        if (size > 0) {
141          testQuotaNonZero = true;
142        }
143        if (size > 600) {
144          // We read logs first then check throttler, so if the buffer quota limiter doesn't
145          // take effect, it will push many logs and exceed the quota.
146          testQuotaPass = false;
147        }
148        Threads.sleep(50);
149      }
150    });
151    watcher.start();
152
153    try(Table t1 = utility1.getConnection().getTable(tableName);
154        Table t2 = utility2.getConnection().getTable(tableName)) {
155      for (int i = 0; i < 50; i++) {
156        Put put = new Put(ROWS[i]);
157        put.addColumn(famName, VALUE, VALUE);
158        t1.put(put);
159      }
160      long start = EnvironmentEdgeManager.currentTime();
161      while (EnvironmentEdgeManager.currentTime() - start < 180000) {
162        Scan scan = new Scan();
163        scan.setCaching(50);
164        int count = 0;
165        try (ResultScanner results = t2.getScanner(scan)) {
166          for (Result result : results) {
167            count++;
168          }
169        }
170        if (count < 50) {
171          LOG.info("Waiting all logs pushed to slave. Expected 50 , actual " + count);
172          Threads.sleep(200);
173          continue;
174        }
175        break;
176      }
177    }
178
179    watcher.interrupt();
180    Assert.assertTrue(testQuotaPass);
181    Assert.assertTrue(testQuotaNonZero);
182  }
183
184  private List<Integer> getRowNumbers(List<Cell> cells) {
185    List<Integer> listOfRowNumbers = new ArrayList<>(cells.size());
186    for (Cell c : cells) {
187      listOfRowNumbers.add(Integer.parseInt(Bytes
188          .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
189              c.getRowLength() - ROW.length)));
190    }
191    return listOfRowNumbers;
192  }
193}