001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 039import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; 040import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 041import org.apache.hadoop.hbase.client.coprocessor.Batch; 042import org.apache.hadoop.hbase.regionserver.HRegionServer; 043import org.apache.hadoop.hbase.regionserver.Region; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.junit.AfterClass; 048import org.junit.BeforeClass; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * Test that we can actually send and use region metrics to slowdown client writes 057 */ 058@Category(MediumTests.class) 059public class TestClientPushback { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestClientPushback.class); 064 065 private static final Logger LOG = LoggerFactory.getLogger(TestClientPushback.class); 066 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 067 068 private static final TableName tableName = TableName.valueOf("client-pushback"); 069 private static final byte[] family = Bytes.toBytes("f"); 070 private static final byte[] qualifier = Bytes.toBytes("q"); 071 private static final long flushSizeBytes = 512; 072 073 @BeforeClass 074 public static void setupCluster() throws Exception{ 075 Configuration conf = UTIL.getConfiguration(); 076 // enable backpressure 077 conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); 078 // use the exponential backoff policy 079 conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class, 080 ClientBackoffPolicy.class); 081 // turn the memstore size way down so we don't need to write a lot to see changes in memstore 082 // load 083 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes); 084 // ensure we block the flushes when we are double that flushsize 085 conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); 086 conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true); 087 UTIL.startMiniCluster(1); 088 UTIL.createTable(tableName, family); 089 } 090 091 @AfterClass 092 public static void teardownCluster() throws Exception{ 093 UTIL.shutdownMiniCluster(); 094 } 095 096 @Test 097 public void testClientTracksServerPushback() throws Exception{ 098 Configuration conf = UTIL.getConfiguration(); 099 100 ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf); 101 BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName); 102 103 HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); 104 Region region = rs.getRegions(tableName).get(0); 105 106 LOG.debug("Writing some data to "+tableName); 107 // write some data 108 Put p = new Put(Bytes.toBytes("row")); 109 p.addColumn(family, qualifier, Bytes.toBytes("value1")); 110 mutator.mutate(p); 111 mutator.flush(); 112 113 // get the current load on RS. Hopefully memstore isn't flushed since we wrote the the data 114 int load = (int) ((region.getMemStoreHeapSize() * 100) 115 / flushSizeBytes); 116 LOG.debug("Done writing some data to "+tableName); 117 118 // get the stats for the region hosting our table 119 ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy(); 120 assertTrue("Backoff policy is not correctly configured", 121 backoffPolicy instanceof ExponentialClientBackoffPolicy); 122 123 ServerStatisticTracker stats = conn.getStatisticsTracker(); 124 assertNotNull( "No stats configured for the client!", stats); 125 // get the names so we can query the stats 126 ServerName server = rs.getServerName(); 127 byte[] regionName = region.getRegionInfo().getRegionName(); 128 129 // check to see we found some load on the memstore 130 ServerStatistics serverStats = stats.getServerStatsForTesting(server); 131 ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); 132 assertEquals("We did not find some load on the memstore", load, 133 regionStats.getMemStoreLoadPercent()); 134 // check that the load reported produces a nonzero delay 135 long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats); 136 assertNotEquals("Reported load does not produce a backoff", 0, backoffTime); 137 LOG.debug("Backoff calculated for " + region.getRegionInfo().getRegionNameAsString() + " @ " + 138 server + " is " + backoffTime); 139 140 // Reach into the connection and submit work directly to AsyncProcess so we can 141 // monitor how long the submission was delayed via a callback 142 List<Row> ops = new ArrayList<>(1); 143 ops.add(p); 144 final CountDownLatch latch = new CountDownLatch(1); 145 final AtomicLong endTime = new AtomicLong(); 146 long startTime = EnvironmentEdgeManager.currentTime(); 147 Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> { 148 endTime.set(EnvironmentEdgeManager.currentTime()); 149 latch.countDown(); 150 }; 151 AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback) 152 .setPool(mutator.getPool()) 153 .setTableName(tableName) 154 .setRowAccess(ops) 155 .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) 156 .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout()) 157 .setRpcTimeout(60 * 1000) 158 .build(); 159 mutator.getAsyncProcess().submit(task); 160 // Currently the ExponentialClientBackoffPolicy under these test conditions 161 // produces a backoffTime of 151 milliseconds. This is long enough so the 162 // wait and related checks below are reasonable. Revisit if the backoff 163 // time reported by above debug logging has significantly deviated. 164 String name = server.getServerName() + "," + Bytes.toStringBinary(regionName); 165 MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics(). 166 serverStats.get(server).get(regionName); 167 assertEquals(name, rsStats.name); 168 assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(), 169 (double)regionStats.getHeapOccupancyPercent(), 0.1 ); 170 assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(), 171 (double)regionStats.getMemStoreLoadPercent(), 0.1); 172 173 MetricsConnection.RunnerStats runnerStats = conn.getConnectionMetrics().runnerStats; 174 175 assertEquals(1, runnerStats.delayRunners.getCount()); 176 assertEquals(1, runnerStats.normalRunners.getCount()); 177 assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(), 178 (double)backoffTime, 0.1); 179 180 latch.await(backoffTime * 2, TimeUnit.MILLISECONDS); 181 assertNotEquals("AsyncProcess did not submit the work time", 0, endTime.get()); 182 assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime); 183 } 184 185 @Test 186 public void testMutateRowStats() throws IOException { 187 Configuration conf = UTIL.getConfiguration(); 188 ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf); 189 Table table = conn.getTable(tableName); 190 HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); 191 Region region = rs.getRegions(tableName).get(0); 192 193 RowMutations mutations = new RowMutations(Bytes.toBytes("row")); 194 Put p = new Put(Bytes.toBytes("row")); 195 p.addColumn(family, qualifier, Bytes.toBytes("value2")); 196 mutations.add(p); 197 table.mutateRow(mutations); 198 199 ServerStatisticTracker stats = conn.getStatisticsTracker(); 200 assertNotNull( "No stats configured for the client!", stats); 201 // get the names so we can query the stats 202 ServerName server = rs.getServerName(); 203 byte[] regionName = region.getRegionInfo().getRegionName(); 204 205 // check to see we found some load on the memstore 206 ServerStatistics serverStats = stats.getServerStatsForTesting(server); 207 ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); 208 209 assertNotNull(regionStats); 210 assertTrue(regionStats.getMemStoreLoadPercent() > 0); 211 } 212}