Chromium进程间通信Mojo 收发消息

收发消息

建立好Mojo连接之后,就可以通过MessagePipe收发数据了。MessagePipe是双向传输的,既可以写入数据,也可以读取数据。

server代码:

class ReadThread : public base::SimpleThread {
 public:
  explicit ReadThread(mojo::MessagePipeHandle pipe_handle)
      : base::SimpleThread("read"), pipe_handle_(pipe_handle) {}
  ~ReadThread() override {}

  void Run() override {
    std::string s;
    do {
      mojo::Wait(pipe_handle_, MOJO_HANDLE_SIGNAL_READABLE,
                 MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED, nullptr);
      std::vector<uint8_t> bytes;
      ReadMessageRaw(pipe_handle_, &bytes, nullptr,
                     MOJO_READ_MESSAGE_FLAG_NONE);

      s = std::string(bytes.begin(), bytes.end());
      std::cout << "client=>server:" << s << endl;
    } while (s != "quit");
    exit(0);
  }

 private:
  mojo::MessagePipeHandle pipe_handle_;
};

int main(int argc, char* argv[]) {
  base::AtExitManager exit_manager;
  base::CommandLine::Init(0, 0);
  logging::LoggingSettings settings;
  settings.logging_dest = logging::LOG_TO_ALL;
  settings.delete_old = logging::DELETE_OLD_LOG_FILE;
  settings.log_file_path = L"mojo.log";
  logging::InitLogging(settings);

  base::MessageLoop loop;

  mojo::core::Init();
  base::Thread ipc_thread("ipc");
  ipc_thread.StartWithOptions(
      base::Thread::Options(base::MessagePumpType::IO, 0));
  mojo::core::ScopedIPCSupport ipc_support(
      ipc_thread.task_runner(),
      mojo::core::ScopedIPCSupport::ShutdownPolicy::CLEAN);

  mojo::NamedPlatformChannel::Options options;
  options.server_name = L"test";
  mojo::NamedPlatformChannel named_channel(options);
  mojo::ScopedMessagePipeHandle pipe = mojo::OutgoingInvitation::SendIsolated(
      named_channel.TakeServerEndpoint(), nullptr);

  ReadThread read_thread(pipe.get());
  read_thread.Start();

  std::string message;
  while (std::getline(std::cin, message)) {
    mojo::WriteMessageRaw(pipe.get(), message.c_str(),
                          static_cast<uint32_t>(message.size()), nullptr, 0,
                          MOJO_WRITE_MESSAGE_FLAG_NONE);
    if (message == std::string("quit")) {
      exit(0);
    }
  }

  base::RunLoop().Run();

  return 0;
}

client代码:

class ReadThread : public base::SimpleThread {
 public:
  explicit ReadThread(mojo::MessagePipeHandle pipe_handle)
      : base::SimpleThread("read"), pipe_handle_(pipe_handle) {}
  ~ReadThread() override {}

  void Run() override {
    std::string s;
    do {
      mojo::Wait(pipe_handle_, MOJO_HANDLE_SIGNAL_READABLE,
                 MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED, nullptr);
      std::vector<uint8_t> bytes;
      ReadMessageRaw(pipe_handle_, &bytes, nullptr,
                     MOJO_READ_MESSAGE_FLAG_NONE);

      s = std::string(bytes.begin(), bytes.end());
      std::cout << "server=>client:" << s << endl;
    } while (s != "quit");
    exit(0);
  }

 private:
  mojo::MessagePipeHandle pipe_handle_;
};

int main(int argc, char* argv[]) {
  base::AtExitManager exit_manager;
  base::CommandLine::Init(0, 0);
  logging::LoggingSettings settings;
  settings.logging_dest = logging::LOG_TO_ALL;
  settings.delete_old = logging::DELETE_OLD_LOG_FILE;
  settings.log_file_path = L"mojo.log";
  logging::InitLogging(settings);

  base::MessageLoop loop;

  mojo::core::Init();
  base::Thread ipc_thread("ipc");
  ipc_thread.StartWithOptions(
      base::Thread::Options(base::MessagePumpType::IO, 0));
  mojo::core::ScopedIPCSupport ipc_support(
      ipc_thread.task_runner(),
      mojo::core::ScopedIPCSupport::ShutdownPolicy::CLEAN);

  mojo::PlatformChannelEndpoint endpoint =
      mojo::NamedPlatformChannel::ConnectToServer(L"test");
  mojo::ScopedMessagePipeHandle pipe =
      mojo::IncomingInvitation::AcceptIsolated(std::move(endpoint));

  ReadThread read_thread(pipe.get());
  read_thread.Start();

  std::string message;
  while (std::getline(std::cin, message)) {
    mojo::WriteMessageRaw(pipe.get(), message.c_str(),
                          static_cast<uint32_t>(message.size()), nullptr, 0,
                          MOJO_WRITE_MESSAGE_FLAG_NONE);
    if (message == std::string("quit")) {
      exit(0);
    }
  }

  base::RunLoop().Run();

  return 0;
}

内部实现

mojo类图

Mojo中有很多Pipe类型,比如Message Pipe、Data Pipes、Invitation,它们在Mojo内部都是一种数据类型的dispatcher。Mojo不仅能传递一般的数据,还可以传递一些与操作系统相关的句柄,比如文件描述符、内核对象等等。

Pipe都是独立虚拟的消息收发队列,真正实现进程间通信的是Channel对象。Pipe实际上是一个随机数对,IPC双方根据随机数id去查询对应的消息队列读写数据。所以说在Mojo里面创建Pipe是非常廉价的。

mojo发送消息顺序图

Mojo里面发送数据,首先创建一种内部的消息格式,然后把真正要传输的数据附加上去,放到对应的pipe队列里,然后通过dispatcher去投递消息,最终到达Channel写入数据。

所有的消息读取和写入通过重叠IO异步在IO线程里操作,包括异步的IPC建立。