Loading tnl2/event_connection.h +1 −1 Original line number Diff line number Diff line Loading @@ -437,7 +437,7 @@ public: } } event_connection() event_connection(bool is_initiator = false) : net_connection(is_initiator) { // event management data: Loading tnl2/ghost_connection.h +76 −44 Original line number Diff line number Diff line Loading @@ -34,6 +34,10 @@ public: type_database *_type_database; void set_type_database(type_database *type_db) { _type_database = type_db; } protected: /// Override of event_connection's alloc_notify, to use the ghost_packet_notify structure. Loading Loading @@ -148,7 +152,7 @@ protected: { parent::prepare_write_packet(); if(!does_ghost_from() && !_ghosting) if(!does_ghost_from() || !_ghosting) return; // first step is to check all our polled ghosts: Loading @@ -168,10 +172,12 @@ protected: if(!(walk->flags & (ghost_info::scope_local_always))) walk->flags &= ~ghost_info::in_scope; } if(_scope_object) { logprintf("performing scope query."); _scope_object->perform_scope_query(this); } } /// Override to write ghost updates into each packet. void write_packet(bit_stream &bstream, packet_notify *pnotify) Loading @@ -190,6 +196,7 @@ protected: if(!bstream.write_bool(_ghosting && _scope_object.is_valid())) return; logprintf("Filling packet -- %d ghosts to update", _ghost_zero_update_index); // fill a packet (or two) with ghosting data // 2. call scoped objects' priority functions if the flag set is nonzero Loading Loading @@ -273,7 +280,7 @@ protected: if(walk->flags & ghost_info::not_yet_ghosted) { uint32 class_index = type_rep->class_index; bstream.write_ranged_uint32(class_index, 0, _type_database->get_indexed_class_count()); bstream.write_ranged_uint32(class_index, 0, _type_database->get_indexed_class_count() - 1); is_initial_update = true; } Loading @@ -283,19 +290,7 @@ protected: // now loop through all the fields, and if it's in the update mask, blast it into the bit stream: void *object_pointer = (void *) walk->obj; while(type_rep) { for(dictionary<type_database::field_rep>::pointer i = type_rep->fields.first(); i; ++i) { type_database::field_rep *field_rep = i.value(); uint32 state_mask = 1 << field_rep->state_index; if(update_mask & state_mask) if(!field_rep->write_function(bstream, (void *) ((uint8 *)object_pointer + field_rep->offset))) returned_mask |= state_mask; } type_rep = type_rep->parent_class; } returned_mask = write_object_update(bstream, object_pointer, type_rep, update_mask); if(is_initial_update) { Loading @@ -307,7 +302,7 @@ protected: walk->type_rep->total_update_count++; walk->type_rep->total_update_bit_total += bstream.get_bit_position() - start_position; } TorqueLogMessageFormatted(LogGhostConnection, ("ghost_connection %s GHOST %d", walk->type_rep->name.c_str(), bstream.get_bit_position() - 16 - start_position)); TorqueLogMessageFormatted(LogGhostConnection, ("ghost_connection %s GHOST %d", walk->type_rep->name.c_str(), bstream.get_bit_position() - start_position)); assert((returned_mask & (~update_mask)) == 0); // Cannot set new bits in packUpdate return } Loading Loading @@ -401,14 +396,17 @@ protected: } else { int32 start_position = bstream.get_bit_position(); uint32 end_position = 0; void *object_pointer; type_database::type_rep *ttr; if(!_local_ghosts[index]) // it's a new ghost... cool { uint32 class_index = bstream.read_ranged_uint32(0, _type_database->get_indexed_class_count() - 1); type_database::type_rep *type_rep = _type_database->get_indexed_class(class_index); ttr = type_rep; uint32 instance_size = type_rep->type->size; object_pointer = operator new(instance_size); Loading @@ -426,16 +424,7 @@ protected: _local_ghosts[index] = obj; is_initial_update = true; while(type_rep) { for(dictionary<type_database::field_rep>::pointer i = type_rep->fields.first(); i; ++i) { // on initial update, read all the fields type_database::field_rep *field_rep = i.value(); field_rep->read_function(bstream, (void *) ((uint8 *)object_pointer + field_rep->offset)); } type_rep = type_rep->parent_class; } read_object_update(bstream, object_pointer, type_rep, 0xFFFFFFFF); if(!obj->on_ghost_add(this)) throw tnl_exception_ghost_add_failed; Loading @@ -444,8 +433,19 @@ protected: { object_pointer = (void *) _local_ghosts[index]; type_database::type_rep *type_rep = _local_ghosts[index]->_type_rep; ttr = type_rep; uint32 update_mask = bstream.read_integer(type_rep->max_state_index); read_object_update(bstream, object_pointer, type_rep, update_mask); } TorqueLogMessageFormatted(LogGhostConnection, ("ghost_connection %s read GHOST %d", ttr->name.c_str(), bstream.get_bit_position() - start_position)); } } } uint32 write_object_update(bit_stream &bstream, void *object_pointer, type_database::type_rep *type_rep, uint32 update_mask) { uint32 returned_mask = 0; while(type_rep) { for(dictionary<type_database::field_rep>::pointer i = type_rep->fields.first(); i; ++i) Loading @@ -454,12 +454,42 @@ protected: uint32 state_mask = 1 << field_rep->state_index; if(update_mask & state_mask) field_rep->read_function(bstream, (void *) ((uint8 *)object_pointer + field_rep->offset)); { bool res; void *field_ptr = (void *) ((uint8 *)object_pointer + field_rep->offset); if(field_rep->compound_type) res = field_rep->compound_type->write(bstream, field_ptr); else res = field_rep->write_function(bstream, field_ptr); if(!res) returned_mask |= state_mask; } } type_rep = type_rep->parent_class; } return returned_mask; } void read_object_update(bit_stream &bstream, void *object_pointer, type_database::type_rep *type_rep, uint32 update_mask) { while(type_rep) { for(dictionary<type_database::field_rep>::pointer i = type_rep->fields.first(); i; ++i) { type_database::field_rep *field_rep = i.value(); uint32 state_mask = 1 << field_rep->state_index; if(update_mask & state_mask) { void *field_ptr = (void *) ((uint8 *)object_pointer + field_rep->offset); if(field_rep->compound_type) field_rep->compound_type->read(bstream, field_ptr); else field_rep->read_function(bstream, field_ptr); } } type_rep = type_rep->parent_class; } } Loading Loading @@ -583,7 +613,7 @@ protected: virtual void on_end_ghosting() {} public: ghost_connection() ghost_connection(bool is_initiator = false) : event_connection(is_initiator) { // ghost management data: _scope_object = NULL; Loading @@ -595,6 +625,7 @@ public: _ghost_lookup_table = NULL; _local_ghosts = NULL; _ghost_zero_update_index = 0; register_rpc_methods(); } ~ghost_connection() Loading Loading @@ -687,12 +718,13 @@ public: { if (!_scoping || !does_ghost_from()) return; if (!object->is_ghostable()) return; type_database::type_rep *type_rep = _type_database->find_type(object->get_type_record()); assert(type_rep != 0); assert(object->_interface == 0 || object->_interface == _interface); object->_interface = _interface; int32 index = object->get_hash_id() & ghost_lookup_table_mask; Loading tnl2/net_connection.h +18 −7 Original line number Diff line number Diff line Loading @@ -125,12 +125,12 @@ public: delete note; } torque_connection get_torque_connection() torque_connection_id get_torque_connection() { return _connection; } void set_torque_connection(torque_connection connection) void set_torque_connection(torque_connection_id connection) { _connection = connection; } Loading Loading @@ -196,7 +196,6 @@ public: int32 start = stream.get_bit_position(); TorqueLogMessageFormatted(LogNetConnection, ("connection %d: START", _connection) ); TorqueLogMessageFormatted(LogNetConnection, ("connection %d: END - %llu bits", _connection, stream.get_bit_position() - start) ); time send_delay = _interface->get_process_start_time() - _last_packet_recv_time; if(send_delay > time(2047)) Loading @@ -204,7 +203,10 @@ public: stream.write_integer(uint32(send_delay.get_milliseconds() >> 3), 8); write_packet(stream, note); torque_connection_send_to(_connection, stream.get_next_byte_position(), stream.get_buffer(), &_last_send_sequence); TorqueLogMessageFormatted(LogNetConnection, ("connection %d: END - %llu bits", _connection, stream.get_bit_position() - start) ); _interface->get_socket().send_to_connection(_connection, stream, &_last_send_sequence); //torque_connection_send_to(_connection, stream.get_next_byte_position(), stream.get_buffer(), &_last_send_sequence); _notify_queue_tail->sequence = _last_send_sequence; } Loading @@ -213,7 +215,7 @@ public: read_packet_rate_info(data); _last_received_send_delay = time((data.read_integer(8) << 3) + 4); _last_packet_recv_time = _interface->get_process_start_time(); read_packet(data); } /// Called to prepare the connection for packet writing. Loading Loading @@ -317,14 +319,17 @@ public: bool is_connection_host() { return !_is_initiator; } bool is_connection_initiator() { return _is_initiator; } net_connection() net_connection(bool is_initiator = false) { _is_initiator = is_initiator; _round_trip_time = 0; _send_delay_credit = time(0); _last_update_time = time(0); Loading @@ -340,6 +345,11 @@ public: _last_send_sequence = 0; } void set_interface(net_interface *interface) { _interface = interface; } net_interface *get_interface() { return _interface; Loading @@ -360,6 +370,7 @@ protected: minimum_padding_bits = 32, ///< ask subclasses to reserve at least this much. }; bool _is_initiator; net_rate _local_rate; ///< Current communications rate negotiated for this connection. net_rate _remote_rate; ///< Maximum allowable communications rate for this connection. Loading @@ -370,7 +381,7 @@ protected: packet_notify *_notify_queue_head; ///< Linked list of structures representing the data in sent packets packet_notify *_notify_queue_tail; ///< Tail of the notify queue linked list. New packets are added to the end of the tail. torque_connection _connection; torque_connection_id _connection; time _last_packet_recv_time; ///< time of the receipt of the last data packet. float32 _round_trip_time; ///< Running average round trip time. time _send_delay_credit; ///< Metric to help compensate for irregularities on fixed rate packet sends. Loading tnl2/net_interface.h +57 −27 Original line number Diff line number Diff line Loading @@ -6,7 +6,7 @@ class net_interface : public ref_object { friend class net_object; typedef hash_table_array<torque_connection, ref_ptr<net_connection> >::pointer connection_pointer; typedef hash_table_array<torque_connection_id, ref_ptr<net_connection> >::pointer connection_pointer; struct connection_type_record { uint32 identifier; Loading @@ -16,30 +16,36 @@ class net_interface : public ref_object public: void set_private_key(asymmetric_key_ptr the_key) { byte_buffer_ptr private_key = the_key->get_private_key(); torque_socket_set_private_key(_socket, private_key->get_buffer_size(), private_key->get_buffer()); _socket.set_private_key(the_key); //byte_buffer_ptr private_key = the_key->get_private_key(); //torque_socket_set_private_key(_socket, private_key->get_buffer_size(), private_key->get_buffer()); } void set_challenge_response_data(bit_stream &the_data) void set_challenge_response_data(byte_buffer_ptr data) { torque_socket_set_challenge_response_data(_socket, the_data.get_next_byte_position(), the_data.get_buffer()); _socket.set_challenge_response_data(data); //torque_socket_set_challenge_response_data(_socket, the_data.get_next_byte_position(), the_data.get_buffer()); } void allow_incoming_connections(bool allow) void set_allows_connections(bool allow) { torque_socket_allow_incoming_connections(_socket, allow); _socket.set_allows_connections(allow); //torque_socket_allow_incoming_connections(_socket, allow); } bool does_allow_incoming_connections() bool does_allow_connections() { return torque_socket_does_allow_incoming_connections(_socket); return _socket.does_allow_connections(); //return torque_socket_does_allow_incoming_connections(_socket); } void process_socket() { torque_socket_event *event; while((event = torque_socket_get_next_event(_socket)) != NULL) while((event = _socket.get_next_event()) != NULL) { logprintf("Processing event of type %d, connection_index = %d, size = %d", event->event_type, event->connection, event->data_size); switch(event->event_type) { Loading Loading @@ -158,11 +164,17 @@ public: } void check_for_packet_sends() { _process_start_time = time::get_current(); collapse_dirty_list(); for(uint32 i = 0; i < _connection_table.size(); i++) (*_connection_table[i].value())->check_packet_send(false, get_process_start_time()); { net_connection *the_connection = *_connection_table[i].value(); if(the_connection->get_connection_state() == net_connection::state_established) the_connection->check_packet_send(false, get_process_start_time()); } } void _add_connection(ref_ptr<net_connection> &the_net_connection, torque_connection the_torque_connection) void _add_connection(ref_ptr<net_connection> &the_net_connection, torque_connection_id the_torque_connection) { the_net_connection->set_torque_connection(the_torque_connection); _connection_table.insert(the_torque_connection, the_net_connection); Loading @@ -172,8 +184,9 @@ public: { bit_stream challenge_response(event->data, event->data_size); byte_buffer_ptr public_key = new byte_buffer(event->public_key, event->public_key_size); connection_pointer p = _connection_table.find(event->connection); ref_ptr<net_connection> *the_connection = _connection_table.find(event->connection).value(); ref_ptr<net_connection> *the_connection = p.value(); if(the_connection) { (*the_connection)->set_connection_state(net_connection::state_requesting_connection); Loading @@ -193,18 +206,23 @@ public: type_record *rec = find_connection_type(type_identifier); if(!rec) { torque_connection_reject(event->connection, 0, response_stream.get_buffer()); _socket.reject_connection(event->connection, response_stream); //torque_connection_reject(event->connection, 0, response_stream.get_buffer()); } net_connection *allocated = (net_connection *) operator new(rec->size); rec->construct_object(allocated); ref_ptr<net_connection> the_connection = allocated; the_connection->set_interface(this); if(the_connection->read_connect_request(request_stream, response_stream)) { _add_connection(the_connection, event->connection); torque_connection_accept(event->connection, response_stream.get_byte_position(), response_stream.get_buffer()); _socket.accept_connection(event->connection, response_stream); //torque_connection_accept(event->connection, response_stream.get_byte_position(), response_stream.get_buffer()); } else torque_connection_reject(event->connection, response_stream.get_next_byte_position(), response_stream.get_buffer()); _socket.reject_connection(event->connection, response_stream); //torque_connection_reject(event->connection, response_stream.get_next_byte_position(), response_stream.get_buffer()); } void _process_arranged_connection_request(torque_socket_event *event) Loading @@ -225,14 +243,15 @@ public: (*the_connection)->set_connection_state(net_connection::state_accepted); if(!(*the_connection)->read_connect_accept(connection_accept_stream, response_stream)) { torque_connection_disconnect(event->connection, response_stream.get_next_byte_position(), response_stream.get_buffer()); _socket.disconnect(event->connection, response_stream); //torque_connection_disconnect(event->connection, response_stream.get_next_byte_position(), response_stream.get_buffer()); p.remove(); } else /*else { (*the_connection)->set_connection_state(net_connection::state_established); (*the_connection)->on_connection_established(); } }*/ } } Loading Loading @@ -309,26 +328,37 @@ public: } void connect(SOCKADDR *connect_address, ref_ptr<net_connection> &the_connection) void connect(const address &remote_host, ref_ptr<net_connection> &the_connection) { uint8 connect_buffer[torque_max_status_datagram_size]; bit_stream connect_stream(connect_buffer, sizeof(connect_buffer)); uint32 type_identifier = get_type_identifier(the_connection); core::write(connect_stream, type_identifier); the_connection->set_interface(this); the_connection->write_connect_request(connect_stream); torque_socket_connect(_socket, connect_address, connect_stream.get_next_byte_position(), connect_buffer); torque_connection_id connection_id = _socket.connect(remote_host, connect_stream); _add_connection(the_connection, connection_id); //torque_socket_connect(_socket, connect_address, connect_stream.get_next_byte_position(), connect_buffer); //the_connection->set_torque_connection(connection_id); } virtual ~net_interface() { torque_socket_destroy(_socket); collapse_dirty_list(); _dirty_list_head._next_dirty_list = 0; } torque_socket &get_socket() { return _socket; } net_interface(SOCKADDR &bind_address) net_interface(const address &bind_address) : _socket(bind_address) { _socket = torque_socket_create(&bind_address); //_socket = torque_socket_create(&bind_address); _dirty_list_head._next_dirty_list = &_dirty_list_tail; _dirty_list_tail._prev_dirty_list = &_dirty_list_head; Loading @@ -341,6 +371,6 @@ protected: net_object _dirty_list_head; net_object _dirty_list_tail; array<connection_type_record> _connection_class_table; hash_table_array<torque_connection, ref_ptr<net_connection> > _connection_table; hash_table_array<torque_connection_id, ref_ptr<net_connection> > _connection_table; }; tnl2/net_object.h +4 −10 Original line number Diff line number Diff line Loading @@ -134,7 +134,6 @@ protected: enum { is_ghost_flag = bit(1), ///< Set if this is a ghost. ghostable_flag = bit(3), ///< Set if this ref_object can ghost at all. }; uint32 _net_flags; ///< Flags field describing this ref_object, from NetFlag. Loading @@ -150,6 +149,7 @@ public: _prev_dirty_list = NULL; _next_dirty_list = NULL; _dirty_mask_bits = 0; _net_flags = 0; _type_rep = 0; } net_object *_prev_dirty_list; Loading @@ -161,7 +161,7 @@ public: while(_first_object_ref) _first_object_ref->connection->detach_object(_first_object_ref); if(_dirty_mask_bits) if(_next_dirty_list) { _prev_dirty_list->_next_dirty_list = _next_dirty_list; _next_dirty_list->_prev_dirty_list = _prev_dirty_list; Loading Loading @@ -189,7 +189,7 @@ public: { assert(or_mask != 0); //Assert(_dirty_mask_bits == 0 || (_prev_dirty_list != NULL || _next_dirty_list != NULL || _dirty_list == this), "Invalid dirty list state."); if(!_dirty_mask_bits) if(!_dirty_mask_bits && _interface) _interface->add_to_dirty_list(this); _dirty_mask_bits |= or_mask; Loading Loading @@ -227,12 +227,6 @@ public: return (_net_flags & is_ghost_flag) != 0; } /// is_ghostable returns true if this ref_object can be ghosted to any clients. bool is_ghostable() const { return (_net_flags & ghostable_flag) != 0; } /// Return a hash for this ref_object. /// /// @note This is based on its location in memory. Loading Loading
tnl2/event_connection.h +1 −1 Original line number Diff line number Diff line Loading @@ -437,7 +437,7 @@ public: } } event_connection() event_connection(bool is_initiator = false) : net_connection(is_initiator) { // event management data: Loading
tnl2/ghost_connection.h +76 −44 Original line number Diff line number Diff line Loading @@ -34,6 +34,10 @@ public: type_database *_type_database; void set_type_database(type_database *type_db) { _type_database = type_db; } protected: /// Override of event_connection's alloc_notify, to use the ghost_packet_notify structure. Loading Loading @@ -148,7 +152,7 @@ protected: { parent::prepare_write_packet(); if(!does_ghost_from() && !_ghosting) if(!does_ghost_from() || !_ghosting) return; // first step is to check all our polled ghosts: Loading @@ -168,10 +172,12 @@ protected: if(!(walk->flags & (ghost_info::scope_local_always))) walk->flags &= ~ghost_info::in_scope; } if(_scope_object) { logprintf("performing scope query."); _scope_object->perform_scope_query(this); } } /// Override to write ghost updates into each packet. void write_packet(bit_stream &bstream, packet_notify *pnotify) Loading @@ -190,6 +196,7 @@ protected: if(!bstream.write_bool(_ghosting && _scope_object.is_valid())) return; logprintf("Filling packet -- %d ghosts to update", _ghost_zero_update_index); // fill a packet (or two) with ghosting data // 2. call scoped objects' priority functions if the flag set is nonzero Loading Loading @@ -273,7 +280,7 @@ protected: if(walk->flags & ghost_info::not_yet_ghosted) { uint32 class_index = type_rep->class_index; bstream.write_ranged_uint32(class_index, 0, _type_database->get_indexed_class_count()); bstream.write_ranged_uint32(class_index, 0, _type_database->get_indexed_class_count() - 1); is_initial_update = true; } Loading @@ -283,19 +290,7 @@ protected: // now loop through all the fields, and if it's in the update mask, blast it into the bit stream: void *object_pointer = (void *) walk->obj; while(type_rep) { for(dictionary<type_database::field_rep>::pointer i = type_rep->fields.first(); i; ++i) { type_database::field_rep *field_rep = i.value(); uint32 state_mask = 1 << field_rep->state_index; if(update_mask & state_mask) if(!field_rep->write_function(bstream, (void *) ((uint8 *)object_pointer + field_rep->offset))) returned_mask |= state_mask; } type_rep = type_rep->parent_class; } returned_mask = write_object_update(bstream, object_pointer, type_rep, update_mask); if(is_initial_update) { Loading @@ -307,7 +302,7 @@ protected: walk->type_rep->total_update_count++; walk->type_rep->total_update_bit_total += bstream.get_bit_position() - start_position; } TorqueLogMessageFormatted(LogGhostConnection, ("ghost_connection %s GHOST %d", walk->type_rep->name.c_str(), bstream.get_bit_position() - 16 - start_position)); TorqueLogMessageFormatted(LogGhostConnection, ("ghost_connection %s GHOST %d", walk->type_rep->name.c_str(), bstream.get_bit_position() - start_position)); assert((returned_mask & (~update_mask)) == 0); // Cannot set new bits in packUpdate return } Loading Loading @@ -401,14 +396,17 @@ protected: } else { int32 start_position = bstream.get_bit_position(); uint32 end_position = 0; void *object_pointer; type_database::type_rep *ttr; if(!_local_ghosts[index]) // it's a new ghost... cool { uint32 class_index = bstream.read_ranged_uint32(0, _type_database->get_indexed_class_count() - 1); type_database::type_rep *type_rep = _type_database->get_indexed_class(class_index); ttr = type_rep; uint32 instance_size = type_rep->type->size; object_pointer = operator new(instance_size); Loading @@ -426,16 +424,7 @@ protected: _local_ghosts[index] = obj; is_initial_update = true; while(type_rep) { for(dictionary<type_database::field_rep>::pointer i = type_rep->fields.first(); i; ++i) { // on initial update, read all the fields type_database::field_rep *field_rep = i.value(); field_rep->read_function(bstream, (void *) ((uint8 *)object_pointer + field_rep->offset)); } type_rep = type_rep->parent_class; } read_object_update(bstream, object_pointer, type_rep, 0xFFFFFFFF); if(!obj->on_ghost_add(this)) throw tnl_exception_ghost_add_failed; Loading @@ -444,8 +433,19 @@ protected: { object_pointer = (void *) _local_ghosts[index]; type_database::type_rep *type_rep = _local_ghosts[index]->_type_rep; ttr = type_rep; uint32 update_mask = bstream.read_integer(type_rep->max_state_index); read_object_update(bstream, object_pointer, type_rep, update_mask); } TorqueLogMessageFormatted(LogGhostConnection, ("ghost_connection %s read GHOST %d", ttr->name.c_str(), bstream.get_bit_position() - start_position)); } } } uint32 write_object_update(bit_stream &bstream, void *object_pointer, type_database::type_rep *type_rep, uint32 update_mask) { uint32 returned_mask = 0; while(type_rep) { for(dictionary<type_database::field_rep>::pointer i = type_rep->fields.first(); i; ++i) Loading @@ -454,12 +454,42 @@ protected: uint32 state_mask = 1 << field_rep->state_index; if(update_mask & state_mask) field_rep->read_function(bstream, (void *) ((uint8 *)object_pointer + field_rep->offset)); { bool res; void *field_ptr = (void *) ((uint8 *)object_pointer + field_rep->offset); if(field_rep->compound_type) res = field_rep->compound_type->write(bstream, field_ptr); else res = field_rep->write_function(bstream, field_ptr); if(!res) returned_mask |= state_mask; } } type_rep = type_rep->parent_class; } return returned_mask; } void read_object_update(bit_stream &bstream, void *object_pointer, type_database::type_rep *type_rep, uint32 update_mask) { while(type_rep) { for(dictionary<type_database::field_rep>::pointer i = type_rep->fields.first(); i; ++i) { type_database::field_rep *field_rep = i.value(); uint32 state_mask = 1 << field_rep->state_index; if(update_mask & state_mask) { void *field_ptr = (void *) ((uint8 *)object_pointer + field_rep->offset); if(field_rep->compound_type) field_rep->compound_type->read(bstream, field_ptr); else field_rep->read_function(bstream, field_ptr); } } type_rep = type_rep->parent_class; } } Loading Loading @@ -583,7 +613,7 @@ protected: virtual void on_end_ghosting() {} public: ghost_connection() ghost_connection(bool is_initiator = false) : event_connection(is_initiator) { // ghost management data: _scope_object = NULL; Loading @@ -595,6 +625,7 @@ public: _ghost_lookup_table = NULL; _local_ghosts = NULL; _ghost_zero_update_index = 0; register_rpc_methods(); } ~ghost_connection() Loading Loading @@ -687,12 +718,13 @@ public: { if (!_scoping || !does_ghost_from()) return; if (!object->is_ghostable()) return; type_database::type_rep *type_rep = _type_database->find_type(object->get_type_record()); assert(type_rep != 0); assert(object->_interface == 0 || object->_interface == _interface); object->_interface = _interface; int32 index = object->get_hash_id() & ghost_lookup_table_mask; Loading
tnl2/net_connection.h +18 −7 Original line number Diff line number Diff line Loading @@ -125,12 +125,12 @@ public: delete note; } torque_connection get_torque_connection() torque_connection_id get_torque_connection() { return _connection; } void set_torque_connection(torque_connection connection) void set_torque_connection(torque_connection_id connection) { _connection = connection; } Loading Loading @@ -196,7 +196,6 @@ public: int32 start = stream.get_bit_position(); TorqueLogMessageFormatted(LogNetConnection, ("connection %d: START", _connection) ); TorqueLogMessageFormatted(LogNetConnection, ("connection %d: END - %llu bits", _connection, stream.get_bit_position() - start) ); time send_delay = _interface->get_process_start_time() - _last_packet_recv_time; if(send_delay > time(2047)) Loading @@ -204,7 +203,10 @@ public: stream.write_integer(uint32(send_delay.get_milliseconds() >> 3), 8); write_packet(stream, note); torque_connection_send_to(_connection, stream.get_next_byte_position(), stream.get_buffer(), &_last_send_sequence); TorqueLogMessageFormatted(LogNetConnection, ("connection %d: END - %llu bits", _connection, stream.get_bit_position() - start) ); _interface->get_socket().send_to_connection(_connection, stream, &_last_send_sequence); //torque_connection_send_to(_connection, stream.get_next_byte_position(), stream.get_buffer(), &_last_send_sequence); _notify_queue_tail->sequence = _last_send_sequence; } Loading @@ -213,7 +215,7 @@ public: read_packet_rate_info(data); _last_received_send_delay = time((data.read_integer(8) << 3) + 4); _last_packet_recv_time = _interface->get_process_start_time(); read_packet(data); } /// Called to prepare the connection for packet writing. Loading Loading @@ -317,14 +319,17 @@ public: bool is_connection_host() { return !_is_initiator; } bool is_connection_initiator() { return _is_initiator; } net_connection() net_connection(bool is_initiator = false) { _is_initiator = is_initiator; _round_trip_time = 0; _send_delay_credit = time(0); _last_update_time = time(0); Loading @@ -340,6 +345,11 @@ public: _last_send_sequence = 0; } void set_interface(net_interface *interface) { _interface = interface; } net_interface *get_interface() { return _interface; Loading @@ -360,6 +370,7 @@ protected: minimum_padding_bits = 32, ///< ask subclasses to reserve at least this much. }; bool _is_initiator; net_rate _local_rate; ///< Current communications rate negotiated for this connection. net_rate _remote_rate; ///< Maximum allowable communications rate for this connection. Loading @@ -370,7 +381,7 @@ protected: packet_notify *_notify_queue_head; ///< Linked list of structures representing the data in sent packets packet_notify *_notify_queue_tail; ///< Tail of the notify queue linked list. New packets are added to the end of the tail. torque_connection _connection; torque_connection_id _connection; time _last_packet_recv_time; ///< time of the receipt of the last data packet. float32 _round_trip_time; ///< Running average round trip time. time _send_delay_credit; ///< Metric to help compensate for irregularities on fixed rate packet sends. Loading
tnl2/net_interface.h +57 −27 Original line number Diff line number Diff line Loading @@ -6,7 +6,7 @@ class net_interface : public ref_object { friend class net_object; typedef hash_table_array<torque_connection, ref_ptr<net_connection> >::pointer connection_pointer; typedef hash_table_array<torque_connection_id, ref_ptr<net_connection> >::pointer connection_pointer; struct connection_type_record { uint32 identifier; Loading @@ -16,30 +16,36 @@ class net_interface : public ref_object public: void set_private_key(asymmetric_key_ptr the_key) { byte_buffer_ptr private_key = the_key->get_private_key(); torque_socket_set_private_key(_socket, private_key->get_buffer_size(), private_key->get_buffer()); _socket.set_private_key(the_key); //byte_buffer_ptr private_key = the_key->get_private_key(); //torque_socket_set_private_key(_socket, private_key->get_buffer_size(), private_key->get_buffer()); } void set_challenge_response_data(bit_stream &the_data) void set_challenge_response_data(byte_buffer_ptr data) { torque_socket_set_challenge_response_data(_socket, the_data.get_next_byte_position(), the_data.get_buffer()); _socket.set_challenge_response_data(data); //torque_socket_set_challenge_response_data(_socket, the_data.get_next_byte_position(), the_data.get_buffer()); } void allow_incoming_connections(bool allow) void set_allows_connections(bool allow) { torque_socket_allow_incoming_connections(_socket, allow); _socket.set_allows_connections(allow); //torque_socket_allow_incoming_connections(_socket, allow); } bool does_allow_incoming_connections() bool does_allow_connections() { return torque_socket_does_allow_incoming_connections(_socket); return _socket.does_allow_connections(); //return torque_socket_does_allow_incoming_connections(_socket); } void process_socket() { torque_socket_event *event; while((event = torque_socket_get_next_event(_socket)) != NULL) while((event = _socket.get_next_event()) != NULL) { logprintf("Processing event of type %d, connection_index = %d, size = %d", event->event_type, event->connection, event->data_size); switch(event->event_type) { Loading Loading @@ -158,11 +164,17 @@ public: } void check_for_packet_sends() { _process_start_time = time::get_current(); collapse_dirty_list(); for(uint32 i = 0; i < _connection_table.size(); i++) (*_connection_table[i].value())->check_packet_send(false, get_process_start_time()); { net_connection *the_connection = *_connection_table[i].value(); if(the_connection->get_connection_state() == net_connection::state_established) the_connection->check_packet_send(false, get_process_start_time()); } } void _add_connection(ref_ptr<net_connection> &the_net_connection, torque_connection the_torque_connection) void _add_connection(ref_ptr<net_connection> &the_net_connection, torque_connection_id the_torque_connection) { the_net_connection->set_torque_connection(the_torque_connection); _connection_table.insert(the_torque_connection, the_net_connection); Loading @@ -172,8 +184,9 @@ public: { bit_stream challenge_response(event->data, event->data_size); byte_buffer_ptr public_key = new byte_buffer(event->public_key, event->public_key_size); connection_pointer p = _connection_table.find(event->connection); ref_ptr<net_connection> *the_connection = _connection_table.find(event->connection).value(); ref_ptr<net_connection> *the_connection = p.value(); if(the_connection) { (*the_connection)->set_connection_state(net_connection::state_requesting_connection); Loading @@ -193,18 +206,23 @@ public: type_record *rec = find_connection_type(type_identifier); if(!rec) { torque_connection_reject(event->connection, 0, response_stream.get_buffer()); _socket.reject_connection(event->connection, response_stream); //torque_connection_reject(event->connection, 0, response_stream.get_buffer()); } net_connection *allocated = (net_connection *) operator new(rec->size); rec->construct_object(allocated); ref_ptr<net_connection> the_connection = allocated; the_connection->set_interface(this); if(the_connection->read_connect_request(request_stream, response_stream)) { _add_connection(the_connection, event->connection); torque_connection_accept(event->connection, response_stream.get_byte_position(), response_stream.get_buffer()); _socket.accept_connection(event->connection, response_stream); //torque_connection_accept(event->connection, response_stream.get_byte_position(), response_stream.get_buffer()); } else torque_connection_reject(event->connection, response_stream.get_next_byte_position(), response_stream.get_buffer()); _socket.reject_connection(event->connection, response_stream); //torque_connection_reject(event->connection, response_stream.get_next_byte_position(), response_stream.get_buffer()); } void _process_arranged_connection_request(torque_socket_event *event) Loading @@ -225,14 +243,15 @@ public: (*the_connection)->set_connection_state(net_connection::state_accepted); if(!(*the_connection)->read_connect_accept(connection_accept_stream, response_stream)) { torque_connection_disconnect(event->connection, response_stream.get_next_byte_position(), response_stream.get_buffer()); _socket.disconnect(event->connection, response_stream); //torque_connection_disconnect(event->connection, response_stream.get_next_byte_position(), response_stream.get_buffer()); p.remove(); } else /*else { (*the_connection)->set_connection_state(net_connection::state_established); (*the_connection)->on_connection_established(); } }*/ } } Loading Loading @@ -309,26 +328,37 @@ public: } void connect(SOCKADDR *connect_address, ref_ptr<net_connection> &the_connection) void connect(const address &remote_host, ref_ptr<net_connection> &the_connection) { uint8 connect_buffer[torque_max_status_datagram_size]; bit_stream connect_stream(connect_buffer, sizeof(connect_buffer)); uint32 type_identifier = get_type_identifier(the_connection); core::write(connect_stream, type_identifier); the_connection->set_interface(this); the_connection->write_connect_request(connect_stream); torque_socket_connect(_socket, connect_address, connect_stream.get_next_byte_position(), connect_buffer); torque_connection_id connection_id = _socket.connect(remote_host, connect_stream); _add_connection(the_connection, connection_id); //torque_socket_connect(_socket, connect_address, connect_stream.get_next_byte_position(), connect_buffer); //the_connection->set_torque_connection(connection_id); } virtual ~net_interface() { torque_socket_destroy(_socket); collapse_dirty_list(); _dirty_list_head._next_dirty_list = 0; } torque_socket &get_socket() { return _socket; } net_interface(SOCKADDR &bind_address) net_interface(const address &bind_address) : _socket(bind_address) { _socket = torque_socket_create(&bind_address); //_socket = torque_socket_create(&bind_address); _dirty_list_head._next_dirty_list = &_dirty_list_tail; _dirty_list_tail._prev_dirty_list = &_dirty_list_head; Loading @@ -341,6 +371,6 @@ protected: net_object _dirty_list_head; net_object _dirty_list_tail; array<connection_type_record> _connection_class_table; hash_table_array<torque_connection, ref_ptr<net_connection> > _connection_table; hash_table_array<torque_connection_id, ref_ptr<net_connection> > _connection_table; };
tnl2/net_object.h +4 −10 Original line number Diff line number Diff line Loading @@ -134,7 +134,6 @@ protected: enum { is_ghost_flag = bit(1), ///< Set if this is a ghost. ghostable_flag = bit(3), ///< Set if this ref_object can ghost at all. }; uint32 _net_flags; ///< Flags field describing this ref_object, from NetFlag. Loading @@ -150,6 +149,7 @@ public: _prev_dirty_list = NULL; _next_dirty_list = NULL; _dirty_mask_bits = 0; _net_flags = 0; _type_rep = 0; } net_object *_prev_dirty_list; Loading @@ -161,7 +161,7 @@ public: while(_first_object_ref) _first_object_ref->connection->detach_object(_first_object_ref); if(_dirty_mask_bits) if(_next_dirty_list) { _prev_dirty_list->_next_dirty_list = _next_dirty_list; _next_dirty_list->_prev_dirty_list = _prev_dirty_list; Loading Loading @@ -189,7 +189,7 @@ public: { assert(or_mask != 0); //Assert(_dirty_mask_bits == 0 || (_prev_dirty_list != NULL || _next_dirty_list != NULL || _dirty_list == this), "Invalid dirty list state."); if(!_dirty_mask_bits) if(!_dirty_mask_bits && _interface) _interface->add_to_dirty_list(this); _dirty_mask_bits |= or_mask; Loading Loading @@ -227,12 +227,6 @@ public: return (_net_flags & is_ghost_flag) != 0; } /// is_ghostable returns true if this ref_object can be ghosted to any clients. bool is_ghostable() const { return (_net_flags & ghostable_flag) != 0; } /// Return a hash for this ref_object. /// /// @note This is based on its location in memory. Loading