[Python][CI] Uploading CSVs to Cloud Insights Using API

I thought the following might be useful. I already had the code to upload a CSV to be processed by the post-ETL script for the NetApp Cloud Insights Data Warehouse (DWH), and with a couple of tweaks I was able to use the API to upload a CSV to be processed by the Annotation Import Utility (AIU) as well. The two minor changes were:

  1. dwh-management/upload/csvs is a POST, and assets/import is a PUT.
  2. upload/csvs uses customFile in files, and assets/import uses data in files.
You can simply drop the code into your script/Python CLI in order to use it.

Usage:

# Base URL of the Cloud Insights tenant and the API key sent with each request.
yourTenant = "https://TENANT.cloudinsights.netapp.com"
apiKey = "YOUR_VERY_LONG_API_KEY"

# Upload a CSV to be processed by the Annotation Import Utility (AIU).
with Client(yourTenant, apiKey) as client:
  upload_CSV_AIU(client, 'annotation_updates.csv')

# Upload a CSV to be processed by the Data Warehouse (DWH).
with Client(yourTenant, apiKey) as client:
  upload_CSV_DWH(client, 'dwh_custom_table_update.csv')

The Code

from enum import Enum
import json
from pathlib import Path
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

''' CLASS Client '''

class Client:
  ''' Thin wrapper around a requests session for the Cloud Insights REST API.

  Relative paths are resolved against <instance>/rest/<version> and the API
  key is sent in the X-CloudInsights-ApiKey header of every request.
  The client is a context manager: leaving the `with` block closes the session.
  '''
  ### __init__
  def __init__(self, instance, token, version='v1', session=None, verify=False):
    ''' Set up the client.

    instance: base URL of the tenant; a trailing slash is ignored.
    token:    API key, sent as the X-CloudInsights-ApiKey header.
    version:  REST API version used to build the /rest/<version> prefix.
    session:  optional pre-built session object; a requests.Session is
              created when omitted. Its headers are updated in place.
    verify:   default TLS verification setting passed to every request.
              NOTE: the default (False) keeps the historical behaviour of NOT
              verifying the server certificate; pass True (or a CA bundle
              path) to have certificates checked.
    '''
    self.host = instance.rstrip('/') # Trim trailing slash
    self.version = version
    self.base_path = f'/rest/{version}'
    self.verify = verify
    if session is None:
      session = requests.Session()
    session.headers.update({
      'Content-Type': 'application/json',
      'X-CloudInsights-ApiKey': token,
    })
    self.session = session
  ### __enter__
  def __enter__(self):
    ''' Return the client itself for use inside a `with` block. '''
    return self
  ### __exit__
  def __exit__(self, exc_type, exc_value, traceback):
    ''' Close the session; exceptions raised inside the block are not suppressed. '''
    self.close()
    return False
  ### _convert_to_fully_qualified_url
  def _convert_to_fully_qualified_url(self, api_path):
    ''' Turn api_path into an absolute URL on this tenant.

    Accepts a fully qualified URL (returned unchanged), a path that already
    starts with the /rest/<version> prefix, or a path relative to that prefix.
    '''
    if api_path.startswith(self.host):
      # Fully qualified URL
      return api_path
    if api_path.startswith(self.base_path):
      # URL that is only missing the schema and server
      return self.host + api_path
    # The URL is relative - needs to add prefix
    return f'{self.host}{self.base_path}/{api_path.lstrip("/")}'
  ### _request
  def _request(self, method, path, **kwargs):
    ''' Send a request and return the decoded JSON body.

    Extra keyword arguments are handed to session.request(). Two are special:
    return_response=True returns the raw response object instead of JSON, and
    verify overrides the client-wide TLS verification default for this call.
    Returns {} when the response body is empty.
    Raises requests.HTTPError on 4xx and APIError on 5xx responses.
    '''
    return_response = kwargs.pop('return_response', None)
    # setdefault (rather than a hard-coded keyword) lets a caller pass
    # verify=... without hitting "multiple values for keyword argument".
    kwargs.setdefault('verify', self.verify)
    response = self.session.request(
      method,
      self._convert_to_fully_qualified_url(path),
      **kwargs
    )
    self._raise_for_status(response)
    if return_response:
      return response
    # Certain calls like /dwh-management/upload/csvs return an empty result
    if not response.text:
      return {}
    return response.json()
  ### post
  def post(self, path, json=None, **kwargs):
    ''' Issue a POST to path with an optional JSON body; see _request. '''
    return self._request('POST', path, json=json, **kwargs)
  ### PUT
  def put(self, path, json=None, **kwargs):
    ''' Issue a PUT to path with an optional JSON body; see _request. '''
    return self._request('PUT', path, json=json, **kwargs)
  ### _raise_for_status
  @staticmethod
  def _raise_for_status(response):
    ''' Raise for error responses, with more detail than requests' raise_for_status.

    4xx -> requests.HTTPError whose message includes the response body.
    5xx -> APIError, which tries to extract the error code and message from
           the JSON body.
    Any other status returns None.
    '''
    if 400 <= response.status_code < 500:
      # For client error, raise a regular HTTP error
      raise requests.HTTPError(f'{response.status_code} Client Error: {response.reason} {response.text}', response=response)
    elif 500 <= response.status_code < 600:
      # For server-side errors, APIError parses the body for error details
      raise APIError(f'{response.status_code} Server Error: {response.reason}', response=response)
  ### upload_DWH_csv
  def upload_DWH_csv(self, filename):
    ''' Upload a CSV file to be processed by Data Warehouse.

    POSTs the file as multipart field 'customFile' and returns the raw
    response object. The file is closed once the request has completed.
    '''
    with open(filename, 'rb') as csv_handle:
      return self.post(
        'dwh-management/upload/csvs',
        files={'customFile': csv_handle},
        headers={'Content-Type': None},  # Reset Content-Type from default
        return_response=True  # Get the response object, as no JSON is returned for this call
      )
  ### upload_AIU_csv
  def upload_AIU_csv(self, filename):
    ''' Upload a CSV file to be processed by the Annotation Import Utility.

    PUTs the file as multipart field 'data' and returns the raw response
    object. The file is closed once the request has completed.
    '''
    with open(filename, 'rb') as csv_handle:
      return self.put(
        'assets/import',
        files={'data': csv_handle},
        headers={'Content-Type': None},  # Reset Content-Type from default
        return_response=True  # Get the response object, as no JSON is returned for this call
      )
  ### close
  def close(self):
    ''' Close the session and drop the reference to it. Safe to call twice. '''
    if self.session is not None:
      # Actually release the session's resources instead of only
      # forgetting the object.
      self.session.close()
      self.session = None

