How Lucene creates an index
IndexWriter's addDocument method in detail
Today I read through the addDocument method of the IndexWriter class. Its documentation reads roughly as follows:
Adds a document to this index.
Note that if an Exception is hit (for example, disk full), then the index will be consistent, but this document may not have been added.
Furthermore, it is possible for the index to have one segment in non-compound format even when using compound files (when a merge has partially succeeded).
This method periodically flushes pending documents to the Directory (see above), and also periodically triggers segment merges in the index according to the MergePolicy in use.
Merges temporarily consume space in the directory. The amount of space required is up to 1X the size of all segments being merged when no readers/searchers are open against the index, and up to 2X the size of all segments being merged when readers/searchers are open against the index (see forceMerge(int) for details). The sequence of primitive merge operations performed is governed by the merge policy.
Note that each term in the document can be no longer than MAX_TERM_LENGTH in bytes, otherwise an IllegalArgumentException will be thrown.
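Before diving into the source, here is a minimal usage sketch of addDocument. This is my own illustration rather than code from the article's project; the index path, field name and content are made-up examples.
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import java.nio.file.Paths;
public class AddDocumentDemo {
    public static void main(String[] args) throws Exception {
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        // the index directory path is only an example
        try (Directory dir = FSDirectory.open(Paths.get("/tmp/demo-index"));
             IndexWriter writer = new IndexWriter(dir, iwc)) {
            Document doc = new Document();
            doc.add(new TextField("content", "hello lucene", Field.Store.YES));
            // addDocument returns the sequence number assigned to this operation
            long seqNo = writer.addDocument(doc);
            writer.commit();
            System.out.println("indexed, seqNo=" + seqNo);
        }
    }
}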
Source code analysis
Note: the source analysed here is version v7.2.1.
addDocument is implemented as follows:
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
return this.updateDocument((Term)null, doc);
}
It simply delegates to updateDocument:
// Updates a document by first deleting the document(s) containing the term and then adding the new document.
// The delete and then add are atomic as seen by a reader on the same index (the flush may happen only after the add).
public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
// make sure the IndexWriter is still open
ensureOpen();
try {
boolean success = false;
try {
// perform the update and get back the corresponding sequence number (seqNo)
long seqNo = docWriter.updateDocument(doc, analyzer, term);
if (seqNo < 0) {
seqNo = - seqNo;
processEvents(true, false);
}
success = true;
return seqNo;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception updating document");
}
}
}
} catch (AbortingException | VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocument");
// dead code but javac disagrees:
return -1;
}
}
From the comments we can see that updateDocument first deletes the documents in the index that contain the same term and then adds the new document; a small usage sketch follows below.
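As an illustration of this delete-then-add semantics (my own sketch, not part of the Lucene source being analysed), the typical calling pattern uses a unique key field as the update term; the field name "id" and the helper below are made up:
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import java.io.IOException;
public class UpdateDocumentDemo {
    // re-index (or insert) the document identified by the given id
    static void upsert(IndexWriter writer, String id, String content) throws IOException {
        Document doc = new Document();
        doc.add(new StringField("id", id, Field.Store.YES));
        doc.add(new TextField("content", content, Field.Store.YES));
        // deletes any document whose "id" term equals id, then adds the new document
        writer.updateDocument(new Term("id", id), doc);
    }
}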
The core call is long seqNo = docWriter.updateDocument(doc, analyzer, term);
Before it runs, ensureOpen() verifies that the IndexWriter has not been closed. DocumentsWriter.updateDocument then records the term on the delete queue (so the matching documents will be marked as deleted), indexes the new document into the in-memory buffer, hands any documents that are due to be flushed over to be written to disk, and updates the state of the flush queue. Its implementation is as follows:
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer, final Term delTerm) throws IOException, AbortingException {
boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
long seqNo;
try {
// This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
ensureOpen();
ensureInitialized(perThread);
assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
// number of documents currently buffered in RAM by this DocumentsWriterPerThread
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
seqNo = dwpt.updateDocument(doc, analyzer, delTerm);
} catch (AbortingException ae) {
flushControl.doOnAbort(perThread);
dwpt.abort();
throw ae;
} finally {
// We don't know whether the document actually
// counted as being indexed, so we must subtract here to
// accumulate our separate counter:
// atomically update the shared counter in DocumentsWriter
numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
}
final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
perThread.lastSeqNo = seqNo;
} finally {
perThreadPool.release(perThread);
}
if (postUpdate(flushingDWPT, hasEvents)) {
seqNo = -seqNo;
}
return seqNo;
}
While this runs, a per-thread lock (ThreadState) is held: Lucene obtains a ThreadState from the pool, and while it is locked the document is written into that thread's in-memory DocumentsWriterPerThread, the in-memory document count and flush state are updated, and finally the ThreadState is released.
Let's go through it step by step:
- preUpdate: before the update
This method essentially loops, helping to flush any queued DWPTs and blocking while flushing is stalled, until the flush queue is empty.
private boolean preUpdate() throws IOException, AbortingException {
ensureOpen();
boolean hasEvents = false;
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
// Help out flushing any queued DWPTs so we can un-stall:
do {
// Try pick up pending threads here if possible
DocumentsWriterPerThread flushingDWPT;
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
// Don't push the delete here since the update could fail!
hasEvents |= doFlush(flushingDWPT);
}
flushControl.waitIfStalled(); // block if stalled
} while (flushControl.numQueuedFlushes() != 0); // still queued DWPTs try help flushing
}
return hasEvents;
}
- flushControl.obtainAndLock(): acquire the per-thread lock
ThreadState obtainAndLock() {
final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), documentsWriter);
boolean success = false;
try {
if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
// There is a flush-all in process and this DWPT is
// now stale -- enroll it for flush and try for
// another DWPT:
addFlushableState(perThread);
}
success = true;
// simply return the ThreadState even in a flush all case since we already hold the lock
return perThread;
} finally {
if (!success) { // make sure we unlock if this fails
perThreadPool.release(perThread);
}
}
}
- dwpt.updateDocument(doc, analyzer, delTerm): the core update logic. (The listing below is actually the batched updateDocuments variant of DocumentsWriterPerThread; the single-document path goes through the same steps.)
public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
testPoint("DocumentsWriterPerThread addDocuments start");
assert deleteQueue != null;
docState.analyzer = analyzer;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
int docCount = 0;
boolean allDocsIndexed = false;
try {
// iterate over the documents in the batch (each document is itself an Iterable of fields)
for(Iterable<? extends IndexableField> doc : docs) {
// Even on exception, the document is still added (but marked
// deleted), so we don't need to un-reserve at that point.
// Aborting exceptions will actually "lose" more than one
// document, so the counter will be "wrong" in that case, but
// it's very hard to fix (we can't easily distinguish aborting
// vs non-aborting exceptions):
reserveOneDoc();
docState.doc = doc;
docState.docID = numDocsInRAM;
docCount++;
boolean success = false;
try {
consumer.processDocument();
success = true;
} finally {
if (!success) {
// Incr here because finishDocument will not
// be called (because an exc is being thrown):
numDocsInRAM++;
}
}
// Why is a plain ++ (not an atomic operation) acceptable here? Most likely because this counter belongs to a single DocumentsWriterPerThread, which is only used by the thread currently holding its ThreadState lock, unlike the shared counter in DocumentsWriter.
numDocsInRAM++;
}
allDocsIndexed = true;
// Apply delTerm only after all indexing has
// succeeded, but apply it only to docs prior to when
// this batch started:
long seqNo;
if (delTerm != null) {
seqNo = deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
return seqNo;
} else {
seqNo = deleteQueue.updateSlice(deleteSlice);
if (seqNo < 0) {
seqNo = -seqNo;
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
} else {
deleteSlice.reset();
}
}
return seqNo;
} finally {
if (!allDocsIndexed && !aborted) {
// the iterator threw an exception that is not aborting
// go and mark all docs from this block as deleted
int docID = numDocsInRAM-1;
final int endDocID = docID - docCount;
while (docID > endDocID) {
deleteDocID(docID);
docID--;
}
}
docState.clear();
}
}
Index creation: multi-threaded optimization
From the walkthrough above we know that every update has to go through locking inside a single IndexWriter, so when Lucene has to build an index over a very large amount of data, a single writer can become an efficiency bottleneck.
How efficiently a Lucene index can be built depends heavily on the following points:
1) Disk space. This directly affects index building; running out of space can even make a write appear to have completed while the index was never actually synced.
2) The choice of merge policy. This resembles batch operations in SQL: batches that are too large hurt execution efficiency. In Lucene, documents sit in memory before segments are merged, so choosing a suitable merge policy also speeds up indexing.
3) The choice of the term used as the unique key. As shown above, an update first deletes the documents containing the same term and then re-adds the document. If the term matches many documents, the deletes cost extra disk I/O and the writer's lock is held longer, so efficiency drops.
In summary, make sure there is enough disk space and pick a term that is guaranteed to be unique (an ID, for example); that takes care of risks 1 and 3.
This article focuses on the merge policy and on using multiple threads to improve indexing efficiency.
For the multi-threaded version I also spread the index across multiple directories, which avoids the heavy segment merging and re-allocation a single directory would suffer once its data volume grows too large.
Without further ado, here is the code. The example reads the folder obtained by downloading and unpacking Lucene from the official site and indexes the information of each file.
Code
Core implementation
- FileBean.java
First, define a FileBean to hold the file information.
public class FileBean {
// file path
private String path;
// last-modified timestamp
private Long modified;
// file content
private String content;
public String getPath() {
return path;
}
// getters, setters and toString omitted for brevity
}
- BaseIndex.java
The abstract parent class that carries the core logic.
import org.apache.lucene.index.IndexWriter;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author binbin.hou
* @since 1.0.0
*/
public abstract class BaseIndex<T> implements Runnable{
/**
* The index writer
*/
private IndexWriter writer;
/**
* Latch the worker threads wait on; released by the main thread to start the work
*/
private final CountDownLatch countDownLatch1;
/**
* Latch counted down by each worker thread when it is done
*/
private final CountDownLatch countDownLatch2;
/**
* The list of objects to index
*/
private List<T> list;
public BaseIndex(IndexWriter writer,CountDownLatch countDownLatch1, CountDownLatch countDownLatch2,
List<T> list){
super();
this.writer = writer;
this.countDownLatch1 = countDownLatch1;
this.countDownLatch2 = countDownLatch2;
this.list = list;
}
public BaseIndex(String parentIndexPath, int subIndex,
CountDownLatch countDownLatch1, CountDownLatch countDownLatch2,
List<T> list) {
super();
// parentIndexPath is the parent index directory; this worker writes to sub-directory index<subIndex> under it
try {
// multi-directory index creation: one sub-directory per worker
File file = new File(parentIndexPath+"/index"+subIndex);
if(!file.exists()){
file.mkdir();
}
this.writer = IndexUtil.getIndexWriter(parentIndexPath+"/index"+subIndex, true);
} catch (IOException e) {
e.printStackTrace();
}
this.countDownLatch1 = countDownLatch1;
this.countDownLatch2 = countDownLatch2;
this.list = list;
}
public BaseIndex(String path,CountDownLatch countDownLatch1, CountDownLatch countDownLatch2,
List<T> list) {
super();
try {
// single-directory index creation
File file = new File(path);
if(!file.exists()){
file.mkdir();
}
this.writer = IndexUtil.getIndexWriter(path,true);
} catch (IOException e) {
e.printStackTrace();
}
this.countDownLatch1 = countDownLatch1;
this.countDownLatch2 = countDownLatch2;
this.list = list;
}
/** Create the index entry for a single object.
* @param writer the index writer
* @param t the object to index
* @throws Exception on indexing errors
*/
public abstract void indexDoc(IndexWriter writer,T t) throws Exception;
/** Create index entries for a batch of objects.
* @param writer the index writer
* @param t the list of objects to index
* @throws Exception on indexing errors
*/
public void indexDocs(IndexWriter writer,List<T> t) throws Exception{
for (T t2 : t) {
indexDoc(writer,t2);
}
}
@Override
public void run() {
try {
countDownLatch1.await();
System.out.println(writer);
indexDocs(writer,list);
} catch (Exception e) {
e.printStackTrace();
} finally{
try {
writer.commit();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
// count down only after commit/close, so the main thread's await() really means all writers have finished committing
countDownLatch2.countDown();
}
}
}
- FileBeanIndex.java
The concrete implementation for FileBean:
import org.apache.lucene.document.*;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class FileBeanIndex extends BaseIndex<FileBean> {
public FileBeanIndex(IndexWriter writer, CountDownLatch countDownLatch1,
CountDownLatch countDownLatch2, List<FileBean> list) {
super(writer, countDownLatch1, countDownLatch2, list);
}
public FileBeanIndex(String parentIndexPath, int subIndex, CountDownLatch countDownLatch1,
CountDownLatch countDownLatch2, List<FileBean> list) {
super(parentIndexPath, subIndex, countDownLatch1, countDownLatch2, list);
}
@Override
public void indexDoc(IndexWriter writer, FileBean t) throws Exception {
Document doc = new Document();
System.out.println(t.getPath());
doc.add(new StringField("path", t.getPath(), Field.Store.YES));
doc.add(new LongPoint("modified", t.getModified()));
doc.add(new TextField("content", t.getContent(), Field.Store.YES));
if (writer.getConfig().getOpenMode() == IndexWriterConfig.OpenMode.CREATE){
writer.addDocument(doc);
}else{
writer.updateDocument(new Term("path", t.getPath()), doc);
}
}
}
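One detail worth noting about the listing above (my remark, not part of the original code): LongPoint only indexes the value for point/range queries and does not store it, so as written the modified field cannot be read back from search results. If that were needed, a stored companion field could be added alongside it, for example:
// requires: import org.apache.lucene.document.StoredField;
doc.add(new LongPoint("modified", t.getModified()));   // indexed for range queries only
doc.add(new StoredField("modified", t.getModified())); // optionally also store the raw value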
Utility classes
Two utility classes are involved:
- IndexUtil.java
Creates the IndexWriter for a given path.
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import java.io.IOException;
import java.nio.file.Paths;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class IndexUtil {
/** Create an index writer.
* @param indexPath the directory the index is written to
* @param create true to create a fresh index, false to create or append
* @return the configured IndexWriter
* @throws IOException on I/O errors
*/
public static IndexWriter getIndexWriter(String indexPath,boolean create) throws IOException {
Directory dir = FSDirectory.open(Paths.get(indexPath));
Analyzer analyzer = new StandardAnalyzer();
IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
LogMergePolicy mergePolicy = new LogByteSizeMergePolicy();
// Merge frequency while documents are being added:
// a smaller value makes index building slower,
// a larger value makes it faster; >10 is suitable for bulk index building.
mergePolicy.setMergeFactor(50);
// Maximum number of documents per merged segment:
// a smaller value favours the speed of incremental (append) indexing,
// a larger value suits bulk index building and faster searching.
mergePolicy.setMaxMergeDocs(5000);
// apply the policy to the config, otherwise the settings above have no effect
iwc.setMergePolicy(mergePolicy);
if (create){
iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
}else {
iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
}
return new IndexWriter(dir, iwc);
}
}
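A possible further tweak (not in the original utility): flushing is also governed by how much RAM the writer may buffer, so for bulk indexing one could additionally raise the RAM buffer inside getIndexWriter, for example:
// inside getIndexWriter, after creating iwc; 256 MB is an arbitrary example value
iwc.setRAMBufferSizeMB(256.0);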
- LuceneFileUtil.java
Collects the information of all files under the given path.
package com.github.houbb.lucene.learn.chap04;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class LuceneFileUtil {
/** Read file information, recursing into sub-folders.
* @param folder the folder (or file) to read
* @return the collected file information
* @throws IOException on I/O errors
*/
public static List<FileBean> getFolderFiles(String folder) throws IOException {
List<FileBean> fileBeans = new LinkedList<FileBean>();
File file = new File(folder);
if(file.isDirectory()){
File[] files = file.listFiles();
if(files != null){
for (File file2 : files) {
fileBeans.addAll(getFolderFiles(file2.getAbsolutePath()));
}
}
}else{
FileBean bean = new FileBean();
bean.setPath(file.getAbsolutePath());
bean.setModified(file.lastModified());
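// note: new String(bytes) below uses the platform default charset; pass an explicit Charset if needed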
bean.setContent(new String(Files.readAllBytes(Paths.get(folder))));
fileBeans.add(bean);
}
return fileBeans;
}
}
Test code
The test driver is implemented as follows:
public static void main(String[] args) {
try {
//1. collect the information of all files under the given folder
List<FileBean> fileBeans = LuceneFileUtil.getFolderFiles("D:\\gitee2\\lucene-learn");
//2. derive the number of worker threads from the amount of data
int totalCount = fileBeans.size();
int perThreadCount = 100;
System.out.println("total number of files found: " + fileBeans.size());
int threadCount = totalCount/perThreadCount + (totalCount%perThreadCount == 0 ? 0 : 1);
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
CountDownLatch countDownLatch1 = new CountDownLatch(1);
CountDownLatch countDownLatch2 = new CountDownLatch(threadCount);
System.out.println(fileBeans.size());
//3. split the list with subList and index the slices in parallel
for(int i = 0; i < threadCount; i++) {
int start = i*perThreadCount;
int end = Math.min((i + 1) * perThreadCount, totalCount);
List<FileBean> subList = fileBeans.subList(start, end);
Runnable runnable = new FileBeanIndex("index", i, countDownLatch1, countDownLatch2, subList);
// hand the worker over to the thread pool
pool.execute(runnable);
}
//4.1 the main thread counts down, which wakes up the workers awaiting countDownLatch1
// (the workers' await could also be removed entirely)
countDownLatch1.countDown();
System.out.println("start creating the index");
//4.2 wait until every worker has counted down countDownLatch2
countDownLatch2.await();
// all workers have finished
System.out.println("all threads have finished creating the index");
// release the thread pool
pool.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
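Once the sub-indexes have been built, searching has to open every sub-directory under index/ (index0, index1, ...). The following is my own minimal sketch under that assumption; the query field and term are only illustrative. MultiReader simply stitches the per-directory readers together into one logical index.
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
public class MultiDirSearchDemo {
    public static void main(String[] args) throws Exception {
        // open every sub-index created by the writer threads
        List<IndexReader> readers = new ArrayList<>();
        File[] subDirs = new File("index").listFiles(File::isDirectory);
        if (subDirs != null) {
            for (File subDir : subDirs) {
                readers.add(DirectoryReader.open(FSDirectory.open(subDir.toPath())));
            }
        }
        try (MultiReader reader = new MultiReader(readers.toArray(new IndexReader[0]))) {
            IndexSearcher searcher = new IndexSearcher(reader);
            // example query: documents whose analysed content contains the term "lucene"
            TopDocs topDocs = searcher.search(new TermQuery(new Term("content", "lucene")), 10);
            for (ScoreDoc sd : topDocs.scoreDocs) {
                System.out.println(searcher.doc(sd.doc).get("path"));
            }
        }
    }
}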