001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Optional;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import org.apache.commons.lang3.StringUtils;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CoprocessorEnvironment;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Delete;
031import org.apache.hadoop.hbase.client.Durability;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Row;
035import org.apache.hadoop.hbase.ipc.RpcServer;
036import org.apache.hadoop.hbase.metrics.MetricRegistry;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.LossyCounting;
039import org.apache.hadoop.hbase.wal.WALEdit;
040import org.apache.yetus.audience.InterfaceAudience;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
043
044/**
045 * A coprocessor that collects metrics from meta table.
046 * <p>
047 * These metrics will be available through the regular Hadoop metrics2 sinks (ganglia, opentsdb,
048 * etc) as well as JMX output.
049 * </p>
050 * @see MetaTableMetrics
051 */
052
053@InterfaceAudience.Private
054public class MetaTableMetrics implements RegionCoprocessor {
055
056  private ExampleRegionObserverMeta observer;
057  private MetricRegistry registry;
058  private LossyCounting<String> clientMetricsLossyCounting, regionMetricsLossyCounting;
059  private boolean active = false;
060  private Set<String> metrics = ConcurrentHashMap.newKeySet();
061
062  enum MetaTableOps {
063    GET,
064    PUT,
065    DELETE,
066  }
067
068  private ImmutableMap<Class<? extends Row>, MetaTableOps> opsNameMap =
069    ImmutableMap.<Class<? extends Row>, MetaTableOps> builder().put(Put.class, MetaTableOps.PUT)
070      .put(Get.class, MetaTableOps.GET).put(Delete.class, MetaTableOps.DELETE).build();
071
072  class ExampleRegionObserverMeta implements RegionCoprocessor, RegionObserver {
073
074    @Override
075    public Optional<RegionObserver> getRegionObserver() {
076      return Optional.of(this);
077    }
078
079    @Override
080    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get,
081      List<Cell> results) throws IOException {
082      registerAndMarkMetrics(e, get);
083    }
084
085    @Override
086    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> e, Put put,
087      WALEdit edit, Durability durability) throws IOException {
088      registerAndMarkMetrics(e, put);
089    }
090
091    @Override
092    public void preDelete(ObserverContext<? extends RegionCoprocessorEnvironment> e, Delete delete,
093      WALEdit edit, Durability durability) {
094      registerAndMarkMetrics(e, delete);
095    }
096
097    private void registerAndMarkMetrics(ObserverContext<? extends RegionCoprocessorEnvironment> e,
098      Row row) {
099      if (!active || !isMetaTableOp(e)) {
100        return;
101      }
102      tableMetricRegisterAndMark(row);
103      clientMetricRegisterAndMark();
104      regionMetricRegisterAndMark(row);
105      opMetricRegisterAndMark(row);
106      opWithClientMetricRegisterAndMark(row);
107    }
108
109    /**
110     * Get table name from Ops such as: get, put, delete.
111     * @param op such as get, put or delete.
112     */
113    private String getTableNameFromOp(Row op) {
114      final String tableRowKey = Bytes.toString(op.getRow());
115      if (StringUtils.isEmpty(tableRowKey)) {
116        return null;
117      }
118      final String[] splits = tableRowKey.split(",");
119      return splits.length > 0 ? splits[0] : null;
120    }
121
122    /**
123     * Get regionId from Ops such as: get, put, delete.
124     * @param op such as get, put or delete.
125     */
126    private String getRegionIdFromOp(Row op) {
127      final String tableRowKey = Bytes.toString(op.getRow());
128      if (StringUtils.isEmpty(tableRowKey)) {
129        return null;
130      }
131      final String[] splits = tableRowKey.split(",");
132      return splits.length > 2 ? splits[2] : null;
133    }
134
135    private boolean isMetaTableOp(ObserverContext<? extends RegionCoprocessorEnvironment> e) {
136      return TableName.META_TABLE_NAME.equals(e.getEnvironment().getRegionInfo().getTable());
137    }
138
139    private void clientMetricRegisterAndMark() {
140      // Mark client metric
141      String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null;
142      if (clientIP == null || clientIP.isEmpty()) {
143        return;
144      }
145      String clientRequestMeter = clientRequestMeterName(clientIP);
146      clientMetricsLossyCounting.add(clientRequestMeter);
147      registerAndMarkMeter(clientRequestMeter);
148    }
149
150    private void tableMetricRegisterAndMark(Row op) {
151      // Mark table metric
152      String tableName = getTableNameFromOp(op);
153      if (tableName == null || tableName.isEmpty()) {
154        return;
155      }
156      String tableRequestMeter = tableMeterName(tableName);
157      registerAndMarkMeter(tableRequestMeter);
158    }
159
160    private void regionMetricRegisterAndMark(Row op) {
161      // Mark region metric
162      String regionId = getRegionIdFromOp(op);
163      if (regionId == null || regionId.isEmpty()) {
164        return;
165      }
166      String regionRequestMeter = regionMeterName(regionId);
167      regionMetricsLossyCounting.add(regionRequestMeter);
168      registerAndMarkMeter(regionRequestMeter);
169    }
170
171    private void opMetricRegisterAndMark(Row op) {
172      // Mark access type ["get", "put", "delete"] metric
173      String opMeterName = opMeterName(op);
174      if (opMeterName == null || opMeterName.isEmpty()) {
175        return;
176      }
177      registerAndMarkMeter(opMeterName);
178    }
179
180    private void opWithClientMetricRegisterAndMark(Object op) {
181      // // Mark client + access type metric
182      String opWithClientMeterName = opWithClientMeterName(op);
183      if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
184        return;
185      }
186      registerAndMarkMeter(opWithClientMeterName);
187    }
188
189    // Helper function to register and mark meter if not present
190    private void registerAndMarkMeter(String requestMeter) {
191      if (requestMeter.isEmpty()) {
192        return;
193      }
194      if (!registry.get(requestMeter).isPresent()) {
195        metrics.add(requestMeter);
196      }
197      registry.meter(requestMeter).mark();
198    }
199
200    private String opWithClientMeterName(Object op) {
201      // Extract meter name containing the client IP
202      String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
203      if (clientIP.isEmpty()) {
204        return "";
205      }
206      MetaTableOps ops = opsNameMap.get(op.getClass());
207      if (ops == null) {
208        return "";
209      }
210      switch (ops) {
211        case GET:
212          return String.format("MetaTable_client_%s_get_request", clientIP);
213        case PUT:
214          return String.format("MetaTable_client_%s_put_request", clientIP);
215        case DELETE:
216          return String.format("MetaTable_client_%s_delete_request", clientIP);
217        default:
218          return "";
219      }
220    }
221
222    private String opMeterName(Object op) {
223      // Extract meter name containing the access type
224      MetaTableOps ops = opsNameMap.get(op.getClass());
225      if (ops == null) {
226        return "";
227      }
228      switch (ops) {
229        case GET:
230          return "MetaTable_get_request";
231        case PUT:
232          return "MetaTable_put_request";
233        case DELETE:
234          return "MetaTable_delete_request";
235        default:
236          return "";
237      }
238    }
239
240    private String tableMeterName(String tableName) {
241      // Extract meter name containing the table name
242      return String.format("MetaTable_table_%s_request", tableName);
243    }
244
245    private String clientRequestMeterName(String clientIP) {
246      // Extract meter name containing the client IP
247      if (clientIP.isEmpty()) {
248        return "";
249      }
250      return String.format("MetaTable_client_%s_lossy_request", clientIP);
251    }
252
253    private String regionMeterName(String regionId) {
254      // Extract meter name containing the region ID
255      return String.format("MetaTable_region_%s_lossy_request", regionId);
256    }
257  }
258
259  @Override
260  public Optional<RegionObserver> getRegionObserver() {
261    return Optional.of(observer);
262  }
263
264  @Override
265  public void start(CoprocessorEnvironment env) throws IOException {
266    observer = new ExampleRegionObserverMeta();
267    if (
268      env instanceof RegionCoprocessorEnvironment
269        && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
270        && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable()
271          .equals(TableName.META_TABLE_NAME)
272    ) {
273      RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
274      registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
275      LossyCounting.LossyCountingListener<String> listener = key -> {
276        registry.remove(key);
277        metrics.remove(key);
278      };
279      final Configuration conf = regionCoprocessorEnv.getConfiguration();
280      clientMetricsLossyCounting = new LossyCounting<>("clientMetaMetrics", conf, listener);
281      regionMetricsLossyCounting = new LossyCounting<>("regionMetaMetrics", conf, listener);
282      // only be active mode when this region holds meta table.
283      active = true;
284    }
285  }
286
287  @Override
288  public void stop(CoprocessorEnvironment env) throws IOException {
289    // since meta region can move around, clear stale metrics when stop.
290    for (String metric : metrics) {
291      registry.remove(metric);
292    }
293  }
294}