001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Queue;
023import java.util.concurrent.locks.ReentrantLock;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
031import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
034
035/**
036 * Persistent service provider for Slow/LargeLog events
037 */
038@InterfaceAudience.Private
039public class SlowLogPersistentService {
040
041  private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class);
042
043  private static final ReentrantLock LOCK = new ReentrantLock();
044  private static final String SYS_TABLE_QUEUE_SIZE =
045    "hbase.regionserver.slowlog.systable.queue.size";
046  private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
047  private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
048
049  private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable;
050
051  private final Configuration configuration;
052
053  public SlowLogPersistentService(final Configuration configuration) {
054    this.configuration = configuration;
055    int sysTableQueueSize =
056      configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
057    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable =
058      EvictingQueue.create(sysTableQueueSize);
059    queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
060  }
061
062  public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) {
063    queueForSysTable.add(slowLogPayload);
064  }
065
066  /**
067   * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
068   */
069  public void addAllLogsToSysTable() {
070    if (queueForSysTable == null) {
071      LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
072      return;
073    }
074    if (LOCK.isLocked()) {
075      return;
076    }
077    LOCK.lock();
078    try {
079      List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>();
080      int i = 0;
081      while (!queueForSysTable.isEmpty()) {
082        slowLogPayloads.add(queueForSysTable.poll());
083        i++;
084        if (i == SYSTABLE_PUT_BATCH_SIZE) {
085          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
086          slowLogPayloads.clear();
087          i = 0;
088        }
089      }
090      if (slowLogPayloads.size() > 0) {
091        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
092      }
093    } finally {
094      LOCK.unlock();
095    }
096  }
097
098}