Index: src/include/gnunet_set_service.h =================================================================== --- src/include/gnunet_set_service.h (revision 37683) +++ src/include/gnunet_set_service.h (working copy) @@ -216,7 +216,8 @@ * * @param cls closure */ -typedef void (*GNUNET_SET_Continuation) (void *cls); +typedef void +(*GNUNET_SET_Continuation) (void *cls); /** @@ -227,9 +228,10 @@ * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK * @param status see `enum GNUNET_SET_Status` */ -typedef void (*GNUNET_SET_ResultIterator) (void *cls, - const struct GNUNET_SET_Element *element, - enum GNUNET_SET_Status status); +typedef void +(*GNUNET_SET_ResultIterator) (void *cls, + const struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status); /** * Iterator for set elements. @@ -239,8 +241,9 @@ * iterated over * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop. */ -typedef int (*GNUNET_SET_ElementIterator) (void *cls, - const struct GNUNET_SET_Element *element); +typedef int +(*GNUNET_SET_ElementIterator) (void *cls, + const struct GNUNET_SET_Element *element); /** Index: src/set/gnunet-service-set.c =================================================================== --- src/set/gnunet-service-set.c (revision 37683) +++ src/set/gnunet-service-set.c (working copy) @@ -67,6 +67,11 @@ struct GNUNET_HashCode app_id; /** + * The port we are listening on with CADET. + */ + struct GNUNET_CADET_Port *open_port; + + /** * The type of the operation. */ enum GNUNET_SET_OperationType operation; @@ -229,6 +234,7 @@ GNUNET_MQ_destroy (listener->client_mq); listener->client_mq = NULL; } + GNUNET_CADET_close_port (listener->open_port); GNUNET_CONTAINER_DLL_remove (listeners_head, listeners_tail, listener); @@ -320,7 +326,8 @@ &gc); } -int + +static int is_excluded_generation (unsigned int generation, struct GenerationRange *excluded, unsigned int excluded_size) @@ -337,7 +344,7 @@ } -int +static int is_element_of_generation (struct ElementEntry *ee, unsigned int query_generation, struct GenerationRange *excluded, @@ -729,7 +736,8 @@ incoming->suggest_id); cmsg->accept_id = htonl (incoming->suggest_id); cmsg->peer_id = incoming->spec->peer; - GNUNET_MQ_send (listener->client_mq, mqm); + GNUNET_MQ_send (listener->client_mq, + mqm); } @@ -1134,6 +1142,99 @@ /** + * Timeout happens iff: + * - we suggested an operation to our listener, + * but did not receive a response in time + * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST + * + * @param cls channel context + * @param tc context information (why was this task triggered now) + */ +static void +incoming_timeout_cb (void *cls) +{ + struct Operation *incoming = cls; + + incoming->timeout_task = NULL; + GNUNET_assert (GNUNET_YES == incoming->is_incoming); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Remote peer's incoming request timed out\n"); + incoming_destroy (incoming); +} + + +/** + * Terminates an incoming operation in case we have not yet received an + * operation request. Called by the channel destruction handler. + * + * @param op the channel context + */ +static void +handle_incoming_disconnect (struct Operation *op) +{ + GNUNET_assert (GNUNET_YES == op->is_incoming); + /* channel is already dead, incoming_destroy must not + * destroy it ... */ + op->channel = NULL; + incoming_destroy (op); + op->vt = NULL; +} + + +/** + * Method called whenever another peer has added us to a channel the + * other peer initiated. Only called (once) upon reception of data + * from a channel we listen on. + * + * The channel context represents the operation itself and gets added + * to a DLL, from where it gets looked up when our local listener + * client responds to a proposed/suggested operation or connects and + * associates with this operation. + * + * @param cls closure + * @param channel new handle to the channel + * @param initiator peer that started the channel + * @param port Port this channel is for. + * @param options Unused. + * @return initial channel context for the channel + * returns NULL on error + */ +static void * +channel_new_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *initiator, + const struct GNUNET_HashCode *port, + enum GNUNET_CADET_ChannelOption options) +{ + static const struct SetVT incoming_vt = { + .msg_handler = &handle_incoming_msg, + .peer_disconnect = &handle_incoming_disconnect + }; + struct Listener *listener = cls; + struct Operation *incoming; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New incoming channel\n"); + incoming = GNUNET_new (struct Operation); + incoming->is_incoming = GNUNET_YES; + incoming->peer = *initiator; + incoming->channel = channel; + incoming->mq = GNUNET_CADET_mq_create (incoming->channel); + incoming->vt = &incoming_vt; + incoming->timeout_task + = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, + &incoming_timeout_cb, + incoming); + GNUNET_CONTAINER_DLL_insert_tail (incoming_head, + incoming_tail, + incoming); + // incoming_suggest (incoming, + // listener); + return incoming; +} + + +/** * Called when a client wants to create a new listener. * * @param cls unused @@ -1165,6 +1266,10 @@ GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); + listener->open_port = GNUNET_CADET_open_port (cadet, + &msg->app_id, + &channel_new_cb, + listener); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New listener created (op %u, app %s)\n", listener->operation, @@ -1187,7 +1292,8 @@ incoming_suggest (op, listener); } - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVER_receive_done (client, + GNUNET_OK); } @@ -1305,6 +1411,7 @@ r); } + /** * Called when a client wants to initiate a set operation with another * peer. Initiates the CADET connection to the listener and sends the @@ -1358,7 +1465,7 @@ op->channel = GNUNET_CADET_channel_create (cadet, op, &msg->target_peer, - GC_u2h (GNUNET_APPLICATION_TYPE_SET), + &msg->app_id, GNUNET_CADET_OPTION_RELIABLE); op->mq = GNUNET_CADET_mq_create (op->channel); set->vt->evaluate (op, @@ -1735,108 +1842,12 @@ /** - * Timeout happens iff: - * - we suggested an operation to our listener, - * but did not receive a response in time - * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST - * - * @param cls channel context - * @param tc context information (why was this task triggered now) - */ -static void -incoming_timeout_cb (void *cls) -{ - struct Operation *incoming = cls; - - incoming->timeout_task = NULL; - GNUNET_assert (GNUNET_YES == incoming->is_incoming); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Remote peer's incoming request timed out\n"); - incoming_destroy (incoming); -} - - -/** - * Terminates an incoming operation in case we have not yet received an - * operation request. Called by the channel destruction handler. - * - * @param op the channel context - */ -static void -handle_incoming_disconnect (struct Operation *op) -{ - GNUNET_assert (GNUNET_YES == op->is_incoming); - /* channel is already dead, incoming_destroy must not - * destroy it ... */ - op->channel = NULL; - incoming_destroy (op); - op->vt = NULL; -} - - -/** - * Method called whenever another peer has added us to a channel the - * other peer initiated. Only called (once) upon reception of data - * with a message type which was subscribed to in - * GNUNET_CADET_connect(). - * - * The channel context represents the operation itself and gets added to a DLL, - * from where it gets looked up when our local listener client responds - * to a proposed/suggested operation or connects and associates with this operation. - * - * @param cls closure - * @param channel new handle to the channel - * @param initiator peer that started the channel - * @param port Port this channel is for. - * @param options Unused. - * @return initial channel context for the channel - * returns NULL on error - */ -static void * -channel_new_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - const struct GNUNET_HashCode *port, - enum GNUNET_CADET_ChannelOption options) -{ - static const struct SetVT incoming_vt = { - .msg_handler = &handle_incoming_msg, - .peer_disconnect = &handle_incoming_disconnect - }; - struct Operation *incoming; - - if (0 != memcmp (GC_u2h (GNUNET_APPLICATION_TYPE_SET), port, sizeof (*port))) - { - GNUNET_break (0); - GNUNET_CADET_channel_destroy (channel); - return NULL; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New incoming channel\n"); - incoming = GNUNET_new (struct Operation); - incoming->is_incoming = GNUNET_YES; - incoming->peer = *initiator; - incoming->channel = channel; - incoming->mq = GNUNET_CADET_mq_create (incoming->channel); - incoming->vt = &incoming_vt; - incoming->timeout_task - = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, - &incoming_timeout_cb, - incoming); - GNUNET_CONTAINER_DLL_insert_tail (incoming_head, - incoming_tail, - incoming); - return incoming; -} - - -/** * Function called whenever a channel is destroyed. Should clean up * any associated state. It must NOT call * GNUNET_CADET_channel_destroy() on the channel. * * The peer_disconnect function is part of a a virtual table set initially either - * when a peer creates a new channel with us (#channel_new_cb()), or once we create + * when a peer creates a new channel with us, or once we create * a new channel ourselves (evaluate). * * Once we know the exact type of operation (union/intersection), the vt is @@ -1881,7 +1892,7 @@ * received via a cadet channel. * * The msg_handler is a virtual table set in initially either when a peer - * creates a new channel with us (channel_new_cb), or once we create a new channel + * creates a new channel with us, or once we create a new channel * ourselves (evaluate). * * Once we know the exact type of operation (union/intersection), the vt is @@ -1999,9 +2010,6 @@ cadet = GNUNET_CADET_connect (cfg, NULL, &channel_end_cb, cadet_handlers); - GNUNET_CADET_open_port (cadet, - GC_u2h (GNUNET_APPLICATION_TYPE_SET), - &channel_new_cb, NULL); if (NULL == cadet) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR,