solr DIH 知识梳理

web.xml中listener配置

<listener>
<listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class>
</listener>

配置文件dataimport.properties

#################################################
# #
# dataimport scheduler properties #
# #
################################################# # to sync or not to sync
# 1 - active; anything else - inactive
syncEnabled=1 # which cores to schedule
# in a multi-core environment you can decide which cores you want syncronized
# leave empty or comment it out if using single-core deployment
syncCores=core1,core2 # solr server name or IP address
# [defaults to localhost if empty]
server=localhost # solr server port
# [defaults to 80 if empty]
port=8080 # application name/context
# [defaults to current ServletContextListener's context (app) name]
webapp=solr # URL params [mandatory]
# remainder of URL
params=/dataimport?command=delta-import&clean=false&commit=true # schedule interval
# number of minutes between two runs
# [defaults to 30 if empty]
interval=1 # 重做索引的时间间隔,单位分钟,默认7200,即5天;
# 为空,为0,或者注释掉:表示永不重做索引
reBuildIndexInterval=7200 # 重做索引的参数
reBuildIndexParams=/dataimport?command=full-import&clean=true&commit=true # 重做索引时间间隔的计时开始时间,第一次真正执行的时间=reBuildIndexBeginTime+reBuildIndexInterval*60*1000;
# 两种格式:2012-04-11 03:10:00 或者 03:10:00,后一种会自动补全日期部分为服务启动时的日期
reBuildIndexBeginTime=03:10:00
  • interval增量索引的频率,每隔interval分钟就启动一次task
 timer.scheduleAtFixedRate(task, startTime, 60000 * interval);
  • 关于reBuildIndexBeginTime,这里表现为fullImportStartTime
  • 增量更新的请求参数params=/dataimport?command=delta-import&clean=false&commit=true
fullImportTimer.scheduleAtFixedRate(fullImportTask, fullImportStartTime, 60000 * reBuildIndexInterval);

data-config.xml配置

  • query是获取全部数据的SQL(solr从sql中获取那些数据),多列

  • deltaImportQuery是获取增量数据时使用的SQL(数据库新增数据追加到solr的数据),多列

  • deltaQuery是获取pk的SQL(数据库新增数据是,追加到solr的数据时的条件,根据id

    ,条件是最后一次获取的时间,${dataimporter.last_index_time,最后获取的时间}),一列

  • 这个是在增量时使用的修改语句,其中需要注意的是dataimporter.delta这个前缀一定要带

    pk,根据我艰苦卓绝的跟踪代码知道这个pk其实作用只是用来对deltaQuery查询出来的内容放入到一个map中的时候作为key用的

    如果你不想deltaQuery查询出来的结果最后出现混乱,那么最好保证pk是唯一的

document中使用的是自己要被加入到索引中的field
query,被用来做为全量导入的时候使用
deltaImportQuery 这个是在增量时使用的修改语句,其中需要注意的是dataimporter.delta这个前缀一定要带
pk,根据我艰苦卓绝的跟踪代码知道这个pk其实作用只是用来对deltaQuery查询出来的内容放入到一个map中的时候作为key用的
如果你不想deltaQuery查询出来的结果最后出现混乱,那么最好保证pk是唯一的
deltaQuery,这个是用来查询需要被更新的对象的主键,一边deltaImportQuery使用
transformer:很多时候数据库中的字段不能满足你的需要,比如存储了用户生日,那么你需要将他的生肖存储,则此时需要对生日做自己的处理

增量索引

终止跑索引:http://localhost:8080/solr/collection1/dataimport?command=abort
开始索引:http://localhost:8080/solr/collection1/dataimport?command=full-import
增量索引 :http://localhost:8080/solr/collection1/dataimport?command=delta-import

源代码apache-solr-dataimportscheduler-1.4.jar

ApplicationListener

package org.apache.solr.handler.dataimport.scheduler;

