Restructure shared types and db columns
Now, the server sends the runner pretty much all run metadata. This way, the reservation the server makes for the runner is accurate, provided the runner responds with the same metadata it was sent. It also means that only the server's system clock is relevant for tie-breakers, and a run's duration spans from the moment it is reserved to the moment it is saved. Also, the bench method is now always called `bench_method`, and a human-readable description is stored in the database for each run.
This commit is contained in:
parent
53be0338f2
commit
c7a89867a7
8 changed files with 121 additions and 98 deletions
|
|
@ -28,31 +28,39 @@ use crate::{
|
|||
somehow,
|
||||
};
|
||||
|
||||
async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> {
|
||||
async fn save_work(finished: FinishedRun, db: &SqlitePool) -> somehow::Result<()> {
|
||||
let mut tx = db.begin().await?;
|
||||
let conn = tx.acquire().await?;
|
||||
|
||||
let end = OffsetDateTime::now_utc();
|
||||
let bench_method = match finished.run.bench_method {
|
||||
BenchMethod::Internal => "internal".to_string(),
|
||||
BenchMethod::Repo { hash } => format!("bench repo, hash {hash}"),
|
||||
};
|
||||
|
||||
sqlx::query!(
|
||||
"\
|
||||
INSERT INTO runs ( \
|
||||
id, \
|
||||
hash, \
|
||||
bench_method, \
|
||||
start, \
|
||||
end, \
|
||||
exit_code \
|
||||
) \
|
||||
VALUES (?, ?, ?, ?, ?) \
|
||||
VALUES (?, ?, ?, ?, ?, ?) \
|
||||
",
|
||||
run.id,
|
||||
run.hash,
|
||||
run.start,
|
||||
run.end,
|
||||
run.exit_code,
|
||||
finished.run.id,
|
||||
finished.run.hash,
|
||||
bench_method,
|
||||
finished.run.start,
|
||||
end,
|
||||
finished.exit_code,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
|
||||
for (name, measurement) in run.measurements {
|
||||
for (name, measurement) in finished.measurements {
|
||||
sqlx::query!(
|
||||
"\
|
||||
INSERT INTO run_measurements ( \
|
||||
|
|
@ -65,7 +73,7 @@ async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> {
|
|||
) \
|
||||
VALUES (?, ?, ?, ?, ?, ?) \
|
||||
",
|
||||
run.id,
|
||||
finished.run.id,
|
||||
name,
|
||||
measurement.value,
|
||||
measurement.stddev,
|
||||
|
|
@ -76,7 +84,7 @@ async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> {
|
|||
.await?;
|
||||
}
|
||||
|
||||
for (idx, (source, text)) in run.output.into_iter().enumerate() {
|
||||
for (idx, (source, text)) in finished.output.into_iter().enumerate() {
|
||||
// Hopefully we won't need more than 4294967296 output chunks per run :P
|
||||
let idx = idx as u32;
|
||||
sqlx::query!(
|
||||
|
|
@ -89,7 +97,7 @@ async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> {
|
|||
) \
|
||||
VALUES (?, ?, ?, ?) \
|
||||
",
|
||||
run.id,
|
||||
finished.run.id,
|
||||
idx,
|
||||
source,
|
||||
text,
|
||||
|
|
@ -99,7 +107,7 @@ async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> {
|
|||
}
|
||||
|
||||
// The thing has been done :D
|
||||
sqlx::query!("DELETE FROM queue WHERE hash = ?", run.hash)
|
||||
sqlx::query!("DELETE FROM queue WHERE hash = ?", finished.run.hash)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
|
||||
|
|
@ -120,7 +128,7 @@ async fn post_status(
|
|||
Err(response) => return Ok(response),
|
||||
};
|
||||
|
||||
if let Some(run) = request.submit_work {
|
||||
if let Some(run) = request.submit_run {
|
||||
save_work(run, &db).await?;
|
||||
}
|
||||
|
||||
|
|
@ -153,8 +161,8 @@ async fn post_status(
|
|||
name.clone(),
|
||||
WorkerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status),
|
||||
);
|
||||
let work = match request.request_work {
|
||||
true => guard.find_work(&name, &queue, bench_method),
|
||||
let work = match request.request_run {
|
||||
true => guard.find_and_reserve_run(&name, &queue, bench_method),
|
||||
false => None,
|
||||
};
|
||||
let abort_work = guard.should_abort_work(&name, &queue);
|
||||
|
|
@ -162,7 +170,11 @@ async fn post_status(
|
|||
};
|
||||
|
||||
debug!("Received status update from {name}");
|
||||
Ok(Json(ServerResponse { work, abort_work }).into_response())
|
||||
Ok(Json(ServerResponse {
|
||||
run: work,
|
||||
abort_run: abort_work,
|
||||
})
|
||||
.into_response())
|
||||
}
|
||||
|
||||
fn stream_response(repo: Arc<ThreadSafeRepository>, id: ObjectId) -> impl IntoResponse {
|
||||
|
|
|
|||
|
|
@ -64,12 +64,19 @@ async fn get_workers(
|
|||
let status = match &info.status {
|
||||
WorkerStatus::Idle => Status::Idle,
|
||||
WorkerStatus::Busy => Status::Busy,
|
||||
WorkerStatus::Working(run) => {
|
||||
let message =
|
||||
sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", run.hash)
|
||||
.fetch_one(db)
|
||||
.await?;
|
||||
Status::Working(RunLink::new(base, run.id.clone(), &run.hash, &message))
|
||||
WorkerStatus::Working(unfinished) => {
|
||||
let message = sqlx::query_scalar!(
|
||||
"SELECT message FROM commits WHERE hash = ?",
|
||||
unfinished.run.hash
|
||||
)
|
||||
.fetch_one(db)
|
||||
.await?;
|
||||
Status::Working(RunLink::new(
|
||||
base,
|
||||
unfinished.run.id.clone(),
|
||||
&unfinished.run.hash,
|
||||
&message,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -89,9 +96,9 @@ async fn get_queue(
|
|||
// Group workers by commit hash
|
||||
let mut workers_by_commit: HashMap<String, Vec<WorkerLink>> = HashMap::new();
|
||||
for (name, info) in workers {
|
||||
if let WorkerStatus::Working(run) = &info.status {
|
||||
if let WorkerStatus::Working(unfinished) = &info.status {
|
||||
workers_by_commit
|
||||
.entry(run.hash.clone())
|
||||
.entry(unfinished.run.hash.clone())
|
||||
.or_default()
|
||||
.push(WorkerLink::new(base, name.clone()));
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue