服务端实现
txt
.
├── server.js # 服务端实现
├── uploads # 上传文件存放目录
└── package.json # 依赖管理文件
js
import fs from "node:fs";
import path from "node:path";
import stream from "node:stream/promises";
import cors from "cors";
import express from "express";
import multer from "multer";
// ----------------------------------------
// Express application setup
// ----------------------------------------
const port = 3000;
const host = `http://localhost:${port}`;

// Frozen, server-wide settings; handlers read them back via app.get("config").
const config = Object.freeze({
  host,
  delimiter: "@", // separator used in chunk file names when saving/merging chunks
  uploadDir: "uploads", // directory where uploaded files are stored
});

const app = express();
app.set("config", config);
app.use(cors());
// Everything inside the upload directory is served as static content.
app.use(express.static(config.uploadDir));
// Disk storage configuration for the multer upload middleware.
//
// NOTE: multer fills req.body in the order the fields appear in the client's
// FormData. For the non-file fields (taskId, index) to be available here, the
// client must append the "file" field LAST when building its FormData.
const storage = multer.diskStorage({
  // Chunks of one upload task are stored together in <uploadDir>/<taskId>.
  destination(req, _file, cb) {
    const { taskId } = req.body;
    // taskId is client input that becomes a directory name: accept only a
    // plain, single path segment so a chunk cannot land outside uploadDir.
    const isPlainSegment =
      typeof taskId === "string" &&
      taskId !== "" &&
      taskId !== "." &&
      taskId !== ".." &&
      !/[\\/]/.test(taskId);
    if (!isPlainSegment) {
      cb(new Error("invalid taskId"));
      return;
    }
    const chunkSaveDir = path.join(req.app.get("config").uploadDir, taskId);
    try {
      // recursive: true makes this a no-op when the directory already exists.
      fs.mkdirSync(chunkSaveDir, { recursive: true });
    } catch (error) {
      // Report through the callback instead of throwing inside the middleware.
      cb(error);
      return;
    }
    cb(null, chunkSaveDir);
  },
  // Each chunk is saved as "<taskId><delimiter><index>".
  filename(req, _file, cb) {
    const { taskId, index } = req.body;
    // index becomes part of the file name and is later parsed with Number()
    // when merging, so only plain decimal digits are accepted.
    if (!/^\d+$/.test(String(index))) {
      cb(new Error("invalid chunk index"));
      return;
    }
    const { delimiter } = req.app.get("config");
    cb(null, `${taskId}${delimiter}${index}`);
  },
});
// Upload one file chunk.
// multer stores the part sent under the "file" field using the storage
// configuration above; the handler then echoes the parsed form fields and
// the stored file's metadata back to the client.
const acceptChunk = multer({ storage }).single("file");

app.post("/upload_chunk", acceptChunk, (req, res) => {
  const { body, file } = req;
  res.json({
    success: true,
    message: "ok",
    results: { body, file },
  });
});
// Merge the uploaded chunks of one task into a single file.
//
// GET /merge_chunk?taskId=<id>
// On success responds with the public URL of the merged file. Errors thrown
// while merging are rethrown unchanged so the application's error handling
// can report them.
app.get("/merge_chunk", async (req, res) => {
  const { taskId } = req.query;
  const config = req.app.get("config");

  // taskId comes from the query string and is used to build file-system
  // paths: accept only a plain, single path segment.
  const isPlainSegment =
    typeof taskId === "string" &&
    taskId !== "" &&
    taskId !== "." &&
    taskId !== ".." &&
    !/[\\/]/.test(taskId);
  if (!isPlainSegment) {
    return res.status(400).json({
      success: false,
      message: "invalid taskId",
      results: null,
    });
  }

  // Chunks of a task live in <uploadDir>/<taskId>; anything that is not a
  // directory there means there is nothing to merge.
  const chunkSavePath = path.resolve(config.uploadDir, taskId);
  const hasChunkDir =
    fs.existsSync(chunkSavePath) && fs.statSync(chunkSavePath).isDirectory();
  if (!hasChunkDir) {
    return res.json({
      success: false,
      message: "file not exists",
      results: null,
    });
  }

  // Chunk files are named "<taskId><delimiter><index>"; order them by index.
  const chunks = fs
    .readdirSync(chunkSavePath)
    .map((item) => ({
      chunkPath: path.resolve(chunkSavePath, item),
      order: Number(item.split(config.delimiter).pop()),
    }))
    // Skip stray entries whose name does not end in a numeric index.
    .filter(({ order }) => Number.isInteger(order))
    .sort((a, b) => a.order - b.order);

  // Name, public URL and on-disk path of the merged file.
  const targetFilename = `${Date.now()}_${taskId}`;
  const mergedFileURL = `${config.host}/${targetFilename}`;
  const mergedFilePath = path.resolve(config.uploadDir, targetFilename);
  const writer = fs.createWriteStream(mergedFilePath);

  try {
    for (const { chunkPath } of chunks) {
      // end: false keeps the writer open so the next chunk can be appended.
      await stream.pipeline(fs.createReadStream(chunkPath), writer, {
        end: false,
      });
    }
    writer.end();
    // Only answer once all data has been flushed to the merged file.
    await stream.finished(writer);
  } catch (e) {
    // Release the file handle, then rethrow the ORIGINAL error.
    writer.destroy();
    throw e;
  }

  res.json({
    success: true,
    message: "ok",
    results: mergedFileURL,
  });
});
// ----------------------------------------
// Global error handler
// ----------------------------------------
// Registered with four parameters (err, req, res, next); the unused ones are
// kept, prefixed with "_", so the signature stays the same.
function handleServerError(err, _req, res, _next) {
  const payload = {
    success: false,
    message: err.message,
    results: null,
  };
  res.status(500).json(payload);
}
app.use(handleServerError);

