001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import java.util.concurrent.atomic.AtomicLong;
021import org.apache.hadoop.hbase.HBaseInterfaceAudience;
022import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
023import org.apache.hadoop.hbase.regionserver.RegionScanner;
024import org.apache.hadoop.hbase.regionserver.ScannerContext;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.yetus.audience.InterfaceStability;
027
028/**
029 * Thread-local storage for server-side scan metrics that captures performance data separately for
030 * each scan thread. This class works in conjunction with {@link ServerSideScanMetrics} to provide
031 * comprehensive scan performance monitoring.
032 * <h3>Purpose and Design</h3> {@link ServerSideScanMetrics} captures scan metrics on the server
033 * side and passes them back to the client in protocol buffer responses. However, the
034 * {@link ServerSideScanMetrics} instance is not readily available at deeper layers in HBase where
035 * HFiles are read and individual HFile blocks are accessed. To avoid the complexity of passing
036 * {@link ServerSideScanMetrics} instances through method calls across multiple layers, this class
037 * provides thread-local storage for metrics collection.
038 * <h3>Thread Safety and HBase Architecture</h3> This class leverages a critical aspect of HBase
039 * server design: on the server side, the thread that opens a {@link RegionScanner} and calls
040 * {@link RegionScanner#nextRaw(java.util.List, ScannerContext)} is the same thread that reads HFile
041 * blocks. This design allows thread-local storage to effectively capture metrics without
042 * cross-thread synchronization.
043 * <h3>Special Handling for Parallel Operations</h3> The only deviation from the single-thread model
044 * occurs when {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} is used for
045 * parallel store file seeking. In this case, special handling ensures that metrics are captured
046 * correctly across multiple threads. The
047 * {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} captures metrics from
048 * worker threads and aggregates them back to the main scan thread. Please refer to the javadoc of
049 * {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} for detailed information
050 * about this parallel processing mechanism.
051 * <h3>Usage Pattern</h3>
052 * <ol>
053 * <li>Enable metrics collection: {@link #setScanMetricsEnabled(boolean)}</li>
054 * <li>Reset counters at scan start: {@link #reset()}</li>
055 * <li>Increment counters during I/O operations using the various {@code add*} methods</li>
056 * <li>Populate the main metrics object:
057 * {@link #populateServerSideScanMetrics(ServerSideScanMetrics)}</li>
058 * </ol>
059 * <h3>Thread Safety</h3> This class is thread-safe. Each thread maintains its own set of counters
060 * through {@link ThreadLocal} storage, ensuring that metrics from different scan operations do not
061 * interfere with each other.
062 * @see ServerSideScanMetrics
063 * @see RegionScanner
064 * @see org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler
065 */
066@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
067@InterfaceStability.Evolving
068public final class ThreadLocalServerSideScanMetrics {
069  private ThreadLocalServerSideScanMetrics() {
070  }
071
072  private static final ThreadLocal<Boolean> IS_SCAN_METRICS_ENABLED =
073    ThreadLocal.withInitial(() -> false);
074
075  private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_FS =
076    ThreadLocal.withInitial(() -> new AtomicLong(0));
077
078  private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_BLOCK_CACHE =
079    ThreadLocal.withInitial(() -> new AtomicLong(0));
080
081  private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_MEMSTORE =
082    ThreadLocal.withInitial(() -> new AtomicLong(0));
083
084  private static final ThreadLocal<AtomicLong> BLOCK_READ_OPS_COUNT =
085    ThreadLocal.withInitial(() -> new AtomicLong(0));
086
087  private static final ThreadLocal<AtomicLong> FS_READ_TIME =
088    ThreadLocal.withInitial(() -> new AtomicLong(0));
089
090  public static void setScanMetricsEnabled(boolean enable) {
091    IS_SCAN_METRICS_ENABLED.set(enable);
092  }
093
094  public static long addBytesReadFromFs(long bytes) {
095    return BYTES_READ_FROM_FS.get().addAndGet(bytes);
096  }
097
098  public static long addBytesReadFromBlockCache(long bytes) {
099    return BYTES_READ_FROM_BLOCK_CACHE.get().addAndGet(bytes);
100  }
101
102  public static long addBytesReadFromMemstore(long bytes) {
103    return BYTES_READ_FROM_MEMSTORE.get().addAndGet(bytes);
104  }
105
106  public static long addBlockReadOpsCount(long count) {
107    return BLOCK_READ_OPS_COUNT.get().addAndGet(count);
108  }
109
110  public static long addFsReadTime(long time) {
111    return FS_READ_TIME.get().addAndGet(time);
112  }
113
114  public static boolean isScanMetricsEnabled() {
115    return IS_SCAN_METRICS_ENABLED.get();
116  }
117
118  public static AtomicLong getBytesReadFromFsCounter() {
119    return BYTES_READ_FROM_FS.get();
120  }
121
122  public static AtomicLong getBytesReadFromBlockCacheCounter() {
123    return BYTES_READ_FROM_BLOCK_CACHE.get();
124  }
125
126  public static AtomicLong getBytesReadFromMemstoreCounter() {
127    return BYTES_READ_FROM_MEMSTORE.get();
128  }
129
130  public static AtomicLong getBlockReadOpsCountCounter() {
131    return BLOCK_READ_OPS_COUNT.get();
132  }
133
134  public static AtomicLong getFsReadTimeCounter() {
135    return FS_READ_TIME.get();
136  }
137
138  public static long getBytesReadFromFsAndReset() {
139    return getBytesReadFromFsCounter().getAndSet(0);
140  }
141
142  public static long getBytesReadFromBlockCacheAndReset() {
143    return getBytesReadFromBlockCacheCounter().getAndSet(0);
144  }
145
146  public static long getBytesReadFromMemstoreAndReset() {
147    return getBytesReadFromMemstoreCounter().getAndSet(0);
148  }
149
150  public static long getBlockReadOpsCountAndReset() {
151    return getBlockReadOpsCountCounter().getAndSet(0);
152  }
153
154  public static long getFsReadTimeAndReset() {
155    return getFsReadTimeCounter().getAndSet(0);
156  }
157
158  public static void reset() {
159    getBytesReadFromFsAndReset();
160    getBytesReadFromBlockCacheAndReset();
161    getBytesReadFromMemstoreAndReset();
162    getBlockReadOpsCountAndReset();
163    getFsReadTimeAndReset();
164  }
165
166  public static void populateServerSideScanMetrics(ServerSideScanMetrics metrics) {
167    if (metrics == null) {
168      return;
169    }
170    metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_FS_METRIC_NAME,
171      getBytesReadFromFsCounter().get());
172    metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME,
173      getBytesReadFromBlockCacheCounter().get());
174    metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_MEMSTORE_METRIC_NAME,
175      getBytesReadFromMemstoreCounter().get());
176    metrics.addToCounter(ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME,
177      getBlockReadOpsCountCounter().get());
178    metrics.addToCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
179      getFsReadTimeCounter().get());
180  }
181}