import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; public class ApplicationListener
implements ServletContextListener
{
private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class)
; public void contextDestroyed(ServletContextEvent servletContextEvent)
{
ServletContext servletContext = servletContextEvent.getServletContext(); Timer timer = (Timer)servletContext.getAttribute("timer"); Timer fullImportTimer = (Timer)servletContext
.getAttribute("fullImportTimer"); if (timer != null)
timer.cancel();
if (fullImportTimer != null) {
fullImportTimer.cancel();
} servletContext.removeAttribute("timer");
servletContext.removeAttribute("fullImportTimer");
} public void contextInitialized(ServletContextEvent servletContextEvent)
{
ServletContext servletContext = servletContextEvent.getServletContext();
try
{
Timer timer = new Timer();
//增量索引task
DeltaImportHTTPPostScheduler task = new DeltaImportHTTPPostScheduler(servletContext
.getServletContextName(), timer);
//配置的间隔时间分钟
int interval = task.getIntervalInt(); Calendar calendar = Calendar.getInstance(); calendar.add(12, interval);
Date startTime = calendar.getTime();
//task调度
timer.scheduleAtFixedRate(task, startTime, 60000 * interval); servletContext.setAttribute("timer", timer); Timer fullImportTimer = new Timer();
//全量索引task
FullImportHTTPPostScheduler fullImportTask = new FullImportHTTPPostScheduler(servletContext
.getServletContextName(), fullImportTimer);
//这里重建索引时间
int reBuildIndexInterval = fullImportTask
.getReBuildIndexIntervalInt();
if (reBuildIndexInterval <= 0) {
logger.warn("Full Import Schedule disabled");
return;
} Calendar fullImportCalendar = Calendar.getInstance();
Date beginDate = fullImportTask.getReBuildIndexBeginTime();
fullImportCalendar.setTime(beginDate);
fullImportCalendar.add(12, reBuildIndexInterval);
Date fullImportStartTime = fullImportCalendar.getTime();
//fullImportStartTime这个跟配置文件里的reBuildIndexBeginTime相关,
fullImportTimer.scheduleAtFixedRate(fullImportTask, fullImportStartTime, 60000 * reBuildIndexInterval); servletContext.setAttribute("fullImportTimer", fullImportTimer);
}
catch (Exception e) {
if (e.getMessage().endsWith("disabled"))
logger.warn("Schedule disabled");
else
logger.error("Problem initializing the scheduled task: ", e);
}
}
}

增量,全量索引task都继承自BaseTimerTask,主要都差不多,看BaseTimerTask的prepUrlSendHttpPost就好

增量索引task

package org.apache.solr.handler.dataimport.scheduler;

import java.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; public class DeltaImportHTTPPostScheduler extends BaseTimerTask
{
private static final Logger logger = LoggerFactory.getLogger(DeltaImportHTTPPostScheduler.class)
; public DeltaImportHTTPPostScheduler(String webAppName, Timer t) throws Exception
{
super(webAppName, t);
logger.info("<index update process> DeltaImportHTTPPostScheduler init");
} public void run()
{
try {
if ((this.server.isEmpty()) || (this.webapp.isEmpty()) || (this.params == null) ||
(this.params
.isEmpty())) {
logger.warn("<index update process> Insuficient info provided for data import");
logger.info("<index update process> Reloading global dataimport.properties");
reloadParams();
}
else if (this.singleCore) {
prepUrlSendHttpPost(this.params);
}
else if ((this.syncCores.length == 0) || ((this.syncCores.length == 1) &&
(this.syncCores[0]
.isEmpty()))) {
logger.warn("<index update process> No cores scheduled for data import");
logger.info("<index update process> Reloading global dataimport.properties");
reloadParams();
}
else {
for (String core : this.syncCores)
prepUrlSendHttpPost(core, this.params);
}
}
catch (Exception e) {
logger.error("Failed to prepare for sendHttpPost", e);
reloadParams();
}
}
}

全量索引task

package org.apache.solr.handler.dataimport.scheduler;

import java.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; public class FullImportHTTPPostScheduler extends BaseTimerTask
{
private static final Logger logger = LoggerFactory.getLogger(FullImportHTTPPostScheduler.class)
; public FullImportHTTPPostScheduler(String webAppName, Timer t) throws Exception
{
super(webAppName, t);
logger.info("<index update process> DeltaImportHTTPPostScheduler init");
} public void run()
{
try {
if ((this.server.isEmpty()) || (this.webapp.isEmpty()) || (this.reBuildIndexParams == null) ||
(this.reBuildIndexParams
.isEmpty())) {
logger.warn("<index update process> Insuficient info provided for data import, reBuildIndexParams is null");
logger.info("<index update process> Reloading global dataimport.properties");
reloadParams();
}
else if (this.singleCore) {
prepUrlSendHttpPost(this.reBuildIndexParams);
}
else if ((this.syncCores.length == 0) || ((this.syncCores.length == 1) &&
(this.syncCores[0]
.isEmpty()))) {
logger.warn("<index update process> No cores scheduled for data import");
logger.info("<index update process> Reloading global dataimport.properties");
reloadParams();
}
else {
for (String core : this.syncCores)
prepUrlSendHttpPost(core, this.reBuildIndexParams);
}
}
catch (Exception e) {
logger.error("Failed to prepare for sendHttpPost", e);
reloadParams();
}
}
}

