Skip to content

服务端实现

txt
.
├── server.js     # 服务端实现
├── uploads       # 上传文件存放目录
└── package.json  # 依赖管理文件
js
import fs from "node:fs";
import path from "node:path";
import stream from "node:stream/promises";
import cors from "cors";
import express from "express";
import multer from "multer";

//////////////////////////////////////////
// setup express app
//////////////////////////////////////////
const app = express();
const port = 3000;
const host = `http://localhost:${port}`;
// Immutable settings shared with the route handlers, which read them back
// through req.app.get("config").
const config = Object.freeze({
  host,
  delimiter: "@", // separator used in chunk file names when saving/merging chunks
  uploadDir: "uploads", // directory where uploaded files are stored
});
app.set("config", config);
app.use(cors());
// Serve the contents of the upload directory as static files.
app.use(express.static(config.uploadDir));

// Storage engine for the multer upload middleware.
//
// NOTE: multer fills req.body in the order the client appended the fields to
// its FormData. To make sure the non-file fields (taskId, index) are available
// here, the client must append the "file" field last.

/**
 * Returns `value` if it can be used as a single path segment, otherwise throws.
 *
 * The value comes straight from the request body, so anything that could
 * escape the upload directory (separators, "." / "..", NUL) is rejected, as is
 * a missing or non-string value (e.g. the field arrived after the file part).
 *
 * @param {unknown} value - raw field value from req.body
 * @param {string} label - field name used in the error message
 * @returns {string} the validated value
 * @throws {Error} when the value is not a safe path segment
 */
function requireSafeSegment(value, label) {
  const isSafe =
    typeof value === "string" &&
    value !== "" &&
    value !== "." &&
    value !== ".." &&
    !/[\\/\0]/.test(value);
  if (!isSafe) {
    throw new Error(`invalid ${label}`);
  }
  return value;
}

const storage = multer.diskStorage({
  // Chunks of one task are stored in <uploadDir>/<taskId>/.
  destination(req, _file, cb) {
    let chunkSaveDir;
    try {
      const taskId = requireSafeSegment(req.body.taskId, "taskId");
      chunkSaveDir = path.join(req.app.get("config").uploadDir, taskId);
    } catch (err) {
      cb(err);
      return;
    }
    // With `recursive: true` an already existing directory is not an error,
    // so no separate existence check is needed.
    fs.mkdir(chunkSaveDir, { recursive: true }, (err) => cb(err, chunkSaveDir));
  },
  // Each chunk is saved as <taskId><delimiter><index>.
  filename(req, _file, cb) {
    const { index } = req.body;
    const delimiter = req.app.get("config").delimiter;
    try {
      const taskId = requireSafeSegment(req.body.taskId, "taskId");
      // The merge step orders chunks by this numeric suffix.
      if (typeof index !== "string" || !/^\d+$/.test(index)) {
        throw new Error("invalid index");
      }
      cb(null, `${taskId}${delimiter}${index}`);
    } catch (err) {
      cb(err);
    }
  },
});

// Upload one file chunk.
// Expects a multipart body with the chunk in the "file" field; responds with
// the parsed body fields and the stored file's metadata.
app.post("/upload_chunk", multer({ storage }).single("file"), (req, res) => {
  // multer leaves req.file unset when the request carried no "file" part;
  // nothing was stored in that case, so do not report success.
  if (!req.file) {
    return res.status(400).json({
      success: false,
      message: "file is required",
      results: null,
    });
  }

  res.json({
    success: true,
    message: "ok",
    results: {
      body: req.body,
      file: req.file,
    },
  });
});

