001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.handler;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.hadoop.hbase.Server;
025import org.apache.hadoop.hbase.TableNotFoundException;
026import org.apache.hadoop.hbase.client.ClusterConnection;
027import org.apache.hadoop.hbase.client.FlushRegionCallable;
028import org.apache.hadoop.hbase.client.RegionReplicaUtil;
029import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
030import org.apache.hadoop.hbase.executor.EventHandler;
031import org.apache.hadoop.hbase.executor.EventType;
032import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
033import org.apache.hadoop.hbase.regionserver.HRegion;
034import org.apache.hadoop.hbase.util.RetryCounter;
035import org.apache.hadoop.hbase.util.RetryCounterFactory;
036import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
042
043/**
044 * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in
045 * secondary region replicas. This means that a secondary region replica can serve some edits from
046 * it's memstore that are still not flushed from primary. We do not want to allow secondary region's
047 * seqId to go back in time, when this secondary region is opened elsewhere after a crash or region
048 * move. We will trigger a flush cache in the primary region replica and wait for observing a
049 * complete flush cycle before marking the region readsEnabled. This handler does the flushing of
050 * the primary region replica and ensures that regular region opening is not blocked while the
051 * secondary replica is blocked on flush.
052 */
053@InterfaceAudience.Private
054public class RegionReplicaFlushHandler extends EventHandler {
055  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
056
057  private final ClusterConnection connection;
058  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
059  private final RpcControllerFactory rpcControllerFactory;
060  private final int operationTimeout;
061  private final HRegion region;
062
063  public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
064    RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
065    int operationTimeout, HRegion region) {
066    super(server, EventType.RS_REGION_REPLICA_FLUSH);
067    this.connection = connection;
068    this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
069    this.rpcControllerFactory = rpcControllerFactory;
070    this.operationTimeout = operationTimeout;
071    this.region = region;
072  }
073
074  @Override
075  public void process() throws IOException {
076    triggerFlushInPrimaryRegion(region);
077  }
078
079  @Override
080  protected void handleException(Throwable t) {
081    if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
082      LOG.error("Caught throwable while processing event " + eventType, t);
083    } else if (t instanceof RuntimeException) {
084      server.abort("Server aborting", t);
085    } else {
086      // something fishy since we cannot flush the primary region until all retries (retries from
087      // rpc times 35 trigger). We cannot close the region since there is no such mechanism to
088      // close a region without master triggering it. We just abort the server for now.
089      server.abort("ServerAborting because an exception was thrown", t);
090    }
091  }
092
093  private int getRetriesCount(Configuration conf) {
094    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
095      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
096    if (numRetries > 10) {
097      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
098        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
099      numRetries = numRetries / mult; // reset if HRS has multiplied this already
100    }
101    return numRetries;
102  }
103
104  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
105    long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
106      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
107
108    int maxAttempts = getRetriesCount(connection.getConfiguration());
109    RetryCounter counter = new RetryCounterFactory(maxAttempts, (int) pause).create();
110
111    if (LOG.isDebugEnabled()) {
112      LOG.debug("RPC'ing to primary "
113        + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
114          .getRegionNameAsString()
115        + " from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH");
116    }
117    while (
118      !region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped()
119    ) {
120      FlushRegionCallable flushCallable = new FlushRegionCallable(connection, rpcControllerFactory,
121        RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
122
123      // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
124      // do not have to wait for the whole flush here, just initiate it.
125      FlushRegionResponse response = null;
126      try {
127        response = rpcRetryingCallerFactory.<FlushRegionResponse> newCaller()
128          .callWithRetries(flushCallable, this.operationTimeout);
129      } catch (IOException ex) {
130        if (
131          ex instanceof TableNotFoundException
132            || connection.isTableDisabled(region.getRegionInfo().getTable())
133        ) {
134          return;
135        }
136        throw ex;
137      }
138
139      if (response.getFlushed()) {
140        // then we have to wait for seeing the flush entry. All reads will be rejected until we see
141        // a complete flush cycle or replay a region open event
142        if (LOG.isDebugEnabled()) {
143          LOG.debug("Triggered flush of primary region replica "
144            + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
145              .getEncodedName()
146            + " for " + region.getRegionInfo().getEncodedName()
147            + "; now waiting and blocking reads until completes a full flush cycle");
148        }
149        region.setReadsEnabled(true);
150        break;
151      } else {
152        if (response.hasWroteFlushWalMarker()) {
153          if (response.getWroteFlushWalMarker()) {
154            if (LOG.isDebugEnabled()) {
155              LOG.debug(
156                "Triggered empty flush marker (memstore empty) on primary " + "region replica "
157                  + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
158                    .getEncodedName()
159                  + " for " + region.getRegionInfo().getEncodedName() + "; now waiting and "
160                  + "blocking reads until observing a flush marker");
161            }
162            region.setReadsEnabled(true);
163            break;
164          } else {
165            // somehow we were not able to get the primary to write the flush request. It may be
166            // closing or already flushing. Retry flush again after some sleep.
167            if (!counter.shouldRetry()) {
168              throw new IOException("Cannot cause primary to flush or drop a wal marker after "
169                + "retries. Failing opening of this region replica "
170                + region.getRegionInfo().getEncodedName());
171            }
172          }
173        } else {
174          // nothing to do. Are we dealing with an old server?
175          LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
176            + "Continuing to open the secondary region replica: "
177            + region.getRegionInfo().getEncodedName());
178          region.setReadsEnabled(true);
179          break;
180        }
181      }
182      try {
183        counter.sleepUntilNextRetry();
184      } catch (InterruptedException e) {
185        throw new InterruptedIOException(e.getMessage());
186      }
187    }
188  }
189
190}