Ingest new json format

This commit is contained in:
Joscha 2022-10-03 17:35:11 +02:00
parent 78a5aa5169
commit 0e0789cc4d
5 changed files with 137 additions and 162 deletions

View file

@ -1,19 +1,20 @@
use std::collections::hash_map::Entry;
use std::io::{self, BufRead, BufReader};
use std::path::Path;
use rustc_hash::FxHashMap;
use serde::Deserialize;
use crate::data::{AdjacencyList, Link, Page, SlimAdjacencyList};
use crate::data::{AdjacencyList, Link, Page};
use crate::util;
#[derive(Deserialize)]
struct JsonPage {
ns: u16,
id: u32,
title: String,
redirect: Option<String>,
#[serde(default)]
length: u32,
links: Vec<(String, u32, u32)>,
redirect: Option<String>,
}
/*
@ -39,30 +40,23 @@ form a proper adjacency list.
struct FirstStage {
/// List with page info and index into [`Self::links`].
///
/// The first entry with id 0 represents a nonexistent link.
pages: Vec<Page>,
/// Map from index in [`Self::titles`] to index in [`Self::pages`] (used during the second pass).
pages_map: FxHashMap<u32, u32>,
/// List with link info and index into [`Self::titles`].
links: Vec<Link>,
/// List with titles.
titles: Vec<String>,
/// Map from title to index in [`Self::titles`] (used during decoding).
/// Map from normalized title to index in [`Self::titles`].
titles_map: FxHashMap<String, u32>,
}
impl FirstStage {
fn new() -> Self {
let mut result = Self {
Self {
pages: vec![],
pages_map: FxHashMap::default(),
links: vec![],
titles: vec![],
titles_map: FxHashMap::default(),
};
result.push_page(0, 0, "this link does not exist".to_string(), false);
result
}
}
fn insert_title(&mut self, title: String) -> u32 {
@ -77,110 +71,150 @@ impl FirstStage {
}
}
fn push_page(&mut self, ns: u16, id: u32, title: String, redirect: bool) {
fn insert_page(&mut self, id: u32, title: String, redirect: bool) {
let link_idx = self.pages.len() as u32;
self.pages.push(Page {
link_idx: self.links.len() as u32,
ns,
link_idx,
id,
title,
redirect,
});
}
fn insert_page(&mut self, ns: u16, id: u32, title: String, redirect: bool) {
// We know we haven't seen the page before
let title_idx = self.insert_title(title.clone());
let idx = self.pages.len() as u32;
self.push_page(ns, id, title, redirect);
self.pages_map.insert(title_idx, idx);
}
/// Records a raw link; `to` is an index into [`Self::titles`],
/// `start`/`end` delimit the link text in the source page.
fn insert_link(&mut self, to: u32, start: u32, end: u32) {
    let link = Link { to, start, end };
    self.links.push(link);
}
fn import_json_page(&mut self, page: JsonPage) {
self.insert_page(page.ns, page.id, page.title, page.redirect.is_some());
if let Some(to) = page.redirect {
let to = self.insert_title(to);
self.insert_link(to, 0, 0);
} else {
for (to, start, end) in page.links {
let to = self.insert_title(to);
self.insert_link(to, start, end);
}
self.insert_page(page.id, page.title, page.redirect.is_some());
for (to, start, end) in page.links {
let to = self.insert_title(util::normalize_link(&to));
self.insert_link(to, start, end);
}
}
fn finalize(&mut self) {
self.insert_page(
0,
0,
"dummy page at the end of all pages".to_string(),
false,
);
self.insert_page(0, "dummy page at the end of all pages".to_string(), false);
}
/// Builds the first stage by reading newline-delimited JSON pages from stdin.
///
/// Reports progress every 100 000 imported pages and appends the trailing
/// dummy page via [`Self::finalize`].
///
/// # Errors
///
/// Returns an error if stdin cannot be read or if a line is not valid JSON
/// (previously malformed input caused a panic via `unwrap`).
fn from_stdin() -> io::Result<Self> {
    let mut result = Self::new();
    let stdin = BufReader::new(io::stdin());
    for (i, line) in stdin.lines().enumerate() {
        let mut line = line?;
        // simd-json parses the buffer in place, hence the `&mut` borrow.
        let json_page = simd_json::serde::from_str::<JsonPage>(&mut line)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
        result.import_json_page(json_page);
        if (i + 1) % 100_000 == 0 {
            eprintln!("{} pages imported", i + 1)
        }
    }
    result.finalize();
    Ok(result)
}
}
fn first_stage() -> io::Result<FirstStage> {
let mut first_stage = FirstStage::new();
let mut n = 0;
let stdin = BufReader::new(io::stdin());
for line in stdin.lines() {
// let json_page = serde_json::from_str::<JsonPage>(&line?)?;
let json_page = simd_json::serde::from_str::<JsonPage>(&mut line?).unwrap();
first_stage.import_json_page(json_page);
n += 1;
if n % 100_000 == 0 {
eprintln!("{n} imported")
}
}
first_stage.finalize();
Ok(first_stage)
/// Second ingest pass: pages re-packed with their resolved links, built
/// from a [`FirstStage`] by [`Self::from_first_stage`].
struct SecondStage {
/// List with page info and index into [`Self::links`].
pages: Vec<Page>,
/// List with link info and index into [`Self::pages`].
links: Vec<Link>,
/// Map from normalized title to index in [`Self::pages`].
pages_map: FxHashMap<String, u32>,
}
fn second_stage(mut fs: FirstStage) -> AdjacencyList {
let mut n = 0;
for link in &mut fs.links {
if let Some(to) = fs.pages_map.get(&link.to) {
link.to = *to;
} else {
link.to = 0;
}
n += 1;
if n % 10_000_000 == 0 {
eprintln!("{n} links converted");
impl SecondStage {
/// Creates an empty second stage with no pages, links, or title map.
fn new() -> Self {
    Self { pages: Vec::new(), links: Vec::new(), pages_map: FxHashMap::default() }
}
AdjacencyList {
pages: fs.pages,
links: fs.links,
/// Fills [`Self::pages_map`] with normalized title → page index for every
/// first-stage page. Later duplicates overwrite earlier entries, matching
/// plain `insert` semantics.
fn initialize_pages_map(&mut self, pages: &[Page]) {
    self.pages_map.extend(
        pages
            .iter()
            .enumerate()
            .map(|(i, p)| (util::normalize_link(&p.title), i as u32)),
    );
}
fn insert_page(&mut self, page: &Page) {
let mut page = page.clone();
page.link_idx = self.pages.len() as u32;
self.pages.push(page);
}
/// Resolves a first-stage link (whose `to` is a title index) to a page
/// index and stores it. Links whose target title has no page are dropped.
fn insert_link(&mut self, mut link: Link, titles: &[String]) {
    let target_title = &titles[link.to as usize];
    if let Some(&page_idx) = self.pages_map.get(target_title) {
        link.to = page_idx;
        self.links.push(link);
    }
}
/// Re-inserts the trailing dummy page so the last real page's link range
/// stays delimited.
///
/// # Panics
///
/// Panics if `pages` is empty — the first stage always appends a dummy
/// page in its own `finalize`, so this indicates a broken invariant.
fn finalize(&mut self, pages: &[Page]) {
    let dummy = pages
        .last()
        .expect("first stage must append a trailing dummy page");
    self.insert_page(dummy);
}
/// Converts a [`FirstStage`] into a [`SecondStage`] by resolving every
/// link's title index to a page index.
///
/// Relies on the trailing dummy page appended by the first stage: each
/// page's links span `page.link_idx..next_page.link_idx`, so the loop
/// stops one short of the end and `finalize` re-adds the dummy.
fn from_first_stage(first_stage: FirstStage) -> Self {
// Free the title map early — only `pages`, `links`, `titles` are read below.
drop(first_stage.titles_map);
let mut result = Self::new();
eprintln!("> Initializing pages map");
result.initialize_pages_map(&first_stage.pages);
eprintln!("> Rearranging links");
// Skip the final dummy page; it only delimits the last real page's links.
for page_idx in 0..first_stage.pages.len() - 1 {
let page = &first_stage.pages[page_idx];
result.insert_page(page);
// The next page's link_idx marks the end of this page's link run.
let next_link_idx = first_stage.pages[page_idx + 1].link_idx;
for link_idx in page.link_idx..next_link_idx {
let link = first_stage.links[link_idx as usize];
// Unresolvable (dangling) links are silently dropped by insert_link.
result.insert_link(link, &first_stage.titles);
}
if (page_idx + 1) % 100_000 == 0 {
eprintln!("{} pages updated", page_idx + 1);
}
}
result.finalize(&first_stage.pages);
result
}
fn into_adjacency_list(self) -> AdjacencyList {
AdjacencyList {
pages: self.pages,
links: self.links,
}
}
}
pub fn ingest() -> io::Result<()> {
eprintln!("FIRST STAGE");
let first_stage = first_stage()?;
eprintln!("SECOND STAGE");
let second_stage = second_stage(first_stage);
pub fn ingest(datafile: &Path) -> io::Result<()> {
eprintln!(">> First stage");
let first_stage = FirstStage::from_stdin()?;
eprintln!("CONSISTENCY CHECK");
let range = 0..second_stage.pages.len() as u32;
for link in &second_stage.links {
eprintln!(">> Second stage");
let second_stage = SecondStage::from_first_stage(first_stage);
let data = second_stage.into_adjacency_list();
eprintln!(">> Consistency check");
let range = 0..data.pages.len() as u32;
for link in &data.links {
if !range.contains(&link.to) {
eprintln!("Invalid link detected!");
}
}
eprintln!("EXPORT");
let data = SlimAdjacencyList::from_alist(second_stage);
ciborium::ser::into_writer(&data, io::stdout()).unwrap();
// eprintln!("EXPORT");
// let data = SlimAdjacencyList::from_alist(second_stage);
// ciborium::ser::into_writer(&data, io::stdout()).unwrap();
// simd_json::to_writer(io::stdout(), &data).unwrap();
Ok(())