001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.Future;
030import java.util.concurrent.ThreadPoolExecutor;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.regionserver.RegionServerServices;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
038import org.apache.hadoop.hbase.errorhandling.ForeignException;
039import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
041import org.apache.zookeeper.KeeperException;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045public class SimpleRSProcedureManager extends RegionServerProcedureManager {
046
047  private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class);
048
049  private RegionServerServices rss;
050  private ProcedureMemberRpcs memberRpcs;
051  private ProcedureMember member;
052
053  @Override
054  public void initialize(RegionServerServices rss) throws KeeperException {
055    this.rss = rss;
056    ZKWatcher zkw = rss.getZooKeeper();
057    this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
058
059    ThreadPoolExecutor pool =
060        ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
061    this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
062    LOG.info("Initialized: " + rss.getServerName().toString());
063  }
064
065  @Override
066  public void start() {
067    this.memberRpcs.start(rss.getServerName().toString(), member);
068    LOG.info("Started.");
069  }
070
071  @Override
072  public void stop(boolean force) throws IOException {
073    LOG.info("stop: " + force);
074    try {
075      this.member.close();
076    } finally {
077      this.memberRpcs.close();
078    }
079  }
080
081  @Override
082  public String getProcedureSignature() {
083    return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
084  }
085
086  /**
087   * If in a running state, creates the specified subprocedure for handling a procedure.
088   * @return Subprocedure to submit to the ProcedureMember.
089   */
090  public Subprocedure buildSubprocedure(String name) {
091
092    // don't run a procedure if the parent is stop(ping)
093    if (rss.isStopping() || rss.isStopped()) {
094      throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
095          + ", because stopping/stopped!");
096    }
097
098    LOG.info("Attempting to run a procedure.");
099    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
100    Configuration conf = rss.getConfiguration();
101
102    SimpleSubprocedurePool taskManager =
103        new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
104    return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
105  }
106
107  /**
108   * Build the actual procedure runner that will do all the 'hard' work
109   */
110  public class SimleSubprocedureBuilder implements SubprocedureFactory {
111
112    @Override
113    public Subprocedure buildSubprocedure(String name, byte[] data) {
114      LOG.info("Building procedure: " + name);
115      return SimpleRSProcedureManager.this.buildSubprocedure(name);
116    }
117  }
118
119  public static class SimpleSubprocedurePool implements Closeable, Abortable {
120
121    private final ExecutorCompletionService<Void> taskPool;
122    private final ExecutorService executor;
123    private volatile boolean aborted;
124    private final List<Future<Void>> futures = new ArrayList<>();
125    private final String name;
126
127    public SimpleSubprocedurePool(String name, Configuration conf) {
128      this.name = name;
129      executor = Executors.newSingleThreadExecutor(
130        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d")
131          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
132      taskPool = new ExecutorCompletionService<>(executor);
133    }
134
135    /**
136     * Submit a task to the pool.
137     */
138    public void submitTask(final Callable<Void> task) {
139      Future<Void> f = this.taskPool.submit(task);
140      futures.add(f);
141    }
142
143    /**
144     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
145     *
146     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
147     * @throws ForeignException
148     */
149    public boolean waitForOutstandingTasks() throws ForeignException {
150      LOG.debug("Waiting for procedure to finish.");
151
152      try {
153        for (Future<Void> f: futures) {
154          f.get();
155        }
156        return true;
157      } catch (InterruptedException e) {
158        if (aborted) throw new ForeignException(
159            "Interrupted and found to be aborted while waiting for tasks!", e);
160        Thread.currentThread().interrupt();
161      } catch (ExecutionException e) {
162        if (e.getCause() instanceof ForeignException) {
163          throw (ForeignException) e.getCause();
164        }
165        throw new ForeignException(name, e.getCause());
166      } finally {
167        // close off remaining tasks
168        for (Future<Void> f: futures) {
169          if (!f.isDone()) {
170            f.cancel(true);
171          }
172        }
173      }
174      return false;
175    }
176
177    /**
178     * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
179     * finish
180     */
181    @Override
182    public void close() {
183      executor.shutdown();
184    }
185
186    @Override
187    public void abort(String why, Throwable e) {
188      if (this.aborted) return;
189
190      this.aborted = true;
191      LOG.warn("Aborting because: " + why, e);
192      this.executor.shutdownNow();
193    }
194
195    @Override
196    public boolean isAborted() {
197      return this.aborted;
198    }
199  }
200
201  public class SimpleSubprocedure extends Subprocedure {
202    private final RegionServerServices rss;
203    private final SimpleSubprocedurePool taskManager;
204
205    public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
206        ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
207      super(member, name, errorListener, 500, 60000);
208      LOG.info("Constructing a SimpleSubprocedure.");
209      this.rss = rss;
210      this.taskManager = taskManager;
211    }
212
213    /**
214     * Callable task.
215     * TODO. We don't need a thread pool to execute roll log. This can be simplified
216     * with no use of subprocedurepool.
217     */
218    class RSSimpleTask implements Callable<Void> {
219      RSSimpleTask() {}
220
221      @Override
222      public Void call() throws Exception {
223        LOG.info("Execute subprocedure on " + rss.getServerName().toString());
224        return null;
225      }
226
227    }
228
229    private void execute() throws ForeignException {
230
231      monitor.rethrowException();
232
233      // running a task (e.g., roll log, flush table) on region server
234      taskManager.submitTask(new RSSimpleTask());
235      monitor.rethrowException();
236
237      // wait for everything to complete.
238      taskManager.waitForOutstandingTasks();
239      monitor.rethrowException();
240
241    }
242
243    @Override
244    public void acquireBarrier() throws ForeignException {
245      // do nothing, executing in inside barrier step.
246    }
247
248    /**
249     * do a log roll.
250     */
251    @Override
252    public byte[] insideBarrier() throws ForeignException {
253      execute();
254      return Bytes.toBytes(SimpleMasterProcedureManager.SIMPLE_DATA);
255    }
256
257    /**
258     * Cancel threads if they haven't finished.
259     */
260    @Override
261    public void cleanup(Exception e) {
262      taskManager.abort("Aborting simple subprocedure tasks due to error", e);
263    }
264  }
265
266}