* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Nikita Danilov @ 2008-12-22  7:53 UTC
  To: lustre-devel

Hello,

a few proposals for a distributed recovery for the upcoming CMD release
of Lustre were discussed recently. In my opinion, many of them
(including the clients-epoch approach that I advocated) are very
complex, and a simpler solution, one that can be fully understood, is
needed. The following is an attempt to provide such a solution.

Nikita.

								 * * *

This is a strawman proposal. At least it would help us to settle the
terminology.

The following describes an alternative distributed recovery mechanism. As this
proposal is a rather radical alternative, the exposition is somewhat abstract;
please bear with it.

The summary is that the original `global epochs' [10] proposal is modified to
involve all cluster nodes, including clients. This seeks to fix what is seen
as a major drawback of the said algorithm: its reliance on master-slave
processing.

Definitions:
============

A _cluster_ consists of _nodes_. Every node has volatile storage. Some nodes
have persistent storage. Persistent means `surviving any failure considered
in the model'.

Nodes exchange _messages_. A message X with a parameter Y, sent from a node N
to a node M is denoted as

        X(Y) : N -> M

Synchronous message send is denoted as

        X(Y) : N ->> M

It is, in reality, a sequence

        X(Y) : N -> M
        RepX : M -> N

of a send and a reply.

Nodes _join_ the cluster, and _part_ from the cluster. A node can be forcibly
parted from the cluster---_evicted_.

An _operation_ is a `primitive' distributed modification of state that moves
the distributed state from one consistent state to another. `Primitive'
because without such a qualification a valid sequence of operations would
itself be an operation.

An operation OP consists of _updates_ of the state of every node involved in
the operation: OP = (U(0), ..., U(n)), where U(k) is an update executed on
node U(k).node.

A _reintegration_ of an operation is a process by which a node (by sending
messages) requests other nodes to _execute_ updates of a given operation,
i.e., to effect the corresponding state change in the node's storage
(volatile or persistent). Details of reintegration are described below.

A node with persistent storage supports _transactions_, which are a means to
declare that a sequence of updates, executed in volatile storage, must reach
persistent storage atomically.

Two updates are _conflicting_ if their results (including the success or
failure indication) and the final state depend on the order of their
execution.

For a given update U, a node N can send a message to U.node, requesting a
_lock_ that will delay requests for locks for conflicting updates from other
nodes until the lock is either released by another message or N leaves the
cluster. (In reality locks are taken on objects, but introducing them would
complicate the exposition.)

Epoch Basics:
=============

The core epochs algorithm is very simple.

Every node N keeps in its volatile storage an _epoch number_, denoted
N.epoch. Every message X is tagged with an epoch number that is denoted as
X.epoch. These epoch numbers are maintained according to the following
protocol:

        E1. On receiving X : M -> N, N sets

                N.epoch = max(N.epoch, X.epoch);

        E2. On sending X : N -> M, N sets

                X.epoch = N.epoch;

Assignments in E1 and E2 must be mutually atomic. Compare this with `Lamport
timestamps' [1] and `vector clocks' [2].
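
To make E1 and E2 concrete, here is a minimal sketch in Python (purely
illustrative; this is not Lustre code). The single lock models the required
mutual atomicity of the two assignments:

    import threading

    class EpochClock:
        """Per-node epoch number, maintained per rules E1 and E2."""

        def __init__(self, initial=0):
            self.epoch = initial
            self._lock = threading.Lock()  # E1 and E2 are mutually atomic

        def on_receive(self, msg_epoch):
            # E1: on receiving X, N.epoch = max(N.epoch, X.epoch)
            with self._lock:
                self.epoch = max(self.epoch, msg_epoch)

        def tag_outgoing(self):
            # E2: on sending X, X.epoch = N.epoch
            with self._lock:
                return self.epoch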

Progressing toward new epochs will be described later; for now, assume that
multiple epoch numbers exist at the same time, stored in node memories and
traversing the network in messages.

Operations:
===========

        O1. To reintegrate an operation OP = (U(0), ... U(n)), a node N

                - sends lock requests: LOCK(U(k)) : N ->> U(k).node;

                - sends reintegration messages: REINT(U(k)) : N -> U(k).node
                  atomically w.r.t. E1.

                - adds the updates U(0), ..., U(n) to the volatile `redo log'.

O1 doesn't require all LOCK messages to be synchronous and serialized: it's
only necessary that the replies to all LOCK messages are received before the
first REINT message is sent.

We denote REINT(U).epoch as U.epoch (well-defined), and say that update U `is
in the epoch U.epoch', and that corresponding undo record (see O2) is a record
`in epoch U.epoch'.

        O2. On receiving REINT(U) : M -> N (where N == U.node), 
            node N transactionally

                - executes U in the volatile storage, and

                - adds to the `undo log' a record [U, OP]

            Note that U.epoch can be less than N.epoch at the time of
            execution (it cannot be greater than the latter due to E1).

We consider only single-level reintegration, where execution of an update
requires no further reintegrations. Generalization to the multi-level case is
left as an exercise for a curious reader.
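
To fix the intuition, here is a minimal single-level sketch of O1/O2 in
Python (again illustrative only: message sends are modeled as direct method
calls, conflict detection and real transactions are elided, and EpochClock is
the sketch from above):

    class Update:
        def __init__(self, node, action):
            self.node = node      # U.node: the target server
            self.action = action  # the state change, e.g. {'a': 1}
            self.epoch = None     # U.epoch, assigned when REINT is sent

    class Server:
        def __init__(self, clock):
            self.clock = clock    # an EpochClock
            self.state = {}
            self.undo_log = []    # grows in O2; pruned further below

        def lock(self, update):
            # LOCK(U) : N ->> U.node; grant bookkeeping elided
            pass

        def reint(self, update, op):
            # O2: transactionally execute U and append the record [U, OP]
            self.clock.on_receive(update.epoch)   # E1
            self.state.update(update.action)      # execute U
            self.undo_log.append((update, op))

    def reintegrate(client_clock, op, redo_log):
        # O1: replies to all LOCK messages arrive before the first REINT
        for u in op:
            u.node.lock(u)
        epoch = client_clock.tag_outgoing()       # one epoch for all REINTs
        for u in op:
            u.epoch = epoch
            u.node.reint(u, op)
        redo_log.extend(op)                       # the volatile redo log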

Correctness:
============

We can now prove a number of very simple statements:

S0: For a node N, N.epoch is monotonically non-decreasing in time.

    Proof: The only place where N.epoch is modified is E1, and max is
    obviously a non-decreasing function.

S1: A collection of all updates in a given epoch is precisely a collection of
updates for some set of operations (i.e., an epoch contains no partial
operations).

    Proof: Obvious from O1: all updates for a given operation are sent in the
    same epoch.

S2: For any sequence of conflicting updates (U{0}, ..., U{n}), the sequence
(U{0}.epoch, ..., U{n}.epoch) is monotonically non-decreasing.

    Proof: Consider conflicting updates U{k} and U{k+1}. From O1 and the
    definition of locking it is immediately clear that the following sequence
    of message sends took place:

            LOCK(U{k})      : N -> S   ; request a lock for U{k}
            RepLOCK(U{k})   : S -> N   ; get the lock for U{k}

        (*) REINT(U{k})     : N -> S   ; reintegrate U{k}

            LOCK(U{k+1})    : M -> S   ; conflicting lock is requested by M

        (*) UNLOCK(U{k})    : N -> S   ; N yields the lock

        (*) RepLOCK(U{k+1}) : S -> M   ; M gets the lock

        (*) REINT(U{k+1})   : M -> S   ; reintegrate U{k+1}

     Only the ordering of the messages marked with (*) matters; the rest are
     shown for completeness. Then

     U{k}.epoch == REINT(U{k}).epoch         ; by definition
                <= UNLOCK(U{k}).epoch        ; by S0 for N and E2
                <= RepLOCK(U{k+1}).epoch     ; by S0 for S and E2
                <= REINT(U{k+1}).epoch       ; by S0 for M and E2
                == U{k+1}.epoch              ; by definition

In essence, S2 states that epoch ordering is compatible with the causal
ordering of updates. An important consequence of this is that an epoch cannot
`depend' on a future epoch. Note that the proof of S2 is very similar to the
proof of serializability [7] of database schedules under the two-phase
locking (2PL) protocol [3].

From S0, S1 and S2 it seems very plausible to conclude that

S3: For any epoch E, the collection of updates in all epochs up to and
including E is precisely the collection of updates in some prefix of the
execution history. That is, for every node N, said collection contains the
updates from all operations reintegrated by N before some moment T in N's
physical time, and no updates from operations reintegrated by N after T.
Alternatively: `an epoch boundary is a consistent state snapshot'.

We won't prove S3, as this requires formalizing the notions of global and
local histories, distributed schedules, etc., which is more formalism than is
tolerable at the moment.

Intermezzo:
===========

S3 is the main weapon in achieving correct distributed recovery: it claims
that restoring the distributed state as of an epoch boundary results in a
globally consistent state. The key observation is that, due to O2, every node
with persistent storage has enough information to individually restore its
state to the boundary of _any_ epoch all updates from which it has on its
persistent storage, even in the face of failures. Once all such nodes have
agreed on a common epoch number, they restore their states independently. It
is this agreeing on a single number, instead of agreeing on a common set of
updates, that greatly simplifies recovery.

Advancing epochs:
=================

So far no way to progress to the next epoch has been introduced. If the
algorithms described above were run as is, there would be only one epoch
boundary: the initial file system state (as created by mkfs), and that would
be the only point to which epoch-based recovery could restore the system.

A switch to the next epoch can be initiated by any node N, and is effected by

        E3. N.epoch++;

That's all. That is, multiple nodes can advance epochs completely
independently, without any communication whatsoever. To understand why this
is sound, recall the proof of S3: all it relies on is that epochs
monotonically increase across a chain of _dependent_ messages; to be involved
in dependent operations, nodes have to communicate (through another node,
perhaps), and their epoch numbers are synchronized by E1 and E2.

E3 is executed atomically w.r.t. E1 and E2. Note that E3 doesn't break epoch
monotonicity assumed by S0.

To speed up announcement of a new epoch, N

        E4. (optionally) sends null messages to some nodes.

The more null messages (if any) are sent to other nodes, the faster the news
about the new epoch spreads across the cluster. In the extreme case, N
broadcasts the announcement to the whole cluster. Note that there are no
synchrony requirements for the null messages: it is perfectly valid, for
example, for N to still be sending them when another node has already started
sending the next round of announcements.
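
A sketch of E3/E4 on top of the EpochClock above (the choice of peers and
fan-out for the null messages is pure policy):

    import random

    def advance_epoch(clock, peers, fanout=2):
        # E3: bump the local epoch; the increment must be atomic w.r.t.
        # E1 and E2, hence the clock's lock is reused
        with clock._lock:
            clock.epoch += 1
            new_epoch = clock.epoch
        # E4: optional null messages announcing the new epoch; no synchrony
        # is required, so fire-and-forget to a random subset is enough
        for peer in random.sample(peers, min(fanout, len(peers))):
            peer.on_receive(new_epoch)
        return new_epoch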

There is great latitude in deciding when to switch to the next epoch.
Possible variants include: on every reintegration (an extreme case), on a
timeout, after a certain number of updates in the current epoch, etc.
Similarly, it's a matter of policy whether all or only selected nodes are
allowed to advance epochs.

Retiring epochs:
================

The description above outlines, in principle, a workable system on top of
which distributed recovery can be implemented. Yet a flaw is hidden in O2:
the `undo log' can only grow, and no way to limit its size has been
indicated. While from some points of view (audit, undelete, backup) a
complete log of system updates from the beginning of time is useful, it is
generally unacceptable to keep O(operations) rather than O(data + metadata)
state on persistent storage. To this end, a mechanism to prune the undo log
without sacrificing correctness is necessary.

Clearly, an entry can be pruned from an undo log the moment it is guaranteed
that the corresponding update will never be undone as a part of restoring a
consistent state during recovery. As our goal is to restore to an epoch
boundary, all undo entries for a given epoch are discardable if one of them
is. To understand which epochs can be retired, let's look at the epochs that
cannot be.

Obviously, an epoch cannot be discarded from an undo log while some of its
updates are in volatile storage only: if the nodes holding these volatile
updates fail, the epoch can never be completed, and has to be undone
completely.

From this it is tempting to conclude that an epoch can be pruned from the
undo logs once all of its updates are on persistent storage, but, welladay
[4], this is not so: even when a given epoch has everywhere stabilized to
persistent storage, some of the preceding epochs can still be volatile.

This is in fact the only obstacle: an epoch can be pruned from the undo logs
as soon as it and all the preceding epochs have everywhere stabilized.

Note that because epochs are advanced independently, updates for a given
epoch can be spread across all nodes, and the only safe way to learn about
everywhere stable epochs is to ask every node in the cluster for the oldest
epoch for which it still has updates in volatile storage only.

Finding out everywhere stable epochs can be done across various `topologies':
star, ring, etc. [5]. We shall discuss the simplest star model, but see
below.

Every node N maintains an `oldest locally volatile epoch' N.lvepoch, defined
as the earliest epoch that still has, on this node, updates in volatile
storage only.

For a server node, N.lvepoch is the epoch of the earliest update that was
executed but hasn't yet committed to persistent storage.

For a client node, N.lvepoch is the epoch of the earliest reintegration that
has at least one update that hasn't been committed to stable storage on the
corresponding server.
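
In code, N.lvepoch is just a minimum (a sketch; `volatile_updates' stands for
whatever per-node structure tracks the not-yet-stable updates):

    def oldest_locally_volatile_epoch(volatile_updates):
        # N.lvepoch = min{U.epoch | volatile update U at N};
        # +infinity when nothing volatile remains (cf. HAS_VOLATILE below)
        return min((u.epoch for u in volatile_updates),
                   default=float('inf'))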

Note that N.lvepoch does _not_ necessarily increase monotonically with time,
as a node can receive, as a part of a reintegration, updates with an epoch
smaller than any epoch it has seen before. The following, however, holds:

S4: For any node N, N.lvepoch <= N.epoch, at any time.

    Proof: if N received an update U as a part of reintegration, N.epoch was
    updated by E1 as part of REINT(U) processing, assuring that U.epoch <=
    N.epoch. If U originates on N, it is tagged with the current node epoch,
    so U.epoch == N.epoch. Since that moment, N.epoch continues to increase
    monotonically, guaranteeing that U.epoch <= N.epoch for any volatile (or
    stable, for that matter) update U on N. Therefore,

            N.lvepoch == min{U.epoch | volatile update U@N} <= N.epoch;

A node SC (Stability Coordinator) is selected in the cluster configuration. SC
monitors

        - cluster membership: every node N sends 

                HAS_VOLATILE(N, N.epoch) : N ->> SC 

          when it enters the cluster (where N.epoch is set up during the
          initial hand-shake with the cluster entry node) and

                HAS_VOLATILE(N, +infinity) : N ->> SC 

          when it parts from the cluster. When a node N is evicted by a node
          M, HAS_VOLATILE(N, +infinity) : M ->> SC is sent.

        - an oldest locally volatile epoch for every node as an array
          SC.lvepoch[].

These data are updated as follows:

        E5. Periodically every node N sends 

                HAS_VOLATILE(N, N.lvepoch) : N -> SC.

        E6. On receiving HAS_VOLATILE(N, lvepoch) : M -> SC, SC sets

                SC.lvepoch[N] = lvepoch;

        E7. When min{SC.lvepoch[*]} changes, SC broadcasts

                MIN_VOLATILE(min{SC.lvepoch[*]}) : SC -> N

            to every node N.

Protocol E5--E7 implements a `stability algorithm'.
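
A sketch of the coordinator side of E5--E7 (star topology; `min_volatile' on
the receiving nodes is assumed to prune the undo logs, which S5 below proves
safe):

    class StabilityCoordinator:
        """The star-topology SC of E5--E7; `nodes' is the cluster view."""

        def __init__(self, nodes):
            self.nodes = nodes
            self.lvepoch = {}          # SC.lvepoch[N]

        def has_volatile(self, node, lvepoch):
            # E6: record the reported oldest locally volatile epoch
            old_min = min(self.lvepoch.values(), default=None)
            self.lvepoch[node] = lvepoch
            new_min = min(self.lvepoch.values())
            # E7: broadcast the new global minimum when it changes
            if new_min != old_min:
                for n in self.nodes:
                    n.min_volatile(new_min)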

Clearly, the stability algorithm aligns very well with tree reduction [6]: in
a typical cluster, clients report their oldest volatile epochs to the
servers, which compute the minimum across their children and forward it
upward until the root node is reached, from where the global minimum is
propagated back.

S5: When a node N receives

        MIN_VOLATILE(E) : SC -> N

it can safely prune all epochs older than E from its undo logs.

    Proof: Imagine that some epoch earlier than E is somewhere volatile at
    the moment when N receives the message above, that is, some node M has a
    volatile update U such that U.epoch < E. Let Q be the node that
    originated U (this might be M or some other node). We have the following
    sequence of messages:

            HAS_VOLATILE(Q, lvepoch) : X -> SC
            MIN_VOLATILE(E) : SC -> N

    where the last such stability message from Q is meant. If X != Q, that is,
    Q has been evicted by a certain node X, then, by the definition of the
    eviction process (see below), the following sequence of messages took
    place:

            HAS_VOLATILE(M, M.lvepoch) : M ->> SC
            HAS_VOLATILE(Q, +infinity) : X ->> SC
            MIN_VOLATILE(E) : SC -> N

    and we have

            U.epoch >= M.lvepoch           ; by the definition of M.lvepoch
                    >= min{SC.lvepoch[*]}  ; by E6
                    == E                   ; by E7.

    Contradiction. 
                
    Hence, X == Q, and Q hasn't been evicted. If lvepoch > U.epoch, then by
    the definition of Q.lvepoch, Q has been informed by the servers
    (including M) that all updates in U.epoch, including U, have been
    stabilized, which contradicts the initial assumption about U. Hence,

            U.epoch >= Q.lvepoch
                    >= min{SC.lvepoch[*]}  ; by E6
                    == E                   ; by E7.

    Contradiction. 

Redo logs:
==========

The problem of pruning the redo logs, filled by O1, is much simpler: once a
record for an update U is discarded from the undo log, the corresponding
record can be discarded from the redo log too, because if a record is never
undone, there will never be a need to redo it. This policy is conservative:
redo logs could be pruned much more aggressively. Yet it is simple, and all
the infrastructure for it already exists.
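
As a sketch (same caveats as above), this policy amounts to:

    def prune_redo(redo_log, stable_epoch):
        # keep only the records whose epoch hasn't yet been pruned from the
        # undo logs; `stable_epoch' is the minimum from MIN_VOLATILE
        redo_log[:] = [u for u in redo_log if u.epoch >= stable_epoch]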

Recovery:
=========

Let us, at last, describe what the recovery process looks like.

There are two types of recovery:

        - eviction: happens when a node without persistent storage fails. In
          this case some other node takes on the task of restoring a
          consistent state, and

        - recovery (proper): happens when a node with persistent storage
          fails. In this case the failed node initiates the distributed
          recovery algorithm when it restarts.

When a node N decides to evict a victim node V, it

        V1. sends EVICT_PREP(V) : N -> M to all nodes that might potentially
            keep volatile updates for V (typically, all servers, including N).

        V2. On receiving EVICT_PREP(V) : N -> M, node M

                - records that V is evicted, denying all future messages from
                  it,

                - sends HAS_VOLATILE(M, M.lvepoch) : M ->> SC,

                - finds all locks it granted to V,

                - finds in its undo log all records [U, OP] where U is
                  protected by these locks (U.node == V of course), and
                  adds them to the list L,

                - sends EVICT_ACK(V, L) : M -> N.

        V3. On receiving EVICT_ACK(V, L) : M -> N from all nodes, N

                - removes from every L all entries that together form a
                  complete operation (see the sketch after this list).

                - if all L lists are now empty, then all updates from all
                  operations reintegrated by V reached other nodes, and there is
                  nothing to do;

                - otherwise, some of the updates were lost, and the other
                  updates from the same operations have to be undone, which
                  might require undoing yet other updates, including updates
                  not protected by the locks held by V, and updates made by
                  other clients. N initiates proper recovery (see below),
                  which can be started immediately from step R4, by sending

                        RECOVERY_COMMIT(N, min{U.epoch | U in L}) : N ->> M.

        V4. In any case, eviction is finished by sending

                        HAS_VOLATILE(V, +infinity) : N ->> SC.
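
The decision in step V3 reduces to simple set arithmetic; a sketch (each L is
modeled as a list of (U, OP) records, as in O2, with OP a tuple of updates):

    from collections import defaultdict

    def eviction_decision(ack_lists):
        # gather, per operation, the updates that survived on some server
        survivors = defaultdict(set)
        for L in ack_lists:
            for u, op in L:
                survivors[id(op)].add(id(u))
        # an entry is dropped iff its operation is complete
        leftovers = [(u, op)
                     for L in ack_lists
                     for u, op in L
                     if len(survivors[id(op)]) != len(op)]
        if not leftovers:
            return None   # V's state is fully replicated; nothing to do
        # otherwise proper recovery starts from step R4 at this epoch
        return min(u.epoch for u, _ in leftovers)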

This algorithm is `obviously' correct, as it either

        - discards V's volatile storage in the case when its contents are
          duplicated on other nodes (thus the global state is not changed),
          or

        - invokes proper recovery.

Many optimizations are possible:

        - M can omit `local' operations from L,

        - tree reduction of L construction,

        - nodes might force updates to persistent storage
          (commit-on-eviction) to reduce the risk of a future proper recovery
          failing due to V's missing redo log.

The unfortunate fact that an eviction might force undoing updates made by
other clients and, hence, cause cascading evictions, is a direct consequence
of the weaker isolation level implied by O1 and O2, viz., the ability to read
data modified by an update belonging to an operation whose other updates
haven't yet reached their target nodes. This is similar to the `cascading
aborts' that arise from reads of uncommitted data [8], and can be addressed
by a very simple mechanism:

        O3: For any operation OP1 = (U(0), ..., U(n)), a REINT(U(k)) message
            can be sent only after, for every operation OP0 containing an
            update conflicting with any of the U(i), replies to all OP0
            reintegration messages have been received.

That is, a new reintegration can start only after all conflicting
reintegrations have fully completed, where `conflict' is understood to be
between operations rather than between individual updates.
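
A sketch of the client-side gating this implies (`conflicts' is a predicate
on operations; how it is evaluated is outside the scope of this sketch):

    class ReintThrottle:
        """O3 (`volatile-on-share', see below): a reintegration is deferred
        until every conflicting in-flight operation is fully replied to."""

        def __init__(self):
            self.in_flight = set()   # operations awaiting REINT replies

        def may_send(self, op, conflicts):
            return not any(conflicts(op, other) for other in self.in_flight)

        def sent(self, op):
            self.in_flight.add(op)

        def all_replies_received(self, op):
            self.in_flight.discard(op)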

With the modest decrease in reintegration concurrency introduced by this
mechanism, which we are going to call `volatile-on-share' (because on a
conflict, updates are forced to at least the volatile storage of their target
nodes; compare with commit-on-share [11]), the eviction algorithm can be
simplified as follows:

        V3'. On receiving EVICT_ACK(V, L) : M -> N from all nodes, N

                - removes from every L all entries that together form a
                  complete operation.

                - for any non-empty L, N sends

                        EVICT_COMMIT(V, L) : N ->> M

        V4'. On receiving EVICT_COMMIT(V, L) : N ->> M, node M

                - undoes all updates in L, with O3 guaranteeing that no
                  conflicting updates exist.

                - releases all locks found in V2.

        V5'. N finishes eviction by sending

                        HAS_VOLATILE(V, +infinity) : N ->> SC.

Proper recovery.

We shall assume that the transaction system on the nodes with persistent
storage maintains commit ordering:

        If transactions T0 and T1 contain conflicting updates U0 and U1, U0
        precedes U1 in time, and T1 has been committed to persistent storage,
        then so has T0.

(If a pair of transactions has multiple conflicting updates, they all have to
be in the same order; otherwise the transactions are not serializable.)

The rough recovery plan is to

        - find out which is the latest everywhere stable epoch (by running
          stability algorithm described above),

        - undo all epochs up to the epoch found, and

        - apply all available redo logs to restore as much state as possible.

Some node (presumably a node that failed and restarted) acts as a recovery
coordinator (RC). RC maintains `oldest somewhere volatile epoch' RC.svepoch as
described below.

The following protocol (a two-phase commit protocol [9] for RC.svepoch, in
fact) is executed:

        R1. RC sends RECOVERY_PREP(RC) : RC -> N to all nodes with the
        persistent storage.

        R2. On receiving RECOVERY_PREP(RC) : RC -> N, node N sends

                HAS_VOLATILE(N, N.lvepoch) : N -> RC

        R3. On receiving HAS_VOLATILE(N, lvepoch) : N -> RC, RC sets

                RC.svepoch = min(RC.svepoch, lvepoch).

        R4. Once RC has received HAS_VOLATILE messages from all servers, it
        broadcasts

                RECOVERY_COMMIT(RC, RC.svepoch) : RC ->> N

        R5. On receiving RECOVERY_COMMIT(RC, E), node N undoes all updates it
        has in its logs in all epochs starting from E.

Nodes might need to keep some persistent state to guarantee recovery progress
in the face of repeated failures, in the standard 2PC fashion.
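
Condensed into code, the coordinator loop might look as follows (a sketch;
`report_lvepoch' and `undo_from' are hypothetical server entry points
standing in for the R1/R2 and R4/R5 messages):

    def proper_recovery(servers):
        # R1--R5: in effect a 2PC for RC.svepoch; the persistent state
        # needed to survive repeated failures is elided
        svepoch = float('inf')
        for s in servers:                    # R1: RECOVERY_PREP
            lvepoch = s.report_lvepoch()     # R2: HAS_VOLATILE reply
            svepoch = min(svepoch, lvepoch)  # R3: fold into RC.svepoch
        for s in servers:                    # R4: RECOVERY_COMMIT broadcast
            s.undo_from(svepoch)             # R5: undo epochs >= svepoch
        return svepoch  # clients then push redo logs from this epoch on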

Once the undo part of recovery is finished, clients are asked to push their
redo logs, starting from epoch RC.svepoch, to the servers before they can
start requesting new reintegrations. The usual Lustre algorithm
(version-based recovery) can be used here, with nodes evicted when they
cannot redo updates due to some other client's failure.

Belle Epoque:
=============

The proposed algorithm has the following advantages:

        - it is very simple. Surprisingly, with some effort it even seems to
          be amenable to a more or less complete formal analysis;

        - it is non-blocking: at no point does `normal processing', i.e.,
          reintegration, have to block waiting for epoch-related processing;

        - it is scalable, provided failures are relatively rare.

Its main disadvantage is that, due to the clients' participation in the
stabilization algorithm, a failed client can delay the detection of
everywhere stable epochs and, hence, lead to larger undo-redo logs and a
longer recovery. It seems that in the worst case a sequence of client
failures can delay the detection of epoch stabilization by a timeout times
some small constant.

References:
===========

[1] Lamport timestamps: http://en.wikipedia.org/wiki/Lamport_timestamps

[2] Vector clocks: http://en.wikipedia.org/wiki/Vector_clocks

[3] Two phase locking: http://en.wikipedia.org/wiki/Two_phase_locking

[4] Nurse:    Ah, well-a-day! he's dead, he's dead, he's dead!
              We are undone, lady, we are undone!

[5] Network topology: http://en.wikipedia.org/wiki/Network_topology

[6] Tree reduction: http://en.wikipedia.org/wiki/Graph_reduction

[7] Serializability: http://en.wikipedia.org/wiki/Serializability

[8] Recoverability: http://en.wikipedia.org/wiki/Schedule_(computer_science)#Avoids_cascading_aborts_.28rollbacks.29

[9] Two-phase commit: http://en.wikipedia.org/wiki/Two-phase_commit_protocol

[10] Cuts: http://arch.lustre.org/index.php?title=Cuts

[11] Commit on share: http://arch.lustre.org/index.php?title=Commit_on_Share


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Alex Zhuravlev @ 2008-12-22 11:52 UTC
  To: lustre-devel

Hello,

Nikita Danilov wrote:
> For a given update U, a node N can send a message to U.node, requesting a
> _lock_ that will delay requests for locks for conflicting updates requested
> from other nodes until the lock is either released by another message or when
> N leaves the cluster. (In reality locks are taken on objects, but introducing
> them would complicate the exposition.)

I find this reliance on an explicit request (a lock in this case) a
disadvantage: a lock can be taken long before reintegration, meaning an epoch
might be pinned for a long time, pinning in turn a lot of undo/redo logs.
It's also not very clear how fsync(2) and similar requests would work with
such pinned epochs - broadcast to release, or change the on-client epoch?
Another point is that some operations can be lockless: for example, we were
planning to skip extent locking for exclusively open files, while an epoch
could be used by SNS for data.

> Every node N maintains an `oldest locally volatile epoch' N.lvepoch, defined
> as an earliest epoch that still has on this node updates in the volatile
> memory only.
> For a client node, N.lvepoch is an epoch of the earliest reintegration that
> has at least one update that hasn't been committed to the stable storage on
> the corresponding server.

this means the client actually has to maintain many epochs at the same time,
as any lock enqueue can advance the epoch.

> A node SC (Stability Coordinator) is selected in the cluster configuration. SC
> monitors

I think having an SC is also a drawback:
1) choosing such a node is additional complexity and delay
2) failure of such a node would need a global resend of states
3) many unrelated nodes can get stuck due to large redo logs

> These data are updated as follows:
> 
>         E5. Periodically every node N sends 
> 
>                 HAS_VOLATILE(N, N.lvepoch) : N -> SC.
> 
>         E6. On receiving HAS_VOLATILE(N, lvepoch) : M -> SC, SC sets
> 
>                 SC.lvepoch[N] = lvepoch;
> 
>         E7. When min{SC.lvepoch[*]} changes, SC broadcasts
> 
>                 MIN_VOLATILE(min{SC.lvepoch[*]}) : SC -> N
> 
>             to every node N.
> 
> Protocol E5--E7 implements a `stability algorithm'.

given that the current epoch can be advanced by a lock enqueue, a client can
have many epochs in use at the same time, thus we'd have to track them all in
the protocol.

> Clearly, stability algorithm aligns very well with the tree reduction [6]: in
> a typical cluster clients will report their oldest volatile epochs to the
> servers, that would compute minimum across their children and forward it
> upward until the root node is reached, from where the global minimum is
> propagated back.

I'm not sure it scales well, as any failed node may cause a global stall in
undo/redo pruning.

> Redo logs:
> ==========
> 
> The problem of pruning redo logs, filled by O1 is much simpler: once a record
> for an update U is discarded from the undo log, corresponding record can be
> discarded from the redo log too, because if record is never undone, there will
> never be a chance to redo it. This policy is conservative, because redo logs
> can be pruned much more aggressively, yet, it is simple, and all
> infrastructure for it already exists.

it's probably simpler, but a single node suffers much from this global
dependency: there might be a lot of epochs in flight and lots of RPCs
(especially with tree reduction) before a client can discard its redo log. I
don't think this really scales well to 100K and more nodes.

> Proper recovery.
> The rough recovery plan is to
> 
>         - find out which is the latest everywhere stable epoch (by running
>           stability algorithm described above),

It's not very clear how a server finds the stable epoch in case of a total
power-off: no client can provide this data.

> 
>         - undo all epochs up to the epoch found, and
> 
>         - apply all available redo logs to restore as much state as possible.
> 
> Some node (presumably a node that failed and restarted) acts as a recovery
> coordinator (RC). RC maintains `oldest somewhere volatile epoch' RC.svepoch as
> described below.
> 
> The following protocol (two phase commit protocol [9] for RC.svepoch, in fact)
> is executed:
> 
>         R1. RC sends RECOVERY_PREP(RC) : RC -> N to all nodes with the
>         persistent storage.
> 
>         R2. On receiving RECOVERY_PREP(RC) : RC -> N, node N sends
> 
>                 HAS_VOLATILE(N, N.lvepoch) : N -> RC
> 
>         R3. On receiving HAS_VOLATILE(N, lvepoch) : N -> RC, RC sets
> 
>                 RC.svepoch = min(RC.svepoch, lvepoch).
> 
>         R4. Once RC received all HAS_VOLATILE messages from all servers, it
>         broadcasts
> 
>                 RECOVERY_COMMIT(RC, RC.svepoch) : RC ->> N
> 
>         R5. On receiving RECOVERY_COMMIT(RC, E), node N undoes all updates
>         it has in its logs in all epochs starting from E.
> 
> Nodes might need to keep some persistent state to guarantee recovery progress
> in the face of repeated failures, in the standard 2PC fashion.
> 
> Once undo-part of recovery is finished, clients are asked to push their redo
> logs, starting from epoch RC.svepoch to servers, before they can start
> requesting new reintegrations. Usual Lustre algorithm (version based recovery)
> can be used here, with nodes evicted, when they cannot redo updates due to
> some other client failure.
> 

while one may find this simple, I think we shouldn't sacrifice scalability
and performance for simplicity.

Instead we could do the following:

  * when a client issues a transaction, it labels it with a unique id
  * a server executing the operation atomically writes an undo record with:
    * VBR versions, so that we can build chains of really dependent operations
    * the unique transaction id generated by the client
    * the number of servers involved in the transaction
  * periodically, servers exchange their committed unique transaction ids
    (only distributed transactions are involved in this)
  * once some distributed transaction is committed on all involved servers, we
    can prune it and all its local successors (see the sketch below)
  * during recovery:
    * first, all capable clients replay their redo logs (the replay queue)
    * servers read their undo logs and find distributed transactions
    * servers exchange their distributed transaction ids
    * servers find partially committed distributed transactions
    * servers undo partially committed distributed transactions and everything
      depending on them
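
To make the pruning rule concrete, a rough sketch (Python, purely
illustrative, not Lustre code; all names are made up):

    class UndoRecord:
        def __init__(self, txid, nservers):
            self.txid = txid          # the client-generated unique id
            self.nservers = nservers  # servers involved in the transaction

    def prunable(record, commit_reports):
        # commit_reports: txid -> set of servers that reported the
        # transaction committed, built from the periodic exchanges above
        return len(commit_reports.get(record.txid, set())) == record.nservers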

I see the following advantages of this dependency-based approach:
  * only servers are involved
  * no single point of failure that may cause many nodes to block due to large
    redo logs
  * the client doesn't need to track many global epochs - just use the current
    mechanism, no changes on the client side
  * no reliance on some protocol like ldlm
  * support for lockless operations
  * with late replay we don't need to update the redo log with some new epoch
  * doesn't depend on the current cluster state:
    * can forward transactions via intermediate nodes
    * may be important for complex setups over WAN
  * fsync(2) and synchronous requests can be implemented in an optimal way
  * support for global and local epochs with no additional code
  * the amount of network overhead is proportional to the number of
    distributed transactions:
    * a server just needs to send arrays of transaction ids to other servers
    * much better batching compared to the above
  * with 32K distributed transactions per second and a 16-byte unique
    transaction id, a server would need to send ~2.5MB per 5 seconds
  * if a server is told who the other participants of a transaction are, then
    this exchange can be very efficient
  * no need to undo non-dependent changes:
    * in the simplest form - when no uncommitted distributed transactions
      precede them in the undo log
    * in the complex form - by tracking real dependencies at execution time
    * it means in many cases recovery can be very fast
  * recovery can be completed quickly, as the undo log is smaller and we
    undo-redo very selectively
  * as long as no distributed transactions are issued (A works with /home/a
    living on mds1, B works with /home/b living on mds2), no epoch-related
    activity is required, including undo



thanks, Alex


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Nikita Danilov @ 2008-12-22 12:45 UTC
  To: lustre-devel

Alex Zhuravlev writes:
 > Hello,

Hello,

 > 
 > Nikita Danilov wrote:
 > > For a given update U, a node N can send a message to U.node, requesting a
 > > _lock_ that will delay requests for locks for conflicting updates requested
 > > from other nodes until the lock is either released by another message or when
 > > N leaves the cluster. (In reality locks are taken on objects, but introducing
 > > them would complicate the exposition.)
 > 
 > I find this relying on explicit request (lock in this case) as a disadvantage:
 > lock can be taken long before reintegration meaning epoch might be pinned for

Hm.. a lock doesn't pin an epoch in any way.

 > long pinning in turn a lot of undo/redo logs. It's also not very clear how fsync(2)
 > and similar requests will be working with such pinned epochs - broadcast to release
 > or change on-client epoch? Another point is that some operation can be lockless:
 > for example, we were planning to skip extents locking for exclusively open files
 > while epoch could be used by SNS for data.

Locks are only needed to make the proof of S2 possible. Once lockless
operations or SNS guarantee, in some domain-specific way, that no epoch can
depend on a future one, we are fine.

 > 
 > > Every node N maintains an `oldest locally volatile epoch' N.lvepoch, defined
 > > as an earliest epoch that still has on this node updates in the volatile
 > > memory only.
 > > For a client node, N.lvepoch is an epoch of the earliest reintegration that
 > > has at least one update that hasn't been committed to the stable storage on
 > > the corresponding server.
 > 
 > this means client actually should maintain many epochs at same time as any lock
 > enqueue can advance epoch.

I don't understand what is meant by "maintaining an epoch" here. An epoch is
just a number. Surely a client will keep in its memory (in the redo log) a
list of updates tagged with multiple epochs, but I don't see any problem with
this.

 > 
 > > A node SC (Stability Coordinator) is selected in the cluster configuration. SC
 > > monitors
 > 
 > I think having SC is also drawback:
 > 1) choosing such node is additional complexity and delay
 > 2) failing of such node would need global resend of states
 > 3) many unrelated nodes can get stuck due to large redo logs

As I pointed out, only the simplest `1-level star' form of the stability
algorithm was described, for simplicity. This algorithm is amenable to a lot
of optimization because, in effect, it has to find a running minimum over a
distributed array, and this can be done in a scalable way:

        Clearly, stability algorithm aligns very well with the tree
        reduction [6]: in a typical cluster clients will report their
        oldest volatile epochs to the servers, that would compute
        minimum across their children and forward it upward until the
        root node is reached, from where the global minimum is
        propagated back.

Note that this requires _no_ additional RPCs from the clients.

 > > 
 > > Protocol E5--E7 implements a `stability algorithm'.
 > 
 > given current epoch can be advanced by lock enqueue, client can get many used
 > epochs at same time, thus we'd have to track them all in the protocol.

I am not sure I understand this. _Any_ message (including lock enqueue,
REINT, MIN_VOLATILE, CONNECT, EVICT, etc.) potentially updates the epoch
of a receiving node.

 > 
 > > Clearly, stability algorithm aligns very well with the tree reduction [6]: in
 > > a typical cluster clients will report their oldest volatile epochs to the
 > > servers, that would compute minimum across their children and forward it
 > > upward until the root node is reached, from where the global minimum is
 > > propagated back.
 > 
 > I'm not sure it scales well as any failed node may cause global stuck in undo/redo
 > pruning.

Only until this node is evicted, and I think that no matter what the pattern
of failures is, a single level of `tree reduction' can be delayed by no more
than a single eviction timeout.

 > 
 > > Redo logs:
 > > ==========
 > > 
 > > The problem of pruning redo logs, filled by O1 is much simpler: once a record
 > > for an update U is discarded from the undo log, corresponding record can be
 > > discarded from the redo log too, because if record is never undone, there will
 > > never be a chance to redo it. This policy is conservative, because redo logs
 > > can be pruned much more aggressively, yet, it is simple, and all
 > > infrastructure for it already exists.
 > 
 > it's probably simpler, but single node suffers from this global dependency much:
 > there might be a lot of epochs under work and lots of RPCs (especially with tree
 > reduction) before client can discard redo. I don't think this really scales well
 > with 100K and more nodes.

Actually, a single-server operation can be discarded from a redo log as soon
as it commits on the target server, because the latter can always redo it
(possibly after an undo). Given that the majority of operations are
single-server, redo logs won't be much larger than they are today.

 > 
 > > Proper recovery.
 > > The rough recovery plan is to
 > > 
 > >         - find out which is the latest everywhere stable epoch (by running
 > >           stability algorithm described above),
 > 
 > It's not very clear how server finds epoch stable in case of total power off:
 > no client can provide this data.

The `marker' of the last everywhere stable epoch is the end of the undo
log---when a server receives a MIN_VOLATILE message, it prunes all
everywhere stable epochs from its undo log, so on recovery the servers
simply exchange the oldest epochs in their logs and find the youngest of
them. All the epochs before this one are everywhere stable (because at
least one server pruned them from its undo log, and hence received a
MIN_VOLATILE message authorizing it to do so), and every server can roll
back to it. So rolling all servers back to this epoch is possible and
restores a consistent snapshot.

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Alexander Zarochentsev @ 2008-12-22 13:48 UTC
  To: lustre-devel

On 22 December 2008 15:45:51 Nikita Danilov wrote:
> Alex Zhuravlev writes:
>  > Hello,
>
>  > I'm not sure it scales well as any failed node may cause global
>  > stuck in undo/redo pruning.
>
> Only until this node is evicted, and I think that no matter what is
> the pattern of failures, a single level of `tree reduction', can be
> delayed by no more than a single eviction timeout.

It introduces an unneeded dependency between nodes: a node cannot prune its
own undo logs until all nodes have agreed that the epoch can be pruned. IMO
that is what a scalable system should avoid.

If we had a disaster in a part of the cluster, client nodes would disconnect
and reconnect often, the undo logs would be overloaded, and the cluster would
stop, no?

Thanks,
-- 
Alexander "Zam" Zarochentsev
Staff Engineer
Lustre Group, Sun Microsystems


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Nikita Danilov @ 2008-12-22 14:21 UTC
  To: lustre-devel

Alexander Zarochentsev writes:
 > On 22 December 2008 15:45:51 Nikita Danilov wrote:
 > > Alex Zhuravlev writes:
 > >  > Hello,
 > >
 > >  > I'm not sure it scales well as any failed node may cause global
 > >  > stuck in undo/redo pruning.
 > >
 > > Only until this node is evicted, and I think that no matter what is
 > > the pattern of failures, a single level of `tree reduction', can be
 > > delayed by no more than a single eviction timeout.
 > 
 > It introduces unneeded dependency between nodes, any node cannot prune 
 > its own undo logs if all nodes have an agreement that the epoch can be 
 > pruned. IMO it is what scalable system should avoid. 

This is the price paid for the cheap introduction of new epochs. If the scope
of an epoch is limited to a known group of nodes, then retiring such an epoch
requires consensus only between the nodes of this group (cheaper than a
global consensus), but the introduction of new epochs requires coordination
between groups. In the various designs that we considered where epochs are
per-client, this manifests itself as an absence of total ordering between
epochs, which requires translation between client epochs and server
transaction identifiers.

All in all, I have a feeling that _all_ such algorithms have similar
communication overhead for the `usual' workload.

 > 
 > If we would have a disaster in a part of the cluster, client nodes would 
 > disconnect and reconnect often, the undo logs will be overloaded, and 
 > the cluster will stop, no?

Well, it won't stop, because a node either manages to reconnect in time (in
which case it communicates its state to its superior), or it is evicted on a
timeout. In either case, the stabilization algorithm progresses.

Then, I think that even the simplest global-epochs-based recovery is very
challenging to implement.

 > 
 > Thanks,
 > -- 
 > Alexander "Zam" Zarochentsev

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Alex Zhuravlev @ 2008-12-22 14:44 UTC
  To: lustre-devel

Nikita Danilov wrote:
>  > I find this relying on explicit request (lock in this case) as a disadvantage:
>  > lock can be taken long before reintegration meaning epoch might be pinned for
> 
> Hm.. a lock doesn't pin an epoch in any way.

well, I think it does, as you don't want to use an epoch received a few
minutes ago with a lock. If a node is in WBC mode and granted some STL-like
lock, then it may be sending a few-MB batch every, say, 5 minutes. There
might be no interaction between batches. This means the client would need to
refresh its epoch. Depending on the workload it may happen that the client
won't be able to send a batch while awaiting a new epoch, or the client may
refresh the epoch with no real batches after that.

> Locks are only needed to make proof of S2 possible. Once lockless
> operation or SNS guarantee in some domain-specific way that no epoch can
> depend on a future one, we are fine.

well, I guess "in some domain-specific way" means yet more complexity.

>  > this means client actually should maintain many epochs at same time as any lock
>  > enqueue can advance epoch.
> 
> I don't understand what is meant by "maintaining an epoch" here. Epoch
> is just a number. Surely a client will keep in its memory (in the redo
> log) a list of updates tagged by multiple epochs, but I don't see any
> problem with this.

the problem is that with out-of-order epochs sent to different servers, the
client can't use the notion of "last_committed" anymore.

>  > I think having SC is also drawback:
>  > 1) choosing such node is additional complexity and delay
>  > 2) failing of such node would need global resend of states
>  > 3) many unrelated nodes can get stuck due to large redo logs
> 
> As I pointed out, only the simplest `1-level star' form of the stability
> algorithm was described, for simplicity. This algorithm is amenable to a lot
> of optimization because, in effect, it has to find a running minimum over a
> distributed array, and this can be done in a scalable way:

the bad thing, IMHO, in all this is that all nodes making the decision must
understand the topology. A server should separate epochs from different
clients; it's hard to send batches via some intermediate server/node.

> Note that this requires _no_ additional RPCs from the clients.

disagree. At least for distributed operations the client has to report its
non-volatile epoch from time to time. In some cases we can use a protocol
like ping, in some - not.

>  > given current epoch can be advanced by lock enqueue, client can get many used
>  > epochs at same time, thus we'd have to track them all in the protocol.
> 
> I am not sure I understand this. _Any_ message (including lock enqueue,
> REINT, MIN_VOLATILE, CONNECT, EVICT, etc.) potentially updates the epoch
> of a receiving node.

correct, and this means the client may have many epochs to track, thus no
last_committed anymore.

> Only until this node is evicted, and I think that no matter what the
> pattern of failures is, a single level of `tree reduction' can be delayed
> by no more than a single eviction timeout.

the problem is that this may affect unrelated nodes very easily.

> Actually, a single-server operation can be discarded from a redo log as
> soon as it commits on the target server, because the latter can always
> redo it (possibly after an undo). Given that the majority of operations
> are single-server, redo logs won't be much larger than they are today.

undo to redo? even longer recovery?

thanks, Alex


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Alex Zhuravlev @ 2008-12-22 14:45 UTC
  To: lustre-devel

Nikita Danilov wrote:
> All in all, I have a feeling that _all_ such algorithms have similar
> communication overhead for the `usual' workload.

not sure what you meant by "such" and "usual", but dependency-based recovery
may have zero overhead for the "usual local" workload, in terms of network
and disk traffic.

thanks, Alex


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Nikita Danilov @ 2008-12-22 17:15 UTC
  To: lustre-devel

Alex Zhuravlev writes:
 > Nikita Danilov wrote:
 > >  > I find this relying on explicit request (lock in this case) as a disadvantage:
 > >  > lock can be taken long before reintegration meaning epoch might be pinned for
 > > 
 > > Hm.. a lock doesn't pin an epoch in any way.
 > 
 > well, I think it does as you don't want to use epoch received few minutes ago with lock.

What is the problem with this?

 > 
 > > Locks are only needed to make proof of S2 possible. Once lockless
 > > operation or SNS guarantee in some domain-specific way that no epoch can
 > > depend on a future one, we are fine.
 > 
 > well, I guess "in some domain-specific way" means another complexity.

Any IO mechanism has to guarantee that operations are "serializable", that
is, that no circular dependencies exist. This is all that global epochs
need; they don't depend on the DLM per se.

 > > I don't understand what is meant by "maintaining an epoch" here. Epoch
 > > is just a number. Surely a client will keep in its memory (in the redo
 > > log) a list of updates tagged by multiple epochs, but I don't see any
 > > problem with this.
 > 
 > the problem is that with out-of-order epochs sent to different servers client can't
 > use notion of "last_committed" anymore.

What do you mean by "out of order" here?

 > 
 > >  > I think having SC is also drawback:
 > >  > 1) choosing such node is additional complexity and delay
 > >  > 2) failing of such node would need global resend of states
 > >  > 3) many unrelated nodes can get stuck due to large redo logs
 > > 
 > > As I pointed out, only the simplest `1-level star' form of a stability
 > > algorithm was described for simplicity. This algorithms is amendable to
 > > a lot of optimization, because it, in effect, has to find a running
 > > minimum in a distributed array, and this can be done in a scalable way:
 > 
 > the bad think, IMHO, in all this is that all nodes making decision must
 > understand topology. server should separate epochs from different clients,
 > it's hard to send batches via some intermediate server/node.

Hm.. I would think that this is very easy, thanks to the good properties
of the minimum function (associativity, commutativity, etc.): a client
piggy-backs its earliest volatile epoch on any message it sends to any
server, and the server batches these data from its clients and forwards
them to the SC.

 > 
 > > Note, that this requires _no_ additional rpcs from the clients.
 > 
 > disagree. at least for distributed operations client has to report non-volatile
 > epoch from time to time. in some cases we can use protocol like ping, in some - not.

I agree with this, but I am not sure it is a problem. If a client is
idle for seconds, pinging is not a big deal.

 > 
 > >  > given current epoch can be advanced by lock enqueue, client can get many used
 > >  > epochs at same time, thus we'd have to track them all in the protocol.
 > > 
 > > I am not sure I understand this. _Any_ message (including lock enqueue,
 > > REINT, MIN_VOLATILE, CONNECT, EVICT, etc.) potentially updates the epoch
 > > of a receiving node.
 > 
 > correct, this means client may have many epochs to track. thus no last_committed anymore.

Precisely the contrary: the MIN_VOLATILE message returns something
equivalent to a cluster-wide global last_committed.

 > 
 > undo to redo? even longer recovery?

No, redo to undo. :-)

 > 
 > thanks, Alex

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Alex Zhuravlev @ 2008-12-22 17:36 UTC
  To: lustre-devel

Nikita Danilov wrote:
>  > well, I think it does as you don't want to use epoch received few minutes ago with lock.
> 
> What is the problem with this?

the problem is that this epoch may hold up lots of other epochs. This may be
especially important for fsync(2) or any synchronous request.

> Any IO mechanism has to guarantee that operations are "serializable",
> that is, no circular dependencies exist. This is what global epochs
> need, they don't depend on DLM per se.

global epochs depend on the DLM as a transport to refresh epochs. At least
the idea, AFAIU, is to use LDLM RPCs to carry the epoch protocol; otherwise
it'd need a separate RPC. I'm just saying that there are cases, probably
important ones, when such an explicit RPC will be needed, probably in a
nearly-sync manner. I think this is also additional complexity.

>  > the problem is that with out-of-order epochs sent to different servers client can't
>  > use notion of "last_committed" anymore.
> 
> What do you mean by "out of order" here?

epoch N+1 can be committed by mds1 before epoch N is committed by mds2. Each
such epoch has to be tracked separately, and "last_committed" can't be used,
I think. Additional complexity in the protocol.

>  > the bad think, IMHO, in all this is that all nodes making decision must
>  > understand topology. server should separate epochs from different clients,
>  > it's hard to send batches via some intermediate server/node.
> 
> Hm.. I would think that this is very easy, thanks to the good properties
> of the minimum function (associativity, commutativity, etc.): client
> piggy-backs its earliest volatile epoch to any message it sends to any
> server, and server batches these data from clients and forwards them to
> SC.

1) if an epoch isn't bound to some node, then it can also be hard to push
    epochs to implement fsync(2)
2) batching means additional delay

> I agree with this, but I am not sure this is a problem. If client is
> idle for seconds, pinging is not a big deal.

I tend to think pinging can be a problem at scale. I wouldn't rely on it.

> Precisely the contrary: the MIN_VOLATILE message returns something
> equivalent to a cluster-wide global last_committed.

you meant "from sc" direction. but before that client has to track local committness
of each epoch to servers.

thanks, Alex


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
From: Nikita Danilov @ 2008-12-22 18:57 UTC
  To: lustre-devel

Alex Zhuravlev writes:
 > global epochs depend on DLM as a transport to refresh epochs. at least the idea, AFAIU,
 > is to use LDLM RPC to carry epoch protocol. otherwise it'd need separate RPC. I'm just

Any message is used as a transport for epochs, including any reply
from a server. So a typical scenario would be


client                server
   epoch = 8            epoch = 9

   LOCK --------------->   
        <-------------- REPLY
   epoch = 9
                        <----- some other message with epoch = 10 from somewhere
                        epoch = 10
   ....

   REINT --------------->
         <-------------- REPLY
   epoch = 10

                        <----- some other message with epoch = 11 from somewhere
                        epoch = 11

   REINT --------------->
         <-------------- REPLY
   epoch = 11

etc. Note that nothing prevents the server from increasing its local epoch
before replying to every reintegration (this was mentioned in the original
document as an "extreme case"). With this policy there is never more than
one reintegration on a given client in a given epoch, and we can indeed
implement the stability algorithm without the clients.

 > saying that there are cases, probably important ones, when such an explicit RPC will be needed,
 > probably in a nearly-synchronous manner. I think this is also additional complexity.

DLM plays no special role in the epochs mechanism. All that it is used
for is to guarantee that conflicting operations are executed in the
proper order (i.e., the epoch of a dependent operation is never less than
the epoch of an operation it depends on), but this is what DLM is for,
and this has to be guaranteed anyway.

 > 
 > >  > the problem is that with out-of-order epochs sent to different servers the client can't
 > >  > use the notion of "last_committed" anymore.
 > > 
 > > What do you mean by "out of order" here?
 > 
 > epoch N+1 can be committed by mds1 before epoch N is committed by mds2. each such
 > epoch has to be tracked separately, and "last_committed" can't be used, I think.

last_committed can and has to be used. When a client has reintegrated an
operation OP = (U(0), ..., U(N)), it counts this operation as `volatile'
until all involved servers have reported (through the usual last_committed
mechanism, as it is used by Lustre currently) that all updates have
committed.
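
As a sketch (made-up names, not actual Lustre code), the client-side
bookkeeping could be as simple as: an operation stays `volatile' until
every involved server's last_committed covers the transno assigned to
that server's update:

    class VolatileOps:
        def __init__(self):
            # op_id -> {server_id: transno the server assigned to our update}
            self.pending = {}

        def reintegrated(self, op_id, transno_by_server):
            self.pending[op_id] = dict(transno_by_server)

        def on_reply(self, server_id, last_committed):
            # every reply carries the server's last_committed; drop updates
            # that are now known to be on stable storage
            for op_id in list(self.pending):
                waits = self.pending[op_id]
                if waits.get(server_id, float('inf')) <= last_committed:
                    del waits[server_id]
                if not waits:               # all servers have committed
                    del self.pending[op_id]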

 > 
 > you meant "from sc" direction. but before that client has to track local committness
 > of each epoch to servers.

Yes, and it can use last_committed of each server to do this.

 > 
 > thanks, Alex

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-22 18:57           ` Nikita Danilov
@ 2008-12-23  6:44             ` Alex Zhuravlev
  2008-12-23 10:00               ` Nikita Danilov
  2008-12-23 23:37             ` Andreas Dilger
  1 sibling, 1 reply; 25+ messages in thread
From: Alex Zhuravlev @ 2008-12-23  6:44 UTC (permalink / raw)
  To: lustre-devel

Nikita Danilov wrote:
> Any message is used as a transport for epochs, including any reply
> from a server. So a typical scenario would be

I agree, but I think there will be cases with no messages at all,
like WBC doing a flush every few minutes and then going idle. depending
on the workload this may introduce additional network overhead on any node.

> etc. Note that nothing prevents the server from increasing its local epoch
> before replying to every reintegration (this was mentioned in the
> original document as an "extreme case"). With this policy there is never
> more than one reintegration on a given client in a given epoch, and we
> can indeed implement the stability algorithm without clients.

hmm? if it's only the client who is aware of all parts of a distributed transaction,
how can we?


> DLM plays no special role in the epochs mechanism. All that it is used
> for is to guarantee that conflicting operations are executed in the
> proper order (i.e., the epoch of a dependent operation is never less than
> the epoch of an operation it depends on), but this is what DLM is for,
> and this has to be guaranteed anyway.

conflict resolution can be delegated to some different mechanism when STL takes place.

> last_committed can and has to be used. When a client has reintegrated an
> operation OP = (U(0), ..., U(N)), it counts this operation as `volatile'
> until all involved servers have reported (through the usual last_committed
> mechanism, as it is used by Lustre currently) that all updates have
> committed.

yup. at some point I started to think you're going to use epochs instead of transnos
in last_committed, which could be a problem.


just to list my observations about global epochs:
  * it's a problem to implement synchronous operations
  * network overhead even with local-only changes, depending on workload
  * disk overhead even with local-only changes
  * SC is a single point of failure with any topology, as it's the only place where
    the final minimum is found
  * tree reduction isn't an obvious thing, because the client can't report its minimum
    to just any node; instead the tree is a rather static thing and any change should be
    made very carefully, otherwise it's very easy to lose the minimum



thanks, Alex


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23  6:44             ` Alex Zhuravlev
@ 2008-12-23 10:00               ` Nikita Danilov
  2008-12-23 10:21                 ` Alex Zhuravlev
  0 siblings, 1 reply; 25+ messages in thread
From: Nikita Danilov @ 2008-12-23 10:00 UTC (permalink / raw)
  To: lustre-devel

Alex Zhuravlev writes:
 > Nikita Danilov wrote:
 > > Any message is used as a transport for epochs, including any reply
 > > from a server. So a typical scenario would be
 > 
 > I agree, but I think there will be cases with no messages at all,
 > like WBC doing a flush every few minutes and then going idle. depending
 > on the workload this may introduce additional network overhead on any node.

Indeed, but in any such case an additional null RPC won't harm. In fact, no
node should sit isolated for minutes with something in its cache, as it
can miss a recovery.

 > 
 >  > etc. Note that nothing prevents the server from increasing its local epoch
 >  > before replying to every reintegration (this was mentioned in the
 >  > original document as an "extreme case"). With this policy there is never
 >  > more than one reintegration on a given client in a given epoch, and we
 >  > can indeed implement the stability algorithm without clients.
 > 
 > hmm? if it's only the client who is aware of all parts of a distributed transaction,
 > how can we?

If we have no more than 1 reintegration in a given epoch on a given
client, then the server that received an OP = (U(0), ..., U(N)) in epoch
E from a client can send to SC a message telling it that this client
has N volatile updates in epoch E, and whenever some server commits
one of the U's it sends to SC a message asking it to decrease a counter for
this client. The most obvious implementation will batch these notifications,
i.e., when a server commits a transaction group it notifies SC about all
changes in one message. I personally don't think that is the best
approach.
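
A sketch of the counting SC would do under this policy (made-up names
again): per-client, per-epoch counters fed by the two kinds of
notifications:

    from collections import defaultdict

    class StabilityCoordinator:
        def __init__(self):
            # (client, epoch) -> number of still-volatile updates
            self.volatile = defaultdict(int)

        def reint_reported(self, client, epoch, nr_updates):
            # from the server that received the reintegration
            self.volatile[(client, epoch)] += nr_updates

        def commits_reported(self, client, epoch, nr_updates):
            # from a server that committed a transaction group; may cover
            # many updates at once when the notifications are batched
            key = (client, epoch)
            self.volatile[key] -= nr_updates
            if self.volatile[key] <= 0:
                del self.volatile[key]

        def max_stable_epoch(self):
            # every epoch below the minimal one with volatile updates is stable
            live = [epoch for (_, epoch) in self.volatile]
            return min(live) - 1 if live else None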

 > 
 >  > DLM plays no special role in the epochs mechanism. All that it is used
 >  > for is to guarantee that conflicting operations are executed in the
 >  > proper order (i.e., the epoch of a dependent operation is never less than
 >  > the epoch of an operation it depends on), but this is what DLM is for,
 >  > and this has to be guaranteed anyway.
 > 
 > conflict resolution can be delegated to some different mechanism when STL takes place.

Yes, and this mechanism (if it is correct at all) will guarantee that an
epoch cannot depend on a future epoch.

 > 
 > just to list my observations about global epochs:
 >   * it's a problem to implement synchronous operations
 >   * network overhead even with local-only changes, depending on workload
 >   * disk overhead even with local-only changes
 >   * SC is a single point of failure with any topology, as it's the only place where
 >     the final minimum is found
 >   * tree reduction isn't an obvious thing, because the client can't report its minimum
 >     to just any node; instead the tree is a rather static thing and any change should be
 >     made very carefully, otherwise it's very easy to lose the minimum

Unfortunately, as far as I know, no other solution was described with a
level of detail sufficient for comparison. :-)

 > 
 > thanks, Alex

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23 10:00               ` Nikita Danilov
@ 2008-12-23 10:21                 ` Alex Zhuravlev
  2008-12-23 11:06                   ` Nikita Danilov
  0 siblings, 1 reply; 25+ messages in thread
From: Alex Zhuravlev @ 2008-12-23 10:21 UTC (permalink / raw)
  To: lustre-devel

Nikita Danilov wrote:
> If we have no more than 1 reintegration in a given epoch on a given
> client, then the server that received an OP = (U(0), ..., U(N)) in epoch
> E from a client can send to SC a message telling it that this client
> has N volatile updates in epoch E, and whenever some server commits
> one of the U's it sends to SC a message asking it to decrease a counter for
> this client. The most obvious implementation will batch these notifications,
> i.e., when a server commits a transaction group it notifies SC about all
> changes in one message. I personally don't think that is the best
> approach.

essentially this is very similar to dependency-based recovery, but with
none of its advantages, and with SC tracking all states and being a single point
of failure. I think we need a more scalable solution.

> Yes, and this mechanism (if it is correct at all) will guarantee that an
> epoch cannot depend on a future epoch.

again, it's not about dependency, it's about the network overhead of global epochs.

>  > just to list my observations about global epochs:
>  >   * it's a problem to implement synchronous operations
>  >   * network overhead even with local-only changes, depending on workload
>  >   * disk overhead even with local-only changes
>  >   * SC is a single point of failure with any topology, as it's the only place where
>  >     the final minimum is found
>  >   * tree reduction isn't an obvious thing, because the client can't report its minimum
>  >     to just any node; instead the tree is a rather static thing and any change should be
>  >     made very carefully, otherwise it's very easy to lose the minimum
> 
> Unfortunately, as far as I know, no other solution was described with a
> level of detail sufficient for comparison. :-)

I could say the same about tree reduction, for example ;)

dependency-based recovery was discussed in much detail, I think, and the benefits are
very clear, IMHO, as is the overall simplicity due to the local implementation (compared
with an implementation involving all nodes in a cluster).

thanks, Alex


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23 10:21                 ` Alex Zhuravlev
@ 2008-12-23 11:06                   ` Nikita Danilov
  2008-12-23 11:31                     ` Alex Zhuravlev
  0 siblings, 1 reply; 25+ messages in thread
From: Nikita Danilov @ 2008-12-23 11:06 UTC (permalink / raw)
  To: lustre-devel

Alex Zhuravlev writes:
 > Nikita Danilov wrote:
 > > If we have no more than 1 reintegration in a given epoch on a given
 > > client, then the server that received an OP = (U(0), ..., U(N)) in epoch
 > > E from a client can send to SC a message telling it that this client
 > > has N volatile updates in epoch E, and whenever some server commits
 > > one of the U's it sends to SC a message asking it to decrease a counter for
 > > this client. The most obvious implementation will batch these notifications,
 > > i.e., when a server commits a transaction group it notifies SC about all
 > > changes in one message. I personally don't think that is the best
 > > approach.
 > 
 > essentially this is very similar to dependency-based recovery, but with
 > none of its advantages, and with SC tracking all states and being a single point
 > of failure. I think we need a more scalable solution.

We are talking about a few megabytes of data on the network or in memory. It's
easy to replicate this state.

 > 
 > > Yes, and this mechanism (if it is correct at all) will guarantee that an
 > > epoch cannot depend on a future epoch.
 > 
 > again, it's not about dependency, it's about the network overhead of global epochs.

Again, global epochs do not depend on DLM to propagate epochs. E.g.,
lockless IO can be implemented without any additional RPCs.

 > 
 > >  > just to list my observations about global epochs:
 > >  >   * it's a problem to implement synchronous operations
 > >  >   * network overhead even with local-only changes, depending on workload
 > >  >   * disk overhead even with local-only changes
 > >  >   * SC is a single point of failure with any topology, as it's the only place where
 > >  >     the final minimum is found
 > >  >   * tree reduction isn't an obvious thing, because the client can't report its minimum
 > >  >     to just any node; instead the tree is a rather static thing and any change should be
 > >  >     made very carefully, otherwise it's very easy to lose the minimum
 > > 
 > > Unfortunately, as far as I know, no other solution was described with a
 > > level of detail sufficient for comparison. :-)
 > 
 > I could say the same about tree reduction, for example ;)

Tree reduction is but an optimization. I am pretty convinced that the core
algorithm works, because this can be proved.

 > 
 > dependency-based recovery was discussed in much detail, I think.

Let's see...

>   * when a client issues a transaction it labels it with a unique id
>   * a server executing an operation atomically writes an undo record with:
>     * VBR versions, so that we can build chains of really dependent operations
>     * the unique transaction id generated by the client
>     * the number of servers involved in the transaction
>   * periodically servers exchange their committed unique transaction ids
>     (only distributed transactions are involved in this)
>   * once some distributed transaction is committed on all involved servers, we can prune
>     it and all its local successors

Either I am misunderstanding this, or this is not correct, because not
only a given operation, but also all operations it depends on have to be
committed, and it is not clear how this is determined.

One reason I wrote so lengthy a text was that I wanted to spell out
everything explicitly and unambiguously (and obviously failed in the
latter, as the ensuing discussion has shown).

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23 11:06                   ` Nikita Danilov
@ 2008-12-23 11:31                     ` Alex Zhuravlev
  2008-12-23 12:50                       ` Nikita Danilov
  0 siblings, 1 reply; 25+ messages in thread
From: Alex Zhuravlev @ 2008-12-23 11:31 UTC (permalink / raw)
  To: lustre-devel

Nikita Danilov wrote:
> We are talking about a few megabytes of data on the network or in memory. It's
> easy to replicate this state.

I disagree - the whole state can be distributed over 100K and more nodes, and
some operations may need all nodes to communicate their state. this is
especially a problem with a lossy network.

> Again, global epochs do not depend on DLM to propagate epochs. E.g.,
> lockless IO can be implemented without any additional RPCs.

sorry, I said nothing about DLM. I said "additional RPC", which is required
in some cases. a ping, for example, can issue an RPC once per 60s. moreover,
pinging can also use a tree or some different topology, making epoch refresh more
complex.

> Tree reduction is but an optimization. I am pretty convinced that the core
> algorithm works, because this can be proved.

sorry, "works" doesn't always mean "meets requirements". in our case scalability
is the top one. in this regard I don't see how this model can work well with
synchronous operations. at the same time it was stated that we have to support
such operations well, e.g. for NFS exports. I also tried to point out a
few overheads in the algorithm.

>>   * once some distributed transaction is committed on all involved servers, we can prune
>>     it and all its local successors
> 
> Either I am misunderstanding this, or this is not correct, because not
> only a given operation, but also all operations it depends on have to be
> committed, and it is not clear how this is determined.

the algorithm works starting from the oldest operations and discards an operation
when there is no undo record before it.

> One reason I wrote so lengthy a text was that I want to spell out
> everything explicitly and unambiguously (and obviously failed in the
> latter, as ensued discussion has shown).

yes, it's a well-written and proven thing. the point is different - if it's clear that
in some cases it doesn't work well (see the sync requirement), what good does the proof do?

thanks, Alex


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23 11:31                     ` Alex Zhuravlev
@ 2008-12-23 12:50                       ` Nikita Danilov
  2008-12-23 13:11                         ` Alex Zhuravlev
  2008-12-24 10:32                         ` Alex Zhuravlev
  0 siblings, 2 replies; 25+ messages in thread
From: Nikita Danilov @ 2008-12-23 12:50 UTC (permalink / raw)
  To: lustre-devel

Alex Zhuravlev writes:
 > Nikita Danilov wrote:
 > > We are talking about a few megabytes of data on the network or in memory. It's
 > > easy to replicate this state.
 > 
 > I disagree - the whole state can be distributed over 100K and more nodes, and
 > some operations may need all nodes to communicate their state. this is
 > especially a problem with a lossy network.

The question was about SC being the single point of failure. This can be
eliminated by replicating stability messages to a few nodes.

 > 
 > > Tree reduction is but an optimization. I am pretty convinced that core
 > > algorithm works, because this can be proved.
 > 
 > sorry, "works" doesn't always mean "meets requirements". in our case scalability
 > is the top one. in this regard I don't see how this model can work well with

But "works" always means at least "meet requirements". There is no such
thing as efficient (or scalable), but incorrect program. Ordinary Lustre
recovery was implemented years ago and it is still has problems. I bet
it looked very easy in the beginning, so it was tempting to optimize it.

 > >>   * once some distributed transaction is committed on all involved servers, we can prune
 > >>     it and all its local successors
 > > 
 > > Either I am misunderstanding this, or this is not correct, because not
 > > only a given operation, but also all operations it depends on have to be
 > > committed, and it is not clear how this is determined.
 > 
 > the algorithm works starting from the oldest operations and discards an operation
 > when there is no undo record before it.

So let's suppose we have four servers and three operations:

     S0   S1   S2   S3
OP0  U1   U2
OP1       U3   U4
OP2            U5   U6

Where `U?' means that a given operation sent an update to a given
server, and all updates happen to be conflicting.

Suppose that transaction groups with these updates commit at the same
time and servers are ready to send information to each other. What
information each server sends and where?

 > 
 > > One reason I wrote so lengthy a text was that I want to spell out
 > > everything explicitly and unambiguously (and obviously failed in the
 > > latter, as ensued discussion has shown).
 > 
 > yes, it's well written and proven thing. the point is different - if it's clear that
 > in some cases it doesn't work well (see sync requirement), what the proof does?

It assures you that it _works_. Maybe sub-optimally, but it does. A
program that is lightning fast, consumes zero memory and scales across
the galaxy is useless if it is incorrect.

 > 
 > thanks, Alex

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23 12:50                       ` Nikita Danilov
@ 2008-12-23 13:11                         ` Alex Zhuravlev
  2008-12-23 13:24                           ` Nikita Danilov
  2008-12-24 10:32                         ` Alex Zhuravlev
  1 sibling, 1 reply; 25+ messages in thread
From: Alex Zhuravlev @ 2008-12-23 13:11 UTC (permalink / raw)
  To: lustre-devel

Nikita Danilov wrote:
> The question was about SC being the single point of failure. This can be
> eliminated by replicating stability messages to a few nodes.

more complexity to work around the initial problem?

> But "works" always means at least "meet requirements". There is no such
> thing as efficient (or scalable), but incorrect program. Ordinary Lustre
> recovery was implemented years ago and it is still has problems. I bet
> it looked very easy in the beginning, so it was tempting to optimize it.

then we can just proceed with synchronous IO if scalability isn't a requirement.
and the BKL is much better because of its simplicity.

> So let's suppose we have four servers and three operations:
> 
>      S0   S1   S2   S3
> OP0  U1   U2
> OP1       U3   U4
> OP2            U5   U6
> 
> Where `U?' means that a given operation sent an update to a given
> server, and all updates happen to be conflicting.
> 
> Suppose that transaction groups with these updates commit at the same
> time and servers are ready to send information to each other. What
> information each server sends and where?

I'll prepare a detailed description in a separate mail.

>  > yes, it's a well-written and proven thing. the point is different - if it's clear that
>  > in some cases it doesn't work well (see the sync requirement), what good does the proof do?
> 
> It assures you that it _works_. Maybe sub-optimally, but it does. A
> program that is lightning fast, consumes zero memory and scales across
> the galaxy is useless if it is incorrect.

interesting point. it sounds like it's absolutely impossible to prove (somehow)
another approach. having something "proved" doesn't mean we shouldn't try
another approach to avoid sub-optimal but important cases, does it?


thanks, Alex


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23 13:11                         ` Alex Zhuravlev
@ 2008-12-23 13:24                           ` Nikita Danilov
  0 siblings, 0 replies; 25+ messages in thread
From: Nikita Danilov @ 2008-12-23 13:24 UTC (permalink / raw)
  To: lustre-devel

Alex Zhuravlev writes:
 > Nikita Danilov wrote:
 > > The question was about SC being the single point of failure. This can be
 > > eliminated by replicating stability messages to a few nodes.
 > 
 > more complexity to work around the initial problem?

More optional optimizations that are easy to implement later should they
prove necessary.

 > 
 > > But "works" always means at least "meet requirements". There is no such
 > > thing as efficient (or scalable), but incorrect program. Ordinary Lustre
 > > recovery was implemented years ago and it is still has problems. I bet
 > > it looked very easy in the beginning, so it was tempting to optimize it.
 > 
 > then we can just proceed with synchronous IO if scalability isn't a requirement.
 > and the BKL is much better because of its simplicity.

Precisely. If Linus had decided to do the initial Linux SMP implementation
based on fine-grained locking, the Linux kernel would have been
like... some other Free Beautifully Scalable kernel with a Daemon (slow,
un-scalable, and buggy). :-)

 > > Suppose that transaction groups with these updates commit at the same
 > > time and servers are ready to send information to each other. What
 > > information each server sends and where?
 > 
 > I'll prepare a detailed description in a separate mail.

Thanks.

 > 
 > >  > yes, it's a well-written and proven thing. the point is different - if it's clear that
 > >  > in some cases it doesn't work well (see the sync requirement), what good does the proof do?
 > > 
 > > It assures you that it _works_. Maybe sub-optimally, but it does. A
 > > program that is lightning fast, consumes zero memory and scales across
 > > the galaxy is useless if it is incorrect.
 > 
 > interesting point. it sounds like it's absolutely impossible to prove (somehow)
 > another approach. having something "proved" doesn't mean we shouldn't try
 > another approach to avoid sub-optimal but important cases, does it?

We definitely should try, but I think a much more formal and rigorous
treatment than we are accustomed to is necessary for such a fundamental
thing as recovery.

 > 
 > 
 > thanks, Alex

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-22 18:57           ` Nikita Danilov
  2008-12-23  6:44             ` Alex Zhuravlev
@ 2008-12-23 23:37             ` Andreas Dilger
  2008-12-24 12:35               ` Eric Barton
  2008-12-24 16:16               ` Nikita Danilov
  1 sibling, 2 replies; 25+ messages in thread
From: Andreas Dilger @ 2008-12-23 23:37 UTC (permalink / raw)
  To: lustre-devel

Nikita,
I still need more time to re-read and digest what you have written,
but thanks in advance for taking the time to explain it clearly and
precisely.  This algorithm does seem to be related to the one originally
described in Peter's "Cluster Metadata Recovery" paper, where the epoch
numbers are pushed and replied with every request, but it is much better
described.


I think what would help me understand it a bit more easily is if it could be more
closely mapped onto a potential implementation, and the issues we may see
there.  For example, the issue with fsync possibly involving all? nodes
(including clients) is not obvious from your description.

Similarly, some description of the practical requirements for message
exchange, how easy/hard it would be to e.g. "find all undo records
related to...", and the practical bound on the number of operations that
might have to be kept in memory and/or rolled back/forward during
recovery would be useful.

In particular, the mention that clients need to participate to determine
the oldest uncommitted operation seems troublesome unless the servers
themselves can place a bound on this by the frequency of their commits.


On Dec 22, 2008  21:57 +0300, Nikita Danilov wrote:
> Any message is used as a transport for epochs, including any reply
> from a server. So a typical scenario would be
> 
> 
> client                server
>    epoch = 8            epoch = 9
> 
>    LOCK --------------->   
>         <-------------- REPLY
>    epoch = 9
>                         <----- other message with epoch = 10 from somewhere
>                         epoch = 10
>    ....
> 
>    REINT --------------->
>          <-------------- REPLY
>    epoch = 10
> 
>                         <----- other message with epoch = 11 from somewhere
>                         epoch = 11
> 
>    REINT --------------->
>          <-------------- REPLY
>    epoch = 11
> 
> etc. Note that nothing prevents the server from increasing its local epoch
> before replying to every reintegration (this was mentioned in the
> original document as an "extreme case"). With this policy there is never
> more than one reintegration on a given client in a given epoch, and we
> can indeed implement the stability algorithm without clients.

I was wondering if we could make some analogies between the current
transno-based recovery system and your current proposal.  For example,
in our current recovery we increment the transno on the server before
the reply for every reintegration, and due to single-RPC-in-flight to
the client it could be considered in a separate "epoch" for every RPC
to match your "extreme case" above.

Similarly, I wonder if we could somehow map client (lack of) involvement
in epochs to our current configuration, and only require "client"
participation in the case of WBC or CMD?


One thing that crossed my mind at this point is that the 1.8 servers already
track recovery "epochs" for VBR using the transno (epoch is in high 32-bit
word of transno, counter is in low 32-bit word).  These "recovery epochs"
are not (currently) synchronized between servers, but that would seem to be
possible/needed in the future.

Alternatively, we might consider the VBR recovery "epochs" to be the same
as the epochs you are proposing, and the transno increment does not affect
these epochs except to order operations within the epoch.  We would
increment these epochs periodically (either due to too many operations,
or a time limit).

The current VBR epochs only make up 32 bits of the transno, but we might
consider increasing the size of this epoch field to allow more epochs.
If we need to do that it should preferably be done ASAP before the 1.8.0
release is made (this would be a trivial change at this stage).
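
Just to spell the packing out, a trivial sketch of the 1.8 layout
described above:

    def pack_transno(epoch, counter):
        # 1.8 VBR layout: epoch in the high 32-bit word of the 64-bit
        # transno, counter in the low word
        assert 0 <= epoch < 1 << 32 and 0 <= counter < 1 << 32
        return (epoch << 32) | counter

    def unpack_transno(transno):
        return transno >> 32, transno & 0xffffffff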


Cheers, Andreas
--
Andreas Dilger
Sr. Staff Engineer, Lustre Group
Sun Microsystems of Canada, Inc.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23 12:50                       ` Nikita Danilov
  2008-12-23 13:11                         ` Alex Zhuravlev
@ 2008-12-24 10:32                         ` Alex Zhuravlev
  2008-12-24 11:37                           ` Nikita Danilov
  1 sibling, 1 reply; 25+ messages in thread
From: Alex Zhuravlev @ 2008-12-24 10:32 UTC (permalink / raw)
  To: lustre-devel

Hello,

Nikita Danilov wrote:
> So let's suppose we have four servers and three operations:
> 
>      S0   S1   S2   S3
> OP0  U1   U2
> OP1       U3   U4
> OP2            U5   U6
> 
> Where `U?' means that a given operation sent an update to a given
> server, and all updates happen to be conflicting.
> 
> Suppose that transaction groups with these updates commit at the same
> time and servers are ready to send information to each other. What
> information each server sends and where?

instead of digging right into details, let's agree on a few simple statements
the idea is based on?


(0) an operation is globally committed if no operation it depends on can be aborted

(1) some external mechanism orders operations and updates (e.g. LDLM, local locking, etc)

(2) if update U1 is executed before update U2 and U2 is committed, then U1 must be committed

(3) requirement: if operation O2 depends on operation O1, then O1 has a conflicting
     update on the same server as O2

     example 1: mkdir /a; touch /a/b
      mkdir consists of two updates: U1 creates the object on mds1, U2 creates the dir
      entry on mds2. touch consists of a single update: U3 creates the object and the
      directory entry in /a, both on mds1. U1 and U3 will be conflicting as they touch the
      same object

(4) an operation is globally committed if all updates this operation consists of are
     committed and everything it depends on is committed as well

     explanation: say, operation O consists of two updates U1 (server S1) and U2
     (server S2). let's say U1 depends on Ua on server S1 and U2 depends on Ub on
     server S2. we stated that any updates O can depend on are already executed due
     to (1). thus Ua is already executed and Ub is already executed as well. due to
     (2), a commit of U1 means a commit of Ua, and a commit of U2 means a commit of Ub.

     thus the direct dependency is resolved.

     if there is any indirect dependency, it's resolved the same way due to (4)


In the example above, a commit of U5 implies a commit of U4, and the same for U3 and U2. IOW,
when U3 and U4 are committed, we can consider OP1 globally committed
(it won't be aborted).

any objections?


thanks, Alex


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-24 10:32                         ` Alex Zhuravlev
@ 2008-12-24 11:37                           ` Nikita Danilov
  2008-12-26  9:01                             ` Alex Zhuravlev
  0 siblings, 1 reply; 25+ messages in thread
From: Nikita Danilov @ 2008-12-24 11:37 UTC (permalink / raw)
  To: lustre-devel

Alex Zhuravlev writes:
 > Hello,
 > 
 > Nikita Danilov wrote:
 > > So let's suppose we have four servers and three operations:
 > > 
 > >      S0   S1   S2   S3
 > > OP0  U1   U2
 > > OP1       U3   U4
 > > OP2            U5   U6
 > > 
 > > Where `U?' means that a given operation sent an update to a given
 > > server, and all updates happen to be conflicting.
 > > 
 > > Suppose that transaction groups with these updates commit at the same
 > > time and servers are ready to send information to each other. What
 > > information each server sends and where?
 > 
 > instead of digging right into details, let's agree on a few simple statements
 > the idea is based on?
 > 
 > 
 > (0) an operation is globally committed if no operation it depends on can be aborted

... and all updates of the operation itself are committed on the
respective servers.

 > 
 > (1) some external mechanism orders operations and updates (e.g. LDLM, local locking, etc)

Agree.

 > 
 > (2) if update U1 is executed before update U2 and U2 is committed, then U1 must be committed

I think this is only valid when U1 and U2 are on the same server. And
even in this case this is probably required only when U1 and U2 are
conflicting.

 > 
 > (3) requirement: if operation O2 depends on operation O1, then O1 has a conflicting
 >      update on the same server as O2

Agree, provided that `depends' means `directly depends', i.e., not
through some intermediate operation.

 > 
 >      example 1: mkdir /a; touch /a/b
 >       mkdir consists of two updates: U1 creates the object on mds1, U2 creates the dir
 >       entry on mds2. touch consists of a single update: U3 creates the object and the
 >       directory entry in /a, both on mds1. U1 and U3 will be conflicting as they touch the
 >       same object
 > 
 > (4) an operation is globally committed if all updates this operation consists of are
 >      committed and everything it depends on is committed as well

I think this is wrong. Everything it depends on must be _globally_
(recursively) committed as well. Otherwise in the following scenario

        mkdir /a
        mkdir /a/b
        touch /a/b/f

file creation depends on mkdir /a/b only, but the touch is not globally
committed when all updates of mkdir /a/b are committed, because mkdir /a
might still be rolled back.

As a note, I tried very hard to avoid confusion by using different
terms: operations (distributed state updates) vs. transactions (groups
of updates on a given server that reach persistent storage
atomically), and `stabilizes' vs. `commits' respectively.

 > 
 >      explanation: say, operation O consists of two updates U1 (server S1) and U2
 >      (server S2). let's say U1 depends on Ua on server S1 and U2 depends on Ub on
 >      server S2. we stated that any updates O can depend on are already executed due
 >      to (1). thus Ua is already executed and Ub is already executed as well. due to
 >      (2), a commit of U1 means a commit of Ua, and a commit of U2 means a commit of Ub.
 > 
 >      thus the direct dependency is resolved.
 > 
 >      if there is any indirect dependency, it's resolved the same way due to (4)
 > 
 > 
 > In the example above, a commit of U5 implies a commit of U4, and the same for U3 and U2. IOW,
 > when U3 and U4 are committed, we can consider OP1 globally committed
 > (it won't be aborted).

Err.. what if U3 and U4 are committed on S1 and S2, but S0 hasn't
received U1 at all (e.g., U1 is an inode creation that was executed
without a lock and the client failed), or U1 was executed, but not committed,
and S0 failed? It seems that OP0 will have to be rolled back, and hence
OP1 and OP2 cannot be considered globally committed^W^Weverywhere
stable?

 > 
 > any objections?

I was more interested in how batching is implemented and, specifically,
at what moment a server can actually remove an entry from an undo log
(i.e., before or after it sends a batch, etc.), because it looks to me
that server agreement on which operations are everywhere stable requires,
in the general case, a two-phase commit, or some other atomic commitment
protocol.

 > 
 > 
 > thanks, Alex

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23 23:37             ` Andreas Dilger
@ 2008-12-24 12:35               ` Eric Barton
  2008-12-24 16:16               ` Nikita Danilov
  1 sibling, 0 replies; 25+ messages in thread
From: Eric Barton @ 2008-12-24 12:35 UTC (permalink / raw)
  To: lustre-devel

Andreas, Nikita, Alex,

We will go through this in detail at the tech leads meeting in Beijing.

I think I am beginning to understand Nikita's proposal and I think it helps
to adopt his use of "operation" (rename, mkdir etc) and "update" (the part
of an operation executed on a single server).

I believe it would be especially useful if we could finish working through
the previous proposal too - then we would start to understand the
similarities and differences, and that in turn would allow us to make
better critical judgments overall - e.g. what is the volume and pattern
of additional message passing required for distributed operations, what
are the expected sizes of undo/redo logs, how does aggregation designed
to mitigate these issues affect latency, etc.

A major concern I have with whatever scheme we finally adopt is how to
ensure the performance of synchronous metadata operations (as required by
NFS) isn't completely hosed.  With CMD, you can only be sure an operation
is stored stably when it can no longer be undone - i.e. when it and all
operations it is transitively dependent on have been committed globally.
Making this fast seems to be in direct opposition to scaling throughput,
so understanding the tradeoff precisely seems essential.

    Cheers,
              Eric


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-23 23:37             ` Andreas Dilger
  2008-12-24 12:35               ` Eric Barton
@ 2008-12-24 16:16               ` Nikita Danilov
  1 sibling, 0 replies; 25+ messages in thread
From: Nikita Danilov @ 2008-12-24 16:16 UTC (permalink / raw)
  To: lustre-devel

Andreas Dilger writes:
 > Nikita,

Hello,

 > I still need more time to re-read and digest what you have written,
 > but thanks in advance for taking the time to explain it clearly and
 > precisely.  This algorithm does seem to be related to the one originally
 > described in Peter's "Cluster Metadata Recovery" paper, where the epoch
 > numbers are pushed and replied with every request, but it is much better
 > described.

thanks.

 > I think what would help me understand it a bit more easily is if it could be more
 > closely mapped onto a potential implementation, and the issues we may see
 > there.  For example, the issue with fsync possibly involving all? nodes
 > (including clients) is not obvious from your description.

I agree with Eric that we can discuss this in more detail in Beijing,
and just want to make one rather obvious remark: your and Alex's
concerns about fsync are indeed justified, because in the global epochs
model fsync is no different from sync, as no per-object dependencies are
tracked. On the other hand, for the target use case of pNFS, where every
operation is synchronous, this is probably less important.

Nikita.


* [Lustre-devel] global epochs [an alternative proposal, long and dry].
  2008-12-24 11:37                           ` Nikita Danilov
@ 2008-12-26  9:01                             ` Alex Zhuravlev
  0 siblings, 0 replies; 25+ messages in thread
From: Alex Zhuravlev @ 2008-12-26  9:01 UTC (permalink / raw)
  To: lustre-devel

Nikita Danilov wrote:
> ... and all updates of the operation itself are committed on the
> respective servers.

yes

>  > (2) if update U1 is executed before update U2 and U2 is committed, then U1 must be committed
> 
> I think this is only valid when U1 and U2 are on the same server. And
> even in this case this is probably required only when U1 and U2 are
> conflicting.

agree about the same server. i think this is the model used by ext3 and the DMU.

>  > (3) requirement: if operation O2 depends on operation O1, then O1 has a conflicting
>  >      update on the same server as O2
> 
> Agree, provided that `depends' means `directly depends', i.e., not
> through some intermediate operation.

yes

>  > (4) an operation is globally committed if all updates this operation consists of are
>  >      committed and everything it depends on is committed as well
> 
> I think this is wrong. Everything it depends on must be _globally_
> (recursively) committed as well. Otherwise in the following scenario

OK
additional clarification: "all updates this operation consists of are globally committed"

> As a note, I tried very hard to avoid confusion by using different
> terms: operations (distributed state updates) vs. transactions (groups
> of updates on a given server that reach persistent storage
> atomically), and `stabilizes' vs. `commits' respectively.
> 

I like the terms.

> Err.. what if U3 and U4 are committed on S1 and S2, but S0 hasn't
> received U1 at all (e.g., U1 is an inode creation that was executed
> without a lock and the client failed), or U1 was executed, but not committed,
> and S0 failed? It seems that OP0 will have to be rolled back, and hence
> OP1 and OP2 cannot be considered globally committed^W^Weverywhere
> stable?

with the fixed definition i think it's correct.

> I was more interested in how batching is implemented and, specifically,
> at what moment a server can actually remove an entry from an undo log
> (i.e., before or after it sends a batch, etc.), because it looks to me
> that server agreement on which operations are everywhere stable requires,
> in the general case, a two-phase commit, or some other atomic commitment
> protocol.

then a few more words.

I think the following statement is still true: when any operation is being
executed (updates are being executed on the target servers), all updates it
depends on are already executed. let's fix the servers' states at the time our updates
begin to execute: S1 is the state on server 1, S2 is the state on server 2, ...,
Sn is the state on server N. due to (2), once all states S1..Sn are committed,
all dependencies our updates might have are resolved and they can't be aborted
due to the abort of some previous operation.

in practice this means that, having a series of updates on some server:
U1, U2, U3, U4, ..., Un, Un+1, we can choose some N, ask all servers for their
last generated transno (not last committed transno) and assign that set of transnos
to point N. once all servers have reported the corresponding transnos committed,
we know that all dependencies updates U1..Un might have are resolved and U1..Un
can't be aborted.  (5)
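
in pseudo-code, (5) might look like this (just a sketch, all names are made up):

    def snapshot_point(servers):
        # ask every server for its last *generated* (not last committed)
        # transno and assign that set of transnos to point N
        return {s.sid: s.last_generated_transno() for s in servers}

    def point_resolved(servers, snapshot):
        # once every server has committed up to its snapshotted transno,
        # all dependencies updates U1..Un might have are resolved, so
        # U1..Un can't be aborted any more
        return all(s.last_committed_transno() >= snapshot[s.sid]
                   for s in servers)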

of course, this is true only for operations with number of updates = 1 (iirc,
we call them local operations, in contrast with distributed ones, where the number of
updates is > 1). for distributed operations we also need to make sure all updates
are committed.

when some server commits an update and the corresponding operation has 2 or more updates,
the server reports this to the other servers involved in the operation. in practice,
the server doesn't report immediately; instead it puts the transaction id into some batch
(or batches) which will be fired later.  (6)

now back to the series of updates on a server: U1, U2, U3, U4, ..., Un, Un+1. in general,
each update has its own undo record in the log. the record for any local update at the
beginning of the series can be cancelled once the corresponding update is locally
committed. the record for any distributed operation's update can be removed from the
series, so that it doesn't hold up the remaining records, but not cancelled.

In order to cancel the undo record for a distributed operation we need to make sure
that during recovery none of the undo records of this operation can be used; otherwise
recovery can be confused by finding the record on one server, but not on another one.

this can be done with an llog-like protocol: for any distributed operation, the server
with the minimal id cancels its own undo record and generates another record M marking
the operation globally committed. then the server notifies the other servers involved in the
operation, they cancel their own undo records, and once the cancels are committed, record M
can be cancelled.  (7)
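
in pseudo-code the cancel sequence of (7) might look like this (a sketch
only, all names are made up):

    def cancel_distributed_undo(op, servers):
        # (7): the server with the minimal id trades its undo record for a
        # "globally committed" marker M, the rest cancel theirs, and M goes
        # away once all those cancels are on disk
        leader = min(servers, key=lambda s: s.sid)
        marker = leader.replace_undo_with_marker(op)      # record M
        for s in servers:
            if s is not leader:
                s.cancel_undo(op)
        for s in servers:
            if s is not leader:
                s.wait_cancel_committed(op)
        leader.cancel_marker(marker)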


now let's consider that example:

      S0   S1   S2   S3
OP0  U1   U2
OP1       U3   U4
OP2            U5   U6

let's redraw it a bit ....

undo series of S0:  U1
undo series of S1:  U2  U3
undo series of S2:      U4  U5
undo series of S3:          U6


S0 reports committedness of U1 in transno T01 (OP0) to S1; now S1 knows U2 depends on S0/T01
S1 reports committedness of U2 in transno T11 (OP0) to S0; now S0 knows U1 depends on S1/T11
S1 reports committedness of U3 in transno T12 (OP1) to S2; now S2 knows U4 depends on S1/T12
S2 reports committedness of U4 in transno T21 (OP1) to S1; now S1 knows U3 depends on S2/T21
S2 reports committedness of U5 in transno T22 (OP2) to S3; now S3 knows U6 depends on S2/T22
S3 reports committedness of U6 in transno T31 (OP2) to S2; now S2 knows U5 depends on S3/T31

now each server knows its direct dependencies. then they all have to resolve the global dependency:
S0 requests the current state from S1,S2,S3 - they return their last generated transno
S1 requests the current state from S0,S2,S3 --//--
S2 requests the current state from S0,S1,S3 --//--
S3 requests the current state from S0,S1,S2 --//--

at some point all servers report the collected transnos committed. given that all updates belong to
distributed transactions, the servers can remove them from the series so that they don't hold a
dependency for anyone, but not cancel them. as noted in (7) we can use an llog-like protocol to
cancel undo records for distributed operations. as they don't block any operation we can
postpone the cancel for a very long time to improve bandwidth usage.
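
in pseudo-code, the bookkeeping each server does here might look like this
(a sketch, all names are made up):

    class ServerState:
        def __init__(self, sid):
            self.sid = sid
            self.deps = {}   # local update -> (peer sid, peer transno)

        def on_commit_report(self, update, peer_sid, peer_transno):
            # the peer committed its half of the operation in peer_transno;
            # our update now waits for the peer to commit up to that transno
            self.deps[update] = (peer_sid, peer_transno)

        def removable_updates(self, last_committed):
            # last_committed: sid -> last committed transno, as collected
            # from the "request current state" round (or piggy-backed);
            # these updates can be removed from the series (the undo
            # records themselves are cancelled later, via (7))
            return [u for u, (sid, transno) in self.deps.items()
                    if last_committed.get(sid, 0) >= transno]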


I think this *oversimplified* approach demonstrates that we can do "stabilization" with
anywhere-generated-id operations.

messages reporting committedness can be batched. we can even use a bi-directional protocol
where S0, reporting committedness of U1 to S1, gets a reply claiming committedness of U2 back.

any message can carry the "last generated transno" along with "last committed", making
the "request current state" message unnecessary.

One of the important advantages of such an approach is the ability to implement fsync(2)
in a more optimal way, without involving the whole cluster.

The simplest optimization could be to omit the requests for other servers' states
(see (5)) and the undo records for all local operations while the undo log is
empty. so, as long as a server doesn't execute global operations, all local
operations are executed with zero "epoch overhead", like today.

A more advanced approach could include propagation of the involved servers when they
exchange committedness of distributed operations (see (6)). Say, if server S1
has no other distributed operations (and thus doesn't depend on other servers), then,
when reporting the commit of update U1 (part of operation O1) to server S2, it reports
a dependency on S1,S2. when S2 reports committedness of some other operation
O2 to server S3, it reports a dependency on S1,S2. now, when S3 resolves the global
dependency (see (5)), it doesn't request state from all the servers, but only
from S1 and S2. We can go even further and include the last generated transno along
with the server in the report. Then the other servers don't even need to request states,
just wait till those servers have those transnos committed.

an even more advanced approach could be to track precise dependencies for any operation.
this is not very useful for ldiskfs, as fsync(2) flushes all pending updates, but
with the DMU we could use zlog and flush only the really required bits.


thanks, Alex


* [Lustre-devel] global epochs vs fsync
  2008-12-22  7:53 [Lustre-devel] global epochs [an alternative proposal, long and dry] Nikita Danilov
  2008-12-22 11:52 ` Alex Zhuravlev
@ 2009-01-15 23:40 ` Alex Zhuravlev
  1 sibling, 0 replies; 25+ messages in thread
From: Alex Zhuravlev @ 2009-01-15 23:40 UTC (permalink / raw)
  To: lustre-devel

Hello,

here is another thought ...

the whole distributed recovery can be divided into two parts:
* the "purge" job: before recovery takes place, we write and purge undo records
* the "undo" job: when recovery takes place, we roll back to some consistent state

global epochs do the "purge" job very well, because of the constant overhead.
but when it comes to fsync, global epochs do not, because to fsync some epoch
X we need to wait till all nodes having unstable epochs Y < X report them
to SC, and then we have to synchronously write the new global stable epoch.
it's especially not very good from a reliability point of view - having many
nodes contribute makes this vulnerable to failures.

with dependencies we could implement a more efficient fsync, because all you need
is to sync the _servers_ holding uncommitted updates for some object and the updates it
depends on; we don't need to wait for other nodes, then write some record. IOW,
dependencies do the "undo" job well, because it's not a global undo of all unlucky
operations, but only of the really inconsistent ones. but dependencies do badly with the
"purge" job, because the traffic overhead (in bytes) is on the order of the number of
distributed updates.

the question is... could we use global epochs for "purge" and dependencies for
"undo"? say, updates are tagged with an epoch and a unique tag. for regular activity
the global minimum is found in a lazy manner, but during recovery we build chains of
dependencies using the unique tags and do a very selective rollback.
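
a sketch of that hybrid (all names are made up): every update carries
(epoch, uid); the purge runs off the lazily found global minimum, while
recovery walks the uid chains and rolls back selectively:

    class UndoLog:
        def __init__(self):
            self.records = []   # (epoch, uid, depends_on_uids, undo_fn)

        def add(self, epoch, uid, depends_on, undo_fn):
            self.records.append((epoch, uid, set(depends_on), undo_fn))

        def purge(self, stable_epoch):
            # "purge" job: the lazily found global minimum lets us drop
            # all records in stable epochs at constant overhead
            self.records = [r for r in self.records if r[0] > stable_epoch]

        def rollback(self, lost_uids):
            # "undo" job: close the dependency chains over uids, then undo
            # only the really inconsistent operations, newest first
            lost = set(lost_uids)
            changed = True
            while changed:
                changed = False
                for _, uid, deps, _ in self.records:
                    if uid not in lost and lost & deps:
                        lost.add(uid)
                        changed = True
            for _, uid, _, undo_fn in reversed(self.records):
                if uid in lost:
                    undo_fn()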

thanks, Alex


Nikita Danilov wrote:
> Hello,
> 
> a few proposals for a distributed recovery for the upcoming CMD release
> of Lustre were discussed recently. In my opinion, many of them
> (including the clients-epoch approach that I advocated) are very
> complex, and a simpler solution, that can be fully understood is
> needed. The following is an attempt to provide such a solution.
> 
> Nikita.
> 
> 								 * * *
> 
> This is a strawman proposal. At least it would help us to settle the
> terminology.
> 
> The following describes an alternative distributed recovery mechanism. As this
> proposal is somewhat radically alternative, exposition is rather abstract,
> please bear with it.
> 
> The summary is that the original `global epochs' [10] proposal is modified to
> involve all cluster nodes, including clients. This seeks to fix what is seen
> as a major drawback of the said algorithm: its reliance on a master-slave
> processing.
> 
> Definitions:
> ============
> 
> A _cluster_ consists of _nodes_. Every node has a volatile storage. Some nodes
> have persistent storage. Persistent means `surviving any failure considered in
> the model'.
> 
> Nodes exchange _messages_. A message X with a parameter Y, sent from a node N
> to a node M is denoted as
> 
>         X(Y) : N -> M
> 
> Synchronous message send is denoted as
> 
>         X(Y) : N ->> M
> 
> It is, in reality, a sequence
> 
>         X(Y) : N -> M
>         RepX : M -> N
> 
> of a send and a reply.
> 
> Nodes _join_ the cluster, and _part_ from the cluster. A node can be forcibly
> parted from the cluster---_evicted_.
> 
> An _operation_ is a `primitive' distributed modification of state, that moves
> distributed state from one consistent state to another consistent
> state. `Primitive' because without such a qualification a valid sequence of
> operations would be an operation itself.
> 
> An operation OP consists of _updates_ of a state of every node involved in
> this operation: OP = (U(0), ... U(n)), where U(k) is an update for a node
> U(k).node.
> 
> A _reintegration_ of an operation is a process by which a node (by sending
> messages) requests other nodes to _execute_ updates of a given operation,
> i.e., to effect corresponding state change in the node storage (volatile or
> persistent). Details of reintegration are described below.
> 
> A node with a persistent storage supports _transactions_, which are means to
> declare that a sequence of updates, executed in a volatile storage, must reach
> persistent storage atomically.
> 
> Two updates are _conflicting_ if their results (including success or failure
> indication) and the final state are depending on the order of their
> execution.
> 
> For a given update U, a node N can send a message to U.node, requesting a
> _lock_ that will delay requests for locks for conflicting updates requested
> from other nodes until the lock is either released by another message or when
> N leaves the cluster. (In reality locks are taken on objects, but introducing
> them would complicate the exposition.)
> 
> Epoch Basics:
> =============
> 
> The core epochs algorithm is very simple.
> 
> Every node N keeps in its volatile storage an _epoch number_, denoted
> N.epoch. Every message X is tagged with an epoch number that is denoted as
> X.epoch. These epoch numbers are maintained according to the following
> protocol:
> 
>         E1. On receiving X : M -> N, N sets
> 
>                 N.epoch = max(N.epoch, X.epoch);
> 
>         E2. On sending X : N -> M, N sets
> 
>                 X.epoch = N.epoch;
> 
> Assignments in E1 and E2 must be mutually atomic. Compare this with `Lamport
> timestamps' [1] and `vector clocks' [2].
> 
> Progressing toward new epochs will be described later, for now assume that
> there are multiple epoch numbers at the same time stored in the node memories
> and traversing the network in messages.
> 
> Operations:
> ===========
> 
>         O1. To reintegrate an operation OP = (U(0), ... U(n)), a node N
> 
>                 - sends lock requests: LOCK(U(k)) : N ->> U(k).node;
> 
>                 - sends reintegration messages: REINT(U(k)) : N -> U(k).node
>                   atomically w.r.t. E1.
> 
>                 - adds OP to the volatile `redo log'.
> 
> O1 doesn't require all LOCK messages to be synchronous and serialized: it's
> only necessary that replies to all LOCK messages are received before the first
> REINT message is sent.
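> 
> In terms of the Node sketch above, O1 might look as follows (op.updates,
> send() and send_sync() are assumed helpers, not defined here):
> 
>         def reintegrate(node, op):
>             # replies to all LOCKs must arrive before the first REINT
>             # is sent; the LOCKs themselves need not be serialized
>             for u in op.updates:
>                 send_sync(u.node, 'LOCK', u)    # LOCK(U(k)) : N ->> U(k).node
>             with node._mutex:                   # atomically w.r.t. E1
>                 for u in op.updates:            # REINT(U(k)) : N -> U(k).node
>                     send(u.node, 'REINT', u, epoch=node.epoch)
>             node.redo_log.append(op)            # the volatile redo log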
> 
> We denote REINT(U).epoch as U.epoch (well-defined), and say that update U `is
> in the epoch U.epoch', and that the corresponding undo record (see O2) is a
> record `in epoch U.epoch'.
> 
>         O2. On receiving REINT(U) : M -> N (where N == U.node), 
>             node N transactionally
> 
>                 - executes U in the volatile storage, and
> 
>                 - adds to the `undo log' a record [U, OP]
> 
>             Note that U.epoch can be less than N.epoch at the time of
>             execution (it cannot be greater than the latter due to E1).
> 
> We consider only single-level reintegration, where execution of an update
> requires no further reintegrations. Generalization to the multi-level case is
> left as an exercise for a curious reader.
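> 
> A sketch of O2 in the same terms (txn() is an assumed transaction primitive
> of nodes with persistent storage; u.execute() is likewise assumed):
> 
>         def handle_reint(node, u, op, msg_epoch):
>             node.on_receive(msg_epoch)      # E1, as for any incoming message
>             with node.txn():                # the two steps below are atomic
>                 u.execute(node)             # execute U in volatile storage
>                 node.undo_log.append((msg_epoch, u, op))  # record [U, OP]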
> 
> Correctness:
> ============
> 
> We can now prove a number of very simple statements:
> 
> S0: For a node N, N.epoch is non-decreasing in time.
> 
>     Proof: The only place where N.epoch is modified is E1, and
>     max(N.epoch, X.epoch) is never less than N.epoch.
> 
> S1: A collection of all updates in a given epoch is precisely the collection
> of updates for some set of operations (i.e., an epoch contains no partial
> operations).
> 
>     Proof: Obvious from O1: all updates for a given operation are sent in the
>     same epoch.
> 
> S2: For any sequence of conflicting updates (U{0}, ... U{n}), the sequence
> (U{0}.epoch, ..., U{n}.epoch) is monotonically non-decreasing.
> 
>     Proof: Consider conflicting updates U{k} and U{k+1}. From O1 and the
>     definition of locking it is immediately clear that the following sequence
>     of message sends took place:
> 
>             LOCK(U{k})      : N -> S   ; request a lock for U{k}
>             RepLOCK(U{k})   : S -> N   ; get the lock for U{k}
> 
>         (*) REINT(U{k})     : N -> S   ; reintegrate U{k}
> 
>             LOCK(U{k+1})    : M -> S   ; conflicting lock is requested by M
> 
>         (*) UNLOCK(U{k})    : N -> S   ; N yields the lock
> 
>         (*) RepLOCK(U{k+1}) : S -> M   ; M gets the lock
> 
>         (*) REINT(U{k+1})   : M -> S   ; reintegrate U{k+1}
> 
>      Only the ordering of the messages marked with (*) matters; the rest is
>      shown for completeness. Then
> 
>      U{k}.epoch == REINT(U{k}).epoch        ; by definition
>                 <= UNLOCK(U{k}).epoch       ; by S0 for N and E2
>                 <= RepLOCK(U{k+1}).epoch    ; by E1, S0 for S, and E2
>                 <= REINT(U{k+1}).epoch      ; by E1, S0 for M, and E2
>                 == U{k+1}.epoch             ; by definition
> 
> In essence, S2 states that the epoch ordering is compatible with the causal
> ordering of updates. An important consequence of this is that an epoch cannot
> `depend' on a later epoch. Note that the proof of S2 is very similar to the
> proof of serializability [7] of database schedules under the two-phase
> locking (2PL) protocol [3].
> 
> From S0, S1 and S2 it seems very plausible to conclude that
> 
> S3: For any epoch E, a collection of updates in all epochs up to and including
> E is precisely the collection of updates in some prefix of the execution
> history. That is, for every node N, said collection contains updates from all
> operations reintegrated by N before some moment T in N's physical time, and no
> updates from operations reintegrated by N after T. Alternatively, `an epoch
> boundary is a consistent state snapshot'.
> 
> We won't prove S3, as this requires formalizing the notions of global and
> local histories, distributed schedules, etc., which is more formalism than is
> tolerable at the moment.
> 
> Intermezzo:
> ===========
> 
> S3 is the main weapon in achieving correct distributed recovery: it claims
> that restoring the distributed state as of an epoch boundary results in a
> globally consistent state. The key observation is that due to O2 every node
> with persistent storage has enough information to individually restore its
> state to the boundary of _any_ epoch all of whose updates it has on its
> persistent storage, even in the face of failures. Once all such nodes have
> agreed on a common epoch number, they restore their states independently. It
> is this agreeing on a single number, instead of agreeing on a common set of
> updates, that greatly simplifies recovery.
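> 
> To illustrate, the per-node half of such a recovery might look as follows
> (how the common epoch E is agreed upon is deliberately left out; u.undo() is
> an assumed inverse of u.execute()):
> 
>         def restore_to_boundary(node, e):
>             # undo, in reverse order, every update from epochs later than E;
>             # what remains is the node state as of the boundary of epoch E
>             for epoch, u, op in reversed(node.undo_log):
>                 if epoch > e:
>                     u.undo(node)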
> 
> Advancing epochs:
> =================
> 
> So far no way to progress to the next epoch has been introduced. If the
> algorithms described above were run as is, there would be only one epoch
> boundary: the initial file system state (as created by mkfs), and it would be
> the only point to which epoch-based recovery could restore the system.
> 
> A switch to the next epoch can be initiated by any node N, and is effected by
> 
>         E3. N.epoch++;
> 
> That's all. That is, multiple nodes can advance epochs completely
> independently, without any communication whatsoever. To understand why this is
> sound, recall the proof of S3: all it relies on is that epochs monotonically
> increase across a chain of _dependent_ messages; to be involved in a dependent
> operation, nodes have to communicate (perhaps through another node), and their
> epoch numbers are synchronized by E1 and E2.
> 
> E3 is executed atomically w.r.t. E1 and E2. Note that E3 doesn't break epoch
> monotonicity assumed by S0.
> 
> To speed up announcement of a new epoch, N
> 
>         E4. (optionally) sends null messages to some nodes.
> 
> The more null messages, if any, are sent to other nodes, the faster the news
> about the new epoch spreads across the cluster. In the extreme case, N
> broadcasts the announcement to the whole cluster. Note that there are no
> synchrony requirements for the null messages: it is perfectly valid, for
> example, for N to still be sending them while another node has already started
> sending the next round of announcements.
> 
