`
songry
  • 浏览: 82938 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Clojure-JVM上的函数式编程语言(11)引用类型 作者: R. Mark Volkmann

阅读更多

 原帖地址:http://java.ociweb.com/mark/clojure/article.html#ReferenceTypes

 作者:R. Mark Volkmann

 译者:RoySong

 

引用类型(Reference types)

    引用类型是针对不可变数据的可变引用。在Clojure中有四种引用类型: Vars , Refs , AtomsAgents

它们有很多相同点。

  • 它们都能够包含任意类型的对象。
  • 它们都能够被间接引用,在采取deref函数或者@读取器宏来检索它们中包含的对象时。
  • 它们都支持验证器(validators)-- 当值被改变时调用的函数,如果新的值是合法有效的,验证器就会返回true。否则验证器会返回false或者抛出一个异常。如果验证器仅仅返回false,则包含 "Invalid reference state"信息的IllegalStateException将被抛出。
  • 它们都支持监视器(watchers)Agents。当一个被监视的值引用改变时,对应的Agent就被唤醒。更多地细节,参见"Agents "一节。

    下面的表格简略介绍了四种引用类型的区别,以及能够创建和修改它们的函数。下面表格中的每个函数将在接下来进行

讨论。

Var Ref Atom Agent 用途 创建 修改
同步改变到一个单一的、线程本地(thread-local)的值
同步、调整改变到一个或者多个值 同步改变到一个单一的值 同步改变到一个单一的值
(def name initial-value ) (ref initial-value ) (atom initial-value ) (agent initial-value )
(def name new-value )
sets new root value
(alter-var-root
(var name ) update-fn args )

atomically sets new root value
(set! name new-value )
sets new, thread-local value inside a binding form
(ref-set ref new-value )
must be inside a dosync

(alter ref
update-fn arguments )

must be inside a dosync

(commute ref
update-fn arguments )

must be inside a dosync
(reset! atom new-value )

(compare-and-set! atom current-value new-value )

(swap! atom
update-fn arguments )
(send agent
update-fn arguments )


(send-off agent
update-fn arguments )

 

Vars

    Vars 是一个引用,它能够拥有一个所有线程共享的根绑定(root binding),也能够在每个线程中拥有不同的值

(thread-local)。

 

    创建Var并设置根绑定:

(def name value)
 

    上面的value是可选项,如果在创建时没有指定value,那么创建的这个Var是未绑定的("unbound")。上面

这个form也能够用来改变已存在Var的根绑定。

 

    有两种方式来为已存在的Var创建线程本地绑定:

(binding [name expression] body)
(set! name expression) ; inside a binding that bound the same name
 

    binding 宏的使用之前讨论过。下面的例子展示了它和set!特殊form的结合,改变了曾经绑定了线程本地值的Var的值。

(def v 1)

(defn change-it []
  (println "2) v =" v) ; -> 1

  (def v 2) ; changes root value
  (println "3) v =" v) ; -> 2

  (binding [v 3] ; binds a thread-local value
    (println "4) v =" v) ; -> 3

    (set! v 4) ; changes thread-local value
    (println "5) v =" v)) ; -> 4

  (println "6) v =" v)) ; thread-local value is gone now -> 2

(println "1) v =" v) ; -> 1

(let [thread (Thread. #(change-it))]
  (.start thread)
  (.join thread)) ; wait for thread to finish

(println "7) v =" v) ; -> 2
 

    使用Vars通常很郁闷,因为对它们值的改变不是跨线程协调的。举个例子,线程A使用一个Var的根值,然后在还没有

执行完时就发现另一个线程B对Var的值进行了改动。

 

Refs

    Refs用来保证在多个线程之间改变一个或者多个绑定时能够协调。这种协调是通过Software Transactional Memory (STM)来实现的。Refs仅在事务内才能被改变。

 

    STM的属性跟数据库的事务很相似。在一个STM里面所做的所有改变只有在事务提交的时间点上才会对其他的

线程生效。这使得它同时具备了原子性和隔离性。验证函数使确保产生的改变能够同其他数据的值保持一致成为

可能。

 

    代码执行于事务中通常出现在调用 dosync宏的主体内部,而 dosync宏能够区别事务 。当处于事务内部时,被改变

的Refs拥有一个私有的、线程内(private, in-transaction)的值,这些值不会被其他线程看到直到事务被提交。

 

    如果在事务结束之前都没有抛出异常,在事务内对Refs做出的改变就会被提交。这代表着线程内的改变就会在

线程外生效了。

 

    如果线程内任意代码执行时抛出了异常,包括从验证函数中抛出了异常,事务都会回滚。这代表着事务内的改变

都会被丢弃掉。

 

    当在一个已启动的线程中试图访问或者修改被其他线程修改并提交过的Ref,则会产生冲突,当前线程就会

重试(Retry)。这代表着线程中所有的改动都会被丢弃,并回到dosync主体起点从头开始。实际上无法确定线程

在什么时候会侦测到冲突,什么时候会重试,仅仅知道当线程侦测到冲突后就会执行重试。

 

    重要的是,在线程中执行的代码可以远离副作用(side effects),它可能由于重试而运行多次。支持调用拥有

副作用函数的一种方式是将执行中的调用放入到线程里的agents中,这些将被保留直到线程结束。如果线程提交则

不管之前发生过多少次重试,只会将当前的执行行为发送出去。如果线程回滚了,则执行行为不会被发送出去。

 

    ref函数创造一个新的Ref对象。创建一个Ref并保持对它的权限的一种方式是采用def特殊form:

(def name (ref value))
 

    dosync宏创建一个线程,这个线程持续到宏主体内的所有表达式求值完毕为止。 ref-set函数改变一个Ref的线程中

值并返回它,ref-set函数必须在某个线程内部被调用,否则会抛出一个 IllegalStateException。它做出的改变

仅仅在线程提交后才是可见的。这会发生在dosync结束,没抛出任何异常,线程提交后。举个例子:

(dosync
  ...
  (ref-set name new-value)
  ...)
 

    如果新的值需要从旧的值中计算得来,那么就需要以下三步:

    1、根据Ref来获取旧的值

    2、计算出新的值

    3、保存新的值

 

    alter和 commute函数能够执行这三步如同一个简单操作一般。 alter函数用在改变必须依从一个特定顺序的场景。 commute

函数被用在改变发生的次序并不重要的情况下(也就是说,交替执行),并且支持改变平行执行。跟ref-set一样,这两个

函数也只能在某个事务内部调用。它们俩都接受一个用于计算新值的“更新函数”("update function")做为第二个参数,

然后再接收一些可选的额外参数,这些参数将被传递给“更新函数”。这个“更新函数”将接收Ref的当前线程值以及额外的参数,

如果有的话。无论什么时候新值是基于旧值(用一个函数来计算旧值)的情况,比起ref-set来alter或者 commute是更好的

选择。

 

    举个例子,假设我们要对一个名为counter的Ref值加一,这会向如下代码一样实现,采用inc做为更新函数:

(dosync
  ...
  (alter counter inc)
  ; or as
  (commute counter inc)
  ...)
 

    如果一个alter函数在当前线程开始时去试图修改一个已经被其他线程修改过的Ref,当前线程会从开始处重试。调用

commute能够阻止这种情形,调用在当前线程为Ref使用线程内值时进行。这样性能可以得到提升因为没有发生重试。记住

commute仅仅能够应用于跨越多线程的更新操作次序并不重要的场合。

 

    如果事务提交了,那么线程内会针对commute调用做出一些额外的事情。对于每个 commute调用来说,它们设值的Ref会

采用下面的调用结果重新设值:

(apply update-function last-committed-value-of-ref args)
 

    注意Ref最后提交的值将被传给更新函数。这个值有可能是在当前线程开始后其他线程提交的值,而并非当前线程中Ref

的线程中值。

 

    采用commute来代替 alter是一种优化。它不会产生不同的结束结果除非更新的次序也很重要。

 

    让我们看一个例子同时采用了Refs和Atoms(将在接下来讨论到),这个例子包含了银行账户和对应的线程。首先我们

定义数据模型:

(ns com.ociweb.bank)

; Assume the only account data that can change is its balance.
(defstruct account-struct :id :owner :balance-ref)

; We need to be able to add and delete accounts to and from a map.
; We want it to be sorted so we can easily
; find the highest account number
; for the purpose of assigning the next one.
(def account-map-ref (ref (sorted-map)))
 

    接下来的代码在账户map中创建了一个新的账户,并返回它:

(defn open-account
  "creates a new account, stores it in the account map and returns it"
  [owner]
  (dosync ; required because a Ref is being changed
    (let [account-map @account-map-ref
          last-entry (last account-map)
          ; The id for the new account is one higher than the last one.
          id (if last-entry (inc (key last-entry)) 1)
          ; Create the new account with a zero starting balance.
          account (struct account-struct id owner (ref 0))]
      ; Add the new account to the map of accounts.
      (alter account-map-ref assoc id account)
      ; Return the account that was just created.
      account)))
 

    下面的函数支持从账户中存取钱:

(defn deposit [account amount]
  "adds money to an account; can be a negative amount"
  (dosync ; required because a Ref is being changed
    (Thread/sleep 50) ; simulate a long-running operation
    (let [owner (account :owner)
          balance-ref (account :balance-ref)
          type (if (pos? amount) "deposit" "withdraw")
          direction (if (pos? amount) "to" "from")
          abs-amount (Math/abs amount)]
      (if (>= (+ @balance-ref amount) 0) ; sufficient balance?
        (do
          (alter balance-ref + amount)
          (println (str type "ing") abs-amount direction owner))
        (throw (IllegalArgumentException.
                 (str "insufficient balance for " owner
                      " to withdraw " abs-amount)))))))

(defn withdraw
  "removes money from an account"
  [account amount]
  ; A withdrawal is like a negative deposit.
  (deposit account (- amount)))
 

    接下来的函数支持从一个账户往另一个账户转账。线程通过dosync开始以保证存取动作都发生或者都不发生。

(defn transfer [from-account to-account amount]
  (dosync
    (println "transferring" amount
             "from" (from-account :owner)
             "to" (to-account :owner))
    (withdraw from-account amount)
    (deposit to-account amount)))
 

    下面的函数支持汇报当前账户的状态。线程通过dosync开始以保证账户汇报的一致性。比如说,它不会汇报

转账中的余额。

(defn- report-1 ; a private function
  "prints information about a single account"
  [account]
  ; This assumes it is being called from within
  ; the transaction started in report.
  (let [balance-ref (account :balance-ref)]
    (println "balance for" (account :owner) "is" @balance-ref)))

(defn report
  "prints information about any number of accounts"
  [& accounts]
  (dosync (doseq [account accounts] (report-1 account))))
 

    上述代码并未处理已开始线程中产生的异常。做为代替,我们在当前线程中为这些异常定义一个异常处理器。

; Set a default uncaught exception handler
; to handle exceptions not caught in other threads.
(Thread/setDefaultUncaughtExceptionHandler
  (proxy [Thread$UncaughtExceptionHandler] []
    (uncaughtException [thread throwable]
      ; Just print the message in the exception.
      (println (.. throwable .getCause .getMessage)))))
 

    然后我们可以试着执行一下上面定义的所有函数:

(let [a1 (open-account "Mark")
      a2 (open-account "Tami")
      thread (Thread. #(transfer a1 a2 50))]
  (try
    (deposit a1 100)
    (deposit a2 200)

    ; There are sufficient funds in Mark's account at this point
    ; to transfer $50 to Tami's account.
    (.start thread) ; will sleep in deposit function twice!

    ; Unfortunately, due to the time it takes to complete the transfer
    ; (simulated with sleep calls), the next call will complete first.
    (withdraw a1 75)

    ; Now there are insufficient funds in Mark's account
    ; to complete the transfer.

    (.join thread) ; wait for thread to finish
    (report a1 a2)
    (catch IllegalArgumentException e
      (println (.getMessage e) "in main thread"))))
 

    得到的输出结果如下:

depositing 100 to Mark
depositing 200 to Tami
transferring 50 from Mark to Tami
withdrawing 75 from Mark
transferring 50 from Mark to Tami (a retry)
insufficient balance for Mark to withdraw 50
balance for Mark is 25
balance for Tami is 200

 验证函数(validation function)

    在继续讨论其他的引用类型之前,我们先看看验证函数。下面是一个验证所有指定给Ref的值是否Integer的验证函数:

; Note the use of the :validator directive when creating the Ref
; to assign a validation function which is integer? in this case.
(def my-ref (ref 0 :validator integer?))

(try
  (dosync
    (ref-set my-ref 1) ; works

    ; The next line doesn't work, so the transaction is rolled back
    ; and the previous change isn't committed.
    (ref-set my-ref "foo"))
  (catch IllegalStateException e
    ; do nothing
    ))

(println "my-ref =" @my-ref) ; due to validation failure -> 0

 Atoms

    Atoms提供了一种比联合Ref和STM更简单的机制来更新单一值。它们不会受到线程的影响。

 

    有三个函数能够改变Atom的值:reset! , compare-and-set!和 swap!。

 

    reset!函数能够直接重设Atom的值为新值,而不管原来的值是什么。例子如下:

(def my-atom (atom 1))
(reset! my-atom 2)
(println @my-atom) ; -> 2
 

    compare-and-set!函数接收三个参数,其一是预计要修改的Atom,其二是预计的Atom当前值,另一个是预期的新值。如果

Atom的当前值和第二个参数预计值相同,则将新值赋给Atom并返回true。否则则不改变Atom的值并返回false。这个机制

的用处体现在能够避免在特定点间接引用的其他代码改变了Atom的值。

 

    compare-and-set!函数通常用在代码的结束处,代码的开头通常是对Atom间接引用值的绑定。而在中间的代码能够产生两

结果。一是在中间的代码执行直到完毕,Atom的值没有被改变,而到了最后compare-and-set!函数改变了Atom的值。

另一个 结果是在中间的代码还没有执行完毕时,就有其他的代码改变了Atom的值,那么compare-and-set!函数就不会在

代码末尾改变Atom的值,而是返回false。例子如下:

(def my-atom (atom 1))

(defn update-atom []
  (let [curr-val @my-atom]
    (println "update-atom: curr-val =" curr-val) ; -> 1
    (Thread/sleep 50) ; give reset! time to run
    (println
      (compare-and-set! my-atom curr-val (inc curr-val))))) ; -> false

(let [thread (Thread. #(update-atom))]
  (.start thread)
  (Thread/sleep 25) ; give thread time to call update-atom
  (reset! my-atom 3) ; happens after update-atom binds curr-val
  (.join thread)) ; wait for thread to finish

(println @my-atom) ; -> 3
 

    为什么这段代码的输出是3?update-atom函数在 reset!函数之前被另一个线程调用了,它获取到Atom的当前值1.

然后它进入休眠让reset!函数有时间去执行。在那之后,Atom的值为3.当 update-atom函数调用 compare-and-set!来

让Atom的值增加时,会失败因为Atom的当前值已经不是原本的1了。这代表着Atom当前的保持值为3.

 

    swap!函数的第一个参数是目标Atom,然后是一个用来计算Atom新值的函数,然后是任意数量的参数用来传给

计算新值 函数。计算新值 函数被调用时传入的是目标Atom的当前值以及如果存在的任意数量的参数。swap!函数

本质上是 compare-and-set!的一个包装器,除了一个重要的不同。它 首先 对Atom取值并保存Atom的当前值。然后

它调用函数来计算一个新值。最后它调用compare-and-set!并把第一步保存的值传入。如果 compare-and-set!

返回false,说明Atom的值不同于调用函数前的值,然后swap!函数会被重复调用直到检查通过为止。这就是那个

重要的不同。上面的代码采用swap!和compare-and-set!来写的例子如下:

(def my-atom (atom 1))

(defn update-atom [curr-val]
  (println "update-atom: curr-val =" curr-val)
  (Thread/sleep 50) ; give reset! time to run
  (inc curr-val))

(let [thread (Thread. #(swap! my-atom update-atom))]
  (.start thread)
  (Thread/sleep 25) ; give swap! time to call update-atom
  (reset! my-atom 3)
  (.join thread)) ; wait for thread to finish

(println @my-atom) ; -> 4
 

    为什么输出的结果是4?在reset!函数调用之前,另一个线程调用了 swap!函数。当在 swap!函数中调用了 update-atom

之后,Atom的当前值是1.但是,由于sleep调用,update-atom函数无法调用结束,直到 reset!调用结束,将Atom的

值设为3.然后update-atom函数返回2.在 swap!函数将Atom的值设为2之前,它会检查Atom的当前值是否仍然是1.它现在

不是,所以swap!函数再次调用 update-atom函数。这次Atom的当前值是3,所以它增加了1并返回4.现在 swap!函数

验证当前值成功了,并将Atom设值为4.

 

Agents

    Agents被用来运行相互之间无须协调的分离线程中的任务。它对于改变一个作为某个Agent值的单一对象的状态来说

非常有用。这个值通过在单独线程中运行某个”行为“("action")来改变。这个行为是一个函数,接收Agent的当前值

作为参数,并可以接收其他的一些可选参数。对于指定的Agent,一次只能运行一个行为。

 

    agent函数创建了一个新的Agent,如下:

(def my-agent (agent initial-value))
 

    send函数会为Agent分发一个行为,并且不等行为执行完毕就迅速返回了。而行为在提供的线程池中某个线程内运行。

当行为执行完毕时,它的返回值会被赋给Agent。send-off函数也具备同样的功能,不过它采用的线程来源于另一个

线程池。

 

    send函数采用的线程池是 "fixed thread pool" (参见java.util.concurrent.Executors中的

newFixedThreadPool 方法),池中线程的数量是处理器的数目加二。如果所有线程都被占用,行为就会等待直到

某个线程空闲出来。send-off函数采用的线程池是 "cached thread pool"(参见 java.util.concurrent.Executors

中的 newCachedThreadPool 方法),在这个线程池中,如果已有的线程都被使用,则会另外新增一些线程出来

以供使用。

 

    如果send或者 send-off函数是从一个线程内部来调用的,行为不会被发送直到线程提交。 就在线程提交时

决定值保存的调用才发生这个意义而言,这跟在Ref上调用 commute 函数的情形有些像。

 

    在行为内部,被操作的Agent会绑定在符号*agent*上面。

 

    await函数能够接收任意数目的Agent并阻塞当前线程直到线程中分发给Agent的所有行为都执行完毕。 await-for函数

也具备同样的功能,不过它会接收一个毫秒数代表超时时间作为第一个参数。如果行为在超时时间之前结束,则返回

非nil值,否则,返回nil。await和 await-for函数不能从线程内部调用。

 

    如果一个行为抛出异常,那么引用它的Agent也会抛出一个异常,所有行为抛出的异常都会被发送给对应的Agent,

如果要取回可以采用agent-errors函数。 clear-agent-errors函数可以清除指定Agent的异常集合。(译者注:在Clojure

1.2中,agent-errors和 clear-agent-errors函数已经被废弃,可以采用 agent-error和restart-agent函数来代替。)

 

    shutdown-agents函数会等待Agent中所有在运行的行为完毕后,才停止Agent所使用的线程池中的所有线程。在这个

函数被调用后,就不能为Agent发起新的行为了。对shutdown-agents函数的调用是有必要的,这样才能让JVM 有序

地退出,因为Agent所使用的线程并非守护线程。

 

watchers

注意:这章的内容需要更新,因为Clojure1.1所做出的改变。add-watcher和 remove-watcher函数已经被移除了。而

add-watch和 remove-watch函数被添加进来,它们拥有不同的实现机制。

 

    Agent可以担当Watcher,另外一钟引用类型。当被观察的引用值改变时,会向Agent发送一个行为来唤醒它。

发送函数的类型,send或者 send-off,会在 watcher注册为引用对象时被定义。行为函数会接收到Agent的当前值

(并非被改变的引用对象的值)和状态被改变的引用对象本身。行为函数的返回值会成为Agent的新值。

 

    像之前规定的那样,函数式编程语言强调采用“纯函数”,因为它没有副作用比如改变全局状态。Clojure并不会

阻止函数改变全局状态,但是支持很轻易地找到改变全局状态的函数。一种方法是查找所有的函数和宏的源代码,

找到调用了某个能够改变全局状态的函数(比如:alter)的地方。另一种方法是采用watchers来检测改变。一个

watchers能够打印出一个堆栈踪迹来标识出是哪个函数改变了状态。

 

    下面的例子为一个Var,一个Ref,一个Atom分别注册了一个watcher Agent。这个watcher Agent的状态

是一个map,用来记录每个引用被观测到改变的次数。这个map的key是引用对象,而value是改变次数。

(def my-watcher (agent {}))

(defn my-watcher-action [current-value reference]
  (let [change-count-map current-value
        old-count (change-count-map reference)
        new-count (if old-count (inc old-count) 1)]
  ; Return an updated map of change counts
  ; that will become the new value of the Agent.
  (assoc change-count-map reference new-count)))

(def my-var "v1")
(def my-ref (ref "r1"))
(def my-atom (atom "a1"))

(add-watcher (var my-var) :send-off my-watcher my-watcher-action)
(add-watcher my-ref :send-off my-watcher my-watcher-action)
(add-watcher my-atom :send-off my-watcher my-watcher-action)

; Change the root binding of the Var in two ways.
(def my-var "v2")
(alter-var-root (var my-var) (fn [curr-val] "v3"))

; Change the Ref in two ways.
(dosync
  ; The next line only changes the in-transaction value
  ; so the watcher isn't notified.
  (ref-set my-ref "r2")
  ; When the transaction commits, the watcher is
  ; notified of one change this Ref ... the last one.
  (ref-set my-ref "r3"))
(dosync
  (alter my-ref (fn [_] "r4"))) ; And now one more.

; Change the Atom in two ways.
(reset! my-atom "a2")
(compare-and-set! my-atom @my-atom "a3")

; Wait for all the actions sent to the watcher Agent to complete.
(await my-watcher)

; Output the number of changes to
; each reference object that was watched.
(let [change-count-map @my-watcher]
  (println "my-var changes =" (change-count-map (var my-var))) ; -> 2
  (println "my-ref changes =" (change-count-map my-ref)) ; -> 2
  (println "my-atom changes =" (change-count-map my-atom))) ; -> 2

(shutdown-agents)
 

附注:我的翻译一直都很烂,其中这章是最烂的,很多概念不知道怎么合理表述,很多概念估计我自己的理解也有问题,

预计在未来进行一次大的修正。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics