官术网_书友最值得收藏!

4.5 打通Spark系統運行內幕機制循環流程

Spark通過DAGScheduler面向整個Job劃分出了不同的Stage,劃分Stage之后,Stage從后往前劃分,執行的時候從前往后執行,每個Stage內部有一系列的任務,Stage里面的任務是并行計算,并行任務的邏輯是完全相同的,但處理的數據不同。DAGScheduler以TaskSet的方式,把一個DAG構建的Stage中的所有任務提交給底層的調度器TaskScheduler。TaskScheduler是一個接口,與具體的任務解耦合,可以運行在不同的調度模式下,如可運行在Standalone模式,也可運行在Yarn上。

Spark基礎調度(圖4-6)包括RDD Objects、DAGScheduler、TaskScheduler、Worker等內容。

圖4-6 Spark基礎調度圖

DAGScheduler在提交TaskSet給底層調度器的時候是面向接口TaskScheduler的,這符合面向對象中依賴抽象而不依賴具體的原則,帶來底層資源調度器的可插拔性,導致Spark可以運行在眾多的資源調度器模式上,如Standalone、Yarn、Mesos、Local、EC2、其他自定義的資源調度器;在Standalone的模式下我們聚焦于TaskSchedulerImpl。

TaskScheduler是一個接口Trait,底層任務調度接口,由[org.apache.spark.scheduler. TaskSchedulerImpl]實現。這個接口允許插入不同的任務調度程序。每個任務調度器在單獨的SparkContext中調度任務。任務調度程序從每個Stage的DAGScheduler獲得提交的任務集,負責發送任務到集群運行,如果任務運行失敗,將重試,返回DAGScheduler事件。

Spark 2.1.1版本的TaskScheduler.scala的源碼如下。

1.    private[spark] trait TaskScheduler {
2.
3.   private val appId = "spark-application-" + System.currentTimeMillis
4.
5.   def rootPool: Pool
6.
7.   def schedulingMode: SchedulingMode
8.
9.   def start(): Unit
10.
11.  //成功初始化后調用(通常在Spark上下文中)。Yarn使用這個來引導基于優先位置的資源
     //分配,等待從節點登記等
12.  def postStartHook() { }
13.
14.  //從群集斷開連接
15.  def stop(): Unit
16.
17.  //提交要運行的任務序列
18.  def submitTasks(taskSet: TaskSet): Unit
19.
20.  //取消Stage
21.  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
22.
23.  //系統為upcalls設置DAG調度,這是保證在submitTasks被調用前被設置
24.  def setDAGScheduler(dagScheduler: DAGScheduler): Unit
25.
26.  //獲取集群中使用的默認并行級別,作為對作業的提示
27.  def defaultParallelism(): Int
28.
29.  /**
       * 更新正運行任務,讓master 知道BlockManager 仍活著。如果driver 知道給定的
       * 塊管理器,則返回true;否則,返回false,指示塊管理器應重新注冊
30.    */
31.
32.  def executorHeartbeatReceived(
33.      execId: String,
34.      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
35.      blockManagerId: BlockManagerId): Boolean
36.
37.  /**
38.    *獲取與作業相關聯的應用程序ID
39.    * @return An application ID
40.    */
41.   def applicationId(): String = appId
42.
43.   /**
        *處理丟失的 executor
44.     */
45.   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
46.
47.   /**
48.     *獲取與作業相關聯的應用程序的嘗試ID
49.     *
50.     * @return應用程序的嘗試ID
51.     */
52.   def applicationAttemptId(): Option[String]
53.
54. }

Spark 2.2.0版本的TaskScheduler.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第21行之后新增加了killTaskAttempt方法。

1.  ......
2.  /**
3.    *殺死任務嘗試
4.    *
5.    * @return任務是否成功被殺死
6.    */
7.    def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason:
      String): Boolean
8.  .......

DAGScheduler把TaskSet交給底層的接口TaskScheduler,具體實現時有不同的方法。TaskScheduler主要由TaskSchedulerImpl實現。

TaskSchedulerImpl也有自己的子類YarnScheduler。

