[Xet] Basic shard creation #1633


Open

coyotte508 wants to merge 44 commits into main.

Changes shown below are from 3 of the 44 commits.

Commits (44)
3b5e2b9
createXorbs also outputs file hash, sha256 and representation
coyotte508 Jul 16, 2025
3127a01
createXorbs handles a stream of blobs
coyotte508 Jul 16, 2025
099cc40
basic shard creation
coyotte508 Jul 16, 2025
8b35869
shard magic tag
coyotte508 Jul 16, 2025
6676431
remove shard key expiry
coyotte508 Jul 16, 2025
3fc715b
actually make API calls to xet backend to upload shards/xorbs
coyotte508 Jul 18, 2025
7a38fc7
fix prefix for shard upload
coyotte508 Jul 18, 2025
947a926
fix verification data
coyotte508 Jul 21, 2025
9cd2e66
update wasm bindings
coyotte508 Jul 21, 2025
bf8ae1c
fixup! update wasm bindings
coyotte508 Jul 21, 2025
70f0a0e
no need to compute shard hash client-side
coyotte508 Jul 21, 2025
431949b
no need for shard hash
coyotte508 Jul 23, 2025
515cae6
Merge remote-tracking branch 'origin/main' into shard-creation
coyotte508 Jul 24, 2025
adbe363
add va prefix to xorb too
coyotte508 Jul 24, 2025
2f9d4a0
progress events for uploading xorbs
coyotte508 Jul 24, 2025
a13da79
commit leftover xorb after all chunks have been processed
coyotte508 Jul 24, 2025
3cbde5e
integrate xet upload in commit function
coyotte508 Jul 24, 2025
651f6c0
add local dedup for xet uploads
coyotte508 Jul 24, 2025
f3e190f
move chunk caching to its own class/file
coyotte508 Jul 24, 2025
06dcd6d
Make sure to not OOB when writing shards
coyotte508 Jul 25, 2025
c093c64
dedup boolean when loading chunks from wasm
coyotte508 Jul 29, 2025
2e27699
global dedup! (just need hmac algorithm)
coyotte508 Jul 29, 2025
7c397dd
delay file events until matching xorb is emitted
coyotte508 Jul 29, 2025
5f3a61b
add dedup ratio to information
coyotte508 Jul 29, 2025
66299a3
fixup! add dedup ratio to information
coyotte508 Jul 29, 2025
988d85b
use hmac function from wasm
coyotte508 Jul 29, 2025
433284e
add bench script
coyotte508 Jul 31, 2025
74dfb71
fix wasm instantiation
coyotte508 Jul 31, 2025
b19209f
fix api calls
coyotte508 Jul 31, 2025
6d03446
use custom fetch for chunk call
coyotte508 Aug 1, 2025
627f024
correct global dedup call
coyotte508 Aug 1, 2025
e2fd1c1
fixes and add a shard file for tests
coyotte508 Aug 1, 2025
29eaef7
more recent target for tsup
coyotte508 Aug 1, 2025
c6f922f
fixes with xet protocol
coyotte508 Aug 1, 2025
9dbd3af
endianness matters when writing/reading hashes
coyotte508 Aug 1, 2025
072d889
shard parser works
coyotte508 Aug 1, 2025
82dffa6
fix OOBs
coyotte508 Aug 1, 2025
63d2e07
top-level comment
coyotte508 Aug 1, 2025
ad697ae
update wasm
coyotte508 Aug 1, 2025
c85003f
improve stats in bench script
coyotte508 Aug 1, 2025
846de2b
fix data intake for createXorb
coyotte508 Aug 1, 2025
23afed8
add commit option to bench script
coyotte508 Aug 1, 2025
235ce44
fix PUT => POST calls
coyotte508 Aug 1, 2025
ab9ced2
error in dedup
coyotte508 Aug 1, 2025
1 change: 1 addition & 0 deletions packages/hub/scripts/build-xet-wasm.sh
@@ -224,6 +224,7 @@ fi
 
 # copy the generated hf_xet_thin_wasm_bg.js to the hub package and hf_xet_thin_wasm_bg.wasm to the hub package
 cp "$CLONE_DIR/$PACKAGE/pkg/hf_xet_thin_wasm_bg.js" "./src/vendor/xet-chunk/chunker_wasm_bg.js"
+cp "$CLONE_DIR/$PACKAGE/pkg/hf_xet_thin_wasm_bg.wasm.d.ts" "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.d.ts"
 echo "// Generated by build-xet-wasm.sh" > "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.base64.ts"
 echo "export const wasmBase64 = atob(\`" >> "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.base64.ts"
 base64 "$CLONE_DIR/$PACKAGE/pkg/hf_xet_thin_wasm_bg.wasm" | fold -w 100 >> "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.base64.ts"
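For context on what those last lines generate: chunker_wasm_bg.wasm.base64.ts exports wasmBase64, a binary string produced by atob() over the embedded base64, i.e. one character code per byte of the .wasm file. Below is a minimal sketch of turning that export back into an instantiable module; the import path and the empty import object are illustrative assumptions, not the package's actual loader:

import { wasmBase64 } from "./chunker_wasm_bg.wasm.base64";

// wasmBase64 is already atob()-decoded by the generated file:
// a binary string with one character code per byte of the wasm binary.
const wasmBytes = new Uint8Array(wasmBase64.length);
for (let i = 0; i < wasmBase64.length; i++) {
  wasmBytes[i] = wasmBase64.charCodeAt(i);
}

// Hypothetical instantiation; the real bindings supply their own import object.
const { instance } = await WebAssembly.instantiate(wasmBytes, {});
console.log(instance.exports);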
176 changes: 126 additions & 50 deletions packages/hub/src/utils/createXorbs.ts
@@ -13,70 +13,146 @@ const MAX_CHUNK_SIZE = 2 * TARGET_CHUNK_SIZE;
 const XORB_SIZE = 64 * 1024 * 1024;
 const MAX_XORB_CHUNKS = 8 * 1024;
 
-export async function* createXorbs(
-  fileSource: Blob
-): AsyncGenerator<{ xorb: Uint8Array; hash: string }, void, undefined> {
+export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGenerator<
+  | {
+      type: "xorb";
+      xorb: Uint8Array;
+      hash: string;
+      id: number;
+      chunks: Array<{ hash: string; length: number; offset: number }>;
+    }
+  | {
+      type: "file";
+      hash: string;
+      verificationHash: string;
+      sha256: string;
+      representation: Array<{
+        xorbId: number;
+        offset: number;
+        endOffset: number;
+        /** Unpacked length */
+        length: number;
+      }>;
+    },
+  void,
+  undefined
+> {
   const chunkModule = await import("../vendor/xet-chunk/chunker_wasm");
+  const sha256Module = await import("../vendor/hash-wasm/sha256-wrapper");
+  let xorbId = 0;
 
   await chunkModule.init();
   const chunker = new chunkModule.Chunker(TARGET_CHUNK_SIZE);
 
   let xorb = new Uint8Array(XORB_SIZE);
-  const sourceChunks: Array<Uint8Array> = [];
+  let xorbOffset = 0;
+  let xorbChunks = Array<{ hash: string; length: number; offset: number }>();
 
   try {
-    const reader = fileSource.stream().getReader();
-    let xorbOffset = 0;
-    let xorbChunks = Array<{ hash: string; length: number }>();
-
-    const addChunks = function* (chunks: Array<{ hash: string; length: number }>) {
-      for (const chunk of chunks) {
-        let chunkToCopy: Uint8Array;
-        if (chunk.length === sourceChunks[0].length) {
-          chunkToCopy = sourceChunks[0];
-          sourceChunks.shift();
-        } else if (chunk.length < sourceChunks[0].length) {
-          chunkToCopy = sourceChunks[0].subarray(0, chunk.length);
-          sourceChunks[0] = sourceChunks[0].subarray(chunk.length);
-        } else {
-          chunkToCopy = new Uint8Array(chunk.length);
-          let copyOffset = 0;
-          let index = 0;
-          while (copyOffset < chunk.length) {
-            chunkToCopy.set(sourceChunks[index].subarray(0, chunk.length - copyOffset), copyOffset);
-            copyOffset += sourceChunks[index].length;
-            index++;
-          }
-          sourceChunks.splice(0, index);
-        }
-        xorbOffset = writeChunk(xorb, xorbOffset, chunkToCopy);
-        if (xorbOffset === 0) {
-          // Failure to write chunk, maybe because it went over xorb size limit
-          yield { xorb: xorb.subarray(0, xorbOffset), hash: "" };
-          xorb = new Uint8Array(XORB_SIZE);
-          xorbOffset = writeChunk(xorb, 0, chunkToCopy);
-
-          if (xorbOffset === 0) {
-            throw new Error("Failed to write chunk into xorb");
-          }
-        }
-        xorbChunks.push(chunk);
-        if (xorbChunks.length >= MAX_XORB_CHUNKS) {
-          yield { xorb: xorb.subarray(0, xorbOffset), hash: chunkModule.compute_xorb_hash(xorbChunks) };
-          xorbOffset = 0;
-          xorbChunks = [];
-          xorb = new Uint8Array(XORB_SIZE);
-        }
-      }
-    };
-
-    while (true) {
-      const { done, value } = await reader.read();
-      if (done) {
-        yield* addChunks(chunker.finish());
-        break;
-      }
-      sourceChunks.push(value);
-      yield* addChunks(chunker.add_data(value));
-    }
+    for await (const fileSource of fileSources) {
+      const initialXorbOffset = xorbOffset;
+      const sourceChunks: Array<Uint8Array> = [];
+
+      const reader = fileSource.stream().getReader();
+      const fileChunks: Array<{ hash: string; length: number }> = [];
+      const fileRepresentation: Array<{ xorbId: number; offset: number; endOffset: number; length: number }> = [];
+
+      const sha256 = await sha256Module.createSHA256();
+      sha256.init();
+
+      const addChunks = function* (chunks: Array<{ hash: string; length: number }>) {
+        for (const chunk of chunks) {
+          let chunkOffset = xorbOffset;
+          fileChunks.push({ hash: chunk.hash, length: chunk.length });
+          let chunkToCopy: Uint8Array;
+          if (chunk.length === sourceChunks[0].length) {
+            chunkToCopy = sourceChunks[0];
+            sourceChunks.shift();
+          } else if (chunk.length < sourceChunks[0].length) {
+            chunkToCopy = sourceChunks[0].subarray(0, chunk.length);
+            sourceChunks[0] = sourceChunks[0].subarray(chunk.length);
+          } else {
+            chunkToCopy = new Uint8Array(chunk.length);
+            let copyOffset = 0;
+            let index = 0;
+            while (copyOffset < chunk.length) {
+              chunkToCopy.set(sourceChunks[index].subarray(0, chunk.length - copyOffset), copyOffset);
+              copyOffset += sourceChunks[index].length;
+              index++;
+            }
+            sourceChunks.splice(0, index);
+          }
+          xorbOffset = writeChunk(xorb, xorbOffset, chunkToCopy);
+          if (xorbOffset === 0) {
+            // Failure to write chunk, maybe because it went over xorb size limit
+            yield {
+              type: "xorb" as const,
+              xorb: xorb.subarray(0, xorbOffset),
+              hash: chunkModule.compute_xorb_hash(xorbChunks),
+              chunks: [...xorbChunks],
+              id: xorbId,
+            };
+            xorbId++;
+            xorb = new Uint8Array(XORB_SIZE);
+            chunkOffset = 0;
+            xorbOffset = writeChunk(xorb, 0, chunkToCopy);
+
+            if (xorbOffset === 0) {
+              throw new Error("Failed to write chunk into xorb");
+            }
+          }
+          const lastRep = fileRepresentation.at(-1);
+
+          if (!lastRep) {
+            fileRepresentation.push({
+              xorbId,
+              offset: initialXorbOffset,
+              endOffset: xorbOffset - initialXorbOffset,
+              length: chunk.length,
+            });
+          } else {
+            if (lastRep.xorbId === xorbId) {
+              lastRep.endOffset = xorbOffset - lastRep.offset;
+              lastRep.length += chunk.length;
+            } else {
+              fileRepresentation.push({ xorbId, offset: 0, endOffset: xorbOffset, length: chunk.length });
+            }
+          }
+          xorbChunks.push({ hash: chunk.hash, length: chunk.length, offset: chunkOffset });
+          if (xorbChunks.length >= MAX_XORB_CHUNKS) {
+            yield {
+              type: "xorb" as const,
+              xorb: xorb.subarray(0, xorbOffset),
+              hash: chunkModule.compute_xorb_hash(xorbChunks),
+              chunks: [...xorbChunks],
+              id: xorbId,
+            };
+            xorbId++;
+            xorbOffset = 0;
+            xorbChunks = [];
+            xorb = new Uint8Array(XORB_SIZE);
+          }
+        }
+      };
+
+      while (true) {
+        const { done, value } = await reader.read();
+        if (done) {
+          yield* addChunks(chunker.finish());
+          break;
+        }
+        sourceChunks.push(value);
+        sha256.update(value);
+        yield* addChunks(chunker.add_data(value));
+      }
+
+      yield {
+        type: "file" as const,
+        hash: chunkModule.compute_file_hash(fileChunks),
+        verificationHash: chunkModule.compute_range_verification_hash(fileChunks.map((x) => x.hash)),
+        sha256: sha256.digest("hex"),
+        representation: fileRepresentation,
+      };
+    }
   } finally {
     chunker.free();
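To make the new event protocol concrete, here is a minimal consumer sketch. Only createXorbs and its two event shapes come from the diff above; the blobSource helper and the logging are illustrative assumptions:

import { createXorbs } from "./createXorbs";

// Hypothetical helper: wrap an array of blobs into the
// AsyncGenerator<Blob> that createXorbs now expects.
async function* blobSource(files: Blob[]): AsyncGenerator<Blob> {
  for (const file of files) {
    yield file;
  }
}

for await (const event of createXorbs(blobSource([new Blob(["hello world"])]))) {
  if (event.type === "xorb") {
    // One event per packed xorb: the raw buffer plus each chunk's
    // hash, length and offset inside that buffer.
    console.log(`xorb #${event.id}: ${event.chunks.length} chunks, hash=${event.hash}`);
  } else {
    // One event per input blob, emitted after its last chunk is written;
    // representation maps the file onto [offset, endOffset) byte ranges of xorbs.
    console.log(`file hash=${event.hash} sha256=${event.sha256}`, event.representation);
  }
}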