/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

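/*
 * A minimal usage sketch, assuming a concrete subclass such as MultiThreadedWriter and an
 * already-configured LoadTestDataGenerator (the variable names below are illustrative only):
 *
 *   MultiThreadedWriterBase writer = new MultiThreadedWriter(dataGen, conf, tableName);
 *   writer.setTrackWroteKeys(true); // needed when a concurrent reader follows the writers
 *   writer.start(startKey, endKey, numThreads);
 *   writer.waitForFinish();
 */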
/** Creates multiple threads that write key/values into the table. */
@InterfaceAudience.Private
public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class);

  /**
   * A temporary place to keep track of inserted/updated keys. This is written to by all writers and
   * is drained on a separate thread that populates {@link #wroteUpToKey}, the maximum key in the
   * contiguous range of keys being inserted/updated. This queue is supposed to stay small.
   */
  protected BlockingQueue<Long> wroteKeys;

  /**
   * This is the current key to be inserted/updated by any thread. Each thread does an atomic get
   * and increment operation and inserts the current value.
   */
  protected AtomicLong nextKeyToWrite = new AtomicLong();

  /**
   * The highest key in the contiguous range of inserted/updated keys.
   */
  protected AtomicLong wroteUpToKey = new AtomicLong();

  /** The sorted set of keys NOT inserted/updated by the writers */
  protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>();

  /**
   * The total number of inserted/updated keys that have not yet lined up into our contiguous
   * sequence starting from startKey. Supposed to stay small.
   */
  protected AtomicLong wroteKeyQueueSize = new AtomicLong();

  /** Enable this if used in conjunction with a concurrent reader. */
  protected boolean trackWroteKeys;

  public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
    TableName tableName, String actionLetter) throws IOException {
    super(dataGen, conf, tableName, actionLetter);
    this.wroteKeys = createWriteKeysQueue(conf);
  }

  protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
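    // A bounded queue keeps memory use in check; the tracker thread is expected to drain it
    // quickly, so the backlog should stay well below this capacity.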
    return new ArrayBlockingQueue<>(10000);
  }

  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);
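    // Nothing has been written yet: the next key to hand out is startKey, and the highest key of
    // the contiguous written range is therefore one below it.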
    nextKeyToWrite.set(startKey);
    wroteUpToKey.set(startKey - 1);

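    // When tracking is enabled, a dedicated thread drains wroteKeys and advances wroteUpToKey.
    // Count it as a working thread so it is accounted for until it exits.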
    if (trackWroteKeys) {
      new Thread(new WroteKeysTracker(),
        "MultiThreadedWriterBase-WroteKeysTracker-" + EnvironmentEdgeManager.currentTime()).start();
      numThreadsWorking.incrementAndGet();
    }
  }

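  /**
   * Returns a human-readable description of the cached and actual region locations for the given
   * row key. Never throws: if the lookup fails, a placeholder string is returned instead.
   */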
  protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
    HRegionLocation cached = null, real = null;
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
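      // The second argument is "reload": false may use the client's location cache, while true
      // forces a fresh lookup.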
      cached = locator.getRegionLocation(rowKey, false);
      real = locator.getRegionLocation(rowKey, true);
    } catch (Throwable t) {
      // Cannot obtain region information; we are only gathering debug info for another catch
      // block, so swallow the error - too bad!
    }
    String result = "no information can be obtained";
    if (cached != null) {
      result = "cached: " + cached.toString();
    }
    if (real != null && real.getServerName() != null) {
      if (cached != null && cached.getServerName() != null && real.equals(cached)) {
        result += "; cache is up to date";
      } else {
        result = (cached != null) ? (result + "; ") : "";
        result += "real: " + real.toString();
      }
    }
    return result;
  }

  /**
   * A thread that keeps track of the highest key in the contiguous range of inserted/updated keys.
   */
  private class WroteKeysTracker implements Runnable {

    @Override
    public void run() {
      Thread.currentThread().setName(getClass().getSimpleName());
      try {
        long expectedKey = startKey;
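        // Keys that arrive out of order are buffered here until every smaller key has been seen.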
        Queue<Long> sortedKeys = new PriorityQueue<>();
        while (expectedKey < endKey) {
          // Block until a new element is available.
          Long k;
          try {
            k = wroteKeys.poll(1, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
            LOG.info("Inserted key tracker thread interrupted", e);
            break;
          }
          if (k == null) {
            continue;
          }
          if (k == expectedKey) {
            // Skip the "sorted key" queue and consume this key.
            wroteUpToKey.set(k);
            ++expectedKey;
          } else {
            sortedKeys.add(k);
          }

          // See if we have a sequence of contiguous keys lined up.
          while (!sortedKeys.isEmpty() && ((k = sortedKeys.peek()) == expectedKey)) {
            sortedKeys.poll();
            wroteUpToKey.set(k);
            ++expectedKey;
          }

          wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
        }
      } catch (Exception ex) {
        LOG.error("Error in inserted/updated key tracker", ex);
      } finally {
        numThreadsWorking.decrementAndGet();
      }
    }
  }

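  /** Returns the number of keys that the writers failed to insert/update. */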
  public int getNumWriteFailures() {
    return failedKeySet.size();
  }

  /**
   * The max key until which all keys have been inserted/updated (successfully or not).
   * @return the last key that we have inserted/updated all keys up to (inclusive)
   */
  public long wroteUpToKey() {
    return wroteUpToKey.get();
  }

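  /** Returns true if the writers failed to insert/update the given key. */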
  public boolean failedToWriteKey(long k) {
    return failedKeySet.contains(k);
  }

  @Override
  protected String progressInfo() {
    StringBuilder sb = new StringBuilder();
    appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
    appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
    return sb.toString();
  }

  /**
   * Used for a joint write/read workload. Enables tracking the last inserted/updated key, which
   * requires a blocking queue and a consumer thread.
   * @param enable whether to enable tracking the last inserted/updated key
   */
  public void setTrackWroteKeys(boolean enable) {
    trackWroteKeys = enable;
  }
}