1.     private[spark] class YarnScheduler(sc: SparkContext) extends
       TaskSchedulerImpl(sc) {
2.
3.    //RackResolver 記錄INFO日志信息時,解析rack的信息
4.    if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
5.      Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
6.    }
7.
8.    //默認情況下,rack是未知的
9.    override def getRackForHost(hostPort: String): Option[String] = {
10.     val host = Utils.parseHostPort(hostPort)._1
11.     Option(RackResolver.resolve(sc.hadoopConfiguration, host).
        getNetworkLocation)
12.   }
13. }

YarnScheduler的子類YarnClusterScheduler實現如下。

1.  private[spark] class YarnClusterScheduler(sc: SparkContext) extends
    YarnScheduler(sc) {
2.  logInfo("Created YarnClusterScheduler")
3.
4.  override def postStartHook() {
5.      ApplicationMaster.sparkContextInitialized(sc)
6.      super.postStartHook()
7.      logInfo("YarnClusterScheduler.postStartHook done")
8.    }
9.
10. }

默認情況下,我們研究Standalone的模式,所以主要研究TaskSchedulerImpl。DAGScheduler把TaskSet交給TaskScheduler,TaskScheduler中通過TastSetManager管理具體的任務。TaskScheduler的核心任務是提交TaskSet到集群運算,并匯報結果。

 為TaskSet創建和維護一個TaskSetManager,并追蹤任務的本地性以及錯誤信息。

 遇到延后的Straggle任務,會放到其他節點重試。

 向DAGScheduler匯報執行情況,包括在Shuffle輸出lost的時候報告fetch failed錯誤等信息。

TaskSet是一個普通的類,第一個成員是tasks,tasks是一個數組。TaskSet的源碼如下。

1.     private[spark] class TaskSet(
2.      val tasks: Array[Task[_]],
3.      val stageId: Int,
4.      val stageAttemptId: Int,
5.      val priority: Int,
6.      val properties: Properties) {
7.    val id: String = stageId + "." + stageAttemptId
8.
9.    override def toString: String = "TaskSet " + id
10. }

TaskScheduler內部有SchedulerBackend,SchedulerBackend管理Executor資源。從Standalone的模式來講,具體實現是StandaloneSchedulerBackend(Spark 2.0版本將之前的AppClient名字更新為StandaloneAppClient)。

SchedulerBackend本身是一個接口,是一個trait。SchedulerBackend的源碼如下。

1.   private[spark] trait SchedulerBackend {
2.    private val appId = "spark-application-" + System.currentTimeMillis
3.
4.    def start(): Unit
5.    def stop(): Unit
6.    def reviveOffers(): Unit
7.    def defaultParallelism(): Int
8.
9.    def killTask(taskId: Long, executorId: String, interruptThread:
      Boolean): Unit =
10.     throw new UnsupportedOperationException
11.   def isReady(): Boolean = true
12.
13.   /**
14.     *獲取與作業關聯的應用ID
15.     *
16.     * @return 應用程序 ID
17.     */
18.   def applicationId(): String = appId
19.
20.   /**
21.     *如果集群管理器支持多個嘗試,則獲取此運行的嘗試ID,應用程序運行在客戶端模式將沒
        *有嘗試ID
22.     *
23.     * @return如果可用,返回應用程序嘗試ID
24.     */
25.   def applicationAttemptId(): Option[String] = None
26.
27.   /**
28.     *得到driver 日志的URL。這些URL是用來在用戶界面中顯示鏈接driver的Executors
        *選項卡
29.     *
30.     * @return Map 包含日志名稱和URLs
31.     */
32.   def getDriverLogUrls: Option[Map[String, String]] = None
33.
34. }

StandaloneSchedulerBackend:專門負責收集Worker的資源信息。接收Worker向Driver注冊的信息,ExecutorBackend啟動的時候進行注冊,為當前應用程序準備計算資源,以進程為單位。

StandaloneSchedulerBackend的源碼如下。

