Elasticsearch


1. 全文搜索引擎

Google,百度类的网站搜索,它们都是根据网页中的关键字生成索引,我们在搜索的时候

输入关键字,它们会将该关键字即索引匹配到的所有网页返回;还有常见的项目中应用日志的搜索等等。对于这些非结构化的数据文本,关系型数据库搜索不是能很好的支持(虽然mysql5.7以后支持了全文索引(FULLTEXT),但是全文索引对mysql的数据库的压力是很大的)。

一般传统数据库,全文检索都实现的很鸡肋,因为一般也没人用数据库存文本字段。进行全文检索需要扫描整个表,如果数据量大的话即使对 SQL 的语法优化,也收效甚微。

如果建立了索引,但是维护起来也很麻烦,对于 insert 和 update 操作都会重新构建索引。

基于以上原因可以分析得出,在一些生产环境中,使用常规的搜索方式,性能是非常差的:

  • 搜索的数据对象是大量的非结构化的文本数据。

  • 文件记录量达到数十万或数百万个甚至更多。

  • 支持大量基于交互式文本的查询。

  • 需求非常灵活的全文搜索查询。

  • 对高度相关的搜索结果的有特殊需求,但是没有可用的关系数据库可以满足

  • 对不同记录类型、非文本数据操作或安全事务处理的需求相对较少的情况。

为了解决结构化数据搜索和非结构化数据搜索性能问题,我们就需要专业,健壮,强大的全文搜索引擎;

这里说到的全文搜索引擎指的是目前广泛应用的主流搜索引擎。它的工作原理是计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这个过程类似于通过字典中的检索字表查字的过程。


2. ElasticSearch介绍

​ The Elastic Stack, 包括 Elasticsearch、Kibana和 Logstash(也称为 ELK

Stack)。能够安全可靠地获取任何来源、任何格式的数据,然后实时地对数据进行搜索、分析和可视化。Elaticsearch,简称为 ES,ES 是一个开源的高扩展的分布式全文搜索引擎,是整个 Elastic Stack 技术栈的核心。它可以近乎实时的存储、检索数据;本身扩展性很好,可以扩展到上百台服务器,处理PB 级别的数据


3. 如果用数据库做搜索会怎么样?

1
在软件开发里面,数据都是存储在数据库里面的,比如电商网站的商品信息,员工的信息等等,如果从员工角度去做搜索功能,我们会这么设计:
name age sex birthday
张三 20 1993-08-09
张三三 22 1992-09-02
李四 25 1992-09-01
1
select * from employee where name like '张三%' 

以上会存在几个问题:

  1. 如果表记录上千万上亿了这个性能问题,另外一个如果有一个本文字段要在里面模糊配置,这个就会出现严重的性能问题
  2. 还不能将搜索词拆分开来,比如上面这个只能搜索名字是“张三”开头的员工,如果想搜出“张小三”那是搜索不出来的。
    ==总体来说,用数据库来实现搜索,是不太靠谱的,通常性能也会很差==

4. ElasticSearch解决了哪些问题?

1
Lucene是单机的模式,如果你的数据量超过了一台物理机的容量,你需要扩容,将数据拆分成2份放在不同的集群,这个就是典型的分布式计算了。需要拷贝容错,机器宕机,数据一致性等复杂的场景,这个实现就比较复杂了

ES解决了这些问题
1、自动维护数据的分布到多个节点的索引的建立,还有搜索请求分布到多个节点的执行
2、自动维护数据的冗余副本,保证了一旦机器宕机,不会丢失数据
3、封装了更多高级的功能,例如聚合分析的功能,基于地理位置的搜索


5. ElasticSearch的功能

  • 分布式的搜索引擎和数据分析引擎
    • 搜索:网站的站内搜索,IT系统的检索
    • 数据分析:电商网站,统计销售排名前10的商家
  • 全文检索,结构化检索,数据分析
    • 全文检索:我想搜索商品名称包含某个关键字的商品
    • 结构化检索:我想搜索商品分类为日化用品的商品都有哪些
    • 数据分析:我们分析每一个商品分类下有多少个商品
  • 对海量数据进行近实时的处理
    • 分布式:ES自动可以将海量数据分散到多台服务器上去存储和检索
    • 海量数据的处理:分布式以后,就可以采用大量的服务器去存储和检索数据,自然而然就可以实现海量数据的处理了
    • 近实时:检索数据要花费1小时(这就不要近实时,离线批处理,batch-processing);在秒级别对数据进行搜索和分析,数据更新在Elasticsearch中几乎是完全同步的。
    • Restful风格,一切API都遵循Rest原则,容易上手

6. ElasticSearch的应用场景

  • 自己做搜索引擎
  • 新闻网站
  • Stack Overflow(国外的程序异常讨论论坛)
  • GitHub(开源代码管理)
  • 电商网站
  • 日志数据分析
  • BI系统
  • 站内搜索
  • 新浪使用es处理32亿条实时日志

7. ElasticSearch的特点

  • 可以作为一个大型分布式集群(数百台服务器)技术,处理PB级数据,服务大公司;也可以运行在单机上,服务小公司
  • Elasticsearch不是什么新技术,主要是将全文检索、数据分析以及分布式技术,合并在了一起
  • 对用户而言,是开箱即用的,非常简单,作为中小型的应用,直接3分钟部署一下ES
  • Elasticsearch作为传统数据库的一个补充,比如全文检索,同义词处理,相关度排名,复杂数据分析,海量数据的近实时处理

8. ElasticSearch的安装

8.1 准备工作

8.1.1 进程并发数不足

1
[1]: max file descriptors [4096] for elasticsearch process likely too low, increase to at least [65536]

首先用root用户登录。

然后修改配置文件:

1
vim /etc/security/limits.conf

添加下面的内容:

1
2
3
4
* soft nofile 65536
* hard nofile 131072
* soft nproc 4096
* hard nproc 4096

8.1.2 单个进程中的内存大小

1
[2]: max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144] 

vm.max_map_count:限制一个进程可以拥有的VMA(虚拟内存区域)的数量,继续修改配置文件

