Elasticsearch官方网址:https://www.elastic.co

Elasticsearch中文官网地址:https://www.elastic.co/cn/products/elasticsearch

https://www.elastic.co/cn/downloads/logstash

https://www.elastic.co/cn/downloads/kibana
https://dev.mysql.com/downloads/connector/j/

Logstash与Java 8适配的版本取决于你所使用的Elasticsearch(ES)版本。根据Elastic Stack的兼容性建议:

对于ES 6.7.x之前的版本,推荐Java 8。
对于ES 6.7.x到7.17.x的版本,Java 8或Java 11都是推荐的。但请注意,虽然这些版本的ES支持Java 8,但Elasticsearch 7.10是官方支持Java 8的最后一个版本。
对于ES 8.x及之后的版本,推荐使用Oracle/OpenJava 11或Oracle/OpenJava 17。
至于Logstash,从7.8.0版本开始,它支持Azul Zulu JDK,这对于使用M1 Mac的用户是一个好的选择。但是,对于Java 8的适配,你应该选择与你的Elasticsearch版本兼容的Logstash版本。

在CentOS 7.9上安装Elasticsearch 7.10.0和Logstash 7.10.0,并确保远程可以访问,你可以按照以下步骤进行:

一、安装Elasticsearch 7.10.0

下载RPM包:
从Elasticsearch官网下载Elasticsearch 7.10.0的RPM包(elasticsearch-7.10.0-x86_64.rpm)。
安装Elasticsearch:
使用rpm命令安装下载的RPM包。

sudo rpm -ivh elasticsearch-7.10.0-x86_64.rpm

配置Elasticsearch:

编辑Elasticsearch的配置文件/etc/elasticsearch/elasticsearch.yml
cluster.name: "my-cluster-name"
node.name: "my-node-name"
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["my-node-name"]
# 如果启用了X-Pack安全功能,请确保设置了正确的xpack配置  
# # xpack.security.enabled: true  
# # xpack.license.self_generated.type: basic  
# # ... 其他xpack相关配置 ...


# 集群和节点名称一定要设置吗?

# 集群名称(cluster.name):推荐设置。它有助于标识和管理Elasticsearch集群,特别是当您有多个集群时。
# 节点名称(node.name):不是必须设置,但推荐设置。如果不设置,Elasticsearch将自动为节点生成一个名称。但自定义节点名称可以使管理更加容易,特别是在大型集群中。

sudo vi /etc/elasticsearch/jvm.options
-Xms128m
-Xmx128m

启动Elasticsearch服务:
使用systemctl命令启动Elasticsearch服务,并设置为开机自启。

sudo systemctl daemon-reload   # 重新加载 systemd 配置文件
sudo systemctl start elasticsearch  
sudo systemctl enable elasticsearch

二、安装Logstash 7.10.0

下载RPM包:
从Logstash官网下载Logstash 7.10.0的RPM包(logstash-7.10.0-x86_64.rpm)。
安装Logstash:
使用rpm命令安装下载的RPM包。

sudo rpm -ivh logstash-7.10.0-x86_64.rpm

配置Logstash:
创建或编辑Logstash的配置文件(例如logstash.conf),配置输入(input)、过滤(filter)和输出(output)。对于从MySQL读取数据并发送到Elasticsearch的场景,你需要使用jdbc插件作为输入,并使用elasticsearch插件作为输出。
以下是一个简单的配置示例:

input {  
  jdbc {  
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"  
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"  
    jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database"  
    jdbc_user => "your_username"  
    jdbc_password => "your_password"  
    statement => "SELECT * FROM your_table"  
  }  
}  
 
output {  
  elasticsearch {  
    hosts => ["localhost:9200"]  
    index => "your_index_name"  
    document_id => "%{some_field}"  
  }  
}

请根据你的实际情况替换数据库连接字符串、用户名、密码等。

sudo vi /etc/logstash/jvm.options
-Xms123m  
-Xmx128m

使用命令行参数:
当启动Logstash时,你可以使用-w标志来指定工作线程的数量。例如,要设置工作线程数为2,你可以使用以下命令启动Logstash:

bin/logstash -f <config-file> -w 2

其中<config-file>是你的Logstash配置文件的路径。

修改配置文件:
在某些情况下,你可能需要在Logstash的配置文件中直接指定工作线程的数量。这通常涉及到修改pipeline的配置。然而,请注意,直接在配置文件中设置工作线程数量可能不是所有Logstash版本都支持的功能。在这种情况下,使用命令行参数可能是更可靠的方法。

确认Logstash是否安装成功

sudo rpm -qa | grep logstash

检查服务单元文件是否存在

ls /usr/lib/systemd/system/logstash.service
sudo rpm -ivh logstash-7.10.0-x86_64.rpm

检查Logstash安装目录

sudo find / -name logstash.service 2>/dev/null

创建一个新的 logstash.service 文件在 /usr/lib/systemd/system/ 目录下,并填入适当的配置。例如:

[Unit]  
Description=Logstash  
After=network.target  

