Store finished work in db

This commit is contained in:
Joscha 2023-08-11 15:16:20 +02:00
parent a9e08505bc
commit 5e0b8e3c8c
9 changed files with 190 additions and 38 deletions

View file

@ -14,7 +14,7 @@ use axum::{
Json, Router, TypedHeader,
};
use gix::{ObjectId, ThreadSafeRepository};
use sqlx::SqlitePool;
use sqlx::{Acquire, SqlitePool};
use time::OffsetDateTime;
use tracing::debug;
@ -24,10 +24,109 @@ use crate::{
workers::{WorkerInfo, Workers},
BenchRepo, Repo, Server,
},
shared::{BenchMethod, ServerResponse, Work, WorkerRequest},
shared::{BenchMethod, FinishedRun, ServerResponse, Work, WorkerRequest},
somehow,
};
async fn save_work(run: FinishedRun, db: SqlitePool) -> somehow::Result<()> {
let mut tx = db.begin().await?;
let conn = tx.acquire().await?;
sqlx::query!(
"\
INSERT INTO runs ( \
id, \
hash, \
start, \
end, \
exit_code \
) \
VALUES (?, ?, ?, ?, ?) \
",
run.id,
run.hash,
run.start,
run.end,
run.exit_code,
)
.execute(&mut *conn)
.await?;
for (name, measurement) in run.measurements {
sqlx::query!(
"\
INSERT INTO run_measurements ( \
id, \
name, \
value, \
stddev, \
unit, \
direction \
) \
VALUES (?, ?, ?, ?, ?, ?) \
",
run.id,
name,
measurement.value,
measurement.stddev,
measurement.unit,
measurement.direction,
)
.execute(&mut *conn)
.await?;
}
for (idx, (source, text)) in run.output.into_iter().enumerate() {
// Hopefully we won't need more than 4294967296 output chunks per run :P
let idx = idx as u32;
sqlx::query!(
"\
INSERT INTO run_output ( \
id, \
idx, \
source, \
text \
) \
VALUES (?, ?, ?, ?) \
",
run.id,
idx,
source,
text,
)
.execute(&mut *conn)
.await?;
}
// The thing has been done :D
sqlx::query!("DELETE FROM queue WHERE hash = ?", run.hash)
.execute(&mut *conn)
.await?;
tx.commit().await?;
Ok(())
}
fn prepare_work(
work: Option<&str>,
bench_repo: Option<BenchRepo>,
) -> somehow::Result<Option<Work>> {
Ok(if let Some(hash) = work {
let bench = match bench_repo {
Some(bench_repo) => BenchMethod::Repo {
hash: bench_repo.0.to_thread_local().head_id()?.to_string(),
},
None => BenchMethod::Internal,
};
Some(Work {
hash: hash.to_string(),
bench,
})
} else {
None
})
}
async fn post_status(
State(config): State<&'static Config>,
State(db): State<SqlitePool>,
@ -50,40 +149,29 @@ async fn post_status(
.fetch_all(&db)
.await?;
let mut guard = workers.lock().unwrap();
guard.clean();
if !guard.verify(&name, &request.secret) {
return Ok((StatusCode::UNAUTHORIZED, "invalid secret").into_response());
}
guard.update(
name.clone(),
WorkerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status),
);
let work = match request.request_work {
true => guard.find_free_work(&queue),
false => None,
};
let abort_work = guard.should_abort_work(&name);
drop(guard);
// TODO Insert finished work into DB
// Find new work
let work = if let Some(hash) = work {
let bench = match bench_repo {
Some(bench_repo) => BenchMethod::Repo {
hash: bench_repo.0.to_thread_local().head_id()?.to_string(),
},
None => BenchMethod::Internal,
let (work, abort_work) = {
let mut guard = workers.lock().unwrap();
guard.clean();
if !guard.verify(&name, &request.secret) {
return Ok((StatusCode::UNAUTHORIZED, "invalid secret").into_response());
}
guard.update(
name.clone(),
WorkerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status),
);
let work = match request.request_work {
true => guard.find_free_work(&queue),
false => None,
};
Some(Work {
hash: hash.to_string(),
bench,
})
} else {
None
let abort_work = guard.should_abort_work(&name);
(work, abort_work)
};
if let Some(run) = request.submit_work {
save_work(run, db).await?;
}
let work = prepare_work(work, bench_repo)?;
debug!("Received status update from {name}");
Ok(Json(ServerResponse { work, abort_work }).into_response())
}

View file

@ -10,7 +10,7 @@ fn is_false(b: &bool) -> bool {
!b
}
#[derive(Clone, Serialize_repr, Deserialize_repr)]
#[derive(Clone, Serialize_repr, Deserialize_repr, sqlx::Type)]
#[repr(i8)]
pub enum Direction {
LessIsBetter = -1,
@ -29,7 +29,7 @@ pub struct Measurement {
pub direction: Option<Direction>,
}
#[derive(Clone, Serialize_repr, Deserialize_repr)]
#[derive(Clone, Serialize_repr, Deserialize_repr, sqlx::Type)]
#[repr(u8)]
pub enum Source {
// Stdin would be fd 0