001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.hbtop.mode;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.stream.Collectors;
029
030import org.apache.hadoop.hbase.ClusterMetrics;
031import org.apache.hadoop.hbase.ServerMetrics;
032import org.apache.hadoop.hbase.UserMetrics;
033import org.apache.hadoop.hbase.hbtop.Record;
034import org.apache.hadoop.hbase.hbtop.RecordFilter;
035import org.apache.hadoop.hbase.hbtop.field.Field;
036import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
037import org.apache.hadoop.hbase.hbtop.field.FieldValue;
038import org.apache.hadoop.hbase.hbtop.field.FieldValueType;
039import org.apache.yetus.audience.InterfaceAudience;
040
041/**
042 * Implementation for {@link ModeStrategy} for client Mode.
043 */
044@InterfaceAudience.Private public final class ClientModeStrategy implements ModeStrategy {
045
046  private final List<FieldInfo> fieldInfos = Arrays
047      .asList(new FieldInfo(Field.CLIENT, 0, true),
048          new FieldInfo(Field.USER_COUNT, 5, true),
049          new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
050          new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
051          new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
052          new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
053  private final Map<String, RequestCountPerSecond> requestCountPerSecondMap = new HashMap<>();
054
055  ClientModeStrategy() {
056  }
057
058  @Override public List<FieldInfo> getFieldInfos() {
059    return fieldInfos;
060  }
061
062  @Override public Field getDefaultSortField() {
063    return Field.REQUEST_COUNT_PER_SECOND;
064  }
065
066  @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
067      List<RecordFilter> pushDownFilters) {
068    List<Record> records = createRecords(clusterMetrics);
069    return aggregateRecordsAndAddDistinct(
070        ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.CLIENT, Field.USER,
071        Field.USER_COUNT);
072  }
073
074  List<Record> createRecords(ClusterMetrics clusterMetrics) {
075    List<Record> ret = new ArrayList<>();
076    for (ServerMetrics serverMetrics : clusterMetrics.getLiveServerMetrics().values()) {
077      long lastReportTimestamp = serverMetrics.getLastReportTimestamp();
078      serverMetrics.getUserMetrics().values().forEach(um -> um.getClientMetrics().values().forEach(
079        clientMetrics -> ret.add(
080              createRecord(um.getNameAsString(), clientMetrics, lastReportTimestamp,
081                  serverMetrics.getServerName().getServerName()))));
082    }
083    return ret;
084  }
085
086  /**
087   * Aggregate the records and count the unique values for the given distinctField
088   *
089   * @param records               records to be processed
090   * @param groupBy               Field on which group by needs to be done
091   * @param distinctField         Field whose unique values needs to be counted
092   * @param uniqueCountAssignedTo a target field to which the unique count is assigned to
093   * @return aggregated records
094   */
095  List<Record> aggregateRecordsAndAddDistinct(List<Record> records, Field groupBy,
096      Field distinctField, Field uniqueCountAssignedTo) {
097    List<Record> result = new ArrayList<>();
098    records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).values()
099        .forEach(val -> {
100          Set<FieldValue> distinctValues = new HashSet<>();
101          Map<Field, FieldValue> map = new HashMap<>();
102          for (Record record : val) {
103            for (Map.Entry<Field, FieldValue> field : record.entrySet()) {
104              if (distinctField.equals(field.getKey())) {
105                //We will not be adding the field in the new record whose distinct count is required
106                distinctValues.add(record.get(distinctField));
107              } else {
108                if (field.getKey().getFieldValueType() == FieldValueType.STRING) {
109                  map.put(field.getKey(), field.getValue());
110                } else {
111                  if (map.get(field.getKey()) == null) {
112                    map.put(field.getKey(), field.getValue());
113                  } else {
114                    map.put(field.getKey(), map.get(field.getKey()).plus(field.getValue()));
115                  }
116                }
117              }
118            }
119          }
120          // Add unique count field
121          map.put(uniqueCountAssignedTo, uniqueCountAssignedTo.newValue(distinctValues.size()));
122          result.add(Record.ofEntries(map.entrySet().stream()
123            .map(k -> Record.entry(k.getKey(), k.getValue()))));
124        });
125    return result;
126  }
127
128  Record createRecord(String user, UserMetrics.ClientMetrics clientMetrics,
129      long lastReportTimestamp, String server) {
130    Record.Builder builder = Record.builder();
131    String client = clientMetrics.getHostName();
132    builder.put(Field.CLIENT, clientMetrics.getHostName());
133    String mapKey = client + "$" + user + "$" + server;
134    RequestCountPerSecond requestCountPerSecond = requestCountPerSecondMap.get(mapKey);
135    if (requestCountPerSecond == null) {
136      requestCountPerSecond = new RequestCountPerSecond();
137      requestCountPerSecondMap.put(mapKey, requestCountPerSecond);
138    }
139    requestCountPerSecond.refresh(lastReportTimestamp, clientMetrics.getReadRequestsCount(),
140        clientMetrics.getFilteredReadRequestsCount(), clientMetrics.getWriteRequestsCount());
141    builder.put(Field.REQUEST_COUNT_PER_SECOND, requestCountPerSecond.getRequestCountPerSecond());
142    builder.put(Field.READ_REQUEST_COUNT_PER_SECOND,
143        requestCountPerSecond.getReadRequestCountPerSecond());
144    builder.put(Field.WRITE_REQUEST_COUNT_PER_SECOND,
145        requestCountPerSecond.getWriteRequestCountPerSecond());
146    builder.put(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND,
147        requestCountPerSecond.getFilteredReadRequestCountPerSecond());
148    builder.put(Field.USER, user);
149    return builder.build();
150  }
151
152  @Override public DrillDownInfo drillDown(Record selectedRecord) {
153    List<RecordFilter> initialFilters = Collections.singletonList(
154        RecordFilter.newBuilder(Field.CLIENT).doubleEquals(selectedRecord.get(Field.CLIENT)));
155    return new DrillDownInfo(Mode.USER, initialFilters);
156  }
157}