/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.handler;

import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;

/**
 * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in
 * secondary region replicas. This means that a secondary region replica can serve some edits from
 * its memstore that have not yet been flushed from the primary. We do not want the secondary
 * region's seqId to go back in time when this secondary region is opened elsewhere after a crash
 * or region move. We trigger a flush of the primary region replica and wait to observe a complete
 * flush cycle before marking the region readsEnabled. This handler does the flushing of the
 * primary region replica and ensures that regular region opening is not blocked while the
 * secondary replica is blocked on flush.
 */
@InterfaceAudience.Private
public class RegionReplicaFlushHandler extends EventHandler {
  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);

  private final AsyncClusterConnection connection;

  private final HRegion region;

  public RegionReplicaFlushHandler(Server server, HRegion region) {
    super(server, EventType.RS_REGION_REPLICA_FLUSH);
    this.connection = server.getAsyncClusterConnection();
    this.region = region;
  }

  @Override
  public void process() throws IOException {
    triggerFlushInPrimaryRegion(region);
  }

  @Override
  protected void handleException(Throwable t) {
    if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
      LOG.error("Caught throwable while processing event " + eventType, t);
    } else if (t instanceof RuntimeException) {
      server.abort("Server aborting", t);
    } else {
      // Something fishy, since we could not flush the primary region even after all retries. We
      // cannot close the region since there is no mechanism to close a region without the master
      // triggering it. We just abort the server for now.
      server.abort("Server aborting because an exception was thrown", t);
    }
  }

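  /**
   * Returns the number of retries to use for the flush RPCs. If the configured client retry count
   * appears to have already been multiplied by the server-side retries multiplier, divide it back
   * down so we do not retry for an excessively long time on the region server.
   */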
  private int getRetriesCount(Configuration conf) {
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    if (numRetries > 10) {
      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
      numRetries = numRetries / mult; // reset if HRS has multiplied this already
    }
    return numRetries;
  }

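  /**
   * Returns true if the given error indicates that the table has been dropped
   * (TableNotFoundException) or if the table is currently disabled, in which case there is no
   * primary to flush and we should stop retrying.
   */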
  private boolean isTableDisabledOrDropped(IOException error, HRegion region) {
    if (error instanceof TableNotFoundException) {
      return true;
    }
    try {
      if (
        FutureUtils.get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()))
      ) {
        return true;
      }
    } catch (IOException e) {
      if (e instanceof TableNotFoundException) {
        return true;
      } else {
        LOG.warn("Failed to check whether table {} is disabled", region.getRegionInfo().getTable(),
          e);
      }
    }
    return false;
  }

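  /**
   * Asks the primary region replica to flush (or to write a flush WAL marker if its memstore is
   * empty) and retries with backoff until it succeeds, the region or server starts shutting down,
   * or the table turns out to be disabled or dropped. A successful flush (or marker) lets this
   * secondary replica observe a complete flush cycle and serve reads again.
   */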
  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
    long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);

    int maxAttempts = getRetriesCount(connection.getConfiguration());
    RetryCounter counter = new RetryCounterFactory(maxAttempts, (int) pause).create();

    if (LOG.isDebugEnabled()) {
      LOG.debug("RPC'ing to primary "
        + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
          .getRegionNameAsString()
        + " from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH");
    }
    while (
      !region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped()
    ) {
      // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
      // do not have to wait for the whole flush here, just initiate it.
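      // Pass writeFlushWalMarker=true so that, if the primary's memstore is empty, it still
      // writes a flush WAL marker that this replica can observe to re-enable reads.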
      FlushRegionResponse response;
      try {
        response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil
          .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true));
      } catch (IOException e) {
        if (isTableDisabledOrDropped(e, region)) {
          return;
        }
        if (!counter.shouldRetry()) {
          throw e;
        }
        // We need to retry here because the retry logic for asynchronous admin requests is much
        // simpler than for normal operations: if we fail to locate the region once, the exception
        // is thrown out and the region is not relocated. So we add some retries ourselves to
        // avoid shutting down the region server too frequently.
        LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}",
          ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
            .getRegionNameAsString(),
          region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e);
        try {
          counter.sleepUntilNextRetry();
        } catch (InterruptedException e1) {
          throw new InterruptedIOException(e1.getMessage());
        }
        continue;
      }

      if (response.getFlushed()) {
        // Now we have to wait until we see the flush entry. All reads will be rejected until we
        // see a complete flush cycle or replay a region open event
        if (LOG.isDebugEnabled()) {
          LOG.debug("Triggered flush of primary region replica "
            + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
              .getRegionNameAsString()
            + " for " + region.getRegionInfo().getEncodedName()
            + "; now waiting and blocking reads until it completes a full flush cycle");
        }
        region.setReadsEnabled(true);
        break;
      } else {
        if (response.hasWroteFlushWalMarker()) {
          if (response.getWroteFlushWalMarker()) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Triggered empty flush marker (memstore empty) on primary region replica "
                + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
                  .getRegionNameAsString()
                + " for " + region.getRegionInfo().getEncodedName()
                + "; now waiting and blocking reads until observing a flush marker");
            }
            region.setReadsEnabled(true);
            break;
          } else {
            // Somehow we were not able to get the primary to write the flush marker. It may be
            // closing or already flushing. Retry the flush again after some sleep.
            if (!counter.shouldRetry()) {
              throw new IOException("Cannot cause primary to flush or drop a wal marker after "
                + counter.getAttemptTimes() + " retries. Failing opening of this region replica "
                + region.getRegionInfo().getRegionNameAsString());
            } else {
              LOG.warn(
                "Cannot cause primary replica {} to flush or drop a wal marker "
                  + "for region replica {}, retry={}",
                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
                  .getRegionNameAsString(),
                region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes());
            }
          }
        } else {
          // Nothing to do. Are we dealing with an old server?
          LOG.warn("Was not able to trigger a flush from the primary region, possibly due to an "
            + "old server version. Continuing to open the secondary region replica: "
            + region.getRegionInfo().getRegionNameAsString());
          break;
        }
      }
      try {
        counter.sleepUntilNextRetry();
      } catch (InterruptedException e) {
        throw new InterruptedIOException(e.getMessage());
      }
    }
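    // Make sure reads are enabled when we exit, even if the loop ended early because the region
    // or the server is shutting down.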
    region.setReadsEnabled(true);
  }
}