返回

父子表、对象型数组、nested数组灵活同步数据,ElasticSearch实战强效补给

后端

  1. 引言
    最近在做mysql到es的数据同步,涉及到父子表数据同步,特此记录,以供后续参考。关于mysql同步到es的操作明细可参考我之前的博客。

  2. 父子表数据同步
    父表同步到es与普通表同步没有区别,插入新数据时,只需要在es中建立相应的索引,然后将父表的数据插入索引即可。

import pymysql
from elasticsearch import Elasticsearch

# 连接mysql数据库
conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
cursor = conn.cursor()

# 获取父表数据
sql = 'select * from parent'
cursor.execute(sql)
results = cursor.fetchall()

# 连接elasticsearch
es = Elasticsearch()

# 创建索引
index_name = 'parent'
es.indices.create(index=index_name)

# 将父表数据插入索引
for row in results:
    doc = {
        'id': row[0],
        'name': row[1]
    }
    es.index(index=index_name, doc_type='_doc', id=row[0], body=doc)

# 关闭连接
cursor.close()
conn.close()

子表同步到es时,需要在es中建立父子关系,然后将子表的数据插入到父表的索引中。

# 连接mysql数据库
conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
cursor = conn.cursor()

# 获取子表数据
sql = 'select * from child'
cursor.execute(sql)
results = cursor.fetchall()

# 连接elasticsearch
es = Elasticsearch()

# 创建父子关系
index_name = 'parent'
es.indices.put_mapping(index=index_name, body={
    "_doc": {
        "properties": {
            "children": {
                "type": "nested"
            }
        }
    }
})

# 将子表数据插入索引
for row in results:
    doc = {
        'id': row[0],
        'name': row[1],
        'children': [
            {
                'id': row[2],
                'name': row[3]
            }
        ]
    }
    es.index(index=index_name, doc_type='_doc', id=row[0], body=doc)

# 关闭连接
cursor.close()
conn.close()
  1. 对象型数组数据同步
    对象型数组是指数组中的元素是一个个对象,每个对象都有自己的属性。将对象型数组同步到es时,需要将数组中的每个对象作为一个单独的文档插入到索引中。
import pymysql
from elasticsearch import Elasticsearch

# 连接mysql数据库
conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
cursor = conn.cursor()

# 获取对象型数组数据
sql = 'select * from array_object'
cursor.execute(sql)
results = cursor.fetchall()

# 连接elasticsearch
es = Elasticsearch()

# 创建索引
index_name = 'array_object'
es.indices.create(index=index_name)

# 将对象型数组数据插入索引
for row in results:
    doc = {
        'id': row[0],
        'name': row[1],
        'children': [
            {
                'id': child[0],
                'name': child[1]
            } for child in row[2]
        ]
    }
    es.index(index=index_name, doc_type='_doc', id=row[0], body=doc)

# 关闭连接
cursor.close()
conn.close()
  1. nested数组数据同步
    nested数组是指数组中的元素是另一个数组。将nested数组同步到es时,需要将nested数组中的每个元素作为一个单独的文档插入到索引中。
import pymysql
from elasticsearch import Elasticsearch

# 连接mysql数据库
conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
cursor = conn.cursor()

# 获取nested数组数据
sql = 'select * from array_nested'
cursor.execute(sql)
results = cursor.fetchall()

# 连接elasticsearch
es = Elasticsearch()

# 创建索引
index_name = 'array_nested'
es.indices.create(index=index_name)

# 将nested数组数据插入索引
for row in results:
    doc = {
        'id': row[0],
        'name': row[1],
        'children': [
            {
                'id': child[0],
                'name': child[1]
            } for child in row[2]
        ]
    }
    es.index(index=index_name, doc_type='_doc', id=row[0], body=doc)

# 关闭连接
cursor.close()
conn.close()
  1. 总结
    通过本文,我们了解了如何将父子表数据、对象型数组数据、nested数组数据同步到ElasticSearch中。这些同步方法可以帮助我们实现不同数据源之间的数据交换,满足各种业务需求。