# Node ID 60e542d698127acce4fe3da5f8bf24c0b36f7d2d # Parent daf8bbee4bc7d69cb32f5f083b2f461984b743bc diff --git a/src/dev/SConscript b/src/dev/SConscript --- a/src/dev/SConscript +++ b/src/dev/SConscript @@ -90,6 +90,7 @@ DebugFlag('DMACopyEngine') DebugFlag('Ethernet') DebugFlag('MultiEthernet') +DebugFlag('MultiEthernetPkt') DebugFlag('EthernetCksum') DebugFlag('EthernetDMA') DebugFlag('EthernetData') diff --git a/src/dev/multi_etherlink.hh b/src/dev/multi_etherlink.hh --- a/src/dev/multi_etherlink.hh +++ b/src/dev/multi_etherlink.hh @@ -223,11 +223,14 @@ virtual EtherInt *getEthPort(const std::string &if_name, int idx) M5_ATTR_OVERRIDE; + virtual unsigned drain(DrainManager *dm) M5_ATTR_OVERRIDE; + virtual void memWriteback() M5_ATTR_OVERRIDE; virtual void init() M5_ATTR_OVERRIDE; virtual void startup() M5_ATTR_OVERRIDE; - virtual void serialize(std::ostream &os); - virtual void unserialize(Checkpoint *cp, const std::string §ion); + virtual void serialize(std::ostream &os) M5_ATTR_OVERRIDE; + virtual void unserialize(Checkpoint *cp, + const std::string §ion) M5_ATTR_OVERRIDE; }; diff --git a/src/dev/multi_etherlink.cc b/src/dev/multi_etherlink.cc --- a/src/dev/multi_etherlink.cc +++ b/src/dev/multi_etherlink.cc @@ -56,6 +56,7 @@ #include "base/trace.hh" #include "debug/EthernetData.hh" #include "debug/MultiEthernet.hh" +#include "debug/MultiEthernetPkt.hh" #include "dev/etherdump.hh" #include "dev/etherint.hh" #include "dev/etherlink.hh" @@ -107,18 +108,34 @@ return localIface; } +unsigned +MultiEtherLink::drain(DrainManager *dm) +{ + DPRINTF(MultiEthernet,"MultiEtherLink::drain() called\n"); + multiIface->drainStart(); + return 0; +} + +void MultiEtherLink::memWriteback() +{ + DPRINTF(MultiEthernet,"MultiEtherLink::memWriteback() called\n"); + multiIface->drainDone(); +} + void MultiEtherLink::serialize(ostream &os) { - txLink->serialize("link0", os); - rxLink->serialize("link1", os); + multiIface->serialize("multiIface", os); + txLink->serialize("txLink", os); 
+ rxLink->serialize("rxLink", os); } void MultiEtherLink::unserialize(Checkpoint *cp, const string §ion) { - txLink->unserialize("link0", cp, section); - rxLink->unserialize("link1", cp, section); + multiIface->unserialize("multiIface", cp, section); + txLink->unserialize("txLink", cp, section); + rxLink->unserialize("rxLink", cp, section); } void @@ -158,7 +175,7 @@ if (dump) dump->dump(packet); - DPRINTF(MultiEthernet, "MultiEtherLink::MultiLink::rxDone() " + DPRINTF(MultiEthernetPkt, "MultiEtherLink::MultiLink::rxDone() " "packet received: len=%d\n", packet->length); DDUMP(EthernetData, packet->data, packet->length); diff --git a/src/dev/multi_iface.hh b/src/dev/multi_iface.hh --- a/src/dev/multi_iface.hh +++ b/src/dev/multi_iface.hh @@ -78,6 +78,7 @@ #ifndef __DEV_MULTI_IFACE_HH__ #define __DEV_MULTI_IFACE_HH__ +#include #include #include #include @@ -95,17 +96,68 @@ */ class MultiIface { + public: + /*! + * The possible reasons a multi sync among gem5 peers is needed for. + */ + enum + class SyncTrigger { + periodic, /*!< Regular periodic sync. This can be interrupted by a + checkpoint sync request */ + ckpt, /*!< sync before taking a checkpoint */ + atomic /*!< sync that cannot be interrupted (e.g. sync at startup) */ + }; + private: typedef MultiHeaderPkt::MsgType MsgType; - /** - * The class to implement global sync operations among gem5 peer processes. 
+ + /** Sync State-Machine + \dot + digraph Sync { + node [shape=box, fontsize=10]; + idle -> busy + [ label="new trigger\n by run()" fontsize=8 ]; + busy -> busy + [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum > 1) || \n(msg==CkptSyncReq &&\ntrigger == ckpt)" fontsize=8 ]; + busy -> idle + [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum == 1)" fontsize=8 ]; + busy -> interrupted + [ label="new message by progress():\n(msg == CkptSyncReq &&\ntrigger == periodic)" fontsize=8 ]; + idle -> asyncCkpt + [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ]; + asyncCkpt -> asyncCkpt + [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ]; + asyncCkpt -> busy + [ label="new trigger by run():\ntrigger == ckpt" fontsize=8 ]; + asyncCkpt -> idle + [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum == 0) " fontsize=8 ]; + asyncCkpt -> interrupted + [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum > 0) " fontsize=8 ]; + interrupted -> interrupted + [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum > 1)" fontsize=8 ]; + interrupted -> idle + [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum == 1)" fontsize=8 ]; + } + \enddot + */ + /** @class Sync + * This class implements global sync operations among gem5 peer processes. * - * @noteThis is used as a singleton object (shared by all MultiIface + * @note This class is used as a singleton object (shared by all MultiIface * objects). */ class Sync { private: + /*! + * Internal state of the sync singleton object. + */ + enum class SyncState { + busy, /*!< There is an on-going sync. */ + interrupted, /*!< An on-going periodic sync was interrupted. */ + asyncCkpt, /*!< A checkpoint (sim_exit) is already scheduled */ + idle /*!< There is no active sync. */ + }; /** * The lock to protect access to the MultiSync object. */ @@ -121,18 +173,42 @@ * synchronisation. 
*/ unsigned waitNum; + /** + * The trigger for the most recent sync. + */ + SyncTrigger trigger; + /** + * Map sync triggers to request messages. + */ + std::array<MsgType, 3> triggerToMsg = {{ + MsgType::cmdPeriodicSyncReq, + MsgType::cmdCkptSyncReq, + MsgType::cmdAtomicSyncReq + }}; + + /** + * Current sync state. + */ + SyncState state; public: - /* + /** * Core method to perform a full multi sync. + * + * @param t Sync trigger. + * @param sync_tick The tick the sync was expected to happen at. + * @return true if the sync completed, false if it was interrupted. + * + * @note In case of an interrupted periodic sync, sync_tick can be less + * than curTick() when we resume (i.e. re-run) it */ - void run(); + bool run(SyncTrigger t, Tick sync_tick); /** * Callback when the receiver thread gets a sync message. */ void progress(MsgType m); - Sync() : waitNum(0) {} + Sync() : waitNum(0), state(SyncState::idle) {} ~Sync() {} }; @@ -153,17 +229,34 @@ { public: /** + * Flag to indicate that the most recent periodic sync was interrupted + * (by a checkpoint request). + */ + bool interrupted; + /** + * The tick when the most recent periodic synchronisation was scheduled + * at. + */ + Tick scheduledAt; + /** + * Flag to indicate an on-going drain cycle. + */ + bool isDraining; + + public: + /** * Only the firstly instanstiated MultiIface object will * call this constructor. */ - SyncEvent() : GlobalSyncEvent(Default_Pri, 0) {} + SyncEvent() : GlobalSyncEvent(Default_Pri, 0), interrupted(false), + scheduledAt(0), isDraining(false) {} ~SyncEvent() { assert (scheduled() == false); } /** * Schedule the first periodic sync event. * * @param start Start tick for multi synchronisation - * @param repeate Frequency of multi synchronisation + * @param repeat Frequency of multi synchronisation * */ void start(Tick start, Tick repeat); @@ -171,7 +264,7 @@ * Reschedule (if necessary) the periodic sync event.
* * @param start Start tick for multi synchronisation - * @param repeate Frequency of multi synchronisation + * @param repeat Frequency of multi synchronisation * * @note Useful if we have multiple MultiIface objects with * different 'start' and 'repeat' values for global sync. @@ -182,6 +275,14 @@ * simulation threads. (See further comments in the .cc file.) */ void process() M5_ATTR_OVERRIDE; + /** + * Schedule periodic sync when resuming from a checkpoint. + */ + void resume(); + + void serialize(const std::string &base, std::ostream &os); + void unserialize(const std::string &base, Checkpoint *cp, + const std::string &section); }; /** @@ -203,10 +304,23 @@ } }; + /** + * Customized priority queue used to store incoming data packets info by + * the receiver thread. We need to expose the underlying container to + * enable iterator access for serializing. + */ + class RecvQueue : public std::priority_queue<RecvInfo, + std::vector<RecvInfo>, + RecvInfoCompare> + { + public: + std::vector<RecvInfo> &impl() { return c; } + }; + /* * The priority queue to store RecvInfo items ordered by receive ticks. */ - std::priority_queue<RecvInfo, std::vector<RecvInfo>, RecvInfoCompare> recvQueue; + RecvQueue recvQueue; /** * The singleton Sync object to perform multi synchronisation. */ @@ -283,15 +397,18 @@ /** * Low level generic receive routine. * @param buf the buffer to store the incoming message - * @param lemgth buffer size (in bytes) + * @param length buffer size (in bytes) */ virtual bool recvRaw(void *buf, unsigned length) = 0; /** * Low level request for synchronisation among gem5 processes. Only one * MultiIface object needs to call this (in each gem5 process) to trigger a * multi sync. + * + * @param sync_req Sync request command. + * @param sync_tick The tick when sync is expected to happen in the sender. */ - virtual void syncRaw() = 0; + virtual void syncRaw(MsgType sync_req, Tick sync_tick) = 0; /** * The function executed by a receiver thread.
*/ @@ -328,7 +445,6 @@ EventManager *em); virtual ~MultiIface(); - /** * Send out an Ethernet packet. * @param pkt The Ethernet packet to send. @@ -353,10 +469,25 @@ * peer gem5 process. */ void initRandom(); + + /** + * Callback when draining starts. + */ + void drainStart(); + /** + * Callback when draining is complete. + */ + void drainDone(); + /** * Initialize the periodic synchronisation among peer gem5 processes. */ void startPeriodicSync(); + + void serialize(const std::string &base, std::ostream &os); + void unserialize(const std::string &base, Checkpoint *cp, + const std::string §ion); + }; diff --git a/src/dev/multi_iface.cc b/src/dev/multi_iface.cc --- a/src/dev/multi_iface.cc +++ b/src/dev/multi_iface.cc @@ -49,40 +49,159 @@ #include "base/random.hh" #include "base/trace.hh" #include "debug/MultiEthernet.hh" +#include "debug/MultiEthernetPkt.hh" #include "dev/etherpkt.hh" +#include "sim/sim_exit.hh" #include "sim/sim_object.hh" + MultiIface::Sync *MultiIface::sync = nullptr; MultiIface::SyncEvent *MultiIface::syncEvent = nullptr; unsigned MultiIface::recvThreadsNum = 0; MultiIface * MultiIface::master = nullptr; -void -MultiIface::Sync::run() +bool +MultiIface::Sync::run(SyncTrigger t, Tick sync_tick) { std::unique_lock sync_lock(lock); - assert(waitNum == 0); - waitNum = MultiIface::recvThreadsNum; - // initiate the global synchronisation - assert(MultiIface::master != nullptr); - MultiIface::master->syncRaw(); + trigger = t; + if (trigger != SyncTrigger::periodic) { + DPRINTF(MultiEthernet,"MultiIface::Sync::run() trigger:%d\n", + (unsigned)trigger); + } + + switch (state) { + case SyncState::asyncCkpt: + switch (trigger) { + case SyncTrigger::ckpt: + assert(MultiIface::syncEvent->interrupted == false); + state = SyncState::busy; + break; + case SyncTrigger::periodic: + if (waitNum == 0) { + // So all recv threads got an async checkpoint request already + // and a simExit is scheduled at the end of the current tick + // (i.e. 
it is a periodic sync scheduled at the same tick as the + // simExit). + state = SyncState::idle; + DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted due " + "async ckpt scheduled\n"); + return false; + } else { + // we still need to wait for some receiver thread to get the + // async ckpt request. We are going to proceed as 'interrupted' + // periodic sync. + state = SyncState::interrupted; + DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted due " + "ckpt request is coming in\n"); + } + break; + case SyncTrigger::atomic: + assert(trigger != SyncTrigger::atomic); + } + break; + case SyncState::idle: + state = SyncState::busy; + break; + // Only one sync can be active at any time + case SyncState::interrupted: + case SyncState::busy: + assert(state != SyncState::interrupted); + assert(state != SyncState::busy); + break; + } + // Kick-off the sync unless we are in the middle of an interrupted periodic + // sync + if (state != SyncState::interrupted) { + assert(waitNum == 0); + waitNum = MultiIface::recvThreadsNum; + // initiate the global synchronisation + assert(MultiIface::master != nullptr); + MultiIface::master->syncRaw(triggerToMsg[(unsigned)trigger], sync_tick); + } // now wait until all receiver threads complete the synchronisation auto lf = [this]{ return waitNum == 0; }; cv.wait(sync_lock, lf); + + // we are done + assert(state == SyncState::busy || state == SyncState::interrupted); + bool ret = (state != SyncState::interrupted); + state = SyncState::idle; + return ret; } void MultiIface::Sync::progress(MsgType msg) { - assert(msg == MsgType::cmdLeaveBarrier); - std::unique_lock sync_lock(lock); + switch (msg) { + case MsgType::cmdAtomicSyncAck: + assert(state == SyncState::busy && trigger == SyncTrigger::atomic); + break; + case MsgType::cmdPeriodicSyncAck: + assert(state == SyncState::busy && trigger == SyncTrigger::periodic); + break; + case MsgType::cmdCkptSyncAck: + assert(state == SyncState::busy && trigger == SyncTrigger::ckpt); +
break; + case MsgType::cmdCkptSyncReq: + switch (state) { + case SyncState::busy: + if (trigger == SyncTrigger::ckpt) { + // We are already in a checkpoint sync but got another ckpt sync + // request. This may happen if two (or more) peer gem5 processes + // try to start a ckpt nearly at the same time. Incrementing + // waitNum here (before decrementing it below) effectively + // results in ignoring this new ckpt sync request. + waitNum++; + break; + } + assert (waitNum == recvThreadsNum); + state = SyncState::interrupted; + // we need to fall through here to handle "recvThreadsNum == 1" case + case SyncState::interrupted: + assert(trigger == SyncTrigger::periodic); + assert(waitNum >= 1); + if (waitNum == 1) { + exitSimLoop("checkpoint"); + } + break; + case SyncState::idle: + // There is no on-going sync so we got an async ckpt request. If we + // are the only receiver thread then we need to schedule the + // checkpoint. Otherwise, only change the state to 'asyncCkpt' and + // let the last receiver thread schedule the checkpoint at the + // 'asyncCkpt' case. + // Note that a periodic or resume sync may start later and that can + // trigger a state change to 'interrupted' (so the checkpoint may + // get scheduled at 'interrupted' case finally). + assert(waitNum == 0); + state = SyncState::asyncCkpt; + waitNum = MultiIface::recvThreadsNum; + // we need to fall through here to handle "recvThreadsNum == 1" case + case SyncState::asyncCkpt: + assert(waitNum >= 1); + if (waitNum == 1) + exitSimLoop("checkpoint"); + break; + default: + panic("Unexpected state for checkpoint request message"); + break; + } + break; + default: + panic("Unknown msg type"); + break; + } waitNum--; - // Notify the simultaion thread - sync_lock.unlock(); - cv.notify_one(); + assert(state != SyncState::idle); + // Notify the simulation thread if there is an on-going sync.
+ if (state != SyncState::asyncCkpt) { + sync_lock.unlock(); + cv.notify_one(); + } } void MultiIface::SyncEvent::start(Tick start, Tick interval) @@ -116,6 +235,18 @@ * Note that this is a global event so this process method will be called * by only exactly one thread. */ + // if we are draining the system then we must not start a periodic sync (as + // it is not sure that all peer gem5 will reach this tick before taking + // the checkpoint). + if (isDraining == true) { + assert(interrupted == false); + interrupted = true; + DPRINTF(MultiEthernet,"MultiIface::SyncEvent::process() interrupted due " + "draining\n"); + return; + } + if (interrupted == false) + scheduledAt = curTick(); /* * We hold the eventq lock at this point but the receiver thread may * need the lock to schedule new recv events while waiting for the @@ -125,11 +256,56 @@ */ curEventQueue()->unlock(); // we do a global sync here - MultiIface::sync->run(); + interrupted = !MultiIface::sync->run(SyncTrigger::periodic, scheduledAt); + // Global sync completed or got interrupted. // we are expected to exit with the eventq lock held curEventQueue()->lock(); - // schedule the next global sync event. - schedule(curTick() + repeat); + // schedule the next global sync event if this one completed. Otherwise (i.e. + // this one was interrupted by a checkpoint request), we will reschedule this + // one after the draining is complete. + if (!interrupted) + schedule(scheduledAt + repeat); +} + +void MultiIface::SyncEvent::resume() +{ + Tick sync_tick; + assert(!scheduled()); + if (interrupted) { + assert(curTick() >= scheduledAt); + // We have to complete the interrupted periodic sync asap. + // Note that this sync might be interrupted now again with a checkpoint + // request from a peer gem5... 
+ sync_tick = curTick(); + schedule(sync_tick); + } else { + // So we completed the last periodic sync, let's find out the tick for + // next one + assert(curTick() > scheduledAt); + sync_tick = scheduledAt + repeat; + if (sync_tick < curTick()) + panic("Cannot resume periodic synchronisation"); + schedule(sync_tick); + } + DPRINTF(MultiEthernet, + "MultiIface::SyncEvent periodic sync resumed at %lld " + "(curTick:%lld)\n", sync_tick, curTick()); +} + +void MultiIface::SyncEvent::serialize(const std::string &base, std::ostream &os) +{ + // Save the periodic multi sync schedule information + paramOut(os, base + ".periodicSyncRepeat", repeat); + paramOut(os, base + ".periodicSyncInterrupted", interrupted); + paramOut(os, base + ".periodicSyncAt", scheduledAt); +} + +void MultiIface::SyncEvent::unserialize(const std::string &base, Checkpoint *cp, + const std::string §ion) +{ + paramIn(cp, section, base + ".periodicSyncRepeat", repeat); + paramIn(cp, section, base + ".periodicSyncInterrupted", interrupted); + paramIn(cp, section, base + ".periodicSyncAt", scheduledAt); } MultiIface::MultiIface(unsigned multi_rank, @@ -170,7 +346,7 @@ // Prepare a multi header packet for the Ethernet packet we want to // send out. - header_pkt.msgType = MultiHeaderPkt::MsgType::dataDescriptor; + header_pkt.msgType = MsgType::dataDescriptor; header_pkt.sendTick = curTick(); header_pkt.sendDelay = send_delay; @@ -183,8 +359,7 @@ // Send out the multi hedare packet followed by the Ethernet packet. sendRaw(&header_pkt, sizeof(header_pkt), header_pkt.dstAddress); sendRaw(pkt->data, pkt->size(), header_pkt.dstAddress); - - DPRINTF(MultiEthernet, + DPRINTF(MultiEthernetPkt, "MultiIface::sendDataPacket() done size:%d send_delay:%llu " "src:0x%02x%02x%02x%02x%02x%02x " "dst:0x%02x%02x%02x%02x%02x%02x\n", @@ -224,7 +399,7 @@ // Compute the receive tick. It includes the send delay and the // simulated link delay. 
Tick recv_tick = header_pkt.sendTick + header_pkt.sendDelay + linkDelay; - DPRINTF(MultiEthernet, "MultiIface::recvThread() packet receive, " + DPRINTF(MultiEthernetPkt, "MultiIface::recvThread() packet receive, " "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n", header_pkt.sendTick, header_pkt.sendDelay, linkDelay, recv_tick); @@ -326,6 +501,88 @@ recvThreadsNum++; } +void MultiIface::drainStart() { + // This can be called multiple times in the same drain cycle. + if (master == this) { + syncEvent->isDraining = true; + } +} + +void MultiIface::drainDone() { + if (master == this) { + assert(syncEvent->isDraining == true); + syncEvent->isDraining = false; + // We need to resume the interrupted periodic sync here now that the + // draining is done. If the last periodic sync completed before the + // checkpoint then the next one is already scheduled. + if (syncEvent->interrupted) + syncEvent->resume(); + } +} + +void MultiIface::serialize(const std::string &base, std::ostream &os) +{ + // Drain the multi interface before the checkpoint is taken. We cannot call + // this as part of the normal drain cycle because this multi sync has to be + // called exactly once after the system is fully drained. + // Note that every peer will take a checkpoint but they may take it at + // different ticks. + // This sync request may interrupt an on-going periodic sync in some peers. 
+ sync->run(SyncTrigger::ckpt, curTick()); + + // Save the periodic multi sync status + syncEvent->serialize(base, os); + + unsigned n_rx_packets = recvQueue.size(); + if (scheduledRecvPacket != nullptr) + n_rx_packets++; + + paramOut(os, base + ".nRxPackets", n_rx_packets); + + if (n_rx_packets > 0) { + assert(recvDone->scheduled()); + scheduledRecvPacket->serialize(base + ".rxPacket[0]", os); + } + + for (unsigned i=1; i < n_rx_packets; i++) { + RecvInfo recv_info = recvQueue.impl().at(i-1); + recv_info.first->serialize(base + csprintf(".rxPacket[%d]", i), os); + Tick rx_tick = recv_info.second; + paramOut(os, base + csprintf(".rxTick[%d]", i), rx_tick); + } +} + +void MultiIface::unserialize(const std::string &base, Checkpoint *cp, + const std::string &section) +{ + assert(recvQueue.size() == 0); + assert(scheduledRecvPacket == nullptr); + assert(recvDone->scheduled() == false); + + // restore periodic sync info + syncEvent->unserialize(base, cp, section); + + unsigned n_rx_packets; + paramIn(cp, section, base + ".nRxPackets", n_rx_packets); + + if (n_rx_packets > 0) { + scheduledRecvPacket = std::make_shared<EthPacketData>(16384); + scheduledRecvPacket->unserialize(base + ".rxPacket[0]", cp, section); + // Note: receive event will be scheduled when the link is unserialized + } + + for (unsigned i=1; i < n_rx_packets; i++) { + EthPacketPtr rx_packet = std::make_shared<EthPacketData>(16384); + rx_packet->unserialize(base + csprintf(".rxPacket[%d]", i), + cp, + section); + Tick rx_tick = 0; + paramIn(cp, section, base + csprintf(".rxTick[%d]", i), rx_tick); + assert(rx_tick > 0); + recvQueue.emplace(rx_packet,rx_tick); + } +} + void MultiIface::initRandom() { // Initialize the seed for random generator to avoid the same sequence @@ -338,17 +595,27 @@ void MultiIface::startPeriodicSync() { DPRINTF(MultiEthernet, "MultiIface:::initPeriodicSync started\n"); - if (this == master) { + // Do a global sync here to ensure that peer gem5 processes are around + // (actually this may not be needed...)
+ sync->run(SyncTrigger::atomic, curTick()); + + // Start the periodic sync if it is a fresh simulation from scratch + if (curTick() == 0) { + if (this == master) { syncEvent->start(syncStart, syncRepeat); inform("Multi synchronisation activated: start at %lld, " "repeat at every %lld ticks.\n", syncStart, syncRepeat); + } else { + // In case another multiIface object requires different schedule for + // periodic sync than the master does. + syncEvent->adjust(syncStart, syncRepeat); + } } else { - // In case another multiIface object requires different schedule for - // periodic sync than the master does. - syncEvent->adjust(syncStart, syncRepeat); + // Schedule the next periodic sync if we are resuming from a checkpoint + if (this == master) + syncEvent->resume(); } DPRINTF(MultiEthernet, "MultiIface::initPeriodicSync done\n"); } - diff --git a/src/dev/multi_packet.hh b/src/dev/multi_packet.hh --- a/src/dev/multi_packet.hh +++ b/src/dev/multi_packet.hh @@ -72,7 +72,17 @@ /** * The msg type defines what informarion a multi header packet carries. 
*/ - enum class MsgType { dataDescriptor, cmdEnterBarrier, cmdLeaveBarrier }; + enum class MsgType + { + dataDescriptor, + cmdPeriodicSyncReq, + cmdPeriodicSyncAck, + cmdCkptSyncReq, + cmdCkptSyncAck, + cmdAtomicSyncReq, + cmdAtomicSyncAck, + unknown + }; typedef struct { /** diff --git a/src/dev/tcp_iface.hh b/src/dev/tcp_iface.hh --- a/src/dev/tcp_iface.hh +++ b/src/dev/tcp_iface.hh @@ -109,7 +109,8 @@ return recvTCP(sock, buf, length); } - virtual void syncRaw() M5_ATTR_OVERRIDE; + virtual void syncRaw(MultiHeaderPkt::MsgType sync_req, + Tick sync_tick) M5_ATTR_OVERRIDE; public: /** diff --git a/src/dev/tcp_iface.cc b/src/dev/tcp_iface.cc --- a/src/dev/tcp_iface.cc +++ b/src/dev/tcp_iface.cc @@ -147,7 +147,7 @@ } void -TCPIface::syncRaw() +TCPIface::syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick) { /* * Barrier is simply implemented by point-to-point messages to the server @@ -156,7 +156,8 @@ * sync request from all clients. */ MultiHeaderPkt::Header header_pkt; - header_pkt.msgType = MultiHeaderPkt::MsgType::cmdEnterBarrier; + header_pkt.msgType = sync_req; + header_pkt.sendTick = sync_tick; for (auto s: sockRegistry) sendTCP(s, (void *)&header_pkt, sizeof(header_pkt)); diff --git a/util/multi/Makefile b/util/multi/Makefile --- a/util/multi/Makefile +++ b/util/multi/Makefile @@ -46,7 +46,7 @@ vpath % $(M5_DIR)/build/$(M5_ARCH)/dev INCDIRS= -I. -I$(M5_DIR)/build/$(M5_ARCH) -I$(M5_DIR)/ext -CCFLAGS= -g -O3 $(DEBUG) -std=c++11 -MMD $(INCDIRS) +CCFLAGS= -g -Wall -O3 $(DEBUG) -std=c++11 -MMD $(INCDIRS) default: tcp_server diff --git a/util/multi/tcp_server.hh b/util/multi/tcp_server.hh --- a/util/multi/tcp_server.hh +++ b/util/multi/tcp_server.hh @@ -77,6 +77,14 @@ #include #include +#include "dev/etherpkt.hh" +#include "dev/multi_packet.hh" + +/** + * The maximum length of an Ethernet packet (allowing Jumbo frames). 
+ */ +#define MAX_ETH_PACKET_LENGTH 9014 + class TCPServer { public: @@ -85,6 +93,9 @@ typedef MultiHeaderPkt::MsgType MsgType; private: + + enum + class SyncState { periodic, ckpt, asyncCkpt, atomic, idle }; /** * The Channel class encapsulates all the information about a client * and its current status. @@ -107,7 +118,7 @@ /** * Process an incoming command message. */ - void processCmd(MultiHeaderPkt::MsgType cmd); + void processCmd(MultiHeaderPkt::MsgType cmd, Tick send_tick); public: @@ -122,7 +133,7 @@ /** * Current state of the channel wrt. multi synchronisation. */ - bool enteredBarrier; + SyncState state; /** * Multi rank of the client */ @@ -181,7 +192,11 @@ * As we dealt with only one message at a time, we can allocate and re-use * a single packet buffer (to hold any incoming data packet). */ - uint8_t packetBuffer[9000]; + uint8_t packetBuffer[MAX_ETH_PACKET_LENGTH]; + /** + * Send tick of the current periodic sync. It is used for sanity check. + */ + Tick _periodicSyncTick; /** * The singleton server object. */ @@ -212,7 +227,21 @@ * @param st The state all channels should have if sync is complete. * @param ack The type of ack message to send out if the sync is compete. */ - void syncTryComplete(); + void syncTryComplete(SyncState st, MultiHeaderPkt::MsgType ack); + /** + * Broadcast a request for checkpoint sync. + * + * @param ch The source channel of the checkpoint sync request. + */ + void ckptPropagate(Channel &ch); + /** + * Setter for current periodic send tick. + */ + void periodicSyncTick(Tick t) { _periodicSyncTick = t; } + /** + * Getter for current periodic send tick. + */ + Tick periodicSyncTick() { return _periodicSyncTick; } public: diff --git a/util/multi/tcp_server.cc b/util/multi/tcp_server.cc --- a/util/multi/tcp_server.cc +++ b/util/multi/tcp_server.cc @@ -45,6 +45,8 @@ * Message server implementation using TCP stream sockets for parallel gem5 * runs. 
*/ +#include "tcp_server.hh" + #include #include #include @@ -52,11 +54,6 @@ #include #include -#include - -#include "dev/etherpkt.hh" -#include "dev/multi_packet.hh" -#include "tcp_server.hh" using namespace std; @@ -64,9 +61,13 @@ #define PRINTF(...) fprintf(stderr, __VA_ARGS__) #ifdef DEBUG -#define DPRINTF(...) PRINTF(__VA_ARGS__) +static bool debugSetup = true; +static bool debugPeriodic = false; +static bool debugSync = true; +static bool debugPkt = false; +#define DPRINTF(v,...) if (v) PRINTF(__VA_ARGS__) #else -#define DPRINTF(...) +#define DPRINTF(v,...) #endif #define inform(...) do { PRINTF("info: "); \ @@ -80,7 +81,7 @@ TCPServer *TCPServer::instance = nullptr; -TCPServer::Channel::Channel() : fd(-1), isAlive(false), enteredBarrier(false) +TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle) { MultiHeaderPkt::clearAddress(address); } @@ -149,15 +150,67 @@ updateAddress(hdr_pkt.srcAddress); TCPServer::instance->xferData(hdr_pkt, *this); } else { - processCmd(hdr_pkt.msgType); + processCmd(hdr_pkt.msgType, hdr_pkt.sendTick); } } -void TCPServer::Channel::processCmd(MsgType cmd) +void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick) { - assert(enteredBarrier == false); - assert(cmd == MsgType::cmdEnterBarrier); - TCPServer::instance->syncTryComplete(); + switch (cmd) { + case MsgType::cmdAtomicSyncReq: + DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank); + assert(state == SyncState::idle); + state = SyncState::atomic; + TCPServer::instance->syncTryComplete(SyncState::atomic, + MsgType::cmdAtomicSyncAck); + break; + case MsgType::cmdPeriodicSyncReq: + DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick); + // sanity check + if (TCPServer::instance->periodicSyncTick() == 0) { + TCPServer::instance->periodicSyncTick(send_tick); + } else if ( TCPServer::instance->periodicSyncTick() != send_tick) { + panic("Out of order periodic sync request - rank:%d " + "(send_tick:%ld ongoing:%ld)", rank, send_tick, 
+ TCPServer::instance->periodicSyncTick()); + } + switch (state) { + case SyncState::idle: + state = SyncState::periodic; + TCPServer::instance->syncTryComplete(SyncState::periodic, + MsgType::cmdPeriodicSyncAck); + break; + case SyncState::asyncCkpt: + // An async ckpt request has already been sent to this client and + // that will interrupt this periodic sync. We can simply drop this + // message. + break; + default: + panic("Unexpected state for periodic sync request (rank:%d)", + rank); + break; + } + break; + case MsgType::cmdCkptSyncReq: + DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank); + switch (state) { + case SyncState::idle: + TCPServer::instance->ckptPropagate(*this); + // we fall through here to complete #clients==1 case + case SyncState::asyncCkpt: + state = SyncState::ckpt; + TCPServer::instance->syncTryComplete(SyncState::ckpt, + MsgType::cmdCkptSyncAck); + break; + default: + panic("Unexpected state for ckpt sync request (rank:%d)", rank); + break; + } + break; + default: + panic("Unexpected header packet (rank:%d)",rank); + break; + } } TCPServer::TCPServer(unsigned clients_num, @@ -184,7 +237,7 @@ struct pollfd new_pollfd; Channel new_channel; - DPRINTF("Start listening on port %u ...\n", port); + DPRINTF(debugSetup, "Start listening on port %u ...\n", port); listen_sock = socket(AF_INET, SOCK_STREAM, 0); if ( listen_sock < 0 ) @@ -224,22 +277,23 @@ new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank)); clientsChannel.push_back(new_channel); - DPRINTF("New client connection addr:%u port:%hu rank:%d\n", + DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n", client_addr.sin_addr.s_addr, client_addr.sin_port, new_channel.rank); } ret = close(listen_sock); assert(ret == 0); - DPRINTF("Setup complete\n"); + DPRINTF(debugSetup, "Setup complete\n"); } void TCPServer::run() { - int nfd, num_active_clients = clientsPollFd.size(); + int nfd; + unsigned num_active_clients = clientsPollFd.size(); - DPRINTF("Entering 
run() loop\n"); + DPRINTF(debugSetup, "Entering run() loop\n"); while (num_active_clients == clientsPollFd.size()) { nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1); if (nfd == -1) @@ -264,7 +318,7 @@ pfd.events = 0; clientsChannel[i].isAlive = false; num_active_clients--; - DPRINTF("POLLRDHUP event"); + DPRINTF(debugSetup, "POLLRDHUP event"); } n++; if ((signed)n == nfd) @@ -272,7 +326,7 @@ } } } - DPRINTF("Exiting run() loop\n"); + DPRINTF(debugSetup, "Exiting run() loop\n"); } void @@ -284,8 +338,7 @@ if (n == 0) panic("recvRaw() failed"); - - DPRINTF("Incoming data packet (from rank %d) " + DPRINTF(debugPkt, "Incoming data packet (from rank %d) " "src:0x%02x%02x%02x%02x%02x%02x " "dst:0x%02x%02x%02x%02x%02x%02x\n", src.rank, @@ -301,7 +354,6 @@ hdr_pkt.dstAddress[3], hdr_pkt.dstAddress[4], hdr_pkt.dstAddress[5]); - // Now try to figure out the destination client(s). auto dst_info = addressMap.find(&hdr_pkt.dstAddress); @@ -319,12 +371,6 @@ } if (n == 0) { inform("Broadcast/multicast packet dropped\n"); - } else { - if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false) - DPRINTF("Broadcast/multicast packet is sent to %d dest\n", n); - else - DPRINTF("Unicast packet with unknown dest address is sent to " - "%d dest\n", n); } } else { // It is a unicast address with a known destination @@ -332,7 +378,7 @@ if ( dst->isAlive ) { dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt)); dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength); - DPRINTF("Unicast packet sent (to rank %d)\n",dst->rank); + DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank); } else { inform("Unicast packet dropped (destination exited)\n"); } @@ -340,22 +386,52 @@ } void -TCPServer::syncTryComplete() +TCPServer::syncTryComplete(SyncState st, MsgType ack) { // Check if the barrieris complete. If so then notify all the clients. 
for (auto &c : clientsChannel) { - if (c.isAlive && c.enteredBarrier == false) { + if (c.isAlive && (c.state != st)) { // sync not complete yet, stop here return; } } // Sync complete, send out the acks MultiHeaderPkt::Header hdr_pkt; - hdr_pkt.msgType = MsgType::cmdLeaveBarrier; + hdr_pkt.msgType = ack; for (auto &c : clientsChannel) { if (c.isAlive) { c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - c.enteredBarrier = false; + c.state = SyncState::idle; + } + } + // Reset periodic send tick + _periodicSyncTick = 0; + //if (st != SyncState::periodic) + // DPRINTF("Sync COMPLETE\n"); + DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync, + "Sync COMPLETE\n"); +} + +void +TCPServer::ckptPropagate(Channel &ch) +{ + // Channel ch got a ckpt request that needs to be propagated to the other + // clients + MultiHeaderPkt::Header hdr_pkt; + hdr_pkt.msgType = MsgType::cmdCkptSyncReq; + for (auto &c : clientsChannel) { + if (c.isAlive && (&c != &ch)) { + switch (c.state) { + case SyncState::idle: + case SyncState::periodic: + c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + c.state = SyncState::asyncCkpt; + break; + default: + panic("Unexpected state for ckpt sync request propagation " + "(rank:%d)\n",c.rank); + break; + } } } }