001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.handler;
019
020import java.io.IOException;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.atomic.AtomicLong;
023import org.apache.hadoop.hbase.ExtendedCell;
024import org.apache.hadoop.hbase.executor.EventHandler;
025import org.apache.hadoop.hbase.executor.EventType;
026import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
027import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Handler to seek storefiles in parallel.
034 */
035@InterfaceAudience.Private
036public class ParallelSeekHandler extends EventHandler {
037  private static final Logger LOG = LoggerFactory.getLogger(ParallelSeekHandler.class);
038  private KeyValueScanner scanner;
039  private ExtendedCell keyValue;
040  private long readPoint;
041  private CountDownLatch latch;
042  private Throwable err = null;
043
044  // Flag to enable/disable scan metrics collection and thread-local counters for capturing scan
045  // performance during parallel store file seeking.
046  // These aggregate metrics from worker threads back to the main scan thread.
047  private final boolean isScanMetricsEnabled;
048  // Thread-local counter for bytes read from FS.
049  private final AtomicLong bytesReadFromFs;
050  // Thread-local counter for bytes read from BlockCache.
051  private final AtomicLong bytesReadFromBlockCache;
052  // Thread-local counter for block read operations count.
053  private final AtomicLong blockReadOpsCount;
054
055  public ParallelSeekHandler(KeyValueScanner scanner, ExtendedCell keyValue, long readPoint,
056    CountDownLatch latch) {
057    super(null, EventType.RS_PARALLEL_SEEK);
058    this.scanner = scanner;
059    this.keyValue = keyValue;
060    this.readPoint = readPoint;
061    this.latch = latch;
062    this.isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
063    this.bytesReadFromFs = ThreadLocalServerSideScanMetrics.getBytesReadFromFsCounter();
064    this.bytesReadFromBlockCache =
065      ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheCounter();
066    this.blockReadOpsCount = ThreadLocalServerSideScanMetrics.getBlockReadOpsCountCounter();
067  }
068
069  @Override
070  public void process() {
071    try {
072      ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
073      if (isScanMetricsEnabled) {
074        ThreadLocalServerSideScanMetrics.reset();
075      }
076      scanner.seek(keyValue);
077      if (isScanMetricsEnabled) {
078        long metricValue = ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
079        if (metricValue > 0) {
080          bytesReadFromFs.addAndGet(metricValue);
081        }
082        metricValue = ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset();
083        if (metricValue > 0) {
084          bytesReadFromBlockCache.addAndGet(metricValue);
085        }
086        metricValue = ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
087        if (metricValue > 0) {
088          blockReadOpsCount.addAndGet(metricValue);
089        }
090      }
091    } catch (IOException e) {
092      LOG.error("", e);
093      setErr(e);
094    } finally {
095      latch.countDown();
096    }
097  }
098
099  public Throwable getErr() {
100    return err;
101  }
102
103  public void setErr(Throwable err) {
104    this.err = err;
105  }
106}