Interprocess Communication =========================== Applications: 1. One huge moonolithic program does everything Functions are exchange information via parameters 2. Multiple programs that communicate with each other IPC, like shell pipelines 3. One program with multiply threads IPC (even though it is between threads and not between processes) Tools: 1. message passing (pipes, FIFOs, message queues) 2. synchronization (mutexes, cond. variables, locks, semaphores) 3. shared memory 4. RPC History: - Early programs used filesystem tricks - Record locking (88) - System V message queues, shared memory, semaphores (88) - Posix message queues, shared memory, semaphores (93) - Mutexes, cond.variables (95) - Read-write locks - Threads (pthreads: 95) ____ ____ ____ ____ ____ ____ ____ | | | | | | | | | | | | | | |proc| |proc| |proc| |proc| |proc| |shm | |proc| |____| |____| |____| |____| |____| |____| |____| | | | | |_____| |____| | | | | ---+-------+------------+-----+------------------------------- | | | | +-pipe | | shared info-+-message queue | | +-semaphore ---+-------+------------------------------------------------- | | | | filesystem Persistence: - process-persistent IPC: exists until last process with IPC object closes the object (pipes, FIFO) - kernel-persistent IPC: exists until kernel reboots or IPC object is explicitly deleted (message queues, semaphores, shared memory) - filesystem-persistent IPC: exists until IPC object is explicitly deleted (message queues, semaphore, shared memory implemented mapped files) Names: Pipe no name descriptor FIFO pathname descriptor Posix mutex no name pthread_mutex_t * Posix cond.variable no name pthread_cond_t * Poxix read-write lock no name pthread_rwlock_t * fcntl record lock pathname descriptor Posix message queue Posix IPC name mqd_t (value) Posix named semaphore Posix IPC name sem_t * Posix memory-based sem. no name sem_t * Posix shared memory Posix IPC name descriptor System V. message queue Sys.V. IPC id key_t (key) System V. semaphore Sys.V. IPC id key_t (key) System V. shared memory Sys.V. IPC id key_t (key) TCP socket IP addr + TCP port descriptor UDP socket IP addr + UDP port descriptor UNIX domain socket pathname descriptor POSIX IPC ========== Functions: message queue semaphore shared memory <mqueue.h> <semaphore.h> <sys/mman.h> mq_open sem_open shm_open mq_close sem_close mq_unlink sm_unlink shm_unlink mq_getattr ftruncate mq_setaddr fstat mq_send sem_wait mmap mq_receive sem_trywait munmap mq_notify sem_post sem_getvalue Names: Conform to pathnames. If starts with "/", than the same for all processes, otherwise implementation dependent. Flags: <sys/stat.h> mq_open sem_open shm_open read-only O_RDONLY O_RDONLY write-only O_WRONLY read-write O_RDWR O_RDWR create O_CREAT O_CREAT O_CREAT exclusive O_EXCL O_EXCL O_EXCL nonblocking O_NONBLOCK truncate O_TRUNC On create the user modes are required: S_IRUSR, S_IWUSR, S_IRGRP, S_IWGRP, S_IROTH, S_IWOTH When a new object is created the effective user id, group id are set like for files. If O_EXCL is set and the object is exists than the call fails. SYSTEM V. IPC ============= Functions: message queue semaphore shared memory <sys/msg.h> <sys/sem.h> <sys/shm.h> msgget semget shmget msgctl semctl shmctl msgsnd semop shmat msgrcv smhdt Names: NAME ftok - convert a pathname and a project identifier to a System V IPC key SYNOPSIS # include <sys/types.h> # include <sys/ipc.h> key_t ftok(const char *pathname, int proj_id); The server and the client(s) agree with a pathname, and create keys with the proj_id. Typical implementation combine filesystem info + i-node number of the pathname + 8 bits from id. IPC_PRIVATE generates a unique IPC object. List and remove: $ ipcs ------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00000000 2359297 gsd 600 393216 2 dest 0x00000000 2490370 gsd 600 393216 2 dest 0x00000000 2162691 gsd 600 393216 2 dest 0x00000000 2195460 gsd 600 393216 2 dest 0x00000000 2228229 gsd 600 393216 2 dest 0x00000000 2260998 gsd 600 393216 2 dest 0x00000000 2293767 gsd 600 393216 2 dest 0x00000000 2523144 gsd 600 393216 2 dest 0x00000000 2555913 gsd 600 393216 2 dest 0x00000000 2588682 gsd 600 393216 2 dest 0x00000000 2621451 gsd 600 393216 2 dest 0x00000000 2654220 gsd 600 393216 2 dest 0x00000000 2686989 gsd 600 393216 2 dest 0x00000000 2719758 gsd 600 393216 2 dest 0x00000000 2752527 gsd 600 393216 2 dest ------ Semaphore Arrays -------- key semid owner perms nsems 0x4d02dc3c 0 gsd 600 8 ------ Message Queues -------- key msqid owner perms used-bytes messages NAME ipcrm - remove an XSI message queue, semaphore set, or shared memory segment identifier SYNOPSIS ipcrm [ -q msgid | -Q msgkey | -s semid | -S semkey | -m shmid | -M shmkey ] ... PIPES ===== NAME pipe - create pipe SYNOPSIS #include <unistd.h> int pipe(int filedes[2]); filedes[0]: read filedes[1]: write fd[1] --> pipe --> fd[0] Full-duplex pipe ================ NAME socketpair - create a pair of connected sockets SYNOPSIS #include <sys/types.h> #include <sys/socket.h> int socketpair(int d, int type, int protocol, int sv[2]); Its a full-duplex pipe, so fd[0] reads/writes from/to fd[1] fd[0] <--> pipe <--> fd[1] Popen and pclose ================ NAME popen, pclose - process I/O SYNOPSIS #include <stdio.h> FILE *popen(const char *command, const char *type); int pclose(FILE *stream); Creates a new process (with fork+exec) and if type is - "r" the calling process reads the standard output of the command - "w" the calling process writes the standard input of the command FIFO ==== $ mkfifo [OPTION] NAME... NAME mkfifo - make a FIFO special file (a named pipe) SYNOPSIS #include <sys/types.h> #include <sys/stat.h> int mkfifo(const char *pathname, mode_t mode); int open(const char *pathname, int flags); int close(int fd); Set the nonblocking flag: writefd = open( fifoname, O_WRONLY|O_NONBLOCK, 0); vagy int flags; if ( (flags = fcntl( fd, F_GETFL, 0)) < 0 ) error(); flags |= O_NONBLOCK; if ( (flags = fcntl( fd, F_SETFL, 0)) < 0 ) error(); If there is one server and more clients: The clienst write into the same FIFO and the server writes to the cliens' FIFOs. POSIX message queues ==================== msg1(pr1) <-- msg2(pr2) <-- ... <-- msgN(prN) Difference between messages and pipes: no need for reader when a message is written. The writer can exit and a reader later reads the message. (Message queues are kernel-persistent). Difference between Posix and System V messages: a read on posix message queue always returns the oldest message on the highest priority, whereas a read on a System V message queue can return a message of any desired priority. Posix message queues allow to alarm the reader when a message is placed into the queue. NAME mq_open - open a message queue (REALTIME) SYNOPSIS #include <time.h> #include <mqueue.h> mqd_t mq_open(const char *name, int oflag, ...); int mq_close(mqd_t mqdes); int mq_unlink(const char *name); int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat); int mq_setattr(mqd_t mqdes, struct mq_attr *mqstat, struct mq_attr *old); struct mq_attr { long mq_flags; /* 0, O_NONBLOCK */ long mq_maxmsg; /* max number of messages */ long mq_msgsize; /* max size of a message */ long mq_curmsg; /* current number of messages */ }; int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); /* timeout if not set O_NONBLOCK */ int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout); ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); /* timeout if not set O_NONBLOCK */ ssize_t mq_timedreceive(mqd_t mqdes, char *restrict msg_ptr, size_t msg_len, unsigned *restrict msg_prio, const struct timespec *restrict abs_timeout); /* avoid polling */ int mq_notify(mqd_t mqdes, const struct sigevent *notification); The notification is either - generation of a signal - creation of a thread to execute a function The process is registered for notification for the queue If NULL == notification, then the not. event is removed SYSTEM V MESSAGE QUEUES ======================= has a more complex structure than POSIX queues NAME msgget - get a message queue identifier SYNOPSIS #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgget(key_t key, int msgflg); int msgsnd(int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg); struct msgbuf { long mtype; /* priority, must be >0 */ char mtext[1]; /* message data */ }; struct my_msgbuf { long mtype; /* priority, must be >0 */ double my_double; /* message data 1 */ char mtext[100]; /* message data 2 */ }; ssize_t msgrcv(int msqid, struct msgbuf *msgp, size_t msgsz, long msgtyp, int msgflg); If the type is 0, then returns the oldest message in queue. If the type is >0, then the oldest message with type returns If the type is <0, then the oldest message with the lowest type returns int msgctl(int msqid, int cmd, struct msqid_ds *buf); IPC_RMID - remove the queue IPC_SET - set header IPC_STAT - reads header POSIX SEMAPHORES ================ - Posix named semaphores (IPC names) - Posix memory-based semaphores in shared memory - System V semaphores Three fundamental operations: create a semaphore (with value 1) wait for semaphore /* start atomic operation */ while ( semaphore_values <= 0 ) ; --semaphore_value; /* end atomic operation */ post to a semaphore /* up, unlock, signal */ ++semaphore_value; producer --> shared buffer --> consumer producer consumer initailize semaphore get to 0 initialize semaphore put to 1 for ( ; ; ) for ( ; ; ) { { sem_wait(&put); sem_wait(&get); put data into buffer process data in buffer sem_post(&get); sem_post(&put); } } named semaphore: memory based semaphore: sem_open() sem_init() sem_wait() sem_trywait() sem_post() sem_getvalue() sem_close() sem_destroy() sem_unlink() NAME sem_open - initialize and open a named semaphore SYNOPSIS #include <semaphore.h> sem_t *sem_open(const char *name, int oflag, /* mode_t mode, unsigned int initial_value */ ); Error returns as SEM_FAILED, which is NOT NECCESSARY (sem_t*)(-1) int sem_close(sem_t *sem); int sem_unlink(const char *name); NAME sem_trywait, sem_wait - lock a semaphore (REALTIME) SYNOPSIS #include <semaphore.h> int sem_trywait(sem_t *sem); int sem_wait(sem_t *sem); sem_trywait does not block, but returns EAGAIN int sem_post(sem_t *sem); int sem_getvalue(sem_t *sem, int *sval); SYSTEM V SEMAPHORES =================== Able to maintain a set of semaphores NAME semget - get a semaphore set identifier SYNOPSIS #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semget(key_t key, int nsems, int semflg); key is the usual: IPC_PRIVATE or a result of ftok() the semaphores are not initialized in the set!q struct sembuf { short sem_num; /* semaphore number: 0,1, ..., nsems-1 */ short sem_op; /* operation: <0, 0, >0 */ short sem_flg; /* 0, IPC_NOWAIT, SEM_UNDO */ }; int semop(int semid, struct sembuf *sops, unsigned nsops); int semtimedop(int semid, struct sembuf *sops, unsigned nsops, struct timespec *timeout); Sops is an array of sembuf structures. Each element in the array specifies an operation for one particular semaphore in the set. The array are guaranteed to be performed atomically by the kernel. If sem_op > 0 sem_value += sem_op If sem_op == 0 wait until sem_value == 0 If sem_op < 0 wait until sem_val >= -sem_op int semctl(int semid, int semnum, int cmd, ...); SHARED MEMORY ============= NAME mmap, munmap - map or unmap files or devices into memory SYNOPSIS #include <sys/mman.h> void *mmap(void *start, size_t length, int prot, int flags, int fd, off_t offset); int munmap(void *start, size_t length); int msync(void *start, size_t length, int flags); PROT_READ MAP_SHARED PROT_WRITE MAP_PRIVATE PROT_EXEC + PROT_NONE MAP_FIXED map a file or a shared memory into a process address space POSIX SHARED MEMORY =================== #include <sys/types.h> #include <sys/mman.h> int shm_open(const char *name, int oflag, mode_t mode); int shm_unlink(const char *name); int ftruncate(int fd, off_t length); int fstat(int filedes, struct stat *buf); SYSTEM V SHARED MEMORY ====================== NAME shmget - allocates a shared memory segment SYNOPSIS #include <sys/ipc.h> #include <sys/shm.h> int shmget(key_t key, int size, int shmflg); void *shmat(int shmid, const void *shmaddr, int shmflg); int shmdt(const void *shmaddr); int shmctl(int shmid, int cmd, struct shmid_ds *buf);