// Merge the uploaded chunks of one task into a single file.
//
// GET /merge_chunk?taskId=<task id>
// Responds with { success, message, results } where `results` is the URL of
// the merged file on success and null otherwise.
app.get("/merge_chunk", async (req, res) => {
  const { taskId } = req.query;
  const config = req.app.get("config");

  // taskId is client input that ends up in file system paths: accept only a
  // plain, single path segment so it cannot point outside the upload directory.
  const isValidTaskId =
    typeof taskId === "string" &&
    taskId !== "" &&
    taskId !== "." &&
    taskId !== ".." &&
    !/[\\/\0]/.test(taskId);
  if (!isValidTaskId) {
    return res.status(400).json({
      success: false,
      message: "invalid taskId",
      results: null,
    });
  }

  const respondNotFound = () =>
    res.json({
      success: false,
      message: "file not exists",
      results: null,
    });

  const chunkSavePath = path.resolve(config.uploadDir, taskId);
  let entries;
  try {
    entries = await fs.promises.readdir(chunkSavePath);
  } catch (err) {
    if (err.code === "ENOENT" || err.code === "ENOTDIR") {
      return respondNotFound();
    }
    throw err;
  }

  // Chunk files are named <taskId><delimiter><index>; entries without a
  // numeric index are not chunks and are ignored.
  const chunks = entries
    .map((name) => ({
      chunkPath: path.resolve(chunkSavePath, name),
      suffix: name.split(config.delimiter).pop(),
    }))
    .filter(({ suffix }) => /^\d+$/.test(suffix))
    .map(({ chunkPath, suffix }) => ({ chunkPath, order: Number(suffix) }))
    .sort((a, b) => a.order - b.order);
  if (chunks.length === 0) {
    return respondNotFound();
  }

  // Name, public URL and write stream of the merged file.
  const targetFilename = `${Date.now()}_${taskId}`;
  const mergedFileURL = `${config.host}/${encodeURIComponent(targetFilename)}`;
  const mergedFilePath = path.resolve(config.uploadDir, targetFilename);
  const writer = fs.createWriteStream(mergedFilePath);

  try {
    // Append the chunks one after another; `end: false` keeps the writer open
    // for the next chunk.
    for (const { chunkPath } of chunks) {
      await stream.pipeline(fs.createReadStream(chunkPath), writer, {
        end: false,
      });
    }
    writer.end();
    // Only answer once the data has been flushed to the file.
    await stream.finished(writer);
  } catch (err) {
    writer.destroy();
    // Best-effort removal of the partial file; a cleanup failure must not
    // hide the error that caused the merge to fail.
    await fs.promises.rm(mergedFilePath, { force: true }).catch(() => {});
    throw err;
  }

  res.json({
    success: true,
    message: "ok",
    results: mergedFileURL,
  });
});

//////////////////////////////////////////
// global error handler
//////////////////////////////////////////
// Answers every error forwarded by the routes/middleware above with a 500 and
// the same { success, message, results } shape the routes use.
app.use((err, _req, res, next) => {
  // Once the response has started, status and body can no longer be set;
  // hand the error over to Express' default handler instead.
  if (res.headersSent) {
    return next(err);
  }

  res.status(500).json({
    success: false,
    message: err.message,
    results: null,
  });
});

app.listen(port, () => console.log(`>>> Server started on: ${host}`));
json
{
  "name": "file-uploader-server",
  "private": true,
  "version": "0.0.0",
  "type": "module",
  "scripts": {
    "dev": "nodemon ./server.js",
    "start": "node ./server.js"
  },
  "dependencies": {
    "cors": "^2.8.5",
    "express": "5.0.1",
    "multer": "1.4.5-lts.1"
  },
  "devDependencies": {
    "nodemon": "^3.1.9"
  }
}

客户端实现

txt
.
├── README.md
├── index.html
├── package.json
├── pnpm-lock.yaml
├── public
│   └── vite.svg
├── src
│   ├── App.vue
│   ├── main.js
│   └── tools
│       ├── async-queue.js
│       └── fileUploader.js
└── vite.config.js
vue
<template>
  <input type="file" @change="handleChange" />
</template>

<script setup>
import axios from "axios";
import { createFileUploader } from "./tools/fileUploader";

// Shared axios instance; every request below is sent relative to this base URL.
const http = axios.create({
  baseURL: "http://localhost:3000",
});

// Sends one chunk to POST /upload_chunk as multipart form data and resolves
// with the HTTP response. The "file" part is appended after the other fields.
async function uploadChunk(chunk) {
  const { taskId, index, count, blob } = chunk;
  const fields = [
    ["taskId", taskId],
    ["index", index],
    ["count", count],
    ["file", blob],
  ];

  const payload = new FormData();
  for (const [key, value] of fields) {
    payload.append(key, value);
  }

  return http.request({
    method: "POST",
    url: "/upload_chunk",
    headers: {
      "Content-Type": "multipart/form-data",
    },
    data: payload,
  });
}

// Called once every chunk is uploaded: asks the server to merge the chunks of
// `taskId` and resolves with the response body.
async function mergeChunk(taskId) {
  console.log("taskId:", taskId);
  // Pass taskId through `params` so it is URL-encoded; it contains the file
  // extension, which may hold characters such as "&" or "#".
  const response = await http.get("/merge_chunk", { params: { taskId } });
  console.log(response.data);
  return response.data;
}

// Change handler of the file input: uploads the selected file chunk by chunk
// and requests the merge once all chunks are uploaded.
function handleChange(e) {
  const [file] = e.target.files;
  // `files` may be empty (e.g. the selection was cleared); nothing to upload.
  if (!file) {
    return;
  }

  const uploader = createFileUploader(file, {
    workers: 5,
    // chunkSize is omitted here, so the uploader's default applies
  });

  uploader.on("error", (err) => console.error("Error:", err));
  uploader.on("completed", (taskId) => {
    console.log("文件上传完成, 发送合并文件请求");
    // mergeChunk is async: report a failed merge instead of leaving the
    // rejection unhandled.
    mergeChunk(taskId).catch((err) => console.error("Error:", err));
  });

  // start upload chunks
  uploader.upload(uploadChunk);
}
</script>
vue
<template>
  <input type="file" @change="handleChange" />
