Sync dwh_inventory Tables from CI DWH to On-Premise MySQL

This is a simple version of a tool I'm working on for syncing tables from the NetApp Cloud Insights (CI) DWH to an on-premise MySQL database. It handles dwh_inventory tables. These tables are recreated after every ETL, which makes them easy to sync: we simply truncate the destination table and copy every row across from the CI DWH table.

The destination tables must already exist in the on-premise MySQL database for this to work. Creating them is simple if you have access to a NetApp OnCommand Insight Data Warehouse: run "SHOW CREATE TABLE" there to get the statement that creates each table.

Save the script as, for example, CiDwhFullTableSync.py and run it with this syntax:

python CiDwhFullTableSync.py tokenFile.txt https://YOURTENANT.cloudinsights.netapp.com dwh_inventory storage -mysqlHost SQLSERVERFQDN -mysqlUser USER -mysqlPass PASS -mysqlPort PORT

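Your API token should be on the first line of tokenFile.txt. The MySQL parameters are optional. Without them, the tool only writes the TRUNCATE and INSERT commands to a text file so you can validate them and run the inserts yourself. All four (-mysqlHost, -mysqlUser, -mysqlPass, -mysqlPort) must be given for the tool to truncate the destination table and insert the rows directly.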

The Script

''' python CiDwhFullTableSync.py tokenFile.txt https://YOURTENANT.cloudinsights.netapp.com dwh_inventory storage -mysqlHost SQLSERVERFQDN -mysqlUser USER -mysqlPass PASS -mysqlPort PORT '''

import argparse
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import mysql.connector
import time

# PARAMETERS
#
# Positional: token file, tenant URL, source schema and table.
# Optional: batch size, destination MySQL connection details (all four are
# needed for direct inserts) and a switch to skip writing the text file.

parser = argparse.ArgumentParser(
    description="Sync a Cloud Insights DWH table into an on-premise MySQL table",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("tokenFile", help="File whose first line is the Cloud Insights API token")
parser.add_argument("tenantUrl", help="https://TENANT.cloudinsights.netapp.com")
parser.add_argument("schema", help="Database")
parser.add_argument("table", help="Table")
# type=int so a value given on the command line is an int, like the default.
parser.add_argument("-batch", type=int, default=1000,
                    help="Batch size for collecting from CI-DWH and MySQL inserts")
parser.add_argument("-mysqlHost", help="Destination MySQL Host")
parser.add_argument("-mysqlUser", help="Destination MySQL User")
parser.add_argument("-mysqlPass", help="Destination MySQL Password")
parser.add_argument("-mysqlPort", type=int, help="Destination MySQL Port")
# nargs="?"/const=True: usable as a bare flag ("-noOutputTxt"), while the old
# "-noOutputTxt <anything>" form keeps working.
parser.add_argument("-noOutputTxt", nargs="?", const=True, default=False,
                    help="Do not output the MySQL insert commands")

args = parser.parse_args()
if args.batch < 1:
    parser.error("-batch must be a positive integer")
config = vars(args)

print("Arguments config:")
# Echo the effective configuration, but never the MySQL password.
printableConfig = {
    key: ("********" if key == "mysqlPass" and value else value)
    for key, value in config.items()
}
print(printableConfig, end="\r\n\n")

# READ THE TOKEN FILE
#
# Only the first line of the file is used as the API token. It is stripped
# because a trailing newline would make the HTTP header value invalid
# (requests rejects header values containing CR/LF). "utf-8-sig" also drops
# a byte-order mark that some Windows editors prepend.

with open(config['tokenFile'], encoding="utf-8-sig") as t:
    token = t.readline().strip()
if not token:
    raise SystemExit("Token file " + config['tokenFile'] + " is empty")

print("Token:")
# Show only a short prefix so the API key does not end up in console logs.
print(token[:6] + "...", end="\r\n\n")

# ACQUIRE FROM CLOUD INSIGHTS
#
# Page through the OData endpoint `batch` rows at a time using $top/$skip and
# accumulate every row dict in `results`. The loop stops on an empty page
# rather than a short one, because the server may return fewer rows than
# $top per page.

# strip(): keeps the header valid even if the token still carries a newline.
headers = {'X-CloudInsights-ApiKey': token.strip()}
api = '/rest/v1/dwh-management/odata/' + config['schema'] + '/' + config['table']
# Tolerate a trailing slash on the tenant URL so we don't request "//rest/...".
baseUrl = config['tenantUrl'].rstrip('/') + api

batch = int(config['batch'])
if batch < 1:
    raise SystemExit("-batch must be a positive integer")
skip = 0
results = []

while True:
    apiExtra = '?$top=' + str(batch) + '&$skip=' + str(skip)
    print(apiExtra)
    # verify=False is kept from the original (InsecureRequestWarning is
    # disabled at import). The timeout (seconds) only guards against a
    # stalled request hanging the sync forever.
    response = requests.get(baseUrl + apiExtra, headers=headers, verify=False, timeout=300)
    # Surface HTTP errors (bad token, unknown table, ...) directly instead of
    # failing later with a KeyError on 'value'.
    response.raise_for_status()
    page = response.json()['value']
    if not page:
        print("Reached the end!")
        break
    results += page
    skip += batch

print("Results count = " + str(len(results)), end="\r\n\n")

# GET THE COLUMN HEADINGS
#
# Column names are taken from the keys of the first returned row. With no
# rows there is nothing to derive them from, so stop here before any file is
# written or the destination table is truncated.

if not results:
    raise SystemExit("No rows returned from " + config['schema'] + '.'
                     + config['table'] + "; nothing to sync.")

print("The columns: ")
columns = list(results[0])
for c in columns:
    print(c)
print()

# OUTPUT MySQL INSERT COMMANDS
#
# Unless -noOutputTxt is given, write a TRUNCATE followed by one INSERT per
# row to MysqlCommands.<schema>.<table>.txt so the commands can be reviewed
# or run by hand.

# Characters that must be backslash-escaped inside a single-quoted MySQL
# string literal (assumes the server's default sql_mode, i.e. without
# NO_BACKSLASH_ESCAPES). Escaping \n and \r also keeps one statement per line.
_SQL_ESCAPES = str.maketrans({
    "\\": "\\\\",
    "'": "\\'",
    "\n": "\\n",
    "\r": "\\r",
    "\0": "\\0",
    "\x1a": "\\Z",
})


def _sql_literal(value):
    """Render one JSON-decoded value as a MySQL literal.

    None -> NULL, bool -> TRUE/FALSE, int/float unquoted, anything else as a
    single-quoted, escaped string.
    """
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # before int: bool is a subclass of int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)
    return "'" + str(value).translate(_SQL_ESCAPES) + "'"


if not config['noOutputTxt']:
    tableName = config['schema'] + '.' + config['table']
    OutputFile = 'MysqlCommands.' + tableName + '.txt'
    print("Creating output file " + OutputFile, end="\r\n\n")

    # INITIALIZE THE INSERT INTO COMMAND (backticks quote column names;
    # embedded backticks are doubled)
    quotedColumns = ",".join('`' + c.replace('`', '``') + '`' for c in columns)
    insertBase = "INSERT INTO " + tableName + " (" + quotedColumns + ") VALUES "
    print("Insert Base = " + insertBase, end="\r\n\n")

    # GENERATE THE INSERT COMMANDS (we truncate for dwh_inventory tables)
    output = ['TRUNCATE TABLE ' + tableName + ';']
    for r in results:
        rowValues = ",".join(_sql_literal(r[c]) for c in columns)
        output.append(insertBase + "(" + rowValues + ");")

    # OUTPUT TO TEXT FILE
    with open(OutputFile, 'w', encoding="utf-8") as MyFile:
        MyFile.writelines(line + '\n' for line in output)

    print('Created file ' + OutputFile, end="\r\n\n")
  
# INSERT THE DATA TO MYSQL (if all 4 MySQL parameters are specified)
#
# connectSuccess tells the insert step below whether `mydb` holds an open
# connection. When fewer than four parameters are given, nothing is
# inserted and only the text file (if any) is produced.

connectSuccess = False
if config['mysqlHost'] and config['mysqlUser'] and config['mysqlPass'] and config['mysqlPort']:
    print("MYSQL properties specified!", end="\r\n\n")

    # connect() raises mysql.connector.Error on failure (unreachable host,
    # bad credentials, unknown database, ...). It never leaves `mydb` unbound,
    # so that exception is what must be handled.
    try:
        mydb = mysql.connector.connect(
            host=config['mysqlHost'],
            user=config['mysqlUser'],
            password=config['mysqlPass'],
            port=int(config['mysqlPort']),
            # Default database = the schema being synced; the INSERTs below
            # are schema-qualified anyway.
            database=config['schema'],
        )
    except mysql.connector.Error as err:
        # The user asked for a direct sync, so report failure with a non-zero exit.
        raise SystemExit("Unable to connect to MySQL! " + str(err))
    connectSuccess = True

# INSERTING MULTIPLE ROWS AT A TIME (more performance)
#
# Truncate the destination table, then insert all collected rows in chunks
# of `batch` rows via executemany, committing after each chunk. The
# connection is closed when done, even on error.

if connectSuccess:

    # CONSTRUCT THE sql STATEMENT (one %s placeholder per column; backticks
    # quote column names, embedded backticks are doubled)
    quotedColumns = ",".join('`' + c.replace('`', '``') + '`' for c in columns)
    placeholders = ",".join(['%s'] * len(columns))
    sql = ('INSERT INTO ' + config['schema'] + '.' + config['table'] + " ("
           + quotedColumns + ") VALUES (" + placeholders + ")")
    print("SQL = " + sql, end="\r\n\n")

    # CONSTRUCT AND BATCH THE val STATEMENTs: one tuple per row, in column
    # order, split into consecutive chunks of `batch` rows.
    rows = [tuple(r[c] for c in columns) for r in results]
    vals = [rows[i:i + batch] for i in range(0, len(rows), batch)]

    # VISUAL OUTPUT
    for val in vals:
        print('sql = "' + sql + '"')
        print('val = [')
        for v in val:
            print(str(v) + ',')
        print(']', end="\r\n\n")

    # TEMPORARY PAUSE TO BREAK (a window to Ctrl-C before data is destroyed)
    print("Pausing for 5 seconds before truncation!")
    time.sleep(5)

    # TRUNCATE dwh_inventory (these tables get destroyed)
    print("Truncating " + config['schema'] + '.' + config['table'], end="\r\n\n")
    sqlT = 'TRUNCATE TABLE ' + config['schema'] + '.' + config['table']
    mycursor = mydb.cursor()
    try:
        mycursor.execute(sqlT)
        mydb.commit()  # the original referenced mydb.commit without calling it

        # INSERTING MULTIPLE ROWS (one reused cursor, one commit per chunk)
        print("Inserting new rows.", end="\r\n\n")
        for val in vals:
            mycursor.executemany(sql, val)
            mydb.commit()
    except mysql.connector.Error:
        # Discard the uncommitted part of the failing chunk, then propagate.
        mydb.rollback()
        raise
    finally:
        mycursor.close()
        mydb.close()

Comments