Elastic 中国社区官方博客

Elastic 中国社区官方博客

了解如何为 Elasticsearch 创建自定义连接器以简化数据摄取过程。

作者:JEDR BLASZYK

如何为 Elasticsearch 创建自定义连接器-LMLPHP

Elasticsearch 拥有一个摄取工具库,可以从多个来源获取数据。 但是,有时你的数据源可能与 Elastic 现有的提取工具不兼容。 在这种情况下,你可能需要创建自定义连接器以将数据与 Elasticsearch 连接。

在你的应用程序中使用 Elastic 连接器有多种原因。 例如,你可能想要:

  • 将数据从自定义或遗留应用程序引入 Elasticsearch
  • 为你的组织数据引入语义搜索
  • 从 PDF、MS Office 文档等文件中提取文本内容
  • 使用 Kibana UI 管理你的数据源(包括配置、过滤规则、设置定期同步计划规则)
  • 你想要在自己的基础设施上部署 Elastic 连接器(一些 Elastic 支持的连接器可作为 Elastic Cloud 中的本机连接器使用)

用于创建定制连接器的开放代码框架

如果创建你自己的连接器是满足你需求的解决方案,那么连接器框架将帮助你创建一个。 我们创建的框架是为了支持创建自定义连接器并帮助用户将独特的数据源连接到 Elasticsearch。 连接器的代码可在 GitHub 上找到,并且我们有可以帮助你入门的文档

该框架设计简单且高性能。 它旨在对开发人员友好,因此它是开放代码且高度可定制的。 你创建的连接器可以在你自己的基础设施上进行自我管理。 目标是让开发人员能够轻松地将自己的数据源与 Elasticsearch 集成。

使用连接器框架之前你需要了解什么

该框架是用 async-python 编写的

有几门课程可以学习 async-python。 如果你需要推荐,我们认为这个 LinkedIn 学习课程非常好,但需要订阅。 我们喜欢的一个免费替代方案是这个

为什么我们选择异步 Python?

摄取受 IO 限制(而非 CPU 限制),因此从资源利用的角度构建连接器时,异步编程是最佳方法。 在 I/O 密集型应用程序中,大部分时间都花在等待外部资源上,例如读取文件、发出网络请求或查询数据库。 在这些等待期间,传统的同步代码会阻塞整个程序,导致资源利用效率低下。

还有其他先决条件吗?

这不是先决条件。 在开始之前,绝对值得阅读《连接器开发人员指南》! 希望你觉得这个有用。

使用连接器框架构建定制连接器

入门很容易。 在与框架相关的术语中,我们将自定义连接器称为源。 我们通过创建一个新类来实现一个新的源,该类的职责是将文档从自定义数据源发送到Elasticsearch。

作为一种可选的入门方式,用户还可以查看此目录源 (directory source) 示例。 这是一个很好但基本的示例,可以帮助你了解如何编写自定义连接器。

步骤概要

一旦你知道要为其创建连接器的自定义数据源,以下是编写新源的步骤概述:

  • 在 connectors/sources 中添加模块或目录
  • requirements.txt 中声明你的依赖项。 确保固定这些依赖项
  • 实现一个类,该类实现 connectors.source.BaseDataSource 中描述的方法
  • (可选,在为 repo 做出贡献时)在 connectors/sources/tests 中添加单元测试,覆盖率 +90%
  • 在源部分声明你的连接器 connectors/config.py
  • 就是这样。 我们完成了! 现在你应该能够运行连接器

在编写定制连接器之前你需要了解什么

为了使 Elasticsearch 用户能够获取数据并在该数据的基础上构建搜索体验,我们提供了一个轻量级的连接器协议。 该协议允许用户轻松获取数据、使用企业搜索功能来操作该数据并创建搜索体验,同时在 Kibana 中为他们提供无缝的用户体验。 为了与企业搜索兼容并充分利用 Kibana 中提供的连接器功能,连接器必须遵守该协议。

关于连接器协议你需要了解的内容

文档页面提供了该协议的良好概述。 以下是你需要了解的内容:

  • 连接器和系统其他部分之间的所有通信都通过 Elasticsearch 索引异步进行
  • 连接器将其状态传达给 Elasticsearch 和 Kibana,以便用户可以为其提供配置并诊断任何问题
  • 这允许简单、开发人员友好的连接器部署。 connectors 服务是无状态的,并且不关心你的 Elastic 部署在哪里运行,只要它可以通过网络连接到它就可以正常工作。 该服务还具有容错能力,可以在重新启动或发生故障后在不同的主机上恢复操作。 一旦与 Elasticsearch 重新建立连接,它将继续正常运行。
  • 在底层,该协议使用 Elasticsearch 索引来跟踪连接器状态
    • .elastic-connectors 和 .elastic-connectors-sync-jobs (在上面链接的文档中描述)

