我使用高级Rest客户端从Spring Boot应用程序连接到Elasticsearch 6.5。

我想创建一个 Controller ,该 Controller 具有将命令添加到批量请求的方法和刷新(实际执行)批量请求操作的方法。

我这样编码:
BulkRequest bean -注意单例范围

@Bean
public BulkRequest bulkRequest() {
  return new BulkRequest();
}

批量 Controller
@RestController
@RequestMapping("/bulk")
public class BulkController {

    @Autowired
    private BulkRequest bulkRequest;

    @Autowired
    RestHighLevelClient client;

    @PostMapping
    public void index(@RequestBody String o) {
        bulkRequest.add(new IndexRequest(config.INDEX, config.TYPE).source(o, XContentType.JSON));
    }

    @PostMapping(path = "/flush")
    public String flush() throws Exception {
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);

        if(bulkResponse.hasFailures()) {
          return bulkResponse.buildFailureMessage();
        }
        else {
          return "All operations in the bulk request proceeded successfully!";
        }
    }

现在的问题:
-bulkRequest.add方法是否在BulkRequest bean的范围内同步(在本例中为singleton)?
-调用BulkController.flush方法后如何启动新的BulkRequest?我是否需要实例化BulkRequest的新bean,并以某种方式将其推送到bean环境?
-如果BulkRequest bean具有@session范围,则需要更改什么?

最佳答案

不幸的是,我在StackOverflow上找到了一个类似的问题,我忘了保存链接,因此在此回复中将仅作解释。

我使用了AtomicReference作为bean类型,在flush方法中重新初始化了BulkRequest对象。然后,我在BulkRequest.add调用上添加了同步,因为它在后台使用List。

请注意,此解决方案有点脏,在引用的答案中已对此进行了描述-但对我来说很有效。

码:
Bean

    @Bean
    public AtomicReference<BulkRequest> bulkRequest() {
      return new AtomicReference<BulkRequest>(new BulkRequest());
    }

Controller :
        @Autowired
        private AtomicReference<BulkRequest> bulkRequest;

        @PostMapping
        public void index(@RequestBody String o) {
            synchronized (bulkRequest.get()) {
                bulkRequest.get().add(new IndexRequest(config.INDEX, config.TYPE).source(o, XContentType.JSON));
            }
        }

        @DeleteMapping(path="/{id}")
        public void delete(@PathVariable String id) {
            synchronized (bulkRequest.get()) {
                bulkRequest.get().add(new DeleteRequest(config.INDEX, config.TYPE, id));
            }
        }

        @PutMapping(path="/{id}")
        public void update(@PathVariable String id, @RequestBody String o) {
            synchronized (bulkRequest.get()) {
                bulkRequest.get().add(new UpdateRequest(config.INDEX, config.TYPE, id).doc(o, XContentType.JSON));
            }
        }

 @PostMapping(path = "/flush")
    public String flush() throws Exception {
        synchronized (bulkRequest.get()) {
            String result = bulkService.flush(bulkRequest);

            bulkRequest.set(new BulkRequest());

            return result;
        }
    }

BulkService
@Service
public class BulkService {

    @Autowired
    private RestHighLevelClient client;

    public String flush( AtomicReference<BulkRequest> bulkRequest) throws Exception {

        BulkResponse bulkResponse = client.bulk(bulkRequest.get(), RequestOptions.DEFAULT);

        if(bulkResponse.hasFailures()) {
            return bulkResponse.buildFailureMessage();
        }
        else {
            return "All operations in the bulk request proceeded successfully!";
        }
    }
}

08-18 07:20