001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.util; 020 021import java.io.IOException; 022import java.util.PriorityQueue; 023import java.util.Queue; 024import java.util.Set; 025import java.util.concurrent.ArrayBlockingQueue; 026import java.util.concurrent.BlockingQueue; 027import java.util.concurrent.ConcurrentSkipListSet; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicLong; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.HRegionLocation; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** Creates multiple threads that write key/values into the */ 040public abstract class MultiThreadedWriterBase extends MultiThreadedAction { 041 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class); 042 043 /** 044 * A temporary place to keep track of inserted/updated keys. This is written to by 045 * all writers and is drained on a separate thread that populates 046 * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys 047 * being inserted/updated. This queue is supposed to stay small. 048 */ 049 protected BlockingQueue<Long> wroteKeys; 050 051 /** 052 * This is the current key to be inserted/updated by any thread. Each thread does an 053 * atomic get and increment operation and inserts the current value. 054 */ 055 protected AtomicLong nextKeyToWrite = new AtomicLong(); 056 057 /** 058 * The highest key in the contiguous range of keys . 059 */ 060 protected AtomicLong wroteUpToKey = new AtomicLong(); 061 062 /** The sorted set of keys NOT inserted/updated by the writers */ 063 protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>(); 064 065 /** 066 * The total size of the temporary inserted/updated key set that have not yet lined 067 * up in a our contiguous sequence starting from startKey. Supposed to stay 068 * small. 069 */ 070 protected AtomicLong wroteKeyQueueSize = new AtomicLong(); 071 072 /** Enable this if used in conjunction with a concurrent reader. */ 073 protected boolean trackWroteKeys; 074 075 public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf, 076 TableName tableName, String actionLetter) throws IOException { 077 super(dataGen, conf, tableName, actionLetter); 078 this.wroteKeys = createWriteKeysQueue(conf); 079 } 080 081 protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) { 082 return new ArrayBlockingQueue<>(10000); 083 } 084 085 @Override 086 public void start(long startKey, long endKey, int numThreads) throws IOException { 087 super.start(startKey, endKey, numThreads); 088 nextKeyToWrite.set(startKey); 089 wroteUpToKey.set(startKey - 1); 090 091 if (trackWroteKeys) { 092 new Thread(new WroteKeysTracker(), 093 "MultiThreadedWriterBase-WroteKeysTracker-" + System.currentTimeMillis()).start(); 094 numThreadsWorking.incrementAndGet(); 095 } 096 } 097 098 protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) { 099 HRegionLocation cached = null, real = null; 100 try { 101 cached = connection.getRegionLocation(tableName, rowKey, false); 102 real = connection.getRegionLocation(tableName, rowKey, true); 103 } catch (Throwable t) { 104 // Cannot obtain region information for another catch block - too bad! 105 } 106 String result = "no information can be obtained"; 107 if (cached != null) { 108 result = "cached: " + cached.toString(); 109 } 110 if (real != null && real.getServerName() != null) { 111 if (cached != null && cached.getServerName() != null && real.equals(cached)) { 112 result += "; cache is up to date"; 113 } else { 114 result = (cached != null) ? (result + "; ") : ""; 115 result += "real: " + real.toString(); 116 } 117 } 118 return result; 119 } 120 121 /** 122 * A thread that keeps track of the highest key in the contiguous range of 123 * inserted/updated keys. 124 */ 125 private class WroteKeysTracker implements Runnable { 126 127 @Override 128 public void run() { 129 Thread.currentThread().setName(getClass().getSimpleName()); 130 try { 131 long expectedKey = startKey; 132 Queue<Long> sortedKeys = new PriorityQueue<>(); 133 while (expectedKey < endKey) { 134 // Block until a new element is available. 135 Long k; 136 try { 137 k = wroteKeys.poll(1, TimeUnit.SECONDS); 138 } catch (InterruptedException e) { 139 LOG.info("Inserted key tracker thread interrupted", e); 140 break; 141 } 142 if (k == null) { 143 continue; 144 } 145 if (k == expectedKey) { 146 // Skip the "sorted key" queue and consume this key. 147 wroteUpToKey.set(k); 148 ++expectedKey; 149 } else { 150 sortedKeys.add(k); 151 } 152 153 // See if we have a sequence of contiguous keys lined up. 154 while (!sortedKeys.isEmpty() 155 && ((k = sortedKeys.peek()) == expectedKey)) { 156 sortedKeys.poll(); 157 wroteUpToKey.set(k); 158 ++expectedKey; 159 } 160 161 wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size()); 162 } 163 } catch (Exception ex) { 164 LOG.error("Error in inserted/updaed key tracker", ex); 165 } finally { 166 numThreadsWorking.decrementAndGet(); 167 } 168 } 169 } 170 171 public int getNumWriteFailures() { 172 return failedKeySet.size(); 173 } 174 175 /** 176 * The max key until which all keys have been inserted/updated (successfully or not). 177 * @return the last key that we have inserted/updated all keys up to (inclusive) 178 */ 179 public long wroteUpToKey() { 180 return wroteUpToKey.get(); 181 } 182 183 public boolean failedToWriteKey(long k) { 184 return failedKeySet.contains(k); 185 } 186 187 @Override 188 protected String progressInfo() { 189 StringBuilder sb = new StringBuilder(); 190 appendToStatus(sb, "wroteUpTo", wroteUpToKey.get()); 191 appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get()); 192 return sb.toString(); 193 } 194 195 /** 196 * Used for a joint write/read workload. Enables tracking the last inserted/updated 197 * key, which requires a blocking queue and a consumer thread. 198 * @param enable whether to enable tracking the last inserted/updated key 199 */ 200 public void setTrackWroteKeys(boolean enable) { 201 trackWroteKeys = enable; 202 } 203}