BaseTimerTask的,这里主要关注prepUrlSendHttpPost

package org.apache.solr.handler.dataimport.scheduler;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; public abstract class BaseTimerTask extends TimerTask
{
protected String syncEnabled;
protected String[] syncCores;
protected String server;
protected String port;
protected String webapp;
protected String params;
protected String interval;
protected String cores;
protected SolrDataImportProperties p;
protected boolean singleCore;
protected String reBuildIndexParams;
protected String reBuildIndexBeginTime;
protected String reBuildIndexInterval;
private String webAppName;
protected static final Logger logger = LoggerFactory.getLogger(BaseTimerTask.class)
; public BaseTimerTask(String webAppName, Timer t) throws Exception
{
this.webAppName = webAppName; this.p = new SolrDataImportProperties();
reloadParams(); if (!this.syncEnabled.equals("1")) {
throw new Exception("Schedule disabled");
}
if ((this.syncCores == null) || ((this.syncCores.length == 1) &&
(this.syncCores[0]
.isEmpty()))) {
this.singleCore = true;
logger.info("<index update process> Single core identified in dataimport.properties");
} else {
this.singleCore = false;
logger.info(new StringBuilder().append("<index update process> Multiple cores identified in dataimport.properties. Sync active for: ").append(this.cores).toString());
}
} protected void reloadParams()
{
this.p.loadProperties(true);
this.syncEnabled = this.p.getProperty("syncEnabled");
this.cores = this.p.getProperty("syncCores");
this.server = this.p.getProperty("server");
this.port = this.p.getProperty("port");
this.webapp = this.p.getProperty("webapp");
this.params = this.p.getProperty("params");
this.interval = this.p.getProperty("interval");
this.syncCores = (this.cores != null ? this.cores.split(",") : null); this.reBuildIndexParams = this.p
.getProperty("reBuildIndexParams"); this.reBuildIndexBeginTime = this.p
.getProperty("reBuildIndexBeginTime"); this.reBuildIndexInterval = this.p
.getProperty("reBuildIndexInterval"); fixParams(this.webAppName);
} protected void fixParams(String webAppName) {
if ((this.server == null) || (this.server.isEmpty()))
this.server = "localhost";
if ((this.port == null) || (this.port.isEmpty()))
this.port = "8080";
if ((this.webapp == null) || (this.webapp.isEmpty()))
this.webapp = webAppName;
if ((this.interval == null) || (this.interval.isEmpty()) || (getIntervalInt() <= 0))
this.interval = "30";
if ((this.reBuildIndexBeginTime == null) || (this.reBuildIndexBeginTime.isEmpty()))
this.interval = "00:00:00";
if ((this.reBuildIndexInterval == null) || (this.reBuildIndexInterval.isEmpty()) ||
(getReBuildIndexIntervalInt() <= 0))
this.reBuildIndexInterval = "0";
} protected void prepUrlSendHttpPost(String params)
{
sendHttpPost(null, params);
} protected void prepUrlSendHttpPost(String coreName, String params)
{
sendHttpPost(coreName, params);
} protected void sendHttpPost(String coreName, String params)
{
DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
Date startTime = new Date(); String core = coreName == null ? "" : new StringBuilder().append("[").append(coreName).append("] ").toString(); logger.info(new StringBuilder().append(core).append("<index update process> Process started at .............. ")
.append(df
.format(startTime))
.toString());
try
{
String completeUrl = buildUrl(coreName, params);
URL url = new URL(completeUrl);
HttpURLConnection conn = (HttpURLConnection)url.openConnection(); conn.setRequestMethod("POST");
conn.setRequestProperty("type", "submit");
conn.setDoOutput(true); conn.connect(); logger.info(new StringBuilder().append(core).append("<index update process> Full URL\t\t\t\t").append(conn.getURL()).toString());
logger.info(new StringBuilder().append(core).append("<index update process> Response message\t\t\t")
.append(conn
.getResponseMessage()).toString());
logger.info(new StringBuilder().append(core).append("<index update process> Response code\t\t\t")
.append(conn
.getResponseCode()).toString()); if (conn.getResponseCode() != 200) {
reloadParams();
} conn.disconnect();
logger.info(new StringBuilder().append(core).append("<index update process> Disconnected from server\t\t").append(this.server).toString());
Date endTime = new Date();
logger.info(new StringBuilder().append(core).append("<index update process> Process ended at ................ ")
.append(df
.format(endTime))
.toString());
} catch (MalformedURLException mue) {
logger.error("Failed to assemble URL for HTTP POST", mue);
} catch (IOException ioe) {
logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe);
}
catch (Exception e)
{
logger.error("Failed to send HTTP POST", e);
}
} private String buildUrl(String coreName, String params) { StringBuilder sb = new StringBuilder(); sb.append("http://").append(this.server).append(":").append(this.port); if (!this.webapp.startsWith("/"))
sb.append("/");
sb.append(this.webapp); if ((coreName != null) && (!coreName.isEmpty())) {
if (!this.webapp.endsWith("/"))
sb.append("/");
sb.append(coreName);
} if (sb.charAt(sb.length() - 1) == '/') {
if (params.startsWith("/"))
sb.setLength(sb.length() - 1);
}
else if (!params.startsWith("/")) {
sb.append("/");
}
sb.append(params); return sb.toString(); } public int getIntervalInt() {
try {
return Integer.parseInt(this.interval);
} catch (NumberFormatException e) {
logger.warn("Unable to convert 'interval' to number. Using default value (30) instead", e);
} return 30;
} public int getReBuildIndexIntervalInt()
{
try {
return Integer.parseInt(this.reBuildIndexInterval);
} catch (NumberFormatException e) {
logger.info("Unable to convert 'reBuildIndexInterval' to number. do't rebuild index.", e);
} return 0;
} public Date getReBuildIndexBeginTime()
{
Date beginDate = null;
try {
SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
String dateStr = sdfDate.format(new Date());
beginDate = sdfDate.parse(dateStr);
if ((this.reBuildIndexBeginTime == null) ||
(this.reBuildIndexBeginTime
.isEmpty()))
return beginDate;
SimpleDateFormat sdf;
if (this.reBuildIndexBeginTime.matches("\\d{2}:\\d{2}:\\d{2}")) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); beginDate = sdf.parse(new StringBuilder().append(dateStr).append(" ").append(this.reBuildIndexBeginTime).toString());
}
else if (this.reBuildIndexBeginTime
.matches("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}"))
{
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
return sdf.parse(this.reBuildIndexBeginTime);
}
catch (ParseException e)
{
logger.warn("Unable to convert 'reBuildIndexBeginTime' to date. use now time.", e);
} return beginDate;
}
}

