001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertSame;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.util.Optional;
029import java.util.concurrent.CountDownLatch;
030import org.apache.hadoop.hbase.Cell.Type;
031import org.apache.hadoop.hbase.CellBuilderFactory;
032import org.apache.hadoop.hbase.CellBuilderType;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.coprocessor.ObserverContext;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
044import org.apache.hadoop.hbase.coprocessor.RegionObserver;
045import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.junit.After;
049import org.junit.AfterClass;
050import org.junit.Before;
051import org.junit.BeforeClass;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055
056/**
057 * Confirm that the function of FlushLifeCycleTracker is OK as we do not use it in our own code.
058 */
059@Category({ CoprocessorTests.class, MediumTests.class })
060public class TestFlushLifeCycleTracker {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestFlushLifeCycleTracker.class);
065
066  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
067
068  private static final TableName NAME =
069      TableName.valueOf(TestFlushLifeCycleTracker.class.getSimpleName());
070
071  private static final byte[] CF = Bytes.toBytes("CF");
072
073  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
074
075  private HRegion region;
076
077  private static FlushLifeCycleTracker TRACKER;
078
079  private static volatile CountDownLatch ARRIVE;
080
081  private static volatile CountDownLatch BLOCK;
082
083  public static final class FlushObserver implements RegionObserver, RegionCoprocessor {
084
085    @Override
086    public Optional<RegionObserver> getRegionObserver() {
087      return Optional.of(this);
088    }
089
090    @Override
091    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
092        FlushLifeCycleTracker tracker) throws IOException {
093      if (TRACKER != null) {
094        assertSame(tracker, TRACKER);
095      }
096    }
097
098    @Override
099    public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
100        InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
101      if (TRACKER != null) {
102        assertSame(tracker, TRACKER);
103      }
104      return scanner;
105    }
106
107    @Override
108    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
109        FlushLifeCycleTracker tracker) throws IOException {
110      if (TRACKER != null) {
111        assertSame(tracker, TRACKER);
112      }
113    }
114
115    @Override
116    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
117        StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {
118      if (TRACKER != null) {
119        assertSame(tracker, TRACKER);
120      }
121      // inject here so we can make a flush request to fail because of we already have a flush
122      // ongoing.
123      CountDownLatch arrive = ARRIVE;
124      if (arrive != null) {
125        arrive.countDown();
126        try {
127          BLOCK.await();
128        } catch (InterruptedException e) {
129          throw new InterruptedIOException();
130        }
131      }
132    }
133  }
134
135  private static final class Tracker implements FlushLifeCycleTracker {
136
137    private String reason;
138
139    private boolean beforeExecutionCalled;
140
141    private boolean afterExecutionCalled;
142
143    private boolean completed = false;
144
145    @Override
146    public synchronized void notExecuted(String reason) {
147      this.reason = reason;
148      completed = true;
149      notifyAll();
150    }
151
152    @Override
153    public void beforeExecution() {
154      this.beforeExecutionCalled = true;
155    }
156
157    @Override
158    public synchronized void afterExecution() {
159      this.afterExecutionCalled = true;
160      completed = true;
161      notifyAll();
162    }
163
164    public synchronized void await() throws InterruptedException {
165      while (!completed) {
166        wait();
167      }
168    }
169  }
170
171  @BeforeClass
172  public static void setUpBeforeClass() throws Exception {
173    UTIL.startMiniCluster(3);
174  }
175
176  @AfterClass
177  public static void tearDownAfterClass() throws Exception {
178    UTIL.shutdownMiniCluster();
179  }
180
181  @Before
182  public void setUp() throws IOException {
183    UTIL.getAdmin()
184        .createTable(TableDescriptorBuilder.newBuilder(NAME)
185            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF))
186            .setCoprocessor(FlushObserver.class.getName()).build());
187    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
188  }
189
190  @After
191  public void tearDown() throws IOException {
192    region = null;
193    TRACKER = null;
194    UTIL.deleteTable(NAME);
195  }
196
197  @Test
198  public void test() throws IOException, InterruptedException {
199    try (Table table = UTIL.getConnection().getTable(NAME)) {
200      for (int i = 0; i < 100; i++) {
201        byte[] row = Bytes.toBytes(i);
202        table.put(new Put(row, true)
203                    .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
204                        .setRow(row)
205                        .setFamily(CF)
206                        .setQualifier(QUALIFIER)
207                        .setTimestamp(HConstants.LATEST_TIMESTAMP)
208                        .setType(Type.Put)
209                        .setValue(Bytes.toBytes(i))
210                        .build()));
211      }
212    }
213    Tracker tracker = new Tracker();
214    TRACKER = tracker;
215    region.requestFlush(tracker);
216    tracker.await();
217    assertNull(tracker.reason);
218    assertTrue(tracker.beforeExecutionCalled);
219    assertTrue(tracker.afterExecutionCalled);
220
221    // request flush on a region with empty memstore should still success
222    tracker = new Tracker();
223    TRACKER = tracker;
224    region.requestFlush(tracker);
225    tracker.await();
226    assertNull(tracker.reason);
227    assertTrue(tracker.beforeExecutionCalled);
228    assertTrue(tracker.afterExecutionCalled);
229  }
230
231  @Test
232  public void testNotExecuted() throws IOException, InterruptedException {
233    try (Table table = UTIL.getConnection().getTable(NAME)) {
234      for (int i = 0; i < 100; i++) {
235        byte[] row = Bytes.toBytes(i);
236        table.put(new Put(row, true)
237                    .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
238                        .setRow(row)
239                        .setFamily(CF)
240                        .setQualifier(QUALIFIER)
241                        .setTimestamp(HConstants.LATEST_TIMESTAMP)
242                        .setType(Type.Put)
243                        .setValue(Bytes.toBytes(i))
244                        .build()));
245      }
246    }
247    // here we may have overlap when calling the CP hooks so we do not assert on TRACKER
248    Tracker tracker1 = new Tracker();
249    ARRIVE = new CountDownLatch(1);
250    BLOCK = new CountDownLatch(1);
251    region.requestFlush(tracker1);
252    ARRIVE.await();
253
254    Tracker tracker2 = new Tracker();
255    region.requestFlush(tracker2);
256    tracker2.await();
257    assertNotNull(tracker2.reason);
258    assertFalse(tracker2.beforeExecutionCalled);
259    assertFalse(tracker2.afterExecutionCalled);
260
261    BLOCK.countDown();
262    tracker1.await();
263    assertNull(tracker1.reason);
264    assertTrue(tracker1.beforeExecutionCalled);
265    assertTrue(tracker1.afterExecutionCalled);
266  }
267}