001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure.flush;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.Future;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Abortable;
032import org.apache.hadoop.hbase.DroppedSnapshotException;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.errorhandling.ForeignException;
036import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
037import org.apache.hadoop.hbase.procedure.ProcedureMember;
038import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
039import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
040import org.apache.hadoop.hbase.procedure.Subprocedure;
041import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
042import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.regionserver.HRegionServer;
045import org.apache.hadoop.hbase.regionserver.RegionServerServices;
046import org.apache.hadoop.hbase.util.Threads;
047import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.apache.zookeeper.KeeperException;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
054
055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
056
057/**
 * This manager class handles flushing the regions of a table on a {@link HRegionServer}.
059 */
060@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
061public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager {
062  private static final Logger LOG =
063      LoggerFactory.getLogger(RegionServerFlushTableProcedureManager.class);
064
065  private static final String CONCURENT_FLUSH_TASKS_KEY =
066      "hbase.flush.procedure.region.concurrentTasks";
067  private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3;
068
069  public static final String FLUSH_REQUEST_THREADS_KEY =
070      "hbase.flush.procedure.region.pool.threads";
071  public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10;
072
073  public static final String FLUSH_TIMEOUT_MILLIS_KEY =
074      "hbase.flush.procedure.region.timeout";
075  public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
076
077  public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY =
078      "hbase.flush.procedure.region.wakefrequency";
079  private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500;
080
081  private RegionServerServices rss;
082  private ProcedureMemberRpcs memberRpcs;
083  private ProcedureMember member;
084
085  /**
086   * Exposed for testing.
087   * @param conf HBase configuration.
088   * @param server region server.
089   * @param memberRpc use specified memberRpc instance
090   * @param procMember use specified ProcedureMember
091   */
092   RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server,
093      ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
094    this.rss = server;
095    this.memberRpcs = memberRpc;
096    this.member = procMember;
097  }
098
099  public RegionServerFlushTableProcedureManager() {}
100
101  /**
102   * Start accepting flush table requests.
103   */
104  @Override
105  public void start() {
106    LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString());
107    this.memberRpcs.start(rss.getServerName().toString(), member);
108  }
109
110  /**
111   * Close <tt>this</tt> and all running tasks
112   * @param force forcefully stop all running tasks
113   * @throws IOException
114   */
115  @Override
116  public void stop(boolean force) throws IOException {
117    String mode = force ? "abruptly" : "gracefully";
118    LOG.info("Stopping region server flush procedure manager " + mode + ".");
119
120    try {
121      this.member.close();
122    } finally {
123      this.memberRpcs.close();
124    }
125  }
126
127  /**
128   * If in a running state, creates the specified subprocedure to flush table regions.
129   *
130   * Because this gets the local list of regions to flush and not the set the master had,
131   * there is a possibility of a race where regions may be missed.
132   *
133   * @param table table to flush
134   * @param family column family within a table
135   * @return Subprocedure to submit to the ProcedureMember.
136   */
137  public Subprocedure buildSubprocedure(String table, String family) {
138
139    // don't run the subprocedure if the parent is stop(ping)
140    if (rss.isStopping() || rss.isStopped()) {
141      throw new IllegalStateException("Can't start flush region subprocedure on RS: "
142          + rss.getServerName() + ", because stopping/stopped!");
143    }
144
145    // check to see if this server is hosting any regions for the table
146    List<HRegion> involvedRegions;
147    try {
148      involvedRegions = getRegionsToFlush(table);
149    } catch (IOException e1) {
150      throw new IllegalStateException("Failed to figure out if there is region to flush.", e1);
151    }
152
153    // We need to run the subprocedure even if we have no relevant regions.  The coordinator
154    // expects participation in the procedure and without sending message the master procedure
155    // will hang and fail.
156
157    LOG.debug("Launching subprocedure to flush regions for " + table);
158    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table);
159    Configuration conf = rss.getConfiguration();
160    long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY,
161        FLUSH_TIMEOUT_MILLIS_DEFAULT);
162    long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY,
163        FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
164
165    FlushTableSubprocedurePool taskManager =
166        new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
167    return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
168      timeoutMillis, involvedRegions, table, family, taskManager);
169  }
170
171  /**
172   * Get the list of regions to flush for the table on this server
173   *
174   * It is possible that if a region moves somewhere between the calls
175   * we'll miss the region.
176   *
177   * @param table
178   * @return the list of online regions. Empty list is returned if no regions.
179   * @throws IOException
180   */
181  private List<HRegion> getRegionsToFlush(String table) throws IOException {
182    return (List<HRegion>) rss.getRegions(TableName.valueOf(table));
183  }
184
185  public class FlushTableSubprocedureBuilder implements SubprocedureFactory {
186
187    @Override
188    public Subprocedure buildSubprocedure(String name, byte[] data) {
189      String family = null;
190      // Currently we do not put other data except family, so it is ok to
191      // judge by length that if family was specified
192      if (data.length > 0) {
193        try {
194          HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data);
195          family = nsp.getValue();
196        } catch (Exception e) {
197          LOG.error("fail to get family by parsing from data", e);
198        }
199      }
200      // The name of the procedure instance from the master is the table name.
201      return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family);
202    }
203
204  }
205
206  /**
207   * We use the FlushTableSubprocedurePool, a class specific thread pool instead of
208   * {@link org.apache.hadoop.hbase.executor.ExecutorService}.
209   *
210   * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
211   * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
212   * failures.
213   */
214  static class FlushTableSubprocedurePool {
215    private final Abortable abortable;
216    private final ExecutorCompletionService<Void> taskPool;
217    private final ThreadPoolExecutor executor;
218    private volatile boolean stopped;
219    private final List<Future<Void>> futures = new ArrayList<>();
220    private final String name;
221
222    FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
223      this.abortable = abortable;
224      // configure the executor service
225      long keepAlive = conf.getLong(
226        RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
227        RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
228      int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
229      this.name = name;
230      executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
231        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d")
232          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
233      taskPool = new ExecutorCompletionService<>(executor);
234    }
235
236    boolean hasTasks() {
237      return futures.size() != 0;
238    }
239
240    /**
241     * Submit a task to the pool.
242     *
243     * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}.
244     */
245    void submitTask(final Callable<Void> task) {
246      Future<Void> f = this.taskPool.submit(task);
247      futures.add(f);
248    }
249
250    /**
251     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
252     * This *must* be called after all tasks are submitted via submitTask.
253     *
254     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
255     * @throws InterruptedException
256     */
257    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
258      LOG.debug("Waiting for local region flush to finish.");
259
260      int sz = futures.size();
261      try {
262        // Using the completion service to process the futures.
263        for (int i = 0; i < sz; i++) {
264          Future<Void> f = taskPool.take();
265          f.get();
266          if (!futures.remove(f)) {
267            LOG.warn("unexpected future" + f);
268          }
269          LOG.debug("Completed " + (i+1) + "/" + sz +  " local region flush tasks.");
270        }
271        LOG.debug("Completed " + sz +  " local region flush tasks.");
272        return true;
273      } catch (InterruptedException e) {
274        LOG.warn("Got InterruptedException in FlushSubprocedurePool", e);
275        if (!stopped) {
276          Thread.currentThread().interrupt();
277          throw new ForeignException("FlushSubprocedurePool", e);
278        }
279        // we are stopped so we can just exit.
280      } catch (ExecutionException e) {
281        Throwable cause = e.getCause();
282        if (cause instanceof ForeignException) {
283          LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
284          throw (ForeignException)e.getCause();
285        } else if (cause instanceof DroppedSnapshotException) {
286          // we have to abort the region server according to contract of flush
287          abortable.abort("Received DroppedSnapshotException, aborting", cause);
288        }
289        LOG.warn("Got Exception in FlushSubprocedurePool", e);
290        throw new ForeignException(name, e.getCause());
291      } finally {
292        cancelTasks();
293      }
294      return false;
295    }
296
297    /**
298     * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running
299     * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
300     * @throws InterruptedException
301     */
302    void cancelTasks() throws InterruptedException {
303      Collection<Future<Void>> tasks = futures;
304      LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name);
305      for (Future<Void> f: tasks) {
306        f.cancel(false);
307      }
308
309      // evict remaining tasks and futures from taskPool.
310      futures.clear();
311      while (taskPool.poll() != null) {}
312      stop();
313    }
314
315    /**
316     * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be
317     * interrupted (see HBASE-13877)
318     */
319    void stop() {
320      if (this.stopped) return;
321
322      this.stopped = true;
323      this.executor.shutdown();
324    }
325  }
326
327  /**
328   * Initialize this region server flush procedure manager
329   * Uses a zookeeper based member controller.
330   * @param rss region server
331   * @throws KeeperException if the zookeeper cannot be reached
332   */
333  @Override
334  public void initialize(RegionServerServices rss) throws KeeperException {
335    this.rss = rss;
336    ZKWatcher zkw = rss.getZooKeeper();
337    this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
338      MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE);
339
340    Configuration conf = rss.getConfiguration();
341    long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
342    int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT);
343
344    // create the actual flush table procedure member
345    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
346      opThreads, keepAlive);
347    this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder());
348  }
349
350  @Override
351  public String getProcedureSignature() {
352    return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE;
353  }
354
355}