001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import java.util.concurrent.BlockingQueue;
022import java.util.concurrent.ThreadFactory;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025import org.apache.yetus.audience.InterfaceAudience;
026
027/**
028 * A ThreadPoolExecutor customized for working with HBase thrift to update metrics before and
029 * after the execution of a task.
030 */
031
032@InterfaceAudience.Private
033public class THBaseThreadPoolExecutor extends ThreadPoolExecutor {
034
035  private ThriftMetrics metrics;
036
037  public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime,
038      TimeUnit unit, BlockingQueue<Runnable> workQueue, ThriftMetrics metrics) {
039    this(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, null, metrics);
040  }
041
042  public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime,
043      TimeUnit unit, BlockingQueue<Runnable> workQueue,
044      ThreadFactory threadFactory,ThriftMetrics metrics) {
045    super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
046    if (threadFactory != null) {
047      setThreadFactory(threadFactory);
048    }
049    this.metrics = metrics;
050  }
051
052  @Override
053  protected void beforeExecute(Thread t, Runnable r) {
054    super.beforeExecute(t, r);
055    metrics.incActiveWorkerCount();
056  }
057
058  @Override
059  protected void afterExecute(Runnable r, Throwable t) {
060    metrics.decActiveWorkerCount();
061    super.afterExecute(r, t);
062  }
063}