From 0e0789cc4d8493bd3770314d36c0f4ba1c53760c Mon Sep 17 00:00:00 2001 From: Joscha Date: Mon, 3 Oct 2022 17:35:11 +0200 Subject: [PATCH] Ingest new json format --- brood/src/data.rs | 51 +---------- brood/src/ingest.rs | 204 ++++++++++++++++++++++++++------------------ brood/src/main.rs | 20 +++-- brood/src/test.rs | 21 ----- brood/src/util.rs | 3 + 5 files changed, 137 insertions(+), 162 deletions(-) delete mode 100644 brood/src/test.rs create mode 100644 brood/src/util.rs diff --git a/brood/src/data.rs b/brood/src/data.rs index 560f383..130f0ef 100644 --- a/brood/src/data.rs +++ b/brood/src/data.rs @@ -1,15 +1,14 @@ use serde::{Deserialize, Serialize}; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Page { pub link_idx: u32, - pub ns: u16, pub id: u32, pub title: String, pub redirect: bool, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct Link { pub to: u32, pub start: u32, @@ -21,49 +20,3 @@ pub struct AdjacencyList { pub pages: Vec, pub links: Vec, } - -#[derive(Debug, Serialize, Deserialize)] -pub struct SlimAdjacencyList { - pages: Vec<(u32, u32, u16, String, bool)>, - links: Vec<(u32, u32, u32)>, -} - -impl SlimAdjacencyList { - pub fn from_alist(alist: AdjacencyList) -> Self { - let pages = alist - .pages - .into_iter() - .map(|p| (p.link_idx, p.id, p.ns, p.title, p.redirect)) - .collect(); - - let links = alist - .links - .into_iter() - .map(|l| (l.to, l.start, l.end)) - .collect(); - - Self { pages, links } - } - - pub fn to_alist(self) -> AdjacencyList { - let pages = self - .pages - .into_iter() - .map(|(link_idx, id, ns, title, redirect)| Page { - link_idx, - ns, - id, - title, - redirect, - }) - .collect(); - - let links = self - .links - .into_iter() - .map(|(to, start, end)| Link { to, start, end }) - .collect(); - - AdjacencyList { pages, links } - } -} diff --git a/brood/src/ingest.rs b/brood/src/ingest.rs index d084a91..669411b 100644 --- a/brood/src/ingest.rs +++ b/brood/src/ingest.rs @@ -1,19 +1,20 @@ use std::collections::hash_map::Entry; use std::io::{self, BufRead, BufReader}; +use std::path::Path; use rustc_hash::FxHashMap; use serde::Deserialize; -use crate::data::{AdjacencyList, Link, Page, SlimAdjacencyList}; +use crate::data::{AdjacencyList, Link, Page}; +use crate::util; #[derive(Deserialize)] struct JsonPage { - ns: u16, id: u32, title: String, - redirect: Option, - #[serde(default)] + length: u32, links: Vec<(String, u32, u32)>, + redirect: Option, } /* @@ -39,30 +40,23 @@ form a proper adjacency list. struct FirstStage { /// List with page info and index into [`Self::links`]. - /// - /// The first entry with id 0 represents a nonexistent link. pages: Vec, - /// Map from index in [`Self::titles`] to index in [`Self::pages`] (used during the second pass). - pages_map: FxHashMap, /// List with link info and index into [`Self::titles`]. links: Vec, /// List with titles. titles: Vec, - /// Map from title to index in [`Self::titles`] (used during decoding). + /// Map from normalized title to index in [`Self::titles`]. titles_map: FxHashMap, } impl FirstStage { fn new() -> Self { - let mut result = Self { + Self { pages: vec![], - pages_map: FxHashMap::default(), links: vec![], titles: vec![], titles_map: FxHashMap::default(), - }; - result.push_page(0, 0, "this link does not exist".to_string(), false); - result + } } fn insert_title(&mut self, title: String) -> u32 { @@ -77,110 +71,150 @@ impl FirstStage { } } - fn push_page(&mut self, ns: u16, id: u32, title: String, redirect: bool) { + fn insert_page(&mut self, id: u32, title: String, redirect: bool) { + let link_idx = self.pages.len() as u32; self.pages.push(Page { - link_idx: self.links.len() as u32, - ns, + link_idx, id, title, redirect, }); } - fn insert_page(&mut self, ns: u16, id: u32, title: String, redirect: bool) { - // We know we haven't seen the page before - let title_idx = self.insert_title(title.clone()); - let idx = self.pages.len() as u32; - self.push_page(ns, id, title, redirect); - self.pages_map.insert(title_idx, idx); - } - fn insert_link(&mut self, to: u32, start: u32, end: u32) { self.links.push(Link { to, start, end }); } fn import_json_page(&mut self, page: JsonPage) { - self.insert_page(page.ns, page.id, page.title, page.redirect.is_some()); - if let Some(to) = page.redirect { - let to = self.insert_title(to); - self.insert_link(to, 0, 0); - } else { - for (to, start, end) in page.links { - let to = self.insert_title(to); - self.insert_link(to, start, end); - } + self.insert_page(page.id, page.title, page.redirect.is_some()); + for (to, start, end) in page.links { + let to = self.insert_title(util::normalize_link(&to)); + self.insert_link(to, start, end); } } fn finalize(&mut self) { - self.insert_page( - 0, - 0, - "dummy page at the end of all pages".to_string(), - false, - ); + self.insert_page(0, "dummy page at the end of all pages".to_string(), false); + } + + fn from_stdin() -> io::Result { + let mut result = Self::new(); + + let stdin = BufReader::new(io::stdin()); + for (i, line) in stdin.lines().enumerate() { + // let json_page = serde_json::from_str::(&line?)?; + let json_page = simd_json::serde::from_str::(&mut line?).unwrap(); + result.import_json_page(json_page); + + if (i + 1) % 100_000 == 0 { + eprintln!("{} pages imported", i + 1) + } + } + + result.finalize(); + Ok(result) } } -fn first_stage() -> io::Result { - let mut first_stage = FirstStage::new(); - let mut n = 0; - - let stdin = BufReader::new(io::stdin()); - for line in stdin.lines() { - // let json_page = serde_json::from_str::(&line?)?; - let json_page = simd_json::serde::from_str::(&mut line?).unwrap(); - first_stage.import_json_page(json_page); - - n += 1; - if n % 100_000 == 0 { - eprintln!("{n} imported") - } - } - - first_stage.finalize(); - Ok(first_stage) +struct SecondStage { + /// List with page info and index into [`Self::links`]. + pages: Vec, + /// List with link info and index into [`Self::pages`]. + links: Vec, + /// Map from normalized title to index in [`Self::pages`]. + pages_map: FxHashMap, } -fn second_stage(mut fs: FirstStage) -> AdjacencyList { - let mut n = 0; - - for link in &mut fs.links { - if let Some(to) = fs.pages_map.get(&link.to) { - link.to = *to; - } else { - link.to = 0; - } - - n += 1; - if n % 10_000_000 == 0 { - eprintln!("{n} links converted"); +impl SecondStage { + fn new() -> Self { + Self { + pages: vec![], + links: vec![], + pages_map: FxHashMap::default(), } } - AdjacencyList { - pages: fs.pages, - links: fs.links, + fn initialize_pages_map(&mut self, pages: &[Page]) { + for (idx, page) in pages.iter().enumerate() { + let title = util::normalize_link(&page.title); + self.pages_map.insert(title, idx as u32); + } + } + + fn insert_page(&mut self, page: &Page) { + let mut page = page.clone(); + page.link_idx = self.pages.len() as u32; + self.pages.push(page); + } + + fn insert_link(&mut self, mut link: Link, titles: &[String]) { + let title = &titles[link.to as usize]; + if let Some(page_idx) = self.pages_map.get(title) { + link.to = *page_idx; + self.links.push(link); + } + } + + fn finalize(&mut self, pages: &[Page]) { + self.insert_page(pages.last().unwrap()); + } + + fn from_first_stage(first_stage: FirstStage) -> Self { + drop(first_stage.titles_map); + + let mut result = Self::new(); + + eprintln!("> Initializing pages map"); + result.initialize_pages_map(&first_stage.pages); + + eprintln!("> Rearranging links"); + for page_idx in 0..first_stage.pages.len() - 1 { + let page = &first_stage.pages[page_idx]; + result.insert_page(page); + + let next_link_idx = first_stage.pages[page_idx + 1].link_idx; + for link_idx in page.link_idx..next_link_idx { + let link = first_stage.links[link_idx as usize]; + result.insert_link(link, &first_stage.titles); + } + + if (page_idx + 1) % 100_000 == 0 { + eprintln!("{} pages updated", page_idx + 1); + } + } + + result.finalize(&first_stage.pages); + result + } + + fn into_adjacency_list(self) -> AdjacencyList { + AdjacencyList { + pages: self.pages, + links: self.links, + } } } -pub fn ingest() -> io::Result<()> { - eprintln!("FIRST STAGE"); - let first_stage = first_stage()?; - eprintln!("SECOND STAGE"); - let second_stage = second_stage(first_stage); +pub fn ingest(datafile: &Path) -> io::Result<()> { + eprintln!(">> First stage"); + let first_stage = FirstStage::from_stdin()?; - eprintln!("CONSISTENCY CHECK"); - let range = 0..second_stage.pages.len() as u32; - for link in &second_stage.links { + eprintln!(">> Second stage"); + let second_stage = SecondStage::from_first_stage(first_stage); + + let data = second_stage.into_adjacency_list(); + + eprintln!(">> Consistency check"); + let range = 0..data.pages.len() as u32; + for link in &data.links { if !range.contains(&link.to) { eprintln!("Invalid link detected!"); } } - eprintln!("EXPORT"); - let data = SlimAdjacencyList::from_alist(second_stage); - ciborium::ser::into_writer(&data, io::stdout()).unwrap(); + // eprintln!("EXPORT"); + // let data = SlimAdjacencyList::from_alist(second_stage); + // ciborium::ser::into_writer(&data, io::stdout()).unwrap(); // simd_json::to_writer(io::stdout(), &data).unwrap(); Ok(()) diff --git a/brood/src/main.rs b/brood/src/main.rs index ab78063..6547c63 100644 --- a/brood/src/main.rs +++ b/brood/src/main.rs @@ -1,8 +1,9 @@ -mod ingest; mod data; -mod test; +mod ingest; +mod util; use std::io; +use std::path::PathBuf; use clap::Parser; @@ -10,13 +11,18 @@ use clap::Parser; enum Command { /// Read sift data on stdin and output brood data on stdout. Ingest, - /// Test various things - Test, +} + +#[derive(Debug, Parser)] +struct Args { + datafile: PathBuf, + #[command(subcommand)] + command: Command, } fn main() -> io::Result<()> { - match Command::parse() { - Command::Ingest => ingest::ingest(), - Command::Test => test::test(), + let args = Args::parse(); + match args.command { + Command::Ingest => ingest::ingest(&args.datafile), } } diff --git a/brood/src/test.rs b/brood/src/test.rs deleted file mode 100644 index 5ebeabf..0000000 --- a/brood/src/test.rs +++ /dev/null @@ -1,21 +0,0 @@ -use std::io::{self, BufReader}; - -use crate::data::SlimAdjacencyList; - -pub fn test() -> io::Result<()> { - eprintln!("IMPORT"); - let data: SlimAdjacencyList = ciborium::de::from_reader(BufReader::new(io::stdin())).unwrap(); - // let data: SlimAdjacencyList = - // simd_json::serde::from_reader(BufReader::new(io::stdin())).unwrap(); - let data = data.to_alist(); - - eprintln!("CONSISTENCY CHECK"); - let range = 0..data.pages.len() as u32; - for link in &data.links { - if !range.contains(&link.to) { - eprintln!("Invalid link detected!"); - } - } - - Ok(()) -} diff --git a/brood/src/util.rs b/brood/src/util.rs new file mode 100644 index 0000000..ac9a115 --- /dev/null +++ b/brood/src/util.rs @@ -0,0 +1,3 @@ +pub fn normalize_link(link: &str) -> String { + link.trim().to_lowercase().replace(' ', "_") +}