001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import java.util.concurrent.atomic.AtomicLong;
021import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
022import org.apache.hadoop.hbase.regionserver.RegionScanner;
023import org.apache.hadoop.hbase.regionserver.ScannerContext;
024import org.apache.yetus.audience.InterfaceAudience;
025
026/**
027 * Thread-local storage for server-side scan metrics that captures performance data separately for
028 * each scan thread. This class works in conjunction with {@link ServerSideScanMetrics} to provide
029 * comprehensive scan performance monitoring.
030 * <h3>Purpose and Design</h3> {@link ServerSideScanMetrics} captures scan metrics on the server
031 * side and passes them back to the client in protocol buffer responses. However, the
032 * {@link ServerSideScanMetrics} instance is not readily available at deeper layers in HBase where
033 * HFiles are read and individual HFile blocks are accessed. To avoid the complexity of passing
034 * {@link ServerSideScanMetrics} instances through method calls across multiple layers, this class
035 * provides thread-local storage for metrics collection.
036 * <h3>Thread Safety and HBase Architecture</h3> This class leverages a critical aspect of HBase
037 * server design: on the server side, the thread that opens a {@link RegionScanner} and calls
038 * {@link RegionScanner#nextRaw(java.util.List, ScannerContext)} is the same thread that reads HFile
039 * blocks. This design allows thread-local storage to effectively capture metrics without
040 * cross-thread synchronization.
041 * <h3>Special Handling for Parallel Operations</h3> The only deviation from the single-thread model
042 * occurs when {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} is used for
043 * parallel store file seeking. In this case, special handling ensures that metrics are captured
044 * correctly across multiple threads. The
045 * {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} captures metrics from
046 * worker threads and aggregates them back to the main scan thread. Please refer to the javadoc of
047 * {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} for detailed information
048 * about this parallel processing mechanism.
049 * <h3>Usage Pattern</h3>
050 * <ol>
051 * <li>Enable metrics collection: {@link #setScanMetricsEnabled(boolean)}</li>
052 * <li>Reset counters at scan start: {@link #reset()}</li>
053 * <li>Increment counters during I/O operations using the various {@code add*} methods</li>
054 * <li>Populate the main metrics object:
055 * {@link #populateServerSideScanMetrics(ServerSideScanMetrics)}</li>
056 * </ol>
057 * <h3>Thread Safety</h3> This class is thread-safe. Each thread maintains its own set of counters
058 * through {@link ThreadLocal} storage, ensuring that metrics from different scan operations do not
059 * interfere with each other.
060 * @see ServerSideScanMetrics
061 * @see RegionScanner
062 * @see org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler
063 */
064@InterfaceAudience.Private
065public final class ThreadLocalServerSideScanMetrics {
066  private ThreadLocalServerSideScanMetrics() {
067  }
068
069  private static final ThreadLocal<Boolean> IS_SCAN_METRICS_ENABLED =
070    ThreadLocal.withInitial(() -> false);
071
072  private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_FS =
073    ThreadLocal.withInitial(() -> new AtomicLong(0));
074
075  private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_BLOCK_CACHE =
076    ThreadLocal.withInitial(() -> new AtomicLong(0));
077
078  private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_MEMSTORE =
079    ThreadLocal.withInitial(() -> new AtomicLong(0));
080
081  private static final ThreadLocal<AtomicLong> BLOCK_READ_OPS_COUNT =
082    ThreadLocal.withInitial(() -> new AtomicLong(0));
083
084  public static void setScanMetricsEnabled(boolean enable) {
085    IS_SCAN_METRICS_ENABLED.set(enable);
086  }
087
088  public static long addBytesReadFromFs(long bytes) {
089    return BYTES_READ_FROM_FS.get().addAndGet(bytes);
090  }
091
092  public static long addBytesReadFromBlockCache(long bytes) {
093    return BYTES_READ_FROM_BLOCK_CACHE.get().addAndGet(bytes);
094  }
095
096  public static long addBytesReadFromMemstore(long bytes) {
097    return BYTES_READ_FROM_MEMSTORE.get().addAndGet(bytes);
098  }
099
100  public static long addBlockReadOpsCount(long count) {
101    return BLOCK_READ_OPS_COUNT.get().addAndGet(count);
102  }
103
104  public static boolean isScanMetricsEnabled() {
105    return IS_SCAN_METRICS_ENABLED.get();
106  }
107
108  public static AtomicLong getBytesReadFromFsCounter() {
109    return BYTES_READ_FROM_FS.get();
110  }
111
112  public static AtomicLong getBytesReadFromBlockCacheCounter() {
113    return BYTES_READ_FROM_BLOCK_CACHE.get();
114  }
115
116  public static AtomicLong getBytesReadFromMemstoreCounter() {
117    return BYTES_READ_FROM_MEMSTORE.get();
118  }
119
120  public static AtomicLong getBlockReadOpsCountCounter() {
121    return BLOCK_READ_OPS_COUNT.get();
122  }
123
124  public static long getBytesReadFromFsAndReset() {
125    return getBytesReadFromFsCounter().getAndSet(0);
126  }
127
128  public static long getBytesReadFromBlockCacheAndReset() {
129    return getBytesReadFromBlockCacheCounter().getAndSet(0);
130  }
131
132  public static long getBytesReadFromMemstoreAndReset() {
133    return getBytesReadFromMemstoreCounter().getAndSet(0);
134  }
135
136  public static long getBlockReadOpsCountAndReset() {
137    return getBlockReadOpsCountCounter().getAndSet(0);
138  }
139
140  public static void reset() {
141    getBytesReadFromFsAndReset();
142    getBytesReadFromBlockCacheAndReset();
143    getBytesReadFromMemstoreAndReset();
144    getBlockReadOpsCountAndReset();
145  }
146
147  public static void populateServerSideScanMetrics(ServerSideScanMetrics metrics) {
148    if (metrics == null) {
149      return;
150    }
151    metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_FS_METRIC_NAME,
152      getBytesReadFromFsCounter().get());
153    metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME,
154      getBytesReadFromBlockCacheCounter().get());
155    metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_MEMSTORE_METRIC_NAME,
156      getBytesReadFromMemstoreCounter().get());
157    metrics.addToCounter(ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME,
158      getBlockReadOpsCountCounter().get());
159  }
160}