IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)
Navigation

Inscrivez-vous gratuitement
pour pouvoir participer, suivre les réponses en temps réel, voter pour les messages, poser vos propres questions et recevoir la newsletter

Boost C++ Discussion :

Thread sans fin et bugs divers lors de l'implémentation d'une lecture/écriture parallèle ligne par ligne


Sujet :

Boost C++

  1. #1
    Membre régulier Avatar de kidpaddle2
    Inscrit en
    Avril 2006
    Messages
    430
    Détails du profil
    Informations forums :
    Inscription : Avril 2006
    Messages : 430
    Points : 95
    Points
    95
    Par défaut Thread sans fin et bugs divers lors de l'implémentation d'une lecture/écriture parallèle ligne par ligne
    Bonjour,

    Je suis en train d'implémenter un module de lecture/écriture optimisé permettant à la lecture, le traitement et l'écriture ligne par ligne, de se dérouler en parallèle : des threads de lecture remplissent ligne par ligne chacun une FIFO, qui sont lues par un autre thread de traitement qui remplit avec les résultats une FIFO de sortie, à partir de laquelle le fichier de sortie sera écrit par un autre thread d'écriture.

    Pour le test ci-dessous, je n'ai fait qu'une lecture, et le traitement se résume à une simple copie. Je me trouve face aux problèmes suivants :
    • Sous Windows et Linux : le programme ne se termine pas 3 fois sur 5 (les chances de réussite augmentent bizarrement avec le nombre d'opérations entre les deux FIFO), et génère de temps à autres (inexcusable quand même) une SIGSEV.
    • Sous Linux, j'ai droit à du SIGABRT à la création du thread, et après le thread à "memory clobbered before allocated block" (soit une autre SIGABRT) aléatoirement dans les quinze premières lignes.


    Croyez bien que j'ai HORREUR de poser mon problème et de vous inviter à réfléchir mais j'ai épuisé mes ressources... Je m'en excuse donc d'avance, en espérant une réponse qui me sortirait d'un beau pétrin.

    Main.cpp :
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    #include "io.hpp"
     
    #include <iostream>
     
    int main(int argc, char *argv[]) {
        CSV::Reader reader;
        CSV::Writer writer;
     
        if(reader.open("test_grandeur_nature.csv") && writer.open("output.txt")) {
            CSV::Row row;
     
            reader.run(); //Reads the CSV file and fills the read queue
            writer.run(); //Reads the to-be-written queue and writes it to a txt file
     
            //The loop is supposed to end only if the reader is finished and empty
            while(!(reader.is_finished() && reader.empty())) {
                //Transfers line by line from the read to the to-be-written queues
                reader.wait_and_pop(row);
                writer.push(row);
            }
            //The reader will likely finish before the writer, so he has to finish his queue before continuing.
            writer.finish(); 
        }
        else {
            std::cout << "File error";
        }
     
        return EXIT_SUCCESS;
    }
    Io.hpp :
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    #ifndef IO_H_INCLUDED
    #define IO_H_INCLUDED
     
    #include "threads.hpp"
     
    #include <fstream>
     
    namespace CSV {
        class Row {
            std::vector<std::string> m_data;
     
            friend class Iterator;
            friend void write_row(Row const &row, std::ostream &stream);
     
            void read_next(std::istream& csv);
     
            public:
                inline std::string const& operator[](std::size_t index) const {
                    return m_data[index];
                }
                inline std::size_t size() const {
                    return m_data.size();
                }
        };
     
        /** Reading *************************************************************************/
     
        class Iterator {
            public:
                Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) {
                    ++(*this);
                }
                Iterator() : m_csv(NULL) {}
     
                //Pre-Increment
                Iterator& operator++() {
                    if (m_csv != NULL) {
                        m_row.read_next(*m_csv);
                        m_csv = m_csv->good() ? m_csv : NULL;
                    }
     
                    return *this;
                }
                inline Row const& operator*() const {
                    return m_row;
                }
     
                inline bool operator==(Iterator const& rhs) {
                    return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL)));
                }
                inline bool operator!=(Iterator const& rhs) {
                    return !((*this) == rhs);
                }
            private:
                std::istream* m_csv;
                Row m_row;
        };
     
        class Reader : public Concurrent_queue<Row>, public Thread {
            std::ifstream m_csv;
     
            Thread_safe_value<bool> m_finished;
     
            void work() {
                if(!!m_csv) {
                    for(Iterator it(m_csv) ; it != Iterator() ; ++it) {
                        push(*it);
                    }
                    m_finished.set(true);
                }
            }
     
        public:
            Reader() {
                m_finished.set(false);
            }
     
            inline bool open(std::string path) {
                m_csv.open(path.c_str());
     
                return !!m_csv;
            }
     
            inline bool is_finished() {
                return m_finished.get();
            }
        };
     
        /** Writing ***************************************************************************/
     
        void write_row(Row const &row, std::ostream &stream);
     
        //Is m_finishing really thread-safe ? By the way, is it mandatory ?
        class Writer : public Concurrent_queue<Row>, public Thread {
            std::ofstream m_csv;
     
            Thread_safe_value<bool> m_finishing;
     
            void work() {
                if(!!m_csv) {
                    CSV::Row row;
     
                    while(!(m_finishing.get() && empty())) {
                        wait_and_pop(row);
                        write_row(row, m_csv);
                    }
                }
            }
     
        public:
            Writer() {
                m_finishing.set(false);
            }
     
            inline void finish() {
                m_finishing.set(true);
                catch_up();
            }
     
            inline bool open(std::string path) {
                m_csv.open(path.c_str());
     
                return !!m_csv;
            }
        };
    }
     
    #endif
    Io.cpp :
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #include "io.hpp"
     
    #include <boost/bind.hpp>
    #include <boost/tokenizer.hpp>
     
    void CSV::Row::read_next(std::istream& csv) {
        std::string row;
        std::getline(csv, row);
     
        boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"'));
        m_data.assign(tokenizer.begin(), tokenizer.end());
    }
     
    void CSV::write_row(Row const &row, std::ostream &stream) {
        std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
        stream << std::endl;
    }
    Threads.hpp :
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    #ifndef THREADS_HPP_INCLUDED
    #define THREADS_HPP_INCLUDED
     
    #include <boost/bind.hpp>
    #include <boost/thread.hpp>
     
    class Thread {
    protected:
        boost::thread *m_thread;
     
        virtual void work() = 0;
     
        void do_work() {
            work();
        }
     
    public:
        Thread() : m_thread(NULL) {}
        virtual ~Thread() {
            catch_up();
            if(m_thread != NULL) {
                delete m_thread;
            }
        }
     
        inline void catch_up() {
            if(m_thread != NULL) {
                m_thread->join();
            }
        }
     
        void run() {
            m_thread = new boost::thread(boost::bind(&Thread::do_work, boost::ref(*this)));
        }
    };
     
    /** Thread-safe datas **********************************************************/
     
    #include <queue>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition.hpp>
     
    template <class T>
    class Thread_safe_value : public boost::noncopyable {
        T m_value;
        boost::mutex m_mutex;
     
        public:
            T const &get() {
                boost::mutex::scoped_lock lock(m_mutex);
                return m_value;
            }
            void set(T const &value) {
                boost::mutex::scoped_lock lock(m_mutex);
                m_value = value;
            }
    };
     
    template<typename Data>
    class Concurrent_queue {
        std::queue<Data> m_queue;
        mutable boost::mutex m_mutex;
        boost::condition_variable m_cond;
     
    public:
        void push(Data const& data) {
            boost::mutex::scoped_lock lock(m_mutex);
            m_queue.push(data);
            lock.unlock();
            m_cond.notify_one();
        }
     
        bool empty() const {
            boost::mutex::scoped_lock lock(m_mutex);
            return m_queue.empty();
        }
     
        void wait_and_pop(Data& popped) {
            boost::mutex::scoped_lock lock(m_mutex);
            while(m_queue.empty()) {
                m_cond.wait(lock);
            }
     
            popped = m_queue.front();
            m_queue.pop();
        }
    };
     
    #endif // THREAD_HPP_INCLUDED
    D'avance, merci.

    Cordialement,

    Kidpaddle2.

    P.S: Concurrent_queue est le seul élément qui n'est pas de moi : http://www.justsoftwaresolutions.co....variables.html. J'ai confiance en son efficacité.

  2. #2
    Membre émérite

    Inscrit en
    Mai 2008
    Messages
    1 014
    Détails du profil
    Informations forums :
    Inscription : Mai 2008
    Messages : 1 014
    Points : 2 252
    Points
    2 252
    Par défaut
    Bonjour,
    Citation Envoyé par kidpaddle2
    Je suis en train d'implémenter un module de lecture/écriture optimisé permettant à la lecture, le traitement et l'écriture ligne par ligne, de se dérouler en parallèle : des threads de lecture remplissent ligne par ligne chacun une FIFO, qui sont lues par un autre thread de traitement qui remplit avec les résultats une FIFO de sortie, à partir de laquelle le fichier de sortie sera écrit par un autre thread d'écriture.
    Euuu avant de rentrer directement dans les complications, as-tu essayé le cas simple, sans multi-threading ?

    Car ton problème semble très largement "IO-bound", donc le gain apporté par le multi-threading sera probablement médiocre, voire nul... Les têtes de lecture du disque dur ne vont pas se multiplier par magie...

  3. #3
    Membre régulier Avatar de kidpaddle2
    Inscrit en
    Avril 2006
    Messages
    430
    Détails du profil
    Informations forums :
    Inscription : Avril 2006
    Messages : 430
    Points : 95
    Points
    95
    Par défaut
    L'optimisation a déjà fait l'objet de plusieurs topics et après maintes discussions c'est ce qui en est sorti. Cela permet de commencer le traitement alors que la lecture n'est toujours pas finie (le disk scheduler s'occupera d'optimiser les rotations).

    J'ai des délais plutôt courts et des obligations de performances. Cette méthode s'est avérée extrêmement efficace (au lieu d'un writer, j'ai tout simplement utilisé des std::cout), mais je suis dorénavant bloqué...

    Quelqu'un pourrait-il être un véritable seigneur et m'aider svp ?

  4. #4
    Membre régulier Avatar de kidpaddle2
    Inscrit en
    Avril 2006
    Messages
    430
    Détails du profil
    Informations forums :
    Inscription : Avril 2006
    Messages : 430
    Points : 95
    Points
    95
    Par défaut
    Apparemment, il s'agirait d'une concurrence critique entre le thread de lecture qui essaie de changer m_finished et le thread principal, qui est en état d'attente, mais je n'ai aucune idée de comment y remédier (je n'ai même pas une meilleure vision des choses, c'est mon premier projet multiprocessus)... Et vous ?

  5. #5
    Membre émérite

    Inscrit en
    Mai 2008
    Messages
    1 014
    Détails du profil
    Informations forums :
    Inscription : Mai 2008
    Messages : 1 014
    Points : 2 252
    Points
    2 252
    Par défaut
    Malheureusement, j'ai testé chez moi avec des csv d'environ 200mo et n'ai pas réussi à reproduire le problème.
    Sur une vingtaine d'exécution, le fichier csv d'entrée a toujours été correctement copié dans output.txt et aucun blocage ou équivalent à SIGSEV en vu.

    Donc ça sent la data race bien mer....., le genre de truc excessivement difficile à reproduire et débugguer

    Une autre piste : Quelle est ta version de boost ? Pour ma part j'ai testé avec la version trunk. Peut être y a-t-il eu des corrections de bug dans boost::thread, boost::mutex et boost::condition_variable ?

  6. #6
    Membre régulier Avatar de kidpaddle2
    Inscrit en
    Avril 2006
    Messages
    430
    Détails du profil
    Informations forums :
    Inscription : Avril 2006
    Messages : 430
    Points : 95
    Points
    95
    Par défaut
    Merci de ta réponse.

    Eh bien, heureusement que je l'ai vu alors ! C'eût été mes clients qui auraient été contents :/ J'ai finalement résolu le problème en utilisant try_pop() qui pop uniquement si la file n'est pas vide, sans bouclage. C'est le bouclage de wait_and_pop() qui visiblement attendait un nouvel élément qui n'arrivait pas, qui empêchait l'arrêt.

    En clair, d'après ce que j'ai compris :
    file vide mais m_finished pas encore mis à true -> on entre dans la boucle -> wait_and_pop() bloque -> pendant ce temps-là m_finished passe à true mais peu importe, le thread principal est bloqué.

    Le problème est donc désormais résolu, mais j'ai un petit peu l'impression de faire de la magie avec les threads. Est-ce qu'au moins, avec le code précédent, je ne risque rien ?

    Je ne peux me permettre la moindre erreur...

  7. #7
    Membre émérite

    Inscrit en
    Mai 2008
    Messages
    1 014
    Détails du profil
    Informations forums :
    Inscription : Mai 2008
    Messages : 1 014
    Points : 2 252
    Points
    2 252
    Par défaut
    Hello,
    Je viens de lire un post très intéressant d'Herb Sutter sur son blog qui m'a fait furieusement penser à ton cas de figure.

    En tant qu'exercice, j'ai essayé d'appliquer ses conseils à ton problème, et voici le résultat

    (Ah oui, bien sur le code est un peu crade, pas de gestion d'erreur, tout est dans des .h etc..)

    main.cpp
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
     
    #include "CSVReader.h"
    #include "CSVWriter.h"
     
    #include <boost/progress.hpp>
     
    void Process(const std::string& fileIn, const std::string fileOut)
    {
       boost::progress_timer timer;
     
       CSVWriter writer(fileOut);
       CSVReader reader(fileIn, &writer);
     
       reader.Run();
       writer.Run();
     
       reader.WaitForCompletion();
       writer.WaitForCompletion();
    }
     
    int main(int argc, char *argv[]) 
    {
       Process("test.csv", "output.txt");
       return EXIT_SUCCESS;
    }
    ConcurrentQueue.h
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
     
    #ifndef CONCURRENTQUEUE_HPP_INCLUDED
    #define CONCURRENTQUEUE_HPP_INCLUDED
     
    #include <queue>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition.hpp>
     
    template<typename Data>
    class ConcurrentQueue 
    {
        std::queue<Data> m_queue;
        mutable boost::mutex m_mutex;
        boost::condition_variable m_cond;
     
    public:
        void push(Data const& data) 
        {
            boost::mutex::scoped_lock lock(m_mutex);
            m_queue.push(data);
            lock.unlock();
            m_cond.notify_one();
        }
     
        bool empty() const 
        {
            boost::mutex::scoped_lock lock(m_mutex);
            return m_queue.empty();
        }
     
        void wait_and_pop(Data& popped) 
        {
            boost::mutex::scoped_lock lock(m_mutex);
            while(m_queue.empty()) 
            {
                m_cond.wait(lock);
            }
     
            popped = m_queue.front();
            m_queue.pop();
        }
    };
     
    #endif
    ActiveObject.hpp
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
     
    #ifndef ACTIVE_OBJECT_HPP_INCLUDED
    #define ACTIVE_OBJECT_HPP_INCLUDED
     
    #include "ConcurrentQueue.h"
     
    #include <boost/scoped_ptr.hpp>
    #include <boost/make_shared.hpp>
     
    class Active 
    {
    public:
     
      class Message {        // base of all message types
      public:
        virtual ~Message() { }
        virtual void Execute() { }
      };
     
      Active() : done_(false)
      {
         thd_.reset( new boost::thread( boost::bind(&Active::run, this) ) );
      }
     
      ~Active() 
      {
        Shutdown() ;
        thd_->join();
      }
     
      void Send( boost::shared_ptr<Message> m ) 
      {
        mq_.push( m );
      }
     
      void Shutdown()
      {
         Send ( boost::make_shared<MShutdown>(boost::ref(done_)));
      }
     
      void WaitForCompletion()
      {
         thd_->join();
      }
     
    private:
     
      Active( const Active& );           // no copying
      void operator=( const Active& );    // no copying
     
      bool done_;                                          // le flag
      ConcurrentQueue<boost::shared_ptr<Message> > mq_;    // la queue
      boost::scoped_ptr<boost::thread> thd_;               // le thread
     
      void run() 
      {
         boost::shared_ptr<Message> msg;
         while( !done_ ) 
         {
           mq_.wait_and_pop(msg);
           msg->Execute();            // execute message
         } // note: last message sets done to true
      }
     
      struct MShutdown : public Active::Message
      {
         bool& done_;
         MShutdown(bool& b):done_(b){}
         void Execute()
         {
            done_ = true;
         }
      };
    };
     
    #endif
    Io.hpp
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
     
    #ifndef IO_H_INCLUDED
    #define IO_H_INCLUDED
     
    #include <vector>
    #include <fstream>
    #include <iostream>
     
    namespace CSV 
    {
        class Row 
        {
            std::vector<std::string> m_data;
     
            friend class Iterator;
            friend void write_row(Row const &row, std::ostream &stream);
     
            void read_next(std::istream& csv);
        };
     
        /** Reading *************************************************************************/
     
        class Iterator {
            public:
                Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) {
                    ++(*this);
                }
                Iterator() : m_csv(NULL) {}
     
                //Pre-Increment
                Iterator& operator++() {
                    if (m_csv != NULL) {
                        m_row.read_next(*m_csv);
                        m_csv = m_csv->good() ? m_csv : NULL;
                    }
     
                    return *this;
                }
                inline Row const& operator*() const {
                    return m_row;
                }
     
                inline bool operator==(Iterator const& rhs) {
                    return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL)));
                }
                inline bool operator!=(Iterator const& rhs) {
                    return !((*this) == rhs);
                }
            private:
                std::istream* m_csv;
                Row m_row;
        };
     
        /** Writing ***************************************************************************/
         void write_row(Row const &row, std::ostream &stream);
    }
     
    #endif
    Io.cpp
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
     
    #include "io.hpp"
     
    #include <boost/bind.hpp>
    #include <boost/tokenizer.hpp>
     
     
    void CSV::Row::read_next(std::istream& csv) {
        std::string row;
        std::getline(csv, row);
     
        boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"'));
        m_data.assign(tokenizer.begin(), tokenizer.end());
    }
     
    void CSV::write_row(Row const &row, std::ostream &stream) {
        std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
        stream << std::endl;
    }
    CSVReader.hpp
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
     
    #ifndef CSV_READER_H_INCLUDED
    #define CSV_READER_H_INCLUDED
     
    #include "ActiveObject.h"
    #include "CSVWriter.h"
    #include <string>
     
    class CSVReader
    {
     
    public:
       CSVReader(const std::string& filename, CSVWriter* writer):
       m_csv(filename.c_str()), 
       m_writer(writer),
       m_it(m_csv)
       {
       }
     
       void Run()
       {
          parse_next_row(); // amorce
       }
     
       void WaitForCompletion()
       {
          a.WaitForCompletion();
       }
     
    private:
       CSVReader();
       CSVReader( const CSVReader& );           // no copying
       void operator=( const CSVReader& );    // no copying
     
       void parse_next_row()
       {
          if(m_it != CSV::Iterator())
          {
             a.Send( boost::make_shared<MParseNextRow>(this));
          }
          else
          {
             m_writer->Shutdown();
             a.Shutdown();
          }
       }
     
      class MParseNextRow : public Active::Message 
      {
        CSVReader* this_;  
      public:
        MParseNextRow( CSVReader* r ) : this_(r) { }
        void Execute() 
        {
          CSV::Iterator& it = this_->m_it;
          this_->m_writer->WriteRow(*it);
          ++it;
          this_->parse_next_row(); // loop
        }    
      };
     
      std::ifstream m_csv;
      CSVWriter* m_writer;
      CSV::Iterator m_it;
     
      // Helper goes last, for ordered destruction
      Active a;
    };
    #endif
    CSVWriter.hpp
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
     
    #ifndef CSV_WRITER_H_INCLUDED
    #define CSV_WRITER_H_INCLUDED
     
    #include "ActiveObject.h"
    #include "Io.hpp"
     
    #include <string>
    #include <iostream>
    #include <algorithm>
    #include <iterator>
     
    class CSVWriter
    {
     
    public:
       CSVWriter(const std::string& filename):  m_csv(filename.c_str())
       {
       }
     
       void Run()
       {
          // Nothing to do. (wait for message)
       }
     
       void Shutdown()
       {
          a.Shutdown();
       }
     
       void WriteRow(const CSV::Row& row)
       {
          a.Send( boost::make_shared<MWriteRow>(this, row));
       }
     
       void WaitForCompletion()
       {
          a.WaitForCompletion();
       }
     
     
    private:
       CSVWriter();
       CSVWriter( const CSVWriter& );           // no copying
       void operator=( const CSVWriter& );    // no copying
     
       class MWriteRow : public Active::Message
       {
          CSVWriter* this_;
          CSV::Row row_;
       public:
          MWriteRow(CSVWriter* writer, const CSV::Row& row):this_(writer),row_(row){}
          void Execute()
          {
             CSV::write_row(row_, this_->m_csv);
          }
       };
     
      std::ofstream m_csv;
      CSV::Row row_;
     
      // Helper goes last, for ordered destruction
      Active a;
    };
     
    #endif

  8. #8
    Membre expérimenté
    Profil pro
    Inscrit en
    Juin 2006
    Messages
    1 354
    Détails du profil
    Informations personnelles :
    Âge : 49
    Localisation : France

    Informations forums :
    Inscription : Juin 2006
    Messages : 1 354
    Points : 1 419
    Points
    1 419
    Par défaut
    Je ne compends pas pourquoi utiliser le multi-threading dans ce cas...
    pourquoi ne pas tout simplement lire une ligne, la traiter et l'ecrire?
    (pattern observer?)

    sinon c'est pas mal l'article de Herb Sutter.

  9. #9
    Membre émérite

    Inscrit en
    Mai 2008
    Messages
    1 014
    Détails du profil
    Informations forums :
    Inscription : Mai 2008
    Messages : 1 014
    Points : 2 252
    Points
    2 252
    Par défaut
    Citation Envoyé par epsilon68 Voir le message
    Je ne compends pas pourquoi utiliser le multi-threading dans ce cas...
    pourquoi ne pas tout simplement lire une ligne, la traiter et l'ecrire?
    Parce qu'en fait, si j'ai bien compris, kidpaddle2 veut lire ligne par ligne N fichiers csv simultanément, en combinant à chaque fois les N lignes lues en une seule ligne qui est ensuite écrite dans un fichier de sortie.

    Bon sinon j'ai repensé au problème aujourd'hui.
    Finalement, les passages de message entre objets actifs, c'est très bien, mais ça rajoute beaucoup de code et c'est quand même franchement difficile à développer et à maitriser. (Je me suis pris quelques bon gros deadlock avant d'arriver à la version posté dans mon message précédent )

    Après tout, pourquoi devrait-on se fader toute la gestion des threads, pool, synchronisation, passage de message et autres joyeusetés à la main ? Il vaut mieux s'appuyer sur des bibliothèques de plus haut niveau en tablant sur le fait que des gentils développeurs ont déjà sué à notre place.

    Donc j'ai refait l'exemple en me basant sur Intel TBB.
    D'un seul coup la taille du code a fondu de moitié et le code obtenu est finalement à peine plus long que la version mono-thread. Il aurait peut-être fallu commencer par là en fait.

    (Note sur le code qui suit : Sans compilateur C++0X, il faut remplacer tous les std::unique_ptr par des boost::shared_ptr et remplacer la lambda du parrallel_invoke par sa fonction objet correspondante)

    CSVReader.h
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
     
    #ifndef CSVREADER_HPP
    #define CSVREADER_HPP
     
    #include "Row.h"
    #include <fstream>
     
    class CSVReader
    {
       std::ifstream m_csv;
       Row m_row;
     
    public:
     
      CSVReader(const std::string& filename):m_csv(filename){}
     
      bool ParseNextRow() 
      {
         bool read_ok = m_row.read_next(m_csv);
         return read_ok;
      }
     
      const Row& GetRow() const
      {
         return m_row;
      }
    };
    #endif
    CSVWriter.h
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
     
    #ifndef CSVWRITER_H
    #define CSVWRITER_H
     
    #include "Row.h"
    #include <fstream>
     
    class CSVWriter
    {
       std::ofstream m_csv;
    public:
     
       CSVWriter(const std::string& filename):m_csv(filename){}
     
       void WriteRow(Row& row) 
       {
          row.write_row(m_csv);
       }
    };
     
    #endif
    Row.h
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
     
    #include <string>
    #include <iostream>
     
    #include <boost/tokenizer.hpp>
     
    class Row 
    {
       std::vector<std::string> m_data;
     
    public:
     
       bool read_next(std::istream& stream);
       void write_row(std::ostream& stream);
       void append(const Row& row);
    };
     
     
    bool Row::read_next(std::istream& is) 
    {
       std::string s;
       if(std::getline(is, s))
       {
         boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(s, boost::escaped_list_separator<char>('\\', ';', '\"'));
         m_data.assign(tokenizer.begin(), tokenizer.end());  
          return true;
       }
       return false;
    }
     
    void Row::write_row(std::ostream& stream) 
    {
       if(m_data.empty() == false)
       {
           std::copy(m_data.begin(), m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
           stream << "\n";
       }
    }
     
    void Row::append(const Row& row)
    {
       m_data.insert(m_data.end(), row.m_data.begin(), row.m_data.end());
    }
    #endif
    main.cpp
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
     
    #include <boost/progress.hpp>
    #include <tbb/tbb.h>
     
    #include "CSVReader.h"
    #include "CSVWriter.h"
     
     
    Row CombineRows(const std::vector<std::unique_ptr<CSVReader>>& readers)
    {
       Row finalRow;
       for(unsigned int i = 0 ; i < readers.size() ; i++)
       {
          finalRow.append(readers[i]->GetRow());
       }
       return finalRow;
    }
     
    void ProcessParallel()
    {
       boost::progress_timer t;
     
       try
       {
          CSVWriter writer("output.txt");
     
          typedef std::unique_ptr<CSVReader> CSVReaderPtr;
          std::vector<CSVReaderPtr> readers;
          readers.push_back(CSVReaderPtr(new CSVReader("Test1_700x20000.csv")));
          readers.push_back(CSVReaderPtr(new CSVReader("Test2_700x20000.csv")));
          readers.push_back(CSVReaderPtr(new CSVReader("Test3_700x20000.csv")));
     
          Row next_row;
          Row current_row;
     
          bool finished = false;
     
          while(finished == false)
          {
             tbb::parallel_invoke(
                // parse next rows in files and combine them
                [&readers, &finished, &next_row]()
                {
                   //workaround a bug of VS2010 (can't capture in nested lambda)
                   std::vector<CSVReaderPtr>& workaround_readers = readers; 
                   bool& workaround_finished = finished;
     
                   tbb::parallel_for(0u, readers.size(), 1u,  
                   [&workaround_readers, &workaround_finished](size_t index)
                   {
                       if (workaround_readers[index]->ParseNextRow() == false)
                       {
                         workaround_finished = true;
                       }
                   });
     
                   next_row = CombineRows(readers);
                },
                // write current row;
                [&writer, &current_row]()
                {
                   writer.WriteRow(current_row);
                }
            );
     
            current_row = next_row;
          }
       }
       catch(std::exception& e)
       {
          std::cout << e.what();
       }
    }
     
     
    void ProcessSerial()
    {
       boost::progress_timer t;
     
       try
       {
          CSVWriter writer("output.txt");
     
          typedef std::unique_ptr<CSVReader> CSVReaderPtr;
          std::vector<CSVReaderPtr> readers;
          readers.push_back(CSVReaderPtr(new CSVReader("Test1_700x20000.csv")));
          readers.push_back(CSVReaderPtr(new CSVReader("Test2_700x20000.csv")));
          readers.push_back(CSVReaderPtr(new CSVReader("Test3_700x20000.csv")));
     
          bool finished = false;
          std::vector<Row> current_rows(readers.size());
     
          while(1)
          {
             for(unsigned int i = 0 ; i < readers.size() ; i++)
             {
                if(readers[i]->ParseNextRow() == false)
                {
                  return; // GTFO
                }
             }
     
             Row row = CombineRows(readers);
             writer.WriteRow(row);
          }
       }
       catch(std::exception& e)
       {
          std::cout << e.what();
       }
    }
     
    int main(int argc, char *argv[]) 
    {
       ProcessParallel();
       ProcessSerial();
       ProcessParallel();
       ProcessSerial();
       ProcessParallel();
       ProcessSerial();
     
       return EXIT_SUCCESS;
    }
    Les résultats du traitement de trois fichiers CSV de 700 colonnes par 20000 lignes (environ 90 MB) sont franchement meilleurs que ce que j'attendais ! 35 secondes pour la version mono-thread et environ 19 secondes pour la multi-thread.

    Autre chose, j'ai eu la flemme de faire l'opération de réduction des trois lignes lues en une seule dans un thread à part, en pseudo-code ça ressemblerait à ça...
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
     
    std::vector<Row> lignesN+2;
    std::vector<Row> lignesN+1;
    Row ligneN+1;
    Row ligneN;
     
    while(finished)
    {
       parrallel_invoke(
         lire les lignes N+2,
         opération de réduction produisant la ligne N+1 à partir des lignes N+1, 
         écriture de la ligne N)
      );
     
      copier la ligne N+1 dans la ligne N
      copier les lignes N+2 dans les lignes N+1
    }
    Il y a pas mal de copies de lignes en plus, pour éviter que les threads se marchent dessus, donc il faut être certain que l'opération de réduction est assez lourde en calcul pour que cela ai un sens de la faire en parallèle de la lecture et de l'écriture.

+ Répondre à la discussion
Cette discussion est résolue.

Discussions similaires

  1. [MySQL] Performance lors d'un backup de base de données ligne par ligne
    Par Jolt0x dans le forum PHP & Base de données
    Réponses: 0
    Dernier message: 06/04/2014, 01h05
  2. Problème lors de l'implémentation d'une connexion à une BDD
    Par Gwyrrd dans le forum API standards et tierces
    Réponses: 4
    Dernier message: 11/12/2011, 13h49
  3. Lire un fichier ligne par ligne et écrire à la fin
    Par mailbox dans le forum Débuter
    Réponses: 2
    Dernier message: 01/03/2011, 21h33
  4. Réponses: 5
    Dernier message: 15/01/2007, 10h45
  5. Traitement ligne par ligne sans curseur
    Par AbyssoS dans le forum MS SQL Server
    Réponses: 1
    Dernier message: 28/02/2006, 17h46

Partager

Partager
  • Envoyer la discussion sur Viadeo
  • Envoyer la discussion sur Twitter
  • Envoyer la discussion sur Google
  • Envoyer la discussion sur Facebook
  • Envoyer la discussion sur Digg
  • Envoyer la discussion sur Delicious
  • Envoyer la discussion sur MySpace
  • Envoyer la discussion sur Yahoo