package com.hyman.service;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@Service
public class ReactiveFileUploadService {
private final WebClient webClient;
public ReactiveFileUploadService(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.build();
}
/**
* 使用 Reactive WebClient 上传文件到第三方接口
*/
public Mono<String> uploadFile(String filePath, String uploadUrl) {
return Mono.fromCallable(() -> {
// 读取文件
Path path = Paths.get(filePath);
byte[] fileBytes = Files.readAllBytes(path);
String fileName = path.getFileName().toString();
return new FileResource(fileBytes, fileName);
})
.flatMap(fileResource -> {
// 构建 multipart 请求
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
bodyBuilder.part("file", fileResource)
.filename(fileResource.getFilename())
.contentType(MediaType.APPLICATION_OCTET_STREAM);
return webClient.post()
.uri(uploadUrl)
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(bodyBuilder.build()))
.retrieve()
.bodyToMono(String.class);
})
.onErrorResume(e -> Mono.just("上传失败: " + e.getMessage()));
}
/**
* 自定义文件资源类
*/
private static class FileResource extends ByteArrayResource {
private final String filename;
public FileResource(byte[] byteArray, String filename) {
super(byteArray);
this.filename = filename;
}
@Override
public String getFilename() {
return filename;
}
}
}
========================
package com.hyman.service;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Service;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
@Service
public class AdvancedReactiveFileUploadService {
private final WebClient webClient;
public AdvancedReactiveFileUploadService(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.baseUrl("").build();
}
/**
* 流式上传大文件 - 避免内存溢出
*/
public Mono<UploadResult> uploadLargeFile(String filePath, String uploadUrl,
Map<String, String> headers,
Map<String, String> formParams) {
return Mono.fromCallable(() -> Paths.get(filePath))
.flatMap(path -> {
if (!Files.exists(path) || !Files.isRegularFile(path)) {
return Mono.just(UploadResult.error("文件不存在: " + filePath));
}
try {
long fileSize = Files.size(path);
String fileName = path.getFileName().toString();
// 构建 multipart 请求
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
// 添加表单参数
if (formParams != null) {
formParams.forEach((key, value) ->
bodyBuilder.part(key, value));
}
// 添加文件(使用 FileSystemResource 避免加载到内存)
bodyBuilder.part("file", new FileSystemResource(path))
.filename(fileName)
.contentType(MediaType.APPLICATION_OCTET_STREAM);
// 构建请求头
WebClient.RequestHeadersSpec<?> requestSpec = webClient.post()
.uri(uploadUrl)
.contentType(MediaType.MULTIPART_FORM_DATA);
// 添加自定义请求头
if (headers != null) {
headers.forEach(requestSpec::header);
}
return requestSpec
.body(BodyInserters.fromMultipartData(bodyBuilder.build()))
.retrieve()
.toEntity(String.class)
.map(response -> UploadResult.success(
response.getStatusCodeValue(),
response.getBody(),
fileName,
fileSize
))
.onErrorResume(e -> Mono.just(UploadResult.error(
"上传失败: " + e.getMessage())));
} catch (IOException e) {
return Mono.just(UploadResult.error("文件读取失败: " + e.getMessage()));
}
});
}
/**
* 上传多个文件
*/
public Mono<UploadResult> uploadMultipleFiles(String[] filePaths, String uploadUrl) {
return Flux.fromArray(filePaths)
.flatMap(filePath -> uploadLargeFile(filePath, uploadUrl, null, null))
.collectList()
.map(results -> {
long successCount = results.stream().filter(UploadResult::isSuccess).count();
return UploadResult.batchResult(results, successCount, results.size());
});
}
/**
* 使用 Flux<DataBuffer> 进行真正的流式上传
*/
public Mono<UploadResult> streamUpload(String filePath, String uploadUrl) {
return Mono.fromCallable(() -> Paths.get(filePath))
.flatMap(path -> {
try {
Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(
path,
DefaultDataBufferFactory.sharedInstance,
4096
);
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
bodyBuilder.asyncPart("file", dataBufferFlux, DataBuffer.class)
.filename(path.getFileName().toString());
return webClient.post()
.uri(uploadUrl)
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(bodyBuilder.build()))
.retrieve()
.bodyToMono(String.class)
.map(response -> UploadResult.success(200, response,
path.getFileName().toString(), Files.size(path)))
.onErrorResume(e -> Mono.just(UploadResult.error(e.getMessage())));
} catch (IOException e) {
return Mono.just(UploadResult.error("文件读取失败: " + e.getMessage()));
}
});
}
/**
* 上传结果类
*/
public static class UploadResult {
private boolean success;
private int statusCode;
private String message;
private String response;
private String fileName;
private long fileSize;
private String error;
// 静态工厂方法
public static UploadResult success(int statusCode, String response, String fileName, long fileSize) {
UploadResult result = new UploadResult();
result.success = true;
result.statusCode = statusCode;
result.response = response;
result.fileName = fileName;
result.fileSize = fileSize;
result.message = "上传成功";
return result;
}
public static UploadResult error(String error) {
UploadResult result = new UploadResult();
result.success = false;
result.error = error;
result.message = "上传失败";
return result;
}
public static UploadResult batchResult(java.util.List<UploadResult> results,
long successCount, long totalCount) {
UploadResult result = new UploadResult();
result.success = successCount == totalCount;
result.message = String.format("批量上传完成: 成功 %d/%d", successCount, totalCount);
return result;
}
// getter 方法
public boolean isSuccess() { return success; }
public int getStatusCode() { return statusCode; }
public String getMessage() { return message; }
public String getResponse() { return response; }
public String getFileName() { return fileName; }
public long getFileSize() { return fileSize; }
public String getError() { return error; }
@Override
public String toString() {
return String.format(
"UploadResult{success=%s, statusCode=%d, fileName='%s', fileSize=%d, message='%s'}",
success, statusCode, fileName, fileSize, message
);
}
}
}
===========