001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Queue;
023import java.util.concurrent.locks.ReentrantLock;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.client.Connection;
026import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
032import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
033
034import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
035
036/**
037 * Persistent service provider for Slow/LargeLog events
038 */
039@InterfaceAudience.Private
040public class SlowLogPersistentService {
041
042  private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class);
043
044  private static final ReentrantLock LOCK = new ReentrantLock();
045  private static final String SYS_TABLE_QUEUE_SIZE =
046    "hbase.regionserver.slowlog.systable.queue.size";
047  private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
048  private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
049
050  private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable;
051
052  private final Configuration configuration;
053
054  public SlowLogPersistentService(final Configuration configuration) {
055    this.configuration = configuration;
056    int sysTableQueueSize =
057      configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
058    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable =
059      EvictingQueue.create(sysTableQueueSize);
060    queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
061  }
062
063  public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) {
064    queueForSysTable.add(slowLogPayload);
065  }
066
067  /**
068   * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
069   */
070  public void addAllLogsToSysTable(Connection connection) {
071    if (queueForSysTable == null) {
072      LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
073      return;
074    }
075    if (LOCK.isLocked()) {
076      return;
077    }
078    LOCK.lock();
079    try {
080      List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>();
081      int i = 0;
082      while (!queueForSysTable.isEmpty()) {
083        slowLogPayloads.add(queueForSysTable.poll());
084        i++;
085        if (i == SYSTABLE_PUT_BATCH_SIZE) {
086          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
087          slowLogPayloads.clear();
088          i = 0;
089        }
090      }
091      if (slowLogPayloads.size() > 0) {
092        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
093      }
094    } finally {
095      LOCK.unlock();
096    }
097  }
098
099}