Code Examples

To make integration easier, we're sharing code examples that show how to handle all Client Audit Log (CAL) API calls (token request and file retrieval). The following code samples can be used as a reference when integrating with the Index Exchange (IX) platform.

Log Retrieval and Storage via Python + AWS

This example demonstrates how to retrieve your client audit logs from the CAL endpoint and push them to an Amazon S3 bucket in your AWS account.
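
For reference, the script reads a JSON response from the downloads endpoint that contains a count and an availableDownloads array, where each entry carries the hour, downloadURL, and md5sum fields used below. The structure sketched here is inferred from the code itself; the values are placeholders only.

# Shape of the /downloads response as consumed by the script (placeholder values, not real data)
example_downloads_response = {
    'count': 1,
    'availableDownloads': [
        {
            'hour': '<hour identifier>',
            'downloadURL': 'https://app.indexexchange.com/api/cal/v1/<file path>',
            'md5sum': '<md5 hex digest of the file>'
        }
    ]
}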

#!/usr/bin/env python

''' The script demonstrates how to get a token and retrieve files for download from the CAL endpoint. '''
  
import sys
import hashlib
import tempfile
import boto3
import requests
from botocore.errorfactory import ClientError
  
def get_token(secret):
    ''' Request access token from Keycloak via Client Secret '''
  
    headers = {'Authorization': 'Basic ' + secret}
    resp = requests.post('https://identity.indexexchange.com/auth/realms/eventlog/protocol/openid-connect/token', headers=headers, data={'grant_type': 'client_credentials'})
    resp.raise_for_status()
  
    resp_json = resp.json()
    return resp_json["access_token"]
  
def get_available_downloads(token):
    ''' Given a header containing an access token, retrieve list of available downloads '''
  
    token_header = {'Authorization': 'Bearer ' + token}
  
    resp = requests.get('https://app.indexexchange.com/api/cal/v1/downloads', headers=token_header)
    resp.raise_for_status()
  
    return resp.json()
  
def upload_file_to_s3(s3_client, token, bucket, bucket_key, url, expected_md5sum):
    ''' Download a file from CAL and upload it to an S3 bucket '''
  
    token_header = {'Authorization': 'Bearer ' + token}
  
    resp = requests.get(url, headers=token_header, timeout=5, stream=True)
    resp.raise_for_status()
  
    # download CAL file to disk in chunks so we don't hold huge files in memory
    with tempfile.NamedTemporaryFile() as tmp:
        md5sum = hashlib.md5()
        for chunk in resp.iter_content(chunk_size=32000):
            if chunk:
                tmp.write(chunk)
                md5sum.update(chunk)
  
        if md5sum.hexdigest() != expected_md5sum:
            raise Exception('md5sum of downloaded file does not match expected value. Actual: {}, Expected: {}'.format(md5sum.hexdigest(), expected_md5sum))
  
        # Flush buffered writes so the complete file is on disk before uploading
        tmp.flush()

        # Save metadata to be used when re-downloading, so we can skip files that are unchanged (have the same md5sum)
        s3_client.upload_file(tmp.name, bucket, bucket_key, ExtraArgs={'Metadata': {'md5sum': md5sum.hexdigest()}})

        print('Successfully uploaded file {}'.format(bucket_key))
  
def is_file_uploaded(s3_client, bucket, bucket_key, md5sum):
    ''' Check if the key is already in the bucket and the contents of key are unchanged '''
  
    try:
        key_object = s3_client.head_object(Bucket=bucket, Key=bucket_key)
  
        # Check if file exists and is unchanged via md5sum
        return 'md5sum' in key_object['Metadata'] and key_object['Metadata']['md5sum'] == md5sum
    except ClientError:
        # key_object not found in bucket
        return False
  
def main():
    ''' Standard process for retrieving CAL files: Retrieve auth token, get list of available files, download files and upload to S3'''
  
    if len(sys.argv) != 4:
        print('Usage: python get_event_logs.py <client_secret> <aws_access_key> <aws_secret_key>')
        sys.exit(1)
  
    token = get_token(sys.argv[1])
    downloads = get_available_downloads(token)
  
    print('Number of available downloads: {}'.format(downloads['count']))
  
    # Connect to S3 Client via access key and secret key
    client = boto3.client(
        's3',
        aws_access_key_id=sys.argv[2],
        aws_secret_access_key=sys.argv[3],
    )
  
    # Download all available files and push them to an S3 bucket
    for download in downloads['availableDownloads']:
        # Destination file named after the hour
        bucket_key = '{}.gz'.format(download['hour'])
        bucket = 'insert-your-bucket-name-here'
  
        if not is_file_uploaded(client, bucket, bucket_key, download['md5sum']):
            upload_file_to_s3(client, token, bucket, bucket_key, download['downloadURL'], download['md5sum'])
  
if __name__ == "__main__":
    main() 
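
The <client_secret> argument is passed directly into the HTTP Basic Authorization header, so it must already be the Base64-encoded credential. The snippet below is a minimal sketch of producing that value, assuming the standard OAuth2 client_credentials Basic scheme (Base64 of 'client_id:client_secret'); if IX supplies a pre-encoded key, pass that value as-is instead.

import base64

# Hypothetical credentials; substitute the client ID and secret issued for your account
client_id = 'your-client-id'
client_secret = 'your-client-secret'

# Base64-encode 'client_id:client_secret' for the Basic Authorization header
encoded_client_key = base64.b64encode('{}:{}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8')
print(encoded_client_key)

The resulting value is what the script above expects as its first argument.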

Hourly Aggregations to AWS via Python

This example demonstrates how to aggregate client audit logs hourly and push the aggregations to an Amazon S3 bucket in your AWS account.
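
As a reference, the aggregation pipeline below expects each raw audit log file to contain at least the columns listed here. This list is inferred from the script itself (the timestamp it truncates, the dimensions it groups by, and the measures it sums), not from a published schema.

# Columns the aggregation pipeline relies on (names only, inferred from the script below)
expected_columns = [
    'timestamp',                                                            # '%Y-%m-%d %H:%M:%S', truncated to the hour
    'partner_id', 'site_id', 'supply_source', 'dsp_id', 'trading_desk_id',  # grouping dimensions
    'event_opportunity', 'net_revenue', 'gross_revenue',                    # aggregated measures
]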

#!/usr/bin/env python

''' The script demonstrates how to perform basic aggregations on files retrieved from CAL. '''
 
import sys
import hashlib
import json
import pandas
import requests
import s3fs
import tempfile
from collections import OrderedDict
from datetime import datetime
 
# Stores mapping DataFrame objects queried through the CAL API
mapping_cache = {}
 
def get_token(encoded_client_key):
    ''' Request access token from Keycloak using an encoded client key '''
 
    auth_url = 'https://identity.indexexchange.com/auth/realms/eventlog/protocol/openid-connect/token'
    headers = {'Authorization': 'Basic ' + encoded_client_key}
    data = {'grant_type': 'client_credentials'}
    resp = requests.post(auth_url, headers=headers, data=data)
    resp.raise_for_status()
 
    resp_json = resp.json()
    return resp_json['access_token']
 
def get_available_downloads(token):
    ''' Given an access token, retrieve list of available downloads as a JSON object '''
 
    request_url = 'https://app.indexexchange.com/api/cal/v1/downloads'
    token_header = {'Authorization': 'Bearer ' + token}
    resp = requests.get(request_url, headers=token_header)
    resp.raise_for_status()
 
    resp_json = resp.json()
    return resp_json['availableDownloads']
 
def get_mappings(token, mapping_url_suffix):
    ''' Given an access token, retrieve mappings for a given url_suffix as a DataFrame '''
 
    if mapping_url_suffix in mapping_cache:
        return mapping_cache[mapping_url_suffix]
 
    request_url = 'https://app.indexexchange.com/api/cal/v1/mappings/{}'.format(mapping_url_suffix)
    token_header = {'Authorization': 'Bearer ' + token}
    resp = requests.get(request_url, headers=token_header)
    resp.raise_for_status()
 
    resp_json = resp.json()
    mapping_json = resp_json['data']
    mapping_str = json.dumps(mapping_json)
    mapping_df = pandas.read_json(mapping_str)
    mapping_cache[mapping_url_suffix] = mapping_df
    return mapping_df
 
def get_audit_log(token, download_json):
    ''' Given an access token and file metadata, download file from CAL as a DataFrame '''
 
    request_url = download_json['downloadURL']
    token_header = {'Authorization': 'Bearer ' + token}
    resp = requests.get(request_url, headers=token_header, timeout=5, stream=True)
    resp.raise_for_status()
 
    # Download in chunks so we don't hold huge files in memory
    with tempfile.NamedTemporaryFile() as tmp_file:
        expected_md5sum = download_json['md5sum']
        md5sum = hashlib.md5()
        for chunk in resp.iter_content(chunk_size=32000):
            if chunk:
                tmp_file.write(chunk)
                md5sum.update(chunk)
        tmp_file.flush()
 
        actual_md5sum = md5sum.hexdigest()
        if actual_md5sum != expected_md5sum:
            error_message = 'md5sum of downloaded file does not match expected value. Actual: {}, Expected: {}'
            raise Exception(error_message.format(actual_md5sum, expected_md5sum))
 
        log_df = pandas.read_csv(tmp_file.name, compression='gzip')
        return log_df
 
def get_path_s3(download_json, bucket_name):
    ''' Destination file is named after the hour '''
 
    hour = download_json['hour']
    bucket_key = '{}.csv'.format(hour)
    path = '{}/{}'.format(bucket_name, bucket_key)
    return path
 
def upload_df_to_s3(df, fs, path):
    ''' Upload a DataFrame to the S3 bucket '''
 
    with fs.open(path, mode='wb') as file:
        # to_csv() returns a str when no target is given; encode it for the binary S3 handle
        file.write(df.to_csv(index=False).encode('utf-8'))
 
def truncate_hour(timestamp_str):
    ''' Truncates a timestamp to the hour by dropping minutes and seconds '''
 
    timestamp_obj = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
    return timestamp_obj.strftime('%Y-%m-%d %H:00')
 
def timestamp_to_hour(df):
    ''' Reformats the timestamp column of a DataFrame by truncating the hour '''
 
    col_ts = 'timestamp'
    df[col_ts] = df[col_ts].apply(truncate_hour)
    hour_df = df.rename(columns={'timestamp': 'hour'})
    return hour_df
 
def aggregate_audit_log(df, dimensions):
    ''' Aggregate the predefined measures over the dimensions specified by the caller '''
 
    measure_aggregations = OrderedDict([
        ('event_opportunity', 'size'),
        ('net_revenue', 'sum'),
        ('gross_revenue', 'sum')
    ])
    measures = list(measure_aggregations.keys())
 
    agg_df = (df[dimensions + measures]
        .groupby(dimensions, as_index=False)
        .agg(measure_aggregations)
        .rename(columns={'event_opportunity': 'impressions'}))
    return agg_df
 
def join_mapping(base_df, token, mapping_url_suffix, col_join):
    '''
    Joins a base DataFrame against a mapping DataFrame using one or more columns
        base_df:                Base DataFrame on which mappings are applied
        token:                  Auth token for CAL API access
        mapping_url_suffix:     Suffix of the mapping API URL for the desired mapping (see get_mappings() for details)
        col_join:               Columns that represent the join conditions against the mapping DataFrame (can be String or String[])
    '''
 
    mapping_df = get_mappings(token, mapping_url_suffix)
    merged_df = base_df.merge(mapping_df, how='left', on=col_join)
    return merged_df
 
def get_hourly_df(encoded_client_key, download_json):
    '''
    Restructures a raw audit log to produce predefined hourly aggregations
        encoded_client_key:     Encoded key with enough information to acquire an auth token for CAL API access
        download_json:          JSON structure containing download file metadata
 
    The resulting report can be customized:
        * Extra dimensions can be modified to produce different aggregation granularities
        * Additional information for certain dimensions can be added using join_mapping pipes
        * Sorting can be modified to ensure that relevant aggregations are shown in priority order
 
    Note:
        * The 'hour' dimension is generated in case we want to see it in the final report
    '''
    dimensions = ['hour', 'partner_id', 'site_id', 'supply_source', 'dsp_id', 'trading_desk_id']
    sort_order = ['partner_id', 'site_id', 'impressions']
    sort_ascending = [True, True, False]
 
    token = get_token(encoded_client_key)
    log_df = get_audit_log(token, download_json)
    hourly_df = (log_df
        .pipe(timestamp_to_hour)
        .pipe(aggregate_audit_log, dimensions=dimensions)
        .sort_values(by=sort_order, ascending=sort_ascending)
        .pipe(join_mapping, token=token, mapping_url_suffix='dsps', col_join='dsp_id')
        .pipe(join_mapping, token=token, mapping_url_suffix='buyers', col_join=['dsp_id', 'trading_desk_id']))
    return hourly_df
 
def main():
    '''
    Standard process for retrieving CAL files:
        * Retrieve auth token
        * Get list of available files
        * Ensure available files have been processed and uploaded to the S3 bucket
    '''
 
    if len(sys.argv) != 5:
        print('Usage: python get_event_logs.py <encoded_client_key> <aws_access_key> <aws_secret_key> <s3_bucket_name>')
        sys.exit(1)
 
    encoded_client_key = sys.argv[1]
    aws_access_key_id = sys.argv[2]
    aws_secret_access_key = sys.argv[3]
    bucket_name = sys.argv[4]
 
    token = get_token(encoded_client_key)
    downloads_json = get_available_downloads(token)
 
    print('Number of available downloads: {}'.format(len(downloads_json)))
 
    # Connect to S3 filesystem via access key and secret key
    fs = s3fs.S3FileSystem(
        key=aws_access_key_id,
        secret=aws_secret_access_key,
    )
 
    for download_json in downloads_json:
        path = get_path_s3(download_json, bucket_name)
        if fs.exists(path):
            print('Already uploaded path: {}'.format(path))
            continue
 
        hourly_df = get_hourly_df(encoded_client_key, download_json)
        upload_df_to_s3(hourly_df, fs, path)
        print "Successfully uploaded path: {}".format(path)
 
if __name__ == "__main__":
    main()
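
To sanity-check the aggregation helpers without calling the CAL API or writing to S3, timestamp_to_hour() and aggregate_audit_log() from the script above can be exercised on a small in-memory DataFrame. The rows below are hypothetical and exist only to illustrate the expected column names and timestamp format.

import pandas

# Hypothetical raw-log rows; column names match what the aggregation pipeline expects
sample_df = pandas.DataFrame([
    {'timestamp': '2020-01-01 13:05:22', 'partner_id': 1, 'site_id': 10, 'supply_source': 'web',
     'dsp_id': 100, 'trading_desk_id': 1000, 'event_opportunity': 1, 'net_revenue': 0.5, 'gross_revenue': 0.6},
    {'timestamp': '2020-01-01 13:45:07', 'partner_id': 1, 'site_id': 10, 'supply_source': 'web',
     'dsp_id': 100, 'trading_desk_id': 1000, 'event_opportunity': 1, 'net_revenue': 0.7, 'gross_revenue': 0.8},
])

dimensions = ['hour', 'partner_id', 'site_id', 'supply_source', 'dsp_id', 'trading_desk_id']
hourly_df = sample_df.pipe(timestamp_to_hour).pipe(aggregate_audit_log, dimensions=dimensions)

# Expect one row per hour and dimension combination, with an impression count and summed revenue
print(hourly_df)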