/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test that the region load metrics reported by the server are sent to the client and used to
 * slow down (push back) client writes.
 */
@Category(MediumTests.class)
public class TestClientPushback {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestClientPushback.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestClientPushback.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final TableName tableName = TableName.valueOf("client-pushback");
  private static final byte[] family = Bytes.toBytes("f");
  private static final byte[] qualifier = Bytes.toBytes("q");
  private static final long flushSizeBytes = 512;

  @BeforeClass
  public static void setupCluster() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // enable backpressure
    conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
    // use the exponential backoff policy
    conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class,
      ClientBackoffPolicy.class);
    // turn the memstore size way down so we don't need to write a lot to see changes in memstore
    // load
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
    // ensure we block updates once the memstore grows past the block multiplier times the
    // flush size
    conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
      HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
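    // enable client-side metrics so the test can read the per-region and runner stats off of
    // MetricsConnection below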
    conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    UTIL.startMiniCluster(1);
    UTIL.createTable(tableName, family);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void testClientTracksServerPushback() throws Exception {
    Configuration conf = UTIL.getConfiguration();

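    // use the internal ClusterConnection and BufferedMutatorImpl types so the test can reach
    // the backoff policy, statistics tracker and AsyncProcess directly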
    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
    BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName);

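    // the table has a single region; its memstore size is what drives the load the server
    // reports back to the client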
    HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
    Region region = rs.getRegions(tableName).get(0);

    LOG.debug("Writing some data to " + tableName);
    // write some data
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(family, qualifier, Bytes.toBytes("value1"));
    mutator.mutate(p);
    mutator.flush();

    // get the current load on the RS. Hopefully the memstore isn't flushed since we just wrote
    // the data
    int load = (int) ((region.getMemStoreHeapSize() * 100) / flushSizeBytes);
    LOG.debug("Done writing some data to " + tableName);

    // get the stats for the region hosting our table
    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
    assertTrue("Backoff policy is not correctly configured",
      backoffPolicy instanceof ExponentialClientBackoffPolicy);

    ServerStatisticTracker stats = conn.getStatisticsTracker();
    assertNotNull("No stats configured for the client!", stats);
    // get the names so we can query the stats
    ServerName server = rs.getServerName();
    byte[] regionName = region.getRegionInfo().getRegionName();

    // check that the client sees the load we just put on the memstore
    ServerStatistics serverStats = stats.getServerStatsForTesting(server);
    ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
    assertEquals("Client did not track the expected load on the memstore", load,
      regionStats.getMemStoreLoadPercent());
    // check that the load reported produces a nonzero delay
    long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats);
    assertNotEquals("Reported load does not produce a backoff", 0, backoffTime);
    LOG.debug("Backoff calculated for " + region.getRegionInfo().getRegionNameAsString() + " @ " +
      server + " is " + backoffTime);

    // Reach into the connection and submit work directly to AsyncProcess so we can
    // monitor how long the submission was delayed via a callback
    List<Row> ops = new ArrayList<>(1);
    ops.add(p);
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicLong endTime = new AtomicLong();
    long startTime = EnvironmentEdgeManager.currentTime();
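    // the callback fires when the delayed batch actually completes, so the elapsed time from
    // startTime should include the backoff delay injected by AsyncProcess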
    Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> {
      endTime.set(EnvironmentEdgeManager.currentTime());
      latch.countDown();
    };
    AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
            .setPool(mutator.getPool())
            .setTableName(tableName)
            .setRowAccess(ops)
            .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
            .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
            .setRpcTimeout(60 * 1000)
            .build();
    mutator.getAsyncProcess().submit(task);
    // Currently the ExponentialClientBackoffPolicy under these test conditions
    // produces a backoffTime of 151 milliseconds. This is long enough that the
    // wait and related checks below are reasonable. Revisit if the backoff
    // time reported by the debug logging above has deviated significantly.
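    // verify the client-side metrics mirror the region statistics the server reported back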
    String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);
    MetricsConnection.RegionStats rsStats =
        conn.getConnectionMetrics().serverStats.get(server).get(regionName);
    assertEquals(name, rsStats.name);
    assertEquals((double) regionStats.getHeapOccupancyPercent(),
      rsStats.heapOccupancyHist.getSnapshot().getMean(), 0.1);
    assertEquals((double) regionStats.getMemStoreLoadPercent(),
      rsStats.memstoreLoadHist.getSnapshot().getMean(), 0.1);

    MetricsConnection.RunnerStats runnerStats = conn.getConnectionMetrics().runnerStats;

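    // the initial flush ran before any server stats were known, so it should account for the
    // single normal (undelayed) runner; the AsyncProcess submission above is the delayed one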
    assertEquals(1, runnerStats.delayRunners.getCount());
    assertEquals(1, runnerStats.normalRunners.getCount());
    assertEquals("Mean delay interval should match the computed backoff time",
      (double) backoffTime, runnerStats.delayIntevalHist.getSnapshot().getMean(), 0.1);

    latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
    assertNotEquals("AsyncProcess did not complete the work in time", 0, endTime.get());
    assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);
  }

  @Test
  public void testMutateRowStats() throws IOException {
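    // mutateRow (RowMutations) takes a different client code path than BufferedMutator, so make
    // sure server statistics are still reported and tracked for it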
    Configuration conf = UTIL.getConfiguration();
    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
    Table table = conn.getTable(tableName);
    HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
    Region region = rs.getRegions(tableName).get(0);

    RowMutations mutations = new RowMutations(Bytes.toBytes("row"));
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(family, qualifier, Bytes.toBytes("value2"));
    mutations.add(p);
    table.mutateRow(mutations);

    ServerStatisticTracker stats = conn.getStatisticsTracker();
    assertNotNull("No stats configured for the client!", stats);
    // get the names so we can query the stats
    ServerName server = rs.getServerName();
    byte[] regionName = region.getRegionInfo().getRegionName();

    // check that the mutateRow call registered some load on the memstore
    ServerStatistics serverStats = stats.getServerStatsForTesting(server);
    ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);

    assertNotNull(regionStats);
    assertTrue(regionStats.getMemStoreLoadPercent() > 0);
  }
}