NetKraft

NetKraft

Project Details


Production Time - ongoing, part time, 2018 - now

Team Size - 3

Role - Programmer

Language - C#

Description


NetKraft is an open source networking library written in C# by myself and two of my friends. We originally created it for a game we were making in Unity when we learned that the included networking solution was not working for our purposes. 


It later turned into a much larger scale project since we fell further down the rabbit hole of networking, and came up with new ideas and new features we wanted to implement.


Currently NetKraft is a networking library useful for easily setting up peer to peer connections with UDP, with a powerful messaging system for easy data transfer which can utilize reliable, unreliable as well as acknowledged channels.


We are currently working on implementing native delta compression in our messages, which is important for the games we are working on using the library (mostly for testing), since world state updates are sent frequently but not altered significantly.


Client


The NetKraftClient is the socket wrapper class users can access to establish connections. It can add endpoints manually or start hosting, meaning it will listen for unhandled connections, and if a join request is received it will add that endpoint.


The client constantly runs a receive function on a seperate thread and sends of messages to the appropriate channel to be read.




namespace Netkraft
{
    public class NetkraftClient
    {
        private Socket _socket;
        private readonly List< ClientConnection> _clientConnections = new List< ClientConnection>();
        private Dictionary< IPEndPoint, ClientConnection> _iPEndPointToUnhandledConnections = null;
        private readonly Dictionary< IPEndPoint, ClientConnection> _iPEndPointToClientConnection = new Dictionary< IPEndPoint, ClientConnection>();

        private MemoryStream _instantMessageStream = new MemoryStream();
        //Receive vars
        private MemoryStream _receiveStream = new MemoryStream();
        private readonly byte[] _buffer = new byte[65536]; //UDP messages can't exceed 65507 bytes so this should always be sufficient
        private EndPoint _sender = new IPEndPoint(IPAddress.Any, 0);

        public Action< RequestJoinResponse> JoinResponseCallback = null;
        public Action< RequestJoin, ClientConnection> ClientJoinCallback = null;

        private Random _rand = new Random();
        public int FakeLossPercentage = 0;

        //Constuctor
        public NetkraftClient(int port)
        {
            _socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);//Socket that supports IPV4
            _socket.Bind(new IPEndPoint(IPAddress.Any, port));
            new Thread(Receive).Start();
        }
        private void Receive()
        {
            while(true)
            {
                int size = _socket.ReceiveFrom(_buffer, ref _sender);
                if (_iPEndPointToClientConnection.ContainsKey((IPEndPoint)_sender))
                {
                    _iPEndPointToClientConnection[(IPEndPoint)_sender].Receive(_buffer, size);
                }
                //Host handling unhandled connections
                else if(_iPEndPointToUnhandledConnections != null)
                {
                    lock (_iPEndPointToUnhandledConnections)
                    {
                        if (!_iPEndPointToUnhandledConnections.ContainsKey((IPEndPoint)_sender))
                            _iPEndPointToUnhandledConnections.Add((IPEndPoint)_sender, new ClientConnection(this, (IPEndPoint)_sender));
                        
                        _iPEndPointToUnhandledConnections[(IPEndPoint)_sender].Receive(_buffer, size);
                    }

                }
            }
        }
        internal void SendStream(MemoryStream stream, int sizeOfStreamToSend, ClientConnection client)
        {
            if (_rand.Next(1, 100) < FakeLossPercentage) return;
            _socket.SendTo(stream.GetBuffer(), 0, sizeOfStreamToSend, SocketFlags.None, client.IpEndPoint);
        }

        //Public methods!
        /// < summary>
        /// Send a message to all connected clients Immediately not wating for the next tick.
        /// < /summary>
        /// < param name="message">Message to send< /param>
        
        public void SendImmediately(object message)
        public void AddToQueue(object message)
        public void SendQueue()
        public void Host()
        public void Join(string ip, int port)

        public ClientConnection AddEndPoint(IPEndPoint ipEndPoint)
        {
            ClientConnection c = new ClientConnection(this, ipEndPoint);
            _clientConnections.Add(c);
            if(!_iPEndPointToClientConnection.ContainsKey(ipEndPoint))
                _iPEndPointToClientConnection.Add(ipEndPoint, c);
            return c;
        }
        public void ReceiveTick()
        {
            for(int i=0;i< _clientConnections.Count; i++)
                _clientConnections[i].ReceiveTick();

            //Unhandeld
            if (_iPEndPointToUnhandledConnections == null) return;
            lock (_iPEndPointToUnhandledConnections)
            {
                foreach (ClientConnection CC in _iPEndPointToUnhandledConnections.Values)
                    CC.ReceiveTickRestrictive();
            }
        }
    }
}


    

Channels


Channels are used for reading messages and running any code that the user has specified should be run when a message of a specific type is received. The user can run a clients ReceiveTick method to read a message on the main thread. Messages can also be sent either by calling SendTick, which sends all buffered messages, or by calling send immedietly to send a method without waiting.



namespace Netkraft
{
    //Supports both Reliable messages Wand reliable Ackwnoalged messages!
    class ReliableChannel : Channel
    {
        private MemoryStream _queueMessageStream = new MemoryStream();
        private MemoryStream _receiveStream = new MemoryStream();
        private int _messagesInReceiveStream = 0;
        private List< object>[] _messageQueues = new List< object>[64];
        private List< ReliableAcknowledgmentMessage> _reliableMessagesToSend = new List< ReliableAcknowledgmentMessage>();
        private bool _messageHasBeenAdded = false;
        private int _currentId = 0;
        private Acknowledger _acker;

        public ReliableChannel(NetkraftClient masterClient, ClientConnection connection)
        {
            _masterClient = masterClient;
            _connection = connection;
            _acker = new Acknowledger((x) =>
            {
                //Console.WriteLine("Acknowledge: " + x);
                foreach (object o in _messageQueues[x % 64])
                {
                    if (o is IAcknowledged)
                        ((IAcknowledged)o).OnAcknowledgment(connection);
                }
                _messageQueues[x % 64].Clear();

            });
            for (int i = 0; i < 64; i++)
            {
                _messageQueues[i] = new List< object>();
            }
        }

        public override void AddToQueue(object message)
        public override void SendImmediately(object message)
        
        public override void SendQueue()
        {
            //Resend all old message that have not been acknowledged
            for (int i = 0; i < 64; i++)
                SendQueueSpecific(i);
            //Send current message
            SendQueueSpecific(_currentId % 64);
            //Reseting
            if (_messageHasBeenAdded)
            {
                _acker.OnSendMessage(_currentId % 64);
                _currentId++;
                _messageQueues[_currentId % 64].Clear();
            }
            _messageHasBeenAdded = false;
        }
        //Internal methods Called on Recive Thread!
        public override void Receive(byte[] buffer, int size)
        {
            lock (_receiveStream)
            {
                byte id = (byte)BitConverter.ToChar(buffer, 1);
                if (_acker.MessageHasBeenReceived(id)) return;//Stop the same message from being recived multiple times.
                _acker.OnReceiveMessage(id);
                uint mask = _acker.GetReceiveMaskForId(id);//Acknowledge ID
                _messagesInReceiveStream += BitConverter.ToUInt16(buffer, 2);//Amount of messages encoded
                _receiveStream.Write(buffer, 4, size - 4); //4 because of header (byte + byte + ushort)
                lock (_reliableMessagesToSend)
                    _reliableMessagesToSend.Add(new ReliableAcknowledgmentMessage { Mask = mask, Id = id });
            }
        }
        //Called on main thread.
        public override void ReceiveTick()
        {
            //TODO: make a permenant solution
            lock (_receiveStream)
            {
                if (_messagesInReceiveStream == 0) return;
                //Read
                _receiveStream.Seek(0, SeekOrigin.Begin);
                for (int i = 0; i < _messagesInReceiveStream; i++)
                {
                    object mess = Message.ReadMessage(_receiveStream, _connection);
                    ((IReliableMessage)mess).OnReceive(_connection);
                }

                //Send reliable ack
                lock (_reliableMessagesToSend)
                    foreach (ReliableAcknowledgmentMessage RAM in _reliableMessagesToSend)
                        _connection.AddToQueue(RAM);
                //Reset
                _reliableMessagesToSend.Clear();
                _receiveStream.Seek(0, SeekOrigin.Begin);
                _messagesInReceiveStream = 0;
            }
        }
        private struct ReliableAcknowledgmentMessage : IUnreliableMessage
        {
            public uint Mask;
            public byte Id;

