Hyper, Tokio и ожидание завершения потока


#1

Здравствуйте.
Продолжаю пытаться для обучения сделать OAuth авторизацию гугла. Сначала код

enum RequestFinished {Ready, NotReady}

impl Future for RequestFinished {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        match self {
            RequestFinished::Ready => Ok(Ready(())),
            RequestFinished::NotReady => Ok(NotReady)
        }
    }
}

fn retrieve_auth() {
    let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
    let (sender, receiver) = futures::sync::oneshot::channel::<()>();
    let sender = RefCell::new(Some(sender));
    let waiter: &mut RefCell<RequestFinished> = &mut RefCell::new(RequestFinished::NotReady);
    let proc = move || {
        let res = service_fn_ok(got_response);
        sender.borrow_mut().take().unwrap().send(());
        waiter.replace(RequestFinished::Ready); <--------------------
        res
    };

    let server = Server::bind(&addr)
        .serve(proc)
        .with_graceful_shutdown(receiver)
        .map_err(|e| eprintln!("server error: {}", e));

    hyper::rt::run(server);

    let uri = Url::parse_with_params(GOOGLE_AUTH_URL, [
        ("client_id", CLIENT_ID),
        ("redirect_uri", "http://[::1]:3000"),
        ("response_type", "code"),
        ("scope", "https://www.googleapis.com/auth/drive")
    ].iter());

    let https = HttpsConnector::new(1).unwrap();
    let res = Client::new()
        .get(uri.unwrap().as_str().parse::<Uri>().unwrap());

    waiter.into_inner().wait(); <--------------------
}

Сейчас он не компилируются по причине двойного заимствования waiter. И у меня никаких идей как это исправить. Ведь, по сути, нужна одна и та же Future в 2-х разных потока, ещё и во втором потке в качестве значения!
Как вариант, я думал использовать каналы, по аналогии, как сделано с sender-ом. Но для этого нужно создавать ещё один поток в Tokio - слишком тяжело, по-моему.

Подскажите, как проще реализовать паузу до возврата значения из потока?
Спасибо.


#2

Вам действительно нужно использовать ещё один канал, чтобы оповестить одну нить выполнения о завершении второй. При этом не требуется отдельный поток, всё может работать в рамках вообще одного потока. futures::sync::oneshot::channel не блокирует потоки на манер обычных каналов.

То, что Вы пытаетесь сделать, это оповестить одну нить выполнения (где отправляете запрос) о завершении другой нити выполнения (где получаете ответ). В данный момент Вы пытаетесь сделать это с помощью одной и той же кастомной Future ожидая её резолва в одной нити выполнения и меняя в другой. По сути, Вы пытаетесь повторить логику futures::sync::oneshot::channel (он резолвит receiver, когда выполняется sender.send()). Проще переиспользовать уже существующую абстракцию. Либо же, если действительно нужна более умная кастомная футура, то, как правило, делается обёртка над всё тем же каналом. Общий принцип: две отдельные нити выполнения связываются каналом.

Также, учтите, что hyper::rt::run(server); блокирует текущий поток. Потому весь код после него у Вас просто не выполнится. Исправить это можно, понапихав футур в футуру, которая скармливается в run().

hyper::rt::run(server.join(future_where_request_is_done));

#3

Спасибо за подробный ответ, буду пробовать.

Нитью, я так понимаю, называется отдельный поток выполнения в рамках одного “физического” потока ОС? Ну, аналог горутины?


#4

Прочитай https://docs.rs/futures/0.1.26/futures/sync/oneshot/fn.channel.html

Этот прием и после переделки на oneshot не будет работать, потому что client.get возвращает Future, которая должна быть выполнена на рантайме. Как я понимаю, ты хочешь в одном коде и сервер поднять, и клиентом дернуть запрос. Для этого есть несколько решений:

// в этом примере ошибка, т.к. hyper::rt::spawn можно выполнить только внутри футуры, которая выполняется внутри hyper::rt::run

let server = Server::bind... // все как у тебя и было, но без waiter
hyper::rt::spawn(server); // крутить сервер в отдельной треде
let (tx, rx) = oneshot::channel();
let get = Client::new().get...
    .and_then(|r| tx.send(r)); // засовываем в oneshot результат гета
hyper::rt::run(get); // тем не менее футуру надо выполнить на рантайме
// run - это блокирующая операция, т.е. пока все футуры не выполнятся, ты дальше не пройдешь
let res = rx.wait(); // а вот тут ты достаешь результат из oneshot channel
// и дальше можешь с этим работать

Если тебе жалко выделять аж целый тред под сервер, можно воспользоваться комбинаторами select или join, как это было сделано у нас: https://github.com/tox-rs/tox/blob/master/src/toxcore/tcp/handshake/mod.rs#L309

let server = Server::bind...
let (tx, rx) = oneshot::channel::<i32>();
let client = Client::new().get...
    .and_then(|r| tx.send(r));
let both = client.select(server).map... map_err; // или клиент удачно выполнится, или сервер
hyper::rt::run(both);// на одном рантайме поднимаем и клиент и сервер
let res = rs.wait();

Но не стоит забывать, что в данном варианте у тебя может быть ошибка, т.к. твой поток выполнения не всегда попадает в tx.send®, и ты будешь бесконечно ждать. для этого надо во всех вариантах завершений правильно в tx положить значение, там может лежать как Ok(res), так и Err(e) из .map_err

  1. Забить на oneshot и воспользоваться комбинаторным адом:
let server = Server::bind...
let (tx, rx) = oneshot::channel::<i32>();
let client = Client::new().get...
    .and_then(|r| do what ever you want with the result);
let both = client.select(server).map... map_err;
hyper::rt::run(both);

Ну и так далее.

  1. Можно извратиться и запустить это вообще в одном потоке, если ты жаден до ресурсов

Для этого надо вместо hyper::rt::run использовать tokio::current_thread::Runtime https://docs.rs/tokio/0.1/tokio/runtime/current_thread/index.html , который поднимает рантайм на текущем треде, т.е. у тебя не будут подняты доп. треды


#5

Спасибо!
Только у меня вот этот код

hyper::rt::spawn(server);

почему-то падает. Ругается, что нет дефолтного рантайма или что-то такое.


#6

А, блин. Точно. hyper::rt::spawn можно выполнить только внутри футуры, которая выполняется внутри hyper::rt::run


#7

Ага. Кое-что стало понятнее. Где-то видел такой вариант

hyper::rt::run(|| {
...
    hyper::rt::spawn(...);
...
    hyper::rt::spawn(...)
});

Вроде, должно получиться.


#8

Да, что-то типа того.


#9

Концептуально - да. Я имел в виду именно логическую нить выполнения (цепочку футур).