001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.slowlog;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Random;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.NamespaceDescriptor;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.ConnectionFactory;
032import org.apache.hadoop.hbase.client.Durability;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Slowlog Accessor to record slow/large RPC log identified at each RegionServer RpcServer level.
044 * This can be done only optionally to record the entire history of slow/large rpc calls
045 * since RingBuffer can handle only limited latest records.
046 */
047@InterfaceAudience.Private
048public class SlowLogTableAccessor {
049
050  private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);
051
052  private static final Random RANDOM = new Random();
053
054  private static Connection connection;
055
056  /**
057   * hbase:slowlog table name - can be enabled
058   * with config - hbase.regionserver.slowlog.systable.enabled
059   */
060  public static final TableName SLOW_LOG_TABLE_NAME =
061    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "slowlog");
062
063  private static void doPut(final Connection connection, final List<Put> puts)
064      throws IOException {
065    try (Table table = connection.getTable(SLOW_LOG_TABLE_NAME)) {
066      table.put(puts);
067    }
068  }
069
070  /**
071   * Add slow/large log records to hbase:slowlog table
072   * @param slowLogPayloads List of SlowLogPayload to process
073   * @param configuration Configuration to use for connection
074   */
075  public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
076      final Configuration configuration) {
077    List<Put> puts = new ArrayList<>(slowLogPayloads.size());
078    for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
079      final byte[] rowKey = getRowKey(slowLogPayload);
080      final Put put = new Put(rowKey).setDurability(Durability.SKIP_WAL)
081        .setPriority(HConstants.NORMAL_QOS)
082        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("call_details"),
083          Bytes.toBytes(slowLogPayload.getCallDetails()))
084        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("client_address"),
085          Bytes.toBytes(slowLogPayload.getClientAddress()))
086        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("method_name"),
087          Bytes.toBytes(slowLogPayload.getMethodName()))
088        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("param"),
089          Bytes.toBytes(slowLogPayload.getParam()))
090        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("processing_time"),
091          Bytes.toBytes(Integer.toString(slowLogPayload.getProcessingTime())))
092        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("queue_time"),
093          Bytes.toBytes(Integer.toString(slowLogPayload.getQueueTime())))
094        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("region_name"),
095          Bytes.toBytes(slowLogPayload.getRegionName()))
096        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("response_size"),
097          Bytes.toBytes(Long.toString(slowLogPayload.getResponseSize())))
098        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("server_class"),
099          Bytes.toBytes(slowLogPayload.getServerClass()))
100        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("start_time"),
101          Bytes.toBytes(Long.toString(slowLogPayload.getStartTime())))
102        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("type"),
103          Bytes.toBytes(slowLogPayload.getType().name()))
104        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("username"),
105          Bytes.toBytes(slowLogPayload.getUserName()));
106      puts.add(put);
107    }
108    try {
109      if (connection == null) {
110        createConnection(configuration);
111      }
112      doPut(connection, puts);
113    } catch (Exception e) {
114      LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
115    }
116  }
117
118  private static synchronized void createConnection(Configuration configuration)
119      throws IOException {
120    Configuration conf = new Configuration(configuration);
121    // rpc timeout: 20s
122    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000);
123    // retry count: 5
124    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
125    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
126    connection = ConnectionFactory.createConnection(conf);
127  }
128
129  /**
130   * Create rowKey: currentTimeMillis APPEND slowLogPayload.hashcode
131   * Scan on slowlog table should keep records with sorted order of time, however records
132   * added at the very same time (currentTimeMillis) could be in random order.
133   *
134   * @param slowLogPayload SlowLogPayload to process
135   * @return rowKey byte[]
136   */
137  private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) {
138    String hashcode = String.valueOf(slowLogPayload.hashCode());
139    String lastFiveDig =
140      hashcode.substring((hashcode.length() > 5) ? (hashcode.length() - 5) : 0);
141    if (lastFiveDig.startsWith("-")) {
142      lastFiveDig = String.valueOf(RANDOM.nextInt(99999));
143    }
144    final long currentTimeMillis = EnvironmentEdgeManager.currentTime();
145    final String timeAndHashcode = currentTimeMillis + lastFiveDig;
146    final long rowKeyLong = Long.parseLong(timeAndHashcode);
147    return Bytes.toBytes(rowKeyLong);
148  }
149
150}