/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.coprocessor;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LossyCounting;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

045/**
046 * A coprocessor that collects metrics from meta table.
047 * <p>
048 * These metrics will be available through the regular Hadoop metrics2 sinks (ganglia, opentsdb,
049 * etc) as well as JMX output.
050 * </p>
051 * @see MetaTableMetrics
052 */
053
054@InterfaceAudience.Private
055public class MetaTableMetrics implements RegionCoprocessor {
056
057  private ExampleRegionObserverMeta observer;
058  private MetricRegistry registry;
059  private LossyCounting<String> clientMetricsLossyCounting, regionMetricsLossyCounting;
060  private boolean active = false;
061  private Set<String> metrics = new HashSet<>();
062
063  enum MetaTableOps {
064    GET, PUT, DELETE,
065  }
066
067  private ImmutableMap<Class<? extends Row>, MetaTableOps> opsNameMap =
068      ImmutableMap.<Class<? extends Row>, MetaTableOps>builder()
069              .put(Put.class, MetaTableOps.PUT)
070              .put(Get.class, MetaTableOps.GET)
071              .put(Delete.class, MetaTableOps.DELETE)
072              .build();
073
074  class ExampleRegionObserverMeta implements RegionCoprocessor, RegionObserver {
075
076    @Override
077    public Optional<RegionObserver> getRegionObserver() {
078      return Optional.of(this);
079    }
080
081    @Override
082    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
083        List<Cell> results) throws IOException {
084      registerAndMarkMetrics(e, get);
085    }
086
087    @Override
088    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
089        Durability durability) throws IOException {
090      registerAndMarkMetrics(e, put);
091    }
092
093    @Override
094    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
095        WALEdit edit, Durability durability) {
096      registerAndMarkMetrics(e, delete);
097    }
098
099    private void registerAndMarkMetrics(ObserverContext<RegionCoprocessorEnvironment> e, Row row){
100      if (!active || !isMetaTableOp(e)) {
101        return;
102      }
103      tableMetricRegisterAndMark(row);
104      clientMetricRegisterAndMark();
105      regionMetricRegisterAndMark(row);
106      opMetricRegisterAndMark(row);
107      opWithClientMetricRegisterAndMark(row);
108    }
109
110    /**
111     * Get table name from Ops such as: get, put, delete.
112     * @param op such as get, put or delete.
113     */
114    private String getTableNameFromOp(Row op) {
115      final String tableRowKey = Bytes.toString(op.getRow());
116      if (StringUtils.isEmpty(tableRowKey)) {
117        return null;
118      }
119      final String[] splits = tableRowKey.split(",");
120      return splits.length > 0 ? splits[0] : null;
121    }
122
123    /**
124     * Get regionId from Ops such as: get, put, delete.
125     * @param op  such as get, put or delete.
126     */
127    private String getRegionIdFromOp(Row op) {
128      final String tableRowKey = Bytes.toString(op.getRow());
129      if (StringUtils.isEmpty(tableRowKey)) {
130        return null;
131      }
132      final String[] splits = tableRowKey.split(",");
133      return splits.length > 2 ? splits[2] : null;
134    }
135
136    private boolean isMetaTableOp(ObserverContext<RegionCoprocessorEnvironment> e) {
137      return TableName.META_TABLE_NAME
138          .equals(e.getEnvironment().getRegionInfo().getTable());
139    }
140
141    private void clientMetricRegisterAndMark() {
142      // Mark client metric
143      String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null;
144      if (clientIP == null || clientIP.isEmpty()) {
145        return;
146      }
147      String clientRequestMeter = clientRequestMeterName(clientIP);
148      clientMetricsLossyCounting.add(clientRequestMeter);
149      registerAndMarkMeter(clientRequestMeter);
150    }
151
152    private void tableMetricRegisterAndMark(Row op) {
153      // Mark table metric
154      String tableName = getTableNameFromOp(op);
155      if (tableName == null || tableName.isEmpty()) {
156        return;
157      }
158      String tableRequestMeter = tableMeterName(tableName);
159      registerAndMarkMeter(tableRequestMeter);
160    }
161
162    private void regionMetricRegisterAndMark(Row op) {
163      // Mark region metric
164      String regionId = getRegionIdFromOp(op);
165      if (regionId == null || regionId.isEmpty()) {
166        return;
167      }
168      String regionRequestMeter = regionMeterName(regionId);
169      regionMetricsLossyCounting.add(regionRequestMeter);
170      registerAndMarkMeter(regionRequestMeter);
171    }
172
173    private void opMetricRegisterAndMark(Row op) {
174      // Mark access type ["get", "put", "delete"] metric
175      String opMeterName = opMeterName(op);
176      if (opMeterName == null || opMeterName.isEmpty()) {
177        return;
178      }
179      registerAndMarkMeter(opMeterName);
180    }
181
182    private void opWithClientMetricRegisterAndMark(Object op) {
183      // // Mark client + access type metric
184      String opWithClientMeterName = opWithClientMeterName(op);
185      if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
186        return;
187      }
188      registerAndMarkMeter(opWithClientMeterName);
189    }
190
191    // Helper function to register and mark meter if not present
192    private void registerAndMarkMeter(String requestMeter) {
193      if (requestMeter.isEmpty()) {
194        return;
195      }
196      if(!registry.get(requestMeter).isPresent()){
197        metrics.add(requestMeter);
198      }
199      registry.meter(requestMeter).mark();
200    }
201
202    private String opWithClientMeterName(Object op) {
203      // Extract meter name containing the client IP
204      String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
205      if (clientIP.isEmpty()) {
206        return "";
207      }
208      MetaTableOps ops = opsNameMap.get(op.getClass());
209      String opWithClientMeterName = "";
210      switch (ops) {
211        case GET:
212          opWithClientMeterName = String.format("MetaTable_client_%s_get_request", clientIP);
213          break;
214        case PUT:
215          opWithClientMeterName = String.format("MetaTable_client_%s_put_request", clientIP);
216          break;
217        case DELETE:
218          opWithClientMeterName = String.format("MetaTable_client_%s_delete_request", clientIP);
219          break;
220        default:
221          break;
222      }
223      return opWithClientMeterName;
224    }
225
226    private String opMeterName(Object op) {
227      // Extract meter name containing the access type
228      MetaTableOps ops = opsNameMap.get(op.getClass());
229      String opMeterName = "";
230      switch (ops) {
231        case GET:
232          opMeterName = "MetaTable_get_request";
233          break;
234        case PUT:
235          opMeterName = "MetaTable_put_request";
236          break;
237        case DELETE:
238          opMeterName = "MetaTable_delete_request";
239          break;
240        default:
241          break;
242      }
243      return opMeterName;
244    }
245
246    private String tableMeterName(String tableName) {
247      // Extract meter name containing the table name
248      return String.format("MetaTable_table_%s_request", tableName);
249    }
250
251    private String clientRequestMeterName(String clientIP) {
252      // Extract meter name containing the client IP
253      if (clientIP.isEmpty()) {
254        return "";
255      }
256      return String.format("MetaTable_client_%s_lossy_request", clientIP);
257    }
258
259    private String regionMeterName(String regionId) {
260      // Extract meter name containing the region ID
261      return String.format("MetaTable_region_%s_lossy_request", regionId);
262    }
263  }
264
265  @Override
266  public Optional<RegionObserver> getRegionObserver() {
267    return Optional.of(observer);
268  }
269
270  @Override
271  public void start(CoprocessorEnvironment env) throws IOException {
272    observer = new ExampleRegionObserverMeta();
273    if (env instanceof RegionCoprocessorEnvironment
274        && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
275        && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable()
276          .equals(TableName.META_TABLE_NAME)) {
277      RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
278      registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
279      LossyCounting.LossyCountingListener<String> listener = key -> {
280        registry.remove(key);
281        metrics.remove(key);
282      };
283      final Configuration conf = regionCoprocessorEnv.getConfiguration();
284      clientMetricsLossyCounting = new LossyCounting<>("clientMetaMetrics", conf, listener);
285      regionMetricsLossyCounting = new LossyCounting<>("regionMetaMetrics", conf, listener);
286      // only be active mode when this region holds meta table.
287      active = true;
288    }
289  }
290
291  @Override
292  public void stop(CoprocessorEnvironment env) throws IOException {
293    // since meta region can move around, clear stale metrics when stop.
294    for(String metric:metrics){
295      registry.remove(metric);
296    }
297  }
298}