
Python script synchronizes MySQL data to Hive incrementally through DataX


Syncing MySQL data with DataX from Python

1. Introduction

This Python script uses DataX to query the incremental data in MySQL and synchronize it into Hive.
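
In outline, each run of the script goes through these steps (a summary of the code in section 5, not additional functionality):

```
1. Make sure the Hive partition for the target day dt exists (ALTER TABLE ... ADD PARTITION).
2. Read the high-water mark: max(id) already present in that partition (0 if the partition is empty).
3. Build the one-day window [dt, dt + 1 day).
4. Invoke datax.py with the job JSON, passing the max id, the window, and the connection info as -D variables.
5. DataX pulls the new MySQL rows (per the job JSON, which is not included here) and writes them to HDFS under the day's partition.
```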

2. Environment

1. Python 3
2. DataX
3. PyHive
4. PySpark

3. Features

(1) Free switching between test and production environments.
(2) Incremental synchronization.
(3) Backfilling of historical data.
(4) A simple runtime environment.
(5) Hive queue switching.

4. Optimization

The script covers the basic synchronization needs, but there is still room for optimization:
1. Connect to Hive in HA mode rather than to a single node, to reduce the risk of downtime.
2. Standardize the log output.
3. The DataX job file is missing; it cannot be published for other reasons. A demo will follow in a later post.
4. Pass the target table of each table to be synced in as a variable, so that one script generalizes to many tables.
These points will be picked up in later optimizations of the script.
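
Since the DataX job file (ods_message_incr.json) is not published, here is a rough sketch of the shape such a job usually takes: a mysqlreader paired with an hdfswriter, picking up the -D variables the script passes. Every column name, SQL condition, and HDFS path below is a hypothetical placeholder for illustration, not the author's actual job:

```json
{
  "job": {
    "setting": {"speed": {"channel": 1}},
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "${suser}",
          "password": "${spasswd}",
          "connection": [{
            "jdbcUrl": ["jdbc:mysql://${surl}"],
            "querySql": ["select id, create_time, content from message where id > ${maxId} and create_time >= '${stime}' and create_time < '${etime}'"]
          }]
        }
      },
      "writer": {
        "name": "hdfswriter",
        "parameter": {
          "defaultFS": "${hdfs}",
          "fileType": "text",
          "fieldDelimiter": "\t",
          "writeMode": "append",
          "path": "/warehouse/ods.db/ods_message_incr/dt=${time}",
          "fileName": "ods_message_incr",
          "column": [
            {"name": "id", "type": "bigint"},
            {"name": "create_time", "type": "string"},
            {"name": "content", "type": "string"}
          ]
        }
      }
    }]
  }
}
```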

5. Source code

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Incremental synchronization of messages
import datetime
import os
import sys

from pyhive import hive

sys.path.append(os.getcwd())

isPrd = True
# Production vs. test connection info, switched by a single flag
hiveInfo = {'host': '192.168.1.1', 'port': 10000, 'user': 'root',
            'database': 'ods', 'hdfs': 'hdfs://192.168.1.1:8020'} \
    if isPrd else \
    {'host': '192.168.1.122', 'port': 10000, 'user': 'root',
     'database': 'ods', 'hdfs': 'hdfs://192.168.1.1:8020'}
sourceDbInfo = {'url': '192.168.1.1:3306/db', 'user': 'root', 'passwd': '123'} \
    if isPrd else \
    {'url': '192.168.1.122:3306/db', 'user': 'root', 'passwd': 'root123'}


class HiveClient:
    def __init__(self):
        self.conn = hive.connect(
            host=hiveInfo.get('host'),
            port=hiveInfo.get('port'),
            username=hiveInfo.get('user'),
            database=hiveInfo.get('database'))

    def query(self, sql):
        sql = sql.replace("\r\n", " ").replace("\n", " ").replace("\t", " ")
        print(sql)
        with self.conn.cursor() as cursor:
            # Switch to the project's YARN queue before running the statement
            cursor.execute("set mapreduce.job.queuename=root.users.project")
            cursor.execute(sql)
            return cursor.fetchall()

    def execute(self, sql):
        sql = sql.replace("\r\n", " ").replace("\n", " ").replace("\t", " ")
        print(sql)
        with self.conn.cursor() as cursor:
            cursor.execute("set mapreduce.job.queuename=root.users.project")
            cursor.execute(sql)

    def close(self):
        self.conn.close()


def __getMaxPk(dt):
    client = HiveClient()
    # Make sure the day's partition exists before querying it
    addPartition = ("alter table ods.ods_message_incr "
                    "add if not exists partition (dt='{dt}')").format(dt=dt)
    client.execute(addPartition)
    # Highest id already landed in the partition; 0 triggers a full first pull
    sql = "select max(id) from ods.ods_message_incr where dt='{dt}'".format(dt=dt)
    data = client.query(sql)
    client.close()
    print(data)
    if data[0][0] is None:
        return 0
    return data[0][0]


# Incremental sync of push messages for day dt
def syncPushMessage(dt):
    maxPk = __getMaxPk(dt)
    datax_json_path = os.getcwd() + '/ods_message_incr.json'
    etime = (datetime.datetime.strptime(dt, "%Y-%m-%d")
             + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    # Run DataX, passing the watermark, window and connection info as -D variables
    commandStr = (
        "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s "
        "-Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s'" % (
            datax_json_path, maxPk, sourceDbInfo.get("url"),
            sourceDbInfo.get("user"), sourceDbInfo.get("passwd"),
            dt, etime, dt, hiveInfo.get('hdfs')))
    print(commandStr)
    os.system(commandStr)


# Backfill missing messages for day dt, starting from a caller-supplied max id
def syncPushMessage_history(dt, maxPk):
    datax_json_path = os.getcwd() + '/ods_message_incr.json'
    etime = (datetime.datetime.strptime(dt, "%Y-%m-%d")
             + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    commandStr = (
        "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s "
        "-Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s'" % (
            datax_json_path, maxPk, sourceDbInfo.get("url"),
            sourceDbInfo.get("user"), sourceDbInfo.get("passwd"),
            dt, etime, dt, hiveInfo.get('hdfs')))
    print(commandStr)
    os.system(commandStr)


if __name__ == '__main__':
    if len(sys.argv) == 1:
        # No arguments: sync today's increment
        dt = datetime.datetime.now().strftime('%Y-%m-%d')
        syncPushMessage(dt)
    elif len(sys.argv) == 2:
        # One argument: sync the increment for the given date
        dt = sys.argv[1]
        syncPushMessage(dt)
    elif len(sys.argv) == 3:
        # Two arguments: backfill the given date from the given max id
        dt = sys.argv[1]
        maxPk = sys.argv[2]
        syncPushMessage_history(dt, maxPk)
    else:
        print('Invalid arguments')
        sys.exit(1)
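
Optimization point 4 (passing the target table in as a variable) combined with the script's date-window arithmetic could be sketched as below. The build_datax_command function and its -Dtable flag are hypothetical additions for illustration, not part of the original script:

```python
import datetime


def build_datax_command(json_path, table, dt, max_id, src, hdfs):
    """Build the DataX invocation for one table's daily increment.

    The window is the half-open day [dt, etime); max_id is the Hive-side
    high-water mark, so only rows with id > max_id are pulled again.
    """
    etime = (datetime.datetime.strptime(dt, "%Y-%m-%d")
             + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    return (
        "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dtable=%s "
        "-Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s "
        "-Dtime=%s -Dhdfs=%s'" % (
            json_path, max_id, table, src["url"], src["user"],
            src["passwd"], dt, etime, dt, hdfs))


# Illustrative values only; the real script reads these from its config dicts
src = {"url": "192.168.1.1:3306/db", "user": "root", "passwd": "123"}
cmd = build_datax_command("/jobs/ods_message_incr.json", "message",
                          "2024-05-31", 1000, src, "hdfs://192.168.1.1:8020")
print(cmd)
```

With the table name a parameter, one scheduler entry per table can reuse the same script instead of copying it.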

6. Closing

This is an original article; please link to the original when reprinting.
Thanks for reading. If this article helped you, I'm glad. Thank you!

