001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure.flush;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.Future;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.yetus.audience.InterfaceAudience;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.Abortable;
035import org.apache.hadoop.hbase.DaemonThreadFactory;
036import org.apache.hadoop.hbase.DroppedSnapshotException;
037import org.apache.hadoop.hbase.HBaseInterfaceAudience;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.errorhandling.ForeignException;
040import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
041import org.apache.hadoop.hbase.procedure.ProcedureMember;
042import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
043import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
044import org.apache.hadoop.hbase.procedure.Subprocedure;
045import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
046import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.regionserver.HRegionServer;
049import org.apache.hadoop.hbase.regionserver.RegionServerServices;
050import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
051import org.apache.zookeeper.KeeperException;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
/**
 * This manager class handles flushing the regions of a table that are hosted on a
 * {@link HRegionServer}.
 */
058@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
059public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager {
060  private static final Logger LOG =
061      LoggerFactory.getLogger(RegionServerFlushTableProcedureManager.class);
062
063  private static final String CONCURENT_FLUSH_TASKS_KEY =
064      "hbase.flush.procedure.region.concurrentTasks";
065  private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3;
066
067  public static final String FLUSH_REQUEST_THREADS_KEY =
068      "hbase.flush.procedure.region.pool.threads";
069  public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10;
070
071  public static final String FLUSH_TIMEOUT_MILLIS_KEY =
072      "hbase.flush.procedure.region.timeout";
073  public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
074
075  public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY =
076      "hbase.flush.procedure.region.wakefrequency";
077  private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500;
078
079  private RegionServerServices rss;
080  private ProcedureMemberRpcs memberRpcs;
081  private ProcedureMember member;
082
083  /**
084   * Exposed for testing.
085   * @param conf HBase configuration.
086   * @param server region server.
087   * @param memberRpc use specified memberRpc instance
088   * @param procMember use specified ProcedureMember
089   */
090   RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server,
091      ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
092    this.rss = server;
093    this.memberRpcs = memberRpc;
094    this.member = procMember;
095  }
096
097  public RegionServerFlushTableProcedureManager() {}
098
099  /**
100   * Start accepting flush table requests.
101   */
102  @Override
103  public void start() {
104    LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString());
105    this.memberRpcs.start(rss.getServerName().toString(), member);
106  }
107
108  /**
109   * Close <tt>this</tt> and all running tasks
110   * @param force forcefully stop all running tasks
111   * @throws IOException
112   */
113  @Override
114  public void stop(boolean force) throws IOException {
115    String mode = force ? "abruptly" : "gracefully";
116    LOG.info("Stopping region server flush procedure manager " + mode + ".");
117
118    try {
119      this.member.close();
120    } finally {
121      this.memberRpcs.close();
122    }
123  }
124
125  /**
126   * If in a running state, creates the specified subprocedure to flush table regions.
127   *
128   * Because this gets the local list of regions to flush and not the set the master had,
129   * there is a possibility of a race where regions may be missed.
130   *
131   * @param table
132   * @return Subprocedure to submit to the ProcedureMemeber.
133   */
134  public Subprocedure buildSubprocedure(String table) {
135
136    // don't run the subprocedure if the parent is stop(ping)
137    if (rss.isStopping() || rss.isStopped()) {
138      throw new IllegalStateException("Can't start flush region subprocedure on RS: "
139          + rss.getServerName() + ", because stopping/stopped!");
140    }
141
142    // check to see if this server is hosting any regions for the table
143    List<HRegion> involvedRegions;
144    try {
145      involvedRegions = getRegionsToFlush(table);
146    } catch (IOException e1) {
147      throw new IllegalStateException("Failed to figure out if there is region to flush.", e1);
148    }
149
150    // We need to run the subprocedure even if we have no relevant regions.  The coordinator
151    // expects participation in the procedure and without sending message the master procedure
152    // will hang and fail.
153
154    LOG.debug("Launching subprocedure to flush regions for " + table);
155    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table);
156    Configuration conf = rss.getConfiguration();
157    long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY,
158        FLUSH_TIMEOUT_MILLIS_DEFAULT);
159    long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY,
160        FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
161
162    FlushTableSubprocedurePool taskManager =
163        new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
164    return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
165      timeoutMillis, involvedRegions, table, taskManager);
166  }
167
168  /**
169   * Get the list of regions to flush for the table on this server
170   *
171   * It is possible that if a region moves somewhere between the calls
172   * we'll miss the region.
173   *
174   * @param table
175   * @return the list of online regions. Empty list is returned if no regions.
176   * @throws IOException
177   */
178  private List<HRegion> getRegionsToFlush(String table) throws IOException {
179    return (List<HRegion>) rss.getRegions(TableName.valueOf(table));
180  }
181
182  public class FlushTableSubprocedureBuilder implements SubprocedureFactory {
183
184    @Override
185    public Subprocedure buildSubprocedure(String name, byte[] data) {
186      // The name of the procedure instance from the master is the table name.
187      return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
188    }
189
190  }
191
192  /**
193   * We use the FlushTableSubprocedurePool, a class specific thread pool instead of
194   * {@link org.apache.hadoop.hbase.executor.ExecutorService}.
195   *
196   * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
197   * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
198   * failures.
199   */
200  static class FlushTableSubprocedurePool {
201    private final Abortable abortable;
202    private final ExecutorCompletionService<Void> taskPool;
203    private final ThreadPoolExecutor executor;
204    private volatile boolean stopped;
205    private final List<Future<Void>> futures = new ArrayList<>();
206    private final String name;
207
208    FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
209      this.abortable = abortable;
210      // configure the executor service
211      long keepAlive = conf.getLong(
212        RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
213        RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
214      int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
215      this.name = name;
216      executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
217          new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
218              + name + ")-flush-proc-pool"));
219      executor.allowCoreThreadTimeOut(true);
220      taskPool = new ExecutorCompletionService<>(executor);
221    }
222
223    boolean hasTasks() {
224      return futures.size() != 0;
225    }
226
227    /**
228     * Submit a task to the pool.
229     *
230     * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}.
231     */
232    void submitTask(final Callable<Void> task) {
233      Future<Void> f = this.taskPool.submit(task);
234      futures.add(f);
235    }
236
237    /**
238     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
239     * This *must* be called after all tasks are submitted via submitTask.
240     *
241     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
242     * @throws InterruptedException
243     */
244    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
245      LOG.debug("Waiting for local region flush to finish.");
246
247      int sz = futures.size();
248      try {
249        // Using the completion service to process the futures.
250        for (int i = 0; i < sz; i++) {
251          Future<Void> f = taskPool.take();
252          f.get();
253          if (!futures.remove(f)) {
254            LOG.warn("unexpected future" + f);
255          }
256          LOG.debug("Completed " + (i+1) + "/" + sz +  " local region flush tasks.");
257        }
258        LOG.debug("Completed " + sz +  " local region flush tasks.");
259        return true;
260      } catch (InterruptedException e) {
261        LOG.warn("Got InterruptedException in FlushSubprocedurePool", e);
262        if (!stopped) {
263          Thread.currentThread().interrupt();
264          throw new ForeignException("FlushSubprocedurePool", e);
265        }
266        // we are stopped so we can just exit.
267      } catch (ExecutionException e) {
268        Throwable cause = e.getCause();
269        if (cause instanceof ForeignException) {
270          LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
271          throw (ForeignException)e.getCause();
272        } else if (cause instanceof DroppedSnapshotException) {
273          // we have to abort the region server according to contract of flush
274          abortable.abort("Received DroppedSnapshotException, aborting", cause);
275        }
276        LOG.warn("Got Exception in FlushSubprocedurePool", e);
277        throw new ForeignException(name, e.getCause());
278      } finally {
279        cancelTasks();
280      }
281      return false;
282    }
283
284    /**
285     * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running
286     * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
287     * @throws InterruptedException
288     */
289    void cancelTasks() throws InterruptedException {
290      Collection<Future<Void>> tasks = futures;
291      LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name);
292      for (Future<Void> f: tasks) {
293        f.cancel(false);
294      }
295
296      // evict remaining tasks and futures from taskPool.
297      futures.clear();
298      while (taskPool.poll() != null) {}
299      stop();
300    }
301
302    /**
303     * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be
304     * interrupted (see HBASE-13877)
305     */
306    void stop() {
307      if (this.stopped) return;
308
309      this.stopped = true;
310      this.executor.shutdown();
311    }
312  }
313
314  /**
315   * Initialize this region server flush procedure manager
316   * Uses a zookeeper based member controller.
317   * @param rss region server
318   * @throws KeeperException if the zookeeper cannot be reached
319   */
320  @Override
321  public void initialize(RegionServerServices rss) throws KeeperException {
322    this.rss = rss;
323    ZKWatcher zkw = rss.getZooKeeper();
324    this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
325      MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE);
326
327    Configuration conf = rss.getConfiguration();
328    long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
329    int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT);
330
331    // create the actual flush table procedure member
332    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
333      opThreads, keepAlive);
334    this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder());
335  }
336
337  @Override
338  public String getProcedureSignature() {
339    return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE;
340  }
341
342}