/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/**
 * Base class for tests that read from an {@link AsyncTable} whose regions have replicas.
 * <p>
 * The table is created with {@link #REPLICA_COUNT} replicas and the {@link FailPrimaryGetCP}
 * coprocessor, which counts the get/scan-open requests received per replica id and can be told
 * (through {@link #FAIL_PRIMARY_GET}) to fail the requests that reach the primary replica.
 * Subclasses supply the actual read operation through {@link #readAndCheck(AsyncTable, int)}.
 * <p>
 * NOTE(review): the {@code @Parameter}/{@code @Parameters} members imply that concrete subclasses
 * run with the JUnit {@code Parameterized} runner -- confirm against the subclasses.
 */
public abstract class AbstractTestAsyncTableRegionReplicasRead {

  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  protected static TableName TABLE_NAME = TableName.valueOf("async");

  protected static byte[] FAMILY = Bytes.toBytes("cf");

  protected static byte[] QUALIFIER = Bytes.toBytes("cq");

  protected static byte[] ROW = Bytes.toBytes("row");

  protected static byte[] VALUE = Bytes.toBytes("value");

  /** Region replication configured on the test table. */
  protected static int REPLICA_COUNT = 3;

  /** Connection shared by all tests; created in {@link #startClusterAndCreateTable()}. */
  protected static AsyncConnection ASYNC_CONN;

  @Rule
  public TestName testName = new TestName();

  /** Injected test parameter: how to obtain the table under test. */
  @Parameter
  public Supplier<AsyncTable<?>> getTable;

  /** Returns the table obtained from the connection without supplying an executor. */
  private static AsyncTable<?> getRawTable() {
    return ASYNC_CONN.getTable(TABLE_NAME);
  }

  /** Returns the table obtained from the connection with the common fork-join pool. */
  private static AsyncTable<?> getTable() {
    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
  }

  /** Parameter sets for the tests: one per way of obtaining the table. */
  @Parameters
  public static List<Object[]> params() {
    return Arrays.asList(
      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable },
      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable });
  }

  /** When {@code true}, {@link FailPrimaryGetCP} fails requests that reach the primary replica. */
  protected static volatile boolean FAIL_PRIMARY_GET = false;

  /** Number of get/scan-open requests observed by {@link FailPrimaryGetCP}, keyed by replica id. */
  protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
    new ConcurrentHashMap<>();

  /**
   * Region coprocessor that records every get and scanner-open request on {@link #TABLE_NAME} in
   * {@link #REPLICA_ID_TO_COUNT} and, while {@link #FAIL_PRIMARY_GET} is set, rejects those that
   * reach the primary replica with an {@link IOException}.
   */
  public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Counts the request against the replica id of the region that received it, then throws if
     * that region is the primary replica and failure injection is enabled.
     * @param c the observer context used to find the region that received the request
     * @throws IOException with message "Inject error" when the failure is injected
     */
    private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c)
      throws IOException {
      RegionInfo region = c.getEnvironment().getRegionInfo();
      if (!region.getTable().equals(TABLE_NAME)) {
        // only requests against the test table are of interest
        return;
      }
      REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
        .incrementAndGet();
      // The primary replica is identified by its replica id. The previous code compared
      // getRegionId() with DEFAULT_REPLICA_ID, which does not test for the primary replica, so
      // the error was not reliably injected.
      if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
        throw new IOException("Inject error");
      }
    }

    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
      List<Cell> result) throws IOException {
      recordAndTryFail(c);
    }

    @Override
    public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
      throws IOException {
      recordAndTryFail(c);
    }
  }

  /**
   * Checks every region of {@link #TABLE_NAME} hosted on every region server of the mini cluster.
   * @param row the row to look up
   * @return {@code false} as soon as one region returns an empty result for {@code row},
   *         {@code true} otherwise
   */
  private static boolean allReplicasHaveRow(byte[] row) throws IOException {
    for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
      for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
        if (region.get(new Get(row), false).isEmpty()) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Configures the client timeouts and retry behaviour, starts a three-node mini cluster, creates
   * the replicated test table with {@link FailPrimaryGetCP} attached, waits for its regions to be
   * assigned and opens {@link #ASYNC_CONN}.
   */
  protected static void startClusterAndCreateTable() throws Exception {
    // 10 mins
    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
      TimeUnit.MINUTES.toMillis(10));
    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
      TimeUnit.MINUTES.toMillis(10));
    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      TimeUnit.MINUTES.toMillis(10));

    // 1 second
    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
      TimeUnit.SECONDS.toMicros(1));
    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND,
      TimeUnit.SECONDS.toMicros(1));

    // set a small pause so we will retry very quickly
    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);

    // infinite retry
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);

    TEST_UTIL.startMiniCluster(3);
    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
      .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  /**
   * Disables and re-enables the table, then waits up to 30 seconds until every hosted region of
   * the table returns {@code row}.
   * @param row the row that must be visible in all replicas
   */
  protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
    // this is the fastest way to let all replicas have the row
    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
    TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
  }

  /** Closes the shared connection and shuts the mini cluster down. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    IOUtils.closeQuietly(ASYNC_CONN);
    TEST_UTIL.shutdownMiniCluster();
  }

  /** Returns the total number of requests recorded for all non-primary replicas. */
  protected static int getSecondaryGetCount() {
    return REPLICA_ID_TO_COUNT.entrySet().stream()
      .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
      .mapToInt(e -> e.getValue().get()).sum();
  }

  /** Returns the number of requests recorded for the primary replica, or 0 if there were none. */
  protected static int getPrimaryGetCount() {
    AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
    return primaryGetCount != null ? primaryGetCount.get() : 0;
  }

  /**
   * Performs the read under test and verifies its result.
   * @param table the table to read from
   * @param replicaId the replica to read from; -1 means do not set replica
   */
  protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;

  @Test
  public void testNoReplicaRead() throws Exception {
    FAIL_PRIMARY_GET = false;
    REPLICA_ID_TO_COUNT.clear();
    AsyncTable<?> table = getTable.get();
    readAndCheck(table, -1);
    // the primary region is fine and the primary timeout is 1 second which is long enough, so we
    // should not send any requests to secondary replicas even if the consistency is timeline.
    Thread.sleep(5000);
    assertEquals(0, getSecondaryGetCount());
  }

  @Test
  public void testReplicaRead() throws Exception {
    // fail the primary get request
    FAIL_PRIMARY_GET = true;
    REPLICA_ID_TO_COUNT.clear();
    // make sure that we could still get the value from secondary replicas
    AsyncTable<?> table = getTable.get();
    readAndCheck(table, -1);
    // make sure that the primary request has been canceled
    Thread.sleep(5000);
    int count = getPrimaryGetCount();
    Thread.sleep(10000);
    assertEquals(count, getPrimaryGetCount());
  }

  @Test
  public void testReadSpecificReplica() throws Exception {
    FAIL_PRIMARY_GET = false;
    REPLICA_ID_TO_COUNT.clear();
    AsyncTable<?> table = getTable.get();
    for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
      readAndCheck(table, replicaId);
      // exactly one request must have reached the replica that was asked for
      assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
    }
  }
}