In ML, jobs represent discrete tasks such as model training, inference, or data processing. Efficient management of these tasks is crucial, especially in shared resource environments. Job queues play a vital role in this process by optimising resource allocation and preventing resource contention. This is particularly important in ML workflows, where tasks are often resource-intensive and time-consuming.

Getting started with queued jobs

To get started with jobs, you need the Tensorfuse CLI installed on your machine. Install it and log in using the following commands:

pip install --upgrade pip
pip install --upgrade tensorkube
tensorkube login

Configuration for AWS

Run the following command to set up AWS credentials on your machine:

aws configure

or you can manually export them as environment variables:

export AWS_ACCESS_KEY_ID=your_access_key_id
export AWS_SECRET_ACCESS_KEY=your_secret_access_key
export AWS_DEFAULT_REGION=your_default_region
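
If you take the environment-variable route, a quick check before calling the CLI can catch a missing value early. The sketch below is illustrative only: `missing_aws_vars` is a hypothetical helper, not part of tensorkube or the AWS CLI, and it looks only at the three variables exported above.

```python
import os

# The three variables exported in the snippet above.
REQUIRED_AWS_VARS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_DEFAULT_REGION",
)


def missing_aws_vars(env=None):
    """Return the names of required AWS variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_aws_vars()
    if missing:
        print("Missing AWS variables:", ", ".join(missing))
    else:
        print("All three AWS variables are set.")
```

If you configured credentials with aws configure instead, these variables may legitimately be unset, so treat a non-empty result as a hint rather than an error.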

Deploying and Running Jobs

  1. Deploy a Job

    tensorkube job deploy --name <job-name> --gpus <number-of-gpus> --gpu-type <gpu-type> --max-scale <max-scale> --cpu <cpu-units> --memory <memory-size> --secret <secret-name>
    

    This command deploys a job with the specified parameters.

    If your queued jobs also include a different payload for each job, please refer to point 3 for information on how to access the payload in your deployment.

    • --name <job-name>: The name of the job.
    • --gpus <number-of-gpus>: The number of GPUs required for the job. [Default 0]
    • --gpu-type <gpu-type>: The type of GPU required.
    • --max-scale <max-scale>: The maximum scale for the job. [Default 3]
    • --cpu <cpu-units>: The number of CPU units required, specified in milliCPUs. Used only if GPUs are 0. [Default 100]
    • --memory <memory-size>: The amount of memory required. Specified in MB [Default 200]
    • --secret <secret-name>: The name of a secret required by the job. Can be used multiple times to attach multiple secrets.
  2. Queue a Job

    tensorkube job queue --job-name <job-name> --job-id <job-id> --payload <payload>
    

    This command queues a job by pushing data to the queue, which triggers the execution of the job. Make sure that the job-name matches the job you deployed.

    • --job-name <job-name>: The name of the job to be queued.
    • --job-id <job-id>: The unique identifier for the job.
    • --payload <payload>: The parameters or data to be passed to the job. Data Type: String.
  3. Accessing your payload

To access your payload string inside the deployment, install the tensorkube package in your Docker image and add the following snippet to your code.

from tensorkube import get_queued_message

message = get_queued_message()

If you are sending a JSON object as a string, remember to parse it back into an object, like so:

import json
from tensorkube import get_queued_message

message = json.loads(get_queued_message())
  4. Poll for Job Status

    tensorkube job get --job-name <job-name> --job-id <job-id>
    

    This command returns the status of the specified job.

    • --job-name <job-name>: The name of the job to be polled.
    • --job-id <job-id>: The unique identifier for the job whose status you want to check.
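
Because the payload travels as a plain string, it can help to keep the encoding and decoding in one place and to fail clearly when the string is not what the job expects. The following is a minimal sketch: `encode_payload` and `decode_payload` are hypothetical helper names, not tensorkube functions. Inside a job, the `raw` argument would be the string returned by `get_queued_message()` as described in step 3.

```python
import json


def encode_payload(data: dict) -> str:
    """Serialise a dict into the string passed as --payload."""
    return json.dumps(data)


def decode_payload(raw: str) -> dict:
    """Parse the queued string back into a dict, rejecting anything else."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"payload is not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    return data


if __name__ == "__main__":
    raw = encode_payload({"text": "What is life?"})
    print(raw)
    print(decode_payload(raw)["text"])
```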

Example

Let’s say you have an inference job as follows, and your job payload is the prompt for the inference.

job.py
import torch
import json
from transformers import AutoModelForCausalLM, AutoTokenizer
from tensorkube import get_queued_message


model_dir = "models"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForCausalLM.from_pretrained(
    model_dir,
    torch_dtype="auto",
).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_dir)


def generate_text(input_text: str):
    if not input_text:
        return {"error": "text field is required"}
    prompt = input_text
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt}
    ]
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(device)

    generated_ids = model.generate(
        model_inputs.input_ids,
        attention_mask=model_inputs.attention_mask,
        max_new_tokens=512
    )
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    return {"generated_text": response}


if __name__ == "__main__":
    prompt = json.loads(get_queued_message())
    print(generate_text(prompt['text']))

Create a Dockerfile for this job as follows:

Dockerfile
# Use the nvidia cuda base image
FROM nvidia/cuda:12.1.1-runtime-ubuntu22.04

ENV HF_HUB_ENABLE_HF_TRANSFER=1

# Update and install required packages
RUN apt-get update && apt-get install -y \
    python3.10 \
    python3.10-dev \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*

# Set Python 3.10 as the default Python version
RUN ln -s /usr/bin/python3.10 /usr/bin/python

# Upgrade pip and install the Python dependencies
RUN pip3 install --no-cache-dir --upgrade pip && pip install transformers && pip install tensorkube torch huggingface_hub[hf_transfer]

# Set working directory
WORKDIR /code

# Copy the code files
COPY download.py /code/download.py

# Run the downloader script to download the model
RUN python download.py

COPY job.py /code/job.py

CMD ["python", "job.py"]

Here, download.py downloads the model from Hugging Face and stores it in the image at build time. It looks as follows:

download.py
import os

from huggingface_hub import snapshot_download
from transformers import utils

# Hugging Face access token; replace the placeholder with your own
access_token = "<TOKEN>"


if __name__ == '__main__':
    # Download Qwen2-1.5B-Instruct into ./models, the directory job.py loads from
    os.makedirs('models', exist_ok=True)
    snapshot_download(repo_id="Qwen/Qwen2-1.5B-Instruct", local_dir="models", ignore_patterns=["*.pt", "*.bin"], token=access_token)
    utils.move_cache()

Deployment

Deploy this job definition:

tensorkube job deploy --name inference-job --gpus 1 --gpu-type a10g

Queue a job:

tensorkube job queue --job-name inference-job --job-id 1 --payload '{"text": "What is life?"}'

Get its status:

tensorkube job get --job-name inference-job --job-id 1
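
The three commands above can also be assembled from Python, which is convenient when the payload starts life as a dict and has to be quoted for the shell. This is a sketch rather than part of tensorkube: the helper names `deploy_command`, `queue_command`, and `get_command` are hypothetical, and only flags documented on this page are used.

```python
import json
import shlex


def deploy_command(name, gpus=0, gpu_type=None):
    """Argument list for `tensorkube job deploy`."""
    cmd = ["tensorkube", "job", "deploy", "--name", name, "--gpus", str(gpus)]
    if gpu_type is not None:
        cmd += ["--gpu-type", gpu_type]
    return cmd


def queue_command(job_name, job_id, payload):
    """Argument list for `tensorkube job queue`; payload is a dict sent as JSON."""
    return [
        "tensorkube", "job", "queue",
        "--job-name", job_name,
        "--job-id", str(job_id),
        "--payload", json.dumps(payload),
    ]


def get_command(job_name, job_id):
    """Argument list for `tensorkube job get`."""
    return [
        "tensorkube", "job", "get",
        "--job-name", job_name,
        "--job-id", str(job_id),
    ]


if __name__ == "__main__":
    commands = [
        deploy_command("inference-job", gpus=1, gpu_type="a10g"),
        queue_command("inference-job", 1, {"text": "What is life?"}),
        get_command("inference-job", 1),
    ]
    for cmd in commands:
        # shlex.join adds shell quoting so the printed line can be pasted as-is.
        print(shlex.join(cmd))
```

Passing one of these lists to `subprocess.run(cmd, check=True)` would run it without a shell, so the payload needs no extra quoting in that case.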

Programmatic Access to Job Queues

You can also queue jobs programmatically from your Python code.

Prerequisites:

  1. Tensorkube: Install the tensorkube package using the command
pip install tensorkube
  2. AWS CLI: The tensorkube package uses the AWS CLI to access the EKS cluster. You can find the installation steps here: https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html

  3. Configure AWS: Run aws configure and enter your ACCESS_KEY_ID, SECRET_ACCESS_KEY, SESSION_TOKEN (only for Identity Center users), and REGION values when prompted. You can also edit your ~/.aws/credentials file directly. Read more about configuring the AWS CLI here: https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html

Code Snippet

To programmatically queue a job, add the following snippet to your code

job_queue.py
from tensorkube import queue_new_job

# Replace the placeholders with your own values
job_name = "<YOUR_DEPLOYED_JOB_NAME>"
job_id = "<ID_OF_NEW_QUEUED_JOB>"
job_payload = "<STRING_PAYLOAD_FOR_JOB>"

queue_new_job(job_name, job_id, job_payload)
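
When several payloads need to go to the same deployed job, the call above can be wrapped in a small loop. The sketch below makes a few assumptions that the documentation does not confirm: `queue_prompts` is a hypothetical helper, job IDs are generated as consecutive numbers passed as strings, and each payload is a JSON object with a `text` key as in the inference example. The queueing function is passed in as an argument so the loop can be tried with a stand-in before pointing it at `queue_new_job`.

```python
import json
from typing import Callable, Iterable, List


def queue_prompts(
    job_name: str,
    prompts: Iterable[str],
    queue_fn: Callable[[str, str, str], object],
    first_id: int = 1,
) -> List[str]:
    """Queue one job per prompt and return the job IDs that were used."""
    job_ids = []
    for offset, prompt in enumerate(prompts):
        job_id = str(first_id + offset)  # assumed ID scheme: "1", "2", ...
        payload = json.dumps({"text": prompt})
        queue_fn(job_name, job_id, payload)
        job_ids.append(job_id)
    return job_ids


if __name__ == "__main__":
    # Stand-in that records calls instead of contacting the cluster.
    calls = []
    ids = queue_prompts(
        "inference-job",
        ["What is life?", "What is a queue?"],
        lambda name, job_id, payload: calls.append((name, job_id, payload)),
    )
    print(ids)
    print(calls)
```

To queue real jobs, you would pass `queue_new_job` imported from tensorkube as `queue_fn`, keeping the same argument order as in the snippet above.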