001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.namequeues;
021
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
024import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
025import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
026import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Queue;
033import java.util.concurrent.locks.ReentrantLock;
034
035/**
036 * Persistent service provider for Slow/LargeLog events
037 */
038@InterfaceAudience.Private
039public class SlowLogPersistentService {
040
041  private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class);
042
043  private static final ReentrantLock LOCK = new ReentrantLock();
044  private static final String SYS_TABLE_QUEUE_SIZE =
045    "hbase.regionserver.slowlog.systable.queue.size";
046  private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
047  private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
048
049  private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable;
050
051  private final Configuration configuration;
052
053  public SlowLogPersistentService(final Configuration configuration) {
054    this.configuration = configuration;
055    int sysTableQueueSize =
056      configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
057    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable =
058      EvictingQueue.create(sysTableQueueSize);
059    queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
060  }
061
062  public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) {
063    queueForSysTable.add(slowLogPayload);
064  }
065
066  /**
067   * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
068   */
069  public void addAllLogsToSysTable() {
070    if (queueForSysTable == null) {
071      LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
072      return;
073    }
074    if (LOCK.isLocked()) {
075      return;
076    }
077    LOCK.lock();
078    try {
079      List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>();
080      int i = 0;
081      while (!queueForSysTable.isEmpty()) {
082        slowLogPayloads.add(queueForSysTable.poll());
083        i++;
084        if (i == SYSTABLE_PUT_BATCH_SIZE) {
085          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
086          slowLogPayloads.clear();
087          i = 0;
088        }
089      }
090      if (slowLogPayloads.size() > 0) {
091        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
092      }
093    } finally {
094      LOCK.unlock();
095    }
096  }
097
098}