初始化
// Elasticsearch node(s) to connect to, passed to the client builder as a list.
$hosts = ['192.168.30.41'];

// Build the client once and keep it on the object; the index, bulk and search
// snippets all go through $this->client.
$this->client = \Elasticsearch\ClientBuilder::create()
    ->setHosts($hosts)
    ->build();
新建和设置index
// Sub-field reused by the string fields that are grouped on (product_code.raw,
// take_province.raw): it is declared not_analyzed so the value is not tokenised.
$rawSubField = [
    'raw' => [
        'type' => 'string',
        'index' => 'not_analyzed',
    ],
];

// Field definitions for the "goods" mapping type.
$goodsProperties = [
    'product_code' => [
        'type' => 'string',
        'store' => 'yes',
        'fielddata' => true,
        // Group/aggregate on product_code.raw rather than product_code.
        'fields' => $rawSubField,
    ],
    'order_id' => [
        'fielddata' => true,
        'type' => 'string',
    ],
    'price' => [
        'type' => 'double',
    ],
    'num' => [
        'type' => 'integer',
    ],
    'pay_time' => [
        'type' => 'date',
        'format' => 'yyyy-MM-dd HH:mm:ss',
    ],
    'take_province' => [
        'type' => 'string',
        'fielddata' => true,
        'store' => 'yes',
        'fields' => $rawSubField,
    ],
    // NOTE(review): key is spelled "buyer_nike"; possibly meant "buyer_nick" - confirm
    // against the indexed documents before renaming.
    'buyer_nike' => [
        'type' => 'string',
        'fielddata' => true,
    ],
];

// Create the "order" index with its settings and the "goods" mapping.
$params = [
    'index' => 'order',
    'body' => [
        'settings' => [
            // Per the author's note only the first 10000 results are readable by
            // default, so the window is raised to 10,000,000 here. The trade-off is
            // that the deeper the page, the slower the query; scan can be used instead.
            'max_result_window' => 10000000,
        ],
        'mappings' => [
            'goods' => [
                '_source' => [
                    'enabled' => true,
                ],
                'properties' => $goodsProperties,
            ],
        ],
    ],
];
$response = $this->client->indices()->create($params);
插入数据(这里引用了官方文档的例子;大批量数据导入不要逐条insert,而应使用效率更高的bulk)
// Bulk-load documents in batches of 1000 (example adapted from the official
// client documentation). Each document contributes two rows to the bulk body:
// an action/metadata row followed by the document source row.
// Uses $this->client, the client built in the initialisation snippet.
$params = ['body' => []];

for ($i = 1; $i <= 1234567; $i++) {
    // Action row: index document $i into my_index/my_type.
    $params['body'][] = [
        'index' => [
            '_index' => 'my_index',
            '_type' => 'my_type',
            '_id' => $i,
        ],
    ];

    // Source row: the document itself.
    $params['body'][] = [
        'my_field' => 'my_value',
        'second_field' => 'some more values',
    ];

    // Every 1000 documents stop and send the bulk request.
    if ($i % 1000 === 0) {
        $responses = $this->client->bulk($params);

        // Erase the old bulk request.
        $params = ['body' => []];

        // Unset the bulk response when you are done to save memory.
        unset($responses);
    }
}

// Send the last batch if it exists.
if (!empty($params['body'])) {
    $responses = $this->client->bulk($params);
}
相关查询
1、查询某商品某时间段内订单数、售卖总数和总价格
// SQL equivalent of the query part:
//   WHERE product_code = "xxx"
//     AND pay_time BETWEEN "2017-01-01 00:00:00" AND "2017-01-31 23:59:59"
// $code, $start_time and $end_time are expected to be set by the surrounding code.

// Match on the not_analyzed sub-field (product_code.raw) so the code is compared whole.
$productMatch = [
    'term' => ['product_code.raw' => $code],
];

// Restrict to orders paid inside the requested window (both bounds inclusive).
$payTimeWindow = [
    'range' => [
        'pay_time' => [
            'gte' => $start_time,
            'lte' => $end_time,
        ],
    ],
];

$metrics = [
    // Total quantity sold: sum of "num".
    'sum_this_product' => ['sum' => ['field' => 'num']],
    // Total price: sum of "price".
    'total_price' => ['sum' => ['field' => 'price']],
    // Number of distinct orders: cardinality of "order_id".
    'distinct_orderid' => ['cardinality' => ['field' => 'order_id']],
];

$params = [
    'index' => 'order',
    'type' => 'goods',
    'body' => [
        'size' => 1,
        'query' => [
            'bool' => [
                'must' => $productMatch,
                'filter' => $payTimeWindow,
            ],
        ],
        'aggs' => $metrics,
    ],
];
$response = $this->client->search($params);
2、统计某时间段所有商品的订单数、售卖总数和总价格
// SQL equivalent of the query part:
//   WHERE pay_time BETWEEN "2017-01-01 00:00:00" AND "2017-01-31 23:59:59"
// $start_time and $end_time are expected to be set by the surrounding code.

// Restrict to orders paid inside the requested window (both bounds inclusive).
$payTimeWindow = [
    'range' => [
        'pay_time' => [
            'gte' => $start_time,
            'lte' => $end_time,
        ],
    ],
];

// Metrics computed inside each product bucket.
$perProductMetrics = [
    'sum_this_product' => ['sum' => ['field' => 'num']],
    'total_this_product' => ['sum' => ['field' => 'price']],
    'distinct_orderid' => ['cardinality' => ['field' => 'order_id']],
];

// One bucket per product code (not_analyzed sub-field), at most 100 buckets,
// ordered by the computed quantity sold, highest first.
$byProduct = [
    'terms' => [
        'field' => 'product_code.raw',
        'size' => 100,
        'order' => ['sum_this_product' => 'desc'],
    ],
    'aggs' => $perProductMetrics,
];

$params = [
    'index' => 'order',
    'type' => 'goods',
    'body' => [
        // Only the aggregation results are needed, so no hits are requested.
        'size' => 0,
        'query' => [
            'bool' => [
                'filter' => $payTimeWindow,
            ],
        ],
        'aggs' => [
            'num' => $byProduct,
        ],
    ],
];
$response = $this->client->search($params);
唠叨:
1、这次使用的是docker环境,使用阿里镜像:https://dev.aliyun.com/detail.html?spm=5176.1972343.2.21.F0KOV2&repoId=1209
2、官方文档:https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/index.html
3、本次工作数据量大约1500w,需要复杂的统计和展现,MySQL已经不能满足需求,故使用ES。但是ES不支持类似MySQL中 select ... in (select ...) 这样的子查询,着实折腾了不少时间
4、感谢一位大神的博客:https://segmentfault.com/a/1190000004433446,这是个文章系列,很值得参考。