/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.namequeues;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;

import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;

/**
 * Persistent service provider for Slow/LargeLog events.
 * <p>
 * Payloads are buffered in a bounded, synchronized in-memory queue via
 * {@link #addToQueueForSysTable(TooSlowLog.SlowLogPayload)} and later drained in batches by
 * {@link #addAllLogsToSysTable(Connection)}, which hands them to
 * {@link SlowLogTableAccessor#addSlowLogRecords}.
 */
@InterfaceAudience.Private
public class SlowLogPersistentService {

  private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class);

  /** Static, hence shared by every instance: at most one drain runs at a time per JVM. */
  private static final ReentrantLock LOCK = new ReentrantLock();
  private static final String SYS_TABLE_QUEUE_SIZE =
    "hbase.regionserver.slowlog.systable.queue.size";
  private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
  /** Maximum number of payloads passed to the table accessor in one call. */
  private static final int SYSTABLE_PUT_BATCH_SIZE = 100;

  private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable;

  private final Configuration configuration;

  /**
   * Creates the service and its bounded buffer.
   * @param configuration configuration from which the queue capacity
   *                      ({@code hbase.regionserver.slowlog.systable.queue.size}, default 1000) is
   *                      read
   */
  public SlowLogPersistentService(final Configuration configuration) {
    this.configuration = configuration;
    int sysTableQueueSize =
      configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable =
      EvictingQueue.create(sysTableQueueSize);
    // EvictingQueue itself is not thread-safe; every access goes through the synchronized wrapper.
    queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
  }

  /**
   * Buffers a payload for a later flush by {@link #addAllLogsToSysTable(Connection)}.
   * @param slowLogPayload the payload to enqueue
   */
  public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) {
    queueForSysTable.add(slowLogPayload);
  }

  /**
   * Drains queueForSysTable and inserts the records in the hbase:slowlog table, at most
   * {@value #SYSTABLE_PUT_BATCH_SIZE} records per batch.
   * <p>
   * If another thread is already draining, this call returns immediately without doing any work
   * rather than waiting for that drain to finish.
   * @param connection connection passed through to {@link SlowLogTableAccessor#addSlowLogRecords}
   */
  public void addAllLogsToSysTable(Connection connection) {
    // Defensive only: the constructor always assigns the (final) queue.
    if (queueForSysTable == null) {
      LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
      return;
    }
    // tryLock() tests and acquires atomically. A separate isLocked() check followed by lock()
    // lets two callers both observe "unlocked", after which the loser blocks for the whole flush
    // instead of skipping it.
    if (!LOCK.tryLock()) {
      return;
    }
    try {
      List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>(SYSTABLE_PUT_BATCH_SIZE);
      TooSlowLog.SlowLogPayload payload;
      // poll() returns null once the queue is empty, so emptiness check and removal are one step.
      while ((payload = queueForSysTable.poll()) != null) {
        slowLogPayloads.add(payload);
        if (slowLogPayloads.size() == SYSTABLE_PUT_BATCH_SIZE) {
          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
          slowLogPayloads.clear();
        }
      }
      // Flush the final, partially filled batch.
      if (!slowLogPayloads.isEmpty()) {
        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
      }
    } finally {
      LOCK.unlock();
    }
  }

}