托管自定义连接器的位置

连接器本身不依赖于 Elasticsearch,它可以托管在你自己的环境中

如何为 Elasticsearch 创建自定义连接器-LMLPHP

如果你有 Elasticsearch 部署,无论它是自我管理还是位于 Elastic Cloud 中:

  • 作为开发人员/公司,你可以为你的数据源编写自定义连接器
  • 在你自己的基础设施上管理连接器并根据你的需求配置连接器服务
  • 只要连接器可以通过网络发现 Elasticsearch,它就能够对数据建立索引
  • 作为管理员,你可以通过 Kibana 控制连接器

示例:使用连接器框架的 Google Drive 连接器

我们使用连接器框架为 Google Drive 编写了一个简单的连接器。 我们通过创建一个新类来实现新的源,该类的职责是将文档从目标源发送到 Elasticsearch。

我们从具有 BaseDataSource 预期方法签名的 GoogleDriveDataSource 类开始,以配置数据源、检查其可用性(ping)并检索文档。 为了使这个连接器发挥作用,我们需要实现这些方法。

class GoogleDriveDataSource(BaseDataSource):
    """Google Drive"""

    name = "Google Drive"
    service_type = "google_drive"

    @classmethod
    def get_default_configuration(cls):
        """Returns a dict with a default configuration"""
        raise NotImplementedError

    async def ping(self):
        """When called, pings the backend

        If the backend has an issue, raises an exception
        """
        raise NotImplementedError

   async def get_docs(self, filtering=None):
        """Returns an iterator on all documents present in the backend

        Each document is a tuple with:
        - a mapping with the data to index
        - a coroutine to download extra data (attachments)

        The mapping should have least an `id` field
        and optionally a `timestamp` field in ISO 8601 UTC

        The coroutine is called if the document needs to be synced
        and has attachments. It needs to return a mapping to index.

        It has two arguments: doit and timestamp
        If doit is False, it should return None immediately.
        If timestamp is provided, it should be used in the mapping.

        Example:

           async def get_file(doit=True, timestamp=None):
               if not doit:
                   return
               return {'TEXT': 'DATA', 'timestamp': timestamp,
                       'id': 'doc-id'}
        """
        raise NotImplementedError

这个 GoogleDriveDataSource 类是编写 Google Drive 源代码的起点。 通过执行以下步骤,你将实现与 Google Drive 同步数据所需的逻辑:

  • 我们需要将此文件添加到 connectors/sources 中
  • 设置新的连接器名称和 service_type,例如 Google Drive 作为名称,google_drive 作为服务类型 (service type)
  • 要从源获取连接器同步数据,你需要实现:
    • get_default_configuration - 此函数应返回 RichConfigurableFields 的集合。 这些字段允许你从 Kibana UI 配置连接器。 这包括传递身份验证详细信息、凭据和其他特定于源的设置。 Kibana 巧妙地呈现这些配置。 例如,如果你将某个字段标记为 "sensitive": true, Kibana 会出于安全原因屏蔽它。
    • ping - 对数据源的简单调用,验证其状态,将其视为健康检查。
    • get_docs - 此方法需要实现实际从源获取数据的逻辑。 此函数应返回一个异步迭代器,该迭代器返回一个包含以下内容的元组:(document, lazy_download),其中:
      • document - 是远程源中项目的 JSON 表示形式。 (如 name, location, table, author, size 等)
      • lazy_download - 是一个协程,用于下载框架处理的内容提取的对象/附件(例如从 PDF 文档中提取文本)

BaseDataSource 类中还有其他抽象方法。 请注意,如果你只想支持内容同步(例如从谷歌驱动器获取所有数据),则不需要实现这些方法。 它们指的是其他连接器功能,例如:

  • 文档级安全性(get_access_control、access_control_query)
  • 高级过滤规则(advanced_rules_validators)
  • 增量同步 (get_docs_incrementally)
  • 将来可能会添加其他功能

我们如何编写官方 Elasticsearch Google Drive 连接器

首先实现 BaseDataSource 类所需的方法

我们需要实现方法 get_default_configuration、ping 和 get_docs 以使连接器同步数据。 因此,让我们更深入地了解实现。

首先要考虑的是:如何与Google Drive “对话” 来获取数据?

Google 提供了官方的 python 客户端,但它是同步的,因此同步内容可能会很慢。 我们认为更好的选择是 aiogoogle 库,它提供了用异步 python 编写的完整客户端功能。 一开始这可能并不直观,但使用异步操作来提高性能非常重要。 因此,在此示例中,我们选择不使用官方谷歌库,因为它不支持异步模式。

如果你在异步框架中使用同步或阻塞代码,可能会对性能产生重大影响。 任何异步框架的核心都是事件循环。 事件循环允许通过连续轮询已完成的任务并调度新任务来并发执行异步任务。 如果引入阻塞操作,它将停止循环的执行,从而阻止它管理其他任务。 这本质上否定了异步架构提供的并发优势。

下一个问题是连接器身份验证

我们将 Google Drive 连接器验证为服务帐户。 有关身份验证的更多信息可以在这些连接器文档页面中找到。

  • 服务帐户可以使用密钥进行身份验证
  • 我们通过 Elasticsearch 中的 Kibana UI 将身份验证密钥传递给服务帐户

让我们看一下 get_default_configuration 实现,它允许最终用户传递凭证密钥,该凭证密钥将存储在索引中以在同步期间进行身份验证:

class GoogleDriveDataSource(BaseDataSource):
    """Google Drive"""

    name = "Google Drive"
    service_type = "google_drive"

    {...}

    @classmethod
    def get_default_configuration(cls):
        """Get the default configuration for Google Drive.

        Returns:
            dict: Default configuration.
        """
        return {
            "service_account_credentials": {
                "display": "textarea",
                "label": "Google Drive service account JSON",
                "sensitive": True,
                "order": 1,
                "tooltip": "This connectors authenticates as a service account to synchronize content from Google Drive.",
                "type": "str",
                "value": "",
            },
        }

接下来我们来实现一个简单的 ping 方法

我们将对 google Drive api 进行简单的调用,例如 /about 端点。

对于此步骤,我们考虑 GoogleDriveClient 的简化表示。 我们的主要目标是指导你完成连接器创建,因此我们不关注 Google Drive 客户端的实现细节。 然而,最少的客户端代码对于连接器的操作至关重要,因此我们将依赖 GoogleDriveClient 类表示的伪代码。

class GoogleDriveClient(GoogleAPIClient):
    """A google drive client to handle api calls made to Google Drive API."""

    {... google drive client implementation}

    async def ping(self):
        return await self.api_call(resource="about", method="get", fields="kind")



class GoogleDriveDataSource(BaseDataSource):
    """Google Drive"""

    name = "Google Drive"
    service_type = "google_drive"

    {...}

    @cached_property
    def google_drive_client(self):
        """Initialize and return the GoogleDriveClient

        Returns:
            GoogleDriveClient: An instance of the GoogleDriveClient.
        """
        self._validate_service_account_json()

        json_credentials = json.loads(self.configuration["service_account_credentials"])

        return GoogleDriveClient(json_credentials=json_credentials)


    async def ping(self):
        """Verify the connection with Google Drive"""
        try:
            await self.google_drive_client.ping()
            self._logger.info("Successfully connected to the Google Drive.")
        except Exception:
            self._logger.exception("Error while connecting to the Google Drive.")
            raise

异步 iterator 从 google drive 返回文件以进行内容提取

下一步是编写 get_docs 异步迭代器,该迭代器将从 Google drive 和协程返回文件以下载它们以进行内容提取。 根据个人经验,开始将 get_docs 作为一个简单的独立 python 脚本来实现并获取一些数据通常更简单。 一旦 get_docs 代码正常工作,我们就可以将其移动到数据源类。

我们看一下 api 文档,我们可以:

  • 使用文 files/list 端点通过分页迭代 drive 中的文档
  • 使用 files/get 和 files/export 下载文件(或将 google 文档导出为特定文件格式)
