python

超轻量级php框架startmvc

用python简单实现mysql数据同步到ElasticSearch的教程

更新时间:2020-06-04 20:18:02 作者:startmvc
之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一

之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个

思路:

网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,也可以程序多开将时间差和间隔时间差异化,因为用mysql中一个id当作es中的id,也避免了重复数据

使用:

只需要按照escongif.py写配置文件,然后写sql文件,最后直接执行mstes.py就可以了,我这个也是参考logstash-input-jdbc的配置形式

MsToEs

|----esconfig.py(配置文件)

|----mstes.py(同步程序)

|----sql_manage.py(数据库管理)

|----aa.sql(需要用到sql文件)

|----bb.sql(需要用到sql文件)

sql_manage.py:


# -*-coding:utf-8 -*-
__author__ = "ZJL"
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import traceback
import esconfig
# 用于不需要回滚和提交的操作
def find(func):
 def wrapper(self, *args, **kwargs):
 try:
 return func(self, *args, **kwargs)
 except Exception as e:
 print(traceback.format_exc())
 print(str(e))
 return traceback.format_exc()
 finally:
 self.session.close()
 return wrapper
class MysqlManager(object):
 def __init__(self):
 mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
 self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool,
 pool_recycle=3600)
 # self.DB_Session = sessionmaker(bind=self.engine)
 # self.session = self.DB_Session()
 self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False)
 self.db = scoped_session(self.DB_Session)
 self.session = self.db()
 @find
 def select_all_dict(self, sql, keys):
 a = self.session.execute(sql)
 a = a.fetchall()
 lists = []
 for i in a:
 if len(keys) == len(i):
 data_dict = {}
 for k, v in zip(keys, i):
 data_dict[k] = v
 lists.append(data_dict)
 else:
 return False
 return lists
 # 关闭
 def close(self):
 self.session.close()

aa.sql:


select 
 CONVERT(c.`id`,CHAR) as id, 
 c.`code` as code, 
 c.`project_name` as project_name, 
 c.`name` as name, 
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, 
from `cc` c 
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 

bb.sql:


select 
 CONVERT(c.`id`,CHAR) as id, 
 CONVERT(c.`age`,CHAR) as age, 
 c.`code` as code, 
 c.`name` as name, 
 c.`project_name` as project_name, 
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, 
from `bb` c 
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 

esconfig.py:


# -*- coding: utf-8 -*-
#__author__="ZJL"
# sql 文件名与es中的type名一致
mysql = {
 # mysql连接信息
 "mysql_connection_string": "root:123456@127.0.0.1:3306/xxx",
 # sql文件信息
 "statement_filespath":[
 # sql对应的es索引和es类型
 {
 "index":"a1",
 "sqlfile":"aa.sql",
 "type":"aa"
 },
 {
 "index":"a1",
 "sqlfile":"bb.sql",
 "type":"bb"
 },
 ],
}
# es的ip和端口
elasticsearch = {
 "hosts":"127.0.0.1:9200",
}
# 字段顺序与sql文件字段顺序一致,这是存进es中的字段名,这里用es的type名作为标识
db_field = {
 "aa":
 ("id",
 "code",
 "name",
 "project_name",
 "update_time",
 ),
 "bb":
 ("id",
 "code",
 "age",
 "project_name",
 "name",
 "update_time",
 ),
}
es_config = {
 # 间隔多少秒同步一次
 "sleep_time":10,
 # 为了解决服务器之间时间差问题
 "time_difference":3,
 # show_json 用来展示导入的json格式数据,
 "show_json":False,
}

mstes.py:


# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
 def __init__(self):
 try:
 # 是否展示json数据在控制台
 self.show_json = es_config.get("show_json")
 # 间隔多少秒同步一次
 self.sleep_time = es_config.get("sleep_time")
 # 为了解决同步时数据更新产生的误差
 self.time_difference = es_config.get("time_difference")
 # 当前时间,留有后用
 self.datetime_now = ""
 # es的ip和端口
 es_host = elasticsearch.get("hosts")
 # 连接es
 self.es = Elasticsearch(es_host)
 # 连接mysql
 self.mm = MysqlManager()
 except :
 print(traceback.format_exc())
 def tongbu_es_mm(self):
 try:
 # 同步开始时间
 start_time = time.time()
 print("start..............",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
 # 这个list用于批量插入es
 actions = []
 # 获得所有sql文件list
 statement_filespath = mysql.get("statement_filespath",[])
 if self.datetime_now:
 # 当前时间加上时间差(间隔时间加上执行同步用掉的时间,等于上一次同步开始时间)再字符串格式化
 # sql中格式化时间时年月日和时分秒之间不能空格,不然导入es时报解析错误,所以这里的时间格式化也统一中间加一个T
 self.datetime_now = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time()-(self.sleep_time+self.time_difference)))
 else:
 self.datetime_now = "1999-01-01T00:00:00"
 if statement_filespath:
 for filepath in statement_filespath:
 # sql文件
 sqlfile = filepath.get("sqlfile")
 # es的索引
 es_index = filepath.get("index")
 # es的type
 es_type = filepath.get("type")
 # 读取sql文件内容
 with open(sqlfile,"r") as opf:
 sqldatas = opf.read()
 # ::datetime_now是一个自定义的特殊字符串用于增量更新
 if "::datetime_now" in sqldatas:
 sqldatas = sqldatas.replace("::datetime_now",self.datetime_now)
 else:
 sqldatas = sqldatas
 # es和sql字段的映射
 dict_set = db_field.get(es_type)
 # 访问mysql,得到一个list,元素都是字典,键是字段名,值是数据
 db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
 if db_data_list:
 # 将数据拼装成es的格式
 for db_data in db_data_list:
 action = {
 "_index": es_index,
 "_type": es_type,
 "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),
 "_source": db_data
 }
 # 如果没有id字段就自动生成
 es_id = db_data.get("id", "")
 if es_id:
 action["_id"] = es_id
 # 是否显示json再终端
 if self.show_json:
 print(action)
 # 将拼装好的数据放进list中
 actions.append(action)
 # list不为空就批量插入数据到es中
 if len(actions) > 0 :
 helpers.bulk(self.es, actions)
 except Exception as e:
 print(traceback.format_exc())
 else:
 end_time = time.time()
 print("end...................",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
 self.time_difference = end_time-start_time
 finally:
 # 报错就关闭数据库
 self.mm.close()
def main():
 tb = TongBu()
 # 间隔多少秒同步一次
 sleep_time = tb.sleep_time
 # 死循环执行导入数据,加上时间间隔
 while True:
 tb.tongbu_es_mm()
 time.sleep(sleep_time)
if __name__ == '__main__':
 main()

以上这篇用python简单实现mysql数据同步到ElasticSearch的教程就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

python mysql 数据同步 ElasticSearch