001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob.compactions;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.security.Key;
025import java.security.SecureRandom;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Calendar;
029import java.util.Collections;
030import java.util.List;
031import java.util.Objects;
032import java.util.Optional;
033import java.util.Random;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.RejectedExecutionException;
036import java.util.concurrent.RejectedExecutionHandler;
037import java.util.concurrent.SynchronousQueue;
038import java.util.concurrent.ThreadPoolExecutor;
039import java.util.concurrent.TimeUnit;
040import javax.crypto.spec.SecretKeySpec;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.CellUtil;
047import org.apache.hadoop.hbase.HBaseClassTestRule;
048import org.apache.hadoop.hbase.HBaseTestingUtility;
049import org.apache.hadoop.hbase.HColumnDescriptor;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.HTableDescriptor;
052import org.apache.hadoop.hbase.NamespaceDescriptor;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.BufferedMutator;
056import org.apache.hadoop.hbase.client.CompactType;
057import org.apache.hadoop.hbase.client.CompactionState;
058import org.apache.hadoop.hbase.client.Connection;
059import org.apache.hadoop.hbase.client.ConnectionFactory;
060import org.apache.hadoop.hbase.client.Delete;
061import org.apache.hadoop.hbase.client.Durability;
062import org.apache.hadoop.hbase.client.Get;
063import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.Result;
066import org.apache.hadoop.hbase.client.ResultScanner;
067import org.apache.hadoop.hbase.client.Scan;
068import org.apache.hadoop.hbase.client.Table;
069import org.apache.hadoop.hbase.coprocessor.ObserverContext;
070import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
071import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
072import org.apache.hadoop.hbase.coprocessor.RegionObserver;
073import org.apache.hadoop.hbase.io.HFileLink;
074import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
075import org.apache.hadoop.hbase.io.crypto.aes.AES;
076import org.apache.hadoop.hbase.io.hfile.CacheConfig;
077import org.apache.hadoop.hbase.io.hfile.HFile;
078import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
079import org.apache.hadoop.hbase.mob.MobConstants;
080import org.apache.hadoop.hbase.mob.MobFileName;
081import org.apache.hadoop.hbase.mob.MobUtils;
082import org.apache.hadoop.hbase.regionserver.BloomType;
083import org.apache.hadoop.hbase.regionserver.HRegion;
084import org.apache.hadoop.hbase.regionserver.HStoreFile;
085import org.apache.hadoop.hbase.regionserver.Store;
086import org.apache.hadoop.hbase.regionserver.StoreFile;
087import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
088import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
089import org.apache.hadoop.hbase.security.EncryptionUtil;
090import org.apache.hadoop.hbase.security.User;
091import org.apache.hadoop.hbase.testclassification.LargeTests;
092import org.apache.hadoop.hbase.util.Bytes;
093import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
094import org.apache.hadoop.hbase.util.Pair;
095import org.apache.hadoop.hbase.util.Threads;
096import org.junit.AfterClass;
097import org.junit.Assert;
098import org.junit.BeforeClass;
099import org.junit.ClassRule;
100import org.junit.Rule;
101import org.junit.Test;
102import org.junit.experimental.categories.Category;
103import org.junit.rules.TestName;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107@Category(LargeTests.class)
108public class TestMobCompactor {
109
110  @ClassRule
111  public static final HBaseClassTestRule CLASS_RULE =
112      HBaseClassTestRule.forClass(TestMobCompactor.class);
113
114  private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactor.class);
115  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
116  private static Configuration conf = null;
117  private TableName tableName;
118  private static Connection conn;
119  private BufferedMutator bufMut;
120  private Table table;
121  private static Admin admin;
122  private HTableDescriptor desc;
123  private HColumnDescriptor hcd1;
124  private HColumnDescriptor hcd2;
125  private static FileSystem fs;
126  private static final String family1 = "family1";
127  private static final String family2 = "family2";
128  private static final String qf1 = "qualifier1";
129  private static final String qf2 = "qualifier2";
130
131  private static long tsFor20150907Monday;
132  private static long tsFor20151120Sunday;
133  private static long tsFor20151128Saturday;
134  private static long tsFor20151130Monday;
135  private static long tsFor20151201Tuesday;
136  private static long tsFor20151205Saturday;
137  private static long tsFor20151228Monday;
138  private static long tsFor20151231Thursday;
139  private static long tsFor20160101Friday;
140  private static long tsFor20160103Sunday;
141
142  private static final byte[] mobKey01 = Bytes.toBytes("r01");
143  private static final byte[] mobKey02 = Bytes.toBytes("r02");
144  private static final byte[] mobKey03 = Bytes.toBytes("r03");
145  private static final byte[] mobKey04 = Bytes.toBytes("r04");
146  private static final byte[] mobKey05 = Bytes.toBytes("r05");
147  private static final byte[] mobKey06 = Bytes.toBytes("r05");
148  private static final byte[] mobKey1 = Bytes.toBytes("r1");
149  private static final byte[] mobKey2 = Bytes.toBytes("r2");
150  private static final byte[] mobKey3 = Bytes.toBytes("r3");
151  private static final byte[] mobKey4 = Bytes.toBytes("r4");
152  private static final byte[] mobKey5 = Bytes.toBytes("r5");
153  private static final byte[] mobKey6 = Bytes.toBytes("r6");
154  private static final byte[] mobKey7 = Bytes.toBytes("r7");
155  private static final byte[] mobKey8 = Bytes.toBytes("r8");
156  private static final String mobValue0 = "mobValue00000000000000000000000000";
157  private static final String mobValue1 = "mobValue00000111111111111111111111";
158  private static final String mobValue2 = "mobValue00000222222222222222222222";
159  private static final String mobValue3 = "mobValue00000333333333333333333333";
160  private static final String mobValue4 = "mobValue00000444444444444444444444";
161  private static final String mobValue5 = "mobValue00000666666666666666666666";
162  private static final String mobValue6 = "mobValue00000777777777777777777777";
163  private static final String mobValue7 = "mobValue00000888888888888888888888";
164  private static final String mobValue8 = "mobValue00000888888888888888888899";
165
166  private static byte[] KEYS = Bytes.toBytes("012");
167  private static int regionNum = KEYS.length;
168  private static int delRowNum = 1;
169  private static int delCellNum = 6;
170  private static int cellNumPerRow = 3;
171  private static int rowNumPerFile = 2;
172  private static ExecutorService pool;
173
174  @Rule
175  public TestName name = new TestName();
176
177  @BeforeClass
178  public static void setUpBeforeClass() throws Exception {
179    TEST_UTIL.getConfiguration().setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000);
180    TEST_UTIL.getConfiguration()
181        .set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
182    TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
183    TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
184    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1);
185    TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100);
186    TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
187    TEST_UTIL.startMiniCluster(1);
188    pool = createThreadPool(TEST_UTIL.getConfiguration());
189    conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool);
190    fs = TEST_UTIL.getTestFileSystem();
191    conf = TEST_UTIL.getConfiguration();
192    admin = TEST_UTIL.getAdmin();
193
194    // Initialize timestamps for these days
195    Calendar calendar =  Calendar.getInstance();
196    calendar.set(2015, 8, 7, 10, 20);
197    tsFor20150907Monday = calendar.getTimeInMillis();
198
199    calendar.set(2015, 10, 20, 10, 20);
200    tsFor20151120Sunday = calendar.getTimeInMillis();
201
202    calendar.set(2015, 10, 28, 10, 20);
203    tsFor20151128Saturday = calendar.getTimeInMillis();
204
205    calendar.set(2015, 10, 30, 10, 20);
206    tsFor20151130Monday = calendar.getTimeInMillis();
207
208    calendar.set(2015, 11, 1, 10, 20);
209    tsFor20151201Tuesday = calendar.getTimeInMillis();
210
211    calendar.set(2015, 11, 5, 10, 20);
212    tsFor20151205Saturday = calendar.getTimeInMillis();
213
214    calendar.set(2015, 11, 28, 10, 20);
215    tsFor20151228Monday = calendar.getTimeInMillis();
216
217    calendar.set(2015, 11, 31, 10, 20);
218    tsFor20151231Thursday = calendar.getTimeInMillis();
219
220    calendar.set(2016, 0, 1, 10, 20);
221    tsFor20160101Friday = calendar.getTimeInMillis();
222
223    calendar.set(2016, 0, 3, 10, 20);
224    tsFor20160103Sunday = calendar.getTimeInMillis();
225  }
226
227  @AfterClass
228  public static void tearDownAfterClass() throws Exception {
229    pool.shutdown();
230    conn.close();
231    TEST_UTIL.shutdownMiniCluster();
232  }
233
234  public void setUp(String tableNameAsString) throws IOException {
235    tableName = TableName.valueOf(tableNameAsString);
236    hcd1 = new HColumnDescriptor(family1);
237    hcd1.setMobEnabled(true);
238    hcd1.setMobThreshold(5);
239    hcd2 = new HColumnDescriptor(family2);
240    hcd2.setMobEnabled(true);
241    hcd2.setMobThreshold(5);
242    desc = new HTableDescriptor(tableName);
243    desc.addFamily(hcd1);
244    desc.addFamily(hcd2);
245    admin.createTable(desc, getSplitKeys());
246    table = conn.getTable(tableName);
247    bufMut = conn.getBufferedMutator(tableName);
248  }
249
250  // Set up for mob compaction policy testing
251  private void setUpForPolicyTest(String tableNameAsString, MobCompactPartitionPolicy type)
252      throws IOException {
253    tableName = TableName.valueOf(tableNameAsString);
254    hcd1 = new HColumnDescriptor(family1);
255    hcd1.setMobEnabled(true);
256    hcd1.setMobThreshold(10);
257    hcd1.setMobCompactPartitionPolicy(type);
258    desc = new HTableDescriptor(tableName);
259    desc.addFamily(hcd1);
260    admin.createTable(desc);
261    table = conn.getTable(tableName);
262    bufMut = conn.getBufferedMutator(tableName);
263  }
264
265  // alter mob compaction policy
266  private void alterForPolicyTest(final MobCompactPartitionPolicy type)
267      throws Exception {
268
269    hcd1.setMobCompactPartitionPolicy(type);
270    desc.modifyFamily(hcd1);
271    admin.modifyTable(tableName, desc);
272    Pair<Integer, Integer> st;
273
274    while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) {
275      LOG.debug(st.getFirst() + " regions left to update");
276      Thread.sleep(40);
277    }
278    LOG.info("alter status finished");
279  }
280
281  @Test
282  public void testMinorCompaction() throws Exception {
283    resetConf();
284    int mergeSize = 5000;
285    // change the mob compaction merge size
286    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
287
288    // create a table with namespace
289    NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build();
290    String tableNameAsString = "ns:testMinorCompaction";
291    admin.createNamespace(namespaceDescriptor);
292    setUp(tableNameAsString);
293    int count = 4;
294    // generate mob files
295    loadData(admin, bufMut, tableName, count, rowNumPerFile);
296    int rowNumPerRegion = count * rowNumPerFile;
297
298    assertEquals("Before deleting: mob rows count", regionNum * rowNumPerRegion,
299      countMobRows(table));
300    assertEquals("Before deleting: mob cells count", regionNum * cellNumPerRow * rowNumPerRegion,
301      countMobCells(table));
302    assertEquals("Before deleting: mob file count", regionNum * count,
303      countFiles(tableName, true, family1));
304
305    int largeFilesCount = countLargeFiles(mergeSize, tableName, family1);
306    createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
307
308    assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
309      countMobRows(table));
310    assertEquals("Before compaction: mob cells count", regionNum
311      * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
312    assertEquals("Before compaction: family1 mob file count", regionNum * count,
313      countFiles(tableName, true, family1));
314    assertEquals("Before compaction: family2 mob file count", regionNum * count,
315      countFiles(tableName, true, family2));
316    assertEquals("Before compaction: family1 del file count", regionNum,
317      countFiles(tableName, false, family1));
318    assertEquals("Before compaction: family2 del file count", regionNum,
319      countFiles(tableName, false, family2));
320
321    // do the mob file compaction
322    MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
323    compactor.compact();
324
325    assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
326      countMobRows(table));
327    assertEquals("After compaction: mob cells count", regionNum
328      * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
329    // After the compaction, the files smaller than the mob compaction merge size
330    // is merge to one file
331    assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
332      countFiles(tableName, true, family1));
333    assertEquals("After compaction: family2 mob file count", regionNum * count,
334      countFiles(tableName, true, family2));
335    assertEquals("After compaction: family1 del file count", regionNum,
336      countFiles(tableName, false, family1));
337    assertEquals("After compaction: family2 del file count", regionNum,
338      countFiles(tableName, false, family2));
339  }
340
341  @Test
342  public void testMinorCompactionWithWeeklyPolicy() throws Exception {
343    resetConf();
344    int mergeSize = 5000;
345    // change the mob compaction merge size
346    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
347
348    commonPolicyTestLogic("testMinorCompactionWithWeeklyPolicy",
349        MobCompactPartitionPolicy.WEEKLY, false, 6,
350        new String[] { "20150907", "20151120", "20151128", "20151130", "20151205", "20160103" },
351        true);
352  }
353
354  @Test
355  public void testMajorCompactionWithWeeklyPolicy() throws Exception {
356    resetConf();
357
358    commonPolicyTestLogic("testMajorCompactionWithWeeklyPolicy",
359        MobCompactPartitionPolicy.WEEKLY, true, 5,
360        new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true);
361  }
362
363  @Test
364  public void testMinorCompactionWithMonthlyPolicy() throws Exception {
365    resetConf();
366    int mergeSize = 5000;
367    // change the mob compaction merge size
368    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
369
370    commonPolicyTestLogic("testMinorCompactionWithMonthlyPolicy",
371        MobCompactPartitionPolicy.MONTHLY, false, 4,
372        new String[] { "20150907", "20151130", "20151231", "20160103" }, true);
373  }
374
375  @Test
376  public void testMajorCompactionWithMonthlyPolicy() throws Exception {
377    resetConf();
378
379    commonPolicyTestLogic("testMajorCompactionWithMonthlyPolicy",
380        MobCompactPartitionPolicy.MONTHLY, true, 4,
381        new String[] {"20150907", "20151130", "20151231", "20160103"}, true);
382  }
383
384  @Test
385  public void testMajorCompactionWithWeeklyFollowedByMonthly() throws Exception {
386    resetConf();
387
388    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthly",
389        MobCompactPartitionPolicy.WEEKLY, true, 5,
390        new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true);
391
392    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthly",
393        MobCompactPartitionPolicy.MONTHLY, true, 4,
394        new String[] {"20150907", "20151128", "20151205", "20160103" }, false);
395  }
396
397  @Test
398  public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly() throws Exception {
399    resetConf();
400
401    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly",
402        MobCompactPartitionPolicy.WEEKLY, true, 5,
403        new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true);
404
405    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly",
406        MobCompactPartitionPolicy.MONTHLY, true, 4,
407        new String[] { "20150907", "20151128", "20151205", "20160103" }, false);
408
409    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly",
410        MobCompactPartitionPolicy.WEEKLY, true, 4,
411        new String[] { "20150907", "20151128", "20151205", "20160103" }, false);
412  }
413
414  @Test
415  public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily() throws Exception {
416    resetConf();
417
418    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily",
419        MobCompactPartitionPolicy.WEEKLY, true, 5,
420        new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true);
421
422    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily",
423        MobCompactPartitionPolicy.MONTHLY, true, 4,
424        new String[] { "20150907", "20151128", "20151205", "20160103" }, false);
425
426    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily",
427        MobCompactPartitionPolicy.DAILY, true, 4,
428        new String[] { "20150907", "20151128", "20151205", "20160103" }, false);
429  }
430
431  @Test
432  public void testCompactionWithHFileLink() throws IOException, InterruptedException {
433    resetConf();
434    String tableNameAsString = "testCompactionWithHFileLink";
435    setUp(tableNameAsString);
436    int count = 4;
437    // generate mob files
438    loadData(admin, bufMut, tableName, count, rowNumPerFile);
439    int rowNumPerRegion = count * rowNumPerFile;
440
441    long tid = System.currentTimeMillis();
442    byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
443    // take a snapshot
444    admin.snapshot(snapshotName1, tableName);
445
446    createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
447
448    assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
449      countMobRows(table));
450    assertEquals("Before compaction: mob cells count", regionNum
451      * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
452    assertEquals("Before compaction: family1 mob file count", regionNum * count,
453      countFiles(tableName, true, family1));
454    assertEquals("Before compaction: family2 mob file count", regionNum * count,
455      countFiles(tableName, true, family2));
456    assertEquals("Before compaction: family1 del file count", regionNum,
457      countFiles(tableName, false, family1));
458    assertEquals("Before compaction: family2 del file count", regionNum,
459      countFiles(tableName, false, family2));
460
461    // do the mob compaction
462    MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
463    compactor.compact();
464
465    assertEquals("After first compaction: mob rows count", regionNum
466      * (rowNumPerRegion - delRowNum), countMobRows(table));
467    assertEquals("After first compaction: mob cells count", regionNum
468      * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
469    assertEquals("After first compaction: family1 mob file count", regionNum,
470      countFiles(tableName, true, family1));
471    assertEquals("After first compaction: family2 mob file count", regionNum * count,
472      countFiles(tableName, true, family2));
473    assertEquals("After first compaction: family1 del file count", 0,
474      countFiles(tableName, false, family1));
475    assertEquals("After first compaction: family2 del file count", regionNum,
476      countFiles(tableName, false, family2));
477    assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
478    assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
479
480    admin.disableTable(tableName);
481    // Restore from snapshot, the hfilelink will exist in mob dir
482    admin.restoreSnapshot(snapshotName1);
483    admin.enableTable(tableName);
484
485    assertEquals("After restoring snapshot: mob rows count", regionNum * rowNumPerRegion,
486      countMobRows(table));
487    assertEquals("After restoring snapshot: mob cells count", regionNum * cellNumPerRow
488      * rowNumPerRegion, countMobCells(table));
489    assertEquals("After restoring snapshot: family1 mob file count", regionNum * count,
490      countFiles(tableName, true, family1));
491    assertEquals("After restoring snapshot: family2 mob file count", regionNum * count,
492      countFiles(tableName, true, family2));
493    assertEquals("After restoring snapshot: family1 del file count", 0,
494      countFiles(tableName, false, family1));
495    assertEquals("After restoring snapshot: family2 del file count", 0,
496      countFiles(tableName, false, family2));
497    assertEquals("After restoring snapshot: family1 hfilelink count", regionNum * count,
498      countHFileLinks(family1));
499    assertEquals("After restoring snapshot: family2 hfilelink count", 0, countHFileLinks(family2));
500
501    compactor.compact();
502
503    assertEquals("After second compaction: mob rows count", regionNum * rowNumPerRegion,
504      countMobRows(table));
505    assertEquals("After second compaction: mob cells count", regionNum * cellNumPerRow
506      * rowNumPerRegion, countMobCells(table));
507    assertEquals("After second compaction: family1 mob file count", regionNum,
508      countFiles(tableName, true, family1));
509    assertEquals("After second compaction: family2 mob file count", regionNum * count,
510      countFiles(tableName, true, family2));
511    assertEquals("After second compaction: family1 del file count", 0,
512      countFiles(tableName, false, family1));
513    assertEquals("After second compaction: family2 del file count", 0,
514      countFiles(tableName, false, family2));
515    assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
516    assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
517    assertRefFileNameEqual(family1);
518  }
519
520  @Test
521  public void testMajorCompactionFromAdmin() throws Exception {
522    resetConf();
523    int mergeSize = 5000;
524    // change the mob compaction merge size
525    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
526    SecureRandom rng = new SecureRandom();
527    byte[] keyBytes = new byte[AES.KEY_LENGTH];
528    rng.nextBytes(keyBytes);
529    String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
530    Key cfKey = new SecretKeySpec(keyBytes, algorithm);
531    byte[] encryptionKey = EncryptionUtil.wrapKey(conf,
532      conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), cfKey);
533    final TableName tableName = TableName.valueOf(name.getMethodName());
534    HTableDescriptor desc = new HTableDescriptor(tableName);
535    HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
536    hcd1.setMobEnabled(true);
537    hcd1.setMobThreshold(0);
538    hcd1.setEncryptionType(algorithm);
539    hcd1.setEncryptionKey(encryptionKey);
540    HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
541    hcd2.setMobEnabled(true);
542    hcd2.setMobThreshold(0);
543    desc.addFamily(hcd1);
544    desc.addFamily(hcd2);
545    admin.createTable(desc, getSplitKeys());
546    Table table = conn.getTable(tableName);
547    BufferedMutator bufMut = conn.getBufferedMutator(tableName);
548    int count = 4;
549    // generate mob files
550    loadData(admin, bufMut, tableName, count, rowNumPerFile);
551    int rowNumPerRegion = count * rowNumPerFile;
552
553    assertEquals("Before deleting: mob rows count", regionNum * rowNumPerRegion,
554      countMobRows(table));
555    assertEquals("Before deleting: mob cells count", regionNum * cellNumPerRow * rowNumPerRegion,
556      countMobCells(table));
557    assertEquals("Before deleting: mob file count", regionNum * count,
558      countFiles(tableName, true, family1));
559
560    createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
561
562    assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
563      countMobRows(table));
564    assertEquals("Before compaction: mob cells count", regionNum
565      * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
566    assertEquals("Before compaction: family1 mob file count", regionNum * count,
567      countFiles(tableName, true, family1));
568    assertEquals("Before compaction: family2 mob file count", regionNum * count,
569      countFiles(tableName, true, family2));
570    assertEquals("Before compaction: family1 del file count", regionNum,
571      countFiles(tableName, false, family1));
572    assertEquals("Before compaction: family2 del file count", regionNum,
573      countFiles(tableName, false, family2));
574
575    // do the major mob compaction, it will force all files to compaction
576    admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB);
577
578    waitUntilMobCompactionFinished(tableName);
579    assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
580      countMobRows(table));
581    assertEquals("After compaction: mob cells count", regionNum
582      * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
583    assertEquals("After compaction: family1 mob file count", regionNum,
584      countFiles(tableName, true, family1));
585    assertEquals("After compaction: family2 mob file count", regionNum * count,
586      countFiles(tableName, true, family2));
587    assertEquals("After compaction: family1 del file count", 0,
588      countFiles(tableName, false, family1));
589    assertEquals("After compaction: family2 del file count", regionNum,
590      countFiles(tableName, false, family2));
591    Assert.assertTrue(verifyEncryption(tableName, family1));
592    table.close();
593  }
594
595  @Test
596  public void testScannerOnBulkLoadRefHFiles() throws Exception {
597    resetConf();
598    setUp("testScannerOnBulkLoadRefHFiles");
599    long ts = EnvironmentEdgeManager.currentTime();
600    byte[] key0 = Bytes.toBytes("k0");
601    byte[] key1 = Bytes.toBytes("k1");
602    String value0 = "mobValue0";
603    String value1 = "mobValue1";
604    String newValue0 = "new";
605    Put put0 = new Put(key0);
606    put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value0));
607    loadData(admin, bufMut, tableName, new Put[] { put0 });
608    put0 = new Put(key0);
609    put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue0));
610    Put put1 = new Put(key1);
611    put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value1));
612    loadData(admin, bufMut, tableName, new Put[] { put0, put1 });
613    // read the latest cell of key0.
614    Get get = new Get(key0);
615    Result result = table.get(get);
616    Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
617    assertEquals("Before compaction: mob value of k0", newValue0,
618      Bytes.toString(CellUtil.cloneValue(cell)));
619    admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB);
620    waitUntilMobCompactionFinished(tableName);
621    // read the latest cell of key0, the cell seqId in bulk loaded file is not reset in the
622    // scanner. The cell that has "new" value is still visible.
623    result = table.get(get);
624    cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
625    assertEquals("After compaction: mob value of k0", newValue0,
626      Bytes.toString(CellUtil.cloneValue(cell)));
627    // read the ref cell, not read further to the mob cell.
628    get = new Get(key1);
629    get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
630    result = table.get(get);
631    cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
632    // the ref name is the new file
633    Path mobFamilyPath =
634      MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, hcd1.getNameAsString());
635    List<Path> paths = new ArrayList<>();
636    if (fs.exists(mobFamilyPath)) {
637      FileStatus[] files = fs.listStatus(mobFamilyPath);
638      for (FileStatus file : files) {
639        if (!StoreFileInfo.isDelFile(file.getPath())) {
640          paths.add(file.getPath());
641        }
642      }
643    }
644    assertEquals("After compaction: number of mob files:", 1, paths.size());
645    assertEquals("After compaction: mob file name:", MobUtils.getMobFileName(cell), paths.get(0)
646      .getName());
647  }
648
649  /**
650   * This case tests the following mob compaction and normal compaction scenario,
651   * after mob compaction, the mob reference in new bulkloaded hfile will win even after it
652   * is compacted with some other normal hfiles. This is to make sure the mvcc is included
653   * after compaction for mob enabled store files.
654   */
655  @Test
656  public void testGetAfterCompaction() throws Exception {
657    resetConf();
658    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
659    String famStr = "f1";
660    byte[] fam = Bytes.toBytes(famStr);
661    byte[] qualifier = Bytes.toBytes("q1");
662    byte[] mobVal = Bytes.toBytes("01234567890");
663    HTableDescriptor hdt = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
664    hdt.addCoprocessor(CompactTwoLatestHfilesCopro.class.getName());
665    HColumnDescriptor hcd = new HColumnDescriptor(fam);
666    hcd.setMobEnabled(true);
667    hcd.setMobThreshold(10);
668    hcd.setMaxVersions(1);
669    hdt.addFamily(hcd);
670    try {
671      Table table = TEST_UTIL.createTable(hdt, null);
672      HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(hdt.getTableName()).get(0);
673      Put p = new Put(Bytes.toBytes("r1"));
674      p.addColumn(fam, qualifier, mobVal);
675      table.put(p);
676      // Create mob file mob1 and reference file ref1
677      TEST_UTIL.flush(table.getName());
678      // Make sure that it is flushed.
679      FileSystem fs = r.getRegionFileSystem().getFileSystem();
680      Path path = r.getRegionFileSystem().getStoreDir(famStr);
681      waitUntilFilesShowup(fs, path, 1);
682
683      p = new Put(Bytes.toBytes("r2"));
684      p.addColumn(fam, qualifier, mobVal);
685      table.put(p);
686      // Create mob file mob2 and reference file ref2
687      TEST_UTIL.flush(table.getName());
688      waitUntilFilesShowup(fs, path, 2);
689      // Do mob compaction to create mob3 and ref3
690      TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam, CompactType.MOB);
691      waitUntilFilesShowup(fs, path, 3);
692
693      // Compact ref3 and ref2 into ref4
694      TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam);
695      waitUntilFilesShowup(fs, path, 2);
696
697      // Sleep for some time, since TimeToLiveHFileCleaner is 0, the next run of
698      // clean chore is guaranteed to clean up files in archive
699      Thread.sleep(100);
700      // Run cleaner to make sure that files in archive directory are cleaned up
701      TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
702
703      // Get "r2"
704      Get get = new Get(Bytes.toBytes("r2"));
705      try {
706        Result result = table.get(get);
707        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
708      } catch (IOException e) {
709        assertTrue("The MOB file doesn't exist", false);
710      }
711    } finally {
712      TEST_UTIL.deleteTable(hdt.getTableName());
713    }
714  }
715
716  private void waitUntilFilesShowup(final FileSystem fs, final Path path, final int num)
717    throws InterruptedException, IOException {
718    FileStatus[] fileList = fs.listStatus(path);
719    while (fileList.length != num) {
720      Thread.sleep(50);
721      fileList = fs.listStatus(path);
722      for (FileStatus fileStatus: fileList) {
723        LOG.info(Objects.toString(fileStatus));
724      }
725    }
726  }
727
728  /**
729   * This copro overwrites the default compaction policy. It always chooses two latest hfiles and
730   * compacts them into a new one.
731   */
732  public static class CompactTwoLatestHfilesCopro implements RegionCoprocessor, RegionObserver {
733
734    @Override
735    public Optional<RegionObserver> getRegionObserver() {
736      return Optional.of(this);
737    }
738
739    @Override
740    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
741        List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
742        throws IOException {
743      int count = candidates.size();
744      if (count >= 2) {
745        for (int i = 0; i < count - 2; i++) {
746          candidates.remove(0);
747        }
748        c.bypass();
749      }
750    }
751  }
752
753  private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
754    InterruptedException {
755    long finished = EnvironmentEdgeManager.currentTime() + 60000;
756    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB);
757    while (EnvironmentEdgeManager.currentTime() < finished) {
758      if (state == CompactionState.NONE) {
759        break;
760      }
761      state = admin.getCompactionState(tableName, CompactType.MOB);
762      Thread.sleep(10);
763    }
764    assertEquals(CompactionState.NONE, state);
765  }
766
767  /**
768   * Gets the number of rows in the given table.
769   * @param table to get the  scanner
770   * @return the number of rows
771   */
772  private int countMobRows(final Table table) throws IOException {
773    Scan scan = new Scan();
774    // Do not retrieve the mob data when scanning
775    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
776    return TEST_UTIL.countRows(table, scan);
777  }
778
779  /**
780   * Gets the number of cells in the given table.
781   * @param table to get the  scanner
782   * @return the number of cells
783   */
784  private int countMobCells(final Table table) throws IOException {
785    Scan scan = new Scan();
786    // Do not retrieve the mob data when scanning
787    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
788    ResultScanner results = table.getScanner(scan);
789    int count = 0;
790    for (Result res : results) {
791      count += res.size();
792    }
793    results.close();
794    return count;
795  }
796
797  /**
798   * Gets the number of files in the mob path.
799   * @param isMobFile gets number of the mob files or del files
800   * @param familyName the family name
801   * @return the number of the files
802   */
803  private int countFiles(TableName tableName, boolean isMobFile, String familyName)
804    throws IOException {
805    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
806    int count = 0;
807    if (fs.exists(mobDirPath)) {
808      FileStatus[] files = fs.listStatus(mobDirPath);
809      for (FileStatus file : files) {
810        if (isMobFile == true) {
811          if (!StoreFileInfo.isDelFile(file.getPath())) {
812            count++;
813          }
814        } else {
815          if (StoreFileInfo.isDelFile(file.getPath())) {
816            count++;
817          }
818        }
819      }
820    }
821    return count;
822  }
823
824  private boolean verifyEncryption(TableName tableName, String familyName) throws IOException {
825    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
826    boolean hasFiles = false;
827    if (fs.exists(mobDirPath)) {
828      FileStatus[] files = fs.listStatus(mobDirPath);
829      hasFiles = files != null && files.length > 0;
830      Assert.assertTrue(hasFiles);
831      Path path = files[0].getPath();
832      CacheConfig cacheConf = new CacheConfig(conf);
833      HStoreFile sf = new HStoreFile(TEST_UTIL.getTestFileSystem(), path, conf, cacheConf,
834        BloomType.NONE, true);
835      sf.initReader();
836      HFile.Reader reader = sf.getReader().getHFileReader();
837      byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
838      Assert.assertTrue(null != encryptionKey);
839      Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
840        .equals(HConstants.CIPHER_AES));
841    }
842    return hasFiles;
843  }
844
845  /**
846   * Gets the number of HFileLink in the mob path.
847   * @param familyName the family name
848   * @return the number of the HFileLink
849   */
850  private int countHFileLinks(String familyName) throws IOException {
851    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
852    int count = 0;
853    if (fs.exists(mobDirPath)) {
854      FileStatus[] files = fs.listStatus(mobDirPath);
855      for (FileStatus file : files) {
856        if (HFileLink.isHFileLink(file.getPath())) {
857          count++;
858        }
859      }
860    }
861    return count;
862  }
863
864  /**
865   * Gets the number of files.
866   * @param size the size of the file
867   * @param tableName the current table name
868   * @param familyName the family name
869   * @return the number of files large than the size
870   */
871  private int countLargeFiles(int size, TableName tableName, String familyName) throws IOException {
872    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
873    int count = 0;
874    if (fs.exists(mobDirPath)) {
875      FileStatus[] files = fs.listStatus(mobDirPath);
876      for (FileStatus file : files) {
877        // ignore the del files in the mob path
878        if ((!StoreFileInfo.isDelFile(file.getPath())) && (file.getLen() > size)) {
879          count++;
880        }
881      }
882    }
883    return count;
884  }
885
886  /**
887   * loads some data to the table.
888   */
889  private void loadData(Admin admin, BufferedMutator table, TableName tableName, int fileNum,
890    int rowNumPerFile) throws IOException, InterruptedException {
891    if (fileNum <= 0) {
892      throw new IllegalArgumentException();
893    }
894    for (int i = 0; i < fileNum * rowNumPerFile; i++) {
895      for (byte k0 : KEYS) {
896        byte[] k = new byte[] { k0 };
897        byte[] key = Bytes.add(k, Bytes.toBytes(i));
898        byte[] mobVal = makeDummyData(10 * (i + 1));
899        Put put = new Put(key);
900        put.setDurability(Durability.SKIP_WAL);
901        put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
902        put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
903        put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
904        table.mutate(put);
905      }
906      if ((i + 1) % rowNumPerFile == 0) {
907        table.flush();
908        admin.flush(tableName);
909      }
910    }
911  }
912
913  private void loadData(Admin admin, BufferedMutator table, TableName tableName, Put[] puts)
914    throws IOException {
915    table.mutate(Arrays.asList(puts));
916    table.flush();
917    admin.flush(tableName);
918  }
919
920  private void loadDataForPartitionPolicy(Admin admin, BufferedMutator table, TableName tableName)
921      throws IOException {
922
923    Put[] pArray = new Put[1000];
924
925    for (int i = 0; i < 1000; i ++) {
926      Put put0 = new Put(Bytes.toBytes("r0" + i));
927      put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151130Monday, Bytes.toBytes(mobValue0));
928      pArray[i] = put0;
929    }
930    loadData(admin, bufMut, tableName, pArray);
931
932    Put put06 = new Put(mobKey06);
933    put06.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151128Saturday, Bytes.toBytes(mobValue0));
934
935    loadData(admin, bufMut, tableName, new Put[] { put06 });
936
937    Put put1 = new Put(mobKey1);
938    put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151201Tuesday,
939        Bytes.toBytes(mobValue1));
940    loadData(admin, bufMut, tableName, new Put[] { put1 });
941
942    Put put2 = new Put(mobKey2);
943    put2.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151205Saturday,
944        Bytes.toBytes(mobValue2));
945    loadData(admin, bufMut, tableName, new Put[] { put2 });
946
947    Put put3 = new Put(mobKey3);
948    put3.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151228Monday,
949        Bytes.toBytes(mobValue3));
950    loadData(admin, bufMut, tableName, new Put[] { put3 });
951
952    Put put4 = new Put(mobKey4);
953    put4.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151231Thursday,
954        Bytes.toBytes(mobValue4));
955    loadData(admin, bufMut, tableName, new Put[] { put4 });
956
957    Put put5 = new Put(mobKey5);
958    put5.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20160101Friday,
959        Bytes.toBytes(mobValue5));
960    loadData(admin, bufMut, tableName, new Put[] { put5 });
961
962    Put put6 = new Put(mobKey6);
963    put6.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20160103Sunday,
964        Bytes.toBytes(mobValue6));
965    loadData(admin, bufMut, tableName, new Put[] { put6 });
966
967    Put put7 = new Put(mobKey7);
968    put7.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20150907Monday,
969        Bytes.toBytes(mobValue7));
970    loadData(admin, bufMut, tableName, new Put[] { put7 });
971
972    Put put8 = new Put(mobKey8);
973    put8.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151120Sunday,
974        Bytes.toBytes(mobValue8));
975    loadData(admin, bufMut, tableName, new Put[] { put8 });
976  }
977
978
979  /**
980   * delete the row, family and cell to create the del file
981   */
982  private void createDelFile(Table table, TableName tableName, byte[] family, byte[] qf)
983    throws IOException, InterruptedException {
984    for (byte k0 : KEYS) {
985      byte[] k = new byte[] { k0 };
986      // delete a family
987      byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
988      Delete delete1 = new Delete(key1);
989      delete1.addFamily(family);
990      table.delete(delete1);
991      // delete one row
992      byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
993      Delete delete2 = new Delete(key2);
994      table.delete(delete2);
995      // delete one cell
996      byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
997      Delete delete3 = new Delete(key3);
998      delete3.addColumn(family, qf);
999      table.delete(delete3);
1000    }
1001    admin.flush(tableName);
1002    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
1003    for (HRegion region : regions) {
1004      region.waitForFlushesAndCompactions();
1005      region.compact(true);
1006    }
1007  }
1008  /**
1009   * Creates the dummy data with a specific size.
1010   * @param size the size of value
1011   * @return the dummy data
1012   */
1013  private byte[] makeDummyData(int size) {
1014    byte[] dummyData = new byte[size];
1015    new Random().nextBytes(dummyData);
1016    return dummyData;
1017  }
1018
1019  /**
1020   * Gets the split keys
1021   */
1022  private byte[][] getSplitKeys() {
1023    byte[][] splitKeys = new byte[KEYS.length - 1][];
1024    for (int i = 0; i < splitKeys.length; ++i) {
1025      splitKeys[i] = new byte[] { KEYS[i + 1] };
1026    }
1027    return splitKeys;
1028  }
1029
1030  private static ExecutorService createThreadPool(Configuration conf) {
1031    int maxThreads = 10;
1032    long keepAliveTime = 60;
1033    final SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
1034    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
1035        keepAliveTime, TimeUnit.SECONDS, queue,
1036        Threads.newDaemonThreadFactory("MobFileCompactionChore"),
1037        new RejectedExecutionHandler() {
1038          @Override
1039          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
1040            try {
1041              // waiting for a thread to pick up instead of throwing exceptions.
1042              queue.put(r);
1043            } catch (InterruptedException e) {
1044              throw new RejectedExecutionException(e);
1045            }
1046          }
1047        });
1048    pool.allowCoreThreadTimeOut(true);
1049    return pool;
1050  }
1051
1052  private void assertRefFileNameEqual(String familyName) throws IOException {
1053    Scan scan = new Scan();
1054    scan.addFamily(Bytes.toBytes(familyName));
1055    // Do not retrieve the mob data when scanning
1056    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
1057    ResultScanner results = table.getScanner(scan);
1058    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(),
1059        tableName, familyName);
1060    List<Path> actualFilePaths = new ArrayList<>();
1061    List<Path> expectFilePaths = new ArrayList<>();
1062    for (Result res : results) {
1063      for (Cell cell : res.listCells()) {
1064        byte[] referenceValue = CellUtil.cloneValue(cell);
1065        String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
1066            referenceValue.length - Bytes.SIZEOF_INT);
1067        Path targetPath = new Path(mobFamilyPath, fileName);
1068        if(!actualFilePaths.contains(targetPath)) {
1069          actualFilePaths.add(targetPath);
1070        }
1071      }
1072    }
1073    results.close();
1074    if (fs.exists(mobFamilyPath)) {
1075      FileStatus[] files = fs.listStatus(mobFamilyPath);
1076      for (FileStatus file : files) {
1077        if (!StoreFileInfo.isDelFile(file.getPath())) {
1078          expectFilePaths.add(file.getPath());
1079        }
1080      }
1081    }
1082    Collections.sort(actualFilePaths);
1083    Collections.sort(expectFilePaths);
1084    assertEquals(expectFilePaths, actualFilePaths);
1085  }
1086
1087  /**
1088   * Resets the configuration.
1089   */
1090  private void resetConf() {
1091    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
1092      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
1093    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
1094      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
1095  }
1096
1097  /**
1098   * Verify mob partition policy compaction values.
1099   */
1100  private void verifyPolicyValues() throws Exception {
1101    Get get = new Get(mobKey01);
1102    Result result = table.get(get);
1103    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1104        Bytes.toBytes(mobValue0)));
1105
1106    get = new Get(mobKey02);
1107    result = table.get(get);
1108    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1109        Bytes.toBytes(mobValue0)));
1110
1111    get = new Get(mobKey03);
1112    result = table.get(get);
1113    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1114        Bytes.toBytes(mobValue0)));
1115
1116    get = new Get(mobKey04);
1117    result = table.get(get);
1118    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1119        Bytes.toBytes(mobValue0)));
1120
1121    get = new Get(mobKey05);
1122    result = table.get(get);
1123    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1124        Bytes.toBytes(mobValue0)));
1125
1126    get = new Get(mobKey06);
1127    result = table.get(get);
1128    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1129        Bytes.toBytes(mobValue0)));
1130
1131    get = new Get(mobKey1);
1132    result = table.get(get);
1133    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1134        Bytes.toBytes(mobValue1)));
1135
1136    get = new Get(mobKey2);
1137    result = table.get(get);
1138    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1139        Bytes.toBytes(mobValue2)));
1140
1141    get = new Get(mobKey3);
1142    result = table.get(get);
1143    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1144        Bytes.toBytes(mobValue3)));
1145
1146    get = new Get(mobKey4);
1147    result = table.get(get);
1148    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1149        Bytes.toBytes(mobValue4)));
1150
1151    get = new Get(mobKey5);
1152    result = table.get(get);
1153    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1154        Bytes.toBytes(mobValue5)));
1155
1156    get = new Get(mobKey6);
1157    result = table.get(get);
1158    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1159        Bytes.toBytes(mobValue6)));
1160
1161    get = new Get(mobKey7);
1162    result = table.get(get);
1163    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1164        Bytes.toBytes(mobValue7)));
1165
1166    get = new Get(mobKey8);
1167    result = table.get(get);
1168    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
1169        Bytes.toBytes(mobValue8)));
1170  }
1171
1172  private void commonPolicyTestLogic (final String tableNameAsString,
1173      final MobCompactPartitionPolicy pType, final boolean majorCompact,
1174      final int expectedFileNumbers, final String[] expectedFileNames,
1175      final boolean setupAndLoadData
1176      ) throws Exception {
1177    if (setupAndLoadData) {
1178      setUpForPolicyTest(tableNameAsString, pType);
1179
1180      loadDataForPartitionPolicy(admin, bufMut, tableName);
1181    } else {
1182      alterForPolicyTest(pType);
1183    }
1184
1185    if (majorCompact) {
1186      admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB);
1187    } else {
1188      admin.compact(tableName, hcd1.getName(), CompactType.MOB);
1189    }
1190
1191    waitUntilMobCompactionFinished(tableName);
1192
1193    // Run cleaner to make sure that files in archive directory are cleaned up
1194    TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
1195
1196    //check the number of files
1197    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, family1);
1198    FileStatus[] fileList = fs.listStatus(mobDirPath);
1199
1200    assertTrue(fileList.length == expectedFileNumbers);
1201
1202    // the file names are expected
1203    ArrayList<String> fileNames = new ArrayList<>(expectedFileNumbers);
1204    for (FileStatus file : fileList) {
1205      fileNames.add(MobFileName.getDateFromName(file.getPath().getName()));
1206    }
1207    int index = 0;
1208    for (String fileName : expectedFileNames) {
1209      index = fileNames.indexOf(fileName);
1210      assertTrue(index >= 0);
1211      fileNames.remove(index);
1212    }
1213
1214    // Check daily mob files are removed from the mobdir, and only weekly mob files are there.
1215    // Also check that there is no data loss.
1216
1217    verifyPolicyValues();
1218  }
1219 }