Stream sender & receiverΒΆ

This example shows a score sender and receiver application that are used to multicast a stream of atomic messages on a local network.

The complete sender example is shown below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Copyright (c) 2017 Steinwurf ApS
// All Rights Reserved
//
// THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF STEINWURF
// The copyright notice above does not evidence any
// actual or intended publication of such source code.

#include <cstdint>
#include <iostream>
#include <vector>
#include <string>
#include <thread>

#include <score/api/udp_sender.hpp>

int main()
{
    // Create the io_service object
    score::api::io_service io;

    // Create the sender object
    score::api::udp_sender sender(io);

    // Configure the destination address and port on the sender.
    // Address can be any IPv4 address, including multicast and broadcast.
    std::error_code error;
    sender.add_remote("224.0.0.251", 7891, error);

    if (error)
    {
        std::cerr << "Could not add remote. Error: " << error.message()
                  << std::endl;
        return error.value();
    }

    sender.set_error_message_callback([&io](
        std::string operation, std::error_code error)
    {
        std::cerr << "Error in " << operation << ": " << error.message()
                  << std::endl;

        // Stop if something went wrong
        io.stop();
    });

    // At this point the sender is ready to send data over the network.

    // Run the io_service in another thread, so that the main thread
    // can be used to get user input.
    std::thread io_thread([&]()
    {
        io.run();
    });

    // Display some helpful instruction
    std::cout << "Enter text to send with the SCORE protocol. "
              << "Type 'Q' and press [ENTER] to exit." << std::endl;

    // Read data from standard input and write this to the stream.
    // The auto flushing should avoid buffering too much data, and will
    // limit the transmission delay.
    // data. This lowers the decoding delay.
    while (true)
    {
        std::string input;
        std::getline(std::cin, input);
        if (input.size() == 0)
            continue;

        sender.write_message((uint8_t*)input.c_str(), input.size());

        // Ensure that the message is transmitted immediately
        // This is not recommended in a real streaming application, but it
        // is quite useful in a chat example like this.
        sender.flush();

        // If input is the EOT message, we stop the io_service when the send
        // queue has been emptied, and do not send any more messages.
        if (input.size() == 1 && (input[0] == 'Q' || input[0] == 'q'))
        {
            sender.set_on_queue_empty_callback([&io]()
            {
                io.stop();
                std::cout << "IO service stopped." << std::endl;
            });
            break;
        }
    }

    // Wait for the io_service to stop when all transmissions are finished.
    io_thread.join();

    std::cout << "Sent: " << sender.sent_bytes() << " bytes" << std::endl;

    return 0;
}

After creating an io_service and a sender object, we configure the destination address for the sender with set_remote. We set the end-of-transmission callback to stop the io_service when the sender completes the transmission of all data and repair packets. This should happen after the user closes the stream by typing Q.

We start a separate thread for running the io_service (this is the event loop that drives the sender and all network operations), because we will use the main thread to get input from the user. We read lines from standard input in a while loop, and each line is written to the sender as a separate atomic message. We call flush after each write_data to ensure that the messages are transmitted immediately. Calling ``flush()`` after each message is not recommended in a real application, but this example functions like a multicast chat program, and we want to deliver the messages to receivers right away. The flush function should only be called when we want to ensure that no data is buffered within the sender. For example, before shutting down the sender. It may also be used before a break or pause in the data stream will occur.

If the input message is the end-of-transmission message (‘Q’), then we set a callback to be executed when the sender transmission queue gets empty (everything is sent). We will stop the IO service when this happens.

We also exit the while loop if the user types Q or q. After this, we join the io_thread to wait for the io_service to stop when all data has been pushed to the network.

The code for the corresponding receiver application is shown below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// Copyright (c) 2017 Steinwurf ApS
// All Rights Reserved
//
// THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF STEINWURF
// The copyright notice above does not evidence any
// actual or intended publication of such source code.

#include <cstdint>
#include <vector>
#include <string>
#include <iostream>

#include <score/api/udp_receiver.hpp>

int main()
{
    // Create the io_service object
    score::api::io_service io;

    // Create the receiver object
    score::api::udp_receiver receiver(io);

    // Bind the score receiver to a specific address and port
    // The address can be a local IPv4 address or a multicast/broadcast address
    // It can also be "0.0.0.0" to listen on all local interfaces
    std::error_code error;
    receiver.bind("224.0.0.251", 7891, error);

    if (error)
    {
        std::cerr << "Could not bind score receiver. Error: " << error.message()
                  << std::endl;
        return error.value();
    }

    receiver.set_error_message_callback([](
        std::string operation, std::error_code error)
    {
        std::cerr << "Error in " << operation << ": " << error.message()
                  << std::endl;
    });

    // This callback function will be executed when a message is received.
    // In this example, we just print how many bytes were received.
    auto read_message = [&io](const auto& message)
    {
        // We use a single-byte message with value 'Q' or 'q' as the
        // end-of-transmission message.
        if (message.size() == 1 && (message[0] == 'Q' || message[0] == 'q'))
        {
            // Shut down the io_service, as no more data will come
            io.stop();
            std::cout << "End of transmission. IO service stopped" << std::endl;
        }
        else
        {
            std::cout << "Message received: "
                      << std::string((char*)message.data(), message.size())
                      << std::endl;
        }
    };
    // Set the callback to be used when a message is received
    receiver.set_message_ready_callback(read_message);

    // Run the event loop of the io_service
    io.run();

    return 0;
}

The receiver is less complex in this case, since we only run the main thread where we print the incoming messages in the read_data lambda function. This callback function is invoked for each atomic message that is received from the sender. It is guaranteed that the messages are delivered in order, but some messages might be lost under poor network conditions. The score sender can be configured to compensate a certain level of packet loss or to automatically adjust the data redundancy based on the feedback from the receivers. For these adjustments, see the Features section.