001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.handler; 019 020import java.io.IOException; 021import java.util.concurrent.CountDownLatch; 022import java.util.concurrent.atomic.AtomicLong; 023import org.apache.hadoop.hbase.ExtendedCell; 024import org.apache.hadoop.hbase.executor.EventHandler; 025import org.apache.hadoop.hbase.executor.EventType; 026import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics; 027import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * Handler to seek storefiles in parallel. 034 */ 035@InterfaceAudience.Private 036public class ParallelSeekHandler extends EventHandler { 037 private static final Logger LOG = LoggerFactory.getLogger(ParallelSeekHandler.class); 038 private KeyValueScanner scanner; 039 private ExtendedCell keyValue; 040 private long readPoint; 041 private CountDownLatch latch; 042 private Throwable err = null; 043 044 // Flag to enable/disable scan metrics collection and thread-local counters for capturing scan 045 // performance during parallel store file seeking. 046 // These aggregate metrics from worker threads back to the main scan thread. 047 private final boolean isScanMetricsEnabled; 048 // Thread-local counter for bytes read from FS. 049 private final AtomicLong bytesReadFromFs; 050 // Thread-local counter for bytes read from BlockCache. 051 private final AtomicLong bytesReadFromBlockCache; 052 // Thread-local counter for block read operations count. 053 private final AtomicLong blockReadOpsCount; 054 055 public ParallelSeekHandler(KeyValueScanner scanner, ExtendedCell keyValue, long readPoint, 056 CountDownLatch latch) { 057 super(null, EventType.RS_PARALLEL_SEEK); 058 this.scanner = scanner; 059 this.keyValue = keyValue; 060 this.readPoint = readPoint; 061 this.latch = latch; 062 this.isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled(); 063 this.bytesReadFromFs = ThreadLocalServerSideScanMetrics.getBytesReadFromFsCounter(); 064 this.bytesReadFromBlockCache = 065 ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheCounter(); 066 this.blockReadOpsCount = ThreadLocalServerSideScanMetrics.getBlockReadOpsCountCounter(); 067 } 068 069 @Override 070 public void process() { 071 try { 072 ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled); 073 if (isScanMetricsEnabled) { 074 ThreadLocalServerSideScanMetrics.reset(); 075 } 076 scanner.seek(keyValue); 077 if (isScanMetricsEnabled) { 078 long metricValue = ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset(); 079 if (metricValue > 0) { 080 bytesReadFromFs.addAndGet(metricValue); 081 } 082 metricValue = ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset(); 083 if (metricValue > 0) { 084 bytesReadFromBlockCache.addAndGet(metricValue); 085 } 086 metricValue = ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset(); 087 if (metricValue > 0) { 088 blockReadOpsCount.addAndGet(metricValue); 089 } 090 } 091 } catch (IOException e) { 092 LOG.error("", e); 093 setErr(e); 094 } finally { 095 latch.countDown(); 096 } 097 } 098 099 public Throwable getErr() { 100 return err; 101 } 102 103 public void setErr(Throwable err) { 104 this.err = err; 105 } 106}