ceph monitor paxos算法

对于分布式来说最重要的莫过于所有副本数据的一致性。

在monitor节点中,存在着Leader和Peon两种角色。当客户端发出读命令时可以由相应的Peon或者Leader返回。一旦发生修改动作,所有的消息会第一时间发送给Leader节点,然后由Leader节点分发给Peon节点。

paxos算法保证了一次修改操作只能批准一个值,从而保证了分布式系统副本的一致性。

多个节点之间存在两种通讯模型:共享内存(Shared memory)、消息传递(Messages passing),Paxos是基于消息传递的通讯模型的。

paxos转换时机

1.monitor启动时,paxos初始化;

2.monitor进入bootstrap,判断是否需要同步数据,然后触发选举;

3.monitor根据选举结果,paxos对应初始化为leader或peon;

4.monitor进入recovery状态(monitor异常时也会触发);

5.monitor运行过程中,paxos决议;

paxos的同步数据过程

每次monitor server启动时都会按照monmap中的服务器地址去连接其他monitor服务器,看下是否需要同步数据。这个过程叫做bootstrap(). bootstrap的第一个目的是补全数据,从其他服务拉缺失的paxos log或者全量复制数据库,其次是在必要时形成多数派建立一个paxos集群或者加入到已有的多数派中。

选主

victory

paxos的recovery过程

目的是就PN达成一致,包括三个步骤,如下:

在Leader选举成功后,Leader和Peon都进入Recovery阶段。该阶段的目的是为了保证新Quorum的所有成员状态一致,这些状态包括:最后一个批准(Committed)的提案最后一个没批准的提案最后一个接受(Acceppted)的提案。每个节点的这些状态都持久化到磁盘。对旧Quorum的所有成员来说,最后一个通过的提案应该都是相同的,但对不属于旧Quorum的成员来说,它的最后一个通过的提案是落后的。

同步已批准提案的方法是,Leader先向新Quorum的所有Peon节点发送OP_COLLECT消息,并在消息中携带Leader自己的第1个和最后1个批准的提案的值的版本号。Peon收到OP_COLLECT消息后,将自己的第1个和最后1个批准的提案的值的版本号返回给Leader,并且如果Peon的最后1个批准的版本号大于Leader最后一个批准的版本号时,将所有在大于Leader最后一个版本号的提案值发送给Leader。Leader将根据这些信息填补自己错过的提案。这样,当Leader接收到所有Peon对OP_COLLECT消息的回应后,也就将自己更新到了最新的状态。这时Leader又反过来将最新状态同步到其它节点。

为获取新Quorum所有成员中的最大提案号,Leader在发送OP_COLLECT消息时,提出它知道的最大的提案号,并将该提案号附加在OP_COLLECT消息中。如果Peon已接受的最大提案号大于Leader提出的提案号,则拒绝接受Leader提出的提案号并将自己已接受的最大提案号通过OP_LAST消息发送给Leader。Leader收到OP_LAST消息后,发现自己的提案号不是最大时,就在Peon接受的最大提案号的基础上提出更大的提案号,重新进入Recovery阶段。这样,最终可以获取到最大的提案号。

总而言之,Recovery阶段的目的是让新Quorum中所有节点处于一致状态。实现这一目的的方法分成两步: 首先,在Leader节点收集所有节点的状态,通过比较得到最新状态; 然后,Leader将最新状态同步给其它节点。有两个比较重要的状态,最后一次批准的提案和已接受的最大提案号。

注意 区分提案号(proposal number)提案值(value)提案值的版本号(value version) 这三个概念。提案号由Leader提出,为避免不同Leader提出的提案号不冲突,同个Leader提出的提案号是不连续的。提案的值的版本号是连续的。

paxos的propose过程

即正常工作过程中的Propose、accept和commit过程。

从begin()到commit_finish()称为一轮,即一次提议的完成过程。

串行化提议:

Ceph的paxos实现,每次只允许一个提议在执行中,即上面提到 的一轮完成才能执行下一轮。在这个过程中,会有多次写盘操作。

这个过程实际上比较慢。对于ceph自身来说,osd等的状态变更, 不频繁,无需极高的paxos性能。 但是如果用于做用于分布式数据库等系统的日志,这种方式则有不足。

何时需要发起提案Proposal

Paxos的Trigger点总是要发起提案,那么ceph中需要发起提案的地方,大抵有以下三种:

PaxosService更新log为例

客户端发来修改请求到底是个什么过程呢?

其他需要monitor协作的模块构建与mon通讯的对象,然后发送消息给mon: monc->send_mon_message(m);

monitor收到消息后,进入Monitor::handle_forward()

Monitor::handle_forward() -->_ms_dispatch-->dispatch_op-->PaxosService.cc:dispatch-->propose_pending-->paxos->trigger_propose();-->propose_pending-->begin(bl);

