Refactor ingestion

This commit is contained in:
Joscha 2022-10-22 01:01:49 +02:00
parent 3296f6d15a
commit 5656f65b6c

View file

@@ -28,40 +28,24 @@ Importing is a tad complicated because of multiple criteria:
Because of this, the import is a bit more complex and has two passes.
The first pass imports the data into an adjacency-list-like format:
- `pages`: List with page info and index in `links`
- `links`: List with link info and index in `titles`
- `titles`: List with titles
- `titles_map`: Map from title to index in `titles` (used during decoding)
The first pass imports the data into an adjacency-list-like format, but the
`Link::to` field points to a title in `Titles` instead of a page.
The second pass then takes 1 and 3 and changes the indices in 3 to point to the
entries in 1 using 2 and 4. After this, 2, 4 and 5 can be discarded and 1 and 3
form a proper adjacency list.
The second pass then resolves the links to page indices and throws away all
links that don't point to any known page.
*/
struct FirstStage {
/// List with page info and index into [`Self::links`].
pages: Vec<Page<()>>,
/// List with link info and index into [`Self::titles`].
links: Vec<Link<()>>,
/// List with titles.
#[derive(Default)]
struct Titles {
/// Normalized titles
titles: Vec<String>,
/// Map from normalized title to index in [`Self::titles`].
titles_map: FxHashMap<String, u32>,
map: FxHashMap<String, u32>,
}
impl FirstStage {
fn new() -> Self {
Self {
pages: vec![],
links: vec![],
titles: vec![],
titles_map: FxHashMap::default(),
}
}
fn insert_title(&mut self, title: String) -> u32 {
match self.titles_map.entry(title.clone()) {
impl Titles {
fn insert(&mut self, title: String) -> u32 {
match self.map.entry(title.clone()) {
Entry::Occupied(occupied) => *occupied.get(),
Entry::Vacant(vacant) => {
let idx = self.titles.len() as u32;
@@ -72,150 +56,113 @@ impl FirstStage {
}
}
fn insert_page(&mut self, id: u32, length: u32, redirect: bool, title: String) {
let link_idx = self.links.len() as u32;
self.pages.push(Page {
link_idx,
id,
length,
redirect,
title,
data: (),
});
}
fn insert_link(&mut self, to: u32, start: u32, end: u32) {
self.links.push(Link {
to,
start,
end,
data: (),
});
}
fn import_json_page(&mut self, page: JsonPage) {
self.insert_page(page.id, page.length, page.redirect.is_some(), page.title);
for (to, start, end) in page.links {
let to = self.insert_title(util::normalize_link(&to));
self.insert_link(to, start, end);
}
}
fn finalize(&mut self) {
self.insert_page(
0,
0,
false,
"dummy page at the end of all pages".to_string(),
);
}
fn from_stdin() -> io::Result<Self> {
let mut result = Self::new();
let stdin = BufReader::new(io::stdin());
for (i, line) in stdin.lines().enumerate() {
// let json_page = serde_json::from_str::<JsonPage>(&line?)?;
let json_page = simd_json::serde::from_str::<JsonPage>(&mut line?).unwrap();
result.import_json_page(json_page);
if (i + 1) % 100_000 == 0 {
eprintln!("{} pages imported", i + 1)
}
}
result.finalize();
Ok(result)
fn get(&self, i: u32) -> &str {
&self.titles[i as usize]
}
}
struct SecondStage {
/// List with page info and index into [`Self::links`].
pages: Vec<Page<()>>,
/// List with link info and index into [`Self::pages`].
links: Vec<Link<()>>,
/// Map from normalized title to index in [`Self::pages`].
pages_map: FxHashMap<String, u32>,
fn first_stage() -> io::Result<(AdjacencyList<(), ()>, Titles)> {
let mut titles = Titles::default();
let mut result = AdjacencyList::default();
let stdin = BufReader::new(io::stdin());
for (i, line) in stdin.lines().enumerate() {
let json_page = simd_json::serde::from_str::<JsonPage>(&mut line?).unwrap();
result.pages.push(Page {
link_idx: result.links.len() as u32,
id: json_page.id,
length: json_page.length,
redirect: json_page.redirect.is_some(),
title: json_page.title,
data: (),
});
for (to, start, end) in json_page.links {
let to = titles.insert(util::normalize_link(&to));
result.links.push(Link {
to,
start,
end,
data: (),
});
}
if (i + 1) % 100_000 == 0 {
eprintln!("{} pages imported", i + 1)
}
}
eprintln!("Pages: {}", result.pages.len());
eprintln!("Links: {}", result.links.len());
eprintln!("Titles: {}", titles.titles.len());
eprintln!("Title map entries: {}", titles.map.len());
result.pages.push(Page {
link_idx: result.links.len() as u32,
id: 0,
length: 0,
redirect: false,
title: "Sentinel page at the end of all pages, Q2AKO3OYzyitmCJURghJ".to_string(),
data: (),
});
Ok((result, titles))
}
impl SecondStage {
fn new() -> Self {
Self {
pages: vec![],
links: vec![],
pages_map: FxHashMap::default(),
}
/// Create map from normalized title to index in pages.
fn initialize_pages_map(pages: &[Page<()>]) -> FxHashMap<String, u32> {
let mut result = FxHashMap::default();
for (i, p) in pages.iter().enumerate() {
result.insert(util::normalize_link(&p.title), i as u32);
}
result
}
fn initialize_pages_map(&mut self, pages: &[Page<()>]) {
for (idx, page) in pages.iter().enumerate() {
let title = util::normalize_link(&page.title);
self.pages_map.insert(title, idx as u32);
}
}
fn second_stage(first_stage: &AdjacencyList<(), ()>, titles: &Titles) -> AdjacencyList<(), ()> {
let pages_map = initialize_pages_map(&first_stage.pages);
let mut result = AdjacencyList::default();
fn insert_page(&mut self, page: &Page<()>) {
let mut page = page.clone();
page.link_idx = self.links.len() as u32;
self.pages.push(page);
}
for page_idx in 0..first_stage.pages.len() - 1 {
let mut page = first_stage.pages[page_idx].clone();
let start_link_idx = page.link_idx;
let end_link_idx = first_stage.pages[page_idx + 1].link_idx;
fn insert_link(&mut self, mut link: Link<()>, titles: &[String]) {
let title = &titles[link.to as usize];
if let Some(page_idx) = self.pages_map.get(title) {
link.to = *page_idx;
self.links.push(link);
}
}
page.link_idx = result.links.len() as u32;
result.pages.push(page);
fn finalize(&mut self, pages: &[Page<()>]) {
self.insert_page(pages.last().unwrap());
}
fn from_first_stage(first_stage: FirstStage) -> Self {
drop(first_stage.titles_map);
let mut result = Self::new();
eprintln!("> Initializing pages map");
result.initialize_pages_map(&first_stage.pages);
eprintln!("> Rearranging links");
for page_idx in 0..first_stage.pages.len() - 1 {
let page = &first_stage.pages[page_idx];
result.insert_page(page);
let next_link_idx = first_stage.pages[page_idx + 1].link_idx;
for link_idx in page.link_idx..next_link_idx {
let link = first_stage.links[link_idx as usize];
result.insert_link(link, &first_stage.titles);
}
if (page_idx + 1) % 100_000 == 0 {
eprintln!("{} pages updated", page_idx + 1);
for link_idx in start_link_idx..end_link_idx {
let mut link = first_stage.links[link_idx as usize];
let title = util::normalize_link(titles.get(link.to));
if let Some(to) = pages_map.get(&title) {
// The link points to an existing article, we should keep it
link.to = *to;
result.links.push(link);
}
}
result.finalize(&first_stage.pages);
result
}
fn into_adjacency_list(self) -> AdjacencyList<(), ()> {
AdjacencyList {
pages: self.pages,
links: self.links,
if (page_idx + 1) % 100_000 == 0 {
eprintln!("{} pages processed", page_idx + 1)
}
}
eprintln!("Pages: {}", result.pages.len());
eprintln!("Links: {}", result.links.len());
eprintln!("Page map entries: {}", pages_map.len());
let mut sentinel = first_stage.pages.last().unwrap().clone();
sentinel.link_idx = result.links.len() as u32;
result.pages.push(sentinel);
result
}
pub fn ingest(datafile: &Path) -> io::Result<()> {
eprintln!(">> First stage");
let first_stage = FirstStage::from_stdin()?;
let (first_stage, titles) = first_stage()?;
eprintln!(">> Second stage");
let second_stage = SecondStage::from_first_stage(first_stage);
let data = second_stage.into_adjacency_list();
let data = second_stage(&first_stage, &titles);
eprintln!(">> Consistency check");
data.check_consistency();