Введение
Как то ранее я писал пробный сервис на Rust, но время идет язык изменяется и поэтому я решил повторить свой опыт, но в этот раз решил написать серверное приложение с уклоном в телеметрию.
Для экспериментального проекта я выбрал реализацию протокола wialon ips 1.1, а чтобы более детально углубиться в асинхронность выбрал библиотеку mio.
Работа с библиотекой mio
Mio это низкоуровенная библиотека для работы с сетевыми соединениями в неблокирующем режиме и основанная на событиях. Кроме того эта библиотека имеет минимальный оверхед над абстракциями ОС.
Кроме того данная библиотека используется под капотом в популярной библиотеке tokio.
Основным понятием в библиотеке является пул событий. Как ясно из названия в нем регистрируются входящие события, с которыми в дальнейшим и будет работать приложение, и сохраняет их в хранилище. У каждого события есть свой токен, и сетевое соединение.
Пул событий объявляется следующим образом:
use mio::{Events, Poll};
let mut poll = Poll::new()?;
Чтобы событие появилось в пуле его надо зарегистрировать с помощью функции register
, которая принимает в параметрах сетевое соединение, токен и тип события:
const SERVER: Token = Token(0);
let addr = "127.0.0.1:13265".parse()?;
let mut server = TcpListener::bind(addr)?;
poll.registry().register(&mut server, SERVER, Interest::READABLE)?;
После того как событие добавили в пул, необходимо этот пул соединить с хранилищем событий и ждать когда произойдет первое событие:
let mut events = Events::with_capacity(128);
loop {
poll.poll(&mut events, None)?;
}
Данная операция выполняется в цикле так как, если событий не происходит мы ждем их появления.
Для обработки события необходимо запустить цикл, с обработчиками для каждого события:
const CLIENT: Token = Token(1);
for event in events.iter() {
match event.token() {
SERVER => loop {
let (mut connection, address) = match server.accept() {
Ok((connection, address)) => (connection, address),
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => break,
_ => return Err(e),
}
};
poll.registry().register(&mut connection, CLIENT, Interest::READABLE | Interest::WRITABLE)?;
},
CLIENT => {
// событие для чтения
if event.is_readable() {
// обработка полученного пакета
}
// событие для для записи
if event.is_writable() {
// обработка полученного пакета
}
}
}
}
Приложение ждет когда появится входящее соединение и если оно успешно установлено создает для него новый токен и добавляет его в пул для ожидания от него пакетов, и как только наступает событие (пришел сетевой пакет) приложение начинает его обработку.
Модернизация приложения
В целом всего выше описанного хватит чтобы сделать просто приложение на mio. Но как только мы делаем что-то реальное возникают следующие вопросы:
- как обработать пул соединений с клиентским сессиями
- как сделать неблокирующую операцию сохранения или вывода на экран принятых данных
Обработка пула соединений с клиентским сессиями
Для того чтобы в приложении обработать пул соединений один из вариантов завести хэш-таблицу в которой ключом будет токен, а значение открытое соединение, и в момент открытия входящего соединения добавлять его в эту хэш таблицу:
let connections: HashMap<Token, TcpStream> = HashMap::new();
let token: Token = Token(SERVER.0 + 1);
...
for event in events.iter() {
match event.token() {
SERVER => loop {
let (mut connection, address) = match server.accept() {
Ok((connection, address)) => (connection, address),
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => break,
_ => return Err(e),
}
};
token += 1
connections.insert(token, connection)
poll.registry().register(&mut connection, token, Interest::READABLE)?;
},
token => {
// клиент с token отправил данные, чем инициировал событие с типом чтения
let connection = connections.get_mut(&token).unwrap();
if event.is_readable() {
// чтение данных из connection для клиента инициирующего событие
// их обработка
}
}
}
}
После такой модификации кода когда на сервер приходит входящее соединение оно регистрируется в пуле событий и добавляется в пул соединений.
Далее когда от клиента с token
приходит сообщение пул событий сообщает о наступлении данного события и после этого можно прочитать сообщение отправленное клиентом и обработать его.
Таким образом получился простой механизм сессий.
Неблокирующий вывод данных
Следующая проблема это неблокирующее сохранение данных или вывод их на экран. Так как если это делать прямо в месте обработки соединений, то необходимо будет либо делать syscall для сохранения на диск, либо ждать ответа по сети и тд. Все это может повлиять на производительность и чтобы такого не было лучше эту обработку проводить в отдельном потоке.
Для взаимодействия между потоками можно использовать стандарные механизм каналов.
Модификация кода будет следующая:
use std::sync::mpsc::{sync_channel};
struct Packet{}
let (sender, receiver) = sync_channel::<Packet>(1024);
thread::spawn(move || {
loop {
let p = receiver.recv().unwrap();
println!("{}", p); // или сохранение на диск или в бд
}
});
...
for event in events.iter() {
match event.token() {
...
token => {
let connection = connections.get_mut(&token).unwrap();
if event.is_readable() {
let p = Packet{}
// получаем данные из connection и преобразуем их в Packet
sender.send(p)
}
}
}
}
В данном коде вы запустили отдельный поток, который отвечает только за обработку принятых данных, а объявленный ранее цикл событий обеспечивает ему их поставку.
Мои впечатления от Rust
Из плюсов языка для себя строгость языка в целом, а также интересные концепции языка типа borrow checker, который проверяет что нет ссылок на несуществующие объекты. Также понравилась идея что у данных может быть только одна мутабельная ссылка (если вдруг надо использовать несколько мутабельных ссылок есть модуль std:cell но надо быть аккуратным так как это не потоко-безопасно).
Из минусов для себя я ответил что качественный многопоточный код писать труднее чем на Go, хотя он и получается безопасней, но для этого надо понимать основные концепции языка такие как владение и время жизни.
Про generics и gc уже много писали в интернетах, поэтому описывать это не вижу смысла.
Заключение
В данной статья я описал в целом концепцию работы приложения, исходники которого можно найти на github, так как описывать весь код целиком не имеет большого смысла. Также в статье не приведено структурирование кода и объединения его в структуры, так как оформление тоже можно посмотреть на github.