/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.slowlog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;

/**
 * Slowlog Accessor to record slow/large RPC logs identified at each RegionServer RpcServer level.
 * Persisting them to the table is optional; it allows the entire history of slow/large RPC calls
 * to be recorded, since the RingBuffer can hold only a limited number of the latest records.
 */
@InterfaceAudience.Private
public class SlowLogTableAccessor {

  private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);

  /**
   * hbase:slowlog table name - can be enabled with config -
   * hbase.regionserver.slowlog.systable.enabled
   */
  public static final TableName SLOW_LOG_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "slowlog");

  /**
   * Number of distinct values of the hash-derived row key suffix, i.e. the suffix always occupies
   * exactly five decimal digits of the row key.
   */
  private static final long ROW_KEY_SUFFIX_RANGE = 100000L;

  /**
   * Writes the given puts to the hbase:slowlog table, closing the table handle afterwards.
   * @param connection connection used to obtain the table
   * @param puts       mutations to write
   * @throws IOException if obtaining the table, writing, or closing the table fails
   */
  private static void doPut(final Connection connection, final List<Put> puts) throws IOException {
    try (Table table = connection.getTable(SLOW_LOG_TABLE_NAME)) {
      table.put(puts);
    }
  }

  /**
   * Add slow/large log records to hbase:slowlog table. This is best effort: any failure while
   * writing is logged at WARN level and not propagated to the caller.
   * @param slowLogPayloads List of SlowLogPayload to process
   * @param connection      connection
   */
  public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
    Connection connection) {
    if (slowLogPayloads.isEmpty()) {
      // Nothing to persist, so do not bother opening the table.
      return;
    }
    List<Put> puts = new ArrayList<>(slowLogPayloads.size());
    for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
      puts.add(toPut(slowLogPayload));
    }
    try {
      doPut(connection, puts);
    } catch (Exception e) {
      LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
    }
  }

  /**
   * Converts one payload into a Put holding one column per payload field in the slowlog info
   * family. Numeric fields are stored as their decimal string representation.
   * @param slowLogPayload SlowLogPayload to convert
   * @return Put for the payload, with SKIP_WAL durability and NORMAL_QOS priority
   */
  private static Put toPut(final TooSlowLog.SlowLogPayload slowLogPayload) {
    final Put put = new Put(getRowKey(slowLogPayload)).setDurability(Durability.SKIP_WAL)
      .setPriority(HConstants.NORMAL_QOS);
    addInfoColumn(put, "call_details", slowLogPayload.getCallDetails());
    addInfoColumn(put, "client_address", slowLogPayload.getClientAddress());
    addInfoColumn(put, "method_name", slowLogPayload.getMethodName());
    addInfoColumn(put, "param", slowLogPayload.getParam());
    addInfoColumn(put, "processing_time", Integer.toString(slowLogPayload.getProcessingTime()));
    addInfoColumn(put, "queue_time", Integer.toString(slowLogPayload.getQueueTime()));
    addInfoColumn(put, "region_name", slowLogPayload.getRegionName());
    addInfoColumn(put, "response_size", Long.toString(slowLogPayload.getResponseSize()));
    addInfoColumn(put, "block_bytes_scanned",
      Long.toString(slowLogPayload.getBlockBytesScanned()));
    addInfoColumn(put, "server_class", slowLogPayload.getServerClass());
    addInfoColumn(put, "start_time", Long.toString(slowLogPayload.getStartTime()));
    addInfoColumn(put, "type", slowLogPayload.getType().name());
    addInfoColumn(put, "username", slowLogPayload.getUserName());
    return put;
  }

  /**
   * Adds a single string-valued column to the slowlog info family of the given Put.
   * @param put       Put to add the column to
   * @param qualifier column qualifier
   * @param value     column value
   */
  private static void addInfoColumn(final Put put, final String qualifier, final String value) {
    put.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes(qualifier), Bytes.toBytes(value));
  }

  /**
   * Create rowKey: currentTime APPEND last five decimal digits of slowLogPayload.hashcode. The
   * intent is that a scan on the slowlog table returns records in order of time, while records
   * added at the very same time could be in arbitrary order relative to each other.
   * @param slowLogPayload SlowLogPayload to process
   * @return rowKey byte[]
   */
  private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) {
    final long rowKeyLong =
      composeRowKey(EnvironmentEdgeManager.currentTime(), slowLogPayload.hashCode());
    return Bytes.toBytes(rowKeyLong);
  }

  /**
   * Combines a timestamp and a hash code into a single long whose decimal form is the timestamp
   * followed by exactly five digits taken from the hash code.
   * <p>
   * The suffix must be fixed width: a shorter suffix would shrink the key by orders of magnitude
   * and place it far away from keys created at the same time. Composing the key arithmetically
   * zero-pads the suffix implicitly.
   * @param timestampMs timestamp forming the high-order part of the key; the result fits in a
   *                    long as long as this does not exceed Long.MAX_VALUE / 100000
   * @param payloadHash hash code supplying the low-order five digits
   * @return the combined row key value
   */
  private static long composeRowKey(final long timestampMs, final int payloadHash) {
    // The remainder lies in (-100000, 100000), so Math.abs cannot overflow here, even for
    // Integer.MIN_VALUE. The result is the last five decimal digits of |payloadHash|.
    final long suffix = Math.abs(payloadHash % ROW_KEY_SUFFIX_RANGE);
    return timestampMs * ROW_KEY_SUFFIX_RANGE + suffix;
  }
}