</template>

<script setup>
import axios from "axios";
import { createFileUploader } from "./tools/fileUploader";

// Shared axios instance; every request below is sent relative to this base URL.
const http = axios.create({
  baseURL: "http://localhost:3000",
});

// Uploads a single chunk: builds the multipart body (metadata fields first,
// the binary "file" part last) and posts it to /upload_chunk.
// Resolves with the HTTP response.
async function uploadChunk(chunk) {
  const { taskId, index, count, blob } = chunk;
  const body = new FormData();

  Object.entries({ taskId, index, count, file: blob }).forEach(
    ([field, value]) => body.append(field, value),
  );

  const requestConfig = {
    url: "/upload_chunk",
    method: "POST",
    data: body,
    headers: {
      "Content-Type": "multipart/form-data",
    },
  };

  return http.request(requestConfig);
}

// Called once every chunk is uploaded: asks the server to merge the chunks of
// `taskId` and resolves with the response body.
async function mergeChunk(taskId) {
  console.log("taskId:", taskId);
  // Pass taskId through `params` so it is URL-encoded; it contains the file
  // extension, which may hold characters such as "&" or "#".
  const response = await http.get("/merge_chunk", { params: { taskId } });
  console.log(response.data);
  return response.data;
}

// Change handler of the file input: uploads the selected file chunk by chunk
// and requests the merge once all chunks are uploaded.
function handleChange(e) {
  const [file] = e.target.files;
  // `files` may be empty (e.g. the selection was cleared); nothing to upload.
  if (!file) {
    return;
  }

  const uploader = createFileUploader(file, {
    workers: 5,
    chunkSize: 1024 ** 2 * 5, // 5M
  });

  // Subscribe to the upload events.
  uploader.on("error", (err) => console.error("Error:", err));
  uploader.on("completed", (taskId) => {
    console.log("文件上传完成, 发送合并文件请求");
    // mergeChunk is async: report a failed merge instead of leaving the
    // rejection unhandled.
    mergeChunk(taskId).catch((err) => console.error("Error:", err));
  });

  // Start uploading.
  uploader.upload(uploadChunk);
}
</script>
js
import mitt from "mitt";
import { v4 as uuidv4 } from "uuid";
import { AsyncQueue } from "./async-queue";

const extend = Object.assign;
const isObject = (val) => val !== null && typeof val === "object";
const isCallable = (fn) => typeof fn === "function";

/**
 * Splits a large file into chunks and runs the upload of every chunk through
 * a concurrency-limited task queue.
 *
 * Events (subscribe with `on`):
 *   - "error":     emitted with the error reported by the task queue.
 *   - "completed": emitted with the task id once the queue reports completion.
 */
class FileUploader {
  /**
   * @param {Blob} file - the File/Blob to upload
   * @param {object} [opts]
   * @param {number} [opts.workers=6] - concurrency passed to the task queue
   * @param {number} [opts.chunkSize=10485760] - chunk size in bytes (10MB)
   * @throws {TypeError} when `opts` is not an object or `file` cannot be sliced
   * @throws {RangeError} when `chunkSize` is not a positive finite number
   */
  constructor(file, opts = {}) {
    if (!isObject(opts)) {
      throw new TypeError("[FileUploader]opts must be an object");
    }
    if (!isObject(file) || !isCallable(file.slice)) {
      throw new TypeError("[FileUploader]file must be a File or Blob");
    }

    const defaultOptions = {
      workers: 6,
      chunkSize: 1024 ** 2 * 10, // 10MB
    };
    const options = extend({}, defaultOptions, opts);
    // An option passed explicitly as `undefined` must not wipe out its default.
    for (const key of Object.keys(defaultOptions)) {
      if (options[key] === undefined) {
        options[key] = defaultOptions[key];
      }
    }
    // A zero, negative or NaN chunk size would make slice() loop forever or
    // silently produce no chunks at all.
    if (!Number.isFinite(options.chunkSize) || options.chunkSize <= 0) {
      throw new RangeError("[FileUploader]chunkSize must be a positive number");
    }

    this.options = options;
    this.file = file;
    this.taskId = this.taskIdGenerator();
    this.eventBus = mitt();
    this.taskQueue = new AsyncQueue({
      workers: this.options.workers,
      onError: (err) => this.eventBus.emit("error", err),
      onCompleted: () => this.eventBus.emit("completed", this.taskId),
    });
  }

