001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.Arrays; 024import java.util.List; 025import java.util.Optional; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.ForkJoinPool; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.function.Supplier; 031import org.apache.commons.io.IOUtils; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.coprocessor.ObserverContext; 036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 037import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 038import org.apache.hadoop.hbase.coprocessor.RegionObserver; 039import org.apache.hadoop.hbase.regionserver.HRegion; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 042import org.junit.AfterClass; 043import org.junit.Rule; 044import org.junit.Test; 045import org.junit.rules.TestName; 046import org.junit.runners.Parameterized.Parameter; 047import org.junit.runners.Parameterized.Parameters; 048 049public abstract class AbstractTestAsyncTableRegionReplicasRead { 050 051 protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 052 053 protected static TableName TABLE_NAME = TableName.valueOf("async"); 054 055 protected static byte[] FAMILY = Bytes.toBytes("cf"); 056 057 protected static byte[] QUALIFIER = Bytes.toBytes("cq"); 058 059 protected static byte[] ROW = Bytes.toBytes("row"); 060 061 protected static byte[] VALUE = Bytes.toBytes("value"); 062 063 protected static int REPLICA_COUNT = 3; 064 065 protected static AsyncConnection ASYNC_CONN; 066 067 @Rule 068 public TestName testName = new TestName(); 069 070 @Parameter 071 public Supplier<AsyncTable<?>> getTable; 072 073 private static AsyncTable<?> getRawTable() { 074 return ASYNC_CONN.getTable(TABLE_NAME); 075 } 076 077 private static AsyncTable<?> getTable() { 078 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 079 } 080 081 @Parameters 082 public static List<Object[]> params() { 083 return Arrays.asList( 084 new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable }, 085 new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable }); 086 } 087 088 protected static volatile boolean FAIL_PRIMARY_GET = false; 089 090 protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT = 091 new ConcurrentHashMap<>(); 092 093 public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor { 094 095 @Override 096 public Optional<RegionObserver> getRegionObserver() { 097 return Optional.of(this); 098 } 099 100 private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c) 101 throws IOException { 102 RegionInfo region = c.getEnvironment().getRegionInfo(); 103 if (!region.getTable().equals(TABLE_NAME)) { 104 return; 105 } 106 REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger()) 107 .incrementAndGet(); 108 if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) { 109 throw new IOException("Inject error"); 110 } 111 } 112 113 @Override 114 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, 115 List<Cell> result) throws IOException { 116 recordAndTryFail(c); 117 } 118 119 @Override 120 public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan) 121 throws IOException { 122 recordAndTryFail(c); 123 } 124 } 125 126 private static boolean allReplicasHaveRow(byte[] row) throws IOException { 127 for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) { 128 for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) { 129 if (region.get(new Get(row), false).isEmpty()) { 130 return false; 131 } 132 } 133 } 134 return true; 135 } 136 137 protected static void startClusterAndCreateTable() throws Exception { 138 TEST_UTIL.startMiniCluster(3); 139 TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) 140 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT) 141 .setCoprocessor(FailPrimaryGetCP.class.getName()).build()); 142 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); 143 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 144 } 145 146 protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException { 147 // this is the fastest way to let all replicas have the row 148 TEST_UTIL.getAdmin().disableTable(TABLE_NAME); 149 TEST_UTIL.getAdmin().enableTable(TABLE_NAME); 150 TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row)); 151 } 152 153 @AfterClass 154 public static void tearDownAfterClass() throws Exception { 155 IOUtils.closeQuietly(ASYNC_CONN); 156 TEST_UTIL.shutdownMiniCluster(); 157 } 158 159 protected static int getSecondaryGetCount() { 160 return REPLICA_ID_TO_COUNT.entrySet().stream() 161 .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID) 162 .mapToInt(e -> e.getValue().get()).sum(); 163 } 164 165 protected static int getPrimaryGetCount() { 166 AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID); 167 return primaryGetCount != null ? primaryGetCount.get() : 0; 168 } 169 170 // replicaId = -1 means do not set replica 171 protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception; 172 173 @Test 174 public void testNoReplicaRead() throws Exception { 175 FAIL_PRIMARY_GET = false; 176 REPLICA_ID_TO_COUNT.clear(); 177 AsyncTable<?> table = getTable.get(); 178 readAndCheck(table, -1); 179 // the primary region is fine and the primary timeout is 1 second which is long enough, so we 180 // should not send any requests to secondary replicas even if the consistency is timeline. 181 Thread.sleep(5000); 182 assertEquals(0, getSecondaryGetCount()); 183 } 184 185 @Test 186 public void testReplicaRead() throws Exception { 187 // fail the primary get request 188 FAIL_PRIMARY_GET = true; 189 REPLICA_ID_TO_COUNT.clear(); 190 // make sure that we could still get the value from secondary replicas 191 AsyncTable<?> table = getTable.get(); 192 readAndCheck(table, -1); 193 // make sure that the primary request has been canceled 194 Thread.sleep(5000); 195 int count = getPrimaryGetCount(); 196 Thread.sleep(10000); 197 assertEquals(count, getPrimaryGetCount()); 198 } 199 200 @Test 201 public void testReadSpecificReplica() throws Exception { 202 FAIL_PRIMARY_GET = false; 203 REPLICA_ID_TO_COUNT.clear(); 204 AsyncTable<?> table = getTable.get(); 205 for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) { 206 readAndCheck(table, replicaId); 207 assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get()); 208 } 209 } 210}