001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.ForkJoinPool;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.function.Supplier;
030import java.util.stream.Stream;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.coprocessor.ObserverContext;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
037import org.apache.hadoop.hbase.coprocessor.RegionObserver;
038import org.apache.hadoop.hbase.regionserver.HRegion;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
041import org.junit.jupiter.api.AfterAll;
042import org.junit.jupiter.api.TestTemplate;
043import org.junit.jupiter.params.provider.Arguments;
044
045import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
046
047public abstract class AbstractTestAsyncTableRegionReplicasRead {
048
049  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
050
051  protected static TableName TABLE_NAME = TableName.valueOf("async");
052
053  protected static byte[] FAMILY = Bytes.toBytes("cf");
054
055  protected static byte[] QUALIFIER = Bytes.toBytes("cq");
056
057  protected static byte[] ROW = Bytes.toBytes("row");
058
059  protected static byte[] VALUE = Bytes.toBytes("value");
060
061  protected static int REPLICA_COUNT = 3;
062
063  protected static AsyncConnection ASYNC_CONN;
064
065  protected Supplier<AsyncTable<?>> getTable;
066
067  public static Stream<Arguments> parameters() {
068    return Stream.of(
069      Arguments.of((Supplier<AsyncTable<?>>) AbstractTestAsyncTableRegionReplicasRead::getRawTable),
070      Arguments.of((Supplier<AsyncTable<?>>) AbstractTestAsyncTableRegionReplicasRead::getTable));
071  }
072
073  protected AbstractTestAsyncTableRegionReplicasRead(Supplier<AsyncTable<?>> getTable) {
074    this.getTable = getTable;
075  }
076
077  protected static AsyncTable<?> getRawTable() {
078    return ASYNC_CONN.getTable(TABLE_NAME);
079  }
080
081  protected static AsyncTable<?> getTable() {
082    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
083  }
084
085  protected static volatile boolean FAIL_PRIMARY_GET = false;
086
087  protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
088    new ConcurrentHashMap<>();
089
090  public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
091
092    @Override
093    public Optional<RegionObserver> getRegionObserver() {
094      return Optional.of(this);
095    }
096
097    private void recordAndTryFail(ObserverContext<? extends RegionCoprocessorEnvironment> c)
098      throws IOException {
099      RegionInfo region = c.getEnvironment().getRegionInfo();
100      if (!region.getTable().equals(TABLE_NAME)) {
101        return;
102      }
103      REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
104        .incrementAndGet();
105      if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
106        throw new IOException("Inject error");
107      }
108    }
109
110    @Override
111    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
112      List<Cell> result) throws IOException {
113      recordAndTryFail(c);
114    }
115
116    @Override
117    public void preScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, Scan scan)
118      throws IOException {
119      recordAndTryFail(c);
120    }
121  }
122
123  private static boolean allReplicasHaveRow(byte[] row) throws IOException {
124    for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
125      for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
126        if (region.get(new Get(row), false).isEmpty()) {
127          return false;
128        }
129      }
130    }
131    return true;
132  }
133
134  protected static void startClusterAndCreateTable() throws Exception {
135    TEST_UTIL.startMiniCluster(3);
136    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
137      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
138      .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
139    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
140    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
141  }
142
143  protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
144    // this is the fastest way to let all replicas have the row
145    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
146    TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
147    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
148  }
149
150  @AfterAll
151  public static void tearDownAfterClass() throws Exception {
152    Closeables.close(ASYNC_CONN, true);
153    TEST_UTIL.shutdownMiniCluster();
154  }
155
156  protected static int getSecondaryGetCount() {
157    return REPLICA_ID_TO_COUNT.entrySet().stream()
158      .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
159      .mapToInt(e -> e.getValue().get()).sum();
160  }
161
162  protected static int getPrimaryGetCount() {
163    AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
164    return primaryGetCount != null ? primaryGetCount.get() : 0;
165  }
166
167  // replicaId = -1 means do not set replica
168  protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;
169
170  @TestTemplate
171  public void testNoReplicaRead() throws Exception {
172    FAIL_PRIMARY_GET = false;
173    REPLICA_ID_TO_COUNT.clear();
174    AsyncTable<?> table = getTable.get();
175    readAndCheck(table, -1);
176    // the primary region is fine and the primary timeout is 1 second which is long enough, so we
177    // should not send any requests to secondary replicas even if the consistency is timeline.
178    Thread.sleep(5000);
179    assertEquals(0, getSecondaryGetCount());
180  }
181
182  @TestTemplate
183  public void testReplicaRead() throws Exception {
184    // fail the primary get request
185    FAIL_PRIMARY_GET = true;
186    REPLICA_ID_TO_COUNT.clear();
187    // make sure that we could still get the value from secondary replicas
188    AsyncTable<?> table = getTable.get();
189    readAndCheck(table, -1);
190    // make sure that the primary request has been canceled
191    Thread.sleep(5000);
192    int count = getPrimaryGetCount();
193    Thread.sleep(10000);
194    assertEquals(count, getPrimaryGetCount());
195  }
196
197  @TestTemplate
198  public void testReadSpecificReplica() throws Exception {
199    FAIL_PRIMARY_GET = false;
200    REPLICA_ID_TO_COUNT.clear();
201    AsyncTable<?> table = getTable.get();
202    for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
203      readAndCheck(table, replicaId);
204      assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
205    }
206  }
207}