Rewrite ingest command

This commit is contained in:
Joscha 2024-12-31 00:48:24 +01:00
parent f819f5bf69
commit 3aa8222b6b
8 changed files with 502 additions and 242 deletions

View file

@ -1,16 +1,18 @@
use std::collections::hash_map::Entry;
use std::fs::File;
use std::io::{self, BufRead, BufReader, BufWriter};
use std::path::Path;
use std::u32;
use std::{
collections::{hash_map::Entry, HashMap},
fs::File,
io::{self, BufRead, BufReader, Seek},
path::{Path, PathBuf},
};
use rustc_hash::FxHashMap;
use serde::Deserialize;
use thousands::Separable;
use crate::data::adjacency_list::{AdjacencyList, Page};
use crate::data::info::{LinkInfo, PageInfo};
use crate::data::store;
use crate::util;
use crate::{
data::{self, Link, Page},
graph::{Graph, NodeIdx},
util::{Counter, TitleNormalizer},
};
#[derive(Deserialize)]
struct JsonPage {
@ -21,151 +23,139 @@ struct JsonPage {
redirect: Option<String>,
}
/*
Importing is a tad complicated because of multiple criteria:
fn read_titles(r: &mut BufReader<File>) -> io::Result<Vec<String>> {
let mut counter = Counter::new();
let mut titles = vec![];
1. The data must be read in a single pass on stdin
2. The process should not consume a lot of memory
(can't store the decoded json data directly)
3. The process should result in a nice and compact adjacency list format
Because of this, the import is a bit more complex and has two passes.
The first pass imports the data into an adjacency-list-like format, but the
`Link::to` field points to a title in `Titles` instead of a page.
The second pass then resolves the links to page indices and throws away all
links that don't point to any known page.
*/
#[derive(Default)]
struct Titles {
/// Normalized titles
titles: Vec<String>,
/// Map from normalized title to index in [`Self::titles`].
map: FxHashMap<String, u32>,
}
impl Titles {
fn insert(&mut self, title: String) -> u32 {
match self.map.entry(title.clone()) {
Entry::Occupied(occupied) => *occupied.get(),
Entry::Vacant(vacant) => {
let idx = self.titles.len() as u32;
self.titles.push(title);
vacant.insert(idx);
idx
}
}
for line in r.lines() {
counter.tick();
let page = serde_json::from_str::<JsonPage>(&line?).unwrap();
titles.push(page.title);
}
fn get(&self, i: u32) -> &str {
&self.titles[i as usize]
}
counter.done();
Ok(titles)
}
fn first_stage() -> io::Result<(AdjacencyList<PageInfo, LinkInfo>, Titles)> {
let mut titles = Titles::default();
let mut result = AdjacencyList::default();
fn compute_title_lookup(normalizer: &TitleNormalizer, titles: &[String]) -> HashMap<String, u32> {
let mut counter = Counter::new();
let mut title_lookup = HashMap::new();
let stdin = BufReader::new(io::stdin());
for (i, line) in stdin.lines().enumerate() {
let json_page = serde_json::from_str::<JsonPage>(&line?).unwrap();
result.push_page(PageInfo {
id: json_page.id,
length: json_page.length,
redirect: json_page.redirect.is_some(),
title: json_page.title,
});
if let Some(to) = json_page.redirect {
let to = titles.insert(util::normalize_link(&to));
result.push_link(to, LinkInfo::default());
} else {
for (to, start, len, flags) in json_page.links {
let to = titles.insert(util::normalize_link(&to));
result.push_link(to, LinkInfo { start, len, flags });
}
}
if (i + 1) % 100_000 == 0 {
eprintln!("{} pages imported", i + 1)
}
}
eprintln!("Pages: {}", result.pages.len());
eprintln!("Links: {}", result.links.len());
eprintln!("Titles: {}", titles.titles.len());
eprintln!("Title map entries: {}", titles.map.len());
Ok((result, titles))
}
/// Create map from normalized title to index in pages.
fn initialize_pages_map(pages: &[Page<PageInfo>]) -> FxHashMap<String, u32> {
let mut result = FxHashMap::default();
for (i, p) in pages.iter().enumerate() {
match result.entry(util::normalize_link(&p.data.title)) {
Entry::Occupied(entry) => {
eprintln!(
"{:?} already exists at index {} as {:?}",
p.data.title,
entry.get(),
util::normalize_link(&p.data.title)
);
for (i, title) in titles.iter().enumerate() {
counter.tick();
match title_lookup.entry(normalizer.normalize(title)) {
Entry::Occupied(mut entry) => {
let prev_i = *entry.get();
let prev = &titles[prev_i as usize];
if prev == title {
println!(" {title:?} ({prev_i}) occurs again at {i}");
// Prefer later occurrences of articles over earlier ones under
// the assumption that their contents are "fresher".
entry.insert(i as u32);
} else {
println!(
" {prev:?} ({prev_i}) and {title:?} ({i}) both normalize to {:?}",
normalizer.normalize(title)
);
}
}
Entry::Vacant(entry) => {
entry.insert(i as u32);
}
}
}
result
counter.done();
title_lookup
}
fn second_stage(
first_stage: &AdjacencyList<PageInfo, LinkInfo>,
titles: &Titles,
) -> AdjacencyList<PageInfo, LinkInfo> {
let pages_map = initialize_pages_map(&first_stage.pages);
let mut result = AdjacencyList::default();
fn read_page_data(
normalizer: &TitleNormalizer,
title_lookup: &HashMap<String, u32>,
r: &mut BufReader<File>,
) -> io::Result<(Vec<Page>, Vec<Link>, Graph)> {
let mut counter = Counter::new();
let mut pages = vec![];
let mut links = vec![];
let mut graph = Graph::new();
for (page_idx, page) in first_stage.pages() {
result.push_page(page.data.clone());
for (i, line) in r.lines().enumerate() {
counter.tick();
let page = serde_json::from_str::<JsonPage>(&line?).unwrap();
let normalized = normalizer.normalize(&page.title);
for (_, link) in first_stage.links(page_idx) {
let title = util::normalize_link(titles.get(link.to));
if let Some(to) = pages_map.get(&title) {
// The link points to an existing article, we should keep it
result.push_link(*to, link.data);
}
let expected_i = title_lookup[&normalized];
if i as u32 != expected_i {
// Articles may occur multiple times, and this is not the instance
// of the article we should keep.
println!(" Skipping {:?} ({i}) in favor of {expected_i}", page.title);
continue;
}
if (page_idx + 1) % 100_000 == 0 {
eprintln!("{} pages imported", page_idx + 1)
graph.add_node();
pages.push(Page {
id: page.id,
title: page.title,
length: page.length,
redirect: page.redirect.is_some(),
});
let mut page_links = page.links;
if let Some(target) = page.redirect {
page_links.clear();
let len = target.len() as u32;
page_links.push((target, 0, len, 0));
}
for (target, start, len, flags) in page_links {
if let Some(target_i) = title_lookup.get(&normalizer.normalize(&target)) {
graph.edges.push(NodeIdx(*target_i));
links.push(Link { start, len, flags });
}
}
}
eprintln!("Pages: {}", result.pages.len());
eprintln!("Links: {}", result.links.len());
eprintln!("Page map entries: {}", pages_map.len());
result
counter.done();
Ok((pages, links, graph))
}
pub fn ingest(datafile: &Path) -> io::Result<()> {
eprintln!(">> First stage");
let (first_stage, titles) = first_stage()?;
eprintln!(">> Second stage");
let data = second_stage(&first_stage, &titles);
eprintln!(">> Consistency check");
data.check_consistency();
eprintln!(">> Export");
let mut datafile = BufWriter::new(File::create(datafile)?);
store::write_adjacency_list(&data, &mut datafile)?;
Ok(())
/// Convert sift data to brood data.
#[derive(Debug, clap::Parser)]
pub struct Cmd {
/// The sift data file to ingest.
data: PathBuf,
}
impl Cmd {
pub fn run(self, data: &Path) -> io::Result<()> {
let normalizer = TitleNormalizer::new();
println!(">> First pass");
let mut sift_data = BufReader::new(File::open(&self.data)?);
println!("> Reading titles");
let titles = read_titles(&mut sift_data)?;
println!("> Computing title index lookup table");
let title_lookup = compute_title_lookup(&normalizer, &titles);
drop(titles); // Don't hoard memory
println!(">> Second pass");
sift_data.seek(io::SeekFrom::Start(0))?;
println!("> Reading page data");
let (pages, links, graph) = read_page_data(&normalizer, &title_lookup, &mut sift_data)?;
drop(title_lookup); // Don't hoard memory
drop(sift_data); // No longer needed
println!("> Checking consistency");
graph.check_consistency();
println!(">> Export");
println!("Pages: {}", pages.len().separate_with_underscores());
println!("Links: {}", links.len().separate_with_underscores());
data::write_to_file(data, &pages, &links, &graph)?;
Ok(())
}
}