api_key_auth_sample.py

Follow

Given a path to your api key, this will exchange that key for credentials and then make an authenticated request to list jobs on your account. Download the file at the bottom of the article.

import logging
import json
import os
import requests
import time

API_URL = "https://api.conductortech.com"
API_KEY_PATH = "/PATH/TO/API_KEY.JSON"
AUTH_URL = "https://dashboard.conductortech.com"

def get_creds_path():
   config_path = os.path.join(os.path.expanduser("~"), ".config", "conductor")
   creds_file = os.path.join(config_path, "api_key_credentials")
   return creds_file

def get_api_key_bearer_token(creds_file=None):
   with open(API_KEY_PATH, 'r') as fp:
       api_key_info = json.loads(fp.read())

   response = requests.get("%s/api/oauth_jwt" % AUTH_URL,
                            params={"grant_type": "client_credentials",
                                    "scope": "owner admin user",
                                    "client_id": api_key_info['client_id'],
                                    "client_secret": api_key_info['private_key']})
   if response.status_code == 200:
       response_dict = json.loads(response.text)
       credentials_dict = {
             "access_token": response_dict['access_token'],
             "token_type": "Bearer",
             "expiration": int(time.time()) + int(response_dict['expires_in']),
             "scope": "user admin owner"
         }

         if not creds_file:
             return credentials_dict

         if not os.path.exists(os.path.dirname(creds_file)):
             os.makedirs(os.path.dirname(creds_file))

         with open(creds_file, "w") as fp:
             fp.write(json.dumps(credentials_dict))
   return

def read_conductor_credentials():
    '''
    Read the conductor credentials file, if it exists. This will contain a bearer token from     either the user or the API key (if that's desired). If the credentials file doesn't         exist, try and fetch a new one in the
    API key scenario or prompt the user to log in.
    Args:
        use_api_key: Whether or not to use the API key

    Returns: A Bearer token in the event of a success or None if things couldn't get figured out

    '''

    logging.debug("Reading conductor credentials...")
    creds_file = get_creds_path()

    logging.debug("Creds file is %s" % creds_file)
    if not os.path.exists(creds_file):
        if os.path.exists(API_KEY_PATH):
            logging.debug("Attempted to use API key, but no api key found!")
            return None

        # Exchange the API key for a bearer token
        logging.debug("Attempting to get API key bearer token")
        get_api_key_bearer_token(creds_file)

   if not os.path.exists(creds_file):
       return None
   logging.debug("Reading credentials file...")
   with open(creds_file) as fp:
       file_contents = json.loads(fp.read())

    expiration = file_contents.get('expiration')
    if not expiration or expiration < int(time.time()):
        logging.debug("Credentials expired!")
        logging.debug("Refreshing API key bearer token!")
        get_api_key_bearer_token(creds_file)

return file_contents['access_token']

bearer_token = read_conductor_credentials()
if not bearer_token:
     print "Ack! Something went wrong"
     exit()
response = requests.get("%s/api/v1/jobs" % API_URL,
headers={"Authorization": "Bearer %s" % bearer_token})
print response.text
# curl_cmd = "curl -X GET -H 'Authorization: Bearer %s' '%s/billing/get_spend?type=account&start=2017-07-01&end=2017-07-24'" % (bearer_token, CONDUCTOR_URL)
# print curl_cmd