src/main/scala/CouchStream.scala
author Tomas Zeman <tzeman@volny.cz>
Thu, 05 May 2016 16:26:07 +0200
changeset 5 141e76d946e3
parent 4 b25c02bd6b11
permissions -rw-r--r--
CouchStream: process entity as UTF-8
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     1
package couchdb.changes
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     2
4
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     3
import akka.actor.{ActorRef, Actor, ActorLogging, Status}
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     4
import akka.io.IO
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     5
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     6
import scala.concurrent.ExecutionContext.Implicits.global
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     7
import scala.language.postfixOps
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     8
import scala.language.reflectiveCalls
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     9
import scala.util.Try
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    10
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    11
import spray.can.Http
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    12
import spray.can.Http.ConnectionAttemptFailedException
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    13
import spray.client.pipelining._
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    14
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    15
import spray.http._
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    16
import spray.json._
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    17
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    18
case class ResultRow(seq: Int, id: String, doc: JsObject)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    19
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    20
trait ChangesMarshaller {
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    21
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    22
  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    23
    import DefaultJsonProtocol._
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    24
    implicit val changesFormat = jsonFormat3(ResultRow)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    25
5
141e76d946e3 CouchStream: process entity as UTF-8
Tomas Zeman <tzeman@volny.cz>
parents: 4
diff changeset
    26
    def apply(entity: HttpEntity): Deserialized[ResultRow] = Try {
141e76d946e3 CouchStream: process entity as UTF-8
Tomas Zeman <tzeman@volny.cz>
parents: 4
diff changeset
    27
      Right(JsonParser(entity.asString(HttpCharsets.`UTF-8`)).asJsObject.
141e76d946e3 CouchStream: process entity as UTF-8
Tomas Zeman <tzeman@volny.cz>
parents: 4
diff changeset
    28
        convertTo[ResultRow])
141e76d946e3 CouchStream: process entity as UTF-8
Tomas Zeman <tzeman@volny.cz>
parents: 4
diff changeset
    29
    } getOrElse Left(MalformedContent("bad json"))
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    30
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    31
  }
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    32
}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    33
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    34
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
4
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    35
  with ChangesMarshaller with settings {
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    36
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    37
  val io = IO(Http)(context.system)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    38
4
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    39
  def config = context.system.settings.config
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    40
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    41
  def receive: Receive = {
4
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    42
    case Start =>
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    43
      val uri = Uri(s"${couchConf.url}/_changes").withQuery(
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    44
          "feed" -> "continuous"
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    45
        , "heartbeat" -> couchConf.heartbeat.toMillis.toString
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    46
        , "include_docs" -> "true"
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    47
      )
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    48
      val rq = HttpRequest(HttpMethods.GET, uri = uri)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    49
      log.debug("Sending request {}", rq)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    50
      sendTo(io).withResponsesReceivedBy(self)(rq)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    51
    case x@ChunkedResponseStart(_) =>
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    52
      log.debug("Received {}", x)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    53
    case MessageChunk(data, _) =>
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    54
      log.debug("Received data chunk {}", data)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    55
      if (data.length > 2) // skip heartbeat messages
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    56
        ChangesMarshaller(data).fold(
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    57
          _ => log.warning("Failed to deserialize entity {}", data),
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    58
          processor !
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    59
        )
4
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    60
    case Status.Failure(t) =>
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    61
      log.error("Received failure {}", t)
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    62
      t match {
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    63
        case x:ConnectionAttemptFailedException =>
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    64
          log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    65
          context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    66
            Start)
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    67
        case _ =>
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    68
      }
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    69
    case x =>
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    70
      log.debug("Received {}", x)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    71
  }
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    72
}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    73
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    74
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    75
// vim: set ts=2 sw=2 et: