我在使用logstash将数据从postgres传输到elastic时遇到麻烦。我收到错误“在converge_state中阻止”。生成的错误是:

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of \r, \n at line 36, column 4 (byte 583) after # }", :backtrace=>["/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2577:in `map'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:24:in `initialize'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/agent.rb:325:in `block in converge_state'"]}

我的配置文件如下所示:
input {
    jdbc {

        jdbc_connection_string => "jdbc:postgresql://localhost:5432/school"


        jdbc_user => "postgres"
        jdbc_password => "postgres"


        jdbc_driver_library => "/Users/karangupta/Downloads/postgresql-42.2.8.jar"


        jdbc_driver_class => "org.postgresql.Driver"

        statement => "select sch.udise_sch_code,
    sch.school_name,
    e.edu_block_name,
    d.district_name,
    s.state_name,

    sch.school_address,
    sch.pincode,
    case when sch.sch_loc_r_u = 1 then 'rural'
        when sch.sch_loc_r_u = 2 then 'urban' end as "sch_location_type",
    sch.assembly_const,
    sch.status,

    schd.academic_year,
    schd.sch_category,
    schd.class_frm,
    schd.class_to,

    case when schd.sch_type = 1 then 'boys'
          when schd.sch_type = 2 then 'girls'
          else 'coed' end as "school_type",

    case when sde.is_cce_ele = 1 then 'yes'
        when sde.is_cce_ele = 2 then 'no'
        else 'NA' end as "cce_implemented_ele",
    case when sde.is_pcr_maintained = 1 then 'yes'
        when sde.is_pcr_maintained = 2 then 'no'
        else 'NA' end as "cce_implemented_ele",
    case when sde.is_pcr_shared_parents = 1 then 'yes'
        when sde.is_pcr_shared_parents = 2 then 'no'
        else 'NA' end as "is_pcr_shared_parents",
    case when sde.is_cce_sec = 1 then 'yes'
        when sde.is_cce_sec = 2 then 'no'
        else 'NA' end as "cce_implemented_sec",
    case when sde.is_cce_hrsec = 1 then 'yes'
        when sde.is_cce_hrsec = 2 then 'no'
        else 'NA' end as "cce_implemented_hrsec",
    sde.rte_25p_applied,
    sde.rte_25p_enrolled_previous_yr,
    sde.is_smc,
    sde.smc_tot_m,
    sde.smc_tot_f,
    sde.sms_parents_m,
    sde.sms_parents_f,
    sde.smc_lgb_m
from mst_school_2017_18 sch
inner join mst_edu_block_2017_18 e on sch.udise_edu_block_code = e.udise_edu_block_code
inner join mst_district_2017_18 d on e.udise_dist_code = d.udise_district_code
inner join mst_state_2017_18 s on d.udise_state_code = s.udise_state_code
inner join school_details_2017_18 schd on sch.udise_sch_code = schd.udise_sch_code
inner join school_details_extended_2017_18 sde on sch.udise_sch_code = sde.udise_sch_code"
    }
}


output {
    stdout { codec => json_lines }
}

# output {
#   elasticsearch {
#       index => "schoolDetails"
#       hosts => "http://localhost:9200"
#   }
# }

我不知道我在做什么错。

最佳答案

我建议您将多行SQL语句存储在一个文件中,然后存储在reference that file in your configuration中。

所以代替

statement => "select ..."

用这个:
statement_filepath => "/absolute/path/to/statement.sql"

关于elasticsearch - 如何使用logstash将elasticsearch与postgres连接?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/58299830/

10-10 20:16