vim /etc/sysctl.conf

添加下面内容:

1
vm.max_map_count=655360

执行以下命令:

1
sysctl -p

8.2 elasticsearch单机版的安装

1
ElasticSearch不允许使用超级管理员用户来进行操作,需要创建普通用户
  • 准备安装包

    elasticsearch-7.16.0-linux-x86_64.tar.gz

  • 创建elasticsearch的用户

    1
    2
    3
    useradd es #创建用户   cat /etc/passwd:查看所有用户  userdel xx -r:删除用户并且删除用户对应的家目录
    passwd es #配置密码
    su es: #切换用户
  • 上传安装包,解压(使用es用户)

    1
    2
    3
    4
    #解压es到当前的家目录
    tar -zxvf elasticsearch-7.16.0-linux-aarch64.tar.gz -C /home/es/
    #更改目录名称
    mv elasticsearch-7.16.0 elasticsearch
  • 修改配置 jvm.options

    Elasticsearch基于Lucene的,而Lucene底层是java实现,因此我们需要配置jvm参数。

    1
    默认为两个参数的值都为4g
    1
    2
    -Xms512m
    -Xmx512m
  • 修改配置 elasticsearch.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    cluster.name: my-elasticsearch #集群名称
    node.name: es-node01 #节点名称
    path.data: /home/es/elasticsearch/data # 数据目录位置(没有此目录则会自动创建)
    path.logs: /home/es/elasticsearch/logs # 日志目录位置(没有此目录则会自动创建)
    network.host: 0.0.0.0 #能通过本机所有的ip来访问,一旦配置了0.0.0.0则为生产模式,必须要配置cluster.initial_master_nodes来进行引导初始化集群(生产模式需要进行集群初始化引导)
    http.port: 9200 #http的通信端口
    discovery.seed_hosts: ["10.10.10.101:9300"] #参与选举主节点的主机列表(除了第一次启动外,每次启动集群时都加载)---->注意:有的平台配置主机名无效,需要配置ip地址
    cluster.initial_master_nodes: ["10.10.10.101:9300"] #参与选举主节点的主机列表(仅仅在第一次启动集群时加载,用来初始化集群数据)---->注意:有的平台配置主机名无效,需要配置ip地址
    xpack.security.enabled: false #禁用安全校验(不禁用kibana会一直有提示安全警告),可以不配置

8.3 elasticsearch集群的安装

搭建es的集群,节点的数量一定要>1的奇数个机器(因为内部进行投票选举主节点);

如果要做集群部署,只需要在配置文件中修改节点名称并且添加其它节点信息即可。

1
2
3
4
5
#修改每个机器的以下两项即可
node.name: es-node02 #节点名称(每个从节点都不一样)
discovery.seed_hosts: ["10.10.10.101:9300", "10.10.10.102:9300", "10.10.10.103:9300"]
cluster.initial_master_nodes: ["10.10.10.101:9300", "10.10.10.102:9300", "10.10.10.103:9300"]
~~~

如果是在单机版的基础上复制安装的集群一定要删除拷贝到其他机器的data目录


9. ElasticSearch的其他配置信息

属性名 说明
cluster.name 配置elasticsearch的集群名称,默认是elasticsearch。建议修改成一个有意义的名称。
node.name 节点名,es会默认随机指定一个名字,建议指定一个有意义的名称,方便管理
node.master 默认为true(可以被当选为主节点)
path.conf 设置配置文件的存储路径,tar或zip包安装默认在es根目录下的config文件夹,rpm安装默认在/etc/ elasticsearch
path.data 设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开
path.logs 设置日志文件的存储路径,默认是es根目录下的logs文件夹
path.plugins 设置插件的存放路径,默认是es根目录下的plugins文件夹
bootstrap.memory_lock 设置为true可以锁住ES使用的内存,避免内存进行swap
network.host 设置bind_host和publish_host,设置为0.0.0.0允许外网访问
http.port 设置对外服务的http端口,默认为9200。
transport.tcp.port 集群结点之间通信端口 默认为9300
discovery.zen.ping.timeout 设置ES自动发现节点连接超时的时间,默认为3秒,如果网络延迟高可设置大些
discovery.seed_hosts (7.x版本之后的配置)参与选举的种子主机列表,也就是说集群的主机地址列表,格式为:主机:端口; (7.x版本之前的配置为 discovery.zen.ping.unicast.hosts)
discovery.seed_providers 也是参与选举的种子主机列表,只不过是文件方式的
cluster.initial_master_nodes (7.x版本之后的配置)集群初始化时参与选举的种子主机列表; (7.x版本之前的配置为 minimum_master_nodes min_master_count)

10. 启动ElasticSearch

  • 配置环境变量 /home/es/.bash_profile

    1
    2
    export ES_HOME=/home/es/elasticsearch
    export PATH=$PATH:$ES_HOME/bin
    1
    2
    #es7.x以后的版本建议使用高版本的jdk,如果自己机器安装的jdk版本为1.8的,在启动时会有警告信息,需要配置es自带的jdk的环境变量  ES_JAVA_HOME
    export ES_JAVA_HOME=/home/es/elasticsearch/jdk
  • 执行运行指令

    1
    2
    elasticsearch
    elasticsearch -d
1
2
3
publish_address {10.10.10.61:9300}, bound_addresses {[::]:9300}
......
publish_address {10.10.10.61:9200}
1
可以看到绑定了两个端口:
  • 9300:集群节点间通讯端口
  • 9200:客户端访问端口

我们在浏览器中访问:http://10.10.10.101:9200/

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"name": "6wHCrFE", #如果不配置node-name则随机生成
"cluster_name": "elasticsearch",
"cluster_uuid": "zH219LmpTPidGsekhyIIFg",
"version": {
"number": "5.6.8",
"build_hash": "688ecce",
"build_date": "2018-02-16T16:46:30.010Z",
"build_snapshot": false,
"lucene_version": "6.6.1"
},
"tagline": "You Know, for Search"
}

11. es中的倒排索引

