Login
网站首页 > 文章中心 > 其它

基于Kafka和Elasticsearch构建实时站内搜索功能的实践

作者:小编 更新时间:2023-08-16 12:24:05 浏览量:152人看过

目前我们在构建一个多租户多产品类网站,为了让用户更好的找到他们所需要的产品,我们需要构建站内搜索功能,并且它应该是实时更新的.本文将会讨论构建这一功能的核心基础设施,以及支持此搜索能力的技术栈.

问题的定义与决策

为了构建一个快速、实时的搜索引擎,我们必须做出某些设计决策.我们使用 MySQL 作为主数据库存储,所以呢有以下选择:

使用一个高效的搜索数据库,如 Elasticsearch.?

考虑到我们是一个多租户应用程序,同时被搜索的实体可能需要大量的关联操作(如果我们使用的是 MySQL 一类的关系型数据库),因为不同类型的产品有不同的数据结构,所以我们还可以能需要同时遍历多个数据表来查询用户输入的关键词.所以我们决定不使用直接在 MySQL 中查询关键词的方案.?

所以呢,我们必须决定一种高效、可靠的方式,将数据实时地从 MySQL 迁移到 Elasticsearch 中.此时此刻呢需要做出如下的决定:

使用 Worker 定期查询 MySQL 数据库,并将所有变化的数据发送到 Elasticsearch.?

使用基于事件的流引擎,将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上,经过处理后将其转发到 Elasticsearch.?

服务简介

基于Kafka和Elasticsearch构建实时站内搜索功能的实践

为了对外提供统一的搜索接口,我们首先需要定义用于搜索的数据结构.对于大部分的搜索系统而言,对用户展示的搜索结果通常包括为标题和内容,这部分内容我们称之可搜索内容(Searchable Content).在多租户系统中我们还需要在搜索结果中标示出该搜索结果属于哪个租户,或用来过滤当前租户下可搜索的内容,我们还需要额外的信息来帮助用户筛选自己想要搜索的产品类别,我们将这部分通用的但不用来进行搜索的内容称为元数据(Metadata).最后,在我们展示搜索结果时可能希望根据不同类型的产品提供不同的展示效果,我们需要在搜索结果中返回这些个性化展示所需要的原始内容(Raw Content).到此为止我们可以定义出了存储到 Elasticsearch 中的通用数据结构:


{
"searchable": {
"title": "string",
"content": "string"
},
"metadata": {
"tenant_id": "long",
"type": "long",
"created_at": "date",
"created_by": "string",
"updated_at": "date",
"updated_by": "string"
},
"raw": {}
}



基础设施

Apache Kafka:Apache?Kafka 是开源的分布式事件流平台.我们使用 Apache kafka 作为数据库事件(插入、修改和删除)的持久化存储.

mysql-binlog-connector-java:我们使用mysql-binlog-connector-java从 MySQL Binlog 中获取数据库事件,并将它发送到 Apache Kafka 中.我们将单独启动一个服务来完成这个过程.

在接收端我们也将单独启动一个服务来消费 Kafka 中的事件,并对数据进行处理然后发送到 Elasticsearch 中.


Q:为什么不使用Elasticsearch connector之类的连接器对数据进行处理并发送到Elasticsearch中?
A:在我们的系统中是不允许将大文本存入到MySQL中的,所以我们使用了额外的对象存储服务来存放我们的产品文档,所以我们无法直接使用连接器将数据发送到Elasticsearch中.
Q:为什么不在发送到Kafka前就将数据进行处理?
A:这样会有大量的数据被持久化到Kafka中,占用Kafka的磁盘空间,而这部分数据实际上也被存储到了Elasticsearch.
Q:为什么要用单独的服务来采集binlog,而不是使用Filebeat之类的agent?
A:当然可以直接在MySQL数据库中安装agent来直接采集binlog并发送到Kafka中.但是在部分情况下开发者使用的是云服务商或其他基础设施部门提供的MySQL服务器,这种情况下我们无法直接进入服务器安装agent,所以使用更加通用的、无侵入性的C/S结构来消费MySQL的binlog.



配置技术栈

我们使用 docker 和 docker-compose 来配置和部署服务.为了简单起见,MySQL 直接使用了 root 作为用户名和密码,Kafka 和 Elasticsearch 使用的是单节点集群,且没有设置任何鉴权方式,仅供开发环境使用,请勿直接用于生产环境.


version: "3"
services:
  mysql:
image: mysql:⑤7
container_name: mysql
environment:
  MYSQL_ROOT_PASSWORD: root
  MYSQL_DATABASE: app
ports:
  - 3306:3306
volumes:
  - mysql:/var/lib/mysql
  zookeeper:
image: bitnami/zookeeper:③⑥2
container_name: zookeeper
ports:
  - 2181:2181
volumes:
  - zookeeper:/bitnami
environment:
  - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
image: bitnami/kafka:2.⑦0
container_name: kafka
ports:
  - 9092:9092
volumes:
  - kafka:/bitnami
environment:
  - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
  - ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
  - zookeeper
  elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:⑦11.0
container_name: elasticsearch
environment:
  - discovery.type=single-node
volumes:
  - elasticsearch:/usr/share/elasticsearch/data
