001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver.handler;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.Server;
026import org.apache.hadoop.hbase.TableNotFoundException;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030import org.apache.hadoop.hbase.client.ClusterConnection;
031import org.apache.hadoop.hbase.client.FlushRegionCallable;
032import org.apache.hadoop.hbase.client.RegionReplicaUtil;
033import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
034import org.apache.hadoop.hbase.executor.EventHandler;
035import org.apache.hadoop.hbase.executor.EventType;
036import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
038import org.apache.hadoop.hbase.regionserver.HRegion;
039import org.apache.hadoop.hbase.util.RetryCounter;
040import org.apache.hadoop.hbase.util.RetryCounterFactory;
041import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
042
043/**
044 * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in
045 * secondary region replicas. This means that a secondary region replica can serve some edits from
046 * it's memstore that are still not flushed from primary. We do not want to allow secondary
047 * region's seqId to go back in time, when this secondary region is opened elsewhere after a
048 * crash or region move. We will trigger a flush cache in the primary region replica and wait
049 * for observing a complete flush cycle before marking the region readsEnabled. This handler does
050 * the flushing of the primary region replica and ensures that regular region opening is not
051 * blocked while the secondary replica is blocked on flush.
052 */
053@InterfaceAudience.Private
054public class RegionReplicaFlushHandler extends EventHandler {
055  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
056
057  private final ClusterConnection connection;
058  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
059  private final RpcControllerFactory rpcControllerFactory;
060  private final int operationTimeout;
061  private final HRegion region;
062
063  public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
064      RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
065      int operationTimeout, HRegion region) {
066    super(server, EventType.RS_REGION_REPLICA_FLUSH);
067    this.connection = connection;
068    this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
069    this.rpcControllerFactory = rpcControllerFactory;
070    this.operationTimeout = operationTimeout;
071    this.region = region;
072  }
073
074  @Override
075  public void process() throws IOException {
076    triggerFlushInPrimaryRegion(region);
077  }
078
079  @Override
080  protected void handleException(Throwable t) {
081    if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
082      LOG.error("Caught throwable while processing event " + eventType, t);
083    } else if (t instanceof RuntimeException) {
084      server.abort("Server aborting", t);
085    } else {
086      // something fishy since we cannot flush the primary region until all retries (retries from
087      // rpc times 35 trigger). We cannot close the region since there is no such mechanism to
088      // close a region without master triggering it. We just abort the server for now.
089      server.abort("ServerAborting because an exception was thrown", t);
090    }
091  }
092
093  private int getRetriesCount(Configuration conf) {
094    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
095      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
096    if (numRetries > 10) {
097      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
098        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
099      numRetries = numRetries / mult; // reset if HRS has multiplied this already
100    }
101    return numRetries;
102  }
103
104  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
105    long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
106      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
107
108    int maxAttempts = getRetriesCount(connection.getConfiguration());
109    RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
110
111    if (LOG.isDebugEnabled()) {
112      LOG.debug("RPC'ing to primary " + ServerRegionReplicaUtil.
113          getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionNameAsString() +
114        " from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH");
115    }
116    while (!region.isClosing() && !region.isClosed()
117        && !server.isAborted() && !server.isStopped()) {
118      FlushRegionCallable flushCallable = new FlushRegionCallable(
119        connection, rpcControllerFactory,
120        RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
121
122      // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
123      // do not have to wait for the whole flush here, just initiate it.
124      FlushRegionResponse response = null;
125      try {
126         response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller()
127          .callWithRetries(flushCallable, this.operationTimeout);
128      } catch (IOException ex) {
129        if (ex instanceof TableNotFoundException
130            || connection.isTableDisabled(region.getRegionInfo().getTable())) {
131          return;
132        }
133        throw ex;
134      }
135
136      if (response.getFlushed()) {
137        // then we have to wait for seeing the flush entry. All reads will be rejected until we see
138        // a complete flush cycle or replay a region open event
139        if (LOG.isDebugEnabled()) {
140          LOG.debug("Triggered flush of primary region replica "
141              + ServerRegionReplicaUtil
142                .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
143                + " for " + region.getRegionInfo().getEncodedName()
144                + "; now waiting and blocking reads until completes a full flush cycle");
145        }
146        region.setReadsEnabled(true);
147        break;
148      } else {
149        if (response.hasWroteFlushWalMarker()) {
150          if(response.getWroteFlushWalMarker()) {
151            if (LOG.isDebugEnabled()) {
152              LOG.debug("Triggered empty flush marker (memstore empty) on primary "
153                  + "region replica " + ServerRegionReplicaUtil
154                    .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
155                  + " for " + region.getRegionInfo().getEncodedName() + "; now waiting and "
156                  + "blocking reads until observing a flush marker");
157            }
158            region.setReadsEnabled(true);
159            break;
160          } else {
161            // somehow we were not able to get the primary to write the flush request. It may be
162            // closing or already flushing. Retry flush again after some sleep.
163            if (!counter.shouldRetry()) {
164              throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
165                  "retries. Failing opening of this region replica "
166                  + region.getRegionInfo().getEncodedName());
167            }
168          }
169        } else {
170          // nothing to do. Are we dealing with an old server?
171          LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
172              + "Continuing to open the secondary region replica: "
173              + region.getRegionInfo().getEncodedName());
174          region.setReadsEnabled(true);
175          break;
176        }
177      }
178      try {
179        counter.sleepUntilNextRetry();
180      } catch (InterruptedException e) {
181        throw new InterruptedIOException(e.getMessage());
182      }
183    }
184  }
185
186}