001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver.handler;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.Server;
027import org.apache.hadoop.hbase.TableNotFoundException;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.hadoop.hbase.client.ClusterConnection;
032import org.apache.hadoop.hbase.client.FlushRegionCallable;
033import org.apache.hadoop.hbase.client.RegionReplicaUtil;
034import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
035import org.apache.hadoop.hbase.executor.EventHandler;
036import org.apache.hadoop.hbase.executor.EventType;
037import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
039import org.apache.hadoop.hbase.regionserver.HRegion;
040import org.apache.hadoop.hbase.util.RetryCounter;
041import org.apache.hadoop.hbase.util.RetryCounterFactory;
042import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
043
044/**
045 * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
046 * secondary region replicas. This means that a secondary region replica can serve some edits from
047 * it's memstore that that is still not flushed from primary. We do not want to allow secondary
048 * region's seqId to go back in time, when this secondary region is opened elsewhere after a
049 * crash or region move. We will trigger a flush cache in the primary region replica and wait
050 * for observing a complete flush cycle before marking the region readsEnabled. This handler does
051 * the flushing of the primary region replica and ensures that regular region opening is not
052 * blocked while the secondary replica is blocked on flush.
053 */
054@InterfaceAudience.Private
055public class RegionReplicaFlushHandler extends EventHandler {
056
057  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
058
059  private final ClusterConnection connection;
060  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
061  private final RpcControllerFactory rpcControllerFactory;
062  private final int operationTimeout;
063  private final HRegion region;
064
065  public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
066      RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
067      int operationTimeout, HRegion region) {
068    super(server, EventType.RS_REGION_REPLICA_FLUSH);
069    this.connection = connection;
070    this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
071    this.rpcControllerFactory = rpcControllerFactory;
072    this.operationTimeout = operationTimeout;
073    this.region = region;
074  }
075
076  @Override
077  public void process() throws IOException {
078    triggerFlushInPrimaryRegion(region);
079  }
080
081  @Override
082  protected void handleException(Throwable t) {
083    if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
084      LOG.error("Caught throwable while processing event " + eventType, t);
085    } else if (t instanceof RuntimeException) {
086      server.abort("ServerAborting because a runtime exception was thrown", t);
087    } else {
088      // something fishy since we cannot flush the primary region until all retries (retries from
089      // rpc times 35 trigger). We cannot close the region since there is no such mechanism to
090      // close a region without master triggering it. We just abort the server for now.
091      server.abort("ServerAborting because an exception was thrown", t);
092    }
093  }
094
095  private int getRetriesCount(Configuration conf) {
096    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
097      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
098    if (numRetries > 10) {
099      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
100        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
101      numRetries = numRetries / mult; // reset if HRS has multiplied this already
102    }
103    return numRetries;
104  }
105
106  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
107    long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
108      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
109
110    int maxAttempts = getRetriesCount(connection.getConfiguration());
111    RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
112
113    if (LOG.isDebugEnabled()) {
114      LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil
115        .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region "
116       + region.getRegionInfo().getEncodedName() + " to trigger a flush");
117    }
118    while (!region.isClosing() && !region.isClosed()
119        && !server.isAborted() && !server.isStopped()) {
120      FlushRegionCallable flushCallable = new FlushRegionCallable(
121        connection, rpcControllerFactory,
122        RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
123
124      // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
125      // do not have to wait for the whole flush here, just initiate it.
126      FlushRegionResponse response = null;
127      try {
128         response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller()
129          .callWithRetries(flushCallable, this.operationTimeout);
130      } catch (IOException ex) {
131        if (ex instanceof TableNotFoundException
132            || connection.isTableDisabled(region.getRegionInfo().getTable())) {
133          return;
134        }
135        throw ex;
136      }
137
138      if (response.getFlushed()) {
139        // then we have to wait for seeing the flush entry. All reads will be rejected until we see
140        // a complete flush cycle or replay a region open event
141        if (LOG.isDebugEnabled()) {
142          LOG.debug("Successfully triggered a flush of primary region replica "
143              + ServerRegionReplicaUtil
144                .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
145                + " of region " + region.getRegionInfo().getEncodedName()
146                + " Now waiting and blocking reads until observing a full flush cycle");
147        }
148        region.setReadsEnabled(true);
149        break;
150      } else {
151        if (response.hasWroteFlushWalMarker()) {
152          if(response.getWroteFlushWalMarker()) {
153            if (LOG.isDebugEnabled()) {
154              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
155                  + "region replica " + ServerRegionReplicaUtil
156                    .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
157                  + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
158                  + "blocking reads until observing a flush marker");
159            }
160            region.setReadsEnabled(true);
161            break;
162          } else {
163            // somehow we were not able to get the primary to write the flush request. It may be
164            // closing or already flushing. Retry flush again after some sleep.
165            if (!counter.shouldRetry()) {
166              throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
167                  "retries. Failing opening of this region replica "
168                  + region.getRegionInfo().getEncodedName());
169            }
170          }
171        } else {
172          // nothing to do. Are we dealing with an old server?
173          LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
174              + "Continuing to open the secondary region replica: "
175              + region.getRegionInfo().getEncodedName());
176          region.setReadsEnabled(true);
177          break;
178        }
179      }
180      try {
181        counter.sleepUntilNextRetry();
182      } catch (InterruptedException e) {
183        throw new InterruptedIOException(e.getMessage());
184      }
185    }
186  }
187
188}