app.listen(port, () => {
  console.log(`>>> Server started on: ${host}`);
});
json
{
"name": "file-uploader-server",
"private": true,
"version": "0.0.0",
"type": "module",
"scripts": {
"dev": "nodemon ./server.js",
"start": "node ./server.js"
},
"dependencies": {
"cors": "^2.8.5",
"express": "5.0.1",
"multer": "1.4.5-lts.1"
},
"devDependencies": {
"nodemon": "^3.1.9"
}
}
客户端实现
txt
.
├── README.md
├── index.html
├── package.json
├── pnpm-lock.yaml
├── public
│ └── vite.svg
├── src
│ ├── App.vue
│ ├── main.js
│ └── tools
│ ├── async-queue.js
│ └── fileUploader.js
└── vite.config.js
vue
<template>
<input type="file" @change="handleChange" />
</template>
<script setup>
import axios from "axios";
import { createFileUploader } from "./tools/fileUploader";
// Shared axios client: every request below is sent to the local upload server.
const http = axios.create({ baseURL: "http://localhost:3000" });
// Send one chunk to the server as multipart form data.
// Resolves with the HTTP client's response for the upload request.
async function uploadChunk(chunk) {
  const { taskId, index, count, blob } = chunk;
  // Text fields are appended first and the binary "file" field last.
  const fields = [
    ["taskId", taskId],
    ["index", index],
    ["count", count],
    ["file", blob],
  ];
  const payload = new FormData();
  for (const [field, value] of fields) {
    payload.append(field, value);
  }
  return http.request({
    url: "/upload_chunk",
    method: "POST",
    data: payload,
    headers: {
      "Content-Type": "multipart/form-data",
    },
  });
}
// All chunks are uploaded: ask the server to merge them.
// taskId is passed through axios `params` so it is URL-encoded; interpolating
// it into the URL breaks for ids containing characters such as "#" or "&".
// Resolves with the body of the server's response.
async function mergeChunk(taskId) {
  console.log("taskId:", taskId);
  const response = await http.get("/merge_chunk", { params: { taskId } });
  console.log(response.data);
  return response.data;
}
// Change handler of the file input: split the selected file into chunks,
// upload them, and request a merge once every chunk has been uploaded.
function handleChange(e) {
  const [file] = e.target.files;
  // The selection may be empty (e.g. it was cleared); nothing to upload then.
  if (!file) {
    return;
  }
  // chunkSize is omitted here, so the uploader's default chunk size applies.
  const uploader = createFileUploader(file, {
    workers: 5,
  });
  const reportError = (err) => console.error("Error:", err);
  uploader.on("error", reportError);
  uploader.on("completed", (taskId) => {
    console.log("文件上传完成, 发送合并文件请求");
    // mergeChunk is async: report a failed merge request instead of leaving
    // its rejection unhandled.
    mergeChunk(taskId).catch(reportError);
  });
  // Start uploading the chunks.
  uploader.upload(uploadChunk);
}
</script>
vue
<template>
<input type="file" @change="handleChange" />
</template>
<script setup>
import axios from "axios";
import { createFileUploader } from "./tools/fileUploader";
// Shared axios client: every request below is sent to the local upload server.
const http = axios.create({ baseURL: "http://localhost:3000" });
// Send one chunk to the server as multipart form data.
// Resolves with the HTTP client's response for the upload request.
async function uploadChunk(chunk) {
  const { taskId, index, count, blob } = chunk;
  // Text fields are appended first and the binary "file" field last.
  const fields = [
    ["taskId", taskId],
    ["index", index],
    ["count", count],
    ["file", blob],
  ];
  const payload = new FormData();
  for (const [field, value] of fields) {
    payload.append(field, value);
  }
  return http.request({
    url: "/upload_chunk",
    method: "POST",
    data: payload,
    headers: {
      "Content-Type": "multipart/form-data",
    },
  });
}
// All chunks are uploaded: ask the server to merge them.
// taskId is passed through axios `params` so it is URL-encoded; interpolating
// it into the URL breaks for ids containing characters such as "#" or "&".
// Resolves with the body of the server's response.
async function mergeChunk(taskId) {
  console.log("taskId:", taskId);
  const response = await http.get("/merge_chunk", { params: { taskId } });
  console.log(response.data);
  return response.data;
}
// Change handler of the file input: split the selected file into chunks,
// upload them, and request a merge once every chunk has been uploaded.
function handleChange(e) {
  const [file] = e.target.files;
  // The selection may be empty (e.g. it was cleared); nothing to upload then.
  if (!file) {
    return;
  }
  const uploader = createFileUploader(file, {
    workers: 5,
    chunkSize: 1024 ** 2 * 5, // 5MB
  });
  // Subscribe to the uploader's events.
  const reportError = (err) => console.error("Error:", err);
  uploader.on("error", reportError);
  uploader.on("completed", (taskId) => {
    console.log("文件上传完成, 发送合并文件请求");
    // mergeChunk is async: report a failed merge request instead of leaving
    // its rejection unhandled.
    mergeChunk(taskId).catch(reportError);
  });
  // Start uploading the chunks.
  uploader.upload(uploadChunk);
}
</script>
js
import mitt from "mitt";
import { v4 as uuidv4 } from "uuid";
import { AsyncQueue } from "./async-queue";
// Shallow-merge helper: a plain alias of Object.assign.
const extend = Object.assign;

