Abstract

The aim of this project is to build an end-to-end ML workflow on AWS SageMaker for Scones Unlimited: training and deploying an image classification model, and composing it with AWS Lambda functions and AWS Step Functions into an event-driven application.

Background

Image classifiers are used in the field of computer vision to identify the content of an image. They are used across a broad variety of industries, from advanced technologies like autonomous vehicles and augmented reality, to eCommerce platforms, and even diagnostic medicine.

An image classification model can help the team in a variety of ways in their operating environment: detecting people and vehicles in video feeds from roadways, supporting better routing of their social media engagement, detecting defects in their scones, and many more!

In this project, we’ll be building an image classification model that can automatically detect which kind of vehicle delivery drivers have, in order to route them to the correct loading bay and orders. Assigning delivery professionals who have a bicycle to nearby orders and giving motorcyclists orders that are farther away can help Scones Unlimited optimize operations.

To do this, we’ll use AWS SageMaker to build an image classification model that can tell bicycles apart from motorcycles. We’ll deploy the model, use AWS Lambda functions to build supporting services, and use AWS Step Functions to compose the model and services into an event-driven application.

Dataset

The dataset is CIFAR-100. The CIFAR dataset is open source and generously hosted by the University of Toronto at: https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz.

Project Steps Overview

Data staging

In AWS SageMaker Studio, the notebook we use has been tested on the Python 3 (Data Science) kernel with an ml.t3.medium instance.

Data extraction from a hosting service

import requests

def extract_cifar_data(url, filename="cifar.tar.gz"):
    """A function for extracting the CIFAR-100 dataset and storing it as a gzipped file
    
    Arguments:
    url      -- the URL where the dataset is hosted
    filename -- the full path where the dataset will be written
    
    """
        
    # Download the archive and write it to disk
    r = requests.get(url)
    with open(filename, "wb") as file_context:
        file_context.write(r.content)
    return


extract_cifar_data("https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz")     

Data transformation into a usable shape and format

Clearly, distributing the data as a gzipped archive makes sense for the hosting service! It saves on bandwidth and storage, and it’s a widely used archive format. In fact, it’s so widely used that the Python community ships a utility for working with such archives, tarfile, as part of its Standard Library.

import tarfile

with tarfile.open("cifar.tar.gz", "r:gz") as tar:
    tar.extractall()

import pickle

with open("./cifar-100-python/meta", "rb") as f:
    dataset_meta = pickle.load(f, encoding='bytes')

with open("./cifar-100-python/test", "rb") as f:
    dataset_test = pickle.load(f, encoding='bytes')

with open("./cifar-100-python/train", "rb") as f:
    dataset_train = pickle.load(f, encoding='bytes')

Now we can construct DataFrames for the training and test splits:

import pandas as pd

df_train = pd.DataFrame({
    "filenames": dataset_train[b'filenames'],
    "labels": dataset_train[b'fine_labels'],
    "row": range(len(dataset_train[b'filenames']))
})
# Drop all rows from df_train where the label is not 8 (bicycle) or 48 (motorcycle)
df_train = df_train.loc[df_train['labels'].isin([8,48])]

# Decode df_train.filenames so they are regular strings
df_train["filenames"] = df_train["filenames"].apply(
    lambda x: x.decode("utf-8")
)
df_test = pd.DataFrame({
    "filenames": dataset_test[b'filenames'],
    "labels": dataset_test[b'fine_labels'],
    "row": range(len(dataset_test[b'filenames']))
})

# Drop all rows from df_test where the label is not 8 (bicycle) or 48 (motorcycle)
df_test = df_test.loc[df_test['labels'].isin([8,48])]

# Decode df_test.filenames so they are regular strings
df_test["filenames"] = df_test["filenames"].apply(
    lambda x: x.decode("utf-8")
)

Now that the data is filtered for just our classes, we can save all our images.

import os

import numpy as np
import matplotlib.pyplot as plt

def save_images(dataset, path, df):
    # Make sure the target directory exists
    os.makedirs(path, exist_ok=True)
    # Grab the image data in row-major form
    for x in df['row']:

        img = dataset[b'data'][x]

        # Consolidated stacking/reshaping from earlier:
        # each CIFAR row is 3072 bytes -- 1024 red, 1024 green, 1024 blue
        target = np.dstack((
            img[0:1024].reshape(32,32),
            img[1024:2048].reshape(32,32),
            img[2048:].reshape(32,32)
        ))
        # Save the image as a PNG
        plt.imsave(path+'/'+df['filenames'][x], target)
    # Return any signal data you want for debugging
    return

save_images(dataset=dataset_train, path="./train", df=df_train)
save_images(dataset=dataset_test, path="./test", df=df_test)

Load data into production system

import sagemaker
from sagemaker.session import Session
from sagemaker import get_execution_role
session = sagemaker.Session()

bucket= session.default_bucket() 
print("Default Bucket: {}".format(bucket))

region = session.boto_region_name 
print("AWS Region: {}".format(region))

role = get_execution_role()
print("RoleArn: {}".format(role))

Now we can sync our data into AWS S3

import os

os.environ["DEFAULT_S3_BUCKET"] = bucket
!aws s3 sync ./train s3://${DEFAULT_S3_BUCKET}/train/
!aws s3 sync ./test s3://${DEFAULT_S3_BUCKET}/test/

Model training and deployment

For image classification, SageMaker also expects metadata, e.g. in the form of tab-separated .lst files with labels and file paths. We can generate these using our Pandas DataFrames:

def to_metadata_file(df, prefix):
    df["s3_path"] = df["filenames"]
    # Remap the CIFAR fine labels: 8 (bicycle) -> 0, 48 (motorcycle) -> 1
    df["labels"] = df["labels"].apply(lambda x: 0 if x==8 else 1)
    return df[["row", "labels", "s3_path"]].to_csv(
        f"{prefix}.lst", sep="\t", index=False, header=False
    )
    
to_metadata_file(df_train.copy(), "train")
to_metadata_file(df_test.copy(), "test")

Thereafter we can upload our manifest files to S3.

import boto3

# Upload files
boto3.Session().resource('s3').Bucket(
    bucket).Object('train.lst').upload_file('./train.lst')
boto3.Session().resource('s3').Bucket(
    bucket).Object('test.lst').upload_file('./test.lst')

Using the bucket and region info, we can get the latest prebuilt container to run our training job and define an output location in our S3 bucket for the model. We use the image_uris function from the SageMaker SDK to retrieve the latest image-classification image:

# Use the image_uris function to retrieve the latest 'image-classification' image 
region_name = boto3.Session().region_name
algo_image = sagemaker.image_uris.retrieve(region=region_name, framework="image-classification")
s3_output_location = f"s3://{bucket}/models/image_model"

We’re ready to create an estimator! We create an estimator img_classifier_model that uses a single ml.p3.2xlarge instance.

img_classifier_model = sagemaker.estimator.Estimator(
    algo_image,
    role=role,
    instance_count=1,
    instance_type='ml.p3.2xlarge',
    input_mode="File",
    output_path=s3_output_location,
    sagemaker_session=session
)

Hyperparameters

img_classifier_model.set_hyperparameters(
    image_shape='3,32,32',  # channels, height, width of the 32x32 CIFAR images
    num_classes=2,
    num_training_samples=len(df_train)
)

The image-classification image uses four input channels (train, validation, train_lst, and validation_lst) with very specific input parameters:

from sagemaker.debugger import Rule, rule_configs
from sagemaker.inputs import TrainingInput
model_inputs = {
        "train": sagemaker.inputs.TrainingInput(
            s3_data=f"s3://{bucket}/train/",
            content_type="application/x-image"
        ),
        "validation": sagemaker.inputs.TrainingInput(
            s3_data=f"s3://{bucket}/test/",
            content_type="application/x-image"
        ),
        "train_lst": sagemaker.inputs.TrainingInput(
            s3_data=f"s3://{bucket}/train.lst",
            content_type="application/x-image"
        ),
        "validation_lst": sagemaker.inputs.TrainingInput(
            s3_data=f"s3://{bucket}/test.lst",
            content_type="application/x-image"
        )
}

Great, now we can train the model using the model_inputs. In the cell below, we call the fit method on our model:

img_classifier_model.fit(model_inputs)

2023-02-06 06:51:13 Starting - Starting the training job...
2023-02-06 06:51:37 Starting - Preparing the instances for trainingProfilerReport-1675666273: InProgress
.........
2023-02-06 06:53:11 Downloading - Downloading input data...
2023-02-06 06:53:32 Training - Downloading the training image...............
2023-02-06 06:56:12 Training - Training image download completed. Training in progress...Docker entrypoint called with argument(s): train
Running default environment configuration script
Nvidia gpu devices, drivers and cuda toolkit versions (only available on hosts with GPU):
Mon Feb  6 06:56:23 2023 

[02/06/2023 06:59:37 INFO 139809553229632] Epoch[29] Train-accuracy=0.984879
[02/06/2023 06:59:37 INFO 139809553229632] Epoch[29] Time cost=5.077
[02/06/2023 06:59:38 INFO 139809553229632] Epoch[29] Validation-accuracy=0.786458

2023-02-06 06:59:53 Uploading - Uploading generated training model
2023-02-06 07:00:24 Completed - Training job completed
Training seconds: 459
Billable seconds: 459

Getting ready to deploy

To begin with, let’s configure Model Monitor to track our deployment. We’ll define a DataCaptureConfig below:

from sagemaker.model_monitor import DataCaptureConfig

data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=f"s3://{bucket}/data_capture"
)

Note the destination_s3_uri parameter: At the end of the project, we can explore the data_capture directory in S3 to find crucial data about the inputs and outputs Model Monitor has observed on our model endpoint over time.
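
Once the endpoint has served some traffic, a short sketch like the one below can be used to peek at those captured records. It assumes the captureData/endpointInput/endpointOutput layout of SageMaker's JSON Lines capture files (binary payloads appear base64-encoded), and the nesting of keys under data_capture (endpoint name, variant, timestamp) is generated by SageMaker; adjust the keys if your records differ.

import json
import boto3

s3_client = boto3.client("s3")

# List whatever Model Monitor has captured so far under the configured prefix
response = s3_client.list_objects_v2(Bucket=bucket, Prefix="data_capture/")
capture_keys = [obj["Key"] for obj in response.get("Contents", [])]
print(f"Found {len(capture_keys)} capture file(s)")

# Each capture file is JSON Lines: one record per request/response pair
if capture_keys:
    body = s3_client.get_object(Bucket=bucket, Key=capture_keys[-1])["Body"].read().decode("utf-8")
    for line in body.strip().split("\n"):
        record = json.loads(line)
        print(record["captureData"]["endpointOutput"]["data"])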

With that done, we deploy the model on a single ml.m5.xlarge instance with the data capture config attached:

deployment = img_classifier_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    data_capture_config=data_capture_config
)

endpoint = deployment.endpoint_name
print(endpoint)

----------!image-classification-2023-02-06-07-02-45-972

Predictor

predictor = deployment
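
Since deploy() already returned a predictor object, assigning it is enough here. If the notebook kernel is restarted later, a predictor can also be re-attached to the running endpoint by name; a minimal sketch:

from sagemaker.predictor import Predictor

# Re-attach to the already-deployed endpoint using its name
predictor = Predictor(
    endpoint_name=endpoint,
    sagemaker_session=session
)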

Then we serialize a test image and send it to the endpoint for inference:

from sagemaker.serializers import IdentitySerializer
import base64

predictor.serializer = IdentitySerializer("image/png")
with open("./test/bicycle_s_001789.png", "rb") as f:
    payload = f.read()

    
inference = predictor.predict(payload)

print(inference)

b'[0.9234810471534729, 0.07651900500059128]'

The inference object is an array of two values: the predicted probability for each class (bicycle and motorcycle, respectively). So a value of b'[0.92, 0.076]' indicates a 92% probability that the image shows a bicycle and an 8% probability that it shows a motorcycle.
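
Since the response body is a JSON array, a few lines are enough to turn it into a label. This is just a convenience sketch; the bicycle/motorcycle ordering follows from the 8 -> 0, 48 -> 1 mapping used when building the .lst files.

import json

probabilities = json.loads(inference)   # e.g. [0.9234..., 0.0765...]
classes = ["bicycle", "motorcycle"]     # index 0 <- label 8, index 1 <- label 48
prediction = classes[probabilities.index(max(probabilities))]
print(f"Predicted: {prediction} ({max(probabilities):.1%} confidence)")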

Lambdas and step function workflow

We will write and deploy three Lambda functions, and then use the Step Functions visual editor to chain them together!

The first Lambda function is responsible for serializing the target image data from S3. The second one is responsible for image classification, invoking the deployed endpoint. And the third function is responsible for filtering out low-confidence inferences.

First Lambda

import json
import boto3
import base64

s3 = boto3.resource("s3")

def lambda_handler(event, context):
    """A function to serialize target data from S3"""
    
    # Get the s3 address from the Step Function event input
    key = event['s3_key']
    bucket = event['s3_bucket']

    # Download the data from s3 to /tmp/image.png
    s3.Bucket(bucket).download_file(key, '/tmp/image.png')

    # Read the data and base64-encode it; decode to a regular string so the
    # return value is JSON serializable
    with open("/tmp/image.png", "rb") as f:
        image_data = base64.b64encode(f.read()).decode('utf-8')

    # Pass the data back to the Step Function
    print("Event:", event.keys())
    return {
        'statusCode': 200,
        'body': {
            "s3_bucket": bucket,
            "s3_key": key,
            "image_data": image_data,
            "inferences": []
        }
    }

Second Lambda

import json
import boto3
import base64

# The boto3 SageMaker runtime client is enough to invoke the endpoint, and it
# is available in the default Lambda runtime (no sagemaker SDK required)
runtime = boto3.client('runtime.sagemaker')

# Fill this in with the name of your deployed model
ENDPOINT = "image-classification-2023-02-06-07-02-45-972" 

def lambda_handler(event, context):
    
  
    # Decode the image data
    image = base64.b64decode(event['body']['image_data'])

    # Invoke the endpoint with the raw PNG bytes
    response = runtime.invoke_endpoint(
        EndpointName=ENDPOINT,
        ContentType='image/png',
        Body=image
    )

    # The endpoint returns a JSON array of class probabilities
    inferences = response['Body'].read().decode('utf-8')
    event["inferences"] = [float(x) for x in inferences[1:-1].split(',')]
    
    # We return the data back to the Step Function    
    return {
        'statusCode': 200,
        'body': {
            "image_data": event['body']['image_data'],
            "s3_bucket": event['body']['s3_bucket'],
            "s3_key": event['body']['s3_key'],
            "inferences": event['inferences'],
        }
    }

Third Lambda

import json


THRESHOLD = .93


def lambda_handler(event, context):
    
    # Grab the inferences from the event
    inferences = event['body']['inferences']
    
    # Check if any values in our inferences are above THRESHOLD
    meets_threshold = any(x >= THRESHOLD for x in inferences)

    # If our threshold is met, pass our data back out of the
    # Step Function, else, end the Step Function with an error
    if meets_threshold:
        pass
    else:
        raise Exception("THRESHOLD_CONFIDENCE_NOT_MET")

    return {
        'statusCode': 200,
        'body': {
            "image_data": event['body']['image_data'],
            "s3_bucket": event['body']['s3_bucket'],
            "s3_key": event['body']['s3_key'],
            "inferences": event['body']['inferences'],
        }
    }

Step function
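
The state machine chains the three Lambdas as Task states, so each function's return value becomes the next function's input event. Below is a minimal sketch of the Amazon States Language definition this amounts to; the Lambda ARNs are placeholders, and the same graph can be assembled in the Step Functions visual editor.

import json

# Placeholder ARNs -- substitute the ARNs of the three deployed Lambda functions
definition = {
    "Comment": "Scones Unlimited image classification workflow",
    "StartAt": "SerializeImageData",
    "States": {
        "SerializeImageData": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:serializeImageData",
            "Next": "ClassifyImage"
        },
        "ClassifyImage": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:classifyImage",
            "Next": "FilterLowConfidenceInferences"
        },
        "FilterLowConfidenceInferences": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:filterLowConfidenceInferences",
            "End": True
        }
    }
}

print(json.dumps(definition, indent=2))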

Testing and evaluation

import random
import boto3
import json


def generate_test_case():
    # Setup s3 in boto3
    s3 = boto3.resource('s3')
    
    # Randomly pick an object from the test folder in our bucket
    objects = s3.Bucket(bucket).objects.filter(Prefix="test")
    
    # Grab any random object key from that folder!
    obj = random.choice([x.key for x in objects])
    
    return json.dumps({
        "image_data": "",
        "s3_bucket": bucket,
        "s3_key": obj
    })
generate_test_case()

'{"image_data": "", "s3_bucket": "sagemaker-us-east-1-221437400076", "s3_key": "test/bicycle_s_000777.png"}'
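
A generated test case can then be used as the input of a Step Function execution. Below is a minimal sketch using boto3; the state machine ARN is a placeholder for the one created above.

import boto3

sfn_client = boto3.client("stepfunctions")

# Placeholder ARN -- substitute the ARN of the state machine created above
STATE_MACHINE_ARN = "arn:aws:states:us-east-1:123456789012:stateMachine:scones-unlimited-workflow"

response = sfn_client.start_execution(
    stateMachineArn=STATE_MACHINE_ARN,
    input=generate_test_case()
)
print(response["executionArn"])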

Project Repository

Project: Build a ML Workflow For Scones Unlimited On Amazon SageMaker