[Service]  
Type=simple  
User=logstash  
Group=logstash  
Environment=JAVA_HOME=/usr/share/logstash/jdk  
ExecStart=/usr/share/logstash/bin/logstash "-f" "/etc/logstash/conf.d/"  
Restart=always  

[Install]  
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable logstash  
sudo systemctl start logstash

检查 SELinux 状态:
如果您的系统上启用了 SELinux,并且它在阻止某些操作,您可能需要调整 SELinux 策略或将其设置为 Permissive 模式进行测试。使用 getenforce 命令来检查 SELinux 的状态。
查看 Logstash 和系统的日志文件:
查看 Logstash 的日志文件(通常位于 /var/log/logstash/)以及系统的日志文件(如 /var/log/messages 或使用 journalctl 命令),以获取更多关于安装或配置问题的信息。

查看日志:
如果 Logstash 服务没有按预期运行,您可能需要查看其日志文件以获取更多信息。Logstash 的日志文件通常位于 /var/log/logstash/ 目录下。
使用 journalctl 命令也可以查看 systemd 服务的日志:

journalctl -u logstash
sudo groupadd logstash  
sudo useradd -g logstash logstash

验证配置:
在启动Logstash之前,验证您的配置文件是否正确。您可以使用Logstash的-t或–config.test_and_exit选项来测试配置文件的语法是否正确。

sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ --config.test_and_exit

运行Logstash:
使用Logstash命令行工具运行你的配置文件。

sudo /usr/share/logstash/bin/logstash -f /path/to/logstash.conf

三、配置防火墙和网络访问

开放Elasticsearch端口:
默认情况下,Elasticsearch使用9200端口进行HTTP通信。你需要确保防火墙规则允许对该端口的入站连接。
使用firewall-cmd命令开放端口:

sudo firewall-cmd --zone=public --add-port=9200/tcp --permanent  
sudo firewall-cmd --reload

