我的postgres表中有一列几何类型。我检查了其余的列是否工作正常,postgres-> logstash-> elasticsearch。 但是,我不确定如何转换几何类型。 有人可以提供有关铸造的建议吗?请帮我。

现在是我的 session 。

    jdbc {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    jdbc_password => "atlas"
    jdbc_validate_connection => true
    jdbc_driver_library => "/lib/postgres-42-test.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    columns_charset => { "region_name_kr" => "UTF-8" }
    schedule => "* * * * *"
    statement => "SELECT region_id, region_name_full, boundaries_nearby from expedia_region_union_copy_jpa_test order by region_id asc limit 100"
}

stdin {
    codec => plain { charset => "UTF-8"}
}

}
filter {
    json { source => "boundaries_nearby" }
}
output {
    elasticsearch {
    hosts => [ "localhost:9200" ]
    index => "2020-05-06-wed"
    doc_as_upsert => true
    action => "update"
    document_id => "%{region_id}"
}
    stdout { codec => rubydebug }
}



https://discuss.elastic.co/t/sql-query-from-logstash-causes-an-error-pgobject/138648
这似乎与这个问题重叠,但是我从中找不到线索。

这是我的样本几何数据



我也觉得这个https://discuss.elastic.co/t/missing-converter-handling-for-full-class-name-org-postgresql-util-pgobject-simple-name-pgobject/163338/3
可能是一个答案。但是我对这个答案不太了解。



如果有人可以解释,请帮忙。或者,请回答同样问题的人。先感谢您。

最佳答案

当在postgres中使用logstash将数据加载到elasticsearch中时, jsonb 几何类型转换存在问题。

结论

对于几何类型,请使用 st_asgeojson 进行更改。然后返回字符串。然后,您可以使用json重新投射。

在jsonb的情况下,我将其作为文本接收,并使用ruby过滤器与json进行解析。我不确定这是否是最好的方法。尽管如此,没有什么可以解决问题的。

这是我的示例配置文件。
视情况而定,可能有错字。确认后使用。

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://ip:port/dbname"
        jdbc_user => "user"
        jdbc_password => "password"
        jdbc_driver_library => "driver.jar"
        jdbc_validate_connection => true
        jdbc_driver_class => "org.postgresql.Driver"
        columns_charset => {"region_name_kr" => "UTF-8"}
        schedule => "*/10 * * * *"
        statement => "SELECT region_id, region_name, st_asgeojson(boundaries) as boundaries
                             country::text from expedia_region_union"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "100000"
    }
    stdin {
        codec => plain { charset => "UTF-8" }
    }
}
filter {
    ruby {
        code => "
            require 'json'
            begin
                country_json = JSON.parse(event.get('country').to_s || '{}')
                event.set('country', country_json)
            rescue Exception => e
                event.tag('invalide country json')
            end
            begin
                boundaries_json = JSON.parse(event.get('boundaries').to_s || '{}')
                event.set('boundaries', boundaries_json)
            rescue Exception => e
                event.tag('invalide boundaries json')
            end
        "
    }
}
output {
    elasticsearch {
        hosts => ["https://host"] # if you have many elastic nodes, please pick master node.
        index => "indexName"
        doc_as_upsert => true
        action => "update"
        document_id => "%{region_id}"
    }
    stdout { codec => rubydebug }
}
  • https://stackoverflow.com/a/57653309/10194999
  • https://postgis.net/docs/ST_AsText.html
  • https://discuss.elastic.co/t/painful-postgres-logstash-elasticsearch-mapper-parsing-exception/44918/3
  • 关于postgresql - 缺少完整类名= org.postgresql.util.PGobject的Converter处理,简单名称= PGobject,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61669916/

    10-10 20:16