diff --git a/src/dev/multi_etherlink.hh b/src/dev/multi_etherlink.hh --- a/src/dev/multi_etherlink.hh +++ b/src/dev/multi_etherlink.hh @@ -223,6 +223,7 @@ virtual EtherInt *getEthPort(const std::string &if_name, int idx) M5_ATTR_OVERRIDE; + virtual void init() M5_ATTR_OVERRIDE; virtual void startup() M5_ATTR_OVERRIDE; # Node ID 84a0d3f1b3d914cf5c253489493fdd2918d366a0 # Parent 9c94d22af1e795cb94c185eccf936fd8ef90ac41 diff --git a/src/dev/multi_etherlink.cc b/src/dev/multi_etherlink.cc --- a/src/dev/multi_etherlink.cc +++ b/src/dev/multi_etherlink.cc @@ -138,14 +138,14 @@ MultiEtherLink::init() { DPRINTF(MultiEthernet,"MultiEtherLink::init() called\n"); - multiIface->initRandom(); + multiIface->init(); } void MultiEtherLink::startup() { DPRINTF(MultiEthernet,"MultiEtherLink::startup() called\n"); - multiIface->startPeriodicSync(); + multiIface->startup(); } void diff --git a/src/dev/multi_iface.hh b/src/dev/multi_iface.hh --- a/src/dev/multi_iface.hh +++ b/src/dev/multi_iface.hh @@ -89,6 +89,7 @@ #include "sim/core.hh" #include "sim/drain.hh" #include "sim/global_event.hh" +#include "sim/serialize.hh" class EventManager; @@ -97,70 +98,22 @@ */ class MultiIface : public Drainable, public Serializable { - public: - /*! - * The possible reasons a multi sync among gem5 peers is needed for. - */ - enum - class SyncTrigger { - periodic, /*!< Regular periodic sync. This can be interrupted by a - checkpoint sync request */ - ckpt, /*!< sync before taking a checkpoint */ - atomic /*!< sync that cannot be interrupted (e.g. sync at startup) */ - }; + protected: + typedef MultiHeaderPkt::MsgType MsgType; private: - typedef MultiHeaderPkt::MsgType MsgType; - - /** Sync State-Machine - \dot - digraph Sync { - node [shape=box, fontsize=10]; - idle -> busy - [ label="new trigger\n by run()" fontsize=8 ]; - busy -> busy - [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum > 1) || \n(msg==CkptSyncReq &&\ntrigger == ckpt)" fontsize=8 ]; - busy -> idle - [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum == 1)" fontsize=8 ]; - busy -> interrupted - [ label="new message by progress():\n(msg == CkptSyncReq &&\ntrigger == periodic)" fontsize=8 ]; - idle -> asyncCkpt - [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ]; - asyncCkpt -> asyncCkpt - [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ]; - asyncCkpt -> busy - [ label="new trigger by run():\ntrigger == ckpt" fontsize=8 ]; - asyncCkpt -> idle - [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum == 0) " fontsize=8 ]; - asyncCkpt -> interrupted - [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum > 0) " fontsize=8 ]; - interrupted -> interrupted - [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum > 1)" fontsize=8 ]; - interrupted -> idle - [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum == 1)" fontsize=8 ]; - } - \enddot - */ + class SyncEvent; /** @class Sync * This class implements global sync operations among gem5 peer processes. * * @note This class is used as a singleton object (shared by all MultiIface * objects). */ - class Sync + class Sync : public Serializable { private: - /*! - * Internal state of the sync singleton object. - */ - enum class SyncState { - busy, /*!< There is an on-going sync. */ - interrupted, /*!< An on-going periodic sync was interrupted. */ - asyncCkpt, /*!< A checkpoint (sim_exit) is already scheduled */ - idle /*!< There is no active sync. */ - }; /** - * The lock to protect access to the MultiSync object. + * The lock to protect access to the Sync object. */ std::mutex lock; /** @@ -175,42 +128,73 @@ */ unsigned waitNum; /** - * The trigger for the most recent sync. + * Flag is set if m5_exit pseudo instruction encountered */ - SyncTrigger trigger; + bool needExit; /** - * Map sync triggers to request messages. + * Flag is set if m5_ckpt pseudo instruction encountered */ - std::array triggerToMsg = {{ - MsgType::cmdPeriodicSyncReq, - MsgType::cmdCkptSyncReq, - MsgType::cmdAtomicSyncReq - }}; + bool needCkpt; + /** + * Flag is set if exit is permitted upon sync completion + */ + bool doExit; + /** + * Flag is set if taking a ckpt is permitted upon sync completion + */ + bool doCkpt; + /** + * Flag is set if the sync before taking a ckpt is done. + */ + bool isCkptSyncDone; + /** + * Tick for the next periodic sync (if the event is not scheduled yet) + */ + Tick nextAt; + /** + * The repeat value for the next periodic sync + */ + Tick nextRepeat; - /** - * Current sync state. - */ - SyncState state; + friend class SyncEvent; public: /** + * Initialize periodic sync params. + * + * @param start Start tick for multi synchronisation + * @param repeat Frequency of multi synchronisation + * + */ + void init(Tick start, Tick repeat); + /** * Core method to perform a full multi sync. - * - * @param t Sync trigger. - * @param sync_tick The tick the sync was expected to happen at. - * @return true if the sync completed, false if it was interrupted. - * - * @note In case of an interrupted periodic sync, sync_tick can be less - * than curTick() when we resume (i.e. re-run) it */ - bool run(SyncTrigger t, Tick sync_tick); + void run(bool same_tick); /** - * Callback when the receiver thread gets a sync message. + * Callback when the receiver thread gets a sync ack message. */ - void progress(MsgType m); + void progress(Tick max_req_tick, + Tick next_repeat, + bool do_ckpt, + bool do_exit); - Sync() : waitNum(0), state(SyncState::idle) {} + + Sync() : waitNum(0), needExit(false), needCkpt(false), + doExit(false), doCkpt(false), + nextAt(std::numeric_limits::max()), + nextRepeat(std::numeric_limits::max()) {} ~Sync() {} + + void requestCkpt(); + void requestExit(); + + void drainStart() { isCkptSyncDone = false; } + void drainComplete(); + + void serialize(CheckpointOut &cp) const M5_ATTR_OVERRIDE; + void unserialize(CheckpointIn &cp) M5_ATTR_OVERRIDE; + }; @@ -226,63 +210,25 @@ * 3. Simulation thread(s) then waits until all receiver threads * completes the ongoing barrier. The global sync event is done. */ - class SyncEvent : public GlobalSyncEvent, public Serializable + class SyncEvent : public GlobalSyncEvent { public: /** - * Flag to indicate that the most recent periodic sync was interrupted - * (by a checkpoint request). - */ - bool interrupted; - /** - * The tick when the most recent periodic synchronisation was scheduled - * at. - */ - Tick scheduledAt; - /** - * Flag to indicate an on-going drain cycle. - */ - bool isDraining; - - public: - /** - * Only the firstly instanstiated MultiIface object will + * Only the firstly instantiated MultiIface object will * call this constructor. */ - SyncEvent() : GlobalSyncEvent(Default_Pri, 0), interrupted(false), - scheduledAt(0), isDraining(false) {} + SyncEvent() : GlobalSyncEvent(Sim_Exit_Pri, 0) {} - ~SyncEvent() { assert (scheduled() == false); } + ~SyncEvent() {} /** * Schedule the first periodic sync event. - * - * @param start Start tick for multi synchronisation - * @param repeat Frequency of multi synchronisation - * */ - void start(Tick start, Tick repeat); - /** - * Reschedule (if necessary) the periodic sync event. - * - * @param start Start tick for multi synchronisation - * @param repeat Frequency of multi synchronisation - * - * @note Useful if we have multiple MultiIface objects with - * different 'start' and 'repeat' values for global sync. - */ - void adjust(Tick start, Tick repeat); + void start(); /** * This is a global event so process() will be called by each * simulation threads. (See further comments in the .cc file.) */ void process() M5_ATTR_OVERRIDE; - /** - * Schedule periodic sync when resuming from a checkpoint. - */ - void resume(); - - void serialize(CheckpointOut &cp) const; - void unserialize(CheckpointIn &cp); }; /** * Class to encapsulate information about data packets received. @@ -522,13 +468,14 @@ virtual bool recvRaw(void *buf, unsigned length) = 0; /** * Low level request for synchronisation among gem5 processes. Only one - * MultiIface object needs to call this (in each gem5 process) to trigger - * a multi sync. + * MultiIface object needs to call this (in each gem5 process) to trigger a + * multi sync. * - * @param sync_req Sync request command. - * @param sync_tick The tick when sync is expected to happen in the sender. + * @param repeat The current repeat value for periodic sync. */ - virtual void syncRaw(MsgType sync_req, Tick sync_tick) = 0; + virtual void syncRaw(MsgType cmd, + bool same_tick = false, + Tick repeat = std::numeric_limits::max()) = 0; /** * Initialize hook for the underlying messaging system. @@ -598,27 +545,28 @@ * @param link_delay The link delay for the simulated Ethernet link. */ void spawnRecvThread(Event *recv_done, Tick link_delay); - /** - * Initialize the random number generator with a different seed in each - * peer gem5 process. - */ - void initRandom(); DrainState drain() M5_ATTR_OVERRIDE; + void drainResume() M5_ATTR_OVERRIDE; + void init(); + void startup(); + void serialize(CheckpointOut &cp) const M5_ATTR_OVERRIDE; + void unserialize(CheckpointIn &cp) M5_ATTR_OVERRIDE; /** - * Callback when draining is complete. + * Initiate the exit from the simulation. + * + * @return False if we are in multi mode and a collaborative exit is + * initiated, True otherwise. */ - void drainDone(); - + static bool readyToExit(Tick delay); /** - * Initialize the periodic synchronisation among peer gem5 processes. + * Initiate taking a checkpoint + * + * @return False if we are in multi mode and a collaborative checkpoint is + * initiated, True otherwise. */ - void startPeriodicSync(); - - void serialize(CheckpointOut &cp) const; - void unserialize(CheckpointIn &cp); - + static bool readyToCkpt(Tick delay, Tick period); /** * Getter for the multi rank param. */ @@ -627,7 +575,6 @@ * Getter for the multi size param. */ static uint64_t sizeParam(); -}; - + }; #endif diff --git a/src/dev/multi_iface.cc b/src/dev/multi_iface.cc --- a/src/dev/multi_iface.cc +++ b/src/dev/multi_iface.cc @@ -60,172 +60,136 @@ unsigned MultiIface::recvThreadsNum = 0; MultiIface *MultiIface::master = nullptr; -bool -MultiIface::Sync::run(SyncTrigger t, Tick sync_tick) +void +MultiIface::Sync::init(Tick start_tick, Tick repeat_tick) +{ + if (start_tick < nextAt) { + nextAt = start_tick; + inform("Next multi synchronisation tick is changed to %lu.\n", nextAt); + } + + if (repeat_tick == 0) + panic("Multi synchronisation interval must be greater than zero"); + + if (repeat_tick < nextRepeat) { + nextRepeat = repeat_tick; + inform("Multi synchronisation interval is changed to %lu.\n", + nextRepeat); + } +} + +void +MultiIface::Sync::run(bool same_tick) { std::unique_lock sync_lock(lock); - trigger = t; - if (trigger != SyncTrigger::periodic) { - DPRINTF(MultiEthernet,"MultiIface::Sync::run() trigger:%d\n", - (unsigned)trigger); - } - - switch (state) { - case SyncState::asyncCkpt: - switch (trigger) { - case SyncTrigger::ckpt: - assert(MultiIface::syncEvent->interrupted == false); - state = SyncState::busy; - break; - case SyncTrigger::periodic: - if (waitNum == 0) { - // So all recv threads got an async checkpoint request already - // and a simExit is scheduled at the end of the current tick - // (i.e. it is a periodic sync scheduled at the same tick as - // the simExit). - state = SyncState::idle; - DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted " - "due to async ckpt scheduled\n"); - return false; - } else { - // we still need to wait for some receiver thread to get the - // aysnc ckpt request. We are going to proceed as 'interrupted' - // periodic sync. - state = SyncState::interrupted; - DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted " - "due to ckpt request is coming in\n"); - } - break; - case SyncTrigger::atomic: - assert(trigger != SyncTrigger::atomic); - } - break; - case SyncState::idle: - state = SyncState::busy; - break; - // Only one sync can be active at any time - case SyncState::interrupted: - case SyncState::busy: - assert(state != SyncState::interrupted); - assert(state != SyncState::busy); - break; - } - // Kick-off the sync unless we are in the middle of an interrupted - // periodic sync - if (state != SyncState::interrupted) { - assert(waitNum == 0); - waitNum = MultiIface::recvThreadsNum; - // initiate the global synchronisation - assert(MultiIface::master != nullptr); - MultiIface::master->syncRaw(triggerToMsg[(unsigned)trigger], sync_tick); - } + assert(waitNum == 0); + waitNum = MultiIface::recvThreadsNum; + // initiate the global synchronisation + MultiIface::master->syncRaw(MsgType::cmdSyncReq, same_tick, nextRepeat); // now wait until all receiver threads complete the synchronisation auto lf = [this]{ return waitNum == 0; }; cv.wait(sync_lock, lf); - - // we are done - assert(state == SyncState::busy || state == SyncState::interrupted); - bool ret = (state != SyncState::interrupted); - state = SyncState::idle; - return ret; + // global barrier is done } void -MultiIface::Sync::progress(MsgType msg) +MultiIface::Sync::progress(Tick max_req_tick, + Tick next_repeat, + bool do_ckpt, + bool do_exit) { std::unique_lock sync_lock(lock); + assert(waitNum > 0); + assert(waitNum == 1 || (nextAt == max_req_tick && nextRepeat == next_repeat && + doCkpt == do_ckpt && doExit == do_exit)); - switch (msg) { - case MsgType::cmdAtomicSyncAck: - assert(state == SyncState::busy && trigger == SyncTrigger::atomic); - break; - case MsgType::cmdPeriodicSyncAck: - assert(state == SyncState::busy && trigger == SyncTrigger::periodic); - break; - case MsgType::cmdCkptSyncAck: - assert(state == SyncState::busy && trigger == SyncTrigger::ckpt); - break; - case MsgType::cmdCkptSyncReq: - switch (state) { - case SyncState::busy: - if (trigger == SyncTrigger::ckpt) { - // We are already in a checkpoint sync but got another ckpt - // sync request. This may happen if two (or more) peer gem5 - // processes try to start a ckpt nearly at the same time. - // Incrementing waitNum here (before decrementing it below) - // effectively results in ignoring this new ckpt sync request. - waitNum++; - break; - } - assert (waitNum == recvThreadsNum); - state = SyncState::interrupted; - // we need to fall over here to handle "recvThreadsNum == 1" case - case SyncState::interrupted: - assert(trigger == SyncTrigger::periodic); - assert(waitNum >= 1); - if (waitNum == 1) { - exitSimLoop("checkpoint"); - } - break; - case SyncState::idle: - // There is no on-going sync so we got an async ckpt request. If we - // are the only receiver thread then we need to schedule the - // checkpoint. Otherwise, only change the state to 'asyncCkpt' and - // let the last receiver thread to schedule the checkpoint at the - // 'asyncCkpt' case. - // Note that a periodic or resume sync may start later and that can - // trigger a state change to 'interrupted' (so the checkpoint may - // get scheduled at 'interrupted' case finally). - assert(waitNum == 0); - state = SyncState::asyncCkpt; - waitNum = MultiIface::recvThreadsNum; - // we need to fall over here to handle "recvThreadsNum == 1" case - case SyncState::asyncCkpt: - assert(waitNum >= 1); - if (waitNum == 1) - exitSimLoop("checkpoint"); - break; - default: - panic("Unexpected state for checkpoint request message"); - break; - } - break; - default: - panic("Unknown msg type"); - break; - } + nextAt = max_req_tick; + nextRepeat = next_repeat; + doCkpt = do_ckpt; + doExit = do_exit; + waitNum--; - assert(state != SyncState::idle); - // Notify the simultaion thread if there is an on-going sync. - if (state != SyncState::asyncCkpt) { + // Notify the simulation thread if the on-going sync is complete + if (waitNum == 0) { sync_lock.unlock(); cv.notify_one(); } } -void MultiIface::SyncEvent::start(Tick start, Tick interval) +void +MultiIface::Sync::requestCkpt() { - assert(!scheduled()); - if (interval == 0) - panic("Multi synchronisation period must be greater than zero"); - repeat = interval; - schedule(start); + std::lock_guard sync_lock(lock); + master->syncRaw(MsgType::cmdCkptReq); + needCkpt = true; } void -MultiIface::SyncEvent::adjust(Tick start_tick, Tick repeat_tick) +MultiIface::Sync::requestExit() { - // The new multi interface may require earlier start of the - // synchronisation. - assert(scheduled() == true); - if (start_tick < when()) - reschedule(start_tick); - // The new multi interface may require more frequent synchronisation. - if (repeat == 0) - panic("Multi synchronisation period must be greater than zero"); - if (repeat < repeat_tick) - repeat = repeat_tick; + std::lock_guard sync_lock(lock); + master->syncRaw(MsgType::cmdExitReq); + needExit = true; +} + +void +MultiIface::Sync::drainComplete() +{ + if (!isCkptSyncDone) { + // The first MultiIface object called right before writing the + // checkpoint. We need to drain the underlying physical network here + // Note that other gem5 peers may enter this barrier at different + // ticks due to draining. + run(false); + // Only the "first" MultiIface object has to perform the sync + isCkptSyncDone = true; + } +} + +void +MultiIface::Sync::serialize(CheckpointOut &cp) const +{ + SERIALIZE_SCALAR(needExit); +} + +void +MultiIface::Sync::unserialize(CheckpointIn &cp) +{ + UNSERIALIZE_SCALAR(needExit); +} + +void +MultiIface::SyncEvent::start() +{ + // Note that this may be called either from startup() or drainResume() + + // Store our initial start and repeat value + Tick start = MultiIface::sync->nextAt; + repeat = MultiIface::sync->nextRepeat; + // Do a global barrier to figure out the common start tick and repeat + // for periodic sync + MultiIface::sync->run(curTick() == 0); + + assert(!MultiIface::sync->doCkpt); + assert(!MultiIface::sync->doExit); + assert(MultiIface::sync->nextAt >= curTick()); + assert(MultiIface::sync->nextRepeat <= repeat); + + // if this is called at tick 0 then we use the config start param otherwise + // the maximum of the current tick of all gem5 peers + if (curTick() == 0) { + assert(!scheduled()); + assert(MultiIface::sync->nextAt == 0); + schedule(start); + } else { + if (scheduled()) + reschedule(MultiIface::sync->nextAt); + else + schedule(MultiIface::sync->nextAt); + } + inform("Multi sync scheduled at %lu and repeats %lu\n", when(), + MultiIface::sync->nextRepeat); } void @@ -235,18 +199,6 @@ * Note that this is a global event so this process method will be called * by only exactly one thread. */ - // if we are draining the system then we must not start a periodic sync (as - // it is not sure that all peer gem5 will reach this tick before taking - // the checkpoint). - if (isDraining == true) { - assert(interrupted == false); - interrupted = true; - DPRINTF(MultiEthernet,"MultiIface::SyncEvent::process() interrupted " - "due to draining\n"); - return; - } - if (interrupted == false) - scheduledAt = curTick(); /* * We hold the eventq lock at this point but the receiver thread may * need the lock to schedule new recv events while waiting for the @@ -254,57 +206,21 @@ * Note that the other simulation threads also release their eventq * locks while waiting for us due to the global event semantics. */ - curEventQueue()->unlock(); - // we do a global sync here - interrupted = !MultiIface::sync->run(SyncTrigger::periodic, scheduledAt); - // Global sync completed or got interrupted. - // we are expected to exit with the eventq lock held - curEventQueue()->lock(); - // schedule the next global sync event if this one completed. Otherwise - // (i.e. this one was interrupted by a checkpoint request), we will - // reschedule this one after the draining is complete. - if (!interrupted) - schedule(scheduledAt + repeat); -} + { + EventQueue::ScopedRelease sr(curEventQueue()); + // we do a global sync here that is supposed to happen at the same + // tick in all gem5 peers + MultiIface::sync->run(true); + // global sync completed + } + if (MultiIface::sync->doCkpt) + exitSimLoop("checkpoint"); + if (MultiIface::sync->doExit) + exitSimLoop("exit request from gem5 peers"); -void MultiIface::SyncEvent::resume() -{ - Tick sync_tick; - assert(!scheduled()); - if (interrupted) { - assert(curTick() >= scheduledAt); - // We have to complete the interrupted periodic sync asap. - // Note that this sync might be interrupted now again with a checkpoint - // request from a peer gem5... - sync_tick = curTick(); - schedule(sync_tick); - } else { - // So we completed the last periodic sync, let's find out the tick for - // next one - assert(curTick() > scheduledAt); - sync_tick = scheduledAt + repeat; - if (sync_tick < curTick()) - panic("Cannot resume periodic synchronisation"); - schedule(sync_tick); - } - DPRINTF(MultiEthernet, - "MultiIface::SyncEvent periodic sync resumed at %lld " - "(curTick:%lld)\n", sync_tick, curTick()); -} - -void MultiIface::SyncEvent::serialize(CheckpointOut &cp) const -{ - // Save the periodic multi sync schedule information - SERIALIZE_SCALAR(repeat); - SERIALIZE_SCALAR(interrupted); - SERIALIZE_SCALAR(scheduledAt); -} - -void MultiIface::SyncEvent::unserialize(CheckpointIn &cp) -{ - UNSERIALIZE_SCALAR(repeat); - UNSERIALIZE_SCALAR(interrupted); - UNSERIALIZE_SCALAR(scheduledAt); + // schedule the next periodic sync + repeat = MultiIface::sync->nextRepeat; + schedule(curTick() + repeat); } void @@ -607,8 +523,12 @@ if (header.msgType == MsgType::dataDescriptor) { recvData(header); } else { + assert(header.msgType == MsgType::cmdSyncAck); // everything else must be synchronisation related command - sync->progress(header.msgType); + sync->progress(header.maxSyncReqTick, + header.syncRepeat, + header.doCkpt, + header.doExit); } } } @@ -632,21 +552,16 @@ // This can be called multiple times in the same drain cycle. if (master == this) { - syncEvent->isDraining = true; + sync->drainStart(); } return DrainState::Drained; } -void MultiIface::drainDone() { +void +MultiIface::drainResume() { if (master == this) { - assert(syncEvent->isDraining == true); - syncEvent->isDraining = false; - // We need to resume the interrupted periodic sync here now that the - // draining is done. If the last periodic sync completed before the - // checkpoint then the next one is already scheduled. - if (syncEvent->interrupted) - syncEvent->resume(); + syncEvent->start(); } recvScheduler.resumeRecvTicks(); } @@ -657,17 +572,13 @@ // Drain the multi interface before the checkpoint is taken. We cannot call // this as part of the normal drain cycle because this multi sync has to be // called exactly once after the system is fully drained. - // Note that every peer will take a checkpoint but they may take it at - // different ticks. - // This sync request may interrupt an on-going periodic sync in some peers. - sync->run(SyncTrigger::ckpt, curTick()); + sync->drainComplete(); SERIALIZE_ARRAY(networkAddress, sizeof(networkAddress)); recvScheduler.serializeSection(cp, "recvScheduler"); - - // Save the periodic multi sync status - if (this == master) - syncEvent->serializeSection(cp, "syncEvent"); + if (this == master) { + sync->serializeSection(cp, "Sync"); + } } void @@ -675,14 +586,20 @@ { UNSERIALIZE_ARRAY(networkAddress, sizeof(networkAddress)); recvScheduler.unserializeSection(cp, "recvScheduler"); - - // restore periodic sync info - if (this == master) - syncEvent->unserializeSection(cp, "syncEvent"); + if (this == master) { + sync->unserializeSection(cp, "Sync"); + } } -void MultiIface::initRandom() +void +MultiIface::init() { + // Adjust the periodic sync start and interval. Different MultiIface + // might have different requirements. The singleton sync object + // will select the minimum values for both params. + assert(sync != nullptr); + sync->init(syncStart, syncRepeat); + // Initialize the seed for random generator to avoid the same sequence // in all gem5 peer processes assert(master != nullptr); @@ -690,31 +607,52 @@ random_mt.init(5489 * (rank+1) + 257); } -void MultiIface::startPeriodicSync() +void +MultiIface::startup() { - DPRINTF(MultiEthernet, "MultiIface:::initPeriodicSync started\n"); - // Do a global sync here to ensure that peer gem5 processes are around - // (actually this may not be needed...) - sync->run(SyncTrigger::atomic, curTick()); + DPRINTF(MultiEthernet, "MultiIface::startup() started\n"); + // Now that everything is up-to-date give the underlying messaging system + // a chance to access any information (e.g. networkAddress restored from + // a checkpoint) + initRaw(); - // Start the periodic sync if it is a fresh simulation from scratch - if (curTick() == 0) { - if (this == master) { - syncEvent->start(syncStart, syncRepeat); - inform("Multi synchronisation activated: start at %lld, " - "repeat at every %lld ticks.\n", - syncStart, syncRepeat); - } else { - // In case another multiIface object requires different schedule - // for periodic sync than the master does. - syncEvent->adjust(syncStart, syncRepeat); - } - } else { - // Schedule the next periodic sync if resuming from a checkpoint - if (this == master) - syncEvent->resume(); + // If this run is a resume from a checkpoint than we schedule the first + // periodic sync in drainResume() + if (curTick() == 0 && this == master) + syncEvent->start(); + + DPRINTF(MultiEthernet, "MultiIface::startup() done\n"); +} + +bool +MultiIface::readyToCkpt(Tick delay, Tick period) +{ + bool ret = true; + DPRINTF(MultiEthernet, "MultiIface::readyToCkpt() called, delay:%lu " + "period:%lu\n", delay, period); + if (master) { + sync->requestCkpt(); + ret = false; + if (delay != 0 || period != 0) + inform("Non zero delay or period for m5_ckpt is ignored in " + "multi-gem5 mode\n"); } - DPRINTF(MultiEthernet, "MultiIface::initPeriodicSync done\n"); + return ret; +} + +bool +MultiIface::readyToExit(Tick delay) +{ + bool ret = true; + DPRINTF(MultiEthernet, "MultiIface::readyToExit() called, delay:%lu\n", + delay); + if (master) { + sync->requestExit(); + ret = false; + if (delay != 0) + inform("Non zero delay for m5_exit is ignored in multi-gem5 mode\n"); + } + return ret; } uint64_t diff --git a/src/dev/multi_packet.hh b/src/dev/multi_packet.hh --- a/src/dev/multi_packet.hh +++ b/src/dev/multi_packet.hh @@ -76,29 +76,42 @@ enum class MsgType { dataDescriptor, - cmdPeriodicSyncReq, - cmdPeriodicSyncAck, - cmdCkptSyncReq, - cmdCkptSyncAck, - cmdAtomicSyncReq, - cmdAtomicSyncAck, + cmdSyncReq, + cmdSyncAck, + cmdCkptReq, + cmdExitReq, unknown }; struct Header { /** - * The msg type field is valid for all header packets. In case of - * a synchronisation control command this is the only valid field. + * The msg type field is valid for all header packets. + * + * @note senderRank is used with data packets while collFlags are used + * by sync ack messages to trigger collective ckpt or exit events. */ MsgType msgType; - Tick sendTick; - Tick sendDelay; - unsigned senderRank; - /** - * Actual length of the simulated Ethernet packet. - */ - unsigned dataPacketLength; + union { + Tick sendTick; + Tick maxSyncReqTick; + }; + union { + Tick sendDelay; + Tick syncRepeat; + }; + union { + unsigned senderRank; + bool sameTick; + bool doCkpt; + }; + union { + /** + * Actual length of the simulated Ethernet packet. + */ + unsigned dataPacketLength; + bool doExit; + }; /** * Source MAC address. */ diff --git a/src/dev/tcp_iface.hh b/src/dev/tcp_iface.hh --- a/src/dev/tcp_iface.hh +++ b/src/dev/tcp_iface.hh @@ -110,11 +110,11 @@ return recvTCP(sock, buf, length); } - void - syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick) M5_ATTR_OVERRIDE; + void syncRaw(MultiHeaderPkt::MsgType sync_req, + bool same_tick, + Tick sync_tick) M5_ATTR_OVERRIDE; - void - initRaw() M5_ATTR_OVERRIDE; + void initRaw() M5_ATTR_OVERRIDE; public: /** diff --git a/src/dev/tcp_iface.cc b/src/dev/tcp_iface.cc --- a/src/dev/tcp_iface.cc +++ b/src/dev/tcp_iface.cc @@ -159,7 +159,7 @@ } void -TCPIface::syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick) +TCPIface::syncRaw(MsgType cmd, bool same_tick, Tick repeat) { /* * Barrier is simply implemented by point-to-point messages to the server @@ -167,11 +167,13 @@ * The server will send back an 'ack' message when it gets the * sync request from all clients. */ - MultiHeaderPkt::Header header_pkt; - header_pkt.msgType = sync_req; - header_pkt.sendTick = sync_tick; + MultiHeaderPkt::Header hdr_pkt; + hdr_pkt.msgType = cmd; + hdr_pkt.sendTick = curTick(); + hdr_pkt.sameTick = same_tick; + hdr_pkt.syncRepeat = repeat; - for (auto s : sockRegistry) - sendTCP(s, (void *)&header_pkt, sizeof(header_pkt)); + for (auto s: sockRegistry) + sendTCP(s, (void *)&hdr_pkt, sizeof(hdr_pkt)); } diff --git a/src/sim/global_event.hh b/src/sim/global_event.hh --- a/src/sim/global_event.hh +++ b/src/sim/global_event.hh @@ -219,7 +219,7 @@ }; GlobalSyncEvent(Priority p, Flags f) - : Base(p, f) + : Base(p, f), repeat(0) { } GlobalSyncEvent(Tick when, Tick _repeat, Priority p, Flags f) diff --git a/src/sim/pseudo_inst.cc b/src/sim/pseudo_inst.cc --- a/src/sim/pseudo_inst.cc +++ b/src/sim/pseudo_inst.cc @@ -359,8 +359,10 @@ m5exit(ThreadContext *tc, Tick delay) { DPRINTF(PseudoInst, "PseudoInst::m5exit(%i)\n", delay); - Tick when = curTick() + delay * SimClock::Int::ns; - exitSimLoop("m5_exit instruction encountered", 0, when, 0, true); + if (MultiIface::readyToExit(delay)) { + Tick when = curTick() + delay * SimClock::Int::ns; + exitSimLoop("m5_exit instruction encountered", 0, when, 0, true); + } } void @@ -516,10 +518,11 @@ if (!tc->getCpuPtr()->params()->do_checkpoint_insts) return; - Tick when = curTick() + delay * SimClock::Int::ns; - Tick repeat = period * SimClock::Int::ns; - - exitSimLoop("checkpoint", 0, when, repeat); + if (MultiIface::readyToCkpt(delay, period)) { + Tick when = curTick() + delay * SimClock::Int::ns; + Tick repeat = period * SimClock::Int::ns; + exitSimLoop("checkpoint", 0, when, repeat); + } } uint64_t diff --git a/util/multi/tcp_server.hh b/util/multi/tcp_server.hh --- a/util/multi/tcp_server.hh +++ b/util/multi/tcp_server.hh @@ -77,7 +77,6 @@ #include #include -#include "dev/etherpkt.hh" #include "dev/multi_packet.hh" /** @@ -93,9 +92,6 @@ typedef MultiHeaderPkt::MsgType MsgType; private: - - enum - class SyncState { periodic, ckpt, asyncCkpt, atomic, idle }; /** * The Channel class encapsulates all the information about a client * and its current status. @@ -111,8 +107,7 @@ /** * Process an incoming command message. */ - void processCmd(MultiHeaderPkt::MsgType cmd, Tick send_tick); - + void processCmd(Header &hdr_packet); public: /** @@ -120,19 +115,27 @@ */ int fd; /** - * Is client connected? + * Flag for on-going sync */ - bool isAlive; + bool isSync; /** - * Current state of the channel wrt. multi synchronisation. - */ - SyncState state; - /** - * Multi rank of the client + * Multi rank of the client */ unsigned rank; /** - * The MAC address of the client. + * Flag to is true if client is connected + */ + bool connected; + /** + * Flag to is true if client has requested exit + */ + bool needExit; + /** + * Flag to is true if client has requested checkppint + */ + bool needCkpt; + /** + * Current network address of the client */ AddressType address; /** @@ -148,8 +151,6 @@ public: Channel(); ~Channel () {} - - /** * Receive and process the next incoming header packet. */ @@ -177,7 +178,6 @@ */ unsigned recvRaw(void *buf, unsigned size) const; }; - /** * The array of socket descriptors needed by the poll() system call. */ @@ -186,8 +186,6 @@ * Array holding all clients info. */ std::vector clientsChannel; - - /** * We use a map to select the target client based on the destination * MAC address. @@ -206,9 +204,33 @@ */ uint8_t packetBuffer[MAX_ETH_PACKET_LENGTH]; /** - * Send tick of the current periodic sync. It is used for sanity check. + * The repeat value for the next periodic sync */ - Tick _periodicSyncTick; + Tick nextSyncRepeat; + /** + * Max send tick of the current sync. + */ + Tick maxSyncReqTick; + /* + * Is the current sync must happen at the same tick in clients? + */ + bool isSameTickSync; + /* + * Number of current sync requets. + */ + unsigned numSyncReq; + /* + * Number of current ckpt requests. + */ + unsigned numCkptReq; + /* + * Number of current exit requests. + */ + unsigned numExitReq; + /* + * Number of connected clients. + */ + unsigned numConnectedClients; /** * The singleton server object. */ @@ -219,8 +241,8 @@ * @param listen_port The port we are listening on for new client * connection requests. * @param nclients The number of clients to connect to. - * @param timeout Timeout in sec to complete the setup phase - * (i.e. all gem5 establish socket connections) + * @param timeout Timeout in sec to complete the setup phase (i.e. all gem5 + * establish socket connections) */ void construct(unsigned listen_port, unsigned nclients, int timeout); /** @@ -232,27 +254,17 @@ */ void xferData(const Header &hdr, const Channel &ch); /** - * Check if the current round of a synchronisation is completed and notify - * the clients if it is so. + * Process a new sync request. + */ + void syncProgress(Tick send_tick, Tick sync_repeat, bool is_periodic, unsigned rank); + /** + * Process a client exit event * - * @param st The state all channels should have if sync is complete. - * @param ack The type of ack message to send out if the sync is compete. + * @param ch The channel on which the client exit event occured. + * + * @return False if this event must trigger a global exit. */ - void syncTryComplete(SyncState st, MultiHeaderPkt::MsgType ack); - /** - * Broadcast a request for checkpoint sync. - * - * @param ch The source channel of the checkpoint sync request. - */ - void ckptPropagate(Channel &ch); - /** - * Setter for current periodic send tick. - */ - void periodicSyncTick(Tick t) { _periodicSyncTick = t; } - /** - * Getter for current periodic send tick. - */ - Tick periodicSyncTick() { return _periodicSyncTick; } + bool processExitEvent(Channel &ch); public: diff --git a/util/multi/tcp_server.cc b/util/multi/tcp_server.cc --- a/util/multi/tcp_server.cc +++ b/util/multi/tcp_server.cc @@ -53,6 +53,7 @@ #include #include #include +#include #include "tcp_server.hh" @@ -82,7 +83,8 @@ TCPServer *TCPServer::instance = nullptr; -TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle) +TCPServer::Channel::Channel() : + fd(-1), isSync(false), connected(false), needExit(false), needCkpt(false) { MultiHeaderPkt::clearAddress(address); newAddress.second = false; @@ -149,6 +151,8 @@ ssize_t n; Header hdr_pkt; + assert (connected); + n = recvRaw(&hdr_pkt, sizeof(hdr_pkt)); if (n == 0) { @@ -161,62 +165,42 @@ updateAddress(hdr_pkt.srcAddress); TCPServer::instance->xferData(hdr_pkt, *this); } else { - processCmd(hdr_pkt.msgType, hdr_pkt.sendTick); + processCmd(hdr_pkt); } } -void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick) +void +TCPServer::Channel::processCmd(Header &hdr_pkt) { - switch (cmd) { - case MsgType::cmdAtomicSyncReq: - DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank); - assert(state == SyncState::idle); - state = SyncState::atomic; - TCPServer::instance->syncTryComplete(SyncState::atomic, - MsgType::cmdAtomicSyncAck); + switch (hdr_pkt.msgType) { + case MsgType::cmdSyncReq: + assert(isSync == false); + isSync = true; + TCPServer::instance->syncProgress( + hdr_pkt.sendTick, + hdr_pkt.syncRepeat, + hdr_pkt.sameTick, + rank); break; - case MsgType::cmdPeriodicSyncReq: - DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick); - // sanity check - if (TCPServer::instance->periodicSyncTick() == 0) { - TCPServer::instance->periodicSyncTick(send_tick); - } else if ( TCPServer::instance->periodicSyncTick() != send_tick) { - panic("Out of order periodic sync request - rank:%d " - "(send_tick:%ld ongoing:%ld)", rank, send_tick, - TCPServer::instance->periodicSyncTick()); - } - switch (state) { - case SyncState::idle: - state = SyncState::periodic; - TCPServer::instance->syncTryComplete(SyncState::periodic, - MsgType::cmdPeriodicSyncAck); - break; - case SyncState::asyncCkpt: - // An async ckpt request has already been sent to this client and - // that will interrupt this periodic sync. We can simply drop this - // message. - break; - default: - panic("Unexpected state for periodic sync request (rank:%d)", - rank); - break; - } + case MsgType::cmdExitReq: + DPRINTF(debugSync, "EXIT request (rank %d tick %lu)\n", + rank, hdr_pkt.sendTick); + if (needExit) + inform("Multiple exit request (rank %d tick %lu)\n", + rank, hdr_pkt.sendTick); + else + TCPServer::instance->numExitReq++; + needExit = true; break; - case MsgType::cmdCkptSyncReq: - DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank); - switch (state) { - case SyncState::idle: - TCPServer::instance->ckptPropagate(*this); - // we fall through here to complete #clients==1 case - case SyncState::asyncCkpt: - state = SyncState::ckpt; - TCPServer::instance->syncTryComplete(SyncState::ckpt, - MsgType::cmdCkptSyncAck); - break; - default: - panic("Unexpected state for ckpt sync request (rank:%d)", rank); - break; - } + case MsgType::cmdCkptReq: + DPRINTF(debugSync, "CKPT request (rank %d tick %lu)\n", + rank, hdr_pkt.sendTick); + if (needCkpt) + inform("Multiple ckpt request (rank:%d tick:%lu\n", + rank, hdr_pkt.sendTick); + else + TCPServer::instance->numCkptReq++; + needCkpt = true; break; default: panic("Unexpected header packet (rank:%d)",rank); @@ -226,7 +210,14 @@ TCPServer::TCPServer(unsigned clients_num, unsigned listen_port, - int timeout_in_sec) + int timeout_in_sec) : + nextSyncRepeat(std::numeric_limits::max()), + maxSyncReqTick(0), + isSameTickSync(false), + numSyncReq(0), + numCkptReq(0), + numExitReq(0), + numConnectedClients(0) { assert(instance == nullptr); construct(clients_num, listen_port, timeout_in_sec); @@ -284,7 +275,8 @@ new_pollfd.revents = 0; clientsPollFd.push_back(new_pollfd); new_channel.fd = new_sock; - new_channel.isAlive = true; + new_channel.connected = true; + // Get rank from the client new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank)); // Get initial network address new_channel.recvRaw(&new_channel.newAddress.first, @@ -309,16 +301,16 @@ TCPServer::run() { int nfd; - unsigned num_active_clients = clientsPollFd.size(); + numConnectedClients = clientsPollFd.size(); DPRINTF(debugSetup, "Entering run() loop\n"); - while (num_active_clients == clientsPollFd.size()) { + while (numConnectedClients > 0) { nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1); if (nfd == -1) panic("poll() failed:%s", strerror(errno)); for (unsigned i = 0, n = 0; - i < clientsPollFd.size() && (signed)n < nfd; + i < clientsPollFd.size() && (signed)n < nfd && numConnectedClients; i++) { struct pollfd &pfd = clientsPollFd[i]; if (pfd.revents) { @@ -328,25 +320,41 @@ clientsChannel[i].headerPktIn(); } if (pfd.revents & POLLRDHUP) { - // One gem5 process exited or aborted. Either way, we - // assume the full simulation should stop now (either - // because m5 exit was called or a serious error - // occurred.) So we quit the run loop here and close all - // sockets to notify the remaining peer gem5 processes. + // One gem5 process exited or aborted. pfd.events = 0; - clientsChannel[i].isAlive = false; - num_active_clients--; - DPRINTF(debugSetup, "POLLRDHUP event"); + if (!processExitEvent(clientsChannel[i])) { + numConnectedClients = 0; + break; + } else { + numConnectedClients--; + } } n++; - if ((signed)n == nfd) - break; } } } DPRINTF(debugSetup, "Exiting run() loop\n"); } +bool +TCPServer::processExitEvent(Channel &ch) +{ + + DPRINTF(debugSetup, "POLLRDHUP event (rank:%d, needExit:%d)\n", + ch.rank, (int)ch.needExit); + assert (ch.connected); + + // Sanity check + for (auto &c : clientsChannel) { + if ((ch.needExit != c.needExit) || c.isSync || c.needCkpt) + panic("Client %d closed connection unexpectedly whilst client %d" + "is active (needExit %d isSync %d needCkpt %d)", + ch.rank, c.rank, c.needExit, c.isSync, c.needCkpt); + } + ch.connected = false; + return ch.needExit; +} + void TCPServer::xferData(const Header &hdr_pkt, const Channel &src) { @@ -381,7 +389,7 @@ dst_info == addressMap.end()) { unsigned n = 0; for (auto const &c: clientsChannel) { - if (c.isAlive && &c!=&src) { + if (c.connected && (&c != &src)) { c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength); n++; @@ -393,68 +401,80 @@ } else { // It is a unicast address with a known destination Channel *dst = dst_info->second; - if (dst->isAlive) { + if ( dst->connected) { dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt)); dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength); - DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank); + DPRINTF(debugPkt, "Unicast packet sent (from/to rank %d/%d)\n", + src.rank, dst->rank); } else { - inform("Unicast packet dropped (destination exited)\n"); + inform("Unicast packet dropped - destination exited " + "(from/to rank %d/%d\n", src.rank, dst->rank); } } } void -TCPServer::syncTryComplete(SyncState st, MsgType ack) +TCPServer::syncProgress(Tick send_tick, Tick next_sync_repeat, + bool same_tick, unsigned rank) { - // Check if the barrieris complete. If so then notify all the clients. + DPRINTF(same_tick ? debugPeriodic : debugSync, + "Sync request rank:%d tick:%lu\n", rank, send_tick); + + assert(numSyncReq == 0 || (same_tick == isSameTickSync)); + assert(!same_tick || (numSyncReq == 0) || + (isSameTickSync && (maxSyncReqTick == send_tick))); + isSameTickSync = same_tick; + if (maxSyncReqTick < send_tick) + maxSyncReqTick = send_tick; + + if (next_sync_repeat < nextSyncRepeat) { + nextSyncRepeat = next_sync_repeat; + inform("Periodic sync repeat is adjusted to %lu\n", nextSyncRepeat); + } + assert(numSyncReq < numConnectedClients); + numSyncReq++; + if (numSyncReq < numConnectedClients) + return; + + // Sync complete, send out the acks + MultiHeaderPkt::Header hdr_pkt; + hdr_pkt.msgType = MsgType::cmdSyncAck; + hdr_pkt.maxSyncReqTick = maxSyncReqTick; + hdr_pkt.syncRepeat = nextSyncRepeat; + assert(numCkptReq <= numConnectedClients); + + bool coll_ckpt = (numCkptReq == numConnectedClients); + if (coll_ckpt) { + hdr_pkt.doCkpt = true; + numCkptReq = 0; + } else { + hdr_pkt.doCkpt = false; + } + + assert(numExitReq <= numConnectedClients); + if (numExitReq == numConnectedClients) { + hdr_pkt.doExit = true; + numExitReq = 0; + } else { + hdr_pkt.doExit = false; + } + for (auto &c : clientsChannel) { - if (c.isAlive && (c.state != st)) { - // sync not complete yet, stop here - return; + if (c.connected) { + c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + c.isSync = false; + if (coll_ckpt) + c.needCkpt = false; + // Update the address map at each peridoc sync if the channel has + // new address + if (c.newAddress.second == true && isSameTickSync) + c.updateAddressMap(); } } - // Sync complete, send out the acks - MultiHeaderPkt::Header hdr_pkt; - hdr_pkt.msgType = ack; - for (auto &c : clientsChannel) { - if (c.isAlive) { - c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - c.state = SyncState::idle; - // Update the address map at each periodic sync if the channel has - // new address - if (c.newAddress.second == true && st == SyncState::periodic) - c.updateAddressMap(); - - } - } - // Reset periodic send tick - _periodicSyncTick = 0; - DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync, - "Sync COMPLETE\n"); -} - -void -TCPServer::ckptPropagate(Channel &ch) -{ - // Channel ch got a ckpt request that needs to be propagated to the other - // clients - MultiHeaderPkt::Header hdr_pkt; - hdr_pkt.msgType = MsgType::cmdCkptSyncReq; - for (auto &c : clientsChannel) { - if (c.isAlive && (&c != &ch)) { - switch (c.state) { - case SyncState::idle: - case SyncState::periodic: - c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - c.state = SyncState::asyncCkpt; - break; - default: - panic("Unexpected state for ckpt sync request propagation " - "(rank:%d)\n",c.rank); - break; - } - } - } + DPRINTF(isSameTickSync ? debugPeriodic : debugSync, "Sync COMPLETE\n"); + numSyncReq = 0; + maxSyncReqTick = 0; + isSameTickSync = false; } void @@ -465,9 +485,10 @@ panic("Got SIGTERM\n"); } -int main(int argc, char *argv[]) +int +main(int argc, char *argv[]) { - TCPServer *server; + TCPServer *server = nullptr; int clients_num = -1, listen_port = -1; int first_arg = 1, timeout_in_sec = 60;