001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure.flush;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.Future;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Abortable;
032import org.apache.hadoop.hbase.DroppedSnapshotException;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.errorhandling.ForeignException;
036import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
037import org.apache.hadoop.hbase.procedure.ProcedureMember;
038import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
039import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
040import org.apache.hadoop.hbase.procedure.Subprocedure;
041import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
042import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.regionserver.HRegionServer;
045import org.apache.hadoop.hbase.regionserver.RegionServerServices;
046import org.apache.hadoop.hbase.util.Strings;
047import org.apache.hadoop.hbase.util.Threads;
048import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.apache.zookeeper.KeeperException;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
055import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
056
057import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
058
059/**
060 * This manager class handles flushing of the regions for table on a {@link HRegionServer}.
061 */
062@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
063public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager {
064  private static final Logger LOG =
065    LoggerFactory.getLogger(RegionServerFlushTableProcedureManager.class);
066
067  private static final String CONCURENT_FLUSH_TASKS_KEY =
068    "hbase.flush.procedure.region.concurrentTasks";
069  private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3;
070
071  public static final String FLUSH_REQUEST_THREADS_KEY =
072    "hbase.flush.procedure.region.pool.threads";
073  public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10;
074
075  public static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.procedure.region.timeout";
076  public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
077
078  public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY =
079    "hbase.flush.procedure.region.wakefrequency";
080  private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500;
081
082  private RegionServerServices rss;
083  private ProcedureMemberRpcs memberRpcs;
084  private ProcedureMember member;
085
086  /**
087   * Exposed for testing.
088   * @param conf       HBase configuration.
089   * @param server     region server.
090   * @param memberRpc  use specified memberRpc instance
091   * @param procMember use specified ProcedureMember
092   */
093  RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server,
094    ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
095    this.rss = server;
096    this.memberRpcs = memberRpc;
097    this.member = procMember;
098  }
099
100  public RegionServerFlushTableProcedureManager() {
101  }
102
103  /**
104   * Start accepting flush table requests.
105   */
106  @Override
107  public void start() {
108    LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString());
109    this.memberRpcs.start(rss.getServerName().toString(), member);
110  }
111
112  /**
113   * Close <tt>this</tt> and all running tasks
114   * @param force forcefully stop all running tasks
115   */
116  @Override
117  public void stop(boolean force) throws IOException {
118    String mode = force ? "abruptly" : "gracefully";
119    LOG.info("Stopping region server flush procedure manager " + mode + ".");
120
121    try {
122      this.member.close();
123    } finally {
124      this.memberRpcs.close();
125    }
126  }
127
128  /**
129   * If in a running state, creates the specified subprocedure to flush table regions. Because this
130   * gets the local list of regions to flush and not the set the master had, there is a possibility
131   * of a race where regions may be missed.
132   * @return Subprocedure to submit to the ProcedureMember.
133   */
134  public Subprocedure buildSubprocedure(String table, List<String> families) {
135
136    // don't run the subprocedure if the parent is stop(ping)
137    if (rss.isStopping() || rss.isStopped()) {
138      throw new IllegalStateException("Can't start flush region subprocedure on RS: "
139        + rss.getServerName() + ", because stopping/stopped!");
140    }
141
142    // check to see if this server is hosting any regions for the table
143    List<HRegion> involvedRegions;
144    try {
145      involvedRegions = getRegionsToFlush(table);
146    } catch (IOException e1) {
147      throw new IllegalStateException("Failed to figure out if there is region to flush.", e1);
148    }
149
150    // We need to run the subprocedure even if we have no relevant regions. The coordinator
151    // expects participation in the procedure and without sending message the master procedure
152    // will hang and fail.
153
154    LOG.debug("Launching subprocedure to flush regions for " + table);
155    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table);
156    Configuration conf = rss.getConfiguration();
157    long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
158    long wakeMillis =
159      conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY, FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
160
161    FlushTableSubprocedurePool taskManager =
162      new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
163    return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis,
164      involvedRegions, table, families, taskManager);
165  }
166
167  /**
168   * Get the list of regions to flush for the table on this server It is possible that if a region
169   * moves somewhere between the calls we'll miss the region.
170   * @return the list of online regions. Empty list is returned if no regions.
171   */
172  private List<HRegion> getRegionsToFlush(String table) throws IOException {
173    return (List<HRegion>) rss.getRegions(TableName.valueOf(table));
174  }
175
176  public class FlushTableSubprocedureBuilder implements SubprocedureFactory {
177
178    @Override
179    public Subprocedure buildSubprocedure(String name, byte[] data) {
180      List<String> families = null;
181      // Currently we do not put other data except families, so it is ok to
182      // judge by length that if families were specified
183      if (data.length > 0) {
184        try {
185          HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data);
186          families = ImmutableList.copyOf(Strings.SPLITTER.split(nsp.getValue()));
187        } catch (Exception e) {
188          LOG.error("fail to get families by parsing from data", e);
189        }
190      }
191      // The name of the procedure instance from the master is the table name.
192      return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, families);
193    }
194
195  }
196
197  /**
198   * We use the FlushTableSubprocedurePool, a class specific thread pool instead of
199   * {@link org.apache.hadoop.hbase.executor.ExecutorService}. It uses a
200   * {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of completed
201   * tasks which lets us efficiently cancel pending tasks upon the earliest operation failures.
202   */
203  static class FlushTableSubprocedurePool {
204    private final Abortable abortable;
205    private final ExecutorCompletionService<Void> taskPool;
206    private final ThreadPoolExecutor executor;
207    private volatile boolean stopped;
208    private final List<Future<Void>> futures = new ArrayList<>();
209    private final String name;
210
211    FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
212      this.abortable = abortable;
213      // configure the executor service
214      long keepAlive = conf.getLong(RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
215        RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
216      int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
217      this.name = name;
218      executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
219        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d")
220          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
221      taskPool = new ExecutorCompletionService<>(executor);
222    }
223
224    boolean hasTasks() {
225      return futures.size() != 0;
226    }
227
228    /**
229     * Submit a task to the pool. NOTE: all must be submitted before you can safely
230     * {@link #waitForOutstandingTasks()}.
231     */
232    void submitTask(final Callable<Void> task) {
233      Future<Void> f = this.taskPool.submit(task);
234      futures.add(f);
235    }
236
237    /**
238     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
239     * This *must* be called after all tasks are submitted via submitTask.
240     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
241     */
242    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
243      LOG.debug("Waiting for local region flush to finish.");
244
245      int sz = futures.size();
246      try {
247        // Using the completion service to process the futures.
248        for (int i = 0; i < sz; i++) {
249          Future<Void> f = taskPool.take();
250          f.get();
251          if (!futures.remove(f)) {
252            LOG.warn("unexpected future" + f);
253          }
254          LOG.debug("Completed " + (i + 1) + "/" + sz + " local region flush tasks.");
255        }
256        LOG.debug("Completed " + sz + " local region flush tasks.");
257        return true;
258      } catch (InterruptedException e) {
259        LOG.warn("Got InterruptedException in FlushSubprocedurePool", e);
260        if (!stopped) {
261          Thread.currentThread().interrupt();
262          throw new ForeignException("FlushSubprocedurePool", e);
263        }
264        // we are stopped so we can just exit.
265      } catch (ExecutionException e) {
266        Throwable cause = e.getCause();
267        if (cause instanceof ForeignException) {
268          LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
269          throw (ForeignException) e.getCause();
270        } else if (cause instanceof DroppedSnapshotException) {
271          // we have to abort the region server according to contract of flush
272          abortable.abort("Received DroppedSnapshotException, aborting", cause);
273        }
274        LOG.warn("Got Exception in FlushSubprocedurePool", e);
275        throw new ForeignException(name, e.getCause());
276      } finally {
277        cancelTasks();
278      }
279      return false;
280    }
281
282    /**
283     * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running
284     * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
285     */
286    void cancelTasks() throws InterruptedException {
287      Collection<Future<Void>> tasks = futures;
288      LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name);
289      for (Future<Void> f : tasks) {
290        f.cancel(false);
291      }
292
293      // evict remaining tasks and futures from taskPool.
294      futures.clear();
295      while (taskPool.poll() != null) {
296      }
297      stop();
298    }
299
300    /**
301     * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be interrupted
302     * (see HBASE-13877)
303     */
304    void stop() {
305      if (this.stopped) return;
306
307      this.stopped = true;
308      this.executor.shutdown();
309    }
310  }
311
312  /**
313   * Initialize this region server flush procedure manager Uses a zookeeper based member controller.
314   * @param rss region server
315   * @throws KeeperException if the zookeeper cannot be reached
316   */
317  @Override
318  public void initialize(RegionServerServices rss) throws KeeperException {
319    this.rss = rss;
320    ZKWatcher zkw = rss.getZooKeeper();
321    this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
322      MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE);
323
324    Configuration conf = rss.getConfiguration();
325    long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
326    int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT);
327
328    // create the actual flush table procedure member
329    ThreadPoolExecutor pool =
330      ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
331    this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder());
332  }
333
334  @Override
335  public String getProcedureSignature() {
336    return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE;
337  }
338
339}