Testing S3 Multipart Uploads: Chunked Uploads, Resumable Uploads, and Large File Handling

S3 multipart upload is a three-phase protocol: initiate (get an upload ID), upload parts (at least 5 MiB each, except the last), and complete (assemble the parts into one object). Testing it means verifying each phase independently, exercising the abort path, and checking how your application manages resumable upload state. moto supports multipart upload end-to-end.

Key Takeaways

Multipart upload is a three-phase transaction. Initiate returns an upload ID. Each part upload uses that ID and returns an ETag. Complete takes the upload ID and all ETags to assemble the final object.

Parts must be at least 5 MiB, except the last one. S3 rejects CompleteMultipartUpload with an EntityTooSmall error if any part other than the last is smaller than that. Tests that reach the complete phase need realistically sized parts.

Abort is as important to test as complete. Failed or abandoned uploads accumulate storage costs unless aborted. Test that your application calls AbortMultipartUpload on failure.

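Resumable upload state lives mostly outside S3. S3 stores the parts for an upload ID and can list them through ListParts, but it knows nothing about your source file, your chunking plan, or which upload ID belongs to which job. Your application must persist at least the upload ID, and usually the completed part numbers and ETags, to support resume.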

moto supports multipart upload fully. You can test all three phases, list in-progress uploads, and verify aborts — all without touching real AWS.

Multipart upload is S3's mechanism for uploading files larger than 5GB (required) or any large file where reliability matters (recommended above 100MB). The protocol is more complex than a simple put_object — it's a three-phase transaction that your application code must manage correctly. When it goes wrong (network failure mid-upload, application crash, disk full), you need to know your cleanup logic works. That's what these tests verify.

Understanding the Multipart Upload Protocol

Before testing, understand what you're testing:

1. CreateMultipartUpload → returns UploadId
2. UploadPart (part 1, using UploadId) → returns ETag
3. UploadPart (part 2, using UploadId) → returns ETag
4. ... (minimum 5MB per part, except last)
5. CompleteMultipartUpload (UploadId + all ETags) → creates final object
   OR
5. AbortMultipartUpload (UploadId) → discards all parts

The upload ID ties the parts together. Without it, you can't complete or abort. This is why resumable uploads must persist the upload ID somewhere (database, Redis, local file).
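
The size constraints in the protocol above are easy to get wrong when splitting a file, so it can help to compute the part layout before any upload starts. The following is a minimal sketch, not part of the original examples: `plan_parts` and its default part size are hypothetical, and it only relies on the documented S3 limits of a 5 MiB minimum part size (except the last part) and at most 10,000 parts per upload.

```python
from typing import List, Tuple

MIN_PART_SIZE = 5 * 1024 * 1024  # S3 minimum for every part except the last
MAX_PARTS = 10_000               # S3 maximum number of parts per upload


def plan_parts(total_size: int, part_size: int = 8 * 1024 * 1024) -> List[Tuple[int, int, int]]:
    """Return (part_number, offset, length) tuples that cover total_size bytes."""
    if total_size <= 0:
        raise ValueError("total_size must be positive")
    if part_size < MIN_PART_SIZE:
        raise ValueError("part_size is below the 5 MiB minimum")

    # Double the part size until the file fits in 10,000 parts (ceiling division).
    while -(-total_size // part_size) > MAX_PARTS:
        part_size *= 2

    plan = []
    offset = 0
    part_number = 1  # S3 part numbers start at 1
    while offset < total_size:
        length = min(part_size, total_size - offset)  # only the last part may be short
        plan.append((part_number, offset, length))
        offset += length
        part_number += 1
    return plan
```

Because the function is pure, it can be unit-tested without moto or boto3, and the resulting offsets can drive `file.seek()` / `file.read()` calls in the code that actually uploads each part.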

Basic Multipart Upload Test

import boto3
import pytest
from moto import mock_aws

@pytest.fixture
def s3_client():
    with mock_aws():
        client = boto3.client("s3", region_name="us-east-1")
        yield client

@pytest.fixture
def bucket(s3_client):
    s3_client.create_bucket(Bucket="multipart-test-bucket")
    return "multipart-test-bucket"

def test_complete_multipart_upload(s3_client, bucket):
    key = "large-files/big-dataset.csv"
    
    # Phase 1: Initiate
    response = s3_client.create_multipart_upload(
        Bucket=bucket,
        Key=key,
        ContentType="text/csv"
    )
    upload_id = response["UploadId"]
    assert upload_id  # must be non-empty
    
    # Phase 2: Upload parts (each must be >= 5 MiB except the last)
    # Recent moto versions enforce this minimum at completion time (EntityTooSmall),
    # as S3 does, so every part except the last needs to be full-sized here too
    part_size = 5 * 1024 * 1024  # 5 MiB
    
    parts = []
    for part_number in range(1, 4):  # 3 parts
        is_last = part_number == 3
        data = b"x" * (part_size if not is_last else 1024 * 100)  # last part can be smaller
        
        part_response = s3_client.upload_part(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            PartNumber=part_number,
            Body=data
        )
        
        parts.append({
            "PartNumber": part_number,
            "ETag": part_response["ETag"]
        })
    
    # Phase 3: Complete
    complete_response = s3_client.complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        MultipartUpload={"Parts": parts}
    )
    
    # Verify the object exists and has the right key
    assert complete_response["Key"] == key
    assert complete_response["Bucket"] == bucket
    
    # Verify we can retrieve it
    head = s3_client.head_object(Bucket=bucket, Key=key)
    expected_size = (5 * 1024 * 1024 * 2) + (1024 * 100)
    assert head["ContentLength"] == expected_size
    assert head["ContentType"] == "text/csv"
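
Beyond size and content type, a test can also check the ETag of the assembled object. For multipart objects that are not encrypted with SSE-KMS or SSE-C, S3 generally reports an ETag built from the MD5 digests of the individual parts plus a part-count suffix, rather than the MD5 of the whole object. The helper below is a sketch under that assumption; `expected_multipart_etag` is a hypothetical name, and whether your moto version reproduces the same value is worth confirming before relying on it.

```python
import hashlib
from typing import List


def expected_multipart_etag(parts: List[bytes]) -> str:
    """Compute the ETag S3 usually reports for a multipart object.

    `parts` holds the raw bytes of each part in part-number order.
    The result includes the surrounding double quotes, as boto3 returns them.
    """
    # Concatenate the binary (not hex) MD5 digest of every part...
    combined = b"".join(hashlib.md5(part).digest() for part in parts)
    # ...then hash that concatenation and append "-<number of parts>".
    return '"{}-{}"'.format(hashlib.md5(combined).hexdigest(), len(parts))
```

In the test above this could be used as `assert head["ETag"] == expected_multipart_etag([data_1, data_2, data_3])`, where the three byte strings are the bodies passed to `upload_part` (the variable names are illustrative).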

Testing Multipart Upload Abort

Abort is critical — uncompleted multipart uploads continue to incur storage charges on real AWS. Test that your error handling calls abort:

def test_abort_multipart_upload(s3_client, bucket):
    key = "aborted/upload.bin"
    
    # Start the upload
    response = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = response["UploadId"]
    
    # Upload one part
    s3_client.upload_part(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        PartNumber=1,
        Body=b"x" * 1024
    )
    
    # Verify the in-progress upload is listed
    in_progress = s3_client.list_multipart_uploads(Bucket=bucket)
    assert "Uploads" in in_progress
    upload_ids = [u["UploadId"] for u in in_progress["Uploads"]]
    assert upload_id in upload_ids
    
    # Abort
    s3_client.abort_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id
    )
    
    # Verify it's no longer listed
    in_progress_after = s3_client.list_multipart_uploads(Bucket=bucket)
    remaining_ids = [u["UploadId"] for u in in_progress_after.get("Uploads", [])]
    assert upload_id not in remaining_ids
    
    # Verify the key doesn't exist (abort removes partial data)
    from botocore.exceptions import ClientError
    with pytest.raises(ClientError) as exc:
        s3_client.head_object(Bucket=bucket, Key=key)
    assert exc.value.response["Error"]["Code"] == "404"

def test_application_aborts_on_upload_failure(s3_client, bucket):
    """Test that your application code properly aborts failed uploads."""
    
    class UploadManager:
        def __init__(self, s3, bucket_name):
            self.s3 = s3
            self.bucket = bucket_name
        
        def upload_with_cleanup(self, key: str, parts_data: list) -> bool:
            response = self.s3.create_multipart_upload(Bucket=self.bucket, Key=key)
            upload_id = response["UploadId"]
            completed_parts = []
            
            try:
                for i, data in enumerate(parts_data, start=1):
                    if data is None:
                        raise ValueError(f"Part {i} data is None — simulated failure")
                    
                    part_resp = self.s3.upload_part(
                        Bucket=self.bucket,
                        Key=key,
                        UploadId=upload_id,
                        PartNumber=i,
                        Body=data
                    )
                    completed_parts.append({"PartNumber": i, "ETag": part_resp["ETag"]})
                
                self.s3.complete_multipart_upload(
                    Bucket=self.bucket,
                    Key=key,
                    UploadId=upload_id,
                    MultipartUpload={"Parts": completed_parts}
                )
                return True
            
            except Exception:
                self.s3.abort_multipart_upload(
                    Bucket=self.bucket, Key=key, UploadId=upload_id
                )
                return False
    
    manager = UploadManager(s3_client, bucket)
    
    # Simulate a failure on part 2 (None data)
    success = manager.upload_with_cleanup(
        "failed/upload.bin",
        [b"part 1 data" * 1000, None, b"part 3 data" * 1000]
    )
    
    assert success is False
    
    # Verify no orphaned uploads
    uploads = s3_client.list_multipart_uploads(Bucket=bucket)
    assert len(uploads.get("Uploads", [])) == 0
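
The test above simulates a failure inside the application by passing `None` as part data. A failure raised by the S3 client itself, such as a dropped connection during `upload_part`, can be simulated with a stub client instead. The sketch below uses `unittest.mock.MagicMock` rather than moto; `upload_with_cleanup` is a hypothetical free-function version of the method above, and `make_failing_client` is an illustrative helper, not a moto or boto3 API.

```python
from unittest.mock import MagicMock


def upload_with_cleanup(s3, bucket, key, parts_data):
    """Hypothetical helper mirroring UploadManager.upload_with_cleanup."""
    upload_id = s3.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]
    completed = []
    try:
        for number, data in enumerate(parts_data, start=1):
            resp = s3.upload_part(
                Bucket=bucket, Key=key, UploadId=upload_id, PartNumber=number, Body=data
            )
            completed.append({"PartNumber": number, "ETag": resp["ETag"]})
        s3.complete_multipart_upload(
            Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={"Parts": completed}
        )
        return True
    except Exception:
        # Any failure after initiation must release the stored parts.
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        return False


def make_failing_client(fail_on_call: int) -> MagicMock:
    """Build a stub S3 client whose upload_part raises on the Nth call."""
    client = MagicMock()
    client.create_multipart_upload.return_value = {"UploadId": "fake-upload-id"}
    calls = {"count": 0}

    def fake_upload_part(**kwargs):
        calls["count"] += 1
        if calls["count"] == fail_on_call:
            raise ConnectionError("simulated network failure")
        return {"ETag": '"etag-{}"'.format(kwargs["PartNumber"])}

    client.upload_part.side_effect = fake_upload_part
    return client


def test_abort_called_when_upload_part_raises():
    client = make_failing_client(fail_on_call=2)
    ok = upload_with_cleanup(client, "b", "k", [b"one", b"two", b"three"])
    assert ok is False
    client.abort_multipart_upload.assert_called_once_with(
        Bucket="b", Key="k", UploadId="fake-upload-id"
    )
    client.complete_multipart_upload.assert_not_called()
```

A stub like this checks that abort is called with the right arguments, while the moto-based test checks the resulting bucket state; the two approaches complement each other.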

Testing Resumable Upload State

Resumable uploads require your application to persist upload state (upload ID + completed parts) so it can be resumed after failure:

import json
from dataclasses import dataclass, field

@dataclass
class UploadState:
    upload_id: str
    bucket: str
    key: str
    completed_parts: list = field(default_factory=list)
    
    def to_json(self) -> str:
        return json.dumps({
            "upload_id": self.upload_id,
            "bucket": self.bucket,
            "key": self.key,
            "completed_parts": self.completed_parts
        })
    
    @classmethod
    def from_json(cls, data: str) -> "UploadState":
        d = json.loads(data)
        return cls(**d)

class ResumableUploader:
    def __init__(self, s3_client, state_store: dict):
        self.s3 = s3_client
        self.state_store = state_store  # dict-like (Redis, DB, etc.)
    
    def start_or_resume(self, bucket: str, key: str) -> UploadState:
        state_key = f"upload:{bucket}:{key}"
        
        if state_key in self.state_store:
            return UploadState.from_json(self.state_store[state_key])
        
        response = self.s3.create_multipart_upload(Bucket=bucket, Key=key)
        state = UploadState(upload_id=response["UploadId"], bucket=bucket, key=key)
        self.state_store[state_key] = state.to_json()
        return state
    
    def upload_part(self, state: UploadState, part_number: int, data: bytes) -> UploadState:
        # Skip already-completed parts
        completed_numbers = [p["PartNumber"] for p in state.completed_parts]
        if part_number in completed_numbers:
            return state
        
        response = self.s3.upload_part(
            Bucket=state.bucket,
            Key=state.key,
            UploadId=state.upload_id,
            PartNumber=part_number,
            Body=data
        )
        
        state.completed_parts.append({
            "PartNumber": part_number,
            "ETag": response["ETag"]
        })
        
        # Persist updated state
        state_key = f"upload:{state.bucket}:{state.key}"
        self.state_store[state_key] = state.to_json()
        return state
    
    def complete(self, state: UploadState):
        parts = sorted(state.completed_parts, key=lambda p: p["PartNumber"])
        self.s3.complete_multipart_upload(
            Bucket=state.bucket,
            Key=state.key,
            UploadId=state.upload_id,
            MultipartUpload={"Parts": parts}
        )
        
        state_key = f"upload:{state.bucket}:{state.key}"
        del self.state_store[state_key]

def test_resumable_upload_persists_state(s3_client, bucket):
    state_store = {}
    uploader = ResumableUploader(s3_client, state_store)
    
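    # Start upload
    state = uploader.start_or_resume(bucket, "resumable/big-file.bin")
    upload_id = state.upload_id
    
    # Upload part 1
    state = uploader.upload_part(state, 1, b"x" * 1000)
    assert len(state.completed_parts) == 1
    assert f"upload:{bucket}:resumable/big-file.bin" in state_store
    
    # A fresh uploader sharing the same store (simulating a restart)
    # picks up the existing upload instead of creating a new one
    resumed = ResumableUploader(s3_client, state_store).start_or_resume(
        bucket, "resumable/big-file.bin"
    )
    assert resumed.upload_id == upload_id
    assert resumed.completed_parts == state.completed_parts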

def test_resumable_upload_skips_completed_parts(s3_client, bucket):
    state_store = {}
    uploader = ResumableUploader(s3_client, state_store)
    
    state = uploader.start_or_resume(bucket, "resumable/file2.bin")
    state = uploader.upload_part(state, 1, b"part 1 data" * 100)
    state = uploader.upload_part(state, 2, b"part 2 data" * 100)
    
    # Simulate "resume" — calling upload_part again for part 1 should skip
    parts_before = len(state.completed_parts)
    state = uploader.upload_part(state, 1, b"new data for part 1")  # should be ignored
    
    assert len(state.completed_parts) == parts_before  # no duplicate added

def test_complete_resumable_upload_cleans_state(s3_client, bucket):
    state_store = {}
    uploader = ResumableUploader(s3_client, state_store)
    
    state = uploader.start_or_resume(bucket, "resumable/final.bin")
    state = uploader.upload_part(state, 1, b"only part")
    
    uploader.complete(state)
    
    # State should be cleaned up
    assert f"upload:{bucket}:resumable/final.bin" not in state_store
    
    # Object should exist
    head = s3_client.head_object(Bucket=bucket, Key="resumable/final.bin")
    assert head["ContentLength"] > 0
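
Persisted state can drift from what S3 actually holds, for example when the process dies after a part upload succeeds but before the state is written. S3 can report the parts it has received for an upload ID through ListParts, so a resume step can rebuild the completed-parts list from that response. The helpers below are a sketch: both function names are hypothetical, and they operate on a single response page shaped like the dict boto3's `list_parts` returns.

```python
from typing import List


def completed_parts_from_list_parts(response: dict) -> List[dict]:
    """Convert one ListParts response page into the Parts list
    expected by CompleteMultipartUpload, sorted by part number."""
    parts = sorted(response.get("Parts", []), key=lambda p: p["PartNumber"])
    return [{"PartNumber": p["PartNumber"], "ETag": p["ETag"]} for p in parts]


def missing_part_numbers(completed: List[dict], total_parts: int) -> List[int]:
    """Return the part numbers in 1..total_parts that still need uploading."""
    done = {p["PartNumber"] for p in completed}
    return [n for n in range(1, total_parts + 1) if n not in done]
```

With a real or mocked client, the input would come from `s3_client.list_parts(Bucket=..., Key=..., UploadId=...)`. Uploads with more than 1,000 parts span several pages, so production code would need to follow `IsTruncated` and `NextPartNumberMarker` as well.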

Testing List In-Progress Uploads

Monitoring in-progress uploads is important for cleanup jobs:

def test_list_multipart_uploads(s3_client, bucket):
    # Start multiple uploads
    upload_ids = []
    for i in range(3):
        response = s3_client.create_multipart_upload(
            Bucket=bucket,
            Key=f"parallel/file{i}.bin"
        )
        upload_ids.append(response["UploadId"])
    
    # List all in-progress uploads
    response = s3_client.list_multipart_uploads(Bucket=bucket)
    
    assert "Uploads" in response
    assert len(response["Uploads"]) == 3
    
    listed_ids = {u["UploadId"] for u in response["Uploads"]}
    for uid in upload_ids:
        assert uid in listed_ids
    
    # List with prefix filter
    prefix_response = s3_client.list_multipart_uploads(
        Bucket=bucket,
        Prefix="parallel/"
    )
    assert len(prefix_response["Uploads"]) == 3

def test_cleanup_stale_uploads(s3_client, bucket):
    """A cleanup job should abort any upload older than N days.

    This simplified version aborts every listed upload; a real job would
    first compare each upload's "Initiated" timestamp against a cutoff.
    """
    
    # Start uploads
    for i in range(2):
        s3_client.create_multipart_upload(Bucket=bucket, Key=f"stale/upload{i}.bin")
    
    # Abort all listed uploads (simulating a cleanup job)
    uploads = s3_client.list_multipart_uploads(Bucket=bucket).get("Uploads", [])
    for upload in uploads:
        s3_client.abort_multipart_upload(
            Bucket=bucket,
            Key=upload["Key"],
            UploadId=upload["UploadId"]
        )
    
    # Verify all cleaned up
    after = s3_client.list_multipart_uploads(Bucket=bucket)
    assert len(after.get("Uploads", [])) == 0
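
The cleanup test above aborts everything it finds. The age filter that a real cleanup job needs can be factored into a small pure function and tested separately. This is a sketch with a hypothetical name, `select_stale_uploads`; it assumes each entry carries the timezone-aware `Initiated` datetime that boto3 returns from `list_multipart_uploads`. The timestamps a mock reports may not reflect real creation times, so the example is easiest to test against hand-built dicts.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


def select_stale_uploads(
    uploads: List[dict], max_age: timedelta, now: Optional[datetime] = None
) -> List[dict]:
    """Return the uploads whose Initiated timestamp is older than max_age."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - max_age
    return [u for u in uploads if u["Initiated"] < cutoff]
```

A cleanup job would then pass each selected entry's `Key` and `UploadId` to `abort_multipart_upload`, leaving recent uploads untouched.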

These tests give you confidence that your multipart upload code handles the full lifecycle: successful uploads, failures with proper cleanup, and resumability. The moto mock handles all of it without any real AWS calls.