// True for any non-null value whose typeof is "object" (arrays included).
const isObject = (val) => typeof val === "object" && val !== null;

// True when the value is a function.
const isCallable = (fn) => typeof fn === "function";
// Splits a large file into fixed-size chunks and runs the upload tasks
// through a concurrency-limited queue.
//
// Events (subscribe with `on`):
//   "error"     - emitted with the error reported by the task queue
//   "completed" - emitted with the taskId when the queue reports completion
class FileUploader {
  // file: the File to upload (its name, size, type and slice() are used).
  // opts.workers: worker count handed to the task queue (default 6).
  // opts.chunkSize: chunk size in bytes (default 10MB).
  // Throws TypeError for a non-object opts or a missing file, RangeError for
  // a chunkSize that is not a positive finite number.
  constructor(file, opts = {}) {
    if (!isObject(opts)) {
      throw new TypeError("[FileUploader]opts must be an object");
    }
    if (file == null) {
      throw new TypeError("[FileUploader]file is required");
    }
    const defaultOptions = {
      workers: 6,
      chunkSize: 1024 ** 2 * 10, // 10MB
    };
    this.options = extend(defaultOptions, opts);
    // A zero, negative, NaN or non-numeric chunkSize would make slice() loop
    // without end or cut wrong ranges, so reject it up front.
    const { chunkSize } = this.options;
    if (!Number.isFinite(chunkSize) || chunkSize <= 0) {
      throw new RangeError("[FileUploader]chunkSize must be a positive number");
    }
    this.file = file;
    this.taskId = this.taskIdGenerator();
    this.eventBus = mitt();
    this.taskQueue = new AsyncQueue({
      workers: this.options.workers,
      onError: (err) => this.eventBus.emit("error", err),
      onCompleted: () => this.eventBus.emit("completed", this.taskId),
    });
  }

