001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.jupiter.api.Assertions.fail; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.TimeUnit; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.testclassification.IntegrationTests; 028import org.apache.hadoop.hbase.util.ConstantDelayQueue; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 030import org.apache.hadoop.hbase.util.LoadTestTool; 031import org.apache.hadoop.hbase.util.MultiThreadedUpdater; 032import org.apache.hadoop.hbase.util.MultiThreadedWriter; 033import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 034import org.apache.hadoop.hbase.util.TableDescriptorChecker; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 037import org.apache.hadoop.util.StringUtils; 038import org.apache.hadoop.util.ToolRunner; 039import org.junit.jupiter.api.Tag; 040import org.junit.jupiter.api.Test; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 043 044/** 045 * Integration test for testing async wal replication to secondary region replicas. Sets up a table 046 * with given region replication (default 2), and uses LoadTestTool client writer, updater and 047 * reader threads for writes and reads and verification. It uses a delay queue with a given delay 048 * ("read_delay_ms", default 5000ms) between the writer/updater and reader threads to make the 049 * written items available to readers. This means that a reader will only start reading from a row 050 * written by the writer / updater after 5secs has passed. The reader thread performs the reads from 051 * the given region replica id (default 1) to perform the reads. Async wal replication has to finish 052 * with the replication of the edits before read_delay_ms to the given region replica id so that the 053 * read and verify will not fail. The job will run for <b>at least</b> given runtime (default 10min) 054 * by running a concurrent writer and reader workload followed by a concurrent updater and reader 055 * workload for num_keys_per_server. 056 * <p> 057 * Example usage: 058 * </p> 059 * 060 * <pre> 061 * hbase org.apache.hadoop.hbase.IntegrationTestRegionReplicaReplication 062 * -DIntegrationTestRegionReplicaReplication.num_keys_per_server=10000 063 * -Dhbase.IntegrationTestRegionReplicaReplication.runtime=600000 064 * -DIntegrationTestRegionReplicaReplication.read_delay_ms=5000 065 * -DIntegrationTestRegionReplicaReplication.region_replication=3 066 * -DIntegrationTestRegionReplicaReplication.region_replica_id=2 067 * -DIntegrationTestRegionReplicaReplication.num_read_threads=100 068 * -DIntegrationTestRegionReplicaReplication.num_write_threads=100 069 * </pre> 070 */ 071@Tag(IntegrationTests.TAG) 072public class IntegrationTestRegionReplicaReplication extends IntegrationTestIngest { 073 074 private static final String TEST_NAME = 075 IntegrationTestRegionReplicaReplication.class.getSimpleName(); 076 077 private static final String OPT_READ_DELAY_MS = "read_delay_ms"; 078 079 private static final int DEFAULT_REGION_REPLICATION = 2; 080 private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster 081 private static final String[] DEFAULT_COLUMN_FAMILIES = new String[] { "f1", "f2", "f3" }; 082 083 @Override 084 protected int getMinServerCount() { 085 return SERVER_COUNT; 086 } 087 088 @Override 089 public void setConf(Configuration conf) { 090 conf.setIfUnset(String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION), 091 String.valueOf(DEFAULT_REGION_REPLICATION)); 092 093 conf.setIfUnset(String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES), 094 StringUtils.join(",", DEFAULT_COLUMN_FAMILIES)); 095 096 conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true); 097 098 // enable async wal replication to region replicas for unit tests 099 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 100 101 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB 102 conf.setInt("hbase.hstore.blockingStoreFiles", 100); 103 104 super.setConf(conf); 105 } 106 107 @Override 108 @Test 109 public void testIngest() throws Exception { 110 runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20); 111 } 112 113 /** 114 * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer 115 * threads to become available to the MultiThradedReader threads. We add this delay because of the 116 * async nature of the wal replication to region replicas. 117 */ 118 public static class DelayingMultiThreadedWriter extends MultiThreadedWriter { 119 private long delayMs; 120 121 public DelayingMultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, 122 TableName tableName) throws IOException { 123 super(dataGen, conf, tableName); 124 } 125 126 @Override 127 protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) { 128 this.delayMs = conf.getLong(String.format("%s.%s", 129 IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000); 130 return new ConstantDelayQueue<>(TimeUnit.MILLISECONDS, delayMs); 131 } 132 } 133 134 /** 135 * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer 136 * threads to become available to the MultiThradedReader threads. We add this delay because of the 137 * async nature of the wal replication to region replicas. 138 */ 139 public static class DelayingMultiThreadedUpdater extends MultiThreadedUpdater { 140 private long delayMs; 141 142 public DelayingMultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf, 143 TableName tableName, double updatePercent) throws IOException { 144 super(dataGen, conf, tableName, updatePercent); 145 } 146 147 @Override 148 protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) { 149 this.delayMs = conf.getLong(String.format("%s.%s", 150 IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000); 151 return new ConstantDelayQueue<>(TimeUnit.MILLISECONDS, delayMs); 152 } 153 } 154 155 @Override 156 protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey, 157 int recordSize, int writeThreads, int readThreads) throws Exception { 158 159 LOG.info("Running ingest"); 160 LOG.info("Cluster size:" 161 + util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size()); 162 163 // sleep for some time so that the cache for disabled tables does not interfere. 164 Threads.sleep(getConf().getInt( 165 "hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000) + 1000); 166 167 long start = EnvironmentEdgeManager.currentTime(); 168 String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName()); 169 long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime); 170 long startKey = 0; 171 172 long numKeys = getNumKeys(keysPerServerPerIter); 173 while (EnvironmentEdgeManager.currentTime() - start < 0.9 * runtime) { 174 LOG.info("Intended run time: " + (runtime / 60000) + " min, left:" 175 + ((runtime - (EnvironmentEdgeManager.currentTime() - start)) / 60000) + " min"); 176 177 int verifyPercent = 100; 178 int updatePercent = 20; 179 int regionReplicaId = 180 conf.getInt(String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1); 181 182 // we will run writers and readers at the same time. 183 List<String> args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys)); 184 args.add("-write"); 185 args.add(String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads)); 186 args.add("-" + LoadTestTool.OPT_MULTIPUT); 187 args.add("-writer"); 188 args.add(DelayingMultiThreadedWriter.class.getName()); // inject writer class 189 args.add("-read"); 190 args.add(String.format("%d:%d", verifyPercent, readThreads)); 191 args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID); 192 args.add(String.valueOf(regionReplicaId)); 193 194 int ret = loadTool.run(args.toArray(new String[args.size()])); 195 if (0 != ret) { 196 String errorMsg = "Load failed with error code " + ret; 197 LOG.error(errorMsg); 198 fail(errorMsg); 199 } 200 201 args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys)); 202 args.add("-update"); 203 args.add(String.format("%s:%s:1", updatePercent, writeThreads)); 204 args.add("-updater"); 205 args.add(DelayingMultiThreadedUpdater.class.getName()); // inject updater class 206 args.add("-read"); 207 args.add(String.format("%d:%d", verifyPercent, readThreads)); 208 args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID); 209 args.add(String.valueOf(regionReplicaId)); 210 211 ret = loadTool.run(args.toArray(new String[args.size()])); 212 if (0 != ret) { 213 String errorMsg = "Load failed with error code " + ret; 214 LOG.error(errorMsg); 215 fail(errorMsg); 216 } 217 startKey += numKeys; 218 } 219 } 220 221 public static void main(String[] args) throws Exception { 222 Configuration conf = HBaseConfiguration.create(); 223 IntegrationTestingUtility.setUseDistributedCluster(conf); 224 int ret = ToolRunner.run(conf, new IntegrationTestRegionReplicaReplication(), args); 225 System.exit(ret); 226 } 227}