1.   private[spark] class StandaloneSchedulerBackend(
2.      scheduler: TaskSchedulerImpl,
3.      sc: SparkContext,
4.      masters: Array[String])
5.    extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
6.    with StandaloneAppClientListener
7.    with Logging {
8.    private var client: StandaloneAppClient = null
9.  ......

StandaloneSchedulerBackend里有一個Client: StandaloneAppClient。

1.  private[spark] class StandaloneAppClient(
2.    rpcEnv: RpcEnv,
3.    masterUrls: Array[String],
4.    appDescription: ApplicationDescription,
5.    listener: StandaloneAppClientListener,
6.    conf: SparkConf)
7.  extends Logging {

StandaloneAppClient允許應用程序與Spark standalone集群管理器通信。獲取Master的URL、應用程序描述和集群事件監聽器,當各種事件發生時可以回調監聽器。masterUrls的格式為spark://host:port,StandaloneAppClient需要向Master注冊。

StandaloneAppClient在StandaloneSchedulerBackend.scala的start方法啟動時進行賦值,用new()函數創建一個StandaloneAppClient。

Spark 2.1.1版本的StandaloneSchedulerBackend.scala的源碼如下。

1.    private[spark] class StandaloneSchedulerBackend(
2.  ......
3.
4.  override def start() {
5.  ......
6.   val appDesc = new ApplicationDescription(sc.appName, maxCores,
     sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec,
     coresPerExecutor, initialExecutorLimit)
7.     client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this,
       conf)
8.      client.start()
9.      launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
10.     waitForRegistration()
11.     launcherBackend.setState(SparkAppHandle.State.RUNNING)
12.   }

Spark 2.2.0版本的StandaloneSchedulerBackend.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第6行ApplicationDescription傳入的第5個參數appUIAddress更改為webUrl。

1.    ......
2.  val appDesc = ApplicationDescription(sc.appName, maxCores, sc.
    executorMemory, command, webUrl, sc.eventLogDir,sc.eventLogCodec,
          coresPerExecutor, initialExecutorLimit)
3.  .....

StandaloneAppClient.scala中,里面有一個類是ClientEndpoint,核心工作是在啟動時向Master注冊。StandaloneAppClient的start方法啟動時,就調用new函數創建一個ClientEndpoint。

StandaloneAppClient的源碼如下。

1.      private[spark] class StandaloneAppClient(
2.  ......
3.   private class ClientEndpoint(override val rpcEnv: RpcEnv) extends
     ThreadSafeRpcEndpoint
4.      with Logging {
5.  ......
6.  def start() {
7.      //啟動 rpcEndpoint; 將回調listener
8.      endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint
        (rpcEnv)))
9.    }

StandaloneSchedulerBackend在啟動時構建StandaloneAppClient實例,并在StandaloneAppClient實例start時啟動了ClientEndpoint消息循環體。ClientEndpoint在啟動時會向Master注冊當前程序。

StandaloneAppClient中ClientEndpoint類的onStart()方法如下。

1.   override def onStart(): Unit = {
2.     try {
3.       registerWithMaster(1)
4.     } catch {
5.       case e: Exception =>
6.         logWarning("Failed to connect to master", e)
7.         markDisconnected()
8.         stop()
9.     }
10.  }

這是StandaloneSchedulerBackend的第一個注冊的核心功能。StandaloneSchedulerBackend繼承自CoarseGrainedSchedulerBackend。而CoarseGrainedSchedulerBackend在啟動時就創建DriverEndpoint,從實例的角度講,DriverEndpoint也屬于StandaloneSchedulerBackend實例。

1.     private[spark]
2.  class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val
    rpcEnv: RpcEnv)
3.    extends ExecutorAllocationClient with SchedulerBackend with Logging
4.  {
5.  ......
6.   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties:
     Seq[(String, String)])
7.      extends ThreadSafeRpcEndpoint with Logging {
8.  ......

StandaloneSchedulerBackend的父類CoarseGrainedSchedulerBackend在start的時候會實例化類型為DriverEndpoint(這就是我們程序運行時的經典對象Driver)的消息循環體。StandaloneSchedulerBackend在運行時向Master注冊申請資源,當Worker的ExecutorBackend啟動時會發送RegisteredExecutor信息向DriverEndpoint注冊,此時StandaloneSchedulerBackend就掌握了當前應用程序擁有的計算資源,TaskScheduler就是通過StandaloneSchedulerBackend擁有的計算資源來具體運行Task的;StandaloneSchedulerBackend不是應用程序的總管,應用程序的總管是DAGScheduler、TaskScheduler,StandaloneSchedulerBackend向應用程序的Task分配具體的計算資源,并把Task發送到集群中。

SparkContext、DAGScheduler、TaskSchedulerImpl、StandaloneSchedulerBackend在應用程序啟動時只實例化一次,應用程序存在期間始終存在這些對象。

這里基于Spark 2.2版本講解:

Spark調度器三大核心資源:SparkContext、DAGScheduler、TaskSchedulerImpl,TaskSchedulerImpl作為具體的底層調度器,運行時需要計算資源,因此需要StandaloneSchedulerBackend。StandaloneSchedulerBackend設計巧妙的地方是啟動時啟動StandaloneAppClient,而StandaloneAppClient在start時有一個ClientEndpoint的消息循環體,ClientEndpoint的消息循環體啟動的時候向Master注冊應用程序。

StandaloneSchedulerBackend的父類CoarseGrainedSchedulerBackend在start啟動的時候會實例化DriverEndpoint,所有的ExecutorBackend啟動的時候都要向DriverEndpoint注冊,注冊最后落到了StandaloneSchedulerBackend的內存數據結構中,表面上看是在CoarseGrainedSchedulerBackend,但是實例化的時候是StandaloneSchedulerBackend,注冊給父類的成員其實就是子類的成員。

作為前提問題:TaskScheduler、StandaloneSchedulerBackend是如何啟動的?TaskSchedulerImpl是什么時候實例化的?

TaskSchedulerImpl是在SparkContext中實例化的。在SparkContext類實例化的時候,只要不是方法體里面的內容,都會被執行,(sched, ts)是SparkContext的成員,將調用createTaskScheduler方法。調用createTaskScheduler方法返回一個Tuple,包括兩個元素:sched是我們的schedulerBackend;ts是taskScheduler。

1. class SparkContext(config: SparkConf) extends Logging {
2.  ......
3. //創建啟動調度器 scheduler
4.    val (sched, ts) = SparkContext.createTaskScheduler(this, master,
      deployMode)
5.    _schedulerBackend = sched
6.    _taskScheduler = ts
7.    _dagScheduler = new DAGScheduler(this)
8.    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

createTaskScheduler里有很多運行模式,這里關注Standalone模式,首先調用new()函數創建一個TaskSchedulerImpl,TaskSchedulerImpl和SparkContext是一一對應的,整個程序運行的時候只有一個TaskSchedulerImpl,也只有一個SparkContext;接著實例化StandaloneSchedulerBackend,整個程序運行的時候只有一個StandaloneSchedulerBackend。createTaskScheduler方法如下。

1.      private def createTaskScheduler(
2.        sc: SparkContext,
3.        master: String,
4.        deployMode: String): (SchedulerBackend, TaskScheduler) = {
5.      import SparkMasterRegex._
6.  ......
7.    master match {
8.  ......
9.        case SPARK_REGEX(sparkUrl) =>
10.         val scheduler = new TaskSchedulerImpl(sc)
11.         val masterUrls = sparkUrl.split(",").map("spark://" + _)
12.         val   backend    =  new   StandaloneSchedulerBackend(scheduler,  sc,
            masterUrls)
13.         scheduler.initialize(backend)
14.         (backend, scheduler)
15. ......

在SparkContext實例化的時候通過createTaskScheduler來創建TaskSchedulerImpl和StandaloneSchedulerBackend。然后在createTaskScheduler中調用scheduler.initialize(backend)。

initialize的方法參數把StandaloneSchedulerBackend傳進來,schedulingMode模式匹配有兩種方式:FIFO、FAIR。

initialize的方法中調用schedulableBuilder.buildPools()。buildPools方法根據FIFOSchedulableBuilder、FairSchedulableBuilder不同的模式重載方法實現。

1.   private[spark] trait SchedulableBuilder {
2.    def rootPool: Pool
3.
4.    def buildPools(): Unit
5.
6.    def addTaskSetManager(manager: Schedulable, properties: Properties):
      Unit
7.  }

initialize的方法把StandaloneSchedulerBackend傳進來了,但還沒有啟動StandaloneSchedulerBackend。在TaskSchedulerImpl的initialize方法中把StandaloneSchedulerBackend傳進來,從而賦值為TaskSchedulerImpl的backend;在TaskSchedulerImpl調用start方法時會調用backend.start方法,在start方法中會最終注冊應用程序。

下面來看SparkContext.scala的taskScheduler的啟動。

1.    val   (sched,    ts)   =  SparkContext.createTaskScheduler(this,  master,
      deployMode)
2.      _schedulerBackend = sched
3.      _taskScheduler = ts
4.      _dagScheduler = new DAGScheduler(this)
5.  ......
6.      _taskScheduler.start()
7.      _applicationId = _taskScheduler.applicationId()
8.      _applicationAttemptId = taskScheduler.applicationAttemptId()
9.      _conf.set("spark.app.id", _applicationId)
10. ......

其中調用了_taskScheduler的start方法。

1.   private[spark] trait TaskScheduler {
2.  ......
3.
4.    def start(): Unit
5.  .....

TaskScheduler的start()方法沒有具體實現。TaskScheduler子類的TaskSchedulerImpl的start()方法的源碼如下。

1.    override def start() {
2.     backend.start()
3.
4.     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
5.       logInfo("Starting speculative execution thread")
6.       speculationScheduler.scheduleAtFixedRate(new Runnable {
7.         override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
8.           checkSpeculatableTasks()
9.         }
10.      },   SPECULATION_INTERVAL_MS,        SPECULATION_INTERVAL_MS,   TimeUnit.
         MILLISECONDS)
11.    }
12.  }

TaskSchedulerImpl的start通過backend.start啟動了StandaloneSchedulerBackend的start方法。

StandaloneSchedulerBackend的start方法中,將command封裝注冊給Master,Master轉過來要Worker啟動具體的Executor。command已經封裝好指令,Executor具體要啟動進程入口類CoarseGrainedExecutorBackend。然后調用new()函數創建一個StandaloneAppClient,通過client.start啟動client。

StandaloneAppClient的start方法中調用new()函數創建一個ClientEndpoint:

1.   def start() {
2.    //啟動 rpcEndpoint; 將回調listener
3.    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint
      (rpcEnv)))
4.  }

ClientEndpoint的源碼如下。

1.      private class ClientEndpoint(override val rpcEnv: RpcEnv) extends
        ThreadSafeRpcEndpoint
2.      with Logging {
3.  ......
4.      override def onStart(): Unit = {
5.        try {
6.          registerWithMaster(1)
7.        } catch {
8.          case e: Exception =>
9.            logWarning("Failed to connect to master", e)
10.       markDisconnected()
11.       stop()
12.    }
13.  }

ClientEndpoint是一個ThreadSafeRpcEndpoint。ClientEndpoint的onStart方法中調用registerWithMaster(1)進行注冊,向Master注冊程序。registerWithMaster方法如下。

1.     private def registerWithMaster(nthRetry: Int) {
2.     registerMasterFutures.set(tryRegisterAllMasters())
3.     registrationRetryTimer.set(registrationRetryThread.schedule(new
       Runnable {
4.       override def run(): Unit = {
5.         if (registered.get) {
6.           registerMasterFutures.get.foreach(_.cancel(true))
7.           registerMasterThreadPool.shutdownNow()
8.         } else if (nthRetry >= REGISTRATION_RETRIES) {
9.           markDead("All masters are unresponsive! Giving up.")
10.        } else {
11.          registerMasterFutures.get.foreach(_.cancel(true))
12.          registerWithMaster(nthRetry + 1)
13.        }
14.      }
15.    }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
16.  }

程序注冊后,Master通過schedule分配資源,通知Worker啟動Executor,Executor啟動的進程是CoarseGrainedExecutorBackend,Executor啟動后又轉過來向Driver注冊,Driver其實是StandaloneSchedulerBackend的父類CoarseGrainedSchedulerBackend的一個消息循環體DriverEndpoint。

總結:

在SparkContext實例化的時候調用createTaskScheduler來創建TaskSchedulerImpl和StandaloneSchedulerBackend,同時在SparkContext實例化的時候會調用TaskSchedulerImpl的start,在start方法中會調用StandaloneSchedulerBackend的start,在該start方法中會創建StandaloneAppClient對象,并調用StandaloneAppClient對象的start方法,在該start方法中會創建ClientEndpoint,創建ClientEndpoint時會傳入Command來指定具體為當前應用程序啟動的Executor的入口類的名稱為CoarseGrainedExecutorBackend,然后ClientEndpoint啟動并通過tryRegisterMaster來注冊當前的應用程序到Master中,Master接收到注冊信息后如果可以運行程序,為該程序生產Job ID并通過schedule來分配計算資源,具體計算資源的分配是通過應用程序的運行方式、Memory、cores等配置信息決定的。最后,Master會發送指令給Worker,Worker為當前應用程序分配計算資源時會首先分配ExecutorRunner。ExecutorRunner內部會通過Thread的方式構建ProcessBuilder來啟動另外一個JVM進程,這個JVM進程啟動時加載的main方法所在的類的名稱就是在創建ClientEndpoint時傳入的Command來指定具體名稱為CoarseGrainedExecutorBackend的類,此時JVM在通過ProcessBuilder啟動時獲得了CoarseGrainedExecutorBackend后加載并調用其中的main方法,在main方法中會實例化CoarseGrainedExecutorBackend本身這個消息循環體,而CoarseGrainedExecutorBackend在實例化時會通過回調onStart向DriverEndpoint發送RegisterExecutor來注冊當前的CoarseGrainedExecutorBackend,此時DriverEndpoint收到該注冊信息并保存在StandaloneSchedulerBackend實例的內存數據結構中,這樣Driver就獲得了計算資源。

CoarseGrainedExecutorBackend.scala的main方法如下。

1.   def main(args: Array[String]) {
2.      var driverUrl: String = null
3.      var executorId: String = null
4.      var hostname: String = null
5.      var cores: Int = 0
6.      var appId: String = null
7.      var workerUrl: Option[String] = None
8.      val userClassPath = new mutable.ListBuffer[URL]()
9.
10.     var argv = args.toList
11.    ......
12.     run(driverUrl, executorId, hostname, cores, appId, workerUrl,
        userClassPath)
13.     System.exit(0)
14.   }

CoarseGrainedExecutorBackend的main然后開始調用run方法。

1.     private def run(
2.        driverUrl: String,
3.        executorId: String,
4.        hostname: String,
5.        cores: Int,
6.        appId: String,
7.        workerUrl: Option[String],
8.        userClassPath: Seq[URL]) {
9.  ......
10.        env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
11.         env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath,
            env))
12. ......

在CoarseGrainedExecutorBackend的main方法中,通過env.rpcEnv.setupEndpoint ("Executor", new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))構建了CoarseGrainedExecutorBackend實例本身。

主站蜘蛛池模板: 田阳县| 鲁甸县| 阜城县| 松阳县| 淮滨县| 嘉善县| 佛冈县| 思南县| 汉源县| 普安县| 惠州市| 湖南省| 霍山县| 裕民县| 宁化县| 玉屏| 会东县| 七台河市| 莫力| 罗源县| 静海县| 南雄市| 武邑县| 磐安县| 甘洛县| 措勤县| 金堂县| 呼图壁县| 奈曼旗| 南开区| 新巴尔虎右旗| 奉化市| 锦州市| 黄山市| 禹州市| 揭东县| 平乐县| 连平县| 开江县| 通海县| 绿春县|