  // Build the id of this upload task: "<uuid>.<ext>", or just "<uuid>" when
  // the file name has no extension.
  taskIdGenerator() {
    const name = typeof this.file.name === "string" ? this.file.name : "";
    const dotAt = name.lastIndexOf(".");
    // No dot, or only a leading dot (".bashrc"), means there is no extension;
    // splitting on "." would wrongly return the whole name in that case.
    const ext = dotAt > 0 ? name.slice(dotAt + 1) : "";
    const uuid = uuidv4();
    return ext ? `${uuid}.${ext}` : uuid;
  }

  // Subscribe a handler to one of the uploader's events.
  on(event, handler) {
    this.eventBus.on(event, handler);
  }

  // Cut the file into chunk descriptors, ordered by their 1-based index.
  slice() {
    const { chunkSize } = this.options;
    const { file, taskId } = this;
    // An empty file still yields one (empty) chunk, so an upload + merge
    // cycle also runs for it.
    const chunkCount = Math.max(1, Math.ceil(file.size / chunkSize));
    return Array.from({ length: chunkCount }, (_, i) => {
      const start = i * chunkSize;
      return {
        // The third argument of slice() is the content type as a STRING.
        blob: file.slice(start, start + chunkSize, file.type), // binary data
        count: chunkCount, // total number of chunks in this task
        index: i + 1, // position of this chunk within the task (1-based)
        taskId, // id of the upload task (uuid-based rather than an md5 of the content)
      };
    });
  }

  // Slice the file and queue one call of uploadTaskFunc(chunk) per chunk.
  // Throws TypeError when uploadTaskFunc is not a function.
  upload(uploadTaskFunc) {
    if (!isCallable(uploadTaskFunc)) {
      throw new TypeError("[upload]paramater must be a function");
    }
    const { taskQueue } = this;
    for (const chunk of this.slice()) {
      taskQueue.enqueue(() => uploadTaskFunc(chunk));
    }
    taskQueue.start();
  }
}
// Factory wrapper: builds a FileUploader for the given file and options so
// callers do not need to use `new` themselves.
export function createFileUploader(file, opts) {
  return new FileUploader(file, opts);
}
js
// 可控制并发的异步任务队列
import pLimit from "p-limit";
const isCallable = (fn) => typeof fn === "function";

// Queue of async tasks whose concurrency is capped by a p-limit limiter.
//
// Tasks are handed to the limiter as soon as they are enqueued; start() then
// waits for everything enqueued so far and reports the outcome:
//   onCompleted() - called once when all awaited tasks finished without error
//   onError(err)  - called for every error raised by a task that ran
export class AsyncQueue {
  // workers: concurrency passed to pLimit.
  // onError / onCompleted: optional callbacks; both default to a no-op so a
  // queue created without them does not crash when it reports its outcome.
  constructor({ workers, onError = () => {}, onCompleted = () => {} }) {
    this.limit = pLimit(workers);
    this.onError = onError;
    this.onCompleted = onCompleted;
    this.hasError = false;
    this.isCompleted = false;
    this.promises = [];
  }

  // Add a task (a function, typically returning a promise) to the queue.
  // Throws TypeError when task is not a function.
  enqueue(task) {
    if (!isCallable(task)) {
      throw new TypeError("[enqueue] task must be a function");
    }
    const promiseTask = async () => {
      // Fail fast: once a task has failed, the queue can no longer complete,
      // so tasks that have not started yet are skipped.
      if (this.hasError) {
        return;
      }
      try {
        await task();
      } catch (error) {
        // Errors are reported through onError instead of rejecting, which
        // keeps the Promise.all in start() from rejecting on a task failure.
        this.handleError(error);
      }
    };
    this.promises.push(this.limit(promiseTask));
  }

  // Wait for all tasks enqueued so far; does nothing when the queue has
  // already failed or completed.
  start() {
    if (this.hasError || this.isCompleted) {
      return;
    }
    Promise.all(this.promises)
      .then(() => {
        if (!this.hasError && !this.isCompleted) {
          this.onCompleted();
        }
        this.finalize();
      })
      .catch((err) => {
        // Reached when onCompleted itself throws.
        this.handleError(err);
      });
  }

  // Record the failure, notify the listener and close the queue.
  handleError(error) {
    this.hasError = true;
    this.onError(error);
    this.finalize();
  }

  // Mark the queue as finished so start() becomes a no-op.
  finalize() {
    this.isCompleted = true;
  }
}