msg_tool\scripts\kirikiri\archive\xp3pack/
writer.rs

1use super::archive::*;
2use super::consts::*;
3use super::reader::*;
4use super::segmenter::*;
5use crate::ext::io::*;
6use crate::ext::mutex::*;
7use crate::scripts::base::*;
8use crate::types::*;
9use crate::utils::encoding::*;
10use crate::utils::threadpool::ThreadPool;
11use anyhow::Result;
12use sha2::{Digest, Sha256};
13use std::collections::{BTreeMap, HashMap, HashSet};
14use std::io::{Seek, Write};
15use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex};
17
/// Bookkeeping for a segment that has already been written to the archive
/// body; stored in a map keyed by the SHA-256 of the segment's uncompressed
/// bytes so identical segments can be deduplicated.
#[derive(Clone)]
struct WrittenSegment {
    // True when the stored bytes are compressed (zlib or zstd).
    is_compressed: bool,
    // Byte offset of the segment's data within the archive file.
    start: u64,
    // Uncompressed size of the segment in bytes.
    original_size: u64,
    // Size as stored in the archive (equals `original_size` when raw).
    archived_size: u64,
}
25
/// Packing statistics, accumulated from multiple worker threads via atomics
/// and printed at the end of `write_header`.
#[derive(Default)]
struct Stats {
    // Sum of uncompressed sizes of all segments, counting duplicates.
    total_original_size: AtomicU64,
    // Bytes actually written to the archive body.
    final_archive_size: AtomicU64,
    // Number of segments processed, counting duplicates.
    total_segments: AtomicUsize,
    // Number of distinct segments physically written.
    unique_segments: AtomicUsize,
    // Archived bytes saved by reusing already-written segments.
    deduplication_savings: AtomicU64,
}
34
35impl std::fmt::Display for Stats {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        let total_original_size = self
38            .total_original_size
39            .load(std::sync::atomic::Ordering::Relaxed);
40        let final_archive_size = self
41            .final_archive_size
42            .load(std::sync::atomic::Ordering::Relaxed);
43        let total_segments = self
44            .total_segments
45            .load(std::sync::atomic::Ordering::Relaxed);
46        let unique_segments = self
47            .unique_segments
48            .load(std::sync::atomic::Ordering::Relaxed);
49        let deduplication_savings = self
50            .deduplication_savings
51            .load(std::sync::atomic::Ordering::Relaxed);
52        write!(
53            f,
54            "Total Original Size: {} bytes\nFinal Archive Size: {} bytes\nTotal Segments: {}\nUnique Segments: {}\nDeduplication Savings: {} bytes",
55            total_original_size,
56            final_archive_size,
57            total_segments,
58            unique_segments,
59            deduplication_savings
60        )
61    }
62}
63
/// Writer that builds a KiriKiri XP3 archive, optionally splitting each file
/// into segments for deduplication and compressing data with zlib or zstd.
pub struct Xp3ArchiveWriter<T: Write + Seek> {
    // Output archive, shared with background packing/compression threads.
    file: Arc<Mutex<T>>,
    // Written segments keyed by SHA-256 of their uncompressed bytes;
    // used to deduplicate identical segments across files.
    segments: Arc<Mutex<HashMap<[u8; 32], WrittenSegment>>>,
    // Per-file metadata, keyed by file name (BTreeMap keeps the final
    // index ordered by name).
    items: Arc<Mutex<BTreeMap<String, ArchiveItem>>>,
    // Pool running one packing task per file being written.
    runner: ThreadPool<Result<()>>,
    // Compress file data (as opposed to storing it raw).
    compress_files: bool,
    // Compress the index chunk at the end of the archive.
    compress_index: bool,
    zlib_compression_level: u32,
    // When present, files are split into segments; when absent, each file
    // is written as a single segment and packing is sequential.
    segmenter: Option<Arc<Box<dyn Segmenter + Send + Sync>>>,
    stats: Arc<Stats>,
    // Worker count for the per-file compression pool (segmented mode only).
    compress_workers: usize,
    // Hashes of segments currently being compressed/written; dedup hits
    // poll this set to wait for the final segment info.
    processing_segments: Arc<Mutex<HashSet<[u8; 32]>>>,
    // Use zstd instead of zlib for compression.
    use_zstd: bool,
    zstd_compression_level: i32,
    // Write a zero checksum in the adlr chunk instead of the file hash.
    no_adler: bool,
}
80
81impl Xp3ArchiveWriter<std::io::BufWriter<std::fs::File>> {
82    pub fn new(filename: &str, files: &[&str], config: &ExtraConfig) -> Result<Self> {
83        let file = std::fs::File::create(filename)?;
84        let mut file = std::io::BufWriter::new(file);
85        let mut items = BTreeMap::new();
86        for file in files {
87            let item = ArchiveItem {
88                name: file.to_string(),
89                file_hash: 0,
90                original_size: 0,
91                archived_size: 0,
92                segments: Vec::new(),
93            };
94            items.insert(file.to_string(), item);
95        }
96        let segmenter = create_segmenter(config.xp3_segmenter).map(|s| Arc::new(s));
97        file.write_all(XP3_MAGIC)?;
98        file.write_u64(0)?; // Placeholder for index offset
99        Ok(Self {
100            file: Arc::new(Mutex::new(file)),
101            segments: Arc::new(Mutex::new(HashMap::new())),
102            items: Arc::new(Mutex::new(items)),
103            runner: ThreadPool::new(
104                if config.xp3_segmenter.is_none() {
105                    1
106                } else {
107                    config.xp3_pack_workers.max(1)
108                },
109                Some("xp3-writer"),
110                false,
111            )?,
112            compress_files: config.xp3_compress_files,
113            compress_index: config.xp3_compress_index,
114            zlib_compression_level: config.zlib_compression_level,
115            segmenter,
116            stats: Arc::new(Stats::default()),
117            compress_workers: config.xp3_compress_workers.max(1),
118            processing_segments: Arc::new(Mutex::new(HashSet::new())),
119            use_zstd: config.xp3_zstd,
120            zstd_compression_level: config.zstd_compression_level,
121            no_adler: config.xp3_no_adler,
122        })
123    }
124}
125
/// Seekable adapter returned by `new_file`: buffers all writes in memory so
/// `Seek` works, then forwards the buffered bytes to the underlying
/// non-seekable writer (the pipe into the packing task) when dropped.
struct Writer<'a> {
    // Destination the buffered data is flushed to on drop.
    inner: Box<dyn Write + 'a>,
    // In-memory buffer backing the `Write`/`Seek` implementations.
    mem: MemWriter,
}
130
impl std::fmt::Debug for Writer<'_> {
    // Manual impl: `inner` is a trait object without `Debug`, so only the
    // in-memory buffer is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Writer").field("mem", &self.mem).finish()
    }
}
136
impl<'a> Write for Writer<'a> {
    // Writes go to the in-memory buffer only; `inner` receives the data
    // when the `Writer` is dropped.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.mem.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.mem.flush()
    }
}
146
impl<'a> Seek for Writer<'a> {
    // Seeking operates purely on the in-memory buffer; it never touches
    // the (non-seekable) underlying writer.
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        self.mem.seek(pos)
    }

    fn stream_position(&mut self) -> std::io::Result<u64> {
        self.mem.stream_position()
    }

    fn rewind(&mut self) -> std::io::Result<()> {
        self.mem.rewind()
    }
}
160
161impl<'a> Drop for Writer<'a> {
162    fn drop(&mut self) {
163        let _ = self.inner.write_all(&self.mem.data);
164        let _ = self.inner.flush();
165    }
166}
167
impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
    /// Opens a seekable writer for a new archive entry.
    ///
    /// Seeking is provided by buffering the whole entry in memory; the
    /// buffer is forwarded to the non-seekable pipe writer when the
    /// returned handle is dropped.
    fn new_file<'a>(
        &'a mut self,
        name: &str,
        size: Option<u64>,
    ) -> Result<Box<dyn WriteSeek + 'a>> {
        let inner = self.new_file_non_seek(name, size)?;
        Ok(Box::new(Writer {
            inner,
            mem: MemWriter::new(),
        }))
    }

    /// Opens a write-only handle for a new archive entry.
    ///
    /// Returns the write end of a pipe; a background task on `self.runner`
    /// consumes the read end and either (a) splits the data into segments,
    /// deduplicating by SHA-256 and optionally compressing on a dedicated
    /// pool, or (b) without a segmenter, streams the whole file as a single
    /// segment. `name` must be one of the names registered in `new`.
    fn new_file_non_seek<'a>(
        &'a mut self,
        name: &str,
        _size: Option<u64>,
    ) -> Result<Box<dyn Write + 'a>> {
        // Without a segmenter the runner has a single worker and files are
        // written strictly one after another; wait for the previous file.
        if self.segmenter.is_none() {
            self.runner.join();
        }
        // Surface any error from already-finished packing tasks.
        for err in self.runner.take_results() {
            err?;
        }
        // Working copy of this file's metadata; merged back into `items`
        // when the packing task completes.
        let item = {
            let items = self.items.lock_blocking();
            Arc::new(Mutex::new(
                items
                    .get(name)
                    .ok_or_else(|| anyhow::anyhow!("File not found in archive: {}", name))?
                    .clone(),
            ))
        };
        // Caller writes into `writer`; the packing task reads from `reader`.
        let (reader, writer) = std::io::pipe()?;
        // Reader wraps the pipe and tracks a running checksum of the bytes
        // read (consumed below via `into_checksum` for the adlr chunk).
        let reader = Reader::new(reader);
        {
            // Clone the shared state the packing task needs.
            let file = self.file.clone();
            let segments = self.segments.clone();
            let items = self.items.clone();
            let segmenter = self.segmenter.clone();
            let stats = self.stats.clone();
            let is_compressed = self.compress_files;
            let zlib_compression_level = self.zlib_compression_level;
            // A dedicated compression pool is only used in segmented mode;
            // otherwise data is compressed inline while streaming.
            let workers = if self.segmenter.is_some() && is_compressed {
                Some(Arc::new(ThreadPool::<Result<()>>::new(
                    self.compress_workers,
                    Some("xp3-compress"),
                    false,
                )?))
            } else {
                None
            };
            let processiong_segments = self.processing_segments.clone();
            let use_zstd = self.use_zstd;
            let zstd_compression_level = self.zstd_compression_level;
            self.runner.execute(
                move |_| {
                    let mut reader = reader;
                    // Offset of the next segment within the uncompressed file.
                    let mut offset_in_file = 0u64;
                    if let Some(segmenter) = segmenter {
                        for seg in segmenter.segment(&mut reader) {
                            let seg = seg?;
                            // Dedup key: SHA-256 of the uncompressed bytes.
                            let hash: [u8; 32] = Sha256::digest(&seg).into();
                            let seg_offset_in_file = offset_in_file;
                            offset_in_file += seg.len() as u64;
                            // Ok(_)  => this task owns writing the segment.
                            // Err(_) => an identical segment exists (or is in
                            //           flight) — reuse its info instead.
                            let fseg = match {
                                let mut segments = segments.lock_blocking();
                                if let Some(old_seg) = segments.get(&hash) {
                                    Err(old_seg.clone())
                                } else {
                                    // Placeholder; `start`/`archived_size` are
                                    // fixed up once the data is written.
                                    let seg_data = WrittenSegment {
                                        is_compressed,
                                        start: 0,
                                        original_size: seg.len() as u64,
                                        archived_size: seg.len() as u64,
                                    };
                                    segments.insert(hash, seg_data.clone());
                                    Ok(seg_data)
                                }
                            } {
                                Ok(mut info) => {
                                    if let Some(workers) = workers.as_ref() {
                                        // Mark the segment as in-flight so a
                                        // dedup hit waits for the final info.
                                        // NOTE(review): this insert happens
                                        // after the placeholder became visible
                                        // in `segments`; a dedup hit in that
                                        // window could observe start == 0
                                        // without waiting — confirm intended.
                                        {
                                            let mut processing =
                                                processiong_segments.lock_blocking();
                                            processing.insert(hash);
                                        }
                                        let file = file.clone();
                                        let segments = segments.clone();
                                        let stats = stats.clone();
                                        let item = item.clone();
                                        let processiong_segments = processiong_segments.clone();
                                        // Compress and append on the
                                        // compression pool; the segment is
                                        // pushed into the item from there.
                                        workers.execute(
                                            move |_| {
                                                // Compress the segment fully in
                                                // memory before taking the
                                                // file lock.
                                                let data = {
                                                    if use_zstd {
                                                        let mut e = zstd::stream::Encoder::new(
                                                            Vec::new(),
                                                            zstd_compression_level,
                                                        )?;
                                                        e.write_all(&seg)?;
                                                        e.finish()?
                                                    } else {
                                                        let mut e = flate2::write::ZlibEncoder::new(
                                                            Vec::new(),
                                                            flate2::Compression::new(
                                                                zlib_compression_level,
                                                            ),
                                                        );
                                                        e.write_all(&seg)?;
                                                        e.finish()?
                                                    }
                                                };
                                                // Append at end of archive.
                                                let mut file = file.lock_blocking();
                                                let start = file.seek(std::io::SeekFrom::End(0))?;
                                                file.write_all(&data)?;
                                                info.start = start;
                                                info.archived_size = data.len() as u64;
                                                let stats = stats.clone();
                                                stats.total_original_size.fetch_add(
                                                    info.original_size,
                                                    Ordering::Relaxed,
                                                );
                                                stats.final_archive_size.fetch_add(
                                                    info.archived_size,
                                                    Ordering::Relaxed,
                                                );
                                                stats
                                                    .total_segments
                                                    .fetch_add(1, Ordering::Relaxed);
                                                stats
                                                    .unique_segments
                                                    .fetch_add(1, Ordering::Relaxed);
                                                // Publish the final segment
                                                // info for dedup hits.
                                                let mut segments = segments.lock_blocking();
                                                segments.insert(hash, info.clone());
                                                let ninfo = Segment {
                                                    is_compressed: info.is_compressed,
                                                    start: info.start,
                                                    offset_in_file: seg_offset_in_file,
                                                    original_size: info.original_size,
                                                    archived_size: info.archived_size,
                                                };
                                                let mut item = item.lock_blocking();
                                                item.original_size += ninfo.original_size;
                                                item.archived_size += ninfo.archived_size;
                                                item.segments.push(ninfo);
                                                // Unblock waiting dedup hits.
                                                let mut processing =
                                                    processiong_segments.lock_blocking();
                                                processing.remove(&hash);
                                                Ok(())
                                            },
                                            true,
                                        )?;
                                        // Segment is accounted for by the
                                        // compression task; nothing to push
                                        // here.
                                        None
                                    } else {
                                        // Uncompressed segmented path: write
                                        // the raw bytes inline.
                                        {
                                            let mut processing =
                                                processiong_segments.lock_blocking();
                                            processing.insert(hash);
                                        }
                                        let data = seg;
                                        let mut file = file.lock_blocking();
                                        let start = file.seek(std::io::SeekFrom::End(0))?;
                                        file.write_all(&data)?;
                                        info.start = start;
                                        info.archived_size = data.len() as u64;
                                        let stats = stats.clone();
                                        stats
                                            .total_original_size
                                            .fetch_add(info.original_size, Ordering::Relaxed);
                                        stats
                                            .final_archive_size
                                            .fetch_add(info.archived_size, Ordering::Relaxed);
                                        stats.total_segments.fetch_add(1, Ordering::Relaxed);
                                        stats.unique_segments.fetch_add(1, Ordering::Relaxed);
                                        // Publish final info for dedup hits.
                                        let mut segments = segments.lock_blocking();
                                        segments.insert(hash, info.clone());
                                        let ninfo = Segment {
                                            is_compressed: info.is_compressed,
                                            start: info.start,
                                            offset_in_file: seg_offset_in_file,
                                            original_size: info.original_size,
                                            archived_size: info.archived_size,
                                        };
                                        {
                                            let mut processing =
                                                processiong_segments.lock_blocking();
                                            processing.remove(&hash);
                                        }
                                        Some(ninfo)
                                    }
                                }
                                Err(mut seg_info) => {
                                    // Dedup hit: if the segment is still being
                                    // written, poll until its final info is
                                    // published (10 ms busy-wait).
                                    let mut need_update = false;
                                    loop {
                                        if {
                                            let processing = processiong_segments.lock_blocking();
                                            !processing.contains(&hash)
                                        } {
                                            break;
                                        }
                                        need_update = true;
                                        std::thread::sleep(std::time::Duration::from_millis(10));
                                    }
                                    if need_update {
                                        // Re-read the now-final segment info.
                                        seg_info = {
                                            let segments = segments.lock_blocking();
                                            segments
                                                .get(&hash)
                                                .ok_or(anyhow::anyhow!(
                                                    "Failed to get latest segment info."
                                                ))?
                                                .clone()
                                        };
                                    }
                                    let stats = stats.clone();
                                    stats
                                        .total_original_size
                                        .fetch_add(seg_info.original_size, Ordering::Relaxed);
                                    stats
                                        .deduplication_savings
                                        .fetch_add(seg_info.archived_size, Ordering::Relaxed);
                                    stats.total_segments.fetch_add(1, Ordering::Relaxed);
                                    // Reuse the existing data; only the
                                    // per-file offset differs.
                                    let ninfo = Segment {
                                        is_compressed: seg_info.is_compressed,
                                        start: seg_info.start,
                                        offset_in_file: seg_offset_in_file,
                                        original_size: seg_info.original_size,
                                        archived_size: seg_info.archived_size,
                                    };
                                    Some(ninfo)
                                }
                            };
                            // Account for segments handled on this thread
                            // (the compressed path updates `item` itself).
                            if let Some(fseg) = fseg {
                                let mut item = item.lock_blocking();
                                item.original_size += fseg.original_size;
                                item.archived_size += fseg.archived_size;
                                item.segments.push(fseg);
                            }
                        }
                    } else {
                        // No segmenter: stream the whole file as one segment,
                        // compressing on the fly if requested.
                        let mut file = file.lock_blocking();
                        let start = file.seek(std::io::SeekFrom::End(0))?;
                        let size = {
                            let mut writer = if is_compressed {
                                if use_zstd {
                                    let e = zstd::stream::Encoder::new(
                                        &mut *file,
                                        zstd_compression_level,
                                    )?;
                                    Box::new(e) as Box<dyn Write>
                                } else {
                                    let e = flate2::write::ZlibEncoder::new(
                                        &mut *file,
                                        flate2::Compression::new(zlib_compression_level),
                                    );
                                    Box::new(e) as Box<dyn Write>
                                }
                            } else {
                                Box::new(&mut *file) as Box<dyn Write>
                            };
                            // NOTE(review): the encoder is finalized by its
                            // Drop at the end of this block; errors from that
                            // implicit finish are discarded — confirm.
                            std::io::copy(&mut reader, &mut writer)?
                        };
                        let ninfo = Segment {
                            is_compressed,
                            start,
                            offset_in_file: 0,
                            // `size` is the number of uncompressed bytes
                            // copied from the pipe.
                            original_size: size,
                            archived_size: if is_compressed {
                                // Bytes the encoder actually emitted.
                                file.stream_position()? - start
                            } else {
                                size
                            },
                        };
                        let mut item = item.lock_blocking();
                        item.original_size += ninfo.original_size;
                        item.archived_size += ninfo.archived_size;
                        let stats = stats.clone();
                        stats
                            .total_original_size
                            .fetch_add(ninfo.original_size, Ordering::Relaxed);
                        stats
                            .final_archive_size
                            .fetch_add(ninfo.archived_size, Ordering::Relaxed);
                        stats.total_segments.fetch_add(1, Ordering::Relaxed);
                        stats.unique_segments.fetch_add(1, Ordering::Relaxed);
                        item.segments.push(ninfo);
                    }
                    // Wait for all compression tasks of this file and
                    // propagate their errors.
                    if let Some(workers) = workers {
                        workers.join();
                        for err in workers.take_results() {
                            err?;
                        }
                    }
                    // Finalize the entry: checksum of all bytes read, and
                    // segments ordered by their position in the file
                    // (compression tasks may have pushed them out of order).
                    let mut item = item.lock_blocking().to_owned();
                    item.file_hash = reader.into_checksum();
                    item.segments.sort_by_key(|s| s.offset_in_file);
                    let mut items = items.lock_blocking();
                    items.insert(item.name.clone(), item);
                    Ok(())
                },
                true,
            )?;
        }
        Ok(Box::new(writer))
    }

    /// Finishes the archive: writes the index after all file data and
    /// patches the index offset into the header placeholder.
    fn write_header(&mut self) -> Result<()> {
        // Wait for every packing task and surface any of their errors.
        self.runner.join();
        for err in self.runner.take_results() {
            err?;
        }
        let mut file = self.file.lock_blocking();
        // The index is appended after all segment data.
        let index_offset = file.seek(std::io::SeekFrom::End(0))?;
        let mut index_data = MemWriter::new();
        let items = self.items.lock_blocking();
        for (_, item) in items.iter() {
            // Each entry is a File chunk containing info/segm/adlr chunks.
            let mut file_chunk = MemWriter::new();
            let name = encode_string(Encoding::Utf16LE, &item.name, false)?;
            // info payload: flags(4) + original(8) + archived(8)
            //             + name length(2) + name bytes = len + 22.
            let info_data_size = name.len() as u64 + 22;
            file_chunk.write_all(CHUNK_INFO)?;
            file_chunk.write_u64(info_data_size)?;
            file_chunk.write_u32(0)?; // flags
            file_chunk.write_u64(item.original_size)?;
            file_chunk.write_u64(item.archived_size)?;
            // Name length is in UTF-16 code units (bytes / 2).
            // NOTE(review): the `as u16` cast truncates for names longer
            // than 32767 UTF-16 units — confirm acceptable.
            file_chunk.write_u16(name.len() as u16 / 2)?;
            file_chunk.write_all(&name)?;
            // segm payload: flag(4) + start(8) + original(8) + archived(8)
            // per segment = 28 bytes each.
            let segm_data_size = item.segments.len() as u64 * 28;
            file_chunk.write_all(CHUNK_SEGM)?;
            file_chunk.write_u64(segm_data_size)?;
            for seg in &item.segments {
                // NOTE(review): compressed segments are always flagged as
                // zlib even when zstd was used — presumably the target
                // engine variant expects this; confirm.
                let flag = if seg.is_compressed {
                    TVP_XP3_SEGM_ENCODE_ZLIB
                } else {
                    TVP_XP3_SEGM_ENCODE_RAW
                };
                file_chunk.write_u32(flag)?;
                file_chunk.write_u64(seg.start)?;
                file_chunk.write_u64(seg.original_size)?;
                file_chunk.write_u64(seg.archived_size)?;
            }
            // adlr payload: 4-byte checksum of the file data, or 0 when
            // `no_adler` is set.
            let adlr_data_size = 4;
            file_chunk.write_all(CHUNK_ADLR)?;
            file_chunk.write_u64(adlr_data_size)?;
            if self.no_adler {
                file_chunk.write_u32(0)?;
            } else {
                file_chunk.write_u32(item.file_hash)?;
            }
            // Wrap the inner chunks in a File chunk with its byte length.
            index_data.write_all(CHUNK_FILE)?;
            let file_chunk = file_chunk.into_inner();
            index_data.write_u64(file_chunk.len() as u64)?;
            index_data.write_all(&file_chunk)?;
        }
        let index_data = index_data.into_inner();
        // Index layout: encode flag byte, then either
        //   compressed: compressed size(8) + original size(8) + data, or
        //   raw:        size(8) + data.
        if self.compress_index {
            let compressed_index = if self.use_zstd {
                let mut e = zstd::stream::Encoder::new(Vec::new(), self.zstd_compression_level)?;
                e.write_all(&index_data)?;
                e.finish()?
            } else {
                let mut e = flate2::write::ZlibEncoder::new(
                    Vec::new(),
                    flate2::Compression::new(self.zlib_compression_level),
                );
                e.write_all(&index_data)?;
                e.finish()?
            };
            file.write_u8(TVP_XP3_INDEX_ENCODE_ZLIB)?;
            file.write_u64(compressed_index.len() as u64)?;
            file.write_u64(index_data.len() as u64)?;
            file.write_all(&compressed_index)?;
        } else {
            file.write_u8(TVP_XP3_INDEX_ENCODE_RAW)?;
            file.write_u64(index_data.len() as u64)?;
            file.write_all(&index_data)?;
        }
        // Patch the placeholder written in `new`; offset 11 assumes
        // XP3_MAGIC is 11 bytes long — TODO confirm against consts.
        file.write_u64_at(11, index_offset)?; // Write index offset to header
        file.flush()?;
        eprintln!("XP3 Archive Statistics:\n{}", self.stats);
        Ok(())
    }
}