''' CLASS APIError (used in above class) '''

class APIError(requests.HTTPError):
  ''' HTTP error for server-side (5xx) failures.

  Exposes the `errorCode` and `errorMessage` fields of the JSON error body
  as attributes. Both are None when no response is attached or the body is
  not a JSON object containing them.
  '''
  ### __init__
  def __init__(self, *args, **kwargs):
    ''' Accepts the same arguments as requests.HTTPError (notably response=). '''
    super().__init__(*args, **kwargs)
    self.errorCode = None
    self.errorMessage = None
    try:
      payload = self.response.json()
    except (AttributeError, ValueError):
      # No response attached, or the body is not JSON (for example an HTML
      # error page). Failing here would hide the real HTTP error.
      payload = None
    if isinstance(payload, dict):
      self.errorCode = payload.get('errorCode')
      self.errorMessage = payload.get('errorMessage')
  ### __str__
  def __str__(self):
    ''' Describe the error, preferring the details parsed from the body. '''
    if self.errorCode is None and self.errorMessage is None:
      # Nothing could be parsed: fall back to the message given at raise time
      return super().__str__()
    return f'Server error {self.response.status_code} ({self.errorCode}): {self.errorMessage}'

''' UPLOAD CSV CODE '''

def upload_CSV_DWH(client, csv_file):
  ''' Send csv_file through client.upload_DWH_csv and print the outcome.

  Prints a notice and does nothing else when csv_file is not an existing
  file. Any exception raised during the upload is printed, not propagated.
  Always returns None.
  '''
  if Path(csv_file).is_file():
    try:
      result = client.upload_DWH_csv(csv_file)
      if result.ok:
        outcome = 'File uploaded successfully!'
      else:
        outcome = f'Error posting file: {result.text}'
      print(outcome)
    except Exception as err:
      print(err)
  else:
    print(f'File "{csv_file}" does not exist or is not a file.')

def upload_CSV_AIU(client, csv_file):
  ''' Send csv_file through client.upload_AIU_csv and print the outcome.

  Prints a notice and does nothing else when csv_file is not an existing
  file. Any exception raised during the upload is printed, not propagated.
  Always returns None.
  '''
  if Path(csv_file).is_file():
    try:
      result = client.upload_AIU_csv(csv_file)
      if result.ok:
        outcome = 'File uploaded successfully!'
      else:
        outcome = f'Error posting file: {result.text}'
      print(outcome)
    except Exception as err:
      print(err)
  else:
    print(f'File "{csv_file}" does not exist or is not a file.')

Comments