Skip to content

ДЗ Flat Combine=

xphoenix edited this page Nov 28, 2018 · 7 revisions

Flat Combine


Реализовать flat-combine поверх std::map контейнера как реализацию интерфейса Storage. Посмотреть на изменение производительности по сравнению с остальными реализациями хранилищ из проекта (global_lock, shared_lock, stripped_lock)

Для организации ThreadLocalStorage, доступного другим потокам можно использовать готовые примитивы из pthread - pthread_key_create, pthread_getspecific и pthread_setspecific. Примерно вот так:

 * Create new thread local variable
template <typename T> class ThreadLocal {
    ThreadLocal(T *initial = nullptr, std::function<void(T *)> destructor = nullptr) {
        pthread_key_create(&_th_key, destructor);

    inline T *get() {
        // TODO: implement
        return nullptr;

    inline void set(T *val) {
        // TODO: implement

    T &operator*() { return *get(); }

    pthread_key_t _th_key;

Один из возможных способов реализовать алгоритм написан ниже. Основное преимущество такого похода в том, что весь сложный код к барьерами памяти можно сосредоточить в одном месте:

 * Create new flat combine synchronizaion primitive
 * @template_param OpNode
 * Class for a single pending operation descriptor. Must provides following API:
 * - complete() returns true is operation gets completed and false otherwise
 * - error(const std::exception &ex) set operation as failed. After this call return,
 *   subsequent calls to complete() method must return true
 * @template_param QMS
 * Maximum array size that could be passed to a single Combine function call
template <typename OpNode, std::size_t QMS = 64> class FlatCombiner {
    // User defined type for the pending operations description, must be plain object without
    // virtual functions
    using pending_operation = OpNode;

    // Function that combine multiple operations and apply it onto data structure
    using combiner = std::function<void(OpNode *, OpNode *)>;

    // Maximum number of pernding operations could be passed to a single Combine call
    static const std::size_t max_call_size = QMS;
     * @param Combine function that aplly pending operations onto some data structure. It accepts array
     * of pending ops and allowed to modify it in any way except delete pointers
    FlatCombiner(std::function<void(OpNode *, OpNode *)> combine) : _slot(nullptr, orphan_slot), _combine(combine) {}
    ~FlatCombiner() { /* dequeue all slot, think about slot deletition */ }

     * Return pending operation slot to the calling thread, object stays valid as long
     * as current thread is alive or got called detach method
    pending_operation *get_slot() {
        Slot *result = _slot.get();
        if (result == nullptr) {
            result = new Slot();
            // TODO: setup usage bit in the pointer

        return result->user_op;

     * Put pending operation in the queue and try to execute it. Method gets blocked until
     * slot gets complete, in other words until slot.complete() returns false
    void apply_slot(pending_operation &slot) {
        Slot *slot = reinterpret_cast<Slot *>(((void *)&slot) - containerof(Slot, user_op));
        // TODO: assert slot params
        // TODO: enqueue slot if needs
        // TODO: try to become executor (cquire lock)
        // TODO: scan qeue, dequeue stale nodes, prepare array to be passed
        // to Combine call
        // TODO: call Combine function
        // TODO: unlock
        // TODO: if lock fails, do thread_yeild and goto 3 TODO

     * Detach calling thread from this flat combiner, in other word
     * destroy thread slot in the queue
    void detach() {
        pending_operation *result = _slot.get();
        if (result != nullptr) {

    // Extend user provided pending operation type with fields required for the
    // flat combine algorithm to work
    using Slot = struct Slot {
        // User pending operation to be complete
        OpNode user_op;

        // When last time this slot was detected as been in use
        uint64_t generation;

        // Pointer to the next slot. One bit of pointer is stolen to
        // mark if owner thread is still alive, based on this information
        // combiner/thread_local destructor able to take decission about
        // deleting node.
        // So if stolen bit is set then the only reference left to this slot
        // if the queue. If pointer is zero and bit is set then the only ref
        // left is thread_local storage. If next is zero there are no
        // link left and slot could be deleted
        std::atomic<uint64_t> next_and_alive;

         * Remove alive bit from the next_and_alive pointer and return
         * only correct pointer to the next slot
        Slot *next() {
            // TODO: implement
            return nullptr;

     * Try to acquire "lock", in case of success returns current generation. If
     * fails the return 0
     * @param suc memory barier to set in case of success lock
     * @param fail memory barrier to set in case of failure
    uint64_t try_lock(std::memory_order suc, std::memory_order fail) {
        // TODO: implements
        return 0;

     * Try to release "lock". Increase generation number in case of sucess
     * @param suc memory barier to set in case of success lock
     * @param fail memory barrier to set in case of failure
    void unlock(std::memory_order suc, std::memory_order fail) {
        // TODO: implements

     * Remove slot from the queue. Note that method must be called only
     * under "lock" to eliminate concurrent queue modifications
    void dequeue_slot(Slot *parent, Slot *slot2remove) {
        // TODO: remove node from the queue
        // TODO: set pointer pare of "next" to null, DO NOT modify usage bit
        // TODO: if next == 0, delete pointer

     * Function called once thread owning this slot is going to die or to
     * destory slot in some other way
     * @param Slot pointer to the slot is being to orphan
    void orphan_slot(Slot *) {}

    static constexpr uint64_t LCK_BIT_MASK = uint64_t(1) << 63L;
    static constexpr uint64_t GEN_VAL_MASK = ~LCK_BIT_MASK;

    // First bit is used to see if lock is acquired already or no. Rest of bits is
    // a counter showing how many "generation" has been passed. One generation is a
    // single call of flat_combine function.
    // Based on that counter stale slots found and gets removed from the pending
    // operations queue
    std::atomic<uint64_t> _lock;

    // Pending operations queue. Each operation to be applied to the protected
    // data structure is ends up in this queue and then executed as a batch by
    // flat_combine method call
    std::atomic<Slot *> _queue;
    // Function to call in order to execute operations
    combiner _combine;

    // Usual strategy for the combine flat would be sort operations by some creteria
    // and optimize it somehow. That array is using by executor thread to prepare
    // number of ops to pass to combine
    std::array<OpNode *, QMS> _combine_shot;

    // Slot of the current thread. If nullptr then cur thread gets access in the
    // first time or after a long period when slot has been deleted already
    ThreadLocal<Slot> _slot;

Использование кода будет примерно таким:

class StorageSlot {
  int opcode; // put, set, delete, e.t.c
  std::string *key;
  std::string *value;

  bool completed;
  std::exception *ex;

  bool complete() {
    return (completed || ex != nullptr);

class StorageFcImpl : public Storage {
    StorageFcImpl() : _combiner(flat_combine) {}

    bool Put(const std::string &key, const std::string &value) {
        StorageSlot *slot = combiner.get_slot();
        slot.opcode = OpCode::Put;
        slot.key = &key;
        slot.value = &value;

        if (slot.ex != nullptr) { throw *slot.ex; }

        return (*slot.value == value);

    void flat_combine(StorageSlot *begin, StorageSlot *end) override {
        std::Sort(begin, end, key_op_comparator);

        for (Slot *p = begin; p != end; p = p->next) {
            // eliminate as much ops as possible
            // use map methods with a hint to use the fact keys are ordered

   FlatCombiner<StorageSlot> _combiner;
   std::map<std::string, std::string> _backend;

Для тех, кому интересно: сольно рекомендую сделать дополнительное упражнение:

  • напишите простой тест: 2-4-8-16 потоков добавляют элементы в storage
  • запустите как perf stat ./fc-bench
  • подумайте
  • посмотрите что расскажут pmu-tools
  • попробуйте исправить произвдительность


Как и в большистве случаев многопоточного кода - тесты руками