001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.List;
025import java.util.Optional;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.ForkJoinPool;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.function.Supplier;
031import org.apache.commons.io.IOUtils;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.coprocessor.ObserverContext;
036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
037import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
038import org.apache.hadoop.hbase.coprocessor.RegionObserver;
039import org.apache.hadoop.hbase.regionserver.HRegion;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
042import org.junit.AfterClass;
043import org.junit.Rule;
044import org.junit.Test;
045import org.junit.rules.TestName;
046import org.junit.runners.Parameterized.Parameter;
047import org.junit.runners.Parameterized.Parameters;
048
049public abstract class AbstractTestAsyncTableRegionReplicasRead {
050
051  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
052
053  protected static TableName TABLE_NAME = TableName.valueOf("async");
054
055  protected static byte[] FAMILY = Bytes.toBytes("cf");
056
057  protected static byte[] QUALIFIER = Bytes.toBytes("cq");
058
059  protected static byte[] ROW = Bytes.toBytes("row");
060
061  protected static byte[] VALUE = Bytes.toBytes("value");
062
063  protected static int REPLICA_COUNT = 3;
064
065  protected static AsyncConnection ASYNC_CONN;
066
067  @Rule
068  public TestName testName = new TestName();
069
070  @Parameter
071  public Supplier<AsyncTable<?>> getTable;
072
073  private static AsyncTable<?> getRawTable() {
074    return ASYNC_CONN.getTable(TABLE_NAME);
075  }
076
077  private static AsyncTable<?> getTable() {
078    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
079  }
080
081  @Parameters
082  public static List<Object[]> params() {
083    return Arrays.asList(
084      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable },
085      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable });
086  }
087
088  protected static volatile boolean FAIL_PRIMARY_GET = false;
089
090  protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
091    new ConcurrentHashMap<>();
092
093  public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
094
095    @Override
096    public Optional<RegionObserver> getRegionObserver() {
097      return Optional.of(this);
098    }
099
100    private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c)
101        throws IOException {
102      RegionInfo region = c.getEnvironment().getRegionInfo();
103      if (!region.getTable().equals(TABLE_NAME)) {
104        return;
105      }
106      REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
107        .incrementAndGet();
108      if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
109        throw new IOException("Inject error");
110      }
111    }
112
113    @Override
114    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
115        List<Cell> result) throws IOException {
116      recordAndTryFail(c);
117    }
118
119    @Override
120    public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
121        throws IOException {
122      recordAndTryFail(c);
123    }
124  }
125
126  private static boolean allReplicasHaveRow(byte[] row) throws IOException {
127    for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
128      for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
129        if (region.get(new Get(row), false).isEmpty()) {
130          return false;
131        }
132      }
133    }
134    return true;
135  }
136
137  protected static void startClusterAndCreateTable() throws Exception {
138    TEST_UTIL.startMiniCluster(3);
139    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
140      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
141      .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
142    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
143    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
144  }
145
146  protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
147    // this is the fastest way to let all replicas have the row
148    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
149    TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
150    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
151  }
152
153  @AfterClass
154  public static void tearDownAfterClass() throws Exception {
155    IOUtils.closeQuietly(ASYNC_CONN);
156    TEST_UTIL.shutdownMiniCluster();
157  }
158
159  protected static int getSecondaryGetCount() {
160    return REPLICA_ID_TO_COUNT.entrySet().stream()
161      .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
162      .mapToInt(e -> e.getValue().get()).sum();
163  }
164
165  protected static int getPrimaryGetCount() {
166    AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
167    return primaryGetCount != null ? primaryGetCount.get() : 0;
168  }
169
170  // replicaId = -1 means do not set replica
171  protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;
172
173  @Test
174  public void testNoReplicaRead() throws Exception {
175    FAIL_PRIMARY_GET = false;
176    REPLICA_ID_TO_COUNT.clear();
177    AsyncTable<?> table = getTable.get();
178    readAndCheck(table, -1);
179    // the primary region is fine and the primary timeout is 1 second which is long enough, so we
180    // should not send any requests to secondary replicas even if the consistency is timeline.
181    Thread.sleep(5000);
182    assertEquals(0, getSecondaryGetCount());
183  }
184
185  @Test
186  public void testReplicaRead() throws Exception {
187    // fail the primary get request
188    FAIL_PRIMARY_GET = true;
189    REPLICA_ID_TO_COUNT.clear();
190    // make sure that we could still get the value from secondary replicas
191    AsyncTable<?> table = getTable.get();
192    readAndCheck(table, -1);
193    // make sure that the primary request has been canceled
194    Thread.sleep(5000);
195    int count = getPrimaryGetCount();
196    Thread.sleep(10000);
197    assertEquals(count, getPrimaryGetCount());
198  }
199
200  @Test
201  public void testReadSpecificReplica() throws Exception {
202    FAIL_PRIMARY_GET = false;
203    REPLICA_ID_TO_COUNT.clear();
204    AsyncTable<?> table = getTable.get();
205    for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
206      readAndCheck(table, replicaId);
207      assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
208    }
209  }
210}