# coding=utf-8
import matplotlib.pyplot as plt
from numpy import *
# Load the data set. Each line of testSet.txt has three columns: the first
# two are the input features, the third is the class label.
def loadDataSet():
    dataMat = []
    labelMat = []
    fr = open('testSet.txt')
    for line in fr.readlines():
        lineArr = line.strip().split()
        # Prepend x0 = 1.0 so that weights[0] acts as the bias term
        dataMat.append([1.0, float(lineArr[0]), float(lineArr[1])])
        labelMat.append(int(lineArr[2]))
    return dataMat, labelMat
# Logistic (sigmoid) function
def sigmoid(inX):
    return 1.0 / (1 + exp(-inX))
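# Note: exp(-inX) overflows float64 for large negative inX and NumPy emits a
# RuntimeWarning. A numerically safer variant (a sketch, not part of the
# original listing) clips the argument before exponentiating:
def sigmoidStable(inX):
    return 1.0 / (1 + exp(-clip(inX, -500, 500)))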
# Batch gradient ascent
def gradAscent(dataMatIn, classLabels):
    # Convert to NumPy matrix types
    dataMatrix = mat(dataMatIn)
    labelMat = mat(classLabels).transpose()
    m, n = shape(dataMatrix)
    alpha = 0.001      # learning rate
    maxCycles = 500    # each of the maxCycles passes uses the full data set
    weights = ones((n, 1))
    for k in range(maxCycles):
        h = sigmoid(dataMatrix * weights)   # m x 1 vector of predictions
        error = labelMat - h
        weights = weights + alpha * dataMatrix.transpose() * error
    return weights
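# Why the update works: for the log-likelihood of logistic regression,
#   l(w) = sum_i [ y_i * log(h_i) + (1 - y_i) * log(1 - h_i) ],  h = sigmoid(Xw),
# the gradient is grad l(w) = X^T (y - h), so one ascent step is exactly
#   w <- w + alpha * X^T (y - h),
# which is the update performed in gradAscent above.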
# Simplified stochastic gradient ascent: a single pass over the data,
# updating the weights once per sample
def stocGradAscent0(dataMatrix, classLabels):
    m, n = shape(dataMatrix)
    alpha = 0.01
    weights = ones(n)
    # One weight update per training sample
    for i in range(m):
        h = sigmoid(sum(dataMatrix[i] * weights))   # scalar prediction
        error = classLabels[i] - h
        weights = weights + alpha * error * dataMatrix[i]
    return weights
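# A single pass usually underfits. A simple multi-pass wrapper around the same
# update (a sketch, not part of the original listing; the name and numIter
# default are assumptions):
def stocGradAscent0MultiPass(dataMatrix, classLabels, numIter=200):
    m, n = shape(dataMatrix)
    weights = ones(n)
    for _ in range(numIter):
        for i in range(m):
            h = sigmoid(sum(dataMatrix[i] * weights))
            weights = weights + 0.01 * (classLabels[i] - h) * dataMatrix[i]
    return weights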
# Improved stochastic gradient ascent: a decaying step size plus random sample
# selection, which avoids the periodic oscillations of stocGradAscent0
def stocGradAscent1(dataMatrix, classLabels, numIter=150):
    m, n = shape(dataMatrix)
    weights = ones(n)
    for j in range(numIter):
        dataIndex = list(range(m))   # samples not yet used in this pass
        for i in range(m):
            alpha = 4 / (1.0 + j + i) + 0.01   # decays with j and i, never reaches 0
            randIndex = int(random.uniform(0, len(dataIndex)))
            sampleIdx = dataIndex[randIndex]   # pick the remaining sample, not just its position
            h = sigmoid(sum(dataMatrix[sampleIdx] * weights))
            error = classLabels[sampleIdx] - h
            weights = weights + alpha * error * dataMatrix[sampleIdx]
            del dataIndex[randIndex]   # sample without replacement within a pass
    return weights
# Stochastic gradient descent: minimizes the negative log-likelihood, so note
# the flipped signs in the error and in the update
def stocGradDescent(dataMatrix, classLabels, numIter=150):
    m, n = shape(dataMatrix)
    weights = ones(n)
    for j in range(numIter):
        dataIndex = list(range(m))
        for i in range(m):
            alpha = 4 / (1.0 + j + i) + 0.01
            randIndex = int(random.uniform(0, len(dataIndex)))
            sampleIdx = dataIndex[randIndex]
            h = sigmoid(sum(dataMatrix[sampleIdx] * weights))
            error = h - classLabels[sampleIdx]   # sign flipped relative to ascent
            weights = weights - alpha * error * dataMatrix[sampleIdx]
            del dataIndex[randIndex]
    return weights
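# Sanity note: since (h - y) = -(y - h) and the step is subtracted instead of
# added, w - alpha * (h - y) * x == w + alpha * (y - h) * x. Descending the
# negative log-likelihood is therefore the same computation as ascending the
# log-likelihood; with identical random draws, stocGradDescent and
# stocGradAscent1 return identical weights.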
# Plot the data points and the fitted decision boundary
def plotBestFit(weights):
    dataMat, labelMat = loadDataSet()
    dataArr = array(dataMat)
    n = shape(dataArr)[0]
    xcord1 = []; ycord1 = []
    xcord2 = []; ycord2 = []
    for i in range(n):
        if int(labelMat[i]) == 1:
            xcord1.append(dataArr[i, 1]); ycord1.append(dataArr[i, 2])
        else:
            xcord2.append(dataArr[i, 1]); ycord2.append(dataArr[i, 2])
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
    ax.scatter(xcord2, ycord2, s=30, c='green')
    x = arange(-3.0, 3.0, 0.1)
    # The decision boundary is w0 + w1*x1 + w2*x2 = 0; treating x2 as the
    # y axis gives x2 = (-w0 - w1*x1) / w2
    y = (-weights[0] - weights[1] * x) / weights[2]
    ax.plot(x, y)
    plt.xlabel('X1'); plt.ylabel('X2')
    plt.show()
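# Note: plotBestFit indexes weights[0..2] as scalars, so it expects the 1-D
# array returned by the stochastic variants. gradAscent returns a NumPy
# matrix; convert it first, e.g. plotBestFit(weights.getA()).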
# Classify a single input vector: predict 1 if sigmoid(w . x) > 0.5
def classifyVector(inX, weights):
    prob = sigmoid(sum(inX * weights))
    if prob > 0.5:
        return 1.0
    else:
        return 0.0
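# Equivalent shortcut: sigmoid(z) > 0.5 exactly when z > 0, so the sigmoid can
# be skipped at prediction time (a sketch, not part of the original listing):
def classifyVectorFast(inX, weights):
    return 1.0 if sum(inX * weights) > 0 else 0.0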
# Training and test driver for the horse colic data set
def colicTest():
    frTrain = open('horseColicTraining.txt')
    frTest = open('horseColicTest.txt')
    trainingSet = []; trainingLabels = []
    for line in frTrain.readlines():
        currLine = line.strip().split('\t')
        lineArr = []
        for i in range(21):   # 21 input features per record
            lineArr.append(float(currLine[i]))
        trainingSet.append(lineArr)
        trainingLabels.append(float(currLine[21]))   # column 21 is the label
    trainWeights = stocGradAscent1(array(trainingSet), trainingLabels, 500)
    errorCount = 0; numTestVec = 0.0
    for line in frTest.readlines():
        numTestVec += 1.0
        currLine = line.strip().split('\t')
        lineArr = []
        for i in range(21):
            lineArr.append(float(currLine[i]))
        if int(classifyVector(array(lineArr), trainWeights)) != int(currLine[21]):
            errorCount += 1
    errorRate = float(errorCount) / numTestVec
    print("The error rate of this test is: %f" % errorRate)
    return errorRate
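# Note on the data: horseColicTraining.txt / horseColicTest.txt are assumed to
# be the preprocessed horse colic files in which missing feature values are
# encoded as 0. That encoding suits stochastic gradient updates: a 0 feature
# contributes nothing to weights = weights + alpha * error * x, so missing
# values leave the corresponding weight untouched.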
# Run colicTest several times and report the average error rate
def multiTest():
    numTests = 10; errorSum = 0.0
    for k in range(numTests):
        errorSum += colicTest()
    print("After %d iterations the average error rate is: %f"
          % (numTests, errorSum / float(numTests)))
if __name__ == '__main__':
    # multiTest()
    # Example run on the simple test set:
    dataArr, labelMat = loadDataSet()
    # Stochastic gradient descent:
    # weights = stocGradDescent(array(dataArr), labelMat)
    # Stochastic gradient ascent:
    weights = stocGradAscent1(array(dataArr), labelMat)
    plotBestFit(weights)