001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.List;
025import java.util.Optional;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.ForkJoinPool;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.function.Supplier;
032import org.apache.commons.io.IOUtils;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.coprocessor.ObserverContext;
038import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
039import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
040import org.apache.hadoop.hbase.coprocessor.RegionObserver;
041import org.apache.hadoop.hbase.regionserver.HRegion;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
044import org.junit.AfterClass;
045import org.junit.Rule;
046import org.junit.Test;
047import org.junit.rules.TestName;
048import org.junit.runners.Parameterized.Parameter;
049import org.junit.runners.Parameterized.Parameters;
050
051public abstract class AbstractTestAsyncTableRegionReplicasRead {
052
053  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
054
055  protected static TableName TABLE_NAME = TableName.valueOf("async");
056
057  protected static byte[] FAMILY = Bytes.toBytes("cf");
058
059  protected static byte[] QUALIFIER = Bytes.toBytes("cq");
060
061  protected static byte[] ROW = Bytes.toBytes("row");
062
063  protected static byte[] VALUE = Bytes.toBytes("value");
064
065  protected static int REPLICA_COUNT = 3;
066
067  protected static AsyncConnection ASYNC_CONN;
068
069  @Rule
070  public TestName testName = new TestName();
071
072  @Parameter
073  public Supplier<AsyncTable<?>> getTable;
074
075  private static AsyncTable<?> getRawTable() {
076    return ASYNC_CONN.getTable(TABLE_NAME);
077  }
078
079  private static AsyncTable<?> getTable() {
080    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
081  }
082
083  @Parameters
084  public static List<Object[]> params() {
085    return Arrays.asList(
086      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable },
087      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable });
088  }
089
090  protected static volatile boolean FAIL_PRIMARY_GET = false;
091
092  protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
093    new ConcurrentHashMap<>();
094
095  public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
096
097    @Override
098    public Optional<RegionObserver> getRegionObserver() {
099      return Optional.of(this);
100    }
101
102    private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c)
103        throws IOException {
104      RegionInfo region = c.getEnvironment().getRegionInfo();
105      if (!region.getTable().equals(TABLE_NAME)) {
106        return;
107      }
108      REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
109        .incrementAndGet();
110      if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
111        throw new IOException("Inject error");
112      }
113    }
114
115    @Override
116    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
117        List<Cell> result) throws IOException {
118      recordAndTryFail(c);
119    }
120
121    @Override
122    public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
123        throws IOException {
124      recordAndTryFail(c);
125    }
126  }
127
128  private static boolean allReplicasHaveRow(byte[] row) throws IOException {
129    for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
130      for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
131        if (region.get(new Get(row), false).isEmpty()) {
132          return false;
133        }
134      }
135    }
136    return true;
137  }
138
139  protected static void startClusterAndCreateTable() throws Exception {
140    // 10 mins
141    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
142      TimeUnit.MINUTES.toMillis(10));
143    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
144      TimeUnit.MINUTES.toMillis(10));
145    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
146      TimeUnit.MINUTES.toMillis(10));
147
148    // 1 second
149    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
150      TimeUnit.SECONDS.toMicros(1));
151    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND,
152      TimeUnit.SECONDS.toMicros(1));
153
154    // set a small pause so we will retry very quickly
155    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
156
157    // infinite retry
158    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
159
160    TEST_UTIL.startMiniCluster(3);
161    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
162      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
163      .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
164    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
165    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
166  }
167
168  protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
169    // this is the fastest way to let all replicas have the row
170    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
171    TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
172    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
173  }
174
175  @AfterClass
176  public static void tearDownAfterClass() throws Exception {
177    IOUtils.closeQuietly(ASYNC_CONN);
178    TEST_UTIL.shutdownMiniCluster();
179  }
180
181  protected static int getSecondaryGetCount() {
182    return REPLICA_ID_TO_COUNT.entrySet().stream()
183      .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
184      .mapToInt(e -> e.getValue().get()).sum();
185  }
186
187  protected static int getPrimaryGetCount() {
188    AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
189    return primaryGetCount != null ? primaryGetCount.get() : 0;
190  }
191
192  // replicaId = -1 means do not set replica
193  protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;
194
195  @Test
196  public void testNoReplicaRead() throws Exception {
197    FAIL_PRIMARY_GET = false;
198    REPLICA_ID_TO_COUNT.clear();
199    AsyncTable<?> table = getTable.get();
200    readAndCheck(table, -1);
201    // the primary region is fine and the primary timeout is 1 second which is long enough, so we
202    // should not send any requests to secondary replicas even if the consistency is timeline.
203    Thread.sleep(5000);
204    assertEquals(0, getSecondaryGetCount());
205  }
206
207  @Test
208  public void testReplicaRead() throws Exception {
209    // fail the primary get request
210    FAIL_PRIMARY_GET = true;
211    REPLICA_ID_TO_COUNT.clear();
212    // make sure that we could still get the value from secondary replicas
213    AsyncTable<?> table = getTable.get();
214    readAndCheck(table, -1);
215    // make sure that the primary request has been canceled
216    Thread.sleep(5000);
217    int count = getPrimaryGetCount();
218    Thread.sleep(10000);
219    assertEquals(count, getPrimaryGetCount());
220  }
221
222  @Test
223  public void testReadSpecificReplica() throws Exception {
224    FAIL_PRIMARY_GET = false;
225    REPLICA_ID_TO_COUNT.clear();
226    AsyncTable<?> table = getTable.get();
227    for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
228      readAndCheck(table, replicaId);
229      assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
230    }
231  }
232}