/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Base class for tests that read from a table created with {@link #REPLICA_COUNT} region replicas
 * through an {@link AsyncTable}.
 * <p/>
 * The table is created with the {@link FailPrimaryGetCP} coprocessor, which counts the get and
 * scanner-open requests seen by each replica and can be told, through {@link #FAIL_PRIMARY_GET},
 * to fail the requests that reach the default replica. Subclasses supply the actual read in
 * {@link #readAndCheck(AsyncTable, int)}.
 * <p/>
 * Each test is run once per entry of {@link #parameters()}.
 */
public abstract class AbstractTestAsyncTableRegionReplicasRead {

  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  protected static TableName TABLE_NAME = TableName.valueOf("async");

  protected static byte[] FAMILY = Bytes.toBytes("cf");

  protected static byte[] QUALIFIER = Bytes.toBytes("cq");

  protected static byte[] ROW = Bytes.toBytes("row");

  protected static byte[] VALUE = Bytes.toBytes("value");

  /** Region replication configured on the test table. */
  protected static int REPLICA_COUNT = 3;

  /** Connection shared by all tests; created in {@link #startClusterAndCreateTable()}. */
  protected static AsyncConnection ASYNC_CONN;

  /** When {@code true}, {@link FailPrimaryGetCP} fails requests hitting the default replica. */
  protected static volatile boolean FAIL_PRIMARY_GET = false;

  /** Number of get/scanner-open requests observed by {@link FailPrimaryGetCP}, per replica id. */
  protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
    new ConcurrentHashMap<>();

  /** Produces the table instance used by the current test invocation. */
  protected Supplier<AsyncTable<?>> getTable;

  /**
   * @return one argument set per table flavor: {@link #getRawTable()} and {@link #getTable()}
   */
  public static Stream<Arguments> parameters() {
    return Stream.of(
      Arguments.of((Supplier<AsyncTable<?>>) AbstractTestAsyncTableRegionReplicasRead::getRawTable),
      Arguments.of((Supplier<AsyncTable<?>>) AbstractTestAsyncTableRegionReplicasRead::getTable));
  }

  /**
   * @param getTable supplier of the table the tests should read from
   */
  protected AbstractTestAsyncTableRegionReplicasRead(Supplier<AsyncTable<?>> getTable) {
    this.getTable = getTable;
  }

  /**
   * @return the test table obtained from {@link #ASYNC_CONN} without passing an executor
   */
  protected static AsyncTable<?> getRawTable() {
    return ASYNC_CONN.getTable(TABLE_NAME);
  }

  /**
   * @return the test table obtained from {@link #ASYNC_CONN} with the common {@link ForkJoinPool}
   */
  protected static AsyncTable<?> getTable() {
    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
  }

  /**
   * Region coprocessor that records, per replica id, every get and scanner-open request made
   * against {@link #TABLE_NAME}, and throws for requests on the default replica while
   * {@link #FAIL_PRIMARY_GET} is set.
   */
  public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Bumps the counter of the replica owning the region and, if requested, injects a failure.
     * Requests for any table other than {@link #TABLE_NAME} are ignored.
     * @throws IOException when the region is the default replica and {@link #FAIL_PRIMARY_GET} is
     *                     {@code true}
     */
    private void recordAndTryFail(ObserverContext<? extends RegionCoprocessorEnvironment> c)
      throws IOException {
      RegionInfo region = c.getEnvironment().getRegionInfo();
      if (!region.getTable().equals(TABLE_NAME)) {
        return;
      }
      // count first, so that failed primary requests are still visible to the assertions
      REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
        .incrementAndGet();
      if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
        throw new IOException("Inject error");
      }
    }

    @Override
    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
      List<Cell> result) throws IOException {
      recordAndTryFail(c);
    }

    @Override
    public void preScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, Scan scan)
      throws IOException {
      recordAndTryFail(c);
    }
  }

  /**
   * @return {@code true} if every region of {@link #TABLE_NAME} currently hosted by any region
   *         server of the mini cluster returns a non-empty result for {@code row}
   */
  private static boolean allReplicasHaveRow(byte[] row) throws IOException {
    for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
      for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
        // 'false' skips coprocessors, so this check does not disturb REPLICA_ID_TO_COUNT
        if (region.get(new Get(row), false).isEmpty()) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Starts the mini cluster, creates {@link #TABLE_NAME} with {@link #REPLICA_COUNT} replicas and
   * the {@link FailPrimaryGetCP} coprocessor, waits for region assignment and opens
   * {@link #ASYNC_CONN}.
   */
  protected static void startClusterAndCreateTable() throws Exception {
    TEST_UTIL.startMiniCluster(3);
    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
      .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  /**
   * Blocks, for up to 30 seconds, until {@code row} is readable from every hosted region of the
   * test table.
   */
  protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
    // this is the fastest way to let all replicas have the row
    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
    TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
  }

  /**
   * Closes the shared connection and shuts the mini cluster down. The cluster is shut down even
   * if closing the connection fails.
   */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    try {
      Closeables.close(ASYNC_CONN, true);
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * @return the number of requests recorded for {@code replicaId}, or 0 if none was recorded
   */
  private static int getReplicaGetCount(int replicaId) {
    AtomicInteger count = REPLICA_ID_TO_COUNT.get(replicaId);
    return count != null ? count.get() : 0;
  }

  /**
   * @return the total number of requests recorded for all replicas other than the default one
   */
  protected static int getSecondaryGetCount() {
    return REPLICA_ID_TO_COUNT.entrySet().stream()
      .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
      .mapToInt(e -> e.getValue().get()).sum();
  }

  /**
   * @return the number of requests recorded for the default replica, or 0 if none was recorded
   */
  protected static int getPrimaryGetCount() {
    return getReplicaGetCount(RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  /**
   * Performs the read under test and verifies its result.
   * @param table     the table to read from
   * @param replicaId the replica to read from; -1 means do not set replica
   */
  protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;

  @TestTemplate
  public void testNoReplicaRead() throws Exception {
    FAIL_PRIMARY_GET = false;
    REPLICA_ID_TO_COUNT.clear();
    AsyncTable<?> table = getTable.get();
    readAndCheck(table, -1);
    // the primary region is fine and the primary timeout is 1 second which is long enough, so we
    // should not send any requests to secondary replicas even if the consistency is timeline.
    Thread.sleep(5000);
    assertEquals(0, getSecondaryGetCount());
  }

  @TestTemplate
  public void testReplicaRead() throws Exception {
    // fail the primary get request
    FAIL_PRIMARY_GET = true;
    REPLICA_ID_TO_COUNT.clear();
    // make sure that we could still get the value from secondary replicas
    AsyncTable<?> table = getTable.get();
    readAndCheck(table, -1);
    // make sure that the primary request has been canceled
    Thread.sleep(5000);
    int count = getPrimaryGetCount();
    Thread.sleep(10000);
    assertEquals(count, getPrimaryGetCount());
  }

  @TestTemplate
  public void testReadSpecificReplica() throws Exception {
    FAIL_PRIMARY_GET = false;
    REPLICA_ID_TO_COUNT.clear();
    AsyncTable<?> table = getTable.get();
    for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
      readAndCheck(table, replicaId);
      // a replica that was never hit has no map entry; report that as a count of 0 through the
      // assertion instead of failing with a NullPointerException
      assertEquals(1, getReplicaGetCount(replicaId),
        "Unexpected number of requests recorded for replica " + replicaId);
    }
  }
}