{ unit for a Pascal simulation environment, Object Oriented Turbo Pascal 6.0}
{ THIS UNIT is the PROCESS MESSAGE-PASSING SYSTEM}
{ Author : Lin Jensen, Bishop's University
  Date   : Feb. 15, 1991
  Rev.   : October 18, 1996, for global port reporting}

UNIT MESSAGE;

INTERFACE

USES COPROC, QUEUE, SIMOBJ, PROC_MAN, STATOBJ;

{ ===================================================================
  Message passing is built on top of context switching and process
  management, using semaphores:

  A Non_Blocking SEND leaves a message at a PORT.
  The message (type note) consists of the sender's Pid (process
  identifier); you can create your own object descended from note
  with whatever fields are appropriate.

  RECEIVE receives messages at a port. A process may block on that
  port's semaphore until a message is available.

  IT IS UP TO THE PROCESSES TO HAVE A MESSAGE PASSING PROTOCOL,
  AND TO MAKE SENSE OF EACH OTHER'S MESSAGES.
  =================================================================== }

TYPE
  notePtr = ^note;
  note = OBJECT (QueueElement)   {you might create a larger note as appropriate}
    sender : process;
  end;

  {for example----
  firenote = object (note)
    thefire : fire;              where fire points to a fire process
  end;
  }

  port = ^portdescr;
  portdescr = OBJECT (aSimObject)
    messages     : FifoQueue;    {holds the messages}
    messageThere : semaphore;    {holds waiting processes or counts waiting messages}
    stat         : measure;      {for report on message queue length}
    constructor init;
    procedure send (notice : notePtr);   {sender's pid added}
    function receive : notePtr;
    procedure reset; virtual;    {reset associated statistics}
    procedure report; virtual;   {report on queue length stats.}
    destructor dissolve;         {frees up processes, discards messages}
  private
    function Get : notePtr;
  end;

IMPLEMENTATION

constructor portdescr.init;
begin
  aSimObject.init('message port');    {prepare for reporter's report}
  messages.init;
  NEW (messageThere, SubInit (0));    {initialize semaphore for message availability}
  stat.subinit;
  stat.specificlabel ('port');
  messageThere^.specificlabel ('port');
end;

procedure portdescr.send (notice : notePtr);   {sender's pid added}
begin
  with notice^ do                     {add sender's address to note}
    sender := GetPid;
  messages.tail_insert (notice);      {notice to port}
  stat.update (1);
  messageThere^.signal;               {enable receiving}
end; {send}

function portdescr.Get : notePtr;
begin
  Get := notePtr(messages.Get)        {TypeCast pointer to correct type}
end;

function portdescr.receive : notePtr;
  {blocks till message there, then gives pointer to return address and contents}
begin
  messageThere^.wait;                 {block until a message is available}
  receive := Get;                     {get the first message in queue, now it exists}
  stat.update (-1);
end; {receive}

procedure portdescr.reset;
begin
  stat.reset;                         {message stats}
  messageThere^.reset;                {receiver process stats}
end;

procedure portdescr.report;
begin
  aSimObject.report;
  Writeln ('MESSAGES have waited:');
  stat.report;
  Writeln ('Mean Delay = ', stat.DelayQ :7:2);
  Writeln;
  Writeln ('And Receiving PROCESSES have waited:');
  messageThere^.report;               {report on associated semaphore}
end;

destructor portdescr.dissolve;        {frees up processes, discards messages}
begin
  dispose (messageThere, dissolve);   {semaphore closed, any waiting ->ready}
  messages.destroy;                   {discard all messages}
  aSimObject.Destroy;                 {no more can there be a report!}
end;

{no initializations}
END.
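
{ ---------------------------------------------------------------------
  USAGE SKETCH (not part of the unit; the compiler ignores text after
  the final END.). A minimal illustration of the send/receive protocol
  described in the header, under stated assumptions:
    - bodyNote, bodyNotePtr, mailbox, Producer and Consumer are
      hypothetical names, not defined anywhere in this environment;
    - both procedures are assumed to run as processes started through
      PROC_MAN (how they are started is not shown here);
    - NEW (msg) assumes QueueElement needs no constructor call; if it
      declares one, use the extended NEW form with that constructor;
    - the receiver is assumed to be responsible for disposing the note.

  TYPE
    bodyNotePtr = ^bodyNote;
    bodyNote = OBJECT (note)
      body : string;                (* payload added by the descendant *)
    end;

  VAR
    mailbox : port;                 (* created once, before the processes
                                       run, with NEW (mailbox, init) *)

  procedure Producer;
  var msg : bodyNotePtr;
  begin
    NEW (msg);
    msg^.body := 'hello';
    mailbox^.send (msg);            (* non-blocking; send fills in sender *)
  end;

  procedure Consumer;
  var msg : bodyNotePtr;
  begin
    msg := bodyNotePtr (mailbox^.receive);   (* blocks until a note arrives *)
    Writeln (msg^.body);
    dispose (msg);
  end;

  When the port is no longer needed: dispose (mailbox, dissolve);
  --------------------------------------------------------------------- }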