收发消息
建立好Mojo连接之后,就可以通过MessagePipe收发数据了。MessagePipe是双向传输的,既可以写入数据,也可以读取数据。
server代码:
class ReadThread : public base::SimpleThread {
public:
explicit ReadThread(mojo::MessagePipeHandle pipe_handle)
: base::SimpleThread("read"), pipe_handle_(pipe_handle) {}
~ReadThread() override {}
void Run() override {
std::string s;
do {
mojo::Wait(pipe_handle_, MOJO_HANDLE_SIGNAL_READABLE,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED, nullptr);
std::vector<uint8_t> bytes;
ReadMessageRaw(pipe_handle_, &bytes, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE);
s = std::string(bytes.begin(), bytes.end());
std::cout << "client=>server:" << s << endl;
} while (s != "quit");
exit(0);
}
private:
mojo::MessagePipeHandle pipe_handle_;
};
int main(int argc, char* argv[]) {
base::AtExitManager exit_manager;
base::CommandLine::Init(0, 0);
logging::LoggingSettings settings;
settings.logging_dest = logging::LOG_TO_ALL;
settings.delete_old = logging::DELETE_OLD_LOG_FILE;
settings.log_file_path = L"mojo.log";
logging::InitLogging(settings);
base::MessageLoop loop;
mojo::core::Init();
base::Thread ipc_thread("ipc");
ipc_thread.StartWithOptions(
base::Thread::Options(base::MessagePumpType::IO, 0));
mojo::core::ScopedIPCSupport ipc_support(
ipc_thread.task_runner(),
mojo::core::ScopedIPCSupport::ShutdownPolicy::CLEAN);
mojo::NamedPlatformChannel::Options options;
options.server_name = L"test";
mojo::NamedPlatformChannel named_channel(options);
mojo::ScopedMessagePipeHandle pipe = mojo::OutgoingInvitation::SendIsolated(
named_channel.TakeServerEndpoint(), nullptr);
ReadThread read_thread(pipe.get());
read_thread.Start();
std::string message;
while (std::getline(std::cin, message)) {
mojo::WriteMessageRaw(pipe.get(), message.c_str(),
static_cast<uint32_t>(message.size()), nullptr, 0,
MOJO_WRITE_MESSAGE_FLAG_NONE);
if (message == std::string("quit")) {
exit(0);
}
}
base::RunLoop().Run();
return 0;
}
client代码:
class ReadThread : public base::SimpleThread {
public:
explicit ReadThread(mojo::MessagePipeHandle pipe_handle)
: base::SimpleThread("read"), pipe_handle_(pipe_handle) {}
~ReadThread() override {}
void Run() override {
std::string s;
do {
mojo::Wait(pipe_handle_, MOJO_HANDLE_SIGNAL_READABLE,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED, nullptr);
std::vector<uint8_t> bytes;
ReadMessageRaw(pipe_handle_, &bytes, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE);
s = std::string(bytes.begin(), bytes.end());
std::cout << "server=>client:" << s << endl;
} while (s != "quit");
exit(0);
}
private:
mojo::MessagePipeHandle pipe_handle_;
};
int main(int argc, char* argv[]) {
base::AtExitManager exit_manager;
base::CommandLine::Init(0, 0);
logging::LoggingSettings settings;
settings.logging_dest = logging::LOG_TO_ALL;
settings.delete_old = logging::DELETE_OLD_LOG_FILE;
settings.log_file_path = L"mojo.log";
logging::InitLogging(settings);
base::MessageLoop loop;
mojo::core::Init();
base::Thread ipc_thread("ipc");
ipc_thread.StartWithOptions(
base::Thread::Options(base::MessagePumpType::IO, 0));
mojo::core::ScopedIPCSupport ipc_support(
ipc_thread.task_runner(),
mojo::core::ScopedIPCSupport::ShutdownPolicy::CLEAN);
mojo::PlatformChannelEndpoint endpoint =
mojo::NamedPlatformChannel::ConnectToServer(L"test");
mojo::ScopedMessagePipeHandle pipe =
mojo::IncomingInvitation::AcceptIsolated(std::move(endpoint));
ReadThread read_thread(pipe.get());
read_thread.Start();
std::string message;
while (std::getline(std::cin, message)) {
mojo::WriteMessageRaw(pipe.get(), message.c_str(),
static_cast<uint32_t>(message.size()), nullptr, 0,
MOJO_WRITE_MESSAGE_FLAG_NONE);
if (message == std::string("quit")) {
exit(0);
}
}
base::RunLoop().Run();
return 0;
}
内部实现

Mojo中又很多Pipe类型,比如Message Pipe、Data Pipes、Invitation,他们在Mojo内部都是一种数据类型的dispatche。Mojo不仅能传递一般的数据,还可以传递一些与操作系统相关的句柄,比如文件描述符,内核对象等等。
Pipe都是独立虚拟的消息收发队列,真正实现进程间通信的是Channel对象。Pipe实际上是一个随机数对,IPC双方根据随机数id去查询对应的消息队列读写数据。所以说在Mojo里面创建Pipe是非常廉价的。

Mojo里面发送数据,首先创建一种内部的消息格式,然后把真正要传输的数据附加上去,放到对应的pipe队列里,然后通过dispatche去投递消息,最终到达Channel写入数据。
所有的消息读取和写入通过重叠IO异步在IO线程里操作,包括异步的IPC建立。