Solr DIH Source Code Analysis

importer.runCmd(requestParams, sw) inside DataImportHandler.handleRequestBody():

if (DataImporter.FULL_IMPORT_CMD.equals(command)
    || DataImporter.DELTA_IMPORT_CMD.equals(command) ||
    IMPORT_CMD.equals(command)) {
  importer.maybeReloadConfiguration(requestParams, defaultParams);
  UpdateRequestProcessorChain processorChain =
      req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
  UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
  SolrResourceLoader loader = req.getCore().getResourceLoader();
  DIHWriter sw = getSolrWriter(processor, loader, requestParams, req); // create the DIHWriter

  if (requestParams.isDebug()) {
    if (debugEnabled) {
      // Synchronous request for the debug mode
      importer.runCmd(requestParams, sw);
      rsp.add("mode", "debug");
      rsp.add("documents", requestParams.getDebugInfo().debugDocuments);
      if (requestParams.getDebugInfo().debugVerboseOutput != null) {
        rsp.add("verbose-output", requestParams.getDebugInfo().debugVerboseOutput);
      }
    } else {
      message = DataImporter.MSG.DEBUG_NOT_ENABLED;
    }
  } else {
    // Asynchronous request for normal mode
    if (requestParams.getContentStream() == null && !requestParams.isSyncMode()) {
      importer.runAsync(requestParams, sw);
    } else {
      importer.runCmd(requestParams, sw);
    }
  }
}

A DIHWriter is created first:

DIHWriter sw = getSolrWriter(processor, loader, requestParams, req); // create the DIHWriter

The getSolrWriter implementation in DataImportHandler simply instantiates a SolrWriter:

private DIHWriter getSolrWriter(final UpdateRequestProcessor processor,
    final SolrResourceLoader loader, final RequestInfo requestParams,
    SolrQueryRequest req) {
  SolrParams reqParams = req.getParams();
  String writerClassStr = null;
  if (reqParams != null && reqParams.get(PARAM_WRITER_IMPL) != null) {
    writerClassStr = (String) reqParams.get(PARAM_WRITER_IMPL);
  }
  DIHWriter writer;
  if (writerClassStr != null
      && !writerClassStr.equals(DEFAULT_WRITER_NAME)
      && !writerClassStr.equals(DocBuilder.class.getPackage().getName() + "."
          + DEFAULT_WRITER_NAME)) {
    try {
      @SuppressWarnings("unchecked")
      Class<DIHWriter> writerClass = DocBuilder.loadClass(writerClassStr, req.getCore());
      Constructor<DIHWriter> cnstr = writerClass.getConstructor(new Class[] {
          UpdateRequestProcessor.class, SolrQueryRequest.class});
      return cnstr.newInstance((Object) processor, (Object) req);
    } catch (Exception e) {
      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
          "Unable to load Writer implementation:" + writerClassStr, e);
    }
  } else {
    return new SolrWriter(processor, req) {
      @Override
      public boolean upload(SolrInputDocument document) {
        try {
          return super.upload(document); // called by DocBuilder.buildDocument
        } catch (RuntimeException e) {
          LOG.error("Exception while adding: " + document, e);
          return false;
        }
      }
    };
  }
}

The runCmd() method in DataImporter:

if (!importLock.tryLock()){
  LOG.warn("Import command failed . another import is running");
  return;
}
try {
  if (FULL_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
    doFullImport(sw, reqParams);
  } else if (command.equals(DELTA_IMPORT_CMD)) {
    doDeltaImport(sw, reqParams);
  }
} finally {
  importLock.unlock();
}

Here importLock is an exclusive lock:

private ReentrantLock importLock = new ReentrantLock();
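As a side note, tryLock() fails fast instead of blocking, so a second import request that arrives while another import is still running simply logs the warning above and returns. A minimal standalone illustration of that behavior (plain java.util.concurrent, not Solr code):

import java.util.concurrent.locks.ReentrantLock;

// Standalone illustration of the tryLock() semantics used by DataImporter (not Solr code).
public class ImportLockDemo {
  private static final ReentrantLock importLock = new ReentrantLock();

  static void runImport(String name) {
    if (!importLock.tryLock()) {                       // fail fast, do not queue
      System.out.println(name + ": another import is running, giving up");
      return;
    }
    try {
      System.out.println(name + ": importing...");
      Thread.sleep(1000);                              // simulate a long-running import
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      importLock.unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    Thread t1 = new Thread(() -> runImport("import-1"));
    Thread t2 = new Thread(() -> runImport("import-2"));
    t1.start();
    Thread.sleep(100);                                 // let import-1 grab the lock first
    t2.start();
    t1.join();
    t2.join();                                         // import-2 prints the "giving up" message
  }
}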

The main logic naturally lives in doFullImport() and doDeltaImport(); full and delta import are almost identical:

public void doDeltaImport(DIHWriter writer, RequestInfo requestParams) {
  LOG.info("Starting Delta Import");
  setStatus(Status.RUNNING_DELTA_DUMP);
  try {
    DIHProperties dihPropWriter = createPropertyWriter();
    setIndexStartTime(dihPropWriter.getCurrentTimestamp());
    docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
    checkWritablePersistFile(writer, dihPropWriter);
    docBuilder.execute();
    if (!requestParams.isDebug())
      cumulativeStatistics.add(docBuilder.importStatistics);
  } catch (Exception e) {
    LOG.error("Delta Import Failed", e);
    docBuilder.handleError("Delta Import Failed", e);
  } finally {
    setStatus(Status.IDLE);
    DocBuilder.INSTANCE.set(null);
  }
}

public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
  LOG.info("Starting Full Import");
  setStatus(Status.RUNNING_FULL_DUMP);
  try {
    DIHProperties dihPropWriter = createPropertyWriter();
    setIndexStartTime(dihPropWriter.getCurrentTimestamp());
    docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
    checkWritablePersistFile(writer, dihPropWriter);
    docBuilder.execute();
    if (!requestParams.isDebug())
      cumulativeStatistics.add(docBuilder.importStatistics);
  } catch (Exception e) {
    SolrException.log(LOG, "Full Import failed", e);
    docBuilder.handleError("Full Import failed", e);
  } finally {
    setStatus(Status.IDLE);
    DocBuilder.INSTANCE.set(null);
  }
}

Both instantiate a DocBuilder and call its execute() method; execute() then calls doDelta() or doFullDump(). Taking doFullDump() as an example:

private void doFullDump() {
  addStatusMessage("Full Dump Started");
  buildDocument(getVariableResolver(), null, null, currentEntityProcessorWrapper, true, null);
}
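For reference, the dispatch inside execute() boils down to the following trimmed sketch; everything else (statistics, commit/rollback, property persistence) is omitted, and the exact branch condition shown is my reading of the importer status set by doDeltaImport()/doFullImport() above.

// Trimmed sketch of the dispatch in DocBuilder.execute(); the branch condition is an
// assumption based on the status set in doDeltaImport()/doFullImport().
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
  doDelta();
} else {
  doFullDump();
}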

The call path of buildDocument:

buildDocument-->DocBuilder.buildDocument-->buildDocument-->boolean result = writer.upload(doc)

private void buildDocument(VariableResolver vr, DocWrapper doc,
    Map<String, Object> pk, EntityProcessorWrapper epw, boolean isRoot,
    ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy)

Inside it, a ContextImpl is instantiated, the EntityProcessorWrapper is initialized, and then

Map<String, Object> arow = epw.nextRow();

is called to fetch the data; finally SolrWriter's upload() is invoked.

Instantiating the ContextImpl:

ContextImpl ctx = new ContextImpl(epw, vr, null,
    pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
    session, parentCtx, this);

Initializing the EntityProcessorWrapper; this is where the delegate (here a SqlEntityProcessor) eventually obtains its dataSource:

public void init(Context context) {
  rowcache = null;
  this.context = context;
  resolver = (VariableResolver) context.getVariableResolver();
  if (entityName == null) {
    onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
    if (onError == null) onError = ABORT;
    entityName = context.getEntityAttribute(ConfigNameConstants.NAME);
  }
  delegate.init(context); // SqlEntityProcessor
}

The getDataSource() call chain from SqlEntityProcessor:

ContextImpl.getDataSource()-->dataImporter.getDataSourceInstance-->JdbcDataSource.init-->createConnectionFactory

It is finally in createConnectionFactory that the url and driver show up.
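What the connection factory ultimately does can be reproduced with plain JDBC: resolve driver/url/user/password from the dataSource properties and open a connection. A minimal standalone sketch follows; the URL and credentials are hypothetical, and the real createConnectionFactory does considerably more (connection reuse, extra properties, error handling):

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

// Illustration only: the essence of the factory built by createConnectionFactory() --
// read driver/url/user/password from the <dataSource> properties and open a JDBC connection.
// URL and credentials here are hypothetical.
public class JdbcConnectDemo {
  public static void main(String[] args) throws Exception {
    Properties initProps = new Properties();
    initProps.setProperty("driver", "com.mysql.jdbc.Driver");          // <dataSource driver="..."/>
    initProps.setProperty("url", "jdbc:mysql://localhost:3306/test");  // <dataSource url="..."/>
    initProps.setProperty("user", "root");
    initProps.setProperty("password", "secret");

    Class.forName(initProps.getProperty("driver"));                    // load the JDBC driver
    try (Connection c = DriverManager.getConnection(
        initProps.getProperty("url"),
        initProps.getProperty("user"),
        initProps.getProperty("password"))) {
      System.out.println("connected: " + !c.isClosed());
    }
  }
}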

Now let's look at how epw.nextRow() fetches data:

   arow = delegate.nextRow();

nextRow() mainly involves two methods, initQuery() and getNext():

@Override
public Map<String, Object> nextRow() {
  if (rowIterator == null) {
    String q = getQuery();
    initQuery(context.replaceTokens(q));
  }
  return getNext();
}

initQuery()

try {
  DataImporter.QUERY_COUNT.get().incrementAndGet();
  rowIterator = dataSource.getData(q);
  this.query = q;

This instantiates a ResultSetIterator:

public Iterator<Map<String, Object>> getData(String query) {
  ResultSetIterator r = new ResultSetIterator(query);
  return r.getIterator();
}

It is actually ResultSetIterator that executes the SQL:

Connection c = getConnection();
stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(batchSize);
stmt.setMaxRows(maxRows);
LOG.debug("Executing SQL: " + query);
long start = System.nanoTime();
if (stmt.execute(query)) {
  resultSet = stmt.getResultSet();
}

Pay attention to the batchSize setting here: it is best configured as batchSize=-1. For details, see the article on using MySQL JDBC setFetchSize() correctly to avoid java.lang.OutOfMemoryError when processing large result sets with JDBC.

The initial value of batchSize is FETCH_SIZE = 500:

 private int batchSize = FETCH_SIZE;

The configured batchSize is assigned as follows:

String bsz = initProps.getProperty("batchSize");
if (bsz != null) {
  bsz = context.replaceTokens(bsz);
  try {
    batchSize = Integer.parseInt(bsz);
    if (batchSize == -1)
      batchSize = Integer.MIN_VALUE;
  } catch (NumberFormatException e) {
    LOG.warn("Invalid batch size: " + bsz);
  }
}

Since the statement is configured as follows when fetching data:

Connection c = getConnection();
stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(batchSize);
stmt.setMaxRows(maxRows);

in this mode, to avoid OOM the fetch size has to be set to Integer.MIN_VALUE; again, see the article on using MySQL JDBC setFetchSize() correctly to avoid java.lang.OutOfMemoryError with large result sets.
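To make the point concrete, here is a minimal standalone example (hypothetical JDBC URL, credentials and table) of the streaming pattern that batchSize=-1 enables with MySQL Connector/J: a forward-only, read-only statement with fetch size Integer.MIN_VALUE streams rows one at a time instead of buffering the whole result set on the client.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Standalone illustration of MySQL streaming reads; connection details and table are hypothetical.
public class StreamingReadDemo {
  public static void main(String[] args) throws Exception {
    try (Connection c = DriverManager.getConnection(
             "jdbc:mysql://localhost:3306/test", "root", "secret");
         Statement stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
      stmt.setFetchSize(Integer.MIN_VALUE);                     // enable row-by-row streaming
      try (ResultSet rs = stmt.executeQuery("SELECT id, name FROM big_table")) {
        while (rs.next()) {
          // only one row is materialized on the client at a time
          System.out.println(rs.getLong("id") + " " + rs.getString("name"));
        }
      }
    }
  }
}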

Finally an rSetIterator is returned for getNext() to consume; let's see how getNext() uses it:

if (rowIterator.hasNext())
  return rowIterator.next();

Here, if rowIterator still has rows, its next() method is called. The next() implementation in rSetIterator:

public Map<String, Object> next() {
  return getARow();
}

The getARow() implementation simply reads the data from the resultSet and wraps it into a Map:


private Map<String, Object> getARow() {
  if (resultSet == null)
    return null;
  Map<String, Object> result = new HashMap<>();
  for (String colName : colNames) {
    try {
      if (!convertType) {
        // Use underlying database's type information except for BigDecimal and BigInteger
        // which cannot be serialized by JavaBin/XML. See SOLR-6165
        Object value = resultSet.getObject(colName);
        if (value instanceof BigDecimal || value instanceof BigInteger) {
          result.put(colName, value.toString());
        } else {
          result.put(colName, value);
        }
        continue;
      }
      Integer type = fieldNameVsType.get(colName);
      if (type == null)
        type = Types.VARCHAR;
      switch (type) {
        case Types.INTEGER:
          result.put(colName, resultSet.getInt(colName));
          break;
        case Types.FLOAT:
          result.put(colName, resultSet.getFloat(colName));
          break;
        case Types.BIGINT:
          result.put(colName, resultSet.getLong(colName));
          break;
        case Types.DOUBLE:
          result.put(colName, resultSet.getDouble(colName));
          break;
        case Types.DATE:
          result.put(colName, resultSet.getTimestamp(colName));
          break;
        case Types.BOOLEAN:
          result.put(colName, resultSet.getBoolean(colName));
          break;
        case Types.BLOB:
          result.put(colName, resultSet.getBytes(colName));
          break;
        default:
          result.put(colName, resultSet.getString(colName));
          break;
      }
    } catch (SQLException e) {
      logError("Error reading data ", e);
      wrapAndThrow(SEVERE, e, "Error reading data from database");
    }
  }
  return result;
}

Now back to upload():

 boolean result = writer.upload(doc);

SolrWriter's upload() method:

@Override
public boolean upload(SolrInputDocument d) {
  try {
    AddUpdateCommand command = new AddUpdateCommand(req);
    command.solrDoc = d;
    command.commitWithin = commitWithin;
    processor.processAdd(command);
  } catch (Exception e) {
    log.warn("Error creating document : " + d, e);
    return false;
  }
  return true;
}

Here SolrWriter's processor comes from the update processor chain, so from this point on the handling is the same as any other Solr update, e.g. RunUpdateProcessor and LogUpdateProcessor:

UpdateRequestProcessorChain processorChain =
    req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
SolrResourceLoader loader = req.getCore().getResourceLoader();
DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);
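The chain itself is the classic chain-of-responsibility pattern: each UpdateRequestProcessor does its own work in processAdd() and then delegates to the next one, with RunUpdateProcessor at the end performing the actual index write. A simplified, self-contained illustration of the pattern (hypothetical stand-in classes, not Solr's):

// Simplified stand-ins for the update processor chain; not Solr classes.
public class ChainDemo {
  static abstract class Processor {
    protected final Processor next;
    Processor(Processor next) { this.next = next; }
    void processAdd(String doc) {
      if (next != null) next.processAdd(doc);   // forward to the next processor in the chain
    }
  }

  // Analogous to LogUpdateProcessor: log, then delegate.
  static class LogProcessor extends Processor {
    LogProcessor(Processor next) { super(next); }
    @Override void processAdd(String doc) {
      System.out.println("LOG: adding " + doc);
      super.processAdd(doc);
    }
  }

  // Analogous to RunUpdateProcessor: end of the chain, performs the actual index write.
  static class RunProcessor extends Processor {
    RunProcessor() { super(null); }
    @Override void processAdd(String doc) {
      System.out.println("RUN: indexing " + doc);
    }
  }

  public static void main(String[] args) {
    Processor chain = new LogProcessor(new RunProcessor());
    chain.processAdd("doc-1");   // the document uploaded by SolrWriter flows through the chain
  }
}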