001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import java.util.ArrayList; 021import java.util.List; 022import java.util.Queue; 023import java.util.concurrent.locks.ReentrantLock; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; 031import org.apache.hbase.thirdparty.com.google.common.collect.Queues; 032 033import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; 034 035/** 036 * Persistent service provider for Slow/LargeLog events 037 */ 038@InterfaceAudience.Private 039public class SlowLogPersistentService { 040 041 private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class); 042 043 private static final ReentrantLock LOCK = new ReentrantLock(); 044 private static final String SYS_TABLE_QUEUE_SIZE = 045 "hbase.regionserver.slowlog.systable.queue.size"; 046 private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000; 047 private static final int SYSTABLE_PUT_BATCH_SIZE = 100; 048 049 private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable; 050 051 private final Configuration configuration; 052 053 public SlowLogPersistentService(final Configuration configuration) { 054 this.configuration = configuration; 055 int sysTableQueueSize = 056 configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE); 057 EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable = 058 EvictingQueue.create(sysTableQueueSize); 059 queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable); 060 } 061 062 public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) { 063 queueForSysTable.add(slowLogPayload); 064 } 065 066 /** 067 * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch 068 */ 069 public void addAllLogsToSysTable() { 070 if (queueForSysTable == null) { 071 LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting."); 072 return; 073 } 074 if (LOCK.isLocked()) { 075 return; 076 } 077 LOCK.lock(); 078 try { 079 List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>(); 080 int i = 0; 081 while (!queueForSysTable.isEmpty()) { 082 slowLogPayloads.add(queueForSysTable.poll()); 083 i++; 084 if (i == SYSTABLE_PUT_BATCH_SIZE) { 085 SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); 086 slowLogPayloads.clear(); 087 i = 0; 088 } 089 } 090 if (slowLogPayloads.size() > 0) { 091 SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); 092 } 093 } finally { 094 LOCK.unlock(); 095 } 096 } 097 098}