Abstract
The aim of this project is to build an ML workflow on Amazon SageMaker.
Background
Image classifiers are used in computer vision to identify the content of an image. They are used across a broad variety of industries, from advanced technologies like autonomous vehicles and augmented reality to eCommerce platforms and even diagnostic medicine.
An image classification model can help the Scones Unlimited team in a variety of ways in their operating environment: detecting people and vehicles in video feeds from roadways, better support routing for their engagement on social media, detecting defects in their scones, and many more!
In this project, we’ll be building an image classification model that can automatically detect which kind of vehicle delivery drivers have, in order to route them to the correct loading bay and orders. Assigning delivery professionals who have a bicycle to nearby orders and giving motorcyclists orders that are farther away can help Scones Unlimited optimize operations.
We’ll use Amazon SageMaker to build an image classification model that can tell bicycles apart from motorcycles. We’ll deploy our model, use AWS Lambda functions to build supporting services, and use AWS Step Functions to compose the model and services into an event-driven application.
Dataset
The dataset is CIFAR-100. The CIFAR dataset is open source and generously hosted by the University of Toronto at: https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz.
Project Steps Overview
Data staging
In SageMaker Studio, the notebook has been tested on the Python 3 (Data Science) kernel with an ml.t3.medium instance.
Data extraction from a hosting service
import requests

def extract_cifar_data(url, filename="cifar.tar.gz"):
    """Download the CIFAR-100 dataset and store it as a gzipped file.

    Arguments:
    url      -- the URL where the dataset is hosted
    filename -- the full path where the dataset will be written
    """
    # filename is only the local destination, so it is not passed to requests.get
    r = requests.get(url)
    # Fail early on HTTP errors instead of writing an error page to disk
    r.raise_for_status()
    with open(filename, "wb") as file_context:
        file_context.write(r.content)

extract_cifar_data("https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz")
Data transformation into a usable shape and format
Distributing the data as a gzipped archive makes sense for the hosting service: it saves on bandwidth and storage, and it’s a widely used archive format. In fact, it’s so widely used that Python ships a utility for working with such archives, tarfile, as part of its standard library.
import tarfile
import pickle

# Unpack the archive into ./cifar-100-python
with tarfile.open("cifar.tar.gz", "r:gz") as tar:
    tar.extractall()

# Load the metadata, test, and train splits
with open("./cifar-100-python/meta", "rb") as f:
    dataset_meta = pickle.load(f, encoding='bytes')
with open("./cifar-100-python/test", "rb") as f:
    dataset_test = pickle.load(f, encoding='bytes')
with open("./cifar-100-python/train", "rb") as f:
    dataset_train = pickle.load(f, encoding='bytes')
Now we can construct a dataframe for each split:
import pandas as pd

df_train = pd.DataFrame({
    "filenames": dataset_train[b'filenames'],
    "labels": dataset_train[b'fine_labels'],
    "row": range(len(dataset_train[b'filenames']))
})
# Drop all rows from df_train where label is not 8 (bicycle) or 48 (motorcycle)
df_train = df_train.loc[df_train['labels'].isin([8,48])].copy()
# Decode df_train.filenames so they are regular strings
df_train["filenames"] = df_train["filenames"].apply(
    lambda x: x.decode("utf-8")
)

df_test = pd.DataFrame({
    "filenames": dataset_test[b'filenames'],
    "labels": dataset_test[b'fine_labels'],
    "row": range(len(dataset_test[b'filenames']))
})
# Drop all rows from df_test where label is not 8 (bicycle) or 48 (motorcycle)
df_test = df_test.loc[df_test['labels'].isin([8,48])].copy()
# Decode df_test.filenames so they are regular strings
df_test["filenames"] = df_test["filenames"].apply(
    lambda x: x.decode("utf-8")
)
Now that the data is filtered for just our classes, we can save all our images.
import os
import numpy as np
import matplotlib.pyplot as plt

def save_images(dataset, path, df):
    # Make sure the output directory exists before writing to it
    os.makedirs(path, exist_ok=True)
    for x in df['row']:
        # Grab the image data in row-major form
        img = dataset[b'data'][x]
        # Stack the red, green, and blue planes into one 32x32x3 array
        target = np.dstack((
            img[0:1024].reshape(32,32),
            img[1024:2048].reshape(32,32),
            img[2048:].reshape(32,32)
        ))
        # Save the image; df keeps its original index, so label x matches the 'row' value
        plt.imsave(path+'/'+df['filenames'][x], target)

save_images(dataset=dataset_train, path="./train", df=df_train)
save_images(dataset=dataset_test, path="./test", df=df_test)
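To see why the three slices in save_images line up with the color channels, here is a minimal sketch on a synthetic row. It assumes the layout the code above relies on (1024 red values, then 1024 green, then 1024 blue, each plane stored row by row). The helper name row_to_image and the fake pixel values are illustrative only and are not part of the project.

```python
import numpy as np

def row_to_image(row):
    """Turn one flat row of 3072 values into a 32x32x3 RGB array."""
    red = row[0:1024].reshape(32, 32)
    green = row[1024:2048].reshape(32, 32)
    blue = row[2048:].reshape(32, 32)
    # dstack places the three planes along a new last axis
    return np.dstack((red, green, blue))

# Synthetic row: red plane all 10, green plane all 20, blue plane all 30
fake_row = np.concatenate([
    np.full(1024, 10, dtype=np.uint8),
    np.full(1024, 20, dtype=np.uint8),
    np.full(1024, 30, dtype=np.uint8),
])

image = row_to_image(fake_row)
print(image.shape)  # (32, 32, 3)
print(image[0, 0])  # [10 20 30]
```

Every pixel ends up holding one value from each plane, which is the shape plt.imsave expects for an RGB image.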
Load data into production system
import sagemaker
from sagemaker.session import Session
from sagemaker import get_execution_role
session = sagemaker.Session()
bucket= session.default_bucket()
print("Default Bucket: {}".format(bucket))
region = session.boto_region_name
print("AWS Region: {}".format(region))
role = get_execution_role()
print("RoleArn: {}".format(role))
Now we can sync our data to Amazon S3:
import os
os.environ["DEFAULT_S3_BUCKET"] = bucket
!aws s3 sync ./train s3://${DEFAULT_S3_BUCKET}/train/
!aws s3 sync ./test s3://${DEFAULT_S3_BUCKET}/test/
Model training and deployment
For image classification, SageMaker also expects metadata, e.g. in the form of TSV files with labels and file paths. We can generate these from our pandas DataFrames:
def to_metadata_file(df, prefix):
    df["s3_path"] = df["filenames"]
    # Map the CIFAR labels to class indices: 8 (bicycle) -> 0, 48 (motorcycle) -> 1
    df["labels"] = df["labels"].apply(lambda x: 0 if x==8 else 1)
    # Write a tab-separated file with the columns: row, label, path
    return df[["row", "labels", "s3_path"]].to_csv(
        f"{prefix}.lst", sep="\t", index=False, header=False
    )

to_metadata_file(df_train.copy(), "train")
to_metadata_file(df_test.copy(), "test")
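As a rough illustration of what ends up in the .lst files, the sketch below applies the same column selection and label mapping to a two-row DataFrame and returns the text instead of writing a file. The row numbers and file names are made up for the example.

```python
import pandas as pd

# Made-up rows standing in for the filtered CIFAR dataframe
df = pd.DataFrame({
    "row": [17, 42],
    "labels": [8, 48],
    "filenames": ["bike_s_000001.png", "moto_s_000002.png"],
})

# Same mapping as to_metadata_file: 8 -> 0, anything else -> 1
df["labels"] = df["labels"].apply(lambda x: 0 if x == 8 else 1)

# With no path given, to_csv returns the text instead of writing a file
lst_text = df[["row", "labels", "filenames"]].to_csv(
    sep="\t", index=False, header=False
)
lines = lst_text.splitlines()
for line in lines:
    print(repr(line))
```

Each line holds an index, a class label, and a relative image path, separated by tabs.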
Thereafter we can upload our manifest files to S3:
import boto3
# Upload files
boto3.Session().resource('s3').Bucket(
bucket).Object('train.lst').upload_file('./train.lst')
boto3.Session().resource('s3').Bucket(
bucket).Object('test.lst').upload_file('./test.lst')
Using the bucket and region info, we can get the latest prebuilt container to run our training job and define an output location on our S3 bucket for the model. Use the image_uris function from the SageMaker SDK to retrieve the latest image-classification image below:
# Use the image_uris function to retrieve the latest 'image-classification' image
region_name = boto3.Session().region_name
algo_image = sagemaker.image_uris.retrieve(region=region_name, framework="image-classification")
s3_output_location = f"s3://{bucket}/models/image_model"
We’re ready to create an estimator! Create an estimator img_classifier_model that uses one instance of ml.p3.2xlarge.
img_classifier_model = sagemaker.estimator.Estimator(
    algo_image,
    role=role,
    instance_count=1,
    instance_type='ml.p3.2xlarge',
    input_mode="File",
    output_path=s3_output_location,
    sagemaker_session=session
)
Hyperparameters
img_classifier_model.set_hyperparameters(
    image_shape='3,32,32',
    num_classes=2,
    num_training_samples=len(df_train)
)
The image-classification image uses four input channels with very specific input parameters.
from sagemaker.inputs import TrainingInput

# Two channels for the images and two for the .lst metadata files
model_inputs = {
    "train": TrainingInput(
        s3_data=f"s3://{bucket}/train/",
        content_type="application/x-image"
    ),
    "validation": TrainingInput(
        s3_data=f"s3://{bucket}/test/",
        content_type="application/x-image"
    ),
    "train_lst": TrainingInput(
        s3_data=f"s3://{bucket}/train.lst",
        content_type="application/x-image"
    ),
    "validation_lst": TrainingInput(
        s3_data=f"s3://{bucket}/test.lst",
        content_type="application/x-image"
    )
}
Great, now we can train the model using the model_inputs. In the cell below, call the fit method on our model:
img_classifier_model.fit(model_inputs)
2023-02-06 06:51:13 Starting - Starting the training job...
2023-02-06 06:51:37 Starting - Preparing the instances for trainingProfilerReport-1675666273: InProgress
.........
2023-02-06 06:53:11 Downloading - Downloading input data...
2023-02-06 06:53:32 Training - Downloading the training image...............
2023-02-06 06:56:12 Training - Training image download completed. Training in progress...Docker entrypoint called with argument(s): train
Running default environment configuration script
Nvidia gpu devices, drivers and cuda toolkit versions (only available on hosts with GPU):
Mon Feb 6 06:56:23 2023
[02/06/2023 06:59:37 INFO 139809553229632] Epoch[29] Train-accuracy=0.984879
[02/06/2023 06:59:37 INFO 139809553229632] Epoch[29] Time cost=5.077
[02/06/2023 06:59:38 INFO 139809553229632] Epoch[29] Validation-accuracy=0.786458
2023-02-06 06:59:53 Uploading - Uploading generated training model
2023-02-06 07:00:24 Completed - Training job completed
Training seconds: 459
Billable seconds: 459
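The final epoch in the log above reports a training accuracy of about 98% against a validation accuracy of about 79%, a gap that may point to overfitting. A minimal sketch for pulling these numbers out of log text follows. The regular expression is an assumption based only on the three log lines shown here, and parse_accuracies is a hypothetical helper, not part of the SageMaker SDK.

```python
import re

# Pattern inferred from the log lines shown above
ACCURACY_PATTERN = re.compile(r"Epoch\[(\d+)\] (Train|Validation)-accuracy=([0-9.]+)")

def parse_accuracies(log_lines):
    """Return {epoch: {"Train": acc, "Validation": acc}} from training log lines."""
    results = {}
    for line in log_lines:
        match = ACCURACY_PATTERN.search(line)
        if match:
            epoch, kind, value = match.groups()
            results.setdefault(int(epoch), {})[kind] = float(value)
    return results

log_lines = [
    "[02/06/2023 06:59:37 INFO 139809553229632] Epoch[29] Train-accuracy=0.984879",
    "[02/06/2023 06:59:37 INFO 139809553229632] Epoch[29] Time cost=5.077",
    "[02/06/2023 06:59:38 INFO 139809553229632] Epoch[29] Validation-accuracy=0.786458",
]

accuracies = parse_accuracies(log_lines)
gap = accuracies[29]["Train"] - accuracies[29]["Validation"]
print(f"Train/validation gap at epoch 29: {gap:.3f}")
```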
Getting ready to deploy
To begin with, let’s configure Model Monitor to track our deployment. We’ll define a DataCaptureConfig below:
from sagemaker.model_monitor import DataCaptureConfig

data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=f"s3://{bucket}/data_capture"
)
Note the destination_s3_uri parameter: at the end of the project, we can explore the data_capture directory in S3 to find crucial data about the inputs and outputs Model Monitor has observed on our model endpoint over time.
With that done, deploy your model on a single ml.m5.xlarge instance with the data capture config attached:
deployment = img_classifier_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    data_capture_config=data_capture_config
)
endpoint = deployment.endpoint_name
print(endpoint)
----------!image-classification-2023-02-06-07-02-45-972
Predictor
predictor = deployment
Then we send an image payload to the endpoint:
from sagemaker.serializers import IdentitySerializer

# Send the raw PNG bytes to the endpoint without further encoding
predictor.serializer = IdentitySerializer("image/png")
with open("./test/bicycle_s_001789.png", "rb") as f:
    payload = f.read()
inference = predictor.predict(payload)
print(inference)
b'[0.9234810471534729, 0.07651900500059128]'
The inference object is an array of two values: the predicted probability for each of our classes (bicycle and motorcycle, respectively). For example, a value of b'[0.92, 0.08]' indicates a 92% probability that the image shows a bicycle and an 8% probability that it shows a motorcycle.
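Turning the raw response into a readable label can be sketched as follows. The class order assumes the mapping used when the metadata files were written (bicycle is index 0, motorcycle is index 1). CLASS_NAMES and top_class are illustrative names, not part of the SageMaker SDK.

```python
import json

# Index 0 and 1, assuming the label mapping used for the .lst files
CLASS_NAMES = ["bicycle", "motorcycle"]

def top_class(raw_inference):
    """Decode the endpoint's bytes response and return (label, probability)."""
    probabilities = json.loads(raw_inference)
    # Pick the index with the highest probability
    best_index = max(range(len(probabilities)), key=probabilities.__getitem__)
    return CLASS_NAMES[best_index], probabilities[best_index]

label, probability = top_class(b'[0.9234810471534729, 0.07651900500059128]')
print(label, round(probability, 2))  # bicycle 0.92
```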
Lambdas and step function workflow
We will write and deploy three Lambda functions, and then use the Step Functions visual editor to chain them together!
The first lambda function is responsible for data generation. The second one is responsible for image classification. And the third function is responsible for filtering out low-confidence inferences.
First Lambda
import json
import boto3
import base64

s3 = boto3.resource("s3")

def lambda_handler(event, context):
    """A function to serialize target data from S3"""
    # Get the S3 address from the Step Function event input
    key = event["s3_key"]
    bucket = event["s3_bucket"]
    # Download the data from S3 to /tmp/image.png
    s3.Bucket(bucket).download_file(key, '/tmp/image.png')
    # Read the file and base64-encode it; decode to str so the result is plain JSON
    with open("/tmp/image.png", "rb") as f:
        image_data = base64.b64encode(f.read()).decode("utf-8")
    # Pass the data back to the Step Function
    print("Event:", event.keys())
    return {
        'statusCode': 200,
        'body': {
            "s3_bucket": bucket,
            "s3_key": key,
            "image_data": image_data,
            "inferences": []
        }
    }
Second Lambda
import json
import boto3
import base64

runtime = boto3.client('runtime.sagemaker')
# Fill this in with the name of your deployed model
ENDPOINT = "image-classification-2023-02-06-07-02-45-972"

def lambda_handler(event, context):
    # Decode the image data
    image = base64.b64decode(event['body']['image_data'])
    # Invoke the endpoint with the raw PNG bytes
    response = runtime.invoke_endpoint(EndpointName=ENDPOINT, ContentType='image/png', Body=image)
    # The response body is a JSON array with one probability per class
    inferences = json.loads(response['Body'].read().decode('utf-8'))
    # We return the data back to the Step Function
    return {
        'statusCode': 200,
        'body': {
            "image_data": event['body']['image_data'],
            "s3_bucket": event['body']['s3_bucket'],
            "s3_key": event['body']['s3_key'],
            "inferences": inferences,
        }
    }
Third Lambda
import json

THRESHOLD = .93

def lambda_handler(event, context):
    # Grab the inferences from the event
    inferences = event['body']['inferences']
    # Check if any values in our inferences are above THRESHOLD
    meets_threshold = any(x >= THRESHOLD for x in inferences)
    # If our threshold is met, pass our data back out of the
    # Step Function, else, end the Step Function with an error
    if not meets_threshold:
        raise Exception("THRESHOLD_CONFIDENCE_NOT_MET")
    return {
        'statusCode': 200,
        'body': {
            "image_data": event['body']['image_data'],
            "s3_bucket": event['body']['s3_bucket'],
            "s3_key": event['body']['s3_key'],
            "inferences": event['body']['inferences'],
        }
    }
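The filtering rule can be checked locally before deploying. The sketch below pulls the threshold test into a small hypothetical helper (meets_threshold) and tries it on two hand-written events. The second event reuses the probabilities from the bicycle prediction shown earlier, which would fall just short of a 0.93 threshold.

```python
THRESHOLD = 0.93

def meets_threshold(inferences, threshold=THRESHOLD):
    """Return True if any class probability reaches the threshold."""
    return any(x >= threshold for x in inferences)

# Hand-written events in the shape the second Lambda returns
confident = {"body": {"inferences": [0.97, 0.03]}}
uncertain = {"body": {"inferences": [0.9234810471534729, 0.07651900500059128]}}

print(meets_threshold(confident["body"]["inferences"]))  # True
print(meets_threshold(uncertain["body"]["inferences"]))  # False
```

A workflow run on the uncertain event would end with the THRESHOLD_CONFIDENCE_NOT_MET error rather than passing the result on.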
Step function
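One way the three Lambda functions could be chained is sketched below as an Amazon States Language definition built from a Python dictionary. The state names and the Lambda ARNs (including the account ID 123456789012) are hypothetical placeholders, not values from this project, and the visual editor may generate a different but equivalent definition.

```python
import json

# Hypothetical ARNs: replace with the ARNs of your own deployed functions
SERIALIZE_ARN = "arn:aws:lambda:us-east-1:123456789012:function:serializeImageData"
CLASSIFY_ARN = "arn:aws:lambda:us-east-1:123456789012:function:classifyImage"
FILTER_ARN = "arn:aws:lambda:us-east-1:123456789012:function:filterLowConfidence"

# Each Task passes its Lambda's return value to the next state as input
definition = {
    "Comment": "Serialize an image, classify it, then filter low-confidence results",
    "StartAt": "SerializeImageData",
    "States": {
        "SerializeImageData": {"Type": "Task", "Resource": SERIALIZE_ARN, "Next": "ClassifyImage"},
        "ClassifyImage": {"Type": "Task", "Resource": CLASSIFY_ARN, "Next": "FilterLowConfidence"},
        "FilterLowConfidence": {"Type": "Task", "Resource": FILTER_ARN, "End": True},
    },
}

def execution_order(definition):
    """Walk the linear chain from StartAt to the state marked End."""
    order = []
    state = definition["StartAt"]
    while True:
        order.append(state)
        if definition["States"][state].get("End"):
            return order
        state = definition["States"][state]["Next"]

print(execution_order(definition))
print(json.dumps(definition, indent=2))
```

Because the third function raises an exception when the threshold is not met, a low-confidence image would cause the last state, and with it the whole execution, to fail.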
Testing and evaluation
import random
import boto3
import json

def generate_test_case():
    # Set up S3 in boto3
    s3 = boto3.resource('s3')
    # List the objects under the test prefix in our bucket
    objects = s3.Bucket(bucket).objects.filter(Prefix="test")
    # Grab any random object key from that folder!
    obj = random.choice([x.key for x in objects])
    return json.dumps({
        "image_data": "",
        "s3_bucket": bucket,
        "s3_key": obj
    })
generate_test_case()
'{"image_data": "", "s3_bucket": "sagemaker-us-east-1-221437400076", "s3_key": "test/bicycle_s_000777.png"}'
Project Repository
Project: Build a ML Workflow For Scones Unlimited On Amazon SageMaker