001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.util.PriorityQueue; 022import java.util.Queue; 023import java.util.Set; 024import java.util.concurrent.ArrayBlockingQueue; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.ConcurrentSkipListSet; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicLong; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HRegionLocation; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.RegionLocator; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** Creates multiple threads that write key/values into the */ 040@InterfaceAudience.Private 041public abstract class MultiThreadedWriterBase extends MultiThreadedAction { 042 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class); 043 044 /** 045 * A temporary place to keep track of inserted/updated keys. This is written to by all writers and 046 * is drained on a separate thread that populates {@link #wroteUpToKey}, the maximum key in the 047 * contiguous range of keys being inserted/updated. This queue is supposed to stay small. 048 */ 049 protected BlockingQueue<Long> wroteKeys; 050 051 /** 052 * This is the current key to be inserted/updated by any thread. Each thread does an atomic get 053 * and increment operation and inserts the current value. 054 */ 055 protected AtomicLong nextKeyToWrite = new AtomicLong(); 056 057 /** 058 * The highest key in the contiguous range of keys . 059 */ 060 protected AtomicLong wroteUpToKey = new AtomicLong(); 061 062 /** The sorted set of keys NOT inserted/updated by the writers */ 063 protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>(); 064 065 /** 066 * The total size of the temporary inserted/updated key set that have not yet lined up in a our 067 * contiguous sequence starting from startKey. Supposed to stay small. 068 */ 069 protected AtomicLong wroteKeyQueueSize = new AtomicLong(); 070 071 /** Enable this if used in conjunction with a concurrent reader. */ 072 protected boolean trackWroteKeys; 073 074 public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf, 075 TableName tableName, String actionLetter) throws IOException { 076 super(dataGen, conf, tableName, actionLetter); 077 this.wroteKeys = createWriteKeysQueue(conf); 078 } 079 080 protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) { 081 return new ArrayBlockingQueue<>(10000); 082 } 083 084 @Override 085 public void start(long startKey, long endKey, int numThreads) throws IOException { 086 super.start(startKey, endKey, numThreads); 087 nextKeyToWrite.set(startKey); 088 wroteUpToKey.set(startKey - 1); 089 090 if (trackWroteKeys) { 091 new Thread(new WroteKeysTracker(), 092 "MultiThreadedWriterBase-WroteKeysTracker-" + EnvironmentEdgeManager.currentTime()).start(); 093 numThreadsWorking.incrementAndGet(); 094 } 095 } 096 097 protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) { 098 HRegionLocation cached = null, real = null; 099 try (RegionLocator locator = connection.getRegionLocator(tableName)) { 100 cached = locator.getRegionLocation(rowKey, false); 101 real = locator.getRegionLocation(rowKey, true); 102 } catch (Throwable t) { 103 // Cannot obtain region information for another catch block - too bad! 104 } 105 String result = "no information can be obtained"; 106 if (cached != null) { 107 result = "cached: " + cached.toString(); 108 } 109 if (real != null && real.getServerName() != null) { 110 if (cached != null && cached.getServerName() != null && real.equals(cached)) { 111 result += "; cache is up to date"; 112 } else { 113 result = (cached != null) ? (result + "; ") : ""; 114 result += "real: " + real.toString(); 115 } 116 } 117 return result; 118 } 119 120 /** 121 * A thread that keeps track of the highest key in the contiguous range of inserted/updated keys. 122 */ 123 private class WroteKeysTracker implements Runnable { 124 125 @Override 126 public void run() { 127 Thread.currentThread().setName(getClass().getSimpleName()); 128 try { 129 long expectedKey = startKey; 130 Queue<Long> sortedKeys = new PriorityQueue<>(); 131 while (expectedKey < endKey) { 132 // Block until a new element is available. 133 Long k; 134 try { 135 k = wroteKeys.poll(1, TimeUnit.SECONDS); 136 } catch (InterruptedException e) { 137 LOG.info("Inserted key tracker thread interrupted", e); 138 break; 139 } 140 if (k == null) { 141 continue; 142 } 143 if (k == expectedKey) { 144 // Skip the "sorted key" queue and consume this key. 145 wroteUpToKey.set(k); 146 ++expectedKey; 147 } else { 148 sortedKeys.add(k); 149 } 150 151 // See if we have a sequence of contiguous keys lined up. 152 while (!sortedKeys.isEmpty() && ((k = sortedKeys.peek()) == expectedKey)) { 153 sortedKeys.poll(); 154 wroteUpToKey.set(k); 155 ++expectedKey; 156 } 157 158 wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size()); 159 } 160 } catch (Exception ex) { 161 LOG.error("Error in inserted/updaed key tracker", ex); 162 } finally { 163 numThreadsWorking.decrementAndGet(); 164 } 165 } 166 } 167 168 public int getNumWriteFailures() { 169 return failedKeySet.size(); 170 } 171 172 /** 173 * The max key until which all keys have been inserted/updated (successfully or not). 174 * @return the last key that we have inserted/updated all keys up to (inclusive) 175 */ 176 public long wroteUpToKey() { 177 return wroteUpToKey.get(); 178 } 179 180 public boolean failedToWriteKey(long k) { 181 return failedKeySet.contains(k); 182 } 183 184 @Override 185 protected String progressInfo() { 186 StringBuilder sb = new StringBuilder(); 187 appendToStatus(sb, "wroteUpTo", wroteUpToKey.get()); 188 appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get()); 189 return sb.toString(); 190 } 191 192 /** 193 * Used for a joint write/read workload. Enables tracking the last inserted/updated key, which 194 * requires a blocking queue and a consumer thread. 195 * @param enable whether to enable tracking the last inserted/updated key 196 */ 197 public void setTrackWroteKeys(boolean enable) { 198 trackWroteKeys = enable; 199 } 200}