Interprocess Communication
===========================

Applications:

1. One huge monolithic program does everything
   Functions exchange information via parameters

2. Multiple programs that communicate with each other
   IPC, like shell pipelines

3. One program with multiple threads
   IPC (even though it is between threads and not between processes)


Tools:

1. message passing (pipes, FIFOs, message queues)
2. synchronization (mutexes, cond. variables, locks, semaphores)
3. shared memory
4. RPC


History:

- Early programs used filesystem tricks
- Record locking (88)
- System V message queues, shared memory, semaphores (88)
- Posix message queues, shared memory, semaphores (93)
- Mutexes, cond.variables (95)
- Read-write locks
- Threads (pthreads: 95)


     ____     ____       ____    ____       ____    ____    ____
    |    |   |    |     |    |  |    |     |    |  |    |  |    |
    |proc|   |proc|     |proc|  |proc|     |proc|  |shm |  |proc|
    |____|   |____|     |____|  |____|     |____|  |____|  |____|
       |       |            |     |           |_____|  |____|
       |       |            |     |
    ---+-------+------------+-----+-------------------------------
       |       |            |     |   +-pipe
       |       |          shared info-+-message queue
       |       |                      +-semaphore
    ---+-------+-------------------------------------------------
       |       |
       |       |
      filesystem



Persistence:

- process-persistent IPC:
    exists until last process with IPC object closes the object
    (pipes, FIFO)

- kernel-persistent IPC:
    exists until kernel reboots or IPC object is explicitly deleted
    (message queues, semaphores, shared memory)

- filesystem-persistent IPC:
    exists until IPC object is explicitly deleted
    (message queues, semaphores, shared memory implemented with mapped files)


Names:

    Pipe                    no name             descriptor
    FIFO                    pathname            descriptor

    Posix mutex             no name             pthread_mutex_t *
    Posix cond.variable     no name             pthread_cond_t *
    Posix read-write lock   no name             pthread_rwlock_t *
    fcntl record lock       pathname            descriptor

    Posix message queue     Posix IPC name      mqd_t (value)
    Posix named semaphore   Posix IPC name      sem_t *
    Posix memory-based sem. no name             sem_t *
    Posix shared memory     Posix IPC name      descriptor

    System V. message queue key_t (key)         Sys.V. IPC id
    System V. semaphore     key_t (key)         Sys.V. IPC id
    System V. shared memory key_t (key)         Sys.V. IPC id

    TCP socket              IP addr + TCP port  descriptor
    UDP socket              IP addr + UDP port  descriptor
    UNIX domain socket      pathname            descriptor



POSIX IPC
==========

Functions:

        message queue   semaphore           shared memory

        <mqueue.h>      <semaphore.h>       <sys/mman.h>

        mq_open         sem_open            shm_open
        mq_close        sem_close
        mq_unlink       sem_unlink          shm_unlink

        mq_getattr                          ftruncate
        mq_setattr                          fstat

        mq_send         sem_wait            mmap
        mq_receive      sem_trywait         munmap
        mq_notify       sem_post
                        sem_getvalue


Names:

    Names conform to pathname rules. If a name starts with "/", then
    it refers to the same object in all processes; otherwise the
    behavior is implementation dependent.


Flags:

    <fcntl.h>     (O_* flags)
    <sys/stat.h>  (S_I* mode bits)


            mq_open     sem_open    shm_open

read-only   O_RDONLY                O_RDONLY
write-only  O_WRONLY
read-write  O_RDWR                  O_RDWR

create      O_CREAT     O_CREAT     O_CREAT
exclusive   O_EXCL      O_EXCL      O_EXCL

nonblocking O_NONBLOCK
truncate                            O_TRUNC

When creating, the permission mode bits are required:

    S_IRUSR, S_IWUSR, S_IRGRP, S_IWGRP, S_IROTH, S_IWOTH

When a new object is created, its owner user id and group id
are set as for files. If both O_CREAT and O_EXCL are set and the
object already exists, then the call fails (EEXIST).



SYSTEM V. IPC
=============

Functions:

        message queue       semaphore       shared memory

        <sys/msg.h>         <sys/sem.h>     <sys/shm.h>

        msgget              semget          shmget
        msgctl              semctl          shmctl
        msgsnd              semop           shmat
        msgrcv                              shmdt


Names:

    NAME
       ftok  -  convert a pathname and a project identifier to
       a System V IPC key

    SYNOPSIS
       # include <sys/types.h>
       # include <sys/ipc.h>

       key_t ftok(const char *pathname, int proj_id);

       The server and the client(s) agree on a pathname
       and create keys with the same proj_id.

       A typical implementation combines filesystem (device) info,
       the i-node number of the pathname, and the low 8 bits of proj_id.
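
As a minimal sketch of the agreement described above: two independent
ftok() calls with the same pathname and proj_id should produce the same
key. The function name ftok_keys_agree is hypothetical, chosen only for
this illustration; the "server" and "client" are simulated by two calls
in one process.

```c
#include <sys/ipc.h>
#include <sys/types.h>

/* Returns 1 if two ftok() calls with identical arguments yield the
 * same key, 0 if they differ, and -1 if ftok() fails (for example
 * because pathname does not exist). */
int ftok_keys_agree(const char *pathname, int proj_id)
{
    key_t server_key = ftok(pathname, proj_id);  /* what the server computes */
    key_t client_key = ftok(pathname, proj_id);  /* what a client computes */

    if (server_key == (key_t)-1 || client_key == (key_t)-1)
        return -1;
    return server_key == client_key;
}
```

The pathname must refer to an existing, accessible file; if the file is
removed and recreated, its i-node number and therefore the key may change.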


    IPC_PRIVATE
        generates a unique IPC object.


List and remove:

    $ ipcs
    ------ Shared Memory Segments --------
    key        shmid      owner      perms      bytes      nattch     status
    0x00000000 2359297    gsd       600        393216     2          dest
    0x00000000 2490370    gsd       600        393216     2          dest
    0x00000000 2162691    gsd       600        393216     2          dest
    0x00000000 2195460    gsd       600        393216     2          dest
    0x00000000 2228229    gsd       600        393216     2          dest
    0x00000000 2260998    gsd       600        393216     2          dest
    0x00000000 2293767    gsd       600        393216     2          dest
    0x00000000 2523144    gsd       600        393216     2          dest
    0x00000000 2555913    gsd       600        393216     2          dest
    0x00000000 2588682    gsd       600        393216     2          dest
    0x00000000 2621451    gsd       600        393216     2          dest
    0x00000000 2654220    gsd       600        393216     2          dest
    0x00000000 2686989    gsd       600        393216     2          dest
    0x00000000 2719758    gsd       600        393216     2          dest
    0x00000000 2752527    gsd       600        393216     2          dest

    ------ Semaphore Arrays --------
    key        semid      owner      perms      nsems
    0x4d02dc3c 0          gsd       600        8

    ------ Message Queues --------
    key        msqid      owner      perms      used-bytes   messages


    NAME
       ipcrm  -  remove an XSI message queue, semaphore set, or shared memory
       segment identifier

    SYNOPSIS
       ipcrm [ -q msgid | -Q msgkey | -s semid | -S semkey |
              -m shmid | -M shmkey ] ...





PIPES
=====

    NAME
       pipe - create pipe

    SYNOPSIS
       #include <unistd.h>

       int pipe(int filedes[2]);


    filedes[0]: read
    filedes[1]: write


        fd[1] --> pipe --> fd[0]
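
A minimal sketch of the data flow above, assuming a fork()ed child as
the writer and the parent as the reader. The function name
pipe_roundtrip is hypothetical and used only for illustration.

```c
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* The child writes msg into fd[1]; the parent reads it from fd[0]
 * into out. Returns the number of bytes read, or -1 on error. */
ssize_t pipe_roundtrip(const char *msg, char *out, size_t outsz)
{
    int fd[2];
    if (outsz == 0 || pipe(fd) < 0)
        return -1;

    pid_t pid = fork();
    if (pid < 0) {
        close(fd[0]);
        close(fd[1]);
        return -1;
    }
    if (pid == 0) {                     /* child: writer */
        close(fd[0]);                   /* unused read end */
        if (write(fd[1], msg, strlen(msg)) < 0)
            _exit(1);
        close(fd[1]);
        _exit(0);
    }

    close(fd[1]);                       /* parent: close unused write end */
    size_t total = 0;
    ssize_t n;
    while (total < outsz - 1 &&
           (n = read(fd[0], out + total, outsz - 1 - total)) > 0)
        total += (size_t)n;             /* read until EOF or buffer full */
    out[total] = '\0';
    close(fd[0]);
    waitpid(pid, NULL, 0);
    return (ssize_t)total;
}
```

Closing the unused ends matters: the reader sees end-of-file only after
every write descriptor of the pipe has been closed.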


Full-duplex pipe
================

    NAME
       socketpair - create a pair of connected sockets

    SYNOPSIS
       #include <sys/types.h>
       #include <sys/socket.h>

       int socketpair(int d, int type, int protocol, int sv[2]);

       It is a full-duplex pipe: data written to sv[0] is read from sv[1]
       and vice versa.

        sv[0] <--> pipe <--> sv[1]
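
A small sketch of two-way traffic over one socketpair, assuming
AF_UNIX and SOCK_STREAM and a short, non-empty message that fits in a
single read. The function name socketpair_upper is hypothetical.

```c
#include <ctype.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* The parent sends msg on one end; the child upper-cases it and sends
 * it back on the same socket. Returns the reply length or -1. */
ssize_t socketpair_upper(const char *msg, char *out, size_t outsz)
{
    int sv[2];
    if (outsz == 0 || socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return -1;

    pid_t pid = fork();
    if (pid < 0) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }
    if (pid == 0) {                     /* child: uses sv[1] only */
        char buf[256];
        close(sv[0]);
        ssize_t n = read(sv[1], buf, sizeof buf);
        for (ssize_t i = 0; i < n; i++)
            buf[i] = (char)toupper((unsigned char)buf[i]);
        if (n > 0 && write(sv[1], buf, (size_t)n) < 0)
            _exit(1);
        close(sv[1]);
        _exit(0);
    }

    close(sv[1]);                       /* parent: uses sv[0] only */
    ssize_t n = -1;
    if (write(sv[0], msg, strlen(msg)) >= 0)
        n = read(sv[0], out, outsz - 1);   /* same descriptor, other direction */
    if (n >= 0)
        out[n] = '\0';
    close(sv[0]);
    waitpid(pid, NULL, 0);
    return n;
}
```

With an ordinary pipe this would need two pipes, one per direction.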


Popen and pclose
================

    NAME
       popen, pclose - process I/O

    SYNOPSIS
       #include <stdio.h>

       FILE *popen(const char *command, const char *type);

       int pclose(FILE *stream);

       Creates a new process (with fork+exec) and if type is
       - "r" the calling process reads the standard output of the command
       - "w" the calling process writes the standard input of the command



FIFO
====

    $ mkfifo [OPTION] NAME...

    NAME
       mkfifo - make a FIFO special file (a named pipe)

    SYNOPSIS
       #include <sys/types.h>
       #include <sys/stat.h>    /* mkfifo */
       #include <fcntl.h>       /* open */
       #include <unistd.h>      /* close */

       int mkfifo(const char *pathname, mode_t mode);
       int open(const char *pathname, int flags);
       int close(int fd);


Set the nonblocking flag:

    writefd = open( fifoname, O_WRONLY|O_NONBLOCK, 0);

or

    int flags;

    if ( (flags = fcntl( fd, F_GETFL, 0)) < 0 )
        error();
    flags |= O_NONBLOCK;
    if ( fcntl( fd, F_SETFL, flags) < 0 )
        error();
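
To illustrate the effect of the flag, here is a small sketch that sets
O_NONBLOCK on the read end of an empty pipe and checks that read()
fails with EAGAIN instead of blocking. The names set_nonblocking and
empty_read_would_block are hypothetical.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Adds O_NONBLOCK to the file status flags of fd, keeping the others. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Returns 1 if reading an empty non-blocking pipe fails with EAGAIN,
 * 0 if it behaves differently, and -1 on setup errors. */
int empty_read_would_block(void)
{
    int fd[2];
    char c;
    int result = -1;

    if (pipe(fd) < 0)
        return -1;
    if (set_nonblocking(fd[0]) == 0) {
        ssize_t n = read(fd[0], &c, 1);     /* nothing has been written */
        result = (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
    }
    close(fd[0]);
    close(fd[1]);
    return result;
}
```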


If there is one server and more clients:

    The clients write into the same (well-known) FIFO
    and the server writes to each client's own FIFO.
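
A reduced sketch of the request direction only: one client (a forked
child) writes a request into the well-known FIFO and the server (the
parent) reads it. The per-client reply FIFO is left out for brevity.
The function name fifo_request and the path passed in are hypothetical.

```c
#include <fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Creates a FIFO at path, lets a child send request through it, and
 * reads the request into out. Returns the bytes read, or -1. */
ssize_t fifo_request(const char *path, const char *request,
                     char *out, size_t outsz)
{
    if (outsz == 0 || mkfifo(path, S_IRUSR | S_IWUSR) < 0)
        return -1;

    pid_t pid = fork();
    if (pid < 0) {
        unlink(path);
        return -1;
    }
    if (pid == 0) {                         /* client */
        int wfd = open(path, O_WRONLY);     /* blocks until a reader opens */
        if (wfd < 0 || write(wfd, request, strlen(request)) < 0)
            _exit(1);
        close(wfd);
        _exit(0);
    }

    ssize_t result = -1;
    int rfd = open(path, O_RDONLY);         /* server: blocks until a writer opens */
    if (rfd >= 0) {
        size_t total = 0;
        ssize_t n;
        while (total < outsz - 1 &&
               (n = read(rfd, out + total, outsz - 1 - total)) > 0)
            total += (size_t)n;
        out[total] = '\0';
        close(rfd);
        result = (ssize_t)total;
    }
    waitpid(pid, NULL, 0);
    unlink(path);                           /* remove the FIFO from the filesystem */
    return result;
}
```

In a real server the client would typically put the pathname of its
own FIFO into the request so the server knows where to reply.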


POSIX message queues
====================

    msg1(pr1) <-- msg2(pr2) <-- ... <-- msgN(prN)

    Difference between message queues and pipes: no reader needs to
    exist when a message is written.
    The writer can exit and a reader can read the message later
    (message queues are kernel-persistent).

    Difference between Posix and System V message queues: a read on
    a Posix message queue always returns the oldest message of the
    highest priority, whereas a read on a System V message queue can
    return a message of any desired priority (type). Posix message
    queues can also notify the reader when a message is placed into
    an empty queue.

    NAME
       mq_open - open a message queue (REALTIME)

    SYNOPSIS
       #include <time.h>
       #include <mqueue.h>

       mqd_t mq_open(const char *name, int oflag, ...);
       int mq_close(mqd_t mqdes);
       int mq_unlink(const char *name);

       int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat);
       int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *old);

       struct mq_attr {
           long mq_flags;   /* 0, O_NONBLOCK */
           long mq_maxmsg;  /* max number of messages */
           long mq_msgsize; /* max size of a message */
           long mq_curmsgs; /* current number of messages */
       };

       int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
              unsigned msg_prio);

       /* the timeout applies only if O_NONBLOCK is not set */
       int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
           unsigned msg_prio, const struct timespec *abs_timeout);

       ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
              unsigned *msg_prio);

       /* the timeout applies only if O_NONBLOCK is not set */
       ssize_t mq_timedreceive(mqd_t mqdes, char *restrict msg_ptr,
              size_t msg_len, unsigned *restrict msg_prio,
              const struct timespec *restrict abs_timeout);

       /* avoid polling */
       int mq_notify(mqd_t mqdes, const struct sigevent *notification);

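        The notification is either
        - generation of a signal
        - creation of a thread to execute a function

        Only one process at a time can be registered for notification
        on a given queue. The notification is sent when a message
        arrives on an empty queue, and the registration is then removed.
        If notification == NULL, the existing registration is removed.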
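
The priority ordering of a Posix queue can be sketched as follows,
assuming a Linux-like system (older glibc versions need -lrt when
linking). The function name mq_priority_demo, the queue name passed in,
and the limits 4 and 64 are arbitrary choices for this illustration.

```c
#include <fcntl.h>
#include <mqueue.h>
#include <string.h>
#include <sys/stat.h>

/* Sends "low" with priority 1, then "high" with priority 5, and
 * receives one message into out (at least 64 bytes recommended).
 * Returns the priority of the received message, or -1 on error. */
int mq_priority_demo(const char *name, char *out, size_t outsz)
{
    struct mq_attr attr;
    char buf[64];
    unsigned prio = 0;
    int result = -1;

    memset(&attr, 0, sizeof attr);
    attr.mq_maxmsg = 4;                 /* queue capacity in messages */
    attr.mq_msgsize = sizeof buf;       /* maximum size of one message */

    mqd_t mq = mq_open(name, O_CREAT | O_EXCL | O_RDWR,
                       S_IRUSR | S_IWUSR, &attr);
    if (mq == (mqd_t)-1)
        return -1;

    if (mq_send(mq, "low", 4, 1) == 0 && mq_send(mq, "high", 5, 5) == 0) {
        /* The receive buffer must be at least mq_msgsize bytes long. */
        ssize_t n = mq_receive(mq, buf, sizeof buf, &prio);
        if (n > 0 && (size_t)n <= outsz) {
            memcpy(out, buf, (size_t)n);    /* includes the '\0' we sent */
            result = (int)prio;
        }
    }
    mq_close(mq);
    mq_unlink(name);                    /* remove the kernel-persistent queue */
    return result;
}
```

Although "low" was sent first, the receive is expected to return
"high", because it has the higher priority.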



SYSTEM V MESSAGE QUEUES
=======================

    has a more complex structure than POSIX queues

    NAME
       msgget - get a message queue identifier

    SYNOPSIS
       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>

       int msgget(key_t key, int msgflg);
       int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

       struct msgbuf {
           long mtype;      /* message type (usable as priority), must be >0 */
           char mtext[1];   /* message data */
       };

       /* user-defined message layout: mtype first, then any payload */
       struct my_msgbuf {
           long   mtype;      /* message type, must be >0 */
           double my_double;  /* message data 1 */
           char   mtext[100]; /* message data 2 */
       };

       ssize_t  msgrcv(int msqid, void *msgp, size_t msgsz,
                       long msgtyp, int msgflg);

    If msgtyp == 0, the oldest message in the queue is returned.
    If msgtyp > 0, the oldest message whose type equals msgtyp is returned.
    If msgtyp < 0, the oldest message with the lowest type that is
    less than or equal to |msgtyp| is returned.

       int msgctl(int msqid, int cmd, struct msqid_ds *buf);

    IPC_RMID - remove the queue
    IPC_SET  - set header
    IPC_STAT - reads header
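
The msgtyp rules can be tried out with a small sketch. It assumes a
private queue (IPC_PRIVATE) that is removed again with IPC_RMID; the
names demo_msg and sysv_receive_demo are hypothetical.

```c
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

struct demo_msg {
    long mtype;         /* message type, must be > 0 */
    char mtext[32];     /* message data */
};

/* Queues a type-2 message "two" and then a type-1 message "one",
 * receives one message selected by msgtyp and copies its text to out.
 * Returns the type of the received message, or -1 on error. */
long sysv_receive_demo(long msgtyp, char *out, size_t outsz)
{
    struct demo_msg first = { 2, "two" }, second = { 1, "one" }, got;
    long result = -1;

    int qid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    if (qid < 0 || outsz == 0)
        return -1;

    /* msgsz counts only the data after mtype. */
    if (msgsnd(qid, &first, sizeof first.mtext, 0) == 0 &&
        msgsnd(qid, &second, sizeof second.mtext, 0) == 0 &&
        msgrcv(qid, &got, sizeof got.mtext, msgtyp, IPC_NOWAIT) >= 0) {
        strncpy(out, got.mtext, outsz - 1);
        out[outsz - 1] = '\0';
        result = got.mtype;
    }
    msgctl(qid, IPC_RMID, NULL);        /* remove the queue */
    return result;
}
```

With msgtyp 0 the oldest message ("two") is expected, with msgtyp 1 the
type-1 message ("one"), and with msgtyp -2 the lowest type not above 2,
which is again "one".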



POSIX SEMAPHORES
================

- Posix named semaphores  (IPC names)
- Posix memory-based semaphores in shared memory
- System V semaphores


Three fundamental operations:

    create a semaphore  (with value 1)

    wait for semaphore

        /* start atomic operation */
        while ( semaphore_value <= 0 )
            ;
        --semaphore_value;
        /* end atomic operation */

    post to a semaphore /* up, unlock, signal */

        ++semaphore_value;



    producer --> shared buffer --> consumer

    producer                        consumer

    initialize semaphore get to 0
    initialize semaphore put to 1

    for ( ; ; )                     for ( ; ; )
    {                               {
        sem_wait(&put);                 sem_wait(&get);
        put data into buffer            process data in buffer
        sem_post(&get);                 sem_post(&put);
    }                               }
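
The producer/consumer scheme above can be sketched with two threads and
memory-based semaphores, assuming a Linux-like system (older toolchains
need -pthread). The one-element buffer, NITEMS, and the function names
are choices made for this illustration only.

```c
#include <pthread.h>
#include <semaphore.h>

#define NITEMS 5

static sem_t put_sem, get_sem;
static int slot;                /* one-element shared buffer */
static int consumed_sum;

static void *producer(void *arg)
{
    (void)arg;
    for (int i = 1; i <= NITEMS; i++) {
        sem_wait(&put_sem);     /* wait until the buffer is empty */
        slot = i;               /* put data into buffer */
        sem_post(&get_sem);     /* signal: data is available */
    }
    return NULL;
}

static void *consumer(void *arg)
{
    (void)arg;
    for (int i = 1; i <= NITEMS; i++) {
        sem_wait(&get_sem);     /* wait until data is available */
        consumed_sum += slot;   /* process data in buffer */
        sem_post(&put_sem);     /* signal: the buffer is empty again */
    }
    return NULL;
}

/* Runs one producer and one consumer; returns the sum of all consumed
 * items, or -1 if initialization fails. */
int run_producer_consumer(void)
{
    pthread_t p, c;

    consumed_sum = 0;
    if (sem_init(&get_sem, 0, 0) < 0 || sem_init(&put_sem, 0, 1) < 0)
        return -1;
    if (pthread_create(&p, NULL, producer, NULL) != 0)
        return -1;
    if (pthread_create(&c, NULL, consumer, NULL) != 0)
        return -1;
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    sem_destroy(&get_sem);
    sem_destroy(&put_sem);
    return consumed_sum;
}
```

The two semaphores force strict alternation, so every item 1..NITEMS
is consumed exactly once and the sum is 15.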



    named semaphore:                memory based semaphore:

    sem_open()                      sem_init()

                    sem_wait()
                    sem_trywait()
                    sem_post()
                    sem_getvalue()

    sem_close()                     sem_destroy()
    sem_unlink()

    NAME
       sem_open  -  initialize and open a named semaphore

    SYNOPSIS
       #include <semaphore.h>

       sem_t *sem_open(const char *name, int oflag,
                /* mode_t mode, unsigned int initial_value */ );

    On error sem_open returns SEM_FAILED, which is NOT NECESSARILY (sem_t *)(-1)
       int sem_close(sem_t *sem);
       int sem_unlink(const char *name);


    NAME
       sem_trywait, sem_wait - lock a semaphore (REALTIME)

    SYNOPSIS
       #include <semaphore.h>

       int sem_trywait(sem_t *sem);
       int sem_wait(sem_t *sem);

    sem_trywait does not block; if the semaphore cannot be locked, it returns -1 with errno set to EAGAIN


       int sem_post(sem_t *sem);
       int sem_getvalue(sem_t *sem, int *sval);


SYSTEM V SEMAPHORES
===================

    Able to maintain a set of semaphores

    NAME
       semget - get a semaphore set identifier

    SYNOPSIS
       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/sem.h>

       int semget(key_t key, int nsems, int semflg);

   key is the usual: IPC_PRIVATE or a result of ftok()
   the semaphores in the set are not initialized!


      struct sembuf {
        unsigned short sem_num;  /* semaphore number: 0,1, ..., nsems-1 */
        short sem_op;   /* operation: <0, 0, >0 */
        short sem_flg;  /* 0, IPC_NOWAIT, SEM_UNDO */
      };

      int semop(int semid, struct sembuf *sops, unsigned nsops);

      int semtimedop(int semid, struct  sembuf  *sops,
                        unsigned nsops, struct timespec *timeout);



     sops is an array of sembuf structures. Each element in the array
     specifies an operation for one particular semaphore in the set.
     The operations in the array are guaranteed to be performed
     atomically by the kernel.

     If sem_op > 0   sem_value += sem_op
     If sem_op == 0  wait until sem_value == 0
     If sem_op < 0   wait until sem_value >= -sem_op, then sem_value += sem_op

     int semctl(int semid, int semnum, int cmd, ...);
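
A minimal sketch of the sem_op rules on a one-element private set. It
assumes Linux, where the program itself has to define union semun. The
function name sysv_sem_demo is hypothetical, and IPC_NOWAIT is used so
that an operation that would have to wait fails instead of blocking.

```c
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/types.h>

/* On Linux the caller must define this union (see semctl(2)). */
union semun {
    int val;
    struct semid_ds *buf;
    unsigned short *array;
};

/* Creates a set with one semaphore, sets it to initial, applies op
 * once and returns the resulting value. Returns -1 if the operation
 * would block or if any call fails. */
int sysv_sem_demo(int initial, short op)
{
    int result = -1;
    int semid = semget(IPC_PRIVATE, 1, IPC_CREAT | 0600);
    if (semid < 0)
        return -1;

    union semun arg;
    arg.val = initial;                  /* explicit initialization */
    if (semctl(semid, 0, SETVAL, arg) == 0) {
        struct sembuf sop = { .sem_num = 0, .sem_op = op,
                              .sem_flg = IPC_NOWAIT };
        if (semop(semid, &sop, 1) == 0)
            result = semctl(semid, 0, GETVAL);
    }
    semctl(semid, 0, IPC_RMID);         /* remove the set */
    return result;
}
```

For example, starting from 3 the operation -2 leaves the value 1, while
starting from 1 the same operation cannot proceed and the sketch
returns -1.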



SHARED MEMORY
=============




    NAME
       mmap, munmap - map or unmap files or devices into memory

    SYNOPSIS
       #include <sys/mman.h>

       void  *mmap(void  *start,  size_t length, int prot,
                        int flags, int fd, off_t offset);
       int munmap(void *start, size_t length);
       int msync(void *start, size_t length, int flags);

        PROT_READ               MAP_SHARED
        PROT_WRITE              MAP_PRIVATE
        PROT_EXEC               +
        PROT_NONE               MAP_FIXED


        map a file or a shared memory object into the address space of a process
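
As a small sketch of a MAP_SHARED file mapping: the text is stored
through the mapping and then read back from the file with pread(). The
function name mmap_file_demo and the temporary file template are
hypothetical choices for this illustration.

```c
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

/* Writes text into a temporary file through a shared mapping and reads
 * it back with pread(). Returns the bytes read, or -1 on error. */
ssize_t mmap_file_demo(const char *text, char *out, size_t outsz)
{
    char path[] = "/tmp/mmap_demo_XXXXXX";
    size_t len = strlen(text);
    ssize_t n = -1;

    if (len == 0 || len >= outsz)
        return -1;

    int fd = mkstemp(path);
    if (fd < 0)
        return -1;
    unlink(path);                       /* the file lives until fd is closed */

    if (ftruncate(fd, (off_t)len) == 0) {   /* the file must cover the mapping */
        char *p = mmap(NULL, len, PROT_READ | PROT_WRITE,
                       MAP_SHARED, fd, 0);
        if (p != MAP_FAILED) {
            memcpy(p, text, len);       /* a plain memory write */
            msync(p, len, MS_SYNC);     /* flush the change to the file */
            munmap(p, len);
            n = pread(fd, out, len, 0); /* read the file contents */
            if (n >= 0)
                out[n] = '\0';
        }
    }
    close(fd);
    return n;
}
```

With MAP_PRIVATE instead of MAP_SHARED the write would stay in a
private copy and would not reach the file.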


POSIX SHARED MEMORY
===================

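       #include <sys/types.h>
       #include <sys/mman.h>    /* shm_open, shm_unlink */
       #include <fcntl.h>       /* O_* flags */
       #include <sys/stat.h>    /* mode bits, fstat */
       #include <unistd.h>      /* ftruncate */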

       int shm_open(const char *name, int oflag, mode_t mode);
       int shm_unlink(const char *name);
       int ftruncate(int fd, off_t length);
       int fstat(int filedes, struct stat *buf);
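
The usual sequence is shm_open, ftruncate, mmap. The sketch below
assumes a Linux-like system (older glibc versions need -lrt): the
parent creates an object holding one int, a forked child opens the same
object by name and stores a value, and the parent reads it through its
own mapping. The function name posix_shm_demo is hypothetical.

```c
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Returns the value the child stored in shared memory, or -1 on error. */
int posix_shm_demo(const char *name, int value)
{
    int result = -1;
    int fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
    if (fd < 0)
        return -1;

    if (ftruncate(fd, sizeof(int)) == 0) {      /* new objects have size 0 */
        int *p = mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE,
                      MAP_SHARED, fd, 0);
        if (p != MAP_FAILED) {
            pid_t pid = fork();
            if (pid == 0) {                     /* child: open by name */
                int cfd = shm_open(name, O_RDWR, 0);
                if (cfd < 0)
                    _exit(1);
                int *q = mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE,
                              MAP_SHARED, cfd, 0);
                if (q == MAP_FAILED)
                    _exit(1);
                *q = value;
                _exit(0);
            }
            int status;
            if (pid > 0 && waitpid(pid, &status, 0) == pid &&
                WIFEXITED(status) && WEXITSTATUS(status) == 0)
                result = *p;                    /* written by the child */
            munmap(p, sizeof(int));
        }
    }
    close(fd);
    shm_unlink(name);                           /* remove the name */
    return result;
}
```

The child opens the object by name only to show that unrelated
processes can do the same; a forked child could also use the inherited
mapping directly.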


SYSTEM V SHARED MEMORY
======================

    NAME
       shmget - allocates a shared memory segment

    SYNOPSIS
       #include <sys/ipc.h>
       #include <sys/shm.h>

       int shmget(key_t key, size_t size, int shmflg);
       void *shmat(int shmid, const void *shmaddr, int shmflg);
       int shmdt(const void *shmaddr);
       int shmctl(int shmid, int cmd, struct shmid_ds *buf);
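
The four calls can be sketched together as follows, assuming a private
segment (IPC_PRIVATE) shared between a parent and a forked child, which
inherits the attachment. The function name sysv_shm_demo is
hypothetical.

```c
#include <string.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* The child copies text into the segment; the parent copies it out
 * into out. Returns 0 on success, -1 on error. */
int sysv_shm_demo(const char *text, char *out, size_t outsz)
{
    size_t len = strlen(text) + 1;      /* include the terminating '\0' */
    int result = -1;

    if (len > outsz)
        return -1;

    int shmid = shmget(IPC_PRIVATE, len, IPC_CREAT | 0600);
    if (shmid < 0)
        return -1;

    char *mem = shmat(shmid, NULL, 0);  /* let the kernel pick the address */
    if (mem != (void *)-1) {
        pid_t pid = fork();
        if (pid == 0) {                 /* child: writer */
            memcpy(mem, text, len);
            shmdt(mem);
            _exit(0);
        }
        if (pid > 0 && waitpid(pid, NULL, 0) == pid) {
            memcpy(out, mem, len);      /* parent sees the child's data */
            result = 0;
        }
        shmdt(mem);
    }
    shmctl(shmid, IPC_RMID, NULL);      /* otherwise the segment stays (see ipcs) */
    return result;
}
```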