1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
| 分析报告:S3 分片上传状态存储方案
🔍 当前问题诊断
1. 现状分析
当前实现 (crates/yh-file-manager-sev-s3/src/vfs_s3.rs):
- 使用 yh-fast-kv-storage-hub 存储分片上传状态
- Key 格式: s3:multipart:{upload_id}
- TTL: 86400 秒(24小时)
核心痛点:
1 // vfs_s3.rs:267 - list_multipart_uploads 返回空列表
2 async fn list_multipart_uploads(&self, ...) -> S3Result<...> {
3 // 注意:这里由于 KV 后端不支持全量扫描,暂时返回空列表以保持 API 兼容。
4 let uploads = Vec::new();
5 Ok(s3s::S3Response::new(ListMultipartUploadsOutput { ... }))
6 }
7
8 // vfs_s3.rs:244 - cleanup_expired_multipart_uploads 返回 0
9 pub async fn cleanup_expired_multipart_uploads(&self, _seconds: u64) -> usize {
10 // 集中清理逻辑由 yh-task-registry 负责,这里保持接口兼容
11 0
12 }
2. yh-fast-kv-storage-hub 的限制
从 trait.rs 可以看出,CacheBackend 接口没有提供:
- ❌ keys() - 全量扫描所有键
- ❌ scan() - 模式匹配扫描
- ❌ iterate() - 迭代器
sql.rs 后端只支持:
- ✅ 精确查询 (get, set, del)
- ✅ Hash 操作 (hget, hset)
- ✅ TTL 管理
根本原因: SQL 后端的设计目标是高性能 KV 缓存,而非全量扫描场景。
---
💡 解决方案对比
方案 A: 改回内置 DashMap
优点:
- ✅ 支持全量扫描 (map.iter())
- ✅ 高性能(内存操作)
- ✅ 简单直接
缺点:
- ❌ 进程重启后数据丢失(致命问题)
- ❌ 无法利用现有的容灾机制(WAL、Journal、Task Registry)
- ❌ 需要重新实现持久化逻辑
---
方案 B: 增强 yh-fast-kv-storage-hub(推荐)
优点:
- ✅ 保持架构统一
- ✅ 支持多种后端(SQL/Redis/DashMap)
- ✅ 可以复用现有的清理机制
实现策略:
1 // 1. 在 CacheBackend trait 中添加扫描接口
2 #[async_trait::async_trait]
3 pub trait CacheBackend: Send + Sync {
4 // ... 现有方法 ...
5
6 /// 按前缀扫描键(可选实现) / Scan keys by prefix (optional implementation)
7 async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<String>, CacheError> {
8 // 默认实现:返回空列表
9 Ok(Vec::new())
10 }
11 }
12
13 // 2. SqlBackend 实现 scan_prefix
14 impl CacheBackend for SqlBackend {
15 async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<String>, CacheError> {
16 let db_backend = self.db.get_database_backend();
17 let sql = format!(
18 "SELECT k FROM {} WHERE k LIKE $1 LIMIT $2",
19 self.table_name
20 );
21 let pattern = format!("{}%", prefix);
22
23 let rows = self.db.query_all(Statement::from_sql_and_values(
24 db_backend, &sql, vec![pattern.into(), limit.into()]
25 )).await?;
26
27 let mut keys = Vec::new();
28 for row in rows {
29 if let Some(k) = row.try_get_by_index::<String>(0).ok() {
30 keys.push(k);
31 }
32 }
33 Ok(keys)
34 }
35 }
---
方案 C: 混合方案(最佳实践)
架构设计:
1 ┌─────────────────────────────────────────────────────────────┐
2 │ S3 分片上传状态管理 │
3 ├─────────────────────────────────────────────────────────────┤
4 │ │
5 │ 1. 内存层: DashMap (活跃分片) │
6 │ └── 快速访问,零延迟 │
7 │ │
8 │ 2. 持久层: yh-fast-kv-storage-hub (SQL) │
9 │ └── 进程重启恢复 │
10 │ │
11 │ 3. 清理层: yh-task-registry + yh-journal-log │
12 │ └── 定时清理 + 审计日志 │
13 │ │
14 │ 4. 容灾层: yh-filemanager-vfs-storage-hub (WAL) │
15 │ └── 物理文件保护 │
16 │ │
17 └─────────────────────────────────────────────────────────────┘
实现细节:
##### 1. 双层存储结构
1 // vfs_s3.rs
2 use dashmap::DashMap;
3 use std::sync::Arc;
4
5 #[derive(Clone)]
6 pub struct VfsS3Service {
7 db: Arc<DatabaseConnection>,
8 hub: Arc<VfsStorageHub>,
9 journal_logger: Arc<yh_journal_log::JournalLogger>,
10
11 // 新增:内存层缓存
12 active_uploads: Arc<DashMap<String, MultipartUploadState>>,
13 }
14
15 impl VfsS3Service {
16 pub fn new(...) -> Self {
17 Self {
18 db,
19 hub,
20 journal_logger,
21 active_uploads: Arc::new(DashMap::new()),
22 }
23 }
24
25 // 创建分片上传:同时写入内存和持久层
26 async fn create_multipart_upload(&self, ...) -> S3Result<...> {
27 let upload_id = uuid::Uuid::new_v4().to_string();
28 let state = MultipartUploadState::new(...);
29
30 // 1. 写入内存
31 self.active_uploads.insert(upload_id.clone(), state.clone());
32
33 // 2. 写入持久层
34 self.save_multipart_state(&state).await?;
35
36 // 3. 记录审计日志
37 self.journal_logger.log_event(
38 user_id,
39 yh_journal_log::types::JournalType::FileOperation,
40 "S3_MULTIPART_CREATE",
41 serde_json::json!({"upload_id": upload_id, "key": state.key})
42 ).await;
43
44 Ok(...)
45 }
46
47 // 上传分片:只更新内存,定期同步
48 async fn upload_part(&self, ...) -> S3Result<...> {
49 let mut state = self.get_multipart_state(&input.upload_id).await?;
50
51 // 更新分片信息
52 state.parts.insert(...);
53
54 // 1. 更新内存
55 self.active_uploads.insert(input.upload_id.clone(), state.clone());
56
57 // 2. 延迟写入持久层(每 N 个分片或超时)
58 if state.parts.len() % 10 == 0 {
59 self.save_multipart_state(&state).await?;
60 }
61
62 Ok(...)
63 }
64
65 // 完成分片上传:清理所有层
66 async fn complete_multipart_upload(&self, ...) -> S3Result<...> {
67 let state = self.get_multipart_state(&input.upload_id).await?;
68
69 // 1. 合并文件
70 // ... (现有逻辑)
71
72 // 2. 清理临时文件
73 for part_num in state.parts.keys() {
74 let _ = engine.delete(&format!("/.multipart/{}/{}", input.upload_id, part_num)).await;
75 }
76
77 // 3. 清理内存
78 self.active_uploads.remove(&input.upload_id);
79
80 // 4. 清理持久层
81 self.delete_multipart_state(&input.upload_id).await?;
82
83 // 5. 记录审计日志
84 self.journal_logger.log_event(
85 user_id,
86 yh_journal_log::types::JournalType::FileOperation,
87 "S3_MULTIPART_COMPLETE",
88 serde_json::json!({"upload_id": input.upload_id, "key": state.key})
89 ).await;
90
91 Ok(...)
92 }
93 }
##### 2. 进程重启恢复
1 // 在 VfsS3Service::new 中添加恢复逻辑
2 impl VfsS3Service {
3 pub fn new(...) -> Self {
4 let active_uploads = Arc::new(DashMap::new());
5
6 // 启动时恢复活跃分片
7 let active_uploads_clone = active_uploads.clone();
8 let db_clone = db.clone();
9 tokio::spawn(async move {
10 Self::recover_active_uploads(db_clone, active_uploads_clone).await;
11 });
12
13 Self { db, hub, journal_logger, active_uploads }
14 }
15
16 async fn recover_active_uploads(
17 db: Arc<DatabaseConnection>,
18 active_uploads: Arc<DashMap<String, MultipartUploadState>>,
19 ) {
20 // 从 SQL 后端扫描所有 s3:multipart:* 键
21 // 需要实现 scan_prefix 接口
22 if let Ok(manager) = yh_fast_kv_storage_hub::api::cache_manager::CacheManager::get().await {
23 let backend = manager.get_backend().await;
24
25 // 扫描所有分片上传状态
26 let keys = backend.scan_prefix("s3:multipart:", 1000).await.unwrap_or_default();
27
28 for key in keys {
29 if let Some(state) = backend.get_json::<MultipartUploadState>(&key).await.ok().flatten() {
30 // 检查是否过期
31 let age = SystemTime::now().duration_since(state.created_at).unwrap_or_default();
32 if age.as_secs() < 86400 {
33 // 恢复到内存
34 active_uploads.insert(state.upload_id.clone(), state);
35 yh_console_log::yhlog("info", &format!("Recovered multipart upload: {}", state.upload_id));
36 } else {
37 // 清理过期状态
38 let _ = backend.del(&key).await;
39 yh_console_log::yhlog("warn", &format!("Cleaned expired multipart upload: {}", state.upload_id));
40 }
41 }
42 }
43 }
44 }
45 }
##### 3. 定时清理机制
1 // vfs/hub.rs
2 impl VfsStorageHub {
3 pub async fn cleanup_all_s3_multiparts(&self, max_age_secs: u64) -> usize {
4 let mut cleaned = 0;
5
6 if let Ok(manager) = yh_fast_kv_storage_hub::api::cache_manager::CacheManager::get().await {
7 let backend = manager.get_backend().await;
8
9 // 扫描所有分片上传状态
10 let keys = backend.scan_prefix("s3:multipart:", 1000).await.unwrap_or_default();
11
12 for key in keys {
13 if let Some(state) = backend.get_json::<MultipartUploadState>(&key).await.ok().flatten() {
14 let age = SystemTime::now().duration_since(state.created_at).unwrap_or_default();
15
16 if age.as_secs() > max_age_secs {
17 // 1. 清理临时文件
18 if let Ok(engine) = self.create_scoped_engine(...) {
19 for part_num in state.parts.keys() {
20 let _ = engine.delete(&format!("/.multipart/{}/{}", state.upload_id, part_num)).await;
21 }
22 let _ = engine.delete(&format!("/.multipart/{}", state.upload_id)).await;
23 }
24
25 // 2. 清理持久层
26 let _ = backend.del(&key).await;
27
28 // 3. 记录审计日志
29 // self.journal_logger.log_event(...).await;
30
31 cleaned += 1;
32 }
33 }
34 }
35 }
36
37 cleaned
38 }
39 }
##### 4. 内存保护机制
1 // 防止内存爆炸
2 impl VfsS3Service {
3 const MAX_ACTIVE_UPLOADS: usize = 1000;
4
5 async fn create_multipart_upload(&self, ...) -> S3Result<...> {
6 // 检查内存限制
7 if self.active_uploads.len() >= Self::MAX_ACTIVE_UPLOADS {
8 // 触发清理
9 self.evict_inactive_uploads().await;
10 }
11
12 // ... 创建逻辑
13 }
14
15 async fn evict_inactive_uploads(&self) {
16 let now = SystemTime::now();
17 let mut to_evict = Vec::new();
18
19 for entry in self.active_uploads.iter() {
20 let age = now.duration_since(entry.value().created_at).unwrap_or_default();
21 if age.as_secs() > 3600 { // 1小时未活跃
22 to_evict.push(entry.key().clone());
23 }
24 }
25
26 for key in to_evict {
27 self.active_uploads.remove(&key);
28 // 注意:持久层保留,可按需恢复
29 }
30 }
31 }
---
📋 推荐实施方案
阶段 1: 增强 yh-fast-kv-storage-hub
1. 在 trait.rs 添加 scan_prefix() 接口
2. SqlBackend 实现 scan_prefix()(使用 LIKE 查询)
3. DashMap 实现全量扫描(已有能力)
4. Redis 实现使用 SCAN 命令
工作量: 2-3 小时
阶段 2: 实现混合存储方案
1. 在 VfsS3Service 添加 DashMap 层
2. 实现双层写入逻辑
3. 实现启动恢复逻辑
4. 实现内存保护机制
工作量: 4-6 小时
阶段 3: 集成容灾机制
1. 使用 yh-journal-log 记录分片操作
2. 实现 cleanup_all_s3_multiparts 完整逻辑
3. 添加 WAL 保护(可选,用于关键操作)
工作量: 2-3 小时
---
🎯 最终建议
推荐方案 C(混合方案),理由:
1. 性能最优: 活跃分片在内存,零延迟访问
2. 容灾完整: 进程重启可恢复,断电不丢失
3. 架构统一: 复用现有模块(Task Registry、Journal、WAL)
4. 内存安全: 有 LRU 淘汰机制,防止爆炸
5. 易于维护: 清晰的分层设计
关键指标:
- ✅ 支持全量扫描(通过 scan_prefix)
- ✅ 进程重启恢复(通过持久层)
- ✅ 断电保护(通过 WAL + Journal)
- ✅ 内存可控(通过 LRU 淘汰)
- ✅ 自动清理(通过 Task Registry)
实施优先级:
1. 高优先级: 阶段 1(增强 KV 接口)
2. 中优先级: 阶段 2(混合存储)
3. 低优先级: 阶段 3(容灾集成,可分步实施)
✦ 这个方案既解决了当前的性能和功能问题,又保持了系统的整体架构一致性。
|