diff --git a/src/dev/multi_iface.hh b/src/dev/multi_iface.hh --- a/src/dev/multi_iface.hh +++ b/src/dev/multi_iface.hh @@ -284,52 +284,168 @@ void serialize(CheckpointOut &cp) const; void unserialize(CheckpointIn &cp); }; + /** + * Class to encapsulate information about data packets received. - /** - * The receive thread needs to store the packet pointer and the computed - * receive tick for each incoming data packet. This information is used - * by the simulation thread when it processes the corresponding receive - * event. (See more comments at the implemetation of the recvThreadFunc() - * and RecvPacketIn() methods.) + * @note The main purpose of the class is to take care of scheduling receive + * done events for the simulated network link and store incoming packets + * until they can be received by the simulated network link. */ - typedef std::pair RecvInfo; + class RecvScheduler : public Serializable + { + private: + /** + * Received packet descriptor. This information is used by the receive + * thread to schedule receive events and by the simulation thread to + * process those events. + */ + struct Desc : public Serializable { + EthPacketPtr packet; + Tick sendTick; + Tick sendDelay; + int sourceRank; - /** - * Comparison predicate for RecvInfo, needed by the recvQueue. - */ - struct RecvInfoCompare { - bool operator()(const RecvInfo &lhs, const RecvInfo &rhs) + Desc() : sendTick(0), sendDelay(0), sourceRank(-1) {} + Desc(EthPacketPtr p, Tick s, Tick d, int r) : + packet(p), sendTick(s), sendDelay(d), sourceRank(r) {} + Desc(const Desc &d) : + packet(d.packet), sendTick(d.sendTick), sendDelay(d.sendDelay), + sourceRank(d.sourceRank) {} + + void serialize(CheckpointOut &cp) const M5_ATTR_OVERRIDE; + void unserialize(CheckpointIn &cp) M5_ATTR_OVERRIDE; + }; + /** + * Comparison predicate for receive descriptors. + * + * @note Receive descriptors are stored in the ordered receive queue. 
+ * The primary key for ordering is the send_tick+send_delay, the + * secondary is the rank of the sender. We use the secondary key to + * ensure reproducibility across multi-node gem5 runs (i.e. the ordering + * of the incoming packets is always well defined in the receive + * queue). + * It would be simpler to store the calculated receive tick in the + * descriptors and use that as primary key but unfortunately the + * receive tick may change as new packets are coming in out of order + * (and we have to maintain a big enough receive window, see + * calcReceiveTick() method). + */ + struct DescCompare { + bool operator()(const Desc &lhs, const Desc &rhs) + { + Tick lhs_key = lhs.sendTick + lhs.sendDelay; + Tick rhs_key = rhs.sendTick + rhs.sendDelay; + return ((lhs_key > rhs_key) || + (lhs_key == rhs_key && lhs.sourceRank > rhs.sourceRank)); + } + }; + /** + * Customized priority queue used to store incoming data packet + * descriptors by the receiver thread. We need to expose the underlying + * container to enable iterator access for serializing. + */ + class DescQueue : public std::priority_queue, + DescCompare> { - return lhs.second > rhs.second; - } - }; + public: + std::vector &impl() { return c; } + const std::vector &impl() const { return c; } + }; + /** + * The priority queue to store the receive descriptors. + */ + DescQueue descQueue; + /** + * The tick when the most recent receive event was processed. + * + * @note This information is necessary to simulate possible receiver + * link contention when calculating the receive tick for the next + * incoming data packet (see the calcReceiveTick() method) + */ + Tick prevRecvTick; + /** + * The receive done event for the simulated Ethernet link. + * + * @note This object is constructed by the simulated network link. We + * schedule this object for each incoming data packet. + */ + Event *recvDone; + /** + * The link delay in ticks for the simulated Ethernet link. 
+ * + * @note This value is used for calculating the receive ticks for + * incoming data packets. + */ + Tick linkDelay; + /** + * The event manager associated with the simulated Ethernet link. + * + * @note It is used to access the event queue for scheduling receive + * done events for the link. + */ + EventManager *eventManager; + /** + * Calculate the tick to schedule the next receive done event. + * + * @param send_tick The tick the packet was sent. + * @param send_delay The simulated delay at the sender side. + * @param prev_recv_tick Tick when the last receive event was + * processed. + * + * @note This method tries to take into account possible receiver link + * contention and adjust receive tick for the incoming packets + * accordingly. + */ + Tick calcReceiveTick(Tick send_tick, + Tick send_delay, + Tick prev_recv_tick, + unsigned source_rank); - /** - * Customized priority queue used to store incoming data packets info by - * the receiver thread. We need to expose the underlying container to - * enable iterator access for serializing. - */ - class RecvQueue : public std::priority_queue, - RecvInfoCompare> - { public: - std::vector &impl() { return c; } - const std::vector &impl() const { return c; } - }; + /** + * Scheduler for the incoming data packets. + * + * @param em The event manager associated with the simulated Ethernet + * link. + */ + RecvScheduler(EventManager *em) : + prevRecvTick(0), recvDone(nullptr), linkDelay(0), + eventManager(em) {} - /* - * The priority queue to store RecvInfo items ordered by receive ticks. - */ - RecvQueue recvQueue; - /** - * The singleton Sync object to perform multi synchronisation. - */ - static Sync *sync; - /** - * The singleton SyncEvent object to schedule periodic multi sync. - */ - static SyncEvent *syncEvent; + /** + * Initialize network link parameters. + * + * @note This method is called from the receiver thread (see + * recvThreadFunc()). 
+ */ + void init(Event *recv_done, Tick link_delay); + /** + * Fetch the next packet that is to be received by the simulated network + * link. + * + * @note This method is called from the process() method of the receive + * done event associated with the network link. + */ + EthPacketPtr popPacket(); + /** + * Push a newly arrived packet into the desc queue. + */ + void pushPacket(EthPacketPtr new_packet, + Tick send_tick, + Tick send_delay, + unsigned source_rank); + + void serialize(CheckpointOut &cp) const M5_ATTR_OVERRIDE; + void unserialize(CheckpointIn &cp) M5_ATTR_OVERRIDE; + /** + * Adjust receive ticks for pending packets when resuming from a + * checkpoint + * + * @note Link speed and delay parameters may change at resume. + */ + void resumeRecvTicks(); + }; /** * Tick to schedule the first multi sync event. * This is just as optimization : we do not need any multi sync @@ -346,27 +462,11 @@ */ std::thread *recvThread; /** - * The event manager associated with the MultiIface object. + * Meta information about data packets received. */ - EventManager *eventManager; + RecvScheduler recvScheduler; - /** - * The receive done event for the simulated Ethernet link. - * It is scheduled by the receiver thread for each incoming data - * packet. - */ - Event *recvDone; - - /** - * The packet that belongs to the currently scheduled recvDone event. - */ - EthPacketPtr scheduledRecvPacket; - - /** - * The link delay in ticks for the simulated Ethernet link. - */ - Tick linkDelay; - + protected: /** * The rank of this process among the gem5 peers. */ @@ -377,6 +477,12 @@ */ unsigned size; /** + * The network address of the associated link. + */ + MultiHeaderPkt::AddressType networkAddress; + + private: + /** * Total number of receiver threads (in this gem5 process). * During the simulation it should be constant and equal to the * number of MultiIface objects (i.e. 
simulated Ethernet @@ -384,12 +490,20 @@ */ static unsigned recvThreadsNum; /** + * The singleton Sync object to perform multi synchronisation. + */ + static Sync *sync; + /** + * The singleton SyncEvent object to schedule periodic multi sync. + */ + static SyncEvent *syncEvent; + /** * The very first MultiIface object created becomes the master. We need * a master to co-ordinate the global synchronisation. */ static MultiIface *master; - protected: + private: /** * Low level generic send routine. * @param buf buffer that holds the data to send out @@ -416,9 +530,19 @@ */ virtual void syncRaw(MsgType sync_req, Tick sync_tick) = 0; /** + * Initialize hook for the underlying messaging system. + + * @note This method must be called during startup(). It gives the + * underlying messaging system a chance to access up-to-date information + * from the base class before the simulation starts (e.g. networkAddress + * restored from a checkpoint). + */ + virtual void initRaw() = 0; + + /** * The function executed by a receiver thread. */ - void recvThreadFunc(); + void recvThreadFunc(Event *recv_done, Tick link_delay); /** * Receive a multi header packet. Called by the receiver thread. * @param header the structure to store the incoming header packet. @@ -459,18 +583,21 @@ */ void packetOut(EthPacketPtr pkt, Tick send_delay); /** - * Fetch the next packet from the receive queue. + * Fetch the packet scheduled to be received next by the simulated + * network link. + * + * @note This method is called within the process() method of the link + * receive done event. It also schedules the next receive event if the + * receive queue is not empty. */ - EthPacketPtr packetIn(); - + EthPacketPtr packetIn() { return recvScheduler.popPacket(); } /** * spawn the receiver thread. * @param recv_done The receive done event associated with the simulated * Ethernet link. * @param link_delay The link delay for the simulated Ethernet link. 
*/ - void spawnRecvThread(Event *recv_done, - Tick link_delay); + void spawnRecvThread(Event *recv_done, Tick link_delay); /** * Initialize the random number generator with a different seed in each * peer gem5 process. # Node ID 9c94d22af1e795cb94c185eccf936fd8ef90ac41 # Parent bad3fbf4a977e4670a338853110311c908b0a69c diff --git a/src/dev/multi_iface.cc b/src/dev/multi_iface.cc --- a/src/dev/multi_iface.cc +++ b/src/dev/multi_iface.cc @@ -307,17 +307,189 @@ UNSERIALIZE_SCALAR(scheduledAt); } +void +MultiIface::RecvScheduler::init(Event *recv_done, Tick link_delay) +{ + // This is called from the receiver thread when it starts running. The new + // receiver thread shares the event queue with the simulation thread + // (associated with the simulated Ethernet link). + curEventQueue(eventManager->eventQueue()); + + recvDone = recv_done; + linkDelay = link_delay; +} + +Tick +MultiIface::RecvScheduler::calcReceiveTick(Tick send_tick, + Tick send_delay, + Tick prev_recv_tick, + unsigned source_rank) +{ + Tick recv_tick = send_tick + send_delay + linkDelay; + // Adjust the receive tick if we do not have a big enough receive window. + // This gives us a very raw simulation of (receive) link contention. 
+ if (prev_recv_tick + send_delay > recv_tick) { + DPRINTF(MultiEthernetPkt, "MultiIface::caclReceiveTick() " + "recv_tick %lu adjusted to %lu (prev_recv_tick: %lu " + "send_delay: %lu)\n", + recv_tick, prev_recv_tick + send_delay, prev_recv_tick, + send_delay); + recv_tick = prev_recv_tick + send_delay; + } + if (recv_tick <= curTick()) { + panic("Simulators out of sync - missed packet receive by %llu ticks" + "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu " + "linkDelay: %lu source_rank: %d)", + curTick() - recv_tick, prev_recv_tick, send_tick, send_delay, + linkDelay, source_rank); + } + return recv_tick; +} + +void +MultiIface::RecvScheduler::resumeRecvTicks() +{ + // Schedule pending packets asap in case link speed/delay changed when + // resuming from the checkpoint. + // This may be done during unserialize except that curTick() is unknown. + std::vector v; + while (!descQueue.empty()) { + Desc d = descQueue.top(); + descQueue.pop(); + d.sendTick = curTick(); + d.sendDelay = d.packet->size(); // assume 1 tick/byte max link speed + v.push_back(d); + } + + for (auto &d : v) + descQueue.push(d); + + if (recvDone->scheduled()) { + assert(!descQueue.empty()); + eventManager->reschedule(recvDone, curTick()); + } else { + assert(descQueue.empty() && v.empty()); + } +} + +void +MultiIface::RecvScheduler::pushPacket(EthPacketPtr new_packet, + Tick send_tick, + Tick send_delay, + unsigned source_rank) +{ + // Note : this is called from the receiver thread + curEventQueue()->lock(); + Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick, + source_rank); + + DPRINTF(MultiEthernetPkt, "MultiIface::recvScheduler::pushPacket " + "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n", + send_tick, send_delay, linkDelay, recv_tick); + // Every packet must be sent and arrive in the same quantum + assert(send_tick > master->syncEvent->when() - + master->syncEvent->repeat); + // No packet may be scheduled for receive in the arrival quantum + 
+ assert(send_tick + send_delay + linkDelay > master->syncEvent->when()); + + // Now we are about to schedule a recvDone event for the new data packet. + // We use the same recvDone object for all incoming data packets. Packet + // descriptors are saved in the ordered queue. The currently scheduled + // packet is always on the top of the queue. + // NOTE: we use the event queue lock to protect the receive desc queue, + // too, which is accessed both by the receiver thread and the simulation + // thread. + descQueue.emplace(new_packet, send_tick, send_delay, source_rank); + if (descQueue.size() > 1) { + assert(recvDone->scheduled()); + if (descQueue.top().packet == new_packet) + eventManager->reschedule(recvDone, recv_tick); + } else { + assert(!recvDone->scheduled()); + eventManager->schedule(recvDone, recv_tick); + } + curEventQueue()->unlock(); +} + +EthPacketPtr +MultiIface::RecvScheduler::popPacket() +{ + // Note : this is called from the simulation thread when a receive done + // event is being processed for the link. We assume that the thread holds + // the event queue lock when this is called! 
+ EthPacketPtr next_packet = descQueue.top().packet; + descQueue.pop(); + + if (descQueue.size() > 0) { + Tick recv_tick = calcReceiveTick(descQueue.top().sendTick, + descQueue.top().sendDelay, + curTick(), + descQueue.top().sourceRank); + eventManager->schedule(recvDone, recv_tick); + } + prevRecvTick = curTick(); + return next_packet; +} + +void +MultiIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const +{ + SERIALIZE_SCALAR(sendTick); + SERIALIZE_SCALAR(sendDelay); + SERIALIZE_SCALAR(sourceRank); + packet->serialize("rxPacket", cp); +} + +void +MultiIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp) +{ + UNSERIALIZE_SCALAR(sendTick); + UNSERIALIZE_SCALAR(sendDelay); + UNSERIALIZE_SCALAR(sourceRank); + packet = std::make_shared(16384); + packet->unserialize("rxPacket", cp); +} + +void +MultiIface::RecvScheduler::serialize(CheckpointOut &cp) const +{ + SERIALIZE_SCALAR(prevRecvTick); + // serialize the receive desc queue + unsigned n_desc_queue = descQueue.size(); + SERIALIZE_SCALAR(n_desc_queue); + for (int i = 0; i < n_desc_queue; i++) { + descQueue.impl().at(i).serializeSection(cp, csprintf("rxDesc_%d", i)); + } +} + +void +MultiIface::RecvScheduler::unserialize(CheckpointIn &cp) +{ + assert(descQueue.size() == 0); + assert(recvDone->scheduled() == false); + + UNSERIALIZE_SCALAR(prevRecvTick); + // unserialize the receive desc queue + unsigned n_desc_queue; + UNSERIALIZE_SCALAR(n_desc_queue); + for (int i = 0; i < n_desc_queue; i++) { + Desc recv_desc; + recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i)); + descQueue.push(recv_desc); + } +} + MultiIface::MultiIface(unsigned multi_rank, unsigned multi_size, Tick sync_start, Tick sync_repeat, EventManager *em) : syncStart(sync_start), syncRepeat(sync_repeat), - recvThread(nullptr), eventManager(em), recvDone(nullptr), - scheduledRecvPacket(nullptr), linkDelay(0), rank(multi_rank), - size(multi_size) + recvThread(nullptr), recvScheduler(em), + rank(multi_rank), size(multi_size) { 
DPRINTF(MultiEthernet, "MultiIface() ctor rank:%d\n",multi_rank); + MultiHeaderPkt::clearAddress(networkAddress); if (master == nullptr) { assert(sync == nullptr); assert(syncEvent == nullptr); @@ -336,6 +508,7 @@ delete syncEvent; assert(sync); delete sync; + master = nullptr; } } @@ -350,6 +523,7 @@ header_pkt.msgType = MsgType::dataDescriptor; header_pkt.sendTick = curTick(); header_pkt.sendDelay = send_delay; + header_pkt.senderRank = rank; // Store also the source and destination addresses. pkt->packAddress(header_pkt.srcAddress, header_pkt.dstAddress, @@ -357,6 +531,10 @@ header_pkt.dataPacketLength = pkt->size(); + // update our network address + if (!MultiHeaderPkt::isAddressEqual(header_pkt.srcAddress, networkAddress)) + MultiHeaderPkt::copyAddress(networkAddress, header_pkt.srcAddress); + // Send out the multi hedare packet followed by the Ethernet packet. sendRaw(&header_pkt, sizeof(header_pkt), header_pkt.dstAddress); sendRaw(pkt->data, pkt->size(), header_pkt.dstAddress); @@ -394,62 +572,35 @@ panic("Missing data packet"); new_packet->length = header_pkt.dataPacketLength; - // Grab the event queue lock to schedule a new receive event for the - // data packet. - curEventQueue()->lock(); - // Compute the receive tick. It includes the send delay and the - // simulated link delay. - Tick recv_tick = header_pkt.sendTick + header_pkt.sendDelay + linkDelay; - DPRINTF(MultiEthernetPkt, "MultiIface::recvThread() packet receive, " - "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n", - header_pkt.sendTick, header_pkt.sendDelay, linkDelay, recv_tick); - if (recv_tick <= curTick()) { - panic("Simulators out of sync - missed packet receive by %llu ticks", - curTick() - recv_tick); - } - // Now we are about to schedule a recvDone event for the new data packet. - // We use the same recvDone object for all incoming data packets. If - // that is already scheduled - i.e. 
a receive event for a previous - // data packet is already pending - then we have to check whether the - // receive tick for the new packet is earlier than that of the currently - // pending event. Packets may arrive out-of-order with respect to - // simulated receive time. If that is the case, we need to re-schedule the - // recvDone event for the new packet. Otherwise, we save the packet - // pointer and the recv tick for the new packet in the recvQueue. See - // the implementation of the packetIn() method for comments on how this - // information is retrieved from the recvQueue by the simulation thread. - if (!recvDone->scheduled()) { - assert(recvQueue.size() == 0); - assert(scheduledRecvPacket == nullptr); - scheduledRecvPacket = new_packet; - eventManager->schedule(recvDone, recv_tick); - } else if (recvDone->when() > recv_tick) { - recvQueue.emplace(scheduledRecvPacket, recvDone->when()); - eventManager->reschedule(recvDone, recv_tick); - scheduledRecvPacket = new_packet; - } else { - recvQueue.emplace(new_packet, recv_tick); - } - curEventQueue()->unlock(); + // Schedule a new receive event for the data packet. + recvScheduler.pushPacket(new_packet, + header_pkt.sendTick, + header_pkt.sendDelay, + header_pkt.senderRank); + } void -MultiIface::recvThreadFunc() +MultiIface::recvThreadFunc(Event *recv_done, Tick link_delay) { EthPacketPtr new_packet; MultiHeaderPkt::Header header; - // The new receiver thread shares the event queue with the simulation - // thread (associated with the simulated Ethernet link). - curEventQueue(eventManager->eventQueue()); + // Initialize receive scheduler parameters + recvScheduler.init(recv_done, link_delay); + // Main loop to wait for and process any incoming message. for (;;) { // recvHeader() blocks until the next multi header packet comes in. if (!recvHeader(header)) { // We lost connection to the peer gem5 processes most likely // because one of them called m5 exit. So we stop here. 
- exit_message("info", 0, "Message server closed connection, " + // Grab the eventq lock to stop the simulation thread + curEventQueue()->lock(); + exit_message("info", + 0, + "Message server closed connection, " "simulation is exiting"); } // We got a valid multi header packet, let's process it @@ -462,41 +613,15 @@ } } -EthPacketPtr -MultiIface::packetIn() -{ - // We are called within the process() method of the recvDone event. We - // return the packet that triggered the current receive event. - // If there is further packets in the recvQueue, we also have to schedule - // the recvEvent for the next packet with the smallest receive tick. - // The priority queue container ensures that smallest receive tick is - // always on the top of the queue. - assert(scheduledRecvPacket != nullptr); - EthPacketPtr next_packet = scheduledRecvPacket; - - if (! recvQueue.empty()) { - eventManager->schedule(recvDone, recvQueue.top().second); - scheduledRecvPacket = recvQueue.top().first; - recvQueue.pop(); - } else { - scheduledRecvPacket = nullptr; - } - - return next_packet; -} - void MultiIface::spawnRecvThread(Event *recv_done, Tick link_delay) { assert(recvThread == nullptr); - // all receive thread must be spawned before simulation starts - assert(eventManager->eventQueue()->getCurTick() == 0); - recvDone = recv_done; - linkDelay = link_delay; - - recvThread = new std::thread(&MultiIface::recvThreadFunc, this); - + recvThread = new std::thread(&MultiIface::recvThreadFunc, + this, + recv_done, + link_delay); recvThreadsNum++; } @@ -523,6 +648,7 @@ if (syncEvent->interrupted) syncEvent->resume(); } + recvScheduler.resumeRecvTicks(); } void @@ -536,55 +662,23 @@ // This sync request may interrupt an on-going periodic sync in some peers. 
sync->run(SyncTrigger::ckpt, curTick()); - unsigned n_rx_packets = recvQueue.size(); - if (scheduledRecvPacket != nullptr) - n_rx_packets++; - - SERIALIZE_SCALAR(n_rx_packets); - - if (n_rx_packets > 0) { - assert(recvDone->scheduled()); - scheduledRecvPacket->serialize("rxPacket[0]", cp); - } - - for (unsigned i=1; i < n_rx_packets; i++) { - const RecvInfo recv_info = recvQueue.impl().at(i-1); - recv_info.first->serialize(csprintf(".rxPacket[%d]", i), cp); - Tick rx_tick = recv_info.second; - paramOut(cp, csprintf("rxTick[%d]", i), rx_tick); - } + SERIALIZE_ARRAY(networkAddress, sizeof(networkAddress)); + recvScheduler.serializeSection(cp, "recvScheduler"); // Save the periodic multi sync status - syncEvent->serializeSection(cp, "syncEvent"); + if (this == master) + syncEvent->serializeSection(cp, "syncEvent"); } void MultiIface::unserialize(CheckpointIn &cp) { - assert(recvQueue.size() == 0); - assert(scheduledRecvPacket == nullptr); - assert(recvDone->scheduled() == false); - - unsigned n_rx_packets; - UNSERIALIZE_SCALAR(n_rx_packets); - - if (n_rx_packets > 0) { - scheduledRecvPacket = std::make_shared(16384); - scheduledRecvPacket->unserialize("rxPacket[0]", cp); - // Note: receive event will be scheduled when the link is unserialized - } - - for (unsigned i=1; i < n_rx_packets; i++) { - EthPacketPtr rx_packet = std::make_shared(16384); - rx_packet->unserialize(csprintf(".rxPacket[%d]", i), cp); - Tick rx_tick = 0; - paramIn(cp, csprintf(".rxTick[%d]", i), rx_tick); - assert(rx_tick > 0); - recvQueue.emplace(rx_packet,rx_tick); - } + UNSERIALIZE_ARRAY(networkAddress, sizeof(networkAddress)); + recvScheduler.unserializeSection(cp, "recvScheduler"); // restore periodic sync info - syncEvent->unserializeSection(cp, "syncEvent"); + if (this == master) + syncEvent->unserializeSection(cp, "syncEvent"); } void MultiIface::initRandom() diff --git a/src/dev/multi_packet.hh b/src/dev/multi_packet.hh --- a/src/dev/multi_packet.hh +++ b/src/dev/multi_packet.hh @@ -94,6 
+94,7 @@ MsgType msgType; Tick sendTick; Tick sendDelay; + unsigned senderRank; /** * Actual length of the simulated Ethernet packet. */ diff --git a/src/dev/tcp_iface.hh b/src/dev/tcp_iface.hh --- a/src/dev/tcp_iface.hh +++ b/src/dev/tcp_iface.hh @@ -96,21 +96,25 @@ protected: - virtual void - sendRaw(void *buf, unsigned length, - const MultiHeaderPkt::AddressType dest_addr=nullptr) - M5_ATTR_OVERRIDE + void + sendRaw(void *buf, + unsigned length, + const MultiHeaderPkt::AddressType dest_addr=nullptr) M5_ATTR_OVERRIDE { sendTCP(sock, buf, length); } - virtual bool recvRaw(void *buf, unsigned length) M5_ATTR_OVERRIDE + bool + recvRaw(void *buf, unsigned length) M5_ATTR_OVERRIDE { return recvTCP(sock, buf, length); } - virtual void syncRaw(MultiHeaderPkt::MsgType sync_req, - Tick sync_tick) M5_ATTR_OVERRIDE; + void + syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick) M5_ATTR_OVERRIDE; + + void + initRaw() M5_ATTR_OVERRIDE; public: /** diff --git a/src/dev/tcp_iface.cc b/src/dev/tcp_iface.cc --- a/src/dev/tcp_iface.cc +++ b/src/dev/tcp_iface.cc @@ -54,6 +54,7 @@ #include "base/types.hh" #include "debug/MultiEthernet.hh" +#include "sim/sim_exit.hh" // MSG_NOSIGNAL does not exists on OS X #if defined(__APPLE__) || defined(__MACH__) @@ -99,8 +100,6 @@ freeaddrinfo(addr_results); // add our socket to the static registry sockRegistry.push_back(sock); - // let the server know who we are - sendTCP(sock, &multi_rank, sizeof(multi_rank)); } TCPIface::~TCPIface() @@ -112,12 +111,31 @@ } void +TCPIface::initRaw() +{ + // let the server know who we are + sendTCP(sock, &rank, sizeof(rank)); + // Let the server know our network address. This is necessary for + // deterministic execution after restoring from a checkpoint - that's why + // we do not send this info right in the ctor. 
+ sendTCP(sock, &networkAddress, sizeof(networkAddress)); +} + +void TCPIface::sendTCP(int sock, void *buf, unsigned length) { ssize_t ret; ret = ::send(sock, buf, length, MSG_NOSIGNAL); - panic_if(ret < 0, "send() failed: %s", strerror(errno)); + if (ret < 0) { + if (errno == ECONNRESET || errno == EPIPE) { + inform("send(): %s", strerror(errno)); + exit_message("info", 0, "Message server closed connection, " + "simulation is exiting"); + } else { + panic("send() failed: %s", strerror(errno)); + } + } panic_if(ret != length, "send() failed"); } diff --git a/util/multi/tcp_server.hh b/util/multi/tcp_server.hh --- a/util/multi/tcp_server.hh +++ b/util/multi/tcp_server.hh @@ -104,11 +104,6 @@ { private: /** - * The MAC address of the client. - */ - AddressType address; - - /** * Update the client MAC address. It is called every time a new data * packet is to come in. */ @@ -136,6 +131,19 @@ * Multi rank of the client */ unsigned rank; + /** + * The MAC address of the client. + */ + AddressType address; + /** + * New network address of the client. + * + * @note For the sake of deterministic runs, we do not use an updated + * address until the next periodic sync completes. That avoids race + * conditions regarding broadcast/unicast data packets. + */ + typedef std::pair AddressInfo; + AddressInfo newAddress; public: Channel(); @@ -154,6 +162,12 @@ */ void sendRaw(const void *data, unsigned size) const; /** + * Register an address change in the address map. + * + * @note This method is only called when a periodic sync completes + */ + void updateAddressMap(); + /** * Receive raw data from the connected client. * * @param buf The buffer to store the incoming data into. @@ -186,7 +200,6 @@ } }; std::map addressMap; - /** * As we dealt with only one message at a time, we can allocate and re-use * a single packet buffer (to hold any incoming data packet). @@ -200,7 +213,6 @@ * The singleton server object. 
*/ static TCPServer *instance; - /** * Set up the socket connections to all the clients. * @@ -251,4 +263,8 @@ * The main server loop that waits for and processes incoming messages. */ void run(); + /** + * The singleton server object. + */ + static TCPServer *getInstance() { return instance; } }; diff --git a/util/multi/tcp_server.cc b/util/multi/tcp_server.cc --- a/util/multi/tcp_server.cc +++ b/util/multi/tcp_server.cc @@ -50,6 +50,7 @@ #include #include +#include #include #include @@ -84,6 +85,7 @@ TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle) { MultiHeaderPkt::clearAddress(address); + newAddress.second = false; } unsigned @@ -113,23 +115,32 @@ panic("write() failed"); } -void TCPServer::Channel::updateAddress(const AddressType &new_address) +void +TCPServer::Channel::updateAddress(const AddressType &new_address) { // check if the known address has changed (e.g. the client reconfigured // its Ethernet NIC) if (MultiHeaderPkt::isAddressEqual(address, new_address)) return; + MultiHeaderPkt::copyAddress(newAddress.first, new_address); + newAddress.second = true; +} + +void +TCPServer::Channel::updateAddressMap() +{ + assert(newAddress.second); // So we have to update the address. Note that we always // store the same address as key in the map but the ordering // may change so we need to erase and re-insert it again. 
- auto info = TCPServer::instance->addressMap.find(&address); - if (info != TCPServer::instance->addressMap.end()) { - TCPServer::instance->addressMap.erase(info); + auto m = TCPServer::instance->addressMap.find(&address); + if (m != TCPServer::instance->addressMap.end()) { + TCPServer::instance->addressMap.erase(m); } - - MultiHeaderPkt::copyAddress(address, new_address); + MultiHeaderPkt::copyAddress(address, newAddress.first); TCPServer::instance->addressMap[&address] = this; + newAddress.second = false; } void @@ -275,6 +286,13 @@ new_channel.fd = new_sock; new_channel.isAlive = true; new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank)); + // Get initial network address + new_channel.recvRaw(&new_channel.newAddress.first, + sizeof(new_channel.newAddress.first)); + new_channel.newAddress.second = + !MultiHeaderPkt::isAddressEqual(new_channel.address, + new_channel.newAddress.first); + clientsChannel.push_back(new_channel); DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n", @@ -402,6 +420,11 @@ if (c.isAlive) { c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); c.state = SyncState::idle; + // Update the address map at each periodic sync if the channel has + // new address + if (c.newAddress.second == true && st == SyncState::periodic) + c.updateAddressMap(); + } } // Reset periodic send tick @@ -434,6 +457,14 @@ } } +void +sigtermHandler(int sig) +{ + if (TCPServer::getInstance() != nullptr) + delete TCPServer::getInstance(); + panic("Got SIGTERM\n"); +} + int main(int argc, char *argv[]) { TCPServer *server; @@ -450,6 +481,8 @@ panic("We need two command line args (number of clients and tcp listen" " port"); + signal(SIGTERM, sigtermHandler); + clients_num = atoi(argv[first_arg]); listen_port = atoi(argv[first_arg + 1]);