class GoogleDriveDataSource(BaseDataSource):
    """Google Drive"""

    name = "Google Drive"
    service_type = "google_drive"

    {...}

    async def get_content(self, file, timestamp=None, doit=None):
        """Extracts the content from a file file.

        Args:
            file (dict): Formatted file document.
            timestamp (timestamp, optional): Timestamp of file last_modified. Defaults to None.
            doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.

        Returns:
            dict: Content document with id, timestamp & text
        """

        # Code details have been omitted here for brevity. For a complete implementation,
        # please refer to the connector code on GitHub.


    async def get_docs(self, filtering=None):
        """Executes the logic to fetch Google Drive objects in an async manner.

        Args:
            filtering (optional): Advenced filtering rules. Defaults to None.

        Yields:
            dict, partial: dict containing meta-data of the Google Drive objects,
                                partial download content function
        """


        async for files_page in self.google_drive_client.list_files():
            async for file in self.prepare_files(files_page=files_page):
                yield file, partial(self.get_content, file)

那么这段代码中发生了什么?

  • list_files 对驱动器中的文件进行分页。
  • prepare_files 将文件元数据格式化为预期模式
  • get_content 是一个下载文件并对其内容进行 Base64 编码的协程(内容提取的兼容格式)

为了简洁起见,省略了一些代码细节。 有关完整的实现,请参阅 GitHub 上的当前连接器实现

让我们运行连接器!

要将自定义连接器集成到框架中,你需要注册其实现。 通过在 connectors/config.py 的源部分中添加自定义连接器的条目来执行此操作。 对于 Google Drive 示例,添加内容将如下所示:

"sources": {
  ...,
  "google_drive": "connectors.sources.google_drive:GoogleDriveDataSource",
  ...
}

现在在 Kibana 界面中:

  • 转到 Search -> Indices -> Create a new index -> Use a Connector
  • 选择 Customized connector(使用自定义连接器时)
  • 配置你的连接器。 生成 Elasticsearch API 密钥和连接器 ID,并按照说明将这些详细信息放入 config.yml 中,然后启动连接器。

此时,Kibana 应该检测到您的连接器! 安排定期数据同步或只需单击 “Sync” 即可开始完全同步。

如何为 Elasticsearch 创建自定义连接器-LMLPHP连接器可以配置为使用 Elasticsearch 的摄取管道在将数据存储到索引之前对数据执行转换。 一个常见的用例是通过机器学习丰富文档。 例如,你可以:

  • 使用文本嵌入模型分析文本字段,该模型将生成数据的密集向量表示
  • 运行文本分类以进行情感分析
  • 使用命名实体识别 (NER) 从文本中提取关键信息

同步完成后,你的数据将在搜索优化的 Elasticsearch 索引中可用。 此时,你可以深入构建搜索体验或深入分析。

你想创建并贡献一个新的连接器吗?

如果你为可能对 Elasticsearch 社区有所帮助的源创建自定义连接器,请考虑贡献它。 以下是使定制连接器成为 Elastic 支持的连接器的升级路径指南

贡献连接器的验收标准

此外,在开始花一些时间开发连接器之前,你应该创建一个问题并寻求有关连接器及其将使用哪些库的一些初步反馈。 一旦你的连接器想法得到一些初步反馈,请确保您的项目满足一些验收标准:

  • 在 connectors/sources 中添加模块或目录
  • 实现一个类,该类实现 connectors.source.BaseDataSource 中描述的所有方法
  • 在 connectors/sources/tests 中添加覆盖率 +90% 的单元测试
  • 在源部分的 connectors/config.py 中声明你的连接器
  • 在 requirements.txt 中声明你的依赖项。 确保固定这些依赖项
  • 对于你要添加的每个依赖项(包括间接依赖项),列出所有许可证并在补丁中提供该列表。
  • 确保你的源使用异步库。 如果不可能,请确保没有阻塞循环
  • 如果可能,请提供运行后端服务的 docker 映像,以便我们测试连接器。 如果您无法提供 Docker 映像,请提供针对在线服务运行所需的凭据。
  • 由于 Elasticsearch 分页的默认大小限制为 10k,测试后端需要返回超过 10k 的文档。 从测试后端返回超过 10k 文档将有助于测试连接器

用于测试连接器的支持工具

我们还有一些支持工具来分析连接器代码并运行性能测试。 你可以在这里找到这些资源:

  • Perf8 - 性能库和仪表板,用于分析 python 代码的质量,以评估资源利用率并检测阻塞调用
  • E-2-E 功能测试,利用 perf8 库来分析每个连接器

总结

我们希望这个博客和示例对你有用。

以下是 Elasticsearch 可用的 native connectorsconnector clients 的完整列表。 如果你没有找到列出的数据源,是否可以创建一个自定义连接器?

以下是与本文相关的一些有用资源:

如果你没有 Elastic 帐户,你可以随时启动试用帐户来开始!

10-19 10:27