001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.slowlog; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Random; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.NamespaceDescriptor; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Connection; 031import org.apache.hadoop.hbase.client.ConnectionFactory; 032import org.apache.hadoop.hbase.client.Durability; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * Slowlog Accessor to record slow/large RPC log identified at each RegionServer RpcServer level. 044 * This can be done only optionally to record the entire history of slow/large rpc calls 045 * since RingBuffer can handle only limited latest records. 046 */ 047@InterfaceAudience.Private 048public class SlowLogTableAccessor { 049 050 private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class); 051 052 private static final Random RANDOM = new Random(); 053 054 private static Connection connection; 055 056 /** 057 * hbase:slowlog table name - can be enabled 058 * with config - hbase.regionserver.slowlog.systable.enabled 059 */ 060 public static final TableName SLOW_LOG_TABLE_NAME = 061 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "slowlog"); 062 063 private static void doPut(final Connection connection, final List<Put> puts) 064 throws IOException { 065 try (Table table = connection.getTable(SLOW_LOG_TABLE_NAME)) { 066 table.put(puts); 067 } 068 } 069 070 /** 071 * Add slow/large log records to hbase:slowlog table 072 * @param slowLogPayloads List of SlowLogPayload to process 073 * @param configuration Configuration to use for connection 074 */ 075 public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads, 076 final Configuration configuration) { 077 List<Put> puts = new ArrayList<>(slowLogPayloads.size()); 078 for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) { 079 final byte[] rowKey = getRowKey(slowLogPayload); 080 final Put put = new Put(rowKey).setDurability(Durability.SKIP_WAL) 081 .setPriority(HConstants.NORMAL_QOS) 082 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("call_details"), 083 Bytes.toBytes(slowLogPayload.getCallDetails())) 084 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("client_address"), 085 Bytes.toBytes(slowLogPayload.getClientAddress())) 086 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("method_name"), 087 Bytes.toBytes(slowLogPayload.getMethodName())) 088 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("param"), 089 Bytes.toBytes(slowLogPayload.getParam())) 090 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("processing_time"), 091 Bytes.toBytes(Integer.toString(slowLogPayload.getProcessingTime()))) 092 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("queue_time"), 093 Bytes.toBytes(Integer.toString(slowLogPayload.getQueueTime()))) 094 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("region_name"), 095 Bytes.toBytes(slowLogPayload.getRegionName())) 096 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("response_size"), 097 Bytes.toBytes(Long.toString(slowLogPayload.getResponseSize()))) 098 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("server_class"), 099 Bytes.toBytes(slowLogPayload.getServerClass())) 100 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("start_time"), 101 Bytes.toBytes(Long.toString(slowLogPayload.getStartTime()))) 102 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("type"), 103 Bytes.toBytes(slowLogPayload.getType().name())) 104 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("username"), 105 Bytes.toBytes(slowLogPayload.getUserName())); 106 puts.add(put); 107 } 108 try { 109 if (connection == null) { 110 createConnection(configuration); 111 } 112 doPut(connection, puts); 113 } catch (Exception e) { 114 LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e); 115 } 116 } 117 118 private static synchronized void createConnection(Configuration configuration) 119 throws IOException { 120 Configuration conf = new Configuration(configuration); 121 // rpc timeout: 20s 122 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000); 123 // retry count: 5 124 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 125 conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 126 connection = ConnectionFactory.createConnection(conf); 127 } 128 129 /** 130 * Create rowKey: currentTimeMillis APPEND slowLogPayload.hashcode 131 * Scan on slowlog table should keep records with sorted order of time, however records 132 * added at the very same time (currentTimeMillis) could be in random order. 133 * 134 * @param slowLogPayload SlowLogPayload to process 135 * @return rowKey byte[] 136 */ 137 private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) { 138 String hashcode = String.valueOf(slowLogPayload.hashCode()); 139 String lastFiveDig = 140 hashcode.substring((hashcode.length() > 5) ? (hashcode.length() - 5) : 0); 141 if (lastFiveDig.startsWith("-")) { 142 lastFiveDig = String.valueOf(RANDOM.nextInt(99999)); 143 } 144 final long currentTimeMillis = EnvironmentEdgeManager.currentTime(); 145 final String timeAndHashcode = currentTimeMillis + lastFiveDig; 146 final long rowKeyLong = Long.parseLong(timeAndHashcode); 147 return Bytes.toBytes(rowKeyLong); 148 } 149 150}