001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.util.PriorityQueue; 022import java.util.Queue; 023import java.util.Set; 024import java.util.concurrent.ArrayBlockingQueue; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.ConcurrentSkipListSet; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicLong; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HRegionLocation; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** Creates multiple threads that write key/values into the */ 038public abstract class MultiThreadedWriterBase extends MultiThreadedAction { 039 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class); 040 041 /** 042 * A temporary place to keep track of inserted/updated keys. This is written to by all writers and 043 * is drained on a separate thread that populates {@link #wroteUpToKey}, the maximum key in the 044 * contiguous range of keys being inserted/updated. This queue is supposed to stay small. 045 */ 046 protected BlockingQueue<Long> wroteKeys; 047 048 /** 049 * This is the current key to be inserted/updated by any thread. Each thread does an atomic get 050 * and increment operation and inserts the current value. 051 */ 052 protected AtomicLong nextKeyToWrite = new AtomicLong(); 053 054 /** 055 * The highest key in the contiguous range of keys . 056 */ 057 protected AtomicLong wroteUpToKey = new AtomicLong(); 058 059 /** The sorted set of keys NOT inserted/updated by the writers */ 060 protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>(); 061 062 /** 063 * The total size of the temporary inserted/updated key set that have not yet lined up in a our 064 * contiguous sequence starting from startKey. Supposed to stay small. 065 */ 066 protected AtomicLong wroteKeyQueueSize = new AtomicLong(); 067 068 /** Enable this if used in conjunction with a concurrent reader. */ 069 protected boolean trackWroteKeys; 070 071 public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf, 072 TableName tableName, String actionLetter) throws IOException { 073 super(dataGen, conf, tableName, actionLetter); 074 this.wroteKeys = createWriteKeysQueue(conf); 075 } 076 077 protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) { 078 return new ArrayBlockingQueue<>(10000); 079 } 080 081 @Override 082 public void start(long startKey, long endKey, int numThreads) throws IOException { 083 super.start(startKey, endKey, numThreads); 084 nextKeyToWrite.set(startKey); 085 wroteUpToKey.set(startKey - 1); 086 087 if (trackWroteKeys) { 088 new Thread(new WroteKeysTracker(), 089 "MultiThreadedWriterBase-WroteKeysTracker-" + EnvironmentEdgeManager.currentTime()).start(); 090 numThreadsWorking.incrementAndGet(); 091 } 092 } 093 094 protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) { 095 HRegionLocation cached = null, real = null; 096 try { 097 cached = connection.getRegionLocation(tableName, rowKey, false); 098 real = connection.getRegionLocation(tableName, rowKey, true); 099 } catch (Throwable t) { 100 // Cannot obtain region information for another catch block - too bad! 101 } 102 String result = "no information can be obtained"; 103 if (cached != null) { 104 result = "cached: " + cached.toString(); 105 } 106 if (real != null && real.getServerName() != null) { 107 if (cached != null && cached.getServerName() != null && real.equals(cached)) { 108 result += "; cache is up to date"; 109 } else { 110 result = (cached != null) ? (result + "; ") : ""; 111 result += "real: " + real.toString(); 112 } 113 } 114 return result; 115 } 116 117 /** 118 * A thread that keeps track of the highest key in the contiguous range of inserted/updated keys. 119 */ 120 private class WroteKeysTracker implements Runnable { 121 122 @Override 123 public void run() { 124 Thread.currentThread().setName(getClass().getSimpleName()); 125 try { 126 long expectedKey = startKey; 127 Queue<Long> sortedKeys = new PriorityQueue<>(); 128 while (expectedKey < endKey) { 129 // Block until a new element is available. 130 Long k; 131 try { 132 k = wroteKeys.poll(1, TimeUnit.SECONDS); 133 } catch (InterruptedException e) { 134 LOG.info("Inserted key tracker thread interrupted", e); 135 break; 136 } 137 if (k == null) { 138 continue; 139 } 140 if (k == expectedKey) { 141 // Skip the "sorted key" queue and consume this key. 142 wroteUpToKey.set(k); 143 ++expectedKey; 144 } else { 145 sortedKeys.add(k); 146 } 147 148 // See if we have a sequence of contiguous keys lined up. 149 while (!sortedKeys.isEmpty() && ((k = sortedKeys.peek()) == expectedKey)) { 150 sortedKeys.poll(); 151 wroteUpToKey.set(k); 152 ++expectedKey; 153 } 154 155 wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size()); 156 } 157 } catch (Exception ex) { 158 LOG.error("Error in inserted/updaed key tracker", ex); 159 } finally { 160 numThreadsWorking.decrementAndGet(); 161 } 162 } 163 } 164 165 public int getNumWriteFailures() { 166 return failedKeySet.size(); 167 } 168 169 /** 170 * The max key until which all keys have been inserted/updated (successfully or not). 171 * @return the last key that we have inserted/updated all keys up to (inclusive) 172 */ 173 public long wroteUpToKey() { 174 return wroteUpToKey.get(); 175 } 176 177 public boolean failedToWriteKey(long k) { 178 return failedKeySet.contains(k); 179 } 180 181 @Override 182 protected String progressInfo() { 183 StringBuilder sb = new StringBuilder(); 184 appendToStatus(sb, "wroteUpTo", wroteUpToKey.get()); 185 appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get()); 186 return sb.toString(); 187 } 188 189 /** 190 * Used for a joint write/read workload. Enables tracking the last inserted/updated key, which 191 * requires a blocking queue and a consumer thread. 192 * @param enable whether to enable tracking the last inserted/updated key 193 */ 194 public void setTrackWroteKeys(boolean enable) { 195 trackWroteKeys = enable; 196 } 197}