最后还有Properties文件相关的类SolrDataImportProperties

package org.apache.solr.handler.dataimport.scheduler;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; public class SolrDataImportProperties
{
private Properties properties;
public static final String SYNC_ENABLED = "syncEnabled";
public static final String SYNC_CORES = "syncCores";
public static final String SERVER = "server";
public static final String PORT = "port";
public static final String WEBAPP = "webapp";
public static final String PARAMS = "params";
public static final String INTERVAL = "interval";
public static final String REBUILDINDEXPARAMS = "reBuildIndexParams";
public static final String REBUILDINDEXBEGINTIME = "reBuildIndexBeginTime";
public static final String REBUILDINDEXINTERVAL = "reBuildIndexInterval";
private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class)
; public void loadProperties(boolean force)
{
try
{
SolrResourceLoader loader = new SolrResourceLoader(null);
logger.info("Instance dir = " + loader.getInstanceDir()); String configDir = loader.getConfigDir();
configDir = SolrResourceLoader.normalizeDir(configDir);
if ((force) || (this.properties == null)) {
this.properties = new Properties(); String dataImportPropertiesPath = configDir + "dataimport.properties"; FileInputStream fis = new FileInputStream(dataImportPropertiesPath); this.properties.load(fis);
}
} catch (FileNotFoundException fnfe) {
logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);
}
catch (IOException ioe)
{
logger.error("Error reading DataImportScheduler dataimport.properties file", ioe);
}
catch (Exception e)
{
logger.error("Error loading DataImportScheduler properties", e);
}
} public String getProperty(String key) {
return this.properties.getProperty(key);
}
}

服务端流程主要流程

handleRequestBody-->  importer.runCmd-->doFullImport/doDeltaImport--> docBuilder.execute()--> doDelta()-->collectDelta()-->ject> row = epw.nextModifiedRowKey()/getModifiedParentRows-->entityProcessor.nextModifiedParentRowKey()-->initQuery()-->dataSource.getData(q)-->ResultSetIterator r = new ResultSetIterator(query)-->getARow()

剩下的就跟正常的solr处理流程差不多了

links

Solr中DIH模式的使用

solr连接数据库配置

Solr学习(五)DIH增量、定时导入并检索数据

05-07 15:50