# Menu

# conductor

# 3KB Python snippet created on August 8, 2017. This file is private.

# Actions
import logging
import json
import os
import requests
import time

# Base URL of the Conductor API; the jobs request at the bottom of this file is sent to it.
API_URL = "https://api.conductortech.com"
# Path of the API key file (placeholder value - replace with the real path). The file is
# parsed as JSON and its 'client_id' and 'private_key' entries are read.
API_KEY_PATH = "/PATH/TO/API_KEY.JSON"
# Base URL of the service whose /api/oauth_jwt endpoint exchanges the API key for a bearer token.
AUTH_URL = "https://dashboard.conductortech.com"

def get_creds_path():
    '''
    Build the location of the cached credentials file.

    Returns: The path "<home>/.config/conductor/api_key_credentials", where <home> is the
             expansion of "~" for the current user. Nothing is created on disk.
    '''
    home_dir = os.path.expanduser("~")
    return os.path.join(home_dir, ".config", "conductor", "api_key_credentials")

def get_api_key_bearer_token(creds_file=None):
    '''
    Exchange the API key stored at API_KEY_PATH for a bearer token.

    The key file is parsed as JSON; its 'client_id' and 'private_key' entries are sent to the
    /api/oauth_jwt endpoint under AUTH_URL as a client-credentials grant.

    Args:
        creds_file: Optional path. When given, the credentials are also written to this file
                    as JSON (its parent directory is created if it does not exist).

    Returns: A dict with 'access_token', 'token_type', 'expiration' (current time plus the
             'expires_in' value of the response, in epoch seconds) and 'scope', or None if the
             endpoint did not answer with HTTP 200.
    '''
    with open(API_KEY_PATH, 'r') as fp:
        api_key_info = json.load(fp)

    # A timeout keeps an unresponsive auth service from blocking the caller forever.
    response = requests.get("%s/api/oauth_jwt" % AUTH_URL,
                            params={"grant_type": "client_credentials",
                                    "scope": "owner admin user",
                                    "client_id": api_key_info['client_id'],
                                    "client_secret": api_key_info['private_key']},
                            timeout=30)
    if response.status_code != 200:
        # Report the failure instead of silently returning nothing.
        logging.error("Failed to get API key bearer token (HTTP %s)", response.status_code)
        return None

    response_dict = json.loads(response.text)
    credentials_dict = {
        "access_token": response_dict['access_token'],
        "token_type": "Bearer",
        "expiration": int(time.time()) + int(response_dict['expires_in']),
        "scope": "user admin owner"
    }

    if not creds_file:
        return credentials_dict

    # dirname is '' for a bare file name; os.makedirs('') would raise, so skip it then.
    creds_dir = os.path.dirname(creds_file)
    if creds_dir and not os.path.exists(creds_dir):
        os.makedirs(creds_dir)

    with open(creds_file, "w") as fp:
        json.dump(credentials_dict, fp)

    # Return the credentials in the file-writing case too, so both paths agree.
    return credentials_dict

def read_conductor_credentials():
    '''
    Return a bearer token from the conductor credentials file, creating or refreshing the
    file from the API key when necessary.

    If the credentials file (see get_creds_path) does not exist, the API key at API_KEY_PATH
    is exchanged for a bearer token, which is written to that file. If the stored credentials
    have no expiration or have expired, they are refreshed the same way and read again.

    Returns: The bearer token string on success, or None if no API key file exists, the
             credentials file could not be created, or the credentials are still expired
             after a refresh attempt.

    '''

    logging.debug("Reading conductor credentials...")
    creds_file = get_creds_path()

    logging.debug("Creds file is %s", creds_file)
    if not os.path.exists(creds_file):
        # With no cached token the API key is the only source; give up if it is missing.
        if not os.path.exists(API_KEY_PATH):
            logging.debug("Attempted to use API key, but no api key found!")
            return None

        #  Exchange the API key for a bearer token
        logging.debug("Attempting to get API key bearer token")
        get_api_key_bearer_token(creds_file)

    # The exchange writes the file only on success, so check again before reading.
    if not os.path.exists(creds_file):
        return None

    logging.debug("Reading credentials file...")
    with open(creds_file) as fp:
        file_contents = json.load(fp)

    expiration = file_contents.get('expiration')
    if not expiration or expiration < int(time.time()):
        logging.debug("Credentials expired!")
        logging.debug("Refreshing API key bearer token!")
        get_api_key_bearer_token(creds_file)

        # The dict loaded above still holds the expired token; reload what the refresh wrote.
        with open(creds_file) as fp:
            file_contents = json.load(fp)

        expiration = file_contents.get('expiration')
        if not expiration or expiration < int(time.time()):
            # The file was not updated, so the refresh failed; do not hand out a dead token.
            logging.debug("Credentials are still expired after refresh attempt")
            return None

    return file_contents.get('access_token')

# Script body: obtain a bearer token, request the jobs list from the API and print the raw
# response text.
bearer_token = read_conductor_credentials()
if not bearer_token:
    # print(...) with a single argument behaves the same on Python 2 and Python 3.
    print("Ack! Something went wrong")
    # Exit with a non-zero status so callers can detect the failure; SystemExit does not
    # depend on the exit() helper that the site module injects.
    raise SystemExit(1)
response = requests.get("%s/api/v1/jobs" % API_URL,
                        headers={"Authorization": "Bearer %s" % bearer_token})
print(response.text)
# curl_cmd = "curl -X GET -H 'Authorization: Bearer %s' '%s/billing/get_spend?type=account&start=2017-07-01&end=2017-07-24'" % (bearer_token, CONDUCTOR_URL)
# print curl_cmd

# shift+enter to add a new line