Fixed some networking api related threading
							parent
							
								
									657a36a21a
								
							
						
					
					
						commit
						91dbf2f9b0
					
				| 
						 | 
					@ -72,7 +72,8 @@ public ISteamNetworking
 | 
				
			||||||
    class RunEveryRunCB *run_every_runcb;
 | 
					    class RunEveryRunCB *run_every_runcb;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    std::recursive_mutex messages_mutex;
 | 
					    std::recursive_mutex messages_mutex;
 | 
				
			||||||
    std::vector<Common_Message> messages;
 | 
					    std::list<Common_Message> messages;
 | 
				
			||||||
 | 
					    std::list<Common_Message> unprocessed_messages;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    std::recursive_mutex connections_edit_mutex;
 | 
					    std::recursive_mutex connections_edit_mutex;
 | 
				
			||||||
    std::vector<struct Steam_Networking_Connection> connections;
 | 
					    std::vector<struct Steam_Networking_Connection> connections;
 | 
				
			||||||
| 
						 | 
					@ -131,6 +132,17 @@ void remove_connection(CSteamID id)
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        auto msg = std::begin(unprocessed_messages);
 | 
				
			||||||
 | 
					        while (msg != std::end(unprocessed_messages)) {
 | 
				
			||||||
 | 
					            if (msg->source_id() == id.ConvertToUint64()) {
 | 
				
			||||||
 | 
					                msg = messages.erase(msg);
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                ++msg;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
SNetSocket_t create_connection_socket(CSteamID target, int nVirtualPort, uint32 nIP, uint16 nPort, SNetListenSocket_t id=0, enum steam_socket_connection_status status=SOCKET_CONNECTING, SNetSocket_t other_id=0)
 | 
					SNetSocket_t create_connection_socket(CSteamID target, int nVirtualPort, uint32 nIP, uint16 nPort, SNetListenSocket_t id=0, enum steam_socket_connection_status status=SOCKET_CONNECTING, SNetSocket_t other_id=0)
 | 
				
			||||||
| 
						 | 
					@ -817,9 +829,10 @@ void RunCallbacks()
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
    std::lock_guard<std::recursive_mutex> lock(messages_mutex);
 | 
					    std::lock_guard<std::recursive_mutex> lock(messages_mutex);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    for (auto &msg : messages) {
 | 
					    {
 | 
				
			||||||
        CSteamID source_id((uint64)msg.source_id());
 | 
					        auto msg = std::begin(unprocessed_messages);
 | 
				
			||||||
        if (!msg.network().processed()) {
 | 
					        while (msg != std::end(unprocessed_messages)) {
 | 
				
			||||||
 | 
					            CSteamID source_id((uint64)msg->source_id());
 | 
				
			||||||
            if (!connection_exists(source_id)) {
 | 
					            if (!connection_exists(source_id)) {
 | 
				
			||||||
                if (new_connection_times.find(source_id) == new_connection_times.end()) {
 | 
					                if (new_connection_times.find(source_id) == new_connection_times.end()) {
 | 
				
			||||||
                    new_connections_to_call_cb.push(source_id);
 | 
					                    new_connections_to_call_cb.push(source_id);
 | 
				
			||||||
| 
						 | 
					@ -827,11 +840,13 @@ void RunCallbacks()
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
                struct Steam_Networking_Connection *conn = get_or_create_connection(source_id);
 | 
					                struct Steam_Networking_Connection *conn = get_or_create_connection(source_id);
 | 
				
			||||||
                conn->open_channels.insert(msg.network().channel());
 | 
					                conn->open_channels.insert(msg->network().channel());
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            msg.mutable_network()->set_processed(true);
 | 
					            msg->mutable_network()->set_processed(true);
 | 
				
			||||||
            msg.mutable_network()->set_time_processed(current_time);
 | 
					            msg->mutable_network()->set_time_processed(current_time);
 | 
				
			||||||
 | 
					            messages.push_back(*msg);
 | 
				
			||||||
 | 
					            msg = unprocessed_messages.erase(msg);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -897,12 +912,12 @@ void Callback(Common_Message *msg)
 | 
				
			||||||
        }PRINT_DEBUG("\n");
 | 
					        }PRINT_DEBUG("\n");
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        std::lock_guard<std::recursive_mutex> lock(messages_mutex);
 | 
					 | 
				
			||||||
        if (msg->network().type() == Network::DATA) {
 | 
					        if (msg->network().type() == Network::DATA) {
 | 
				
			||||||
            messages.push_back(Common_Message(*msg));
 | 
					            unprocessed_messages.push_back(Common_Message(*msg));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (msg->network().type() == Network::NEW_CONNECTION) {
 | 
					        if (msg->network().type() == Network::NEW_CONNECTION) {
 | 
				
			||||||
 | 
					            std::lock_guard<std::recursive_mutex> lock(messages_mutex);
 | 
				
			||||||
            auto msg_temp = std::begin(messages);
 | 
					            auto msg_temp = std::begin(messages);
 | 
				
			||||||
            while (msg_temp != std::end(messages)) {
 | 
					            while (msg_temp != std::end(messages)) {
 | 
				
			||||||
                //only delete processed to handle unreliable message arriving at the same time.
 | 
					                //only delete processed to handle unreliable message arriving at the same time.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue