msg_tool\scripts\kirikiri\archive\xp3pack/writer.rs

use super::archive::*;
use super::consts::*;
use super::reader::*;
use super::segmenter::*;
use crate::ext::io::*;
use crate::ext::mutex::*;
use crate::scripts::base::*;
use crate::types::*;
use crate::utils::encoding::*;
use crate::utils::threadpool::ThreadPool;
use anyhow::Result;
use sha2::{Digest, Sha256};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::{Seek, Write};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

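/// Metadata for a segment that has already been written to the archive body,
/// keyed in `Xp3ArchiveWriter::segments` by the segment's SHA-256 hash so that
/// identical segments are stored only once.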
#[derive(Clone)]
struct WrittenSegment {
    is_compressed: bool,
    start: u64,
    original_size: u64,
    archived_size: u64,
}

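/// Running totals for the pack operation, updated atomically by the worker
/// threads and printed after the index has been written.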
#[derive(Default)]
struct Stats {
    total_original_size: AtomicU64,
    final_archive_size: AtomicU64,
    total_segments: AtomicUsize,
    unique_segments: AtomicUsize,
    deduplication_savings: AtomicU64,
}

impl std::fmt::Display for Stats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let total_original_size = self
            .total_original_size
            .load(std::sync::atomic::Ordering::Relaxed);
        let final_archive_size = self
            .final_archive_size
            .load(std::sync::atomic::Ordering::Relaxed);
        let total_segments = self
            .total_segments
            .load(std::sync::atomic::Ordering::Relaxed);
        let unique_segments = self
            .unique_segments
            .load(std::sync::atomic::Ordering::Relaxed);
        let deduplication_savings = self
            .deduplication_savings
            .load(std::sync::atomic::Ordering::Relaxed);
        write!(
            f,
            "Total Original Size: {} bytes\nFinal Archive Size: {} bytes\nTotal Segments: {}\nUnique Segments: {}\nDeduplication Savings: {} bytes",
            total_original_size,
            final_archive_size,
            total_segments,
            unique_segments,
            deduplication_savings
        )
    }
}

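/// Streaming XP3 archive writer. Entry payloads are appended by worker threads
/// (optionally segmented, deduplicated, and compressed), and the index is
/// written last by `write_header`.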
pub struct Xp3ArchiveWriter<T: Write + Seek> {
    file: Arc<Mutex<T>>,
    segments: Arc<Mutex<HashMap<[u8; 32], WrittenSegment>>>,
    items: Arc<Mutex<BTreeMap<String, ArchiveItem>>>,
    runner: ThreadPool<Result<()>>,
    compress_files: bool,
    compress_index: bool,
    zlib_compression_level: u32,
    segmenter: Option<Arc<Box<dyn Segmenter + Send + Sync>>>,
    stats: Arc<Stats>,
    compress_workers: usize,
    processing_segments: Arc<Mutex<HashSet<[u8; 32]>>>,
    use_zstd: bool,
    zstd_compression_level: i32,
    no_adler: bool,
}

impl Xp3ArchiveWriter<std::io::BufWriter<std::fs::File>> {
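    /// Creates the output file, registers the expected entry names, and writes
    /// the XP3 magic followed by an 8-byte placeholder for the index offset,
    /// which `write_header` patches once all entries have been written.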
    pub fn new(filename: &str, files: &[&str], config: &ExtraConfig) -> Result<Self> {
        let file = std::fs::File::create(filename)?;
        let mut file = std::io::BufWriter::new(file);
        let mut items = BTreeMap::new();
        for file in files {
            let item = ArchiveItem {
                name: file.to_string(),
                file_hash: 0,
                original_size: 0,
                archived_size: 0,
                segments: Vec::new(),
            };
            items.insert(file.to_string(), item);
        }
        let segmenter = create_segmenter(config.xp3_segmenter).map(|s| Arc::new(s));
        file.write_all(XP3_MAGIC)?;
        file.write_u64(0)?;
        Ok(Self {
            file: Arc::new(Mutex::new(file)),
            segments: Arc::new(Mutex::new(HashMap::new())),
            items: Arc::new(Mutex::new(items)),
            runner: ThreadPool::new(
                if config.xp3_segmenter.is_none() {
                    1
                } else {
                    config.xp3_pack_workers.max(1)
                },
                Some("xp3-writer"),
                false,
            )?,
            compress_files: config.xp3_compress_files,
            compress_index: config.xp3_compress_index,
            zlib_compression_level: config.zlib_compression_level,
            segmenter,
            stats: Arc::new(Stats::default()),
            compress_workers: config.xp3_compress_workers.max(1),
            processing_segments: Arc::new(Mutex::new(HashSet::new())),
            use_zstd: config.xp3_zstd,
            zstd_compression_level: config.zstd_compression_level,
            no_adler: config.xp3_no_adler,
        })
    }
}

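/// Per-entry writer handed to callers that need `Seek`: everything is buffered
/// in memory and flushed to the underlying pipe writer when the handle is
/// dropped.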
struct Writer<'a> {
    inner: Box<dyn Write + 'a>,
    mem: MemWriter,
}

impl std::fmt::Debug for Writer<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Writer").field("mem", &self.mem).finish()
    }
}

impl<'a> Write for Writer<'a> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.mem.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.mem.flush()
    }
}

impl<'a> Seek for Writer<'a> {
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        self.mem.seek(pos)
    }

    fn stream_position(&mut self) -> std::io::Result<u64> {
        self.mem.stream_position()
    }

    fn rewind(&mut self) -> std::io::Result<()> {
        self.mem.rewind()
    }
}

impl<'a> Drop for Writer<'a> {
    fn drop(&mut self) {
        let _ = self.inner.write_all(&self.mem.data);
        let _ = self.inner.flush();
    }
}

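// Each call to `new_file_non_seek` returns the write end of a pipe. A job on
// the "xp3-writer" pool drains the read end, optionally splits the data into
// segments, deduplicates them by SHA-256, compresses them, and appends them to
// the archive while tracking per-entry and global statistics.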
impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
    fn new_file<'a>(
        &'a mut self,
        name: &str,
        size: Option<u64>,
    ) -> Result<Box<dyn WriteSeek + 'a>> {
        let inner = self.new_file_non_seek(name, size)?;
        Ok(Box::new(Writer {
            inner,
            mem: MemWriter::new(),
        }))
    }

    fn new_file_non_seek<'a>(
        &'a mut self,
        name: &str,
        _size: Option<u64>,
    ) -> Result<Box<dyn Write + 'a>> {
        if self.segmenter.is_none() {
            self.runner.join();
        }
        for err in self.runner.take_results() {
            err?;
        }
        let item = {
            let items = self.items.lock_blocking();
            Arc::new(Mutex::new(
                items
                    .get(name)
                    .ok_or_else(|| anyhow::anyhow!("File not found in archive: {}", name))?
                    .clone(),
            ))
        };
        let (reader, writer) = std::io::pipe()?;
        let reader = Reader::new(reader);
        {
            let file = self.file.clone();
            let segments = self.segments.clone();
            let items = self.items.clone();
            let segmenter = self.segmenter.clone();
            let stats = self.stats.clone();
            let is_compressed = self.compress_files;
            let zlib_compression_level = self.zlib_compression_level;
            let workers = if self.segmenter.is_some() && is_compressed {
                Some(Arc::new(ThreadPool::<Result<()>>::new(
                    self.compress_workers,
                    Some("xp3-compress"),
                    false,
                )?))
            } else {
                None
            };
            let processing_segments = self.processing_segments.clone();
            let use_zstd = self.use_zstd;
            let zstd_compression_level = self.zstd_compression_level;
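            // One job per entry: drain the pipe, segment the data (if a
            // segmenter is configured), and write each unique segment exactly once.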
            self.runner.execute(
                move |_| {
                    let mut reader = reader;
                    let mut offset_in_file = 0u64;
                    if let Some(segmenter) = segmenter {
                        for seg in segmenter.segment(&mut reader) {
                            let seg = seg?;
                            let hash: [u8; 32] = Sha256::digest(&seg).into();
                            let seg_offset_in_file = offset_in_file;
                            offset_in_file += seg.len() as u64;
                            // `Ok` carries a freshly inserted record (new segment);
                            // `Err` carries the record of an identical, already-written segment.
                            let fseg = match {
                                let mut segments = segments.lock_blocking();
                                if let Some(old_seg) = segments.get(&hash) {
                                    Err(old_seg.clone())
                                } else {
                                    let seg_data = WrittenSegment {
                                        is_compressed,
                                        start: 0,
                                        original_size: seg.len() as u64,
                                        archived_size: seg.len() as u64,
                                    };
                                    segments.insert(hash, seg_data.clone());
                                    Ok(seg_data)
                                }
                            } {
                                Ok(mut info) => {
                                    if let Some(workers) = workers.as_ref() {
                                        {
                                            let mut processing =
                                                processing_segments.lock_blocking();
                                            processing.insert(hash);
                                        }
                                        let file = file.clone();
                                        let segments = segments.clone();
                                        let stats = stats.clone();
                                        let item = item.clone();
                                        let processing_segments = processing_segments.clone();
                                        workers.execute(
                                            move |_| {
                                                let data = {
                                                    if use_zstd {
                                                        let mut e = zstd::stream::Encoder::new(
                                                            Vec::new(),
                                                            zstd_compression_level,
                                                        )?;
                                                        e.write_all(&seg)?;
                                                        e.finish()?
                                                    } else {
                                                        let mut e = flate2::write::ZlibEncoder::new(
                                                            Vec::new(),
                                                            flate2::Compression::new(
                                                                zlib_compression_level,
                                                            ),
                                                        );
                                                        e.write_all(&seg)?;
                                                        e.finish()?
                                                    }
                                                };
                                                let mut file = file.lock_blocking();
                                                let start = file.seek(std::io::SeekFrom::End(0))?;
                                                file.write_all(&data)?;
                                                info.start = start;
                                                info.archived_size = data.len() as u64;
                                                let stats = stats.clone();
                                                stats.total_original_size.fetch_add(
                                                    info.original_size,
                                                    Ordering::Relaxed,
                                                );
                                                stats.final_archive_size.fetch_add(
                                                    info.archived_size,
                                                    Ordering::Relaxed,
                                                );
                                                stats
                                                    .total_segments
                                                    .fetch_add(1, Ordering::Relaxed);
                                                stats
                                                    .unique_segments
                                                    .fetch_add(1, Ordering::Relaxed);
                                                let mut segments = segments.lock_blocking();
                                                segments.insert(hash, info.clone());
                                                let ninfo = Segment {
                                                    is_compressed: info.is_compressed,
                                                    start: info.start,
                                                    offset_in_file: seg_offset_in_file,
                                                    original_size: info.original_size,
                                                    archived_size: info.archived_size,
                                                };
                                                let mut item = item.lock_blocking();
                                                item.original_size += ninfo.original_size;
                                                item.archived_size += ninfo.archived_size;
                                                item.segments.push(ninfo);
                                                let mut processing =
                                                    processing_segments.lock_blocking();
                                                processing.remove(&hash);
                                                Ok(())
                                            },
                                            true,
                                        )?;
                                        None
                                    } else {
                                        {
                                            let mut processing =
                                                processing_segments.lock_blocking();
                                            processing.insert(hash);
                                        }
                                        let data = seg;
                                        let mut file = file.lock_blocking();
                                        let start = file.seek(std::io::SeekFrom::End(0))?;
                                        file.write_all(&data)?;
                                        info.start = start;
                                        info.archived_size = data.len() as u64;
                                        let stats = stats.clone();
                                        stats
                                            .total_original_size
                                            .fetch_add(info.original_size, Ordering::Relaxed);
                                        stats
                                            .final_archive_size
                                            .fetch_add(info.archived_size, Ordering::Relaxed);
                                        stats.total_segments.fetch_add(1, Ordering::Relaxed);
                                        stats.unique_segments.fetch_add(1, Ordering::Relaxed);
                                        let mut segments = segments.lock_blocking();
                                        segments.insert(hash, info.clone());
                                        let ninfo = Segment {
                                            is_compressed: info.is_compressed,
                                            start: info.start,
                                            offset_in_file: seg_offset_in_file,
                                            original_size: info.original_size,
                                            archived_size: info.archived_size,
                                        };
                                        {
                                            let mut processing =
                                                processing_segments.lock_blocking();
                                            processing.remove(&hash);
                                        }
                                        Some(ninfo)
                                    }
                                }
                                // Duplicate segment: wait for any in-flight compression of it
                                // to finish, then reuse its recorded location.
                                Err(mut seg_info) => {
                                    let mut need_update = false;
                                    loop {
                                        if {
                                            let processing = processing_segments.lock_blocking();
                                            !processing.contains(&hash)
                                        } {
                                            break;
                                        }
                                        need_update = true;
                                        std::thread::sleep(std::time::Duration::from_millis(10));
                                    }
                                    if need_update {
                                        seg_info = {
                                            let segments = segments.lock_blocking();
                                            segments
                                                .get(&hash)
                                                .ok_or(anyhow::anyhow!(
                                                    "Failed to get latest segment info."
                                                ))?
                                                .clone()
                                        };
                                    }
                                    let stats = stats.clone();
                                    stats
                                        .total_original_size
                                        .fetch_add(seg_info.original_size, Ordering::Relaxed);
                                    stats
                                        .deduplication_savings
                                        .fetch_add(seg_info.archived_size, Ordering::Relaxed);
                                    stats.total_segments.fetch_add(1, Ordering::Relaxed);
                                    let ninfo = Segment {
                                        is_compressed: seg_info.is_compressed,
                                        start: seg_info.start,
                                        offset_in_file: seg_offset_in_file,
                                        original_size: seg_info.original_size,
                                        archived_size: seg_info.archived_size,
                                    };
                                    Some(ninfo)
                                }
                            };
                            if let Some(fseg) = fseg {
                                let mut item = item.lock_blocking();
                                item.original_size += fseg.original_size;
                                item.archived_size += fseg.archived_size;
                                item.segments.push(fseg);
                            }
                        }
                    } else {
                        let mut file = file.lock_blocking();
                        let start = file.seek(std::io::SeekFrom::End(0))?;
                        let size = {
                            let mut writer = if is_compressed {
                                if use_zstd {
                                    // `auto_finish()` ensures the zstd frame is finalized when
                                    // the boxed writer is dropped at the end of this block.
                                    let e = zstd::stream::Encoder::new(
                                        &mut *file,
                                        zstd_compression_level,
                                    )?
                                    .auto_finish();
                                    Box::new(e) as Box<dyn Write>
                                } else {
                                    let e = flate2::write::ZlibEncoder::new(
                                        &mut *file,
                                        flate2::Compression::new(zlib_compression_level),
                                    );
                                    Box::new(e) as Box<dyn Write>
                                }
                            } else {
                                Box::new(&mut *file) as Box<dyn Write>
                            };
                            std::io::copy(&mut reader, &mut writer)?
                        };
                        let ninfo = Segment {
                            is_compressed,
                            start,
                            offset_in_file: 0,
                            original_size: size,
                            archived_size: if is_compressed {
                                file.stream_position()? - start
                            } else {
                                size
                            },
                        };
                        let mut item = item.lock_blocking();
                        item.original_size += ninfo.original_size;
                        item.archived_size += ninfo.archived_size;
                        let stats = stats.clone();
                        stats
                            .total_original_size
                            .fetch_add(ninfo.original_size, Ordering::Relaxed);
                        stats
                            .final_archive_size
                            .fetch_add(ninfo.archived_size, Ordering::Relaxed);
                        stats.total_segments.fetch_add(1, Ordering::Relaxed);
                        stats.unique_segments.fetch_add(1, Ordering::Relaxed);
                        item.segments.push(ninfo);
                    }
                    if let Some(workers) = workers {
                        workers.join();
                        for err in workers.take_results() {
                            err?;
                        }
                    }
                    let mut item = item.lock_blocking().to_owned();
                    item.file_hash = reader.into_checksum();
                    item.segments.sort_by_key(|s| s.offset_in_file);
                    let mut items = items.lock_blocking();
                    items.insert(item.name.clone(), item);
                    Ok(())
                },
                true,
            )?;
        }
        Ok(Box::new(writer))
    }

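    /// Builds the index (one `CHUNK_FILE` record per entry), appends it to the
    /// archive either raw or compressed, and back-patches the index offset
    /// placeholder written by `new`.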
    fn write_header(&mut self) -> Result<()> {
        self.runner.join();
        for err in self.runner.take_results() {
            err?;
        }
        let mut file = self.file.lock_blocking();
        let index_offset = file.seek(std::io::SeekFrom::End(0))?;
        let mut index_data = MemWriter::new();
        let items = self.items.lock_blocking();
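        // Each entry becomes a `CHUNK_FILE` record holding a `CHUNK_INFO` chunk
        // (flags, sizes, UTF-16LE name), a `CHUNK_SEGM` chunk (one 28-byte entry
        // per segment), and a `CHUNK_ADLR` chunk (the entry's Adler-32 checksum,
        // or 0 when `no_adler` is set).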
        for (_, item) in items.iter() {
            let mut file_chunk = MemWriter::new();
            let name = encode_string(Encoding::Utf16LE, &item.name, false)?;
            let info_data_size = name.len() as u64 + 22;
            file_chunk.write_all(CHUNK_INFO)?;
            file_chunk.write_u64(info_data_size)?;
            file_chunk.write_u32(0)?; // flags
            file_chunk.write_u64(item.original_size)?;
            file_chunk.write_u64(item.archived_size)?;
            file_chunk.write_u16(name.len() as u16 / 2)?;
            file_chunk.write_all(&name)?;
            let segm_data_size = item.segments.len() as u64 * 28;
            file_chunk.write_all(CHUNK_SEGM)?;
            file_chunk.write_u64(segm_data_size)?;
            for seg in &item.segments {
                let flag = if seg.is_compressed {
                    TVP_XP3_SEGM_ENCODE_ZLIB
                } else {
                    TVP_XP3_SEGM_ENCODE_RAW
                };
                file_chunk.write_u32(flag)?;
                file_chunk.write_u64(seg.start)?;
                file_chunk.write_u64(seg.original_size)?;
                file_chunk.write_u64(seg.archived_size)?;
            }
            let adlr_data_size = 4;
            file_chunk.write_all(CHUNK_ADLR)?;
            file_chunk.write_u64(adlr_data_size)?;
            if self.no_adler {
                file_chunk.write_u32(0)?;
            } else {
                file_chunk.write_u32(item.file_hash)?;
            }
            index_data.write_all(CHUNK_FILE)?;
            let file_chunk = file_chunk.into_inner();
            index_data.write_u64(file_chunk.len() as u64)?;
            index_data.write_all(&file_chunk)?;
        }
        let index_data = index_data.into_inner();
        if self.compress_index {
            let compressed_index = if self.use_zstd {
                let mut e = zstd::stream::Encoder::new(Vec::new(), self.zstd_compression_level)?;
                e.write_all(&index_data)?;
                e.finish()?
            } else {
                let mut e = flate2::write::ZlibEncoder::new(
                    Vec::new(),
                    flate2::Compression::new(self.zlib_compression_level),
                );
                e.write_all(&index_data)?;
                e.finish()?
            };
            file.write_u8(TVP_XP3_INDEX_ENCODE_ZLIB)?;
            file.write_u64(compressed_index.len() as u64)?;
            file.write_u64(index_data.len() as u64)?;
            file.write_all(&compressed_index)?;
        } else {
            file.write_u8(TVP_XP3_INDEX_ENCODE_RAW)?;
            file.write_u64(index_data.len() as u64)?;
            file.write_all(&index_data)?;
        }
        // Patch the index-offset placeholder that follows the magic (file offset 11).
        file.write_u64_at(11, index_offset)?;
        file.flush()?;
        eprintln!("XP3 Archive Statistics:\n{}", self.stats);
        Ok(())
    }
}