Page 1 of 1

RX (Reactive Extensions) Part 2 Rate Topic: -----

#1 AdamSpeight2008  Icon User is offline

  • MrCupOfT
  • member icon


Reputation: 2271
  • View blog
  • Posts: 9,498
  • Joined: 29-May 08

Posted 17 March 2011 - 08:49 AM

RX (Reactive Extensions) Part 2


Link to RX Part 1


Windows Forms + RX

Note: This example utilises features of the .net4.0 framework. (In particular Tasks)

Let's update the previous example of an asynchronous downloader, which relays it progress to a textbox and a progressbar.

The WebClient Extensions

To make the coding easier to read and more modular, I've created a few extension methods for the Web Client. They extend to Web Client in to domain of IObservables.

Spoiler

Misc Exts.
The example also use a few miscellaneous extension methods.
Spoiler


The final module is le to encapsulates the downloader function.

Public Module Downloader
  Public Function Download(ByVal URI_ToDL As Uri,
                           ByVal SaveAt As String,
                           Optional ByVal PrB As ProgressBar = Nothing,
                           Optional ByVal Info As TextBox = Nothing) As Threading.Tasks.Task
    '
    ' Creates the task which supervises the downloading.
    ' With it being a task it operates on it own thread.
    Dim oTask As New Threading.Tasks.Task(
      Sub()
        Dim done = False
        Dim wC = New Net.WebClient()
        Dim token As Guid = Guid.NewGuid
        Dim ProgressBarChanger = wC.DownloadProgressChangedAsObservable(token).
         HoldUntilChanged(keySelector:=Function(x)
                                         ' What to hold on, until it changes.
                                         Return x.ProgressPercentage
                                       End Function)._ObjNull(PrB, Function(x) x.ObserveOn(PrB)).
                          Subscribe(onNext:=Sub(ev As Net.DownloadProgressChangedEventArgs)
                                              ' What to do when it happens.
                                              If Info IsNot Nothing Then Info.Text = String.Format("{0} / {1}", ev.BytesReceived, ev.TotalBytesToReceive)
                                              If PrB IsNot Nothing Then PrB.Value = ev.ProgressPercentage
                                            End Sub)

        Dim d = wC.DownloadFileAsObservable(URI_ToDL, SaveAt, token).
          Subscribe(onNext:=Sub(x As Tuple(Of Uri, String))
                              done = True
                            End Sub,
                    onerror:=Sub(x)
                               done = True
                               Console.WriteLine(x.ToString)
                             End Sub)
        ' Let the OS do something else until it done.
        While Not done
          My.Application.DoEvents()
        End While
      End Sub)
    ' Set the task runing
    oTask.RunSynchronously()
    ' Return the task.
    Return oTask
  End Function


End Module





A Simple form with a Button, a ProgressBar and a Textbox.

Imports WebClient_ObservableExtensions.WebClient_Extensions

Public Class Form1

  Private Sub Button1_Click(ByVal sender As System.Object, ByVal e As System.EventArgs) Handles Button1.Click
      Dim dlt = Downloader.Download(New Uri("http://download.thinkbroadband.com/20MB.zip"), "Test.zip", Me.ProgressBar1, Me.TextBox1)
  End Sub


End Class



Code Dessection

Why isn't a Cross-thread operation not valid exception thrown?

._ObjNull(PrB, Function(x) x.ObserveOn(PrB)).

The ObserveOn extension method provided by RX, instructs the compiler I want the Observation the viewed in this synchronisation Context. In the example it the context of the ProgressBar pointed to via the reference PrB.

The .ObjNull is there, to test if something (a progressbar control) was passed to method.


What's HoldUntilChanged doing?
Let extract it from the example and view in isolation.
HoldUntilChanged(keySelector:=Function(x)
                 ' What to hold on, until it changes.
                   Return x.ProgressPercentage
                 End Function)



The method expects function delegate which tell RX which parameter to use, to discard income events until it changes.
When it does changes the Observable Event is passed on through.

x is the same type as the event. In reality it can be any type but in this case is defined as Net.DownloadProgressChangedEventArgs.

Is This A Good Question/Topic? 2
  • +

Page 1 of 1