词条(term): 把一句话按照一定的规则,相邻的一个或者多个字组成的词语称为词条;

image-20220424163449626

正向索引: 根据id找数据;

倒排索引: 根据词条找数据id;


12. ES的客户端工具

12.1 Elasticsearch-Header

这个是一个es的插件,需要我们在es中安装此插件;也可以安装一个浏览器的插件来使用;

这个工具对高版本的es的支持不是很好,在新版本中es的官方已经推荐我们使用kibana来进行es的可视化操作;


12.2 Http的客户端

  • postman
  • insomnia

12.3 Kibana插件

1
2
3
Kibana是一个基于Node.js的Elasticsearch索引库数据统计工具,可以利用Elasticsearch的聚合功能,生成各种图表,如柱形图,线状图,饼图等

而且还提供了操作Elasticsearch索引数据的控制台,并且提供了一定的API提示,非常有利于我们学习Elasticsearch的语法。

安装kibana

1
2
3
因为Kibana依赖于node,我们的虚拟机没有安装node,而window中安装过。所以我们选择在window下使用kibana。

kibana的版本最好保持和elasticsearch的版本一致

安装node环境

​ 准备安装包: node-v16.14.2-linux-arm64.tar.xz(.tar.xz是二次压缩格式)

​ 解压缩: tar -Jxvf node-v16.14.2-linux-arm64.tar.xz

​ 配置环境变量: ~/.bash_profile

准备kibana的安装包

1
kibana-7.16.0-linux-aarch64.tar.gz

解压安装包

1
tar -zxvf kibana-7.16.0-linux-aarch64.tar.gz -C /home/es/

配置kibana (kibana.yml)

1
2
3
4
elasticsearch.hosts: ["http://10.10.10.101:9200"]
server.port: 5601
server.host: "0.0.0.0"
i18n.locale: "zh-CN"

运行

1
2
kibana  
kibana --allow-root(使用root用户启动)

m1芯片的centos_arm64版本可能会出现错误

1
Error: /lib64/libstdc++.so.6: version `GLIBCXX_3.4.20' not found...
1
2
#解决方案(把es中的可用的动态库文件加入到系统中,替换系统的老版本的动态库)
cp /home/es/elasticsearch/modules/x-pack-ml/platform/linux-aarch64/lib/libstdc++.so.6 /usr/lib64/

访问kibana

1
2
3
kibana的监听端口默认是5601

我们访问:http://10.10.10.101:5601

13. ES中的名词解释

索引 (index): 也称为索引库,ElasticSearch存储数据的地方,可以理解成关系型数据库中的数据库概念

**映射 (mapping):**mapping定义了每个字段的类型、字段所使用的分词器等。相当于关系型数据库中的表结构

**文档 (document):**Elasticsearch中的最小数据单元,常以json格式显示。—个document相当于关系型数据库中的一行数据

**倒排索引:**一个倒排索引由文档中所有不重复词的列表抅成,对于其中每个词,对应一个包含它的文档id列表

**类型 (type):**一种type就像一类表。如用户表、角色表等;在Elasticsearch7.X默认type为_doc;
ES 5.x中一个index可以有多种type;

ES 6.x中—个index只能有一种type;

ES 7.x以后,将逐步移除type这个概念,现在的操作已经不再使用,默认 _doc


14. RESTFul方式

**REST (Representational State Transfer)**,表述性状态转移,是一组架构约束条件和原则。满足这些约束条件和原则的应用程序或设计就是RESTful。就是一种定义接口的规范

  • 基于HTTP
    可以使用XML格式定义或JSON格式定义。

  • 每一个URl代表1种资源
    客户端使用GET、POST、 PUT、DELETE4个表示操作方式的动词对服务端资源进行操作:

    GET: 用来获取资源

    POST: 用来新建资源(也可以用于更新资源)

    PUT:用来更新资源

    DELETE:用来删除资源

    1
    2
    GET /user/1  :查询用户
    DELETE /user/1: 删除用户

15. 分词器

分词器 (Analyzer): 将一段文本,按照一定逻辑,分析成多个词语的一种工具

如:华为手机 —>华为、手、手机


ElasticSearch 内置的分词器

  • Standard Analyzer-默认分词器,按词切分,小写处理
  • Simple Analyzer- 按照非字母切分(符号被过滤),小写处理
  • Stop Analyzer - 小写处理,停用词过滤(the,a,is)
  • Whitespace Analyzer- 按照空格切分,不转小写
  • Keyword Analyzer- 不分词,直接将输入当作输出
  • Patter Analyzer-正则表达式,默认(W+(非字符分割)
  • Language -提供了30多种常见语言的分词器

总结: 以上的Elasticsearch系统默认的分词器对中文非常不友好,一个汉字一个词,我们一般不会使用默认的分词器;


ik分词器

1
2
3
4
IKAnalyzer是一个开源的,基于java语言开发的轻量级的中文分词工具包
是一个基于Maven构建的项目
具有60万字/秒的高速处理能力
支持用户词典扩展定义
1
Lucene的IK分词器早在2012年已经没有维护了,现在我们要使用的是在其基础上维护升级的版本,并且开发成ElasticSearch的插件了,与Elasticsearch一起维护升级,版本也保持一致.
  • 准备elasticsearch的ik分词器插件

    1
    elasticsearch-analysis-ik-7.16.0.zip
  • 解压分词器插件

    1
    2
    3
    4
    #在插件目录中新建分词器目录
    mkdir /home/es/elasticsearch/plugins/ik-analyzer
    #解压分词器插件
    unzip elasticsearch-analysis-ik-7.16.0.zip -d /home/es/elasticsearch/plugins/ik-analyzer/
  • 需要重启elasticsearch才能使得ik-analyzer生效


16. 测试分词器

16.1 测试内置默认的分词器

1
2
3
4
GET _analyze
{
"text": "你好 hello java"
}
1
2
3
4
5
GET _analyze
{
"text": "你好 hello java",
"analyzer": "standard" #不指定分词器则使用的就是默认的standard分词器
}
  • 默认英文一个单词一个分词
  • 默认一个中文汉字一个分词

16.2 测试ik分词器

1
2
3
4
5
GET _analyze
{
"analyzer": "ik_max_word", #分词的模式有两种 ik_smart 和 ik_max_word
"text": "我是中国人"
}

17. 索引(index)的操作

17.1 创建索引库

非结构化创建索引(不指定mappings)

  • 请求方式:PUT

  • 请求路径:http://10.10.10.101:9200/索引库名

  • 请求参数:json格式(创建索引库可以没有请求参数)

    • settings:索引的设置
      • number_of_shards:分片数量(7.x之前不配置默认为5; 7.x之后默认为1)
      • number_of_replicas:副本数量(默认为1)
1
2
3
4
5
6
7
http://10.10.10.101:9200/索引库名  #没有请求参数也可以创建索引库
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
}
}

结构化创建索引(指定mappings)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
PUT index02
{
"mappings": {
"properties": {
"id":{
"type": "long"
},
"name":{
"type": "text"
},
"age":{
"type": "integer",
}
}
}
}

17.2 查看索引库的属性

查看具体索引库的语法

1
GET 索引库名

查看所有的索引库的语法

1
2
GET *
GET _all

17.3 删除索引库

语法

1
DELETE 索引库名

17.4 关闭索引库

1
2
#关闭索引库之后可以查看索引库的元信息,但不能再进行其他操作了
POST index01/_close

17.5 打开索引库

1
POST  index01/_open

18. ES中的数据类型

简单数据类型

字符串类型

​ text: 会分词,不支持聚合

​ keyword: 不会分词,将全部内容作为一个词条,支持聚合

数值类型

数据类型 描述
long 带符号的64位整数
integer 带符号的32位整数
short 带符号的16位整数
byte 带符号的8位整数
double 双精度64位浮点数
float 单精度32位浮点数
half_float 半精度16位浮点数
scaled_float 由a支持的有限浮点数long,由固定的double比例因子缩放

布尔值

​ boolean类型

二进制类型

​ binary类型

范围类型

1
integer_range, float_range, long_range, double_range, date_range

日期

date类型


复杂数据类型

​ 数组类型:[]

​ 对象类型: {}


其实我们也可以按照能否进行分词来给数据类型进行分类:

  • 整体类型: 非text类型的
  • 可分词类型: text

19. 映射(mappings)的操作

添加映射

在创建索引(库)的时候直接指定映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
PUT /index01
#在此不能出现空行 要不然下面的映射信息无效
{
"mappings": {
"properties": {
"name":{
"type": "text"
},
"age":{
"type": "integer"
}
}
}
}

单独为索引添加映射

1
2
#先添加索引库
PUT index01
1
2
3
4
5
6
7
8
9
10
11
12
13
#为索引库添加映射
PUT /index01/_mapping
{
"properties": {
"name":{
"type": "text"
},
"age":{
"type": "integer"
}
}

}

查询映射

1
2
#只查询映射
GET /index01/_mapping
1
2
#查询索引库的全部信息
GET /index01

添加字段

1
2
3
4
5
6
7
8
PUT /index01/_mapping   
{
"properties": {
"sex":{
"type":"keyword"
}
}
}

20. 文档(document)的操作

20.1 添加文档

1
2
3
4
5
6
#添加文档(指定id) PUT和POST都可以
PUT index01/_doc/1
{
"name":"张三",
"age":20
}
1
2
3
4
5
6
#添加文档(不指定id)
POST index01/_doc
{
"name":"李四",
"age":30
}

20.2 修改文档

1
2
3
4
5
#修改文档(整个文档都会被替换成以下的新文档)
PUT index01/_doc/1
{
"name":"张三1"
}

20.3 删除文档

1
DELETE index01/_doc/1

20.4 查询文档

1
2
3
4
5
#查询索引库index01中的全部文档
GET index01/_search

#查询索引库index01中指定id的文档
DELETE index01/_doc/1

20.5 批量操作

Bulk 批量操作是将文档的增删改查一些列操作,通过一次请求全都做完。减少网络传输次数(批量操作中的各个单元互相不受影响)

语法:

1
2
3
POST /_bulk
("action":{"metadata")}
{"data"}
1
2
3
4
POST _bulk
{"delete":{"_index":"index01","_id":"1"}}
{"create":{"_index":"index01","_id":"2"}}
{"name":"admin","age":20}

21. 文档的查询

查询所有

es默认查询10条数据

1
2
3
4
5
6
7
8
GET index01/_search
#与下面的查询等价
GET index01/_search
{
"query": {
"match_all": {}
}
}

词条(term)查询

term词条查询: 比较呆板,无论查询条件是什么类型, 查询的条件就是一个词条,不会对查询条件进行分词,直接去索引库中去查询;

term词条查询只支持一个字段查询

1
2
3
4
5
6
7
8
9
10
GET shop/_search
{
"query": {
"term": {
"brandName": {
"value": "华为" #不会对这个条件值进行分词,直接去倒排索引中查询
}
}
}
}

term一般查询 keyword类型

terms: 多个条件匹配一个字段,取并集


匹配(match)查询

math匹配查询:

  1. 首先判断查询条件中的字段的类型: (text | 非text) 类型

  2. 知道类型之后:

    1. 如果是整体(非text)类型,则不会对条件字段分词,进行精准查询(功能与term一致);
    2. 如果是text类型,那么就看text类型指定的分词器是哪个(如果没有指定分词默认为standard分词器),然后对查询条件的字段使用该分词器进行分词,再去倒排索引查询,随后取分词查询的并集返回

math匹配查询只支持一个字段

1
2
3
4
5
6
7
8
9
#match(分词匹配查询)
GET shop/_search
{
"query": {
"match": {
"name": "我想买个华为手机" #会对这个查询条件进行分词,分词器使用mappings上绑定的分词器
}
}
}

范围(range)查询

范围(range)查询: 查找指定字段在指定范围内包含的值,只能查询一个字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#范围查询
GET shop/_search
{
"query": {
"range": {
"id": {
"gte": 1,
"lte": 4
}
}
},
"sort": [ #可以对查询结果进行排序
{
"id": {
"order": "desc"
}
}
]
}

QueryString查询

QueryStrin查询: math查询的规范+匹配到多个字段(多个字段的关系的操作符为OR)

1
2
3
4
5
6
7
8
9
10
11
GET index01/_search
{
"query": {
"query_string": {
"fields": ["title","content"],
"query": "华为手机" # 对 华为手机 进行分词
#"query": "华为 AND 手机" # 既包含 华为 又包含 手机 的
#"query": "华为 OR 手机" # 包含 华为 或者包含 手机 的
}
}
}

simple_query_string查询:与query_string类似,但是不支持and 和or的连接符,会把and和or当成普通的单词;


布尔(bool)查询

布尔查询: 其实就是对多个条件进行连接,然后查询;

must和should一般不进行结合使用;

bool查询的连接方式有以下几种:

  • must (and):条件必须成立
  • must not (not):条件必须不成立
  • should (or):条件可以成立
  • filter:条件必须成立。不会计算得分,一般用来对上面三个操作的结果进行过滤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
GET shop/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"name": "华为手机"
}
},
{
"match": {
"brandName": "华为"
}
}
],
"filter": [ //filter相当于给上面的查询结果,再次 and
{
"range": {
"id": {
"gt": 1,
"lte": 2
}
}
}
]
}
}
}

这几个操作符可以嵌套使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#and or and
GET shop/_search
{
"query": {
"bool": {
"should": [
{
"bool": {
"must": [
{
"match": {
"name": "手机"
}
}
]
}
},
{
"bool": {
"must": [
{
"match": {
"brandName": "索尼"
}
}
]
}
}
]
}
}
}

聚合查询

  • 指标聚合: 相当于MySQL的聚合函数。max、 min、 avg、 sum等
  • 桶聚合:相当于MySQL的 group by 操作。不要对text类型的数据进行分组,会失败
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#指标聚合
GET index01/_search
{
"query": {
"match": {
"name": "手机"
}
},
"aggs": {
"my_age_sum": { #起一个名字(my_age_sum)接收返回值
"sum": {
"field": "age"
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#桶聚合
GET index01/_search
{
"query": {
"match": {
"name": "手机"
}
},
"aggs": {
"brand_name_list": {
"terms": {
"field": "brandName",
"size": 10
}
}
}
}

高亮查询

高亮三要素:

  • 高亮字段

  • 前缀

  • 后缀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
GET index01/_search
{
"query": {
"match": {
"name": "4g手机"
}
},
"highlight": {
"fields": {
"name":{
"pre_tags": "<font color='red'>",
"post_tags": "</font>"
}
}
}
}

22. 索引别名和重建索引

随着业务需求的变更,索引的结构可能发生改变。

ElasticSearch的索引—旦创建,只允许添加字段,不允许改变字段(因为改变字段,需要重建倒排索引,影响内部缓存结构,性能太低)

我们需要重新创建一个索引,把原来的索引数据导入到新的索引(重建索引)即可;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 先重新创建一个索引
PUT index02
{
"mappings": {
"properties": {
"name":{
"type": "text",
"analyzer": "ik_smart"
},
"age":{
"type": "integer"
}
}
}
}

#把原来索引库中的数据进行拷贝
POST _reindex
{
"source": {
"index": "index01"
},
"dest": {
"index": "index02"
}
}

#查看新索引中的数据
GET index02/_search

#可以直接给新的索引 index02指定别名,这样做就可以不用修改java中的代码了(先删除原来的索引)

#删除老的索引库
DELETE index01
#给新的索引起别名
POST index02/_alias/index01

23. 集群的原理及其参数优化

在创建索引时,如果不指定分片配置,则默认在es7.x之后,分片的数量为1,副本的数量也为1,(在es7.x之前默认分片的数量为5,副本的数量为1)
在创建索引时,可以通过settings设置分片

1
2
3
4
“settings“:{
"number_of_shards":3,
"number_of_replicas":1:
}

分片与自平衡:当节点挂掉后,挂掉的节点分片会自平衡到其他节点中

在Elasticsearch 中,每个查询在每个分片的单个线程中执行,但是,可以并行处理多个分片。

分片数量一旦确定好,不能修改。
索引分片推荐配置方案:

每个分片推荐大小10-30GB

分片数量推荐=节点数量*1~3倍


24. ES中的路由原理

文档存入对应的分片,ES计算分片编号的过程,称为路由。

Elasticsearch 是怎么知道一个文档应该存放到哪个分片中呢?

查询时,根据文档id查询文档,Elasticsearch又该去哪个分片中查询数据呢?

路由算法:shard index = hash(id) % number_of_primary_shards(主分片的数量)


25. ES集群的脑裂现象

脑裂的概念

一个正常es集群中只有一个主节点(Master),主节点负责管理整个集群;如创建或删除索引,跟腙哪些节点是群集的一部分,并决定哪些分片分配给相关的节点。

集群的所有节点都会选择同一个节点作为主节点。

脑裂问题的出现就是因为从节点在选择主节点上出现分岐导致一个集群出现多个主节点从而使集群分裂,使得集群处于异常状态。


引发脑裂的原因

  1. 网络原因:网络延迟较高
  2. 节点负载:主节点的角色既为master又为data
  3. JVM内存回收:JVM内存设置太小

解决脑裂

==对不起,如果出现脑裂问题无法解决,重启集群吧!==


避免出现脑裂问题

1
2
3
4
5
6
7
8
9
1. 网络原因:discovery.zen.ping.timeout 超时时间配置大一点.默认是3S
2. 节点负载:角色分离策略
候选主节点配置为
node.master: true #可以作为master的选举节点参与选举
node.data: false #可以存储数据
数据节点配置为
node.master: false
node.data: true
3. JVM内存回收:修改 config/jvm.options文件的 -Xms 和 -Xmx 为服务器的内存一半.

26. 集群的动态扩容

开启一个新的主机,搭建es环境即可,es的集群有自动发现的机制


27. ES的java客户端

我们通过java客户端访问es,提供了两种方式

  • TransportClient

    1
    Elasticsearch从7.0.0版本开始TransportClient已经过时了不再推荐使用,将在8.0版本删除,具体参考https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html。
  • RestHighLevelClient

    1
    Elasticsearch7.x之后就推荐使用这个高级别的客户端了
  • Elasticsearch Java API Client

    1
    在Es7.15版本之后,es官方将它的高级客户端RestHighLevelClient标记为弃用状态。同时推出了全新的java API客户端Elasticsearch Java API Client,该客户端也将在Elasticsearch8.0及以后版本中成为官方推荐使用的客户端。

28. SpringBoot整合ES

导入maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
</dependencies>

注意:每个SpringBoot版本内置的elasticsearch-rest-high-level-client的版本都是不同的,如果SpringBoot内置的elasticsearch-rest-high-level-client的版本和我们服务器的es版本不相同,可能会出现一些兼容性问题,需要尽量保持一致;我们在项目中可以定义一个maven属性,SpringBoot如果发现有两个相同的属性就会优先加载我们自定义的maven属性;

1
2
3
<properties>
<elasticsearch.version>7.16.0</elasticsearch.version>
</properties>

自己创建es的客户端对象(不推荐使用)

1
2
3
4
5
6
7
8
//创建高级别的es的客户端
HttpHost httpHost = new HttpHost("10.10.10.101", 9200, "http");
RestClientBuilder builder = RestClient.builder(httpHost);
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);//RestHighLevelClient IDEA工具有可能没有代码提示此类,不用管,正常写import导入包就行了
CreateIndexRequest createIndexReq = new CreateIndexRequest("index100");
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexReq, RequestOptions.DEFAULT);
System.out.println(createIndexResponse);
restHighLevelClient.close();

配置es的服务器信息:

1
2
3
4
5
6
7
spring:
elasticsearch:
rest:
uris:
- http://10.10.10.101:9200
- http://10.10.10.102:9200
- http://10.10.10.103:9200

注意: 下面导入包的时候一定要注意jar包: 不要导入低级别的客户端的jar包,要导入rest高级别的客户端的jar包,要不然会出现api特别难用的问题(因为可能导入低级别的客户端的jar包)


28.1 索引的操作

创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CreateIndexRequest createIndexReq = new CreateIndexRequest("index01");
String mappingString = "{\n" +
" \"properties\": {\n" +
" \"id\":{\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"name\":{\n" +
" \"type\":\"text\",\n" +
" \"analyzer\": \"ik_smart\"\n" +
" },\n" +
" \"brandName\":{\n" +
" \"type\":\"keyword\"\n" +
" }\n" +
" }\n" +
"}";
createIndexReq.mapping(mappingString, XContentType.JSON);
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexReq, RequestOptions.DEFAULT);
System.out.println(createIndexResponse.isAcknowledged());

获取索引信息

1
2
3
4
5
GetIndexResponse index01 = restHighLevelClient.indices().get(new GetIndexRequest("index01"), RequestOptions.DEFAULT);
Map<String, MappingMetaData> mappings = index01.getMappings();
MappingMetaData index01MetaData = mappings.get("index01");
Map<String, Object> sourceAsMap = index01MetaData.getSourceAsMap();
System.out.println(sourceAsMap);

删除索引

1
2
DeleteIndexRequest deleteIndexRequest =  new DeleteIndexRequest("index01");
restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);

28.2 文档的操作

添加文档

1
2
3
4
5
6
Shop shop = new Shop();
shop.setId(1);
shop.setName("苹果笔记本电脑");
shop.setBrandName("apple");
IndexRequest indexRequest = new IndexRequest("index01").id(String.valueOf(shop.getId())).source(JSON.toJSONString(shop), XContentType.JSON);
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);

修改文档

注意: api的修改文档与kibana有些不同,api修改如果字段少于es中的全量字段,则只修改提交的部分

1
2
3
4
5
6
Shop shop2 = new Shop();
shop2.setId(1);
shop2.setName("苹果笔记本电脑2");
shop2.setBrandName("apple2");
UpdateRequest updateRequest = new UpdateRequest("index01", "1").doc(JSON.toJSONString(shop2), XContentType.JSON);
restHighLevelClient.update(updateRequest,RequestOptions.DEFAULT);

根据_id删除文档

1
2
3
DeleteRequest deleteRequest = new DeleteRequest("index01","1");
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
System.out.println(deleteResponse.status());

根据_id查询文档

1
2
3
4
GetRequest getRequest = new GetRequest("index01", "1");
GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
String json = getResponse.getSourceAsString();
System.out.println(JSON.parseObject(json, Shop.class));

批量操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
BulkRequest bulkRequest = new BulkRequest();
Shop shop = new Shop();
shop.setId(1);
shop.setName("苹果笔记本电脑");
shop.setBrandName("apple");
IndexRequest indexRequest = new IndexRequest("index01").id(String.valueOf(shop.getId())).source(JSON.toJSONString(shop),XContentType.JSON);
//添加批处理
bulkRequest.add(indexRequest);

Shop shop2 = new Shop();
shop2.setId(1);
shop2.setName("苹果笔记本电脑2");
shop2.setBrandName("apple2");
UpdateRequest updateRequest = new UpdateRequest("index01", "1").doc(JSON.toJSONString(shop2), XContentType.JSON);
//添加批处理
bulkRequest.add(updateRequest);
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println(bulkResponse.status());

MatchAll查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//添加条件(matchAll:查询所有)
searchSourceBuilder.query(QueryBuilders.matchAllQuery());

//分组
searchSourceBuilder.from(2);
searchSourceBuilder.size(6);

SearchRequest searchRequest = new SearchRequest(new String[]{"index01"}, searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
System.out.println("总记录数:" + searchResponse.getHits().getTotalHits().value);
Arrays.stream(searchResponse.getHits().getHits()).forEach(hit -> {
System.out.println(hit.getSourceAsString());
});
}

Term查询

1
//term: 一个条件匹配到一个字段       searchSourceBuilder.query(QueryBuilders.termQuery("brandName","苹果"));

Terms查询

1
2
3
//terms多个条件匹配到一个字段(OR)

searchSourceBuilder.query(QueryBuilders.termsQuery("brandName","苹果","华为"));

Match查询

1
2
//默认的操作符位OR(并集) 
searchSourceBuilder.query(QueryBuilders.matchQuery("name","苹果手机").operator(Operator.OR));

QueryString查询

1
searchSourceBuilder.query(QueryBuilders.queryStringQuery("手机").field("name").field("brandName"));

Bool查询

1
searchSourceBuilder.query(QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("name","手机")).filter(QueryBuilders.termQuery("brandName","索尼")));

聚合查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

searchSourceBuilder.query(QueryBuilders.matchQuery("name", "手机"));
searchSourceBuilder.aggregation(AggregationBuilders.sum("myidSums").field("id"));

SearchRequest searchRequest = new SearchRequest(new String[]{"shop"}, searchSourceBuilder);

SearchResponse searchResponse =
restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
System.out.println("总记录数:" + searchResponse.getHits().getTotalHits().value);

Map<String, Aggregation> aggregationMap = searchResponse.getAggregations().asMap();

ParsedSum parsedSum = (ParsedSum) aggregationMap.get("myidSums");

System.out.println( parsedSum.getValue());

29. ElasticSearch数据同步解决方案

同步方案 原理说明 适用场景 使用限制 相关文档
DTS实现数据实时同步 DTS采用binlog方式实现数据同步,在不影响源数据库的情况下,同步延迟可降至毫秒级别。 对数据同步的实时性要求较高的场景。 DTS在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库负载上升。支持自定义索引Mapping,但需保证Mapping中定义的字段与MySQL中一致。需要购买DTS数据同步作业。购买方式,请参见购买流程;定价说明,请参见产品定价 通过DTS将MySQL数据实时同步到阿里云Elasticsearch
Logstash JDBC数据同步 通过logstash-input-jdbc插件实现通过Logstash批量查询RDS中的数据,并将数据迁移到Elasticsearch。实现的本质是该插件会定期对RDS中的数据进行循环轮询,从而在当前循环中找到上次插入或更改的记录,然后批量查询这些记录并迁移至Elasticsearch。与DTS同步方案相比,该方案的实时性较差,存在秒级延迟。 同步全量数据,接受秒级延迟的场景。批量查询数据然后进行同步的场景。 使用前,需要先在Logstash中上传与RDS版本兼容的SQL JDBC驱动。需要在RDS的白名单中加入Logstash集群中节点的IP地址。需要确保Logstash和RDS在同一时区(避免同步过程中出现时间标记不符的情况)。需要确保Elasticsearch中的**_id字段与RDS中的id**字段相同。当您在RDS中插入或更新数据时,需要确保对应记录有一个包含更新或插入时间的字段。 通过Logstash将RDS MySQL数据同步至Elasticsearch
DataWorks实现离线数据同步 DataWorks是一款提供数据集成、数据开发及数据质量等全方位的产品服务。支持引入并存储关系型数据,然后进行转化和开发,最后将处理后的数据同步到Elasticsearch或其他数据系统。 大数据离线同步场景(可实现最快5分钟一次的离线数据采集任务)。需要自定义查询语句,以及多表联合查询后同步数据的场景。同步整个数据库中数据的场景。 需要开通DataWorks服务。对于传输速度要求较高或复杂环境中的数据源同步场景,需要自定义资源组。需要在RDS的白名单中添加资源组的IP地址。 通过DataWorks将MySQL数据同步至Elasticsearch
Canal实现MySQL数据同步 通过binlog方式实现数据实时同步及订阅。 对数据同步的实时性要求较高的场景。 需要手动在ECS上搭建Canal环境,会增加操作成本。Canal 1.1.4版本暂不支持Elasticsearch 7.x版本。建议使用Canal 1.1.5版本,或者通过其他方式(例如Logstash、DTS)实现MySQL数据同步。支持自定义索引Mapping,但需保证Mapping中定义的字段与MySQL中一致。 通过Canal将MySQL数据同步到阿里云Elasticsearch

Canal

​ 阿里巴巴B2B公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所
以衍生出了同步杭州和美国异地机房的需求,从2010年开始,阿里系公司开始逐步的尝试
基于数据库的日志解析,获取增量变更进行同步,由此行生出了增量订阅&消费的业务。

​ Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。
目前。Canal 主要支持了 MysQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)


30. Mysql的Binlog

30.1 什么是Binlog?

MysQL 的二进制日志可以说 MysQL 最重要的日志了,它记录了所有的 DDL 和 DML(除
了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MysQL 的二进
制日志是事务安全型的。

一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:
其一:MysQL Replication 在 Master 端开启 Binlog, Master 把它的二进制日志传递给 Slaves来达到 Master-Slave 数据一致的目的。

其二:自然就是数据恢复了,通过使用 MysQL Binlog 工具来使恢复数据。

二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有
的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件


30.2 Binlog的分类

MysaL Binlog 的格式有三种,分别是 statement,row,mixed,在配置文件中可以选择配
binlog_format= statement|row|mixed

三种格式的区别:

  • statement:语句级,binlog 会记录每次一执行写操作的语句。相对row 模式节省空
    闻,但是可能产生不一致性,比如 “update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。

    • 优点:节省空间。
    • 缺点:有可能造成数据不一致
  • row: 行级,binlog 会记录每次操作后每行记录的变化。

    • 优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。
    • 缺点:占用较大空间。
  • mixed: statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement
    模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时:包含AUTO_INCREMENT 字段的表被更新时:执行 INSERT DELAYED 语句时;用 UDF 时;会按照ROW 的方式进行处理-

    • 优点:节省空间,同时兼顾了一定的一致性。
    • 缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对
      binlog 的监控的情况都不方便。

综合上面对比,Cannel 想做监控分析,选择 row格式最为合适。


31. mysql主从复制的过程

  1. Master 主库将改变记录,写到二进制日志(Binary Log)中

  2. Slave 从库向 MysQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log):

  3. Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。


32. 开启binlog

1
2
3
4
5
6
#查看binlog是否开启(mysql高版本8.x中的binlog默认是开启的)
show global variables like 'log_%';
#查看binlog的格式(mysql高版本8.x中的binlog格式为row)
show global variables like '%binlog_format%';
#查看binlog的过期时间 默认为30天
show global variables like '%logs%'

手动开启binlog /etc/my.cnf

1
2
3
server-id=1
log-bin=binlog
binlog_format=row

可以观察binlog日志文件: /var/lib/mysql/ ,有很多binlog.0000XXXX

1
show master status# 查看最新的binlog

33. 安装canal

环境要求

1
2
3
4
5
#当前环境
mysql 8.0.22
es7.16.0
canal.1.1.5
#mysql驱动的注意点: mysql-connector-java-5.1.48.jar是兼容型驱动,兼容5.x 和8.x,canal中使用的mysql驱动就是mysql-connector-java-5.1.48.jar

创建canal用户

1
2
3
4
5
#创建一个canal用户专门用来读取mysql的数据
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canalCANAL123...';

#授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

下载canal https://github.com/alibaba/canal/

1
2
canal.deployer-1.1.5.tar.gz
canal.adapter-1.1.5.tar.gz

33.1 安装canal服务端

1
2
3
4
#先创建目录
mkdir /opt/canal/depolyer
#解压
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal/depolyer/

配置canal-depolyer:

vim /opt/canal/depolyer/conf/canal.properties

1
2
3
4
5
6
#端口默认为11111
canal.port = 11111
# canal服务端把采集到的数据,输出的目标介质: tcp, kafka, rocketMQ, rabbitMQ 默认为tcp,不需要配置
canal.serverMode = tcp
# 默认的destination(目的地)为example实例
canal.destinations = example

配置canal-depolyer采集mysql数据的实例:

1
2
3
注意:
如果想要创建多个canal实例,一个canal服务中是可以有多个instance的,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。
默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改 conf/canal.properties 中的 canal.destinations=实例1,实例2,实例3

编辑配置文件: vim /opt/canal/depolyer/conf/example/instance.properties

1
2
3
4
5
6
#配置此实例采集数据库的 ip:端口
canal.instance.master.address=10.10.10.104:3306

canal.instance.dbUsername=canal
canal.instance.dbPassword=canalCANAL123...
canal.instance.connectionCharset = UTF-8

启动canal-depolyer(server)

1
/opt/canal/depolyer/bin/startup.sh

查看日志canal是否启动并且加载实例(instance)成功:

1
cat /opt/canal/depolyer/logs/example/example.log

33.2 安装canal-adapter

canal-adapter是一个canal的适配器,其实就是canal的客户端,此客户端可以通过TCP把canal采集到的数据同步到不同的数据仓库中(eg: es);当然也可以根据canal提供的TCP的api来自己解析canal采集的数据.

1
2
mkdir  /opt/canal/adapter
tar -zxvf canal.adapter-1.1.5.tar.gz -C /opt/canal/adapter/

编辑canal-adapter的配置文件: vim /opt/canal/adapter/conf/application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null

canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500

srcDataSources:
defaultDS:
url: jdbc:mysql://10.10.10.104:3306/db04?useUnicode=true
username: canal
password: canalCANAL123...
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7 #es6 或者es7
hosts: 10.10.10.101:9200,10.10.10.102:9200,10.10.103:9200 #主机之间不能出现空格
properties:
mode: rest # or rest
# security.auth: test:123456 # only used for rest mode
cluster.name: myelasticsearch

配置表映射文件: /opt/canal/adapter/conf/es7/

在此目录中创建个表名一致的 表名.yml文件

1
2
3
4
5
6
7
8
9
10
11
dataSourceKey: defaultDS  # 源数据源的key, 对应adapater中application.yml配置的srcDataSources中的值
destination: example #cannal的instance或者MQ的topic
esMapping:
_index: article_index #es 的索引名称,需要先在es中手动创建索引
_id: _id #es 的_id, 如果不配置该项,必须配置下面的pk项,_id则会由es自动分配
# pk: id # 如果不需要_id, 则需要指定一个属性为主键属性
#mysql字段和es文档属性的对应关系
sql: "select a.id as _id, a.title, a.content, a.author,
a.release_time from article a"
etlCondition: # etl全量同步的条件 eg: where a.c_time>,空代表全部同步
commitBatch: 3000

启动adapter

1
2
3
4
5
6
7
sh /opt/canal/adapter/bin/start.sh
cat /opt/canal/adapter/logs/adapter/adapter.log 发现出错了
....
com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
.....
以上这个错误是因为canal-adapter1.1.5的安装包存在bug,需要把下载上一个版本(v1.1.5-alpha-2)的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替换掉 canal-adapter/plugin中的对应jar包,注意名称要修改为1.1.5中对应的jar包的名称:client-adapter.es7x-1.1.5-jar-with-dependencies.jar

33.3 全量同步mysql的数据到es

在真实的应用场景中,我们一般的需求会是,首先将Mysql中的历史数据同步,然后再同步增删改的数据。也就是说我们先要进行全量数据同步然后在进行增量同步

然后canal它会监听binglog来实现增量同步,所以说增量数据同步是自动的。那么我们只有解决全量同步的问题就行了。

cannal并没有明确的说明支持全量同步,但是……..

查看源码得知在一个 CommonRest.java 中提供了一个全量同步的接口

image-20220504235257200

1
2
3
# type:代表同步的终端数据存储库, 这里是es7版本, 所以这里传递es7。如果是es6传递es6
# task:代表要全量同步的配置文件,我们这里的配置文件路径放在 adapter\conf\es7中。也是就article.yml
http://10.10.10.104:8081/etl/es7/article.yml

发送请求即可全量同步


文章作者: Yang Shiyu
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Yang Shiyu !
  目录