我使用高级Rest客户端从Spring Boot应用程序连接到Elasticsearch 6.5。
我想创建一个 Controller ,该 Controller 具有将命令添加到批量请求的方法和刷新(实际执行)批量请求操作的方法。
我这样编码:
BulkRequest bean -注意单例范围
@Bean
public BulkRequest bulkRequest() {
return new BulkRequest();
}
批量 Controller
@RestController
@RequestMapping("/bulk")
public class BulkController {
@Autowired
private BulkRequest bulkRequest;
@Autowired
RestHighLevelClient client;
@PostMapping
public void index(@RequestBody String o) {
bulkRequest.add(new IndexRequest(config.INDEX, config.TYPE).source(o, XContentType.JSON));
}
@PostMapping(path = "/flush")
public String flush() throws Exception {
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if(bulkResponse.hasFailures()) {
return bulkResponse.buildFailureMessage();
}
else {
return "All operations in the bulk request proceeded successfully!";
}
}
现在的问题:
-
bulkRequest.add
方法是否在BulkRequest bean的范围内同步(在本例中为singleton)?-调用
BulkController.flush
方法后如何启动新的BulkRequest?我是否需要实例化BulkRequest的新bean,并以某种方式将其推送到bean环境?-如果BulkRequest bean具有
@session
范围,则需要更改什么? 最佳答案
不幸的是,我在StackOverflow上找到了一个类似的问题,我忘了保存链接,因此在此回复中将仅作解释。
我使用了AtomicReference作为bean类型,在flush方法中重新初始化了BulkRequest对象。然后,我在BulkRequest.add
调用上添加了同步,因为它在后台使用List。
请注意,此解决方案有点脏,在引用的答案中已对此进行了描述-但对我来说很有效。
码:
Bean
@Bean
public AtomicReference<BulkRequest> bulkRequest() {
return new AtomicReference<BulkRequest>(new BulkRequest());
}
Controller :
@Autowired
private AtomicReference<BulkRequest> bulkRequest;
@PostMapping
public void index(@RequestBody String o) {
synchronized (bulkRequest.get()) {
bulkRequest.get().add(new IndexRequest(config.INDEX, config.TYPE).source(o, XContentType.JSON));
}
}
@DeleteMapping(path="/{id}")
public void delete(@PathVariable String id) {
synchronized (bulkRequest.get()) {
bulkRequest.get().add(new DeleteRequest(config.INDEX, config.TYPE, id));
}
}
@PutMapping(path="/{id}")
public void update(@PathVariable String id, @RequestBody String o) {
synchronized (bulkRequest.get()) {
bulkRequest.get().add(new UpdateRequest(config.INDEX, config.TYPE, id).doc(o, XContentType.JSON));
}
}
@PostMapping(path = "/flush")
public String flush() throws Exception {
synchronized (bulkRequest.get()) {
String result = bulkService.flush(bulkRequest);
bulkRequest.set(new BulkRequest());
return result;
}
}
BulkService
@Service
public class BulkService {
@Autowired
private RestHighLevelClient client;
public String flush( AtomicReference<BulkRequest> bulkRequest) throws Exception {
BulkResponse bulkResponse = client.bulk(bulkRequest.get(), RequestOptions.DEFAULT);
if(bulkResponse.hasFailures()) {
return bulkResponse.buildFailureMessage();
}
else {
return "All operations in the bulk request proceeded successfully!";
}
}
}