Testing S3 Multipart Uploads: Chunked Uploads, Resumable Uploads, and Large File Handling
S3 multipart upload is a three-phase protocol: initiate (get an upload ID), upload parts (at least 5MB each, except the last), and complete (assemble the parts). Testing it requires verifying each phase independently, handling aborts, and testing resumable upload state management. moto supports multipart upload end-to-end.
Key Takeaways
Multipart upload is a three-phase transaction. Initiate returns an upload ID. Each part upload uses that ID and returns an ETag. Complete takes the upload ID and all ETags to assemble the final object.
Parts must be at least 5MB, except the last one. S3 rejects CompleteMultipartUpload with an EntityTooSmall error if any part other than the last is smaller than 5MB. Your tests need to handle this constraint.
Abort is as important to test as complete. Failed or abandoned uploads accumulate storage costs unless aborted. Test that your application calls AbortMultipartUpload on failure.
Resumable upload state lives mostly outside S3. S3 records the parts uploaded under an upload ID (ListParts returns them), but it has no idea which local file or job an upload belongs to. Your application must persist the upload ID, and usually the completed part numbers and ETags, to support resume.
moto implements the multipart upload APIs these tests use. You can test all three phases, list in-progress uploads, and verify aborts, all without touching real AWS.
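The 5MB rule is simple enough to check before calling CompleteMultipartUpload, which turns a late S3 error into an early, testable one. The sketch below assumes "5MB" means 5 MiB (5 * 1024 * 1024 bytes, the value the test code in this article uses) and also applies S3's limit of 10,000 parts per upload. `validate_part_sizes` is a hypothetical helper, not a boto3 or moto API.

```python
MIN_PART_SIZE = 5 * 1024 * 1024  # 5 MiB, assumed equal to S3's "5MB" minimum
MAX_PARTS = 10_000  # S3 accepts part numbers 1 through 10,000


def validate_part_sizes(sizes: list[int]) -> None:
    """Raise ValueError if these part sizes would fail CompleteMultipartUpload.

    `sizes` holds each part's length in bytes, in part-number order.
    Hypothetical helper; not part of boto3 or moto.
    """
    if not sizes:
        raise ValueError("a multipart upload needs at least one part")
    if len(sizes) > MAX_PARTS:
        raise ValueError(f"{len(sizes)} parts exceeds the {MAX_PARTS}-part limit")
    # Only the final part may be smaller than the minimum
    for number, size in enumerate(sizes[:-1], start=1):
        if size < MIN_PART_SIZE:
            raise ValueError(f"part {number} is {size} bytes, below the 5 MiB minimum")
```

For example, `validate_part_sizes([MIN_PART_SIZE, MIN_PART_SIZE, 100 * 1024])` passes because only the last part is small, while `validate_part_sizes([1024, 1024])` raises.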
Multipart upload is S3's mechanism for uploading files larger than 5GB (required) or any large file where reliability matters (recommended above 100MB). The protocol is more complex than a simple put_object — it's a three-phase transaction that your application code must manage correctly. When it goes wrong (network failure mid-upload, application crash, disk full), you need to know your cleanup logic works. That's what these tests verify.
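Very large files hit a second limit: an upload may have at most 10,000 parts, so 5 MiB parts cap an upload at about 48.8 GiB. A minimal sketch of one way to plan parts, assuming fixed-size parts: start at 5 MiB, double until the file fits in 10,000 parts, then return the byte range of each part. `plan_parts` is a hypothetical helper, not a boto3 API.

```python
MIN_PART_SIZE = 5 * 1024 * 1024  # 5 MiB, assumed equal to S3's "5MB" minimum
MAX_PARTS = 10_000  # S3's per-upload part limit


def plan_parts(total_size: int, part_size: int = MIN_PART_SIZE) -> list[tuple[int, int, int]]:
    """Return (part_number, offset, length) for every part of a file.

    Doubles part_size until the file fits in MAX_PARTS parts. Does not check
    S3's 5 GiB maximum part size. Hypothetical helper; not part of boto3.
    """
    if total_size <= 0:
        raise ValueError("total_size must be positive")
    part_size = max(part_size, MIN_PART_SIZE)
    while -(-total_size // part_size) > MAX_PARTS:  # ceiling division
        part_size *= 2
    return [
        (number, offset, min(part_size, total_size - offset))
        for number, offset in enumerate(range(0, total_size, part_size), start=1)
    ]
```

A 100 GiB file, for instance, ends up with 20 MiB parts: 5 MiB parts would need 20,480 parts and 10 MiB parts would need 10,240.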
Understanding the Multipart Upload Protocol
Before testing, understand what you're testing:
1. CreateMultipartUpload → returns UploadId
2. UploadPart (part 1, using UploadId) → returns ETag
3. UploadPart (part 2, using UploadId) → returns ETag
4. ... (minimum 5MB per part, except last)
5. CompleteMultipartUpload (UploadId + all ETags) → creates final object
OR
5. AbortMultipartUpload (UploadId) → discards all parts
The upload ID ties the parts together. Without it, you can't complete or abort. This is why resumable uploads must persist the upload ID somewhere (database, Redis, local file).
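The "local file" option can be as small as a dict-like object that rewrites a JSON file on every change, so it can stand in wherever a plain dict is used as a state store (such as the `ResumableUploader` later in this article). A minimal sketch, assuming a single process owns the file; `JsonFileStore` is a hypothetical name. It writes to a temporary file and swaps it in with `os.replace`, so readers never see a half-written file, but it does not fsync and so makes no promise about power loss.

```python
import json
import os
from collections.abc import MutableMapping
from pathlib import Path


class JsonFileStore(MutableMapping):
    """Dict-like store persisted to a JSON file (hypothetical helper).

    Assumes a single writer; no locking is done.
    """

    def __init__(self, path):
        self.path = Path(path)
        self._data = json.loads(self.path.read_text()) if self.path.exists() else {}

    def _flush(self):
        # Write to a temp file, then replace the real file in one rename
        tmp = self.path.with_suffix(self.path.suffix + ".tmp")
        tmp.write_text(json.dumps(self._data))
        os.replace(tmp, self.path)

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value
        self._flush()

    def __delitem__(self, key):
        del self._data[key]
        self._flush()

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)
```

Because it implements `MutableMapping`, `in`, `[]`, and `del` behave exactly as they do on a dict, and a new instance pointed at the same path sees the saved state after a restart.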
Basic Multipart Upload Test
import boto3
import pytest
from moto import mock_aws


@pytest.fixture
def s3_client():
    with mock_aws():
        client = boto3.client("s3", region_name="us-east-1")
        yield client


@pytest.fixture
def bucket(s3_client):
    s3_client.create_bucket(Bucket="multipart-test-bucket")
    return "multipart-test-bucket"


def test_complete_multipart_upload(s3_client, bucket):
    key = "large-files/big-dataset.csv"

    # Phase 1: Initiate
    response = s3_client.create_multipart_upload(
        Bucket=bucket,
        Key=key,
        ContentType="text/csv"
    )
    upload_id = response["UploadId"]
    assert upload_id  # must be non-empty

    # Phase 2: Upload parts (each must be >= 5MB except the last)
    # moto, like S3, rejects undersized non-final parts when the upload is
    # completed, so the first two parts use the full 5MB minimum
    part_size = 5 * 1024 * 1024  # 5MB
    parts = []
    for part_number in range(1, 4):  # 3 parts
        is_last = part_number == 3
        data = b"x" * (part_size if not is_last else 1024 * 100)  # last part can be smaller
        part_response = s3_client.upload_part(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            PartNumber=part_number,
            Body=data
        )
        parts.append({
            "PartNumber": part_number,
            "ETag": part_response["ETag"]
        })

    # Phase 3: Complete
    complete_response = s3_client.complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        MultipartUpload={"Parts": parts}
    )

    # Verify the response points at the right object
    assert complete_response["Key"] == key
    assert complete_response["Bucket"] == bucket

    # Verify we can retrieve it: two full parts plus the 100KB last part
    head = s3_client.head_object(Bucket=bucket, Key=key)
    expected_size = (part_size * 2) + (1024 * 100)
    assert head["ContentLength"] == expected_size
    assert head["ContentType"] == "text/csv"
Testing Multipart Upload Abort
Abort is critical — uncompleted multipart uploads continue to incur storage charges on real AWS. Test that your error handling calls abort:
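One way to make the abort hard to forget is to wrap the whole upload in a context manager. The sketch below is hypothetical (`multipart_upload` is not a boto3 API) and only assumes a client with boto3's `create_multipart_upload`, `complete_multipart_upload`, and `abort_multipart_upload` methods, so the moto-backed client from the fixtures would work. The caller uploads parts inside the `with` block; any exception there, or in the final complete call, triggers an abort and is then re-raised.

```python
from contextlib import contextmanager


@contextmanager
def multipart_upload(s3, bucket: str, key: str):
    """Yield (upload_id, parts); complete on success, abort on any error.

    Hypothetical helper. Inside the with-block the caller uploads parts and
    appends {"PartNumber": n, "ETag": etag} dicts to `parts`.
    """
    upload_id = s3.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]
    parts = []
    try:
        yield upload_id, parts
        # CompleteMultipartUpload expects parts in ascending PartNumber order
        s3.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={"Parts": sorted(parts, key=lambda p: p["PartNumber"])},
        )
    except BaseException:
        # Covers failures in the with-block and in complete (e.g. EntityTooSmall):
        # discard the uploaded parts, then re-raise the original error
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise
```

With the moto fixtures, the matching test would upload one part inside the `with` block, raise, and then assert that `list_multipart_uploads` returns no `Uploads`.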
from botocore.exceptions import ClientError


def test_abort_multipart_upload(s3_client, bucket):
    key = "aborted/upload.bin"

    # Start the upload
    response = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = response["UploadId"]

    # Upload one part
    s3_client.upload_part(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        PartNumber=1,
        Body=b"x" * 1024
    )

    # Verify the in-progress upload is listed
    in_progress = s3_client.list_multipart_uploads(Bucket=bucket)
    assert "Uploads" in in_progress
    upload_ids = [u["UploadId"] for u in in_progress["Uploads"]]
    assert upload_id in upload_ids

    # Abort
    s3_client.abort_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id
    )

    # Verify it's no longer listed
    in_progress_after = s3_client.list_multipart_uploads(Bucket=bucket)
    remaining_ids = [u["UploadId"] for u in in_progress_after.get("Uploads", [])]
    assert upload_id not in remaining_ids

    # Verify the key doesn't exist (abort removes partial data)
    with pytest.raises(ClientError) as exc:
        s3_client.head_object(Bucket=bucket, Key=key)
    assert exc.value.response["Error"]["Code"] == "404"


def test_application_aborts_on_upload_failure(s3_client, bucket):
    """Test that your application code properly aborts failed uploads."""

    class UploadManager:
        def __init__(self, s3, bucket_name):
            self.s3 = s3
            self.bucket = bucket_name

        def upload_with_cleanup(self, key: str, parts_data: list) -> bool:
            response = self.s3.create_multipart_upload(Bucket=self.bucket, Key=key)
            upload_id = response["UploadId"]
            completed_parts = []
            try:
                for i, data in enumerate(parts_data, start=1):
                    if data is None:
                        raise ValueError(f"Part {i} data is None (simulated failure)")
                    part_resp = self.s3.upload_part(
                        Bucket=self.bucket,
                        Key=key,
                        UploadId=upload_id,
                        PartNumber=i,
                        Body=data
                    )
                    completed_parts.append({"PartNumber": i, "ETag": part_resp["ETag"]})
                self.s3.complete_multipart_upload(
                    Bucket=self.bucket,
                    Key=key,
                    UploadId=upload_id,
                    MultipartUpload={"Parts": completed_parts}
                )
                return True
            except Exception:
                self.s3.abort_multipart_upload(
                    Bucket=self.bucket, Key=key, UploadId=upload_id
                )
                return False

    manager = UploadManager(s3_client, bucket)

    # Simulate a failure on part 2 (None data)
    success = manager.upload_with_cleanup(
        "failed/upload.bin",
        [b"part 1 data" * 1000, None, b"part 3 data" * 1000]
    )
    assert success is False

    # Verify no orphaned uploads
    uploads = s3_client.list_multipart_uploads(Bucket=bucket)
    assert len(uploads.get("Uploads", [])) == 0
Testing Resumable Upload State
Resumable uploads require your application to persist upload state (upload ID + completed parts) so it can be resumed after failure:
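On resume, the first question is which parts are still missing. A minimal sketch, assuming fixed-size parts numbered from 1 and the `completed_parts` format used in this section (dicts with `PartNumber` and `ETag`); `missing_parts` is a hypothetical helper. If the local record might be stale, boto3's `list_parts(Bucket=..., Key=..., UploadId=...)` reports the parts S3 has actually stored for that upload, including each part's `PartNumber` and `ETag`.

```python
def missing_parts(total_size: int, part_size: int, completed_parts: list[dict]) -> list[int]:
    """Return the part numbers still to upload, in ascending order.

    Assumes fixed-size parts numbered from 1 (only the last may be shorter)
    and completed_parts entries shaped like {"PartNumber": n, "ETag": etag}.
    Hypothetical helper; not part of boto3.
    """
    if total_size <= 0 or part_size <= 0:
        raise ValueError("total_size and part_size must be positive")
    part_count = -(-total_size // part_size)  # ceiling division
    done = {p["PartNumber"] for p in completed_parts}
    return [n for n in range(1, part_count + 1) if n not in done]
```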
import json
from dataclasses import dataclass, field


@dataclass
class UploadState:
    upload_id: str
    bucket: str
    key: str
    completed_parts: list = field(default_factory=list)

    def to_json(self) -> str:
        return json.dumps({
            "upload_id": self.upload_id,
            "bucket": self.bucket,
            "key": self.key,
            "completed_parts": self.completed_parts
        })

    @classmethod
    def from_json(cls, data: str) -> "UploadState":
        d = json.loads(data)
        return cls(**d)


class ResumableUploader:
    def __init__(self, s3_client, state_store: dict):
        self.s3 = s3_client
        self.state_store = state_store  # dict-like (Redis, DB, etc.)

    def start_or_resume(self, bucket: str, key: str) -> UploadState:
        state_key = f"upload:{bucket}:{key}"
        if state_key in self.state_store:
            return UploadState.from_json(self.state_store[state_key])
        response = self.s3.create_multipart_upload(Bucket=bucket, Key=key)
        state = UploadState(upload_id=response["UploadId"], bucket=bucket, key=key)
        self.state_store[state_key] = state.to_json()
        return state

    def upload_part(self, state: UploadState, part_number: int, data: bytes) -> UploadState:
        # Skip already-completed parts
        completed_numbers = [p["PartNumber"] for p in state.completed_parts]
        if part_number in completed_numbers:
            return state
        response = self.s3.upload_part(
            Bucket=state.bucket,
            Key=state.key,
            UploadId=state.upload_id,
            PartNumber=part_number,
            Body=data
        )
        state.completed_parts.append({
            "PartNumber": part_number,
            "ETag": response["ETag"]
        })
        # Persist after every part, so a crash costs at most one re-uploaded part
        state_key = f"upload:{state.bucket}:{state.key}"
        self.state_store[state_key] = state.to_json()
        return state

    def complete(self, state: UploadState):
        # CompleteMultipartUpload expects parts in ascending PartNumber order
        parts = sorted(state.completed_parts, key=lambda p: p["PartNumber"])
        self.s3.complete_multipart_upload(
            Bucket=state.bucket,
            Key=state.key,
            UploadId=state.upload_id,
            MultipartUpload={"Parts": parts}
        )
        state_key = f"upload:{state.bucket}:{state.key}"
        del self.state_store[state_key]


def test_resumable_upload_persists_state(s3_client, bucket):
    state_store = {}
    uploader = ResumableUploader(s3_client, state_store)

    # Start upload
    state = uploader.start_or_resume(bucket, "resumable/big-file.bin")
    upload_id = state.upload_id

    # Upload part 1
    state = uploader.upload_part(state, 1, b"x" * 1000)
    assert len(state.completed_parts) == 1
    assert f"upload:{bucket}:resumable/big-file.bin" in state_store

    # A fresh uploader sharing the store (e.g. after a restart) resumes the same upload
    resumed = ResumableUploader(s3_client, state_store).start_or_resume(
        bucket, "resumable/big-file.bin"
    )
    assert resumed.upload_id == upload_id
    assert resumed.completed_parts == state.completed_parts


def test_resumable_upload_skips_completed_parts(s3_client, bucket):
    state_store = {}
    uploader = ResumableUploader(s3_client, state_store)
    state = uploader.start_or_resume(bucket, "resumable/file2.bin")
    state = uploader.upload_part(state, 1, b"part 1 data" * 100)
    state = uploader.upload_part(state, 2, b"part 2 data" * 100)

    # Simulate "resume": calling upload_part again for part 1 should skip
    parts_before = len(state.completed_parts)
    state = uploader.upload_part(state, 1, b"new data for part 1")  # should be ignored
    assert len(state.completed_parts) == parts_before  # no duplicate added


def test_complete_resumable_upload_cleans_state(s3_client, bucket):
    state_store = {}
    uploader = ResumableUploader(s3_client, state_store)
    state = uploader.start_or_resume(bucket, "resumable/final.bin")
    state = uploader.upload_part(state, 1, b"only part")
    uploader.complete(state)

    # State should be cleaned up
    assert f"upload:{bucket}:resumable/final.bin" not in state_store

    # Object should exist with the uploaded content's size
    head = s3_client.head_object(Bucket=bucket, Key="resumable/final.bin")
    assert head["ContentLength"] == len(b"only part")
Testing List In-Progress Uploads
Monitoring in-progress uploads is important for cleanup jobs:
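Cleanup jobs usually abort only uploads older than some cutoff. Each entry in the `Uploads` list returned by `list_multipart_uploads` carries an `Initiated` timestamp, which boto3 returns as a timezone-aware `datetime`, so the age check can live in a pure function that takes `now` as a parameter. A minimal sketch; `find_stale_uploads` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def find_stale_uploads(uploads: list[dict], now: datetime, max_age: timedelta) -> list[dict]:
    """Return the uploads whose Initiated time is older than now - max_age.

    `uploads` is the "Uploads" list from list_multipart_uploads. `now` must be
    timezone-aware so it compares cleanly with boto3's aware timestamps.
    Hypothetical helper; not part of boto3.
    """
    cutoff = now - max_age
    return [u for u in uploads if u["Initiated"] < cutoff]
```

Because `now` is an argument, the test controls the clock instead of depending on wall time. A production job would also page through results (for example with `s3.get_paginator("list_multipart_uploads")`), since one call returns at most 1,000 uploads.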
def test_list_multipart_uploads(s3_client, bucket):
    # Start multiple uploads
    upload_ids = []
    for i in range(3):
        response = s3_client.create_multipart_upload(
            Bucket=bucket,
            Key=f"parallel/file{i}.bin"
        )
        upload_ids.append(response["UploadId"])

    # List all in-progress uploads
    response = s3_client.list_multipart_uploads(Bucket=bucket)
    assert "Uploads" in response
    assert len(response["Uploads"]) == 3
    listed_ids = {u["UploadId"] for u in response["Uploads"]}
    for uid in upload_ids:
        assert uid in listed_ids

    # List with prefix filter
    prefix_response = s3_client.list_multipart_uploads(
        Bucket=bucket,
        Prefix="parallel/"
    )
    assert len(prefix_response["Uploads"]) == 3


def test_cleanup_stale_uploads(s3_client, bucket):
    """Simulate the abort loop of a cleanup job.

    A real job would abort only uploads older than N days (based on each
    upload's "Initiated" timestamp); this test aborts everything it lists.
    """
    # Start uploads
    stale_ids = []
    for i in range(2):
        r = s3_client.create_multipart_upload(Bucket=bucket, Key=f"stale/upload{i}.bin")
        stale_ids.append((f"stale/upload{i}.bin", r["UploadId"]))

    # Abort all listed uploads (simulating a cleanup job)
    uploads = s3_client.list_multipart_uploads(Bucket=bucket).get("Uploads", [])
    assert {(u["Key"], u["UploadId"]) for u in uploads} == set(stale_ids)
    for upload in uploads:
        s3_client.abort_multipart_upload(
            Bucket=bucket,
            Key=upload["Key"],
            UploadId=upload["UploadId"]
        )

    # Verify all cleaned up
    after = s3_client.list_multipart_uploads(Bucket=bucket)
    assert len(after.get("Uploads", [])) == 0
These tests give you confidence that your multipart upload code handles the full lifecycle: successful uploads, failures with proper cleanup, and resumability. The moto mock handles all of it without any real AWS calls.