            public void OnReceive(ClientConnection Context)
            {
                ReliableChannel con = (ReliableChannel)Context.GetMessageChannel(ChannelId.Reliable);
                con._acker.OnReceiveAcknowledgement(Mask, Id);
            }
        }
    }
}

    

Messages and Writables


A message is simply a struct that implements a message interface. All messages will be found using system reflection and given a certain header so the receiver knows which message has been sent. Messages implemente the OnReceive method which is run after the data has been read.


By default a number of types are specified as writable (ints, floats, etc as well as arrays of those types) which means the messages can convert them into byte arrays to be sent. The user can also specify their own writable types by providing functions for converting those types into byte arrays, using the WritableFieldTypeRead and WritableFieldTypeWrite attributes.


  public static class Message
    {
        private static Dictionary< Type, ushort> _typeToMessageID = new Dictionary< Type, ushort>();
        private static Dictionary< ushort, Type> _IDToMessageType = new Dictionary< ushort, Type>();
        private static MemoryStream _copyStream = new MemoryStream();
    
        static Message()
        {
            //Console.WriteLine("Inizilaze message system");
            Assembly[] assemblies = AppDomain.CurrentDomain.GetAssemblies();
            List< Type> messages = new List< Type>();
            foreach (Assembly a in assemblies)
            {
                foreach (Type t in a.GetTypes())
                {
                    if (!typeof(IUnreliableMessage).IsAssignableFrom(t) && !typeof(IReliableMessage).IsAssignableFrom(t))
                        continue;
                    messages.Add(t);
                }
            }
            foreach (Type t in messages)
            {
                ushort id = (ushort)_typeToMessageID.Keys.Count; //TODO: make this more reliable since both clients need all messages in the same order.
                _typeToMessageID.Add(t, id);
                _IDToMessageType.Add(id, t);
            }
        }
    }
    



            [WritableFieldTypeWrite(typeof(float))]
        internal static void WriteSingle(Stream stream, object value)
        {
            stream.Write(BitConverter.GetBytes((float)value), 0, 4);
        }
        [WritableFieldTypeRead(typeof(float))]
        internal static object ReadSingle(Stream stream)
        {
            stream.Read(buffer, 0, 4);
            return BitConverter.ToSingle(buffer, 0);
        }
        [WritableFieldTypeWrite(typeof(double))]
        internal static void WriteDouble(Stream stream, object value)
        {
            stream.Write(BitConverter.GetBytes((double)value), 0, 8);
        }
        [WritableFieldTypeRead(typeof(double))]
        internal static object ReadDouble(Stream stream)
        {
            stream.Read(buffer, 0, 8);
            return BitConverter.ToDouble(buffer, 0);
        }
        //Misc
        [WritableFieldTypeWrite(typeof(string))]
        internal static void WriteString(Stream stream, object value)
        {
            //Header
            byte[] bytes = Encoding.UTF8.GetBytes((string)value);
            ushort length = (ushort)bytes.Length;
            stream.Write(BitConverter.GetBytes(length), 0, 2);

            //Body
            stream.Write(bytes, 0, length);
        }
        [WritableFieldTypeRead(typeof(string))]
        internal static object ReadString(Stream stream)
        {
            //Header
            stream.Read(buffer, 0, 2);
            ushort length = BitConverter.ToUInt16(buffer, 0);

            //Body
            byte[] body = new byte[length];
            stream.Read(body, 0, length);
            return Encoding.UTF8.GetString(body);
        }