ports:
  - 9200:9200
volumes:
  mysql:
driver: local
  zookeeper:
driver: local
  kafka:
driver: local
  elasticsearch:
driver: local



在服务启动成功后我们需要为 Elasticsearch 创建索引,今天这一节我们直接使用 curl 调用 Elasticsearch 的 RESTful API,也可以使用 busybox 基础镜像创建服务来完成这个步骤.


# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
  "mappings": {
"properties": {
  "searchable": {
    "type": "nested",
    "properties": {
      "title": {
        "type": "text"
      },
      "content": {
        "type": "text"
      }
    }
  },
  "metadata": {
    "type": "nested",
    "properties": {
      "tenant_id": {
        "type": "long"
      },
      "type": {
        "type": "integer"
      },
      "created_at": {
        "type": "date"
      },
      "created_by": {
        "type": "keyword"
      },
      "updated_at": {
        "type": "date"
      },
      "updated_by": {
        "type": "keyword"
      }
    }
  },
  "raw": {
    "type": "nested"
  }
}
  }
}'



核心代码实现(SpringBoot ◆ Kotlin)

Binlog 采集端:


    override fun run() {
    client.serverId = properties.serverId
    val eventDeserializer = EventDeserializer()
    eventDeserializer.setCompatibilityMode(
        EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
    )
    client.setEventDeserializer(eventDeserializer)
    client.registerEventListener {
        val header = it.getHeader()
        val data = it.getData()
        if (header.eventType == EventType.TABLE_MAP) {
            tableRepository.updateTable(Table.of(data as TableMapEventData))
        } else if (EventType.isRowMutation(header.eventType)) {
            val events = when {
                EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                else -> emptyList()
            }
            logger.info("Mutation events: {}", events)
            for (event in events) {
                kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
            }
        }
    }
    client.connect()
}




Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
"id": 1,
"title": "Foo",
"content": "Bar"
}



随后我们将收集到的事件发送到 Kafka 中,并由 Event Processor 进行消费处理.

事件处理器


@Component
class KafkaBinlogTopicListener(
val binlogEventHandler: BinlogEventHandler
) {

companion object {
    private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
}

private val objectMapper = jacksonObjectMapper()

@KafkaListener(topics = ["binlog"])
fun process(message: String) {
    val binlogEvent = objectMapper.readValue(message)
    logger.info("Consume binlog event: {}", binlogEvent)
    binlogEventHandler.handle(binlogEvent)
}
}



首先使用SpringBoot Message Kafka提供的注解对事件进行消费,此时此刻呢将事件委托到binlogEventHandler去进行处理.实际上BinlogEventHandler是个自定义的函数式接口,我们自定义事件处理器实现该接口后通过 Spring Bean 的方式注入到KafkaBinlogTopicListener中.


@Component
class ElasticsearchIndexerBinlogEventHandler(
val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
override fun handle(binlogEvent: BinlogEvent) {
    val payload = binlogEvent.payload as Map<*, *>
    val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
    // Should delete from Elasticsearch
    if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
        val deleteRequest = DeleteRequest()
        deleteRequest
            .index("search")
            .id(documentId)
        restHighLevelClient.delete(deleteRequest, DEFAULT)
    } else {
        // Not ever WRITE or UPDATE, just reindex
        val indexRequest = IndexRequest()
        indexRequest
            .index("search")
            .id(documentId)
            .source(
                mapOf(
                    "searchable" to mapOf(
                        "title" to payload["title"],
                        "content" to payload["content"]
                    ),
                    "metadata" to mapOf(
                        "tenantId" to payload["tenantId"],
                        "type" to payload["type"],
                        "createdAt" to payload["createdAt"],
                        "createdBy" to payload["createdBy"],
                        "updatedAt" to payload["updatedAt"],
                        "updatedBy" to payload["updatedBy"]
                    )
                )
            )
        restHighLevelClient.index(indexRequest, DEFAULT)
    }
}
}



今天这一节我们只需要简单地判断是否为删除操作就可以,如果是删除操作需要在 Elasticsearch 中将数据删除,而如果是非删除操作只需要在 Elasticsearch 重新按照为文档建立索引即可.这段代码简单地使用了 Kotlin 中提供的 mapOf 方法对数据进行映射,如果需要其他复杂的处理只需要按照 Java 代码的方式编写处理器即可.

总结

其实 Binlog 的处理部分有很多开源的处理引擎,包括 Alibaba Canal,本文使用手动处理的方式也是为其他使用非 MySQL 数据源的同学类似的解决方案.大家可以按需所取,因地制宜,为自己的网站设计属于自己的实时站内搜索引擎!

以上就是土嘎嘎小编为大家整理的基于Kafka和Elasticsearch构建实时站内搜索功能的实践相关主题介绍,如果您觉得小编更新的文章只要能对粉丝们有用,就是我们最大的鼓励和动力,不要忘记讲本站分享给您身边的朋友哦!!

版权声明:倡导尊重与保护知识产权。未经许可,任何人不得复制、转载、或以其他方式使用本站《原创》内容,违者将追究其法律责任。本站文章内容,部分图片来源于网络,如有侵权,请联系我们修改或者删除处理。

编辑推荐

热门文章