  /**
   * Builds the id of this upload task: "<uuid>.<ext>", or just "<uuid>" when
   * the file name has no extension (or the file has no name).
   *
   * @returns {string}
   */
  taskIdGenerator() {
    const uuid = uuidv4();
    const name = this.file.name === undefined ? "" : String(this.file.name);
    const dotIndex = name.lastIndexOf(".");
    const ext = dotIndex === -1 ? "" : name.slice(dotIndex + 1);
    return ext === "" ? uuid : `${uuid}.${ext}`;
  }

  /**
   * Registers a handler for an uploader event ("error" or "completed").
   *
   * @param {string} event
   * @param {Function} handler
   */
  on(event, handler) {
    this.eventBus.on(event, handler);
  }

  /**
   * Cuts the file into consecutive chunks of `chunkSize` bytes (the last one
   * may be shorter).
   *
   * @returns {Array<{blob: Blob, count: number, index: number, taskId: string}>}
   *   `index` is 1-based, `count` is the total number of chunks.
   */
  slice() {
    const { chunkSize } = this.options;
    const { file, taskId } = this;
    // An empty file still yields one (empty) chunk, so the server has
    // something to merge instead of the task completing with no upload.
    const chunkCount = Math.max(1, Math.ceil(file.size / chunkSize));

    const chunks = [];
    for (let index = 1; index <= chunkCount; index++) {
      const start = (index - 1) * chunkSize;
      chunks.push({
        // Blob.slice takes the content type as a plain string.
        blob: file.slice(start, start + chunkSize, file.type), // binary data
        count: chunkCount, // total number of chunks of this task
        index, // position of this chunk within the task
        // Id of the upload task. Author's note: a uuid is used because md5
        // cannot be computed asynchronously and is slow for large files.
        taskId,
      });
    }
    return chunks;
  }

  /**
   * Slices the file, queues one upload task per chunk and starts the queue.
   *
   * @param {Function} uploadTaskFunc - called with one chunk descriptor (see
   *   `slice`) per task
   * @throws {TypeError} when `uploadTaskFunc` is not a function
   */
  upload(uploadTaskFunc) {
    if (!isCallable(uploadTaskFunc)) {
      throw new TypeError("[upload]parameter must be a function");
    }
    const { taskQueue } = this;

    for (const chunk of this.slice()) {
      taskQueue.enqueue(() => uploadTaskFunc(chunk));
    }

    taskQueue.start();
  }
}

export const createFileUploader = (file, opts) => new FileUploader(file, opts);
js
// 可控制并发的异步任务队列
import pLimit from "p-limit";
const isCallable = (fn) => typeof fn === "function";

/**
 * Queue of async tasks whose concurrency is bounded by the limiter created
 * with `pLimit(workers)`.
 *
 * Tasks are handed to the limiter as soon as they are enqueued; `start()`
 * waits for all of them and reports the outcome through the callbacks.
 */
export class AsyncQueue {
  /**
   * @param {object} options
   * @param {number} options.workers - concurrency passed to `pLimit`
   * @param {Function} options.onError - called with the error of a failed task
   * @param {Function} options.onCompleted - called once after all tasks
   *   finished without any error
   */
  constructor({ workers, onError, onCompleted }) {
    this.limit = pLimit(workers);
    this.onError = onError;
    this.onCompleted = onCompleted;
    this.hasError = false;
    this.isCompleted = false;
    this.promises = [];
  }

  /**
   * Adds a task to the queue.
   *
   * @param {Function} task - function performing the work; may return a promise
   * @throws {TypeError} when `task` is not a function
   */
  enqueue(task) {
    if (!isCallable(task)) {
      throw new TypeError("[enqueue] task must be a function");
    }

    const promiseTask = async () => {
      // Fail fast: once a task has failed the queue can no longer complete,
      // so tasks that have not started yet are skipped.
      if (this.hasError) {
        return;
      }
      try {
        await task();
      } catch (error) {
        // Errors are reported through onError; the wrapper itself never
        // rejects because of a failing task.
        this.handleError(error);
      }
    };

    this.promises.push(this.limit(promiseTask));
  }

  /**
   * Waits for every enqueued task and calls `onCompleted` when none of them
   * failed. Does nothing if the queue already failed or finished.
   */
  start() {
    if (this.hasError || this.isCompleted) {
      return;
    }

    Promise.all(this.promises)
      .then(() => {
        if (!this.hasError && !this.isCompleted) {
          this.onCompleted();
        }
        this.finalize();
      })
      .catch((err) => {
        // Reached when a callback invoked above throws.
        this.handleError(err);
      });
  }

  /**
   * Marks the queue as failed, reports the error and finalizes the queue.
   *
   * @param {unknown} error
   */
  handleError(error) {
    this.hasError = true;
    this.onError(error);
    this.finalize();
  }

  /** Marks the queue as finished (set after success and after failure). */
  finalize() {
    this.isCompleted = true;
  }
}

Released under the MIT License.