-
Notifications
You must be signed in to change notification settings - Fork 566
/
Copy pathjobserver.rs
146 lines (133 loc) · 5.38 KB
/
jobserver.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use std::io;
use std::process::Command;
use std::sync::Arc;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::StreamExt;
use crate::errors::*;
// The execution model of sccache is that on the first run it spawns a server
// in the background and detaches it.
// When normally executing the rust compiler from either cargo or make, it
// will use cargo/make's jobserver and limit its resource usage accordingly.
// When executing the rust compiler through the sccache server, that jobserver
// is not available, and spawning as many rustc as there are CPUs can lead to
// a quadratic use of the CPU resources (each rustc spawning as many threads
// as there are CPUs).
// One way around this issue is to inherit the jobserver from cargo or make
// when the sccache server is spawned, but that means that in some cases, the
// cargo or make process can't terminate until the sccache server terminates
// after its idle timeout (which also never happens if SCCACHE_IDLE_TIMEOUT=0).
// Also, if the sccache server ends up shared between multiple runs of
// cargo/make, then which jobserver is used doesn't make sense anymore.
// Ideally, the sccache client would give a handle to the jobserver it has
// access to, so that the rust compiler would "just" use the jobserver it
// would have used if it had run without sccache, but that adds some extra
// complexity, and requires to use Unix domain sockets.
// What we do instead is to arbitrary use our own jobserver.
// Unfortunately, that doesn't absolve us from having to deal with the original
// jobserver, because make may give us file descriptors to its pipes, and the
// simple fact of keeping them open can block it.
// So if it does give us those file descriptors, close the preemptively.
//
// unsafe because it can use the wrong fds.
#[cfg(not(windows))]
pub unsafe fn discard_inherited_jobserver() {
if let Some(value) = ["CARGO_MAKEFLAGS", "MAKEFLAGS", "MFLAGS"]
.into_iter()
.find_map(|env| std::env::var(env).ok())
{
if let Some(auth) = value.rsplit(' ').find_map(|arg| {
arg.strip_prefix("--jobserver-auth=")
.or_else(|| arg.strip_prefix("--jobserver-fds="))
}) {
if !auth.starts_with("fifo:") {
let mut parts = auth.splitn(2, ',');
let read = parts.next().unwrap();
let write = match parts.next() {
Some(w) => w,
None => return,
};
let read = read.parse().unwrap();
let write = write.parse().unwrap();
if read < 0 || write < 0 {
return;
}
unsafe {
if libc::fcntl(read, libc::F_GETFD) == -1 {
return;
}
if libc::fcntl(write, libc::F_GETFD) == -1 {
return;
}
libc::close(read);
libc::close(write);
}
}
}
}
}
#[derive(Clone)]
pub struct Client {
helper: Option<Arc<jobserver::HelperThread>>,
tx: Option<mpsc::UnboundedSender<oneshot::Sender<io::Result<jobserver::Acquired>>>>,
inner: jobserver::Client,
}
pub struct Acquired {
_token: Option<jobserver::Acquired>,
}
impl Client {
pub fn new() -> Client {
Client::new_num(crate::util::num_cpus())
}
pub fn new_num(num: usize) -> Client {
let inner = jobserver::Client::new(num).expect("failed to create jobserver");
Client::_new(inner, false)
}
fn _new(inner: jobserver::Client, inherited: bool) -> Client {
let (helper, tx) = if inherited {
(None, None)
} else {
let (tx, mut rx) = mpsc::unbounded::<oneshot::Sender<_>>();
let helper = inner
.clone()
.into_helper_thread(move |token| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(async {
if let Some(sender) = rx.next().await {
drop(sender.send(token));
}
});
})
.expect("failed to spawn helper thread");
(Some(Arc::new(helper)), Some(tx))
};
Client { inner, helper, tx }
}
/// Configures this jobserver to be inherited by the specified command
pub fn configure(&self, cmd: &mut Command) {
self.inner.configure(cmd)
}
/// Returns a future that represents an acquired jobserver token.
///
/// This should be invoked before any "work" is spawned (for whatever the
/// definition of "work" is) to ensure that the system is properly
/// rate-limiting itself.
pub async fn acquire(&self) -> Result<Acquired> {
let (helper, tx) = match (self.helper.as_ref(), self.tx.as_ref()) {
(Some(a), Some(b)) => (a, b),
_ => return Ok(Acquired { _token: None }),
};
let (mytx, myrx) = oneshot::channel();
helper.request_token();
tx.unbounded_send(mytx).unwrap();
let acquired = myrx
.await
.context("jobserver helper panicked")?
.context("failed to acquire jobserver token")?;
Ok(Acquired {
_token: Some(acquired),
})
}
}