发起提案

begin
  // note which pn this pending value is for.
  设置pending_pn以及pending_v
  t->put(get_name(), "pending_v", last_committed + 1);
  t->put(get_name(), "pending_pn", accepted_pn);
  ...
  //将transaction写入rocksdb
  get_store()->apply_transaction(t);
  ....
  //设置last_committed等
  begin->values[last_committed+1] = new_value;
  begin->last_committed = last_committed;
  begin->pn = accepted_pn;
  ...
  //通知其他monitor决议
  mon->send_mon_message(begin, *p);

提案的内容可以利用ceph-kvstore-tool从rocksdb中看到

ceph-kvstore-tool rocksdb /var/lib/ceph/mon/ceph-3/store.db/ get paxos 456221 out /etc/ceph/paxos_456221

然后用ceph-dencoder查看相关内容

ceph-dencoder type MonitorDBStore::Transaction import /etc/ceph/paxos_456221 dump_json

{
"transaction": {
"ops": [
{
"op_num": 0,
"type": "PUT",
"prefix": "logm",
"key": "full_292111",
"length": 24085
},
{
"op_num": 1,
"type": "PUT",
"prefix": "logm",
"key": "full_latest",
"length": 8
},
{
"op_num": 2,
"type": "PUT",
"prefix": "logm",
"key": "292112",
"length": 724
},
{
"op_num": 3,
"type": "PUT",
"prefix": "logm",
"key": "last_committed",
"length": 8
}
],
"num_keys": 4,
"num_bytes": 25001
}
}

peon决议之后会将返回给leader

// peon
void Paxos::handle_begin(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_begin");
  MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req());
  dout(10) << "handle_begin " << *begin << dendl;

  // can we accept this?
  if (begin->pn < accepted_pn) {
    dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
    op->mark_paxos_event("have higher pn, ignore");
    return;
  }
  ...
  t->put(get_name(), v, begin->values[v]);

  // note which pn this pending value is for.
  t->put(get_name(), "pending_v", v);
  t->put(get_name(), "pending_pn", accepted_pn);

  ...
  auto start = ceph::coarse_mono_clock::now();
  get_store()->apply_transaction(t);
  auto end = ceph::coarse_mono_clock::now();
  ...
  // reply 接受之后返回通知leader
  MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
				    ceph_clock_now());
  accept->pn = accepted_pn;
  accept->last_committed = last_committed;
  begin->get_connection()->send_message(accept);
}

leader接收accept消息,执行commit

主接受accept消息处理
// leader
void Paxos::handle_accept(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_accept");
//提交commit
 if (accepted == mon->get_quorum()) {
    // yay, commit!
    dout(10) << " got majority, committing, done with update" << dendl;
    op->mark_paxos_event("commit_start");
    commit_start();
  }
commit_start中将lastcommit+1
 // commit locally
  t->put(get_name(), "last_committed", last_committed + 1);
   /*让事务生效,注意,此处是异步掉用*/
  get_store()->queue_transaction(t, new C_Committed(this));
 //此处事务的处理,是异步的,掉用了MonitorDBStore的queue_transaction函数。当事务完成之后,会掉用相关的回调函数。

leader执行回调

void queue_transaction(MonitorDBStore::TransactionRef t,
			 Context *oncommit) {
    io_work.queue(new C_DoTransaction(this, t, oncommit));
  }
执行回调C_Committed
struct C_Committed : public Context {
  Paxos *paxos;
  explicit C_Committed(Paxos *p) : paxos(p) {}
  void finish(int r) override {
    ceph_assert(r >= 0);
    std::lock_guard l(paxos->mon->lock);
    if (paxos->is_shutdown()) {
      paxos->abort_commit();
      return;
    }
    paxos->commit_finish();
  }
};
  //commit_finish中会处理last_committed++;以及通知其他mon,然后执行refresh
  // tell everyone
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    if (*p == mon->rank) continue;

    dout(10) << " sending commit to mon." << *p << dendl;
    MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
				      ceph_clock_now());
    commit->values[last_committed] = new_value;
    commit->pn = accepted_pn;
    commit->last_committed = last_committed;

    mon->send_mon_message(commit, *p);
	...
	
	if (do_refresh()) {
    commit_proposal();
    if (mon->get_quorum().size() > 1) {
	  //延长租约,会发送mon->send_mon_message(lease, *p);到其他mon
      extend_lease();
    }

    ceph_assert(g_conf()->paxos_kill_at != 10);

    finish_round();
  }
  }

peon接收后存放数据,并执行refresh


void Paxos::handle_commit(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_commit");
  MMonPaxos *commit = static_cast<MMonPaxos*>(op->get_req());
  dout(10) << "handle_commit on " << commit->last_committed << dendl;

  logger->inc(l_paxos_commit);

  if (!mon->is_peon()) {
    dout(10) << "not a peon, dropping" << dendl;
    ceph_abort();
    return;
  }

  op->mark_paxos_event("store_state");
  store_state(commit);

  (void)do_refresh();
}

接受延长lease的处理
// peon
void Paxos::handle_lease(MonOpRequestRef op)
并且peon的状态从updating转变成active

do_refresh会调动各种paxos服务执行update_from_paxos

do_refresh--> mon->refresh_from_paxos-->svc->refresh(need_bootstrap)->update_from_paxos;svc->post_refresh();
svc即是具体的各种paxos服务

不同的实现类对update_from_paxos的处理不一样

void LogMonitor::update_from_paxos(bool *need_bootstrap)
{
  dout(10) << __func__ << dendl;
  version_t version = get_last_committed();
  dout(10) << __func__ << " version " << version
           << " summary v " << summary.version << dendl;
  if (version == summary.version)
    return;
  ceph_assert(version >= summary.version);

  map<string,bufferlist> channel_blog;

  version_t latest_full = get_version_latest_full();
  dout(10) << __func__ << " latest full " << latest_full << dendl;
  if ((latest_full > 0) && (latest_full > summary.version)) {
    bufferlist latest_bl;
    get_version_full(latest_full, latest_bl);
    ceph_assert(latest_bl.length() != 0);
    dout(7) << __func__ << " loading summary e" << latest_full << dendl;
    auto p = latest_bl.cbegin();
    decode(summary, p);
    dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
  }

  // walk through incrementals
  while (version > summary.version) {
    bufferlist bl;
    int err = get_version(summary.version+1, bl);
    ceph_assert(err == 0);
    ceph_assert(bl.length());

    auto p = bl.cbegin();
    __u8 v;
    decode(v, p);
    while (!p.end()) {

      //写入logm full
      summary.add(le);
    }

    summary.version++;
     //根据配置更新每个channel下的日志记录(删除old),默认配置为50条
    summary.prune(g_conf()->mon_log_max_summary);
  }

  dout(15) << __func__ << " logging for "
           << channel_blog.size() << " channels" << dendl;
  for(map<string,bufferlist>::iterator p = channel_blog.begin();
      p != channel_blog.end(); ++p) {
      ...
      //根据channel写入不同的文件(例如ceph-mon,ceph-audit.log等)
      int err = p->second.write_fd(fd);
  }

  check_subs();
}

ceph-kvstore-tool rocksdb /var/lib/ceph/mon/ceph-3/store.db/ get logm full_292111 out /etc/ceph/logm_full_292111

ceph-dencoder type LogSummary import/etc/ceph/logm_full_292111 dump_json

logm / full_292112 记录的logsummury的内容,这个里面有诸多的LogEntry,而这次提案中logm / 292112保存的是单个的log entity

{
    "version": 292112,
    "tail_by_channel": {
        "": {
            "1": {
                "name": "mon.ceph-1",
                "rank": "mon.0",
                "addrs": {
                    "addrvec": [
                        {
                            "type": "v2",
                            "addr": "10.0.204.93:3300",
                            "nonce": 0
                        },
                        {
                            "type": "v1",
                            "addr": "10.0.204.93:6789",
                            "nonce": 0
                        }
                    ]
                },
                "stamp": "2023-07-03 10:01:21.142025",
                "seq": 0,
                "channel": "",
                "priority": "[INF]",
                "message": "mkfs 1a031b5a-b249-41d2-b30b-4076d1c6c32d"
            }
        },
        "audit": {
            "40823867": {
                "name": "mon.ceph-3",
                "rank": "mon.2",
                "addrs": {
                    "addrvec": [
                        {
                            "type": "v2",
                            "addr": "10.0.204.94:3300",
                            "nonce": 0
                        },
                        {
                            "type": "v1",
                            "addr": "10.0.204.94:6789",
                            "nonce": 0
                        }
                    ]
                },
                "stamp": "2023-07-12 14:25:39.471472",
                "seq": 2,
                "channel": "audit",
                "priority": "[DBG]",
                "message": "from='client.? 10.0.204.94:0/4002593326' entity='client.admin' cmd=[{\"prefix\": \"health\"}]: dispatch"
            },

其中292112对应的值即是一个LogEntry

osdmap与此类似

osdmap / 1657 记录本次要变化的osdmap

osdmap / full_1657 记录的是全量的osdmap内容

update_from_paxos之后会根据osdmap / 1657 以及osdmap全量数据更新为osdmap / full_1657

参考(转载)文档:

ceph monitor paxos算法

Ceph Monitor实现

Table of Contents