001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.slowlog; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.ThreadLocalRandom; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.NamespaceDescriptor; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.client.ConnectionFactory; 030import org.apache.hadoop.hbase.client.Durability; 031import org.apache.hadoop.hbase.client.Put; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; 040 041/** 042 * Slowlog Accessor to record slow/large RPC log identified at each RegionServer RpcServer level. 043 * This can be done only optionally to record the entire history of slow/large rpc calls since 044 * RingBuffer can handle only limited latest records. 045 */ 046@InterfaceAudience.Private 047public class SlowLogTableAccessor { 048 049 private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class); 050 051 private static Connection connection; 052 053 /** 054 * hbase:slowlog table name - can be enabled with config - 055 * hbase.regionserver.slowlog.systable.enabled 056 */ 057 public static final TableName SLOW_LOG_TABLE_NAME = 058 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "slowlog"); 059 060 private static void doPut(final Connection connection, final List<Put> puts) throws IOException { 061 try (Table table = connection.getTable(SLOW_LOG_TABLE_NAME)) { 062 table.put(puts); 063 } 064 } 065 066 /** 067 * Add slow/large log records to hbase:slowlog table 068 * @param slowLogPayloads List of SlowLogPayload to process 069 * @param configuration Configuration to use for connection 070 */ 071 public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads, 072 final Configuration configuration) { 073 List<Put> puts = new ArrayList<>(slowLogPayloads.size()); 074 for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) { 075 final byte[] rowKey = getRowKey(slowLogPayload); 076 final Put put = new Put(rowKey).setDurability(Durability.SKIP_WAL) 077 .setPriority(HConstants.NORMAL_QOS) 078 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("call_details"), 079 Bytes.toBytes(slowLogPayload.getCallDetails())) 080 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("client_address"), 081 Bytes.toBytes(slowLogPayload.getClientAddress())) 082 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("method_name"), 083 Bytes.toBytes(slowLogPayload.getMethodName())) 084 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("param"), 085 Bytes.toBytes(slowLogPayload.getParam())) 086 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("processing_time"), 087 Bytes.toBytes(Integer.toString(slowLogPayload.getProcessingTime()))) 088 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("queue_time"), 089 Bytes.toBytes(Integer.toString(slowLogPayload.getQueueTime()))) 090 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("region_name"), 091 Bytes.toBytes(slowLogPayload.getRegionName())) 092 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("response_size"), 093 Bytes.toBytes(Long.toString(slowLogPayload.getResponseSize()))) 094 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("server_class"), 095 Bytes.toBytes(slowLogPayload.getServerClass())) 096 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("start_time"), 097 Bytes.toBytes(Long.toString(slowLogPayload.getStartTime()))) 098 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("type"), 099 Bytes.toBytes(slowLogPayload.getType().name())) 100 .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("username"), 101 Bytes.toBytes(slowLogPayload.getUserName())); 102 puts.add(put); 103 } 104 try { 105 if (connection == null) { 106 createConnection(configuration); 107 } 108 doPut(connection, puts); 109 } catch (Exception e) { 110 LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e); 111 } 112 } 113 114 private static synchronized void createConnection(Configuration configuration) 115 throws IOException { 116 Configuration conf = new Configuration(configuration); 117 // rpc timeout: 20s 118 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000); 119 // retry count: 5 120 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 121 conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 122 connection = ConnectionFactory.createConnection(conf); 123 } 124 125 /** 126 * Create rowKey: currentTime APPEND slowLogPayload.hashcode Scan on slowlog table should keep 127 * records with sorted order of time, however records added at the very same time could be in 128 * random order. 129 * @param slowLogPayload SlowLogPayload to process 130 * @return rowKey byte[] 131 */ 132 private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) { 133 String hashcode = String.valueOf(slowLogPayload.hashCode()); 134 String lastFiveDig = hashcode.substring((hashcode.length() > 5) ? (hashcode.length() - 5) : 0); 135 if (lastFiveDig.startsWith("-")) { 136 lastFiveDig = String.valueOf(ThreadLocalRandom.current().nextInt(99999)); 137 } 138 final long currentTime = EnvironmentEdgeManager.currentTime(); 139 final String timeAndHashcode = currentTime + lastFiveDig; 140 final long rowKeyLong = Long.parseLong(timeAndHashcode); 141 return Bytes.toBytes(rowKeyLong); 142 } 143 144}