001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.ThreadFactory;
022import java.util.concurrent.ThreadPoolExecutor;
023import java.util.concurrent.TimeUnit;
024import org.apache.yetus.audience.InterfaceAudience;
025
026/**
027 * A ThreadPoolExecutor customized for working with HBase thrift to update metrics before and after
028 * the execution of a task.
029 */
030
031@InterfaceAudience.Private
032public class THBaseThreadPoolExecutor extends ThreadPoolExecutor {
033
034  private ThriftMetrics metrics;
035
036  public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime,
037    TimeUnit unit, BlockingQueue<Runnable> workQueue, ThriftMetrics metrics) {
038    this(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, null, metrics);
039  }
040
041  public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime,
042    TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
043    ThriftMetrics metrics) {
044    super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
045    if (threadFactory != null) {
046      setThreadFactory(threadFactory);
047    }
048    this.metrics = metrics;
049  }
050
051  @Override
052  protected void beforeExecute(Thread t, Runnable r) {
053    super.beforeExecute(t, r);
054    metrics.incActiveWorkerCount();
055  }
056
057  @Override
058  protected void afterExecute(Runnable r, Throwable t) {
059    metrics.decActiveWorkerCount();
060    super.afterExecute(r, t);
061  }
062}