This script mainly uses Python to drive DataX, querying MySQL and incrementally synchronizing the data into Hive. It depends on:
1、python3
2、dataX
3、pyhive
4、pyspark
(1) Supports free switching between the test and production environments.
(2) Supports incremental synchronization.
(3) Supports backfilling historical data.
(4) Simple runtime environment.
(5) Supports switching the Hive execution queue.
For data synchronization the script basically covers what is needed, but there is still room for optimization:
1、 Connect to Hive in HA mode rather than to a single node, to reduce the risk of downtime; a client-side failover sketch is shown after this list.
2、 Standardize the log output.
3、 The DataX job file (JSON) is missing; it cannot be released for other reasons. A demo will be published later.
4、 The source and target table names could be passed in as variables, so the same script can be reused for other tables.
Further optimizations will follow in other scripts.
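Regarding point 1: full HA would normally go through ZooKeeper-based HiveServer2 service discovery, which pyhive does not speak directly, so a lightweight alternative is client-side failover across several HiveServer2 instances. The following is only a minimal sketch under that assumption; the host names hs2-node1 and hs2-node2 are hypothetical placeholders, not hosts from the original setup:

from pyhive import hive

# Hypothetical HiveServer2 instances that share the same metastore
HIVE_HOSTS = ['hs2-node1', 'hs2-node2']

def connect_with_failover(port=10000, user='root', database='ods'):
    last_err = None
    for host in HIVE_HOSTS:
        try:
            # Return the first connection that can be established
            return hive.connect(host=host, port=port, username=user, database=database)
        except Exception as err:
            last_err = err
    # Every host failed; surface the last error
    raise last_err

HiveClient.__init__ in the script below could call such a helper instead of connecting to the single host configured in hiveInfo.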
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Incremental synchronization message
from pyhive import hive
import os, sys, datetime
isPrd = True

# HiveServer2 / HDFS connection info (production vs. test)
hiveInfo = {
    'host': '192.168.1.1', 'port': 10000, 'user': 'root',
    'database': 'ods', 'hdfs': 'hdfs://192.168.1.1:8020'
} if isPrd else {
    'host': '192.168.1.122', 'port': 10000, 'user': 'root',
    'database': 'ods', 'hdfs': 'hdfs://192.168.1.1:8020'
}

# Source MySQL connection info (production vs. test)
sourceDbInfo = {
    'url': '192.168.1.1:3306/db', 'user': 'root', 'passwd': '123'
} if isPrd else {
    'url': '192.168.1.122:3306/db', 'user': 'root', 'passwd': 'root123'
}

sys.path.append(os.getcwd())
UTF8 = "UTF-8"
class HiveClient:
    """Thin wrapper around a pyhive Hive connection."""
    def __init__(self):
        self.conn = hive.connect(
            host=hiveInfo.get('host'),
            port=hiveInfo.get('port'),
            username=hiveInfo.get('user'),
            database=hiveInfo.get('database'))

    def query(self, sql):
        # Flatten the statement so it prints on a single line
        sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "")
        print(sql)
        with self.conn.cursor() as cursor:
            # Run on the project queue
            cursor.execute("set mapreduce.job.queuename=root.users.project")
            cursor.execute(sql)
            return cursor.fetchall()

    def execute(self, sql):
        sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "")
        print(sql)
        with self.conn.cursor() as cursor:
            cursor.execute("set mapreduce.job.queuename=root.users.project")
            cursor.execute(sql)

    def close(self):
        self.conn.close()
def __getMaxPk(dt):
    client = HiveClient()
    # Add the day's partition if it does not exist yet
    addPartition = "alter table ods.ods_message_incr add if not exists partition (dt='{dt}') ".format(dt=dt)
    client.execute(addPartition)
    # Get the maximum id already loaded for this date
    sql = """select max(id) from ods.ods_message_incr where dt='{dt}'""".format(dt=dt)
    data = client.query(sql)
    client.close()
    print(data)
    if data[0][0] is None:
        return 0
    return data[0][0]
# Incrementally sync push messages for the given day
def syncPushMessage(dt):
    maxPk = __getMaxPk(dt)
    datax_json_path = os.getcwd() + '/ods_message_incr.json'
    etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    # Build the DataX command and pass the runtime values in as -D variables
    commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s '" % (
        datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs'))
    print(commandStr)
    os.system(commandStr)
# Backfill missing messages from a caller-supplied maximum id
def syncPushMessage_history(dt, maxPk):
    datax_json_path = os.getcwd() + '/ods_message_incr.json'
    etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s '" % (
        datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs'))
    print(commandStr)
    os.system(commandStr)
if __name__ == '__main__':
    if len(sys.argv) == 1:
        # No arguments: sync today's increment
        dt = datetime.datetime.now().strftime('%Y-%m-%d')
        syncPushMessage(dt)
    elif len(sys.argv) == 2:
        # One argument: sync the increment for the given date (YYYY-MM-DD)
        dt = sys.argv[1]
        syncPushMessage(dt)
    elif len(sys.argv) == 3:
        # Two arguments: backfill the given date starting from the given max id
        dt = sys.argv[1]
        maxPk = sys.argv[2]
        syncPushMessage_history(dt, maxPk)
    else:
        print('Parameter input error')
        sys.exit(1)
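The commandStr above hands its values to DataX as -D variables, which DataX substitutes into ${...} placeholders inside ods_message_incr.json. Since that file was not published with the post, the following is only a hypothetical sketch of what such a job definition could look like; the message table name, the column list, and the warehouse path are assumptions, not the original configuration:

{
  "job": {
    "setting": { "speed": { "channel": 1 } },
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "${suser}",
          "password": "${spasswd}",
          "connection": [{
            "jdbcUrl": ["jdbc:mysql://${surl}"],
            "querySql": ["select id, content, create_time from message where id > ${maxId} and create_time >= '${stime}' and create_time < '${etime}'"]
          }]
        }
      },
      "writer": {
        "name": "hdfswriter",
        "parameter": {
          "defaultFS": "${hdfs}",
          "fileType": "text",
          "path": "/user/hive/warehouse/ods.db/ods_message_incr/dt=${time}",
          "fileName": "ods_message_incr",
          "writeMode": "append",
          "fieldDelimiter": "\u0001",
          "column": [
            {"name": "id", "type": "bigint"},
            {"name": "content", "type": "string"},
            {"name": "create_time", "type": "string"}
          ]
        }
      }
    }]
  }
}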
This is an original article; please include a link to the original when reposting.
Thanks for reading. If this article helps you, I am glad. Thank you!