001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.storefiletracker; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.LinkedBlockingQueue; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.regionserver.StoreContext; 031import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035public class StoreFileTrackerForTest extends DefaultStoreFileTracker { 036 037 private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerForTest.class); 038 private static ConcurrentMap<String, BlockingQueue<StoreFileInfo>> trackedFiles = 039 new ConcurrentHashMap<>(); 040 private String storeId; 041 042 public StoreFileTrackerForTest(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { 043 super(conf, isPrimaryReplica, ctx); 044 if (ctx != null && ctx.getRegionFileSystem() != null) { 045 this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString(); 046 LOG.info("created storeId: {}", storeId); 047 trackedFiles.computeIfAbsent(storeId, v -> new LinkedBlockingQueue<>()); 048 } else { 049 LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null."); 050 } 051 } 052 053 @Override 054 protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException { 055 LOG.info("adding to storeId: {}", storeId); 056 trackedFiles.get(storeId).addAll(newFiles); 057 } 058 059 @Override 060 public List<StoreFileInfo> load() throws IOException { 061 return new ArrayList<>(trackedFiles.get(storeId)); 062 } 063 064 public static boolean tracked(String encodedRegionName, String family, Path file) { 065 BlockingQueue<StoreFileInfo> files = trackedFiles.get(encodedRegionName + "-" + family); 066 return files != null && files.stream().anyMatch(s -> s.getPath().equals(file)); 067 } 068 069 public static void clear() { 070 trackedFiles.clear(); 071 } 072}