Stream sender & receiverΒΆ

This example shows a score-cpp sender and receiver application that are used to multicast a stream of atomic messages on a local network.

The complete sender example is shown below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// Copyright (c) 2016 Steinwurf ApS
// All Rights Reserved
//
// THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF STEINWURF
// The copyright notice above does not evidence any
// actual or intended publication of such source code.

#include <cstdint>
#include <iostream>
#include <vector>
#include <string>
#include <thread>

#include <scorecpp/scorecpp.hpp>

int main()
{
    // Create the io_service object
    scorecpp::io_service io;

    // Create the score sender object
    scorecpp::sender sender(io);

    // Configure the destination address and port on the score sender.
    // Address can be any IPv4 address, including multicast and broadcast.
    std::error_code ec;
    sender.add_remote("224.0.0.251", 7891, ec);

    if (ec)
    {
        std::cerr << "Could not add remote. Error: " << ec.message()
                  << std::endl;
        return ec.value();
    }

    // Setup the sender to auto flush every 150ms
    sender.set_auto_flush(true); // Ensure auto flush enabled (on by default)
    sender.set_flush_timeout(150); // sender flush after 150ms idle period

    // At this point the sender is ready to send data over the network.

    // Run the io_service in another thread, so that the main thread
    // can be used to get user input.
    std::thread io_thread([&]()
    {
        io.run();
    });

    // Display some helpful instruction
    std::cout << "Enter text to send with the SCORE protocol. "
              << "Type 'Q' and press [ENTER] to exit." << std::endl;

    // Read data from standard input and write this to the stream.
    // The auto flushing should avoid buffering too much data, and will
    // limit the transmission delay.
    // data. This lowers the decoding delay.
    while (true)
    {
        std::string input;
        std::getline(std::cin, input);

        std::vector<uint8_t> data(input.begin(), input.end());
        sender.write_data(data.data(), data.size());

        // If input is the EOT message, we stop the io_service when the send
        // queue has been emptied, and do not send any more messages.
        if (data.size() == 1 && (input[0] == 'Q' || input[0] == 'q'))
        {
            // Ensure that the EOT message is transmitted and everything is
            // queued for transmission immediately.
            sender.flush();

            sender.set_on_send_queue_threshold_callback(0, [&io]()
            {
                io.stop();
                std::cout << "IO service stopped." << std::endl;
            });
            break;
        }
    }

    // Wait for the io_service to stop when all transmissions are finished.
    io_thread.join();

    return 0;
}

After creating an io_service and a sender object, we configure the destination address for the sender with set_remote. We set the end-of-transmission callback to stop the io_service when the sender completes the transmission of all data and repair packets. This should happen after the user closes the stream by typing Q.

We start a separate thread for running the io_service (this is the event loop that drives the sender and all network operations), because we will use the main thread to get input from the user. We read lines from standard input in a whileloop, and each line is written to the sender as a separate atomic message. If the message is the end-of-transmission message (‘Q’), we call flush after write_data to ensure the internal buffers are cleared and everything is queued for transmission. After this we set a callback to be executed when the sender transmission queue is empty (everything is sent). Here we just stop the IO service.

The flush function should only be called when it should be certain that no data is buffered within the sender. For example before shutting down the sender. It may also be used before a break or pause in the data stream will occur. However for such cases the sender has a built-in auto flush mechanism that will trigger a flush after an idle period, ensuring that no data will be buffered in the sender for long periods of time.

We exit the while loop if the user types Q or q. The receivers will stop after receiving the end-of-transmission message (“Q” or “q”). After this, we join io_thread to wait for the io_service to stop. The io_service is stopped in the callback given to set_on_send_queue_threshold_callback. This means that the thread stops when all data has been pushed to the network.

The code for the corresponding receiver application is shown below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Copyright (c) 2016 Steinwurf ApS
// All Rights Reserved
//
// THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF STEINWURF
// The copyright notice above does not evidence any
// actual or intended publication of such source code.

#include <cstdint>
#include <iostream>

#include <scorecpp/scorecpp.hpp>

int main()
{
    // Create the io_service object
    scorecpp::io_service io;

    // Create the score receiver object
    scorecpp::receiver receiver(io);

    // Bind the score receiver to a specific address and port
    // The address can be a local IPv4 address or a multicast/broadcast address
    // It can also be "0.0.0.0" to listen on all local interfaces
    std::error_code ec;
    receiver.bind("224.0.0.251", 7891, ec);

    if (ec)
    {
        std::cerr << "Could not bind score receiver. Error: " << ec.message()
                  << std::endl;
        return ec.value();
    }

    // This callback function will be executed when some data is received.
    // In this example, we just print how many bytes were received.
    auto read_data = [&io](uint8_t* data, uint32_t size)
    {
        // We use a single-byte message with value 'Q' or 'q' as the
        // end-of-transmission message.
        if (size == 1 && (data[0] == 'Q' || data[0] == 'q'))
        {
            // Shut down the io_service, as no more data will come
            io.stop();
            std::cout << "End of transmission. IO service stopped" << std::endl;
        }
        else
        {
            std::cout << "Message received: " << std::string((char*)data, size)
                      << std::endl;
        }
    };
    // Set the callback to be used when data is received
    receiver.set_data_ready_callback(read_data);

    // Run the event loop of the io_service
    io.run();

    return 0;
}

The receiver is less complex in this case, since we only run the main thread where we print the incoming messages in the read_data lambda function. This callback function is invoked for each atomic message that is received from the sender. It is guaranteed that the messages are delivered in order, but some messages might be lost under poor network conditions. The score sender can be configured to compensate a certain level of packet loss or to automatically adjust the data redundancy based on the feedback from the receivers. For these adjustments, see the Features section.