配置Elasticsearch的网络访问:
确保Elasticsearch的配置文件(elasticsearch.yml)中的network.host设置为0.0.0.0,以允许任何IP地址的访问。如果你只想允许特定IP或IP范围访问,可以设置为具体的IP地址或CIDR表示法。
测试远程访问:
从远程计算机尝试访问Elasticsearch的REST API(例如,使用curl命令或浏览器访问http://your_server_ip:9200/),以验证远程访问是否成功。
完成上述步骤后,你应该能够在CentOS 7.9上成功安装和配置Elasticsearch 7.10.0和Logstash 7.10.0,并确保远程可以访问Elasticsearch。请注意,根据你的具体环境和需求,可能还需要进行其他配置和优化。务必参考官方文档以获取更详细的信息和指导。

可能出现的错误

  1. 解决集群发现设置问题
    Elasticsearch的日志中提到:“the default discovery settings are unsuitable for production use”。这意味着默认的集群发现设置不适合生产环境使用。在单节点环境中,您可以通过编辑Elasticsearch的配置文件(通常是/etc/elasticsearch/elasticsearch.yml)来设置cluster.initial_master_nodes来避免这个警告。例如:
cluster.initial_master_nodes: ["your_node_name"]

将your_node_name替换为您在elasticsearch.yml中设置的节点名称。

如果您打算运行一个多节点的集群,您需要配置discovery.seed_hosts,它指定了Elasticsearch用于发现其他集群成员的主机列表。例如:

discovery.seed_hosts: ["host1", "host2", "host3"]

将host1、host2和host3替换为您集群中其他节点的实际主机名或IP地址。

  1. 查看启动引导检查失败的具体原因
    日志中提到有启动引导检查失败,但是没有提供具体的失败原因。要查看这些原因,您需要检查Elasticsearch的日志文件,通常位于/var/log/elasticsearch/目录下。查看与您的集群名称相对应的日志文件(例如my-cluster-name.log),文件中应该包含启动引导检查失败的具体原因。

  2. 根据日志中的错误信息进行修复
    一旦您查看了具体的启动引导检查错误信息,您就可以根据这些信息进行修复。一些常见的启动引导检查失败原因包括:

文件描述符限制太低。
内存锁定限制太低(vm.max_map_count)。
使用了不支持的JVM版本。
节点数据目录的权限问题。
4. 调整系统配置(如果需要)
根据启动引导检查失败的具体原因,您可能需要调整系统配置。例如,如果vm.max_map_count太低,您可以使用以下命令将其设置为一个较高的值(例如655360):

sudo sysctl -w vm.max_map_count=655360

为了使这个设置永久生效,您可以将上述行添加到/etc/sysctl.conf文件中。
根据启动引导检查失败的具体原因,您可能还需要调整系统配置。例如,增加文件描述符限制、调整内存锁定限制等。这些配置通常可以在操作系统的系统文件中设置,例如/etc/security/limits.conf和/etc/sysctl.conf。

input {  
  beats {  
    port => 5044  
  }  
}  
  
filter {  
  # 如果需要,可以在这里添加数据过滤逻辑  
  # ...  
}  
  
output {  
  elasticsearch {  
    hosts => ["localhost:9200"]  # 如果Elasticsearch不在同一台机器上,请替换为正确的地址  
    index => "fruit-%{+YYYY.MM.dd}"  # 使用日期作为索引名的一部分,便于管理  
    document_id => "%{fruitId}"  # 假设数据源中包含fruitId字段,用于设置文档ID  
    # 如果Elasticsearch启用了安全性功能,请添加用户名和密码配置  
    # user => "logstash_internal"  
    # password => "your_password"  
  }  
}

配置

vim /etc/logstash/conf.d/fruit.conf

input {
  jdbc {
    jdbc_driver_library => "/installer/mysql-connector-j-8.3.0.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/fshop_app"
    jdbc_user => "root"
    jdbc_password => "Root123_"
    statement => "SELECT fruit_id AS fruitId, category_id AS categoryId, fruit_name AS fruitName, fruit_origin AS fruitOrigin, fruit_price AS fruitPrice, fruit_standard AS fruitStandard, fruit_detail AS fruitDetail, fruit_image_url AS fruitImageUrl, fruit_pick_time AS fruitPickTime, fruit_quality_time AS fruitQualityTime, fruit_inventory AS fruitInventory, fruit_status AS fruitStatus, version, create_time AS createTime, update_time AS updateTime, other1, other2 FROM fruit"
   #statement => "select * from fruit"
         #schedule => "*/1 * * * *"


           # 是否将 sql 中 column 名称转小写
        lowercase_column_names => false
 }
}


output {

 elasticsearch {
    hosts => ["localhost:9200"]
    index => "fruit"
    document_id => "%{fruitId}"
  }


        stdout {
        codec => json_lines
    }

}

package com.fshop.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.*;
import org.springframework.format.annotation.DateTimeFormat;

import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * <p>
 * 水果表
 * </p>
 *
 * @author dev
 * @since 2024-04-23
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(indexName = "fruit")
public class Fruit implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 水果ID
     */
    @TableId(value = "fruit_id", type = IdType.AUTO)
    @Id
    private Integer fruitId;

    /**
     * 分类ID,外键(关联水果分类表)
     */
    private Integer categoryId;

    /**
     * 水果名称
     */
    @Field(type = FieldType.Keyword)
    private String fruitName;

    /**
     * 水果产地
     */
    private String fruitOrigin;

    /**
     * 水果价格(元/箱)
     */
    private BigDecimal fruitPrice;

    /**
     * 水果规格:小0、中1、大2
     */
    private Integer fruitStandard;

    /**
     * 水果特征详细描述
     */
//    @Field(type = FieldType.Text,analyzer = "ik_smart",searchAnalyzer = "ik_max_word")
//    private String fruitDetail;

    @CompletionField(analyzer = "ik_smart", searchAnalyzer = "ik_max_word")
    private String fruitDetail;

    /**
     * 水果主图片URL
     */
    private String fruitImageUrl;

    /**
     * 采摘时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @Field( type = FieldType.Date,name = "fruit_pick_time",format = {},
            pattern = "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd'T'HH:mm:ss'+08:00' || strict_date_opotional_time || epoch_millis")
    private LocalDateTime fruitPickTime;








    /**
     * 保存时间(天)
     */
    private Integer fruitQualityTime;

    /**
     * 库存(箱)
     */
    private Integer fruitInventory;

    /**
     * 水果状态:已下架0、秒杀中1、售卖中2
     */
    private Integer fruitStatus;

    /**
     * 版本
     */
    private Integer version;

    /**
     * 创建时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @Field( type = FieldType.Date,name = "create_time",format = {},
            pattern = "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd'T'HH:mm:ss'+08:00' || strict_date_opotional_time || epoch_millis")
    private LocalDateTime createTime;

    /**
     * 最近更新时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @Field( type = FieldType.Date,name = "update_time",format = {},
            pattern = "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd'T'HH:mm:ss'+08:00' || strict_date_opotional_time || epoch_millis")
    private LocalDateTime updateTime;

    private String other1;

    private String other2;

}



-- fshop_app.fruit definition

CREATE TABLE `fruit` (
  `fruit_id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '水果ID',
  `category_id` int unsigned DEFAULT NULL COMMENT '分类ID,外键(关联水果分类表)',
  `fruit_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '水果名称',
  `fruit_origin` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '水果产地',
  `fruit_price` decimal(10,2) NOT NULL COMMENT '水果价格(元/箱)',
  `fruit_standard` tinyint unsigned NOT NULL COMMENT '水果规格:小0、中1、大2',
  `fruit_detail` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '水果特征详细描述',
  `fruit_image_url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '水果主图片URL',
  `fruit_pick_time` datetime NOT NULL COMMENT '采摘时间',
  `fruit_quality_time` int unsigned NOT NULL COMMENT '保存时间(天)',
  `fruit_inventory` int unsigned NOT NULL COMMENT '库存(箱)',
  `fruit_status` tinyint unsigned NOT NULL COMMENT '水果状态:已下架0、秒杀中1、售卖中2',
  `version` int unsigned NOT NULL DEFAULT '1' COMMENT '版本',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '最近更新时间',
  `other1` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  `other2` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  PRIMARY KEY (`fruit_id`) USING BTREE,
  KEY `fk_fruit_category` (`category_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=137 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='水果表';

06-10 11:35